1import pystencils as ps 

2import sympy as sp 

3from pystencils.fd.derivation import FiniteDifferenceStaggeredStencilDerivation as FDS, \ 

4 FiniteDifferenceStencilDerivation as FD 

5import itertools 

6from collections import defaultdict 

7from collections.abc import Iterable 

8 

9 

10def get_access_and_direction(term): 

11 direction1 = term.args[1] 

12 if isinstance(term.args[0], ps.Field.Access): # first derivative 

13 access = term.args[0] 

14 direction = (direction1,) 

15 elif isinstance(term.args[0], ps.fd.Diff): # nested derivative 

16 if isinstance(term.args[0].args[0], ps.fd.Diff): # third or higher derivative 

17 raise ValueError("can only handle first and second derivatives") 

18 elif not isinstance(term.args[0].args[0], ps.Field.Access): 

19 raise ValueError("can only handle derivatives of field accesses") 

20 

21 access, direction2 = term.args[0].args[:2] 

22 direction = (direction1, direction2) 

23 else: 

24 raise NotImplementedError(f"can only deal with derivatives of field accesses, " 

25 f"but not {type(term.args[0])}; expansion of derivatives probably failed") 

26 return access, direction 

27 

28 

29class FVM1stOrder: 

30 """Finite-volume discretization 

31 

32 Args: 

33 field: the field with the quantity to calculate, e.g. a concentration 

34 flux: a list of sympy expressions that specify the flux, one for each cartesian direction 

35 source: a list of sympy expressions that specify the source 

36 """ 

37 def __init__(self, field: ps.field.Field, flux=0, source=0): 

38 def normalize(f, shape): 

39 shape = tuple(s for s in shape if s != 1) 

40 if not shape: 

41 shape = None 

42 

43 if isinstance(f, sp.Array) or isinstance(f, Iterable) or isinstance(f, sp.Matrix): 

44 return sp.Array(f, shape) 

45 else: 

46 return sp.Array([f] * (sp.Mul(*shape) if shape else 1)) 

47 

48 self.c = field 

49 self.dim = self.c.spatial_dimensions 

50 self.j = normalize(flux, (self.dim, ) + self.c.index_shape) 

51 self.q = normalize(source, self.c.index_shape) 

52 

53 def discrete_flux(self, flux_field: ps.field.Field): 

54 """Return a list of assignments for the discrete fluxes 

55 

56 Args: 

57 flux_field: a staggered field to which the fluxes should be assigned 

58 """ 

59 

60 assert ps.FieldType.is_staggered(flux_field) 

61 

62 def discretize(term, neighbor): 

63 if isinstance(term, sp.Matrix): 

64 nw = term.applyfunc(lambda t: discretize(t, neighbor)) 

65 return nw 

66 elif isinstance(term, ps.field.Field.Access): 

67 avg = (term.get_shifted(*neighbor) + term) * sp.Rational(1, 2) 

68 return avg 

69 elif isinstance(term, ps.fd.Diff): 

70 access, direction = get_access_and_direction(term) 

71 

72 fds = FDS(neighbor, access.field.spatial_dimensions, direction) 

73 return fds.apply(access) 

74 

75 if term.args: 

76 new_args = [discretize(a, neighbor) for a in term.args] 

77 return term.func(*new_args) 

78 else: 

79 return term 

80 

81 fluxes = self.j.applyfunc(ps.fd.derivative.expand_diff_full) 

82 fluxes = [sp.Matrix(fluxes.tolist()[i]) if flux_field.index_dimensions > 1 else fluxes.tolist()[i] 

83 for i in range(self.dim)] 

84 

85 A0 = sum([sp.Matrix(ps.stencil.direction_string_to_offset(d)).norm() 

86 for d in flux_field.staggered_stencil]) / self.dim 

87 

88 discrete_fluxes = [] 

89 for neighbor in flux_field.staggered_stencil: 

90 neighbor = ps.stencil.direction_string_to_offset(neighbor) 

91 directional_flux = fluxes[0] * int(neighbor[0]) 

92 for i in range(1, self.dim): 

93 directional_flux += fluxes[i] * int(neighbor[i]) 

94 discrete_flux = discretize(directional_flux, neighbor) 

95 discrete_fluxes.append(discrete_flux / sp.Matrix(neighbor).norm()) 

96 

97 if flux_field.index_dimensions > 1: 

98 return [ps.Assignment(lhs, rhs / A0) 

99 for i, d in enumerate(flux_field.staggered_stencil) if discrete_fluxes[i] 

100 for lhs, rhs in zip(flux_field.staggered_vector_access(d), sp.simplify(discrete_fluxes[i]))] 

101 else: 

102 return [ps.Assignment(flux_field.staggered_access(d), sp.simplify(discrete_fluxes[i]) / A0) 

103 for i, d in enumerate(flux_field.staggered_stencil)] 

104 

105 def discrete_source(self): 

106 """Return a list of assignments for the discrete source term""" 

107 

108 def discretize(term): 

109 if isinstance(term, ps.fd.Diff): 

110 access, direction = get_access_and_direction(term) 

111 

112 if self.dim == 2: 

113 stencil = ["".join(a).replace(" ", "") for a in itertools.product("NS ", "EW ") 

114 if "".join(a).strip()] 

115 else: 

116 stencil = ["".join(a).replace(" ", "") for a in itertools.product("NS ", "EW ", "TB ") 

117 if "".join(a).strip()] 

118 weights = None 

119 for stencil in [["N", "S", "E", "W", "T", "B"][:2 * self.dim], stencil]: 

120 stencil = [tuple(ps.stencil.direction_string_to_offset(d, self.dim)) for d in stencil] 

