Coverage for wifa_uq/model_error_database/multi_farm_gen.py: 82%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-19 02:10 +0000

1""" 

2Multi-farm database generator for cross-validation studies. 

3 

4Generates combined databases from multiple wind farms, enabling 

5LeaveOneGroupOut cross-validation across farm groups. 

6""" 

7 

8from __future__ import annotations 

9 

10import logging 

11from pathlib import Path 

12 

13import numpy as np 

14import xarray as xr 

15 

16from wifa_uq.model_error_database.database_gen import DatabaseGenerator 

17from wifa_uq.model_error_database.path_inference import ( 

18 infer_paths_from_system_config, 

19 validate_required_paths, 

20) 

21from wifa_uq.preprocessing.preprocessing import PreprocessingInputs 

22 

# Module-level logger, following the stdlib one-logger-per-module convention.
logger = logging.getLogger(__name__)

24 

25 

class MultiFarmDatabaseGenerator:
    """
    Generates a combined database from multiple wind farms.

    Each farm config must specify:
    - name: Unique identifier for the farm
    - system_config: Path to wind energy system YAML (windIO format)

    Optional (paths are auto-inferred if not provided):
    - reference_power: Path to reference power NetCDF
    - reference_resource: Path to reference resource NetCDF
    - wind_farm_layout: Path to wind farm layout YAML

    Parameters
    ----------
    farm_configs : list[dict]
        List of farm configurations with 'name' and 'system_config' keys
        (and optionally resolved paths from workflow.py)
    param_config : dict
        Parameter sampling configuration (shared across all farms)
    n_samples : int
        Number of parameter samples per farm
    output_dir : Path
        Directory for output files
    database_file : str
        Name of combined database file
    model : str
        Flow model to use (default: 'pywake')
    preprocessing_steps : list[str] | None
        Preprocessing steps to apply to each farm (None means no steps)
    run_preprocessing : bool
        Whether to run preprocessing
    """

    def __init__(
        self,
        farm_configs: list[dict],
        param_config: dict,
        n_samples: int,
        output_dir: Path,
        database_file: str = "combined_database.nc",
        model: str = "pywake",
        preprocessing_steps: list[str] | None = None,
        run_preprocessing: bool = True,
    ):
        self.farm_configs = farm_configs
        self.param_config = param_config
        self.n_samples = n_samples
        self.output_dir = Path(output_dir)
        self.database_file = database_file
        self.model = model
        # Normalize None to [] so downstream truthiness checks are simple.
        self.preprocessing_steps = preprocessing_steps or []
        self.run_preprocessing = run_preprocessing

        self._validate_farm_configs()

    def _validate_farm_configs(self) -> None:
        """Validate farm configurations: required keys present, names unique.

        Raises
        ------
        ValueError
            If a farm lacks 'name'/'system_config' or a farm name repeats.
        """
        required_keys = {"name", "system_config"}

        names = []
        for i, farm in enumerate(self.farm_configs):
            missing = required_keys - set(farm.keys())
            if missing:
                raise ValueError(
                    f"Farm #{i + 1} missing required keys: {missing}. "
                    f"Each farm must have 'name' and 'system_config'."
                )

            name = farm["name"]
            if name in names:
                raise ValueError(f"Duplicate farm name: '{name}'")
            names.append(name)

        logger.info(f"Validated {len(self.farm_configs)} farm configurations")

    def _ensure_farm_paths(self, farm_config: dict) -> dict:
        """
        Ensure all required paths are present for a farm.

        If paths were already resolved by workflow.py (all four entries are
        Path objects), use those. Otherwise, infer them from system_config.
        """
        farm_name = farm_config["name"]

        # Check if paths are already resolved (Path objects present)
        required_paths = [
            "system_config",
            "reference_power",
            "reference_resource",
            "wind_farm_layout",
        ]
        all_resolved = all(
            key in farm_config and isinstance(farm_config.get(key), Path)
            for key in required_paths
        )

        if all_resolved:
            # Already resolved by workflow.py
            return farm_config

        # Need to infer paths
        system_config_path = Path(farm_config["system_config"])

        # Build explicit paths dict: user-supplied entries override inference
        explicit_paths = {}
        for key in ["reference_power", "reference_resource", "wind_farm_layout"]:
            if key in farm_config and farm_config[key] is not None:
                explicit_paths[key] = Path(farm_config[key])

        print(f" Inferring paths for {farm_name}...")
        resolved = infer_paths_from_system_config(
            system_config_path=system_config_path,
            explicit_paths=explicit_paths,
        )

        # Validate that all required paths were actually resolved
        validate_required_paths(resolved)

        # Add name back (path inference returns only the paths)
        resolved["name"] = farm_name

        return resolved

    def _generate_single_farm(self, farm_config: dict) -> xr.Dataset:
        """
        Generate database for a single farm.

        Parameters
        ----------
        farm_config : dict
            Farm configuration with resolved paths

        Returns
        -------
        xr.Dataset
            Database with wind_farm coordinate set to farm name
        """
        farm_name = farm_config["name"]
        print(f"\nProcessing farm: {farm_name}")

        # Ensure all paths are resolved
        paths = self._ensure_farm_paths(farm_config)

        print(f" system_config: {paths['system_config'].name}")
        print(f" reference_power: {paths['reference_power'].name}")
        print(f" reference_resource: {paths['reference_resource'].name}")
        print(f" wind_farm_layout: {paths['wind_farm_layout'].name}")

        # Create farm-specific output directory
        farm_output_dir = self.output_dir / farm_name
        farm_output_dir.mkdir(parents=True, exist_ok=True)

        # Preprocessing: write a processed resource file into the farm's
        # output dir; otherwise fall back to the raw reference resource.
        if self.run_preprocessing and self.preprocessing_steps:
            print(" Running preprocessing...")
            processed_resource_path = farm_output_dir / "processed_physical_inputs.nc"

            preprocessor = PreprocessingInputs(
                ref_resource_path=paths["reference_resource"],
                output_path=processed_resource_path,
                steps=self.preprocessing_steps,
            )
            preprocessor.run_pipeline()
        else:
            processed_resource_path = paths["reference_resource"]

        # Database generation
        print(" Generating database...")
        database_path = farm_output_dir / "database.nc"

        generator = DatabaseGenerator(
            nsamples=self.n_samples,
            param_config=self.param_config,
            system_yaml_path=paths["system_config"],
            ref_power_path=paths["reference_power"],
            processed_resource_path=processed_resource_path,
            wf_layout_path=paths["wind_farm_layout"],
            output_db_path=database_path,
            model=self.model,
        )

        db = generator.generate_database()

        # Ensure wind_farm coordinate is set to our explicit name so the
        # combined database can be grouped for LeaveOneGroupOut CV.
        if "wind_farm" in db.dims:
            db = db.assign_coords(wind_farm=[farm_name])

        return db

    def _combine_databases(self, databases: list[xr.Dataset]) -> xr.Dataset:
        """
        Combine databases from multiple farms.

        Re-indexes case_index to ensure uniqueness across farms.
        Preserves wind_farm coordinate for LeaveOneGroupOut CV.

        Parameters
        ----------
        databases : list[xr.Dataset]
            List of per-farm databases

        Returns
        -------
        xr.Dataset
            Combined database with unique case indices

        Raises
        ------
        ValueError
            If `databases` is empty or validation of the result fails.
        """
        # Fail early with a clear message rather than xr.concat's generic one.
        if not databases:
            raise ValueError("No farm databases to combine")

        print(f"\nCombining {len(databases)} farm databases...")

        # Re-index case_index to be unique across farms
        reindexed = []
        offset = 0

        for db in databases:
            # Get the farm name from the database (used for logging only)
            if "wind_farm" in db.coords:
                farm_name = (
                    str(db.wind_farm.values[0])
                    if db.wind_farm.size == 1
                    else str(db.wind_farm.values)
                )
            else:
                farm_name = "unknown"

            # Create new case indices shifted past all previous farms.
            # Use .sizes (stable accessor) instead of deprecated mapping-style
            # access on Dataset.dims.
            n_cases = db.sizes["case_index"]
            new_indices = np.arange(offset, offset + n_cases)

            # Update dataset
            db_reindexed = db.assign_coords(case_index=new_indices)
            reindexed.append(db_reindexed)

            print(
                f" {farm_name}: {n_cases} cases, indices {new_indices[0]}-{new_indices[-1]}"
            )
            offset += n_cases

        # Concatenate along case_index
        combined = xr.concat(reindexed, dim="case_index")

        # Validate
        self._validate_combined_database(combined)

        print(f"\nCombined database: {combined.sizes['case_index']} total cases")

        return combined

    def _validate_combined_database(self, db: xr.Dataset) -> None:
        """Validate the combined database (unique case_index, wind_farm coord)."""
        # Check case_index uniqueness
        case_indices = db.case_index.values
        if len(case_indices) != len(np.unique(case_indices)):
            raise ValueError("case_index values are not unique")

        # Check wind_farm coordinate exists
        if "wind_farm" not in db.coords:
            raise ValueError("wind_farm coordinate missing")

        # Log summary by farm
        farms = np.unique(db.wind_farm.values)
        print("Farm summary:")
        for farm in farms:
            n = int((db.wind_farm == farm).sum())
            print(f" {farm}: {n} cases")

    def generate_database(self) -> xr.Dataset:
        """
        Generate combined database from all farms.

        Returns
        -------
        xr.Dataset
            Combined database with:
            - Unique case_index across all farms
            - wind_farm coordinate for grouping
            - All variables from individual databases
        """
        print(
            f"Starting multi-farm database generation for {len(self.farm_configs)} farms"
        )

        self.output_dir.mkdir(parents=True, exist_ok=True)

        databases = []

        for farm_config in self.farm_configs:
            try:
                db = self._generate_single_farm(farm_config)
                databases.append(db)
            except Exception as e:
                # Surface which farm failed, then re-raise unchanged.
                print(f"ERROR: Failed to process farm '{farm_config['name']}': {e}")
                raise

        # Combine
        combined = self._combine_databases(databases)

        # Save
        output_path = self.output_dir / self.database_file
        combined.to_netcdf(output_path)
        print(f"\nSaved combined database to: {output_path}")

        return combined

