# Newer
# Older
from Packages import *
from Class_Mod import metrics
class TpeIpls:
'''
    This framework is added to the clan of wavelength selection algorithms. It was introduced as an improvement
    to the forward and backward interval selection algorithms. This framework combines
    the partial least squares algorithm and the tree-structured Parzen estimator, which is a Bayesian optimization algorithm
    that was first introduced in 2011. This combination provides a wrapper method for interval-PLS.
    This work keeps the integrity of the spectral data by treating the data as sequential data rather than using
    discrete optimization (point-to-point selection)
'''
'''Optimization algorithms can be used to find the subset of variables that optimize a certain criterion
(e.g., maximize predictive performance, minimize overfitting)'''
SCORE = 100000000
index_export = pd.DataFrame()
def __init__(self, x_train, x_test, y_train, y_test,
scale, Kfold, n_intervall):
TpeIpls.SCORE = 10000
self.y_train= y_train
self.y_test = y_test
self.scale = scale
self.Kfold = Kfold
self.n_intervall = n_intervall
self.n_arrets = self.n_intervall*2
self.PLS_params = {f'v{i}': hp.randint(f'v{i}', 0, self.p) for i in range(1,self.n_arrets+1)}
self.PLS_params['n_components'] = hp.randint("n_components", 1, 10)
self.PLS_params['Preprocess'] = {'Scatter':hp.choice('Scatter',['Snv', None]),
'window_length_sg':hp.choice('window_length_sg', [9, 13, 17, 21]),
'polyorder_sg':hp.choice('polyorder_sg',[2]),
'deriv_sg':hp.choice('deriv_sg', [1])}
def objective(self, params):
self.idx = [params[f'v{i}'] for i in range(1,self.n_arrets+1)]
self.idx.sort()
arrays = [np.arange(self.idx[2*i],self.idx[2*i+1]+1) for i in range(self.n_intervall)]
id = np.unique(np.concatenate(arrays, axis=0), axis=0)
## first preprocessing method
if params['Preprocess']['Scatter'] =='Snv':
xtrain1 = Snv(self.xtrain)
xtest1 = Snv(self.xtest)
else:
xtrain1 = self.xtrain
xtest1 = self.xtest
## Second first preprocessing method
if params['Preprocess']['deriv_sg'] > params['Preprocess']['polyorder_sg'] or params['Preprocess']['polyorder_sg'] > params['Preprocess']['window_length_sg']:
params['Preprocess']['deriv_sg'] = 0
params['Preprocess']['polyorder_sg'] = 0
params['Preprocess']['window_length_sg'] = 1
pt = params['Preprocess']
self.x_train = pd.DataFrame(eval(f"savgol_filter(xtrain1, polyorder=pt['deriv_sg'], deriv=pt['deriv_sg'], window_length = pt['window_length_sg'], delta=1.0, axis=-1, mode='interp', cval=0.0)") ,
columns = self.xtrain.columns, index= self.xtrain.index)
self.x_test = pd.DataFrame(eval(f"savgol_filter(xtest1, polyorder=pt['deriv_sg'], deriv=pt['deriv_sg'], window_length = pt['window_length_sg'], delta=1.0, axis=-1, mode='interp', cval=0.0)") ,
columns = self.xtest.columns, index= self.xtest.index)
# Train the model
try:
Model = PLSRegression(scale = self.scale,n_components = params['n_components'])
Model.fit(self.x_train.iloc[:,id], self.y_train)
except ValueError as ve:
params["n_components"] = 1
Model = PLSRegression(scale = self.scale,n_components = params['n_components'])
Model.fit(self.x_train.iloc[:,id], self.y_train)
## make prediction
yc = Model.predict(self.x_train.iloc[:,id]).ravel()
ycv = cross_val_predict(Model, self.x_train.iloc[:,id], self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
yt = Model.predict(self.x_test.iloc[:, id]).ravel()
### compute r-squared
#r2c = r2_score(self.y_train, yc)
#r2cv = r2_score(self.y_train, ycv)
#r2t = r2_score(self.y_test, yt)
rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
rmsec = np.sqrt(mean_squared_error(self.y_train, yc))
score = np.round(rmsecv/rmsec + rmsecv*100/self.y_train.mean())
if score < TpeIpls.SCORE-0.5:
TpeIpls.SCORE = score
self.nlv = params['n_components']
TpeIpls.index_export = pd.DataFrame()
TpeIpls.index_export["Vars"] = self.x_test.columns[id]
TpeIpls.index_export.index = id
self.segments = arrays
return score
##############################################
def BandSelect(self, n_iter):
trials = Trials()
best_params = fmin(fn=self.objective,
space=self.PLS_params,
algo=tpe.suggest, # Tree of Parzen Estimators’ (tpe) which is a Bayesian approach
max_evals=n_iter,
trials=trials,
verbose=0)
ban = {}
if self.segments:####### test
for i in range(len(self.segments)):
ban[f'band{i+1}'] = [self.segments[i][0], self.segments[i][self.segments[i].shape[0]-1]]
self.bands = pd.DataFrame(ban).T
self.bands.columns = ['from', 'to']
f = []
for i in range(self.bands.shape[0]):
f.extend(np.arange(self.bands["from"][i], self.bands["to"][i]+1))
variables_idx = list(set(f))
############################################
for i in range(self.bands.shape[0]):
f.extend(np.arange(self.bands["from"][i], self.bands["to"][i]+1))
variables_idx = list(set(f))
self.pls = PLSRegression(n_components=self.nlv, scale= self.scale)
self.pls.fit(self.x_train.iloc[:,variables_idx], self.y_train)
self.yc = self.pls.predict(self.x_train.iloc[:,variables_idx]).ravel()
self.ycv = cross_val_predict(self.pls, self.x_train.iloc[:,variables_idx], self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
self.yt = self.pls.predict(self.x_test.iloc[:,variables_idx]).ravel()
return self.bands, variables_idx
@property
def best_hyperparams(self):
self.b = {'Scatter':self.best['Preprocess']['Scatter'], 'Saitzky-Golay derivative parameters':{'polyorder':self.best['Preprocess']['polyorder_sg'],
'deriv':self.best['Preprocess']['deriv_sg'],
'window_length':self.best['Preprocess']['window_length_sg']}}
return self.b
@property
def model_(self):
return self.pls
@property
def pred_data_(self):