1from typing import Optional, Union 

2 

3import numpy as np 

4import sympy as sp 

5 

6from pystencils.fd import Diff 

7from pystencils.fd.derivative import diff_args 

8from pystencils.fd.spatial import fd_stencils_standard 

9from pystencils.field import Field 

10from pystencils.simp.assignment_collection import AssignmentCollection 

11from pystencils.sympyextensions import fast_subs 

12 

13FieldOrFieldAccess = Union[Field, Field.Access] 

14 

15 

16# --------------------------------------- Advection Diffusion ---------------------------------------------------------- 

17 

18 

19def diffusion(scalar, diffusion_coeff, idx=None): 

20 """Diffusion term ∇·( diffusion_coeff · ∇(scalar)) 

21 

22 Examples: 

23 >>> f = Field.create_generic('f', spatial_dimensions=2) 

24 >>> d = sp.Symbol("d") 

25 >>> dx = sp.Symbol("dx") 

26 >>> diffusion_term = diffusion(scalar=f, diffusion_coeff=d) 

27 >>> discretization = Discretization2ndOrder() 

28 >>> expected_output = ((f[-1, 0] + f[0, -1] - 4 * f[0, 0] + f[0, 1] + f[1, 0]) * d) / dx**2 

29 >>> sp.simplify(discretization(diffusion_term) - expected_output) 

30 0 

31 """ 

32 if isinstance(scalar, Field): 

33 first_arg = scalar.center 

34 elif isinstance(scalar, Field.Access): 

35 first_arg = scalar 

36 else: 

37 raise ValueError("Diffused scalar has to be a pystencils Field or Field.Access") 

38 

39 args = [first_arg, diffusion_coeff if not isinstance(diffusion_coeff, Field) else diffusion_coeff.center] 

40 if idx is not None: 

41 args.append(idx) 

42 return Diffusion(*args) 

43 

44 

45def advection(advected_scalar: FieldOrFieldAccess, velocity_field: FieldOrFieldAccess, idx: Optional[int] = None): 

46 """Advection term ∇·(velocity_field · advected_scalar) 

47 

48 Term that describes the advection of a scalar quantity in a velocity field. 

49 """ 

50 if isinstance(advected_scalar, Field): 

51 first_arg = advected_scalar.center 

52 elif isinstance(advected_scalar, Field.Access): 

53 first_arg = advected_scalar 

54 else: 

55 raise ValueError("Advected scalar has to be a pystencils Field or Field.Access") 

56 

57 args = [first_arg, velocity_field if not isinstance(velocity_field, Field) else velocity_field.center] 

58 if idx is not None: 

59 args.append(idx) 

60 return Advection(*args) 

61 

62 

63def transient(scalar, idx=None): 

64 """Transient term ∂_t(scalar)""" 

65 if isinstance(scalar, Field): 

66 args = [scalar.center] 

67 elif isinstance(scalar, Field.Access): 

68 args = [scalar] 

69 else: 

70 raise ValueError("Scalar has to be a pystencils Field or Field.Access") 

71 if idx is not None: 

72 args.append(idx) 

73 return Transient(*args) 

74 

75 

76class Discretization2ndOrder: 

77 def __init__(self, dx=sp.Symbol("dx"), dt=sp.Symbol("dt"), discretization_stencil_func=fd_stencils_standard): 

78 self.dx = dx 

79 self.dt = dt 

80 self.spatial_stencil = discretization_stencil_func 

81 

82 def _discretize_diffusion(self, e): 

83 result = 0 

84 for c in range(e.dim): 

85 first_diffs = [offset 

86 * (e.diffusion_scalar_at_offset(c, offset) * e.diffusion_coefficient_at_offset(c, offset) 

87 - e.diffusion_scalar_at_offset(0, 0) * e.diffusion_coefficient_at_offset(0, 0)) 

88 for offset in [-1, 1]] 

89 result += first_diffs[1] - first_diffs[0] 

90 return result / (self.dx ** 2) 

91 

92 def _discretize_advection(self, expr): 

93 result = 0 

94 for c in range(expr.dim): 

95 interpolated = [(expr.advected_scalar_at_offset(c, offset) * expr.velocity_field_at_offset(c, offset, c) 

96 + expr.advected_scalar_at_offset(c, 0) * expr.velocity_field_at_offset(c, 0, c)) / 2 

97 for offset in [-1, 1]] 

