1import itertools 

2import time 

3from typing import Sequence, Union 

4 

5import numpy as np 

6 

7from pystencils.datahandling.blockiteration import SerialBlock 

8from pystencils.datahandling.datahandling_interface import DataHandling 

9from pystencils.datahandling.pycuda import PyCudaArrayHandler, PyCudaNotAvailableHandler 

10from pystencils.datahandling.pyopencl import PyOpenClArrayHandler 

11from pystencils.field import ( 

12 Field, FieldType, create_numpy_array_with_layout, layout_string_to_tuple, 

13 spatial_layout_string_to_tuple) 

14from pystencils.slicing import normalize_slice, remove_ghost_layers 

15from pystencils.utils import DotDict 

16 

17 

18class SerialDataHandling(DataHandling): 

19 

20 def __init__(self, 

21 domain_size: Sequence[int], 

22 default_ghost_layers: int = 1, 

23 default_layout: str = 'SoA', 

24 periodicity: Union[bool, Sequence[bool]] = False, 

25 default_target: str = 'cpu', 

26 opencl_queue=None, 

27 opencl_ctx=None, 

28 array_handler=None) -> None: 

29 """ 

30 Creates a data handling for single node simulations. 

31 

32 Args: 

33 domain_size: size of the spatial domain as tuple 

34 default_ghost_layers: default number of ghost layers used, if not overridden in add_array() method 

35 default_layout: default layout used, if not overridden in add_array() method 

36 default_target: either 'cpu' or 'gpu' . If set to 'gpu' for each array also a GPU version is allocated 

37 if not overwritten in add_array, and synchronization functions are for the GPU by default 

38 """ 

39 super(SerialDataHandling, self).__init__() 

40 self._domainSize = tuple(domain_size) 

41 self.default_ghost_layers = default_ghost_layers 

42 self.default_layout = default_layout 

43 self._fields = DotDict() 

44 self.cpu_arrays = DotDict() 

45 self.gpu_arrays = DotDict() 

46 self.custom_data_cpu = DotDict() 

47 self.custom_data_gpu = DotDict() 

48 self._custom_data_transfer_functions = {} 

49 self._opencl_queue = opencl_queue 

50 self._opencl_ctx = opencl_ctx 

51 

52 if not array_handler: 52 ↛ 61line 52 didn't jump to line 61, because the condition on line 52 was never false

53 try: 

54 self.array_handler = PyCudaArrayHandler() 

55 except Exception: 

56 self.array_handler = PyCudaNotAvailableHandler() 

57 

58 if default_target == 'opencl' or opencl_queue: 58 ↛ 59line 58 didn't jump to line 59, because the condition on line 58 was never true

59 self.array_handler = PyOpenClArrayHandler(opencl_queue) 

60 else: 

61 self.array_handler = array_handler 

62 

63 if periodicity is None or periodicity is False: 

64 periodicity = [False] * self.dim 

65 if periodicity is True: 65 ↛ 66line 65 didn't jump to line 66, because the condition on line 65 was never true

66 periodicity = [True] * self.dim 

67 

68 self._periodicity = periodicity 

69 self._field_information = {} 

70 self.default_target = default_target 

71 self._start_time = time.perf_counter() 

72 

73 @property 

74 def dim(self): 

75 return len(self._domainSize) 

76 

77 @property 

78 def shape(self): 

79 return self._domainSize 

80 

81 @property 

82 def periodicity(self): 

83 return self._periodicity 

84 

85 @property 

86 def fields(self): 

87 return self._fields 

88 

89 def ghost_layers_of_field(self, name): 

90 return self._field_information[name]['ghost_layers'] 

91 

92 def values_per_cell(self, name): 

93 return self._field_information[name]['values_per_cell'] 

94 

95 def add_array(self, name, values_per_cell=1, dtype=np.float64, latex_name=None, ghost_layers=None, layout=None, 

96 cpu=True, gpu=None, alignment=False, field_type=FieldType.GENERIC): 

97 if ghost_layers is None: 

98 ghost_layers = self.default_ghost_layers 

99 if layout is None: 99 ↛ 101line 99 didn't jump to line 101, because the condition on line 99 was never false

100 layout = self.default_layout 

101 if gpu is None: 101 ↛ 104line 101 didn't jump to line 104

