# data_handling.py - data handling utilities: spectral preprocessing, cross-validation and file housekeeping.
from utils.eval_metrics import metrics
DIANE's avatar
DIANE committed
import numpy as np
from pandas import DataFrame
DIANE's avatar
DIANE committed

# Try to automatically detect the field separator within the CSV.
# def find_delimiter(filename):
#     import clevercsv
#     with open(filename, newline='') as csvfile:
#         delimiter = clevercsv.Sniffer().sniff(csvfile.read(100)).delimiter
#     # sniffer = csv.Sniffer()
#     # with open(filename) as fp:
#     #     delimiter = sniffer.sniff(fp.read(200)).delimiter
#     return delimiter

# def find_col_index(filename):
#     with open(filename) as fp:
#         lines = read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
#         col_index = 'yes' if lines.iloc[:,0].dtypes != np.float64 else 'no'
#     return col_index
DIANE's avatar
DIANE committed


# Detection of column categories and scaling.
# def col_cat(data_import):
#     """detect numerical and categorical columns in the csv"""
#     # set first column as sample names
#     name_col = DataFrame(list(data_import.index), index = list(data_import.index))
#     # name_col=name_col.rename(columns = {0:'name'})
#     numerical_columns_list = []
#     categorical_columns_list = []
#     for i in data_import.columns:
#         if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
#             numerical_columns_list.append(data_import[i])
#         else:
#             categorical_columns_list.append(data_import[i])
#     if len(numerical_columns_list) == 0:
#         empty = [0 for x in range(len(data_import))]
#         numerical_columns_list.append(empty)
#     if len(categorical_columns_list) > 0:
#         categorical_data = concat(categorical_columns_list, axis=1)
#         categorical_data.insert(0, 'name', name_col)
#     if len(categorical_columns_list) == 0:
#         categorical_data = DataFrame
#     # Create numerical data matrix from the numerical columns list and fill na with the mean of the column
#     numerical_data = concat(numerical_columns_list, axis=1)
#     numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean())) #np.mean(x)))

#     return numerical_data, categorical_data
DIANE's avatar
DIANE committed


def list_files(mypath, import_type):
    """
    Lists the pickled model files of a given type found directly inside a directory.

    Parameters
    ----------
    mypath : str or path-like
        Directory to scan (not recursive).
    import_type : str
        Suffix that precedes the '.pkl' extension; only regular files whose name
        ends with ``import_type + '.pkl'`` are returned.

    Returns
    -------
    list of str
        Matching file names (without the directory part), in the order returned by
        ``os.listdir``. When nothing matches, a single-element list holding a
        placeholder message is returned instead.
    """
    # Imported locally so the function does not depend on names that the module
    # header never brings into scope.
    from os import listdir
    from os.path import isfile, join

    suffix = import_type + '.pkl'
    found = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(suffix)]
    if not found:
        return ['Please, create a model before - no model available yet']
    return found



DIANE's avatar
DIANE committed
from pandas import DataFrame
from sklearn.preprocessing import StandardScaler

def standardize(X: DataFrame, center: bool = True, scale: bool = False) -> DataFrame:
    """
    Centers and/or scales the columns of a DataFrame with scikit-learn's StandardScaler.

    With the default arguments only mean-centering is applied; a full z-score
    standardization requires ``scale=True``.

    Parameters
    ----------
    X : DataFrame
        A pandas DataFrame containing the data to be transformed. Each column represents a feature.

    center : bool, optional
        Forwarded to ``StandardScaler(with_mean=...)``. If True, the mean of each
        feature is subtracted from the data. Default is True.

    scale : bool, optional
        Forwarded to ``StandardScaler(with_std=...)``. If True, each feature is
        scaled to unit variance. Default is False.

    Returns
    -------
    DataFrame
        A pandas DataFrame containing the transformed values, with the same indices and column names
        as the input DataFrame.
    """
    scaler = StandardScaler(with_mean=center, with_std=scale)
    # fit_transform returns a bare array; re-attach the original labels.
    return DataFrame(scaler.fit_transform(X), index=X.index, columns=X.columns)

######################################## Spectral preprocessing
def Detrend(X):
    """
    Removes a linear trend from each row of the input along its last axis.

    Parameters
    ----------
    X : array-like
        Data to detrend; the linear fit is computed along the last axis
        (``axis=-1``), i.e. along each row of a 2-D input.

    Returns
    -------
    numpy.ndarray
        The detrended data as returned by ``scipy.signal.detrend``. Index and
        column labels of a DataFrame input are not preserved.
    """
    # Imported locally: the module header does not bring `detrend` into scope.
    from scipy.signal import detrend

    return detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False)

