1from typing import Optional, Union

3import numpy as np

4import sympy as sp

6from pystencils.fd import Diff

7from pystencils.fd.derivative import diff_args

8from pystencils.fd.spatial import fd_stencils_standard

9from pystencils.field import Field

10from pystencils.simp.assignment_collection import AssignmentCollection

11from pystencils.sympyextensions import fast_subs

13FieldOrFieldAccess = Union[Field, Field.Access]

19def diffusion(scalar, diffusion_coeff, idx=None):

20 """Diffusion term ∇·( diffusion_coeff · ∇(scalar))

22 Examples:

23 >>> f = Field.create_generic('f', spatial_dimensions=2)

24 >>> d = sp.Symbol("d")

25 >>> dx = sp.Symbol("dx")

26 >>> diffusion_term = diffusion(scalar=f, diffusion_coeff=d)

27 >>> discretization = Discretization2ndOrder()

28 >>> expected_output = ((f[-1, 0] + f[0, -1] - 4 * f[0, 0] + f[0, 1] + f[1, 0]) * d) / dx**2

29 >>> sp.simplify(discretization(diffusion_term) - expected_output)

30 0

31 """

32 if isinstance(scalar, Field):

33 first_arg = scalar.center

34 elif isinstance(scalar, Field.Access):

35 first_arg = scalar

36 else:

37 raise ValueError("Diffused scalar has to be a pystencils Field or Field.Access")

39 args = [first_arg, diffusion_coeff if not isinstance(diffusion_coeff, Field) else diffusion_coeff.center]

40 if idx is not None:

41 args.append(idx)

42 return Diffusion(*args)

48 Term that describes the advection of a scalar quantity in a velocity field.

