Coverage for python/bellhop/readers.py: 100%

364 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 12:04 +0000

1 

2import os 

3 

4from struct import unpack as _unpack 

5from pathlib import Path 

6from typing import Any, Optional, Tuple, Union, TextIO, List, cast, IO 

7from numpy.typing import NDArray 

8 

9import numpy as _np 

10import pandas as _pd 

11from bellhop.constants import _Strings, _Maps, _File_Ext 

12from bellhop.environment import Environment 

13 

14def _read_next_valid_line(f: TextIO) -> str: 

15 """Read the next valid text line of an input file, discarding empty content. 

16 

17 Args: 

18 f: File handle to read from 

19 

20 Returns: 

21 Non-empty line with comments and whitespace removed 

22 

23 Raises: 

24 EOFError: If end of file reached without finding valid content 

25 """ 

26 while True: 

27 raw_line = f.readline() 

28 if not raw_line: # EOF 

29 raise EOFError("End of file reached before finding a valid line") 

30 line = raw_line.split('!', 1)[0].strip() 

31 if line: 

32 return line 

33 

34def _parse_line(line: str) -> list[str]: 

35 """Parse a line, removing comments, /, and whitespace, and return the parts in a list""" 

36 line = line.split("!", 1)[0].split('/', 1)[0].strip() 

37 return line.split() 

38 

39def _unquote_string(line: str) -> str: 

40 """Extract string from within single quotes, possibly with commas too.""" 

41 return line.strip().strip(",'") 

42 

43def _parse_vector(f: TextIO, dtype: type = float) -> Tuple[NDArray[_np.float64], int]: 

44 """Parse a vector that starts with count then values, ending with '/'""" 

45 

46 # First line is the count 

47 line = _read_next_valid_line(f) 

48 linecount = int(_parse_line(line)[0]) 

49 

50 # Second line has the values 

51 values_line = _read_next_valid_line(f) 

52 parts = _parse_line(values_line) 

53 val = [dtype(p) for p in parts] 

54 

55 valout = _np.array(val) if len(val) > 1 else val[0] 

56 return valout, linecount 

57 

58def _read_ssp_points(f: TextIO) -> _pd.DataFrame: 

59 """Read sound speed profile points until we find the bottom boundary line 

60 

61 Default values are according to 'EnvironmentalFile.htm'.""" 

62 

63 ssp_depth: list[float] = [] 

64 ssp_speed: list[float] = [] 

65 ssp = dict(depth=0.0, speed=1500.0, speed_shear=0.0, density=1000.0, att=0.0, att_shear=0.0) 

66 

67 while True: 

68 line = f.readline() 

69 if not line: 

70 raise EOFError("File ended during env file reading of SSP points.") 

71 line = line.strip() 

72 if not line: # completely empty line 

73 continue 

74 if line.startswith("'"): # Check if this is a bottom boundary line (starts with quote) 

75 # This is the bottom boundary line, put it back 

76 f.seek(f.tell() - len(line.encode()) - 1) 

77 break 

78 

79 parts = (_parse_line(line) + [None] * 6)[0:6] 

80 if parts[0] is None: # empty line after stripping comments 

81 continue 

82 ssp.update({ 

83 k: float(v) if v is not None else ssp[k] for k, v in zip(ssp.keys(), parts) 

84 }) 

85 ssp_depth.append(ssp["depth"]) 

86 ssp_speed.append(ssp["speed"]) 

87 # TODO: add extra terms (but this needs adjustments elsewhere) 

88 

89 if len(ssp_speed) == 0: 

90 raise ValueError("No SSP points were found in the env file.") 

91 elif len(ssp_speed) == 1: 

92 raise ValueError("Only one SSP point found but at least two required (top and bottom)") 

93 

94 df = _pd.DataFrame(ssp_speed,index=ssp_depth,columns=["speed"]) 

95 df.index.name = "depth" 

96 return df 

97 

98def _opt_lookup(name: str, opt: str, _map: dict[str, _Strings]) -> Optional[str]: 

99 opt_str = _map.get(opt) 

100 if opt_str is None: 

101 raise ValueError(f"{name} option {opt!r} not available") 

102 return opt_str 

103 

104def _float(x: Any, scale: float = 1) -> Optional[float]: 

105 """Permissive float-enator with unit scaling""" 

106 return None if x is None else float(x) * scale 

107 

108def _int(x: Any) -> Optional[int]: 

109 """Permissive int-enator""" 

110 return None if x is None else int(x) 

