Skip to content
Snippets Groups Projects
DATA_HANDLING.py 6.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • DIANE's avatar
    DIANE committed
    from .Evaluation_Metrics import metrics
    
    
    ## try to automatically detect the field separator within the CSV
    def find_delimiter(filename):
        sniffer = csv.Sniffer()
        with open(filename) as fp:
            delimiter = sniffer.sniff(fp.read(200)).delimiter
        return delimiter
    
    def find_col_index(filename):
        with open(filename) as fp:
            lines = pd.read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=str(find_delimiter(filename)))
            col_index = 'yes' if lines.iloc[:,0].dtypes != np.float64 else 'no'
        return col_index
    
    
    # detection of columns categories and scaling
    def col_cat(data_import):
        """detect numerical and categorical columns in the csv"""
        # set first column as sample names
        name_col = pd.DataFrame(list(data_import.index), index = list(data_import.index))
        # name_col=name_col.rename(columns = {0:'name'})
        numerical_columns_list = []
        categorical_columns_list = []
        for i in data_import.columns:
            if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
                numerical_columns_list.append(data_import[i])
            else:
                categorical_columns_list.append(data_import[i])
        if len(numerical_columns_list) == 0:
            empty = [0 for x in range(len(data_import))]
            numerical_columns_list.append(empty)
        if len(categorical_columns_list) > 0:
            categorical_data = pd.concat(categorical_columns_list, axis=1)
            categorical_data.insert(0, 'name', name_col)
        if len(categorical_columns_list) == 0:
            categorical_data = pd.DataFrame
        # Create numerical data matrix from the numerical columns list and fill na with the mean of the column
        numerical_data = pd.concat(numerical_columns_list, axis=1)
        numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean())) #np.mean(x)))
    
        return numerical_data, categorical_data
    
    
    
    def list_files(mypath, import_type):
        list_files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
        if list_files == []:
            list_files = ['Please, create a model before - no model available yet']
        return list_files
    
    
    
    
    DIANE's avatar
    DIANE committed
    def standardize(X, center = True, scale = False):
        sk = StandardScaler(with_mean=center, with_std = scale)
        sc = pd.DataFrame(sk.fit_transform(X), index = X.index, columns = X.columns)
    
        return sc
    
    def MinMaxScale(X):
        t = X
        sk = MinMaxScaler(feature_range=(0,1))
        sc = pd.DataFrame(sk.fit_transform(X), index = t.index, columns = t.columns)
    
    DIANE's avatar
    DIANE committed
        return sc
    
    ######################################## Spectral preprocessing
    def Detrend(X):
        c = detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)
        return c
    
    def Snv(X):
        xt = np.array(X).T
        c = (xt-xt.mean())/xt.std()
        return pd.DataFrame(c.T, index=X.index, columns= X.columns)
    
    
    DIANE's avatar
    DIANE committed
    def No_transformation(X):
    
    DIANE's avatar
    DIANE committed
        return X
    
    
    ######################################## Cross val split ############################
    
    DIANE's avatar
    DIANE committed
    class KF_CV:
        ### method for generating test sets index
        @staticmethod
        def CV(x, y, n_folds:int):
            test_folds = {}
            folds_name = [f'Fold{i+1}' for i in range(n_folds)]
            kf = ks.KFold(n_splits=n_folds, device='cpu')
            for i in range(n_folds):
                d = []
                for _, i_test in kf.split(x, y):
                    d.append(i_test)
                test_folds[folds_name[i]] = d[i]        
            return test_folds
    
    DIANE's avatar
    DIANE committed
        ### Cross validate the model and return the predictions and samples index
        @staticmethod
        def cross_val_predictor(model, x, y, n_folds:int):
            x = np.array(x)
            y = np.array(y)
    
            yp = {}
    
    DIANE's avatar
    DIANE committed
            folds = KF_CV.CV(x=x, y=y, n_folds=n_folds)### Test index
    
    DIANE's avatar
    DIANE committed
            key = list(folds.keys())
    
            for i in range(n_folds):
                model.fit(np.delete(x, folds[key[i]], axis=0), np.delete(y, folds[key[i]], axis=0))
                yp[key[i]] = model.predict(x[folds[key[i]]]) #### predictions/fold
    
    DIANE's avatar
    DIANE committed
            
    
            cvcv = {}
            coeff = {}
            for i, Fname in enumerate(folds.keys()):
                r = pd.DataFrame()
                r['Predicted'] = yp[Fname]
                r['Measured'] = y[folds[Fname]]
                ols = LinearRegression().fit(pd.DataFrame(y[folds[Fname]]),yp[Fname].reshape(-1,1))
                r.index = folds[Fname]
                r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0],2)} x Measured'] * r.shape[0]
                cvcv[i] = r
                coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]]
    
            data = pd.concat(cvcv, axis = 0)
            data['index'] = [data.index[i][1] for i in range(data.shape[0])]
            data.index = data['index']
            coeff = pd.DataFrame(coeff, index = ['Slope', 'Intercept'])    
            return yp, folds, data, coeff
    
    DIANE's avatar
    DIANE committed
    
        ### compute metrics for each fold
        @staticmethod
        def process(model, x, y, n_folds:int):
    
    DIANE's avatar
    DIANE committed
            f, idx,_ , _ = KF_CV.cross_val_predictor(model, x=x,y=y, n_folds=n_folds)
    
    DIANE's avatar
    DIANE committed
            e = {}
            for i in idx.keys():
                e[i] = metrics().reg_(y.iloc[idx[i]],f[i])
            r = pd.DataFrame(e)
            return r
        
        ### bias and variance
        @staticmethod
        def cv_scores(model, x, y, n_folds:int):
            x = KF_CV.process(model, x, y, n_folds)
            mean = x.mean(axis = 1)
            sd = x.std(axis = 1)
            rsd = sd*100/mean
            data = pd.concat([mean, sd, rsd], axis = 1).round(2)
            data.columns = ['mean', 'sd', 'cv(%)']
            return data
        
        ### Return ycv
        @staticmethod
        def ycv(model, x, y, n_folds:int):
            ycv = np.zeros(y.shape[0])
    
    DIANE's avatar
    DIANE committed
            f, idx,_,_ = KF_CV.cross_val_predictor(model, x,y, n_folds)
    
    DIANE's avatar
    DIANE committed
            for i in f.keys():
                ycv[idx[i]] = f[i]            
            return ycv
    
    DIANE's avatar
    DIANE committed
    
    
    ### Selectivity ratio
    def sel_ratio(model, x ):
        from scipy.stats import f
    
        x = pd.DataFrame(x)
        wtp = model.coef_.T/ np.linalg.norm(model.coef_.T)
        ttp = np.array(x @ wtp)
        ptp = np.array(x.T) @ np.array(ttp)/(ttp.T @ ttp)
        qexpi = np.linalg.norm(ttp @ ptp.T, axis = 0)**2
        e = np.array(x-x.mean()) - ttp @ ptp.T
        qres = np.linalg.norm(e, axis = 0)**2
        sr = pd.DataFrame(qexpi/qres, index = x.columns, columns = ['sr'])
    
        fcr = f.ppf(0.05, sr.shape[0]-2, sr.shape[0]-3)
        c = sr > fcr
        sr.index = np.arange(x.shape[1])
        SR = sr.iloc[c.to_numpy(),:]
        return SR