Coverage for python / aubellhop / writers.py: 87%

189 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-24 14:11 +0000

1"""File writing class methods for aubellhop. 

2 

3These files are the input files passed directly into the `bellhop(3d).exe` binaries. 

4 

5The `EnvironmentWriter` class provides encapsulation of methods. It is not intended 

6to be user-facing. It is the end of the chain for env writing: 

7 

8 ~~bellhop.py~~ ~~environment.py~~ ~~writers.py~~ 

9 model_fn.write_env() → env.to_file() → EnvironmentWriter().write() 

10 

11""" 

12 

13from __future__ import annotations 

14 

15from typing import Any, TextIO 

16import numpy as np 

17import pandas as pd 

18from .environment import Environment 

19from .constants import BHStrings, FlagMaps 

20 

21 

22class EnvironmentWriter: 

23 """Bellhop file-writer class which creates the `.env` and related input files.""" 

24 

25 def __init__(self, 

26 env: Environment, 

27 file_handle: TextIO, 

28 fname_base: str, 

29 taskcode: str 

30 ): 

31 """Initialize writer with existing file reference and filename base, plus taskcode. 

32 

33 The filename base is, e.g., the `foo.env` file stripped of its extension. 

34 

35 Although the taskcode can be stored within the environment, it is so common to 

36 override this that we use an explicit function input. 

37 

38 Parameters 

39 ---------- 

40 env : Environment 

41 The Environment instance 

42 fh : file object 

43 File reference (already opened) 

44 fname_base : str 

45 Filename base (without extension) 

46 taskcode : str 

47 Task char which defines the computation to run (`R`, `I`, `C`, etc.) 

48 

49 """ 

50 self.env = env 

51 self.file_handle = file_handle 

52 self.fname_base = fname_base 

53 self.taskcode = taskcode 

54 

55 

56 def write(self) -> None: 

57 """Writes a complete .env file for specifying a Bellhop simulation 

58 

59 Returns 

60 ------- 

61 self.fname_base : str 

62 Filename base (no extension) of written file 

63 

64 We liberally insert comments and empty lines for readability and take care to 

65 ensure that comments are consistently aligned. 

66 This doesn't make a difference to bellhop.exe, it just makes debugging far easier. 

67 """ 

68 

69 fh = self.file_handle 

70 self._print_env_line(fh,"") 

71 self._write_env_header(fh) 

72 self._print_env_line(fh,"") 

73 self._write_env_surface_depth(fh) 

74 self._write_env_sound_speed(fh) 

75 self._print_env_line(fh,"") 

76 self._write_env_bottom(fh) 

77 self._print_env_line(fh,"") 

78 self._write_env_source_receiver(fh) 

79 self._print_env_line(fh,"") 

80 self._write_env_task(fh, self.taskcode) 

81 self._write_env_beam_footer(fh) 

82 self._print_env_line(fh,"") 

83 self._write_gaussian_params(fh) 

84 self._print_env_line(fh,"","End of Bellhop environment file") 

85 

86 if self.env['surface_boundary_condition'] == BHStrings.from_file: 

87 self._create_refl_coeff_file(self.fname_base+".trc", self.env['surface_reflection_coefficient']) 

88 if np.size(self.env["surface_depth"]) > 1: 

89 self._create_bty_ati_file(self.fname_base+'.ati', self.env['surface_depth'], self.env['surface_interp']) 

90 if self.env['soundspeed_interp'] == BHStrings.quadrilateral: 

91 self._create_ssp_quad_file(self.fname_base+'.ssp', self.env['soundspeed']) 

92 if np.size(self.env['bottom_depth']) > 1: 

93 self._create_bty_ati_file(self.fname_base+'.bty', self.env['bottom_depth'], self.env['bottom_interp']) 

94 if self.env['bottom_boundary_condition'] == BHStrings.from_file: 

95 self._create_refl_coeff_file(self.fname_base+".brc", self.env['bottom_reflection_coefficient']) 

96 if self.env['source_directionality'] is not None: 

97 self._create_sbp_file(self.fname_base+'.sbp', self.env['source_directionality']) 

98 

99 

100 def _write_env_header(self, fh: TextIO) -> None: 

101 """Writes header of env file.""" 

102 self._print_env_line(fh,"'"+self.env['name']+"'","Bellhop environment name/description") 

103 self._print_env_line(fh,self.env['frequency'],"Frequency (Hz)") 

104 self._print_env_line(fh,1,"NMedia -- always =1 for Bellhop") 

105 

106 def _write_env_surface_depth(self, fh: TextIO) -> None: 

107 """Writes surface boundary and depth lines of env file.""" 

108 

109 svp_interp = FlagMaps.soundspeed_interp_rev[self.env['soundspeed_interp']] 

