1from abc import ABC, abstractmethod 

2from typing import Callable, Dict, Iterable, Optional, Sequence, Tuple, Union 

3 

4import numpy as np 

5 

6from pystencils.field import Field, FieldType 

7 

8 

class DataHandling(ABC):
    """
    Manages the storage of arrays and maps them to a symbolic field.

    Two versions are available: a simple, pure Python implementation for single node
    simulations :py:class:SerialDataHandling and a distributed version using walberla in :py:class:ParallelDataHandling

    Keep in mind that the data can be distributed, so use the 'access' method whenever possible and avoid the
    'gather' function that collects (parts of the) distributed data on a single process.
    """

    _GPU_LIKE_TARGETS = ['gpu', 'opencl']
    _GPU_LIKE_BACKENDS = ['gpucuda', 'opencl']

    # ---------------------------- Adding and accessing data -----------------------------------------------------------

    @property
    @abstractmethod
    def dim(self) -> int:
        """Dimension of the domain, either 2 or 3"""

    @property
    @abstractmethod
    def shape(self) -> Tuple[int, ...]:
        """Shape of outer bounding box."""

    @property
    @abstractmethod
    def periodicity(self) -> Tuple[bool, ...]:
        """Returns tuple of booleans for x,y,(z) directions with True if domain is periodic in that direction."""

    @abstractmethod
    def add_array(self, name: str, values_per_cell, dtype=np.float64,
                  latex_name: Optional[str] = None, ghost_layers: Optional[int] = None, layout: Optional[str] = None,
                  cpu: bool = True, gpu: Optional[bool] = None, alignment=False, field_type=FieldType.GENERIC) -> Field:
        """Adds a (possibly distributed) array to the handling that can be accessed using the given name.

        For each array a symbolic field is available via the 'fields' dictionary

        Args:
            name: unique name that is used to access the field later
            values_per_cell: shape of the dim+1 coordinate. DataHandling supports zero or one index dimensions,
                             i.e. scalar fields and vector fields. This parameter gives the shape of the index
                             dimensions. The default value of 1 means no index dimension is created.
            dtype: data type of the array as numpy data type
            latex_name: optional, name of the symbolic field, if not given 'name' is used
            ghost_layers: number of ghost layers - if not specified a default value specified in the constructor
                          is used
            layout: memory layout of array, either structure of arrays 'SoA' or array of structures 'AoS'.
                    this is only important if values_per_cell > 1
            cpu: allocate field on the CPU
            gpu: allocate field on the GPU, if None, a GPU field is allocated if default_target is 'gpu'
            alignment: either False for no alignment, or the number of bytes to align to
            field_type: kind of field that is created (see FieldType), defaults to a generic field
        Returns:
            pystencils field, that can be used to formulate symbolic kernels
        """

    def add_arrays(self,
                   description: str,
                   dtype=np.float64,
                   ghost_layers: Optional[int] = None,
                   layout: Optional[str] = None,
                   cpu: bool = True,
                   gpu: Optional[bool] = None,
                   alignment=False,
                   field_type=FieldType.GENERIC) -> Tuple[Field, ...]:
        """Adds multiple arrays using a string description similar to :func:`pystencils.fields`


        >>> from pystencils.datahandling import create_data_handling
        >>> dh = create_data_handling((20, 30))
        >>> x, y = dh.add_arrays('x, y(9)')
        >>> print(dh.fields)
        {'x': x: double[22,32], 'y': y(9): double[22,32]}
        >>> assert x == dh.fields['x']
        >>> assert dh.fields['x'].shape == (22, 32)
        >>> assert dh.fields['y'].index_shape == (9,)

        Args:
            description (str): String description of the fields to add
            dtype: data type of the array as numpy data type
            ghost_layers: number of ghost layers - if not specified a default value specified in the constructor
                          is used
            layout: memory layout of array, either structure of arrays 'SoA' or array of structures 'AoS'.
                    this is only important if values_per_cell > 1
            cpu: allocate field on the CPU
            gpu: allocate field on the GPU, if None, a GPU field is allocated if default_target is 'gpu'
            alignment: either False for no alignment, or the number of bytes to align to
            field_type: kind of field that is created (see FieldType), defaults to a generic field
        Returns:
            Fields representing the just created arrays
        """
        from pystencils.field import _parse_part1

        names = []
        for name, indices in _parse_part1(description):
            names.append(name)
            self.add_array(name,
                           values_per_cell=indices,
                           dtype=dtype,
                           ghost_layers=ghost_layers,
                           layout=layout,
                           cpu=cpu,
                           gpu=gpu,
                           alignment=alignment,
                           field_type=field_type)

        # Return a tuple (not a generator expression) so the result matches the annotated
        # return type and can be indexed or iterated more than once.
        return tuple(self.fields[n] for n in names)

    @abstractmethod
    def has_data(self, name):
        """Returns true if a field or custom data element with this name was added."""

    @abstractmethod
    def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
        """
        Adds an array with the same parameters (number of ghost layers, values_per_cell, dtype) as existing array.

        Args:
            name: name of new array
            name_of_template_field: name of array that is used as template
            latex_name: see 'add' method
            cpu: see 'add' method
            gpu: see 'add' method
        """

    @abstractmethod
    def add_custom_data(self, name: str, cpu_creation_function,
                        gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
        """Adds custom (non-array) data to domain.

        Args:
            name: name to access data
            cpu_creation_function: function returning a new instance of the data that should be stored
            gpu_creation_function: optional, function returning a new instance, stored on GPU
            cpu_to_gpu_transfer_func: function that transfers cpu to gpu version,
                                      getting two parameters (gpu_instance, cpu_instance)
            gpu_to_cpu_transfer_func: function that transfers gpu to cpu version, getting two parameters
                                      (gpu_instance, cpu_instance)
        """

    def add_custom_class(self, name: str, class_obj, cpu: bool = True, gpu: bool = False):
        """Adds non-array data by passing a class object with optional 'to_gpu' and 'to_cpu' member functions."""
        # Transfer functions only make sense when both a CPU and a GPU instance exist.
        cpu_to_gpu_transfer_func = class_obj.to_gpu if cpu and gpu and hasattr(class_obj, 'to_gpu') else None
        gpu_to_cpu_transfer_func = class_obj.to_cpu if cpu and gpu and hasattr(class_obj, 'to_cpu') else None
        self.add_custom_data(name,
                             cpu_creation_function=class_obj if cpu else None,
                             gpu_creation_function=class_obj if gpu else None,
                             cpu_to_gpu_transfer_func=cpu_to_gpu_transfer_func,
                             gpu_to_cpu_transfer_func=gpu_to_cpu_transfer_func)

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Field]:
        """Dictionary mapping data name to symbolic pystencils field - use this to create pystencils kernels."""

    @property
    @abstractmethod
    def array_names(self) -> Sequence[str]:
        """Sequence of all array names."""

    @property
    @abstractmethod
    def custom_data_names(self) -> Sequence[str]:
        """Sequence of all custom data names."""

    @abstractmethod
    def ghost_layers_of_field(self, name: str) -> int:
        """Returns the number of ghost layers for a specific field/array."""

    @abstractmethod
    def values_per_cell(self, name: str) -> Tuple[int, ...]:
        """Returns values_per_cell of array."""

    @abstractmethod
    def iterate(self, slice_obj=None, gpu=False, ghost_layers=None,
                inner_ghost_layers=True) -> Iterable['Block']:
        """Iterate over local part of potentially distributed data structure."""

    @abstractmethod
    def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False) -> Optional[np.ndarray]:
        """
        Gathers part of the domain on a local process. Whenever possible use 'access' instead, since this method copies
        the distributed data to a single process which is inefficient and may exhaust the available memory

        Args:
            name: name of the array to gather
            slice_obj: slice expression of the rectangular sub-part that should be gathered
            all_gather: if False only the root process receives the result, if True all processes
            ghost_layers: number of outer ghost layers to include (only available for serial version of data handling)

        Returns:
            gathered field that does not include any ghost layers, or None if gathered on another process
        """

    @abstractmethod
    def run_kernel(self, kernel_function, *args, **kwargs) -> None:
        """Runs a compiled pystencils kernel.

        Uses the arrays stored in the DataHandling class for all array parameters. Additional passed arguments are
        directly passed to the kernel function and override possible parameters from the DataHandling
        """

    @abstractmethod
    def get_kernel_kwargs(self, kernel_function, **kwargs):
        """Returns the input arguments of a kernel"""

    @abstractmethod
    def swap(self, name1, name2, gpu=False):
        """Swaps data of two arrays"""

    # ------------------------------- CPU/GPU transfer -----------------------------------------------------------------

    @abstractmethod
    def to_cpu(self, name):
        """Copies GPU data of array with specified name to CPU.
        Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""

    @abstractmethod
    def to_gpu(self, name):
        """Copies CPU data of array with specified name to GPU.
        Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""

    @abstractmethod
    def all_to_cpu(self):
        """Copies data from GPU to CPU for all arrays that have a CPU and a GPU representation."""

    @abstractmethod
    def all_to_gpu(self):
        """Copies data from CPU to GPU for all arrays that have a CPU and a GPU representation."""

    @abstractmethod
    def is_on_gpu(self, name):
        """Checks if this data was also allocated on the GPU - does not check if this data item is in synced."""

    @abstractmethod
    def create_vtk_writer(self, file_name, data_names, ghost_layers=False) -> Callable[[int], None]:
        """VTK output for one or multiple arrays.

        Args:
            file_name: base file name without extension for the VTK output
            data_names: list of array names that should be included in the vtk output
            ghost_layers: true if ghost layer information should be written out as well

        Returns:
            a function that can be called with an integer time step to write the current state
            i.e create_vtk_writer('some_file', ['velocity', 'density']) (1)
        """

    @abstractmethod
    def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name,
                                         ghost_layers=False) -> Callable[[int], None]:
        """VTK output for an unsigned integer field, where bits are interpreted as flags.

        Args:
            file_name: see create_vtk_writer
            data_name: name of an array with uint type
            masks_to_name: dictionary mapping integer masks to a name in the output
            ghost_layers: see create_vtk_writer

        Returns:
            functor that can be called with time step
        """

    # ------------------------------- Communication --------------------------------------------------------------------

    @abstractmethod
    def synchronization_function(self, names, stencil=None, target=None, **kwargs) -> Callable[[], None]:
        """Synchronizes ghost layers for distributed arrays.

        For serial scenario this has to be called for correct periodicity handling

        Args:
            names: what data to synchronize: name of array or sequence of names
            stencil: stencil as string defining which neighbors are synchronized e.g. 'D2Q9', 'D3Q19'
                     if None, a full synchronization (i.e. D2Q9 or D3Q27) is done
            target: either 'cpu' or 'gpu'
            kwargs: implementation specific, optional optimization parameters for communication

        Returns:
            function object to run the communication
        """

    def reduce_float_sequence(self, sequence, operation, all_reduce=False) -> np.ndarray:
        """Takes a sequence of floating point values on each process and reduces it element-wise.

        If all_reduce, all processes get the result, otherwise only the root process.
        Possible operations are 'sum', 'min', 'max'
        """

    def reduce_int_sequence(self, sequence, operation, all_reduce=False) -> np.ndarray:
        """See function reduce_float_sequence - this is the same for integers"""

    # ------------------------------- Data access and modification -----------------------------------------------------

    def fill(self, array_name: str, val, value_idx: Optional[Union[int, Tuple[int, ...]]] = None,
             slice_obj=None, ghost_layers=False, inner_ghost_layers=False) -> None:
        """Sets all cells to the same value.

        Args:
            array_name: name of the array that should be modified
            val: value to set the array to
            value_idx: If an array stores multiple values per cell, this index chooses which of this values to fill.
                       If None, all values are set
            slice_obj: if passed, only the defined slice is filled
            ghost_layers: True if the outer ghost layers should also be filled
            inner_ghost_layers: True if the inner ghost layers should be filled. Inner ghost layers occur only in
                                parallel setups for distributed memory communication.
        """
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)

        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            if value_idx is not None:
                if isinstance(value_idx, int):
                    value_idx = (value_idx,)
                assert len(value_idx) == len(self.values_per_cell(array_name))
                b[array_name][(Ellipsis, *value_idx)].fill(val)
            else:
                b[array_name].fill(val)

    def min(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
        """Returns the minimum value inside the domain or slice of the domain.

        For meaning of arguments see documentation of :func:`DataHandling.fill`.

        Returns:
            the minimum of the locally stored domain part is returned if reduce is False, otherwise the global minimum
            on the root process, on other processes None
        """
        result = None
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            m = np.min(b[array_name])
            # Use builtin min here: np.min(result, m) would interpret m as the 'axis' argument.
            result = m if result is None else min(result, m)
        return self.reduce_float_sequence([result], 'min', all_reduce=True)[0] if reduce else result

    def max(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
        """Returns the maximum value inside the domain or slice of the domain.

        For argument description see :func:`DataHandling.min`
        """
        result = None
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            m = np.max(b[array_name])
            # Use builtin max here: np.max(result, m) would interpret m as the 'axis' argument.
            result = m if result is None else max(result, m)

        return self.reduce_float_sequence([result], 'max', all_reduce=True)[0] if reduce else result

    def save_all(self, file):
        """Saves all field data to disk into a file"""

    def load_all(self, file):
        """Loads all field data from a file on disk

        Works only if save_all was called with exactly the same field sizes, layouts etc.
        When run in parallel save and load has to be called with the same number of processes.
        Use for check pointing only - to store results use VTK output
        """

    def __str__(self):
        result = ""

        first_column_width = max(len("Name"), max(len(a) for a in self.array_names))
        row_format = "{:>%d}|{:>21}|{:>21}\n" % (first_column_width,)
        separator_line = "-" * (first_column_width + 21 + 21 + 2) + "\n"
        result += row_format.format("Name", "Inner (min/max)", "WithGl (min/max)")
        result += separator_line
        for arr_name in sorted(self.array_names):
            inner_min_max = (self.min(arr_name, ghost_layers=False), self.max(arr_name, ghost_layers=False))
            with_gl_min_max = (self.min(arr_name, ghost_layers=True), self.max(arr_name, ghost_layers=True))
            inner_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(inner_min_max)
            with_gl_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(with_gl_min_max)
            result += row_format.format(arr_name, inner_min_max, with_gl_min_max)
        return result

    def log(self, *args, level='INFO'):
        """Similar to print with additional information (time, rank)."""

    def log_on_root(self, *args, level='INFO'):
        """Logs only on root process. For serial setups this is equivalent to log"""

    @property
    def is_root(self):
        """Returns True for exactly one process in the simulation"""

    @property
    def world_rank(self):
        """Number of current process"""

405 

406 

class Block:
    """Represents the locally stored part of the domain.

    Instances of this class are returned by DataHandling.iterate, do not create manually!
    """

    def __init__(self, offset, local_slice):
        # Attribute names are relied upon by concrete subclasses - keep them stable.
        self._offset = offset
        self._localSlice = local_slice

    @property
    def offset(self):
        """Offset of the current block in global coordinates (where lower ghost layers have negative indices)."""
        return self._offset

    @property
    def cell_index_arrays(self):
        """Global coordinate mesh-grid of cell coordinates.

        Cell indices start at 0 at the first inner cell, lower ghost layers have negative indices
        """
        axes = []
        for start, extent in zip(self.offset, self.shape):
            axes.append(start + np.arange(extent, dtype=np.int32))
        return np.meshgrid(*axes, indexing='ij', copy=False)

    @property
    def midpoint_arrays(self):
        """Global coordinate mesh-grid of cell midpoints which are shifted by 0.5 compared to cell indices."""
        axes = []
        for start, extent in zip(self.offset, self.shape):
            axes.append(start + 0.5 + np.arange(extent, dtype=float))
        return np.meshgrid(*axes, indexing='ij', copy=False)

    @property
    def shape(self):
        """Shape of the fields (potentially including ghost layers)."""
        # Only the spatial part of the slice tuple is relevant; index dimensions are cut off.
        spatial_slices = self._localSlice[:len(self._offset)]
        return tuple(sl.stop - sl.start for sl in spatial_slices)

    @property
    def global_slice(self):
        """Slice in global coordinates."""
        slices = []
        for start, extent in zip(self._offset, self.shape):
            slices.append(slice(start, start + extent))
        return tuple(slices)

    def __getitem__(self, data_name: str) -> np.ndarray:
        # Concrete data access is implemented by serial/parallel subclasses.
        raise NotImplementedError()