335 

336 

def generate_multi_farm_database(
    farm_configs: list[dict],
    param_config: dict,
    n_samples: int,
    output_dir: Path,
    database_file: str = "combined_database.nc",
    model: str = "pywake",
    preprocessing_steps: list[str] | None = None,
    run_preprocessing: bool = True,
) -> xr.Dataset:
    """
    Convenience function to generate multi-farm database.

    Thin wrapper around :class:`MultiFarmDatabaseGenerator`.

    Parameters
    ----------
    farm_configs : list[dict]
        List of farm configurations with 'name' and 'system_config' keys
        (paths can be pre-resolved or will be auto-inferred)
    param_config : dict
        Parameter sampling configuration
    n_samples : int
        Number of parameter samples
    output_dir : Path
        Output directory
    database_file : str
        Output filename
    model : str
        Flow model (default: 'pywake')
    preprocessing_steps : list[str] | None
        Preprocessing steps to apply (None means no steps)
    run_preprocessing : bool
        Whether to run preprocessing

    Returns
    -------
    xr.Dataset
        Combined database
    """
    generator = MultiFarmDatabaseGenerator(
        farm_configs=farm_configs,
        param_config=param_config,
        n_samples=n_samples,
        output_dir=output_dir,
        database_file=database_file,
        model=model,
        preprocessing_steps=preprocessing_steps,
        run_preprocessing=run_preprocessing,
    )

    return generator.generate_database()