111 

112def _prepare_filename(fname: str, ext: str, name: str) -> Tuple[str,str]: 

113 """Checks filename is present and file exists.""" 

114 if fname.endswith(ext): 

115 nchar = len(ext) 

116 fname_base = fname[:-nchar] 

117 else: 

118 fname_base = fname 

119 fname = fname + ext 

120 

121 if not os.path.exists(fname): 

122 raise FileNotFoundError(f"{name} file not found: {fname}") 

123 

124 return fname, fname_base 

125 

126def read_env(fname: str) -> Environment: 

127 """Read a 2D underwater environment from a BELLHOP .env file. 

128 

129 This function parses a BELLHOP .env file and returns a Python data structure 

130 that is compatible with create_env(). This enables round-trip testing and 

131 compatibility between file-based and programmatic environment definitions. 

132 

133 Parameters 

134 ---------- 

135 fname : str 

136 Path to .env file (with or without .env extension) 

137 

138 Returns 

139 ------- 

140 dict 

141 Environment dictionary compatible with create_env() 

142 

143 Notes 

144 ----- 

145 **Unit conversions performed:** 

146 

147 - Receiver ranges: km → m 

148 - Bottom density: g/cm³ → kg/m³ 

149 - All other units preserved as in ENV file 

150 

151 Examples 

152 -------- 

153 >>> import bellhop as bh 

154 >>> env = bh.read_env('examples/Munk/MunkB_ray.env') 

155 >>> print(env['name']) 

156 'Munk profile' 

157 >>> print(env['frequency']) 

158 50.0 

159 

160 >>> # Use with existing functions 

161 >>> checked_env = bh.check_env(env) 

162 >>> rays = bh.compute_rays(env) 

163 

164 >>> # Round-trip compatibility 

165 >>> env_orig = bh.create_env(name="test", frequency=100) 

166 >>> # ... write to file via BELLHOP ... 

167 >>> env_read = bh.read_env("test.env") 

168 >>> assert env_read['frequency'] == env_orig['frequency'] 

169 

170 """ 

171 

172 reader = EnvironmentReader(fname) 

173 return reader.read() 

174 

175class EnvironmentReader: 

176 """Read and parse Bellhop environment files. 

177 

178 Although this class is only used for one task, 

179 the use of a class provides the clearest code interface compared 

180 to nested functions, which either implicitly set 

181 dict parameters, or have many repeated and superfluous 

182 arguments as dicts are passed in and returned at 

183 each stage. 

184 """ 

185 

186 def __init__(self, fname: str): 

187 """Initialize reader with filename. 

188 

189 Args: 

190 fname: Path to .env file (with or without extension) 

191 """ 

192 self.fname, self.fname_base = _prepare_filename(fname, _File_Ext.env, "Environment") 

193 self.env: Environment = Environment() 

194 

195 def read(self) -> Environment: 

196 """Do the reading...""" 

197 with open(self.fname, 'r') as f: 

198 self._read_header(f) 

199 self._read_top_boundary(f) 

200 self._read_sound_speed_profile(f) 

201 self._read_bottom_boundary(f) 

202 self._read_sources_receivers_task(f) 

203 self._read_beams_limits(f) 

204 return self.env 

205 

206 def _read_header(self, f: TextIO) -> None: 

207 """Read environment file header""" 

208 

209 # Line 1: Title 

210 title_line = _read_next_valid_line(f) 

211 self.env['name'] = _unquote_string(title_line) 

212 # Line 2: Frequency 

213 freq_line = _read_next_valid_line(f) 

214 self.env['frequency'] = float(_parse_line(freq_line)[0]) 

215 # Line 3: NMedia (should be 1 for BELLHOP) 

216 nmedia_line = _read_next_valid_line(f) 

217 self.env["_num_media"] = int(_parse_line(nmedia_line)[0]) 

218 

219 def _read_top_boundary(self, f: TextIO) -> None: 

220 """Read environment file top boundary options (multiple lines)""" 

221 

222 # Line 4: Top boundary options 

223 topopt_line = _read_next_valid_line(f) 

224 topopt = _unquote_string(topopt_line) + " " 

225 self.env["soundspeed_interp"] = _opt_lookup("Interpolation", topopt[0], _Maps.soundspeed_interp) 

226 self.env["surface_boundary_condition"] = _opt_lookup("Top boundary condition", topopt[1], _Maps.surface_boundary_condition) 

