from Packages import *
from .Evaluation_Metrics import metrics


def find_delimiter(filename):
    """Guess the field separator of a CSV file.

    Only the first 100 characters of the file are handed to
    ``clevercsv.Sniffer``.

    Parameters
    ----------
    filename : path of the CSV file to inspect.

    Returns
    -------
    The delimiter reported by the sniffer.
    """
    import clevercsv

    with open(filename, newline='') as csvfile:
        sample = csvfile.read(100)
    return clevercsv.Sniffer().sniff(sample).delimiter


def find_col_index(filename):
    """Tell whether the first column of a CSV file looks like an index column.

    Three rows are read after skipping the first three lines; the first
    column is treated as an index when its dtype is not ``float64``.

    Returns
    -------
    ``'yes'`` when the first column is not float64, ``'no'`` otherwise.
    """
    with open(filename) as fp:
        lines = read_csv(fp, skiprows=3, nrows=3, index_col=False,
                         sep=find_delimiter(filename))
        col_index = 'yes' if lines.iloc[:, 0].dtypes != np.float64 else 'no'
    return col_index


def col_cat(data_import):
    """Split a DataFrame into its numerical and categorical columns.

    Columns whose dtype is ``float64`` or ``int64`` are numerical; every
    other column is categorical.

    Parameters
    ----------
    data_import : DataFrame whose index holds the sample names.

    Returns
    -------
    numerical_data : DataFrame of the numerical columns, with missing values
        replaced by the column mean. When there is no numerical column, a
        single zero-filled column is returned instead.
    categorical_data : DataFrame of the categorical columns preceded by a
        ``'name'`` column holding the sample names, or an empty DataFrame
        when there is no categorical column.
    """
    # Sample names are taken from the index of the imported table.
    name_col = DataFrame(list(data_import.index), index=list(data_import.index))

    numeric_dtypes = (np.dtype("float64"), np.dtype("int64"))
    numerical_columns_list = []
    categorical_columns_list = []
    for col in data_import.columns:
        if data_import[col].dtype in numeric_dtypes:
            numerical_columns_list.append(data_import[col])
        else:
            categorical_columns_list.append(data_import[col])

    if not numerical_columns_list:
        # concat() only accepts Series/DataFrame objects, so the zero-filled
        # placeholder has to be a frame rather than a plain list.
        numerical_columns_list.append(
            DataFrame({0: [0] * len(data_import)}, index=data_import.index)
        )

    if categorical_columns_list:
        categorical_data = concat(categorical_columns_list, axis=1)
        categorical_data.insert(0, 'name', name_col)
    else:
        # Return an empty frame instance (not the DataFrame class itself).
        categorical_data = DataFrame()

    # Build the numerical matrix and fill NaN with the mean of each column.
    numerical_data = concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    return numerical_data, categorical_data


def csv_loader(file):
    """Load a CSV file, guessing its separator, decimal mark and index column.

    The separator (``','`` or ``';'``) and the decimal mark (``'.'`` or
    ``','``) are guessed by counting those characters in the first three
    lines of the file. The first column is used as the index when it cannot
    be converted to float and all of its values are distinct.

    Parameters
    ----------
    file : path of the CSV file.

    Returns
    -------
    float_data : DataFrame with the float columns (empty DataFrame if none).
    non_float : DataFrame with every other column (empty DataFrame if none).
    """
    import numpy as np
    import pandas as pd
    from pandas.api.types import is_float_dtype

    dec_dia = ['.', ',']
    sep_dia = [',', ';']
    # Per line, counts are appended in pairs:
    #   dec -> [dots, commas, ...], sep -> [commas, semicolons, ...]
    dec, sep = [], []
    with open(file, mode='r') as csvfile:
        lines = [csvfile.readline() for _ in range(3)]
    for line in lines:
        for j in range(2):
            dec.append(line.count(dec_dia[j]))
            sep.append(line.count(sep_dia[j]))

    # Separator: whichever of ';' / ',' is more frequent over the 3 lines.
    semi = sum(sep[1::2])
    commas = sum(sep[0::2])
    if semi > commas:
        separator = ';'
    elif semi < commas:
        separator = ','
    else:
        # Covers "none found" and ties; previously a non-zero tie left
        # `separator` unbound and crashed with UnboundLocalError.
        separator = ';'

    # Decimal mark: counted on the 2nd and 3rd lines only (1st line skipped).
    commasdec = sum(dec[3::2])
    dot = sum(dec[2::2])
    decimal = ',' if commasdec > dot else '.'

    if decimal == separator or len(np.unique(dec)) <= 2:
        decimal = "."

    df = pd.read_csv(file, decimal=decimal, sep=separator, header=None,
                     index_col=None)

    # NOTE(review): `rat or np.nan` is always truthy (NaN is truthy), so this
    # heuristic always yields header = 0. Kept as-is to preserve behaviour;
    # confirm whether `0 if rat else None` was intended.
    try:
        rat = np.mean(df.iloc[0, 50:60] / df.iloc[5, 50:60]) > 10
        header = 0 if rat or np.nan else None
    except Exception:
        header = 0

    first_col_body = df.iloc[1:, 0]
    if is_float_dtype(first_col_body):
        index_col = None
    else:
        try:
            # len() of a plain numpy dtype is 0, so a column that converts to
            # float normally fails the uniqueness test below.
            te = first_col_body.to_numpy().astype(float).dtype
        except (ValueError, TypeError):
            te = set(first_col_body)
        # Use the column as index only when every value is distinct.
        index_col = 0 if len(te) == df.shape[0] - 1 else None

    df = pd.read_csv(file, decimal=decimal, sep=separator, header=header,
                     index_col=index_col)

    non_float = df.select_dtypes(exclude='float')
    if non_float.shape[1] == 0:
        non_float = pd.DataFrame()

    float_data = df.select_dtypes(include='float')
    if float_data.shape[1] == 0:
        float_data = pd.DataFrame()

    return float_data, non_float


