Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import streamlit as st
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import csv
# local CSS
def local_css(file_name):
    """Read a CSS file and inject its rules into the Streamlit page.

    The file content is wrapped in a ``<style>`` element and rendered through
    ``st.markdown`` with HTML rendering enabled.

    Args:
        file_name: Path of the CSS file to read.
    """
    with open(file_name) as css_file:
        css_rules = css_file.read()
    # unsafe_allow_html is required, otherwise the <style> tag is escaped.
    st.markdown(f"<style>{css_rules}</style>", unsafe_allow_html=True)
# Apply the stylesheet when the script runs; the relative path is resolved
# against the current working directory.
local_css("style/style.css")
def find_delimiter(filename):
    """Guess the column delimiter used in a CSV file.

    Only the first 5000 characters of the file are handed to ``csv.Sniffer``;
    the delimiter of the dialect it detects is returned.

    Args:
        filename: Path of the CSV file to inspect.

    Returns:
        The detected delimiter character.
    """
    with open(filename) as handle:
        sample = handle.read(5000)
    dialect = csv.Sniffer().sniff(sample)
    return dialect.delimiter
# predict function
def predict():
    """Show a success banner summarising the current prediction inputs.

    Reads ``NIRS_csv``, ``psep`` and ``phdr`` from the enclosing scope (they
    are not defined in this function) and writes one message to the page
    with ``st.success``.
    """
    # Build a single string. The previous comma-separated expression created
    # a 3-tuple, so the banner displayed a tuple instead of a sentence.
    display = f"Prediction with: {NIRS_csv}, {psep}, {phdr}"
    st.success(display)
# create model function
def model(xcal_csv, ycal_csv, sep, hdr, rd_seed):
    """Fit a scaled PLS regression pipeline and report hold-out metrics.

    The data are loaded with ``pinard.utils.load_csv`` and split 75/25 with
    the Kennard-Stone method. A ``TransformedTargetRegressor`` wrapping a
    min-max scaler, a union of preprocessing transformers and a
    ``PLSRegression`` is then fitted on the training part. The set sizes and
    the test-set score, MAE, MSE and MAPE are written to the Streamlit page.

    Args:
        xcal_csv: X data source handed to ``load_csv``.
        ycal_csv: Y (target) data source handed to ``load_csv``.
        sep: Column separator handed to ``load_csv``.
        hdr: ``'yes'`` to use column 0 of both files as the index; any other
            value passes ``False`` as the index column.
        rd_seed: Seed for NumPy's global random state and for the split.

    Returns:
        The object returned by ``estimator.fit`` (the fitted estimator).
    """
    # Function-scope imports: only resolved when a model is actually built.
    from pinard import utils
    from pinard import preprocessing as pp
    from pinard.model_selection import train_test_split_idx
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.cross_decomposition import PLSRegression
    from sklearn.metrics import (
        mean_absolute_error,
        mean_absolute_percentage_error,
        mean_squared_error,
    )
    from sklearn.pipeline import FeatureUnion, Pipeline
    from sklearn.preprocessing import MinMaxScaler

    np.random.seed(rd_seed)

    index_col = 0 if hdr == 'yes' else False
    x, y = utils.load_csv(
        xcal_csv,
        ycal_csv,
        autoremove_na=True,
        sep=sep,
        x_hdr=0,
        y_hdr=0,
        x_index_col=index_col,
        y_index_col=index_col,
    )

    # Kennard-Stone split on the correlation metric; 25% is held out for testing.
    train_index, test_index = train_test_split_idx(
        x,
        y=y,
        method="kennard_stone",
        metric="correlation",
        test_size=0.25,
        random_state=rd_seed,
    )
    X_train, y_train = x[train_index], y[train_index]
    X_test, y_test = x[test_index], y[test_index]
    st.write(
        f"Size of train and test sets: train {X_train.shape} {y_train.shape}"
        f" / test {X_test.shape} {y_test.shape}"
    )

    # NOTE(review): FeatureUnion applies '_sg1' and '_sg2' side by side to the
    # same input and concatenates their outputs; it does not chain them. If a
    # second-order feature (Savitzky-Golay applied twice) was intended, a
    # Pipeline of the two steps would be needed - confirm.
    svgolay = [
        ('_sg1', pp.SavitzkyGolay()),
        ('_sg2', pp.SavitzkyGolay()),
    ]
    preprocessing = [
        ('id', pp.IdentityTransformer()),   # data passed through unchanged
        ('savgol', pp.SavitzkyGolay()),     # Savitzky-Golay filter
        ('derivate', pp.Derivate()),        # derivative of the data
        ('SVG', FeatureUnion(svgolay)),
    ]
    pipeline = Pipeline([
        ('scaler', MinMaxScaler()),                      # scale the X data
        ('preprocessing', FeatureUnion(preprocessing)),  # concatenated features
        ('PLS', PLSRegression()),                        # regressor
    ])

    # The wrapper also min-max scales y before fitting the pipeline.
    estimator = TransformedTargetRegressor(regressor=pipeline, transformer=MinMaxScaler())
    trained = estimator.fit(X_train, y_train)

    # All figures below are computed on the held-out test set.
    st.write(f"fit scores / R²: {estimator.score(X_test, y_test)}")
    Y_preds = estimator.predict(X_test)
    st.write(f"MAE: {mean_absolute_error(y_test, Y_preds)}")
    st.write(f"MSE: {mean_squared_error(y_test, Y_preds)}")
    st.write(f"MAPE: {mean_absolute_percentage_error(y_test, Y_preds)}")
    return trained
