1import os 

2import itertools 

3from collections import Counter 

4from contextlib import contextmanager 

5from tempfile import NamedTemporaryFile 

6from typing import Mapping 

7 

8import numpy as np 

9import sympy as sp 

10 

11 

12class DotDict(dict): 

13 """Normal dict with additional dot access for all keys""" 

14 __getattr__ = dict.get 

15 __setattr__ = dict.__setitem__ 

16 __delattr__ = dict.__delitem__ 

17 

18 # Recursively make DotDict: https://stackoverflow.com/questions/13520421/recursive-dotdict 

19 def __init__(self, dct={}): 

20 for key, value in dct.items(): 20 ↛ 21line 20 didn't jump to line 21, because the loop on line 20 never started

21 if isinstance(value, dict): 

22 value = DotDict(value) 

23 self[key] = value 

24 

25 

26def all_equal(iterator): 

27 iterator = iter(iterator) 

28 try: 

29 first = next(iterator) 

30 except StopIteration: 

31 return True 

32 return all(first == rest for rest in iterator) 

33 

34 

35def recursive_dict_update(d, u): 

36 """Updates the first dict argument, using second dictionary recursively. 

37 

38 Examples: 

39 >>> d = {'sub_dict': {'a': 1, 'b': 2}, 'outer': 42} 

40 >>> u = {'sub_dict': {'a': 5, 'c': 10}, 'outer': 41, 'outer2': 43} 

41 >>> recursive_dict_update(d, u) 

42 {'sub_dict': {'a': 5, 'b': 2, 'c': 10}, 'outer': 41, 'outer2': 43} 

43 """ 

44 d = d.copy() 

45 for k, v in u.items(): 

46 if isinstance(v, Mapping): 

47 r = recursive_dict_update(d.get(k, {}), v) 

48 d[k] = r 

49 else: 

50 d[k] = u[k] 

51 return d 

52 

53 

54@contextmanager 

55def file_handle_for_atomic_write(file_path): 

56 """Open temporary file object that atomically moves to destination upon exiting. 

57 

58 Allows reading and writing to and from the same filename. 

59 The file will not be moved to destination in case of an exception. 

60 

61 Args: 

62 file_path: path to file to be opened 

63 """ 

64 target_folder = os.path.dirname(os.path.abspath(file_path)) 

65 with NamedTemporaryFile(delete=False, dir=target_folder, mode='w') as f: 

66 try: 

67 yield f 

68 finally: 

69 f.flush() 

70 os.fsync(f.fileno()) 

71 os.rename(f.name, file_path) 

72 

73 

74@contextmanager 

75def atomic_file_write(file_path): 

76 target_folder = os.path.dirname(os.path.abspath(file_path)) 

77 with NamedTemporaryFile(delete=False, dir=target_folder) as f: 

78 f.file.close() 

79 yield f.name 

80 os.rename(f.name, file_path) 

81 

82 

83def fully_contains(l1, l2): 

84 """Tests if elements of sequence 1 are in sequence 2 in same or higher number. 

85 

86 >>> fully_contains([1, 1, 2], [1, 2]) # 1 is only present once in second list 

87 False 

88 >>> fully_contains([1, 1, 2], [1, 1, 4, 2]) 

89 True 

90 """ 

91 l1_counter = Counter(l1) 

92 l2_counter = Counter(l2) 

93 for element, count in l1_counter.items(): 

94 if l2_counter[element] < count: 

95 return False 

96 return True 

97 

98 

99def boolean_array_bounding_box(boolean_array): 

100 """Returns bounding box around "true" area of boolean array 

101 

102 >>> a = np.zeros((4, 4), dtype=bool) 

103 >>> a[1:-1, 1:-1] = True 

104 >>> boolean_array_bounding_box(a) 

105 [(1, 3), (1, 3)] 

106 """ 

107 dim = boolean_array.ndim 

108 shape = boolean_array.shape 

109 assert 0 not in shape, "Shape must not contain zero" 

110 bounds = [] 

111 for ax in itertools.combinations(reversed(range(dim)), dim - 1): 

112 nonzero = np.any(boolean_array, axis=ax) 

113 t = np.where(nonzero)[0][[0, -1]] 

114 bounds.append((t[0], t[1] + 1)) 

115 return bounds 

116 

117 

118class LinearEquationSystem: 

