Skip to content
Snippets Groups Projects
VarSel.py 5.93 KiB
Newer Older
DIANE's avatar
DIANE committed
from Packages import *
from Class_Mod import metrics


class TpeIpls:
    """Interval-PLS wavelength selection driven by a Tree-structured Parzen Estimator.

    This framework belongs to the family of wavelength-selection algorithms. It was
    introduced as an improvement over the forward and backward interval-selection
    algorithms. It combines the partial least squares (PLS) algorithm with the
    tree-structured Parzen estimator (TPE), a Bayesian optimization algorithm first
    introduced in 2011. The combination provides a wrapper method for interval-PLS.

    The approach keeps the integrity of the spectral data by treating it as
    sequential data (contiguous intervals) rather than using discrete, point-by-point
    selection.

    Optimization algorithms can be used to find the subset of variables that
    optimizes a certain criterion (e.g. maximize predictive performance, minimize
    overfitting).
    """

    # Lowest score seen so far. Stored on the class (not the instance) and reset by
    # every new instance, so two optimisations must not be interleaved.
    SCORE = 10000
    # Variables retained by the best model: column "Vars" holds the column labels,
    # the index holds their positional indices.
    index_export = pd.DataFrame()

    def __init__(self, x_train, x_test, y_train, y_test, scale, Kfold, n_intervall):
        """Store the data and build the hyperopt search space.

        Args:
            x_train: Training predictors; must support ``.shape`` and ``.iloc``.
            x_test: Test predictors; must support ``.iloc`` and ``.columns``.
            y_train: Training response.
            y_test: Test response.
            scale: Forwarded to ``PLSRegression(scale=...)``.
            Kfold: Forwarded to ``cross_val_predict(cv=...)``.
            n_intervall: Number of intervals to select.
        """
        TpeIpls.SCORE = 10000
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.scale = scale
        self.Kfold = Kfold
        self.p = self.x_train.shape[1]
        self.n_intervall = n_intervall
        # Each interval needs two bounds (start and stop).
        self.__n_arrets = self.n_intervall * 2
        self.PLS_params = {
            f'v{i}': hp.randint(f'v{i}', 0, self.p)
            for i in range(1, self.__n_arrets + 1)
        }
        self.PLS_params['n_components'] = hp.randint("n_components", 1, 6)

        # State describing the best model; populated by `_objective`.
        self.idx = []
        self.segments = []
        self.nlv = None

    def _objective(self, params):
        """Score one candidate set of interval bounds; record it if it is the best so far.

        The score is ``round(rmsecv / rmsec + 100 * rmsecv / mean(y_train))``; lower
        is better. When the score improves on ``TpeIpls.SCORE`` by more than 0.5, the
        selected variables, interval bounds, segments and number of latent variables
        are stored, and the performance table is written to
        ``path + "performance.xlsx"``.

        Args:
            params: Mapping with keys ``v1..v{2 * n_intervall}`` (interval bounds) and
                ``n_components``.

        Returns:
            The rounded score for this candidate.
        """
        # Sorting pairs consecutive bounds up as (start, stop) of each interval.
        bounds = sorted(params[f'v{i}'] for i in range(1, self.__n_arrets + 1))
        arrays = [
            np.arange(bounds[2 * i], bounds[2 * i + 1] + 1)
            for i in range(self.n_intervall)
        ]
        # Intervals may overlap: keep each column position once, in ascending order.
        var_idx = np.unique(np.concatenate(arrays))
        x_sel = self.x_train.iloc[:, var_idx]

        # Train the model; on ValueError retry with a single component.
        try:
            model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            model.fit(x_sel, self.y_train)
        except ValueError:
            params["n_components"] = 1
            model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            model.fit(x_sel, self.y_train)

        # Predictions: calibration, cross-validation and test.
        yc = model.predict(x_sel).ravel()
        ycv = cross_val_predict(model, x_sel, self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
        yt = model.predict(self.x_test.iloc[:, var_idx]).ravel()

        # Performance figures.
        r2c = r2_score(self.y_train, yc)
        r2cv = r2_score(self.y_train, ycv)
        r2t = r2_score(self.y_test, yt)
        rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
        rmsec = np.sqrt(mean_squared_error(self.y_train, yc))

        score = np.round(rmsecv / rmsec + rmsecv * 100 / self.y_train.mean())
        if score < TpeIpls.SCORE - 0.5:
            TpeIpls.SCORE = score
            self.nlv = params['n_components']

            print('--**-------------##---------#~###~#---------##---------------**--')
            print(f'***** R²train : [{round(r2c * 100)}]**** R²cv : [{round(r2cv * 100)}]**** R²test : [{round(r2t * 100)}]*****')
            print(f'***** N Predictiors : [{len(var_idx)}]   ********   NLV : [{params["n_components"]}]*****')

            TpeIpls.index_export = pd.DataFrame()
            TpeIpls.index_export["Vars"] = self.x_test.columns[var_idx]
            TpeIpls.index_export.index = var_idx

            # Save the performance table.
            # NOTE(review): `path` is not defined in this class; presumably it comes
            # from the `Packages` star import -- confirm.
            metrics(train=(self.y_train, yc), cv=(self.y_train, ycv), test=(self.y_test, yt)).round(2).to_excel(path + "performance.xlsx")
            self.segments = arrays
            # Keep the bounds of the best model only (not of the latest trial).
            self.idx = bounds

            print("''---------------------------- evolution noticed, hence a new model was saved-------------------------------''")
        return score

    def tune(self, n_iter):
        """Run the TPE optimisation for ``n_iter`` evaluations.

        Args:
            n_iter: Maximum number of objective evaluations (``max_evals``).

        Returns:
            The value returned by hyperopt's ``fmin`` for this run.
        """
        print('------------------------------------------------  Optimization of the process has started ---------------------------------------------')
        trials = Trials()

        best_params = fmin(fn=self._objective,
                           space=self.PLS_params,
                           algo=tpe.suggest,  # Tree of Parzen Estimators (TPE), a Bayesian approach
                           max_evals=n_iter,
                           trials=trials,
                           verbose=2)
        return best_params

    @property
    def segments_(self):
        """Bounds of the selected intervals as a DataFrame.

        Returns:
            A frame indexed ``band1, band2, ...`` with columns ``from`` and ``to``
            (first and last positional index of each segment). The frame is empty,
            but still has both columns, when no segment has been stored.
        """
        self.bands = {
            f'band{i + 1}': [segment[0], segment[-1]]
            for i, segment in enumerate(self.segments)
        }
        # Passing the columns explicitly keeps the frame well-formed when empty.
        return pd.DataFrame(
            list(self.bands.values()),
            index=list(self.bands.keys()),
            columns=['from', 'to'],
        )

    @property
    def tpe_pls_performance(self):
        """Refit PLS on the selected intervals and return the performance table.

        Also stores the calibration, cross-validation and test predictions in
        ``self.yc``, ``self.ycv`` and ``self.yt``.

        Returns:
            The result of ``metrics(...)`` rounded to two decimals.

        Raises:
            RuntimeError: If no model has been selected yet (``tune`` not run).
        """
        if not self.segments:
            raise RuntimeError("No interval has been selected yet; call tune() first.")

        # Evaluate the property once instead of on every loop iteration.
        bands = self.segments_
        spans = [np.arange(start, stop + 1) for start, stop in zip(bands["from"], bands["to"])]
        # Sorted unique positions: same column order as used during optimisation.
        variables_idx = np.unique(np.concatenate(spans))

        x_train_sel = self.x_train.iloc[:, variables_idx]
        pls = PLSRegression(n_components=self.nlv, scale=self.scale)
        pls.fit(x_train_sel, self.y_train)

        self.yc = pls.predict(x_train_sel).ravel()
        self.ycv = cross_val_predict(pls, x_train_sel, self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
        self.yt = pls.predict(self.x_test.iloc[:, variables_idx]).ravel()

        perf = metrics(train=(self.y_train, self.yc), cv=(self.y_train, self.ycv), test=(self.y_test, self.yt)).round(2)

        return perf

    @property
    def meas_vs_pred(self):
        """Plot measured vs. predicted values for calibration, CV and test sets.

        The predictions are computed through ``tpe_pls_performance`` when they are
        not available yet.
        """
        if getattr(self, 'yc', None) is None:
            # Predictions are produced as a side effect of the performance property.
            _ = self.tpe_pls_performance

        fig, ax = plt.subplots()
        sns.regplot(x=self.y_train, y=self.yc, ax=ax)
        sns.regplot(x=self.y_train, y=self.ycv, ax=ax)
        sns.regplot(x=self.y_test, y=self.yt, ax=ax)
        plt.show()