Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from Packages import *
from Class_Mod import metrics
class TpeIpls:
    """Interval-PLS wavelength selection driven by the Tree-structured Parzen Estimator.

    This framework belongs to the family of wavelength selection algorithms.
    It was introduced as an improvement over the forward and backward interval
    selection algorithms. It combines the partial least squares (PLS)
    algorithm with the tree-structured Parzen estimator (TPE), a Bayesian
    optimization algorithm first introduced in 2011, which yields a wrapper
    method for interval-PLS. The approach preserves the integrity of the
    spectral data by treating it as sequential data (contiguous intervals)
    rather than using discrete, point-to-point variable selection.

    Optimization algorithms can be used to find the subset of variables that
    optimizes a given criterion (e.g. maximize predictive performance,
    minimize overfitting).

    Typical usage: build an instance, call :meth:`tune`, then read
    :attr:`segments_`, :attr:`tpe_pls_performance` and :attr:`meas_vs_pred`.
    """

    # Best (lowest) objective value seen so far. It lives on the class, so it
    # is shared by every instance and reset each time an instance is created.
    SCORE = 10000
    # Names (column "Vars") and positional indices (index) of the variables
    # kept by the best model found so far; also shared at class level.
    index_export = pd.DataFrame()

    def __init__(self, x_train, x_test, y_train, y_test, scale, Kfold, n_intervall):
        """Store the data and build the hyperopt search space.

        Parameters
        ----------
        x_train, x_test
            Predictor tables; they are accessed through ``.iloc`` and
            ``.columns``, i.e. used as pandas DataFrames.
        y_train, y_test
            Reference values for the two sets (``y_train.mean()`` is used by
            the objective).
        scale
            Forwarded to ``PLSRegression(scale=...)``.
        Kfold
            Forwarded to ``cross_val_predict(cv=...)``.
        n_intervall
            Number of intervals (bands) to select.
        """
        TpeIpls.SCORE = 10000
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.scale = scale
        self.Kfold = Kfold
        self.p = self.x_train.shape[1]
        self.n_intervall = n_intervall
        # Every interval needs two boundaries (a start and a stop).
        self.__n_arrets = self.n_intervall * 2
        # One integer boundary per 'v{i}' label, drawn among the column positions.
        self.PLS_params = {
            f'v{i}': hp.randint(f'v{i}', 0, self.p)
            for i in range(1, self.__n_arrets + 1)
        }
        self.PLS_params['n_components'] = hp.randint("n_components", 1, 6)

    def _objective(self, params):
        """Objective minimised by hyperopt for one sampled set of parameters.

        The sampled boundaries are sorted and paired into ``n_intervall``
        inclusive intervals; a PLS model is fitted on the union of those
        columns. The returned score is
        ``rmsecv / rmsec + 100 * rmsecv / mean(y_train)`` rounded to the
        nearest integer. When the score improves on ``TpeIpls.SCORE`` the
        model description is recorded (``nlv``, ``segments``,
        ``TpeIpls.index_export``) and the performance table is written to
        ``path + "performance.xlsx"``.
        """
        self.idx = sorted(params[f'v{i}'] for i in range(1, self.__n_arrets + 1))
        arrays = [
            np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
            for i in range(self.n_intervall)
        ]
        # Neighbouring intervals may share a boundary, hence the de-duplication.
        selected = np.unique(np.concatenate(arrays, axis=0), axis=0)
        x_selected = self.x_train.iloc[:, selected]

        # Train the model
        try:
            model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            model.fit(x_selected, self.y_train)
        except ValueError:
            # Fall back to a single latent variable when the sampled number
            # of components is rejected for this selection.
            params["n_components"] = 1
            model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            model.fit(x_selected, self.y_train)

        # Predictions: calibration, cross-validation and test
        yc = model.predict(x_selected).ravel()
        ycv = cross_val_predict(model, x_selected, self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
        yt = model.predict(self.x_test.iloc[:, selected]).ravel()

        # R² values are only used for the progress report below
        r2c = r2_score(self.y_train, yc)
        r2cv = r2_score(self.y_train, ycv)
        r2t = r2_score(self.y_test, yt)

        rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
        rmsec = np.sqrt(mean_squared_error(self.y_train, yc))
        score = np.round(rmsecv / rmsec + rmsecv * 100 / self.y_train.mean())

        # Scores are rounded, so the 0.5 margin demands a strictly lower value.
        if score < TpeIpls.SCORE - 0.5:
            TpeIpls.SCORE = score
            self.nlv = params['n_components']
            print('--**-------------##---------#~###~#---------##---------------**--')
            print(f'***** R²train : [{round(r2c * 100)}]**** R²cv : [{round(r2cv * 100)}]**** R²test : [{round(r2t * 100)}]*****')
            print(f'***** N Predictiors : [{len(selected)}] ******** NLV : [{params["n_components"]}]*****')

            export = pd.DataFrame()
            export["Vars"] = self.x_test.columns[selected]
            export.index = selected
            TpeIpls.index_export = export

            # Save the performance table of the new best model.
            # NOTE(review): `path` is not defined in this class; presumably it
            # is provided by `from Packages import *` - confirm.
            metrics(train=(self.y_train, yc), cv=(self.y_train, ycv), test=(self.y_test, yt)).round(2).to_excel(path + "performance.xlsx")
            self.segments = arrays
            print("''---------------------------- evolution noticed, hence a new model was saved-------------------------------''")
        return score

    def tune(self, n_iter):
        """Run ``n_iter`` TPE evaluations of :meth:`_objective`.

        Returns
        -------
        The value returned by ``hyperopt.fmin`` (the best sampled point).
        It holds the raw sampled values; the number of components actually
        used by the best model is ``self.nlv``, which accounts for the
        fallback performed in :meth:`_objective`.
        """
        print('------------------------------------------------ Optimization of the process has started ---------------------------------------------')
        trials = Trials()
        best_params = fmin(
            fn=self._objective,
            space=self.PLS_params,
            algo=tpe.suggest,  # Tree of Parzen Estimators (TPE), a Bayesian approach
            max_evals=n_iter,
            trials=trials,
            verbose=2,
        )
        return best_params

    @property
    def segments_(self):
        """Table of the selected bands: one row per band, columns 'from' and 'to'.

        Rows are labelled 'band1', 'band2', ...; both bounds are inclusive
        column positions. The underlying mapping is also kept in ``self.bands``.
        Requires ``self.segments``, which is set once an improving model has
        been found during optimisation.
        """
        self.bands = {
            f'band{rank}': [segment[0], segment[-1]]
            for rank, segment in enumerate(self.segments, start=1)
        }
        bands = pd.DataFrame(self.bands).T
        bands.columns = ['from', 'to']
        return bands

    def _selected_columns(self):
        """Return the sorted, de-duplicated column positions covered by the bands."""
        # Build the table once and iterate over row values: looking a value up
        # with an integer key on the 'band*'-labelled index relies on a
        # deprecated positional fallback.
        bounds = self.segments_[['from', 'to']].to_numpy()
        ranges = [np.arange(start, stop + 1) for start, stop in bounds]
        return np.unique(np.concatenate(ranges))

    def _refit_selected_model(self):
        """Refit PLS on the selected bands and store ``yc``, ``ycv`` and ``yt``."""
        variables_idx = self._selected_columns()
        x_selected = self.x_train.iloc[:, variables_idx]
        pls = PLSRegression(n_components=self.nlv, scale=self.scale)
        pls.fit(x_selected, self.y_train)
        self.yc = pls.predict(x_selected).ravel()
        self.ycv = cross_val_predict(pls, x_selected, self.y_train, cv=self.Kfold, n_jobs=-1).ravel()
        self.yt = pls.predict(self.x_test.iloc[:, variables_idx]).ravel()

    @property
    def tpe_pls_performance(self):
        """Refit the best model and return its performance table.

        The table is the result of ``metrics(...)`` for the calibration,
        cross-validation and test predictions, rounded to two decimals.
        Accessing this property also refreshes ``self.yc``, ``self.ycv`` and
        ``self.yt``.
        """
        self._refit_selected_model()
        return metrics(train=(self.y_train, self.yc), cv=(self.y_train, self.ycv), test=(self.y_test, self.yt)).round(2)

    @property
    def meas_vs_pred(self):
        """Plot measured versus predicted values for calibration, CV and test.

        The predictions are computed on demand when they are not available
        yet, so the plot no longer requires ``tpe_pls_performance`` to have
        been accessed first.
        """
        if not all(hasattr(self, name) for name in ('yc', 'ycv', 'yt')):
            self._refit_selected_model()
        _, ax = plt.subplots()
        sns.regplot(x=self.y_train, y=self.yc, ax=ax)
        sns.regplot(x=self.y_train, y=self.ycv, ax=ax)
        sns.regplot(x=self.y_test, y=self.yt, ax=ax)
        plt.show()