# application_functions.py
import streamlit as st
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import csv

# local CSS
def local_css(file_name):
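    """Inject a local CSS file into the Streamlit page."""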
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

local_css("style/style.css")

def find_delimiter(filename):
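    """Detect the field delimiter of a CSV file by sniffing its first 5000 characters."""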
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter

# predict function
# NOTE: the original referenced NIRS_csv, psep and phdr as globals defined
# elsewhere in the app; they are taken as parameters here so the function is
# self-contained (an assumption about the caller's intent). Building the
# message with an f-string also fixes the original tuple-instead-of-string bug.
def predict(NIRS_csv, psep, phdr):
    display = f"Prediction with: {NIRS_csv}, sep: {psep}, header: {phdr}"
    st.success(display)

# create model function
def model(xcal_csv, ycal_csv, sep, hdr, rd_seed):
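    """Train a PLS regression model on NIRS calibration data.

    xcal_csv / ycal_csv: paths to the X (spectra) and Y (reference values) CSV files.
    sep: CSV field separator; hdr: 'yes' if the files carry an index column.
    rd_seed: random seed for reproducibility.
    Returns the fitted estimator.
    """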
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict, cross_validate
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
    from sklearn.cross_decomposition import PLSRegression
    np.random.seed(rd_seed)
    if hdr == 'yes':
        col = 0
    else:
        col = False
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)
    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=rd_seed)
    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))
    # Declare preprocessing pipeline
    # Steps for applying the Savitzky-Golay filter twice (2nd-order preprocessing)
    svgolay = [('_sg1', pp.SavitzkyGolay()),
               ('_sg2', pp.SavitzkyGolay())]
    preprocessing = [('id', pp.IdentityTransformer()),  # identity: no change to the data
                     ('savgol', pp.SavitzkyGolay()),    # Savitzky-Golay smoothing filter
                     ('derivate', pp.Derivate()),       # first derivative of the data
                     # A Pipeline chains the two Savitzky-Golay steps so the filter is
                     # really applied twice; the FeatureUnion used here originally would
                     # run them in parallel and concatenate two identical outputs instead.
                     ('SVG', Pipeline(svgolay))]
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()), # scaling the data
        ('preprocessing', FeatureUnion(preprocessing)), # preprocessing
        ('PLS',  PLSRegression()) # regressor
    ])
    # Estimator including y values scaling
    estimator = TransformedTargetRegressor(regressor = pipeline, transformer = MinMaxScaler())
    # Training
    trained = estimator.fit(X_train, y_train)
    # fit scores
    st.write("fit scores / R²: " + str(estimator.score(X_test,y_test)))
    # Predictions on test set
    Y_preds = estimator.predict(X_test) # make predictions on test data and assign to Y_preds variable
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))
    return trained
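
# Example call (hypothetical file names and widget values):
#   trained = model("Xcal.csv", "Ycal.csv", sep=";", hdr='yes', rd_seed=42)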


def pca_maker(data_import):
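    """Run a PCA on the numerical columns of data_import.

    Returns the input DataFrame with the PCA scores appended, the list of
    categorical column names, and the PCA column labels (each carrying the
    component's explained variance percentage).
    """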
    # Separate numerical and categorical columns
    numerical_columns_list = []
    categorical_columns_list = []

    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    # Fall back to placeholder columns when a dtype group is empty; the
    # placeholders must be pandas objects so pd.concat below does not fail
    # (the original appended plain lists, which pd.concat cannot handle)
    if len(numerical_columns_list) == 0:
        numerical_columns_list.append(pd.Series(0, index=data_import.index, name='no numeric'))
    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    else:
        categorical_data = pd.DataFrame({'no categories': [""] * len(data_import)}, index=data_import.index)
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))  # impute missing values with the column mean

    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)

    # Cap the number of components at the available dimensions so PCA does not
    # fail when there are fewer than 6 numerical columns (or rows)
    pca = PCA(n_components=min(6, scaled_values.shape[0], scaled_values.shape[1]))
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)
    # Convert the explained variance ratio to percent before rounding, to avoid float noise in the labels
    new_column_names = ["PCA_" + str(i) + ' - ' + str(round(pca_fit.explained_variance_ratio_[i-1] * 100, 2)) + '%' for i in range(1, len(pca_data.columns) + 1)]

    column_mapper = dict(zip(list(pca_data.columns), new_column_names))

    pca_data = pca_data.rename(columns=column_mapper)

    output = pd.concat([data_import, pca_data], axis=1)

    return output, list(categorical_data.columns), new_column_names
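
# Example call (hypothetical DataFrame loaded elsewhere in the app):
#   df = pd.read_csv("data.csv", sep=find_delimiter("data.csv"))
#   pca_output, cat_cols, pca_cols = pca_maker(df)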