Newer
Older
from Packages import *
from .Evaluation_Metrics import metrics
## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    """Guess the field delimiter of a CSV file.

    Only the first 100 characters of the file are handed to
    ``clevercsv.Sniffer`` for the detection.

    Parameters
    ----------
    filename : str or path-like
        Path of the CSV file to inspect.

    Returns
    -------
    The ``delimiter`` attribute of the dialect detected by clevercsv.
    """
    import clevercsv

    with open(filename, newline='') as handle:
        sample = handle.read(100)
        dialect = clevercsv.Sniffer().sniff(sample)
    return dialect.delimiter
def find_col_index(filename):
    """Tell whether the first column of a CSV file looks like an index column.

    Three rows are read after skipping the first three lines of the file,
    using the delimiter detected by ``find_delimiter``.

    Returns
    -------
    str
        ``'yes'`` when the first column of that sample is not of dtype
        float64, ``'no'`` otherwise.
    """
    separator = find_delimiter(filename)
    with open(filename) as handle:
        preview = read_csv(handle, skiprows=3, nrows=3, index_col=False, sep=separator)
    if preview.iloc[:, 0].dtypes != np.float64:
        return 'yes'
    return 'no'
# detection of columns categories and scaling
def col_cat(data_import):
"""detect numerical and categorical columns in the csv"""
# NOTE(review): this listing appears to have lost its indentation and some
# statements: `numerical_data` and `categorical_data` are used below but are
# never assigned in the visible lines. Confirm against the original file.
# set first column as sample names
# name_col is a one-column frame holding the index labels of data_import,
# indexed by those same labels.
name_col = DataFrame(list(data_import.index), index = list(data_import.index))
# name_col=name_col.rename(columns = {0:'name'})
numerical_columns_list = []
categorical_columns_list = []
# Columns of dtype float64 or int64 are collected as numerical; every other
# dtype is collected as categorical.
for i in data_import.columns:
if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
numerical_columns_list.append(data_import[i])
else:
categorical_columns_list.append(data_import[i])
# When no numerical column was found, a placeholder list of zeros (one per
# row of data_import) is appended instead.
if len(numerical_columns_list) == 0:
empty = [0 for x in range(len(data_import))]
numerical_columns_list.append(empty)
# When categorical columns exist, the sample names are inserted as the first
# column, called 'name'.
# NOTE(review): the statement building `categorical_data` is not visible here.
if len(categorical_columns_list) > 0:
categorical_data.insert(0, 'name', name_col)
# NOTE(review): the body of the following `if` is not visible in this listing.
if len(categorical_columns_list) == 0:
# Create numerical data matrix from the numerical columns list and fill na with the mean of the column
numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean())) #np.mean(x)))
# Returns the pair (numerical_data, categorical_data).
return numerical_data, categorical_data
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
############## new function
def csv_loader(file):
    """Load a delimited text file and split it into float and non-float columns.

    The field separator (``,`` or ``;``) and the decimal mark (``.`` or
    ``,``) are guessed by counting those characters in the first three
    lines of the file. The first row is always used as the header. The
    first column is used as the index only when it is non-numeric and holds
    one distinct value per data row.

    Parameters
    ----------
    file : str or path-like
        Path of the file to read.

    Returns
    -------
    float_data : pandas.DataFrame
        The columns with a float dtype (an empty frame when there are none).
    non_float : pandas.DataFrame
        All the other columns (an empty frame when there are none).
    """
    import pandas as pd
    from pandas.api.types import is_float_dtype

    with open(file, mode='r') as csvfile:
        head = [csvfile.readline() for _ in range(3)]

    n_dots = [line.count('.') for line in head]
    n_commas = [line.count(',') for line in head]
    n_semis = [line.count(';') for line in head]

    # Separator: whichever of ',' / ';' is more frequent in the first three
    # lines. ';' is the fallback when neither occurs and also when the counts
    # tie (a non-zero tie previously left `separator` unbound and crashed).
    separator = ',' if sum(n_commas) > sum(n_semis) else ';'

    # Decimal mark: compared on lines 2-3 only, the first line being treated
    # as the header line.
    decimal = ',' if sum(n_commas[1:]) > sum(n_dots[1:]) else '.'
    # Fall back to '.' when the guess collides with the separator, or when
    # the '.'/',' counts of the three lines take at most two distinct values.
    if decimal == separator or len(set(n_dots + n_commas)) <= 2:
        decimal = '.'

    # NOTE(review): the previous header heuristic (ratio between columns 50-59
    # of rows 0 and 5) always produced 0, because `rat or np.nan` is always
    # truthy and every other branch also assigned 0. That outcome is kept
    # here explicitly; reinstate a real detection deliberately if header-less
    # files must be supported.
    header = 0

    # Raw read (no header, no index) used to inspect the first column.
    raw = pd.read_csv(file, decimal=decimal, sep=separator, header=None, index_col=None)
    first_col = raw.iloc[1:, 0]
    n_rows = raw.shape[0] - 1

    if is_float_dtype(first_col):
        index_col = None
    else:
        try:
            first_col.to_numpy().astype(float)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric labels: count the distinct values.
            n_distinct = len(set(first_col))
        else:
            # Values convertible to float count as zero distinct labels, so
            # the column is kept as data unless there is no data row at all.
            n_distinct = 0
        index_col = 0 if n_distinct == n_rows else None

    df = pd.read_csv(file, decimal=decimal, sep=separator, header=header, index_col=index_col)

    # A completely empty frame (no index either) is returned for a missing group.
    non_float = df.select_dtypes(exclude='float')
    if non_float.shape[1] == 0:
        non_float = pd.DataFrame()

    float_data = df.select_dtypes(include='float')
    if float_data.shape[1] == 0:
        float_data = pd.DataFrame()

    return float_data, non_float
def list_files(mypath, import_type):
    """List the saved model files of a given type found in a directory.

    Parameters
    ----------
    mypath : str
        Directory to scan (not recursive).
    import_type : str
        Suffix expected right before the ``.pkl`` extension.

    Returns
    -------
    list of str
        Names of the regular files ending with ``import_type + '.pkl'``, or
        a single placeholder message when no such file exists.
    """
    suffix = import_type + '.pkl'
    matches = []
    for entry in listdir(mypath):
        if isfile(join(mypath, entry)) and entry.endswith(suffix):
            matches.append(entry)
    if matches == []:
        return ['Please, create a model before - no model available yet']
    return matches
def standardize(X, center = True, scale = False):
    """Center and/or scale the columns of a DataFrame with ``StandardScaler``.

    Parameters
    ----------
    X : DataFrame
        Data to transform; its index and column labels are reused.
    center : bool
        Passed to ``StandardScaler`` as ``with_mean``.
    scale : bool
        Passed to ``StandardScaler`` as ``with_std``.

    Returns
    -------
    DataFrame
        Transformed values, labelled like ``X``.
    """
    scaler = StandardScaler(with_mean=center, with_std=scale)
    transformed = scaler.fit_transform(X)
    return DataFrame(transformed, index=X.index, columns=X.columns)
def MinMaxScale(X):
    """Rescale every column of a DataFrame to the [0, 1] range.

    Uses ``MinMaxScaler(feature_range=(0, 1))``; the result keeps the index
    and column labels of ``X``.
    """
    scaler = MinMaxScaler(feature_range=(0,1))
    rescaled = scaler.fit_transform(X)
    return DataFrame(rescaled, index=X.index, columns=X.columns)
######################################## Spectral preprocessing
def Detrend(X):
    """Remove a linear trend along the last axis of X.

    Thin wrapper around ``detrend`` called with ``type='linear'``, ``bp=0``
    and ``overwrite_data=False``; the value returned by ``detrend`` is
    returned unchanged.
    """
    return detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)
def Snv(X):
    """Apply the Standard Normal Variate (SNV) transform to each row of X.

    Every row (one spectrum) is centred on its own mean and divided by its
    own standard deviation (numpy's default population standard deviation).

    The previous implementation subtracted the mean of the whole matrix
    while dividing by the per-row standard deviation, so rows were not
    centred individually.

    Parameters
    ----------
    X : DataFrame
        One spectrum per row; index and column labels are preserved.

    Returns
    -------
    DataFrame
        The transformed spectra, labelled like ``X``. A row with zero
        variance yields non-finite values (division by zero).
    """
    values = np.array(X)
    row_mean = values.mean(axis=1, keepdims=True)
    row_std = values.std(axis=1, keepdims=True)
    return DataFrame((values - row_mean) / row_std, index=X.index, columns=X.columns)
def No_transformation(X):
    """Identity preprocessing step: hand back ``X`` itself, untouched."""
    return X
######################################## Cross val split ############################
class KF_CV:
# Static helpers for K-fold cross-validation: building the folds, predicting
# per fold, and summarising measured vs. predicted values and scores.
# NOTE(review): this listing appears to have lost its indentation and some
# statements (`kf`, `r` and `data` are used below without a visible
# assignment). Confirm against the original file before changing any code.
### method for generating test sets index
### KFCV(dict) returns a testset indices/Fold
@staticmethod
def CV(x, y, n_folds:int):
# test_folds maps a fold name ('Fold1', 'Fold2', ...) to its test indices.
test_folds = {}
folds_name = [f'Fold{i+1}' for i in range(n_folds)]
# NOTE(review): the bare integers below are not code; they look like
# line-number residue from a diff/web view. The statement that creates `kf`
# is not visible in this listing.
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# For every fold index i, the test indices yielded by kf.split(x, y) are
# collected again and the i-th set is stored under the i-th fold name.
for i in range(n_folds):
d = []
for _, i_test in kf.split(x, y):
d.append(i_test)
test_folds[folds_name[i]] = d[i]
return test_folds ## returns a dict where keys are the name of each fold, and the corresponding values are the test-set indices yielded by kf.split for that fold
### Cross validate the model and return the predictions and samples index
@staticmethod
def cross_val_predictor(model, folds, x, y):
"""" model: the object to be cross-validated,
folds: a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with indices of test set(from CV method)
x and y: the data used for CV"""
# NOTE(review): `folds` is used as a dict (folds.keys(), folds[name]), not a
# tuple as the docstring says.
x = np.array(x)
y = np.array(y)
yp = {}
key = list(folds.keys())
n_folds = len(folds.keys())
# For each fold: fit `model` on the rows that are NOT in the fold (np.delete
# along axis 0), then predict the rows of the fold. The same `model` object
# is refitted at every iteration.
for i in range(n_folds):
model.fit(np.delete(x, folds[key[i]], axis=0), np.delete(y, folds[key[i]], axis=0))
yp[key[i]] = model.predict(x[folds[key[i]]]) #### predictions/fold
return yp # returns a dict whose keys are the fold names and whose values are the predicted Y of each fold
@staticmethod
def meas_pred_eq(y, ypcv, folds):
"""" y: the target variable,
ypcv: a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with predictions/fold (from cross_val_predictor method)
folds: a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with indices of test set(from CV method)
x and y: the data used for CV
returns:
two dataframe:
- a n x 4 dataframe containing measured values, predicted values, ols reg equation, and index (n is the total number of samples)
- a 2 x k dataframe containing ols regression coefficients(k is the number of folds)
"""
# cvcv collects one table per fold (keyed by the fold position); coeff maps
# each fold name to [slope, intercept].
cvcv = {}
coeff = {}
y = np.array(y)
# NOTE(review): `r` is not assigned in the visible lines; presumably a new
# DataFrame is created for each fold just before it is filled - confirm.
for i, Fname in enumerate(folds.keys()):
r['Predicted'] = ypcv[Fname]
r['Measured'] = y[folds[Fname]]
# The regression is fitted with the measured values as the single
# explanatory variable and the predictions as the target.
ols = LinearRegression().fit(DataFrame(y[folds[Fname]]), ypcv[Fname].reshape(-1,1))
r.index = folds[Fname]
# The same label (fold name plus fitted equation) is repeated on every row.
r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0],2)} x Measured'] * r.shape[0]
cvcv[i] = r
coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]]
# NOTE(review): `data` is not assigned in the visible lines. The
# `data.index[i][1]` access suggests a two-level index whose second level is
# the sample index - TODO confirm.
data['index'] = [data.index[i][1] for i in range(data.shape[0])]
data.index = data['index']
return data, coeff ## returns values predicted in cross validation, ,coefficients of regression
@staticmethod
def metrics_cv(y, ypcv, folds):
y = np.array(y)
# e maps each fold name to the result of metrics().reg_(measured, predicted).
e = {}
for i in folds.keys():
e[i] = metrics().reg_(y[folds[i]],ypcv[i])
# NOTE(review): `r` is not assigned in the visible lines; presumably it is
# built from `e` - confirm.
# r_print is a copy, so the 'mean', 'sd' and 'cv' columns are added to the
# copy only and `r` is left without them.
r_print = r.copy()
r_print['mean'] = r.mean(axis = 1)
r_print['sd'] = r.std(axis = 1)
r_print['cv'] = 100*r.std(axis = 1)/r.mean(axis = 1)
return r.T, r_print.T
### compute metrics for each fold
@staticmethod
def cv_scores(y, ypcv, folds):
""" Takes as input the Y vactor, the tuple of preducted values/fold(from cross_val_predictor method), and the index/fold(from CV method)
and returns two dataframes, the first is containing metrics scores/fold and the second is similar to the first by with additional mean, sd, and rsd variables
"""
y = np.array(y)
# e maps each fold name to the result of metrics().reg_(measured, predicted).
e = {}
for i in folds.keys():
e[i] = metrics().reg_(y[folds[i]],ypcv[i])
# NOTE(review): `r` is not assigned in the visible lines; presumably it is
# built from `e` - confirm.
# NOTE(review): unlike metrics_cv, r_print is bound to the same object as `r`
# (no copy). If `r` is a DataFrame, the columns added below also land in `r`,
# so 'sd' and 'cv' are computed with the 'mean' column already present and
# both returned frames carry the extra columns - verify this is intended.
r_print = r
r_print['mean'] = r.mean(axis = 1)
r_print['sd'] = r.std(axis = 1)
r_print['cv'] = 100*r.std(axis = 1)/r.mean(axis = 1)
return r.T, r_print.T
# ### Return ycv
# @staticmethod
# def ycv(model, x, y, n_folds:int):
# ycv = np.zeros(y.shape[0])
# f, idx,_,_ = KF_CV.cross_val_predictor(model, x,y, n_folds)
# for i in f.keys():
# ycv[idx[i]] = f[i]
# return ycv
### Selectivity ratio
def sel_ratio(model, x ):
from scipy.stats import f
wtp = model.coef_.T/ np.linalg.norm(model.coef_.T)
ttp = np.array(x @ wtp)
ptp = np.array(x.T) @ np.array(ttp)/(ttp.T @ ttp)
qexpi = np.linalg.norm(ttp @ ptp.T, axis = 0)**2
e = np.array(x-x.mean()) - ttp @ ptp.T
qres = np.linalg.norm(e, axis = 0)**2