Coverage for wifa_uq/postprocessing/calibration/basic_calibration.py: 77%

126 statements  

coverage.py v7.13.0, created at 2025-12-19 02:10 +0000

# wifa_uq/postprocessing/calibration/basic_calibration.py

import numpy as np
import pandas as pd
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import clone
import xgboost as xgb
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet


class DefaultParams:
    """
    Uses the default parameter values from the database metadata.
    Selects the sample whose swept-parameter values are closest to the defaults.
    """

    def __init__(self, dataset_train):
        self.dataset_train = dataset_train
        self.swept_params = dataset_train.attrs.get("swept_params", [])
        self.param_defaults = dataset_train.attrs.get("param_defaults", {})

        self.best_idx_ = None
        self.best_params_ = None

    def fit(self):
        """Find the sample closest to the default parameter values."""
        if self.param_defaults:
            # Squared distance to the defaults, accumulated over all parameters
            distances = np.zeros(len(self.dataset_train.sample))
            for param_name, default_val in self.param_defaults.items():
                if default_val is not None and param_name in self.dataset_train.coords:
                    param_values = self.dataset_train.coords[param_name].values
                    distances += (param_values - default_val) ** 2

            self.best_idx_ = int(np.argmin(distances))
        else:
            self.best_idx_ = 0  # Fall back to the first sample

        # Extract the parameter values at this index
        self.best_params_ = {}
        for param_name in self.swept_params:
            if param_name in self.dataset_train.coords:
                self.best_params_[param_name] = float(
                    self.dataset_train.coords[param_name]
                    .isel(sample=self.best_idx_)
                    .values
                )

        return self

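# Usage sketch (illustrative, not part of the original module). Assumes
# `ds_train` is an xarray.Dataset whose attrs carry "swept_params" and
# "param_defaults"; the parameter name and value shown are assumptions:
#
#     calib = DefaultParams(ds_train).fit()
#     calib.best_idx_      # index of the sample closest to the defaults
#     calib.best_params_   # e.g. {"wake_expansion": 0.32}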

class MinBiasCalibrator:
    """
    Finds the sample whose bias, summed over all cases, is smallest in
    absolute value. Works with ANY swept parameters stored in the database.

    This is a GLOBAL calibrator - it returns a single set of parameters
    for the entire dataset.
    """

    def __init__(self, dataset_train):
        self.dataset_train = dataset_train
        self.swept_params = dataset_train.attrs.get("swept_params", [])
        if not self.swept_params:
            self.swept_params = self._infer_swept_params()

        self.best_idx_ = None
        self.best_params_ = None

    def _infer_swept_params(self):
        """Infer which coordinates are swept parameters."""
        swept = []
        for coord_name in self.dataset_train.coords:
            coord = self.dataset_train.coords[coord_name]
            if "sample" in coord.dims and coord_name != "sample":
                swept.append(coord_name)
        return swept

    def fit(self):
        """Find the sample index that minimizes |model_bias_cap summed over cases|."""
        abs_total_bias = np.abs(
            self.dataset_train["model_bias_cap"].sum(dim="case_index")
        )
        self.best_idx_ = int(abs_total_bias.argmin().values)

        # Extract the best parameters dynamically
        self.best_params_ = {}
        for param_name in self.swept_params:
            if param_name in self.dataset_train.coords:
                self.best_params_[param_name] = float(
                    self.dataset_train.coords[param_name]
                    .isel(sample=self.best_idx_)
                    .values
                )

        return self

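# Usage sketch (illustrative, not part of the original module). Assumes
# `ds_train` has a "model_bias_cap" variable over ("sample", "case_index")
# and swept-parameter coordinates on "sample":
#
#     calib = MinBiasCalibrator(ds_train).fit()
#     calib.best_idx_      # sample whose summed bias is closest to zero
#     calib.best_params_   # one global parameter set for the whole dataset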

class LocalParameterPredictor:
    """
    Predicts optimal parameters as a function of flow conditions.
    Works with ANY swept parameters stored in the database.

    This is a LOCAL calibrator - it predicts different parameters for each
    flow case based on input features.

    The interface differs from the global calibrators:
    - fit() trains an ML model
    - predict(X) returns optimal params for new conditions
    - get_optimal_indices() returns per-case optimal sample indices for the training data
    """

    def __init__(
        self,
        dataset_train,
        feature_names,
        regressor=None,
        regressor_name=None,
        regressor_params=None,
    ):
        self.dataset_train = dataset_train
        self.feature_names = feature_names
        self.swept_params = dataset_train.attrs.get("swept_params", [])

        if not self.swept_params:
            self.swept_params = self._infer_swept_params()

        # Build the regressor from name/params if provided; otherwise use the
        # passed regressor or fall back to a default random forest.
        if regressor_name is not None:
            base_regressor = self._build_regressor(
                regressor_name, regressor_params or {}
            )
        elif regressor is None:
            base_regressor = RandomForestRegressor(n_estimators=100, random_state=42)
        else:
            base_regressor = regressor

        # Wrap in MultiOutputRegressor if we have multiple parameters
        if len(self.swept_params) > 1:
            self.regressor = MultiOutputRegressor(base_regressor)
        else:
            self.regressor = clone(base_regressor)

        self.is_fitted = False
        self.optimal_indices_ = None  # Per-case optimal sample indices
        self.optimal_params_ = None  # Per-case optimal parameter values

    def _infer_swept_params(self):
        """Infer which coordinates are swept parameters."""
        swept = []
        for coord_name in self.dataset_train.coords:
            coord = self.dataset_train.coords[coord_name]
            if "sample" in coord.dims and coord_name != "sample":
                swept.append(coord_name)
        return swept

    def _build_regressor(self, name: str, params: dict):
        """Build a regressor from its name and a params dict."""
        if name == "RandomForest":
            # Default random_state to 42 unless the caller supplies one
            # (avoids passing the keyword twice).
            return RandomForestRegressor(**{"random_state": 42, **params})
        elif name == "Linear":
            return LinearRegression()
        elif name == "Ridge":
            return Ridge(alpha=params.get("alpha", 1.0))
        elif name == "Lasso":
            return Lasso(alpha=params.get("alpha", 1.0))
        elif name == "ElasticNet":
            return ElasticNet(
                alpha=params.get("alpha", 1.0), l1_ratio=params.get("l1_ratio", 0.5)
            )
        elif name == "XGB":
            return xgb.XGBRegressor(**params)
        else:
            raise ValueError(f"Unknown regressor '{name}' for LocalParameterPredictor.")
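    # Configuration sketch (illustrative): `regressor_name` / `regressor_params`
    # map onto the estimators above. Feature and parameter names here are
    # assumptions, not fixed by the module:
    #
    #     LocalParameterPredictor(
    #         ds_train,
    #         feature_names=["ti_amb", "ct"],
    #         regressor_name="Ridge",
    #         regressor_params={"alpha": 0.1},
    #     )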

    def fit(self):
        """
        For each flow case, find the optimal parameters, then train an
        ML model to predict the optimal params from the features.
        """
        n_cases = len(self.dataset_train.case_index)

        # Find the optimal parameters for each case
        self.optimal_indices_ = np.zeros(n_cases, dtype=int)
        self.optimal_params_ = {p: np.zeros(n_cases) for p in self.swept_params}

        for case_idx in range(n_cases):
            # Bias values across all samples for this case; the best sample
            # is the one with the smallest absolute bias.
            bias_values = (
                self.dataset_train["model_bias_cap"].isel(case_index=case_idx).values
            )
            best_sample_idx = int(np.argmin(np.abs(bias_values)))

            self.optimal_indices_[case_idx] = best_sample_idx

            for param_name in self.swept_params:
                if param_name in self.dataset_train.coords:
                    param_val = float(
                        self.dataset_train.coords[param_name]
                        .isel(sample=best_sample_idx)
                        .values
                    )
                    self.optimal_params_[param_name][case_idx] = param_val

        # Build the training data for the ML model.
        # Use sample=0 to get the feature values (they are the same across samples).
        X_df = self.dataset_train.isel(sample=0).to_dataframe().reset_index()

        # Check that all requested features exist
        missing_features = [f for f in self.feature_names if f not in X_df.columns]
        if missing_features:
            raise ValueError(f"Features not found in dataset: {missing_features}")

        X = X_df[self.feature_names].values

        # Targets are the per-case optimal parameter values
        if len(self.swept_params) == 1:
            y = self.optimal_params_[self.swept_params[0]]
        else:
            y = np.column_stack([self.optimal_params_[p] for p in self.swept_params])

        # Train the regressor
        self.regressor.fit(X, y)
        self.is_fitted = True

        return self
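    # Shape sketch of the training data assembled by fit() (illustrative):
    #   X: (n_cases, n_features)  feature matrix taken at sample=0
    #   y: (n_cases,) or (n_cases, n_params)  per-case optimal parameter values,
    #      obtained by argmin |model_bias_cap| over the sample dimension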

    def predict(self, X):
        """
        Predict optimal parameters for new flow conditions.

        Args:
            X: Feature matrix (n_cases, n_features) or DataFrame

        Returns:
            DataFrame with columns for each swept parameter
        """
        if not self.is_fitted:
            raise RuntimeError("Must call fit() before predict()")

        if hasattr(X, "values"):
            X = X.values

        predictions = self.regressor.predict(X)

        if len(self.swept_params) == 1:
            predictions = predictions.reshape(-1, 1)

        return pd.DataFrame(predictions, columns=self.swept_params)

    def get_optimal_indices(self):
        """
        Get the optimal sample indices for the training data.
        Used by MainPipeline to extract training targets.
        """
        if self.optimal_indices_ is None:
            raise RuntimeError("Must call fit() first")
        return self.optimal_indices_
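

if __name__ == "__main__":
    # Illustrative smoke test, not part of the original module: builds a tiny
    # synthetic dataset with one swept parameter and runs the three calibrators.
    # All names and numbers below are assumptions made for demonstration only.
    import xarray as xr

    rng = np.random.default_rng(0)
    n_samples, n_cases = 5, 8
    wake_expansion = np.linspace(0.2, 0.6, n_samples)  # assumed swept parameter
    ti_amb = rng.uniform(0.05, 0.15, n_cases)          # assumed flow feature

    # Synthetic bias that shrinks when the parameter "matches" the flow feature
    bias = wake_expansion[:, None] - 3.0 * ti_amb[None, :]

    ds = xr.Dataset(
        {"model_bias_cap": (("sample", "case_index"), bias)},
        coords={
            "sample": np.arange(n_samples),
            "case_index": np.arange(n_cases),
            "wake_expansion": ("sample", wake_expansion),
            "ti_amb": ("case_index", ti_amb),
        },
        attrs={
            "swept_params": ["wake_expansion"],
            "param_defaults": {"wake_expansion": 0.32},
        },
    )

    print(DefaultParams(ds).fit().best_params_)
    print(MinBiasCalibrator(ds).fit().best_params_)

    local = LocalParameterPredictor(
        ds, feature_names=["ti_amb"], regressor_name="Linear"
    ).fit()
    print(local.get_optimal_indices())
    print(local.predict(pd.DataFrame({"ti_amb": [0.08, 0.12]})))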