import numpy as np
from pandas import DataFrame
from utils.eval_metrics import metrics
from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from hyperopt import fmin, hp, tpe, Trials, space_eval, STATUS_OK, anneal
from utils.data_handling import Snv, No_transformation, KF_CV, sel_ratio
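########################################### Regmodel #########################################
class Regmodel:  # base class; the model wrappers below inherit from it (see class LWPLS(Regmodel))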
    def __init__(self, train, test, n_iter, add_hyperparams=None, remove_hyperparams=None, nfolds=3, **kwargs):
        self.SCORE = 100000000
        self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        # Wider spectra get wider candidate Savitzky-Golay smoothing windows
        if self._xc.shape[1] > 1000:
            a = [15, 21, 27, 33]
        else:
            a = [5, 7, 9]
        self._hyper_params = {'polyorder': hp.choice('polyorder', [2]),
                              'deriv': hp.choice('deriv', [0, 1]),
                              'window_length': hp.choice('window_length', a),
                              'normalization': hp.choice('normalization', ['No_transformation'])}
        if remove_hyperparams is not None:
            for i in remove_hyperparams:
                self._hyper_params.pop(i, None)
        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)
        trials = Trials()
        # Tree-structured Parzen Estimator (TPE), a Bayesian optimization approach. The best
        # configuration and fitted model are captured inside self.objective whenever the
        # cross-validation score improves, so the raw fmin return value is not reused here.
        best_params = fmin(fn=self.objective,
                           space=self._hyper_params,
                           algo=tpe.suggest,
                           max_evals=n_iter,
                           trials=trials,
                           verbose=1)
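    # For reference, a minimal standalone sketch of the fmin/TPE pattern used above, on a toy
    # 1-D objective (nothing here is specific to this project). A configuration sampled from
    # self._hyper_params looks like, e.g.:
    #     {'polyorder': 2, 'deriv': 1, 'window_length': 9, 'normalization': 'No_transformation'}
    #
    #     from hyperopt import fmin, hp, tpe, Trials
    #     trials = Trials()
    #     best = fmin(fn=lambda p: (p['x'] - 3.0) ** 2,       # loss to minimize
    #                 space={'x': hp.uniform('x', -10, 10)},   # search space
    #                 algo=tpe.suggest, max_evals=50, trials=trials)
    #     # best -> {'x': ~3.0}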
    @property
    def train_data_(self):
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        return self.pretreated
    @property
    def get_params_(self):  # Returns the search space the optimizer explores for the best hyperparameter subset
        return self._hyper_params
    @property
    def best_hyperparams_(self):  # Returns the selected (best-scoring) subset of hyperparameters
        return self._best
    @property
    def best_hyperparams_print(self):  # Returns a sentence describing the selected signal preprocessing
        if self._best['normalization'] == 'Snv':
            a = 'Standard Normal Variate (SNV)'
        elif self._best['normalization'] == 'No_transformation':
            a = 'No transformation was performed'
        bb, cc, dd = str(self._best['window_length']), str(self._best['polyorder']), str(self._best['deriv'])
        SG = ('- Savitzky-Golay derivative parameters:\n(window_length: ' + bb
              + '; polynomial order: ' + cc + '; derivative order: ' + dd + ')')
        return SG + '\n- Spectral transformation: ' + a
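    # Example of the sentence returned above (window_length=9, polyorder=2, deriv=1, SNV):
    #     - Savitzky-Golay derivative parameters:
    #     (window_length: 9; polynomial order: 2; derivative order: 1)
    #     - Spectral transformation: Standard Normal Variate (SNV)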
    @property
    def pred_data_(self):  # Returns the predictions from the calibration, cross-validation, and test steps
        return self._yc, self._ycv, self._yt
    @property
    def CV_results_(self):
        return self._cv_df

    @property
    def important_features_(self):
        return self.important_features

    @property
    def selected_features_(self):
        return self._selected_bands

    @property
    def sel_ratio_(self):
        return self._sel_ratio
########################################### PLSR #########################################
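class Plsr(Regmodel):  # class name inferred from the section banner above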
    def __init__(self, train, test, n_iter=10, cv=3):
        # PLSR adds only the number of latent variables on top of the shared preprocessing space
        super().__init__(train, test, n_iter, nfolds=cv,
                         add_hyperparams={'n_components': hp.randint('n_components', 1, 20)})
    def objective(self, params):
        params['n_components'] = int(params['n_components'])
        x0 = [self._xc, self._xt]
        # Map the sampled normalization name to its callable (avoids eval on a string)
        norm = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['normalization'])]
        x1 = [norm(x0[0]), norm(x0[1])]
        # Savitzky-Golay requires deriv <= polyorder < window_length; fall back to the
        # best-known (or identity) settings when the sampled combination is invalid
        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b >= c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c
        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length=params['window_length']) for i in range(2)]
        # Cross-validate a PLS model on the preprocessed training spectra
        model = PLSRegression(scale=False, n_components=params['n_components'])
        folds = KF_CV().CV(x=x2[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        yp = KF_CV().cross_val_predictor(model=model, folds=folds, x=x2[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        # Score to minimize: mean CV RMSE divided by mean CV R2, with R2 clipped at 0.01 so a
        # near-zero or negative R2 heavily penalizes the candidate instead of dividing by ~0
        score = self._cv_df.loc["mean", 'rmse'] / np.max([0.01, self._cv_df.loc["mean", 'r2']])
        Model = PLSRegression(scale=False, n_components=params['n_components'])
        Model.fit(x2[0], self._ytrain)
        # Keep this candidate only if it improves on the best score seen so far
        if self.SCORE > score:
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=folds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model
            # Cast hyperparameters to plain ints where possible (hyperopt returns numpy scalars)
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score
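# For illustration, the same preprocess-then-fit pattern outside the optimizer (a minimal
# sketch with synthetic data; cross-validation and the class machinery are omitted):
#
#     import numpy as np
#     from scipy.signal import savgol_filter
#     from sklearn.cross_decomposition import PLSRegression
#     X, y = np.random.rand(50, 300), np.random.rand(50)
#     Xp = savgol_filter(X, window_length=9, polyorder=2, deriv=1)  # rows are spectra
#     pls = PLSRegression(scale=False, n_components=5).fit(Xp, y)
#     y_hat = pls.predict(Xp)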
############################################ iplsr #########################################
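class Iplsr(Regmodel):  # class name inferred from the section banner above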
    def __init__(self, train, test, n_iter=10, n_intervall=5, cv=3, bestglobalparams=None):
        # Reuse the preprocessing settings selected by the global PLSR optimization
        self.glob = bestglobalparams
        self._best = {}
        self.folds = KF_CV().CV(x=np.array(train[0]), y=np.array(train[1]), n_folds=cv)
        norm = {'Snv': Snv, 'No_transformation': No_transformation}[str(self.glob['normalization'])]
        x1 = [norm(train[0]), norm(test[0])]
        self.x2 = [savgol_filter(x1[i], polyorder=self.glob['polyorder'], deriv=self.glob['deriv'],
                                 window_length=self.glob['window_length']) for i in range(2)]
        self.n_intervall = n_intervall
        # Each interval is delimited by two wavelength indices, so 2*n_intervall cut points are sampled
        self.n_arrets = self.n_intervall * 2
        add = {'n_components': hp.randint('n_components', 1, 20)}
        add.update({'v' + str(i): hp.randint('v' + str(i), 0, train[0].shape[1])
                    for i in range(1, self.n_arrets + 1)})
        super().__init__(train, test, n_iter, nfolds=cv, add_hyperparams=add)
    def objective(self, params):
        # Sort the sampled wavelength indices and pair them into n_intervall bands
        self.idx = [params['v' + str(i)] for i in range(1, self.n_arrets + 1)]
        self.idx.sort()
        arrays = [np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
                  for i in range(self.n_intervall)]
        # Union of the selected bands (avoid shadowing the built-in `id`)
        selected = np.unique(np.concatenate(arrays, axis=0), axis=0)
        prepared_data = [self.x2[i][:, selected] for i in range(2)]
        # Modelling: cross-validate a PLS model restricted to the selected bands
        folds = KF_CV().CV(x=prepared_data[0], y=np.array(self._ytrain), n_folds=self._nfolds)
        model = PLSRegression(scale=False, n_components=params['n_components'])
        yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                         x=prepared_data[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        score = self._cv_df.loc["mean", 'rmse'] / np.max([0.01, self._cv_df.loc["mean", 'r2']])
        if self.SCORE > score:
            self.SCORE = score
            self._best = params
            self.arrays = arrays
            self.prepared_data = prepared_data
            self.model = model
        return score
    def best_fit(self):
        # Refit PLS on the best-performing band subset and store calibration/CV/test predictions
        Model = PLSRegression(scale=False, n_components=self.model.n_components)
        Model.fit(self.prepared_data[0], self._ytrain)
        self._yc = Model.predict(self.prepared_data[0])
        yp = KF_CV().cross_val_predictor(model=Model, folds=self.folds,
                                         x=self.prepared_data[0], y=np.array(self._ytrain))
        self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=self.folds)
        self._yt = Model.predict(self.prepared_data[1])
        self._model = Model
        for key, value in self._best.items():
            try:
                self._best[key] = int(value)
            except (TypeError, ValueError):
                self._best[key] = value
        self.pretreated = DataFrame(self.x2[0])
        # Store the first and last wavelength index of each selected interval
        limits = np.ones(len(self.arrays) * 2)
        for i in range(len(self.arrays)):
            limits[2 * i], limits[2 * i + 1] = self.arrays[i][0], self.arrays[i][-1]
        self.limits = limits.astype(int)
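# Illustration of the interval encoding used above: 2*n_intervall sampled indices, once
# sorted and paired, become a union of wavelength bands (toy values):
#
#     idx = sorted([40, 7, 25, 12])                        # -> [7, 12, 25, 40]
#     bands = [np.arange(7, 12 + 1), np.arange(25, 40 + 1)]
#     selected = np.unique(np.concatenate(bands))          # columns kept for PLS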
########################################### LWPLSR #########################################
class LwplsObject:  # class name hypothetical; a lightweight holder for LW-PLSR results
    # computed externally (e.g. via the Julia-converted routine); the __init__ defaults
    # below are inferred from its None checks
    def __init__(self, Reg_json=None, pred=None):
        if Reg_json is not None and pred is not None:
            from pandas import json_normalize
            self.model_ = Reg_json['model']
            self.best_hyperparams_ = Reg_json['best_lwplsr_params']
            self.pred_data_ = [json_normalize(Reg_json[i]) for i in pred]
########################################### LWPLS #########################################
class LWPLS(Regmodel):
    def __init__(self, train, test, n_iter=10, cv=3, bestglobalparams=None):
        # Reuse the globally optimized preprocessing; search only LW-PLS-specific settings
        self.glob = bestglobalparams
        self._best = {}
        add = {'localplsVL': hp.randint('localplsVL', 2, bestglobalparams['n_components']),
               'dist': hp.choice('dist', ['euc', 'mah']),  # Euclidean or Mahalanobis distance
               'h': hp.randint('h', 1, 3)}
        self.folds = KF_CV().CV(x=np.array(train[0]), y=np.array(train[1]), n_folds=cv)
        norm = {'Snv': Snv, 'No_transformation': No_transformation}[str(self.glob['normalization'])]
        x1 = [norm(train[0]), norm(test[0])]
        self.x2 = [savgol_filter(x1[i], polyorder=self.glob['polyorder'], deriv=self.glob['deriv'],
                                 window_length=self.glob['window_length']) for i in range(2)]
        super().__init__(train, test, n_iter, nfolds=cv,
                         add_hyperparams=add, remove_hyperparams=None)
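    # LW-PLS in brief: for each sample to predict, calibration samples are weighted by their
    # distance to it (metric 'euc'/'mah', bandwidth h), the k nearest are kept, and a local
    # PLS model with localplsVL latent variables is fitted on that weighted neighborhood.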
    def objective(self, params):
        # Imported lazily, as in best_fit, so the Julia-converted module only loads when used
        from .lwplsr_julia_converted import lwpls
        # Leave-fold-out CV: refit LW-PLS with each fold held out and predict that fold
        yp = {}
        for i in self.folds.keys():
            yp[i] = lwpls(Xtrain=np.delete(np.array(self.x2[0]), self.folds[i], axis=0),
                          ytrain=np.delete(np.array(self._ytrain), self.folds[i], axis=0),
                          Xtest=np.array(self.x2[0])[self.folds[i]],
                          globalplsVL=self.glob['n_components'], metric=params['dist'], h=params['h'], k=200,
                          localplsVL=params['localplsVL'], center=True, scale=False, sklearn=True).ravel()
        self._cv_df = KF_CV().metrics_cv(y=np.array(self._ytrain), ypcv=yp, folds=self.folds)[1]
        score = self._cv_df.loc["mean", 'rmse'] / np.max([0.01, self._cv_df.loc["mean", 'r2']])
        if self.SCORE > score:
            self.SCORE = score
            self._best = params
        return score
    def best_fit(self):
        from .lwplsr_julia_converted import lwpls
        # Re-run CV with the best settings, then predict the test set from the full training set
        yp = {}
        for i in self.folds.keys():
            yp[i] = lwpls(Xtrain=np.delete(np.array(self.x2[0]), self.folds[i], axis=0),
                          ytrain=np.delete(np.array(self._ytrain), self.folds[i], axis=0),
                          Xtest=np.array(self.x2[0])[self.folds[i]],
                          globalplsVL=self.glob['n_components'], metric=self._best['dist'], h=self._best['h'], k=200,
                          localplsVL=self._best['localplsVL'], center=True, scale=False, sklearn=True).ravel()
        self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=self.folds)
        self._yt = lwpls(Xtrain=np.array(self.x2[0]),
                         ytrain=np.array(self._ytrain),
                         Xtest=np.array(self.x2[1]),
                         globalplsVL=self.glob['n_components'], metric=self._best['dist'], h=self._best['h'], k=200,
                         localplsVL=self._best['localplsVL'], center=True, scale=False, sklearn=True).ravel()
        self.pretreated = DataFrame(self.x2[0])
        # Cast numpy scalars to plain Python types for clean serialization
        for key, value in self._best.items():
            self._best[key] = int(value) if isinstance(value, np.int64) else float(
                value) if isinstance(value, np.float64) else value
        # LW-PLS keeps no single fitted estimator object; expose the tuned settings instead
        self._model = {'globalplsVL': self.glob['n_components'],
                       'localplsVL': self._best['localplsVL'],
                       'dist': self._best['dist'],
                       'k': 200,
                       'h': self._best['h']}
        self._best = {'normalization': self.glob['normalization'],
                      'polyorder': self.glob['polyorder'],
                      'window_length': self.glob['window_length'],
                      'deriv': self.glob['deriv'],
                      'globalplsVL': self.glob['n_components']}
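if __name__ == '__main__':
    # Minimal usage sketch with synthetic data. Assumption: the class names Plsr/Iplsr above
    # are inferred from the section banners; adjust them to the actual API if they differ.
    rng = np.random.default_rng(0)
    X = rng.random((80, 300))
    y = X[:, :10].sum(axis=1) + 0.05 * rng.random(80)
    reg = Plsr(train=[X[:60], y[:60]], test=[X[60:], y[60:]], n_iter=10, cv=3)
    print(reg.best_hyperparams_)   # e.g. {'polyorder': 2, 'deriv': 1, 'window_length': 9, ...}
    yc, ycv, yt = reg.pred_data_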