st.set_page_config(page_title="NIRS Utils", page_icon=":goat:", layout="wide")
# layout
UiComponents(pagespath = pages_folder, csspath= css_file,imgpath=image_path ,
header=True, sidebar= True, bgimg=False, colborders=True)
st.header("Calibration Subset Selection") # page title
st.markdown("Create a predictive model, then use it for predicting your target variable (chemical data) from NIRS spectra")
c1, c2 = st.columns([3, 1])
c1.image("./images/sample selection.png", use_column_width=True) # graphical abstract
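# Housekeeping: the loop below is presumably the body of the delete_files() helper called further down;
# it clears previously generated outputs while keeping the whitelisted file extensions and the CEFE logo.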
for file in files:
if file != 'logo_cefe.png' and not any(file.endswith(ext) for ext in keep):
os.remove(os.path.join(root, file))
if Path('report/out/model').exists() and Path('report/out/model').is_dir():
rmtree(Path('report/out/model'))

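# The lists of available algorithms and selection strategies depend on the interface mode ('simple' vs 'advanced').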
selec_strategy = ['random']
case 'advanced':
dim_red_methods=['PCA','UMAP', 'NMF'] # List of dimensionality reduction algos
cluster_methods = ['Kmeans','HDBSCAN', 'AP'] # List of clustering algos
selec_strategy = ['center','random']
# ~~~~~~~~~~~~~~~~ clean the analysis results dir ~~~~~~~~~~~~~~~~
delete_files(keep = ['.py', '.pyc','.bib'])
################################### I - Data Loading and Visualization ########################################
files_format = ['csv', 'dx'] # Supported files format
# loader for datafile
file = c2.file_uploader("Data file", type = ["csv", "dx"], help = " :mushroom: select a csv matrix with samples as rows and lambdas as columns", key = 5)
spectra = DataFrame()
meta_data = DataFrame()
tcr=DataFrame()
sam=DataFrame()
sam1=DataFrame()
selected_samples = DataFrame()
labels = []
color_palette = None
dr_model = None # dimensionality reduction model
cl_model = None # clustering model

else:
extension = file.name.split(".")[-1]
userfilename = file.name.replace(f".{extension}", '')
c2_1, c2_2 = st.columns([.5, .5])
with c2_1:
dec = st.radio('decimal:', options= [".", ","], horizontal = True)
sep = st.radio("separator:", options = [";", ","], horizontal = True)
with c2_2:
phdr = st.radio("header: ", options = ["yes", "no"], horizontal = True)
pnames = st.radio("samples name:", options = ["yes", "no"], horizontal = True)
hdr = 0 if phdr =="yes" else None
names = 0 if pnames =="yes" else None
hash_ = ObjectHash(current=hash_, add= [userfilename, hdr, names, dec, sep])
from io import StringIO
stringio = StringIO(file.getvalue().decode("utf-8"))
data_str = str(stringio.read())
@st.cache_data
def read_csv(file, change):  # signature inferred from the call below; `change` only serves to invalidate the cache
# `par` (the parser object for the uploaded csv) is instantiated in code not shown here
par.parse(decimal = dec, separator = sep, index_col = names, header = hdr)
return par.float, par.meta_data, par.meta_data_st_, par.df
spectra, meta_data, md_df_st_, imp = read_csv(file= file, change = hash_)
st.success("The data have been loaded successfully", icon="✅")
except:
st.error('''Error: the file could not be parsed with the selected dialect settings.
Please adjust the decimal and separator parameters so the file can be read correctly.''')
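# .dx branch: the uploaded file is written to a temporary file, read back as text (`dxdata`) and hashed,
# then parsed by a cached reader (presumably a JCAMP-DX parser, `M`) returning chemical data, spectra and metadata.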
tmp.write(file.read())
tmp_path = tmp.name
with open(tmp.name, 'r') as dd:
dxdata = dd.read()
## load and parse the temp dx file
@st.cache_data
return M.chem_data, M.specs_df_, M.meta_data, M.meta_data_st_
hash_ = ObjectHash(current=hash_, add= dxdata)
################################################### END : I- Data loading and preparation ####################################################
if not spectra.empty:
with c2:
st.write('Data summary:')
st.write(f'- the number of spectra: {spectra.shape[0]}')
st.write(f'- the number of wavelengths: {spectra.shape[1]}')
st.write(f'- the number of categorical variables: {meta_data.shape[1]}')
################################################### BEGIN : visualize and split the data ####################################################

n_samples = spectra.shape[0]
nwl = spectra.shape[1]
# retrieve columns name and rows name of the dataframe
colnames = list(spectra.columns)
rownames = [str(i) for i in list(spectra.index)]
spectra.index = rownames
fig = plot_spectra(spectra, xunits = 'Wavelength/Wavenumber', yunits = "Signal intensity")
data_info = DataFrame({'Name': [file.name],
'Number of scanned samples': [n_samples]},
index = ['Input file'])
return fig, data_info

st.info('Information on the loaded data file')
st.write(data_info) ## table showing the number of samples in the data file
################################################### END : visualize and split the data ####################################################
############################## Exploratory data analysis ###############################
st.subheader("II - Exploratory Data Analysis-Multivariable Data Analysis", divider='blue')
xc = standardize(spectra, center=True, scale=False)
c5, c6, c7, c8, c9, c10, c11 = st.columns([1, 1, 0.6, 0.6, 0.6, 1.5, 1.5])
with c5:
dim_red_method = st.selectbox("Dimensionality reduction techniques: ",
options = ['']+dim_red_methods if len(dim_red_methods)>2 else dim_red_methods,
key = 37, format_func = lambda x: x if x else "<Select>", disabled = False if len(dim_red_methods)>2 else True)
if dim_red_method == '':
st.info('Info: Select a dimensionality reduction technique!')
filter = md_df_st_.columns.tolist()
supervised = st.selectbox('Supervised UMAP by(optional):', options = ['']+filter, format_func = lambda x: x if x else "<Select>", key=108)
umapsupervisor = None if supervised == '' else md_df_st_[supervised]
else:
supervised = st.selectbox('Supervised UMAP by:', options = ["Meta-data is not available"], disabled=True, format_func = lambda x: x if x else "<Select>", key=108)
umapsupervisor = None
disablewidgets = not (dim_red_method and st.session_state.interface == 'advanced')
clus_method = st.selectbox("Clustering techniques(optional): ",
options = ['']+cluster_methods if len(cluster_methods)>2 else cluster_methods,
key = 38, format_func = lambda x: x if x else "<Select>", disabled= disablewidgets)
# if disablewidgets == False and dim_red_method in dim_red_methods:
# inf = st.info('Info: Select a clustering technique!')
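# Build the dimensionality-reduction model for the selected method: UMAP can optionally be supervised by a
# metadata column, NMF is fitted with 3 components (the PCA case is handled in code not shown here).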
dr_model = Umap(numerical_data = MinMaxScale(spectra), cat_data = umapsupervisor)
case 'NMF':
dr_model = Nmf(spectra, Ncomp= 3)
return dr_model
axis1 = c7.selectbox("x-axis", options = dr_model.scores_.columns, index=0)
axis2 = c8.selectbox("y-axis", options = dr_model.scores_.columns, index=1)
axis3 = c9.selectbox("z-axis", options = dr_model.scores_.columns, index=2)
t = dr_model.scores_.loc[:,np.unique(axis)]
tcr = standardize(t)

if dim_red_method == 'UMAP':

else:

with c6:
sel_ratio = st.number_input('Enter the number/fraction of samples to be selected:', min_value = 0.01, max_value = float(spectra.shape[0]), value = 0.20, format = "%.2f", disabled = disablewidgets)
if sel_ratio:
if sel_ratio > 1.00:
ratio = int(sel_ratio)  # a value above 1 is read as an absolute number of samples
else:
ratio = int(sel_ratio*spectra.shape[0])  # a value of 1.00 or below is read as a fraction of the dataset
if st.session_state["interface"] =='simple':
clus_method = 'KS'
else:
if dr_model and not clus_method:
clus_method = st.radio('Select samples selection strategy:', options = ['RDM', 'KS'])
elif dr_model and clus_method:
disabled1 = clus_method not in cluster_methods
selection = st.radio('Select samples selection strategy:', options = selec_strategy, disabled = disabled1)
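# 1- KMeans: `fit_optimal_` presumably returns the data, cluster labels and centres for an automatically chosen number of clusters (at most `ratio`).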
cl_model = Sk_Kmeans(tcr, max_clusters = ratio)
data, labels, clu_centers = cl_model.fit_optimal_
ncluster = clu_centers.shape[0]
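# 2- HDBSCAN: density-based clustering of the scores; samples assigned to no cluster are returned separately as `non_clustered`.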
cl_model = Hdbscan(np.array(tcr))
labels, clu_centers, non_clustered = cl_model.labels_,cl_model.centers_, cl_model.non_clustered
ncluster = len(clu_centers)
# 3- Affinity propagation
case 'AP':
cl_model = AP(X = tcr)
data, labels, clu_centers = cl_model.fit_optimal_
ncluster = len(clu_centers)
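# 4- KS (presumably Kennard-Stone): selects the calibration subset directly from the reduced scores,
# so no real clustering is performed and a single dummy label is assigned to every sample.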
case 'KS':
_, selected_samples_idx = cl_model.calset
labels = ["ind"]*n_samples
ncluster = "1"
selection_number = 'None'
selection = 'None'
# #################################################### III - Samples selection using the reduced data presentation ######

if not labels:
custom_color_palette = px.colors.qualitative.Plotly[:1]
elif labels:
num_clusters = len(np.unique(labels))
custom_color_palette = px.colors.qualitative.Plotly[:num_clusters]
match selection:
# Strategy 0
case 'center':
# list samples at clusters centers - Use sklearn.metrics.pairwise_distances_argmin if you want more than 1 sample per cluster
closest, _ = pairwise_distances_argmin_min(clu_centers, new_tcr)
selected_samples_idx = np.array(new_tcr.index)[list(closest)]
selected_samples_idx = selected_samples_idx.tolist()
#### Strategy 1
case 'random':
s = np.array(labels)[np.where(np.array(labels) !='Non clustered')[0]]
for i in np.unique(s):
C = np.where(np.array(labels) == i)[0]
if C.shape[0] >= selection_number:
km2 = KMeans(n_clusters = selection_number)
km2.fit(tcr.iloc[C,:])
from sklearn.metrics import pairwise_distances_argmin_min
clos, _ = pairwise_distances_argmin_min(km2.cluster_centers_, tcr.iloc[C,:])
selected_samples_idx.extend(tcr.iloc[C,:].iloc[list(clos)].index)
else:
selected_samples_idx.extend(new_tcr.iloc[C,:].index.to_list())
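# `selected_samples_idx` now holds the identifiers of the samples retained for reference analysis;
# they are used below to highlight these samples in the plots and to build the export table.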
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ results visualization ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if meta_data.empty and clus_method in cluster_methods:
elif not meta_data.empty and clus_method in cluster_methods:
elif not meta_data.empty and clus_method not in cluster_methods:
filter = [''] + md_df_st_.columns.tolist()
elif meta_data.empty and clus_method not in cluster_methods:
filter = []
if st.session_state["interface"] =='simple':
desactivatelist = True
if meta_data.empty:
desactivatelist = True
filter = ['']
elif not meta_data.empty:
filter = [''] + md_df_st_.columns.tolist()
desactivatelist = False
else:
desactivatelist = False
if len(axis)== 1:
tcr_plot['1d'] = np.random.uniform(-.5, .5, tcr_plot.shape[0])
colfilter = st.selectbox('Color by:', options= filter,format_func = lambda x: x if x else "<Select>", disabled = desactivatelist)
if colfilter in cluster_methods:
tcr_plot[colfilter] = labels
elif not meta_data.empty and colfilter in md_df_st_.columns.tolist():
tcr_plot[f'{colfilter} :'] = list(map(str.lower,md_df_st_.loc[:,colfilter]))
else:  # fallback when neither a clustering result nor a metadata column is selected for colouring
tcr_plot[f'{colfilter} :'] = ['sample'] * tcr_plot.shape[0]
col_var_name = tcr_plot.columns.tolist()[-1]
n_categories = len(np.unique(tcr_plot[col_var_name]))
custom_color_palette = px.colors.qualitative.Plotly[:n_categories]
if selected_samples_idx:# color selected samples
t_selected = tcr_plot.iloc[selected_samples_idx,:]
match t.shape[1]:
case 3:
fig = px.scatter_3d(tcr_plot, x = axis[0], y = axis[1], z = axis[2], color = col_var_name ,color_discrete_sequence = custom_color_palette)
fig.update_traces(marker=dict(size=4))
if selected_samples_idx:# color selected samples
fig.add_scatter3d(x = t_selected.loc[:,axis[0]], y = t_selected.loc[:,axis[1]], z = t_selected.loc[:,axis[2]],
mode ='markers', marker = dict(size = 5, color = 'black'), name = 'selected samples')
case 2:
fig = px.scatter(tcr_plot, x = axis[0], y = axis[1], color = col_var_name ,color_discrete_sequence = custom_color_palette)
if selected_samples_idx:# color selected samples
fig.add_scatter(x = t_selected.loc[:,axis[0]], y = t_selected.loc[:,axis[1]],
mode ='markers', marker = dict(size = 5, color = 'black'), name = 'selected samples')
case 1:
yy = np.random.uniform(-.5, .5, tcr_plot.shape[0])
fig = px.scatter(tcr_plot, x = axis[0], y = '1d', color = col_var_name ,color_discrete_sequence = custom_color_palette)
fig.add_scatter(x = t_selected.loc[:,axis[0]], y = t_selected['1d'],
mode ='markers', marker = dict(size = 5, color = 'black'), name = 'selected samples')
fig.update_yaxes(visible=False)
st.plotly_chart(fig, use_container_width = True)
if labels:
fig_export = {}
# export 2D scores plot
if len(axis)== 3:
comb = [i for i in combinations(np.arange(len(axis)), 2)]
subcap = ['a','b','c']
for i in range(len(comb)):
fig_= px.scatter(tcr_plot, x = axis[(comb[i][0])], y=axis[(comb[i][1])],color = labels if list(labels) else None,color_discrete_sequence = custom_color_palette)
fig_.add_scatter(x = t_selected.loc[:,axis[(comb[i][0])]], y = t_selected.loc[:,axis[(comb[i][1])]], mode ='markers', marker = dict(size = 5, color = 'black'),
name = 'selected samples')
fig_.update_layout(font=dict(size=23))
fig_.add_annotation(text= f'({subcap[i]})', align='center', showarrow= False, xref='paper', yref='paper', x=-0.13, y= 1,
font= dict(color= "black", size= 35), bgcolor ='white', borderpad= 2, bordercolor= 'black', borderwidth= 3)
fig_.update_traces(marker=dict(size= 10), showlegend= False)
fig_export[f'scores_pc{comb[i][0]}_pc{comb[i][1]}'] = fig_
# fig_export.write_image(f'./report/out/figures/scores_pc{str(comb[i][0])}_pc{str(comb[i][1])}.png')
else:
fig_export['fig'] = fig
st.write('Loadings plot')
p = dr_model.loadings_
if meta_data.loc[:,'xunits'][0] == '1/cm':
freq.columns = ['Wavenumber (1/cm)']
#########################################
df1 = pp.melt(id_vars=freq.columns)
loadingsplot = px.line(df1, x=freq.columns, y='value', color='variable', color_discrete_sequence=px.colors.qualitative.Plotly)
loadingsplot.update_layout(legend=dict(x=1, y=0, font=dict(family="Courier", size=12, color="black"),
bordercolor="black", borderwidth=2))
loadingsplot.update_layout(xaxis_title = xlab,yaxis_title = "Intensity" ,xaxis = dict(autorange= inv))
st.plotly_chart(loadingsplot, use_container_width=True)
#############################################################################################################
# Leverage
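# leverage_i = h_ii / trace(H), where H = T (T'T)^-1 T' is the hat (projection) matrix of the scores.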
Hat = t.to_numpy() @ np.linalg.inv(np.transpose(t.to_numpy()) @ t.to_numpy()) @ np.transpose(t.to_numpy())
leverage = np.diag(Hat) / np.trace(Hat)

tresh3 = 2 * tcr.shape[1]/n_samples
# Matrix reconstruction
xp = np.dot(t,p.T)
# Q residuals: Q residuals represent the magnitude of the variation remaining in each sample after projection through the model
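# Q_i = sum_j (x_ij - xhat_ij)^2, with Xhat = T P' the reconstruction of the mean-centred spectra.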
residuals = np.diag(np.subtract(xc.to_numpy(), xp)@ np.subtract(xc.to_numpy(), xp).T)
from scipy.stats import chi2
tresh4 = chi2.ppf(0.95, df = len(axis))  # upper 95 % chi-squared quantile used as the Q-residuals control limit

l1 = ["Samples"]* n_samples
tcr_plot["leverage"] = leverage
tcr_plot["residuals"] = residuals
influence_plot = px.scatter(data_frame =tcr_plot, x = "leverage", y = "residuals", color=col_var_name,
influence_plot.add_scatter(x = leverage[selected_samples_idx] , y = residuals[selected_samples_idx],
mode ='markers', marker = dict(size = 5, color = 'black'), name = 'selected samples')
influence_plot.add_vline(x = tresh3, line_width = 1, line_dash = 'solid', line_color = 'red')
influence_plot.add_hline(y=tresh4, line_width=1, line_dash='solid', line_color='red')
influence_plot.update_layout(xaxis_title="Leverage", yaxis_title = "Q-residuals", font=dict(size=20), width=800, height=600)
out3 = leverage > tresh3
out4 = residuals > tresh4
# for i in range(n_samples):
# if out3[i]:
# if not meta_data.empty:
# ann = meta_data.loc[:,'name'][i]
# else:
# ann = t.index[i]
# influence_plot.add_annotation(dict(x = leverage[i], y = residuals[i], showarrow=True, text = str(ann),font= dict(color= "black", size= 15),
# xanchor = 'auto', yanchor = 'auto'))
influence_plot.update_traces(marker=dict(size= 6), showlegend= True)
influence_plot.update_layout(font=dict(size=23), width=800, height=500)
st.plotly_chart(influence_plot, use_container_width=True)
for annotation in influence_plot.layout.annotations:
influence_plot.update_layout(font=dict(size=23), width=800, height=600)
influence_plot.update_traces(marker=dict(size= 10), showlegend= False)
influence_plot.add_annotation(text= '(a)', align='center', showarrow= False, xref='paper', yref='paper', x=-0.125, y= 1,
font= dict(color= "black", size= 35), bgcolor ='white', borderpad= 2, bordercolor= 'black', borderwidth= 3)
# influence_plot.write_image('./report/out/figures/influence_plot.png', engine = 'kaleido')

# Hotelling
hotelling = t.var(axis = 1)
# Q residuals: Q residuals represent the magnitude of the variation remaining in each sample after projection through the model
residuals = np.diag(np.subtract(xc.to_numpy(), xp)@ np.subtract(xc.to_numpy(), xp).T)
from scipy.stats import f, chi2
fcri = f.isf(0.05, 3, n_samples)

tresh0 = (3 * (n_samples ** 2 - 1) * fcri) / (n_samples * (n_samples - 3))
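# tresh0: approximate 95 % confidence limit for Hotelling's T² with 3 components, derived from the F quantile above.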
hotelling_plot = px.scatter(t, x = hotelling, y = residuals, color=labels if list(labels) else None,
hotelling_plot.add_scatter(x = hotelling[selected_samples_idx] , y = residuals[selected_samples_idx],
mode ='markers', marker = dict(size = 5, color = 'black'), name = 'selected samples')
hotelling_plot.update_layout(xaxis_title="Hotelling-T² distance",yaxis_title="Q-residuals")
hotelling_plot.add_vline(x=tresh0, line_width=1, line_dash='solid', line_color='red')
hotelling_plot.add_hline(y=tresh1, line_width=1, line_dash='solid', line_color='red')
out0 = hotelling > tresh0
out1 = residuals > tresh1

for i in range(n_samples):
if out0[i]:
if not meta_data.empty:
ann = meta_data.loc[:,'name'][i]
else:
ann = t.index[i]
hotelling_plot.add_annotation(dict(x = hotelling[i], y = residuals[i], showarrow=True, text = str(ann), font= dict(color= "black", size= 15),
xanchor = 'auto', yanchor = 'auto'))
hotelling_plot.update_traces(marker=dict(size= 6), showlegend= True)
hotelling_plot.update_layout(font=dict(size=23), width=800, height=500)
st.plotly_chart(hotelling_plot, use_container_width=True)
for annotation in hotelling_plot.layout.annotations:
hotelling_plot.update_layout(font=dict(size=23), width=800, height=600)
hotelling_plot.update_traces(marker=dict(size= 10), showlegend= False)
hotelling_plot.add_annotation(text= '(b)', align='center', showarrow= False, xref='paper', yref='paper', x=-0.125, y= 1,
font= dict(color= "black", size= 35), bgcolor ='white', borderpad= 2, bordercolor= 'black', borderwidth= 3)
# hotelling_plot.write_image("./report/out/figures/hotelling_plot.png", format="png")
st.subheader('III - Selected Samples for Reference Analysis', divider='blue')

if labels:
c16, c17 = st.columns([3, 1])
c16.write("Tabular identifiers of selected samples for reference analysis:")

if selected_samples_idx:
if meta_data.empty:
sam1 = DataFrame({'name': spectra.index[clustered][selected_samples_idx],

'cluster':np.array(labels)[clustered][selected_samples_idx]},
index = selected_samples_idx)
else:
sam1 = meta_data.iloc[clustered,:].iloc[selected_samples_idx,:]
sam1.insert(loc=0, column='index', value=selected_samples_idx)
sam1.insert(loc=1, column='cluster', value=np.array(labels)[selected_samples_idx])
sam1.index = np.arange(len(selected_samples_idx))+1
with c17:
st.info(f'Information !\n - The total number of samples: {n_samples}.\n- The number of samples selected for reference analysis: {sam1.shape[0]}.\n - The proportion of samples selected for reference analysis: {round(sam1.shape[0]/n_samples*100)}%.')

sam = sam1
if clus_method =='HDBSCAN':
with c16:
unclus = st.checkbox("Include non clustered samples (for HDBSCAN clustering)", value=True)

if selected_samples_idx:
if unclus:
if meta_data.empty:

sam2 = DataFrame({'name': spectra.index[non_clustered],  # assumed, mirroring the sam1 construction above
'cluster':['Non clustered']*len(spectra.index[non_clustered])},
index = spectra.index[non_clustered])
else:
sam2 = meta_data.iloc[non_clustered,:]
sam2.insert(loc=0, column='index', value= spectra.index[non_clustered])
sam2.insert(loc=1, column='cluster', value=['Non clustered']*len(spectra.index[non_clustered]))

sam.index = np.arange(sam.shape[0])+1
with c17:
st.info(f'- The number of Non-clustered samples: {sam2.shape[0]}.\n - The proportion of Non-clustered samples: {round(sam2.shape[0]/n_samples*100)}%')

else:
sam = sam1

Nb_ech = str(n_samples)
nb_clu = str(sam1.shape[0])
st.write("**Note:** Please check the box only after you have finished processing your data and are satisfied with the results. Checking the box prematurely may slow down the app and could lead to crashes.")
decis = st.checkbox("Yes, I want to download the results")
if decis:
###################################################
# ## generate report
@st.cache_data
latex_report = report.report('Representative subset selection', file.name, dim_red_method,
clus_method, Nb_ech, ncluster, selection, selection_number, nb_clu,tcr, sam)

@st.cache_data
def preparing_results_for_downloading(change):
# path_to_report = Path("report")############################### i am here
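# Copy the input dataset, the exported figures and the selected-subset csv into report/out/ so they can be archived for download.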
match extension:
# load csv file
case 'csv':
imp.to_csv('report/out/dataset/'+ file.name, sep = ';', encoding = 'utf-8', mode = 'a')
case 'dx':
with open('report/out/dataset/'+file.name, 'w') as dd:
dd.write(dxdata)
fig_spectra.savefig(report_path_rel/"out/figures/spectra_plot.png", dpi = 400) ## Export report
fig_export[f'scores_pc{comb[i][0]}_pc{comb[i][1]}'].write_image(report_path_rel/f'out/figures/scores_pc{str(comb[i][0]+1)}_pc{str(comb[i][1]+1)}.png')
fig_export['fig'].write_image(report_path_rel/'out/figures/scores_plot2D.png')
fig_export['fig'].write_image(report_path_rel/'out/figures/scores_plot1D.png')
# Export the figure
if dim_red_method in ['PCA','NMF']:
hotelling_plot.write_image(report_path_rel/"out/figures/hotelling_plot.png", format="png")
influence_plot.write_image(report_path_rel/'out/figures/influence_plot.png', engine = 'kaleido')
sam.to_csv(report_path_rel/'out/Selected_subset_for_calib_development.csv', sep = ';')
if Path(report_path_rel/"report.pdf").exists():
move(report_path_rel/"report.pdf", "./report/out/report.pdf")
preparing_results_for_downloading(change = hash_)
report.generate_report(change = hash_)
with TemporaryDirectory( prefix="results", dir="./report") as temp_dir:# create a temp directory
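# `tempdirname` below is assumed to hold the basename of the temporary directory `temp_dir` created here.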
make_archive(base_name= report_path_rel/"Results", format="zip", base_dir="out", root_dir = "./report")# create a zip file
move(report_path_rel/"Results.zip", f"./report/{tempdirname}/Results.zip")# put the inside the temp dir
with open(report_path_rel/f"{tempdirname}/Results.zip", "rb") as f:
zip_data = f.read()  # assumed: load the archive bytes for the download button below
# st.download_button(label = 'Download', data = zip_data, file_name = f'Nirs_Workflow_{date_time}_SamSel_.zip', mime ="application/zip",
# args = None, kwargs = None,type = "primary",use_container_width = True)
date_time = datetime.now().strftime('%y%m%d%H%M')
disabled_down = True if zip_data == '' else False
st.download_button(label = 'Download', data = zip_data, file_name = f'Nirs_Workflow_{date_time}_SamSel_.zip', mime ="application/zip",
args = None, kwargs = None,type = "primary",use_container_width = True, disabled = disabled_down)