Skip to content
Snippets Groups Projects
VarSel.py 5.24 KiB
Newer Older
  • Learn to ignore specific revisions
  • from Packages import *
    from Class_Mod import metrics
    
    class TpeIpls:
        """Interval-PLS wavelength selection driven by the Tree-structured Parzen Estimator.

        This framework belongs to the family of wavelength-selection algorithms. It was
        introduced as an improvement over the forward and backward interval-selection
        algorithms. It combines partial least squares (PLS) regression with the
        tree-structured Parzen estimator (TPE), a Bayesian optimization algorithm first
        introduced in 2011, which yields a wrapper method for interval-PLS.

        The approach keeps the integrity of the spectral data by treating it as sequential
        data (contiguous intervals) rather than using discrete, point-by-point selection.
        More generally, optimization algorithms can be used to find the subset of variables
        that optimizes a given criterion (e.g. maximize predictive performance, minimize
        overfitting).

        Attributes set by ``BandSelect``: ``bands``, ``pls``, ``yc``, ``ycv``, ``yt``.
        Attributes set by ``objective`` when a trial improves the best score: ``nlv``,
        ``segments``; the class attributes ``SCORE`` and ``index_export`` are updated too.
        """

        # Best (lowest) objective value retained so far; mirrored from the instance
        # that improved last. Reset to 10000 each time an instance is created.
        SCORE = 100000000
        # Selected column labels ("Vars") indexed by their positional column index,
        # for the most recently retained best trial.
        index_export = pd.DataFrame()

        def __init__(self, x_train, x_test, y_train, y_test,
                     scale, Kfold, n_intervall):
            """Store the data and build the hyperopt search space.

            Parameters
            ----------
            x_train, x_test :
                Predictor tables; columns are selected positionally with ``.iloc`` and
                ``x_test.columns`` supplies the exported variable labels.
            y_train, y_test :
                Response values for the calibration and test sets.
            scale :
                Forwarded to ``PLSRegression(scale=...)``.
            Kfold :
                Forwarded to ``cross_val_predict(cv=...)``.
            n_intervall :
                Number of intervals to select; two cut points are drawn per interval.
            """
            TpeIpls.SCORE = 10000
            # Per-instance best score so that several instances living in the same
            # process do not block each other through the shared class attribute.
            self._best_score = 10000
            self.x_train = x_train
            self.x_test = x_test
            self.y_train = y_train
            self.y_test = y_test
            self.scale = scale
            self.Kfold = Kfold
            self.p = self.x_train.shape[1]
            self.n_intervall = n_intervall
            self.n_arrets = self.n_intervall * 2
            # One integer cut point per interval boundary, each drawn in [0, p).
            self.PLS_params = {
                f'v{i}': hp.randint(f'v{i}', 0, self.p)
                for i in range(1, self.n_arrets + 1)
            }
            self.PLS_params['n_components'] = hp.randint("n_components", 1, 10)

        def objective(self, params):
            """Evaluate one TPE trial and return its score (lower is better).

            The sampled cut points are sorted and paired into ``n_intervall`` inclusive
            intervals; a PLS model is fitted on the union of those columns. The score is
            ``round(rmsecv / rmsec + 100 * rmsecv / mean(y_train))``. When the score beats
            the best one retained so far by more than 0.5, the trial's number of
            components, intervals and exported variable table are recorded.
            """
            self.idx = sorted(params[f'v{i}'] for i in range(1, self.n_arrets + 1))

            # Consecutive pairs of sorted cut points delimit inclusive intervals.
            arrays = [
                np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
                for i in range(self.n_intervall)
            ]
            cols = np.unique(np.concatenate(arrays, axis=0), axis=0)
            x_sel = self.x_train.iloc[:, cols]

            # Train the model; if the requested number of components is rejected
            # (ValueError), fall back to a single component.
            n_components = params['n_components']
            try:
                Model = PLSRegression(scale=self.scale, n_components=n_components)
                Model.fit(x_sel, self.y_train)
            except ValueError:
                n_components = 1
                Model = PLSRegression(scale=self.scale, n_components=n_components)
                Model.fit(x_sel, self.y_train)

            # Calibration and cross-validated predictions.
            yc = Model.predict(x_sel).ravel()
            ycv = cross_val_predict(Model, x_sel, self.y_train,
                                    cv=self.Kfold, n_jobs=-1).ravel()

            rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
            rmsec = np.sqrt(mean_squared_error(self.y_train, yc))

            score = np.round(rmsecv / rmsec + rmsecv * 100 / self.y_train.mean())
            if score < self._best_score - 0.5:
                self._best_score = score
                TpeIpls.SCORE = score
                self.nlv = n_components

                TpeIpls.index_export = pd.DataFrame()
                TpeIpls.index_export["Vars"] = self.x_test.columns[cols]
                TpeIpls.index_export.index = cols

                self.segments = arrays
            return score

        ##############################################

        def _bands_from_segments(self):
            """Build the band table and the sorted, de-duplicated column indices.

            Returns a tuple ``(bands, variables_idx)`` where ``bands`` is a DataFrame
            indexed ``band1``, ``band2``, ... with columns ``from`` and ``to`` (first and
            last index of each retained segment), and ``variables_idx`` is the ascending
            list of every column index covered by at least one segment.
            """
            limits = {
                f'band{k + 1}': [seg[0], seg[-1]]
                for k, seg in enumerate(self.segments)
            }
            bands = pd.DataFrame(limits).T
            bands.columns = ['from', 'to']
            variables_idx = np.unique(np.concatenate(self.segments)).tolist()
            return bands, variables_idx

        def BandSelect(self, n_iter):
            """Run ``n_iter`` TPE evaluations, then refit PLS on the best intervals.

            Returns
            -------
            tuple
                ``(bands, variables_idx)``: the band table (``from``/``to`` per band) and
                the sorted list of selected column indices.

            Raises
            ------
            RuntimeError
                If no trial was retained as best, so there are no intervals to refit on.
            """
            trials = Trials()

            # Tree of Parzen Estimators (tpe) is a Bayesian optimization approach.
            fmin(fn=self.objective,
                 space=self.PLS_params,
                 algo=tpe.suggest,
                 max_evals=n_iter,
                 trials=trials,
                 verbose=0)

            if getattr(self, 'segments', None) is None or getattr(self, 'nlv', None) is None:
                raise RuntimeError(
                    "TpeIpls.BandSelect: no trial improved the initial score; "
                    "no intervals were selected."
                )

            self.bands, variables_idx = self._bands_from_segments()

            x_train_sel = self.x_train.iloc[:, variables_idx]
            self.pls = PLSRegression(n_components=self.nlv, scale=self.scale)
            self.pls.fit(x_train_sel, self.y_train)

            self.yc = self.pls.predict(x_train_sel).ravel()
            self.ycv = cross_val_predict(self.pls, x_train_sel, self.y_train,
                                         cv=self.Kfold, n_jobs=-1).ravel()
            self.yt = self.pls.predict(self.x_test.iloc[:, variables_idx]).ravel()

            return self.bands, variables_idx

        @property
        def model_(self):
            """The PLS model fitted by ``BandSelect``."""
            return self.pls

        @property
        def metrics_(self):
            """Table of ``metrics(...).evaluate_`` rows for calibration, CV and test."""
            metc = metrics(self.y_train, self.yc).evaluate_
            metcv = metrics(self.y_train, self.ycv).evaluate_
            mett = metrics(self.y_test, self.yt).evaluate_

            met = pd.concat([metc, metcv, mett], axis=0)
            met.index = ['calib', 'cv', 'test']
            return met

        @property
        def pred_data_(self):
            """Tuple ``(yc, ycv, yt)`` of calibration, CV and test predictions."""
            return self.yc, self.ycv, self.yt