import numpy as np
from pandas import DataFrame
from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from hyperopt import fmin, hp, tpe, Trials, space_eval, STATUS_OK, anneal
from utils.eval_metrics import metrics
from utils.data_handling import Snv, No_transformation, KF_CV, sel_ratio

# Maps the 'normalization' hyperparameter value to the preprocessing callable.
# Explicit dispatch instead of eval() on the string — same behavior, no dynamic code.
_NORMALIZERS = {'Snv': Snv, 'No_transformation': No_transformation}


class Regmodel(object):
    """Base class for spectral regression models tuned with hyperopt.

    On construction, runs a TPE (Tree-structured Parzen Estimator) search of
    ``n_iter`` evaluations over the preprocessing hyperparameters (Savitzky-Golay
    derivative/polyorder/window_length and spectral normalization) plus any
    model-specific parameters supplied via ``add_hyperparams``. Subclasses must
    implement :meth:`objective`, which scores one hyperparameter set by
    cross-validated RMSE and records the best model found so far.

    Parameters
    ----------
    train, test : sequence
        ``train[0]``/``test[0]`` are the spectra matrices (samples x wavelengths),
        ``train[1]``/``test[1]`` the target vectors.
    n_iter : int
        Number of hyperopt evaluations.
    add_hyperparams : dict or None
        Extra entries merged into the default search space.
    nfolds : int
        Number of cross-validation folds.
    """

    def __init__(self, train, test, n_iter, add_hyperparams=None, nfolds=3, **kwargs):
        # Best (lowest) CV RMSE seen so far; start with a sentinel large value.
        self.SCORE = 100000000
        self._xc, self._xt = train[0], test[0]
        self._ytrain, self._ytest = train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        self._selected_bands = DataFrame(index=['from', 'to'])
        self.important_features = None

        # Default search space: Savitzky-Golay parameters + normalization choice.
        self._hyper_params = {
            'polyorder': hp.choice('polyorder', [0, 1, 2]),
            'deriv': hp.choice('deriv', [0, 1, 2]),
            'window_length': hp.choice('window_length', [15, 21, 27, 33]),
            'normalization': hp.choice('normalization', ['Snv', 'No_transformation']),
        }
        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)

        trials = Trials()
        # TPE is a Bayesian sequential-model-based optimization approach.
        fmin(fn=self.objective,
             space=self._hyper_params,
             algo=tpe.suggest,
             max_evals=n_iter,
             trials=trials,
             verbose=1)

    def _pretreat(self, params):
        """Normalize then Savitzky-Golay-filter the train/test spectra.

        Mutates ``params`` in place: if the sampled (deriv, polyorder,
        window_length) triple is invalid for scipy's savgol_filter
        (requires deriv <= polyorder < window_length), it is replaced by the
        best known valid triple, or by the identity-like (0, 0, 1).

        Returns
        -------
        list of ndarray
            ``[filtered_train, filtered_test]``.
        """
        raw = [self._xc, self._xt]
        normalize = _NORMALIZERS[params['normalization']]
        normalized = [normalize(raw[i]) for i in range(2)]

        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b >= c:  # scipy requires deriv <= polyorder < window_length
            if self._best is not None:
                a, b, c = (self._best['deriv'], self._best['polyorder'],
                           self._best['window_length'])
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c

        return [savgol_filter(normalized[i], polyorder=b, deriv=a, window_length=c)
                for i in range(2)]

    def _record_best(self, params, score, yp, folds, model, train_x, test_x):
        """If ``score`` beats the best so far, store predictions, model and params."""
        if self.SCORE > score:
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=folds)
            self._yc = model.predict(train_x)
            self._yt = model.predict(test_x)
            self._model = model
            # Coerce numeric hyperparameters to plain ints for readable reporting;
            # non-numeric values (e.g. the normalization name) are kept as-is.
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(train_x)
            return True
        return False

    @property
    def train_data_(self):
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        return self.pretreated

    @property
    def get_params_(self):
        """Search space in which the optimizer looks for hyperparameters."""
        return self._hyper_params

    def objective(self, params):
        """Score one hyperparameter set; implemented by subclasses."""
        pass

    @property
    def best_hyperparams_(self):
        """The selected (best-scoring) hyperparameter subset."""
        return self._best

    @property
    def best_hyperparams_print(self):
        """Human-readable summary of the selected signal preprocessing."""
        if self._best['normalization'] == 'Snv':
            a = 'Standard Normal Variate (SNV)'
        elif self._best['normalization'] == 'No_transformation':
            a = " No transformation was performed"
        bb = str(self._best['window_length'])
        cc = str(self._best['polyorder'])
        dd = str(self._best['deriv'])
        SG = ('- Savitzky-Golay derivative parameters: \n(Window_length:' + bb
              + ';polynomial order:' + cc + '; Derivative order : ' + dd + ')')
        Norm = '- Spectral Normalization: \n' + a
        return SG + "\n" + Norm

    @property
    def model_(self):
        """The fitted model developed with the best hyperparameters."""
        return self._model

    @property
    def pred_data_(self):
        """Predicted values for the training and testing sets."""
        return self._yc, self._yt

    @property
    def cv_data_(self):
        """Cross-validation measured/predicted data."""
        return self._ycv

    @property
    def CV_results_(self):
        return self._cv_df

    @property
    def important_features_(self):
        return self.important_features

    @property
    def selected_features_(self):
        return self._selected_bands

    @property
    def sel_ratio_(self):
        return self._sel_ratio