def pca_maker(data_import, n_components=6):
    """Append principal-component score columns to a DataFrame.

    Columns whose dtype is exactly ``float64`` or ``int64`` are treated as
    numerical: missing values are replaced by the column mean, the columns
    are standardised and then projected with PCA. Every other column is
    treated as categorical and only its name is reported.

    Args:
        data_import: pandas DataFrame to analyse.
        n_components: Maximum number of components to keep (default 6). It
            is capped at ``min(rows, numerical columns)``, since PCA cannot
            extract more components than that.

    Returns:
        A tuple ``(output, categorical_names, pca_names)``: ``output`` is
        ``data_import`` with the PCA score columns appended,
        ``categorical_names`` lists the non-numerical column names
        (``['no categories']`` when there are none) and ``pca_names`` lists
        the labels of the appended columns.
    """
    numerical_columns = []
    categorical_columns = []
    for name in data_import.columns:
        column = data_import[name]
        if column.dtype == np.dtype("float64") or column.dtype == np.dtype("int64"):
            numerical_columns.append(column)
        else:
            categorical_columns.append(column)

    if not numerical_columns:
        # Placeholder so the rest of the function still runs. It must be a
        # Series: pd.concat rejects plain lists with a TypeError.
        numerical_columns.append(pd.Series(0, index=data_import.index))

    if categorical_columns:
        categorical_names = list(pd.concat(categorical_columns, axis=1).columns)
    else:
        # NOTE(review): this placeholder name is returned but no column with
        # that name exists in ``output`` - confirm callers handle it.
        categorical_names = ['no categories']

    numerical_data = pd.concat(numerical_columns, axis=1)
    numerical_data = numerical_data.apply(lambda col: col.fillna(col.mean()))
    scaled_values = StandardScaler().fit_transform(numerical_data)

    # Cap the request at what the data can support (rows and columns).
    kept_components = min(n_components, *scaled_values.shape)
    pca_fit = PCA(n_components=kept_components).fit(scaled_values)
    pca_data = pd.DataFrame(pca_fit.transform(scaled_values), index=numerical_data.index)

    # Round after scaling to percent; rounding first and then multiplying by
    # 100 yields labels such as "56.99999999999999%".
    new_column_names = [
        f"PCA_{rank} - {round(ratio * 100, 1)}%"
        for rank, ratio in enumerate(pca_fit.explained_variance_ratio_, start=1)
    ]
    pca_data.columns = new_column_names

    output = pd.concat([data_import, pca_data], axis=1)
    return output, categorical_names, new_column_names