Coverage for python/bellhop/bellhop.py: 99%

220 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 12:04 +0000

1 

2import os as _os 

3import subprocess as _proc 

4import shutil 

5 

6from tempfile import mkstemp as _mkstemp 

7from typing import Any, Dict, List, Optional, Tuple, TextIO 

8 

9import numpy as _np 

10import pandas as _pd 

11 

12from .constants import Defaults, _Strings, _Maps, _File_Ext 

13from .environment import Environment 

14from .readers import read_shd, read_arrivals, read_rays 

15 

16class Bellhop: 

17 """ 

18 Interface to the Bellhop 2D underwater acoustics ray tracing propagation model. 

19 

20 Two public methods are defined: `supports()` and `run()`. 

21 Both take arguments of environment and task, and respectively 

22 report whether the executable can perform the task, and to do so. 

23 

24 Parameters 

25 ---------- 

26 name : str 

27 User-fancing name for the model 

28 exe : str 

29 Filename of Bellhop executable 

30 """ 

31 

32 def __init__(self, name: str = Defaults.model_name, 

33 exe: str = Defaults.exe, 

34 env_comment_pad: int = Defaults.env_comment_pad, 

35 ) -> None: 

36 self.name: str = name 

37 self.exe: str = exe 

38 self.env_comment_pad: int = env_comment_pad 

39 

40 def supports(self, env: Optional[Environment] = None, 

41 task: Optional[str] = None, 

42 exe: Optional[str] = None, 

43 ) -> bool: 

44 """Check whether the model supports the task. 

45 

46 This function is supposed to diagnose whether this combination of environment 

47 and task is supported by the model.""" 

48 

49 which_bool = shutil.which(exe or self.exe) is not None 

50 task_bool = task is None or task in self.taskmap 

51 

52 return (which_bool and task_bool) 

53 

54 def run(self, env: Environment, 

55 task: str, 

56 debug: bool = False, 

57 fname_base: Optional[str] = None, 

58 ) -> Any: 

59 """ 

60 High-level interface function which runs the model. 

61 

62 The function definition performs setup and cleanup tasks 

63 and passes the execution off to an auxiliary function. 

64 

65 Uses the `taskmap` data structure to relate input flags to 

66 processng stages, in particular how to select specific "tasks" 

67 to be executed. 

68 """ 

69 

70 task_flag, load_task_data, task_ext = self.taskmap[task] 

71 

72 fd, fname_base = self._prepare_env_file(fname_base) 

73 with _os.fdopen(fd, "w") as fh: 

74 self._create_env_file(env, task_flag, fh, fname_base) 

75 

76 self._run_exe(fname_base) 

77 results = load_task_data(fname_base + task_ext) 

78 

79 if debug: 

80 print('[DEBUG] Bellhop working files NOT deleted: '+fname_base+'.*') 

81 else: 

82 self._rm_files(fname_base) 

83 

84 return results 

85 

86 @property 

87 def taskmap(self) -> Dict[Any, List[Any]]: 

88 """Dictionary which maps tasks to execution functions and their parameters""" 

89 return { 

90 _Strings.arrivals: ['A', read_arrivals, _File_Ext.arr], 

91 _Strings.eigenrays: ['E', read_rays, _File_Ext.ray], 

92 _Strings.rays: ['R', read_rays, _File_Ext.ray], 

93 _Strings.coherent: ['C', read_shd, _File_Ext.shd], 

94 _Strings.incoherent: ['I', read_shd, _File_Ext.shd], 

95 _Strings.semicoherent: ['S', read_shd, _File_Ext.shd], 

96 } 

97 

98 def _prepare_env_file(self, fname_base: Optional[str]) -> Tuple[int, str]: 

99 """Opens a file for writing the .env file, in a temp location if necessary, and delete other files with same basename. 

100 

101 Parameters 

102 ---------- 

103 fname_base : str, optional 

104 Filename base (no extension) for writing -- if not specified a temporary file (and location) will be used instead 

105 

106 Returns 

107 ------- 

108 fh : int 

109 File descriptor 

110 fname_base : str 

111 Filename base 

112 """ 