227 self.env["attenuation_units"] = _opt_lookup("Attenuation units", topopt[2], _Maps.attenuation_units) 

228 self.env["volume_attenuation"] = _opt_lookup("Volume attenuation", topopt[3], _Maps.volume_attenuation) 

229 self.env["_altimetry"] = _opt_lookup("Altimetry", topopt[4], _Maps._altimetry) 

230 self.env["_single_beam"] = _opt_lookup("Single beam", topopt[5], _Maps._single_beam) 

231 if self.env["_altimetry"] == _Strings.from_file: 

232 self.env["surface"], self.env["surface_interp"] = read_ati(self.fname_base) 

233 

234 # Line 4a: Volume attenuation params 

235 if self.env["volume_attenuation"] == _Strings.francois_garrison: 

236 fg_spec_line = _read_next_valid_line(f) 

237 fg_parts = _parse_line(fg_spec_line) 

238 self.env["fg_salinity"] = float(fg_parts[0]) 

239 self.env["fg_temperature"] = float(fg_parts[1]) 

240 self.env["fg_pH"] = float(fg_parts[2]) 

241 self.env["fg_depth"] = float(fg_parts[3]) 

242 

243 # Line 4b: Boundary condition params 

244 if self.env["surface_boundary_condition"] == _Strings.acousto_elastic: 

245 surface_props_line = _read_next_valid_line(f) 

246 surface_props = _parse_line(surface_props_line) + [None] * 6 

247 self.env['surface_depth'] = _float(surface_props[0]) 

248 self.env['surface_soundspeed'] = _float(surface_props[1]) 

249 self.env['_surface_soundspeed_shear'] = _float(surface_props[2]) 

250 self.env['surface_density'] = _float(surface_props[3], scale=1000) # convert from g/cm³ to kg/m³ 

251 self.env['surface_attenuation'] = _float(surface_props[4]) 

252 self.env['_surface_attenuation_shear'] = _float(surface_props[5]) 

253 

254 def _read_sound_speed_profile(self, f: TextIO) -> None: 

255 """Read environment file sound speed profile""" 

256 

257 # SSP depth specification 

258 ssp_spec_line = _read_next_valid_line(f) 

259 ssp_parts = _parse_line(ssp_spec_line) + [None] * 3 

260 self.env['_mesh_npts'] = _int(ssp_parts[0]) 

261 self.env['_depth_sigma'] = _float(ssp_parts[1]) 

262 self.env['depth_max'] = _float(ssp_parts[2]) 

263 self.env['depth'] = self.env['depth_max'] 

264 

265 # Read SSP points and from file if applicable 

266 self.env['soundspeed'] = _read_ssp_points(f) 

267 if self.env["soundspeed_interp"] == _Strings.quadrilateral: 

268 self.env['soundspeed'] = read_ssp(self.fname_base, self.env['soundspeed'].index) 

269 

270 def _read_bottom_boundary(self, f: TextIO) -> None: 

271 """Read environment file bottom boundary condition""" 

272 

273 # Bottom boundary options 

274 bottom_line = _read_next_valid_line(f) 

275 bottom_parts = _parse_line(bottom_line) + [None] * 3 

276 botopt = _unquote_string(cast(str,bottom_parts[0])) + " " # cast() => I promise this is a str :) 

277 self.env["bottom_boundary_condition"] = _opt_lookup("Bottom boundary condition", botopt[0], _Maps.bottom_boundary_condition) 

278 self.env["_bathymetry"] = _opt_lookup("Bathymetry", botopt[1], _Maps._bathymetry) 

279 self.env['bottom_roughness'] = _float(bottom_parts[1]) 

280 self.env['bottom_beta'] = _float(bottom_parts[2]) 

281 self.env['bottom_transition_freq'] = _float(bottom_parts[3]) 

282 if self.env["_bathymetry"] == _Strings.from_file: 

283 self.env["depth"], self.env["bottom_interp"] = read_bty(self.fname_base) 

284 

285 # Bottom properties (depth, sound_speed, density, absorption) 

286 if self.env["bottom_boundary_condition"] == _Strings.acousto_elastic: 

287 bottom_props_line = _read_next_valid_line(f) 

288 bottom_props = _parse_line(bottom_props_line) + [None] * 6 

289 self.env['bottom_soundspeed'] = _float(bottom_props[1]) 

290 self.env['_bottom_soundspeed_shear'] = _float(bottom_props[2]) 

291 self.env['bottom_density'] = _float(bottom_props[3], 1000) # convert from g/cm³ to kg/m³ 

