import streamlit as st
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from umap import UMAP  # umap-learn; required by umap_maker below
import csv
## load the custom CSS in the style folder
def local_css(file_name):
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

local_css("style/style.css")
## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter
## check whether the CSV has an index column: if the first column is not numeric, assume it is an index
def find_col_index(filename):
    with open(filename) as fp:
        lines = pd.read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
        col_index = 'yes' if lines.iloc[:, 0].dtypes != np.float64 else 'no'
    return col_index
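# Illustrative usage sketch only (not part of the app flow); "data/sample.csv" is a
# hypothetical path used to show how the two helpers above are meant to be combined:
#   sep = find_delimiter("data/sample.csv")
#   has_index = find_col_index("data/sample.csv")          # 'yes' or 'no'
#   df = pd.read_csv("data/sample.csv", sep=sep, index_col=0 if has_index == 'yes' else None)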
# detection of column categories and scaling
def col_cat(data_import):
    # detect numerical and categorical columns in the csv
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])
    if len(numerical_columns_list) == 0:
        # placeholder Series so that pd.concat below still works when no numerical column exists
        empty = pd.Series([0 for x in range(len(data_import))], index=data_import.index, name='no numerical')
        numerical_columns_list.append(empty)
    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    if len(categorical_columns_list) == 0:
        empty = ["" for x in range(len(data_import))]
        categorical_columns_list.append(empty)
        categorical_data = pd.DataFrame(categorical_columns_list).T
        categorical_data.columns = ['no categories']
    # Create the numerical data matrix from the numerical columns list and fill NA with the column mean
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))
    # Scale the numerical data
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)
    return numerical_data, categorical_data, scaled_values
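# Illustrative sketch of what col_cat returns, on a small in-memory frame (hypothetical data):
#   df = pd.DataFrame({'sample': ['a', 'b', 'c'], 'x1': [1.0, 2.0, 3.0], 'x2': [4.0, 5.0, 6.0]})
#   num, cat, scaled = col_cat(df)
#   # num    -> DataFrame with columns x1, x2 (NAs filled with the column mean)
#   # cat    -> DataFrame with column 'sample'
#   # scaled -> numpy array of the standard-scaled numerical values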
# UMAP function for the Sample Selection module
def umap_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    umap_func = UMAP(random_state=42, n_neighbors=20, n_components=4, min_dist=0.0)
    umap_fit = umap_func.fit(scaled_values)
    umap_data = umap_fit.transform(scaled_values)
    umap_data = pd.DataFrame(umap_data, index=numerical_data.index)
    # Set UMAP column names with component number
    new_column_names = ["UMAP_" + str(i) for i in range(1, len(umap_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(umap_data.columns), new_column_names))
    umap_data = umap_data.rename(columns=column_mapper)
    output = pd.concat([data_import, umap_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
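# Hypothetical usage sketch: project an imported CSV and keep the new component columns
# ("UMAP_1" ... "UMAP_4") for plotting; the DataFrame name is illustrative.
#   projected, cat_cols, umap_cols = umap_maker(imported_df)
#   st.write(projected[umap_cols].head())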
# PCA function for the Sample Selection module
def pca_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    # Compute a 6-component PCA on the scaled values
    pca = PCA(n_components=6)
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)
    # Set PCA column names with component number and explained variance %
    new_column_names = ["PCA_" + str(i) + ' - ' + str(round(pca_fit.explained_variance_ratio_[i - 1] * 100, 1)) + '%' for i in range(1, len(pca_data.columns) + 1)]
    # Format the output
    column_mapper = dict(zip(list(pca_data.columns), new_column_names))
    pca_data = pca_data.rename(columns=column_mapper)
    output = pd.concat([data_import, pca_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
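# Hypothetical usage sketch, mirroring umap_maker: the returned column labels embed the
# explained variance, e.g. "PCA_1 - 45.2%" (value illustrative); the DataFrame name is illustrative.
#   projected, cat_cols, pca_cols = pca_maker(imported_df)
#   st.write(projected[pca_cols].head())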
# create model module with PINARD
def model_PLSR(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict, cross_validate
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
    from sklearn.cross_decomposition import PLSRegression
    np.random.seed(rd_seed)
    # hdr indicates whether the CSV has a header/index column ('yes' or 'no')
    if hdr == 'yes':
        col = 0
    else:
        col = False
    # loading the csv
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)
    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of the data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=rd_seed)
    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))
    # Declare the preprocessing pipeline
    svgolay = [('_sg1', pp.SavitzkyGolay()),
               ('_sg2', pp.SavitzkyGolay())]  # nested union applying the Savitzky-Golay filter twice for 2nd-order preprocessing
    preprocessing = [('id', pp.IdentityTransformer()),   # Identity transformer, no change to the data
                     ('savgol', pp.SavitzkyGolay()),     # Savitzky-Golay smoothing filter
                     ('derivate', pp.Derivate()),        # Calculate the first derivative of the data
                     ('SVG', FeatureUnion(svgolay))]     # double Savitzky-Golay defined above
    # Declare the complete pipeline
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),                      # scaling the data
        ('preprocessing', FeatureUnion(preprocessing)),  # preprocessing
        ('PLS', PLSRegression())                         # regressor
    ])
    # Estimator including y values scaling
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())
    # Training
    trained = estimator.fit(X_train, y_train)
    # R² score on the test set
    st.write("fit scores / R²: " + str(estimator.score(X_test, y_test)))
    # Predictions on the test set
    Y_preds = estimator.predict(X_test)
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))
    # Cross-validate the model
    CV_model(estimator, X_train, y_train, 3)
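# Hypothetical example call, assuming two calibration CSV files and a ';' separator
# (file names and separator are illustrative, not from the app):
#   model_PLSR("Xcal.csv", "Ycal.csv", sep=";", hdr='yes', rd_seed=42)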
# Cross-validation of the model
def CV_model(estimator, x, y, cv):
    from sklearn.model_selection import cross_val_score, cross_val_predict, cross_validate
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
    st.write('Cross-Validation of this model')
    st.write("CV_scores", cross_val_score(estimator, x, y, cv=cv))
    st.write("-- CV predict --")
    Y_preds = cross_val_predict(estimator, x, y, cv=cv)
    st.write("MAE", mean_absolute_error(y, Y_preds))
    st.write("MSE", mean_squared_error(y, Y_preds))
    st.write("MAPE", mean_absolute_percentage_error(y, Y_preds))
    st.write("R²", r2_score(y, Y_preds))
    st.write("-- Cross Validate --")
    cv_results = cross_validate(estimator, x, y, cv=cv, return_train_score=True, n_jobs=3)
    for key in cv_results.keys():
        st.write(key, cv_results[key])
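# Hypothetical standalone usage sketch with any scikit-learn estimator (names are illustrative):
#   from sklearn.cross_decomposition import PLSRegression
#   CV_model(PLSRegression(n_components=5), X_train, y_train, cv=3)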
def model_LWPLSR(xcal_csv, ycal_csv, sep, hdr):
    import julia
    from julia import Jchemo
    from pinard import utils
    from pinard.model_selection import train_test_split_idx
    # hdr indicates whether the CSV has a header/index column ('yes' or 'no')
    if hdr == 'yes':
        col = 0
    else:
        col = False
    # loading the csv
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)
    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of the data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=42)
    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))
    # Fit a locally weighted PLSR model through Jchemo; "eucl" is passed as a string (the original referenced an undefined name eucl)
    Jchemo.lwplsr(X_train, y_train, nlvdis=4, metric="eucl", k=10)
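# Note / hypothetical usage: this function assumes PyJulia is configured and the Jchemo.jl
# package is installed in the active Julia environment; file names below are illustrative.
#   model_LWPLSR("Xcal.csv", "Ycal.csv", sep=";", hdr='yes')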
def prediction(NIRS_csv, qsep, qhdr, model):
    # qhdr indicates whether the CSV has a header/index column ('yes' or 'no')
    if qhdr == 'yes':
        col = 0
    else:
        col = False
    X_test = pd.read_csv(NIRS_csv, sep=qsep, index_col=col)
    Y_preds = model.predict(X_test)
    return Y_preds
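# Hypothetical usage sketch: predict new spectra with a previously trained and pickled model
# (the file names and the use of joblib are illustrative assumptions):
#   from joblib import load
#   fitted = load("models/model_PLSR.pkl")
#   preds = prediction("new_spectra.csv", qsep=";", qhdr='yes', model=fitted)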
def list_files(mypath, import_type):
    from os import listdir
    from os.path import isfile, join
    list_files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
    return list_files
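# Hypothetical usage sketch: list the pickled models of a given type stored in a folder
# (the folder name is illustrative):
#   available_models = list_files("models", "PLSR")   # matches files ending in 'PLSR.pkl'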