Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
from Packages import *
from .Evaluation_Metrics import metrics
## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    """Detect the field separator of a CSV file.

    Sniffs a small sample of the file with clevercsv, which is more robust
    than the stdlib csv.Sniffer on messy real-world files (the old stdlib
    fallback was dead commented-out code and has been removed).

    Parameters:
        filename: path of the CSV file to inspect.
    Returns:
        The detected delimiter as a one-character string.
    """
    import clevercsv  # local import: only needed by this helper
    with open(filename, newline='') as csvfile:
        # NOTE(review): 100 chars may not cover a full header row on very
        # wide files — enlarge the sample if detection proves unreliable.
        delimiter = clevercsv.Sniffer().sniff(csvfile.read(100)).delimiter
    return delimiter
def find_col_index(filename):
    """Guess whether the first CSV column holds sample names rather than data.

    Reads a small probe (3 rows, after skipping the first 3) and reports
    'yes' when the first column is not float-typed, 'no' otherwise.
    """
    sep = find_delimiter(filename)
    with open(filename) as handle:
        probe = pd.read_csv(handle, skiprows=3, nrows=3, index_col=False, sep=sep)
    first_col_is_float = probe.iloc[:, 0].dtypes == np.float64
    return 'no' if first_col_is_float else 'yes'
# detection of columns categories and scaling
def col_cat(data_import):
    """Split a dataframe into its numerical and categorical parts.

    Parameters:
        data_import: pd.DataFrame whose index holds the sample names.
    Returns:
        (numerical_data, categorical_data):
            numerical_data: the float64/int64 columns, NaNs replaced by the
                column mean; a single all-zero column when none exist.
            categorical_data: the non-numeric columns with a leading 'name'
                column copied from the index; an empty DataFrame when there
                are no categorical columns.
    """
    numerical_columns_list = []
    categorical_columns_list = []
    for col in data_import.columns:
        if data_import[col].dtype in (np.dtype("float64"), np.dtype("int64")):
            numerical_columns_list.append(data_import[col])
        else:
            categorical_columns_list.append(data_import[col])
    if not numerical_columns_list:
        # No numeric columns: fall back to one all-zero placeholder column.
        # BUGFIX: the original appended a plain Python list here, which made
        # the pd.concat below raise; a named Series keeps it working.
        numerical_columns_list.append(
            pd.Series(0, index=data_import.index, name='empty'))
    if categorical_columns_list:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
        # Prefix the sample names (the index) as the first column. Inserting
        # the plain values is more robust than inserting a 1-column frame.
        categorical_data.insert(0, 'name', list(data_import.index))
    else:
        # BUGFIX: the original assigned the pd.DataFrame *class* here
        # instead of an instance; return an empty frame instead.
        categorical_data = pd.DataFrame()
    # Build the numerical matrix and impute NaNs with each column's mean.
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    return numerical_data, categorical_data
def list_files(mypath, import_type):
    """List saved-model filenames in *mypath* ending with '<import_type>.pkl'.

    Returns a single placeholder message (inside a list) when no matching
    file exists yet.
    """
    suffix = import_type + '.pkl'
    found = [entry for entry in listdir(mypath)
             if isfile(join(mypath, entry)) and entry.endswith(suffix)]
    if not found:
        found = ['Please, create a model before - no model available yet']
    return found
def standardize(X, center = True, scale = False):
    """Center and/or scale the columns of X with sklearn's StandardScaler.

    Parameters:
        X: pd.DataFrame of numeric values.
        center: subtract each column's mean when True.
        scale: divide by each column's standard deviation when True.
    Returns:
        pd.DataFrame with the same index and columns as X.
    """
    scaler = StandardScaler(with_mean=center, with_std=scale)
    transformed = scaler.fit_transform(X)
    return pd.DataFrame(transformed, index=X.index, columns=X.columns)
def MinMaxScale(X):
    """Rescale every column of X linearly onto the [0, 1] range."""
    scaler = MinMaxScaler(feature_range=(0, 1))
    rescaled = scaler.fit_transform(X)
    return pd.DataFrame(rescaled, index=X.index, columns=X.columns)
######################################## Spectral preprocessing
def Detrend(X):
    """Remove a least-squares linear trend along the last axis of X."""
    return detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)
def Snv(X):
    """Standard Normal Variate (SNV) transform of a spectra matrix.

    Each row (spectrum) is centered on its own mean and divided by its own
    standard deviation — the standard SNV scatter correction.

    BUGFIX: the original computed one global mean/std over the whole matrix
    (``xt.mean()`` with no axis), so individual spectra were not normalized
    as SNV requires; the transpose only makes sense with axis=0 statistics.

    Parameters:
        X: pd.DataFrame with one spectrum per row.
    Returns:
        pd.DataFrame with the same shape, index and columns as X.
    """
    xt = np.array(X).T
    # axis=0 on the transposed matrix == per-row (per-spectrum) statistics
    c = (xt - xt.mean(axis=0)) / xt.std(axis=0)
    return pd.DataFrame(c.T, index=X.index, columns=X.columns)
def No_transformation(X):
    """Identity preprocessing: return the input unchanged."""
    return X
######################################## Cross val split ############################
class KF_CV:
    """K-fold cross-validation utilities.

    Fold dictionaries map a fold name ('Fold1', 'Fold2', ...) to a 1d numpy
    array holding that fold's test-set sample indices.
    """

    @staticmethod
    def CV(x, y, n_folds:int):
        """Generate the test-set indices of each fold.

        Parameters:
            x, y: the data to split.
            n_folds: number of folds.
        Returns:
            dict: fold name -> 1d numpy array of test-set indices.
        """
        kf = ks.KFold(n_splits=n_folds, device='cpu')
        # Single pass over the splitter. The original re-enumerated every
        # split once per fold (O(n_folds^2) work for the same result).
        return {f'Fold{i+1}': i_test
                for i, (_, i_test) in enumerate(kf.split(x, y))}

    @staticmethod
    def cross_val_predictor(model, folds, x, y):
        """Cross-validate *model* and return the per-fold test predictions.

        Parameters:
            model: estimator to cross-validate (fit/predict interface).
            folds: dict of fold name -> test indices (from CV).
            x, y: the data used for CV.
        Returns:
            dict: fold name -> predicted y on that fold's test set.
        """
        x = np.array(x)
        y = np.array(y)
        yp = {}
        for fname, test_idx in folds.items():
            # Train on all samples except this fold's test rows.
            model.fit(np.delete(x, test_idx, axis=0),
                      np.delete(y, test_idx, axis=0))
            yp[fname] = model.predict(x[test_idx])  # predictions/fold
        return yp

    @staticmethod
    def meas_pred_eq(y, ypcv, folds):
        """Per-fold measured-vs-predicted values and OLS equations.

        Parameters:
            y: the target variable.
            ypcv: dict of fold name -> predictions (from cross_val_predictor).
            folds: dict of fold name -> test indices (from CV).
        Returns:
            data: n x 4 dataframe of measured values, predicted values, the
                fold's OLS-equation label and the sample index (n samples).
            coeff: 2 x k dataframe of per-fold OLS slope and intercept
                (k folds).
        """
        cvcv = {}
        coeff = {}
        y = np.array(y)
        for i, Fname in enumerate(folds.keys()):
            r = pd.DataFrame()
            r['Predicted'] = ypcv[Fname]
            r['Measured'] = y[folds[Fname]]
            # OLS of predicted on measured, one fit per fold.
            ols = LinearRegression().fit(pd.DataFrame(y[folds[Fname]]),
                                         ypcv[Fname].reshape(-1, 1))
            r.index = folds[Fname]
            r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0],2)} x Measured'] * r.shape[0]
            cvcv[i] = r
            coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]]
        data = pd.concat(cvcv, axis = 0)
        # Flatten the (fold, sample) MultiIndex back to the sample index.
        data['index'] = [data.index[i][1] for i in range(data.shape[0])]
        data.index = data['index']
        coeff = pd.DataFrame(coeff, index = ['Slope', 'Intercept'])
        return data, coeff

    @staticmethod
    def metrics_cv(y, ypcv, folds):
        """Regression metrics per fold, plus a summary copy.

        Returns two dataframes: the raw per-fold scores, and the same scores
        with 'mean', 'sd' and 'cv' (relative sd, %) rows appended.
        """
        y = np.array(y)
        scores = {}
        for fname in folds.keys():
            scores[fname] = metrics().reg_(y[folds[fname]], ypcv[fname])
        r = pd.DataFrame(scores)
        r_print = r.copy()
        r_print['mean'] = r.mean(axis = 1)
        r_print['sd'] = r.std(axis = 1)
        r_print['cv'] = 100 * r.std(axis = 1) / r.mean(axis = 1)
        return r.T, r_print.T

    ### compute metrics for each fold
    @staticmethod
    def cv_scores(y, ypcv, folds):
        """Metric scores per fold plus mean/sd/rsd summary (see metrics_cv).

        Takes the Y vector, the dict of predicted values per fold (from
        cross_val_predictor), and the index per fold (from CV); returns the
        raw per-fold scores and a copy with 'mean', 'sd' and 'cv' appended.

        BUGFIX: the original aliased r_print to r without .copy(), so the
        summary columns leaked into the 'raw' first return value as well
        (metrics_cv already used .copy(); now both agree).
        """
        y = np.array(y)
        scores = {}
        for fname in folds.keys():
            scores[fname] = metrics().reg_(y[folds[fname]], ypcv[fname])
        r = pd.DataFrame(scores)
        r_print = r.copy()
        r_print['mean'] = r.mean(axis = 1)
        r_print['sd'] = r.std(axis = 1)
        r_print['cv'] = 100 * r.std(axis = 1) / r.mean(axis = 1)
        return r.T, r_print.T
### Selectivity ratio
def sel_ratio(model, x ):
    """Selectivity ratio (SR) of each predictor for a fitted linear model.

    SR compares, per variable, the variance explained by the target
    projection with the residual variance; only variables whose SR exceeds
    an F-distribution cutoff are returned.

    Parameters:
        model: fitted estimator exposing `coef_` (e.g. a PLS/linear model).
        x: 2d array or dataframe of predictors (samples x variables).
    Returns:
        DataFrame ('sr' column) of the variables passing the cutoff,
        re-indexed by variable position (0..n_vars-1).
    """
    from scipy.stats import f
    x = pd.DataFrame(x)
    # Target-projection weight: unit-norm regression coefficient vector.
    wtp = model.coef_.T/ np.linalg.norm(model.coef_.T)
    # Scores of each sample on the target-projection direction.
    ttp = np.array(x @ wtp)
    # Loadings: regression of each variable onto the scores.
    ptp = np.array(x.T) @ np.array(ttp)/(ttp.T @ ttp)
    # Explained sum of squares per variable.
    qexpi = np.linalg.norm(ttp @ ptp.T, axis = 0)**2
    # Residual matrix. NOTE(review): x is mean-centered here but NOT in the
    # score/loading computations above — confirm the model was fitted on
    # centered data so the two are consistent.
    e = np.array(x-x.mean()) - ttp @ ptp.T
    # Residual sum of squares per variable.
    qres = np.linalg.norm(e, axis = 0)**2
    sr = pd.DataFrame(qexpi/qres, index = x.columns, columns = ['sr'])
    # F cutoff. NOTE(review): uses the 5th percentile (lower tail) with
    # variable-count-based dof; SR literature typically uses an upper-tail
    # quantile with sample-count dof — verify this choice is intentional.
    fcr = f.ppf(0.05, sr.shape[0]-2, sr.shape[0]-3)
    c = sr > fcr
    sr.index = np.arange(x.shape[1])
    # Keep only variables whose SR exceeds the cutoff.
    SR = sr.iloc[c.to_numpy(),:]
    return SR