DIANE's avatar
DIANE committed



def Snv(X: DataFrame) -> DataFrame:
    """
    Performs Standard Normal Variate (SNV) transformation on the input DataFrame.

    Each row (sample) is centered on its own mean and divided by its own standard
    deviation, both computed across that row's columns. The resulting DataFrame
    retains the original indices and column names.

    Parameters
    ----------
    X : DataFrame
        A pandas DataFrame of numeric values. Each row is one sample and each
        column one variable.

    Returns
    -------
    DataFrame
        A pandas DataFrame containing the transformed values, with the same indices and column names
        as the input DataFrame. A row with zero standard deviation yields non-finite values.
    """
    values = np.asarray(X, dtype=float)
    # Per-row statistics: the mean must be taken along the same axis as the
    # standard deviation, otherwise rows are shifted by the global mean instead
    # of being centered individually.
    row_mean = values.mean(axis=1, keepdims=True)
    row_std = values.std(axis=1, keepdims=True)
    return DataFrame((values - row_mean) / row_std, index=X.index, columns=X.columns)

DIANE's avatar
DIANE committed

def No_transformation(X):
    """Identity preprocessing step: hands back ``X`` itself, untouched."""
    return X


######################################## Cross val split ############################
DIANE's avatar
DIANE committed
from typing import List, Dict, Tuple
import numpy as np
from pandas import DataFrame
from sklearn.linear_model import LinearRegression

