# regress.py — hyperopt-tuned regression models (Regmodel base class, Plsr, TpeIpls, Pcr).
import numpy as np
from pandas import DataFrame
from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from hyperopt import fmin, hp, tpe, Trials, space_eval, STATUS_OK, anneal

from utils import metrics, Snv, No_transformation, KF_CV, sel_ratio


class Regmodel(object):
    """Base class for regression models tuned with a hyperopt TPE search.

    The constructor builds a search space over signal-preprocessing
    hyperparameters (Savitzky-Golay derivative settings and a normalization
    choice), merges in any model-specific hyperparameters supplied by a
    subclass, and immediately runs ``fmin`` over ``objective``.  Subclasses
    implement ``objective(params)``: it must return the score to minimize
    and, when the score improves on ``self.SCORE``, record the fitted model
    and its predictions on the instance attributes exposed by the
    properties below.
    """

    def __init__(self, train, test, n_iter, add_hyperparams=None, nfolds=3, **kwargs):
        """Set up data, the search space, and run the optimization.

        Parameters
        ----------
        train, test : sequences of (X, y)
            ``train[0]``/``test[0]`` are 2-D spectra matrices and
            ``train[1]``/``test[1]`` the matching target vectors.
        n_iter : int
            Number of hyperopt evaluations (``max_evals``).
        add_hyperparams : dict, optional
            Extra search-space entries contributed by the subclass.
        nfolds : int
            Number of cross-validation folds used by ``objective``.
        """
        # Sentinel "worst score so far"; objective() lowers it on improvement.
        self.SCORE = 100000000
        self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        self._selected_bands = DataFrame(index=['from', 'to'])
        self.important_features = None
        # Preprocessing search space shared by every subclass.
        self._hyper_params = {
            'polyorder': hp.choice('polyorder', [0, 1, 2]),
            'deriv': hp.choice('deriv', [0, 1, 2]),
            'window_length': hp.choice('window_length', [15, 21, 27, 33]),
            'normalization': hp.choice('normalization', ['Snv', 'No_transformation']),
        }
        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)
            self._best = None

        trials = Trials()
        # TPE (Tree of Parzen Estimators): a Bayesian optimization approach.
        fmin(fn=self.objective,
             space=self._hyper_params,
             algo=tpe.suggest,
             max_evals=n_iter,
             trials=trials,
             verbose=1)

    @property
    def train_data_(self):
        """[X_train, y_train] as passed to the constructor."""
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        """[X_test, y_test] as passed to the constructor."""
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        # ``pretreated`` is set by the subclass objective() for the best model.
        return self.pretreated

    @property
    def get_params_(self):
        """Search space the optimization algorithm explores."""
        return self._hyper_params

    def objective(self, params):
        """Score one hyperparameter subset; overridden by subclasses."""
        pass

    @property
    def best_hyperparams_(self):
        """Best hyperparameter subset found so far."""
        return self._best

    @property
    def best_hyperparams_print(self):
        """Human-readable summary of the selected signal preprocessing."""
        if self._best['normalization'] == 'Snv':
            a = 'Standard Normal Variate (SNV)'
        elif self._best['normalization'] == 'No_transformation':
            a = " No transformation was performed"

        # Double quotes outside / single quotes inside: reusing the outer
        # quote character inside an f-string is a SyntaxError before 3.12.
        # "\\:" keeps the literal backslash-colon the original emitted.
        SG = (f"- Savitzky-Golay derivative parameters \\:"
              f"(Window_length:{self._best['window_length']};"
              f"  polynomial order: {self._best['polyorder']};"
              f"  Derivative order : {self._best['deriv']})")
        Norm = f'- Spectral Normalization \\: {a}'
        return SG + "\n" + Norm

    @property
    def model_(self):
        """The fitted model for the best hyperparameters."""
        return self._model

    @property
    def pred_data_(self):
        """Predicted values from the training and testing steps."""
        return self._yc, self._yt

    @property
    def cv_data_(self):
        """Cross-validation predictions."""
        return self._ycv

    @property
    def CV_results_(self):
        """Cross-validation metrics table (a DataFrame)."""
        return self._cv_df

    @property
    def important_features_(self):
        return self.important_features

    @property
    def selected_features_(self):
        """Selected wavelength bands ('from'/'to' rows), filled by subclasses."""
        return self._selected_bands

    @property
    def sel_ratio_(self):
        return self._sel_ratio
  
