# VarSel.py
from Packages import *
from Class_Mod import metrics

class TpeIpls:
    """Interval-PLS wavelength selection driven by the Tree-structured Parzen Estimator.

    This framework belongs to the family of wavelength selection algorithms.
    It was introduced as an improvement over the forward and backward
    interval selection algorithms. It combines partial least squares (PLS)
    regression with the tree-structured Parzen estimator (TPE), a Bayesian
    optimization algorithm first introduced in 2011, yielding a wrapper
    method for interval-PLS.

    The approach keeps the integrity of the spectral data by treating it as
    sequential data (contiguous intervals) rather than using discrete,
    point-by-point optimization. The optimizer searches for the subset of
    variables that optimizes a criterion (e.g. maximize predictive
    performance, minimize overfitting).

    Notes:
        ``SCORE`` and ``index_export`` are *class-level* attributes: they are
        shared by every instance, and ``SCORE`` is reset each time a new
        instance is constructed. Running several searches side by side
        therefore makes them interfere with each other.
    """

    # Best (lowest) score accepted so far; reset to 10000 by every __init__.
    SCORE = 100000000
    # Names (column "Vars") and positions (index) of the best variable subset.
    index_export = pd.DataFrame()

    def __init__(self, x_train, x_test, y_train, y_test,
                 scale, Kfold, n_intervall):
        """Store the data and build the hyperopt search space.

        Args:
            x_train: Calibration predictors; must support ``.shape`` and
                ``.iloc`` column selection.
            x_test: Test predictors; must support ``.iloc`` and ``.columns``.
            y_train: Calibration response; must support ``.mean()``.
            y_test: Test response.
            scale: Forwarded to ``PLSRegression(scale=...)``.
            Kfold: Forwarded to ``cross_val_predict(cv=...)``.
            n_intervall: Number of intervals to select.
        """
        TpeIpls.SCORE = 10000
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.scale = scale
        self.Kfold = Kfold
        self.p = self.x_train.shape[1]
        self.n_intervall = n_intervall
        # Each interval is delimited by two stops (start and end).
        self.n_arrets = self.n_intervall * 2
        # Set by `objective` whenever a candidate improves on the best score.
        self.nlv = None
        self.segments = None
        self.PLS_params = {
            f'v{i}': hp.randint(f'v{i}', 0, self.p)
            for i in range(1, self.n_arrets + 1)
        }
        # NOTE: hyperopt's randint upper bound is exclusive.
        self.PLS_params['n_components'] = hp.randint("n_components", 1, 10)

    def objective(self, params):
        """Score one candidate (interval stops + number of PLS components).

        The sampled stops are sorted and paired into inclusive intervals; a
        PLS model is fitted on the union of those intervals. The score is
        ``rmsecv / rmsec + rmsecv * 100 / mean(y_train)`` rounded to the
        nearest integer (lower is better). When the score beats the best
        one so far by more than 0.5, the candidate is recorded in
        ``TpeIpls.SCORE``, ``TpeIpls.index_export``, ``self.nlv`` and
        ``self.segments``.

        Args:
            params: Mapping holding ``'v1'``..``'v{n_arrets}'`` and
                ``'n_components'``. It is not modified.

        Returns:
            The rounded score of the candidate.
        """
        self.idx = sorted(params[f'v{i}'] for i in range(1, self.n_arrets + 1))

        # Consecutive pairs of sorted stops form the inclusive intervals.
        arrays = [
            np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
            for i in range(self.n_intervall)
        ]
        selected = np.unique(np.concatenate(arrays, axis=0), axis=0)
        x_selected = self.x_train.iloc[:, selected]

        # Train the model; if the requested number of components is rejected
        # fall back to a single component.
        n_components = params['n_components']
        try:
            Model = PLSRegression(scale=self.scale, n_components=n_components)
            Model.fit(x_selected, self.y_train)
        except ValueError:
            n_components = 1
            Model = PLSRegression(scale=self.scale, n_components=n_components)
            Model.fit(x_selected, self.y_train)

        # Calibration and cross-validated predictions.
        yc = Model.predict(x_selected).ravel()
        ycv = cross_val_predict(Model, x_selected, self.y_train,
                                cv=self.Kfold, n_jobs=-1).ravel()

        rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
        rmsec = np.sqrt(mean_squared_error(self.y_train, yc))

        score = np.round(rmsecv / rmsec + rmsecv * 100 / self.y_train.mean())
        if score < TpeIpls.SCORE - 0.5:
            TpeIpls.SCORE = score
            self.nlv = n_components

            export = pd.DataFrame()
            export["Vars"] = self.x_test.columns[selected]
            export.index = selected
            TpeIpls.index_export = export

            self.segments = arrays
        return score

    @staticmethod
    def _bands_from_segments(segments):
        """Summarize interval index arrays as a bands table and a sorted index list.

        Args:
            segments: Non-empty sequence of non-empty 1-D integer sequences,
                one per selected interval.

        Returns:
            A tuple ``(bands, variables_idx)`` where ``bands`` is a DataFrame
            indexed ``band1``, ``band2``, ... with columns ``from`` and ``to``
            (inclusive bounds), and ``variables_idx`` is the sorted list of
            unique positions covered by the intervals.
        """
        bands = pd.DataFrame(
            [[segment[0], segment[-1]] for segment in segments],
            index=[f'band{i}' for i in range(1, len(segments) + 1)],
            columns=['from', 'to'],
        )
        # Sorted and de-duplicated so the column order matches the order
        # evaluated in `objective` (overlapping intervals may share indices).
        variables_idx = np.unique(np.concatenate(segments)).tolist()
        return bands, variables_idx

    def BandSelect(self, n_iter):
        """Run the TPE search and refit PLS on the best intervals found.

        Args:
            n_iter: Maximum number of objective evaluations (``max_evals``).

        Returns:
            A tuple ``(bands, variables_idx)``: the table of selected
            intervals and the sorted list of selected column positions.

        Raises:
            RuntimeError: If no evaluated candidate was accepted as best, so
                there are no intervals to refit on.
        """
        trials = Trials()

        fmin(fn=self.objective,
             space=self.PLS_params,
             algo=tpe.suggest,  # Tree of Parzen Estimators (TPE), a Bayesian approach
             max_evals=n_iter,
             trials=trials,
             verbose=0)

        if getattr(self, 'segments', None) is None:
            raise RuntimeError(
                "No candidate improved on the initial score; "
                "no intervals were selected."
            )

        self.bands, variables_idx = self._bands_from_segments(self.segments)

        x_train_sel = self.x_train.iloc[:, variables_idx]
        self.pls = PLSRegression(n_components=self.nlv, scale=self.scale)
        self.pls.fit(x_train_sel, self.y_train)

        self.yc = self.pls.predict(x_train_sel).ravel()
        self.ycv = cross_val_predict(self.pls, x_train_sel, self.y_train,
                                     cv=self.Kfold, n_jobs=-1).ravel()
        self.yt = self.pls.predict(self.x_test.iloc[:, variables_idx]).ravel()

        return self.bands, variables_idx

    @property
    def model_(self):
        """The PLS model fitted by `BandSelect`."""
        return self.pls

    @property
    def metrics_(self):
        """Metrics table with one row each for 'calib', 'cv' and 'test'.

        Each row is the ``evaluate_`` result of the project's ``metrics``
        helper for the corresponding reference/prediction pair.
        """
        metc = metrics(self.y_train, self.yc).evaluate_
        metcv = metrics(self.y_train, self.ycv).evaluate_
        mett = metrics(self.y_test, self.yt).evaluate_

        met = pd.concat([metc, metcv, mett], axis=0)
        met.index = ['calib', 'cv', 'test']
        return met

    @property
    def pred_data_(self):
        """Tuple ``(yc, ycv, yt)``: calibration, cross-validation and test predictions."""
        return self.yc, self.ycv, self.yt