98 result += interpolated[1] - interpolated[0] 

99 return result / self.dx 

100 

101 def _discretize_spatial(self, e): 

102 if isinstance(e, Diffusion): 

103 return self._discretize_diffusion(e) 

104 elif isinstance(e, Advection): 

105 return self._discretize_advection(e) 

106 elif isinstance(e, Diff): 

107 arg, *indices = diff_args(e) 

108 from pystencils.interpolation_astnodes import InterpolatorAccess 

109 

110 if not isinstance(arg, (Field.Access, InterpolatorAccess)): 

111 raise ValueError("Only derivatives with field or field accesses as arguments can be discretized") 

112 return self.spatial_stencil(indices, self.dx, arg) 

113 else: 

114 new_args = [self._discretize_spatial(a) for a in e.args] 

115 return e.func(*new_args) if new_args else e 

116 

117 def __call__(self, expr): 

118 if isinstance(expr, list): 

119 return [self(e) for e in expr] 

120 elif isinstance(expr, sp.Matrix) or isinstance(expr, sp.ImmutableDenseMatrix): 

121 return expr.applyfunc(self.__call__) 

122 elif isinstance(expr, AssignmentCollection): 

123 return expr.copy(main_assignments=[e for e in expr.main_assignments], 

124 subexpressions=[e for e in expr.subexpressions]) 

125 

126 transient_terms = expr.atoms(Transient) 

127 if len(transient_terms) == 0: 

128 return self._discretize_spatial(expr) 

129 elif len(transient_terms) == 1: 

130 transient_term = transient_terms.pop() 

131 solve_result = sp.solve(expr, transient_term) 

132 if len(solve_result) != 1: 

133 raise ValueError("Could not solve for transient term" + str(solve_result)) 

134 rhs = solve_result.pop() 

135 # explicit euler 

136 return transient_term.scalar + self.dt * self._discretize_spatial(rhs) 

137 else: 

138 print(transient_terms) 

139 raise NotImplementedError("Cannot discretize expression with more than one transient term") 

140 

141 

142# -------------------------------------- Helper Classes ---------------------------------------------------------------- 

143 

144class Advection(sp.Function): 

145 

146 @property 

147 def scalar(self): 

148 return self.args[0].field 

149 

150 @property 

151 def vector(self): 

152 if isinstance(self.args[1], Field.Access): 

153 return self.args[1].field 

154 else: 

155 return self.args[1] 

156 

157 @property 

158 def scalar_index(self): 

159 return None if len(self.args) <= 2 else int(self.args[2]) 

160 

161 @property 

162 def dim(self): 

163 return self.scalar.spatial_dimensions 

164 

165 def _latex(self, printer): 

166 name_suffix = f"_{self.scalar_index}" if self.scalar_index is not None else "" 

167 if isinstance(self.vector, Field): 

168 return r"\nabla \cdot(%s %s)" % (printer.doprint(sp.Symbol(self.vector.name)), 

169 printer.doprint(sp.Symbol(self.scalar.name + name_suffix))) 

170 else: 

171 args = [r"\partial_%d(%s %s)" % (i, printer.doprint(sp.Symbol(self.scalar.name + name_suffix)), 

172 printer.doprint(self.vector[i])) 

173 for i in range(self.dim)] 

174 return " + ".join(args) 

175 

176 # --- Interface for discretization strategy 

177 

178 def velocity_field_at_offset(self, offset_dim, offset_value, index): 

179 v = self.vector 

180 if isinstance(v, Field): 

181 assert v.index_dimensions == 1 

182 return v.neighbor(offset_dim, offset_value)(index) 

183 else: 

184 return v[index] 

185 

186 def advected_scalar_at_offset(self, offset_dim, offset_value): 

187 idx = 0 if self.scalar_index is None else int(self.scalar_index) 

188 return self.scalar.neighbor(offset_dim, offset_value)(idx) 

189 

190 

191class Diffusion(sp.Function): 

192 

193 @property 

194 def scalar(self): 

195 return self.args[0].field 

196 

197 @property 

198 def diffusion_coeff(self): 

199 if isinstance(self.args[1], Field.Access): 

200 return self.args[1].field 

201 else: 

202 return self.args[1] 

203 

204 @property 

205 def scalar_index(self): 

