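# RegModels.py (9.12 KiB)
# NOTE: this file was captured from a repository blame view; the viewer's
# navigation text and the "DIANE's avatar / DIANE committed" annotations are
# page residue, not part of the program.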
from Packages import *
DIANE's avatar
DIANE committed
from Class_Mod import metrics, Snv, No_transformation, KF_CV, sel_ratio
DIANE's avatar
DIANE committed


class Regmodel(object):
    """Base class for regression models whose settings are tuned with hyperopt.

    The constructor stores the train/test data, builds the hyper-parameter
    search space (Savitzky-Golay settings plus the normalization choice,
    optionally extended by the subclass) and immediately runs the search by
    calling ``fmin`` on ``self.objective``. Subclasses must override
    ``objective`` and are expected to fill in the result attributes
    (``_model``, ``_best``, ``_yc``, ``_ycv``, ``_yt``, ``pretreated``, ...)
    whenever a trial improves ``self.SCORE``.
    """

    def __init__(self, train, test, n_iter, add_hyperparams = None, nfolds = 5, **kwargs):
        """Store the data, build the search space and run the optimisation.

        Parameters
        ----------
        train, test : indexable
            ``train[0]`` / ``test[0]`` hold the predictors (2-D, their
            ``shape`` is read) and ``train[1]`` / ``test[1]`` the responses.
        n_iter : int
            Passed to ``fmin`` as ``max_evals``.
        add_hyperparams : dict, optional
            Extra entries merged into the default search space.
        nfolds : int
            Number of cross-validation folds made available to ``objective``.
        **kwargs
            Accepted for call compatibility; not used here.
        """
        # Best (lowest) score seen so far; starts high so that the first
        # evaluated trial is always accepted.
        self.SCORE = 100000000
        self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = pd.DataFrame()
        self._sel_ratio = pd.DataFrame()
        self._nfolds = nfolds
        self._selected_bands = pd.DataFrame(index = ['from', 'to'])
        self.important_features = None
        # Initialised here so that `pretreated_spectra_` never raises
        # AttributeError when no trial has stored pretreated spectra.
        self.pretreated = pd.DataFrame()

        self._hyper_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]),
                              'deriv': hp.choice('deriv', [0, 1, 2]),
                              'window_length': hp.choice('window_length', [15, 21, 27, 33]),
                              'normalization': hp.choice('normalization', ['Snv', 'No_transformation'])}
        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)

        # The search is run for its side effects: `objective` records the best
        # trial on the instance, so fmin's own return value is not needed.
        fmin(fn=self.objective,
             space=self._hyper_params,
             algo=tpe.suggest,  # Tree of Parzen Estimators (TPE), a Bayesian approach
             max_evals=n_iter,
             trials=Trials(),
             verbose=1)

    @property
    def train_data_(self):
        """Return ``[x_train, y_train]`` as given to the constructor."""
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        """Return ``[x_test, y_test]`` as given to the constructor."""
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        """Return the pretreated spectra stored by the best trial."""
        return self.pretreated

    @property
    def get_params_(self):
        """Return the hyperopt search space used for the optimisation."""
        return self._hyper_params

    def objective(self, params):
        """Evaluate one hyper-parameter set and return the loss to minimise.

        Must be overridden by subclasses; the base class has no model to fit.
        """
        raise NotImplementedError("subclasses of Regmodel must implement objective()")

    @property
    def best_hyperparams_(self):
        """Return the hyper-parameters of the best trial (``None`` if none was stored)."""
        return self._best

    @property
    def best_hyperparams_print(self):
        """Return a two-line, human-readable summary of the best hyper-parameters."""
        # NOTE(review): the label of the 'Snv' branch and the return statement
        # were missing from the available source; both are reconstructed here
        # and should be confirmed against the repository.
        labels = {'Snv': 'Standard Normal Variate (SNV)',
                  'No_transformation': " No transformation was performed"}
        normalization = self._best['normalization']
        # Unknown names fall back to the raw value instead of failing.
        a = labels.get(normalization, normalization)

        # The backslash before ':' is emitted literally (written as '\\:' so it
        # is a valid escape); double-quoted keys keep the f-strings valid on
        # Python versions older than 3.12.
        SG = (f'- Savitzky-Golay derivative parameters \\:(Window_length:{self._best["window_length"]};'
              f'  polynomial order: {self._best["polyorder"]};'
              f'  Derivative order : {self._best["deriv"]})')
        Norm = f'- Spectral Normalization \\: {a}'
        return SG + "\n" + Norm

    @property
    def model_(self):
        """Return the fitted model of the best trial."""
        return self._model

    @property
    def pred_data_(self):
        """Return the ``(train, test)`` predictions of the best trial."""
        return self._yc, self._yt

    @property
    def cv_data_(self):
        """Return the cross-validated predictions of the best trial."""
        return self._ycv

    @property
    def CV_results_(self):
        """Return the cross-validation results table."""
        return self._cv_df

    @property
    def important_features_(self):
        """Return the ``important_features`` attribute."""
        return self.important_features

    @property
    def selected_features_(self):
        """Return the table of selected bands (rows ``'from'`` / ``'to'``)."""
        return self._selected_bands

    @property
    def sel_ratio_(self):
        """Return the selectivity-ratio table."""
        return self._sel_ratio
DIANE's avatar
DIANE committed
  
###########################################    #########################################
class Plsr(Regmodel):
    """PLS regression whose pretreatment and number of components are tuned by hyperopt."""

    def __init__(self, train, test, n_iter = 10):
        """Add ``n_components`` to the search space and run the optimisation.

        Parameters
        ----------
        train, test : indexable
            ``[x, y]`` pairs forwarded to ``Regmodel``.
        n_iter : int
            Number of hyperopt evaluations.
        """
        super().__init__(train, test, n_iter, add_hyperparams = {'n_components': hp.randint('n_components', 2,20)})

    def objective(self, params):
        """Evaluate one hyper-parameter set; return the CV(%) of the RMSE.

        The spectra are normalized and Savitzky-Golay filtered, a PLS model is
        cross-validated, and the coefficient of variation (in percent) of the
        per-fold RMSE is used as the loss. When the loss improves on
        ``self.SCORE``, the model, its predictions, the CV table and the
        hyper-parameters are stored on the instance.

        Parameters
        ----------
        params : dict
            One sample of the search space; it is updated in place with the
            Savitzky-Golay settings that were actually used.

        Returns
        -------
        The loss read from the CV table at ``('CV(%)', 'rmse')``.
        """
        params['n_components'] = int(params['n_components'])

        # Explicit lookup instead of eval(): only the two known pretreatments
        # can be called, and an unknown name fails with a clear KeyError.
        normalize = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['normalization'])]
        x1 = [normalize(x) for x in (self._xc, self._xt)]

        # savgol_filter needs polyorder < window_length, and a derivative order
        # above the polynomial order is meaningless. Invalid draws fall back to
        # the best settings so far, or to a neutral filter.
        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b >= c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c
        x2 = [savgol_filter(x, polyorder=b, deriv=a, window_length=c) for x in x1]

        # Cross-validation table. KF_CV().process is assumed to return one row
        # per metric (including 'rmse') and one column per fold -- TODO confirm.
        Model = PLSRegression(scale = False, n_components = params['n_components'])
        cv_df = KF_CV().process(model = Model, x = x2[0], y = self._ytrain, n_folds = self._nfolds)
        # Both statistics are taken from the fold columns only, before any
        # summary column is appended, so the mean is not counted as a fold.
        average = cv_df.mean(axis = 1)
        spread = cv_df.std(axis = 1)
        cv_df['Average'] = average
        cv_df['S'] = spread
        cv_df['CV(%)'] = spread * 100 / average
        cv_df = cv_df.T.round(2)
        score = cv_df.loc["CV(%)", 'rmse']

        if self.SCORE > score:
            self.SCORE = score
            # The table is kept only for the best trial so that CV_results_
            # describes the stored model rather than the last trial evaluated.
            self._cv_df = cv_df

            # The final model is fitted only for trials that are kept.
            Model = PLSRegression(scale = False, n_components = params['n_components'])
            Model.fit(x2[0], self._ytrain)
            self._ycv = KF_CV().cross_val_predictor(model = Model, x = x2[0], y = self._ytrain, n_folds = self._nfolds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model

            # Store plain ints where possible (hyperopt may hand over numpy
            # scalars); values that are not numeric are kept unchanged.
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params

            self.pretreated = pd.DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score


    ############################################    #########################################
class TpeIpls(Regmodel):
    """Interval PLS: hyperopt picks the interval bounds together with the pretreatment."""

    def __init__(self, train, test, n_iter = 10, n_intervall = 5):
        """Add the interval bounds and ``n_components`` to the search space, then optimise.

        Parameters
        ----------
        train, test : indexable
            ``[x, y]`` pairs forwarded to ``Regmodel``.
        n_iter : int
            Number of hyperopt evaluations.
        n_intervall : int
            Number of intervals; two bounds (``v1`` ... ``v{2*n_intervall}``)
            are drawn per interval among the column indices of ``train[0]``.
        """
        # Set before super().__init__ because the search (and therefore
        # objective) runs inside the parent constructor.
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall*2

        r = {'n_components': hp.randint('n_components', 2,20)}
        r.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1]) for i in range(1,self.n_arrets+1)})

        super().__init__(train, test, n_iter, add_hyperparams = r)

    def objective(self, params):
        """Evaluate one hyper-parameter set; return the CV(%) of the RMSE.

        The drawn bounds are sorted and paired into intervals, the spectra are
        normalized and Savitzky-Golay filtered, and a PLS model restricted to
        the selected columns is cross-validated. When the loss improves on
        ``self.SCORE``, the model, its predictions, the CV table, the
        hyper-parameters and the selected bands are stored on the instance.

        Parameters
        ----------
        params : dict
            One sample of the search space; it is updated in place with the
            settings that were actually used.

        Returns
        -------
        The loss read from the CV table at ``('CV(%)', 'rmse')``.
        """
        params['n_components'] = int(params['n_components'])

        # Wavelength indices: sorted bounds are paired (0,1), (2,3), ... so
        # every interval runs from a lower to an upper index (inclusive).
        self.idx = sorted(params[f'v{i}'] for i in range(1, self.n_arrets+1))
        arrays = [np.arange(self.idx[2*i], self.idx[2*i+1]+1) for i in range(self.n_intervall)]
        # Intervals may overlap; keep each column once.
        columns = np.unique(np.concatenate(arrays, axis=0), axis=0)

        # Preprocessing. Explicit lookup instead of eval(): only the two known
        # pretreatments can be called.
        normalize = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['normalization'])]
        x1 = [normalize(x) for x in (self._xc, self._xt)]

        # savgol_filter needs polyorder < window_length, and a derivative order
        # above the polynomial order is meaningless. Invalid draws fall back to
        # the best settings so far, or to a neutral filter.
        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b >= c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c
        x2 = [savgol_filter(x, polyorder=b, deriv=a, window_length=c) for x in x1]
        xc_sel = x2[0][:, columns]

        # Modelling
        def cross_validate(n_components):
            """Cross-validate a PLS model with `n_components` on the selected columns."""
            model = PLSRegression(scale = False, n_components = n_components)
            return KF_CV().process(model = model, x = xc_sel, y = self._ytrain, n_folds = self._nfolds)

        try:
            cv_df = cross_validate(params['n_components'])
        except ValueError:
            # Presumably raised when the selection is too narrow for the
            # requested number of components; retry with a single component.
            params["n_components"] = 1
            cv_df = cross_validate(1)

        # KF_CV().process is assumed to return one row per metric (including
        # 'rmse') and one column per fold -- TODO confirm. Both statistics are
        # taken from the fold columns only, before any summary column is added.
        average = cv_df.mean(axis = 1)
        spread = cv_df.std(axis = 1)
        cv_df['Average'] = average
        cv_df['S'] = spread
        cv_df['CV(%)'] = spread * 100 / average
        cv_df = cv_df.T.round(2)
        score = cv_df.loc['CV(%)', 'rmse']

        if self.SCORE > score:
            self.SCORE = score
            # The table is kept only for the best trial so that CV_results_
            # describes the stored model rather than the last trial evaluated.
            self._cv_df = cv_df

            # The final model is fitted only for trials that are kept.
            Model = PLSRegression(scale = False, n_components = params['n_components'])
            Model.fit(xc_sel, self._ytrain)
            self._ycv = KF_CV().cross_val_predictor(model = Model, x = xc_sel, y = self._ytrain, n_folds = self._nfolds)
            self._yc = Model.predict(xc_sel)
            self._yt = Model.predict(x2[1][:, columns])
            self._model = Model

            # Store plain ints where possible (hyperopt may hand over numpy
            # scalars); values that are not numeric are kept unchanged.
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params

            self.pretreated = pd.DataFrame(x2[0])
            self.segments = arrays

            # One column per interval holding its first and last index.
            for k, segment in enumerate(self.segments, start=1):
                self._selected_bands[f'band{k}'] = [segment[0], segment[-1]]
            self._selected_bands.index = ['from','to']

        return score
    
    ############################################    #########################################

class Pcr(Regmodel):
DIANE's avatar
DIANE committed
    def __init__(self, train, test, n_iter = 10, n_val = 5):
DIANE's avatar
DIANE committed
        super.__init__()
        {f'pc{i}': hp.randint(f'pc{i+1}', 0, train[0].shape[1]) for i in range(self.n_val)}