def list_files(mypath, import_type):
    """List the files in ``mypath`` whose name ends with ``import_type + '.pkl'``.

    Returns
    -------
    The matching file names, or a one-element list holding a user-facing
    message when no file matches.
    """
    suffix = import_type + '.pkl'
    found = [f for f in listdir(mypath)
             if isfile(join(mypath, f)) and f.endswith(suffix)]
    if not found:
        found = ['Please, create a model before - no model available yet']
    return found


def standardize(X, center=True, scale=False):
    """Apply ``StandardScaler`` to ``X`` and return the result as a DataFrame.

    Parameters
    ----------
    X : DataFrame; its index and columns are reused for the output.
    center : forwarded to ``StandardScaler(with_mean=...)``.
    scale : forwarded to ``StandardScaler(with_std=...)``.
    """
    scaler = StandardScaler(with_mean=center, with_std=scale)
    return DataFrame(scaler.fit_transform(X), index=X.index, columns=X.columns)


def MinMaxScale(X):
    """Rescale the columns of ``X`` with ``MinMaxScaler(feature_range=(0, 1))``.

    Returns a DataFrame carrying the index and columns of ``X``.
    """
    scaler = MinMaxScaler(feature_range=(0, 1))
    return DataFrame(scaler.fit_transform(X), index=X.index, columns=X.columns)


######################################## Spectral preprocessing
def Detrend(X):
    """Remove a linear trend along the last axis of ``X``.

    The value returned by ``detrend`` is passed through unchanged (it is not
    wrapped back into a DataFrame).
    """
    return detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)


def Snv(X):
    """Standard Normal Variate: centre and scale every row of ``X``.

    Each row (spectrum) has its own mean subtracted and is divided by its own
    standard deviation.

    Parameters
    ----------
    X : DataFrame with one spectrum per row.

    Returns
    -------
    DataFrame with the index and columns of ``X``.
    """
    xt = np.array(X).T
    # After the transpose, axis 0 runs along one spectrum. The mean must be
    # taken per spectrum, like the std, not over the whole matrix.
    c = (xt - xt.mean(axis=0)) / xt.std(axis=0)
    return DataFrame(c.T, index=X.index, columns=X.columns)


def No_transformation(X):
    """Identity preprocessing: return ``X`` unchanged."""
    return X