110 svp_boundcond = FlagMaps.surface_boundary_condition_rev[self.env['surface_boundary_condition']] 

111 svp_attenuation_units = FlagMaps.attenuation_units_rev[self.env['attenuation_units']] 

112 svp_volume_attenuation = FlagMaps.volume_attenuation_rev[self.env['volume_attenuation']] 

113 svp_alti = FlagMaps._altimetry_rev[self.env['_altimetry']] 

114 svp_singlebeam = FlagMaps._single_beam_rev[self.env['_single_beam']] 

115 

116 # Line 4 

117 comment = "SSP parameters: Interp / Top Boundary Cond / Attenuation Units / Volume Attenuation)" 

118 topopt = self._quoted_opt(svp_interp, svp_boundcond, svp_attenuation_units, svp_volume_attenuation, svp_alti, svp_singlebeam) 

119 self._print_env_line(fh,f"{topopt}",comment) 

120 

121 if self.env['volume_attenuation'] == BHStrings.francois_garrison: 

122 comment = "Francois-Garrison volume attenuation parameters (sal, temp, pH, depth)" 

123 self._print_env_line(fh,f"{self.env['_fg_salinity']} {self.env['_fg_temperature']} {self.env['_fg_pH']} {self.env['_fg_depth']}",comment) 

124 

125 # Line 4a 

126 if self.env['surface_boundary_condition'] == BHStrings.acousto_elastic: 

127 comment = "DEPTH_Top (m) TOP_SoundSpeed (m/s) TOP_SoundSpeed_Shear (m/s) TOP_Density (g/cm^3) [ TOP_Absorp [ TOP_Absorp_Shear ] ]" 

128 array_str = self._array2str([ 

129 self.env['_surface_min'], 

130 self.env['surface_soundspeed'], 

131 self.env['_surface_soundspeed_shear'], 

132 self._float(self.env['surface_density'],scale=1/1000), 

133 self.env['surface_attenuation'], 

134 self.env['_surface_attenuation_shear'] 

135 ]) 

136 self._print_env_line(fh,array_str,comment) 

137 

138 # Line 4b 

139 if self.env['biological_layer_parameters'] is not None: 

140 self._write_env_biological(fh, self.env['biological_layer_parameters']) 

141 

142 def _write_env_biological(self, fh: TextIO, biol: pd.DataFrame) -> None: 

143 """Writes biological layer parameters to env file.""" 

144 self._print_env_line(fh, biol.shape[0], "N_Biol_Layers / z1 z2 w0 Q a0") 

145 for j, row in enumerate(biol.values): 

146 self._print_env_line(fh, self._array2str(row), f"biol_{j}") 

147 

148 def _write_env_sound_speed(self, fh: TextIO) -> None: 

149 """Writes sound speed profile lines of env file.""" 

150 

151 comment = "[Npts - ignored] [Sigma - ignored] [Depth_Max - inferred]" 

152 self._print_env_line(fh,f"{self.env['_mesh_npts']} {self.env['_depth_sigma']} {self.env['_depth_max']}",comment) 

153 

154 svp = self.env['soundspeed'] 

155 if "density" in svp.columns: 

156 svp["density"] *= 1/1000 # kg/m^3 -> g/cm^3 

157 

158 if self.env['soundspeed_interp'] == BHStrings.quadrilateral: 

159 for j in range(svp.shape[0]): 

160 # only print a single "dummy" column -- rest of data in .ssp file 

161 self._print_env_line(fh,self._array2str([svp.index[j], svp.iloc[j,0]]),f"ssp_{j}") 

162 else: 

163 for j in range(svp.shape[0]): 

164 row_values = [svp.index[j]] + svp.iloc[j,:].tolist() 

165 self._print_env_line(fh, self._array2str(row_values), f"ssp_{j}") 

166 

167 def _write_env_bottom(self, fh: TextIO) -> None: 

168 """Writes bottom boundary lines of env file.""" 

169 bot_bc = FlagMaps.bottom_boundary_condition_rev[self.env['bottom_boundary_condition']] 

170 dp_flag = FlagMaps._bathymetry_rev[self.env['_bathymetry']] 

171 bot_str = self._quoted_opt(bot_bc,dp_flag) 

172 comment = "BOT_Boundary_cond / BOT_Roughness" 

173 self._print_env_line(fh,f"{bot_str} {self.env['bottom_roughness']}",comment) 

174 if self.env['bottom_boundary_condition'] == BHStrings.acousto_elastic: 

175 comment = "Depth_Max BOT_SoundSpeed BOT_SS_Shear BOT_Density BOT_Absorp BOT_Absorp Shear" 

176 array_str = self._array2str([ 

177 self.env['_bottom_depth'] or self.env['_depth_max'], 

178 self.env['bottom_soundspeed'], 

179 self.env['_bottom_soundspeed_shear'], 

180 self._float(self.env['bottom_density'],scale=1/1000), 

181 self.env['bottom_attenuation'], 

182 self.env['_bottom_attenuation_shear'] 

183 ]) 