113 if fname_base is not None: 

114 fname = fname_base + _File_Ext.env 

115 fh = _os.open(_os.path.abspath(fname), _os.O_WRONLY | _os.O_CREAT | _os.O_TRUNC) 

116 else: 

117 fh, fname = _mkstemp(suffix = _File_Ext.env) 

118 fname_base = fname[:-len(_File_Ext.env)] 

119 self._rm_files(fname_base, not_env=True) # delete all other files 

120 return fh, fname_base 

121 

122 def _rm_files(self, fname_base: str, not_env: bool = False) -> None: 

123 """Remove files that would be constructed as bellhop inputs or created as bellhop outputs.""" 

124 all_ext = [v for k, v in vars(_File_Ext).items() if not k.startswith('_')] 

125 if not_env: 

126 all_ext.remove(_File_Ext.env) 

127 for ext in all_ext: 

128 self._unlink(fname_base + ext) 

129 

130 def _run_exe(self, fname_base: str, 

131 args: str = "", 

132 debug: bool = False, 

133 exe: Optional[str] = None, 

134 ) -> None: 

135 """Run the executable and raise exceptions if there are errors.""" 

136 

137 exe_path = shutil.which(exe or self.exe) 

138 if exe_path is None: 

139 raise FileNotFoundError(f"Executable ({exe_path}) not found in PATH.") 

140 

141 runcmd = [exe_path, fname_base] + args.split() 

142 if debug: 

143 print("RUNNING:", " ".join(runcmd)) 

144 result = _proc.run(runcmd, stderr=_proc.STDOUT, stdout=_proc.PIPE, text=True) 

145 

146 if debug and result.stdout: 

147 print(result.stdout.strip()) 

148 

149 if result.returncode != 0: 

150 err = self._check_error(fname_base) 

151 raise RuntimeError( 

152 f"Execution of '{exe_path}' failed with return code {result.returncode}.\n" 

153 f"\nCommand: {' '.join(runcmd)}\n" 

154 f"\nOutput:\n{result.stdout.strip()}\n" 

155 f"\nExtract from PRT file:\n{err}" 

156 ) 

157 

158 

159 def _check_error(self, fname_base: str) -> Optional[str]: 

160 """Extracts Bellhop error text from the .prt file""" 

161 try: 

162 err = "" 

163 fatal = False 

164 with open(fname_base + _File_Ext.prt, 'rt') as f: 

165 for s in f: 

166 if fatal and len(s.strip()) > 0: 

167 err += '[FATAL] ' + s.strip() + '\n' 

168 if '*** FATAL ERROR ***' in s: 

169 fatal = True 

170 except FileNotFoundError: 

171 pass 

172 return err if err != "" else None 

173 

174 def _unlink(self, f: str) -> None: 

175 """Delete file only if it exists""" 

176 try: 

177 _os.unlink(f) 

178 except FileNotFoundError: 

179 pass 

180 

181 def _create_env_file(self, env: Environment, taskcode: str, fh: TextIO, fname_base: str) -> None: 

182 """Writes a complete .env file for specifying a Bellhop simulation 

183 

184 Parameters 

185 ---------- 

186 env : dict 

187 Environment dict 

188 taskcode : str 

189 Task string which defines the computation to run 

190 fh : file object 

191 File reference (already opened) 

192 fname_base : str 

193 Filename base (without extension) 

194 :returns fname_base: filename base (no extension) of written file 

195 

196 We liberally insert comments and empty lines for readability and take care to 

197 ensure that comments are consistently aligned. 

198 This doesn't make a difference to bellhop.exe, it just makes debugging far easier. 

199 """ 

200 

201 self._print_env_line(fh,"") 

202 self._write_env_header(fh, env) 

203 self._print_env_line(fh,"") 

204 self._write_env_surface_depth(fh, env) 

205 self._write_env_sound_speed(fh, env) 

206 self._print_env_line(fh,"") 

207 self._write_env_bottom(fh, env) 

208 self._print_env_line(fh,"") 

209 self._write_env_source_receiver(fh, env) 