119 """Symbolic linear system of equations - consisting of matrix and right hand side. 

120 

121 Equations can be added incrementally. System is held in reduced row echelon form to quickly determine if 

122 system has a single, multiple, or no solution. 

123 

124 Example: 

125 >>> x, y= sp.symbols("x, y") 

126 >>> les = LinearEquationSystem([x, y]) 

127 >>> les.add_equation(x - y - 3) 

128 >>> les.solution_structure() 

129 'multiple' 

130 >>> les.add_equation(x + y - 4) 

131 >>> les.solution_structure() 

132 'single' 

133 >>> les.solution() 

134 {x: 7/2, y: 1/2} 

135 

136 """ 

137 def __init__(self, unknowns): 

138 size = len(unknowns) 

139 self._matrix = sp.zeros(size, size + 1) 

140 self.unknowns = unknowns 

141 self.next_zero_row = 0 

142 self._reduced = True 

143 

144 def copy(self): 

145 """Returns a copy of the equation system.""" 

146 new = LinearEquationSystem(self.unknowns) 

147 new._matrix = self._matrix.copy() 

148 new.next_zero_row = self.next_zero_row 

149 return new 

150 

151 def add_equation(self, linear_equation): 

152 """Add a linear equation as sympy expression. Implicit "-0" is assumed. Equation has to be linear and contain 

153 only unknowns passed to the constructor otherwise a ValueError is raised. """ 

154 self._resize_if_necessary() 

155 linear_equation = linear_equation.expand() 

156 zero_row_idx = self.next_zero_row 

157 self.next_zero_row += 1 

158 

159 control = 0 

160 for i, unknown in enumerate(self.unknowns): 

161 self._matrix[zero_row_idx, i] = linear_equation.coeff(unknown) 

162 control += unknown * self._matrix[zero_row_idx, i] 

163 rest = linear_equation - control 

164 if rest.atoms(sp.Symbol): 

165 raise ValueError("Not a linear equation in the unknowns") 

166 self._matrix[zero_row_idx, -1] = -rest 

167 self._reduced = False 

168 

169 def add_equations(self, linear_equations): 

170 """Add a sequence of equations. For details see `add_equation`. """ 

171 self._resize_if_necessary(len(linear_equations)) 

172 for eq in linear_equations: 

173 self.add_equation(eq) 

174 

175 def set_unknown_zero(self, unknown_idx): 

176 """Sets an unknown to zero - pass the index not the variable itself!""" 

177 assert unknown_idx < len(self.unknowns) 

178 self._resize_if_necessary() 

179 self._matrix[self.next_zero_row, unknown_idx] = 1 

180 self.next_zero_row += 1 

181 self._reduced = False 

182 

183 def reduce(self): 

184 """Brings the system in reduced row echelon form.""" 

185 if self._reduced: 

186 return 

187 self._matrix = self._matrix.rref()[0] 

188 self._update_next_zero_row() 

189 self._reduced = True 

190 

191 @property 

192 def matrix(self): 

193 """Return a matrix that represents the equation system. 

194 Has one column more than unknowns for the affine part.""" 

195 self.reduce() 

196 return self._matrix 

197 

198 @property 

199 def rank(self): 

200 self.reduce() 

201 return self.next_zero_row 

202 

203 def solution_structure(self): 

204 """Returns either 'multiple', 'none' or 'single' to indicate how many solutions the system has.""" 

205 self.reduce() 

206 non_zero_rows = self.next_zero_row 

207 num_unknowns = len(self.unknowns) 

208 if non_zero_rows == 0: 

209 return 'multiple' 

210 

211 *row_begin, left, right = self._matrix.row(non_zero_rows - 1) 

212 if non_zero_rows > num_unknowns: 

213 return 'none' 

214 elif non_zero_rows == num_unknowns: 

215 if left == 0 and right != 0: 

216 return 'none' 

217 else: 

218 return 'single' 

219 elif non_zero_rows < num_unknowns: 

220 if right != 0 and left == 0 and all(e == 0 for e in row_begin): 

221 return 'none' 

222 else: 

223 return 'multiple' 

224 

225 def solution(self): 

226 """Solves the system. Under- and overdetermined systems are supported. 

227 Returns a dictionary mapping symbol to solution value.""" 

228 return sp.solve_linear_system(self._matrix, *self.unknowns) 

229 

230 def _resize_if_necessary(self, new_rows=1): 

231 if self.next_zero_row + new_rows > self._matrix.shape[0]: 

232 self._matrix = self._matrix.row_insert(self._matrix.shape[0] + 1, 

233 sp.zeros(new_rows, self._matrix.shape[1])) 

234 

235 def _update_next_zero_row(self): 

236 result = self._matrix.shape[0] 

237 while result >= 0: 

238 row_to_check = result - 1 

239 if any(e != 0 for e in self._matrix.row(row_to_check)): 

240 break 

241 result -= 1 

242 self.next_zero_row = result