DIANE's avatar
DIANE committed
class KF_CV:
    """
    A class for implementing cross-validation with Kennard-Stone fold generation.
    Provides methods for generating test set indices, cross-validating a model,
    calculating metrics, and analyzing predictions across folds.

    All methods are static; the class only serves as a namespace.

    Methods
    -------
    CV(x, y, n_folds: int) -> Dict[str, np.ndarray]:
        Generates test set indices for each fold based on Kennard-Stone K-Fold.

    cross_val_predictor(model, folds: Dict[str, np.ndarray], x, y) -> Dict[str, np.ndarray]:
        Cross-validates the model, returning predictions for each fold.

    meas_pred_eq(y, ypcv: Dict[str, np.ndarray], folds: Dict[str, np.ndarray]) -> Tuple[DataFrame, DataFrame]:
        Analyzes predictions, returning dataframes for measured and predicted values
        with OLS regression equations and coefficients.

    metrics_cv(y, ypcv: Dict[str, np.ndarray], folds: Dict[str, np.ndarray]) -> Tuple[DataFrame, DataFrame]:
        Computes metrics for each fold, returning dataframes with metric scores per fold
        and summary statistics (mean, standard deviation, coefficient of variation).

    cv_scores(y, ypcv: Dict[str, np.ndarray], folds: Dict[str, np.ndarray]) -> Tuple[DataFrame, DataFrame]:
        Same result as metrics_cv; kept as a separate entry point for existing callers.
    """

    @staticmethod
    def CV(x, y, n_folds: int) -> Dict[str, np.ndarray]:
        """
        Generates test set indices for each fold using Kennard-Stone K-Fold.

        Parameters
        ----------
        x : array-like
            Feature matrix used for training.
        y : array-like
            Target variable.
        n_folds : int
            Number of folds for cross-validation.

        Returns
        -------
        Dict[str, np.ndarray]
            Dictionary where keys are fold names ('Fold1', 'Fold2', ...) and values
            are the test-set indices yielded by the splitter for that fold.
        """
        from kennard_stone import KFold as ks_KFold

        kf = ks_KFold(n_splits=n_folds, device='cpu')
        # Run the splitter a single time and name the folds in the order they are
        # yielded, instead of recomputing the whole split once per fold.
        return {f'Fold{i + 1}': i_test for i, (_, i_test) in enumerate(kf.split(x, y))}

    @staticmethod
    def cross_val_predictor(model, folds: Dict[str, np.ndarray], x, y) -> Dict[str, np.ndarray]:
        """
        Cross-validates the model and returns predictions for each fold.

        Parameters
        ----------
        model : estimator object
            Object exposing ``fit(x, y)`` and ``predict(x)``. The same instance is
            refitted for every fold, so it is left fitted on the last fold's
            training data.
        folds : Dict[str, np.ndarray]
            Dictionary with fold names as keys and test set indices as values (from CV method).
        x : array-like
            Feature matrix.
        y : array-like
            Target variable.

        Returns
        -------
        Dict[str, np.ndarray]
            Dictionary where keys are fold names and values are the predicted
            target values for each fold.
        """
        x = np.array(x)
        y = np.array(y)
        yp = {}
        for fold_name, test_idx in folds.items():
            # Train on every row except the fold's test rows, predict the test rows.
            model.fit(np.delete(x, test_idx, axis=0), np.delete(y, test_idx, axis=0))
            yp[fold_name] = model.predict(x[test_idx])
        return yp

    @staticmethod
    def meas_pred_eq(y, ypcv: Dict[str, np.ndarray], folds: Dict[str, np.ndarray]) -> Tuple[DataFrame, DataFrame]:
        """
        Computes and returns measured vs predicted data with regression equations.

        For each fold an ordinary least squares line ``Predicted = intercept + slope * Measured``
        is fitted on that fold's test samples.

        Parameters
        ----------
        y : array-like
            Target variable.
        ypcv : Dict[str, np.ndarray]
            Dictionary with fold names as keys and predicted values per fold as values.
        folds : Dict[str, np.ndarray]
            Dictionary with fold names as keys and test set indices per fold as values.

        Returns
        -------
        Tuple[DataFrame, DataFrame]
            - DataFrame with columns 'Predicted', 'Measured', 'Folds' (fold label with
              the fitted equation) and 'index', indexed by the test-set indices.
            - DataFrame with regression coefficients (rows 'Slope' and 'Intercept'),
              one column per fold.
        """
        from pandas import concat

        y = np.array(y)
        per_fold = {}
        coeff = {}
        for i, (fold_name, test_idx) in enumerate(folds.items()):
            measured = y[test_idx]
            predicted = ypcv[fold_name]
            ols = LinearRegression().fit(DataFrame(measured), predicted.reshape(-1, 1))
            slope = ols.coef_[0][0]
            intercept = ols.intercept_[0]

            r = DataFrame()
            r['Predicted'] = predicted
            r['Measured'] = measured
            r.index = test_idx
            # NOTE(review): the opening parenthesis in this label is never closed; the
            # text is kept verbatim because it is user-facing - confirm before changing.
            r['Folds'] = [f'{fold_name} (Predicted = {np.round(intercept, 2)} + {np.round(slope, 2)} x Measured'] * r.shape[0]
            per_fold[i] = r
            coeff[fold_name] = [slope, intercept]

        # concat on a dict yields a (fold position, sample index) MultiIndex; keep
        # only the sample index, both as a column and as the frame's index.
        data = concat(per_fold, axis=0)
        data['index'] = list(data.index.get_level_values(1))
        data.index = data['index']
        coeff = DataFrame(coeff, index=['Slope', 'Intercept'])
        return data, coeff

    @staticmethod
    def metrics_cv(y, ypcv: Dict[str, np.ndarray], folds: Dict[str, np.ndarray]) -> Tuple[DataFrame, DataFrame]:
        """
        Computes and returns evaluation metrics for each fold.

        Parameters
        ----------
        y : array-like
            Target variable.
        ypcv : Dict[str, np.ndarray]
            Dictionary with fold names as keys and predicted values per fold as values.
        folds : Dict[str, np.ndarray]
            Dictionary with fold names as keys and test set indices per fold as values.

        Returns
        -------
        Tuple[DataFrame, DataFrame]
            - DataFrame with metrics for each fold (one row per fold).
            - The same DataFrame with three extra rows: 'mean', 'sd' and 'cv'
              (100 * sd / mean), all computed across the folds only.
        """
        y = np.array(y)
        scores = {name: metrics().reg_(y[test_idx], ypcv[name]) for name, test_idx in folds.items()}
        r = DataFrame(scores)
        fold_mean = r.mean(axis=1)
        fold_sd = r.std(axis=1)

        # Work on a copy so the summary columns never leak into the per-fold frame.
        r_print = r.copy()
        r_print['mean'] = fold_mean
        r_print['sd'] = fold_sd
        r_print['cv'] = 100 * fold_sd / fold_mean
        return r.T, r_print.T

    @staticmethod
    def cv_scores(y, ypcv: Dict[str, np.ndarray], folds: Dict[str, np.ndarray]) -> Tuple[DataFrame, DataFrame]:
        """
        Computes and returns fold-wise evaluation scores with summary statistics.

        Parameters
        ----------
        y : array-like
            Target variable.
        ypcv : Dict[str, np.ndarray]
            Dictionary with fold names as keys and predicted values per fold as values.
        folds : Dict[str, np.ndarray]
            Dictionary with fold names as keys and test set indices per fold as values.

        Returns
        -------
        Tuple[DataFrame, DataFrame]
            - DataFrame with metric scores per fold.
            - DataFrame with metric scores along with mean, sd, and cv values.
        """
        # Delegates to metrics_cv so that 'sd' and 'cv' are computed from the fold
        # scores alone and the per-fold frame carries no summary rows.
        return KF_CV.metrics_cv(y, ypcv, folds)