210 self._print_env_line(fh,"") 

211 self._write_env_task(fh, env, taskcode) 

212 self._write_env_beam_footer(fh, env) 

213 self._print_env_line(fh,"","End of Bellhop environment file") 

214 

215 if env['surface_boundary_condition'] == _Strings.from_file: 

216 self._create_refl_coeff_file(fname_base+".trc", env['surface_reflection_coefficient']) 

217 if env['surface'] is not None: 

218 self._create_bty_ati_file(fname_base+'.ati', env['surface'], env['surface_interp']) 

219 if env['soundspeed_interp'] == _Strings.quadrilateral: 

220 self._create_ssp_quad_file(fname_base+'.ssp', env['soundspeed']) 

221 if _np.size(env['depth']) > 1: 

222 self._create_bty_ati_file(fname_base+'.bty', env['depth'], env['depth_interp']) 

223 if env['bottom_boundary_condition'] == _Strings.from_file: 

224 self._create_refl_coeff_file(fname_base+".brc", env['bottom_reflection_coefficient']) 

225 if env['source_directionality'] is not None: 

226 self._create_sbp_file(fname_base+'.sbp', env['source_directionality']) 

227 

228 def _write_env_header(self, fh: TextIO, env: Environment) -> None: 

229 """Writes header of env file.""" 

230 self._print_env_line(fh,"'"+env['name']+"'","Bellhop environment name/description") 

231 self._print_env_line(fh,env['frequency'],"Frequency (Hz)") 

232 self._print_env_line(fh,1,"NMedia -- always =1 for Bellhop") 

233 

234 def _write_env_surface_depth(self, fh: TextIO, env: Environment) -> None: 

235 """Writes surface boundary and depth lines of env file.""" 

236 

237 svp_interp = _Maps.soundspeed_interp_rev[env['soundspeed_interp']] 

238 svp_boundcond = _Maps.surface_boundary_condition_rev[env['surface_boundary_condition']] 

239 svp_attenuation_units = _Maps.attenuation_units_rev[env['attenuation_units']] 

240 svp_volume_attenuation = _Maps.volume_attenuation_rev[env['volume_attenuation']] 

241 svp_alti = _Maps._altimetry_rev[env['_altimetry']] 

242 svp_singlebeam = _Maps._single_beam_rev[env['_single_beam']] 

243 

244 comment = "SSP parameters: Interp / Top Boundary Cond / Attenuation Units / Volume Attenuation)" 

245 topopt = self._quoted_opt(svp_interp, svp_boundcond, svp_attenuation_units, svp_volume_attenuation, svp_alti, svp_singlebeam) 

246 self._print_env_line(fh,f"{topopt}",comment) 

247 

248 if env['volume_attenuation'] == _Strings.francois_garrison: 

249 comment = "Francois-Garrison volume attenuation parameters (sal, temp, pH, depth)" 

250 self._print_env_line(fh,f"{env['fg_salinity']} {env['fg_temperature']} {env['fg_pH']} {env['fg_depth']}",comment) 

251 

252 if env['surface_boundary_condition'] == _Strings.acousto_elastic: 

253 comment = "DEPTH_Top (m) TOP_SoundSpeed (m/s) TOP_SoundSpeed_Shear (m/s) TOP_Density (g/cm^3) [ TOP_Absorp [ TOP_Absorp_Shear ] ]" 

254 array_str = self._array2str([ 

255 env['depth_max'], 

256 env['surface_soundspeed'], 

257 env['_surface_soundspeed_shear'], 

258 self._float(env['surface_density'],scale=1/1000), 

259 env['surface_attenuation'], 

260 env['_surface_attenuation_shear'] 

261 ]) 

262 self._print_env_line(fh,array_str,comment) 

263 

264 comment = "[Npts - ignored] [Sigma - ignored] Depth_Max" 

265 self._print_env_line(fh,f"{env['_mesh_npts']} {env['_depth_sigma']} {env['depth_max']}",comment) 

266 

267 def _write_env_sound_speed(self, fh: TextIO, env: Environment) -> None: 

268 """Writes sound speed profile lines of env file.""" 