121 

122 derivation = FD(direction, stencil).get_stencil() 

123 if not derivation.accuracy: 

124 continue 

125 weights = derivation.weights 

126 

127 # if the weights are underdefined, we can choose the free symbols to find the sparsest stencil 

128 free_weights = set(itertools.chain(*[w.free_symbols for w in weights])) 

129 if len(free_weights) > 0: 

130 zero_counts = defaultdict(list) 

131 for values in itertools.product([-1, -sp.Rational(1, 2), 0, 1, sp.Rational(1, 2)], 

132 repeat=len(free_weights)): 

133 subs = {free_weight: value for free_weight, value in zip(free_weights, values)} 

134 weights = [w.subs(subs) for w in derivation.weights] 

135 if not all(a == 0 for a in weights): 

136 zero_count = sum([1 for w in weights if w == 0]) 

137 zero_counts[zero_count].append(weights) 

138 best = zero_counts[max(zero_counts.keys())] 

139 if len(best) > 1: 

140 raise NotImplementedError("more than one suitable set of weights found, " 

141 "don't know how to proceed") 

142 weights = best[0] 

143 break 

144 if not weights: 

145 raise Exception('the requested derivative cannot be performed with the available neighbors') 

146 assert weights 

147 

148 if access._field.index_dimensions == 0: 

149 return sum([access._field.__getitem__(point) * weight for point, weight in zip(stencil, weights)]) 

150 else: 

151 total = access.get_shifted(*stencil[0]).at_index(*access.index) * weights[0] 

152 for point, weight in zip(stencil[1:], weights[1:]): 

153 addl = access.get_shifted(*point).at_index(*access.index) * weight 

154 total += addl 

155 return total 

156 

157 if term.args: 

158 new_args = [discretize(a) for a in term.args] 

159 return term.func(*new_args) 

160 else: 

161 return term 

162 

163 source = self.q.applyfunc(ps.fd.derivative.expand_diff_full) 

164 source = source.applyfunc(discretize) 

165 

166 return [ps.Assignment(lhs, rhs) for lhs, rhs in zip(self.c.center_vector, sp.flatten(source)) if rhs] 

167 

168 def discrete_continuity(self, flux_field: ps.field.Field): 

169 """Return a list of assignments for the continuity equation, which includes the source term 

170 

171 Args: 

172 flux_field: a staggered field from which the fluxes are taken 

173 """ 

174 

175 assert ps.FieldType.is_staggered(flux_field) 

176 

177 neighbors = flux_field.staggered_stencil + [ps.stencil.inverse_direction_string(d) 

178 for d in flux_field.staggered_stencil] 

179 divergence = flux_field.staggered_vector_access(neighbors[0]) 

180 for d in neighbors[1:]: 

181 divergence += flux_field.staggered_vector_access(d) 

182 

183 source = self.discrete_source() 

184 source = {s.lhs: s.rhs for s in source} 

185 

186 return [ps.Assignment(lhs, (lhs - rhs + source[lhs]) if lhs in source else (lhs - rhs)) 

187 for lhs, rhs in zip(self.c.center_vector, divergence)] 

188 

189 

190def VOF(j: ps.field.Field, v: ps.field.Field, ρ: ps.field.Field): 

191 """Volume-of-fluid discretization of advection 

192 

193 Args: 

194 j: the staggered field to write the fluxes to. Should have a D2Q9/D3Q27 stencil. Other stencils work too, but 

195 incur a small error (D2Q5/D3Q7: v^2, D3Q19: v^3). 

196 v: the flow velocity field 

197 ρ: the quantity to advect 

198 """ 

199 assert ps.FieldType.is_staggered(j) 

200 

201 fluxes = [[] for i in range(j.index_shape[0])] 

202 

203 v0 = v.center_vector 

204 for d, neighbor in enumerate(j.staggered_stencil): 

205 c = ps.stencil.direction_string_to_offset(neighbor) 

206 v1 = v.neighbor_vector(c) 

207 

208 # going out 

209 cond = sp.And(*[sp.Or(c[i] * v0[i] > 0, c[i] == 0) for i in range(len(v0))]) 

210 overlap1 = [1 - sp.Abs(v0[i]) for i in range(len(v0))] 

211 overlap2 = [c[i] * v0[i] for i in range(len(v0))] 

212 overlap = sp.Mul(*[(overlap1[i] if c[i] == 0 else overlap2[i]) for i in range(len(v0))]) 

213 fluxes[d].append(ρ.center_vector * overlap * sp.Piecewise((1, cond), (0, True))) 

214 

215 # coming in 

216 cond = sp.And(*[sp.Or(c[i] * v1[i] < 0, c[i] == 0) for i in range(len(v1))]) 

217 overlap1 = [1 - sp.Abs(v1[i]) for i in range(len(v1))] 

218 overlap2 = [v1[i] for i in range(len(v1))] 

219 overlap = sp.Mul(*[(overlap1[i] if c[i] == 0 else overlap2[i]) for i in range(len(v1))]) 

220 sign = (c == 1).sum() % 2 * 2 - 1 

221 fluxes[d].append(sign * ρ.neighbor_vector(c) * overlap * sp.Piecewise((1, cond), (0, True))) 

222 

223 for i, ff in enumerate(fluxes): 

224 fluxes[i] = ff[0] 

225 for f in ff[1:]: 

226 fluxes[i] += f 

227 

228 assignments = [] 

229 for i, d in enumerate(j.staggered_stencil): 

230 for lhs, rhs in zip(j.staggered_vector_access(d).values(), fluxes[i].values()): 

231 assignments.append(ps.Assignment(lhs, rhs)) 

232 return assignments