292 self.env['bottom_attenuation'] = _float(bottom_props[4]) 

293 self.env['_bottom_attenuation_shear'] = _float(bottom_props[5]) 

294 

295 def _read_sources_receivers_task(self, f: TextIO) -> None: 

296 """Read environment file sources, receivers, and task""" 

297 

298 # Source & receiver depths 

299 self.env['source_depth'], self.env['source_ndepth'] = _parse_vector(f) 

300 self.env['receiver_depth'], self.env['receiver_ndepth'] = _parse_vector(f) 

301 

302 # Receiver ranges (in km, need to convert to m) 

303 receiver_ranges, self.env['receiver_nrange'] = _parse_vector(f) 

304 self.env['receiver_range'] = receiver_ranges * 1000 # convert km to m 

305 

306 # Task/run type (e.g., 'R', 'C', etc.) 

307 task_line = _read_next_valid_line(f) 

308 task_code = _unquote_string(task_line) + " " 

309 self.env['task'] = _Maps.task.get(task_code[0]) 

310 self.env['beam_type'] = _Maps.beam_type.get(task_code[1]) 

311 self.env['_sbp_file'] = _Maps._sbp_file.get(task_code[2]) 

312 self.env['source_type'] = _Maps.source_type.get(task_code[3]) 

313 self.env['grid_type'] = _Maps.grid_type.get(task_code[4]) 

314 

315 # Check for source directionality 

316 if self.env["_sbp_file"] == _Strings.from_file: 

317 self.env["source_directionality"] = read_sbp(self.fname_base) 

318 

319 def _read_beams_limits(self, f: TextIO) -> None: 

320 """Read environment file beams and limits""" 

321 

322 # Number of beams 

323 beam_num_line = _read_next_valid_line(f) 

324 beam_num_parts = _parse_line(beam_num_line) + [None] * 1 

325 self.env['beam_num'] = int(beam_num_parts[0] or 0) 

326 self.env['single_beam_index'] = _int(beam_num_parts[1]) 

327 

328 # Beam angles (beam_angle_min, beam_angle_max) 

329 angles_line = _read_next_valid_line(f) 

330 angle_parts = _parse_line(angles_line) + [None] * 2 

331 self.env['beam_angle_min'] = _float(angle_parts[0]) 

332 self.env['beam_angle_max'] = _float(angle_parts[1]) 

333 

334 # Ray tracing limits (step, max_depth, max_range) - last line 

335 limits_line = _read_next_valid_line(f) 

336 limits_parts = _parse_line(limits_line) 

337 self.env['step_size'] = float(limits_parts[0]) 

338 self.env['box_depth'] = float(limits_parts[1]) 

339 self.env['box_range'] = float(limits_parts[2]) * 1000 # convert km to m 

340 

341 

342def read_ssp(fname: str, 

343 depths: Optional[Union[ 

344 List[float], 

345 NDArray[_np.float64], 

346 _pd.DataFrame]] = None 

347 ) -> Union[NDArray[_np.float64], _pd.DataFrame]: 

