Coverage for wifa_uq/model_error_database/database_gen.py: 88%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-19 02:10 +0000

1import xarray as xr 

2import numpy as np 

3import time 

4import re # Import regular expression library 

5from pathlib import Path 

6from scipy.interpolate import interp1d 

7from windIO.yaml import load_yaml 

8from wifa import run_pywake, run_foxes 

9from wifa_uq.model_error_database.run_sweep import run_parameter_sweep 

10from wifa_uq.model_error_database.utils import ( 

11 calc_boundary_area, 

12 blockage_metrics, 

13 farm_length_width, 

14) 

15 

16 

class DatabaseGenerator:
    """Builds a model-error database for a single wind-farm case.

    Pipeline: run a parameter sweep of a wake model ("pywake" or "foxes"),
    attach the preprocessed physical inputs and layout-derived features
    (blockage, farm length/width), then save the stacked result as NetCDF.
    """

    def __init__(
        self,
        nsamples: int,
        param_config: dict,
        system_yaml_path: Path,
        ref_power_path: Path,
        processed_resource_path: Path,
        wf_layout_path: Path,
        output_db_path: Path,
        model="pywake",
    ):
        """
        Initializes the DatabaseGenerator.

        Args:
            nsamples (int): Number of parameter samples to run.
            param_config (dict): Dictionary of parameters to sample.
            system_yaml_path (Path): Path to the windIO system YAML file.
            ref_power_path (Path): Path to the reference power NetCDF file (the "truth" data).
            processed_resource_path (Path): Path to the *preprocessed* physical inputs NetCDF.
            wf_layout_path (Path): Path to the wind_farm YAML (for layout utils).
            output_db_path (Path): Full path to save the final stacked NetCDF database.
            model (str, optional): Model to use. Defaults to "pywake".

        Raises:
            FileNotFoundError: If any required input file does not exist.
        """
        self.nsamples = nsamples
        self.param_config = self._normalize_param_config(param_config)
        self.model = model
        self.system_yaml_path = Path(system_yaml_path)
        self.ref_power_path = Path(ref_power_path)
        self.processed_resource_path = Path(processed_resource_path)
        self.wf_layout_path = Path(wf_layout_path)
        self.output_db_path = Path(output_db_path)

        # Ensure output directory exists
        self.output_db_path.parent.mkdir(parents=True, exist_ok=True)

        # Validate input paths up front so failures surface before any heavy work.
        # (Labels chosen so the messages match the documented errors exactly.)
        required_inputs = [
            (self.system_yaml_path, "System YAML"),
            (self.ref_power_path, "Reference power file"),
            (self.processed_resource_path, "Processed resource file"),
            (self.wf_layout_path, "Wind farm layout file"),
        ]
        for path, label in required_inputs:
            if not path.exists():
                raise FileNotFoundError(f"{label} not found: {path}")

    def _normalize_param_config(self, param_config: dict) -> dict:
        """
        Normalize param_config to full format.

        Handles both simple [min, max] and full {range, default, short_name}
        formats. Returns a new dict; the caller's mapping and its nested
        dicts are left unmodified (the previous version mutated them when
        filling in a missing 'short_name').

        Raises:
            ValueError: If an entry is neither a list nor a dict.
        """
        normalized = {}
        for path, config in param_config.items():
            if isinstance(config, list):
                # Simple format: [min, max]; short name is the last
                # segment of the dotted parameter path.
                normalized[path] = {
                    "range": config,
                    "default": None,
                    "short_name": path.split(".")[-1],
                }
            elif isinstance(config, dict):
                # Full format. Copy so the caller's dict is not mutated.
                entry = dict(config)
                entry.setdefault("short_name", path.split(".")[-1])
                normalized[path] = entry
            else:
                raise ValueError(f"Invalid param_config format for {path}")

        return normalized

    def _infer_rated_power(self, wf_dat: dict, system_dat: dict) -> float:
        """
        Attempts to find the rated power (in W) using multiple strategies, in order:

        1. Explicit 'rated_power' key under '...turbines.performance'.
        2. max() of 'power_curve.power_values' under '...turbines.performance'.
        3. Parsing an 'XMW' pattern out of the turbine 'name'.

        Raises:
            ValueError: If no strategy yields a rated power.
        """
        # The 'turbines' dict could be at the top level of wf_dat
        # or inside system_dat['wind_farm']
        turbine_data_sources = [
            wf_dat.get("turbines"),
            system_dat.get("wind_farm", {}).get("turbines"),
        ]

        for turbine_data in turbine_data_sources:
            if not turbine_data:
                continue

            # --- Strategy 1: Check for explicit 'rated_power' key ---
            try:
                power = turbine_data["performance"]["rated_power"]
                if power:
                    print(f"Found 'rated_power' key: {power} W")
                    return float(power)
            except (KeyError, TypeError):
                pass  # Not found, try next strategy

            # --- Strategy 2: Get max from 'power_curve' ---
            try:
                power_values = turbine_data["performance"]["power_curve"][
                    "power_values"
                ]
                if power_values:
                    power = max(power_values)
                    print(f"Found 'power_curve'. Max power: {power} W")
                    return float(power)
            except (KeyError, TypeError):
                pass  # Not found, try next strategy

            # --- Strategy 3: Parse the turbine 'name' ---
            try:
                name = turbine_data["name"]
                # Regex to find numbers (int or float) followed by "MW" (case-insensitive)
                match = re.search(r"(\d+(\.\d+)?)\s*MW", name, re.IGNORECASE)
                if match:
                    power_mw = float(match.group(1))
                    power_w = power_mw * 1_000_000
                    print(
                        f"Inferred rated power from turbine name '{name}': {power_w} W"
                    )
                    return power_w
            except (KeyError, TypeError):
                pass  # Not found, try next strategy

        # All strategies failed
        raise ValueError(
            "Could not find or infer 'rated_power'.\n"
            "Tried: \n"
            " 1. 'rated_power' key in '...turbines.performance'.\n"
            " 2. 'max(power_curve.power_values)' in '...turbines.performance'.\n"
            " 3. Parsing 'XMW' from '...turbines.name' field.\n"
            "Please add one of these to your windIO turbine file."
        )

    def generate_database(self) -> xr.Dataset:
        """
        Runs the full database generation pipeline for the single specified case.

        Returns:
            xr.Dataset: The stacked (flattened) database; also written to
            ``self.output_db_path`` as NetCDF.

        Raises:
            NotImplementedError: If ``self.model`` is not 'pywake' or 'foxes'.
            ValueError: If the resource 'time' dimension does not match the
                simulation 'flow_case' dimension.
        """
        starttime = time.time()

        # --- 1. Load all data from explicit paths ---
        print(f"Loading system config: {self.system_yaml_path.name}")
        dat = load_yaml(self.system_yaml_path)
        print(f"Loading reference power: {self.ref_power_path.name}")
        reference_power = xr.load_dataset(self.ref_power_path)
        print(f"Loading processed resource: {self.processed_resource_path.name}")
        reference_physical_inputs = xr.load_dataset(self.processed_resource_path)
        print(f"Loading wind farm layout: {self.wf_layout_path.name}")
        wf_dat = load_yaml(self.wf_layout_path)

        # --- 2. Infer metadata (replaces meta.yaml) ---
        turb_rated_power = self._infer_rated_power(wf_dat, dat)

        # Get other metadata from the loaded files
        nt = len(wf_dat["layouts"][0]["coordinates"]["x"])
        hh = wf_dat["turbines"]["hub_height"]
        d = wf_dat["turbines"]["rotor_diameter"]
        case_name = wf_dat.get("name", self.system_yaml_path.stem)

        print(
            f"Case: {case_name}, {nt} turbines, Rated Power: {turb_rated_power / 1e6:.1f} MW, Hub Height: {hh} m"
        )

        # --- 3. Run parameter sweep ---
        output_dir = self.output_db_path.parent / "samples"
        output_dir.mkdir(exist_ok=True)

        # Dispatch table: model name -> (run function, model-specific sweep kwargs).
        # The two branches previously duplicated the whole call.
        model_runners = {
            "pywake": (run_pywake, {}),
            "foxes": (run_foxes, {"run_func_kwargs": {"verbosity": 0}}),
        }
        if self.model not in model_runners:
            raise NotImplementedError(f"Model '{self.model}' not implemented yet.")
        run_func, extra_kwargs = model_runners[self.model]
        result = run_parameter_sweep(
            run_func,
            turb_rated_power,
            dat,
            self.param_config,
            reference_power,
            None,  # reference_physical_inputs is not actually used by the sweep
            n_samples=self.nsamples,
            seed=1,
            output_dir=output_dir,
            **extra_kwargs,
        )

        print("Parameter sweep complete. Processing physical inputs...")

        # --- 4. Process and add physical inputs ---
        phys_inputs = reference_physical_inputs.copy()

        # Rename 'time' to 'flow_case' to match the sweep output
        if "time" in phys_inputs.dims:
            # Check if flow_case dimension already exists from the sweep
            n_flow_cases = len(result.flow_case)
            if len(phys_inputs.time) != n_flow_cases:
                raise ValueError(
                    f"Mismatch in 'time' dimension of resource ({len(phys_inputs.time)}) "
                    f"and 'flow_case' dimension of simulation ({n_flow_cases})."
                )

            # Use the coordinates from the simulation result, but data from resource
            phys_inputs = phys_inputs.rename({"time": "flow_case"})
            phys_inputs = phys_inputs.assign_coords(flow_case=result.flow_case)

        # Interpolate to hub height if 'height' dimension exists
        if "height" in phys_inputs.dims:
            print(f"Interpolating physical inputs to hub height ({hh} m)...")
            heights = phys_inputs.height.values

            interp_ds = xr.Dataset(coords=phys_inputs.coords)
            for var, da in phys_inputs.data_vars.items():
                if "height" in da.dims:
                    # Linear interpolation along the height axis; extrapolate
                    # in case hub height lies outside the profile levels.
                    f_interp = interp1d(
                        heights,
                        da,
                        axis=da.dims.index("height"),
                        fill_value="extrapolate",
                    )
                    # Create new DataArray with interpolated values
                    new_dims = [dim for dim in da.dims if dim != "height"]
                    interp_ds[var] = (new_dims, f_interp(hh))
                else:
                    # Keep non-height-dependent variables
                    interp_ds[var] = da

            phys_inputs = interp_ds
        else:
            print("Physical inputs have no 'height' dim, assuming hub-height or 0D.")

        # Add all physical inputs to the results dataset
        for var in phys_inputs.data_vars:
            if var in result.coords:
                continue  # Don't overwrite coords
            result[var] = phys_inputs[var]

        # --- 5. Add farm-level features ---
        print("Adding farm-level features...")
        result = result.expand_dims(dim={"wind_farm": [case_name]})
        result["turb_rated_power"] = xr.DataArray(
            [turb_rated_power], dims=["wind_farm"]
        )
        result["nt"] = xr.DataArray([nt], dims=["wind_farm"])

        x = wf_dat["layouts"][0]["coordinates"]["x"]
        y = wf_dat["layouts"][0]["coordinates"]["y"]
        # NOTE(review): this is boundary area per turbine (the inverse of a
        # turbine density); kept as-is since downstream consumers use this key.
        density = calc_boundary_area(x, y, show=False) / nt
        result["farm_density"] = xr.DataArray([density], dims=["wind_farm"])

        # --- 6. Flatten and add layout features ---
        print("Stacking dataset...")
        stacked = result.stack(case_index=("wind_farm", "flow_case"))
        stacked = stacked.dropna(dim="case_index", how="all", subset=["model_bias_cap"])
        stacked = stacked.reset_index("case_index")  # Makes case_index a variable

        print("Adding layout-dependent features (Blockage, etc.)...")
        BR_farms, BD_farms, lengths, widths = [], [], [], []

        xy = np.column_stack((x, y))
        wind_dirs = stacked.wind_direction.values  # Get all wind directions at once

        for wd in wind_dirs:
            # L_inf_factor=20.0, grid_res=151
            BR, BD, BR_farm, BD_farm = blockage_metrics(
                xy, wd, d, grid_res=51, plot=False
            )
            length, width = farm_length_width(x, y, wd, d, plot=False)

            BR_farms.append(BR_farm)
            BD_farms.append(BD_farm)
            lengths.append(length)
            widths.append(width)

        stacked["Blockage_Ratio"] = xr.DataArray(BR_farms, dims=["case_index"])
        stacked["Blocking_Distance"] = xr.DataArray(BD_farms, dims=["case_index"])
        stacked["Farm_Length"] = xr.DataArray(lengths, dims=["case_index"])
        stacked["Farm_Width"] = xr.DataArray(widths, dims=["case_index"])

        # --- 7. Save the final database ---
        print(f"Saving final database to: {self.output_db_path}")
        self.output_db_path.parent.mkdir(parents=True, exist_ok=True)
        stacked.to_netcdf(self.output_db_path)

        tottime = round(time.time() - starttime, 3)
        print(f"Database generation complete. Total time: {tottime} seconds")

        return stacked

321 

322 

# Module guard: print usage hints if this file is executed directly
# instead of being imported (e.g. by 'workflow.py').
if __name__ == "__main__":
    _usage_messages = (
        "This script is a module and is intended to be imported by 'workflow.py'.",
        "To run a workflow, please use 'run.py' in the root directory.",
    )
    for _msg in _usage_messages:
        print(_msg)

327 print("To run a workflow, please use 'run.py' in the root directory.")