Coverage for wifa_uq / preprocessing / preprocessing.py: 58%

154 statements  

coverage.py v7.13.0, created at 2025-12-19 02:10 +0000

# ./wifa_uq/preprocessing/preprocessing.py

# %%
import xarray as xr
import numpy as np
import shutil
from windIO.yaml import load_yaml
from matplotlib.ticker import ScalarFormatter
from wifa.wayve_api import ci_fitting
import matplotlib.pyplot as plt
from pathlib import Path

# --- Helper Functions (pasted from our previous steps) ---


def _calculate_abl_from_velocity(ds: xr.Dataset) -> xr.DataArray:
    """
    Calculate the ABL height from the velocity profile.

    Definition: the height where the wind speed first reaches 99% of the
    maximum wind speed in the profile (freestream or LLJ peak).
    """
    print(" Calculating ABL_height from velocity profile (99% of max)...")

    # Get the maximum wind speed for each time step (flow_case)
    max_wind_speed = ds["wind_speed"].max(dim="height")

    # Calculate the 99% threshold
    threshold = 0.99 * max_wind_speed

    # Find all heights where the wind speed is >= 99% of the maximum
    above_threshold = ds["wind_speed"] >= threshold

    # Create a DataArray of heights, broadcast to match the wind_speed array
    height_da = ds["height"] * xr.ones_like(ds["wind_speed"])

    # Where the wind speed is below the threshold, set the height to infinity
    valid_heights = height_da.where(above_threshold, np.inf)

    # Find the minimum height (the first to cross the threshold) for each time step
    abl_height = valid_heights.min(dim="height")

    # Handle cases with no valid height (all inf) by falling back to the maximum height
    abl_height = abl_height.where(abl_height != np.inf, ds["height"].max())

    return abl_height
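
# Illustrative sketch (hypothetical helper, never called by the pipeline): a tiny
# synthetic profile showing the 99%-of-max ABL definition above. The numbers are
# assumptions made up for the example, not project data.
def _demo_abl_from_velocity_sketch() -> xr.DataArray:
    """Sketch only; returns the ABL height for one made-up flow case."""
    ws = np.array([[5.0, 8.0, 9.95, 10.0, 10.0]])  # one flow case, five heights
    demo = xr.Dataset(
        {"wind_speed": (("flow_case", "height"), ws)},
        coords={"height": [10.0, 50.0, 100.0, 200.0, 400.0]},
    )
    # threshold = 0.99 * 10.0 = 9.9, and the first height reaching it is 100.0 m
    return _calculate_abl_from_velocity(demo)
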

def _calculate_veer(ds: xr.Dataset) -> xr.DataArray:
    """Calculate wind veer (d(WD)/dz) from the wind direction profile."""
    print(" Calculating wind veer...")

    # Ensure wind_direction and height exist
    if "wind_direction" not in ds or "height" not in ds:
        raise ValueError("Missing 'wind_direction' or 'height' for veer calculation.")

    # 1. Convert degrees to radians
    wd_rad = np.deg2rad(ds["wind_direction"])

    # 2. Unwrap angles along the height dimension to handle the 0/360 crossing
    unwrapped_rad = xr.apply_ufunc(
        np.unwrap,
        wd_rad,
        input_core_dims=[["height"]],
        output_core_dims=[["height"]],
        dask="parallelized",
        output_dtypes=[wd_rad.dtype],
    )

    # 3. Calculate the gradient (d(rad)/dz)
    height_coords = ds["height"].values

    veer_rad_per_m = xr.apply_ufunc(
        np.gradient,
        unwrapped_rad,
        height_coords,
        input_core_dims=[["height"], ["height"]],
        output_core_dims=[["height"]],
        kwargs={"axis": -1},  # tell numpy.gradient to use the last (core) dimension
        dask="parallelized",
        output_dtypes=[unwrapped_rad.dtype],
    )

    # 4. Convert back to degrees/metre for interpretability
    veer_deg_per_m = np.rad2deg(veer_rad_per_m)

    veer_deg_per_m.attrs["long_name"] = "Wind Veer"
    veer_deg_per_m.attrs["units"] = "deg/m"

    return veer_deg_per_m
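
# Illustrative sketch (hypothetical helper, never called by the pipeline): a synthetic
# profile with a constant veer of 0.02 deg/m that crosses the 0/360 wrap, to show what
# the unwrap + gradient combination above returns. Made-up numbers, not project data.
def _demo_veer_sketch() -> xr.DataArray:
    """Sketch only; expected result is ~0.02 deg/m at every height."""
    heights = np.arange(0.0, 1000.0, 100.0)
    wd = (359.0 + 0.02 * heights) % 360.0  # crosses the 0/360 wrap near the surface
    demo = xr.Dataset(
        {"wind_direction": (("flow_case", "height"), wd[np.newaxis, :])},
        coords={"height": heights},
    )
    return _calculate_veer(demo)
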

# --- Main Class ---


class PreprocessingInputs:
    def __init__(
        self, ref_resource_path: Path, output_path: Path, steps: list[str] = None
    ):
        """
        Initialize the preprocessor for a SINGLE reference resource file.

        Args:
            ref_resource_path (Path): Path to the original NetCDF resource file.
            output_path (Path): Path to write the processed NetCDF file.
            steps (list[str], optional): Steps to run: 'update_heights', 'recalculate_params'.
        """
        self.ref_resource_path = Path(ref_resource_path)
        self.output_path = Path(output_path)
        self.steps = steps if steps is not None else []

        if not self.ref_resource_path.exists():
            raise FileNotFoundError(
                f"Input resource file not found: {self.ref_resource_path}"
            )

        print(f"Preprocessor initialized for {self.ref_resource_path.name}.")
        if self.steps:
            print(f"Applying steps: {self.steps}")
        else:
            print("No preprocessing steps will be applied.")

    def run_pipeline(self) -> Path:
        """
        Run the configured preprocessing pipeline.
        Returns the path to the processed file.
        """
        if not self.steps:
            print(
                f"No steps to run. Copying {self.ref_resource_path.name} to {self.output_path.name}"
            )
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(self.ref_resource_path, self.output_path)
            return self.output_path

        current_input_path = self.ref_resource_path

        if "update_heights" in self.steps:
            print(
                " WARNING: Skipping 'update_heights'. This step is not fully supported "
                "in this refactor, as it requires a target height grid."
            )

        if "recalculate_params" in self.steps:
            print(" Running 'recalculate_params'...")
            self._recalculate_params(current_input_path, self.output_path)

        # Fall back to a plain copy if no step produced an output file
        if not self.output_path.exists():
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(self.ref_resource_path, self.output_path)

        print(f"Preprocessing complete. Output at: {self.output_path}")
        return self.output_path
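
    # Example (hypothetical paths, for illustration only; not executed here): how the
    # single-file pipeline above is typically driven. The meta.yaml / 'ref_resource'
    # convention is the one used by compare_physical_inputs below.
    #
    #   meta = load_yaml(Path("EDF_datasets/HR1/meta.yaml"))
    #   pre = PreprocessingInputs(
    #       ref_resource_path=Path("EDF_datasets/HR1") / meta["ref_resource"],
    #       output_path=Path("EDF_datasets/HR1/updated_physical_inputs.nc"),
    #       steps=["recalculate_params"],
    #   )
    #   processed_nc = pre.run_pipeline()
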

    def _recalculate_params(self, input_path: Path, output_path: Path):
        """
        Recalculate parameters from the vertical profiles.
        Reads from input_path and writes to output_path.
        """
        try:
            with xr.load_dataset(input_path) as ref_inputs:
                ref_inputs_modified = ref_inputs.copy()

                # --- Update TI ---
                if "k" in ref_inputs and "wind_speed" in ref_inputs:
                    ref_inputs_modified["turbulence_intensity"] = (
                        np.sqrt((2 / 3) * ref_inputs["k"]) / ref_inputs["wind_speed"]
                    )
                else:
                    print(
                        " Skipping TI recalculation: 'k' or 'wind_speed' not found."
                    )

                # --- Calculate Wind Veer ---
                try:
                    ref_inputs_modified["wind_veer"] = _calculate_veer(
                        ref_inputs_modified
                    )
                except Exception as e:
                    print(f" Skipping wind veer calculation: {e}")

                # --- Try the velocity-based ABL height ---
                if (
                    "wind_speed" in ref_inputs_modified
                    and "height" in ref_inputs_modified
                ):
                    try:
                        ref_inputs_modified["ABL_height"] = (
                            _calculate_abl_from_velocity(ref_inputs_modified)
                        )
                    except Exception as e:
                        print(f" Velocity-based ABL calculation failed: {e}")

                # --- Run temperature-based calculations (lapse rate, etc.) ---
                print(" Checking for temperature-based parameters...")
                if (
                    "height" in ref_inputs_modified
                    and "potential_temperature" in ref_inputs_modified
                ):
                    non_height_dims = [
                        d
                        for d in ref_inputs_modified["potential_temperature"].dims
                        if d != "height"
                    ]
                    non_height_shape = [
                        s
                        for d, s in ref_inputs_modified[
                            "potential_temperature"
                        ].sizes.items()
                        if d != "height"
                    ]

                    if "LMO" not in ref_inputs_modified:
                        print(
                            " 'LMO' not found. Assuming neutral stability (LMO=1e10)."
                        )
                        ref_inputs_modified["LMO"] = xr.DataArray(
                            np.full(non_height_shape, 1e10), dims=non_height_dims
                        )

                    if "ABL_height" in ref_inputs_modified:
                        abl_guess = ref_inputs_modified["ABL_height"]
                    else:
                        print(
                            " 'ABL_height' (initial guess) not found. Assuming 1000m."
                        )
                        abl_guess = xr.DataArray(
                            np.full(non_height_shape, 1000.0), dims=non_height_dims
                        )

                    print(
                        " Running 'ci_fitting' for thermal parameters (lapse_rate, etc.)..."
                    )
                    H_temp, dthdz, dth, inv_thickness, unknown1, unknown2 = (
                        xr.apply_ufunc(
                            ci_fitting,  # wifa.wayve_api.ci_fitting
                            ref_inputs_modified["height"],
                            ref_inputs_modified["potential_temperature"],
                            ref_inputs_modified["LMO"],  # use LMO (or the default)
                            abl_guess,  # use abl_guess
                            input_core_dims=[["height"], ["height"], [], []],
                            output_core_dims=[[], [], [], [], [], []],
                            vectorize=True,
                            dask="allowed",
                            output_dtypes=[float, float, float, float, float, float],
                        )
                    )

                    # ALWAYS save the thermal properties
                    ref_inputs_modified["lapse_rate"] = dthdz
                    ref_inputs_modified["capping_inversion_strength"] = dth
                    ref_inputs_modified["capping_inversion_thickness"] = inv_thickness

                    if "ABL_height" not in ref_inputs_modified:
                        print(" Using temperature-based ABL_height as fallback.")
                        ref_inputs_modified["ABL_height"] = H_temp
                    # ---------------------------------
                else:
                    print(
                        " Skipping temperature-based calculations: 'height' or 'potential_temperature' not found."
                    )

                if "lapse_rate" not in ref_inputs_modified:
                    print(" WARNING: Could not calculate lapse_rate.")

                output_path.parent.mkdir(parents=True, exist_ok=True)
                ref_inputs_modified.to_netcdf(output_path)

        except Exception as e:
            # This outer try/except block also catches errors raised by ci_fitting
            print(
                f" FATAL Error during 'recalculate_params': {type(e).__name__}: {e}"
            )
            if not output_path.exists() and input_path != output_path:
                shutil.copy(input_path, output_path)
            raise e  # re-raise it to stop the main workflow
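
    # Worked example (illustrative numbers only) for the TI recalculation above:
    # with k = 1.5 m^2/s^2 and wind_speed = 10 m/s,
    # turbulence_intensity = sqrt((2/3) * 1.5) / 10 = sqrt(1.0) / 10 = 0.10.
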

    # ... (rest of the class, e.g., compare_physical_inputs, batch_update_params) ...
    # ... (the __main__ block at the end) ...

    def compare_physical_inputs(
        self,
        case_name: str,
        TI_height_idx: int,
        base_dir="EDF_datasets",
        updated_file="updated_physical_inputs.nc",
    ):
        """
        Compare original and modified physical inputs for a given case.

        Parameters:
        - case_name: name of the case (used for path construction)
        - TI_height_idx: index of the height at which to extract TI
        - base_dir: base directory containing the cases
        - updated_file: name of the updated NetCDF file (relative to the case directory)
        """
        meta_file = f"{base_dir}/{case_name}/meta.yaml"
        meta = load_yaml(Path(meta_file))

        # Load datasets
        original_path = f"{base_dir}/{case_name}/{meta['ref_resource']}"
        updated_path = f"{base_dir}/{case_name}/{updated_file}"

        ds_orig = xr.load_dataset(original_path)
        ds_mod = xr.load_dataset(updated_path)

        fig, axs = plt.subplots(5, 1, figsize=(8, 15), sharex=True)

        # Capping inversion strength [K]
        axs[0].plot(
            ds_orig.capping_inversion_strength.values, label="Old", linestyle="--"
        )
        axs[0].plot(ds_mod.capping_inversion_strength.values, label="New")
        axs[0].set_title("Capping Inversion Strength")
        axs[0].set_ylabel(r"$\Delta \theta$ [K]")
        axs[0].legend()

        # Capping inversion thickness [m]
        axs[1].plot(
            ds_orig.capping_inversion_thickness.values, label="Old", linestyle="--"
        )
        axs[1].plot(ds_mod.capping_inversion_thickness.values, label="New")
        axs[1].set_title("Capping Inversion Thickness")
        axs[1].set_ylabel("Thickness [m]")
        axs[1].legend()

        # ABL height [m]
        axs[2].plot(ds_orig.ABL_height.values, label="Old", linestyle="--")
        axs[2].plot(ds_mod.ABL_height.values, label="New")
        axs[2].set_title("ABL Height")
        axs[2].set_ylabel("Height [m]")
        axs[2].legend()

        # Lapse rate [K/m]
        axs[3].plot(ds_orig.lapse_rate.values, label="Old", linestyle="--")
        axs[3].plot(ds_mod.lapse_rate.values, label="New")
        axs[3].set_title("Lapse Rate")
        axs[3].set_ylabel(r"$\partial \theta / \partial z$ [K/m]")
        axs[3].legend()

        formatter = ScalarFormatter(useMathText=True)
        formatter.set_powerlimits(
            (-2, 2)
        )  # scientific notation outside the range 10^-2 to 10^2
        axs[3].yaxis.set_major_formatter(formatter)

        # Turbulence intensity (unitless)
        axs[4].plot(
            ds_orig.turbulence_intensity.values[:, TI_height_idx],
            label=f"Old (height idx {TI_height_idx})",
            linestyle="--",
        )
        axs[4].plot(
            ds_mod.turbulence_intensity.values[:, TI_height_idx],
            label=f"New (height idx {TI_height_idx})",
        )
        axs[4].set_title("Turbulence Intensity")
        axs[4].set_ylabel("TI [-]")
        axs[4].set_xlabel("Time index")
        axs[4].legend()

        plt.tight_layout()
        plt.show()

    def batch_update_params(self):
        # NOTE: retained from the earlier batch-oriented interface. It expects
        # `self.base_dir` and `self.case_names` attributes plus per-case
        # `update_heights` / `recalculate_params` methods, none of which are
        # defined on this refactored single-file class, so it will fail if
        # called as-is.
        meta_file = self.base_dir / self.case_names[0] / "meta.yaml"
        meta = load_yaml(meta_file)
        ref_inputs = xr.load_dataset(
            self.base_dir / self.case_names[0] / meta["ref_resource"]
        )
        new_heights = ref_inputs.height

        for case in self.case_names:
            self.update_heights(case, new_heights)
            self.recalculate_params(case)


if __name__ == "__main__":
    case_names = [
        "HR1",
        "HR2",
        "HR3",
        "NYSTED1",
        "NYSTED2",
        "VirtWF_ABL_IEA10",
        "VirtWF_ABL_IEA15_ali_DX5_DY5",
        "VirtWF_ABL_IEA15_stag_DX5_DY5",  # this has 998 heights originally, interpolating later
        "VirtWF_ABL_IEA15_stag_DX5_DY7p5",  # this has 998 heights originally, interpolating later
        "VirtWF_ABL_IEA15_stag_DX7p5_DY5",  # this has 998 heights originally, interpolating later
        "VirtWF_ABL_IEA22",
    ]
    base_dir = Path("EDF_datasets")

    # Demo for a single case using the single-file interface above. Assumes each case
    # directory contains a meta.yaml with a 'ref_resource' entry, as used by
    # compare_physical_inputs.
    case = case_names[5]
    meta = load_yaml(base_dir / case / "meta.yaml")
    preprocessor = PreprocessingInputs(
        ref_resource_path=base_dir / case / meta["ref_resource"],
        output_path=base_dir / case / "updated_physical_inputs.nc",
        steps=["recalculate_params"],
    )
    preprocessor.run_pipeline()

    # plotting for a single case
    preprocessor.compare_physical_inputs(
        case_name=case, TI_height_idx=100, base_dir=str(base_dir)
    )