from utils.eval_metrics import metrics
import numpy as np
from pandas import DataFrame, read_csv, concat
from os import listdir
from os.path import isfile, join
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import detrend

## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    import clevercsv
    with open(filename, newline='') as csvfile:
        delimiter = clevercsv.Sniffer().sniff(csvfile.read(100)).delimiter
    # sniffer = csv.Sniffer()
    # with open(filename) as fp:
    #     delimiter = sniffer.sniff(fp.read(200)).delimiter
    return delimiter
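# Usage sketch (hypothetical file name, not part of this module):
#   sep = find_delimiter('spectra.csv')
#   data = read_csv('spectra.csv', sep=sep)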

def find_col_index(filename):
    """Guess whether the first CSV column holds sample names ('yes') or numeric data ('no')."""
    with open(filename) as fp:
        lines = read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
        col_index = 'yes' if lines.iloc[:, 0].dtypes != np.float64 else 'no'
    return col_index


# detection of column categories and scaling
def col_cat(data_import):
    """Detect numerical and categorical columns in the imported data and return them as two DataFrames."""
    # set first column as sample names
    name_col = DataFrame(list(data_import.index), index = list(data_import.index))
    # name_col = name_col.rename(columns = {0:'name'})
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    if len(numerical_columns_list) == 0:
        # no numeric column found: add a dummy all-zero column so the concat below does not fail
        numerical_columns_list.append(DataFrame([0 for x in range(len(data_import))], index = data_import.index))
    if len(categorical_columns_list) > 0:
        categorical_data = concat(categorical_columns_list, axis=1)
        categorical_data.insert(0, 'name', name_col.iloc[:, 0])
    if len(categorical_columns_list) == 0:
        categorical_data = DataFrame()
    # Create the numerical data matrix from the numerical columns list and fill NAs with the column mean
    numerical_data = concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))

    return numerical_data, categorical_data
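# Usage sketch for col_cat (assumes a CSV whose first column holds sample names):
#   data = read_csv('dataset.csv', sep=find_delimiter('dataset.csv'), index_col=0)
#   num, cat = col_cat(data)  # num: numeric columns with NAs imputed; cat: categorical columns plus a 'name' column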


def list_files(mypath, import_type):
    """List saved model files ('*<import_type>.pkl') found in the given directory."""
    list_files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
    if list_files == []:
        list_files = ['Please create a model first - no model available yet']
    return list_files



def standardize(X, center = True, scale = False):
    """Column-wise centering and/or scaling of X, returned as a DataFrame."""
    from sklearn.preprocessing import StandardScaler
    sk = StandardScaler(with_mean=center, with_std=scale)
    sc = DataFrame(sk.fit_transform(X), index = X.index, columns = X.columns)
    return sc

def MinMaxScale(X):
    """Scale each column of X to the [0, 1] range, returned as a DataFrame."""
    sk = MinMaxScaler(feature_range=(0, 1))
    sc = DataFrame(sk.fit_transform(X), index = X.index, columns = X.columns)
    return sc
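# Usage sketch for the two scalers (num is a numeric DataFrame, e.g. from col_cat):
#   x_centered = standardize(num, center=True, scale=False)  # column-wise mean centering
#   x_scaled = MinMaxScale(num)                               # column-wise scaling to [0, 1]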

######################################## Spectral preprocessing
def Detrend(X):
    """Remove a linear trend along the last axis of each spectrum (scipy.signal.detrend)."""
    c = detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)
    return c

def Snv(X):
    """Standard Normal Variate: center and scale each spectrum (row of X) individually."""
    xt = np.array(X).T
    c = (xt - xt.mean(axis = 0)) / xt.std(axis = 0)
    return DataFrame(c.T, index=X.index, columns= X.columns)

def No_transformation(X):
    return X
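# Usage sketch for the spectral pretreatments (spectra is a samples x wavelengths DataFrame):
#   snv_spectra = Snv(spectra)        # each spectrum centered and scaled individually
#   detrended = Detrend(snv_spectra)  # linear baseline removed along the wavelength axis (returns a numpy array)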


######################################## Cross val split ############################
class KF_CV:
    ### Cross-validation utilities based on Kennard-Stone fold splitting
    @staticmethod
    def CV(x, y, n_folds:int):
        """Generate the test-set indices of each fold; returns a dict {fold name: 1d numpy array of test-set indices}."""
        from kennard_stone import KFold as ks_KFold
        test_folds = {}
        folds_name = [f'Fold{i+1}' for i in range(n_folds)]
        kf = ks_KFold(n_splits=n_folds, device='cpu')
        for i, (_, i_test) in enumerate(kf.split(x, y)):
            test_folds[folds_name[i]] = i_test
        return test_folds  ## dict where keys are fold names and values are 1d numpy arrays filled with the test-set indices
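    # Usage sketch (x, y are the predictors and target used for cross validation):
    #   folds = KF_CV.CV(x, y, n_folds=5)  # e.g. {'Fold1': array([...]), ..., 'Fold5': array([...])}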
    
    ### Cross-validate the model and return the predictions per fold
    @staticmethod
    def cross_val_predictor(model, folds, x, y):
        """model: the estimator to be cross-validated,
          folds: a dict {fold name: 1d array of test-set indices} (from the CV method),
          x and y: the data used for CV"""
        x = np.array(x)
        y = np.array(y)

        yp = {}
        key = list(folds.keys())
        n_folds = len(folds.keys())

        for i in range(n_folds):
            # fit on all samples except the current test fold, then predict the test fold
            model.fit(np.delete(x, folds[key[i]], axis=0), np.delete(y, folds[key[i]], axis=0))
            yp[key[i]] = model.predict(x[folds[key[i]]])  #### predictions per fold
        return yp  # dict where keys are fold names and values are the predicted y of that fold
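    # Usage sketch (hypothetical PLS model; any estimator with fit/predict should work):
    #   from sklearn.cross_decomposition import PLSRegression
    #   ypcv = KF_CV.cross_val_predictor(PLSRegression(n_components=10), folds, x, y)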
    @staticmethod
    def meas_pred_eq(y, ypcv, folds):
        """y: the target variable,
          ypcv: a dict {fold name: 1d array of predictions} (from the cross_val_predictor method),
          folds: a dict {fold name: 1d array of test-set indices} (from the CV method)

        returns two dataframes:
        - an n x 4 dataframe containing measured values, predicted values, the OLS regression equation, and the sample index (n is the total number of samples)
        - a 2 x k dataframe containing the OLS regression coefficients (k is the number of folds)
        """
        from sklearn.linear_model import LinearRegression
        cvcv = {}
        coeff = {}
        y = np.array(y)
        for i, Fname in enumerate(folds.keys()):
            r = DataFrame()
            r['Predicted'] = ypcv[Fname]
            r['Measured'] = y[folds[Fname]]
            # regress predicted on measured values within the fold
            ols = LinearRegression().fit(DataFrame(y[folds[Fname]]), ypcv[Fname].reshape(-1, 1))
            r.index = folds[Fname]
            r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0], 2)} x Measured)'] * r.shape[0]
            cvcv[i] = r
            coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]]

        data = concat(cvcv, axis = 0)
        data['index'] = [data.index[i][1] for i in range(data.shape[0])]
        data.index = data['index']
        coeff = DataFrame(coeff, index = ['Slope', 'Intercept'])
        return data, coeff  ## returns the values predicted in cross validation and the regression coefficients per fold
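    # Usage sketch, continuing from cross_val_predictor above:
    #   preds, coefs = KF_CV.meas_pred_eq(y, ypcv, folds)
    #   # preds: measured/predicted values per sample with the per-fold OLS equation; coefs: slope and intercept per fold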
    
    @staticmethod
    def metrics_cv(y, ypcv, folds):
        """Compute regression metrics per fold; returns the per-fold scores and the same table with mean, sd and cv added."""
        y = np.array(y)
        e = {}
        for i in folds.keys():
            e[i] = metrics().reg_(y[folds[i]], ypcv[i])
        r = DataFrame(e)
        r_print = r.copy()
        r_print['mean'] = r.mean(axis = 1)
        r_print['sd'] = r.std(axis = 1)
        r_print['cv'] = 100 * r.std(axis = 1) / r.mean(axis = 1)
        return r.T, r_print.T
    
    ### compute metrics for each fold
    @staticmethod
    def cv_scores(y, ypcv, folds):
        """Takes as input the Y vector, the dict of predicted values per fold (from the cross_val_predictor method), and the index per fold (from the CV method),
        and returns two dataframes: the first containing metric scores per fold, the second similar to the first but with additional mean, sd, and cv variables
        """
        y = np.array(y)
        e = {}
        for i in folds.keys():
            e[i] = metrics().reg_(y[folds[i]], ypcv[i])
        r = DataFrame(e)
        r_print = r.copy()
        r_print['mean'] = r.mean(axis = 1)
        r_print['sd'] = r.std(axis = 1)
        r_print['cv'] = 100 * r.std(axis = 1) / r.mean(axis = 1)
        return r.T, r_print.T
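    # Usage sketch, continuing from cross_val_predictor above:
    #   scores, scores_summary = KF_CV.cv_scores(y, ypcv, folds)
    #   # scores: one row of metrics per fold; scores_summary: the same plus mean, sd and cv rows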
    
    
    # ### Return ycv
    # @staticmethod
    # def ycv(model, x, y, n_folds:int):
    #     ycv = np.zeros(y.shape[0])
    #     f, idx,_,_ = KF_CV.cross_val_predictor(model, x,y, n_folds)
    #     for i in f.keys():
    #         ycv[idx[i]] = f[i]            
    #     return ycv


### Selectivity ratio
def sel_ratio(model, x):
    """Compute the selectivity ratio of each variable and keep those exceeding the F-distribution threshold."""
    from scipy.stats import f

    x = DataFrame(x)
    # target-projection weights, scores and loadings
    wtp = model.coef_.T / np.linalg.norm(model.coef_.T)
    ttp = np.array(x @ wtp)
    ptp = np.array(x.T) @ np.array(ttp) / (ttp.T @ ttp)
    qexpi = np.linalg.norm(ttp @ ptp.T, axis = 0)**2   # explained variance per variable
    e = np.array(x - x.mean()) - ttp @ ptp.T           # residual matrix
    qres = np.linalg.norm(e, axis = 0)**2              # residual variance per variable
    sr = DataFrame(qexpi / qres, index = x.columns, columns = ['sr'])

    fcr = f.ppf(0.05, sr.shape[0] - 2, sr.shape[0] - 3)
    c = sr > fcr
    sr.index = np.arange(x.shape[1])
    SR = sr.iloc[c.to_numpy().ravel(), :]
    return SR
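# Usage sketch (hypothetical fitted model exposing coef_, e.g. a PLSRegression fitted on x, y):
#   from sklearn.cross_decomposition import PLSRegression
#   pls = PLSRegression(n_components=10).fit(x, y)
#   selected = sel_ratio(pls, x)  # variables whose selectivity ratio exceeds the F-distribution threshold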