348 """Read a 2D sound speed profile (.ssp) file used by BELLHOP. 

349 

350 This function reads BELLHOP's .ssp files which contain range-dependent 

351 sound speed profiles. The file format is: 

352 - Line 1: Number of range profiles (NPROFILES) 

353 - Line 2: Range coordinates in km (space-separated) 

354 - Line 3+: Sound speed values, one line per depth point across all ranges 

355 

356 Parameters 

357 ---------- 

358 fname : str 

359 Path to .ssp file (with or without .ssp extension) 

360 

361 Returns 

362 ------- 

363 numpy.ndarray or pandas.DataFrame 

364 For single-profile files: numpy array with [depth, soundspeed] pairs; 

365 for multi-profile files: pandas DataFrame with range-dependent sound speed data 

366 

367 Notes 

368 ----- 

369 **Return format:** 

370 

371 - **Single-profile files (1 range)**: Returns a 2D numpy array with [depth, soundspeed] pairs, 

372 compatible with create_env() soundspeed parameter. 

373 

374 - **Multi-profile files (>1 ranges)**: Returns a pandas DataFrame where: 

375 

376 - **Columns**: Range coordinates (in meters, converted from km in file) 

377 - **Index**: Depth indices (0, 1, 2, ... for each depth level in the file) 

378 - **Values**: Sound speeds (m/s) 

379 

380 This DataFrame can be directly assigned to create_env() soundspeed parameter 

381 for range-dependent acoustic modeling. 

382 

383 **Note on depths**: For multi-profile files, depth indices are used (0, 1, 2, ...) 

384 since the actual depth coordinates come from the associated BELLHOP .env file. 

385 Users can modify the DataFrame index if actual depth values are known. 

386 

387 Examples 

388 -------- 

389 >>> import bellhop as bh 

390 >>> # Single-profile file 

391 >>> ssp1 = bh.read_ssp("single_profile.ssp") # Returns numpy array 

392 >>> env = bh.create_env() 

393 >>> env["soundspeed"] = ssp1 

394 >>> 

395 >>> # Multi-profile file 

396 >>> ssp2 = bh.read_ssp("tests/MunkB_geo_rot/MunkB_geo_rot.ssp") # Returns DataFrame 

397 >>> env = bh.create_env() 

398 >>> env["soundspeed"] = ssp2 # Range-dependent sound speed 

399 

400 **File format example:** 

401 

402 :: 

403 

404 30 

405 -50 -5 -1 -.8 -.75 -.6 -.4 -.2 0 0.2 0.4 0.6 0.8 1.0 1.2 1.4 1.6 1.8 2.0 2.2 2.4 2.6 2.8 3.0 3.2 3.4 3.6 3.8 4.0 10.0 

406 1500 1500 1548.52 1530.29 1526.69 1517.78 1509.49 1504.30 1501.38 1500.14 1500.12 1501.02 1502.57 1504.62 1507.02 1509.69 1512.55 1515.56 1518.67 1521.85 1525.10 1528.38 1531.70 1535.04 1538.39 1541.76 1545.14 1548.52 1551.91 1551.91 

407 1500 1500 1548.52 1530.29 1526.69 1517.78 1509.49 1504.30 1501.38 1500.14 1500.12 1501.02 1502.57 1504.62 1507.02 1509.69 1512.55 1515.56 1518.67 1521.85 1525.10 1528.38 1531.70 1535.04 1538.39 1541.76 1545.14 1548.52 1551.91 1551.91 

408 """ 

409 

410 fname, _ = _prepare_filename(fname, _File_Ext.ssp, "SSP") 

411 with open(fname, 'r') as f: 

412 nranges = int(_read_next_valid_line(f)) 

413 range_line = _read_next_valid_line(f) 

414 ranges = _np.array([float(x) for x in _parse_line(range_line)]) 

415 ranges_m = ranges * 1000 # Convert ranges from km to meters (as expected by create_env) 

416 

417 if len(ranges) != nranges: 

418 raise ValueError(f"Expected {nranges} ranges, but found {len(ranges)}") 

419 

420 # Read sound speed data - read all remaining lines as a matrix 

421 ssp_data = [] 

422 line_num = 0 

423 for line in f: 

424 line_num += 1 

425 line = line.strip() 

426 if line: # Skip empty lines 

427 values = [float(x) for x in line.split()] 

428 if len(values) != nranges: 

429 raise ValueError(f"SSP line {line_num} has {len(values)} range values, expected {nranges}") 

430 ssp_data.append(values) 

431 

432 ssp_array = _np.array(ssp_data) 

433 ndepths = ssp_array.shape[0] 

434 

435 # Create depth indices (actual depths would normally come from associated .env file) 

436 if depths is None: 

437 depths = _np.arange(ndepths, dtype=float) 

438 

439 if ndepths == 0 or len(depths) != ndepths: 

440 raise ValueError("Wrong number of depths found in sound speed data file" 

441 f" (expected {ndepths}, found {ssp_array.shape[0]})") 

442 

443 df = _pd.DataFrame(ssp_array, index=depths, columns=ranges_m) 

444 df.index.name = "depth" 

445 return df 

446 

447def read_bty(fname: str) -> Tuple[NDArray[_np.float64], str]: 

448 """Read a bathymetry file used by Bellhop.""" 

449 fname, _ = _prepare_filename(fname, _File_Ext.bty, "BTY") 

450 return read_ati_bty(fname) 

451 

452def read_ati(fname: str) -> Tuple[NDArray[_np.float64], str]: 

453 """Read an altimetry file used by Bellhop.""" 

454 fname, _ = _prepare_filename(fname, _File_Ext.ati, "ATI") 

455 return read_ati_bty(fname) 

456 

457def read_ati_bty(fname: str) -> Tuple[NDArray[_np.float64], str]: 

