1import pystencils as ps

2import sympy as sp

3from pystencils.fd.derivation import FiniteDifferenceStaggeredStencilDerivation as FDS, \

4 FiniteDifferenceStencilDerivation as FD

5import itertools

6from collections import defaultdict

7from collections.abc import Iterable

10def get_access_and_direction(term):

11 direction1 = term.args[1]

12 if isinstance(term.args[0], ps.Field.Access): # first derivative

13 access = term.args[0]

14 direction = (direction1,)

15 elif isinstance(term.args[0], ps.fd.Diff): # nested derivative

16 if isinstance(term.args[0].args[0], ps.fd.Diff): # third or higher derivative

17 raise ValueError("can only handle first and second derivatives")

18 elif not isinstance(term.args[0].args[0], ps.Field.Access):

19 raise ValueError("can only handle derivatives of field accesses")

21 access, direction2 = term.args[0].args[:2]

22 direction = (direction1, direction2)

23 else:

24 raise NotImplementedError(f"can only deal with derivatives of field accesses, "

25 f"but not {type(term.args[0])}; expansion of derivatives probably failed")

26 return access, direction

29class FVM1stOrder:

30 """Finite-volume discretization

32 Args:

33 field: the field with the quantity to calculate, e.g. a concentration

34 flux: a list of sympy expressions that specify the flux, one for each cartesian direction

35 source: a list of sympy expressions that specify the source

36 """

37 def __init__(self, field: ps.field.Field, flux=0, source=0):

38 def normalize(f, shape):

39 shape = tuple(s for s in shape if s != 1)

40 if not shape:

41 shape = None

43 if isinstance(f, sp.Array) or isinstance(f, Iterable) or isinstance(f, sp.Matrix):

44 return sp.Array(f, shape)

45 else:

46 return sp.Array([f] * (sp.Mul(*shape) if shape else 1))

48 self.c = field

49 self.dim = self.c.spatial_dimensions

50 self.j = normalize(flux, (self.dim, ) + self.c.index_shape)

51 self.q = normalize(source, self.c.index_shape)

53 def discrete_flux(self, flux_field: ps.field.Field):

54 """Return a list of assignments for the discrete fluxes

56 Args:

57 flux_field: a staggered field to which the fluxes should be assigned

58 """

60 assert ps.FieldType.is_staggered(flux_field)

62 def discretize(term, neighbor):

63 if isinstance(term, sp.Matrix):

64 nw = term.applyfunc(lambda t: discretize(t, neighbor))

65 return nw

66 elif isinstance(term, ps.field.Field.Access):

67 avg = (term.get_shifted(*neighbor) + term) * sp.Rational(1, 2)

68 return avg

69 elif isinstance(term, ps.fd.Diff):

70 access, direction = get_access_and_direction(term)

72 fds = FDS(neighbor, access.field.spatial_dimensions, direction)

73 return fds.apply(access)

75 if term.args:

76 new_args = [discretize(a, neighbor) for a in term.args]

77 return term.func(*new_args)

78 else:

79 return term

81 fluxes = self.j.applyfunc(ps.fd.derivative.expand_diff_full)

82 fluxes = [sp.Matrix(fluxes.tolist()[i]) if flux_field.index_dimensions > 1 else fluxes.tolist()[i]

83 for i in range(self.dim)]

85 A0 = sum([sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm()

86 for d in flux_field.staggered_stencil]) / self.dim

88 discrete_fluxes = []

89 for neighbor in flux_field.staggered_stencil:

90 neighbor = ps.stencil.direction_string_to_offset(neighbor)

91 directional_flux = fluxes[0] * int(neighbor[0])

92 for i in range(1, self.dim):

93 directional_flux += fluxes[i] * int(neighbor[i])

94 discrete_flux = discretize(directional_flux, neighbor)

95 discrete_fluxes.append(discrete_flux / sp.Matrix(neighbor).norm())

97 if flux_field.index_dimensions > 1:

98 return [ps.Assignment(lhs, rhs / A0)

99 for i, d in enumerate(flux_field.staggered_stencil) if discrete_fluxes[i]

100 for lhs, rhs in zip(flux_field.staggered_vector_access(d), sp.simplify(discrete_fluxes[i]))]

101 else:

102 return [ps.Assignment(flux_field.staggered_access(d), sp.simplify(discrete_fluxes[i]) / A0)

103 for i, d in enumerate(flux_field.staggered_stencil)]

105 def discrete_source(self):

106 """Return a list of assignments for the discrete source term"""

108 def discretize(term):

109 if isinstance(term, ps.fd.Diff):

110 access, direction = get_access_and_direction(term)

112 if self.dim == 2:

113 stencil = ["".join(a).replace(" ", "") for a in itertools.product("NS ", "EW ")

114 if "".join(a).strip()]

115 else:

116 stencil = ["".join(a).replace(" ", "") for a in itertools.product("NS ", "EW ", "TB ")

117 if "".join(a).strip()]

118 weights = None

119 for stencil in [["N", "S", "E", "W", "T", "B"][:2 * self.dim], stencil]:

120 stencil = [tuple(ps.stencil.direction_string_to_offset(d, self.dim)) for d in stencil]