######################################## Cross val split ############################
class KF_CV:
    """Static helpers for k-fold cross-validation built on ``ks_KFold``."""

    @staticmethod
    def CV(x, y, n_folds: int):
        """Generate the test-set indices of each fold.

        Returns
        -------
        dict mapping ``'Fold1'`` ... ``'Fold<n_folds>'`` to the test indices
        yielded by ``ks_KFold(n_splits=n_folds, device='cpu').split(x, y)``.
        """
        kf = ks_KFold(n_splits=n_folds, device='cpu')
        # Split once and label the folds in order; the split does not need to
        # be recomputed for every fold.
        return {f'Fold{i + 1}': i_test
                for i, (_, i_test) in enumerate(kf.split(x, y))}

    @staticmethod
    def cross_val_predictor(model, folds, x, y):
        """Cross-validate ``model`` and return the predictions of each fold.

        Parameters
        ----------
        model : object exposing ``fit(X, y)`` and ``predict(X)``; it is
            refitted in place once per fold.
        folds : dict mapping fold names to test-set indices (see ``CV``).
        x, y : the data used for cross-validation.

        Returns
        -------
        dict mapping each fold name to the predictions for its test samples.
        """
        x = np.array(x)
        y = np.array(y)

        yp = {}
        for name, test_idx in folds.items():
            # Train on everything except the fold, predict on the fold.
            model.fit(np.delete(x, test_idx, axis=0),
                      np.delete(y, test_idx, axis=0))
            yp[name] = model.predict(x[test_idx])
        return yp

    @staticmethod
    def meas_pred_eq(y, ypcv, folds):
        """Pair measured and predicted values and fit one OLS line per fold.

        Parameters
        ----------
        y : the target variable.
        ypcv : dict mapping fold names to the predictions of that fold
            (see ``cross_val_predictor``).
        folds : dict mapping fold names to test-set indices (see ``CV``).

        Returns
        -------
        data : DataFrame with columns ``'Predicted'``, ``'Measured'``,
            ``'Folds'`` (fold name followed by the fitted equation) and
            ``'index'``, indexed by the sample index; one row per sample.
        coeff : DataFrame with rows ``'Slope'`` and ``'Intercept'`` and one
            column per fold.
        """
        cvcv = {}
        coeff = {}
        y = np.array(y)
        for i, Fname in enumerate(folds.keys()):
            r = DataFrame()
            r['Predicted'] = ypcv[Fname]
            r['Measured'] = y[folds[Fname]]
            # Regress the predictions on the measured values.
            ols = LinearRegression().fit(DataFrame(y[folds[Fname]]),
                                         ypcv[Fname].reshape(-1, 1))
            r.index = folds[Fname]
            r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0],2)} x Measured'] * r.shape[0]
            cvcv[i] = r
            coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]]

        data = concat(cvcv, axis=0)
        # concat() on a dict yields a (fold number, sample index) MultiIndex;
        # keep only the sample index.
        data['index'] = [idx[1] for idx in data.index]
        data.index = data['index']
        coeff = DataFrame(coeff, index=['Slope', 'Intercept'])
        return data, coeff

    @staticmethod
    def metrics_cv(y, ypcv, folds):
        """Compute the regression metrics of each fold.

        Returns
        -------
        r.T : metrics with one row per fold.
        r_print.T : the same table with three extra rows, ``'mean'``,
            ``'sd'`` and ``'cv'`` (100 * sd / mean), computed across folds.
        """
        y = np.array(y)
        e = {}
        for name in folds.keys():
            e[name] = metrics().reg_(y[folds[name]], ypcv[name])
        r = DataFrame(e)
        # Work on a copy so the summary columns never leak into `r` or into
        # each other's computation.
        r_print = r.copy()
        r_print['mean'] = r.mean(axis=1)
        r_print['sd'] = r.std(axis=1)
        r_print['cv'] = 100 * r.std(axis=1) / r.mean(axis=1)
        return r.T, r_print.T

    @staticmethod
    def cv_scores(y, ypcv, folds):
        """Compute per-fold metric scores plus their summary statistics.

        Takes the Y vector, the dict of predicted values per fold (from
        ``cross_val_predictor``) and the dict of indices per fold (from
        ``CV``). Returns two DataFrames: the first holds the metric scores
        per fold, the second is the same with additional ``'mean'``,
        ``'sd'`` and ``'cv'`` rows.
        """
        # Same computation as metrics_cv. Delegating avoids the former
        # aliasing bug where the summary columns were added to the per-fold
        # table and then included in the sd / cv statistics.
        return KF_CV.metrics_cv(y, ypcv, folds)


### Selectivity ratio
def sel_ratio(model, x):
    """Compute a selectivity ratio per column of ``x`` and keep the large ones.

    Parameters
    ----------
    model : fitted model exposing ``coef_``.
    x : data convertible to a DataFrame, one column per variable.

    Returns
    -------
    DataFrame with a single ``'sr'`` column, indexed by column position,
    restricted to the variables whose ratio exceeds the F threshold.
    """
    from scipy.stats import f

    x = DataFrame(x)
    # Normalised coefficient vector, scores and loadings of the projection.
    wtp = model.coef_.T / np.linalg.norm(model.coef_.T)
    ttp = np.array(x @ wtp)
    ptp = np.array(x.T) @ np.array(ttp) / (ttp.T @ ttp)
    # Explained sum of squares per variable.
    qexpi = np.linalg.norm(ttp @ ptp.T, axis=0) ** 2
    # NOTE(review): the residual uses column-centred x while the scores above
    # use x as given - confirm callers pass already-centred data.
    e = np.array(x - x.mean()) - ttp @ ptp.T
    # Residual sum of squares per variable.
    qres = np.linalg.norm(e, axis=0) ** 2
    sr = DataFrame(qexpi / qres, index=x.columns, columns=['sr'])

    # NOTE(review): ppf(0.05, ...) is the lower 5 % quantile of the F
    # distribution - confirm whether the upper quantile (0.95) was intended.
    fcr = f.ppf(0.05, sr.shape[0] - 2, sr.shape[0] - 3)
    c = sr > fcr
    sr.index = np.arange(x.shape[1])
    SR = sr.iloc[c.to_numpy(), :]
    return SR