# [blame-view residue] Newer / Older
# Shared names used below without a local import (st, pd, np, os, shutil, px, ...)
# presumably come from this star-import — confirm in the Packages module.
from Packages import *
# Page title, icon and wide layout for this Streamlit page.
# NOTE(review): this call sits between the two imports; Streamlit expects
# set_page_config to run before other Streamlit commands, which is presumably
# why it precedes the Modules import — confirm before reordering.
st.set_page_config(page_title="NIRS Utils", page_icon=":goat:", layout="wide")
# Project helpers (add_header, hide_pages, find_delimiter, plot_spectra, ...) are
# presumably provided by this star-import — confirm in the Modules module.
from Modules import *
# Empty the report figures directory each time the script runs.
# The directory is the same relative './Report/figures' location that the plotting
# code further down writes its PNG files to, rather than a machine-specific
# absolute path that only exists on one developer's workstation.
repertoire_a_vider = './Report/figures'
# isdir (not exists): listing a path that exists but is a plain file would raise.
if os.path.isdir(repertoire_a_vider):
    for fichier in os.listdir(repertoire_a_vider):
        chemin_fichier = os.path.join(repertoire_a_vider, fichier)
        if os.path.isfile(chemin_fichier) or os.path.islink(chemin_fichier):
            # Regular files and symlinks are unlinked (a symlink's target is left alone).
            os.unlink(chemin_fichier)
        elif os.path.isdir(chemin_fichier):
            # Sub-directories are removed together with everything inside them.
            shutil.rmtree(chemin_fichier)
# Page banner: drawn by the shared add_header() helper. The inline "CEFE - CNRS"
# HTML banner and the local CSS loading that used to live here were already disabled.
add_header()

# Empty placeholders so these names always exist further down:
#   tcr  - table handed to the clustering and scatter-plot code
#   sam  - final table of selected samples passed to the report
#   sam1 - table of the samples picked from the clusters
tcr, sam, sam1 = (pd.DataFrame() for _ in range(3))

# Make sure the "interface" key exists in the session state (None when unset),
# then hide the "Predictions" page when the simple interface is active.
interface_mode = st.session_state.get('interface')
st.session_state["interface"] = interface_mode
if interface_mode == 'simple':
    hide_pages("Predictions")
################################### I - Data Loading and Visualization ########################################
# Two columns with a 3:1 width ratio; col1 is the second (narrow) one and hosts the uploader.
col2, col1 = st.columns([3, 1])
# NOTE(review): these two names are bound to the DataFrame *class* (no parentheses),
# not to empty instances; `meta_data.empty` / `meta_data.loc` are read further down,
# so they must be reassigned before use — confirm in the lines not shown here.
meta_data = pd.DataFrame
selected_samples = pd.DataFrame
# File uploader for the spectra file (csv or dx)
data_file = col1.file_uploader("Load NIRS Data", type=["csv","dx"], help=" :mushroom: select a csv matrix with samples as rows and lambdas as columns", key=5)
if data_file:
# Retrieve the extension of the file.
# The slice starts at the FIRST '.', so a name with several dots yields more than
# the final extension; with no dot at all find() returns -1 and only the last
# character is kept.
test = data_file.name[data_file.name.find('.'):]
if test== '.csv':
with col1:
# CSV delimiter choice, preselected with the value detected by find_delimiter on
# 'data/<file name>'. `.index(...)` raises ValueError if the detected value is
# neither ';' nor ','.
psep = st.radio("Select csv separator - _detected_: " + str(find_delimiter('data/'+data_file.name)), options=[";", ","], index=[";", ","].index(str(find_delimiter('data/'+data_file.name))), key=9)
# Same pattern for the "first column holds indexes" choice, detected by find_col_index.
phdr = st.radio("indexes column in csv? - _detected_: " + str(find_col_index('data/'+data_file.name)), options=["no", "yes"], index=["no", "yes"].index(str(find_col_index('data/'+data_file.name))), key=31)
# col is 0 when the csv has an index column, otherwise False.
# presumably passed on as the index-column argument of the csv reader — the read
# itself is not visible here; verify.
if phdr == 'yes':
col = 0
else:
col = False

