Coverage for wifa_uq/model_error_database/multi_farm_gen.py: 82% (119 statements)
1"""
2Multi-farm database generator for cross-validation studies.
4Generates combined databases from multiple wind farms, enabling
5LeaveOneGroupOut cross-validation across farm groups.
6"""
8from __future__ import annotations
10import logging
11from pathlib import Path
13import numpy as np
14import xarray as xr
16from wifa_uq.model_error_database.database_gen import DatabaseGenerator
17from wifa_uq.model_error_database.path_inference import (
18 infer_paths_from_system_config,
19 validate_required_paths,
20)
21from wifa_uq.preprocessing.preprocessing import PreprocessingInputs
23logger = logging.getLogger(__name__)


class MultiFarmDatabaseGenerator:
    """
    Generates a combined database from multiple wind farms.

    Each farm config must specify:
    - name: Unique identifier for the farm
    - system_config: Path to wind energy system YAML (windIO format)

    Optional (paths are auto-inferred if not provided):
    - reference_power: Path to reference power NetCDF
    - reference_resource: Path to reference resource NetCDF
    - wind_farm_layout: Path to wind farm layout YAML

    Parameters
    ----------
    farm_configs : list[dict]
        List of farm configurations with 'name' and 'system_config' keys
        (and optionally resolved paths from workflow.py)
    param_config : dict
        Parameter sampling configuration (shared across all farms)
    n_samples : int
        Number of parameter samples per farm
    output_dir : Path
        Directory for output files
    database_file : str
        Name of combined database file
    model : str
        Flow model to use (default: 'pywake')
    preprocessing_steps : list[str], optional
        Preprocessing steps to apply to each farm
    run_preprocessing : bool
        Whether to run preprocessing
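
    Examples
    --------
    A minimal sketch of intended usage (farm names, paths, and the
    ``param_config`` contents below are hypothetical)::

        >>> gen = MultiFarmDatabaseGenerator(
        ...     farm_configs=[
        ...         {"name": "farm_a", "system_config": "farm_a/system.yaml"},
        ...         {"name": "farm_b", "system_config": "farm_b/system.yaml"},
        ...     ],
        ...     param_config=param_config,
        ...     n_samples=100,
        ...     output_dir=Path("output"),
        ... )
        >>> combined = gen.generate_database()  # doctest: +SKIP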
65 """

    def __init__(
        self,
        farm_configs: list[dict],
        param_config: dict,
        n_samples: int,
        output_dir: Path,
        database_file: str = "combined_database.nc",
        model: str = "pywake",
        preprocessing_steps: list[str] | None = None,
        run_preprocessing: bool = True,
    ):
        self.farm_configs = farm_configs
        self.param_config = param_config
        self.n_samples = n_samples
        self.output_dir = Path(output_dir)
        self.database_file = database_file
        self.model = model
        self.preprocessing_steps = preprocessing_steps or []
        self.run_preprocessing = run_preprocessing

        self._validate_farm_configs()

    def _validate_farm_configs(self) -> None:
        """Validate farm configurations."""
        required_keys = {"name", "system_config"}

        names = []
        for i, farm in enumerate(self.farm_configs):
            missing = required_keys - set(farm.keys())
            if missing:
                raise ValueError(
                    f"Farm #{i + 1} missing required keys: {missing}. "
                    f"Each farm must have 'name' and 'system_config'."
                )

            name = farm["name"]
            if name in names:
                raise ValueError(f"Duplicate farm name: '{name}'")
            names.append(name)

        logger.info(f"Validated {len(self.farm_configs)} farm configurations")

    def _ensure_farm_paths(self, farm_config: dict) -> dict:
        """
        Ensure all required paths are present for a farm.

        If paths were already resolved by workflow.py, use those.
        Otherwise, infer from system_config.
        """
        farm_name = farm_config["name"]

        # Check if paths are already resolved (Path objects present)
        required_paths = [
            "system_config",
            "reference_power",
            "reference_resource",
            "wind_farm_layout",
        ]
        all_resolved = all(
            key in farm_config and isinstance(farm_config.get(key), Path)
            for key in required_paths
        )

        if all_resolved:
            # Already resolved by workflow.py
            return farm_config

        # Need to infer paths
        system_config_path = Path(farm_config["system_config"])

        # Build explicit paths dict
        explicit_paths = {}
        for key in ["reference_power", "reference_resource", "wind_farm_layout"]:
            if key in farm_config and farm_config[key] is not None:
                explicit_paths[key] = Path(farm_config[key])

        print(f"  Inferring paths for {farm_name}...")
        resolved = infer_paths_from_system_config(
            system_config_path=system_config_path,
            explicit_paths=explicit_paths,
        )

        # Validate
        validate_required_paths(resolved)

        # Add name back
        resolved["name"] = farm_name

        return resolved

    def _generate_single_farm(self, farm_config: dict) -> xr.Dataset:
        """
        Generate database for a single farm.

        Parameters
        ----------
        farm_config : dict
            Farm configuration with resolved paths

        Returns
        -------
        xr.Dataset
            Database with wind_farm coordinate set to farm name
        """
        farm_name = farm_config["name"]
        print(f"\nProcessing farm: {farm_name}")

        # Ensure all paths are resolved
        paths = self._ensure_farm_paths(farm_config)

        print(f"  system_config: {paths['system_config'].name}")
        print(f"  reference_power: {paths['reference_power'].name}")
        print(f"  reference_resource: {paths['reference_resource'].name}")
        print(f"  wind_farm_layout: {paths['wind_farm_layout'].name}")

        # Create farm-specific output directory
        farm_output_dir = self.output_dir / farm_name
        farm_output_dir.mkdir(parents=True, exist_ok=True)

        # Preprocessing
        if self.run_preprocessing and self.preprocessing_steps:
            print("  Running preprocessing...")
            processed_resource_path = farm_output_dir / "processed_physical_inputs.nc"

            preprocessor = PreprocessingInputs(
                ref_resource_path=paths["reference_resource"],
                output_path=processed_resource_path,
                steps=self.preprocessing_steps,
            )
            preprocessor.run_pipeline()
        else:
            processed_resource_path = paths["reference_resource"]

        # Database generation
        print("  Generating database...")
        database_path = farm_output_dir / "database.nc"

        generator = DatabaseGenerator(
            nsamples=self.n_samples,
            param_config=self.param_config,
            system_yaml_path=paths["system_config"],
            ref_power_path=paths["reference_power"],
            processed_resource_path=processed_resource_path,
            wf_layout_path=paths["wind_farm_layout"],
            output_db_path=database_path,
            model=self.model,
        )

        db = generator.generate_database()

        # Ensure wind_farm coordinate is set to our explicit name
        if "wind_farm" in db.dims:
            db = db.assign_coords(wind_farm=[farm_name])

        return db

    def _combine_databases(self, databases: list[xr.Dataset]) -> xr.Dataset:
        """
        Combine databases from multiple farms.

        Re-indexes case_index to ensure uniqueness across farms.
        Preserves wind_farm coordinate for LeaveOneGroupOut CV.

        Parameters
        ----------
        databases : list[xr.Dataset]
            List of per-farm databases

        Returns
        -------
        xr.Dataset
            Combined database with unique case indices
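
        Examples
        --------
        The preserved wind_farm coordinate is intended as the group label
        for scikit-learn's LeaveOneGroupOut; a minimal sketch, assuming the
        coordinate is aligned per case and ``X``/``y`` are hypothetical
        feature/target arrays::

            >>> from sklearn.model_selection import LeaveOneGroupOut
            >>> groups = combined["wind_farm"].values
            >>> for train_idx, test_idx in LeaveOneGroupOut().split(X, y, groups):
            ...     pass  # fit on the training farms, evaluate on the held-out farm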
239 """
        print(f"\nCombining {len(databases)} farm databases...")

        # Re-index case_index to be unique across farms
        reindexed = []
        offset = 0

        for db in databases:
            # Get the farm name from the database
            if "wind_farm" in db.coords:
                farm_name = (
                    str(db.wind_farm.values[0])
                    if db.wind_farm.size == 1
                    else str(db.wind_farm.values)
                )
            else:
                farm_name = "unknown"

            # Create new case indices (.sizes avoids the deprecated
            # dict-style lookup on Dataset.dims in newer xarray)
            n_cases = db.sizes["case_index"]
            new_indices = np.arange(offset, offset + n_cases)

            # Update dataset
            db_reindexed = db.assign_coords(case_index=new_indices)
            reindexed.append(db_reindexed)

            print(
                f"  {farm_name}: {n_cases} cases, indices {new_indices[0]}-{new_indices[-1]}"
            )
            offset += n_cases

        # Concatenate along case_index
        combined = xr.concat(reindexed, dim="case_index")

        # Validate
        self._validate_combined_database(combined)

        print(f"\nCombined database: {combined.sizes['case_index']} total cases")

        return combined

    def _validate_combined_database(self, db: xr.Dataset) -> None:
        """Validate the combined database."""
        # Check case_index uniqueness
        case_indices = db.case_index.values
        if len(case_indices) != len(np.unique(case_indices)):
            raise ValueError("case_index values are not unique")

        # Check wind_farm coordinate exists
        if "wind_farm" not in db.coords:
            raise ValueError("wind_farm coordinate missing")

        # Log summary by farm
        farms = np.unique(db.wind_farm.values)
        print("Farm summary:")
        for farm in farms:
            n = int((db.wind_farm == farm).sum())
            print(f"  {farm}: {n} cases")

    def generate_database(self) -> xr.Dataset:
        """
        Generate combined database from all farms.

        Returns
        -------
        xr.Dataset
            Combined database with:
            - Unique case_index across all farms
            - wind_farm coordinate for grouping
            - All variables from individual databases
        """
        print(
            f"Starting multi-farm database generation for {len(self.farm_configs)} farms"
        )

        self.output_dir.mkdir(parents=True, exist_ok=True)

        databases = []

        for farm_config in self.farm_configs:
            try:
                db = self._generate_single_farm(farm_config)
                databases.append(db)
            except Exception as e:
                print(f"ERROR: Failed to process farm '{farm_config['name']}': {e}")
                raise

        # Combine
        combined = self._combine_databases(databases)

        # Save
        output_path = self.output_dir / self.database_file
        combined.to_netcdf(output_path)
        print(f"\nSaved combined database to: {output_path}")

        return combined


def generate_multi_farm_database(
    farm_configs: list[dict],
    param_config: dict,
    n_samples: int,
    output_dir: Path,
    database_file: str = "combined_database.nc",
    model: str = "pywake",
    preprocessing_steps: list[str] | None = None,
    run_preprocessing: bool = True,
) -> xr.Dataset:
    """
    Convenience function to generate a multi-farm database.

    Parameters
    ----------
    farm_configs : list[dict]
        List of farm configurations with 'name' and 'system_config' keys
        (paths can be pre-resolved or will be auto-inferred)
    param_config : dict
        Parameter sampling configuration
    n_samples : int
        Number of parameter samples
    output_dir : Path
        Output directory
    database_file : str
        Output filename
    model : str
        Flow model (default: 'pywake')
    preprocessing_steps : list[str], optional
        Preprocessing steps to apply
    run_preprocessing : bool
        Whether to run preprocessing

    Returns
    -------
    xr.Dataset
        Combined database
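
    Examples
    --------
    A minimal sketch (the paths and ``param_config`` contents are
    hypothetical)::

        >>> db = generate_multi_farm_database(
        ...     farm_configs=[
        ...         {"name": "farm_a", "system_config": "farm_a/system.yaml"},
        ...     ],
        ...     param_config=param_config,
        ...     n_samples=50,
        ...     output_dir=Path("out"),
        ... )  # doctest: +SKIP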
374 """
    generator = MultiFarmDatabaseGenerator(
        farm_configs=farm_configs,
        param_config=param_config,
        n_samples=n_samples,
        output_dir=output_dir,
        database_file=database_file,
        model=model,
        preprocessing_steps=preprocessing_steps,
        run_preprocessing=run_preprocessing,
    )

    return generator.generate_database()