"""Regression models (PLSR, interval PLSR, LWPLSR) with hyperopt/TPE tuning of
signal preprocessing (SNV, Savitzky-Golay) and model hyperparameters."""
import numpy as np
from pandas import DataFrame
from utils.eval_metrics import metrics
from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from hyperopt import fmin, hp, tpe, Trials, space_eval, STATUS_OK, anneal
from utils.data_handling import Snv, No_transformation, KF_CV, sel_ratio
class Regmodel(object):
    """Base class for spectra regression models tuned with hyperopt.

    Subclasses implement :meth:`objective`; the constructor immediately runs a
    TPE search (``fmin``) over the preprocessing/model search space and the
    best state found is kept on the instance (model, predictions, CV tables).

    Parameters
    ----------
    train, test : sequence
        ``(X, y)`` pairs; X is a 2-D array of spectra (samples x wavelengths).
    n_iter : int
        Number of hyperopt evaluations.
    add_hyperparams : dict, optional
        Extra model-specific hyperparameters merged into the search space.
    nfolds : int
        Number of cross-validation folds.
    """

    def __init__(self, train, test, n_iter, add_hyperparams = None, nfolds = 3, **kwargs):
        # Best (lowest) CV RMSE found so far; large sentinel so the first
        # evaluated candidate is always accepted.
        self.SCORE = 100000000
        self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        self._selected_bands = DataFrame(index = ['from', 'to'])
        self.important_features = None
        # Shared preprocessing search space: Savitzky-Golay parameters and
        # the spectral normalization method (names of imported callables).
        self._hyper_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]),
                              'deriv': hp.choice('deriv', [0, 1, 2]),
                              'window_length': hp.choice('window_length', [15, 21, 27, 33]),
                              'normalization': hp.choice('normalization', ['Snv', 'No_transformation'])}
        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)
        self._best = None

        trials = Trials()
        # Run the optimization now; self.objective is the subclass override.
        best_params = fmin(fn=self.objective,
                           space=self._hyper_params,
                           algo=tpe.suggest,  # Tree of Parzen Estimators (TPE), a Bayesian approach
                           max_evals=n_iter,
                           trials=trials,
                           verbose=1)

    @property
    def train_data_(self):
        """[X_train, y_train] as supplied to the constructor."""
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        """[X_test, y_test] as supplied to the constructor."""
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        """Preprocessed training spectra of the best candidate (set by subclasses)."""
        return self.pretreated

    @property
    def get_params_(self):
        """Search space where the optimization algorithm looks for the optimal hyperparameters."""
        return self._hyper_params

    def objective(self, params):
        """Evaluate one hyperparameter candidate; overridden by subclasses."""
        pass

    @property
    def best_hyperparams_(self):
        """The selected (best) subset of hyperparameters."""
        return self._best

    @property
    def best_hyperparams_print(self):
        """A sentence describing which signal preprocessing was applied."""
        if self._best['normalization'] == 'Snv':
            a = 'Standard Normal Variate (SNV)'
        elif self._best['normalization'] == 'No_transformation':
            a = " No transformation was performed"
        bb, cc, dd = str(self._best['window_length']), str(self._best['polyorder']), str(self._best['deriv'])
        # NOTE: closing parenthesis added — the original string opened '(' and never closed it.
        SG = '- Savitzky-Golay derivative parameters: \n(Window_length:' + bb + ';polynomial order:' + cc + '; Derivative order : ' + dd + ')'
        Norm = '- Spectral Normalization: \n' + a
        return SG + "\n" + Norm

    @property
    def model_(self):
        """The fitted model of the best candidate."""
        return self._model

    @property
    def pred_data_(self):
        """Predicted values for the training and testing sets (``(yc, yt)``)."""
        return self._yc, self._yt

    @property
    def cv_data_(self):
        """Cross-validation predictions of the best candidate."""
        return self._ycv

    @property
    def CV_results_(self):
        """Cross-validation metrics table of the best candidate."""
        return self._cv_df

    @property
    def important_features_(self):
        """Feature-importance information (populated by subclasses, if any)."""
        return self.important_features

    @property
    def selected_features_(self):
        """Selected wavelength bands (DataFrame indexed by 'from'/'to')."""
        return self._selected_bands

    @property
    def sel_ratio_(self):
        """Selectivity-ratio table of the best candidate."""
        return self._sel_ratio