# [blame-view residue] BARTHES Nicolas committed
# NOTE(review): the statements that actually read the csv into `spectra` / `meta_data`
# are not visible here; the two lines below are disabled code.
# spectra = col_cat(imp)[0]
# meta_data = col_cat(imp)[1]
st.success("The data have been loaded successfully", icon="✅")
elif test == '.dx':
# Create a temporary file to save the uploaded file.
# delete=False: the temporary file is not removed automatically when closed.
# NOTE(review): the body of this `with` (writing and parsing the .dx file) is not visible here.
with NamedTemporaryFile(delete=False, suffix=".dx") as tmp:
# Retrieve the column names and row names of the spectra
colnames = list(spectra.columns)
# Row labels are converted to strings before being set back as the index.
rownames = [str(i) for i in list(spectra.index)]
spectra.index = rownames
# x-axis label: for .dx files it depends on the first row of the 'xunits' metadata
# column ('1/cm' -> wavenumber, anything else -> wavelength); csv files get generic labels.
if test =='.dx':
if meta_data.loc[:,'xunits'][0] == '1/cm':
lab = 'Wavenumber (1/cm)'
else:
lab = 'Wavelength (nm)'
fig = plot_spectra(spectra, xunits = lab, yunits = meta_data.loc[:,'yunits'][0])
else:
fig = plot_spectra(spectra, xunits = 'Wavelength/Wavenumber', yunits = 'Signal intensity')
# Saved under the file name listed in Ac_Km for the exported report.
fig.savefig("./Report/figures/Spectra_Plot.png")
############################## Exploratory data analysis ###############################
st.header("II - Exploratory Data Analysis-Multivariable Data Analysis", divider='blue')
# Layout containers: first row (scores / loadings / controls), second row
# (influence / hotelling / qexp). Created before the section-III header so that
# header is rendered below both rows.
scores, loadings, pc = st.columns([2, 3, 0.5])
influence, hotelling, qexp = st.columns([2, 2, 1])
st.header('III - Selected samples for chemical analysis', divider='blue')
# Index 0 of both lists is the empty string, i.e. "nothing selected"; the code
# below compares the selection against these lists by position.
dim_red_methods=['', 'PCA','UMAP', 'NMF'] # List of dimensionality reduction algos
cluster_methods = ['', 'Kmeans','HDBSCAN', 'AP'] # List of clustering algos
dr_model = None # dimensionality reduction model
cl_model = None # clustering model
# NOTE(review): t and p are bound to the DataFrame class itself (no parentheses);
# both are reassigned later when a model is available.
t = pd.DataFrame # scores
p = pd.DataFrame # loadings
labels = []
# NOTE(review): `spectra` is only assigned inside the data-loading branch in the
# lines shown here — confirm it exists when no file has been uploaded.
if not spectra.empty:
dim_red_method = pc.selectbox("Dimensionality reduction techniques: ", options = dim_red_methods, key = 37)
clus_method = pc.selectbox("Clustering techniques: ", options = cluster_methods, key = 38)
# Column-centre the spectra without scaling (center=True, scale=False).
xc = standardize(spectra, center=True, scale=False)

# [blame-view residue] BARTHES Nicolas committed
# 'PCA' selected: fit the project's LinearPCA on the centred spectra with Ncomp=8.
if dim_red_method == dim_red_methods[1]:
dr_model = LinearPCA(xc, Ncomp=8)
# 'UMAP' selected: the branch body follows below.
elif dim_red_method == dim_red_methods[2]:

# [blame-view residue] Nicolas Barthes committed
# Prepend a 'Nothing' entry to the list of candidate supervision columns.
# NOTE(review): `filter` is not assigned in the lines shown here (and the name shadows
# the builtin). If it is a pandas Index, insert() returns a new Index and this works;
# if it is a plain list, insert() returns None and the selectbox gets no options — confirm.
filter = filter.insert(0, 'Nothing')
col = pc.selectbox('Supervised UMAP by:', options= filter, key=108)