269 svp = env['soundspeed'] 

270 svp_interp = _Maps.soundspeed_interp_rev[env['soundspeed_interp']] 

271 if isinstance(svp, _pd.DataFrame) and len(svp.columns) == 1: 

272 svp = _np.hstack((_np.array([svp.index]).T, _np.asarray(svp))) 

273 if svp.size == 1: 

274 self._print_env_line(fh,self._array2str([0.0, svp]),"Min_Depth SSP_Const") 

275 self._print_env_line(fh,self._array2str([env['depth_max'], svp]),"Max_Depth SSP_Const") 

276 elif svp_interp == "Q": 

277 for j in range(svp.shape[0]): 

278 self._print_env_line(fh,self._array2str([svp.index[j], svp.iloc[j,0]]),f"ssp_{j}") 

279 else: 

280 for j in range(svp.shape[0]): 

281 self._print_env_line(fh,self._array2str([svp[j,0], svp[j,1]]),f"ssp_{j}") 

282 

283 def _write_env_bottom(self, fh: TextIO, env: Environment) -> None: 

284 """Writes bottom boundary lines of env file.""" 

285 bot_bc = _Maps.bottom_boundary_condition_rev[env['bottom_boundary_condition']] 

286 dp_flag = _Maps._bathymetry_rev[env['_bathymetry']] 

287 bot_str = self._quoted_opt(bot_bc,dp_flag) 

288 comment = "BOT_Boundary_cond / BOT_Roughness" 

289 self._print_env_line(fh,f"{bot_str} {env['bottom_roughness']}",comment) 

290 if env['bottom_boundary_condition'] == "acousto-elastic": 

291 comment = "Depth_Max BOT_SoundSpeed BOT_SS_Shear BOT_Density BOT_Absorp BOT_Absorp Shear" 

292 array_str = self._array2str([ 

293 env['depth_max'], 

294 env['bottom_soundspeed'], 

295 env['_bottom_soundspeed_shear'], 

296 self._float(env['bottom_density'],scale=1/1000), 

297 env['bottom_attenuation'], 

298 env['_bottom_attenuation_shear'] 

299 ]) 

300 self._print_env_line(fh,array_str,comment) 

301 

302 def _write_env_source_receiver(self, fh: TextIO, env: Environment) -> None: 

303 """Writes source and receiver lines of env file.""" 

304 self._print_array(fh, env['source_depth'], nn=env['source_ndepth'], label="Source depth (m)") 

305 self._print_array(fh, env['receiver_depth'], nn=env['receiver_ndepth'], label="Receiver depth (m)") 

306 self._print_array(fh, env['receiver_range']/1000, nn=env['receiver_nrange'], label="Receiver range (km)") 

307 

308 def _write_env_task(self, fh: TextIO, env: Environment, taskcode: str) -> None: 

309 """Writes task lines of env file.""" 

310 beamtype = _Maps.beam_type_rev[env['beam_type']] 

311 beampattern = " " if env['source_directionality'] is None else "*" 

312 txtype = _Maps.source_type_rev[env['source_type']] 

313 gridtype = _Maps.grid_type_rev[env['grid_type']] 

314 runtype_str = self._quoted_opt(taskcode, beamtype, beampattern, txtype, gridtype) 

315 self._print_env_line(fh,f"{runtype_str}","RUN TYPE") 

316 

317 def _write_env_beam_footer(self, fh: TextIO, env: Environment) -> None: 

318 """Writes beam and footer lines of env file.""" 

319 self._print_env_line(fh,self._array2str([env['beam_num'], env['single_beam_index']]),"Num_Beams [ Single_Beam_Index ]") 

320 self._print_env_line(fh,f"{env['beam_angle_min']} {env['beam_angle_max']} /","ALPHA1,2 (degrees)") 

321 self._print_env_line(fh,f"{env['step_size']} {env['box_depth']} {env['box_range'] / 1000}","Step_Size (m), ZBOX (m), RBOX (km)") 

322 

323 def _print(self, fh: TextIO, s: str, newline: bool = True) -> None: 

