import streamlit as st
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import csv
# local CSS
## load the custom CSS in the style folder
def local_css(file_name):
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

local_css("style/style.css")
## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter
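# Usage sketch (illustrative only, not part of the app flow): the detected delimiter can be
# passed straight to pandas when loading the file. The file name below is a hypothetical
# placeholder, not a path used by the app.
# sep = find_delimiter("data/samples.csv")
# data_import = pd.read_csv("data/samples.csv", sep=sep)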
# PCA function for the Sample Selection module
def pca_maker(data_import):
    # detect numerical and categorical columns in the csv
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    # fall back to a dummy numerical column if the csv contains none
    if len(numerical_columns_list) == 0:
        empty = pd.Series([0 for x in range(len(data_import))], index=data_import.index, name="empty")
        numerical_columns_list.append(empty)
    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    # fall back to a dummy categorical column if the csv contains none
    if len(categorical_columns_list) == 0:
        empty = ["" for x in range(len(data_import))]
        categorical_data = pd.DataFrame(empty, index=data_import.index, columns=['no categories'])
    # Create the numerical data matrix from the numerical columns list and fill NA with the mean of each column
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    # Scale the numerical data
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)
    # Compute a 6-component PCA on the scaled values (never request more components than available columns)
    pca = PCA(n_components=min(6, scaled_values.shape[1]))
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)
    # Set PCA column names with the component number and explained variance %
    new_column_names = ["PCA_" + str(i) + ' - ' + str(round(pca_fit.explained_variance_ratio_[i - 1] * 100, 3)) + '%' for i in range(1, len(pca_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(pca_data.columns), new_column_names))
    pca_data = pca_data.rename(columns=column_mapper)
    output = pd.concat([data_import, pca_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
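# Usage sketch (illustrative only): the returned frame can be plotted on any pair of principal
# components, colouring points by one of the categorical columns. The plotting call below
# assumes plotly express, which is not imported above.
# pca_output, cat_cols, pc_cols = pca_maker(data_import)
# import plotly.express as px
# fig = px.scatter(pca_output, x=pc_cols[0], y=pc_cols[1], color=cat_cols[0])
# st.plotly_chart(fig)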
# create model module with PINARD
def model(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict, cross_validate
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
    from sklearn.cross_decomposition import PLSRegression

    np.random.seed(rd_seed)
    # the hdr argument corresponds to the column header (True or False) in the CSV;
    # here the first row is used as header and the first column as index
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=0, y_index_col=0)
    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of the data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=rd_seed)
    # Assign data to the training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))
    # Declare the preprocessing pipeline
    svgolay = [('_sg1', pp.SavitzkyGolay()),
               ('_sg2', pp.SavitzkyGolay())  # nested transformer list to apply the Savitzky-Golay filter twice for 2nd-order preprocessing
               ]
    preprocessing = [('id', pp.IdentityTransformer()),  # identity transformer, no change to the data
                     ('savgol', pp.SavitzkyGolay()),     # Savitzky-Golay smoothing filter
                     ('derivate', pp.Derivate()),        # first derivative of the data
                     ('SVG', FeatureUnion(svgolay))      # Savitzky-Golay applied twice
                     ]
    # Declare the complete pipeline
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),                      # scale the data
        ('preprocessing', FeatureUnion(preprocessing)),  # preprocessing
        ('PLS', PLSRegression())                         # regressor
    ])
    # Estimator including y values scaling
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())
    # Training
    trained = estimator.fit(X_train, y_train)
    # fit scores
    st.write("fit scores / R²: " + str(estimator.score(X_test, y_test)))
    # Predictions on the test set
    Y_preds = estimator.predict(X_test)
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))
    return trained
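# Usage sketch (illustrative only): in the app the calibration spectra and reference values
# come from the Streamlit file uploaders; the file names, separator and seed below are
# assumptions for illustration.
# trained_model = model("Xcal.csv", "Ycal.csv", sep=";", hdr=True, rd_seed=42)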
# predict module
def predict():
    # NIRS_csv, psep and phdr are expected to be set elsewhere in the app (e.g. by the file uploader widgets)
    display = "Prediction with: " + str(NIRS_csv) + ", " + str(psep) + ", " + str(phdr)
    st.success(display)