import streamlit as st
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import csv
from umap.umap_ import UMAP

# local CSS
## load the custom CSS in the style folder
def local_css(file_name):
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
local_css("style/style.css")

## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter

def find_col_index(filename):
    # guess whether the first column holds a sample-name index: if it is not
    # float-typed, assume it contains row labels rather than data
    with open(filename) as fp:
        sample = pd.read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
    return 'yes' if sample.iloc[:, 0].dtype != np.float64 else 'no'
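
# Hypothetical usage sketch for the two helpers above (the file path is an
# assumption, not part of this module):
#   sep = find_delimiter("data/samples.csv")        # e.g. ';' or ','
#   has_index = find_col_index("data/samples.csv")  # 'yes' or 'no'
#   df = pd.read_csv("data/samples.csv", sep=sep,
#                    index_col=0 if has_index == 'yes' else None)
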
# detect numerical vs categorical columns and scale the numerical ones
def col_cat(data_import):
    # detect numerical and categorical columns in the csv
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    if len(numerical_columns_list) == 0:
        # no numeric column found: add a dummy all-zero Series so that the
        # concat and scaling below still work
        numerical_columns_list.append(pd.Series(0.0, index=data_import.index, name='no numeric data'))
    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    else:
        # no categorical column found: add a dummy empty-string column
        categorical_data = pd.DataFrame({'no categories': [''] * len(data_import)}, index=data_import.index)
    # Build the numerical data matrix and fill each NaN with its column mean
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    # Scale the numerical data
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)
    return numerical_data, categorical_data, scaled_values
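
# Hypothetical usage sketch for col_cat (the column names are assumptions):
#   df = pd.DataFrame({'site': ['A', 'B'], 'wl1': [0.1, 0.2], 'wl2': [1.0, np.nan]})
#   num, cat, scaled = col_cat(df)
#   # num    -> DataFrame of the float/int columns, NaNs replaced by column means
#   # cat    -> DataFrame of the remaining columns ('site' here)
#   # scaled -> numpy array of num, standardized to zero mean and unit variance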

# UMAP function for the Sample Selection module
def umap_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    umap_func = UMAP(random_state=42, n_neighbors=20, n_components=4, min_dist=0.0)
    umap_fit = umap_func.fit(scaled_values)
    umap_data = umap_fit.transform(scaled_values)
    umap_data = pd.DataFrame(umap_data, index=numerical_data.index)
    # Set UMAP column names with component number
    new_column_names = ["UMAP_" + str(i) for i in range(1, len(umap_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(umap_data.columns), new_column_names))
    umap_data = umap_data.rename(columns=column_mapper)
    output = pd.concat([data_import, umap_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
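
# Hypothetical usage sketch for umap_maker:
#   scores, cat_cols, umap_cols = umap_maker(df)
#   # scores keeps the original columns and appends 'UMAP_1' ... 'UMAP_4';
#   # ready for plotting, e.g. px.scatter(scores, x=umap_cols[0],
#   # y=umap_cols[1], color=cat_cols[0]) with plotly.express as px
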
# PCA function for the Sample Selection module
def pca_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    # Compute a 6 components PCA on scaled values
    pca = PCA(n_components=6)
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)
    # Set PCA column names with component number and explained variance %
    new_column_names = ["PCA_" + str(i) + ' - ' + str(round(pca_fit.explained_variance_ratio_[i-1], 3) *100) + '%' for i in range(1, len(pca_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(pca_data.columns), new_column_names))
    pca_data = pca_data.rename(columns=column_mapper)
    output = pd.concat([data_import, pca_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
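
# Hypothetical usage sketch for pca_maker (mirrors umap_maker):
#   scores, cat_cols, pca_cols = pca_maker(df)
#   # pca_cols look like 'PCA_1 - 45.2%': the component number followed by
#   # its share of the explained variance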

# 'Create model' module: build a PLSR model with the PINARD package
def model_PLSR(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
    from sklearn.cross_decomposition import PLSRegression
    np.random.seed(rd_seed)
    # hdr: 'yes' if the first CSV column holds sample names (used as the index), else 'no'
    if hdr == 'yes':
        col = 0
    else:
        col = False
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)
    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=rd_seed)
    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))
    # Declare preprocessing pipeline
    svgolay = [('_sg1', pp.SavitzkyGolay()),
               ('_sg2', pp.SavitzkyGolay())]
    preprocessing = [('id', pp.IdentityTransformer()),  # identity: pass the data through unchanged
                     ('savgol', pp.SavitzkyGolay()),    # Savitzky-Golay smoothing filter
                     ('derivate', pp.Derivate()),       # first derivative of the data
                     ('SVG', Pipeline(svgolay))]        # nested pipeline: Savitzky-Golay applied twice for 2nd-order preprocessing
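    # Note: FeatureUnion (used in the pipeline below) fits each of these
    # transformers on the same input and concatenates their outputs
    # column-wise, so the PLS regressor sees all four preprocessed versions
    # of the spectra side by side.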
    # Declare complete pipeline
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()), # scaling the data
        ('preprocessing', FeatureUnion(preprocessing)), # preprocessing
        ('PLS',  PLSRegression()) # regressor
    ])
    # Estimator that also scales the y values during fit/predict
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())
    # Training
    trained = estimator.fit(X_train, y_train)
    # R² on the held-out test set
    st.write("Test R²: " + str(estimator.score(X_test, y_test)))
    # Predictions on the test set
    Y_preds = estimator.predict(X_test)
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))

    # Cross-Validate the model
    CV_model(estimator, X_train, y_train, 3)

    return trained
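
# Hypothetical usage sketch for model_PLSR (paths and separator are assumptions):
#   model = model_PLSR('Xcal.csv', 'Ycal.csv', sep=';', hdr='yes', rd_seed=42)
#   # 'model' is the fitted TransformedTargetRegressor; it can be saved for
#   # later prediction, e.g. with joblib.dump(model, 'model_plsr.pkl')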

# Cross-Validation of the model
def CV_model(estimator, x, y, cv):
    from sklearn.model_selection import cross_val_score, cross_val_predict, cross_validate
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
    st.write('Cross-Validation of this model')
    st.write("CV_scores", cross_val_score(estimator, x, y, cv=cv))
    st.write("-- CV predict --")
    Y_preds = cross_val_predict(estimator, x, y, cv=cv)
    st.write("MAE", mean_absolute_error(y, Y_preds))
    st.write("MSE", mean_squared_error(y, Y_preds))
    st.write("MAPE", mean_absolute_percentage_error(y, Y_preds))
    st.write("", r2_score(y, Y_preds))
    st.write("-- Cross Validate --")
    cv_results = cross_validate(estimator, x, y, cv=cv, return_train_score=True, n_jobs=3)
    for key in cv_results.keys():
        st.write(key, cv_results[key])
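
# Hypothetical usage sketch for CV_model: it can also be called on its own,
# e.g. to re-check a previously trained estimator with 5 folds:
#   CV_model(estimator, X_train, y_train, 5)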

def model_LWPLSR(xcal_csv, ycal_csv, sep, hdr):
    import julia
    from julia import Jchemo
    from pinard import utils
    from pinard.model_selection import train_test_split_idx
    # hdr: 'yes' if the first CSV column holds sample names (used as the index), else 'no'
    if hdr == 'yes':
        col = 0
    else:
        col = False
    # loading the csv
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)
    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=42)
    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))

    # fit a locally weighted PLSR model with Jchemo (Julia called via pyjulia);
    # passing the Euclidean metric as the string "eucl" is an assumption about
    # the Python-side call, and returning the fitted model completes the stub
    fm = Jchemo.lwplsr(X_train, y_train, nlvdis=4, metric="eucl", k=10)
    return fm


def prediction(NIRS_csv, qsep, qhdr, model):
    # qhdr: 'yes' if the first CSV column holds sample names (used as the index), else 'no'
    if qhdr == 'yes':
        col = 0
    else:
        col = False
    X_test = pd.read_csv(NIRS_csv, sep=qsep, index_col=col)
    Y_preds = model.predict(X_test)
    return Y_preds
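
# Hypothetical usage sketch for prediction (the file name is an assumption):
#   preds = prediction('new_spectra.csv', qsep=';', qhdr='yes', model=model)
#   # returns the model's predicted y value for each spectrum in the CSV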

def list_files(mypath, import_type):
    from os import listdir
    from os.path import isfile, join
    files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
    return files
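
# Hypothetical usage sketch for list_files: with models saved as
# '<name>_plsr.pkl' in a 'saved_models' folder (names are assumptions),
#   list_files('saved_models', 'plsr')
# returns every file whose name ends in 'plsr.pkl'.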