102 gpu = self.default_target in self._GPU_LIKE_TARGETS 

103 

104 kwargs = { 

105 'shape': tuple(s + 2 * ghost_layers for s in self._domainSize), 

106 'dtype': dtype, 

107 } 

108 

109 if not hasattr(values_per_cell, '__len__'): 109 ↛ 111line 109 didn't jump to line 111, because the condition on line 109 was never false

110 values_per_cell = (values_per_cell, ) 

111 if len(values_per_cell) == 1 and values_per_cell[0] == 1: 

112 values_per_cell = () 

113 

114 self._field_information[name] = { 

115 'ghost_layers': ghost_layers, 

116 'values_per_cell': values_per_cell, 

117 'layout': layout, 

118 'dtype': dtype, 

119 'alignment': alignment, 

120 'field_type': field_type, 

121 } 

122 

123 index_dimensions = len(values_per_cell) 

124 kwargs['shape'] = kwargs['shape'] + values_per_cell 

125 

126 if index_dimensions > 0: 

127 layout_tuple = layout_string_to_tuple(layout, self.dim + index_dimensions) 

128 else: 

129 layout_tuple = spatial_layout_string_to_tuple(layout, self.dim) 

130 

131 # cpu_arr is always created - since there is no create_pycuda_array_with_layout() 

132 byte_offset = ghost_layers * np.dtype(dtype).itemsize 

133 cpu_arr = create_numpy_array_with_layout(layout=layout_tuple, alignment=alignment, 

134 byte_offset=byte_offset, **kwargs) 

135 

136 if alignment and gpu: 

137 raise NotImplementedError("Alignment for GPU fields not supported") 

138 

139 if cpu: 139 ↛ 143line 139 didn't jump to line 143, because the condition on line 139 was never false

140 if name in self.cpu_arrays: 140 ↛ 141line 140 didn't jump to line 141, because the condition on line 140 was never true

141 raise ValueError("CPU Field with this name already exists") 

142 self.cpu_arrays[name] = cpu_arr 

143 if gpu: 143 ↛ 144line 143 didn't jump to line 144, because the condition on line 143 was never true

144 if name in self.gpu_arrays: 

145 raise ValueError("GPU Field with this name already exists") 

146 self.gpu_arrays[name] = self.array_handler.to_gpu(cpu_arr) 

147 

148 assert all(f.name != name for f in self.fields.values()), "Symbolic field with this name already exists" 

149 self.fields[name] = Field.create_from_numpy_array(name, cpu_arr, index_dimensions=index_dimensions, 

150 field_type=field_type) 

151 self.fields[name].latex_name = latex_name 

152 return self.fields[name] 

153 

154 def add_custom_data(self, name, cpu_creation_function, 

155 gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None): 

156 

157 if cpu_creation_function and gpu_creation_function: 

158 if cpu_to_gpu_transfer_func is None or gpu_to_cpu_transfer_func is None: 

159 raise ValueError("For GPU data, both transfer functions have to be specified") 

160 self._custom_data_transfer_functions[name] = (cpu_to_gpu_transfer_func, gpu_to_cpu_transfer_func) 

161 

162 assert name not in self.custom_data_cpu 

163 if cpu_creation_function: 

164 assert name not in self.cpu_arrays 

165 self.custom_data_cpu[name] = cpu_creation_function() 

166 

167 if gpu_creation_function: 

168 assert name not in self.gpu_arrays 

169 self.custom_data_gpu[name] = gpu_creation_function() 

170 

171 def has_data(self, name): 

172 return name in self.fields 

173 

174 def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None): 

175 return self.add_array(name, latex_name=latex_name, cpu=cpu, gpu=gpu, 

176 **self._field_information[name_of_template_field]) 

177 

178 def iterate(self, slice_obj=None, gpu=False, ghost_layers=True, inner_ghost_layers=True): 

179 if ghost_layers is True: 179 ↛ 180line 179 didn't jump to line 180, because the condition on line 179 was never true

180 ghost_layers = self.default_ghost_layers 

181 elif ghost_layers is False: 

182 ghost_layers = 0 

183 elif isinstance(ghost_layers, str): 183 ↛ 184line 183 didn't jump to line 184, because the condition on line 183 was never true

