Coverage for python / aubellhop / writers.py: 87%
189 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-24 14:11 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-24 14:11 +0000
1"""File writing class methods for aubellhop.
3These files are the input files passed directly into the `bellhop(3d).exe` binaries.
5The `EnvironmentWriter` class provides encapsulation of methods. It is not intended
6to be user-facing. It is the end of the chain for env writing:
8 ~~bellhop.py~~ ~~environment.py~~ ~~writers.py~~
9 model_fn.write_env() → env.to_file() → EnvironmentWriter().write()
11"""
13from __future__ import annotations
15from typing import Any, TextIO
16import numpy as np
17import pandas as pd
18from .environment import Environment
19from .constants import BHStrings, FlagMaps
class EnvironmentWriter:
    """Bellhop file-writer class which creates the `.env` and related input files.

    An instance wraps an environment, an already-open text file handle for the
    `.env` file, the filename base used to derive the auxiliary file names
    (`.trc`, `.ati`, `.ssp`, `.bty`, `.brc`, `.sbp`), and the task code.
    """

    def __init__(self,
                 env: Environment,
                 file_handle: TextIO,
                 fname_base: str,
                 taskcode: str
                 ):
        """Initialize writer with existing file reference and filename base, plus taskcode.

        The filename base is, e.g., the `foo.env` file stripped of its extension.

        Although the taskcode can be stored within the environment, it is so common to
        override this that we use an explicit function input.

        Parameters
        ----------
        env : Environment
            The Environment instance
        file_handle : file object
            File reference (already opened)
        fname_base : str
            Filename base (without extension)
        taskcode : str
            Task char which defines the computation to run (`R`, `I`, `C`, etc.)

        """
        self.env = env
        self.file_handle = file_handle
        self.fname_base = fname_base
        self.taskcode = taskcode

    def write(self) -> None:
        """Writes a complete .env file for specifying a Bellhop simulation

        The `.env` content is written to `self.file_handle`; any auxiliary files
        required by the environment are then created next to it, named from
        `self.fname_base` plus the relevant extension.

        Returns
        -------
        None

        We liberally insert comments and empty lines for readability and take care to
        ensure that comments are consistently aligned.
        This doesn't make a difference to bellhop.exe, it just makes debugging far easier.
        """
        fh = self.file_handle

        # Main .env file: the section order below is the order the lines are emitted.
        self._print_env_line(fh, "")
        self._write_env_header(fh)
        self._print_env_line(fh, "")
        self._write_env_surface_depth(fh)
        self._write_env_sound_speed(fh)
        self._print_env_line(fh, "")
        self._write_env_bottom(fh)
        self._print_env_line(fh, "")
        self._write_env_source_receiver(fh)
        self._print_env_line(fh, "")
        self._write_env_task(fh, self.taskcode)
        self._write_env_beam_footer(fh)
        self._print_env_line(fh, "")
        self._write_gaussian_params(fh)
        self._print_env_line(fh, "", "End of Bellhop environment file")

        # Auxiliary files, each only written when the environment asks for it.
        if self.env['surface_boundary_condition'] == BHStrings.from_file:
            self._create_refl_coeff_file(self.fname_base + ".trc", self.env['surface_reflection_coefficient'])
        if np.size(self.env["surface_depth"]) > 1:
            self._create_bty_ati_file(self.fname_base + '.ati', self.env['surface_depth'], self.env['surface_interp'])
        if self.env['soundspeed_interp'] == BHStrings.quadrilateral:
            self._create_ssp_quad_file(self.fname_base + '.ssp', self.env['soundspeed'])
        if np.size(self.env['bottom_depth']) > 1:
            self._create_bty_ati_file(self.fname_base + '.bty', self.env['bottom_depth'], self.env['bottom_interp'])
        if self.env['bottom_boundary_condition'] == BHStrings.from_file:
            self._create_refl_coeff_file(self.fname_base + ".brc", self.env['bottom_reflection_coefficient'])
        if self.env['source_directionality'] is not None:
            self._create_sbp_file(self.fname_base + '.sbp', self.env['source_directionality'])

    def _write_env_header(self, fh: TextIO) -> None:
        """Writes header of env file (quoted name, frequency, media count)."""
        self._print_env_line(fh, "'" + self.env['name'] + "'", "Bellhop environment name/description")
        self._print_env_line(fh, self.env['frequency'], "Frequency (Hz)")
        self._print_env_line(fh, 1, "NMedia -- always =1 for Bellhop")

    def _write_env_surface_depth(self, fh: TextIO) -> None:
        """Writes surface boundary and depth lines of env file."""

        # Translate each environment setting to its flag via the reverse flag maps.
        svp_interp = FlagMaps.soundspeed_interp_rev[self.env['soundspeed_interp']]
        svp_boundcond = FlagMaps.surface_boundary_condition_rev[self.env['surface_boundary_condition']]
        svp_attenuation_units = FlagMaps.attenuation_units_rev[self.env['attenuation_units']]
        svp_volume_attenuation = FlagMaps.volume_attenuation_rev[self.env['volume_attenuation']]
        svp_alti = FlagMaps._altimetry_rev[self.env['_altimetry']]
        svp_singlebeam = FlagMaps._single_beam_rev[self.env['_single_beam']]

        # Line 4
        comment = "SSP parameters: Interp / Top Boundary Cond / Attenuation Units / Volume Attenuation)"
        topopt = self._quoted_opt(svp_interp, svp_boundcond, svp_attenuation_units, svp_volume_attenuation, svp_alti, svp_singlebeam)
        self._print_env_line(fh, f"{topopt}", comment)

        if self.env['volume_attenuation'] == BHStrings.francois_garrison:
            comment = "Francois-Garrison volume attenuation parameters (sal, temp, pH, depth)"
            self._print_env_line(fh, f"{self.env['_fg_salinity']} {self.env['_fg_temperature']} {self.env['_fg_pH']} {self.env['_fg_depth']}", comment)

        # Line 4a
        if self.env['surface_boundary_condition'] == BHStrings.acousto_elastic:
            comment = "DEPTH_Top (m) TOP_SoundSpeed (m/s) TOP_SoundSpeed_Shear (m/s) TOP_Density (g/cm^3) [ TOP_Absorp [ TOP_Absorp_Shear ] ]"
            array_str = self._array2str([
                self.env['_surface_min'],
                self.env['surface_soundspeed'],
                self.env['_surface_soundspeed_shear'],
                self._float(self.env['surface_density'], scale=1/1000),
                self.env['surface_attenuation'],
                self.env['_surface_attenuation_shear']
            ])
            self._print_env_line(fh, array_str, comment)

        # Line 4b
        if self.env['biological_layer_parameters'] is not None:
            self._write_env_biological(fh, self.env['biological_layer_parameters'])

    def _write_env_biological(self, fh: TextIO, biol: pd.DataFrame) -> None:
        """Writes biological layer parameters to env file (count line, then one line per row)."""
        self._print_env_line(fh, biol.shape[0], "N_Biol_Layers / z1 z2 w0 Q a0")
        for j, row in enumerate(biol.values):
            # `row` is a NumPy array here, so `_array2str` must accept non-list input.
            self._print_env_line(fh, self._array2str(row), f"biol_{j}")

    def _soundspeed_in_env_units(self, svp: pd.DataFrame) -> pd.DataFrame:
        """Return the profile with any `density` column scaled by 1/1000 (kg/m^3 -> g/cm^3).

        The scaling is applied to a copy so the caller's DataFrame is left untouched;
        scaling in place would rescale the environment's own data on every write.
        A profile without a `density` column is returned unchanged.
        """
        if "density" not in svp.columns:
            return svp
        scaled = svp.copy()
        scaled["density"] = scaled["density"] * (1 / 1000)
        return scaled

    def _write_env_sound_speed(self, fh: TextIO) -> None:
        """Writes sound speed profile lines of env file."""

        comment = "[Npts - ignored] [Sigma - ignored] [Depth_Max - inferred]"
        self._print_env_line(fh, f"{self.env['_mesh_npts']} {self.env['_depth_sigma']} {self.env['_depth_max']}", comment)

        svp = self._soundspeed_in_env_units(self.env['soundspeed'])

        if self.env['soundspeed_interp'] == BHStrings.quadrilateral:
            for j in range(svp.shape[0]):
                # only print a single "dummy" column -- rest of data in .ssp file
                self._print_env_line(fh, self._array2str([svp.index[j], svp.iloc[j, 0]]), f"ssp_{j}")
        else:
            for j in range(svp.shape[0]):
                row_values = [svp.index[j]] + svp.iloc[j, :].tolist()
                self._print_env_line(fh, self._array2str(row_values), f"ssp_{j}")

    def _write_env_bottom(self, fh: TextIO) -> None:
        """Writes bottom boundary lines of env file."""
        bot_bc = FlagMaps.bottom_boundary_condition_rev[self.env['bottom_boundary_condition']]
        dp_flag = FlagMaps._bathymetry_rev[self.env['_bathymetry']]
        bot_str = self._quoted_opt(bot_bc, dp_flag)
        comment = "BOT_Boundary_cond / BOT_Roughness"
        self._print_env_line(fh, f"{bot_str} {self.env['bottom_roughness']}", comment)
        if self.env['bottom_boundary_condition'] == BHStrings.acousto_elastic:
            comment = "Depth_Max BOT_SoundSpeed BOT_SS_Shear BOT_Density BOT_Absorp BOT_Absorp Shear"
            array_str = self._array2str([
                # `or` falls back to `_depth_max` for any falsy `_bottom_depth` (None or 0)
                self.env['_bottom_depth'] or self.env['_depth_max'],
                self.env['bottom_soundspeed'],
                self.env['_bottom_soundspeed_shear'],
                self._float(self.env['bottom_density'], scale=1/1000),
                self.env['bottom_attenuation'],
                self.env['_bottom_attenuation_shear']
            ])
            self._print_env_line(fh, array_str, comment)
        elif self.env['bottom_boundary_condition'] == BHStrings.grain:
            comment = "Grain_Depth Grain_Size"
            array_str = self._array2str([
                self.env['_bottom_depth'] or self.env['_depth_max'],
                self.env['bottom_grain_size']
            ])
            self._print_env_line(fh, array_str, comment)

    def _write_env_source_receiver(self, fh: TextIO) -> None:
        """Writes source and receiver lines of env file.

        Ranges are divided by 1000 before writing (labelled km in the output).
        Nothing is written when the dimension is neither 2 nor 3.
        """
        if self.env._dimension == 2:
            self._print_array(fh, self.env['source_depth'], nn=self.env['source_ndepth'], label="Source depth (m)")
            self._print_array(fh, self.env['receiver_depth'], nn=self.env['receiver_ndepth'], label="Receiver depth (m)")
            self._print_array(fh, self.env['receiver_range']/1000, nn=self.env['receiver_nrange'], label="Receiver range (km)")
        elif self.env._dimension == 3:
            self._print_array(fh, self.env['source_range']/1000, nn=self.env['source_nrange'], label="Source range (km)")
            self._print_array(fh, self.env['source_cross_range']/1000, nn=self.env['source_ncrossrange'], label="Source cross range (km)")
            self._print_array(fh, self.env['source_depth'], nn=self.env['source_ndepth'], label="Source depth (m)")
            self._print_array(fh, self.env['receiver_depth'], nn=self.env['receiver_ndepth'], label="Receiver depth (m)")
            self._print_array(fh, self.env['receiver_range']/1000, nn=self.env['receiver_nrange'], label="Receiver range (km)")
            self._print_array(fh, self.env['receiver_bearing'], nn=self.env['receiver_nbearing'], label="Receiver bearing (°)")

    def _write_env_task(self, fh: TextIO, taskcode: str) -> None:
        """Writes task lines of env file."""
        beamtype = FlagMaps.beam_type_rev[self.env['beam_type']]
        # "*" flags that a source beam pattern is supplied (an .sbp file is written by `write`)
        beampattern = " " if self.env['source_directionality'] is None else "*"
        txtype = FlagMaps.source_type_rev[self.env['source_type']]
        gridtype = FlagMaps.grid_type_rev[self.env['grid_type']]
        runtype_str = self._quoted_opt(taskcode, beamtype, beampattern, txtype, gridtype)
        self._print_env_line(fh, f"{runtype_str}", "RUN TYPE")

    def _write_env_beam_footer(self, fh: TextIO) -> None:
        """Writes beam and footer lines of env file."""
        self._print_env_line(fh, self._array2str([self.env['beam_num'], self.env['single_beam_index']]), "Num_Beams_Inclination [ Single_Beam_Index ]")
        self._print_env_line(fh, f"{self.env['beam_angle_min']} {self.env['beam_angle_max']} /", "Inclination angle min/max (°)")
        if self.env['_dimension'] == 3:
            self._print_env_line(fh, f"{self.env['beam_bearing_num']}", "Num_Beams_Bearing")
            self._print_env_line(fh, f"{self.env['beam_bearing_min']} {self.env['beam_bearing_max']} /", "Bearing angle min/max (°)")
        if self.env['_dimension'] == 2:
            self._print_env_line(fh, f"{self.env['step_size']} {self.env['simulation_depth']} {self.env['simulation_range'] / 1000}", "Step_Size (m), ZBOX (m), RBOX (km)")
        elif self.env['_dimension'] == 3:
            self._print_env_line(fh, f"{self.env['step_size']} {self.env['simulation_range'] / 1000} {self.env['simulation_cross_range'] / 1000} {self.env['simulation_depth']}", "Step_Size (m), BoxRange (x) (km), BoxCrossRange (y) (km), BoxDepth (z) (m)")

    def _write_gaussian_params(self, fh: TextIO) -> None:
        """Write parameters for Cerveny Gaussian Beams, if applicable

        Nothing is written unless the beam type is `BHStrings.cartesian` or `BHStrings.ray`.
        """
        if self.env['beam_type'] not in (BHStrings.cartesian, BHStrings.ray):
            return None
        rloop = None if self.env['beam_range_loop'] is None else self.env['beam_range_loop'] / 1000
        self._print_env_line(fh, self._array2str([self.env['beam_width_type'], self.env['beam_epsilon_multipler'], rloop]), "Beam_width_type Eps_Mult Range_Loop")
        # This label used to repeat the one above; it now names the values actually written.
        self._print_env_line(fh, self._array2str([self.env['beam_images_num'], self.env['beam_window'], self.env['beam_component']]), "Num_Images Beam_Window Component")

    def _print(self, fh: TextIO, s: str, newline: bool = True) -> None:
        """Write a line of text with or w/o a newline char to the output file"""
        fh.write(s + '\n' if newline else s)

    def _print_env_line(self, fh: TextIO, data: Any, comment: str = "") -> None:
        """Write a complete line to the .env file with a descriptive comment

        The data is right-padded with spaces to `env['comment_pad']` characters (never
        truncated) so that the ` ! comment` suffixes all start from the same char.
        An empty comment writes the padded data only.
        """
        data_str = data if isinstance(data, str) else f"{data}"
        comment_str = comment if isinstance(comment, str) else f"{comment}"
        line_str = data_str.ljust(self.env['comment_pad'])
        if comment_str != "":
            line_str = line_str + " ! " + comment_str
        self._print(fh, line_str)

    def _print_array(self, fh: TextIO, a: Any, label: str = "", nn: int | None = None) -> None:
        """Print a 1D array to the .env file, prefixed by a count of the array length

        Parameters
        ----------
        fh : file object
            Open `.env` file
        a : scalar or 1D array-like
            Value(s) to write
        label : str
            Description used in the trailing comment
        nn : int, optional
            Count to write in place of the actual number of values in `a`;
            defaults to `np.size(a)`
        """
        na = np.size(a)
        if nn is None:
            nn = na
        if nn == 1 or na == 1:
            # Unwrap a length-1 container so the line reads "5.0 /" rather than "[5.] /".
            value = np.ravel(a)[0] if (na == 1 and np.ndim(a) > 0) else a
            self._print_env_line(fh, 1, f"{label} (single value)")
            self._print_env_line(fh, f"{value} /", f"{label} (single value)")
        else:
            self._print_env_line(fh, nn, f"{label}s ({nn} values)")
            for j in a:
                self._print(fh, f"{j} ", newline=False)
            self._print(fh, " /")

    def _array2str(self, values: list[Any] | np.ndarray) -> str:
        """Format values into space-separated string, trimmed at first None, ending with '/'.

        Accepts any iterable (lists and NumPy rows alike). The first None is found by
        identity, so array-valued elements cannot trip an ambiguous `==` comparison.
        """
        parts = []
        for v in values:
            if v is None:
                break
            parts.append(f"{v}" if isinstance(v, (int, float)) else str(v))
        return " ".join(parts) + " /"

    def _quoted_opt(self, *args: str) -> str:
        """Concatenate N input BHStrings. strip whitespace, surround with single quotes
        """
        combined = "".join(args).strip()
        return f"'{combined}'"

    def _float(self, x: float | None, scale: float = 1) -> float | None:
        """Permissive floatenator: None passes through, anything else is `float(x) * scale`."""
        return None if x is None else float(x) * scale

    def _create_bty_ati_file(self, filename: str, depth: Any, interp: BHStrings) -> None:
        """Write data to bathymetry/altimetry file

        The short/long ("S"/"L") flags are hard-coded to keep it simple:
        2 columns selects "S", 7 columns selects "L". The first column is
        divided by 1000 before writing; the rest are written as given.

        Raises
        ------
        ValueError
            If `depth` has neither 2 nor 7 columns. Raised before the file is
            created, instead of leaving a header with no data rows behind.
        """
        ncols = depth.shape[1]
        if ncols not in (2, 7):
            raise ValueError(
                f"Bathymetry/altimetry data must have 2 (short) or 7 (long) columns, got {ncols}"
            )
        format_flag = "S" if ncols == 2 else "L"
        with open(filename, 'wt') as f:
            f.write(f"'{FlagMaps.bottom_interp_rev[interp]}{format_flag}'\n")
            f.write(str(depth.shape[0]) + "\n")
            for j in range(depth.shape[0]):
                fields = [f"{depth[j, 0] / 1000}"] + [f"{depth[j, k]}" for k in range(1, ncols)]
                f.write(" ".join(fields) + "\n")

    def _create_sbp_file(self, filename: str, dir: Any) -> None:
        """Write data to sbp file (row count, then the first two columns of each row)"""
        # NOTE: `dir` shadows the builtin; the name is kept so the signature is unchanged.
        with open(filename, 'wt') as f:
            f.write(str(dir.shape[0]) + "\n")
            f.writelines(f"{dir[j, 0]} {dir[j, 1]}\n" for j in range(dir.shape[0]))

    def _create_refl_coeff_file(self, filename: str, rc: Any) -> None:
        """Write data to brc/trc file (row count, then the first three columns of each row)"""
        with open(filename, 'wt') as f:
            f.write(str(rc.shape[0]) + "\n")
            f.writelines(f"{rc[j, 0]} {rc[j, 1]} {rc[j, 2]}\n" for j in range(rc.shape[0]))

    def _create_ssp_quad_file(self, filename: str, svp: pd.DataFrame) -> None:
        """Write 2D SSP data to file

        Layout: the number of columns, then the column labels divided by 1000,
        then one line per DataFrame row; all values use six decimal places.
        """
        ncols = svp.shape[1]
        with open(filename, 'wt') as f:
            f.write(str(ncols) + "\n")  # number of columns in the profile table
            for j in range(ncols):
                f.write("%0.6f%c" % (svp.columns[j] / 1000, '\n' if j == ncols - 1 else ' '))
            for k in range(svp.shape[0]):
                for j in range(ncols):
                    f.write("%0.6f%c" % (svp.iloc[k, j], '\n' if j == ncols - 1 else ' '))