Coverage for python/bellhop/environment.py: 98%
213 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 12:04 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 12:04 +0000
2"""
3Environment configuration for BELLHOP.
5This module provides dataclass-based environment configuration with automatic validation,
6replacing manual option checking with field validators.
7"""
9from collections.abc import MutableMapping
10from dataclasses import dataclass, asdict, fields
11from typing import Optional, Union, Any, Dict, Iterator
12from pprint import pformat
13import warnings
15import numpy as _np
16import pandas as _pd
18from .constants import _Strings, _Maps, Defaults
20@dataclass
21class Environment(MutableMapping[str, Any]):
22 """Dataclass for underwater acoustic environment configuration.
24 This class provides automatic validation of environment parameters,
25 eliminating the need for manual checking of option validity.
27 These entries are either intended to be set or edited by the user, or with `_` prefix are
28 internal state read from a .env file or inferred by other data. Some others are ignored."""
30 # Basic environment properties
31 name: str = 'bellhop/python default'
32 type: str = '2D'
33 frequency: float = 25000.0 # Hz
34 _num_media: int = 1 # must always = 1 in bellhop
36 # Sound speed parameters
37 soundspeed: Union[float, Any] = 1500.0 # m/s
38 soundspeed_interp: str = _Strings.linear
40 # Depth parameters
41 depth: Union[float, Any] = 25.0 # m
42 depth_interp: str = _Strings.linear
43 _mesh_npts: int = 0 # ignored by bellhop
44 _depth_sigma: float = 0.0 # ignored by bellhop
45 depth_max: Optional[float] = None # m
47 # Flags to read/write from separate files
48 _bathymetry: str = _Strings.flat # set to "from-file" if multiple bottom depths
49 _altimetry: str = _Strings.flat # set to "from-file" if multiple surface heights
50 _sbp_file: str = _Strings.default # set to "from-file" if source_directionality defined
52 # Bottom parameters
53 bottom_interp: Optional[str] = None
54 bottom_soundspeed: float = 1600.0 # m/s
55 _bottom_soundspeed_shear: float = 0.0 # m/s (ignored)
56 bottom_density: float = 1600 # kg/m^3 # this value doesn't seem right but is copied from ARLpy
57 bottom_attenuation: Optional[float] = None # dB/wavelength
58 _bottom_attenuation_shear: Optional[float] = None # dB/wavelength (ignored)
59 bottom_roughness: float = 0.0 # m (rms)
60 bottom_beta: Optional[float] = None
61 bottom_transition_freq: Optional[float] = None # Hz
62 bottom_boundary_condition: str = _Strings.acousto_elastic
63 bottom_reflection_coefficient: Optional[Any] = None
65 # Surface parameters
66 surface: Optional[Any] = None # surface profile
67 surface_interp: str = _Strings.linear # curvilinear/linear
68 surface_boundary_condition: str = _Strings.vacuum
69 surface_reflection_coefficient: Optional[Any] = None
70 surface_depth: float = 0.0 # m
71 surface_soundspeed: float = 1600.0 # m/s
72 _surface_soundspeed_shear: float = 0.0 # m/s (ignored)
73 surface_density: float = 1000.0 # kg/m^3
74 surface_attenuation: Optional[float] = None # dB/wavelength
75 _surface_attenuation_shear: Optional[float] = None # dB/wavelength (ignored)
77 # Source parameters
78 source_type: str = 'default'
79 source_depth: Union[float, Any] = 5.0 # m - Any allows for np.ndarray
80 source_ndepth: Optional[int] = None
81 source_directionality: Optional[Any] = None # [(deg, dB)...]
83 # Receiver parameters
84 receiver_depth: Union[float, Any] = 10.0 # m - Any allows for np.ndarray
85 receiver_range: Union[float, Any] = 1000.0 # m - Any allows for np.ndarray
86 receiver_ndepth: Optional[int] = None
87 receiver_nrange: Optional[int] = None
89 # Beam settings
90 beam_type: str = _Strings.default
91 beam_angle_min: Optional[float] = None # deg
92 beam_angle_max: Optional[float] = None # deg
93 beam_num: int = 0 # (0 = auto)
94 single_beam_index: Optional[int] = None
95 _single_beam: str = _Strings.default # value inferred from `single_beam_index`
97 # Solution parameters
98 step_size: Optional[float] = 0.0
99 box_depth: Optional[float] = None
100 box_range: Optional[float] = None
101 grid_type: str = 'default'
102 task: Optional[str] = None
103 interference_mode: Optional[str] = None # subset of `task` for providing TL interface
105 # Attenuation parameters
106 volume_attenuation: str = 'none'
107 attenuation_units: str = 'frequency dependent'
109 # Francois-Garrison volume attenuation parameters
110 fg_salinity: Optional[float] = None
111 fg_temperature: Optional[float] = None
112 fg_pH: Optional[float] = None
113 fg_depth: Optional[float] = None
116 def check(self) -> "Environment":
117 self._finalise()
118 try:
119 self._check_env_header()
120 self._check_env_surface()
121 self._check_env_depth()
122 self._check_env_ssp()
123 self._check_env_sbp()
124 self._check_env_beam()
125 return self
126 except AssertionError as e:
127 raise ValueError(f"Env check error: {str(e)}") from None
129 def _finalise(self) -> "Environment":
130 """Reviews the data within an environment and updates settings for consistency.
132 This function is run as the first step of check_env().
133 """
135 if _np.size(self['depth']) > 1:
136 self["_bathymetry"] = _Strings.from_file
137 if self["surface"] is not None:
138 self["_altimetry"] = _Strings.from_file
139 if self["bottom_reflection_coefficient"] is not None:
140 self["bottom_boundary_condition"] = _Strings.from_file
141 if self["surface_reflection_coefficient"] is not None:
142 self["surface_boundary_condition"] = _Strings.from_file
144 if self['depth_max'] is None:
145 if _np.size(self['depth']) == 1:
146 self['depth_max'] = self['depth']
147 else:
148 # depth : Nx2 array = [ranges,depths]
149 self['depth_max'] = _np.max(self['depth'][:,1])
151 if not isinstance(self['soundspeed'], _pd.DataFrame):
152 if _np.size(self['soundspeed']) == 1:
153 speed = [float(self["soundspeed"]), float(self["soundspeed"])]
154 depth = [0, float(self['depth_max'])]
155 self["soundspeed"] = _pd.DataFrame(speed, columns=["speed"], index=depth)
156 self["soundspeed"].index.name = "depth"
157 elif self['soundspeed'].shape[0] == 1 and self['soundspeed'].shape[1] == 2:
158 speed = [float(self["soundspeed"][0,1]), float(self["soundspeed"][0,1])]
159 d1 = float(min([0.0, self["soundspeed"][0,0]]))
160 d2 = float(max([self["soundspeed"][0,0], self['depth_max']]))
161 self["soundspeed"] = _pd.DataFrame(speed, columns=["speed"], index=[d1, d2])
162 self["soundspeed"].index.name = "depth"
163 elif self['soundspeed'].ndim == 2 and self['soundspeed'].shape[1] == 2:
164 depth = self['soundspeed'][:,0]
165 speed = self['soundspeed'][:,1]
166 self["soundspeed"] = _pd.DataFrame(speed, columns=["speed"], index=depth)
167 self["soundspeed"].index.name = "depth"
168 else:
169 raise ValueError("Soundspeed array must be a 2xN array (better to use a DataFrame)")
171 if "depth" in self["soundspeed"].columns:
172 self["soundspeed"] = self["soundspeed"].set_index("depth")
174 if len(self['soundspeed'].columns) > 1:
175 self['soundspeed_interp'] == _Strings.quadrilateral
177 # Beam angle ranges default to half-space if source is left-most, otherwise full-space:
178 if self['beam_angle_min'] is None:
179 if _np.min(self['receiver_range']) < 0:
180 self['beam_angle_min'] = - Defaults.beam_angle_fullspace
181 else:
182 self['beam_angle_min'] = - Defaults.beam_angle_halfspace
183 if self['beam_angle_max'] is None:
184 if _np.min(self['receiver_range']) < 0:
185 self['beam_angle_max'] = Defaults.beam_angle_fullspace
186 else:
187 self['beam_angle_max'] = Defaults.beam_angle_halfspace
189 self['box_depth'] = self['box_depth'] or 1.01 * self['depth_max']
190 self['box_range'] = self['box_range'] or 1.01 * (_np.max(self['receiver_range']) - min(0,_np.min(self['receiver_range'])))
192 return self
195 def _check_env_header(self) -> None:
196 assert self['type'] == '2D', 'Not a 2D environment'
197 assert self["_num_media"] == 1, f"BELLHOP only supports 1 medium, found {self['_num_media']}"
199 def _check_env_surface(self) -> None:
200 max_range = _np.max(self['receiver_range'])
201 if self['surface'] is not None:
202 assert _np.size(self['surface']) > 1, 'surface must be an Nx2 array'
203 assert self['surface'].ndim == 2, 'surface must be a scalar or an Nx2 array'
204 assert self['surface'].shape[1] == 2, 'surface must be a scalar or an Nx2 array'
205 assert self['surface'][0,0] <= 0, 'First range in surface array must be 0 m'
206 assert self['surface'][-1,0] >= max_range, 'Last range in surface array must be beyond maximum range: '+str(max_range)+' m'
207 assert _np.all(_np.diff(self['surface'][:,0]) > 0), 'surface array must be strictly monotonic in range'
208 if self["surface_reflection_coefficient"] is not None:
209 assert self["surface_boundary_condition"] == _Strings.from_file, "TRC values need to be read from file"
211 def _check_env_depth(self) -> None:
212 max_range = _np.max(self['receiver_range'])
213 if _np.size(self['depth']) > 1:
214 assert self['depth'].ndim == 2, 'depth must be a scalar or an Nx2 array [ranges, depths]'
215 assert self['depth'].shape[1] == 2, 'depth must be a scalar or an Nx2 array [ranges, depths]'
216 assert self['depth'][-1,0] >= max_range, 'Last range in depth array must be beyond maximum range: '+str(max_range)+' m'
217 assert _np.all(_np.diff(self['depth'][:,0]) > 0), 'Depth array must be strictly monotonic in range'
218 assert self["_bathymetry"] == _Strings.from_file, 'len(depth)>1 requires BTY file'
219 if self["bottom_reflection_coefficient"] is not None:
220 assert self["bottom_boundary_condition"] == _Strings.from_file, "BRC values need to be read from file"
221 assert _np.max(self['source_depth']) <= self['depth_max'], 'source_depth cannot exceed water depth: '+str(self['depth_max'])+' m'
222 assert _np.max(self['receiver_depth']) <= self['depth_max'], 'receiver_depth cannot exceed water depth: '+str(self['depth_max'])+' m'
224 def _check_env_ssp(self) -> None:
225 assert isinstance(self['soundspeed'], _pd.DataFrame), 'Soundspeed should always be a DataFrame by this point'
226 assert self['soundspeed'].size > 1, "Soundspeed DataFrame should have been constructed internally to be two elements"
227 if self['soundspeed'].size > 1:
228 if len(self['soundspeed'].columns) > 1:
229 assert self['soundspeed_interp'] == _Strings.quadrilateral, "SVP DataFrame with multiple columns implies quadrilateral interpolation."
230 if self['soundspeed_interp'] == _Strings.spline:
231 assert self['soundspeed'].shape[0] > 3, 'soundspeed profile must have at least 4 points for spline interpolation'
232 else:
233 assert self['soundspeed'].shape[0] > 1, 'soundspeed profile must have at least 2 points'
234 assert self['soundspeed'].index[0] <= 0.0, 'First depth in soundspeed array must be 0 m'
235 assert _np.all(_np.diff(self['soundspeed'].index) > 0), 'Soundspeed array must be strictly monotonic in depth'
236 if self['depth_max'] != self['soundspeed'].index[-1]:
237 if self['soundspeed'].shape[1] > 1:
238 # TODO: generalise interpolation trimming from np approach below
239 assert self['soundspeed'].index[-1] == self['depth_max'], '2D SSP: Final entry in soundspeed array must be at the maximum water depth: '+str(self['depth_max'])+' m'
240 else:
241 indlarger = _np.argwhere(self['soundspeed'].index > self['depth_max'])[0][0]
242 prev_ind = self['soundspeed'].index[:indlarger].tolist()
243 insert_ss_val = _np.interp(self['depth_max'], self['soundspeed'].index, self['soundspeed'].iloc[:,0])
244 new_row = _pd.DataFrame([self['depth_max'], insert_ss_val], columns=self['soundspeed'].columns)
245 self['soundspeed'] = _pd.concat([
246 self['soundspeed'].iloc[:(indlarger-1)], # rows before insertion
247 new_row, # new row
248 ], ignore_index=True)
249 self['soundspeed'].index = prev_ind + [self['depth_max']]
250 warnings.warn("Bellhop.py has used linear interpolation to ensure the sound speed profile ends at the max depth. Ensure this is what you want.", UserWarning)
251 print("ATTEMPTING TO FIX")
252 # TODO: check soundspeed range limits
254 def _check_env_sbp(self) -> None:
255 if self['source_directionality'] is not None:
256 assert _np.size(self['source_directionality']) > 1, 'source_directionality must be an Nx2 array'
257 assert self['source_directionality'].ndim == 2, 'source_directionality must be an Nx2 array'
258 assert self['source_directionality'].shape[1] == 2, 'source_directionality must be an Nx2 array'
259 assert _np.all(self['source_directionality'][:,0] >= -180) and _np.all(self['source_directionality'][:,0] <= 180), 'source_directionality angles must be in (-180, 180]'
261 def _check_env_beam(self) -> None:
262 assert self['beam_angle_min'] >= -180 and self['beam_angle_min'] <= 180, 'beam_angle_min must be in range (-180, 180]'
263 assert self['beam_angle_max'] >= -180 and self['beam_angle_max'] <= 180, 'beam_angle_max must be in range (-180, 180]'
264 if self['_single_beam'] == _Strings.single_beam:
265 assert self['single_beam_index'] is not None, 'Single beam was requested with option I but no index was provided in NBeam line'
268 def __getitem__(self, key: str) -> Any:
269 if not hasattr(self, key):
270 raise KeyError(key)
271 return getattr(self, key)
273 def __setitem__(self, key: str, value: Any) -> None:
274 self.__setattr__(key, value)
276 def __setattr__(self, key: str, value: Any) -> None:
277 if not hasattr(self, key):
278 raise KeyError(f"Unknown environment configuration parameter: {key!r}")
279 # Generalized validation of values
280 allowed = getattr(_Maps, key, None)
281 if allowed is not None and value is not None and value not in set(allowed.values()):
282 raise ValueError(f"Invalid value for {key!r}: {value}. Allowed: {set(allowed.values())}")
283 object.__setattr__(self, key, value)
285 def __delitem__(self, key: str) -> None:
286 raise KeyError("Environment parameters cannot be deleted")
288 def __iter__(self) -> Iterator[str]:
289 return (f.name for f in fields(self))
291 def __len__(self) -> int:
292 return len(fields(self))
294 def __repr__(self) -> str:
295 return pformat(self.to_dict())
297 def to_dict(self) -> Dict[str,Any]:
298 """Return a dictionary representation of the environment."""
299 return asdict(self)
301 def copy(self) -> "Environment":
302 """Return a shallow copy of the environment."""
303 # Copy all fields
304 data = {f.name: getattr(self, f.name) for f in fields(self)}
305 # Return a new instance
306 new_env = type(self)(**data)
307 return new_env