1import datetime 

2import itertools 

3import json 

4import os 

5import socket 

6from collections import namedtuple 

7from copy import deepcopy 

8from time import sleep 

9from typing import Any, Callable, Dict, Optional, Sequence, Tuple 

10 

11from pystencils.runhelper import Database 

12from pystencils.utils import DotDict 

13 

# Keyword arguments passed to the run function of a ParameterStudy.
ParameterDict = Dict[str, Any]
# Maps a parameter dict to the relative runtime weight of that configuration.
WeightFunction = Callable[[ParameterDict], int]
# Receives a parameter dict and returns the (possibly modified) dict, or None to drop the configuration.
FilterFunction = Callable[[ParameterDict], Optional[ParameterDict]]

17 

18 

class ParameterStudy:
    """Manages and runs multiple configurations locally or distributed and stores results in NoSQL database.

    To run a parameter study, define a run function that takes all parameters as keyword arguments and returns the
    results as a (possibly nested) dictionary. Then, define the parameter sets that this function should be run with.

    Examples:
          >>> import tempfile
          >>>
          >>> def dummy_run_function(p1, p2, p3, p4):
          ...     print("Run called with", p1, p2, p3, p4)
          ...     return { 'result1': p1 * p2, 'result2': p3 + p4 }
          >>>
          >>> with tempfile.TemporaryDirectory() as tmp_dir:
          ...     ps = ParameterStudy(dummy_run_function, database_connector=tmp_dir)
          ...     ps.add_run({'p1': 5, 'p2': 42, 'p3': 'abc', 'p4': 'def'})
          ...     ps.add_combinations( [('p1', [1, 2]),
          ...                           ('p3', ['x', 'y'])], constant_parameters={'p2': 5, 'p4': 'z' })
          ...     ps.run()
          ...     ps.run_scenarios_not_in_database()
          ...     ps.run_from_command_line(argv=['local'])  # alternative to run - exposes a command line interface if
          ...                                               # no argv is passed. Does not run anything here, because
          ...                                               # configurations already in the database are skipped
          Run called with 2 5 y z
          Run called with 2 5 x z
          Run called with 1 5 y z
          Run called with 1 5 x z
          Run called with 5 42 abc def

    Above example runs all parameter combinations locally and stores the returned result in the NoSQL database.
    It is also possible to distribute the runs to multiple processes, by starting a server on one machine and multiple
    executing runners on other machines. The server distributes configurations to the runners, collects their results
    and stores them in the database.
    """

    # A scheduled configuration: keyword arguments for the run function plus its relative runtime weight.
    Run = namedtuple("Run", ['parameter_dict', 'weight'])

    def __init__(self, run_function: Callable[..., Dict], runs: Sequence = (),
                 database_connector: str = './db') -> None:
        """Create a parameter study.

        Args:
            run_function: called with each configuration as keyword arguments; its return value is stored as result
            runs: initial sequence of ``Run`` tuples
            database_connector: passed to ``Database`` to open the result store
        """
        self.runs = list(runs)
        self.run_function = run_function
        self.db = Database(database_connector)

    def add_run(self, parameter_dict: ParameterDict, weight: int = 1) -> None:
        """Schedule a dictionary of parameters to run in this parameter study.

        Args:
            parameter_dict: used as keyword arguments to the run function.
            weight: weight of the run configuration which should be proportional to runtime of this case,
                    used for progress display and distribution to processes.
        """
        self.runs.append(self.Run(parameter_dict, weight))

    def add_combinations(self, degrees_of_freedom: Sequence[Tuple[str, Sequence[Any]]],
                         constant_parameters: Optional[ParameterDict] = None,
                         filter_function: Optional[FilterFunction] = None,
                         runtime_weight_function: Optional[WeightFunction] = None) -> None:
        """Add all possible combinations of given parameters as runs.

        This is a convenience function to simulate all possible parameter combinations of a scenario.
        Configurations can be filtered and weighted by passing filter- and weighting functions.

        Args:
            degrees_of_freedom: defines for each parameter the possible values it can take on
            constant_parameters: parameter dict, for parameters that should not be changed
            filter_function: optional function that receives a parameter dict and returns the potentially modified dict
                             or None if this combination should not be added.
            runtime_weight_function: function mapping a parameter dict to the runtime weight (see weight at add_run)

        Examples:
            degrees_of_freedom = [('p1', [1,2]),
                                  ('p2', ['a', 'b'])]
            is equivalent to calling add_run four times, with all possible parameter combinations.
        """
        parameter_names = [e[0] for e in degrees_of_freedom]
        parameter_values = [e[1] for e in degrees_of_freedom]

        default_params_dict = {} if constant_parameters is None else constant_parameters
        for value_tuple in itertools.product(*parameter_values):
            # Deep copy so that runs never share (possibly mutable) constant parameter values.
            params_dict = deepcopy(default_params_dict)
            params_dict.update({name: value for name, value in zip(parameter_names, value_tuple)})
            params = DotDict(params_dict)
            if filter_function:
                params = filter_function(params)
                if params is None:
                    continue
            weight = 1 if not runtime_weight_function else runtime_weight_function(params)
            self.add_run(params, weight)

    def run(self, process: int = 0, num_processes: int = 1, parameter_update: Optional[ParameterDict] = None) -> None:
        """Runs all added configurations.

        Args:
            process: configurations are split into num_processes chunks according to weights and only the
                     process'th chunk is run. To run all, use process=0 and num_processes=1
            num_processes: see above
            parameter_update: Extend/override all configurations with this dictionary.
        """
        parameter_update = {} if parameter_update is None else parameter_update
        for run in self._distribute_runs(self.runs, process, num_processes):
            self._execute_and_save(run, parameter_update)

    def run_scenarios_not_in_database(self, parameter_update: Optional[ParameterDict] = None) -> None:
        """Same as run method, but runs only configuration for which no result is in the database yet."""
        parameter_update = {} if parameter_update is None else parameter_update
        for run in self._filter_already_simulated(self.runs):
            self._execute_and_save(run, parameter_update)

    def _execute_and_save(self, run, parameter_update: ParameterDict) -> None:
        """Call the run function with the run's parameters extended/overridden by parameter_update, save the result."""
        parameter_dict = run.parameter_dict.copy()
        parameter_dict.update(parameter_update)
        result = self.run_function(**parameter_dict)
        # Saved under the original (non-overridden) parameters; the overrides are passed separately.
        self.db.save(run.parameter_dict, result, changed_params=parameter_update)

    def run_server(self, ip: str = "0.0.0.0", port: int = 8342) -> None:
        """Runs server to supply runner clients with scenarios to simulate and collect results from them.

        Skips scenarios that are already in the database. The loop ends once no scenario is queued or assigned;
        afterwards one more request is served and the server socket is closed.
        """
        from collections import deque
        from http.server import BaseHTTPRequestHandler, HTTPServer

        filtered_runs = self._filter_already_simulated(self.runs)

        if not filtered_runs:
            print("No Scenarios to simulate")
            return

        class ParameterStudyServer(BaseHTTPRequestHandler):
            # Shared state is kept on the class: the HTTP server creates a new handler instance per request.
            parameterStudy = self
            all_runs = filtered_runs
            runs = deque(filtered_runs)  # scenarios not yet handed out
            currently_running = {}  # client name -> scenario assigned to that client
            finished_runs = []

            def next_scenario(self, received_json_data):
                """Assign the next queued scenario to the requesting client."""
                client_name = received_json_data['client_name']
                # A client asking for new work while still holding a scenario never delivered its result
                # (e.g. the result upload failed); put that scenario back instead of losing it.
                unfinished = self.currently_running.pop(client_name, None)
                if unfinished is not None:
                    self.runs.appendleft(unfinished)

                if not self.runs:
                    return {'status': 'finished'}

                run_status = "%d/%d" % (len(self.finished_runs), len(self.all_runs))
                work_status = "%d/%d" % (sum(r.weight for r in self.finished_runs),
                                         sum(r.weight for r in self.all_runs))
                scenario = self.runs.popleft()
                time_str = datetime.datetime.now().strftime("%H:%M:%S")
                print(f" {time_str} {client_name} fetched scenario. Scenarios: {run_status}, Work: {work_status}")
                self.currently_running[client_name] = scenario
                return {'status': 'ok', 'params': scenario.parameter_dict}

            def result(self, received_json_data):
                """Validate and store the result reported by a client for its assigned scenario.

                Raises:
                    ValueError: if the client has no assigned scenario or reports different parameters
                """
                client_name = received_json_data['client_name']
                run = self.currently_running.get(client_name)
                if run is None:
                    raise ValueError(f"Client {client_name} sent a result but has no scenario assigned")
                received_params = received_json_data['params']
                if json.dumps(received_params, sort_keys=True) != json.dumps(run.parameter_dict, sort_keys=True):
                    raise ValueError(str(received_params) + " is not equal to " + str(run.parameter_dict))
                self.parameterStudy.db.save(run.parameter_dict,
                                            result=received_json_data['result'],
                                            env=received_json_data['env'],
                                            changed_params=received_json_data['changed_params'])
                # Only mark as finished once the result has been validated and saved.
                del self.currently_running[client_name]
                self.finished_runs.append(run)
                return {}

            # noinspection PyPep8Naming
            def do_POST(self) -> None:
                mapping = {'/next_scenario': self.next_scenario,
                           '/result': self.result}
                handler = mapping.get(self.path)
                if handler is None:
                    self._send_json(400, {'status': 'error', 'message': f"Unknown path {self.path}"})
                    return
                try:
                    # json.JSONDecodeError and bad Content-Length values are ValueErrors;
                    # KeyError/TypeError come from malformed request payloads.
                    response = handler(json.loads(self._read_contents()))
                except (ValueError, KeyError, TypeError) as e:
                    print(f" Rejected request to {self.path}: {e!r}")
                    self._send_json(400, {'status': 'error', 'message': str(e)})
                    return
                self._send_json(200, response)

            # noinspection PyPep8Naming
            def do_GET(self):
                return self.do_POST()

            def _send_json(self, status_code, payload):
                """Send a complete JSON response (status line, headers and body)."""
                body = json.dumps(payload).encode()
                self.send_response(status_code)
                self.send_header("Content-type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()  # flushes the buffered status line and headers
                self.wfile.write(body)

            def _read_contents(self):
                content_length = int(self.headers.get('Content-Length', 0))
                return self.rfile.read(content_length).decode()

            def log_message(self, fmt, *args):
                return

        print(f"Listening to connections on {ip}:{port}. Scenarios to simulate: {len(filtered_runs)}")
        with HTTPServer((ip, port), ParameterStudyServer) as server:
            while ParameterStudyServer.currently_running or ParameterStudyServer.runs:
                server.handle_request()
            # One extra request is served after all results are in; a client asking for work at this point
            # receives {'status': 'finished'}.
            server.handle_request()

    def run_client(self, client_name: str = "{hostname}_{pid}", server: str = 'localhost', port: int = 8342,
                   parameter_update: Optional[ParameterDict] = None, max_time=None) -> None:
        """Start runner client that retrieves configuration from server, runs it and reports results back to server.

        Args:
            client_name: name of the client. Has to be unique for each client.
                         Placeholders {hostname} and {pid} can be used to generate unique name.
            server: url to server
            port: port as specified in run_server
            parameter_update: Used to override/extend parameters received from the server.
                              Typical use cases is to set optimization or GPU parameters for some clients to make
                              some clients simulate on CPU, others on GPU
            max_time: maximum runtime in seconds: the client runs scenario after scenario, but starts only a new
                      scenario if not more than max_time seconds have passed since this function was called.
                      So the time given here should be the total maximum runtime minus a typical runtime for one setup
        """
        from urllib.request import urlopen
        from urllib.error import URLError
        import time
        parameter_update = {} if parameter_update is None else parameter_update
        url = f"http://{server}:{port}"
        client_name = client_name.format(hostname=socket.gethostname(), pid=os.getpid())
        start_time = time.time()
        while True:
            if max_time is not None and (time.time() - start_time) > max_time:
                print("Stopping client - maximum time reached")
                break
            try:
                request_data = json.dumps({'client_name': client_name}).encode()
                with urlopen(url + "/next_scenario", data=request_data) as http_response:
                    scenario = json.loads(http_response.read().decode())
                if scenario['status'] != 'ok':
                    break
                original_params = scenario['params'].copy()
                scenario['params'].update(parameter_update)
                result = self.run_function(**scenario['params'])

                answer = {'params': original_params,
                          'changed_params': parameter_update,
                          'result': result,
                          'env': Database.get_environment(),
                          'client_name': client_name}
                # The response body is not needed; the context manager only ensures the connection is closed.
                with urlopen(url + '/result', data=json.dumps(answer).encode()):
                    pass
            except URLError:
                print(f"Cannot connect to server {url} retrying in 5 seconds...")
                sleep(5)

    def run_from_command_line(self, argv: Optional[Sequence[str]] = None) -> None:
        """Exposes interface to command line with possibility to run directly or distributed via server/client."""
        from argparse import ArgumentParser

        def server(a):
            if a.database:
                self.db = Database(a.database)
            self.run_server(a.host, a.port)

        def client(a):
            self.run_client(a.client_name, a.host, a.port, json.loads(a.parameter_override), a.max_time)

        def local(a):
            if a.database:
                self.db = Database(a.database)
            self.run_scenarios_not_in_database(json.loads(a.parameter_override))

        parser = ArgumentParser()
        subparsers = parser.add_subparsers()

        local_parser = subparsers.add_parser('local', aliases=['l'],
                                             help="Run scenarios locally which are not yet in database", )
        local_parser.add_argument("-d", "--database", type=str, default="")
        local_parser.add_argument("-P", "--parameter_override", type=str, default="{}",
                                  help="JSON: the parameter dictionary is updated with these parameters. Use this to "
                                       "set host specific options like GPU call parameters. Enclose in \" ")
        local_parser.set_defaults(func=local)

        server_parser = subparsers.add_parser('server', aliases=['s'],
                                              help="Runs server to distribute different scenarios to workers", )
        server_parser.add_argument("-p", "--port", type=int, default=8342, help="Port to listen on")
        server_parser.add_argument("-H", "--host", type=str, default="0.0.0.0", help="IP/Hostname to listen on")
        server_parser.add_argument("-d", "--database", type=str, default="")
        server_parser.set_defaults(func=server)

        client_parser = subparsers.add_parser('client', aliases=['c'],
                                              help="Runs a worker client connection to scenario distribution server")
        client_parser.add_argument("-p", "--port", type=int, default=8342, help="Port to connect to")
        client_parser.add_argument("-H", "--host", type=str, default="localhost", help="Host or IP to connect to")
        client_parser.add_argument("-n", "--client_name", type=str, default="{hostname}_{pid}",
                                   help="Unique client name, you can use {hostname} and {pid} as placeholder")
        client_parser.add_argument("-P", "--parameter_override", type=str, default="{}",
                                   help="JSON: the parameter dictionary is updated with these parameters. Use this to "
                                        "set host specific options like GPU call parameters. Enclose in \" ")
        client_parser.add_argument("-t", "--max_time", type=int, default=None,
                                   help="If more than this time in seconds has passed, "
                                        "the client stops running scenarios.")
        client_parser.set_defaults(func=client)

        args = parser.parse_args(argv)
        # Without a sub-command no 'func' default is set.
        if not hasattr(args, 'func'):
            parser.print_help()
        else:
            args.func(args)

    def _filter_already_simulated(self, all_runs):
        """Removes all runs from the given list, that are already in the database.

        Parameter dicts are compared via their JSON serialization with sorted keys, so the key order of the stored
        entry does not matter.
        """
        already_simulated = {json.dumps(e.params, sort_keys=True) for e in self.db.filter({})}
        return [r for r in all_runs if json.dumps(r.parameter_dict, sort_keys=True) not in already_simulated]

    @staticmethod
    def _distribute_runs(all_runs, process, num_processes):
        """Partitions runs by their weights into num_processes chunks and returns the process's chunk."""
        # Round-robin over runs sorted by descending weight gives each process a similar total weight.
        sorted_runs = sorted(all_runs, key=lambda e: e.weight, reverse=True)
        result = sorted_runs[process::num_processes]
        result.reverse()  # start with faster scenarios
        return result