from Packages import * st.set_page_config(page_title = "NIRS Utils", page_icon = ":goat:", layout = "wide") from Modules import * from Class_Mod.DATA_HANDLING import * add_header() add_sidebar(pages_folder) local_css(css_file / "style_model.css") @st.cache_data def delete(): repertoire_a_vider = Path('Report/figures') if os.path.exists(repertoire_a_vider): for fichier in os.listdir(repertoire_a_vider): chemin_fichier = repertoire_a_vider / fichier if os.path.isfile(chemin_fichier) or os.path.islink(chemin_fichier): os.unlink(chemin_fichier) elif os.path.isdir(chemin_fichier): os.rmdir(chemin_fichier) delete() # Initialize the variable in session state if it doesn't exist for st.cache_data if 'counter' not in st.session_state: st.session_state.counter = 0 if 'files_deletion' not in st.session_state: st.session_state.files_deletion = 1 def delete_dir(): if st.session_state.files_deletion == 1: st.session_state.files_deletion -= 1 elif st.session_state.files_deletion == 0: st.session_state.files_deletion += 1 def increment(): st.session_state.counter += 1 # #################################### Methods ############################################## class lw: def __init__(self, Reg_json, pred): self.model_ = Reg_json['model'] self.best_hyperparams_ = Reg_json['best_lwplsr_params'] self.pred_data_ = [pd.json_normalize(Reg_json[i]) for i in pred] # ####################################### page preamble ####################################### st.title("Calibration Model Development") # page title st.markdown("Create a predictive model, then use it for predicting your target variable (chemical data) from NIRS spectra") M0, M00 = st.columns([1, .4]) M0.image("./images/model_creation.png", use_column_width = True) # graphical abstract ################################################################# Begin : I- Data loading and preparation ###################################### files_format = ['csv', 'dx'] # Supported files format file = M00.radio('Select files format:', options = files_format,horizontal = True) # Select a file format spectra = pd.DataFrame() # preallocate the spectral data block y = pd.DataFrame() # preallocate the target(s) data block match file: # load csv file case 'csv': with M00: # Load X-block data xcal_csv = st.file_uploader("Select NIRS Data", type = "csv", help = " :mushroom: select a csv matrix with samples as rows and lambdas as columns") if xcal_csv: sepx = st.radio("Select separator (X file) - _detected_: " + str(find_delimiter('data/'+xcal_csv.name)), options = [";", ","], index = [";", ","].index(str(find_delimiter('data/'+xcal_csv.name))), key = 0,horizontal = True) hdrx = st.radio("samples name (X file)? - _detected_: " + str(find_col_index('data/'+xcal_csv.name)), options = ["no", "yes"], index = ["no", "yes"].index(str(find_col_index('data/'+xcal_csv.name))), key = 1,horizontal = True) match hdrx: case "yes": col = 0 case "no": col = False else: st.warning('Insert your spectral data file here!') # Load Y-block data ycal_csv = st.file_uploader("Select corresponding Chemical Data", type = "csv", help = " :mushroom: select a csv matrix with samples as rows and chemical values as a column") if ycal_csv: sepy = st.radio("Select separator (Y file) - _detected_: " + str(find_delimiter('data/'+ycal_csv.name)), options = [";", ","], index = [";", ","].index(str(find_delimiter('data/'+ycal_csv.name))), key = 2, horizontal = True) hdry = st.radio("samples name (Y file)? - _detected_: " + str(find_col_index('data/'+ycal_csv.name)), options = ["no", "yes"], index = ["no", "yes"].index(str(find_col_index('data/'+ycal_csv.name))), key = 3, horizontal = True) match hdry: case "yes": col = 0 case "no": col = False else: st.warning('Insert your target data file here!') # AFTER LOADING BOTH X AND Y FILES if xcal_csv and ycal_csv: # create a str instance for storing the hash of both x and y data xy_hash = '' from io import StringIO for i in ["xcal_csv", "ycal_csv"]: stringio = StringIO(eval(f'{i}.getvalue().decode("utf-8")')) xy_hash += str(hash_data(str(stringio))) @st.cache_data def csv_loader(change): file_name = str(xcal_csv.name) +' and '+ str(ycal_csv.name) xfile = pd.read_csv(xcal_csv, decimal = '.', sep = sepx, index_col = col, header = 0) yfile = pd.read_csv(ycal_csv, decimal = '.', sep = sepy, index_col = col) xfile.to_csv("./Report/datasets/"+xcal_csv.name,sep = ';', encoding = 'utf-8', mode = 'a') yfile.to_csv("./Report/datasets/"+ycal_csv.name,sep = ';', encoding = 'utf-8', mode = 'a') return xfile, yfile, file_name xfile, yfile, file_name = csv_loader(change = xy_hash) if yfile.shape[1]>0 and xfile.shape[1]>0 : # prepare x data try: spectra, meta_data = col_cat(xfile) except: st.error('Error: The format of the X-file does not correspond to the expected dialect settings. To read the file correctly, please adjust the separator parameters.') spectra = pd.DataFrame(spectra).astype(float) # prepare y data try: chem_data, idx = col_cat(yfile) except: st.error('Error: The format of the Y-file does not correspond to the expected dialect settings. To read the file correctly, please adjust the separator parameters.') if 'chem_data' in globals(): if chem_data.shape[1]>1: yname = M00.selectbox('Select target', options = chem_data.columns) y = chem_data.loc[:, yname] else: y = chem_data.iloc[:, 0] ### warning if spectra.shape[0] != y.shape[0]: st.error('Error: X and Y have different sample size') y = pd.DataFrame spectra = pd.DataFrame else: st.error('Error: The data has not been loaded successfully, please consider tuning the dialect settings!') # Load .dx file case 'dx': with M00: data_file = st.file_uploader("Select Data", type = ".dx", help = " :mushroom: select a dx file") if data_file: file_name = str(data_file.name) ## creating the temp file with NamedTemporaryFile(delete = False, suffix = ".dx") as tmp: tmp.write(data_file.read()) tmp_path = tmp.name with open(tmp.name, 'r') as dd: dxdata = dd.read() xy_hash = hash_data(str(dxdata)) with open('Report/datasets/'+data_file.name, 'w') as dd: dd.write(dxdata) ## load and parse the temp dx file @st.cache_data def dx_loader(change): chem_data, spectra, meta_data, meta_data_st = read_dx(file = tmp_path) os.unlink(tmp_path) return chem_data, spectra, meta_data, meta_data_st chem_data, spectra, meta_data, meta_data_st = dx_loader(change = dxdata) if not spectra.empty: st.success("The data have been loaded successfully", icon = "✅") if chem_data.shape[1]>0: yname = st.selectbox('Select target', options = chem_data.columns) measured = chem_data.loc[:, yname] > 0 y = chem_data.loc[:, yname].loc[measured] spectra = spectra.loc[measured] else: st.warning('Warning: your file includes no target variables to model !', icon = "⚠️") else : st.warning('Load your file here!') ################################################### END : I- Data loading and preparation #################################################### ################################################### BEGIN : visualize and split the data #################################################### st.header("I - Data visualization", divider = 'blue') if not spectra.empty and not y.empty: @st.cache_data def visualize(change): if np.array(spectra.columns).dtype.kind in ['i', 'f']: colnames = spectra.columns else: colnames = np.arange(spectra.shape[1]) # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing train_index, test_index = train_test_split_idx(spectra, y = y, method = "kennard_stone", metric = "correlation", test_size = 0.25, random_state = 42) # Assign data to training and test sets X_train, y_train = pd.DataFrame(spectra.iloc[train_index,:]), y.iloc[train_index] X_test, y_test = pd.DataFrame(spectra.iloc[test_index,:]), y.iloc[test_index] #### insight on loaded data # M0, M000 = st.columns([1, .4]) fig1, ax1 = plt.subplots( figsize = (12, 3)) spectra.T.plot(legend = False, ax = ax1, linestyle = '-', linewidth = 0.6) ax1.set_ylabel('Signal intensity') ax1.margins(0) plt.tight_layout() fig2, ax2 = plt.subplots(figsize = (12,3)) sns.histplot(y, color = "deeppink", kde = True, label = "y", ax = ax2, fill = True) sns.histplot(y_train, color = "blue", kde = True, label = "y (train)", ax = ax2, fill = True) sns.histplot(y_test, color = "green", kde = True, label = "y (test)", ax = ax2, fill = True) ax2.set_xlabel('y') plt.legend() plt.tight_layout() stats = pd.DataFrame([desc_stats(y_train), desc_stats(y_test), desc_stats(y)], index =['train', 'test', 'total'] ).round(2) fig1.savefig("./Report/figures/spectra_plot.png") fig2.savefig("./Report/figures/histogram.png") return X_train, X_test, y_train, y_test, colnames, train_index, test_index, stats, fig1, fig2 X_train, X_test, y_train, y_test, colnames, train_index, test_index, stats, fig1, fig2 = visualize(change = xy_hash) M0, M000 = st.columns([1, .4]) with M0: st.pyplot(fig1) ######## Loaded graph st.pyplot(fig2) with M000: st.write('Loaded data summary') st.write(stats) ################################################### END : visualize and split the data ####################################################### ################################################### BEGIN : Create Model #################################################### regression_algo = None # initialize the selected regression algorithm Reg = None # initialize the regression model object # intervalls_with_cols = pd.DataFrame() st.header("II - Model creation", divider = 'blue') if not (spectra.empty and y.empty): M10, M20, M30, M40, M50 = st.columns([1, 1, 1, 1, 1]) # select type of supervised modelling problem modes = ['regression', 'classification'] mode = M10.radio("Analysis Methods", options=modes) match mode: case "regression": reg_algo = ["", "PLS", "LW-PLS", "TPE-iPLS"] regression_algo = M20.selectbox("Choose the regression algorithm", options = reg_algo, key = "regression_algo", format_func = lambda x: x if x else "<Select>") case 'classification': reg_algo = ["", "PLS", "LW-PLS", "TPE-iPLS"] regression_algo = M20.selectbox("Choose the classification algorithm", options = reg_algo, key = 12, format_func = lambda x: x if x else "<Select>") # # Training set preparation for cross-validation(CV) nb_folds = 3 folds = KF_CV.CV(X_train, y_train, nb_folds)# split train data into nb_folds for cross_validation # Model creation-M20 columns with M20: @st.cache_data def RequestingModelCreation(xydata, change, regression_algo, s, it): match regression_algo: case 'PLS': Reg = Plsr(train = [X_train, y_train], test = [X_test, y_test], n_iter = 50, nfolds = nb_folds) reg_model = Reg.model_ rega = Reg.selected_features_ case 'LW-PLS': # export data to csv for Julia train/test global x_train_np, y_train_np, x_test_np, y_test_np data_to_work_with = ['x_train_np', 'y_train_np', 'x_test_np', 'y_test_np'] x_train_np, y_train_np, x_test_np, y_test_np = X_train.to_numpy(), y_train.to_numpy(), X_test.to_numpy(), y_test.to_numpy() # Cross-Validation calculation d = {} for i in range(nb_folds): d["xtr_fold{0}".format(i+1)], d["ytr_fold{0}".format(i+1)], d["xte_fold{0}".format(i+1)], d["yte_fold{0}".format(i+1)] = np.delete(x_train_np, folds[list(folds)[i]], axis=0), np.delete(y_train_np, folds[list(folds)[i]], axis=0), x_train_np[folds[list(folds)[i]]], y_train_np[folds[list(folds)[i]]] data_to_work_with.append("xtr_fold{0}".format(i+1)) data_to_work_with.append("ytr_fold{0}".format(i+1)) data_to_work_with.append("xte_fold{0}".format(i+1)) data_to_work_with.append("yte_fold{0}".format(i+1)) # check best pre-treatment with a global PLSR model preReg = Plsr(train = [X_train, y_train], test = [X_test, y_test], n_iter=20) temp_path = Path('temp/') with open(temp_path / "lwplsr_preTreatments.json", "w+") as outfile: json.dump(preReg.best_hyperparams_, outfile) # export Xtrain, Xtest, Ytrain, Ytest and all CV folds to temp folder as csv files for i in data_to_work_with: if 'fold' in i: j = d[i] else: j = globals()[i] # st.write(j) np.savetxt(temp_path / str(i + ".csv"), j, delimiter=",") # run Julia Jchemo as subprocess import subprocess subprocess_path = Path("Class_Mod/") subprocess.run([f"{sys.executable}", subprocess_path / "LWPLSR_Call.py"]) # retrieve json results from Julia JChemo try: with open(temp_path / "lwplsr_outputs.json", "r") as outfile: Reg_json = json.load(outfile) # delete csv files for i in data_to_work_with: os.unlink(temp_path / str(i + ".csv")) # delete json file after import os.unlink(temp_path / "lwplsr_outputs.json") os.unlink(temp_path / "lwplsr_preTreatments.json") # format result data into Reg object pred = ['pred_data_train', 'pred_data_test']### keys of the dict for i in range(nb_folds): pred.append("CV" + str(i+1)) ### add cv folds keys to pred # global Reg # Reg = type('obj', (object,), {'model_' : Reg_json['model'], 'best_hyperparams_' : Reg_json['best_lwplsr_params'], # 'pred_data_' : [pd.json_normalize(Reg_json[i]) for i in pred]}) # global Reg Reg = lw(Reg_json = Reg_json, pred = pred) reg_model = Reg.model_ Reg.CV_results_ = pd.DataFrame() Reg.cv_data_ = {'YpredCV' : {}, 'idxCV' : {}} # set indexes to Reg.pred_data (train, test, folds idx) for i in range(len(pred)): Reg.pred_data_[i] = Reg.pred_data_[i].T.reset_index().drop(columns = ['index']) if i == 0: # data_train # Reg.pred_data_[i] = np.array(Reg.pred_data_[i]) Reg.pred_data_[i].index = list(y_train.index) Reg.pred_data_[i] = Reg.pred_data_[i].iloc[:,0] elif i == 1: # data_test # Reg.pred_data_[i] = np.array(Reg.pred_data_[i]) Reg.pred_data_[i].index = list(y_test.index) Reg.pred_data_[i] = Reg.pred_data_[i].iloc[:,0] else: # CVi Reg.pred_data_[i].index = folds[list(folds)[i-2]] # Reg.CV_results_ = pd.concat([Reg.CV_results_, Reg.pred_data_[i]]) Reg.cv_data_['YpredCV']['Fold' + str(i-1)] = np.array(Reg.pred_data_[i]).reshape(-1) Reg.cv_data_['idxCV']['Fold' + str(i-1)] = np.array(folds[list(folds)[i-2]]).reshape(-1) Reg.CV_results_= KF_CV.metrics_cv(y = y_train, ypcv = Reg.cv_data_['YpredCV'], folds = folds)[1] #### cross validation results print Reg.best_hyperparams_print = Reg.best_hyperparams_ ## plots Reg.cv_data_ = KF_CV().meas_pred_eq(y = np.array(y_train), ypcv = Reg.cv_data_['YpredCV'], folds = folds) Reg.pretreated_spectra_ = preReg.pretreated_spectra_ Reg.best_hyperparams_print = {**preReg.best_hyperparams_, **Reg.best_hyperparams_} Reg.best_hyperparams_ = {**preReg.best_hyperparams_, **Reg.best_hyperparams_} Reg.__hash__ = hash_data(Reg.best_hyperparams_print) except FileNotFoundError as e: Reg = None for i in data_to_work_with: os.unlink(temp_path / str(i + ".csv")) case 'TPE-iPLS': Reg = TpeIpls(train = [X_train, y_train], test=[X_test, y_test], n_intervall = s, n_iter=it, nfolds = nb_folds) reg_model = Reg.model_ global intervalls, intervalls_with_cols intervalls = Reg.selected_features_.T intervalls_with_cols = Reg.selected_features_.T for i in range(intervalls.shape[0]): for j in range(intervalls.shape[1]): intervalls_with_cols.iloc[i,j] = spectra.columns[intervalls.iloc[i,j]] rega = Reg.selected_features_ st.session_state.intervalls = Reg.selected_features_.T st.session_state.intervalls_with_cols = intervalls_with_cols return Reg if regression_algo: info = st.info('The model is being created. This may take a few minutes.') if regression_algo == 'TPE-iPLS':# if model type is ipls then ask for the number of iterations and intervalls s = st.number_input(label = 'Enter the maximum number of intervals', min_value = 1, max_value = 6) it = st.number_input(label = 'Enter the number of iterations', min_value = 2, max_value = 500, value = 2) else: s, it = None, None st.write() Reg = RequestingModelCreation( xydata = hash_data(xy_hash), change = st.session_state.counter, regression_algo = regression_algo, s = s, it = it) else: st.warning('Choose a modelling algorithm from the dropdown list !') if regression_algo: info.empty() if Reg: st.success('Success! Your model has been created and is ready to use.') else: st.error("Error: Model creation failed. Please try again.") if regression_algo: if regression_algo == 'TPE-iPLS': if ('intervalls' and 'intervalls_with_cols') in st.session_state: intervalls = st.session_state.intervalls intervalls_with_cols = st.session_state.intervalls_with_cols if Reg: if st.button('re-model the data', key=4, help=None, type="primary", use_container_width=True):# remodel feature for re-tuning the model increment() # fitted values and predicted values yc = Reg.pred_data_[0] yt = Reg.pred_data_[1] M1, M2 = st.columns([2 ,4]) with M1: # Show and export the preprocessing methods st.write('-- Spectral preprocessing info --') st.write(Reg.best_hyperparams_print) with open("data/params/Preprocessing.json", "w") as outfile: json.dump(Reg.best_hyperparams_, outfile) # Show the model performance table st.write("-- Model performance --") if regression_algo != reg_algo[2]: model_per = pd.DataFrame(metrics(c = [y_train, yc], t = [y_test, yt], method = 'regression').scores_) else: model_per = pd.DataFrame(metrics(c = [y_train, yc], t = [y_test, yt], method = 'regression').scores_) st.dataframe(model_per) # M1.dataframe(model_per) # duplicate with line 371 @st.cache_data def prep_important(change, regression_algo, model_hash): fig, (ax1, ax2) = plt.subplots(2,1, figsize = (12, 4), sharex=True) ax1.plot(colnames, np.mean(X_train, axis = 0), color = 'black', label = 'Average spectrum (Raw)') # if regression_algo != reg_algo[2]: ax2.plot(colnames, np.mean(Reg.pretreated_spectra_ , axis = 0), color = 'black', label = 'Average spectrum (Pretreated)') ax2.set_xlabel('Wavelenghts') plt.tight_layout() for i in range(2): eval(f'ax{i+1}').grid(color = 'grey', linestyle = ':', linewidth = 0.2) eval(f'ax{i+1}').margins(x = 0) eval(f'ax{i+1}').legend(loc = 'upper right') eval(f'ax{i+1}').set_ylabel('Intensity') if regression_algo == 'TPE-iPLS': a = change for j in range(s): if np.array(spectra.columns).dtype.kind in ['i','f']: min, max = intervalls_with_cols.iloc[j,0], intervalls_with_cols.iloc[j,1] else: min, max = intervalls.iloc[j,0], intervalls.iloc[j,1] eval(f'ax{i+1}').axvspan(min, max, color = '#00ff00', alpha = 0.5, lw = 0) if regression_algo == 'PLS': ax1.scatter(colnames[np.array(Reg.sel_ratio_.index)], np.mean(X_train, axis = 0).iloc[np.array(Reg.sel_ratio_.index)], color = '#7ab0c7', label = 'Important variables') ax2.scatter(colnames[Reg.sel_ratio_.index], np.mean(Reg.pretreated_spectra_, axis = 0)[np.array(Reg.sel_ratio_.index)], color = '#7ab0c7', label = 'Important variables') ax1.legend() ax2.legend() return fig if Reg: fig = prep_important(change = st.session_state.counter, regression_algo = regression_algo, model_hash = str(Reg.__hash__)) with M2:## Visualize raw,preprocessed spectra, and selected intervalls(in case of ipls) if regression_algo =='TPE-iPLS' : st.write('-- Important Spectral regions used for model creation --') st.table(intervalls_with_cols) st.write('-- Visualization of the spectral regions used for model creation --') fig.savefig("./Report/figures/variable_importance.png") st.pyplot(fig) if Reg: # Display CV results numbers_dict = {1: "One", 2: "Two",3: "Three",4: "Four",5: "Five", 6: "Six",7: "Seven",8: "Eight",9: "Nine",10: "Ten"} st.header(f" {numbers_dict[nb_folds]}-Fold Cross-Validation results") cv1, cv2 = st.columns([2, 2]) with cv2: cv_results = pd.DataFrame(Reg.CV_results_).round(4)# CV table st.write('-- Cross-Validation Summary--') st.write(cv_results.astype(str).style.map(lambda _: "background-color: #cecece;", subset = (cv_results.index.drop(['sd', 'mean', 'cv']), slice(None)))) st.write('-- Out-of-Fold Predictions Visualization (All in one) --') fig1 = px.scatter(Reg.cv_data_[0], x = 'Measured', y = 'Predicted' , trendline = 'ols', color = 'Folds', symbol = 'Folds', color_discrete_sequence=px.colors.qualitative.G10) fig1.add_shape(type = 'line', x0 = .95 * min(Reg.cv_data_[0].loc[:,'Measured']), x1 = 1.05 * max(Reg.cv_data_[0].loc[:,'Measured']), y0 = .95 * min(Reg.cv_data_[0].loc[:,'Measured']), y1 = 1.05 * max(Reg.cv_data_[0].loc[:,'Measured']), line = dict(color = 'black', dash = "dash")) fig1.update_traces(marker_size = 7, showlegend=False) st.plotly_chart(fig1, use_container_width = True) fig0 = px.scatter(Reg.cv_data_[0], x ='Measured', y = 'Predicted' , trendline = 'ols', color = 'Folds', symbol = "Folds", facet_col = 'Folds',facet_col_wrap = 1, color_discrete_sequence = px.colors.qualitative.G10, text = 'index', width = 800, height = 1000) fig0.update_traces(marker_size = 8, showlegend = False) fig0.write_image("./Report/figures/meas_vs_pred_cv_onebyone.png") with cv1: st.write('-- Out-of-Fold Predictions Visualization (Separate plots) --') st.plotly_chart(fig0, use_container_width=True) fig1.write_image("./Report/figures/meas_vs_pred_cv_all.png") ################################################### BEGIN : Model Diagnosis #################################################### st.header("III - Model Diagnosis", divider='blue') if Reg: # signal preprocessing results preparation for latex report prep_para = Reg.best_hyperparams_ if regression_algo != reg_algo[2]: prep_para.pop('n_components') for i in ['deriv','polyorder']: if Reg.best_hyperparams_[i] == 0: prep_para[i] = '0' elif Reg.best_hyperparams_[i] == 1: prep_para[i] = '1st' elif Reg.best_hyperparams_[i] > 1: prep_para[i] = f"{Reg.best_hyperparams_[i]}nd" # reg plot and residuals plot if regression_algo != reg_algo[2]: regression_plot = reg_plot([y_train, y_test],[yc, yt], train_idx = train_index, test_idx = test_index) residual_plot = resid_plot([y_train, y_test], [yc, yt], train_idx = train_index, test_idx = test_index) else: regression_plot = reg_plot([y_train, y_test],[yc, yt], train_idx = train_index, test_idx = test_index) residual_plot = resid_plot([y_train, y_test], [yc, yt], train_idx=train_index, test_idx=test_index) M7, M8 = st.columns([2,2]) with M7: st.write('Predicted vs Measured values') st.pyplot(regression_plot) regression_plot.savefig('./Report/figures/measured_vs_predicted.png') with M8: st.write('Residuals plot') st.pyplot(residual_plot) residual_plot.savefig('./Report/figures/residuals_plot.png') ################################################### END : Model Diagnosis #######################################################