206 return None if len(self.args) <= 2 else int(self.args[2]) 

207 

208 @property 

209 def dim(self): 

210 return self.scalar.spatial_dimensions 

211 

212 def _latex(self, printer): 

213 name_suffix = f"_{self.scalar_index}" if self.scalar_index is not None else "" 

214 coeff = self.diffusion_coeff 

215 diff_coeff = sp.Symbol(coeff.name) if isinstance(coeff, Field) else coeff 

216 return r"div(%s \nabla %s)" % (printer.doprint(diff_coeff), 

217 printer.doprint(sp.Symbol(self.scalar.name + name_suffix))) 

218 

219 # --- Interface for discretization strategy 

220 

221 def diffusion_scalar_at_offset(self, offset_dim, offset_value): 

222 idx = 0 if self.scalar_index is None else self.scalar_index 

223 return self.scalar.neighbor(offset_dim, offset_value)(idx) 

224 

225 def diffusion_coefficient_at_offset(self, offset_dim, offset_value): 

226 d = self.diffusion_coeff 

227 if isinstance(d, Field): 

228 return d.neighbor(offset_dim, offset_value) 

229 else: 

230 return d 

231 

232 

233class Transient(sp.Function): 

234 @property 

235 def scalar(self): 

236 if self.scalar_index is None: 

237 return self.args[0].field.center 

238 else: 

239 return self.args[0].field(self.scalar_index) 

240 

241 @property 

242 def scalar_index(self): 

243 return None if len(self.args) <= 1 else int(self.args[1]) 

244 

245 def _latex(self, printer): 

246 name_suffix = f"_{self.scalar_index}" if self.scalar_index is not None else "" 

247 return r"\partial_t %s" % (printer.doprint(sp.Symbol(self.scalar.name + name_suffix)),) 

248 

249 

250# -------------------------------------------- Deprecated Functions ---------------------------------------------------- 

251 

252 

253def grad(var, dim=3): 

254 r""" 

255 Gradients are represented as a special symbol: 

256 e.g. :math:`\nabla x = (x^{\Delta 0}, x^{\Delta 1}, x^{\Delta 2})` 

257 

258 This function takes a symbol and creates the gradient symbols according to convention above 

259 

260 Args: 

261 var: symbol to take the gradient of 

262 dim: dimension (length) of the gradient vector 

263 """ 

264 if hasattr(var, "__getitem__"): 

265 return [[sp.Symbol("%s^Delta^%d" % (v.name, i)) for v in var] for i in range(dim)] 

266 else: 

267 return [sp.Symbol("%s^Delta^%d" % (var.name, i)) for i in range(dim)] 

268 

269 

270def discretize_center(term, symbols_to_field_dict, dx, dim=3): 

271 """ 

272 Expects term that contains given symbols and gradient components of these symbols and replaces them 

273 by field accesses. Gradients are replaced by centralized approximations: 

274 ``(upper neighbor - lower neighbor ) / ( 2*dx)`` 

275 

276 Args: 

277 term: term where symbols and gradient(symbol) should be replaced 

278 symbols_to_field_dict: mapping of symbols to Field 

279 dx: width and height of one cell 

280 dim: dimension 

281 

282 Example: 

283 >>> x = sp.Symbol("x") 

284 >>> grad_x = grad(x, dim=3) 

285 >>> term = x * grad_x[0] 

286 >>> term 

287 x*x^Delta^0 

288 >>> f = Field.create_generic('f', spatial_dimensions=3) 

289 >>> expected_output = f[0, 0, 0] * (-f[-1, 0, 0]/2 + f[1, 0, 0]/2) 

290 >>> sp.simplify(discretize_center(term, { x: f }, dx=1, dim=3) - expected_output) 

291 0 

292 """ 

293 substitutions = {} 

294 for symbols, field in symbols_to_field_dict.items(): 

295 if not hasattr(symbols, "__getitem__"): 

296 symbols = [symbols] 

297 g = grad(symbols, dim) 

298 substitutions.update({symbol: field(i) for i, symbol in enumerate(symbols)}) 

299 for d in range(dim): 

300 up, down = __up_down_offsets(d, dim) 

301 substitutions.update({g[d][i]: (field[up](i) - field[down](i)) / dx / 2 for i in range(len(symbols))}) 

302 return fast_subs(term, substitutions) 