458 """Read an altimetry (.ati) or bathymetry (.bty) file used by BELLHOP. 

459 

460 This function reads BELLHOP's .bty files which define the bottom depth 

461 profile. The file format is: 

462 - Line 1: Interpolation type ('L' for linear, 'C' for curvilinear) 

463 - Line 2: Number of points 

464 - Line 3+: Range (km) and depth (m) pairs 

465 

466 Parameters 

467 ---------- 

468 fname : str 

469 Path to .bty file (with or without .bty extension) 

470 

471 Returns 

472 ------- 

473 numpy.ndarray 

474 Numpy array with [range, depth] pairs compatible with create_env() 

475 

476 Notes 

477 ----- 

478 The returned array can be assigned to env["depth"] for range-dependent bathymetry. 

479 

480 **Examples:** 

481 

482 >>> import bellhop as bh 

483 >>> bty,bty_interp = bh.read_bty("tests/MunkB_geo_rot/MunkB_geo_rot.bty") 

484 >>> env = bh.create_env() 

485 >>> env["depth"] = bty 

486 >>> env["depth_interp"] = bty_interp 

487 >>> arrivals = bh.calculate_arrivals(env) 

488 

489 **File format example:** 

490 

491 :: 

492 

493 'L' 

494 5 

495 0 3000 

496 10 3000 

497 20 500 

498 30 3000 

499 100 3000 

500 """ 

501 

502 with open(fname, 'r') as f: 

503 # Read interpolation type (usually 'L' or 'C') 

504 interp_type = _read_next_valid_line(f).strip("'\"") 

505 npoints = int(_read_next_valid_line(f)) 

506 ranges = [] 

507 depths = [] 

508 for i in range(npoints): 

509 try: 

510 line = _read_next_valid_line(f) 

511 except EOFError: 

512 break 

513 parts = _parse_line(line) 

514 if len(parts) >= 2: 

515 ranges.append(float(parts[0])) # Range in km 

516 depths.append(float(parts[1])) # Depth in m 

517 

518 if len(ranges) != npoints: 

519 raise ValueError(f"Expected {npoints} altimetry/bathymetry points, but found {len(ranges)}") 

520 

521 # Convert ranges from km to m for consistency with bellhop env structure 

522 ranges_m = _np.array(ranges) * 1000 

523 depths_array = _np.array(depths) 

524 

525 # Return as [range, depth] pairs 

526 return _np.column_stack([ranges_m, depths_array]), _Maps.depth_interp[interp_type] 

527 

528def read_sbp(fname: str) -> NDArray[_np.float64]: 

529 """Read an source beam patterm (.sbp) file used by BELLHOP. 

530 

531 The file format is: 

532 - Line 1: Number of points 

533 - Line 2+: Angle (deg) and power (dB) pairs 

534 

535 Parameters 

536 ---------- 

537 fname : str 

538 Path to .sbp file (with or without extension) 

539 

540 Returns 

541 ------- 

542 numpy.ndarray 

543 Numpy array with [angle, power] pairs 

544 """ 

545 

546 fname, _ = _prepare_filename(fname, _File_Ext.sbp, "SBP") 

547 with open(fname, 'r') as f: 

548 

549 # Read number of points 

550 npoints = int(_read_next_valid_line(f)) 

551 

552 # Read range,depth pairs 

553 angles = [] 

554 powers = [] 

555 

556 for i in range(npoints): 

557 try: 

558 line = _read_next_valid_line(f) 

559 except EOFError: 

560 break 

561 parts = _parse_line(line) 

562 if len(parts) >= 2: 

563 angles.append(float(parts[0])) # Range in km 

564 powers.append(float(parts[1])) # Depth in m 

565 

566 if len(angles) != npoints: 

567 raise ValueError(f"Expected {npoints} points, but found {len(angles)}") 

568 

569 # Return as [range, depth] pairs 

570 return _np.column_stack([angles, powers]) 

571 

572def read_brc(fname: str) -> NDArray[_np.float64]: 

573 """Read a BRC file and return array of reflection coefficients. 

574 

575 See `read_refl_coeff` for documentation, but use this function for extension checkking.""" 

576 fname, _ = _prepare_filename(fname, _File_Ext.brc, "BRC") 

577 return read_refl_coeff(fname) 

578 

579def read_trc(fname: str) -> NDArray[_np.float64]: 

580 """Read a TRC file and return array of reflection coefficients. 

581 

582 See `read_refl_coeff` for documentation, but use this function for extension checkking.""" 

