Coverage for wifa_uq / postprocessing / calibration / basic_calibration.py: 77%
126 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-19 02:10 +0000
1# wifa_uq/postprocessing/calibration/basic_calibration.py
3import numpy as np
4import pandas as pd
5from sklearn.multioutput import MultiOutputRegressor
6from sklearn.ensemble import RandomForestRegressor
7from sklearn.base import clone
8import xgboost as xgb
9from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
class DefaultParams:
    """
    Global calibrator that uses the default parameter values stored in
    the database metadata.

    fit() selects the sample whose swept-parameter values are nearest
    (squared Euclidean distance) to those defaults.
    """

    def __init__(self, dataset_train):
        self.dataset_train = dataset_train
        self.swept_params = dataset_train.attrs.get("swept_params", [])
        self.param_defaults = dataset_train.attrs.get("param_defaults", {})
        self.best_idx_ = None
        self.best_params_ = None

    def fit(self):
        """Find sample closest to default parameter values."""
        ds = self.dataset_train

        if not self.param_defaults:
            # No defaults recorded in metadata -- fall back to the first sample.
            self.best_idx_ = 0
        else:
            # Accumulate squared distance to the default over every
            # parameter that has both a default and a coordinate.
            sq_dist = np.zeros(len(ds.sample))
            for name, default in self.param_defaults.items():
                if default is None or name not in ds.coords:
                    continue
                sq_dist += (ds.coords[name].values - default) ** 2
            self.best_idx_ = int(np.argmin(sq_dist))

        # Read off the parameter values at the chosen sample.
        self.best_params_ = {
            name: float(ds.coords[name].isel(sample=self.best_idx_).values)
            for name in self.swept_params
            if name in ds.coords
        }
        return self
class MinBiasCalibrator:
    """
    Global calibrator: selects the single sample whose total capacity
    bias (summed over all cases) has the smallest magnitude.

    Works with ANY swept parameters stored in the database and returns
    one parameter set for the entire dataset.
    """

    def __init__(self, dataset_train):
        self.dataset_train = dataset_train
        # Prefer the metadata-declared sweep; fall back to inference.
        self.swept_params = (
            dataset_train.attrs.get("swept_params", []) or self._infer_swept_params()
        )
        self.best_idx_ = None
        self.best_params_ = None

    def _infer_swept_params(self):
        """Treat every coordinate on the "sample" dim (except "sample"
        itself) as a swept parameter."""
        coords = self.dataset_train.coords
        return [
            name
            for name in coords
            if name != "sample" and "sample" in coords[name].dims
        ]

    def fit(self):
        """Pick the sample index minimizing |sum of model_bias_cap|."""
        total_bias = self.dataset_train["model_bias_cap"].sum(dim="case_index")
        self.best_idx_ = int(np.abs(total_bias).argmin().values)

        # Record the parameter values at the winning sample.
        coords = self.dataset_train.coords
        self.best_params_ = {
            name: float(coords[name].isel(sample=self.best_idx_).values)
            for name in self.swept_params
            if name in coords
        }
        return self
