from Packages import *
st.set_page_config(page_title="NIRS Utils", page_icon=":goat:", layout="wide")
from Modules import *
# HTML for the "CEFE - CNRS" banner
# bandeau_html = """
# <div style="width: 100%; background-color: #4682B4; padding: 10px; margin-bottom: 10px;">
# <h1 style="text-align: center; color: white;">CEFE - CNRS</h1>
# </div>
# """
# # Inject the banner HTML
# st.markdown(bandeau_html, unsafe_allow_html=True)
add_header()
add_sidebar(pages_folder)
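# running hash of all user inputs; it is passed as the 'change' argument of the cached functions below so Streamlit recomputes them whenever an input changes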
hash_ = ''
def p_hash(add):
global hash_
hash_ = hash_data(hash_+str(add))
return hash_
dirpath = Path('Report/out/model')
if dirpath.exists() and dirpath.is_dir():
if 'Predict' not in st.session_state:
st.session_state['Predict'] = False
# #################################### Methods ##############################################
# empty temp figures
def delete_files(keep):
# walk through Report/ and remove every generated file except the logo and files whose extension is listed in 'keep'
for root, dirs, files in os.walk('Report/', topdown=False):
for file in files:
if file != 'logo_cefe.png' and not any(file.endswith(ext) for ext in keep):
os.remove(os.path.join(root, file))
###################################################################
c1, c2 = st.columns([2, 1])
c1.image("./images/prediction making.png", use_column_width=True)
pred_data = DataFrame() # placeholder empty frame; filled once a data file is loaded

def preparespecdf(df):
# split the uploaded table into non-float metadata columns and float spectral columns
other = df.select_dtypes(exclude = 'float')
spec = df.select_dtypes(include='float')
if other.shape[1] > 0:
rownames = other.iloc[:,0]
spec.index = rownames
else:
rownames = [str(i) for i in range(df.shape[0])]
if spec.shape[1] < 60:
# fewer than 60 float columns is unlikely to be a spectral matrix
spec = DataFrame()
return spec, other, rownames
def check_exist(var):
# True if a variable with this name has been defined in the global namespace
return var in globals()
with c2:
zip = st.file_uploader("Load your zip file:", type = ['.zip'], help=" :mushroom: select the zip archive that was exported when the model was saved")
if not zip:
st.info('Info: Insert your zip file above!')
disable1 = False if zip else True
new_data = st.file_uploader("Load NIRS Data for prediction making:", type = ['csv', 'dx'], help=" :mushroom: select a csv matrix with samples as rows and lambdas as columns", disabled=disable1)
if not disable1 :
info1 = st.info('Info: Insert your NIRS data file above!')
if zip:
@st.cache_data
def tempdir(prefix, dir):
with TemporaryDirectory( prefix= prefix, dir= dir ) as temp_dir:# create a temp directory
tempdirname = os.path.split(temp_dir)[1]
return tempdirname
temp_dir = tempdir(prefix = "pred_temp", dir = "./temp")
# Open and extract the zip file
from zipfile import ZipFile
with ZipFile(zip, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
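# the extracted archive is expected to contain the .pkl files saved at training time; they are located below and the 'file_system' pickle is loaded as system_data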
def find_pkl_files(root_dir):
# List to store the paths of .pkl files
pkl_files = []
# Walk through the directory
for dirpath, dirnames, filenames in os.walk(root_dir):
for filename in filenames:
# Check if the file has a .pkl extension
if filename.endswith('.pkl'):
# Construct the full file path
file_path = os.path.join(dirpath, filename)
pkl_files.append(file_path)
return pkl_files
pkl = find_pkl_files(root_dir=temp_dir)
system_file = [path for path in pkl if 'file_system' in path]
if len(system_file) ==1 :
with open(system_file[0], 'rb') as fi:
system_data = load(fi)
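# system_data is the dictionary saved at training time: the fitted model ('model_', 'model_type'), the preprocessing settings ('spec-preprocessing'), the selected wavelengths ('selected-wls') and the training data ('data')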
if new_data:
info1.empty()
with c2:
if new_data:
p_hash(new_data.name)
test = new_data.name.split('.')[-1]
export_name = 'Pred of '
export_name += new_data.name[:new_data.name.find('.')]
match test:
case 'csv':
qsep = st.radio("Select csv separator: ", options = [';', ','], key = 2, horizontal = True)
qhdr = st.radio("Indexes column in csv?", options = ['yes', 'no'], key = 3, horizontal = True)
col = 0 if qhdr == 'yes' else None
p_hash([qsep,qhdr])
df = read_csv(new_data, sep=qsep, header= col, decimal=".")
pred_data, cat, rownames = preparespecdf(df)
case "dx":
with NamedTemporaryFile(delete=False, suffix=".dx") as tmp:
tmp.write(new_data.read())
tmp_path = tmp.name
with open(tmp.name, 'r') as dd:
dxdata = dd.read()
p_hash(str(dxdata)+str(new_data.name))
## load and parse the temp dx file
@st.cache_data
def dx_loader(change):
chem_data, spectra, meta_data, _ = read_dx(file = tmp_path)
return chem_data, spectra, meta_data, _
chem_data, spectra, meta_data, _ = dx_loader(change = hash_)
st.success("The data have been loaded successfully", icon="✅")
if chem_data.to_numpy().shape[1]>0:
yname = st.selectbox('Select target', options=chem_data.columns)
measured = chem_data.loc[:,yname] == 0
y = chem_data.loc[:,yname].loc[measured]
pred_data = spectra.loc[measured]
else:
pred_data = spectra
os.unlink(tmp_path)
st.subheader("I - Spectral data preprocessing & visualization", divider='blue')
# try:
if not pred_data.empty:# Load the model with joblib
@st.cache_data

def preprocess_spectra(data, change):
# replay the preprocessing chain saved at training time on the new spectra
if system_data['spec-preprocessing']['normalization'] == 'Snv':
x1 = Snv(data)
norm = 'Standard Normal Variate'
else:
norm = 'No Normalization was applied'
x1 = data
x2 = savgol_filter(x1,
window_length = int(system_data['spec-preprocessing']['SavGol(polyorder,window_length,deriv)'][1]),
polyorder = int(system_data['spec-preprocessing']['SavGol(polyorder,window_length,deriv)'][0]),
deriv = int(system_data['spec-preprocessing']['SavGol(polyorder,window_length,deriv)'][2]),
delta=1.0, axis=-1, mode="interp", cval=0.0)
preprocessed = DataFrame(x2, index = data.index, columns = data.columns)
return norm, preprocessed
norm, preprocessed = preprocess_spectra(pred_data, change= hash_)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #
# @st.cache_data
# def specplot_raw(change):
# fig2 = plot_spectra(pred_data, xunits = 'lab', yunits = "meta_data.loc[:,'yunits'][0]")
# return fig2
# rawspectraplot = specplot_raw(change = hash_)
rawspectraplot = plot_spectra(pred_data, xunits = 'Wavelength/Wavenumber', yunits = "Signal intensity")
c3, c4 = st.columns([2, 1])
with c3:
st.write('Raw spectra')
st.pyplot(rawspectraplot)
## plot preprocessed spectra
if check_exist("preprocessed"):
# def specplot_prep(change):
# fig2 = plot_spectra(preprocessed, xunits = 'lab', yunits = "meta_data.loc[:,'yunits'][0]")
# return fig2
# prepspectraplot = specplot_prep(change = hash_)
prepspectraplot = plot_spectra(preprocessed, xunits = 'Wavelength/Wavenumber', yunits = "Signal intensity")
st.write('Preprocessed spectra')
st.pyplot(prepspectraplot)
with c4:
@st.cache_data
def prep_info(change):
SG = f"- Savitzky-Golay derivative parameters \n:(window_length: {system_data['spec-preprocessing']['SavGol(polyorder,window_length,deriv)'][1]}; polynomial order: {system_data['spec-preprocessing']['SavGol(polyorder,window_length,deriv)'][0]}; derivative order: {system_data['spec-preprocessing']['SavGol(polyorder,window_length,deriv)'][2]})"
Norm = f"- Spectral Normalization \n: {system_data['spec-preprocessing']['normalization']}"
return SG, Norm
SG, Norm = prep_info(change = hash_)
st.info('The spectra were preprocessed using:\n'+SG+"\n"+Norm)
################### Predictions making ##########################
disable2 = False if check_exist("pred_data") else True
pred_button = st.button('Predict', type='primary', disabled= disable2, use_container_width=False)
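# persist the click in session_state so the prediction results survive the Streamlit reruns triggered by other widgets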
if pred_button: st.session_state['Predict'] = True
if st.session_state['Predict']:
if check_exist("pred_data"):# Load the model with joblib
c5, c6 = st.columns([2, 1])
with c6:
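# nvar: number of input variables (wavelengths) the saved model expects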
model = system_data['model_']
if system_data['model_type'] in ['PLS','TPE-iPLS']:
nvar = system_data['model_'].n_features_in_
elif system_data['model_type'] =='LW-PLS':
nvar = system_data['data']['raw-spectra'].shape[1]
if check_exist('preprocessed'):
if isinstance(system_data['selected-wls']['idx'], DataFrame):
# the saved wavelength selection is stored as (start, end) pairs; expand each pair into the full range of column indices
idx = np.concatenate([np.arange(system_data['selected-wls']['idx'].values.reshape((-1,))[2*i], system_data['selected-wls']['idx'].values.reshape((-1,))[2*i+1]+1) for i in range(system_data['selected-wls']['idx'].shape[0])])
else:
idx = np.arange(nvar)
if np.max(idx) < preprocessed.shape[1]:
preprocesseddf = preprocessed.iloc[:,idx] ### keep only the predictors the model was trained on
else:
st.error("Error: The number of columns in your data does not match the number of columns used to train the model. Please ensure they are the same.")
if check_exist("preprocesseddf"):
if st.session_state['Predict'] and nvar == preprocesseddf.shape[1]:
# if nvar == preprocesseddf.shape[1]:
match system_data['model_type']:
case 'PLS'|'TPE-iPLS':
try:
result = DataFrame(system_data['model_'].predict(preprocesseddf), index = rownames, columns = ['Results'])
except:
st.error(f'''Error: Length mismatch: the data contains {len(rownames)} sample indices, while the model produced
{len(model.predict(preprocesseddf))} predictions. Please correct the "Indexes column in csv?" parameter.''')
case 'LW-PLS':
temp_path = Path('temp/')

# export data to csv for Julia train/pred
st.write(system_data['data'])
# spectra = system_data['data']['raw-spectra'] # without pretreatments
_, spectra = preprocess_spectra(system_data['data']['raw-spectra'], change= hash_) # with pretreatments
x_pred = preprocessed
y = system_data['data']['target']
data_to_work_with = ['spectra', 'y', 'x_pred']
spectra_np, y_np, x_pred_np = spectra.to_numpy(), y.to_numpy(), x_pred.to_numpy()
# export spectra, y, x_pred to the temp folder as csv files for the external Julia script
for i in data_to_work_with:
j = globals()[i]
# st.write(j)
# assumed export call: the file names match the clean-up paths used in the commented-out code below
np.savetxt(temp_path / str(i + ".csv"), j, delimiter=",")
# # run Julia Jchemo as subprocess

# subprocess.run([f"{sys.executable}", subprocess_path / "LWPLSR_Call.py"])
# # retrieve json results from Julia JChemo
# try:
# with open(temp_path / "lwplsr_outputs.json", "r") as outfile:
# Reg_json = json.load(outfile)
# # delete csv files
# for i in data_to_work_with: os.unlink(temp_path / str(i + ".csv"))
# # delete json file after import
# os.unlink(temp_path / "lwplsr_outputs.json")
# os.unlink(temp_path / "lwplsr_preTreatments.json")
# # format result data into Reg object
# pred = ['pred_data_train', 'pred_data_test']### keys of the dict
# for i in range(nb_folds):
# pred.append("CV" + str(i+1)) ### add cv folds keys to pred
# except FileNotFoundError as e:
# Reg = None
# for i in data_to_work_with: os.unlink(temp_path / str(i + ".csv"))
#
# st.write(Reg_json)
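# the commented-out block above sketches the intended LW-PLS flow: write the csv files, run the Julia/Jchemo script (LWPLSR_Call.py) as a subprocess, read back lwplsr_outputs.json, then delete the temporary files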
################################### results display ###################################
if check_exist("preprocesseddf"):
if preprocesseddf.shape[1]>1 and check_exist('result'):
hist = pred_hist(pred=result)
with c5:
st.write('Predicted values distribution')
st.pyplot(hist)
st.write('Predicted values table')
st.dataframe(result.T)
with c6:
st.info('Descriptive statistics of the predicted values')
st.write(DataFrame(desc_stats(result)))
elif pred_button and nvar != preprocesseddf.shape[1]:
with c6:
st.error(f'Error: The model was trained on {nvar} wavelengths, but you provided {preprocessed.shape[1]} wavelengths for prediction. Please ensure they match!')
################################# Download results #################################
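# copy the uploaded data, the figures and the prediction table into Report/out, then zip Report/out into a downloadable archive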
if check_exist('result'):
@st.cache_data(show_spinner =False)
def preparing_results_for_downloading(change):
match test:
# load csv file
case 'csv':
df.to_csv('Report/out/dataset/'+ new_data.name, sep = ';', encoding = 'utf-8', mode = 'a')
case 'dx':
with open('Report/out/dataset/'+new_data.name, 'w') as dd:
dd.write(dxdata)
rawspectraplot.savefig('./Report/out/figures/raw_spectra.png')
prepspectraplot.savefig('./Report/out/figures/preprocessed_spectra.png')
hist.savefig('./Report/out/figures/histogram.png')
result.round(4).to_csv('./Report/out/The analysis result.csv', sep = ";")
return change
preparing_results_for_downloading(change = hash_)
@st.cache_data(show_spinner =False)
def tempdir(change):
with TemporaryDirectory( prefix="results", dir="./Report") as temp_dir:# create a temp directory
tempdirname = os.path.split(temp_dir)[1]
if len(os.listdir('./Report/out/figures/'))==3:
make_archive(base_name="./Report/Results", format="zip", base_dir="out", root_dir = "./Report")# create a zip file
move("./Report/Results.zip", f"./Report/{tempdirname}/Results.zip")# put the inside the temp dir
with open(f"./Report/{tempdirname}/Results.zip", "rb") as f:
zip_data = f.read()
return tempdirname, zip_data
date_time = datetime.now().strftime('%y%m%d%H%M')
try :
tempdirname, zip_data = tempdir(change = hash_)
st.download_button(label = 'Download', data = zip_data, file_name = f'Nirs_Workflow_{date_time}_Pred_.zip', mime ="application/zip",
args = None, kwargs = None,type = "primary",use_container_width = True)
except:
st.error("Error: Failed to prepare the results archive for download.")
# except:
# c2.error('''Error: Data loading failed. Please check your file. Consider fine-tuning the dialect settings or ensure the file isn't corrupted.''')
else:
with c2:
if new_data:
st.error("Error!:The The data you provided for making predictions doesn't appear to be multivariable.!")