# [blame-view residue] Nicolas Barthes committed
# 'Nothing' means unsupervised UMAP; otherwise the chosen metadata column is handed
# to Umap as categorical data.
if col == 'Nothing':
supervised = None
else:
supervised = md_df_st_[col]
# UMAP is fitted on the min-max scaled raw spectra (not on the centred matrix xc).
dr_model = Umap(numerical_data = MinMaxScale(spectra), cat_data = supervised)
if dr_model:
# Axis pickers default to the first three score columns, so the model must expose
# at least three of them.
axis1 = pc.selectbox("x-axis", options = dr_model.scores_.columns, index=0)
axis2 = pc.selectbox("y-axis", options = dr_model.scores_.columns, index=1)
axis3 = pc.selectbox("z-axis", options = dr_model.scores_.columns, index=2)
# t: the three selected score columns side by side.
t = pd.concat([dr_model.scores_.loc[:,axis1], dr_model.scores_.loc[:,axis2], dr_model.scores_.loc[:,axis3]], axis = 1)
# NOTE(review): `tcr` is only assigned as an empty DataFrame in the lines shown here;
# presumably it is derived from `t` in lines that are missing — verify.
cl_model = Sk_Kmeans(tcr, max_clusters = 25)
# The number of clusters defaults to the model's suggestion and is limited to 2..25.
ncluster = scores.number_input(min_value=2, max_value=25, value=cl_model.suggested_n_clusters_, label = 'Select the desired number of clusters')
scores.write(f"Suggested n_clusters : {cl_model.suggested_n_clusters_}")
# NOTE(review): `fig2` is not created in the lines shown here; from the output file
# name it is presumably the elbow plot — confirm.
scores.plotly_chart(fig2,use_container_width=True)
img = pio.to_image(fig2, format="png")
with open("./Report/figures/Elbow.png", "wb") as f:
f.write(img)
# Final K-means fit with the user-selected number of clusters.
data, labels, clu_centers = cl_model.fit_optimal(nclusters = ncluster)

