Skip to content
Snippets Groups Projects
data_handling.py 10.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • DIANE's avatar
    DIANE committed
    from packages import *
    from utils.eval_metrics import metrics
    
    DIANE's avatar
    DIANE committed
    
    ## try to automatically detect the field separator within the CSV
    def find_delimiter(filename):
        """Guess the field separator used in a CSV file.

        The first 100 characters of ``filename`` are handed to
        ``clevercsv.Sniffer().sniff`` and the ``delimiter`` attribute of the
        detected dialect is returned.
        """
        import clevercsv

        with open(filename, newline='') as handle:
            sample = handle.read(100)
            dialect = clevercsv.Sniffer().sniff(sample)
        return dialect.delimiter
    
    def find_col_index(filename):
        """Tell whether the first column of a CSV file looks like an index column.

        The first three lines of the file are skipped and at most three rows
        are parsed, using the separator guessed by ``find_delimiter``.

        Returns:
            'yes' when the first parsed column is not of dtype float64,
            'no' otherwise.
        """
        with open(filename) as fp:
            lines = read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
        first_col_dtype = lines.iloc[:, 0].dtypes
        return 'yes' if first_col_dtype != np.float64 else 'no'
    
    
    # detection of columns categories and scaling
    def col_cat(data_import):
        """Split a DataFrame into its numerical and categorical columns.

        Parameters:
            data_import: DataFrame whose index holds the sample names.

        Returns:
            (numerical_data, categorical_data)
            - numerical_data: the float64 / int64 columns, with missing values
              replaced by the mean of their column. When there is no such
              column, a single all-zero column (labelled 0) is returned.
            - categorical_data: every other column, preceded by a 'name'
              column holding the sample names (the index of ``data_import``).
              An empty DataFrame when there is no such column.
        """
        numeric_dtypes = (np.dtype("float64"), np.dtype("int64"))
        numerical_columns_list = []
        categorical_columns_list = []
        for col in data_import.columns:
            if data_import[col].dtype in numeric_dtypes:
                numerical_columns_list.append(data_import[col])
            else:
                categorical_columns_list.append(data_import[col])

        if categorical_columns_list:
            categorical_data = concat(categorical_columns_list, axis=1)
            # first column = sample names
            categorical_data.insert(0, 'name', list(data_import.index))
        else:
            # an actual (empty) DataFrame instance, not the DataFrame class
            categorical_data = DataFrame()

        if numerical_columns_list:
            numerical_data = concat(numerical_columns_list, axis=1)
        else:
            # placeholder column of zeros; concat() cannot take a plain list
            numerical_data = DataFrame(0, index=data_import.index, columns=[0])

        # fill missing values with the mean of their column
        numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))

        return numerical_data, categorical_data
    
    
    
    DIANE's avatar
    DIANE committed
    ############## new function
    def csv_loader(file):
        """Load a delimited text file and split it into float and non-float columns.

        The separator (',' or ';') and the decimal mark ('.' or ',') are guessed
        from character counts in the first three lines of the file. The first
        column is used as index when it holds distinct, non-numeric labels.

        Parameters:
            file: path of the file to read.

        Returns:
            (float_data, non_float): two DataFrames holding respectively the
            float columns and all the other columns. Each one is an empty
            DataFrame when the file has no column of that kind.
        """
        import numpy as np
        import pandas as pd
        from pandas.api.types import is_float_dtype

        with open(file, mode='r') as csvfile:
            head = [csvfile.readline() for _ in range(3)]

        # Separator: the more frequent of ';' and ',' over the three lines.
        # Ties (including "neither present") fall back to ';' -- previously a
        # non-zero tie left `separator` unassigned and raised UnboundLocalError.
        semi = sum(line.count(';') for line in head)
        commas = sum(line.count(',') for line in head)
        separator = ',' if commas > semi else ';'

        # Decimal mark: counted on the 2nd and 3rd lines only
        # (presumably because the 1st line is a header -- confirm).
        comma_marks = sum(line.count(',') for line in head[1:])
        dot_marks = sum(line.count('.') for line in head[1:])
        decimal = ',' if comma_marks > dot_marks else '.'

        # Fall back to '.' when the guess collides with the separator or when
        # the per-line '.'/',' counts take at most two distinct values.
        mark_counts = [line.count(mark) for line in head for mark in ('.', ',')]
        if decimal == separator or len(np.unique(mark_counts)) <= 2:
            decimal = '.'

        # The first line is always used as header.
        # NOTE(review): the former heuristic (`0 if rat or np.nan else None`)
        # could never yield None because np.nan is truthy; header-less files
        # are therefore not detected. Behaviour kept as-is, dead code removed.
        header = 0

        # Raw read (no header, no index) used only to inspect the first column.
        raw = pd.read_csv(file, decimal=decimal, sep=separator, header=None, index_col=None)
        first_col = raw.iloc[1:, 0]

        index_col = None
        if not is_float_dtype(first_col):
            if first_col.empty:
                # header-only file: keep the historical choice (first column as index)
                index_col = 0
            else:
                try:
                    first_col.to_numpy().astype(float)
                except (ValueError, TypeError):
                    # non-numeric labels: use them as index only if all distinct
                    if len(set(first_col)) == raw.shape[0] - 1:
                        index_col = 0

        df = pd.read_csv(file, decimal=decimal, sep=separator, header=header, index_col=index_col)

        non_float = df.select_dtypes(exclude='float')
        if non_float.shape[1] == 0:
            non_float = pd.DataFrame()

        float_data = df.select_dtypes(include='float')
        if float_data.shape[1] == 0:
            float_data = pd.DataFrame()

        return float_data, non_float
    
    
    
    
    DIANE's avatar
    DIANE committed
    
    def list_files(mypath, import_type):
        """List the pickled files of a given type found in a directory.

        Returns the names of the regular files of ``mypath`` whose name ends
        with ``import_type + '.pkl'``. When there is none, a one-element list
        holding a user-facing hint is returned instead.
        """
        suffix = import_type + '.pkl'
        matches = []
        for entry in listdir(mypath):
            if isfile(join(mypath, entry)) and entry.endswith(suffix):
                matches.append(entry)
        if not matches:
            return ['Please, create a model before - no model available yet']
        return matches
    
    
    
    def standardize(X, center = True, scale = False):
        """Centre and/or scale the columns of a DataFrame.

        Parameters:
            X: DataFrame to transform.
            center: forwarded as ``with_mean`` to ``StandardScaler``.
            scale: forwarded as ``with_std`` to ``StandardScaler``.

        Returns:
            A DataFrame with the transformed values and the index and columns
            of ``X``.
        """
        scaler = StandardScaler(with_mean=center, with_std=scale)
        return DataFrame(scaler.fit_transform(X), index=X.index, columns=X.columns)
    
    def MinMaxScale(X):
        """Rescale a DataFrame with ``MinMaxScaler(feature_range=(0, 1))``.

        Returns:
            A DataFrame with the transformed values and the index and columns
            of ``X``.
        """
        scaler = MinMaxScaler(feature_range=(0, 1))
        return DataFrame(scaler.fit_transform(X), index=X.index, columns=X.columns)
    
    ######################################## Spectral preprocessing
    def Detrend(X):
        """Remove the linear trend along the last axis of ``X``.

        Thin wrapper around ``detrend`` (type='linear', bp=0); the input is
        not overwritten and the detrended array is returned.
        """
        return detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)
    
    def Snv(X):
        """Standard Normal Variate transform, applied row by row.

        Every row of ``X`` is centred on its own mean and divided by its own
        standard deviation (population std, numpy default ``ddof=0``).

        Parameters:
            X: DataFrame with one row per sample.

        Returns:
            A DataFrame with the transformed values and the index and columns
            of ``X``.
        """
        spectra = np.array(X)
        # per-row statistics: the mean must be taken per row, like the std
        # (the former code subtracted the mean of the whole matrix)
        row_mean = spectra.mean(axis=1, keepdims=True)
        row_std = spectra.std(axis=1, keepdims=True)
        return DataFrame((spectra - row_mean) / row_std, index=X.index, columns=X.columns)
    
    DIANE's avatar
    DIANE committed
    
    def No_transformation(X):
        """Identity preprocessing step: hand ``X`` back untouched."""
        return X
    
    
    ######################################## Cross val split ############################
    class KF_CV:
        """K-fold cross-validation helpers (static methods only).

        ``CV`` builds the test-set indices of each fold, ``cross_val_predictor``
        produces the predictions of each fold, and ``meas_pred_eq`` /
        ``metrics_cv`` / ``cv_scores`` summarise those predictions.
        """

        @staticmethod
        def CV(x, y, n_folds:int):
            """Build the test-set indices of every fold.

            Returns:
                dict mapping 'Fold1', 'Fold2', ... to the test indices yielded
                by ``ks_KFold(n_splits=n_folds, device='cpu').split(x, y)``.
            """
            kf = ks_KFold(n_splits=n_folds, device='cpu')
            # The splitter is run once. The former code re-ran the whole split
            # for every fold and kept a single element each time.
            # NOTE(review): equivalent as long as split() is deterministic.
            return {f'Fold{i+1}': i_test for i, (_, i_test) in enumerate(kf.split(x, y))}

        @staticmethod
        def cross_val_predictor(model, folds, x, y):
            """Fit ``model`` on each training part and predict the held-out part.

            Parameters:
                model: object exposing ``fit(x, y)`` and ``predict(x)``.
                folds: dict fold name -> test-set indices (as built by ``CV``).
                x, y: the data used for cross-validation.

            Returns:
                dict fold name -> predictions for the test samples of that fold.
            """
            x = np.array(x)
            y = np.array(y)

            yp = {}
            for name, test_idx in folds.items():
                # train on everything except the test rows of this fold
                model.fit(np.delete(x, test_idx, axis=0), np.delete(y, test_idx, axis=0))
                yp[name] = model.predict(x[test_idx])
            return yp

        @staticmethod
        def meas_pred_eq(y, ypcv, folds):
            """Pair measured and predicted values and fit Predicted ~ Measured per fold.

            Parameters:
                y: the target variable.
                ypcv: dict fold name -> predictions (from ``cross_val_predictor``).
                folds: dict fold name -> test-set indices (from ``CV``).

            Returns:
                (data, coeff)
                - data: one row per sample with the columns 'Predicted',
                  'Measured', 'Folds' (fold name plus regression equation) and
                  'index'; the frame is indexed by the sample index.
                - coeff: DataFrame with rows 'Slope' and 'Intercept' and one
                  column per fold.
            """
            y = np.array(y)
            per_fold = {}
            coeff = {}
            for i, (Fname, test_idx) in enumerate(folds.items()):
                measured = y[test_idx]

                r = DataFrame()
                r['Predicted'] = ypcv[Fname]
                r['Measured'] = measured

                ols = LinearRegression().fit(DataFrame(measured), ypcv[Fname].reshape(-1,1))
                slope = ols.coef_[0][0]
                intercept = ols.intercept_[0]

                r.index = test_idx
                r['Folds'] = [f'{Fname} (Predicted = {np.round(intercept, 2)} + {np.round(slope,2)} x Measured'] * r.shape[0]
                per_fold[i] = r
                coeff[Fname] = [slope, intercept]

            # concat on a dict gives a (fold number, sample index) MultiIndex;
            # keep only the sample index
            data = concat(per_fold, axis = 0)
            data['index'] = [data.index[i][1] for i in range(data.shape[0])]
            data.index = data['index']

            coeff = DataFrame(coeff, index = ['Slope', 'Intercept'])
            return data, coeff

        @staticmethod
        def metrics_cv(y, ypcv, folds):
            """Compute the regression metrics of every fold.

            Parameters:
                y: the target variable.
                ypcv: dict fold name -> predictions (from ``cross_val_predictor``).
                folds: dict fold name -> test-set indices (from ``CV``).

            Returns:
                (scores, scores_with_stats): both have one row per fold, as
                computed by ``metrics().reg_``; the second has three extra rows
                'mean', 'sd' and 'cv' (100 * sd / mean) computed over the folds.
            """
            y = np.array(y)
            e = {name: metrics().reg_(y[test_idx], ypcv[name]) for name, test_idx in folds.items()}
            r = DataFrame(e)

            # work on a copy so the per-fold table stays free of summary columns
            r_print = r.copy()
            r_print['mean'] = r.mean(axis = 1)
            r_print['sd'] = r.std(axis = 1)
            r_print['cv'] = 100*r.std(axis = 1)/r.mean(axis = 1)
            return r.T, r_print.T

        @staticmethod
        def cv_scores(y, ypcv, folds):
            """Compute the metrics of each fold; same contract as ``metrics_cv``.

            Takes the Y vector, the dict of predicted values per fold (from
            ``cross_val_predictor``) and the dict of indices per fold (from
            ``CV``). Returns two DataFrames: the metric scores per fold, and the
            same table with the additional 'mean', 'sd' and 'cv' rows.

            The former implementation aliased the two tables, so the summary
            columns leaked into the per-fold table and 'sd' / 'cv' were computed
            over already-added summary columns. Delegating fixes both.
            """
            return KF_CV.metrics_cv(y, ypcv, folds)
        
        
        # ### Return ycv
        # @staticmethod
        # def ycv(model, x, y, n_folds:int):
        #     ycv = np.zeros(y.shape[0])
        #     f, idx,_,_ = KF_CV.cross_val_predictor(model, x,y, n_folds)
        #     for i in f.keys():
        #         ycv[idx[i]] = f[i]            
        #     return ycv
    
    
    ### Selectivity ratio
    def sel_ratio(model, x ):
        """Compute a selectivity ratio per variable and keep those above an F threshold.

        Parameters:
            model: fitted object exposing ``coef_``.
                   (assumes coef_ has shape (1, n_variables) -- TODO confirm)
            x: data matrix (anything ``DataFrame`` accepts), one column per variable.

        Returns:
            DataFrame with a single column 'sr', indexed by the position of the
            variable (0 .. n_variables - 1), restricted to the variables whose
            ratio exceeds the threshold.
        """
        from scipy.stats import f

        x = DataFrame(x)

        # unit-norm coefficient vector, scores of x on it, and matching loadings
        wtp = model.coef_.T / np.linalg.norm(model.coef_.T)
        ttp = np.array(x @ wtp)
        ptp = np.array(x.T) @ np.array(ttp) / (ttp.T @ ttp)

        # explained vs. residual sum of squares of every variable
        explained = ttp @ ptp.T
        qexpi = np.linalg.norm(explained, axis = 0)**2
        # NOTE(review): the residual uses column-centred x while the scores
        # above use x as given -- confirm this is intended.
        e = np.array(x - x.mean()) - explained
        qres = np.linalg.norm(e, axis = 0)**2

        sr = DataFrame(qexpi/qres, index = x.columns, columns = ['sr'])

        # NOTE(review): threshold kept as in the original. It is the 5 % lower
        # quantile and its degrees of freedom come from the number of
        # variables; an upper quantile (e.g. 0.95) with sample-based degrees
        # of freedom may have been intended -- verify.
        fcr = f.ppf(0.05, sr.shape[0]-2, sr.shape[0]-3)

        sr.index = np.arange(x.shape[1])
        # 1-D boolean row mask (the former code passed a 2-D mask to .iloc)
        keep = (sr['sr'] > fcr).to_numpy()
        return sr.iloc[keep, :]