import csv

import numpy as np
import pandas as pd
import streamlit as st
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from umap import UMAP  # umap-learn package; required by umap_maker below
## Load the custom CSS from the style folder
def local_css(file_name):
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

local_css("style/style.css")
## Try to automatically detect the field separator used in the CSV
def find_delimiter(filename):
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        # Sniff the dialect from the first 5000 characters of the file
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter
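
# Example usage (a sketch; "data/samples.csv" is a hypothetical path):
# sep = find_delimiter("data/samples.csv")  # returns e.g. ';' or ','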
# Detect column categories (numerical vs. categorical) and scale the numerical data
def col_cat(data_import):
    # Detect numerical and categorical columns in the CSV
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    if len(numerical_columns_list) == 0:
        # Fallback: a column of zeros so the concat/scaling below still works
        numerical_columns_list.append(pd.Series(0.0, index=data_import.index, name="no numerical"))
    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    else:
        # Fallback: a single placeholder column when no categorical column exists
        categorical_data = pd.DataFrame({'no categories': [""] * len(data_import)}, index=data_import.index)
    # Create the numerical data matrix and fill NAs with the mean of their column
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    # Scale the numerical data
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)
    return numerical_data, categorical_data, scaled_values
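
# Example (sketch): for df = pd.DataFrame({"a": [1.0, None, 3.0], "b": ["x", "y", "z"]}),
# col_cat(df) returns the numerical frame (column "a", with the NA filled by the mean 2.0),
# the categorical frame (column "b"), and the standardized values of "a".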
# UMAP function for the Sample Selection module
def umap_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    umap_func = UMAP(random_state=42, n_neighbors=20, n_components=4, min_dist=0.0)
    umap_fit = umap_func.fit(scaled_values)
    umap_data = umap_fit.transform(scaled_values)
    umap_data = pd.DataFrame(umap_data, index=numerical_data.index)
    # Set UMAP column names with the component number
    new_column_names = ["UMAP_" + str(i) for i in range(1, len(umap_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(umap_data.columns), new_column_names))
    umap_data = umap_data.rename(columns=column_mapper)
    output = pd.concat([data_import, umap_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
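
# Example usage (sketch; `df` is a hypothetical DataFrame loaded from the user's CSV):
# df = pd.read_csv("data/samples.csv", sep=find_delimiter("data/samples.csv"))
# umap_df, cat_cols, umap_cols = umap_maker(df)  # umap_df = original columns + UMAP_1..UMAP_4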
# PCA function for the Sample Selection module
def pca_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    # Compute a 6-component PCA on the scaled values
    pca = PCA(n_components=6)
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)
    # Set PCA column names with the component number and explained variance %
    # (multiply before rounding to avoid float noise such as 12.300000000000001)
    new_column_names = ["PCA_" + str(i) + ' - ' + str(round(pca_fit.explained_variance_ratio_[i - 1] * 100, 2)) + '%'
                        for i in range(1, len(pca_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(pca_data.columns), new_column_names))
    pca_data = pca_data.rename(columns=column_mapper)
    output = pd.concat([data_import, pca_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
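
# Example usage (sketch; same hypothetical `df` as above):
# pca_df, cat_cols, pca_cols = pca_maker(df)
# pca_cols looks like ['PCA_1 - 42.1%', ..., 'PCA_6 - 1.3%'] (illustrative percentages)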
# Create-model module built with PINARD
def model(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
    from sklearn.cross_decomposition import PLSRegression

    np.random.seed(rd_seed)
    # hdr indicates the header row of the CSVs (0 when the first row holds column names)
    # NOTE: the index column is assumed to be the first one (index_col=0); adjust if needed
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep,
                          x_hdr=hdr, y_hdr=hdr, x_index_col=0, y_index_col=0)
    # Split the data into training and test sets with the Kennard-Stone method and a
    # correlation metric; 25% of the data is held out for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone",
                                                   metric="correlation", test_size=0.25,
                                                   random_state=rd_seed)
    # Assign data to the training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape)
             + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))
    # Declare the preprocessing pipeline; the 'SVG' entry is a nested Pipeline (not a
    # FeatureUnion) so that Savitzky-Golay runs twice in sequence for 2nd-order preprocessing
    svgolay = [('_sg1', pp.SavitzkyGolay()),
               ('_sg2', pp.SavitzkyGolay())]
    preprocessing = [('id', pp.IdentityTransformer()),   # identity transformer, no change to the data
                     ('savgol', pp.SavitzkyGolay()),     # Savitzky-Golay smoothing filter
                     ('derivate', pp.Derivate()),        # first derivative of the data
                     ('SVG', Pipeline(svgolay))]         # Savitzky-Golay applied twice
    # Declare the complete pipeline
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),                      # scale the data
        ('preprocessing', FeatureUnion(preprocessing)),  # run all preprocessings in parallel and concatenate
        ('PLS', PLSRegression())                         # regressor
    ])
    # Estimator including y value scaling
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())
    # Training
    trained = estimator.fit(X_train, y_train)
    # Fit score (R²) on the test set
    st.write("fit scores / R²: " + str(estimator.score(X_test, y_test)))
    # Predictions on the test set
    Y_preds = estimator.predict(X_test)
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))
    return trained
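
# Example call (a sketch; the CSV paths are hypothetical and hdr=0 assumes a header row):
# sep = find_delimiter("data/Xcal.csv")
# trained = model("data/Xcal.csv", "data/Ycal.csv", sep=sep, hdr=0, rd_seed=42)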
# Prediction module
def predict():
    # NIRS_csv, psep and phdr are globals set elsewhere by the Streamlit UI
    display = "Prediction with: " + str(NIRS_csv) + ", " + str(psep) + ", " + str(phdr)
    st.success(display)