Coverage for python/bellhop/environment.py: 98%

213 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 12:04 +0000

1 

2""" 

3Environment configuration for BELLHOP. 

4 

5This module provides dataclass-based environment configuration with automatic validation, 

6replacing manual option checking with field validators. 

7""" 

8 

9from collections.abc import MutableMapping 

10from dataclasses import dataclass, asdict, fields 

11from typing import Optional, Union, Any, Dict, Iterator 

12from pprint import pformat 

13import warnings 

14 

15import numpy as _np 

16import pandas as _pd 

17 

18from .constants import _Strings, _Maps, Defaults 

19 

@dataclass
class Environment(MutableMapping[str, Any]):
    """Dataclass for underwater acoustic environment configuration.

    Parameters are stored as dataclass fields and may be read or written either
    as attributes (``env.depth``) or with mapping syntax (``env['depth']``).
    Assigning to a name that is not already an attribute raises ``KeyError``.
    When ``_Maps`` has an attribute named after the field, an assigned value
    must be ``None`` or one of that mapping's values, otherwise ``ValueError``
    is raised. Entries cannot be deleted.

    These entries are either intended to be set or edited by the user, or with `_` prefix are
    internal state read from a .env file or inferred by other data. Some others are ignored.

    Call :meth:`check` to fill in derived settings and validate the whole
    configuration.
    """

    # Basic environment properties
    name: str = 'bellhop/python default'
    type: str = '2D'
    frequency: float = 25000.0  # Hz
    _num_media: int = 1  # must always = 1 in bellhop

    # Sound speed parameters
    soundspeed: Union[float, Any] = 1500.0  # m/s
    soundspeed_interp: str = _Strings.linear

    # Depth parameters
    depth: Union[float, Any] = 25.0  # m
    depth_interp: str = _Strings.linear
    _mesh_npts: int = 0  # ignored by bellhop
    _depth_sigma: float = 0.0  # ignored by bellhop
    depth_max: Optional[float] = None  # m

    # Flags to read/write from separate files
    _bathymetry: str = _Strings.flat  # set to "from-file" if multiple bottom depths
    _altimetry: str = _Strings.flat  # set to "from-file" if multiple surface heights
    _sbp_file: str = _Strings.default  # set to "from-file" if source_directionality defined

    # Bottom parameters
    bottom_interp: Optional[str] = None
    bottom_soundspeed: float = 1600.0  # m/s
    _bottom_soundspeed_shear: float = 0.0  # m/s (ignored)
    bottom_density: float = 1600  # kg/m^3  # this value doesn't seem right but is copied from ARLpy
    bottom_attenuation: Optional[float] = None  # dB/wavelength
    _bottom_attenuation_shear: Optional[float] = None  # dB/wavelength (ignored)
    bottom_roughness: float = 0.0  # m (rms)
    bottom_beta: Optional[float] = None
    bottom_transition_freq: Optional[float] = None  # Hz
    bottom_boundary_condition: str = _Strings.acousto_elastic
    bottom_reflection_coefficient: Optional[Any] = None

    # Surface parameters
    surface: Optional[Any] = None  # surface profile
    surface_interp: str = _Strings.linear  # curvilinear/linear
    surface_boundary_condition: str = _Strings.vacuum
    surface_reflection_coefficient: Optional[Any] = None
    surface_depth: float = 0.0  # m
    surface_soundspeed: float = 1600.0  # m/s
    _surface_soundspeed_shear: float = 0.0  # m/s (ignored)
    surface_density: float = 1000.0  # kg/m^3
    surface_attenuation: Optional[float] = None  # dB/wavelength
    _surface_attenuation_shear: Optional[float] = None  # dB/wavelength (ignored)

    # Source parameters
    source_type: str = 'default'
    source_depth: Union[float, Any] = 5.0  # m - Any allows for np.ndarray
    source_ndepth: Optional[int] = None
    source_directionality: Optional[Any] = None  # [(deg, dB)...]

    # Receiver parameters
    receiver_depth: Union[float, Any] = 10.0  # m - Any allows for np.ndarray
    receiver_range: Union[float, Any] = 1000.0  # m - Any allows for np.ndarray
    receiver_ndepth: Optional[int] = None
    receiver_nrange: Optional[int] = None

    # Beam settings
    beam_type: str = _Strings.default
    beam_angle_min: Optional[float] = None  # deg
    beam_angle_max: Optional[float] = None  # deg
    beam_num: int = 0  # (0 = auto)
    single_beam_index: Optional[int] = None
    _single_beam: str = _Strings.default  # value inferred from `single_beam_index`

    # Solution parameters
    step_size: Optional[float] = 0.0
    box_depth: Optional[float] = None
    box_range: Optional[float] = None
    grid_type: str = 'default'
    task: Optional[str] = None
    interference_mode: Optional[str] = None  # subset of `task` for providing TL interface

    # Attenuation parameters
    volume_attenuation: str = 'none'
    attenuation_units: str = 'frequency dependent'

    # Francois-Garrison volume attenuation parameters
    fg_salinity: Optional[float] = None
    fg_temperature: Optional[float] = None
    fg_pH: Optional[float] = None
    fg_depth: Optional[float] = None

    def check(self) -> "Environment":
        """Finalise and validate the environment in place.

        Runs :meth:`_finalise` to fill in derived settings, then each
        ``_check_env_*`` validator in turn.

        Returns:
            This same instance, so calls can be chained.

        Raises:
            ValueError: if any validator assertion fails (the message is
                prefixed with ``"Env check error: "``), or directly from
                :meth:`_finalise` for a badly shaped sound speed array.

        Note:
            The validators are written with ``assert`` statements, which Python
            skips when run with ``-O``.
        """
        self._finalise()
        try:
            self._check_env_header()
            self._check_env_surface()
            self._check_env_depth()
            self._check_env_ssp()
            self._check_env_sbp()
            self._check_env_beam()
        except AssertionError as e:
            # Suppress the assertion traceback; callers only need the message.
            raise ValueError(f"Env check error: {str(e)}") from None
        return self

    def _finalise(self) -> "Environment":
        """Reviews the data within an environment and updates settings for consistency.

        This function is run as the first step of check(). It:

        * switches the bathymetry/altimetry/boundary-condition flags to
          "from file" when the corresponding array data is present;
        * derives ``depth_max`` from ``depth`` when it is unset;
        * converts a scalar or Nx2 ``soundspeed`` into a DataFrame indexed by
          depth with a ``speed`` column;
        * selects quadrilateral interpolation for multi-column sound speed;
        * fills in default beam angles and the ray box extents.

        Returns:
            This same instance.

        Raises:
            ValueError: if ``soundspeed`` is neither a DataFrame, a single
                value, nor an Nx2 array.
        """
        if _np.size(self['depth']) > 1:
            self["_bathymetry"] = _Strings.from_file
        if self["surface"] is not None:
            self["_altimetry"] = _Strings.from_file
        if self["bottom_reflection_coefficient"] is not None:
            self["bottom_boundary_condition"] = _Strings.from_file
        if self["surface_reflection_coefficient"] is not None:
            self["surface_boundary_condition"] = _Strings.from_file

        if self['depth_max'] is None:
            if _np.size(self['depth']) == 1:
                self['depth_max'] = self['depth']
            else:
                # depth : Nx2 array = [ranges,depths]
                self['depth_max'] = _np.max(self['depth'][:, 1])

        if not isinstance(self['soundspeed'], _pd.DataFrame):
            # Coerce first so lists work and malformed input reaches the
            # ValueError below instead of failing on a missing .shape/.shape[1].
            ssp = _np.asarray(self['soundspeed'])
            if ssp.size == 1:
                # Single value: constant speed from the surface to depth_max.
                speed = float(ssp.reshape(-1)[0])
                profile = _pd.DataFrame(
                    [speed, speed],
                    columns=["speed"],
                    index=[0, float(self['depth_max'])],
                )
            elif ssp.ndim == 2 and ssp.shape == (1, 2):
                # One [depth, speed] row: stretch it to cover 0..depth_max.
                speed = float(ssp[0, 1])
                d1 = float(min([0.0, ssp[0, 0]]))
                d2 = float(max([ssp[0, 0], self['depth_max']]))
                profile = _pd.DataFrame([speed, speed], columns=["speed"], index=[d1, d2])
            elif ssp.ndim == 2 and ssp.shape[1] == 2:
                # Nx2 array = [depths, speeds]
                profile = _pd.DataFrame(ssp[:, 1], columns=["speed"], index=ssp[:, 0])
            else:
                raise ValueError("Soundspeed array must be an Nx2 array [depth, speed] (better to use a DataFrame)")
            profile.index.name = "depth"
            self["soundspeed"] = profile

        if "depth" in self["soundspeed"].columns:
            self["soundspeed"] = self["soundspeed"].set_index("depth")

        if len(self['soundspeed'].columns) > 1:
            # More than one speed column implies quadrilateral interpolation;
            # _check_env_ssp asserts this, so it must actually be assigned here.
            self['soundspeed_interp'] = _Strings.quadrilateral

        # Beam angle ranges default to half-space if source is left-most, otherwise full-space:
        min_range = _np.min(self['receiver_range'])
        if min_range < 0:
            default_angle = Defaults.beam_angle_fullspace
        else:
            default_angle = Defaults.beam_angle_halfspace
        if self['beam_angle_min'] is None:
            self['beam_angle_min'] = -default_angle
        if self['beam_angle_max'] is None:
            self['beam_angle_max'] = default_angle

        # `or` treats both None and 0 as "unset" for the box extents.
        self['box_depth'] = self['box_depth'] or 1.01 * self['depth_max']
        self['box_range'] = self['box_range'] or 1.01 * (_np.max(self['receiver_range']) - min(0, min_range))

        return self

    def _check_env_header(self) -> None:
        """Assert the environment is 2D with exactly one medium."""
        assert self['type'] == '2D', 'Not a 2D environment'
        assert self["_num_media"] == 1, f"BELLHOP only supports 1 medium, found {self['_num_media']}"

    def _check_env_surface(self) -> None:
        """Assert the surface profile (if any) is a valid Nx2 array covering the receiver ranges."""
        max_range = _np.max(self['receiver_range'])
        surface = self['surface']
        if surface is not None:
            assert _np.size(surface) > 1, 'surface must be an Nx2 array'
            assert surface.ndim == 2, 'surface must be a scalar or an Nx2 array'
            assert surface.shape[1] == 2, 'surface must be a scalar or an Nx2 array'
            assert surface[0, 0] <= 0, 'First range in surface array must be 0 m'
            assert surface[-1, 0] >= max_range, 'Last range in surface array must be beyond maximum range: '+str(max_range)+' m'
            assert _np.all(_np.diff(surface[:, 0]) > 0), 'surface array must be strictly monotonic in range'
        if self["surface_reflection_coefficient"] is not None:
            assert self["surface_boundary_condition"] == _Strings.from_file, "TRC values need to be read from file"

    def _check_env_depth(self) -> None:
        """Assert the bathymetry is well formed and that sources/receivers lie within the water depth."""
        max_range = _np.max(self['receiver_range'])
        depth = self['depth']
        if _np.size(depth) > 1:
            assert depth.ndim == 2, 'depth must be a scalar or an Nx2 array [ranges, depths]'
            assert depth.shape[1] == 2, 'depth must be a scalar or an Nx2 array [ranges, depths]'
            assert depth[-1, 0] >= max_range, 'Last range in depth array must be beyond maximum range: '+str(max_range)+' m'
            assert _np.all(_np.diff(depth[:, 0]) > 0), 'Depth array must be strictly monotonic in range'
            assert self["_bathymetry"] == _Strings.from_file, 'len(depth)>1 requires BTY file'
        if self["bottom_reflection_coefficient"] is not None:
            assert self["bottom_boundary_condition"] == _Strings.from_file, "BRC values need to be read from file"
        assert _np.max(self['source_depth']) <= self['depth_max'], 'source_depth cannot exceed water depth: '+str(self['depth_max'])+' m'
        assert _np.max(self['receiver_depth']) <= self['depth_max'], 'receiver_depth cannot exceed water depth: '+str(self['depth_max'])+' m'

    def _check_env_ssp(self) -> None:
        """Assert the sound speed profile is valid and make it end exactly at ``depth_max``.

        A single-column profile that extends deeper than ``depth_max`` is
        trimmed: rows shallower than ``depth_max`` are kept and one linearly
        interpolated row is appended at ``depth_max`` (a ``UserWarning`` is
        issued). A multi-column profile must already end at ``depth_max``.
        """
        ssp = self['soundspeed']
        assert isinstance(ssp, _pd.DataFrame), 'Soundspeed should always be a DataFrame by this point'
        assert ssp.size > 1, "Soundspeed DataFrame should have been constructed internally to be two elements"
        multi_column = len(ssp.columns) > 1
        if multi_column:
            assert self['soundspeed_interp'] == _Strings.quadrilateral, "SVP DataFrame with multiple columns implies quadrilateral interpolation."
        if self['soundspeed_interp'] == _Strings.spline:
            assert ssp.shape[0] > 3, 'soundspeed profile must have at least 4 points for spline interpolation'
        else:
            assert ssp.shape[0] > 1, 'soundspeed profile must have at least 2 points'
        assert ssp.index[0] <= 0.0, 'First depth in soundspeed array must be 0 m'
        assert _np.all(_np.diff(ssp.index) > 0), 'Soundspeed array must be strictly monotonic in depth'

        depth_max = self['depth_max']
        if depth_max != ssp.index[-1]:
            if multi_column:
                # TODO: generalise interpolation trimming from np approach below
                raise AssertionError('2D SSP: Final entry in soundspeed array must be at the maximum water depth: '+str(depth_max)+' m')
            # A profile that stops short of the bottom cannot be trimmed.
            assert ssp.index[-1] > depth_max, 'Soundspeed profile must extend to the maximum water depth: '+str(depth_max)+' m'
            # Position of the first sample deeper than depth_max.
            first_deeper = _np.argwhere(ssp.index > depth_max)[0][0]
            speed_at_bottom = _np.interp(depth_max, ssp.index, ssp.iloc[:, 0])
            # One row holding only the speed; the depth belongs in the index.
            bottom_row = _pd.DataFrame([[speed_at_bottom]], columns=ssp.columns, index=[depth_max])
            trimmed = _pd.concat([ssp.iloc[:first_deeper], bottom_row])
            trimmed.index.name = ssp.index.name
            self['soundspeed'] = trimmed
            warnings.warn("Bellhop.py has used linear interpolation to ensure the sound speed profile ends at the max depth. Ensure this is what you want.", UserWarning)
        # TODO: check soundspeed range limits

    def _check_env_sbp(self) -> None:
        """Assert the source directionality (if any) is an Nx2 array with angles within +/-180 degrees."""
        pattern = self['source_directionality']
        if pattern is not None:
            assert _np.size(pattern) > 1, 'source_directionality must be an Nx2 array'
            assert pattern.ndim == 2, 'source_directionality must be an Nx2 array'
            assert pattern.shape[1] == 2, 'source_directionality must be an Nx2 array'
            assert _np.all(pattern[:, 0] >= -180) and _np.all(pattern[:, 0] <= 180), 'source_directionality angles must be in (-180, 180]'

    def _check_env_beam(self) -> None:
        """Assert beam angles are within +/-180 degrees and a single-beam request has an index."""
        assert self['beam_angle_min'] >= -180 and self['beam_angle_min'] <= 180, 'beam_angle_min must be in range (-180, 180]'
        assert self['beam_angle_max'] >= -180 and self['beam_angle_max'] <= 180, 'beam_angle_max must be in range (-180, 180]'
        if self['_single_beam'] == _Strings.single_beam:
            assert self['single_beam_index'] is not None, 'Single beam was requested with option I but no index was provided in NBeam line'

    def __getitem__(self, key: str) -> Any:
        """Return the attribute named ``key``; raise ``KeyError`` if there is none."""
        if not hasattr(self, key):
            raise KeyError(key)
        return getattr(self, key)

    def __setitem__(self, key: str, value: Any) -> None:
        """Assign ``value`` to ``key`` with the same validation as attribute assignment."""
        self.__setattr__(key, value)

    def __setattr__(self, key: str, value: Any) -> None:
        """Set an existing attribute, validating the value against ``_Maps`` when applicable.

        Raises:
            KeyError: if ``key`` is not already an attribute of the instance.
            ValueError: if ``_Maps`` has an entry for ``key`` and ``value`` is
                neither ``None`` nor one of that entry's values.
        """
        if not hasattr(self, key):
            raise KeyError(f"Unknown environment configuration parameter: {key!r}")
        # Generalized validation of values
        allowed = getattr(_Maps, key, None)
        if allowed is not None and value is not None:
            permitted = set(allowed.values())
            if value not in permitted:
                raise ValueError(f"Invalid value for {key!r}: {value}. Allowed: {permitted}")
        object.__setattr__(self, key, value)

    def __delitem__(self, key: str) -> None:
        """Always raise ``KeyError``: parameters cannot be removed."""
        raise KeyError("Environment parameters cannot be deleted")

    def __iter__(self) -> Iterator[str]:
        """Iterate over the dataclass field names."""
        return (f.name for f in fields(self))

    def __len__(self) -> int:
        """Return the number of dataclass fields."""
        return len(fields(self))

    def __repr__(self) -> str:
        """Return a pretty-printed rendering of :meth:`to_dict`."""
        return pformat(self.to_dict())

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation of the environment."""
        return asdict(self)

    def copy(self) -> "Environment":
        """Return a shallow copy of the environment."""
        # Field values are passed by reference to a fresh instance of the same type.
        data = {f.name: getattr(self, f.name) for f in fields(self)}
        return type(self)(**data)