184 ghost_layers = self.ghost_layers_of_field(ghost_layers) 

185 

186 if slice_obj is None: 186 ↛ 188line 186 didn't jump to line 188, because the condition on line 186 was never false

187 slice_obj = (slice(None, None, None),) * self.dim 

188 slice_obj = normalize_slice(slice_obj, tuple(s + 2 * ghost_layers for s in self._domainSize)) 

189 slice_obj = tuple(s if type(s) is slice else slice(s, s + 1, None) for s in slice_obj) 

190 

191 arrays = self.gpu_arrays if gpu else self.cpu_arrays 

192 custom_data_dict = self.custom_data_gpu if gpu else self.custom_data_cpu 

193 iter_dict = custom_data_dict.copy() 

194 for name, arr in arrays.items(): 

195 field_gls = self._field_information[name]['ghost_layers'] 

196 if field_gls < ghost_layers: 196 ↛ 197line 196 didn't jump to line 197, because the condition on line 196 was never true

197 continue 

198 arr = remove_ghost_layers(arr, index_dimensions=len(arr.shape) - self.dim, 

199 ghost_layers=field_gls - ghost_layers) 

200 iter_dict[name] = arr 

201 

202 offset = tuple(s.start - ghost_layers for s in slice_obj) 

203 yield SerialBlock(iter_dict, offset, slice_obj) 

204 

205 def gather_array(self, name, slice_obj=None, ghost_layers=False, **kwargs): 

206 gl_to_remove = self._field_information[name]['ghost_layers'] 

207 if isinstance(ghost_layers, int): 207 ↛ 209line 207 didn't jump to line 209, because the condition on line 207 was never false

208 gl_to_remove -= ghost_layers 

209 if ghost_layers is True: 209 ↛ 210line 209 didn't jump to line 210, because the condition on line 209 was never true

210 gl_to_remove = 0 

211 arr = self.cpu_arrays[name] 

212 ind_dimensions = self.fields[name].index_dimensions 

213 spatial_dimensions = self.fields[name].spatial_dimensions 

214 

215 arr = remove_ghost_layers(arr, index_dimensions=ind_dimensions, ghost_layers=gl_to_remove) 

216 

217 if slice_obj is not None: 217 ↛ 218line 217 didn't jump to line 218, because the condition on line 217 was never true

218 normalized_slice = normalize_slice(slice_obj[:spatial_dimensions], arr.shape[:spatial_dimensions]) 

219 normalized_slice = tuple(s if type(s) is slice else slice(s, s + 1, None) for s in normalized_slice) 

220 normalized_slice += slice_obj[spatial_dimensions:] 

221 arr = arr[normalized_slice] 

222 else: 

223 arr = arr.view() 

224 arr.flags.writeable = False 

225 return arr 

226 

227 def swap(self, name1, name2, gpu=None): 

228 if gpu is None: 

229 gpu = self.default_target in self._GPU_LIKE_TARGETS 

230 arr = self.gpu_arrays if gpu else self.cpu_arrays 

231 arr[name1], arr[name2] = arr[name2], arr[name1] 

232 

233 def all_to_cpu(self): 

234 for name in (self.cpu_arrays.keys() & self.gpu_arrays.keys()) | self._custom_data_transfer_functions.keys(): 234 ↛ 235line 234 didn't jump to line 235, because the loop on line 234 never started

235 self.to_cpu(name) 

236 

237 def all_to_gpu(self): 

238 for name in (self.cpu_arrays.keys() & self.gpu_arrays.keys()) | self._custom_data_transfer_functions.keys(): 238 ↛ 239line 238 didn't jump to line 239, because the loop on line 238 never started

239 self.to_gpu(name) 

240 

241 def run_kernel(self, kernel_function, **kwargs): 

242 arrays = self.gpu_arrays if kernel_function.ast.backend in self._GPU_LIKE_BACKENDS else self.cpu_arrays 

243 kernel_function(**{**arrays, **kwargs}) 

244 

245 def get_kernel_kwargs(self, kernel_function, **kwargs): 

246 result = {} 

247 result.update(self.gpu_arrays if kernel_function.ast.backend in self._GPU_LIKE_BACKENDS else self.cpu_arrays) 

