import numpy as np
from pandas import DataFrame
from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from hyperopt import fmin, hp, tpe, Trials

from .lwplsr_julia_converted import lwpls
from utils.data_handling import Snv, No_transformation, KF_CV, sel_ratio

# Map a normalization name drawn from the search space to its callable,
# instead of eval()-ing the name at run time.
NORMALIZATIONS = {'Snv': Snv, 'No_transformation': No_transformation}


class Regmodel(object):
    def __init__(self, train, test, n_iter, add_hyperparams=None, remove_hyperparams=None, nfolds=3, **kwargs):
        self.SCORE = 100000000  # best (lowest) cross-validation score seen so far
        self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        self._selected_bands = DataFrame(index=['from', 'to'])
        self.important_features = None

        # Candidate Savitzky-Golay window lengths: wider windows for high-resolution spectra
        if self._xc.shape[1] > 1000:
            a = [15, 21, 27, 33]
        else:
            a = [5, 7, 9]

        self._hyper_params = {'polyorder': hp.choice('polyorder', [2]),
                              'deriv': hp.choice('deriv', [0, 1]),
                              'window_length': hp.choice('window_length', a),
                              'normalization': hp.choice('normalization', ['No_transformation'])}
        if remove_hyperparams is not None:
            for i in remove_hyperparams:
                self._hyper_params.pop(i, None)

        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)

        trials = Trials()
        fmin(fn=self.objective,
             space=self._hyper_params,
             # Tree-structured Parzen Estimator (TPE), a Bayesian optimization approach
             algo=tpe.suggest,
             max_evals=n_iter,
             trials=trials,
             verbose=1)

    @property
    def train_data_(self):
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        return self.pretreated

    @property
    def get_params_(self):
        # Returns the search space in which the optimization algorithm looks for the optimal subset of hyperparameters
        return self._hyper_params

    def objective(self, params):
        # Overridden by each subclass; must return the score to minimize
        raise NotImplementedError

    @property
    def best_hyperparams_(self):
        # Returns the selected subset of hyperparameters
        return self._best

    @property
    def best_hyperparams_print(self):
        # Returns a sentence describing the signal preprocessing that was applied
        if self._best['normalization'] == 'Snv':
            a = 'Standard Normal Variate (SNV)'
        elif self._best['normalization'] == 'No_transformation':
            a = 'No transformation was performed'
        else:
            a = str(self._best['normalization'])
        SG = ('- Savitzky-Golay derivative parameters:\n'
              f"(window_length: {self._best['window_length']}; "
              f"polynomial order: {self._best['polyorder']}; "
              f"derivative order: {self._best['deriv']})")
        Norm = '- Spectral Normalization:\n' + a
        return SG + "\n" + Norm

    @property
    def model_(self):
        # Returns the developed model
        return self._model

    @property
    def pred_data_(self):
        # Returns the predictions made at the training and testing steps
        return self._yc, self._yt

    @property
    def cv_data_(self):
        # Cross-validation predictions
        return self._ycv

    @property
    def CV_results_(self):
        return self._cv_df

    @property
    def important_features_(self):
        return self.important_features

    @property
    def selected_features_(self):
        return self._selected_bands

    @property
    def sel_ratio_(self):
        return self._sel_ratio
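# All subclasses below minimize the same composite cross-validation criterion,
# RMSE / max(0.01, R2): a low error only wins when it comes with a reasonable
# coefficient of determination, and the 0.01 floor keeps near-zero R2 values
# from blowing up the ratio. A minimal sketch with toy numbers (illustrative
# values, not project data):
#
#   import numpy as np
#   rmse_mean, r2_mean = 0.42, 0.93
#   score = rmse_mean / np.max([0.01, r2_mean])   # ~0.45; lower is better
#   # rmse 0.40 with r2 0.005 would score 0.40 / 0.01 = 40.0, i.e. far worse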
########################################### PLSR #########################################
class Plsr(Regmodel):
    def __init__(self, train, test, n_iter=10, cv=3):
        # Shared preprocessing hyperparameters come from Regmodel; only n_components is added here
        super().__init__(train, test, n_iter, nfolds=cv,
                         add_hyperparams={'n_components': hp.randint('n_components', 1, 20)})

    def objective(self, params):
        params['n_components'] = int(params['n_components'])
        x0 = [self._xc, self._xt]
        norm = NORMALIZATIONS[str(params['normalization'])]
        x1 = [norm(x0[0]), norm(x0[1])]

        # savgol_filter requires deriv <= polyorder < window_length; fall back to
        # the best known (or neutral) settings when the sampled ones are invalid
        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b >= c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c

        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length=params['window_length']) for i in range(2)]

        model = PLSRegression(scale=False, n_components=params['n_components'])
        folds = KF_CV().CV(x=x2[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        yp = KF_CV().cross_val_predictor(model=model, folds=folds, x=x2[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]

        score = self._cv_df.loc["mean", 'rmse'] / np.max([0.01, self._cv_df.loc["mean", 'r2']])

        Model = PLSRegression(scale=False, n_components=params['n_components'])
        Model.fit(x2[0], self._ytrain)

        if self.SCORE > score:
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=folds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    pass
            self._best = params
            self.pretreated = DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score
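# A usage sketch for the PLSR optimizer (illustrative only: `X` and `y` are
# assumed to be a NumPy spectra matrix and target vector already loaded, and
# the 75/25 split is not project code):
#
#   split = int(0.75 * X.shape[0])
#   reg = Plsr(train=[X[:split], y[:split]], test=[X[split:], y[split:]],
#              n_iter=50, cv=5)
#   print(reg.best_hyperparams_print)   # preprocessing actually applied
#   yc, yt = reg.pred_data_             # train / test predictions
#   cv_table = reg.CV_results_          # per-fold and mean CV metrics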
############################################ iplsr #########################################
class TpeIpls(Regmodel):
    def __init__(self, train, test, n_iter=10, n_intervall=5, cv=3, bestglobalparams=None):
        self.glob = bestglobalparams
        self._best = {}
        self.folds = KF_CV().CV(x=np.array(train[0]), y=np.array(train[1]), n_folds=3)
        norm = NORMALIZATIONS[str(self.glob['normalization'])]
        x1 = [norm(train[0]), norm(test[0])]
        self.x2 = [savgol_filter(x1[i], polyorder=self.glob['polyorder'], deriv=self.glob['deriv'],
                                 window_length=self.glob['window_length']) for i in range(2)]
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall * 2

        # n_components plus one pair of interval bounds (v1, v2), (v3, v4), ... per interval
        add = {'n_components': hp.randint('n_components', 1, 20)}
        add.update({'v'+str(i): hp.randint('v'+str(i), 0, train[0].shape[1])
                    for i in range(1, self.n_arrets+1)})
        super().__init__(train, test, n_iter, nfolds=cv, add_hyperparams=add)

    def objective(self, params):
        # Wavelength indices: sorted, then paired into interval bounds
        self.idx = [params['v'+str(i)] for i in range(1, self.n_arrets+1)]
        self.idx.sort()
        arrays = [np.arange(self.idx[2*i], self.idx[2*i+1]+1) for i in range(self.n_intervall)]
        ids = np.unique(np.concatenate(arrays, axis=0), axis=0)
        prepared_data = [self.x2[i][:, ids] for i in range(2)]

        # Modelling
        folds = KF_CV().CV(x=prepared_data[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        try:
            model = PLSRegression(scale=False, n_components=params['n_components'])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        except Exception:
            # n_components can exceed the number of selected wavelengths; retry with one component
            params["n_components"] = 1
            model = PLSRegression(scale=False, n_components=params["n_components"])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]

        score = self._cv_df.loc["mean", 'rmse'] / np.max([0.01, self._cv_df.loc["mean", 'r2']])

        if self.SCORE > score:
            self.SCORE = score
            self._best = params
            self.arrays = arrays
            self.prepared_data = prepared_data
            self.model = model
        return score

    def best_fit(self):
        Model = PLSRegression(scale=False, n_components=self.model.n_components)
        Model.fit(self.prepared_data[0], self._ytrain)
        self._yc = Model.predict(self.prepared_data[0])
        yp = KF_CV().cross_val_predictor(model=Model, folds=self.folds,
                                         x=self.prepared_data[0], y=np.array(self._ytrain))
        self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=self.folds)
        self._yt = Model.predict(self.prepared_data[1])
        self._model = Model
        for key, value in self._best.items():
            try:
                self._best[key] = int(value)
            except (TypeError, ValueError):
                pass
        self.pretreated = DataFrame(self.x2[0])

        # Start/end wavelength index of each retained interval
        limits = np.ones(len(self.arrays) * 2)
        for i in range(len(self.arrays)):
            limits[2*i], limits[2*i+1] = self.arrays[i][0], self.arrays[i][-1]
        self.limits = limits.astype(int)
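# A usage sketch for interval selection (illustrative only; it reuses `reg`,
# `X`, `y` and `split` from the sketch above, seeding the preprocessing with
# the globally tuned hyperparameters):
#
#   itpe = TpeIpls(train=[X[:split], y[:split]], test=[X[split:], y[split:]],
#                  n_iter=100, n_intervall=5, cv=5,
#                  bestglobalparams=reg.best_hyperparams_)
#   itpe.best_fit()        # refit the best interval subset and fill predictions
#   print(itpe.limits)     # start/end wavelength index of each retained interval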
########################################### LWPLSR #########################################
class LwplsObject:
    def __init__(self, Reg_json=None, pred=None):
        if Reg_json is not None and pred is not None:
            from pandas import json_normalize
            self.model_ = Reg_json['model']
            self.best_hyperparams_ = Reg_json['best_lwplsr_params']
            self.pred_data_ = [json_normalize(Reg_json[i]) for i in pred]


############################################ LWPLS #########################################
class LWPLS(Regmodel):
    def __init__(self, train, test, n_iter=10, cv=3, bestglobalparams=None):
        self.glob = bestglobalparams
        self._best = {}
        add = {'localplsVL': hp.randint('localplsVL', 2, bestglobalparams['n_components']),
               'dist': hp.choice('dist', ['euc', 'mah']),
               'h': hp.randint('h', 1, 3)}
        self.folds = KF_CV().CV(x=np.array(train[0]), y=np.array(train[1]), n_folds=3)
        norm = NORMALIZATIONS[str(self.glob['normalization'])]
        x1 = [norm(train[0]), norm(test[0])]
        self.x2 = [savgol_filter(x1[i], polyorder=self.glob['polyorder'], deriv=self.glob['deriv'],
                                 window_length=self.glob['window_length']) for i in range(2)]
        super().__init__(train, test, n_iter, nfolds=cv, add_hyperparams=add, remove_hyperparams=None)

    def objective(self, params):
        # Per-fold predictions with locally weighted PLS, holding each fold out in turn
        yp = {}
        for i in self.folds.keys():
            yp[i] = lwpls(Xtrain=np.delete(np.array(self.x2[0]), self.folds[i], axis=0),
                          ytrain=np.delete(np.array(self._ytrain), self.folds[i], axis=0),
                          Xtest=np.array(self.x2[0])[self.folds[i]],
                          globalplsVL=self.glob['n_components'],
                          metric=params['dist'], h=params['h'], k=200,
                          localplsVL=params['localplsVL'],
                          center=True, scale=False, sklearn=True).ravel()
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=self.folds)[1]

        score = self._cv_df.loc["mean", 'rmse'] / np.max([0.01, self._cv_df.loc["mean", 'r2']])

        if self.SCORE > score:
            self.SCORE = score
            self._best = params
        return score

    def best_fit(self):
        # Rerun the cross-validation with the tuned hyperparameters, then predict the test set
        yp = {}
        for i in self.folds.keys():
            yp[i] = lwpls(Xtrain=np.delete(np.array(self.x2[0]), self.folds[i], axis=0),
                          ytrain=np.delete(np.array(self._ytrain), self.folds[i], axis=0),
                          Xtest=np.array(self.x2[0])[self.folds[i]],
                          globalplsVL=self.glob['n_components'],
                          metric=self._best['dist'], h=self._best['h'], k=200,
                          localplsVL=self._best['localplsVL'],
                          center=True, scale=False, sklearn=True).ravel()
        self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=self.folds)
        self._yt = lwpls(Xtrain=np.array(self.x2[0]), ytrain=np.array(self._ytrain),
                         Xtest=np.array(self.x2[1]), globalplsVL=self.glob['n_components'],
                         metric=self._best['dist'], h=self._best['h'], k=200,
                         localplsVL=self._best['localplsVL'],
                         center=True, scale=False, sklearn=True).ravel()
        self.pretreated = DataFrame(self.x2[0])

        # Cast NumPy scalars to plain Python types so the results stay serializable
        for key, value in self._best.items():
            self._best[key] = (int(value) if isinstance(value, np.int64)
                               else float(value) if isinstance(value, np.float64)
                               else value)
        self._model = {'globalplsVL': self.glob['n_components'],
                       'localplsVL': self._best['localplsVL'],
                       'dist': self._best['dist'],
                       'k': 200,
                       'h': self._best['h']}
        self._best = {'normalization': self.glob['normalization'],
                      'polyorder': self.glob['polyorder'],
                      'window_length': self.glob['window_length'],
                      'deriv': self.glob['deriv'],
                      'globalplsVL': self.glob['n_components']}
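# A usage sketch for the locally weighted PLS optimizer (illustrative only;
# same assumed `X`, `y`, `split` and fitted `reg` as above, whose best
# hyperparameters provide the preprocessing settings and 'n_components'):
#
#   lw = LWPLS(train=[X[:split], y[:split]], test=[X[split:], y[split:]],
#              n_iter=30, cv=5, bestglobalparams=reg.best_hyperparams_)
#   lw.best_fit()            # refit with the tuned dist / h / localplsVL
#   yt = lw.pred_data_[1]    # test-set predictions
#   print(lw.model_)         # dict of the final LW-PLS settings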