########################################### PLSR #########################################
class Plsr(Regmodel):
    """Full-spectrum PLSR tuned with TPE over preprocessing and n_components."""

    def __init__(self, train, test, n_iter = 10, cv = 3):
        super().__init__(train, test, n_iter, nfolds = cv,
                         add_hyperparams = {'n_components': hp.randint('n_components', 1, 20)})

    def objective(self, params):
        """Score one candidate: preprocess, cross-validate a PLSR, and keep
        the state of the best (lowest CV RMSE) candidate seen so far.

        Returns the CV RMSE (minimized by hyperopt).
        """
        params['n_components'] = int(params['n_components'])
        x0 = [self._xc, self._xt]
        # Dispatch the normalization by name instead of eval() — same two
        # imported callables, no dynamic code execution.
        normalizer = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['normalization'])]
        x1 = [normalizer(x0[0]), normalizer(x0[1])]

        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b > c:
            # Invalid Savitzky-Golay combination (needs deriv <= polyorder < window):
            # fall back to the best-known parameters, or to an identity filter.
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c

        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length = params['window_length']) for i in range(2)]

        model = PLSRegression(scale = False, n_components = params['n_components'])
        folds = KF_CV().CV(x = x2[0], y = np.array(self._ytrain), n_folds = self._nfolds)
        yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = x2[0], y = np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds = folds)[1]
        score = self._cv_df.loc["cv", 'rmse']

        # Refit on the full training set for train/test predictions.
        Model = PLSRegression(scale = False, n_components = params['n_components'])
        Model.fit(x2[0], self._ytrain)

        if self.SCORE > score:
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y = np.array(self._ytrain), ypcv = yp, folds = folds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model
            # Store the winning params with ints where possible (hyperopt may
            # hand back numpy scalars).
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score
############################################ iplsr #########################################
class TpeIpls(Regmodel):
    """Interval PLSR: TPE additionally searches the wavelength-interval edges
    (``n_intervall`` intervals, i.e. ``2 * n_intervall`` edge indices)."""

    def __init__(self, train, test, n_iter = 10, n_intervall = 5, cv = 3):
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall * 2  # number of interval edges
        r = {'n_components': hp.randint('n_components', 1, 20)}
        r.update({'v' + str(i): hp.randint('v' + str(i), 0, train[0].shape[1])
                  for i in range(1, self.n_arrets + 1)})
        super().__init__(train, test, n_iter, add_hyperparams = r, nfolds = cv)

    def objective(self, params):
        """Score one candidate: select wavelength intervals, preprocess,
        cross-validate a PLSR, and keep the best candidate's state.

        Returns the CV RMSE (minimized by hyperopt).
        """
        # Wavelength indices: sort the sampled edges, pair them up into
        # [start, end] intervals and take the union of the covered columns.
        self.idx = [params['v' + str(i)] for i in range(1, self.n_arrets + 1)]
        self.idx.sort()
        arrays = [np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1) for i in range(self.n_intervall)]
        selected_cols = np.unique(np.concatenate(arrays, axis=0), axis=0)  # renamed from `id` (shadowed builtin)

        ### Preprocessing (dispatch by name instead of eval())
        x0 = [self._xc, self._xt]
        normalizer = {'Snv': Snv, 'No_transformation': No_transformation}[str(params['normalization'])]
        x1 = [normalizer(x0[0]), normalizer(x0[1])]

        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b > c:
            # Invalid Savitzky-Golay combination: fall back to best-known
            # parameters, or to an identity filter.
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c

        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length = params['window_length']) for i in range(2)]
        prepared_data = [x2[i][:, selected_cols] for i in range(2)]

        ### Modelling
        folds = KF_CV().CV(x = prepared_data[0], y = np.array(self._ytrain), n_folds = self._nfolds)
        try:
            model = PLSRegression(scale = False, n_components = params['n_components'])
            yp = KF_CV().cross_val_predictor(model = model, folds = folds,
                                             x = prepared_data[0], y = np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds = folds)[1]
        except ValueError:
            # n_components can exceed the number of selected columns; retry
            # with a single latent variable. (The original source was missing
            # this `except` clause — reconstructed from the fallback body.)
            params["n_components"] = 1
            model = PLSRegression(scale = False, n_components = params["n_components"])
            yp = KF_CV().cross_val_predictor(model = model, folds = folds,
                                             x = prepared_data[0], y = np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds = folds)[1]

        score = self._cv_df.loc['cv', 'rmse']
        Model = PLSRegression(scale = False, n_components = model.n_components)
        Model.fit(prepared_data[0], self._ytrain)

        if self.SCORE > score:
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y = np.array(self._ytrain), ypcv = yp, folds = folds)
            self._yc = Model.predict(prepared_data[0])
            self._yt = Model.predict(prepared_data[1])
            self._model = Model
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(x2[0])
            # Record the first/last wavelength index of each selected interval.
            # (`limits` and its loop were missing in the original source.)
            limits = np.zeros(self.n_arrets)
            for i in range(self.n_intervall):
                limits[2 * i], limits[2 * i + 1] = arrays[i][0], arrays[i][arrays[i].shape[0] - 1]
            self.limits = limits.astype(int)
        return score
########################################### LWPLSR #########################################
class LwplsObject:
    """Lightweight container rebuilding LWPLSR results from a parsed JSON export.

    Attributes (``model_``, ``best_hyperparams_``, ``pred_data_``) are set only
    when both ``Reg_json`` and ``pred`` are provided; otherwise the instance
    stays empty.
    """

    def __init__(self, Reg_json = None, pred = None):
        # Guard clause: nothing to populate without both inputs.
        if Reg_json is None or pred is None:
            return
        from pandas import json_normalize
        self.model_ = Reg_json['model']
        self.best_hyperparams_ = Reg_json['best_lwplsr_params']
        # One normalized DataFrame per requested prediction key.
        self.pred_data_ = [json_normalize(Reg_json[key]) for key in pred]
############################################ Pcr #########################################