import streamlit as st
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import csv
from umap.umap_ import UMAP


# local CSS
## load the custom CSS in the style folder
def local_css(file_name):
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

local_css("style/style.css")


## try to automatically detect the field separator within the CSV
def find_delimiter(filename):
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter


## detect whether the first CSV column is an index column (non-numeric)
def find_col_index(filename):
    with open(filename) as fp:
        lines = pd.read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=str(find_delimiter(filename)))
    col_index = 'yes' if lines.iloc[:, 0].dtype != np.float64 else 'no'
    return col_index


# detection of column categories and scaling
def col_cat(data_import):
    # detect numerical and categorical columns in the csv
    numerical_columns_list = []
    categorical_columns_list = []
    for i in data_import.columns:
        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
            numerical_columns_list.append(data_import[i])
        else:
            categorical_columns_list.append(data_import[i])

    # fall back to a placeholder column when no numerical column is found
    if len(numerical_columns_list) == 0:
        empty = pd.Series([0 for x in range(len(data_import))], index=data_import.index, name='no numerical')
        numerical_columns_list.append(empty)

    if len(categorical_columns_list) > 0:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    else:
        # fall back to a placeholder column when no categorical column is found
        empty = pd.Series(["" for x in range(len(data_import))], index=data_import.index)
        categorical_data = empty.to_frame(name='no categories')

    # Create the numerical data matrix from the numerical columns list and fill NA with the mean of the column
    numerical_data = pd.concat(numerical_columns_list, axis=1)
    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean()))

    # Scale the numerical data
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)

    return numerical_data, categorical_data, scaled_values


# UMAP function for the Sample Selection module
def umap_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)
    umap_func = UMAP(random_state=42, n_neighbors=20, n_components=4, min_dist=0.0)
    umap_fit = umap_func.fit(scaled_values)
    umap_data = umap_fit.transform(scaled_values)
    umap_data = pd.DataFrame(umap_data, index=numerical_data.index)

    # Set UMAP column names with component number
    new_column_names = ["UMAP_" + str(i) for i in range(1, len(umap_data.columns) + 1)]

    # Format the output
    column_mapper = dict(zip(list(umap_data.columns), new_column_names))
    umap_data = umap_data.rename(columns=column_mapper)
    output = pd.concat([data_import, umap_data], axis=1)
    return output, list(categorical_data.columns), new_column_names


# PCA function for the Sample Selection module
def pca_maker(data_import):
    numerical_data, categorical_data, scaled_values = col_cat(data_import)

    # Compute a 6-component PCA on the scaled values
    pca = PCA(n_components=6)
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)

    # Set PCA column names with component number and explained variance %
    new_column_names = ["PCA_" + str(i) + ' - ' + str(round(pca_fit.explained_variance_ratio_[i - 1] * 100, 1)) + '%' for i in range(1, len(pca_data.columns) + 1)]

    # Format the output
    column_mapper = dict(zip(list(pca_data.columns), new_column_names))
    pca_data = pca_data.rename(columns=column_mapper)
    output = pd.concat([data_import, pca_data], axis=1)
    return output, list(categorical_data.columns), new_column_names
# create model module with PINARD
def model_PLSR(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict, cross_validate
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
    from sklearn.cross_decomposition import PLSRegression

    np.random.seed(rd_seed)

    # hdr indicates whether the CSV has an index column ('yes' or 'no')
    if hdr == 'yes':
        col = 0
    else:
        col = False

    # loading the csv
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)

    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=rd_seed)

    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))

    # Declare the preprocessing pipeline
    svgolay = [
        ('_sg1', pp.SavitzkyGolay()),
        ('_sg2', pp.SavitzkyGolay())  # apply the Savitzky-Golay method twice for 2nd order preprocessing
    ]
    preprocessing = [
        ('id', pp.IdentityTransformer()),   # identity transformer, no change to the data
        ('savgol', pp.SavitzkyGolay()),     # Savitzky-Golay smoothing filter
        ('derivate', pp.Derivate()),        # first derivative of the data
        ('SVG', FeatureUnion(svgolay))
        # Pipeline([('_sg1', pp.SavitzkyGolay()), ('_sg2', pp.SavitzkyGolay())])  # nested pipeline variant applying Savitzky-Golay twice
    ]

    # Declare the complete pipeline
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),                      # scaling the data
        ('preprocessing', FeatureUnion(preprocessing)),  # preprocessing
        ('PLS', PLSRegression())                         # regressor
    ])

    # Estimator including y values scaling
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())

    # Training
    trained = estimator.fit(X_train, y_train)

    # fit scores
    st.write("fit scores / R²: " + str(estimator.score(X_test, y_test)))

    # Predictions on the test set
    Y_preds = estimator.predict(X_test)
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))

    # Cross-validate the model
    CV_model(estimator, X_train, y_train, 3)

    return trained


# Cross-validation of the model
def CV_model(estimator, x, y, cv):
    from sklearn.model_selection import cross_val_score, cross_val_predict, cross_validate
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score

    st.write('Cross-Validation of this model')
    st.write("CV_scores", cross_val_score(estimator, x, y, cv=cv))
    st.write("-- CV predict --")
    Y_preds = cross_val_predict(estimator, x, y, cv=cv)
    st.write("MAE", mean_absolute_error(y, Y_preds))
    st.write("MSE", mean_squared_error(y, Y_preds))
    st.write("MAPE", mean_absolute_percentage_error(y, Y_preds))
    st.write("R²", r2_score(y, Y_preds))
    st.write("-- Cross Validate --")
    cv_results = cross_validate(estimator, x, y, cv=cv, return_train_score=True, n_jobs=3)
    for key in cv_results.keys():
        st.write(key, cv_results[key])
def model_LWPLSR(xcal_csv, ycal_csv, sep, hdr):
    import julia
    from julia import Jchemo
    from pinard import utils
    from pinard.model_selection import train_test_split_idx

    # hdr indicates whether the CSV has an index column ('yes' or 'no')
    if hdr == 'yes':
        col = 0
    else:
        col = False

    # loading the csv
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0, x_index_col=col, y_index_col=col)

    # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation", test_size=0.25, random_state=42)

    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape) + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))

    # Fit a locally weighted PLSR model with Jchemo; the distance metric is passed as a string ("eucl" for Euclidean)
    Jchemo.lwplsr(X_train, y_train, nlvdis=4, metric="eucl", k=10)


# predict module
def prediction(NIRS_csv, qsep, qhdr, model):
    # qhdr indicates whether the CSV has an index column ('yes' or 'no')
    if qhdr == 'yes':
        col = 0
    else:
        col = False
    X_test = pd.read_csv(NIRS_csv, sep=qsep, index_col=col)
    Y_preds = model.predict(X_test)
    # Y_preds = X_test
    return Y_preds


def list_files(mypath, import_type):
    from os import listdir
    from os.path import isfile, join
    files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
    return files
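

# --- Usage sketch (illustrative only) ----------------------------------------
# A minimal sketch of how the helpers above could be chained together on a
# hypothetical spectra file; the path "data/example_spectra.csv" and this demo
# guard are placeholders and not part of the application itself.
if __name__ == "__main__":
    example_csv = "data/example_spectra.csv"                    # hypothetical input file
    sep = find_delimiter(example_csv)                           # sniff the field separator
    idx = 0 if find_col_index(example_csv) == 'yes' else None   # use the first column as index if it is non-numeric
    data = pd.read_csv(example_csv, sep=sep, index_col=idx)
    pca_output, cat_columns, pca_columns = pca_maker(data)      # original data + 6 renamed PCA components
    st.write(pca_output[pca_columns].head())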