from Packages import *
from Class_Mod import metrics
from Class_Mod import *
from scipy.signal import savgol_filter


class TpeIpls:
    '''
    This framework belongs to the family of wavelength-selection algorithms. It was introduced
    as an improvement over the forward and backward interval selection algorithms. It combines
    the partial least squares (PLS) algorithm with the tree-structured Parzen estimator (TPE),
    a Bayesian optimization algorithm first introduced in 2011. The combination provides a
    wrapper method for interval PLS. It preserves the integrity of the spectral data by
    treating it as sequential data rather than applying discrete, point-to-point selection.

    Optimization algorithms can be used to find the subset of variables that optimizes a
    certain criterion (e.g., maximize predictive performance, minimize overfitting).
    '''
    SCORE = 100000000
    index_export = pd.DataFrame()

    def __init__(self, x_train, x_test, y_train, y_test, scale, Kfold, n_intervall):
        TpeIpls.SCORE = 10000  # reset the class-level best score for this run
        self.xtrain = x_train
        self.xtest = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.scale = scale
        self.Kfold = Kfold
        self.p = self.xtrain.shape[1]
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall * 2

        # Search space: 2 * n_intervall interval boundaries, the number of latent variables,
        # and the preprocessing pipeline (scatter correction + Savitzky-Golay derivative).
        self.PLS_params = {f'v{i}': hp.randint(f'v{i}', 0, self.p) for i in range(1, self.n_arrets + 1)}
        self.PLS_params['n_components'] = hp.randint("n_components", 1, 10)
        self.PLS_params['Preprocess'] = {'Scatter': hp.choice('Scatter', ['Snv', None]),
                                         'window_length_sg': hp.choice('window_length_sg', [9, 13, 17, 21]),
                                         'polyorder_sg': hp.choice('polyorder_sg', [2]),
                                         'deriv_sg': hp.choice('deriv_sg', [1])}

    def objective(self, params):
        # Sort the sampled boundaries and build the candidate wavelength intervals.
        self.idx = [params[f'v{i}'] for i in range(1, self.n_arrets + 1)]
        self.idx.sort()
        arrays = [np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1) for i in range(self.n_intervall)]
        id = np.unique(np.concatenate(arrays, axis=0), axis=0)

        ## First preprocessing method: scatter correction (SNV or none)
        if params['Preprocess']['Scatter'] == 'Snv':
            xtrain1 = Snv(self.xtrain)
            xtest1 = Snv(self.xtest)
        else:
            xtrain1 = self.xtrain
            xtest1 = self.xtest

        ## Second preprocessing method: Savitzky-Golay derivative; fall back to the
        ## identity filter if the sampled parameters are inconsistent.
        if params['Preprocess']['deriv_sg'] > params['Preprocess']['polyorder_sg'] \
                or params['Preprocess']['polyorder_sg'] > params['Preprocess']['window_length_sg']:
            params['Preprocess']['deriv_sg'] = 0
            params['Preprocess']['polyorder_sg'] = 0
            params['Preprocess']['window_length_sg'] = 1

        pt = params['Preprocess']
        self.x_train = pd.DataFrame(savgol_filter(xtrain1, polyorder=pt['polyorder_sg'], deriv=pt['deriv_sg'],
                                                  window_length=pt['window_length_sg'], delta=1.0, axis=-1,
                                                  mode='interp', cval=0.0),
                                    columns=self.xtrain.columns, index=self.xtrain.index)
        self.x_test = pd.DataFrame(savgol_filter(xtest1, polyorder=pt['polyorder_sg'], deriv=pt['deriv_sg'],
                                                 window_length=pt['window_length_sg'], delta=1.0, axis=-1,
                                                 mode='interp', cval=0.0),
                                   columns=self.xtest.columns, index=self.xtest.index)

        # Train the model; if n_components is too large for the selected interval, retry with 1.
        try:
            Model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            Model.fit(self.x_train.iloc[:, id], self.y_train)
        except ValueError:
            params["n_components"] = 1
            Model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            Model.fit(self.x_train.iloc[:, id], self.y_train)

        ## Make predictions on the calibration set, by cross-validation, and on the test set.
        yc = Model.predict(self.x_train.iloc[:, id]).ravel()
        ycv = cross_val_predict(Model, self.x_train.iloc[:, id], self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
        yt = Model.predict(self.x_test.iloc[:, id]).ravel()

        ### Compute r-squared (kept for reference)
        # r2c = r2_score(self.y_train, yc)
        # r2cv = r2_score(self.y_train, ycv)
        # r2t = r2_score(self.y_test, yt)

        rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
        rmsec = np.sqrt(mean_squared_error(self.y_train, yc))

        # Score to minimize: overfitting ratio (RMSECV/RMSEC) plus RMSECV relative to the target mean (%).
        score = np.round(rmsecv / rmsec + rmsecv * 100 / self.y_train.mean())

        if score < TpeIpls.SCORE - 0.5:
            TpeIpls.SCORE = score
            self.nlv = params['n_components']
            TpeIpls.index_export = pd.DataFrame()
            TpeIpls.index_export["Vars"] = self.x_test.columns[id]
            TpeIpls.index_export.index = id
            self.best = params
            self.segments = arrays
        return score

    ##############################################
    def BandSelect(self, n_iter):
        trials = Trials()
        # Tree-structured Parzen Estimator (tpe), a Bayesian optimization approach.
        best_params = fmin(fn=self.objective,
                           space=self.PLS_params,
                           algo=tpe.suggest,
                           max_evals=n_iter,
                           trials=trials,
                           verbose=0)

        # Collect the first and last index of each retained segment.
        ban = {}
        if self.segments:
            for i in range(len(self.segments)):
                ban[f'band{i+1}'] = [self.segments[i][0], self.segments[i][-1]]

        self.bands = pd.DataFrame(ban).T
        self.bands.columns = ['from', 'to']

        ############################################
        f = []
        for i in range(self.bands.shape[0]):
            f.extend(np.arange(self.bands["from"].iloc[i], self.bands["to"].iloc[i] + 1))
        variables_idx = list(set(f))

        # Refit a PLS model on the selected variables with the best number of latent variables.
        self.pls = PLSRegression(n_components=self.nlv, scale=self.scale)
        self.pls.fit(self.x_train.iloc[:, variables_idx], self.y_train)
        self.yc = self.pls.predict(self.x_train.iloc[:, variables_idx]).ravel()
        self.ycv = cross_val_predict(self.pls, self.x_train.iloc[:, variables_idx], self.y_train,
                                     cv=self.Kfold, n_jobs=-1).ravel()
        self.yt = self.pls.predict(self.x_test.iloc[:, variables_idx]).ravel()

        return self.bands, variables_idx

    @property
    def best_hyperparams(self):
        self.b = {'Scatter': self.best['Preprocess']['Scatter'],
                  'Savitzky-Golay derivative parameters': {'polyorder': self.best['Preprocess']['polyorder_sg'],
                                                           'deriv': self.best['Preprocess']['deriv_sg'],
                                                           'window_length': self.best['Preprocess']['window_length_sg']}}
        return self.b

    @property
    def model_(self):
        return self.pls

    @property
    def pred_data_(self):
        return self.yc, self.ycv, self.yt
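

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original module).
# It assumes the star imports above expose numpy as `np`, pandas as `pd` and
# hyperopt, and it builds synthetic spectra so the example is self-contained;
# the shapes, seed, offset and number of iterations are arbitrary choices.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = pd.DataFrame(rng.normal(size=(100, 200)))                            # 100 spectra x 200 wavelengths
    y = 10 + X.iloc[:, 50:60].sum(axis=1) + rng.normal(scale=0.1, size=100)  # target driven by one band

    x_train, x_test = X.iloc[:80, :], X.iloc[80:, :]
    y_train, y_test = y.iloc[:80], y.iloc[80:]

    selector = TpeIpls(x_train, x_test, y_train, y_test,
                       scale=False, Kfold=5, n_intervall=3)
    bands, variables_idx = selector.BandSelect(n_iter=50)  # runs the TPE search

    print(bands)                      # selected wavelength intervals ('from'/'to' column indices)
    print(selector.best_hyperparams)  # retained preprocessing settings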