248 result.update(kwargs) 

249 return [result] 

250 

251 def to_cpu(self, name): 

252 if name in self._custom_data_transfer_functions: 

253 transfer_func = self._custom_data_transfer_functions[name][1] 

254 transfer_func(self.custom_data_gpu[name], self.custom_data_cpu[name]) 

255 else: 

256 self.array_handler.download(self.gpu_arrays[name], self.cpu_arrays[name]) 

257 

258 def to_gpu(self, name): 

259 if name in self._custom_data_transfer_functions: 

260 transfer_func = self._custom_data_transfer_functions[name][0] 

261 transfer_func(self.custom_data_gpu[name], self.custom_data_cpu[name]) 

262 else: 

263 self.array_handler.upload(self.gpu_arrays[name], self.cpu_arrays[name]) 

264 

265 def is_on_gpu(self, name): 

266 return name in self.gpu_arrays 

267 

268 def synchronization_function_cpu(self, names, stencil_name=None, **_): 

269 return self.synchronization_function(names, stencil_name, target='cpu') 

270 

271 def synchronization_function_gpu(self, names, stencil_name=None, **_): 

272 return self.synchronization_function(names, stencil_name, target='gpu') 

273 

274 def synchronization_function(self, names, stencil=None, target=None, functor=None, **_): 

275 if target is None: 

276 target = self.default_target 

277 if target == 'opencl': 

278 target = 'gpu' 

279 assert target in ('cpu', 'gpu') 

280 if not hasattr(names, '__len__') or type(names) is str: 

281 names = [names] 

282 

283 filtered_stencil = [] 

284 neighbors = [-1, 0, 1] 

285 

286 if (stencil is None and self.dim == 2) or (stencil is not None and stencil.startswith('D2')): 

287 directions = itertools.product(*[neighbors] * 2) 

288 elif (stencil is None and self.dim == 3) or (stencil is not None and stencil.startswith('D3')): 

289 directions = itertools.product(*[neighbors] * 3) 

290 else: 

291 raise ValueError("Invalid stencil") 

292 

293 for direction in directions: 

294 use_direction = True 

295 if direction == (0, 0) or direction == (0, 0, 0): 

296 use_direction = False 

297 for component, periodicity in zip(direction, self._periodicity): 

298 if not periodicity and component != 0: 

299 use_direction = False 

300 if use_direction: 

301 filtered_stencil.append(direction) 

302 

303 result = [] 

304 for name in names: 

305 gls = self._field_information[name]['ghost_layers'] 

306 values_per_cell = self._field_information[name]['values_per_cell'] 

307 if values_per_cell == (): 

308 values_per_cell = (1, ) 

309 if len(values_per_cell) == 1: 

310 values_per_cell = values_per_cell[0] 

311 

312 if len(filtered_stencil) > 0: 

313 if target == 'cpu': 

314 if functor is None: 

315 from pystencils.slicing import get_periodic_boundary_functor 

316 functor = get_periodic_boundary_functor 

317 result.append(functor(filtered_stencil, ghost_layers=gls)) 

318 else: 

319 if functor is None: 

320 from pystencils.gpucuda.periodicity import get_periodic_boundary_functor as functor 

321 target = 'gpu' if not isinstance(self.array_handler, PyOpenClArrayHandler) else 'opencl' 

322 result.append(functor(filtered_stencil, self._domainSize, 

323 index_dimensions=self.fields[name].index_dimensions, 

324 index_dim_shape=values_per_cell, 

325 dtype=self.fields[name].dtype.numpy_dtype, 

326 ghost_layers=gls, 

327 target=target, 

328 opencl_queue=self._opencl_queue, 

329 opencl_ctx=self._opencl_ctx)) 

330 

331 if target == 'cpu': 

332 def result_functor(): 

333 for arr_name, func in zip(names, result): 

334 func(pdfs=self.cpu_arrays[arr_name]) 

335 else: 

336 def result_functor(): 

337 for arr_name, func in zip(names, result): 

338 func(pdfs=self.gpu_arrays[arr_name]) 

339 

340 return result_functor 

341 

342 @property 

343 def array_names(self): 

344 return tuple(self.fields.keys()) 

345 

346 @property 

347 def custom_data_names(self): 

