from Packages import *
from .Evaluation_Metrics import metrics


## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    import clevercsv
    with open(filename, newline='') as csvfile:
        delimiter = clevercsv.Sniffer().sniff(csvfile.read(100)).delimiter
    # sniffer = csv.Sniffer()
    # with open(filename) as fp:
    #     delimiter = sniffer.sniff(fp.read(200)).delimiter
    return delimiter


def find_col_index(filename):
    with open(filename) as fp:
        lines = pd.read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
    col_index = 'yes' if lines.iloc[:, 0].dtypes != np.float64 else 'no'
    return col_index


# detection of column categories and scaling
def col_cat(data_import):
    """Detect numerical and categorical columns in the csv."""
    # set first column as sample names
    name_col = pd.DataFrame(list(data_import.index), index=list(data_import.index))
    # name_col = name_col.rename(columns={0: 'name'})
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    if len(numerical_columns_list) == 0:
        # fall back to a single zero-filled column so that pd.concat below does not fail
        numerical_columns_list.append(pd.Series(0, index=data_import.index, name='empty'))
    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
        categorical_data.insert(0, 'name', name_col[0])
    if len(categorical_columns_list) == 0:
        categorical_data = pd.DataFrame()
    # create the numerical data matrix from the numerical columns list and fill NaN with the column mean
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    return numerical_data, categorical_data


def list_files(mypath, import_type):
    list_files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
    if list_files == []:
        list_files = ['Please, create a model before - no model available yet']
    return list_files


def standardize(X, center=True, scale=False):
    sk = StandardScaler(with_mean=center, with_std=scale)
    sc = pd.DataFrame(sk.fit_transform(X), index=X.index, columns=X.columns)
    return sc


def MinMaxScale(X):
    t = X
    sk = MinMaxScaler(feature_range=(0, 1))
    sc = pd.DataFrame(sk.fit_transform(X), index=t.index, columns=t.columns)
    return sc


######################################## Spectral preprocessing
def Detrend(X):
    c = detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)
    return c


def Snv(X):
    xt = np.array(X).T
    c = (xt - xt.mean()) / xt.std()
    return pd.DataFrame(c.T, index=X.index, columns=X.columns)


def No_transformation(X):
    return X
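
# --------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): how the loading and
# preprocessing helpers above are typically chained. The file name
# 'spectra.csv' is a placeholder, and the SNV-then-detrend order is only one
# possible choice, not a project default.
#
#   sep = find_delimiter('spectra.csv')
#   raw = pd.read_csv('spectra.csv', sep=sep, index_col=0)
#   numerical_data, categorical_data = col_cat(raw)
#   snv_spectra = Snv(numerical_data)      # SNV as implemented above (DataFrame in, DataFrame out)
#   detrended = Detrend(snv_spectra)       # linear detrend along each spectrum (returns a numpy array)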

######################################## Cross val split ############################
class KF_CV:
    ### method for generating test set indices
    ### CV returns the test set indices of each fold as a dict
    @staticmethod
    def CV(x, y, n_folds: int):
        test_folds = {}
        folds_name = [f'Fold{i+1}' for i in range(n_folds)]
        kf = ks.KFold(n_splits=n_folds, device='cpu')
        # a single pass over the splitter is enough: keep only the test indices of each fold
        for i, (_, i_test) in enumerate(kf.split(x, y)):
            test_folds[folds_name[i]] = i_test
        return test_folds  ## returns a dict whose keys are fold names and whose values are 1d numpy arrays of test-set indices

    ### Cross-validate the model and return the predictions and sample indices
    @staticmethod
    def cross_val_predictor(model, folds, x, y):
        """model: the object to be cross-validated,
        folds: a dict whose keys are fold names and whose values are 1d numpy arrays of test-set indices (from the CV method),
        x and y: the data used for CV"""
        x = np.array(x)
        y = np.array(y)
        yp = {}
        key = list(folds.keys())
        n_folds = len(folds.keys())
        for i in range(n_folds):
            # fit on everything except the current fold, then predict the held-out fold
            model.fit(np.delete(x, folds[key[i]], axis=0), np.delete(y, folds[key[i]], axis=0))
            yp[key[i]] = model.predict(x[folds[key[i]]])  #### predictions/fold
        return yp  # returns a dict whose keys are fold names and whose values are the predicted Y of each fold

    @staticmethod
    def meas_pred_eq(y, ypcv, folds):
        """y: the target variable,
        ypcv: a dict whose keys are fold names and whose values are 1d numpy arrays of predictions per fold (from the cross_val_predictor method),
        folds: a dict whose keys are fold names and whose values are 1d numpy arrays of test-set indices (from the CV method)
        returns two dataframes:
        - an n x 4 dataframe containing measured values, predicted values, the OLS regression equation, and the index (n is the total number of samples)
        - a 2 x k dataframe containing the OLS regression coefficients (k is the number of folds)"""
        cvcv = {}
        coeff = {}
        y = np.array(y)
        for i, Fname in enumerate(folds.keys()):
            r = pd.DataFrame()
            r['Predicted'] = ypcv[Fname]
            r['Measured'] = y[folds[Fname]]
            # OLS regression of predicted on measured values for this fold
            ols = LinearRegression().fit(pd.DataFrame(y[folds[Fname]]), ypcv[Fname].reshape(-1, 1))
            r.index = folds[Fname]
            r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0], 2)} x Measured)'] * r.shape[0]
            cvcv[i] = r
            coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]]
        data = pd.concat(cvcv, axis=0)
        data['index'] = [data.index[i][1] for i in range(data.shape[0])]
        data.index = data['index']
        coeff = pd.DataFrame(coeff, index=['Slope', 'Intercept'])
        return data, coeff  ## returns the values predicted in cross validation and the regression coefficients

    ### compute metrics for each fold (same logic as cv_scores below)
    @staticmethod
    def metrics_cv(y, ypcv, folds):
        y = np.array(y)
        e = {}
        for i in folds.keys():
            e[i] = metrics().reg_(y[folds[i]], ypcv[i])
        r = pd.DataFrame(e)
        r_print = r.copy()
        r_print['mean'] = r.mean(axis=1)
        r_print['sd'] = r.std(axis=1)
        r_print['cv'] = 100 * r.std(axis=1) / r.mean(axis=1)
        return r.T, r_print.T

    ### compute metrics for each fold
    @staticmethod
    def cv_scores(y, ypcv, folds):
        """Takes as input the Y vector, the dict of predicted values per fold (from the cross_val_predictor method)
        and the indices per fold (from the CV method), and returns two dataframes: the first contains the metric
        scores per fold, and the second is similar to the first but with additional mean, sd, and rsd variables."""
        y = np.array(y)
        e = {}
        for i in folds.keys():
            e[i] = metrics().reg_(y[folds[i]], ypcv[i])
        r = pd.DataFrame(e)
        r_print = r.copy()  # copy so that the summary columns are not appended to the per-fold frame
        r_print['mean'] = r.mean(axis=1)
        r_print['sd'] = r.std(axis=1)
        r_print['cv'] = 100 * r.std(axis=1) / r.mean(axis=1)
        return r.T, r_print.T

    # ### Return ycv
    # @staticmethod
    # def ycv(model, x, y, n_folds:int):
    #     ycv = np.zeros(y.shape[0])
    #     f, idx, _, _ = KF_CV.cross_val_predictor(model, x, y, n_folds)
    #     for i in f.keys():
    #         ycv[idx[i]] = f[i]
    #     return ycv
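
# --------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): how the KF_CV helpers
# are meant to be chained. The x/y arrays and the LinearRegression estimator
# are placeholders for whatever spectra, targets and regressor the application
# actually uses; any estimator exposing fit/predict would do.
#
#   model = LinearRegression()
#   folds = KF_CV.CV(x, y, n_folds=5)                         # test-set indices per fold
#   ypcv = KF_CV.cross_val_predictor(model, folds, x, y)      # predictions per fold
#   preds, ols_eq = KF_CV.meas_pred_eq(y, ypcv, folds)        # measured vs predicted + per-fold OLS equation
#   scores, summary = KF_CV.cv_scores(y, ypcv, folds)         # metrics per fold (+ mean/sd/cv)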

### Selectivity ratio
def sel_ratio(model, x):
    from scipy.stats import f
    x = pd.DataFrame(x)
    # target-projection weights, scores and loadings derived from the model coefficients
    wtp = model.coef_.T / np.linalg.norm(model.coef_.T)
    ttp = np.array(x @ wtp)
    ptp = np.array(x.T) @ np.array(ttp) / (ttp.T @ ttp)
    qexpi = np.linalg.norm(ttp @ ptp.T, axis=0) ** 2
    e = np.array(x - x.mean()) - ttp @ ptp.T
    qres = np.linalg.norm(e, axis=0) ** 2
    sr = pd.DataFrame(qexpi / qres, index=x.columns, columns=['sr'])
    fcr = f.ppf(0.05, sr.shape[0] - 2, sr.shape[0] - 3)
    c = sr > fcr
    sr.index = np.arange(x.shape[1])
    SR = sr.iloc[c.to_numpy().ravel(), :]  # keep only variables whose selectivity ratio exceeds the critical F value
    return SR
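
# --------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): sel_ratio expects an
# already-fitted linear model exposing a coef_ array of shape (1, n_features)
# together with the predictor matrix it was fitted on; the x/y names below are
# placeholders.
#
#   fitted = LinearRegression().fit(x, np.asarray(y).reshape(-1, 1))   # coef_ of shape (1, n_features)
#   important_vars = sel_ratio(fitted, x)   # variables whose selectivity ratio exceeds the critical F value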