184 self._print_env_line(fh,array_str,comment) 

185 elif self.env['bottom_boundary_condition'] == BHStrings.grain: 

186 comment = "Grain_Depth Grain_Size" 

187 array_str = self._array2str([ 

188 self.env['_bottom_depth'] or self.env['_depth_max'], 

189 self.env['bottom_grain_size'] 

190 ]) 

191 self._print_env_line(fh,array_str,comment) 

192 

193 def _write_env_source_receiver(self, fh: TextIO) -> None: 

194 """Writes source and receiver lines of env file.""" 

195 if self.env._dimension == 2: 

196 self._print_array(fh, self.env['source_depth'], nn=self.env['source_ndepth'], label="Source depth (m)") 

197 self._print_array(fh, self.env['receiver_depth'], nn=self.env['receiver_ndepth'], label="Receiver depth (m)") 

198 self._print_array(fh, self.env['receiver_range']/1000, nn=self.env['receiver_nrange'], label="Receiver range (km)") 

199 elif self.env._dimension == 3: 

200 self._print_array(fh, self.env['source_range']/1000, nn=self.env['source_nrange'], label="Source range (km)") 

201 self._print_array(fh, self.env['source_cross_range']/1000, nn=self.env['source_ncrossrange'], label="Source cross range (km)") 

202 self._print_array(fh, self.env['source_depth'], nn=self.env['source_ndepth'], label="Source depth (m)") 

203 self._print_array(fh, self.env['receiver_depth'], nn=self.env['receiver_ndepth'], label="Receiver depth (m)") 

204 self._print_array(fh, self.env['receiver_range']/1000, nn=self.env['receiver_nrange'], label="Receiver range (km)") 

205 self._print_array(fh, self.env['receiver_bearing'], nn=self.env['receiver_nbearing'], label="Receiver bearing (°)") 

206 

207 def _write_env_task(self, fh: TextIO, taskcode: str) -> None: 

208 """Writes task lines of env file.""" 

209 beamtype = FlagMaps.beam_type_rev[self.env['beam_type']] 

210 beampattern = " " if self.env['source_directionality'] is None else "*" 

211 txtype = FlagMaps.source_type_rev[self.env['source_type']] 

212 gridtype = FlagMaps.grid_type_rev[self.env['grid_type']] 

213 runtype_str = self._quoted_opt(taskcode, beamtype, beampattern, txtype, gridtype) 

214 self._print_env_line(fh,f"{runtype_str}","RUN TYPE") 

215 

216 def _write_env_beam_footer(self, fh: TextIO) -> None: 

217 """Writes beam and footer lines of env file.""" 

218 self._print_env_line(fh,self._array2str([self.env['beam_num'], self.env['single_beam_index']]),"Num_Beams_Inclination [ Single_Beam_Index ]") 

219 self._print_env_line(fh,f"{self.env['beam_angle_min']} {self.env['beam_angle_max']} /","Inclination angle min/max (°)") 

220 if self.env['_dimension'] == 3: 

221 self._print_env_line(fh,f"{self.env['beam_bearing_num']}","Num_Beams_Bearing") 

222 self._print_env_line(fh,f"{self.env['beam_bearing_min']} {self.env['beam_bearing_max']} /","Bearing angle min/max (°)") 

223 if self.env['_dimension'] == 2: 

224 self._print_env_line(fh,f"{self.env['step_size']} {self.env['simulation_depth']} {self.env['simulation_range'] / 1000}","Step_Size (m), ZBOX (m), RBOX (km)") 

225 elif self.env['_dimension'] == 3: 

226 self._print_env_line(fh,f"{self.env['step_size']} {self.env['simulation_range'] / 1000} {self.env['simulation_cross_range'] / 1000} {self.env['simulation_depth']}","Step_Size (m), BoxRange (x) (km), BoxCrossRange (y) (km), BoxDepth (z) (m)") 

227 

228 def _write_gaussian_params(self, fh: TextIO) -> None: 

229 """Read parameters for Cerveny Gaussian Beams, if applicable""" 

230 if self.env['beam_type'] not in (BHStrings.cartesian, BHStrings.ray): 

231 return None 

232 rloop = None if self.env['beam_range_loop'] is None else self.env['beam_range_loop'] / 1000 

233 self._print_env_line(fh,self._array2str([self.env['beam_width_type'], self.env['beam_epsilon_multipler'], rloop]),"Beam_width_type Eps_Mult Range_Loop") 

234 self._print_env_line(fh,self._array2str([self.env['beam_images_num'], self.env['beam_window'], self.env['beam_component']]),"Beam_width_type Eps_Mult Range_Loop") 

