from Modules import *
from Class_Mod.DATA_HANDLING import *
st.set_page_config(page_title = "NIRS Utils", page_icon = ":goat:", layout = "wide")
add_sidebar(pages_folder)
local_css(css_file / "style_model.css")#load specific model page css
hash_ = ''
def p_hash(add):
global hash_
hash_ = hash_data(hash_+str(add))
return hash_
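# p_hash keeps one rolling digest of everything loaded or selected so far: each call
# folds the new element into the previous hash, so e.g. p_hash([file_name, sep]) followed
# by p_hash(nb_folds) yields a value that changes whenever any earlier input changes.
# The cached functions below take this digest as their `change` argument, which is how
# st.cache_data knows when to recompute.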
# Initialize the variable in session state if it doesn't exist for st.cache_data
if 'counter' not in st.session_state:
st.session_state.counter = 0
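# the counter is bumped by the 're-model' button further down and hashed into the digest,
# so clicking it invalidates the cached model-creation call and forces a re-fit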
# #################################### Methods ##############################################
# wrapper for the LW-PLSR results returned (as json) by the Julia subprocess (see below)
class lw:
    def __init__(self, Reg_json, pred):
        self.model_ = Reg_json['model']
        self.best_hyperparams_ = Reg_json['best_lwplsr_params']
        self.pred_data_ = [pd.json_normalize(Reg_json[i]) for i in pred]

################ clean the results dir #############
keep = ['.py', '.pyc', '.bib'] # extensions to preserve when cleaning the report directory
supp = []
# Walk through the report directory and collect the leftover files from previous runs
for root, dirs, files in os.walk('Report/', topdown = False):
    for file in files:
        if file != 'logo_cefe.png' and not any(file.endswith(ext) for ext in keep):
            supp.append(os.path.join(root, file))
delete_files(keep = keep)
dirpath = Path('Report/out/model')
if not dirpath.exists():
    os.mkdir(path = dirpath)
# ####################################### page preamble #######################################
st.title("Calibration Model Development") # page title
st.markdown("Create a predictive model, then use it for predicting your target variable (chemical data) from NIRS spectra")
M0, M00 = st.columns([1, .4])
M0.image("./images/model_creation.png", use_column_width = True) # graphical abstract
################################################################# Begin : I- Data loading and preparation ######################################
files_format = ['csv', 'dx'] # supported file formats
file = M00.radio('Select file format:', options = files_format, horizontal = True) # select a file format
spectra = pd.DataFrame() # preallocate the spectral data block
y = pd.DataFrame() # preallocate the target(s) data block
match file:
# load csv file
case 'csv':
with M00:
# Load X-block data
xcal_csv = st.file_uploader("Select NIRS Data", type = "csv", help = " :mushroom: select a csv matrix with samples as rows and lambdas as columns")
if xcal_csv:
sepx = st.radio("Select separator (X file) - _detected_: " + str(find_delimiter('data/'+xcal_csv.name)),
options = [";", ","], index = [";", ","].index(str(find_delimiter('data/'+xcal_csv.name))), key = 0,horizontal = True)
hdrx = st.radio("samples name (X file)? - _detected_: " + str(find_col_index('data/'+xcal_csv.name)),
                options = ["no", "yes"], index = ["no", "yes"].index(str(find_col_index('data/'+xcal_csv.name))), key = 1, horizontal = True)
match hdrx:
    case "yes":
        colx = 0
    case "no":
        colx = False
st.info('Info: Insert your spectral data file above!')
# Load Y-block data
ycal_csv = st.file_uploader("Select corresponding Chemical Data", type = "csv", help = " :mushroom: select a csv matrix with samples as rows and chemical values as a column")
if ycal_csv:
sepy = st.radio("Select separator (Y file) - _detected_: " + str(find_delimiter('data/'+ycal_csv.name)),
options = [";", ","], index = [";", ","].index(str(find_delimiter('data/'+ycal_csv.name))), key = 2, horizontal = True)
hdry = st.radio("samples name (Y file)? - _detected_: " + str(find_col_index('data/'+ycal_csv.name)),
options = ["no", "yes"], index = ["no", "yes"].index(str(find_col_index('data/'+ycal_csv.name))), key = 3, horizontal = True)
match hdry:
    case "yes":
        coly = 0
    case "no":
        coly = False
st.info('Info: Insert your target data file above!')
# AFTER LOADING BOTH X AND Y FILES
if xcal_csv and ycal_csv:
# create a str instance for storing the hash of both x and y data
from io import StringIO
xy_str = ''
for i in ["xcal_csv", "ycal_csv"]:
    stringio = StringIO(eval(f'{i}.getvalue().decode("utf-8")'))
    xy_str += str(stringio.read())
p_hash([xy_str + str(xcal_csv.name) + str(ycal_csv.name), hdrx, sepx, hdry, sepy])
@st.cache_data
def csv_loader(change):
file_name = str(xcal_csv.name) +' and '+ str(ycal_csv.name)
xfile = pd.read_csv(xcal_csv, decimal = '.', sep = sepx, index_col = colx, header = 0)
yfile = pd.read_csv(ycal_csv, decimal = '.', sep = sepy, index_col = coly)
return xfile, yfile, file_name
xfile, yfile, file_name = csv_loader(change = hash_)
if yfile.shape[1]>0 and xfile.shape[1]>0 :
# prepare x data
try:
spectra, meta_data = col_cat(xfile)
except Exception:
st.error('Error: The format of the X-file does not correspond to the expected dialect settings. To read the file correctly, please adjust the separator parameters.')
spectra = pd.DataFrame(spectra).astype(float)
# prepare y data
try:
chem_data, idx = col_cat(yfile)
except Exception:
st.error('Error: The format of the Y-file does not correspond to the expected dialect settings. To read the file correctly, please adjust the separator parameters.')
if 'chem_data' in globals():
if chem_data.shape[1]>1:
yname = M00.selectbox('Select a target', options = ['']+chem_data.columns.tolist(), format_func = lambda x: x if x else "<Select>")
if yname:
y = chem_data.loc[:, yname]
else:
M00.info('Info: Select the target analyte from the drop down list!')
if not y.empty:
    if spectra.shape[0] != y.shape[0]:
        st.error('Error: X and Y have different sample sizes')
        y = pd.DataFrame()
        spectra = pd.DataFrame()
        st.error('Error: The data has not been loaded successfully, please consider tuning the dialect settings!')
# Load .dx file
case 'dx':
with M00:
data_file = st.file_uploader("Select Data", type = ".dx", help = " :mushroom: select a dx file")
if data_file:
file_name = str(data_file.name)
## creating the temp file
with NamedTemporaryFile(delete = False, suffix = ".dx") as tmp:
tmp.write(data_file.read())
tmp_path = tmp.name
with open(tmp.name, 'r') as dd:
dxdata = dd.read()
p_hash(str(dxdata)+str(data_file.name))
## load and parse the temp dx file
@st.cache_data
def dx_loader(change):
chem_data, spectra, meta_data, meta_data_st = read_dx(file = tmp_path)
os.unlink(tmp_path)
return chem_data, spectra, meta_data, meta_data_st
chem_data, spectra, meta_data, meta_data_st = dx_loader(change = hash_)
if not spectra.empty:
st.success("Info: The data have been loaded successfully", icon = "✅")
if chem_data.shape[1]>0:
yname = st.selectbox('Select the target analyte', options = ['']+chem_data.columns.tolist(), format_func = lambda x: x if x else "<Select>" )
if yname:
measured = chem_data.loc[:, yname] > 0
y = chem_data.loc[:, yname].loc[measured]
spectra = spectra.loc[measured]
else:
st.info('Info: Please select the target analyte from the dropdown list!')
st.warning('Warning: your file includes no target variables to model!', icon = "⚠️")
################################################### END : I- Data loading and preparation ####################################################
################################################### BEGIN : visualize and split the data ####################################################
if not spectra.empty and not y.empty:
    @st.cache_data
    def visualize(change):
        # use the column labels as wavelengths when they are numeric, otherwise fall back to indices
        if np.array(spectra.columns).dtype.kind in ['i', 'f']:
            colnames = spectra.columns
        else:
            colnames = np.arange(spectra.shape[1])
# Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing
train_index, test_index = train_test_split_idx(spectra, y = y, method = "kennard_stone", metric = "correlation", test_size = 0.25, random_state = 42)
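# Kennard-Stone selects calibration samples that are maximally spread over the spectral
# space (here using a correlation distance): starting from the two most distant samples,
# it repeatedly adds the sample farthest from those already chosen, so the 25% held out
# for testing stays representative of the covered domain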
# Assign data to training and test sets
X_train, y_train = pd.DataFrame(spectra.iloc[train_index,:]), y.iloc[train_index]
X_test, y_test = pd.DataFrame(spectra.iloc[test_index,:]), y.iloc[test_index]
#### insight on loaded data
# M0, M000 = st.columns([1, .4])
fig1, ax1 = plt.subplots( figsize = (12, 3))
spectra.T.plot(legend = False, ax = ax1, linestyle = '-', linewidth = 0.6)
ax1.set_ylabel('Signal intensity')
ax1.margins(0)
plt.tight_layout()
fig2, ax2 = plt.subplots(figsize = (12,3))
sns.histplot(y, color = "deeppink", kde = True, label = "y", ax = ax2, fill = True)
sns.histplot(y_train, color = "blue", kde = True, label = "y (train)", ax = ax2, fill = True)
sns.histplot(y_test, color = "green", kde = True, label = "y (test)", ax = ax2, fill = True)
ax2.set_xlabel('y')
plt.legend()
plt.tight_layout()
stats = pd.DataFrame([desc_stats(y_train), desc_stats(y_test), desc_stats(y)], index =['train', 'test', 'total'] ).round(2)
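# placing train/test/total statistics side by side makes it easy to check that the split
# preserved the target distribution (similar means and spreads) before any model is fitted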
return X_train, X_test, y_train, y_test, colnames, train_index, test_index, stats, fig1, fig2
X_train, X_test, y_train, y_test, colnames, train_index, test_index, stats, spectra_plot, target_plot = visualize(change = hash_)

M0, M000 = st.columns([1, .4])
st.pyplot(spectra_plot) ######## Loaded graph
st.pyplot(target_plot)
with M000:
st.write('Loaded data summary')
st.write(stats)
################################################### END : visualize and split the data #######################################################
# if 'model_type' not in st.session_state:
# st.cache_data.model_type = ''
# ################################################### BEGIN : Create Model ####################################################
model_type = None # initialize the selected regression algorithm

# select the type of supervised modelling problem, then the algorithm
modes = ['regression', 'classification']
M20, M21 = st.columns([2, 2]) # assumed two-column layout: M20 is used below but the line creating it was lost
mode = M20.radio("Choose the modelling problem", options = modes, horizontal = True) # assumed switch tying the two selectboxes together
reg_algo = ["", "PLS", "LW-PLS", "TPE-iPLS", 'LDA']
if mode == 'regression':
    model_type = M20.selectbox("Choose the regression algorithm", options = reg_algo, key = "model_type", format_func = lambda x: x if x else "<Select>")
else:
    model_type = M20.selectbox("Choose the classification algorithm", options = reg_algo, key = 12, format_func = lambda x: x if x else "<Select>")
# if model_type != st.session_state.model_type:
# st.session_state.model_type = model_type
# increment()

# Training set preparation for cross-validation(CV)
nb_folds = 3
# Model creation-M20 columns
with M20:
def RequestingModelCreation(change):
# spectra_plot.savefig("./Report/figures/spectra_plot.png")
# target_plot.savefig("./Report/figures/histogram.png")
# st.session_state['hash_Reg'] = str(np.random.randint(2000000000))
folds = KF_CV.CV(X_train, y_train, nb_folds) # split train data into nb_folds for cross-validation
match model_type:
    case 'PLS':
        Reg = Plsr(train = [X_train, y_train], test = [X_test, y_test], n_iter = 10, cv = nb_folds)
        # reg_model = Reg.model_
        # ...
        rega = Reg.selected_features_
case 'LW-PLS':
# export data to csv for Julia train/test
global x_train_np, y_train_np, x_test_np, y_test_np
data_to_work_with = ['x_train_np', 'y_train_np', 'x_test_np', 'y_test_np']
x_train_np, y_train_np, x_test_np, y_test_np = X_train.to_numpy(), y_train.to_numpy(), X_test.to_numpy(), y_test.to_numpy()
# Cross-Validation calculation
d = {}
for i in range(nb_folds):
d["xtr_fold{0}".format(i+1)], d["ytr_fold{0}".format(i+1)], d["xte_fold{0}".format(i+1)], d["yte_fold{0}".format(i+1)] = np.delete(x_train_np, folds[list(folds)[i]], axis=0), np.delete(y_train_np, folds[list(folds)[i]], axis=0), x_train_np[folds[list(folds)[i]]], y_train_np[folds[list(folds)[i]]]
data_to_work_with.append("xtr_fold{0}".format(i+1))
data_to_work_with.append("ytr_fold{0}".format(i+1))
data_to_work_with.append("xte_fold{0}".format(i+1))
data_to_work_with.append("yte_fold{0}".format(i+1))
# check best pre-treatment with a global PLSR model
preReg = Plsr(train = [X_train, y_train], test = [X_test, y_test], n_iter=20)
temp_path = Path('temp/')
with open(temp_path / "lwplsr_preTreatments.json", "w+") as outfile:
json.dump(preReg.best_hyperparams_, outfile)
# export Xtrain, Xtest, Ytrain, Ytest and all CV folds to temp folder as csv files
for i in data_to_work_with:
if 'fold' in i:
j = d[i]
else:
j = globals()[i]
# st.write(j)
np.savetxt(temp_path / str(i + ".csv"), j, delimiter=",")
# run Julia Jchemo as subprocess
import subprocess
subprocess_path = Path("Class_Mod/")
subprocess.run([f"{sys.executable}", subprocess_path / "LWPLSR_Call.py"])
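# the Python-Julia bridge is file-based: LWPLSR_Call.py (launched with the current
# interpreter) reads the csv matrices and the pre-treatment json from temp/, fits
# LW-PLSR with Jchemo, and writes its predictions to temp/lwplsr_outputs.json,
# which is parsed just below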
# retrieve json results from Julia JChemo
try:
with open(temp_path / "lwplsr_outputs.json", "r") as outfile:
Reg_json = json.load(outfile)
# delete csv files
for i in data_to_work_with: os.unlink(temp_path / str(i + ".csv"))
# delete json file after import
os.unlink(temp_path / "lwplsr_outputs.json")
os.unlink(temp_path / "lwplsr_preTreatments.json")
# format result data into Reg object
pred = ['pred_data_train', 'pred_data_test']### keys of the dict
for i in range(nb_folds):
pred.append("CV" + str(i+1)) ### add cv folds keys to pred
# global Reg
# Reg = type('obj', (object,), {'model_' : Reg_json['model'], 'best_hyperparams_' : Reg_json['best_lwplsr_params'],
# 'pred_data_' : [pd.json_normalize(Reg_json[i]) for i in pred]})
# global Reg
Reg = lw(Reg_json = Reg_json, pred = pred)
Reg.CV_results_ = pd.DataFrame()
Reg.cv_data_ = {'YpredCV' : {}, 'idxCV' : {}}
# set indexes to Reg.pred_data (train, test, folds idx)
for i in range(len(pred)):
Reg.pred_data_[i] = Reg.pred_data_[i].T.reset_index().drop(columns = ['index'])
if i == 0: # data_train
# Reg.pred_data_[i] = np.array(Reg.pred_data_[i])
Reg.pred_data_[i].index = list(y_train.index)
Reg.pred_data_[i] = Reg.pred_data_[i].iloc[:,0]
elif i == 1: # data_test
# Reg.pred_data_[i] = np.array(Reg.pred_data_[i])
Reg.pred_data_[i].index = list(y_test.index)
Reg.pred_data_[i] = Reg.pred_data_[i].iloc[:,0]
else:
# CVi
Reg.pred_data_[i].index = folds[list(folds)[i-2]]
# Reg.CV_results_ = pd.concat([Reg.CV_results_, Reg.pred_data_[i]])
Reg.cv_data_['YpredCV']['Fold' + str(i-1)] = np.array(Reg.pred_data_[i]).reshape(-1)
Reg.cv_data_['idxCV']['Fold' + str(i-1)] = np.array(folds[list(folds)[i-2]]).reshape(-1)
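# cv_data_ now mirrors the in-memory CV structure: 'YpredCV' holds the out-of-fold
# predictions per fold and 'idxCV' the positions of those samples in y_train, so
# metrics_cv below can align each prediction with its measured value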
Reg.CV_results_= KF_CV.metrics_cv(y = y_train, ypcv = Reg.cv_data_['YpredCV'], folds = folds)[1]
#### cross validation results print
Reg.best_hyperparams_print = Reg.best_hyperparams_
## plots
Reg.cv_data_ = KF_CV().meas_pred_eq(y = np.array(y_train), ypcv = Reg.cv_data_['YpredCV'], folds = folds)
Reg.pretreated_spectra_ = preReg.pretreated_spectra_
Reg.best_hyperparams_print = {**preReg.best_hyperparams_, **Reg.best_hyperparams_}
Reg.best_hyperparams_ = {**preReg.best_hyperparams_, **Reg.best_hyperparams_}
Reg.__hash__ = hash_data(Reg.best_hyperparams_print)
except FileNotFoundError as e:
Reg = None
for i in data_to_work_with: os.unlink(temp_path / str(i + ".csv"))
case 'TPE-iPLS':
Reg = TpeIpls(train = [X_train, y_train], test=[X_test, y_test], n_intervall = s, n_iter=it, cv = nb_folds)
global intervalls, intervalls_with_cols
intervalls = Reg.selected_features_.T
intervalls_with_cols = Reg.selected_features_.T
for i in range(intervalls.shape[0]):
for j in range(intervalls.shape[1]):
intervalls_with_cols.iloc[i,j] = spectra.columns[intervalls.iloc[i,j]]
rega = Reg.selected_features_
st.session_state.intervalls = Reg.selected_features_.T
st.session_state.intervalls_with_cols = intervalls_with_cols
return Reg
if model_type:
info = st.info('Info: The model is being created. This may take a few minutes.')
if model_type == 'TPE-iPLS':# if model type is ipls then ask for the number of iterations and intervalls
s = st.number_input(label = 'Enter the maximum number of intervals', min_value = 1, max_value = 6)
it = st.number_input(label = 'Enter the number of iterations', min_value = 2, max_value = 500, value = 2)
else:
s, it = None, None
p_hash(str(s)+str(it))
remodel_button = st.button('re-model the data', key=4, help=None, type="primary", use_container_width=True, on_click=increment)
p_hash(st.session_state.counter)
Reg = RequestingModelCreation(change = hash_)
reg_model = Reg.model_
hash_ = joblib.hash(Reg)
st.info('Info: Choose a modelling algorithm from the dropdown list!')
info.empty()
if Reg:
st.success('Success! Your model has been created and is ready to use.')
else:
st.error("Error: Model creation failed. Please try again.")
if model_type:
if model_type == 'TPE-iPLS':
if 'intervalls' in st.session_state and 'intervalls_with_cols' in st.session_state:
intervalls = st.session_state.intervalls
intervalls_with_cols = st.session_state.intervalls_with_cols
# remodel_button = st.button('re-model the data', key=4, help=None, type="primary", use_container_width=True)
# if remodel_button:# remodel feature for re-tuning the model
# increment()
# fitted values and predicted values
M1, M2 = st.columns([2 ,4])
with M1:
# Show and export the preprocessing methods
st.write('-- Spectral preprocessing info --')
st.write(Reg.best_hyperparams_print)
@st.cache_data # assumed: cached on the data hash, following the `change` pattern used above
def preprocessings(change):
    with open('Report/out/Preprocessing.json', "w") as outfile:
        json.dump(Reg.best_hyperparams_, outfile)
preprocessings(change = hash_)
# Show the model performance table
st.write("-- Model performance --")
yc, yt = Reg.pred_data_[0], Reg.pred_data_[1] # fitted (train) and predicted (test) values; assumed order of pred_data_
model_per = pd.DataFrame(metrics(c = [y_train, yc], t = [y_test, yt], method = 'regression').scores_)
st.dataframe(model_per)
# M1.dataframe(model_per) # duplicate with line 371
@st.cache_data # assumed: cached on the data hash, following the `change` pattern used above
def prep_important(change, model_type, model_hash):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize = (12, 4), sharex = True)
    ax1.plot(colnames, np.mean(X_train, axis = 0), color = 'black', label = 'Average spectrum (Raw)')
    ax2.plot(colnames, np.mean(Reg.pretreated_spectra_, axis = 0), color = 'black', label = 'Average spectrum (Pretreated)')
    ax2.set_xlabel('Wavelengths')
    plt.tight_layout()
    for ax in (ax1, ax2): # style both panels
        ax.grid(color = 'grey', linestyle = ':', linewidth = 0.2)
        ax.margins(x = 0)
        ax.legend(loc = 'upper right')
        ax.set_ylabel('Intensity')
    a = change
    if model_type == 'TPE-iPLS': # highlight the selected intervals (s is defined only for TPE-iPLS)
        for j in range(s):
            if np.array(spectra.columns).dtype.kind in ['i', 'f']:
                wl_min, wl_max = intervalls_with_cols.iloc[j, 0], intervalls_with_cols.iloc[j, 1]
                for ax in (ax1, ax2):
                    ax.axvspan(wl_min, wl_max, color = '#00ff00', alpha = 0.5, lw = 0)
ax1.scatter(colnames[np.array(Reg.sel_ratio_.index)], np.mean(X_train, axis = 0).iloc[np.array(Reg.sel_ratio_.index)],
color = '#7ab0c7', label = 'Important variables')
ax2.scatter(colnames[Reg.sel_ratio_.index], np.mean(Reg.pretreated_spectra_, axis = 0)[np.array(Reg.sel_ratio_.index)],
color = '#7ab0c7', label = 'Important variables')
ax1.legend()
ax2.legend()
return fig
with M2: ## Visualize raw and preprocessed spectra, and selected intervals (for TPE-iPLS)
    if model_type == 'TPE-iPLS': # the interval table exists only for TPE-iPLS
        st.write('-- Important Spectral regions used for model creation --')
        st.table(intervalls_with_cols)
st.write('-- Visualization of the spectral regions used for model creation --')
imp_fig = prep_important(change = st.session_state.counter, model_type = model_type, model_hash = hash_)
st.pyplot(imp_fig)
# Display CV results
numbers_dict = {1: "One", 2: "Two",3: "Three",4: "Four",5: "Five",
6: "Six",7: "Seven",8: "Eight",9: "Nine",10: "Ten"}
st.header(f" {numbers_dict[nb_folds]}-Fold Cross-Validation results")
@st.cache_data # assumed: cached on the CV data, following the `change` pattern used above
def cv_display(change):
fig1 = px.scatter(Reg.cv_data_[0], x = 'Measured', y = 'Predicted' , trendline = 'ols', color = 'Folds', symbol = 'Folds',
color_discrete_sequence=px.colors.qualitative.G10)
fig1.add_shape(type = 'line', x0 = .95 * min(Reg.cv_data_[0].loc[:,'Measured']), x1 = 1.05 * max(Reg.cv_data_[0].loc[:,'Measured']),
y0 = .95 * min(Reg.cv_data_[0].loc[:,'Measured']), y1 = 1.05 * max(Reg.cv_data_[0].loc[:,'Measured']), line = dict(color = 'black', dash = "dash"))
fig1.update_traces(marker_size = 7, showlegend=False)
fig0 = px.scatter(Reg.cv_data_[0], x ='Measured', y = 'Predicted' , trendline = 'ols', color = 'Folds', symbol = "Folds", facet_col = 'Folds',facet_col_wrap = 1,
color_discrete_sequence = px.colors.qualitative.G10, text = 'index', width = 800, height = 1000)
fig0.update_traces(marker_size = 8, showlegend = False)
return fig0, fig1
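# fig1 overlays all folds in one measured-vs-predicted scatter (one OLS trendline per fold,
# plus a dashed y = x reference line), while fig0 facets the same data into one subplot per
# fold so individual fold biases are easier to spot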
fig0, fig1 = cv_display(change= Reg.cv_data_)
cv1, cv2 = st.columns([2, 2])
with cv2:
cv_results = pd.DataFrame(Reg.CV_results_).round(4)# CV table
st.write('-- Cross-Validation Summary--')
st.write(cv_results.astype(str).style.map(lambda _: "background-color: #cecece;", subset = (cv_results.index.drop(['sd', 'mean', 'cv']), slice(None))))
st.write('-- Out-of-Fold Predictions Visualization (All in one) --')
st.plotly_chart(fig1, use_container_width = True)
with cv1:
st.write('-- Out-of-Fold Predictions Visualization (Separate plots) --')
st.plotly_chart(fig0, use_container_width=True)
################################################### BEGIN : Model Diagnosis ####################################################

st.header("III - Model Diagnosis", divider='blue')
if Reg:
# signal preprocessing results preparation for latex report
prep_para = Reg.best_hyperparams_.copy()
prep_para.pop('n_components')
for i in ['deriv', 'polyorder']:
    if Reg.best_hyperparams_[i] == 0:
        prep_para[i] = '0'
    elif Reg.best_hyperparams_[i] == 1:
        prep_para[i] = '1st'
    elif Reg.best_hyperparams_[i] > 1:
        suffix = {2: 'nd', 3: 'rd'}.get(Reg.best_hyperparams_[i], 'th') # proper ordinal suffix (2nd, 3rd, 4th, ...)
        prep_para[i] = f"{Reg.best_hyperparams_[i]}{suffix}"
# reg plot and residuals plot
if model_type != reg_algo[2]: # reg/residual plots are not generated for LW-PLS (reg_algo[2])
    measured_vs_predicted = reg_plot([y_train, y_test], [yc, yt], train_idx = train_index, test_idx = test_index)
    residuals_plot = resid_plot([y_train, y_test], [yc, yt], train_idx = train_index, test_idx = test_index)
    M7, M8 = st.columns(2) # assumed two-column layout; the line creating M7/M8 was lost upstream
    with M7:
        st.write('Predicted vs Measured values')
        st.pyplot(measured_vs_predicted)
        # measured_vs_predicted.savefig('./Report/figures/measured_vs_predicted.png')
    with M8:
        st.write('Residuals plot')
        st.pyplot(residuals_plot)
        # residuals_plot.savefig('./Report/figures/residuals_plot.png')
################################################### END : Model Diagnosis #######################################################
################################################### BEGIN : Download results #######################################################
##########################################################################################################################################
##########################################################################################################################################
st.header('Download the analysis results')
st.write("**Note:** Please check the box only after you have finished processing your data and are satisfied with the results. Checking the box prematurely may slow down the app and could lead to crashes.")
decis = st.checkbox("Yes, I want to download the results")
if decis:
@st.cache_data(show_spinner =False)
def export_report(change):
match model_type:
case 'PLS':
latex_report = report.report('Predictive model development', file_name, stats, list(prep_para.values()), model_type, model_per, cv_results)
case 'LW-PLS':
latex_report = report.report('Predictive model development', file_name, stats,
list({key: Reg.best_hyperparams_[key] for key in ['deriv', 'normalization', 'polyorder', 'window_length'] if key in Reg.best_hyperparams_}.values()), model_type, model_per, cv_results)
case 'TPE-iPLS':
latex_report = report.report('Predictive model development', file_name, stats,
list({key: Reg.best_hyperparams_[key] for key in ['deriv', 'normalization', 'polyorder', 'window_length'] if key in Reg.best_hyperparams_}.values()), model_type, model_per, cv_results)
case _:
st.warning('Data processing has not been performed or finished yet!', icon = "⚠️")
@st.cache_data(show_spinner =False)
def preparing_results_for_downloading(change):
match file:
# load csv file
case 'csv':
xfile.to_csv('Report/out/dataset/'+ xcal_csv.name, sep = ';', encoding = 'utf-8', mode = 'a')
yfile.to_csv('Report/out/dataset/'+ ycal_csv.name, sep = ';', encoding = 'utf-8', mode = 'a')
case 'dx':
with open('Report/out/dataset/'+data_file.name, 'w') as dd:
dd.write(dxdata)
with open('./Report/out/model/'+ model_type + '.pkl','wb') as f:# export model
joblib.dump(reg_model, f)
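# the pickled model can be reloaded later for prediction, e.g. (illustrative snippet):
#   with open('./Report/out/model/PLS.pkl', 'rb') as f:
#       model = joblib.load(f)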
figpath ='./Report/out/figures/'
spectra_plot.savefig(figpath + "spectra_plot.png")
target_plot.savefig(figpath + "histogram.png")
imp_fig.savefig(figpath + "variable_importance.png")
fig1.write_image(figpath + "meas_vs_pred_cv_all.png")
fig0.write_image(figpath + "meas_vs_pred_cv_onebyone.png")
measured_vs_predicted.savefig(figpath + 'measured_vs_predicted.png')
residuals_plot.savefig(figpath + 'residuals_plot.png')
with open('Report/out/Preprocessing.json', "w") as outfile:
json.dump(Reg.best_hyperparams_, outfile)
if model_type == 'TPE-iPLS': # export selected wavelengths
wlfilename = './Report/out/model/'+ model_type+'-selected_wavelengths.xlsx'
selected_wl = pd.concat([intervalls_with_cols.T, Reg.selected_features_], axis = 0, ignore_index = True).T # renamed from `all` to avoid shadowing the builtin
selected_wl.columns = ['wl_from', 'wl_to', 'idx_from', 'idx_to']
selected_wl.to_excel(wlfilename)
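# the exported sheet pairs each selected interval's wavelength bounds with its column
# indices, one row per interval: wl_from | wl_to | idx_from | idx_to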
export_report(change = hash_)
if Path("./Report/report.tex").exists():
report.generate_report(change = hash_)
if Path("./Report/report.pdf").exists():
shutil.move("./Report/report.pdf", "./Report/out/report.pdf")
return change
preparing_results_for_downloading(change = hash_)
import tempfile
@st.cache_data(show_spinner =False)
def tempdir(change):
with tempfile.TemporaryDirectory( prefix="results", dir="./Report") as temp_dir:# create a temp directory
tempdirname = os.path.split(temp_dir)[1]
if len(os.listdir('./Report/out/figures/'))>2:
shutil.make_archive(base_name="./Report/Results", format="zip", base_dir="out", root_dir = "./Report")# create a zip file
shutil.move("./Report/Results.zip", f"./Report/{tempdirname}/Results.zip")# put the inside the temp dir
with open(f"./Report/{tempdirname}/Results.zip", "rb") as f:
zip_data = f.read()
return tempdirname, zip_data
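# the archive is assembled inside a TemporaryDirectory so concurrent sessions cannot
# clobber each other's zip; the bytes are read back into memory before the context
# manager deletes the directory, and only zip_data survives for the download button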
date_time = datetime.datetime.now().strftime('%y%m%d%H%M')
try :
tempdirname, zip_data = tempdir(change = hash_)
st.download_button(label = 'Download', data = zip_data, file_name = f'Nirs_Workflow_{date_time}_Reg_.zip', mime ="application/zip",
args = None, kwargs = None,type = "primary",use_container_width = True)
except Exception:
pass
delete_files(keep = ['.py', '.pyc','.bib'])