583 fname, _ = _prepare_filename(fname, _File_Ext.trc, "TRC") 

584 return read_refl_coeff(fname) 

585 

586def read_refl_coeff(fname: str) -> NDArray[_np.float64]: 

587 """Read a reflection coefficient (.brc/.trc) file used by BELLHOP. 

588 

589 This function reads BELLHOP's .brc files which define the reflection coefficient 

590 data. The file format is: 

591 - Line 1: Number of points 

592 - Line 2+: THETA(j) RMAG(j) RPHASE(j) 

593 

594 Where: 

595 - THETA(): Angle (degrees) 

596 - RMAG(): Magnitude of reflection coefficient 

597 - RPHASE(): Phase of reflection coefficient (degrees) 

598 

599 Parameters 

600 ---------- 

601 fname : str 

602 Path to .brc/.trc file (extension required) 

603 

604 Returns 

605 ------- 

606 numpy.ndarray 

607 Numpy array with [theta, rmag, rphase] triplets compatible with create_env() 

608 

609 Notes 

610 ----- 

611 The returned array can be assigned to env["bottom_reflection_coefficient"] or env["surface_reflection_coefficient"] . 

612 

613 Examples 

614 -------- 

615 >>> import bellhop as bh 

616 >>> brc = bh.read_refl_coeff("tests/MunkB_geo_rot/MunkB_geo_rot.brc") 

617 >>> env = bh.create_env() 

618 >>> env["bottom_reflection_coefficient"] = brc 

619 >>> arrivals = bh.calculate_arrivals(env) 

620 

621 **File format example:** 

622 

623 :: 

624 

625 3 

626 0.0 1.00 180.0 

627 45.0 0.95 175.0 

628 90.0 0.90 170.0 

629 """ 

630 

631 with open(fname, 'r') as f: 

632 

633 # Read number of points 

634 npoints = int(_read_next_valid_line(f)) 

635 

636 # Read range,depth pairs 

637 theta = [] 

638 rmagn = [] 

639 rphas = [] 

640 

641 for i in range(npoints): 

642 try: 

643 line = _read_next_valid_line(f) 

644 except EOFError: 

645 break 

646 parts = _parse_line(line) 

647 if len(parts) == 3: 

648 theta.append(float(parts[0])) 

649 rmagn.append(float(parts[1])) 

650 rphas.append(float(parts[2])) 

651 

652 if len(theta) != npoints: 

653 raise ValueError(f"Expected {npoints} reflection coefficient points, but found {len(theta)}") 

654 

655 # Return as [range, depth] pairs 

656 return _np.column_stack([theta, rmagn, rphas]) 

657 

658 

659def read_arrivals(fname: str) -> _pd.DataFrame: 

660 """Read Bellhop arrivals file and parse data into a high level data structure""" 

661 path = _ensure_file_exists(fname) 

662 with path.open('rt') as f: 

663 hdr = f.readline() 

664 if hdr.find('2D') >= 0: 

665 freq = _read_array(f, (float,)) 

666 source_depth_info = _read_array(f, (int,), float) 

667 source_depth_count = source_depth_info[0] 

668 source_depth = source_depth_info[1:] 

669 assert source_depth_count == len(source_depth) 

670 receiver_depth_info = _read_array(f, (int,), float) 

671 receiver_depth_count = receiver_depth_info[0] 

672 receiver_depth = receiver_depth_info[1:] 

673 assert receiver_depth_count == len(receiver_depth) 

674 receiver_range_info = _read_array(f, (int,), float) 

675 receiver_range_count = receiver_range_info[0] 

676 receiver_range = receiver_range_info[1:] 

677 assert receiver_range_count == len(receiver_range) 

678# else: # worry about 3D later 

679# freq, source_depth_count, receiver_depth_count, receiver_range_count = _read_array(hdr, (float, int, int, int)) 

680# source_depth = _read_array(f, (float,)*source_depth_count) 

681# receiver_depth = _read_array(f, (float,)*receiver_depth_count) 

682# receiver_range = _read_array(f, (float,)*receiver_range_count) 

683 arrivals: List[_pd.DataFrame] = [] 

684 for j in range(source_depth_count): 

685 f.readline() 

686 for k in range(receiver_depth_count): 

687 for m in range(receiver_range_count): 

688 count = int(f.readline()) 

689 for n in range(count): 

690 data = _read_array(f, (float, float, float, float, float, float, int, int)) 

