Coverage for wifa_uq/model_error_database/database_gen.py: 88% (155 statements)
coverage.py v7.13.0, created at 2025-12-19 02:10 +0000
import xarray as xr
import numpy as np
import time
import re  # Regular expressions, used to parse rated power from turbine names
from pathlib import Path
from scipy.interpolate import interp1d
from windIO.yaml import load_yaml
from wifa import run_pywake, run_foxes
from wifa_uq.model_error_database.run_sweep import run_parameter_sweep
from wifa_uq.model_error_database.utils import (
    calc_boundary_area,
    blockage_metrics,
    farm_length_width,
)


class DatabaseGenerator:
    def __init__(
        self,
        nsamples: int,
        param_config: dict,
        system_yaml_path: Path,
        ref_power_path: Path,
        processed_resource_path: Path,
        wf_layout_path: Path,
        output_db_path: Path,
        model="pywake",
    ):
        """
        Initializes the DatabaseGenerator.

        Args:
            nsamples (int): Number of parameter samples to run.
            param_config (dict): Dictionary of parameters to sample.
            system_yaml_path (Path): Path to the windIO system YAML file.
            ref_power_path (Path): Path to the reference power NetCDF file (the "truth" data).
            processed_resource_path (Path): Path to the *preprocessed* physical inputs NetCDF.
            wf_layout_path (Path): Path to the wind_farm YAML (for layout utils).
            output_db_path (Path): Full path to save the final stacked NetCDF database.
            model (str, optional): Model to use. Defaults to "pywake".
        """
        self.nsamples = nsamples
        self.param_config = self._normalize_param_config(param_config)
        self.model = model
        self.system_yaml_path = Path(system_yaml_path)
        self.ref_power_path = Path(ref_power_path)
        self.processed_resource_path = Path(processed_resource_path)
        self.wf_layout_path = Path(wf_layout_path)
        self.output_db_path = Path(output_db_path)

        # Ensure the output directory exists
        self.output_db_path.parent.mkdir(parents=True, exist_ok=True)

        # Validate input paths
        if not self.system_yaml_path.exists():
            raise FileNotFoundError(f"System YAML not found: {self.system_yaml_path}")
        if not self.ref_power_path.exists():
            raise FileNotFoundError(
                f"Reference power file not found: {self.ref_power_path}"
            )
        if not self.processed_resource_path.exists():
            raise FileNotFoundError(
                f"Processed resource file not found: {self.processed_resource_path}"
            )
        if not self.wf_layout_path.exists():
            raise FileNotFoundError(
                f"Wind farm layout file not found: {self.wf_layout_path}"
            )

    def _normalize_param_config(self, param_config: dict) -> dict:
        """
        Normalize param_config to the full format.
        Handles both simple [min, max] and full {range, default, short_name} formats.
        """
        normalized = {}
        for path, config in param_config.items():
            if isinstance(config, list):
                # Simple format: [min, max]
                short_name = path.split(".")[-1]  # Use the last part of the path
                normalized[path] = {
                    "range": config,
                    "default": None,
                    "short_name": short_name,
                }
            elif isinstance(config, dict):
                # Full format
                if "short_name" not in config:
                    config["short_name"] = path.split(".")[-1]
                normalized[path] = config
            else:
                raise ValueError(f"Invalid param_config format for {path}")

        return normalized

    def _infer_rated_power(self, wf_dat: dict, system_dat: dict) -> float:
        """
        Attempts to find the rated power (in W) using multiple strategies, in order.
        """
        # The 'turbines' dict could be at the top level of wf_dat
        # or inside system_dat['wind_farm']
        turbine_data_sources = [
            wf_dat.get("turbines"),
            system_dat.get("wind_farm", {}).get("turbines"),
        ]

        for turbine_data in turbine_data_sources:
            if not turbine_data:
                continue

            # --- Strategy 1: Check for explicit 'rated_power' key ---
            try:
                power = turbine_data["performance"]["rated_power"]
                if power:
                    print(f"Found 'rated_power' key: {power} W")
                    return float(power)
            except (KeyError, TypeError):
                pass  # Not found, try the next strategy

            # --- Strategy 2: Get max from 'power_curve' ---
            try:
                power_values = turbine_data["performance"]["power_curve"][
                    "power_values"
                ]
                if power_values:
                    power = max(power_values)
                    print(f"Found 'power_curve'. Max power: {power} W")
                    return float(power)
            except (KeyError, TypeError):
                pass  # Not found, try the next strategy

            # --- Strategy 3: Parse the turbine 'name' ---
            try:
                name = turbine_data["name"]
                # Regex to find numbers (int or float) followed by "MW" (case-insensitive)
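                # e.g. "Generic 5MW" -> 5.0 and "IEA 15 MW" -> 15.0 (converted to W below)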
                match = re.search(r"(\d+(\.\d+)?)\s*MW", name, re.IGNORECASE)
                if match:
                    power_mw = float(match.group(1))
                    power_w = power_mw * 1_000_000
                    print(
                        f"Inferred rated power from turbine name '{name}': {power_w} W"
                    )
                    return power_w
            except (KeyError, TypeError):
                pass  # Not found, try the next data source

        # All strategies failed for all sources
        raise ValueError(
            "Could not find or infer 'rated_power'.\n"
            "Tried:\n"
            "  1. 'rated_power' key in '...turbines.performance'.\n"
            "  2. 'max(power_curve.power_values)' in '...turbines.performance'.\n"
            "  3. Parsing 'XMW' from the '...turbines.name' field.\n"
            "Please add one of these to your windIO turbine file."
        )

    def generate_database(self) -> xr.Dataset:
        """
        Runs the full database generation pipeline for the single specified case.
        """
        starttime = time.time()

        # --- 1. Load all data from explicit paths ---
        print(f"Loading system config: {self.system_yaml_path.name}")
        dat = load_yaml(self.system_yaml_path)
        print(f"Loading reference power: {self.ref_power_path.name}")
        reference_power = xr.load_dataset(self.ref_power_path)
        print(f"Loading processed resource: {self.processed_resource_path.name}")
        reference_physical_inputs = xr.load_dataset(self.processed_resource_path)
        print(f"Loading wind farm layout: {self.wf_layout_path.name}")
        wf_dat = load_yaml(self.wf_layout_path)

        # --- 2. Infer metadata (replaces meta.yaml) ---
        turb_rated_power = self._infer_rated_power(wf_dat, dat)

        # Get other metadata from the loaded files
        nt = len(wf_dat["layouts"][0]["coordinates"]["x"])
        hh = wf_dat["turbines"]["hub_height"]
        d = wf_dat["turbines"]["rotor_diameter"]
        case_name = wf_dat.get("name", self.system_yaml_path.stem)

        print(
            f"Case: {case_name}, {nt} turbines, "
            f"Rated Power: {turb_rated_power / 1e6:.1f} MW, Hub Height: {hh} m"
        )

        # --- 3. Run parameter sweep ---
        output_dir = self.output_db_path.parent / "samples"
        output_dir.mkdir(exist_ok=True)

        if self.model == "pywake":
            result = run_parameter_sweep(
                run_pywake,
                turb_rated_power,
                dat,
                self.param_config,
                reference_power,
                None,  # reference_physical_inputs is not actually used by run_parameter_sweep
                n_samples=self.nsamples,
                seed=1,
                output_dir=output_dir,
            )
        elif self.model == "foxes":
            result = run_parameter_sweep(
                run_foxes,
                turb_rated_power,
                dat,
                self.param_config,
                reference_power,
                None,  # reference_physical_inputs is not actually used by run_parameter_sweep
                n_samples=self.nsamples,
                seed=1,
                output_dir=output_dir,
                run_func_kwargs={"verbosity": 0},
            )
        else:
            raise NotImplementedError(f"Model '{self.model}' not implemented yet.")

        print("Parameter sweep complete. Processing physical inputs...")

        # --- 4. Process and add physical inputs ---
        phys_inputs = reference_physical_inputs.copy()

        # Rename 'time' to 'flow_case' to match the parameter-sweep output
        if "time" in phys_inputs.dims:
            # Check that the resource 'time' length matches the sweep's 'flow_case' dimension
            n_flow_cases = len(result.flow_case)
            if len(phys_inputs.time) != n_flow_cases:
                raise ValueError(
                    f"Mismatch in 'time' dimension of resource ({len(phys_inputs.time)}) "
                    f"and 'flow_case' dimension of simulation ({n_flow_cases})."
                )

            # Use the coordinates from the simulation result, but the data from the resource
            phys_inputs = phys_inputs.rename({"time": "flow_case"})
            phys_inputs = phys_inputs.assign_coords(flow_case=result.flow_case)

        # Interpolate to hub height if a 'height' dimension exists
        if "height" in phys_inputs.dims:
            print(f"Interpolating physical inputs to hub height ({hh} m)...")
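            # Note: interp1d is linear by default, and fill_value="extrapolate"
            # extrapolates linearly if the hub height lies outside the profile heights.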
            heights = phys_inputs.height.values

            interp_ds = xr.Dataset(coords=phys_inputs.coords)
            for var, da in phys_inputs.data_vars.items():
                if "height" in da.dims:
                    # Create a 1D interpolation function along the height axis
                    f_interp = interp1d(
                        heights,
                        da,
                        axis=da.dims.index("height"),
                        fill_value="extrapolate",
                    )
                    # Create a new DataArray with the interpolated values
                    new_dims = [dim for dim in da.dims if dim != "height"]
                    interp_ds[var] = (new_dims, f_interp(hh))
                else:
                    # Keep non-height-dependent variables as-is
                    interp_ds[var] = da

            phys_inputs = interp_ds
        else:
            print("Physical inputs have no 'height' dim; assuming hub height or 0D.")

        # Add all physical inputs to the results dataset
        for var in phys_inputs.data_vars:
            if var in result.coords:
                continue  # Don't overwrite coordinates
            result[var] = phys_inputs[var]

        # --- 5. Add farm-level features ---
        print("Adding farm-level features...")
        result = result.expand_dims(dim={"wind_farm": [case_name]})
        result["turb_rated_power"] = xr.DataArray(
            [turb_rated_power], dims=["wind_farm"]
        )
        result["nt"] = xr.DataArray([nt], dims=["wind_farm"])

        x = wf_dat["layouts"][0]["coordinates"]["x"]
        y = wf_dat["layouts"][0]["coordinates"]["y"]
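        # Note: 'farm_density' is computed as boundary area per turbine
        # (area / nt), so larger values mean a more sparsely packed farm.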
        density = calc_boundary_area(x, y, show=False) / nt
        result["farm_density"] = xr.DataArray([density], dims=["wind_farm"])

        # --- 6. Flatten and add layout features ---
        print("Stacking dataset...")
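        # Stacking collapses (wind_farm, flow_case) into a single 'case_index'
        # MultiIndex; cases where 'model_bias_cap' is all-NaN are then dropped,
        # and reset_index turns the index levels back into plain variables.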
        stacked = result.stack(case_index=("wind_farm", "flow_case"))
        stacked = stacked.dropna(dim="case_index", how="all", subset=["model_bias_cap"])
        stacked = stacked.reset_index("case_index")  # Makes case_index a variable

        print("Adding layout-dependent features (Blockage, etc.)...")
        BR_farms, BD_farms, lengths, widths = [], [], [], []

        xy = np.column_stack((x, y))
        wind_dirs = stacked.wind_direction.values  # Get all wind directions at once

        for wd in wind_dirs:
            # L_inf_factor=20.0, grid_res=151
            BR, BD, BR_farm, BD_farm = blockage_metrics(
                xy, wd, d, grid_res=51, plot=False
            )
            length, width = farm_length_width(x, y, wd, d, plot=False)

            BR_farms.append(BR_farm)
            BD_farms.append(BD_farm)
            lengths.append(length)
            widths.append(width)

        stacked["Blockage_Ratio"] = xr.DataArray(BR_farms, dims=["case_index"])
        stacked["Blocking_Distance"] = xr.DataArray(BD_farms, dims=["case_index"])
        stacked["Farm_Length"] = xr.DataArray(lengths, dims=["case_index"])
        stacked["Farm_Width"] = xr.DataArray(widths, dims=["case_index"])

        # --- 7. Save the final database ---
        print(f"Saving final database to: {self.output_db_path}")
        self.output_db_path.parent.mkdir(parents=True, exist_ok=True)
        stacked.to_netcdf(self.output_db_path)

        tottime = round(time.time() - starttime, 3)
        print(f"Database generation complete. Total time: {tottime} seconds")

        return stacked


# This check prevents this code from running when imported
if __name__ == "__main__":
    # You can add a simple test harness here for debugging this file directly
    print("This script is a module and is intended to be imported by 'workflow.py'.")
    print("To run a workflow, please use 'run.py' in the root directory.")
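
    # Minimal usage sketch (all paths below are hypothetical placeholders):
    #
    #     gen = DatabaseGenerator(
    #         nsamples=10,
    #         param_config={"wake_model.k": [0.04, 0.10]},
    #         system_yaml_path=Path("system.yaml"),
    #         ref_power_path=Path("ref_power.nc"),
    #         processed_resource_path=Path("resource.nc"),
    #         wf_layout_path=Path("wind_farm.yaml"),
    #         output_db_path=Path("db/database.nc"),
    #         model="pywake",
    #     )
    #     db = gen.generate_database()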