    # RegModels.py
    from Packages import *
    from Class_Mod import metrics, Snv, No_transformation, KF_CV
    
    
    class Regmodel(object):
        """Base class for regression models tuned with hyperopt's ``fmin``.

        The constructor stores the train/test data, builds the preprocessing
        search space, merges any model-specific hyperparameters into it and
        immediately runs the optimisation on ``self.objective``.  Subclasses
        must override :meth:`objective`; the subclasses in this file return the
        loss and, on improvement, update ``SCORE``, ``_model``, ``_best``,
        ``_yc``, ``_ycv``, ``_yt`` and ``pretreated`` as a side effect.
        """

        def __init__(self, train, test, n_iter, add_hyperparams = None, nfolds = 5, **kwargs):
            """Store the data, build the search space and run the search.

            Parameters
            ----------
            train, test : sequence
                Element 0 is the predictor matrix (its ``.shape[0]`` and
                ``.shape[1]`` are read), element 1 is the response.
            n_iter : int
                Forwarded to ``fmin`` as ``max_evals``.
            add_hyperparams : dict, optional
                Extra search-space entries merged into the common ones.
            nfolds : int
                Stored as ``_nfolds`` for use by the subclasses' objectives.
            **kwargs
                Accepted but not used by this constructor.
            """
            # Large initial loss so that the first finite score is an improvement.
            self.SCORE = 100000000
            self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
            self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
            self._model, self._best = None, None
            self._yc, self._ycv, self._yt = None, None, None
            # An empty frame instance (the DataFrame class itself is not a usable result).
            self._cv_df = pd.DataFrame()
            self._nfolds = nfolds

            self._selected_bands = pd.DataFrame(index = ['from', 'to'])
            self.important_features = None
            # Defined up front so `pretreated_spectra_` never raises AttributeError.
            self.pretreated = None

            # Preprocessing search space shared by every model.
            self._hyper_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]),
                                  'deriv': hp.choice('deriv', [0, 1, 2]),
                                  'window_length': hp.choice('window_length', [15, 21, 27, 33]),
                                  'scatter': hp.choice('scatter', ['Snv', 'No_transformation'])}
            if add_hyperparams is not None:
                self._hyper_params.update(add_hyperparams)

            trials = Trials()
            # The objective records the best configuration on the instance, so
            # the value returned by fmin is not needed here.
            fmin(fn=self.objective,
                 space=self._hyper_params,
                 algo=tpe.suggest,  # Tree of Parzen Estimators (TPE), a Bayesian approach
                 max_evals=n_iter,
                 trials=trials,
                 verbose=1)

        @property
        def train_data_(self):
            """Return ``[X_train, y_train]`` as passed to the constructor."""
            return [self._xc, self._ytrain]

        @property
        def test_data_(self):
            """Return ``[X_test, y_test]`` as passed to the constructor."""
            return [self._xt, self._ytest]

        @property
        def pretreated_spectra_(self):
            """Return the ``pretreated`` attribute (``None`` until an objective sets it)."""
            return self.pretreated

        @property
        def get_params_(self):
            """Return the hyperparameter search space dictionary."""
            return self._hyper_params

        def objective(self, params):
            """Evaluate one hyperparameter sample; must be overridden.

            Raises
            ------
            NotImplementedError
                Always, in the base class.
            """
            raise NotImplementedError(
                f"{type(self).__name__} must implement objective(params)")

        @property
        def best_hyperparams_(self):
            """Return the parameter dict recorded as best (``None`` if none was recorded)."""
            return self._best

        @property
        def best_hyperparams_print(self):
            """Return a two-line text summary of the best preprocessing settings.

            The first line lists the Savitzky-Golay parameters, the second the
            scatter correction.  Reads ``self._best``; raises ``TypeError`` if
            it is still ``None``.
            """
            scatter_labels = {
                'Snv': 'Standard Normal Variate (SNV)',
                'No_transformation': " No transformation was performed",
            }
            scatter = self._best['scatter']
            # Fall back to the raw name rather than failing on an unknown value.
            a = scatter_labels.get(scatter, str(scatter))

            # "\\:" keeps the literal backslash-colon of the original text without
            # relying on an invalid escape sequence.
            SG = (f"- Savitzky-Golay derivative parameters \\:(Window_length:{self._best['window_length']};"
                  f"  polynomial order: {self._best['polyorder']};"
                  f"  Derivative order : {self._best['deriv']})")
            Norm = f"- Spectral Normalization \\: {a}"
            return SG + "\n" + Norm

        @property
        def model_(self):
            """Return the stored model object (``None`` if none was stored)."""
            return self._model

        @property
        def pred_data_(self):
            """Return the tuple ``(self._yc, self._yt)``."""
            return self._yc, self._yt

        @property
        def cv_data_(self):
            """Return ``self._ycv``."""
            return self._ycv

        @property
        def CV_results_(self):
            """Return ``self._cv_df``."""
            return self._cv_df

        @property
        def important_features_(self):
            """Return ``self.important_features`` (initialised to ``None``)."""
            return self.important_features

        @property
        def selected_features_(self):
            """Return the ``_selected_bands`` frame (index ``'from'``/``'to'``)."""
            return self._selected_bands
    
    ###########################################    #########################################
    class Plsr(Regmodel):
        """PLS regression whose preprocessing and ``n_components`` are tuned together.

        Adds an integer ``n_components`` hyperparameter (``hp.randint`` with
        bounds 2 and 20) to the common preprocessing search space.
        """

        def __init__(self, train, test, n_iter = 10):
            """Run the search; ``n_iter`` is the number of evaluations."""
            super().__init__(train, test, n_iter, add_hyperparams = {'n_components': hp.randint('n_components', 2,20)})

        def objective(self, params):
            """Score one hyperparameter sample and keep it if it is the best so far.

            Parameters
            ----------
            params : dict
                Sample with keys ``scatter``, ``deriv``, ``polyorder``,
                ``window_length`` and ``n_components``.  The three
                Savitzky-Golay entries are overwritten in place when the sample
                is rejected (see below).

            Returns
            -------
            The ``CV(%)`` value of the ``rmse`` row of the cross-validation
            table, i.e. the coefficient of variation of RMSE across folds.
            """
            # Explicit dispatch instead of eval(): eval inside a comprehension
            # cannot see the enclosing function's locals before Python 3.12.
            scatter_fn = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['scatter'])]
            x1 = [scatter_fn(x) for x in (self._xc, self._xt)]

            a, b, c = params['deriv'], params['polyorder'], params['window_length']
            if a > b or b > c:
                # Rejected combination: reuse the best known settings, or fall
                # back to a 1-point, order-0 filter when none exist yet.
                if self._best is not None:
                    a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
                else:
                    a, b, c = 0, 0, 1

            params['deriv'], params['polyorder'], params['window_length']  = a, b, c
            x2 = [savgol_filter(x, polyorder=b, deriv=a, window_length=c) for x in x1]

            Model = PLSRegression(scale = False, n_components = params['n_components'])
            folds = KF_CV().process(model = Model, x = x2[0], y = self._ytrain, n_folds = self._nfolds)
            # Compute both statistics from the fold columns only, before any
            # summary column is appended (otherwise 'Average' leaks into 'S').
            average = folds.mean(axis = 1)
            spread = folds.std(axis = 1)
            folds['Average'] = average
            folds['S'] = spread
            folds['CV(%)'] = spread * 100 / average
            self._cv_df = folds.T.round(2)
            score = self._cv_df.loc["CV(%)",'rmse']

            if self.SCORE > score:
                # Only an improving trial needs a model fitted on the full training set.
                Model = PLSRegression(scale = False, n_components = params['n_components'])
                Model.fit(x2[0], self._ytrain)

                self.SCORE = score
                # NOTE(review): if cross_val_predictor refits the estimator it
                # receives, the predictions below come from that refit rather
                # than the full-data fit above — confirm against KF_CV.
                self._ycv = KF_CV().cross_val_predictor(model = Model, x = x2[0], y = self._ytrain, n_folds = self._nfolds)
                self._yc = Model.predict(x2[0])
                self._yt = Model.predict(x2[1])
                self._model = Model
                self._best = params

                self.pretreated = pd.DataFrame(x2[0])

            return score
    
    
        ############################################    #########################################
    class TpeIpls(Regmodel):
        """Interval PLS whose interval bounds are part of the search space.

        ``2 * n_intervall`` integer hyperparameters ``v1 … vN`` are sampled over
        the column indices of the training matrix; once sorted, consecutive
        pairs delimit the column intervals fed to the PLS model.
        """

        def __init__(self, train, test, n_iter = 10, n_intervall = 5):
            """Build the interval search space and run the search.

            Parameters
            ----------
            train, test : sequence
                Same layout as for ``Regmodel``.
            n_iter : int
                Number of evaluations.
            n_intervall : int
                Number of column intervals to select.
            """
            self.n_intervall = n_intervall
            # Two bounds per interval.
            self.n_arrets = self.n_intervall*2

            r = {'n_components': hp.randint('n_components', 2,20)}
            r.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1]) for i in range(1,self.n_arrets+1)})

            super().__init__(train, test, n_iter, add_hyperparams = r)

        def objective(self, params):
            """Score one sample of intervals + preprocessing + ``n_components``.

            Parameters
            ----------
            params : dict
                Sample with keys ``v1 … vN``, ``scatter``, ``deriv``,
                ``polyorder``, ``window_length`` and ``n_components``.  The
                Savitzky-Golay entries, and ``n_components`` after a failed
                cross-validation, are overwritten in place.

            Returns
            -------
            The ``CV(%)`` value of the ``rmse`` row of the cross-validation
            table.
            """
            # Wavelength indices: sorted bounds, paired up as [lower, upper].
            self.idx = sorted(params[f'v{i}'] for i in range(1, self.n_arrets+1))
            arrays = [np.arange(lower, upper + 1)
                      for lower, upper in zip(self.idx[0::2], self.idx[1::2])]
            # Intervals may overlap, so keep each column once.
            cols = np.unique(np.concatenate(arrays, axis=0))

            # Preprocessing.  Explicit dispatch instead of eval(): eval inside a
            # comprehension cannot see the enclosing locals before Python 3.12.
            scatter_fn = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['scatter'])]
            x1 = [scatter_fn(x) for x in (self._xc, self._xt)]

            a, b, c = params['deriv'], params['polyorder'], params['window_length']
            if a > b or b > c:
                # Rejected combination: reuse the best known settings, or fall
                # back to a 1-point, order-0 filter when none exist yet.
                if self._best is not None:
                    a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
                else:
                    a, b, c = 0, 0, 1

            params['deriv'], params['polyorder'], params['window_length']  = a, b, c
            x2 = [savgol_filter(x, polyorder=b, deriv=a, window_length=c) for x in x1]
            x_train, x_test = x2[0][:,cols], x2[1][:,cols]

            # Modelling.
            try:
                Model = PLSRegression(scale = False, n_components = params['n_components'])
                folds = KF_CV().process(model = Model, x = x_train, y = self._ytrain, n_folds = self._nfolds)
            except ValueError:
                # Retry with a single component (presumably the sampled value
                # was too large for the selected columns — TODO confirm).
                params["n_components"] = 1
                Model = PLSRegression(scale = False, n_components = params['n_components'])
                folds = KF_CV().process(model = Model, x = x_train, y = self._ytrain, n_folds = self._nfolds)

            # Compute both statistics from the fold columns only, before any
            # summary column is appended (otherwise 'Average' leaks into 'S').
            average = folds.mean(axis = 1)
            spread = folds.std(axis = 1)
            folds['Average'] = average
            folds['S'] = spread
            folds['CV(%)'] = spread * 100 / average
            self._cv_df = folds.T.round(2)
            score = self._cv_df.loc['CV(%)','rmse']

            if self.SCORE > score:
                # Only an improving trial needs a model fitted on the full training set.
                Model = PLSRegression(scale = False, n_components = params['n_components'])
                Model.fit(x_train, self._ytrain)

                self.SCORE = score
                self._ycv = KF_CV().cross_val_predictor(model = Model, x = x_train, y = self._ytrain, n_folds = self._nfolds)
                self._yc = Model.predict(x_train)
                self._yt = Model.predict(x_test)
                self._model = Model
                self._best = params

                # The full pretreated training matrix is kept, not just the selected columns.
                self.pretreated = pd.DataFrame(x2[0])
                self.segments = arrays

                # Record the first and last column index of every interval.
                for number, segment in enumerate(self.segments, start=1):
                    self._selected_bands[f'band{number}'] = [segment[0], segment[-1]]
                self._selected_bands.index = ['from','to']

            return score
        
        ############################################    #########################################
    
    class Pcr(Regmodel):
        """Stub for a regression model with ``n_val`` integer ``pc*`` hyperparameters.

        NOTE(review): no ``objective`` override is visible for this class, so
        the search started by the base constructor cannot succeed until one is
        implemented.
        """

        def __init__(self, train, test, n_iter = 10, n_val = 5):
            """Build the ``pc*`` search space and hand it to the base constructor.

            Parameters
            ----------
            train, test : sequence
                Same layout as for ``Regmodel``.
            n_iter : int
                Number of evaluations.
            n_val : int
                Number of ``pc*`` hyperparameters to sample.
            """
            # Must be set before it is read below.
            self.n_val = n_val
            # One integer per entry, sampled over the training column indices.
            r = {f'pc{i}': hp.randint(f'pc{i+1}', 0, train[0].shape[1]) for i in range(self.n_val)}
            # `super()` must be called to obtain the bound proxy, and the base
            # constructor needs the data and the extra search-space entries.
            super().__init__(train, test, n_iter, add_hyperparams = r)