235 

236 def _print(self, fh: TextIO, s: str, newline: bool = True) -> None: 

237 """Write a line of text with or w/o a newline char to the output file""" 

238 fh.write(s+'\n' if newline else s) 

239 

240 def _print_env_line(self, fh: TextIO, data: Any, comment: str = "") -> None: 

241 """Write a complete line to the .env file with a descriptive comment 

242 

243 We do some char counting (well, padding and stripping) to ensure the code comments all start from the same char. 

244 """ 

245 data_str = data if isinstance(data,str) else f"{data}" 

246 comment_str = comment if isinstance(comment,str) else f"{comment}" 

247 line_str = (data_str + " " * self.env['comment_pad'])[0:max(len(data_str),self.env['comment_pad'])] 

248 if comment_str != "": 

249 line_str = line_str + " ! " + comment_str 

250 self._print(fh,line_str) 

251 

252 def _print_array(self, fh: TextIO, a: Any, label: str = "", nn: int | None = None) -> None: 

253 """Print a 1D array to the .env file, prefixed by a count of the array length""" 

254 na = np.size(a) 

255 if nn is None: 

256 nn = na 

257 if nn == 1 or na == 1: 

258 self._print_env_line(fh, 1, f"{label} (single value)") 

259 self._print_env_line(fh, f"{a} /",f"{label} (single value)") 

260 else: 

261 self._print_env_line(fh, nn, f"{label}s ({nn} values)") 

262 for j in a: 

263 self._print(fh, f"{j} ", newline=False) 

264 self._print(fh, " /") 

265 

266 def _array2str(self, values: list[Any]) -> str: 

267 """Format list into space-separated string, trimmed at first None, ending with '/'.""" 

268 try: 

269 values = values[:values.index(None)] 

270 except ValueError: 

271 pass 

272 return " ".join( 

273 f"{v}" if isinstance(v, (int, float)) else str(v) 

274 for v in values 

275 ) + " /" 

276 

277 def _quoted_opt(self, *args: str) -> str: 

278 """Concatenate N input BHStrings. strip whitespace, surround with single quotes 

279 """ 

280 combined = "".join(args).strip() 

281 return f"'{combined}'" 

282 

283 def _float(self, x: float | None, scale: float = 1) -> float | None: 

284 """Permissive floatenator""" 

285 return None if x is None else float(x) * scale 

286 

287 def _create_bty_ati_file(self, filename: str, depth: Any, interp: BHStrings) -> None: 

288 """Write data to bathymetry/altimetry file 

289 

290 The short/long ("S"/"L") flags are hard-coded to keep it simple. 

291 """ 

292 with open(filename, 'wt') as f: 

293 format_flag = "S" if depth.shape[1] == 2 else "L" 

294 f.write(f"'{FlagMaps.bottom_interp_rev[interp]}{format_flag}'\n") 

295 f.write(str(depth.shape[0])+"\n") 

296 if depth.shape[1] == 2: 

297 for j in range(depth.shape[0]): 

298 f.write(f"{depth[j,0]/1000} {depth[j,1]}\n") 

299 elif depth.shape[1] == 7: 

300 for j in range(depth.shape[0]): 

301 f.write(f"{depth[j,0]/1000} {depth[j,1]} {depth[j,2]} {depth[j,3]} {depth[j,4]} {depth[j,5]} {depth[j,6]}\n") 

302 

303 def _create_sbp_file(self, filename: str, dir: Any) -> None: 

304 """Write data to sbp file""" 

305 with open(filename, 'wt') as f: 

306 f.write(str(dir.shape[0])+"\n") 

307 for j in range(dir.shape[0]): 

308 f.write(f"{dir[j,0]} {dir[j,1]}\n") 

309 

310 def _create_refl_coeff_file(self, filename: str, rc: Any) -> None: 

311 """Write data to brc/trc file""" 

312 with open(filename, 'wt') as f: 

313 f.write(str(rc.shape[0])+"\n") 

314 for j in range(rc.shape[0]): 

315 f.write(f"{rc[j,0]} {rc[j,1]} {rc[j,2]}\n") 

316 

317 def _create_ssp_quad_file(self, filename: str, svp: pd.DataFrame) -> None: 

318 """Write 2D SSP data to file""" 

319 with open(filename, 'wt') as f: 

320 f.write(str(svp.shape[1])+"\n") # number of SSP points 

321 for j in range(svp.shape[1]): 

322 f.write("%0.6f%c" % (svp.columns[j]/1000, '\n' if j == svp.shape[1]-1 else ' ')) 

323 for k in range(svp.shape[0]): 

324 for j in range(svp.shape[1]): 

325 f.write("%0.6f%c" % (svp.iloc[k,j], '\n' if j == svp.shape[1]-1 else ' '))