324 """Write a line of text with or w/o a newline char to the output file""" 

325 fh.write(s+'\n' if newline else s) 

326 

327 def _print_env_line(self, fh: TextIO, data: Any, comment: str = "") -> None: 

328 """Write a complete line to the .env file with a descriptive comment 

329 

330 We do some char counting (well, padding and stripping) to ensure the code comments all start from the same char. 

331 """ 

332 data_str = data if isinstance(data,str) else f"{data}" 

333 comment_str = comment if isinstance(comment,str) else f"{comment}" 

334 line_str = (data_str + " " * self.env_comment_pad)[0:max(len(data_str),self.env_comment_pad)] 

335 if comment_str != "": 

336 line_str = line_str + " ! " + comment_str 

337 self._print(fh,line_str) 

338 

339 def _print_array(self, fh: TextIO, a: Any, label: str = "", nn: Optional[int] = None) -> None: 

340 """Print a 1D array to the .env file, prefixed by a count of the array length""" 

341 na = _np.size(a) 

342 if nn is None: 

343 nn = na 

344 if nn == 1 or na == 1: 

345 self._print_env_line(fh, 1, f"{label} (single value)") 

346 self._print_env_line(fh, f"{a} /",f"{label} (single value)") 

347 else: 

348 self._print_env_line(fh, nn, f"{label}s ({nn} values)") 

349 for j in a: 

350 self._print(fh, f"{j} ", newline=False) 

351 self._print(fh, " /") 

352 

353 def _array2str(self, values: List[Any]) -> str: 

354 """Format list into space-separated string, trimmed at first None, ending with '/'.""" 

355 try: 

356 values = values[:values.index(None)] 

357 except ValueError: 

358 pass 

359 return " ".join( 

360 f"{v}" if isinstance(v, (int, float)) else str(v) 

361 for v in values 

362 ) + " /" 

363 

364 def _create_bty_ati_file(self, filename: str, depth: Any, interp: _Strings) -> None: 

365 with open(filename, 'wt') as f: 

366 f.write(f"'{_Maps.depth_interp_rev[interp]}'\n") 

367 f.write(str(depth.shape[0])+"\n") 

368 for j in range(depth.shape[0]): 

369 f.write(f"{depth[j,0]/1000} {depth[j,1]}\n") 

370 

371 def _create_sbp_file(self, filename: str, dir: Any) -> None: 

372 """Write data to sbp file""" 

373 with open(filename, 'wt') as f: 

374 f.write(str(dir.shape[0])+"\n") 

375 for j in range(dir.shape[0]): 

376 f.write(f"{dir[j,0]} {dir[j,1]}\n") 

377 

378 def _create_refl_coeff_file(self, filename: str, rc: Any) -> None: 

379 """Write data to brc/trc file""" 

380 with open(filename, 'wt') as f: 

381 f.write(str(rc.shape[0])+"\n") 

382 for j in range(rc.shape[0]): 

383 f.write(f"{rc[j,0]} {rc[j,1]} {rc[j,2]}\n") 

384 

385 def _create_ssp_quad_file(self, filename: str, svp: _pd.DataFrame) -> None: 

386 """Write 2D SSP data to file""" 

387 with open(filename, 'wt') as f: 

388 f.write(str(svp.shape[1])+"\n") # number of SSP points 

389 for j in range(svp.shape[1]): 

390 f.write("%0.6f%c" % (svp.columns[j]/1000, '\n' if j == svp.shape[1]-1 else ' ')) 

391 for k in range(svp.shape[0]): 

392 for j in range(svp.shape[1]): 

393 f.write("%0.6f%c" % (svp.iloc[k,j], '\n' if j == svp.shape[1]-1 else ' ')) 

394 

395 def _quoted_opt(self, *args: str) -> str: 

396 """Concatenate N input _Strings. strip whitespace, surround with single quotes 

397 """ 

398 combined = "".join(args).strip() 

399 return f"'{combined}'" 

400 

401 def _float(self, x: Optional[float], scale: float = 1) -> Optional[float]: 

402 """Permissive floatenator""" 

403 return None if x is None else float(x) * scale