122 derivation = FD(direction, stencil).get_stencil()

123 if not derivation.accuracy:

124 continue

125 weights = derivation.weights

127 # if the weights are underdefined, we can choose the free symbols to find the sparsest stencil

128 free_weights = set(itertools.chain(*[w.free_symbols for w in weights]))

129 if len(free_weights) > 0:

130 zero_counts = defaultdict(list)

131 for values in itertools.product([-1, -sp.Rational(1, 2), 0, 1, sp.Rational(1, 2)],

132 repeat=len(free_weights)):

133 subs = {free_weight: value for free_weight, value in zip(free_weights, values)}

134 weights = [w.subs(subs) for w in derivation.weights]

135 if not all(a == 0 for a in weights):

136 zero_count = sum([1 for w in weights if w == 0])

137 zero_counts[zero_count].append(weights)

138 best = zero_counts[max(zero_counts.keys())]

139 if len(best) > 1:

140 raise NotImplementedError("more than one suitable set of weights found, "

141 "don't know how to proceed")

142 weights = best[0]

143 break

144 if not weights:

145 raise Exception('the requested derivative cannot be performed with the available neighbors')

146 assert weights

148 if access._field.index_dimensions == 0:

149 return sum([access._field.__getitem__(point) * weight for point, weight in zip(stencil, weights)])

150 else:

151 total = access.get_shifted(*stencil[0]).at_index(*access.index) * weights[0]

152 for point, weight in zip(stencil[1:], weights[1:]):

153 addl = access.get_shifted(*point).at_index(*access.index) * weight

154 total += addl

155 return total

157 if term.args:

158 new_args = [discretize(a) for a in term.args]

159 return term.func(*new_args)

160 else:

161 return term

163 source = self.q.applyfunc(ps.fd.derivative.expand_diff_full)

164 source = source.applyfunc(discretize)

166 return [ps.Assignment(lhs, rhs) for lhs, rhs in zip(self.c.center_vector, sp.flatten(source)) if rhs]

168 def discrete_continuity(self, flux_field: ps.field.Field):

169 """Return a list of assignments for the continuity equation, which includes the source term

171 Args:

172 flux_field: a staggered field from which the fluxes are taken

173 """

175 assert ps.FieldType.is_staggered(flux_field)

177 neighbors = flux_field.staggered_stencil + [ps.stencil.inverse_direction_string(d)

178 for d in flux_field.staggered_stencil]

179 divergence = flux_field.staggered_vector_access(neighbors[0])

180 for d in neighbors[1:]:

181 divergence += flux_field.staggered_vector_access(d)

183 source = self.discrete_source()

184 source = {s.lhs: s.rhs for s in source}

186 return [ps.Assignment(lhs, (lhs - rhs + source[lhs]) if lhs in source else (lhs - rhs))

187 for lhs, rhs in zip(self.c.center_vector, divergence)]

190def VOF(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field):

191 """Volume-of-fluid discretization of advection

193 Args:

194 j: the staggered field to write the fluxes to. Should have a D2Q9/D3Q27 stencil. Other stencils work too, but

195 incur a small error (D2Q5/D3Q7: v^2, D3Q19: v^3).

196 v: the flow velocity field

197 ρ: the quantity to advect

198 """

199 assert ps.FieldType.is_staggered(j)

201 fluxes = [[] for i in range(j.index_shape[0])]

203 v0 = v.center_vector

204 for d, neighbor in enumerate(j.staggered_stencil):

205 c = ps.stencil.direction_string_to_offset(neighbor)

206 v1 = v.neighbor_vector(c)

208 # going out

209 cond = sp.And(*[sp.Or(c[i] * v0[i] > 0, c[i] == 0) for i in range(len(v0))])

210 overlap1 = [1 - sp.Abs(v0[i]) for i in range(len(v0))]

211 overlap2 = [c[i] * v0[i] for i in range(len(v0))]

212 overlap = sp.Mul(*[(overlap1[i] if c[i] == 0 else overlap2[i]) for i in range(len(v0))])

213 fluxes[d].append(ρ.center_vector * overlap * sp.Piecewise((1, cond), (0, True)))

215 # coming in

216 cond = sp.And(*[sp.Or(c[i] * v1[i] < 0, c[i] == 0) for i in range(len(v1))])

217 overlap1 = [1 - sp.Abs(v1[i]) for i in range(len(v1))]

218 overlap2 = [v1[i] for i in range(len(v1))]

219 overlap = sp.Mul(*[(overlap1[i] if c[i] == 0 else overlap2[i]) for i in range(len(v1))])

220 sign = (c == 1).sum() % 2 * 2 - 1

221 fluxes[d].append(sign * ρ.neighbor_vector(c) * overlap * sp.Piecewise((1, cond), (0, True)))

223 for i, ff in enumerate(fluxes):

224 fluxes[i] = ff[0]

225 for f in ff[1:]:

226 fluxes[i] += f

228 assignments = []

229 for i, d in enumerate(j.staggered_stencil):

230 for lhs, rhs in zip(j.staggered_vector_access(d).values(), fluxes[i].values()):

231 assignments.append(ps.Assignment(lhs, rhs))

232 return assignments