# [blame-view residue] Nicolas Barthes committed
# NOTE(review): `optimized_hdbscan` is not created in the lines shown here.
# Older three-value unpacking kept for reference:
# all_labels, hdbscan_score, clu_centers = optimized_hdbscan.HDBSCAN_scores_
all_labels, clu_centers = optimized_hdbscan.HDBSCAN_scores_
# Human-readable labels: -1 becomes 'Non clustered', any other label i becomes 'cluster#<i+1>'.
labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in all_labels]
# Both branches set `clustered` to ALL row positions (the filtering variant is
# disabled), so new_tcr below contains every row of tcr. Only `non_clustered`
# differs: positions labelled 'Non clustered' for HDBSCAN, None otherwise.
if clus_method == cluster_methods[2]:
#clustered = np.where(np.array(labels) != 'Non clustered')[0]
clustered = np.arange(tcr.shape[0])
non_clustered = np.where(np.array(labels) == 'Non clustered')[0]
else:
clustered = np.arange(tcr.shape[0])
non_clustered = None
new_tcr = tcr.iloc[clustered,:]
#################################################### III - Samples selection using the reduced data presentation ######
# NOTE(review): several statements in this section are truncated or missing in the
# text shown here (unclosed calls, `C` never assigned); comments describe only what is visible.
selec_strategy = ['center','random']
# NOTE(review): bound to the DataFrame class itself, not to an empty instance.
samples_df_chem = pd.DataFrame
selected_samples = []
selected_samples_idx = []
# NOTE(review): this call is cut off — its remaining arguments and closing parenthesis are not visible.
selection = scores.radio('Select samples selection strategy:',
if selection == selec_strategy[0]:
# list samples at clusters centers - Use sklearn.metrics.pairwise_distances_argmin if you want more than 1 sample per cluster
# For each cluster centre, take the position of the nearest row of new_tcr and
# translate it into that row's index label.
closest, _ = pairwise_distances_argmin_min(clu_centers, new_tcr)
selected_samples_idx = np.array(new_tcr.index)[list(closest)]
selected_samples_idx = selected_samples_idx.tolist()
#### Strategy 1
# presumably the 'random' strategy branch starts here — the branching line is not visible; verify.
selection_number = scores.number_input('How many samples per cluster?',
min_value = 1, step=1, value = 3)
# s: labels of the samples that belong to a cluster (the 'Non clustered' ones are dropped).
s = np.array(labels)[np.where(np.array(labels) !='Non clustered')[0]]
for i in np.unique(s):
# NOTE(review): `C` is not assigned in the lines shown here; presumably the row
# positions of the samples in cluster i — verify.
# Within the cluster, run a K-means with `selection_number` centres and keep the
# sample nearest to each centre.
km2 = KMeans(n_clusters = selection_number)
km2.fit(tcr.iloc[C,:])
clos, _ = pairwise_distances_argmin_min(km2.cluster_centers_, tcr.iloc[C,:])
selected_samples_idx.extend(tcr.iloc[C,:].iloc[list(clos)].index)
# NOTE(review): this adds every sample of the cluster; presumably the fallback for
# clusters smaller than `selection_number` — the condition is not visible; verify.
selected_samples_idx.extend(new_tcr.iloc[C,:].index.to_list())
# list indexes of selected samples for colored plot
if selected_samples_idx:
# NOTE(review): this DataFrame call is cut off (no closing parenthesis visible);
# presumably the variant used when there is no metadata, with the metadata-based
# variant below being the alternative branch — verify.
sam1 = pd.DataFrame({'name': spectra.index[clustered][selected_samples_idx],
'cluster':np.array(labels)[clustered][selected_samples_idx]},
sam1 = meta_data.iloc[clustered,:].iloc[selected_samples_idx,:]
sam1.insert(loc=0, column='index', value=selected_samples_idx)
sam1.insert(loc=1, column='cluster', value=np.array(labels)[selected_samples_idx])
# Renumber the selected samples from 1.
sam1.index = np.arange(len(selected_samples_idx))+1
# The percentage divides by tcr.shape[0], so tcr must not be empty at this point.
st.write(f' - The total number of samples: {tcr.shape[0]}.\n- The number of selected samples for chemical analysis: {sam1.shape[0]} - {round(sam1.shape[0]/tcr.shape[0]*100, 1)}%.')

# [blame-view residue] Nicolas Barthes committed
# HDBSCAN only: offer to add the samples left without a cluster to the selection.
if clus_method == cluster_methods[2]:
unclus = st.checkbox("Include non clustered samples (for HDBSCAN clustering)", value=True)
# Same HDBSCAN test again; `unclus` only exists when it holds.
if clus_method == cluster_methods[2]:
if selected_samples_idx:
if unclus:
# sam2: one row per non-clustered sample, built from the spectra index when there
# is no metadata, otherwise from the matching metadata rows.
if meta_data.empty:
sam2 = pd.DataFrame({'name': spectra.index[non_clustered],
'cluster':['Non clustered']*len(spectra.index[non_clustered])},
index = spectra.index[non_clustered])
else :
sam2 = meta_data.iloc[non_clustered,:]
sam2.insert(loc=0, column='index', value= spectra.index[non_clustered])
sam2.insert(loc=1, column='cluster', value=['Non clustered']*len(spectra.index[non_clustered]))
# Final selection: cluster-based picks followed by the non-clustered samples, renumbered from 1.
sam = pd.concat([sam1, sam2], axis = 0)
sam.index = np.arange(sam.shape[0])+1
st.write(f' The number of Non-clustered samples is {sam2.shape[0]} samples. Total selected samples: {sam1.shape[0] + sam2.shape[0]} - {round((sam1.shape[0] + sam2.shape[0]) / tcr.shape[0] * 100, 1)}%.')
################################ Plots visualization ############################################
# NOTE(review): the nesting of the branches below cannot be recovered from the text
# shown here, and `ax1`, `ax2`, `ax3`, `md_df_st_` are not assigned in the visible
# lines; several `fig = ...` lines follow one another, so which one takes effect
# depends on the missing structure.
st.write('Scores plot')
# scores plot with clustering
# Case 1: cluster labels available, no metadata -> colour by cluster label.
if list(labels) and meta_data.empty:
fig = px.scatter_3d(tcr, x=axis1, y=axis2, z = axis3, color = labels)
sns.scatterplot(data = tcr, x = axis1, y =axis2 , hue = labels, ax = ax1)
# Case 2: metadata available, no cluster labels -> colour by a user-chosen entry of `filter`.
elif len(list(labels)) == 0 and not meta_data.empty:
col = st.selectbox('Color by:', options= filter)
# NOTE(review): compares the selected option with the integer 0 — confirm the option type.
if col == 0:
sns.scatterplot(data = tcr, x = axis2, y =axis3 , ax = ax2)
sns.scatterplot(data = tcr, x = axis1, y =axis3 , hue = list(map(str.lower,md_df_st_[col])), ax = ax3)
# Colour values are the chosen metadata column, lower-cased.
fig = px.scatter_3d(tcr, x=axis1, y=axis2, z = axis3, color = list(map(str.lower,md_df_st_[col])) )
sns.scatterplot(data = tcr, x = axis1, y =axis2 , hue = list(map(str.lower,md_df_st_[col])), ax = ax1)
sns.scatterplot(data = tcr, x = axis2, y =axis3 , hue = list(map(str.lower,md_df_st_[col])), ax = ax2)
sns.scatterplot(data = tcr, x = axis1, y =axis3 , hue = list(map(str.lower,md_df_st_[col])), ax = ax3)
# color with scores and metadata
# Case 3: both cluster labels and metadata available.
elif len(list(labels)) > 0 and not meta_data.empty:
if clus_method in cluster_methods[1:]:
# With a clustering method selected, the colour choice is limited to 'None' or
# the clustering method itself.
filter = ['None', clus_method]
col = st.selectbox('Color by:', options= filter)
if col == "None":
fig = px.scatter_3d(tcr, x=axis1, y=axis2, z = axis3)
elif col == clus_method:
fig = px.scatter_3d(tcr, x=axis1, y=axis2, z = axis3, color = labels)
fig = px.scatter_3d(tcr, x=axis1, y=axis2, z = axis3, color = list(map(str.lower,md_df_st_[col])))
# NOTE(review): all three calls below plot axis1 vs axis2 (on ax1, ax2 and ax3),
# unlike the axis pairs used in the branch above — confirm intent.
sns.scatterplot(data = tcr, x = axis1, y =axis2 , hue = list(map(str.lower,md_df_st_[col])), ax = ax1)
sns.scatterplot(data = tcr, x = axis1, y =axis2 , hue = list(map(str.lower,md_df_st_[col])), ax = ax2)
sns.scatterplot(data = tcr, x = axis1, y =axis2 , hue = list(map(str.lower,md_df_st_[col])), ax = ax3)
# presumably the fallback when there are neither labels nor metadata — the `else:` line is not visible; verify.
fig = px.scatter_3d(tcr, x=axis1, y=axis2, z = axis3)
fig.update_traces(marker=dict(size=4))
# Overlay the selected samples as larger black markers.
if selected_samples_idx:
tt = tcr.iloc[selected_samples_idx,:]
# NOTE(review): this call is cut off — the closing parenthesis (and any further arguments) is not visible.
fig.add_scatter3d(x = tt.loc[:,axis1], y = tt.loc[:,axis2],
z = tt.loc[:,axis3], mode ='markers', marker = dict(size = 5, color = 'black'),
st.plotly_chart(fig, use_container_width=True)
# Colour palette limited to the first `num_clusters` Plotly qualitative colours.
# NOTE(review): `num_clusters` is not assigned in the lines shown here.
custom_color_palette = px.colors.qualitative.Plotly[:num_clusters]
# NOTE(review): as written this is a plain assignment to a variable named
# color_discrete_sequence; it is not read by the calls below, which pass the palette directly.
color_discrete_sequence=custom_color_palette
# Create the Axis1-Axis2 plot (coloured by cluster label when labels exist).
# NOTE(review): no export call for these three figures is visible here, although
# Ac_Km lists plot_axe1_axe2.png, plot_axe1_axe3.png and plot_axe2_axe3.png for the report.
fig_axe1_axe2 = px.scatter(tcr, x=axis1, y=axis2, color=labels if list(labels) else None, color_discrete_sequence=custom_color_palette)
fig_axe1_axe2.update_layout(title='Axe1-Axe2')
fig_axe1_axe2.update_traces(marker=dict(size=4))
# Create the Axis1-Axis3 plot
fig_axe1_axe3 = px.scatter(tcr, x=axis1, y=axis3, color=labels if list(labels) else None, color_discrete_sequence=custom_color_palette)
fig_axe1_axe3.update_layout(title='Axe1-Axe3')
fig_axe1_axe3.update_traces(marker=dict(size=4))
# Create the Axis2-Axis3 plot
fig_axe2_axe3 = px.scatter(tcr, x=axis2, y=axis3, color=labels if list(labels) else None, color_discrete_sequence=custom_color_palette)
fig_axe2_axe3.update_layout(title='Axe2-Axe3')
fig_axe2_axe3.update_traces(marker=dict(size=4))
# The loadings plot is only drawn for 'PCA' (index 1) and 'NMF' (index 3).
if dim_red_method == dim_red_methods[1] or dim_red_method == dim_red_methods[3]:
with loadings:
st.write('Loadings plot')
p = dr_model.loadings_
# freq: the spectra column names as a one-column frame aligned on the loadings index.
freq = pd.DataFrame(colnames, index=p.index)
if test =='.dx':
if meta_data.loc[:,'xunits'][0] == '1/cm':
freq.columns = ['Wavenumber (1/cm)']
# NOTE(review): `xlab` and `inv`, used in the layout call below, are not assigned in
# the lines shown here; presumably they are set alongside this column name — verify.
pp = pd.concat([p, freq], axis=1)
#########################################
# Long format: one row per (x value, component) so each component is drawn as its own line.
df1 = pp.melt(id_vars=freq.columns)
fig = px.line(df1, x=freq.columns, y='value', color='variable', color_discrete_sequence=px.colors.qualitative.Plotly)
fig.update_layout(legend=dict(x=1, y=0, font=dict(family="Courier", size=12, color="black"),
bordercolor="black", borderwidth=2))
fig.update_layout(xaxis_title = xlab,yaxis_title = "Intensity" ,xaxis = dict(autorange= inv))
st.plotly_chart(fig, use_container_width=True)
# Export the figure as PNG for the report
img = pio.to_image(fig, format="png")
with open("./Report/figures/graphe_loadings.png", "wb") as f:
f.write(img)
#############################################################################################################
with influence:
st.write('Influence plot')
# [blame-view residue] line-number gutter 404-456 (these lines carried no source text)
# Leverage
# Hat = T (T'T)^-1 T' built from the selected score columns; the leverage of each
# sample is its diagonal entry divided by the trace of Hat.
Hat = t.to_numpy() @ np.linalg.inv(np.transpose(t.to_numpy()) @ t.to_numpy()) @ np.transpose(t.to_numpy())
leverage = np.diag(Hat) / np.trace(Hat)
# Leverage limit: 2 * (number of score columns) / (number of samples).
tresh3 = 2 * t.shape[1]/t.shape[0]
# Loadings
p = pd.concat([dr_model.loadings_.loc[:,axis1], dr_model.loadings_.loc[:,axis2], dr_model.loadings_.loc[:,axis3]], axis = 1)
# Matrix reconstruction
xp = np.dot(t,p.T)
# Q residuals: Q residuals represent the magnitude of the variation remaining in each sample after projection through the model
# Computed as the diagonal of E @ E.T with E = xc - xp, i.e. the row-wise sum of
# squared reconstruction errors (the full samples-by-samples product is formed first).
residuals = np.diag(np.subtract(xc.to_numpy(), xp)@ np.subtract(xc.to_numpy(), xp).T)
# NOTE(review): chi2.ppf(0.05, df=3) is the 5th percentile of the chi-square
# distribution with 3 degrees of freedom; an upper outlier limit would usually use
# 0.95 — confirm the intended probability.
tresh4 = sc.stats.chi2.ppf(0.05, df = 3)
# color with metadata
# l1 holds the colour value per sample: cluster labels, a lower-cased metadata
# column, or the constant "Samples", depending on what is available and on `col`.
if not meta_data.empty and clus_method:
if col == "None":
l1 = ["Samples"]* t.shape[0]
elif col == clus_method:
l1 = labels
else:
l1 = list(map(str.lower,md_df_st_[col]))
elif meta_data.empty and clus_method:
l1 = labels
elif meta_data.empty and not clus_method:
l1 = ["Samples"]* t.shape[0]
elif not meta_data.empty and not clus_method:
l1 = list(map(str.lower,md_df_st_[col]))
fig = px.scatter(x = leverage, y = residuals, color = l1)
fig.add_vline(x = tresh3, line_width = 1, line_dash = 'solid', line_color = 'red')
fig.add_hline(y=tresh4, line_width=1, line_dash='solid', line_color='red')
fig.update_layout(xaxis_title="Leverage", yaxis_title = "Residuals")
out3 = leverage > tresh3
# out4 is computed but not read in the lines shown here; only the leverage flags drive the annotations.
out4 = residuals > tresh4
# Annotate every sample beyond the leverage limit with its metadata 'name' (label
# lookup with the integer i) or, without metadata, with its index label.
for i in range(t.shape[0]):
if out3[i]:
if not meta_data.empty:
ann = meta_data.loc[:,'name'][i]
else:
ann = t.index[i]
fig.add_annotation(dict(x = leverage[i], y = residuals[i], showarrow=True, text = ann,
xanchor = 'auto', yanchor = 'auto'))
st.plotly_chart(fig, use_container_width = True)
# NOTE(review): the PNG bytes are produced but no write to disk is visible here,
# although Ac_Km lists 'graphe_influence.png' for the report — verify.
img = pio.to_image(fig, format="png")
# NOTE(review): no `with hotelling:` line is visible before this section, and the
# name `hotelling` (a layout column created earlier) is rebound to a Series below.
st.write('T²-Hotelling vs Q residuals plot')
# Hotelling
# NOTE(review): this is the per-sample variance across the three selected score
# columns, used here as the T² value — confirm it is the intended statistic.
hotelling = t.var(axis = 1)
# Q residuals: Q residuals represent the magnitude of the variation remaining in each sample after projection through the model
# Same computation as for the influence plot: row-wise sum of squared reconstruction errors.
residuals = np.diag(np.subtract(xc.to_numpy(), xp)@ np.subtract(xc.to_numpy(), xp).T)
I = t.shape[0]
# F inverse survival function at 0.05 with (3, I) degrees of freedom.
fcri = sc.stats.f.isf(0.05, 3, I)
# The denominator I * (I - 3) is zero when there are exactly 3 samples.
tresh0 = (3 * (I ** 2 - 1) * fcri) / (I * (I - 3))
# NOTE(review): chi2.ppf(0.05, df=3) is the 5th percentile; an upper limit would
# usually use 0.95 — confirm the intended probability.
tresh1 = sc.stats.chi2.ppf(0.05, df = 3)
fig = px.scatter(t, x = hotelling, y = residuals, color = l1)
fig.update_layout(xaxis_title="T²",yaxis_title="Q-Residuals")
fig.add_vline(x=tresh0, line_width=1, line_dash='solid', line_color='red')
fig.add_hline(y=tresh1, line_width=1, line_dash='solid', line_color='red')
out0 = hotelling > tresh0
# out1 is computed but not read in the lines shown here; only the T² flags drive the annotations.
out1 = residuals > tresh1
# Annotate every sample beyond the T² limit with its metadata 'name' or, without
# metadata, with its index label.
for i in range(t.shape[0]):
if out0[i]:
if not meta_data.empty:
ann = meta_data.loc[:,'name'][i]
else:
ann = t.index[i]
fig.add_annotation(dict(x = hotelling[i], y = residuals[i], showarrow=True, text = ann,
xanchor = 'auto', yanchor = 'auto'))
st.plotly_chart(fig, use_container_width=True)
# Saved under the file name listed in Ac_Km for the exported report.
fig.write_image("./Report/figures/graphe_hotelling.png", format="png")
#st.write()
#st.write()
# Values handed to the report generator, as strings: number of rows of tcr and
# number of rows of sam1 (the samples selected from the clusters).
Nb_ech = str(tcr.shape[0])
nb_clu = str(sam1.shape[0])
# Figure file names passed to the report; the files are expected under ./Report/figures.
Ac_Km = ['Spectra_Plot.png', 'Elbow.png', 'graphe_loadings.png', 'plot_axe1_axe2.png', 'plot_axe1_axe3.png', 'plot_axe2_axe3.png', 'graphe_hotelling.png', 'graphe_influence.png']
# Streamlit container
with st.container():
header3, header4 = st.columns(2)
# The report is generated and compiled only when the export button is clicked.
if header3.button("Exporter le rapport"):
# csv input: a report is produced only for PCA combined with Kmeans, HDBSCAN or AP;
# the last argument tells the report which clustering method was used.
if test == '.csv':
if dim_red_method == dim_red_methods[1] and clus_method == cluster_methods[1]:
latex_report = report.report(sam, tcr, Nb_ech, nb_clu, 'sample', Ac_Km, 'csv', 'kmeans')
report.compile_latex()
elif dim_red_method == dim_red_methods[1] and clus_method == cluster_methods[2]:
latex_report = report.report(sam, tcr, Nb_ech, nb_clu, 'sample', Ac_Km, 'csv', 'hdb')
report.compile_latex()
elif dim_red_method == dim_red_methods[1] and clus_method == cluster_methods[3]:
latex_report = report.report(sam, tcr, Nb_ech, nb_clu, 'sample', Ac_Km, 'csv', 'AP')
report.compile_latex()
# NOTE(review): without indentation it is not certain which `if` this `else`
# belongs to; the 'dx' argument suggests it pairs with `if test == '.csv'`. The call
# also passes 3 positional arguments where the csv branches pass 8 — confirm
# against report.report's signature.
else:
latex_report = report.report(sam, 'sample', 'dx')
report.compile_latex()
else:
pass