49 """

54 else:

55 raise ValueError("Advected scalar has to be a pystencils Field or Field.Access")

57 args = [first_arg, velocity_field if not isinstance(velocity_field, Field) else velocity_field.center]

58 if idx is not None:

59 args.append(idx)

63def transient(scalar, idx=None):

64 """Transient term ∂_t(scalar)"""

65 if isinstance(scalar, Field):

66 args = [scalar.center]

67 elif isinstance(scalar, Field.Access):

68 args = [scalar]

69 else:

70 raise ValueError("Scalar has to be a pystencils Field or Field.Access")

71 if idx is not None:

72 args.append(idx)

73 return Transient(*args)

76class Discretization2ndOrder:

77 def __init__(self, dx=sp.Symbol("dx"), dt=sp.Symbol("dt"), discretization_stencil_func=fd_stencils_standard):

78 self.dx = dx

79 self.dt = dt

80 self.spatial_stencil = discretization_stencil_func

82 def _discretize_diffusion(self, e):

83 result = 0

84 for c in range(e.dim):

85 first_diffs = [offset

86 * (e.diffusion_scalar_at_offset(c, offset) * e.diffusion_coefficient_at_offset(c, offset)

87 - e.diffusion_scalar_at_offset(0, 0) * e.diffusion_coefficient_at_offset(0, 0))

88 for offset in [-1, 1]]

89 result += first_diffs[1] - first_diffs[0]

90 return result / (self.dx ** 2)

93 result = 0

94 for c in range(expr.dim):

95 interpolated = [(expr.advected_scalar_at_offset(c, offset) * expr.velocity_field_at_offset(c, offset, c)

96 + expr.advected_scalar_at_offset(c, 0) * expr.velocity_field_at_offset(c, 0, c)) / 2

97 for offset in [-1, 1]]

98 result += interpolated[1] - interpolated[0]

99 return result / self.dx

101 def _discretize_spatial(self, e):

102 if isinstance(e, Diffusion):

103 return self._discretize_diffusion(e)

106 elif isinstance(e, Diff):

107 arg, *indices = diff_args(e)

108 from pystencils.interpolation_astnodes import InterpolatorAccess

110 if not isinstance(arg, (Field.Access, InterpolatorAccess)):

111 raise ValueError("Only derivatives with field or field accesses as arguments can be discretized")

112 return self.spatial_stencil(indices, self.dx, arg)

113 else:

114 new_args = [self._discretize_spatial(a) for a in e.args]

115 return e.func(*new_args) if new_args else e

117 def __call__(self, expr):

118 if isinstance(expr, list):

119 return [self(e) for e in expr]

120 elif isinstance(expr, sp.Matrix) or isinstance(expr, sp.ImmutableDenseMatrix):

121 return expr.applyfunc(self.__call__)

122 elif isinstance(expr, AssignmentCollection):

123 return expr.copy(main_assignments=[e for e in expr.main_assignments],

124 subexpressions=[e for e in expr.subexpressions])

126 transient_terms = expr.atoms(Transient)

127 if len(transient_terms) == 0:

128 return self._discretize_spatial(expr)

129 elif len(transient_terms) == 1:

130 transient_term = transient_terms.pop()

131 solve_result = sp.solve(expr, transient_term)

132 if len(solve_result) != 1:

133 raise ValueError("Could not solve for transient term" + str(solve_result))

134 rhs = solve_result.pop()

135 # explicit euler

136 return transient_term.scalar + self.dt * self._discretize_spatial(rhs)

137 else:

138 print(transient_terms)

139 raise NotImplementedError("Cannot discretize expression with more than one transient term")

142# -------------------------------------- Helper Classes ----------------------------------------------------------------

146 @property

147 def scalar(self):

148 return self.args[0].field

150 @property

151 def vector(self):

152 if isinstance(self.args[1], Field.Access):

153 return self.args[1].field

154 else:

155 return self.args[1]

157 @property

158 def scalar_index(self):

159 return None if len(self.args) <= 2 else int(self.args[2])

161 @property

162 def dim(self):

163 return self.scalar.spatial_dimensions

165 def _latex(self, printer):

166 name_suffix = f"_{self.scalar_index}" if self.scalar_index is not None else ""

167 if isinstance(self.vector, Field):

168 return r"\nabla \cdot(%s %s)" % (printer.doprint(sp.Symbol(self.vector.name)),

169 printer.doprint(sp.Symbol(self.scalar.name + name_suffix)))

170 else:

171 args = [r"\partial_%d(%s %s)" % (i, printer.doprint(sp.Symbol(self.scalar.name + name_suffix)),

172 printer.doprint(self.vector[i]))

173 for i in range(self.dim)]

174 return " + ".join(args)

176 # --- Interface for discretization strategy

178 def velocity_field_at_offset(self, offset_dim, offset_value, index):

179 v = self.vector

180 if isinstance(v, Field):

181 assert v.index_dimensions == 1

182 return v.neighbor(offset_dim, offset_value)(index)

183 else:

184 return v[index]

187 idx = 0 if self.scalar_index is None else int(self.scalar_index)

188 return self.scalar.neighbor(offset_dim, offset_value)(idx)

191class Diffusion(sp.Function):

193 @property

194 def scalar(self):

195 return self.args[0].field

197 @property

198 def diffusion_coeff(self):

199 if isinstance(self.args[1], Field.Access):

200 return self.args[1].field

201 else:

202 return self.args[1]

204 @property

205 def scalar_index(self):

206 return None if len(self.args) <= 2 else int(self.args[2])

208 @property

209 def dim(self):

210 return self.scalar.spatial_dimensions

212 def _latex(self, printer):

213 name_suffix = f"_{self.scalar_index}" if self.scalar_index is not None else ""

214 coeff = self.diffusion_coeff

215 diff_coeff = sp.Symbol(coeff.name) if isinstance(coeff, Field) else coeff

216 return r"div(%s \nabla %s)" % (printer.doprint(diff_coeff),

217 printer.doprint(sp.Symbol(self.scalar.name + name_suffix)))

219 # --- Interface for discretization strategy

221 def diffusion_scalar_at_offset(self, offset_dim, offset_value):

222 idx = 0 if self.scalar_index is None else self.scalar_index

223 return self.scalar.neighbor(offset_dim, offset_value)(idx)

225 def diffusion_coefficient_at_offset(self, offset_dim, offset_value):

226 d = self.diffusion_coeff

227 if isinstance(d, Field):

228 return d.neighbor(offset_dim, offset_value)

229 else:

230 return d

233class Transient(sp.Function):

234 @property

235 def scalar(self):

236 if self.scalar_index is None:

237 return self.args[0].field.center

238 else:

239 return self.args[0].field(self.scalar_index)

241 @property

242 def scalar_index(self):

243 return None if len(self.args) <= 1 else int(self.args[1])

245 def _latex(self, printer):

246 name_suffix = f"_{self.scalar_index}" if self.scalar_index is not None else ""

247 return r"\partial_t %s" % (printer.doprint(sp.Symbol(self.scalar.name + name_suffix)),)

250# -------------------------------------------- Deprecated Functions ----------------------------------------------------

254 r"""

255 Gradients are represented as a special symbol:

256 e.g. :math:\nabla x = (x^{\Delta 0}, x^{\Delta 1}, x^{\Delta 2})

258 This function takes a symbol and creates the gradient symbols according to convention above

260 Args:

261 var: symbol to take the gradient of

262 dim: dimension (length) of the gradient vector

263 """

264 if hasattr(var, "__getitem__"):

265 return [[sp.Symbol("%s^Delta^%d" % (v.name, i)) for v in var] for i in range(dim)]

266 else:

267 return [sp.Symbol("%s^Delta^%d" % (var.name, i)) for i in range(dim)]

270def discretize_center(term, symbols_to_field_dict, dx, dim=3):

271 """

272 Expects term that contains given symbols and gradient components of these symbols and replaces them

273 by field accesses. Gradients are replaced by centralized approximations:

274 (upper neighbor - lower neighbor ) / ( 2*dx)