class LocalParameterPredictor:
    """
    Predicts optimal parameters as a function of flow conditions.
    Works with ANY swept parameters stored in the database.

    This is a LOCAL calibrator - predicts different parameters for each
    flow case based on input features.

    Interface differs from global calibrators:
    - fit() trains an ML model
    - predict(X) returns optimal params for new conditions
    - get_optimal_indices() returns per-case optimal sample indices for training data
    """

    def __init__(
        self,
        dataset_train,
        feature_names,
        regressor=None,
        regressor_name=None,
        regressor_params=None,
    ):
        """
        Args:
            dataset_train: dataset with a "model_bias_cap" variable over
                (sample, case_index) and swept-parameter coordinates on
                the "sample" dimension.
            feature_names: feature columns used as ML inputs; must be
                present once the dataset is converted to a DataFrame.
            regressor: pre-built sklearn-style regressor (optional).
            regressor_name: name of a known regressor to build
                ("RandomForest", "Linear", "Ridge", "Lasso",
                "ElasticNet", "XGB"); takes precedence over `regressor`.
            regressor_params: kwargs for the named regressor.
        """
        self.dataset_train = dataset_train
        self.feature_names = feature_names
        self.swept_params = dataset_train.attrs.get("swept_params", [])

        if not self.swept_params:
            self.swept_params = self._infer_swept_params()

        # Build regressor from name/params if provided, otherwise use passed regressor or default
        if regressor_name is not None:
            base_regressor = self._build_regressor(
                regressor_name, regressor_params or {}
            )
        elif regressor is None:
            base_regressor = RandomForestRegressor(n_estimators=100, random_state=42)
        else:
            base_regressor = regressor

        # Wrap in MultiOutputRegressor if we have multiple parameters
        if len(self.swept_params) > 1:
            self.regressor = MultiOutputRegressor(base_regressor)
        else:
            self.regressor = clone(base_regressor)

        self.is_fitted = False
        self.optimal_indices_ = None  # Per-case optimal sample indices
        self.optimal_params_ = None  # Per-case optimal parameter values

    def _infer_swept_params(self):
        """Infer which coordinates are swept parameters (any coord on
        the "sample" dimension other than "sample" itself)."""
        swept = []
        for coord_name in self.dataset_train.coords:
            coord = self.dataset_train.coords[coord_name]
            if "sample" in coord.dims and coord_name != "sample":
                swept.append(coord_name)
        return swept

    def _build_regressor(self, name: str, params: dict):
        """Build a regressor from name and params.

        Raises:
            ValueError: if `name` is not a recognized regressor.
        """
        if name == "RandomForest":
            # Merge a reproducible default seed with the user's params; a
            # user-supplied "random_state" wins. (The previous code passed
            # **params AND random_state= separately, which raised
            # TypeError whenever params already contained "random_state".)
            rf_params = {"random_state": 42, **params}
            return RandomForestRegressor(**rf_params)
        elif name == "Linear":
            return LinearRegression()
        elif name == "Ridge":
            return Ridge(alpha=params.get("alpha", 1.0))
        elif name == "Lasso":
            return Lasso(alpha=params.get("alpha", 1.0))
        elif name == "ElasticNet":
            return ElasticNet(
                alpha=params.get("alpha", 1.0), l1_ratio=params.get("l1_ratio", 0.5)
            )
        elif name == "XGB":
            return xgb.XGBRegressor(**params)
        else:
            raise ValueError(f"Unknown regressor '{name}' for LocalParameterPredictor.")

    def fit(self):
        """
        For each flow case, find optimal parameters, then train
        ML model to predict optimal params from features.

        Raises:
            ValueError: if no swept parameters are known/inferable, or if
                any requested feature is missing from the dataset.
        """
        if not self.swept_params:
            # Without this guard fit() would die later with an opaque
            # IndexError when building the training target.
            raise ValueError(
                "No swept parameters found in dataset; cannot fit "
                "LocalParameterPredictor."
            )

        n_cases = len(self.dataset_train.case_index)

        # Find optimal parameters for each case: the sample whose bias
        # magnitude is smallest for that case.
        self.optimal_indices_ = np.zeros(n_cases, dtype=int)
        self.optimal_params_ = {p: np.zeros(n_cases) for p in self.swept_params}

        for case_idx in range(n_cases):
            # Get bias values across all samples for this case
            bias_values = (
                self.dataset_train["model_bias_cap"].isel(case_index=case_idx).values
            )
            best_sample_idx = int(np.argmin(np.abs(bias_values)))
            self.optimal_indices_[case_idx] = best_sample_idx

            for param_name in self.swept_params:
                if param_name in self.dataset_train.coords:
                    param_val = float(
                        self.dataset_train.coords[param_name]
                        .isel(sample=best_sample_idx)
                        .values
                    )
                    self.optimal_params_[param_name][case_idx] = param_val

        # Build training data for ML model.
        # Use sample=0 to get feature values (they're the same across samples)
        X_df = self.dataset_train.isel(sample=0).to_dataframe().reset_index()

        # Check that all features exist
        missing_features = [f for f in self.feature_names if f not in X_df.columns]
        if missing_features:
            raise ValueError(f"Features not found in dataset: {missing_features}")

        X = X_df[self.feature_names].values

        # Target is optimal parameter values (1-D for a single parameter,
        # stacked columns for multiple).
        if len(self.swept_params) == 1:
            y = self.optimal_params_[self.swept_params[0]]
        else:
            y = np.column_stack([self.optimal_params_[p] for p in self.swept_params])

        # Train regressor
        self.regressor.fit(X, y)
        self.is_fitted = True

        return self

    def predict(self, X):
        """
        Predict optimal parameters for new flow conditions.

        Args:
            X: Feature matrix (n_cases, n_features) or DataFrame

        Returns:
            DataFrame with columns for each swept parameter

        Raises:
            RuntimeError: if called before fit().
        """
        if not self.is_fitted:
            raise RuntimeError("Must call fit() before predict()")

        if hasattr(X, "values"):
            X = X.values

        predictions = self.regressor.predict(X)

        # Single-output regressors return a 1-D array; normalize to 2-D
        # so the DataFrame construction below works for both cases.
        if len(self.swept_params) == 1:
            predictions = predictions.reshape(-1, 1)

        return pd.DataFrame(predictions, columns=self.swept_params)

    def get_optimal_indices(self):
        """
        Get the optimal sample indices for the training data.
        Used by MainPipeline to extract training targets.

        Raises:
            RuntimeError: if called before fit().
        """
        if self.optimal_indices_ is None:
            raise RuntimeError("Must call fit() first")
        return self.optimal_indices_