691 arrivals.append(_pd.DataFrame({ 

692 'source_depth_ndx': [j], 

693 'receiver_depth_ndx': [k], 

694 'receiver_range_ndx': [m], 

695 'source_depth': [source_depth[j]], 

696 'receiver_depth': [receiver_depth[k]], 

697 'receiver_range': [receiver_range[m]], 

698 'arrival_number': [n], 

699 # 'arrival_amplitude': [data[0]*_np.exp(1j * data[1]* _np.pi/180)], 

700 'arrival_amplitude': [data[0] * _np.exp( -1j * (_np.deg2rad(data[1]) + freq[0] * 2 * _np.pi * (data[3] * 1j + data[2])))], 

701 'time_of_arrival': [data[2]], 

702 'complex_time_of_arrival': [data[2] + 1j*data[3]], 

703 'angle_of_departure': [data[4]], 

704 'angle_of_arrival': [data[5]], 

705 'surface_bounces': [data[6]], 

706 'bottom_bounces': [data[7]] 

707 }, index=[len(arrivals)+1])) 

708 return _pd.concat(arrivals) 

709 

710 

711def read_shd(fname: str) -> _pd.DataFrame: 

712 """Read Bellhop shd file and parse data into a high level data structure""" 

713 path = _ensure_file_exists(fname) 

714 with path.open('rb') as f: 

715 recl, = _unpack('i', f.read(4)) 

716 # _title = str(f.read(80)) 

717 f.seek(4*recl, 0) 

718 ptype = f.read(10).decode('utf8').strip() 

719 assert ptype == 'rectilin', 'Invalid file format (expecting ptype == "rectilin")' 

720 f.seek(8*recl, 0) 

721 nfreq, ntheta, nsx, nsy, nsd, nrd, nrr, atten = _unpack('iiiiiiif', f.read(32)) 

722 assert nfreq == 1, 'Invalid file format (expecting nfreq == 1)' 

723 assert ntheta == 1, 'Invalid file format (expecting ntheta == 1)' 

724 assert nsd == 1, 'Invalid file format (expecting nsd == 1)' 

725 f.seek(32*recl, 0) 

726 pos_r_depth = _unpack('f'*nrd, f.read(4*nrd)) 

727 f.seek(36*recl, 0) 

728 pos_r_range = _unpack('f'*nrr, f.read(4*nrr)) 

729 pressure = _np.zeros((nrd, nrr), dtype=_np.complex128) 

730 for ird in range(nrd): 

731 recnum = 10 + ird 

732 f.seek(recnum*4*recl, 0) 

733 temp = _np.array(_unpack('f'*2*nrr, f.read(2*nrr*4))) 

734 pressure[ird,:] = temp[::2] + 1j*temp[1::2] 

735 return _pd.DataFrame(pressure, index=pos_r_depth, columns=pos_r_range) 

736 

737 

738def read_rays(fname: str) -> _pd.DataFrame: 

739 """Read Bellhop rays file and parse data into a high level data structure""" 

740 path = _ensure_file_exists(fname) 

741 with path.open('rt') as f: 

742 f.readline() 

743 f.readline() 

744 f.readline() 

745 f.readline() 

746 f.readline() 

747 f.readline() 

748 f.readline() 

749 rays = [] 

750 while True: 

751 s = f.readline() 

752 if s is None or len(s.strip()) == 0: 

753 break 

754 a = float(s) 

755 pts, sb, bb = _read_array(f, (int, int, int)) 

756 ray = _np.empty((pts, 2)) 

757 for k in range(pts): 

758 ray[k,:] = _read_array(f, (float, float)) 

759 rays.append(_pd.DataFrame({ 

760 'angle_of_departure': [a], 

761 'surface_bounces': [sb], 

762 'bottom_bounces': [bb], 

763 'ray': [ray] 

764 })) 

765 return _pd.concat(rays) 

766 

767def _ensure_file_exists(fname: str) -> Path: 

768 path = Path(fname) 

769 if not path.exists(): 

770 raise RuntimeError(f"Bellhop did not generate expected output file: {path}") 

771 return path 

772 

773def _read_array(f: IO[str], types: Tuple[Any, ...], dtype: type = str) -> Tuple[Any, ...]: 

774 """Wrapper around readline() to read in a 1D array of data""" 

775 p = f.readline().split() 

776 for j in range(len(p)): 

777 if len(types) > j: 

778 p[j] = types[j](p[j]) 

779 else: 

780 p[j] = dtype(p[j]) 

781 return tuple(p)