DIANE's avatar
DIANE committed

DIANE's avatar
DIANE committed
    
    # ### Return ycv
    # @staticmethod
    # def ycv(model, x, y, n_folds:int):
    #     ycv = np.zeros(y.shape[0])
    #     f, idx,_,_ = KF_CV.cross_val_predictor(model, x,y, n_folds)
    #     for i in f.keys():
    #         ycv[idx[i]] = f[i]            
    #     return ycv


### Selectivity ratio
def sel_ratio(model, x):
    """
    Computes a selectivity ratio per variable and keeps the variables above an
    F-distribution threshold.

    The data are projected onto the normalized regression-coefficient vector; for
    each variable the ratio is the sum of squares reconstructed from that single
    component divided by the residual sum of squares.

    Parameters
    ----------
    model : fitted estimator
        Object exposing a 2-D ``coef_`` array. Assumes a single target, with
        ``coef_`` laid out as (1, n_variables) - TODO confirm against the
        estimator actually passed in.
    x : array-like or DataFrame
        Data matrix with one column per variable.

    Returns
    -------
    DataFrame
        Single column 'sr' holding the ratios of the retained variables, indexed by
        the variables' integer positions in ``x``.
    """
    from scipy.stats import f

    x = DataFrame(x)
    # Unit-norm direction of the regression vector, then scores and loadings of
    # the single component it defines.
    wtp = model.coef_.T / np.linalg.norm(model.coef_.T)
    ttp = np.array(x @ wtp)
    ptp = np.array(x.T) @ ttp / (ttp.T @ ttp)

    reconstructed = ttp @ ptp.T
    qexpi = np.linalg.norm(reconstructed, axis=0) ** 2
    # NOTE(review): scores are computed from x as given, while the residual uses the
    # column-centered x - presumably x is expected to be centered already; confirm.
    residual = np.array(x - x.mean()) - reconstructed
    qres = np.linalg.norm(residual, axis=0) ** 2
    sr = DataFrame(qexpi / qres, index=x.columns, columns=['sr'])

    # NOTE(review): the threshold is the 0.05 quantile with degrees of freedom taken
    # from the number of variables; a 0.95 quantile based on the number of samples
    # may have been intended - left unchanged pending confirmation.
    fcr = f.ppf(0.05, sr.shape[0] - 2, sr.shape[0] - 3)
    above = sr > fcr
    # Re-label rows by position so the result reports variable positions.
    sr.index = np.arange(x.shape[1])
    return sr.iloc[above.to_numpy(), :]





#####################################
from typing import List
from pathlib import Path

class HandleItems:
    """
    Namespace of static helpers for housekeeping files and directories.

    Methods
    -------
    delete_files(keep: List[str]):
        Removes the files found under the "report" directory, sparing the ones to keep.

    delete_dir(delete: List[str]):
        Removes each listed directory that exists.

    create_dir(path: List[str]):
        Creates each listed directory that does not exist yet.
    """

    @staticmethod
    def delete_files(keep: List[str]):
        """
        Removes every file under the "report" directory (sub-directories included),
        except 'logo_cefe.png' and files whose name ends with one of the given suffixes.

        Parameters
        ----------
        keep : List[str]
            Name suffixes (typically extensions) to spare. A file ending with any of
            them is left in place.
        """
        from os import walk, remove, path

        # Bottom-up traversal of the report tree; only files are removed.
        for root, _subdirs, files in walk(Path("report"), topdown=False):
            for file in files:
                if file == 'logo_cefe.png':
                    continue
                if any(file.endswith(ext) for ext in keep):
                    continue
                remove(path.join(root, file))

    @staticmethod
    def delete_dir(delete: List[str]):
        """
        Removes the listed directories, together with their content.

        Parameters
        ----------
        delete : List[str]
            Directory paths to remove. Entries that do not exist, or that are not
            directories, are skipped.
        """
        from shutil import rmtree

        for entry in delete:
            target = Path(entry)
            # is_dir() is False for a missing path, so it covers the existence check.
            if target.is_dir():
                rmtree(target)

    @staticmethod
    def create_dir(path: List[str]):
        """
        Creates the listed directories, including missing parents.

        Parameters
        ----------
        path : List[str]
            Directory paths to create. A path that already exists (whatever its
            kind) is left untouched.
        """
        for entry in path:
            target = Path(entry)
            if target.exists():
                continue
            target.mkdir(parents=True, exist_ok=True)