import csv

import numpy as np
import pandas as pd
import streamlit as st
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler


# local CSS
def local_css(file_name):
    """Inject the contents of a CSS file into the Streamlit page.

    The file is read in full and emitted inside a ``<style>`` tag through
    ``st.markdown`` with ``unsafe_allow_html=True``.

    Args:
        file_name: Path of the CSS file to read.
    """
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)


local_css("style/style.css")


def find_delimiter(filename):
    """Return the field delimiter guessed by ``csv.Sniffer`` for a file.

    Only the first 5000 characters of the file are handed to the sniffer.

    Args:
        filename: Path of the delimited text file to inspect.

    Returns:
        The delimiter character reported by the sniffed dialect.
    """
    sniffer = csv.Sniffer()
    with open(filename) as fp:
        delimiter = sniffer.sniff(fp.read(5000)).delimiter
    return delimiter


# predict function
def predict():
    """Show a success banner summarising the current prediction inputs.

    Reads the module-level names ``NIRS_csv``, ``psep`` and ``phdr``; they are
    not defined in this part of the file and must exist before this is called.
    """
    # Build ONE string. The previous expression used commas, which created a
    # 3-tuple and made the banner display a tuple repr instead of a sentence.
    display = f"Prediction with: {NIRS_csv}, {psep}, {phdr}"
    st.success(display)


# create model function
def model(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    """Fit a PLS regression pipeline on calibration data and report test metrics.

    The data is loaded with ``pinard.utils.load_csv``, split into train/test
    sets, and fitted with a ``MinMaxScaler`` -> ``FeatureUnion`` of
    preprocessings -> ``PLSRegression`` pipeline wrapped in a
    ``TransformedTargetRegressor`` (targets scaled with ``MinMaxScaler``).
    The set sizes, the test-set score, MAE, MSE and MAPE are written to the
    Streamlit page.

    Args:
        xcal_csv: X (spectra) source forwarded to ``utils.load_csv``.
        ycal_csv: Y (target) source forwarded to ``utils.load_csv``.
        sep: Column separator forwarded to ``utils.load_csv``.
        hdr: ``'yes'`` to use the first column as index for both files
            (``index_col=0``); any other value passes ``False``.
        rd_seed: Seed given to ``np.random.seed`` and to the train/test split.

    Returns:
        The fitted ``TransformedTargetRegressor``.
    """
    # Imported locally so the rest of the app does not need pinard at import time.
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.pipeline import Pipeline, FeatureUnion
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
    from sklearn.cross_decomposition import PLSRegression

    np.random.seed(rd_seed)
    col = 0 if hdr == 'yes' else False
    x, y = utils.load_csv(xcal_csv, ycal_csv, autoremove_na=True, sep=sep, x_hdr=0, y_hdr=0,
                          x_index_col=col, y_index_col=col)

    # Split data into training and test sets using the kennard_stone method and
    # correlation metric, 25% of data is used for testing
    train_index, test_index = train_test_split_idx(x, y=y, method="kennard_stone", metric="correlation",
                                                   test_size=0.25, random_state=rd_seed)
    # Assign data to training and test sets
    X_train, y_train, X_test, y_test = x[train_index], y[train_index], x[test_index], y[test_index]
    st.write("Size of train and test sets: train " + str(X_train.shape) + ' ' + str(y_train.shape)
             + ' / test ' + str(X_test.shape) + ' ' + str(y_test.shape))

    # Declare preprocessing pipeline
    # NOTE(review): this list is wrapped in a FeatureUnion below, which applies
    # both Savitzky-Golay transformers to the same input side by side. If a
    # second-order (sequential) treatment is intended, a Pipeline of the two
    # steps would be needed instead - confirm the intent before changing.
    svgolay = [
        ('_sg1', pp.SavitzkyGolay()),
        ('_sg2', pp.SavitzkyGolay()),
    ]
    preprocessing = [
        ('id', pp.IdentityTransformer()),  # Identity transformer, no change to the data
        ('savgol', pp.SavitzkyGolay()),  # Savitzky-Golay smoothing filter
        ('derivate', pp.Derivate()),  # Calculate the first derivative of the data
        ('SVG', FeatureUnion(svgolay)),
    ]
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),  # scaling the data
        ('preprocessing', FeatureUnion(preprocessing)),  # preprocessing
        ('PLS', PLSRegression()),  # regressor
    ])

    # Estimator including y values scaling
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())
    # Training
    trained = estimator.fit(X_train, y_train)

    # Score computed on the held-out test set
    st.write("fit scores / R²: " + str(estimator.score(X_test, y_test)))
    # Predictions on test set
    Y_preds = estimator.predict(X_test)
    st.write("MAE: " + str(mean_absolute_error(y_test, Y_preds)))
    st.write("MSE: " + str(mean_squared_error(y_test, Y_preds)))
    st.write("MAPE: " + str(mean_absolute_percentage_error(y_test, Y_preds)))
    return (trained)


def pca_maker(data_import):
    """Append PCA scores of the numeric columns to a DataFrame.

    Columns whose dtype is ``float64`` or ``int64`` are treated as numerical;
    every other column is treated as categorical. Missing numerical values are
    replaced by the column mean, the data is standardised, and up to six
    principal components are computed.

    Args:
        data_import: Input ``pandas.DataFrame``.

    Returns:
        A tuple ``(output, categorical_names, pca_names)`` where ``output`` is
        ``data_import`` with the PCA score columns appended,
        ``categorical_names`` is the list of categorical column names
        (``['no categories']`` when there are none) and ``pca_names`` is the
        list of PCA column labels, each of the form
        ``"PCA_<k> - <explained variance in percent>%"``.
    """
    numerical_columns_list = []
    categorical_columns_list = []
    for column_name in data_import.columns:
        column = data_import[column_name]
        if column.dtype == np.dtype("float64") or column.dtype == np.dtype("int64"):
            numerical_columns_list.append(column)
        else:
            categorical_columns_list.append(column)

    if numerical_columns_list:
        numerical_data = pd.concat(numerical_columns_list, axis=1)
    else:
        # Placeholder column of zeros. The previous fallback appended a plain
        # list, which pd.concat cannot concatenate (TypeError).
        numerical_data = pd.DataFrame({"no numerical data": 0.0}, index=data_import.index)

    if categorical_columns_list:
        categorical_data = pd.concat(categorical_columns_list, axis=1)
    else:
        # Placeholder so callers always receive at least one category name.
        categorical_data = pd.DataFrame({"no categories": ""}, index=data_import.index)

    # Replace missing values by the mean of their column
    numerical_data = numerical_data.apply(lambda values: values.fillna(values.mean()))

    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(numerical_data)

    # PCA cannot extract more components than min(n_samples, n_features);
    # cap the historical default of 6 so small inputs no longer raise.
    n_components = min(6, *scaled_values.shape)
    pca = PCA(n_components=n_components)
    pca_fit = pca.fit(scaled_values)
    pca_data = pca_fit.transform(scaled_values)
    pca_data = pd.DataFrame(pca_data, index=numerical_data.index)

    # Format the percentage directly to one decimal; multiplying an already
    # rounded float by 100 leaked artefacts such as "28.900000000000002%".
    new_column_names = [
        f"PCA_{rank} - {ratio * 100:.1f}%"
        for rank, ratio in enumerate(pca_fit.explained_variance_ratio_, start=1)
    ]
    pca_data.columns = new_column_names

    output = pd.concat([data_import, pca_data], axis=1)
    return output, list(categorical_data.columns), new_column_names