303 

304 

305def discretize_staggered(term, symbols_to_field_dict, coordinate, coordinate_offset, dx, dim=3): 

306 """ 

307 Expects term that contains given symbols and gradient components of these symbols and replaces them 

308 by field accesses. Gradients in coordinate direction are replaced by staggered version at cell boundary. 

309 Symbols themselves and gradients in other directions are replaced by interpolated version at cell face. 

310 

311 Args: 

312 term: input term where symbols and gradients are replaced 

313 symbols_to_field_dict: mapping of symbols to Field 

314 coordinate: id for coordinate (0 for x, 1 for y, ... ) defining cell boundary. 

315 Only gradients in this direction are replaced e.g. if symbol^Delta^coordinate 

316 coordinate_offset: either +1 or -1 for upper or lower face in coordinate direction 

317 dx: width and height of one cell 

318 dim: dimension 

319 

320 Examples: 

321 Discretizing at right/east face of cell i.e. coordinate=0, offset=1) 

322 >>> x, dx = sp.symbols("x dx") 

323 >>> grad_x = grad(x, dim=3) 

324 >>> term = x * grad_x[0] 

325 >>> term 

326 x*x^Delta^0 

327 >>> f = Field.create_generic('f', spatial_dimensions=3) 

328 >>> discretize_staggered(term, symbols_to_field_dict={ x: f}, dx=dx, coordinate=0, coordinate_offset=1, dim=3) 

329 (-f_C + f_E)*(f_C/2 + f_E/2)/dx 

330 """ 

331 assert coordinate_offset == 1 or coordinate_offset == -1 

332 assert 0 <= coordinate < dim 

333 

334 substitutions = {} 

335 for symbols, field in symbols_to_field_dict.items(): 

336 if not hasattr(symbols, "__getitem__"): 

337 symbols = [symbols] 

338 

339 offset = [0] * dim 

340 offset[coordinate] = coordinate_offset 

341 offset = np.array(offset, dtype=int) 

342 

343 gradient = grad(symbols)[coordinate] 

344 substitutions.update({s: (field[offset](i) + field(i)) / 2 for i, s in enumerate(symbols)}) 

345 substitutions.update({g: (field[offset](i) - field(i)) / dx * coordinate_offset 

346 for i, g in enumerate(gradient)}) 

347 for d in range(dim): 

348 if d == coordinate: 

349 continue 

350 up, down = __up_down_offsets(d, dim) 

351 for i, s in enumerate(symbols): 

352 center_grad = (field[up](i) - field[down](i)) / (2 * dx) 

353 neighbor_grad = (field[up + offset](i) - field[down + offset](i)) / (2 * dx) 

354 substitutions[grad(s)[d]] = (center_grad + neighbor_grad) / 2 

355 

356 return fast_subs(term, substitutions) 

357 

358 

359def discretize_divergence(vector_term, symbols_to_field_dict, dx): 

360 """ 

361 Computes discrete divergence of symbolic vector 

362 

363 Args: 

364 vector_term: sequence of terms, interpreted as vector 

365 symbols_to_field_dict: mapping of symbols to Field 

366 dx: length of a cell 

367 

368 Examples: 

369 Laplace stencil 

370 >>> x, dx = sp.symbols("x dx") 

371 >>> grad_x = grad(x, dim=3) 

372 >>> f = Field.create_generic('f', spatial_dimensions=3) 

373 >>> expected_output = (f[-1, 0, 0] + f[0, -1, 0] + f[0, 0, -1] - 

374 ... 6*f[0, 0, 0] + f[0, 0, 1] + f[0, 1, 0] + f[1, 0, 0])/dx**2 

375 >>> sp.simplify(discretize_divergence(grad_x, {x : f}, dx) - expected_output) 

376 0 

377 """ 

378 dim = len(vector_term) 

379 result = 0 

380 for d in range(dim): 

381 for offset in [-1, 1]: 

382 result += offset * discretize_staggered(vector_term[d], symbols_to_field_dict, d, offset, dx, dim) 

383 return result / dx 

384 

385 

386def __up_down_offsets(d, dim): 

387 coord = [0] * dim 

388 coord[d] = 1 

389 up = np.array(coord, dtype=int) 

390 coord[d] = -1 

391 down = np.array(coord, dtype=int) 

392 return up, down