########################################### PLSR   #########################################
class Plsr(Regmodel):
    """PLS regression tuned by TPE over preprocessing and the number of
    latent components."""

    def __init__(self, train, test, n_iter=10, cv=3):
        # Extend the shared preprocessing search space with the PLS
        # latent-variable count.
        super().__init__(train, test, n_iter, nfolds=cv,
                         add_hyperparams={'n_components': hp.randint('n_components', 1, 20)})

    def objective(self, params):
        """Evaluate one candidate: preprocess, cross-validate a PLS model,
        and return the CV RMSE (the quantity hyperopt minimizes)."""
        params['n_components'] = int(params['n_components'])
        raw = [self._xc, self._xt]

        # Dispatch on the sampled normalization name instead of eval()-ing
        # a code string built from it.
        normalize = {'Snv': Snv, 'No_transformation': No_transformation}[params['normalization']]
        x1 = [normalize(raw[i]) for i in range(2)]

        # Savitzky-Golay requires deriv <= polyorder <= window_length; when
        # the sampled triple violates that, fall back to the best-known
        # settings (or to the identity filter 0/0/1 before any best exists).
        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b > c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c

        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length=params['window_length']) for i in range(2)]

        model = PLSRegression(scale=False, n_components=params['n_components'])
        folds = KF_CV().CV(x=x2[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        yp = KF_CV().cross_val_predictor(model=model, folds=folds, x=x2[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]

        score = self._cv_df.loc["cv", 'rmse']

        # Refit on the full training set with the same settings.
        Model = PLSRegression(scale=False, n_components=params['n_components'])
        Model.fit(x2[0], self._ytrain)

        if self.SCORE > score:
            # New best candidate: persist the model, its predictions, and
            # the winning hyperparameters.
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=folds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score

    ############################################ iplsr #########################################
class TpeIpls(Regmodel):
    """Interval PLS (iPLS): TPE jointly optimizes preprocessing, the number
    of PLS components, and the boundaries of ``n_intervall`` wavelength
    intervals used as the model input."""

    def __init__(self, train, test, n_iter=10, n_intervall=5, cv=3):
        self.n_intervall = n_intervall
        # Two breakpoints (start, end) per interval.
        self.n_arrets = self.n_intervall * 2

        r = {'n_components': hp.randint('n_components', 1, 20)}
        r.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1])
                  for i in range(1, self.n_arrets + 1)})

        super().__init__(train, test, n_iter, add_hyperparams=r, nfolds=cv)

    def objective(self, params):
        """Evaluate one candidate interval set + PLS model; return CV RMSE."""
        # Wavelength indices: sort the sampled breakpoints, then pair them
        # up as [start, end] (inclusive) interval bounds.
        self.idx = sorted(params[f'v{i}'] for i in range(1, self.n_arrets + 1))
        arrays = [np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
                  for i in range(self.n_intervall)]
        # Union of all intervals ('sel' rather than 'id', which shadows a builtin).
        sel = np.unique(np.concatenate(arrays, axis=0), axis=0)

        ### Preprocessing
        raw = [self._xc, self._xt]
        # Dispatch on the sampled normalization name instead of eval()-ing
        # a code string built from it.
        normalize = {'Snv': Snv, 'No_transformation': No_transformation}[params['normalization']]
        x1 = [normalize(raw[i]) for i in range(2)]

        # Savitzky-Golay requires deriv <= polyorder <= window_length; when
        # the sampled triple violates that, fall back to the best-known
        # settings (or to the identity filter 0/0/1 before any best exists).
        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b > c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c

        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length=params['window_length']) for i in range(2)]

        # Keep only the selected wavelength columns.
        prepared_data = [x2[i][:, sel] for i in range(2)]

        ### Modelling
        folds = KF_CV().CV(x=prepared_data[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        try:
            model = PLSRegression(scale=False, n_components=params['n_components'])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        except ValueError:
            # n_components can exceed what the selected bands support;
            # retry with a single component.
            params["n_components"] = 1
            model = PLSRegression(scale=False, n_components=params["n_components"])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]

        score = self._cv_df.loc['cv', 'rmse']

        # Refit on the full training set with the (possibly clamped) settings.
        Model = PLSRegression(scale=False, n_components=model.n_components)
        Model.fit(prepared_data[0], self._ytrain)

        if self.SCORE > score:
            # New best candidate: persist the model, its predictions, the
            # winning hyperparameters, and the selected bands.
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=folds)
            self._yc = Model.predict(prepared_data[0])
            self._yt = Model.predict(prepared_data[1])
            self._model = Model
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params

            self.pretreated = DataFrame(x2[0])
            self.segments = arrays

            for i, seg in enumerate(self.segments):
                self._selected_bands[f'band{i + 1}'] = [seg[0], seg[-1]]
            self._selected_bands.index = ['from', 'to']

        return score

    ###########################################  LWPLSR  #########################################
    ############################################  Pcr  #########################################

class Pcr(Regmodel):
    # NOTE(review): this class appears unfinished/broken as written, and may
    # continue beyond the visible chunk — flagging rather than rewriting:
    #   - `super.__init__()` calls __init__ on the `super` type itself
    #     (missing the `super()` call parentheses) and passes none of the
    #     required (train, test, n_iter) arguments, so it fails at runtime.
    #   - the dict comprehension below is built and immediately discarded —
    #     presumably it was meant to be passed as `add_hyperparams`.
    #   - its key label is offset from the hyperopt label
    #     (key f'pc{i}' vs label f'pc{i+1}').
    #   - `self.n_val` is read before anything assigns it (the `n_val`
    #     parameter is never stored on the instance).
    def __init__(self, train, test, n_iter = 10, n_val = 5):
        super.__init__()
        {f'pc{i}': hp.randint(f'pc{i+1}', 0, train[0].shape[1]) for i in range(self.n_val)}