# ########################################## PLSR #########################################
class Plsr(Regmodel):
    """Full-spectrum PLS regression with hyperopt-tuned preprocessing and n_components."""

    def __init__(self, train, test, n_iter=10, cv=3):
        super().__init__(train, test, n_iter, nfolds=cv,
                         add_hyperparams={'n_components': hp.randint('n_components', 1, 20)})

    def objective(self, params):
        """Return the cross-validated RMSE for one hyperparameter set."""
        params['n_components'] = int(params['n_components'])
        x2 = self._pretreat(params)

        model = PLSRegression(scale=False, n_components=params['n_components'])
        folds = KF_CV().CV(x=x2[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                         x=x2[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        score = self._cv_df.loc["cv", 'rmse']

        Model = PLSRegression(scale=False, n_components=params['n_components'])
        Model.fit(x2[0], self._ytrain)

        if self._record_best(params, score, yp, folds, Model, x2[0], x2[1]):
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score


# ########################################### iplsr #########################################
class TpeIpls(Regmodel):
    """Interval PLS: hyperopt selects both wavelength intervals and n_components.

    ``n_intervall`` intervals are encoded as 2*n_intervall wavelength indices
    (v1..v2n); after sorting, consecutive index pairs delimit the selected bands.
    """

    def __init__(self, train, test, n_iter=10, n_intervall=5, cv=3):
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall * 2  # number of interval endpoints
        space = {'n_components': hp.randint('n_components', 1, 20)}
        space.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1])
                      for i in range(1, self.n_arrets + 1)})
        super().__init__(train, test, n_iter, add_hyperparams=space, nfolds=cv)

    def objective(self, params):
        """Return the cross-validated RMSE on the selected wavelength bands."""
        # Wavelength indices: sort the sampled endpoints and pair them up.
        self.idx = sorted(params[f'v{i}'] for i in range(1, self.n_arrets + 1))
        arrays = [np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
                  for i in range(self.n_intervall)]
        band_idx = np.unique(np.concatenate(arrays, axis=0), axis=0)

        # Preprocessing (shared with Plsr), then restrict to the selected bands.
        x2 = self._pretreat(params)
        prepared_data = [x2[i][:, band_idx] for i in range(2)]

        # Modelling: if n_components exceeds what the selected bands allow,
        # fall back to a single component rather than failing the evaluation.
        folds = KF_CV().CV(x=prepared_data[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        try:
            model = PLSRegression(scale=False, n_components=params['n_components'])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
        except ValueError:
            params["n_components"] = 1
            model = PLSRegression(scale=False, n_components=params["n_components"])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        score = self._cv_df.loc['cv', 'rmse']

        Model = PLSRegression(scale=False, n_components=model.n_components)
        Model.fit(prepared_data[0], self._ytrain)

        if self._record_best(params, score, yp, folds, Model,
                             prepared_data[0], prepared_data[1]):
            # Flattened [from, to, from, to, ...] limits of the selected bands.
            self.limits = np.ones(len(arrays) * 2)
            for i in range(len(arrays)):
                self.limits[2 * i] = arrays[i][0]
                self.limits[2 * i + 1] = arrays[i][-1]
        return score


# ########################################## LWPLSR #########################################
class LwplsObject:
    """Thin wrapper exposing an LWPLSR model deserialized from JSON results."""

    def __init__(self, Reg_json=None, pred=None):
        if Reg_json is not None and pred is not None:
            from pandas import json_normalize
            self.model_ = Reg_json['model']
            self.best_hyperparams_ = Reg_json['best_lwplsr_params']
            self.pred_data_ = [json_normalize(Reg_json[i]) for i in pred]


# ########################################### Pcr #########################################
class Pcr(Regmodel):
    """Principal-component regression stub.

    NOTE(review): the original constructor was broken (``super.__init__()``
    without parentheses or arguments, a discarded dict comprehension, an
    unset ``self.n_val`` and mismatched ``pc{i}``/``pc{i+1}`` keys). The
    constructor is repaired below, but :meth:`Regmodel.objective` is still
    not overridden here — implement it before using this class, otherwise
    the hyperopt search will fail on a ``None`` score.
    """

    def __init__(self, train, test, n_iter=10, n_val=5):
        self.n_val = n_val
        space = {f'pc{i}': hp.randint(f'pc{i}', 0, train[0].shape[1])
                 for i in range(1, self.n_val + 1)}
        super().__init__(train, test, n_iter, add_hyperparams=space)