348 return tuple(self.custom_data_cpu.keys()) 

349 

350 def reduce_float_sequence(self, sequence, operation, all_reduce=False) -> np.array: 

351 return np.array(sequence) 

352 

353 def reduce_int_sequence(self, sequence, operation, all_reduce=False) -> np.array: 

354 return np.array(sequence) 

355 

356 def create_vtk_writer(self, file_name, data_names, ghost_layers=False): 

357 from pystencils.datahandling.vtk import image_to_vtk 

358 

359 def writer(step): 

360 full_file_name = "%s_%08d" % (file_name, step,) 

361 cell_data = {} 

362 for name in data_names: 

363 field = self._get_field_with_given_number_of_ghost_layers(name, ghost_layers) 

364 if self.dim == 2: 

365 field = field[:, :, np.newaxis] 

366 if len(field.shape) == 3: 

367 cell_data[name] = np.ascontiguousarray(field) 

368 elif len(field.shape) == 4: 

369 values_per_cell = field.shape[-1] 

370 if values_per_cell == self.dim: 

371 field = [np.ascontiguousarray(field[..., i]) for i in range(values_per_cell)] 

372 if len(field) == 2: 

373 field.append(np.zeros_like(field[0])) 

374 cell_data[name] = tuple(field) 

375 else: 

376 for i in range(values_per_cell): 

377 cell_data["%s[%d]" % (name, i)] = np.ascontiguousarray(field[..., i]) 

378 else: 

379 raise NotImplementedError("VTK export for fields with more than one index " 

380 "coordinate not implemented") 

381 image_to_vtk(full_file_name, cell_data=cell_data) 

382 return writer 

383 

384 def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name, ghost_layers=False): 

385 from pystencils.datahandling.vtk import image_to_vtk 

386 

387 def writer(step): 

388 full_file_name = "%s_%08d" % (file_name, step,) 

389 field = self._get_field_with_given_number_of_ghost_layers(data_name, ghost_layers) 

390 if self.dim == 2: 

391 field = field[:, :, np.newaxis] 

392 cell_data = {name: np.ascontiguousarray(np.bitwise_and(field, field.dtype.type(mask)) > 0, dtype=np.uint8) 

393 for mask, name in masks_to_name.items()} 

394 image_to_vtk(full_file_name, cell_data=cell_data) 

395 

396 return writer 

397 

398 def _get_field_with_given_number_of_ghost_layers(self, name, ghost_layers): 

399 actual_ghost_layers = self.ghost_layers_of_field(name) 

400 if ghost_layers is True: 

401 ghost_layers = actual_ghost_layers 

402 

403 gl_to_remove = actual_ghost_layers - ghost_layers 

404 ind_dims = len(self._field_information[name]['values_per_cell']) 

405 return remove_ghost_layers(self.cpu_arrays[name], ind_dims, gl_to_remove) 

406 

407 def log(self, *args, level='INFO'): 

408 level = level.upper() 

409 message = " ".join(str(e) for e in args) 

410 

411 time_running = time.perf_counter() - self._start_time 

412 spacing = 7 - len(str(int(time_running))) 

413 message = f"[{level: <8}]{spacing * '-'}({time_running:.3f} sec) {message} " 

414 print(message, flush=True) 

415 

416 def log_on_root(self, *args, level='INFO'): 

417 self.log(*args, level=level) 

418 

419 @property 

420 def is_root(self): 

421 return True 

422 

423 @property 

424 def world_rank(self): 

425 return 0 

426 

427 def save_all(self, file): 

428 np.savez_compressed(file, **self.cpu_arrays) 

429 

430 def load_all(self, file): 

431 if '.npz' not in file: 

432 file += '.npz' 

433 file_contents = np.load(file) 

434 for arr_name, arr_contents in self.cpu_arrays.items(): 

435 if arr_name not in file_contents: 

436 print(f"Skipping read data {arr_name} because there is no data with this name in data handling") 

437 continue 

438 if file_contents[arr_name].shape != arr_contents.shape: 

439 print(f"Skipping read data {arr_name} because shapes don't match. " 

440 f"Read array shape {file_contents[arr_name].shape}, existing array shape {arr_contents.shape}") 

441 continue 

442 np.copyto(arr_contents, file_contents[arr_name])