276 Args:

277 term: term where symbols and gradient(symbol) should be replaced

278 symbols_to_field_dict: mapping of symbols to Field

279 dx: width and height of one cell

280 dim: dimension

282 Example:

283 >>> x = sp.Symbol("x")

285 >>> term = x * grad_x[0]

286 >>> term

287 x*x^Delta^0

288 >>> f = Field.create_generic('f', spatial_dimensions=3)

289 >>> expected_output = f[0, 0, 0] * (-f[-1, 0, 0]/2 + f[1, 0, 0]/2)

290 >>> sp.simplify(discretize_center(term, { x: f }, dx=1, dim=3) - expected_output)

291 0

292 """

293 substitutions = {}

294 for symbols, field in symbols_to_field_dict.items():

295 if not hasattr(symbols, "__getitem__"):

296 symbols = [symbols]

298 substitutions.update({symbol: field(i) for i, symbol in enumerate(symbols)})

299 for d in range(dim):

300 up, down = __up_down_offsets(d, dim)

301 substitutions.update({g[d][i]: (field[up](i) - field[down](i)) / dx / 2 for i in range(len(symbols))})

302 return fast_subs(term, substitutions)

305def discretize_staggered(term, symbols_to_field_dict, coordinate, coordinate_offset, dx, dim=3):

306 """

307 Expects term that contains given symbols and gradient components of these symbols and replaces them

308 by field accesses. Gradients in coordinate direction are replaced by staggered version at cell boundary.

309 Symbols themselves and gradients in other directions are replaced by interpolated version at cell face.

311 Args:

312 term: input term where symbols and gradients are replaced

313 symbols_to_field_dict: mapping of symbols to Field

314 coordinate: id for coordinate (0 for x, 1 for y, ... ) defining cell boundary.

315 Only gradients in this direction are replaced e.g. if symbol^Delta^coordinate

316 coordinate_offset: either +1 or -1 for upper or lower face in coordinate direction

317 dx: width and height of one cell

318 dim: dimension

320 Examples:

321 Discretizing at right/east face of cell i.e. coordinate=0, offset=1)

322 >>> x, dx = sp.symbols("x dx")

324 >>> term = x * grad_x[0]

325 >>> term

326 x*x^Delta^0

327 >>> f = Field.create_generic('f', spatial_dimensions=3)

328 >>> discretize_staggered(term, symbols_to_field_dict={ x: f}, dx=dx, coordinate=0, coordinate_offset=1, dim=3)

329 (-f_C + f_E)*(f_C/2 + f_E/2)/dx

330 """

331 assert coordinate_offset == 1 or coordinate_offset == -1

332 assert 0 <= coordinate < dim

334 substitutions = {}

335 for symbols, field in symbols_to_field_dict.items():

336 if not hasattr(symbols, "__getitem__"):

337 symbols = [symbols]

339 offset = [0] * dim

340 offset[coordinate] = coordinate_offset

341 offset = np.array(offset, dtype=int)

344 substitutions.update({s: (field[offset](i) + field(i)) / 2 for i, s in enumerate(symbols)})

345 substitutions.update({g: (field[offset](i) - field(i)) / dx * coordinate_offset

346 for i, g in enumerate(gradient)})

347 for d in range(dim):

348 if d == coordinate:

349 continue

350 up, down = __up_down_offsets(d, dim)

351 for i, s in enumerate(symbols):

352 center_grad = (field[up](i) - field[down](i)) / (2 * dx)

353 neighbor_grad = (field[up + offset](i) - field[down + offset](i)) / (2 * dx)

356 return fast_subs(term, substitutions)

359def discretize_divergence(vector_term, symbols_to_field_dict, dx):

360 """

361 Computes discrete divergence of symbolic vector

363 Args:

364 vector_term: sequence of terms, interpreted as vector

365 symbols_to_field_dict: mapping of symbols to Field

366 dx: length of a cell

368 Examples:

369 Laplace stencil

370 >>> x, dx = sp.symbols("x dx")

372 >>> f = Field.create_generic('f', spatial_dimensions=3)

373 >>> expected_output = (f[-1, 0, 0] + f[0, -1, 0] + f[0, 0, -1] -

374 ... 6*f[0, 0, 0] + f[0, 0, 1] + f[0, 1, 0] + f[1, 0, 0])/dx**2

375 >>> sp.simplify(discretize_divergence(grad_x, {x : f}, dx) - expected_output)

376 0

377 """

378 dim = len(vector_term)

379 result = 0

380 for d in range(dim):

381 for offset in [-1, 1]:

382 result += offset * discretize_staggered(vector_term[d], symbols_to_field_dict, d, offset, dx, dim)

383 return result / dx

386def __up_down_offsets(d, dim):

387 coord = [0] * dim

388 coord[d] = 1

389 up = np.array(coord, dtype=int)

390 coord[d] = -1

391 down = np.array(coord, dtype=int)

392 return up, down