1import os 

2import warnings 

3 

4import numpy as np 

5# noinspection PyPep8Naming 

6import waLBerla as wlb 

7 

8from pystencils.datahandling.blockiteration import block_iteration, sliced_block_iteration 

9from pystencils.datahandling.datahandling_interface import DataHandling 

10from pystencils.field import Field, FieldType 

11from pystencils.kernelparameters import FieldPointerSymbol 

12from pystencils.utils import DotDict 

13 

14 

class ParallelDataHandling(DataHandling):
    """Data handling backed by a waLBerla block storage.

    Manages distributed CPU (and optionally GPU) field data living on the blocks
    of a waLBerla domain, and keeps the corresponding symbolic pystencils fields
    in sync with the allocated block data.
    """
    # prefix used for the block-data name of the GPU copy of a field
    GPU_DATA_PREFIX = "gpu_"
    # class-wide counter so successive VTK writers get unique file names
    VTK_COUNTER = 0

    def __init__(self, blocks, default_ghost_layers=1, default_layout='SoA', dim=3, default_target='cpu'):
        """
        Creates data handling based on walberla block storage

        Args:
            blocks: walberla block storage
            default_ghost_layers: nr of ghost layers used if not specified in add() method
            default_layout: layout used if no layout is given to add() method
            dim: dimension of scenario,
                 walberla always uses three dimensions, so if dim=2 the extend of the
                 z coordinate of blocks has to be 1
            default_target: either 'cpu' or 'gpu' . If set to 'gpu' for each array also a GPU version is allocated
                            if not overwritten in add_array, and synchronization functions are for the GPU by default
        """
        super(ParallelDataHandling, self).__init__()
        assert dim in (2, 3)
        self.blocks = blocks
        self.default_ghost_layers = default_ghost_layers
        self.default_layout = default_layout
        self._fields = DotDict()  # maps name to symbolic pystencils field
        self._field_name_to_cpu_data_name = {}
        self._field_name_to_gpu_data_name = {}
        self.data_names = set()
        self._dim = dim
        self._fieldInformation = {}  # per-field kwargs passed to add_array (for add_array_like)
        self._cpu_gpu_pairs = []  # (cpu_name, gpu_name) tuples for fields existing on both targets
        self._custom_data_transfer_functions = {}  # name -> (cpu_to_gpu_func, gpu_to_cpu_func)
        self._custom_data_names = []
        self._reduce_map = {
            'sum': wlb.mpi.SUM,
            'min': wlb.mpi.MIN,
            'max': wlb.mpi.MAX,
        }

        if self._dim == 2:
            # walberla is inherently 3D: a 2D scenario must use exactly one cell in z
            assert self.blocks.getDomainCellBB().size[2] == 1
        self.default_target = default_target

    @property
    def dim(self):
        """Spatial dimension of the scenario (2 or 3)."""
        return self._dim

    @property
    def shape(self):
        """Global domain size in cells, truncated to `dim` entries."""
        return self.blocks.getDomainCellBB().size[:self.dim]

    @property
    def periodicity(self):
        """Tuple of booleans, periodicity of the domain per coordinate."""
        return self.blocks.periodic[:self._dim]

    @property
    def fields(self):
        """DotDict mapping field name to symbolic pystencils field."""
        return self._fields

    def ghost_layers_of_field(self, name):
        """Number of ghost layers the named field was allocated with."""
        return self._fieldInformation[name]['ghost_layers']

    def values_per_cell(self, name):
        """Number of values stored per cell for the named field."""
        return self._fieldInformation[name]['values_per_cell']

    def add_custom_data(self, name, cpu_creation_function,
                        gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
        """Registers non-field block data, optionally with a GPU counterpart.

        If both CPU and GPU creation functions are given, both transfer
        functions are mandatory so that to_cpu/to_gpu can move the data.
        """
        if cpu_creation_function and gpu_creation_function:
            if cpu_to_gpu_transfer_func is None or gpu_to_cpu_transfer_func is None:
                raise ValueError("For GPU data, both transfer functions have to be specified")
            self._custom_data_transfer_functions[name] = (cpu_to_gpu_transfer_func, gpu_to_cpu_transfer_func)

        if cpu_creation_function:
            self.blocks.addBlockData(name, cpu_creation_function)
        if gpu_creation_function:
            self.blocks.addBlockData(self.GPU_DATA_PREFIX + name, gpu_creation_function)
        self._custom_data_names.append(name)

    def add_array(self, name, values_per_cell=1, dtype=np.float64, latex_name=None, ghost_layers=None,
                  layout=None, cpu=True, gpu=None, alignment=False, field_type=FieldType.GENERIC):
        """Allocates a field on all blocks and creates the matching symbolic field.

        Returns:
            the symbolic pystencils Field registered under `name`
        """
        if ghost_layers is None:
            ghost_layers = self.default_ghost_layers
        if gpu is None:
            gpu = self.default_target == 'gpu'
        if layout is None:
            layout = self.default_layout
        if len(self.blocks) == 0:
            raise ValueError("Data handling expects that each process has at least one block")
        if hasattr(dtype, 'type'):
            # unwrap numpy dtype objects to their scalar type for walberla
            dtype = dtype.type
        if name in self.blocks[0].fieldNames or self.GPU_DATA_PREFIX + name in self.blocks[0].fieldNames:
            raise ValueError("Data with this name has already been added")

        if alignment is False or alignment is None:
            alignment = 0
        if hasattr(values_per_cell, '__len__'):
            raise NotImplementedError("Parallel data handling does not support multiple index dimensions")

        self._fieldInformation[name] = {
            'ghost_layers': ghost_layers,
            'values_per_cell': values_per_cell,
            'layout': layout,
            'dtype': dtype,
            'alignment': alignment,
            'field_type': field_type,
        }

        # walberla only distinguishes fzyx (SoA) and zyxf (AoS)
        layout_map = {'fzyx': wlb.field.Layout.fzyx, 'zyxf': wlb.field.Layout.zyxf,
                      'f': wlb.field.Layout.fzyx,
                      'SoA': wlb.field.Layout.fzyx, 'AoS': wlb.field.Layout.zyxf}

        if cpu:
            wlb.field.addToStorage(self.blocks, name, dtype, fSize=values_per_cell, layout=layout_map[layout],
                                   ghostLayers=ghost_layers, alignment=alignment)
        if gpu:
            if alignment != 0:
                raise ValueError("Alignment for walberla GPU fields not yet supported")
            wlb.cuda.addGpuFieldToStorage(self.blocks, self.GPU_DATA_PREFIX + name, dtype, fSize=values_per_cell,
                                          usePitchedMem=False, ghostLayers=ghost_layers, layout=layout_map[layout])

        if cpu and gpu:
            self._cpu_gpu_pairs.append((name, self.GPU_DATA_PREFIX + name))

        block_bb = self.blocks.getBlockCellBB(self.blocks[0])
        shape = tuple(s + 2 * ghost_layers for s in block_bb.size[:self.dim])
        index_dimensions = 1 if values_per_cell > 1 else 0
        if index_dimensions == 1:
            shape += (values_per_cell,)

        assert all(f.name != name for f in self.fields.values()), "Symbolic field with this name already exists"

        self.fields[name] = Field.create_generic(name, self.dim, dtype, index_dimensions, layout,
                                                 index_shape=(values_per_cell,) if index_dimensions > 0 else None,
                                                 field_type=field_type)
        self.fields[name].latex_name = latex_name
        self._field_name_to_cpu_data_name[name] = name
        if gpu:
            self._field_name_to_gpu_data_name[name] = self.GPU_DATA_PREFIX + name
        return self.fields[name]

    def has_data(self, name):
        """True iff a field with the given name was added via add_array."""
        return name in self._fields

    @property
    def array_names(self):
        return tuple(self.fields.keys())

    @property
    def custom_data_names(self):
        return tuple(self._custom_data_names)

    def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
        """Adds a new array with the same allocation parameters as an existing one."""
        return self.add_array(name, latex_name=latex_name, cpu=cpu, gpu=gpu,
                              **self._fieldInformation[name_of_template_field])

    def swap(self, name1, name2, gpu=False):
        """Swaps the data pointers of two fields on every block (no copy)."""
        if gpu:
            name1 = self.GPU_DATA_PREFIX + name1
            name2 = self.GPU_DATA_PREFIX + name2
        for block in self.blocks:
            block[name1].swapDataPointers(block[name2])

    def iterate(self, slice_obj=None, gpu=False, ghost_layers=True, inner_ghost_layers=True):
        """Iterates over the local blocks, yielding per-block array views.

        `ghost_layers` / `inner_ghost_layers` may each be True (use default),
        False (none), an int, or a field name (use that field's ghost layers).
        """
        if ghost_layers is True:
            ghost_layers = self.default_ghost_layers
        elif ghost_layers is False:
            ghost_layers = 0
        elif isinstance(ghost_layers, str):
            ghost_layers = self.ghost_layers_of_field(ghost_layers)

        if inner_ghost_layers is True:
            inner_ghost_layers = self.default_ghost_layers
        elif inner_ghost_layers is False:
            inner_ghost_layers = 0
        elif isinstance(inner_ghost_layers, str):
            # bug fix: this branch previously tested and assigned `ghost_layers`
            # (copy-paste error), so a field name passed as inner_ghost_layers
            # was ignored and clobbered the outer ghost layer setting instead
            inner_ghost_layers = self.ghost_layers_of_field(inner_ghost_layers)

        prefix = self.GPU_DATA_PREFIX if gpu else ""
        if slice_obj is not None:
            yield from sliced_block_iteration(self.blocks, slice_obj, inner_ghost_layers, ghost_layers,
                                              self.dim, prefix)
        else:
            yield from block_iteration(self.blocks, ghost_layers, self.dim, prefix)

    def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False):
        """Gathers a (sliced) field from all processes into one array.

        Returns None on non-root processes unless all_gather is set.
        Ghost layers are never included in the parallel case.
        """
        if ghost_layers is not False:
            warnings.warn("gather_array with ghost layers is only supported in serial data handling. "
                          "Array without ghost layers is returned")

        if slice_obj is None:
            slice_obj = tuple([slice(None, None, None)] * self.dim)
        if self.dim == 2:
            # 0.5 selects the single cell in z via its cell-center coordinate
            slice_obj = slice_obj[:2] + (0.5,) + slice_obj[2:]

        last_element = slice_obj[3:]  # optional index-dimension selector

        array = wlb.field.gatherField(self.blocks, name, slice_obj[:3], all_gather)
        if array is None:
            return None

        if self.dim == 2:
            array = array[:, :, 0]
        if last_element and self.fields[name].index_dimensions > 0:
            array = array[..., last_element[0]]

        return array

    def _normalize_arr_shape(self, arr, index_dimensions):
        """Strips trailing f-dimension and z-dimension so the array matches the symbolic field."""
        if index_dimensions == 0 and len(arr.shape) > 3:
            arr = arr[..., 0]
        if self.dim == 2 and len(arr.shape) > 2:
            arr = arr[:, :, 0]
        return arr

    def run_kernel(self, kernel_function, **kwargs):
        """Runs a compiled pystencils kernel once per local block."""
        for arg_dict in self.get_kernel_kwargs(kernel_function, **kwargs):
            kernel_function(**arg_dict)

    def get_kernel_kwargs(self, kernel_function, **kwargs):
        """Builds one kwargs dict per block, mapping field names to block-local arrays."""
        if kernel_function.ast.backend == 'gpucuda':
            name_map = self._field_name_to_gpu_data_name
            to_array = wlb.cuda.toGpuArray
        else:
            name_map = self._field_name_to_cpu_data_name
            to_array = wlb.field.toArray
        # fields required by the kernel that were not overridden via kwargs
        data_used_in_kernel = [(name_map[p.symbol.field_name], self.fields[p.symbol.field_name])
                               for p in kernel_function.parameters if
                               isinstance(p.symbol, FieldPointerSymbol) and p.symbol.field_name not in kwargs]

        result = []
        for block in self.blocks:
            field_args = {}
            for data_name, f in data_used_in_kernel:
                # in 2D the z ghost layers are omitted from the view
                arr = to_array(block[data_name], with_ghost_layers=[True, True, self.dim == 3])
                arr = self._normalize_arr_shape(arr, f.index_dimensions)
                field_args[f.name] = arr
            field_args.update(kwargs)
            result.append(field_args)
        return result

    def to_cpu(self, name):
        """Copies the GPU version of a field (or custom data) to its CPU version."""
        if name in self._custom_data_transfer_functions:
            transfer_func = self._custom_data_transfer_functions[name][1]
            for block in self.blocks:
                transfer_func(block[self.GPU_DATA_PREFIX + name], block[name])
        else:
            wlb.cuda.copyFieldToCpu(self.blocks, self.GPU_DATA_PREFIX + name, name)

    def to_gpu(self, name):
        """Copies the CPU version of a field (or custom data) to its GPU version."""
        if name in self._custom_data_transfer_functions:
            transfer_func = self._custom_data_transfer_functions[name][0]
            for block in self.blocks:
                transfer_func(block[self.GPU_DATA_PREFIX + name], block[name])
        else:
            wlb.cuda.copyFieldToGpu(self.blocks, self.GPU_DATA_PREFIX + name, name)

    def is_on_gpu(self, name):
        """True iff the named field was allocated on both CPU and GPU."""
        return (name, self.GPU_DATA_PREFIX + name) in self._cpu_gpu_pairs

    def all_to_cpu(self):
        """Transfers all GPU-resident data (fields and custom data) to the CPU."""
        for cpu_name, gpu_name in self._cpu_gpu_pairs:
            wlb.cuda.copyFieldToCpu(self.blocks, gpu_name, cpu_name)
        for name in self._custom_data_transfer_functions.keys():
            self.to_cpu(name)

    def all_to_gpu(self):
        """Transfers all CPU-resident data (fields and custom data) to the GPU."""
        for cpu_name, gpu_name in self._cpu_gpu_pairs:
            wlb.cuda.copyFieldToGpu(self.blocks, gpu_name, cpu_name)
        for name in self._custom_data_transfer_functions.keys():
            self.to_gpu(name)

    def synchronization_function_cpu(self, names, stencil=None, buffered=True, stencil_restricted=False, **_):
        return self.synchronization_function(names, stencil, 'cpu', buffered, stencil_restricted)

    def synchronization_function_gpu(self, names, stencil=None, buffered=True, stencil_restricted=False, **_):
        return self.synchronization_function(names, stencil, 'gpu', buffered, stencil_restricted)

    def synchronization_function(self, names, stencil=None, target=None, buffered=True, stencil_restricted=False):
        """Returns a callable that communicates ghost layers of the given fields.

        Args:
            names: field name or sequence of field names to synchronize
            stencil: communication stencil name; defaults to full neighborhood
                     ('D3Q27' in 3D, 'D2Q9' in 2D)
            target: 'cpu' or 'gpu'; defaults to the handling's default target
            buffered: use buffered communication scheme instead of MPI datatypes
            stencil_restricted: use stencil-restricted pack info (buffered CPU only)
        """
        if target is None:
            target = self.default_target

        if stencil is None:
            stencil = 'D3Q27' if self.dim == 3 else 'D2Q9'

        if not hasattr(names, '__len__') or isinstance(names, str):
            names = [names]

        create_scheme = wlb.createUniformBufferedScheme if buffered else wlb.createUniformDirectScheme
        if target == 'cpu':
            create_packing = wlb.field.createPackInfo if buffered else wlb.field.createMPIDatatypeInfo
            if buffered and stencil_restricted:
                create_packing = wlb.field.createStencilRestrictedPackInfo
        else:
            assert target == 'gpu'
            create_packing = wlb.cuda.createPackInfo if buffered else wlb.cuda.createMPIDatatypeInfo
            names = [self.GPU_DATA_PREFIX + name for name in names]

        sync_function = create_scheme(self.blocks, stencil)
        for name in names:
            sync_function.addDataToCommunicate(create_packing(self.blocks, name))

        return sync_function

    def reduce_float_sequence(self, sequence, operation, all_reduce=False):
        """MPI-reduces a sequence of floats; non-root ranks get None unless all_reduce."""
        if all_reduce:
            return np.array(wlb.mpi.allreduceReal(sequence, self._reduce_map[operation.lower()]))
        else:
            result = np.array(wlb.mpi.reduceReal(sequence, self._reduce_map[operation.lower()], 0))
            return result if wlb.mpi.worldRank() == 0 else None

    def reduce_int_sequence(self, sequence, operation, all_reduce=False):
        """MPI-reduces a sequence of ints; non-root ranks get None unless all_reduce."""
        if all_reduce:
            return np.array(wlb.mpi.allreduceInt(sequence, self._reduce_map[operation.lower()]))
        else:
            result = np.array(wlb.mpi.reduceInt(sequence, self._reduce_map[operation.lower()], 0))
            return result if wlb.mpi.worldRank() == 0 else None

    def create_vtk_writer(self, file_name, data_names, ghost_layers=False):
        """Creates a VTK output writer for the given fields; file name gets a unique counter suffix."""
        if ghost_layers is False:
            ghost_layers = 0
        if ghost_layers is True:
            ghost_layers = min(self.ghost_layers_of_field(n) for n in data_names)
        file_name = "%s_%02d" % (file_name, ParallelDataHandling.VTK_COUNTER)
        ParallelDataHandling.VTK_COUNTER += 1
        output = wlb.vtk.makeOutput(self.blocks, file_name, ghostLayers=ghost_layers)
        for n in data_names:
            output.addCellDataWriter(wlb.field.createVTKWriter(self.blocks, n))
        return output

    def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name, ghost_layers=False):
        """Creates a VTK writer that emits one binarized output per mask in masks_to_name."""
        if ghost_layers is False:
            ghost_layers = 0
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(data_name)

        output = wlb.vtk.makeOutput(self.blocks, file_name, ghostLayers=ghost_layers)
        for mask, name in masks_to_name.items():
            w = wlb.field.createBinarizationVTKWriter(self.blocks, data_name, mask, name)
            output.addCellDataWriter(w)
        return output

    @staticmethod
    def log(*args, level='INFO'):
        """Logs a message through walberla's logging facility at the given level."""
        _log_map = {
            'DEVEL': wlb.log_devel,
            'RESULT': wlb.log_result,
            'INFO': wlb.log_info,
            'WARNING': wlb.log_warning,
            'PROGRESS': wlb.log_progress,
        }
        level = level.upper()
        message = " ".join(str(e) for e in args)
        _log_map[level](message)

    def log_on_root(self, *args, level='INFO'):
        """Logs only on the MPI root process."""
        if self.is_root:
            ParallelDataHandling.log(*args, level=level)

    @property
    def is_root(self):
        return wlb.mpi.worldRank() == 0

    @property
    def world_rank(self):
        return wlb.mpi.worldRank()

    def save_all(self, directory):
        """Writes all CPU field data to <directory>/<field_name>.dat files."""
        # check for a conflicting regular file before attempting to create the directory
        if os.path.isfile(directory):
            raise RuntimeError(f"Trying to save to {directory}, but file exists already")
        if not os.path.exists(directory):
            os.mkdir(directory)

        for field_name, data_name in self._field_name_to_cpu_data_name.items():
            self.blocks.writeBlockData(data_name, os.path.join(directory, field_name + ".dat"))

    def load_all(self, directory):
        """Reads all CPU field data from <directory>/<field_name>.dat files."""
        for field_name, data_name in self._field_name_to_cpu_data_name.items():
            self.blocks.readBlockData(data_name, os.path.join(directory, field_name + ".dat"))