Commit 2e63bc87 authored by DIANE

refactoring

parent cce2ab51
# """This package provides a complete workflow to users how want to proced to NIRS analysis without particular knowledge.
# This is a webapp with Streamlit.
# GUI shows whatever is needed for Samples Selection based on NIRS spectra and then, to compute a model to predict
# chemical values on your samples.
# Examples:
# streamlit run ./app.py
# """
# ##
import streamlit as st
from pathlib import Path
@@ -28,7 +17,7 @@ from datetime import datetime
import json
from shutil import rmtree, move, make_archive
from utils.data_parsing import JcampParser, CsvParser
from utils.data_parsing import jcamp_parser, csv_parser
from style.layout import UiComponents
from utils.data_handling import *
from utils.data_parsing import *
@@ -36,4 +25,5 @@ from utils.hash import *
from utils.visualize import *
from utils.miscellaneous import ObjectHash
from utils.samsel import Samplers
from report import report
\ No newline at end of file
from report import report
from utils.data_handling import fmt
\ No newline at end of file
@@ -33,7 +33,8 @@ if 'Predict' not in st.session_state:
st.session_state['Predict'] = False
#################################### Methods ##############################################
st.header("Prediction making using a previously developed model")
st.header("Prediction Making")
st.markdown("Predict future values using previously developed calibration.")
c1, c2 = st.columns([2, 1])
c1.image("./images/prediction making.png", use_column_width=True)
pred_data = DataFrame
@@ -159,8 +160,8 @@ with c2:
## load and parse the temp dx file
@st.cache_data
def dx_loader(change):
from utils.data_parsing import JcampParser
M = JcampParser(path = tmp_path)
from utils.data_parsing import jcamp_parser
M = jcamp_parser(path = tmp_path)
M.parse()
return M.chem_data, M.specs_df_, M.meta_data, M.meta_data_st_
......
@@ -25,25 +25,4 @@ class HandleItems:
@staticmethod
def creat_dir():
pass
def load_csv(file= None, dec= None, sep= None, names= None, hdr= None, change = None):
from utils.data_parsing import CsvParser
import pandas as pd
M = CsvParser(uploaded=file)
M.parse(decimal = dec, separator = sep, index_col = names, header = hdr)
return M.float, M.non_float
# df = pd.read_csv(file, decimal = dec, sep = sep, index_col = names, header = hdr)
@st.cache_data
def load_dx(tmp_path, change = None):
from utils.data_parsing import JcampParser
M = JcampParser(path = tmp_path)
M.parse()
return M.chem_data, M.specs_df_, M.meta_data
\ No newline at end of file
pass
\ No newline at end of file
@@ -8,9 +8,9 @@ def UiComponents(pagespath, csspath, imgpath, header = True, sidebar = True, bgi
st.markdown(
"""
<div style="width: 100%; height: 170px; background-color: #7ab0c7; padding: 0px; margin-bottom: 10px; ">
<h1 style="font-family: 'Arial',d;text-align: center; color: rgb(255, 255, 255);">PACE - MEEB / CEFE</h1>
<h2 style="font-family: 'Arial';text-align: center; color: rgb(255, 255, 255);">NIRS Utils</h2>
<div style="width: 100%; height: 130px; background-color: #7ab0c7; padding: 0px; margin-bottom: 40px;margin-top: 0px; ">
<h2 style="font-family: 'Arial',d;text-align: center; color: rgb(255, 255, 255);">PACE - MEEB / CEFE</h1>
<h3 style="font-family: 'Arial';text-align: center; color: rgb(255, 255, 255);">NIRS Utils</h2>
</div>
""",
unsafe_allow_html=True
......
@@ -46,6 +46,17 @@ from pandas import DataFrame
# return numerical_data, categorical_data
def fmt(x):
return x if x else "<Select>"
def update_state(variable, update_state):
import streamlit as st
if variable not in st.session_state:
st.session_state[variable] = 1
if update_state:
st.session_state[variable] *= -1
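# Hypothetical usage sketch (not part of this commit): update_state flips a sign stored in
# st.session_state so the value can be passed as the `change` argument of cached loaders
# (e.g. dx_loader) to invalidate st.cache_data entries on demand.
# update_state('dx_change', update_state=st.button('Reload file'))
# chem, spectra, meta, meta_st_ = dx_loader(change=st.session_state['dx_change'])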
def list_files(mypath, import_type):
list_files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
......
@@ -2,140 +2,220 @@ import jcamp as jc
import numpy as np
from tempfile import NamedTemporaryFile
class JcampParser:
'''This class helps retrieve spectral data as well as sample metadata from a JCAMP-DX file'''
def __init__(self, path):
# Create a temporary file to save the uploaded file
with NamedTemporaryFile(delete = False, suffix = ".dx") as tmp:
tmp.write(path.read())
self.__path = tmp.name
self.__dxfile = jc.jcamp_readfile(self.__path)
# Access samples data
self.__nb = self.__dxfile['blocks'] # Get the total number of blocks = The total number of scanned samples
self.__list_of_blocks = self.__dxfile['children'] # Store all blocks within a list
self.__wl = self.__list_of_blocks[0]["x"] # Wavelengths/frequencies/range
def parse(self):
# Start retrieving the data
specs = np.zeros((self.__nb, len(self.__list_of_blocks[0]["y"])), dtype=float) # preallocate a numpy matrix for storing spectra
self.idx = np.arange(self.__nb) # This list is designed to store sample names
self.__met = {}
for i in range(self.__nb): # Loop over the blocks
specs[i] = self.__list_of_blocks[i]['y']
block = self.__list_of_blocks[i]
block_met = { 'name': block['title'],
'origin': block['origin'],
'date': block['date'],
#'time': block['time'],
'spectrometer': block['spectrometer/data system'].split('\n$$')[0],
'n_scans':block['spectrometer/data system'].split('\n$$')[6].split('=')[1],
'resolution': block['spectrometer/data system'].split('\n$$')[8].split('=')[1],
#'instrumental parameters': block['instrumental parameters'],
'xunits': block['xunits'],
'yunits': block['yunits'],
#'xfactor': block['xfactor'],
#'yfactor': block['yfactor'],
'firstx': block['firstx'],
'lastx': block['lastx'],
#'firsty':block['firsty'],
#'miny': block['miny'],
#'maxy': block['maxy'],
'npoints': block['npoints'],
'concentrations':block['concentrations'],
#'deltax':block['deltax']
}
self.__met[f'{i}'] = block_met
from pandas import DataFrame
self.metadata_ = DataFrame(self.__met).T
self.metadata_.index = self.metadata_['name']
self.spectra = DataFrame(np.fliplr(specs), columns= self.__wl[::-1], index = self.metadata_['name']) # Storing spectra in a dataframe
#### Concentrations
self.pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)"
aa = self.__list_of_blocks[0]['concentrations']
def jcamp_parser(path, include, change = None):
"""
Parses a JCAMP-DX file and extracts spectral data, target concentrations,
and metadata as per the specified `include` parameter.
Parameters:
path (str): The file path to the JCAMP-DX file to be parsed.
include (list): Specifies which data blocks to include in the output.
Options are:
- 'x_block': Extract spectra.
- 'y_block': Extract target concentrations.
- 'meta': Extract metadata.
- 'all': Extract all available information (default).
Returns:
tuple: (x_block, y_block, met)
- x_block (DataFrame): Spectral data with samples as rows and wavelengths as columns.
- y_block (DataFrame): Target concentrations with samples as rows and analytes as columns.
- met (DataFrame): Metadata for each sample.
"""
import jcamp as jc
import numpy as np
from pandas import DataFrame
import re
# Read the JCAMP-DX file
dxfile = jc.jcamp_readfile(path)
nb = dxfile['blocks']
list_of_blocks = dxfile['children']
idx = [] # List to store sample names
metdata = {} # Dictionary to store metadata
# Preallocate matrix for spectral data if 'x_block' or 'all' is included
if 'x_block' in include or 'all' in include:
specs = np.zeros((nb, len(list_of_blocks[0]["y"])), dtype=float)
# Initialize containers for target concentrations if 'y_block' or 'all' is included
if 'y_block' in include or 'all' in include:
targets_tuple = {}
pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)"
aa = list_of_blocks[0]['concentrations']
a = '\n'.join(line for line in aa.split('\n') if "NCU" not in line and "<<undef>>" not in line)
n_elements = a.count('(')
# Extract chemical element names
elements_name = [match[0] for match in re.findall(pattern, a)]
# Helper function to extract concentration values
def conc(sample=None, pattern=None):
prep = '\n'.join(line for line in sample.split('\n') if "NCU" not in line and "<<undef>>" not in line)
c = [np.NaN if match[1] == '0' else np.float64(match[1]) for match in re.findall(pattern, prep)]
return np.array(c)
# Loop through all blocks in the file
for i in range(nb):
idx.append(str(list_of_blocks[i]['title'])) # Store sample names
# Extract spectra if 'x_block' or 'all' is included
if 'x_block' in include or 'all' in include:
specs[i] = list_of_blocks[i]['y']
# Extract metadata if 'meta' or 'all' is included
block = list_of_blocks[i]
if 'meta' in include or 'all' in include:
metdata[i] = {
'name': block['title'],
'origin': block['origin'],
'date': block['date'],
'spectrometer': block['spectrometer/data system'].split('\n$$')[0],
'n_scans': block['spectrometer/data system'].split('\n$$')[6].split('=')[1],
'resolution': block['spectrometer/data system'].split('\n$$')[8].split('=')[1],
'xunits': block['xunits'],
'yunits': block['yunits'],
'firstx': block['firstx'],
'lastx': block['lastx'],
'npoints': block['npoints'],
}
# Extract target concentrations if 'y_block' or 'all' is included
if 'y_block' in include or 'all' in include:
targets_tuple[i] = conc(sample=block['concentrations'], pattern=pattern)
# Create DataFrame for target concentrations
if 'y_block' in include or 'all' in include:
y_block = DataFrame(targets_tuple, index=elements_name, columns=idx).T
else:
y_block = DataFrame
## Get the names of the analyzed chemical elements
import re
elements_name = []
for match in re.findall(self.pattern, a):
elements_name.append(match[0])
## Retrieve concentrations
df = self.metadata_['concentrations']
cc = {}
for i in range(self.metadata_.shape[0]):
cc[df.index[i]] = self.conc(df[str(i)])
### dataframe containing chemical data
self.chem_data = DataFrame(cc, index=elements_name).T.astype(float)
self.chem_data.index = self.metadata_['name']
### Method for retrieving the concentration of a single sample
def conc(self,sample):
import re
prep = '\n'.join(line for line in sample.split('\n') if "NCU" not in line and "<<undef>>" not in line)
c = []
for match in re.findall(self.pattern, prep):
c.append(match[1])
concentration = np.array(c)
return concentration
@property
def specs_df_(self):
return self.spectra
@property
def meta_data(self):
me = self.metadata_.drop("concentrations", axis = 1)
me = me.drop(me.columns[(me == '').all()], axis = 1).map(lambda x: x.upper() if isinstance(x, str) else x)
return me
@property
def chem_data_(self):
return self.chem_data
# Create DataFrame for spectral data
if 'x_block' in include or 'all' in include:
wls = list_of_blocks[0]["x"] # Wavelengths/frequencies/range
x_block = DataFrame(specs, columns=wls, index=idx).astype('float64')
else:
x_block = DataFrame
# Create DataFrame for metadata
if 'meta' in include or 'all' in include:
m = DataFrame(metdata).T
m.index = idx
met = m.drop(m.columns[(m == '').all()], axis=1)
else:
met = DataFrame
return x_block, y_block, met
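# Hypothetical usage sketch (file name is illustrative, not part of this commit):
# x_block, y_block, met = jcamp_parser(path='samples.dx', include=['all'])
# x_block.shape   # -> (n_samples, n_wavelengths); y_block holds one column per analyte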
class CsvParser:
import clevercsv
def __init__(self, uploaded):
with NamedTemporaryFile(delete = False, suffix = ".csv") as tmp:
tmp.write(uploaded.read())
self.file = tmp.name
# self.file = uploaded
def parse(self, decimal, separator, index_col, header):
from pandas import read_csv
self.df = read_csv(self.file, decimal = decimal, sep = separator, index_col = index_col, header = header)
self.float, self.non_float = self.df.select_dtypes(include = 'float'), self.df.select_dtypes(exclude = 'float').map(lambda x: x.upper() if isinstance(x, str) else x)
# self.float = self.df
# self.non_float = self.df
@property
def rownames(self):
return self.df.index
def csv_parser(path, decimal, separator, index_col, header, change = None):
"""
Parse a CSV file and return two DataFrames: one with floating point columns and the other with non-floating point columns.
Parameters:
-----------
path : str
The file path to the CSV file to be read.
decimal : str
Character to recognize as decimal separator (e.g., '.' or ',').
separator : str
The character used to separate values in the CSV file (e.g., ',' or '\t').
index_col : int or str, optional
Column to set as the index of the DataFrame. Default is None.
header : int, list of int, or None, optional
Row(s) to use as the header. Default is 'infer'.
Returns:
--------
tuple
A tuple containing two DataFrames:
- float : DataFrame with columns that are of type float.
- non_float : DataFrame with non-floating point columns, with strings uppercased if applicable.
Notes:
------
- This function reads a CSV file into a pandas DataFrame, then separates the columns into floating point and non-floating point types.
- The non-floating columns will be converted to uppercase if they are of string type, unless a `change` function is provided to modify them otherwise.
- If `change` is provided, it will be applied to the non-floating point columns before returning them.
"""
from pandas import read_csv
df = read_csv(path, decimal=decimal, sep=separator, index_col=index_col, header=header)
# Select columns with float data type
float = df.select_dtypes(include='float')
# Select columns without float data type and apply changes (like uppercasing strings)
non_float = df.select_dtypes(exclude='float').map(lambda x: x.upper() if isinstance(x, str) else x)
return float, non_float
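# Hypothetical usage sketch (file name and arguments are illustrative):
# numeric, categorical = csv_parser(path='dataset.csv', decimal='.', separator=';',
#                                   index_col=0, header=0)
# numeric holds the float columns, categorical the remaining (text/metadata) columns.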
def meta_st(df):
"""
Preprocesses a DataFrame by retaining columns with between 2 and 59 unique values
and converting string columns to uppercase.
Parameters:
-----------
df : pandas.DataFrame
The input DataFrame to be processed.
Returns:
--------
pandas.DataFrame
A DataFrame that:
- Retains columns with between 2 and 59 unique values.
- Converts string columns to uppercase (if applicable).
- Returns an empty DataFrame if the input DataFrame is empty.
Notes:
------
- The function filters out columns with fewer than 2 unique values or more than 59 unique values.
- String columns (non-numeric columns) are converted to uppercase.
- If the input DataFrame is empty, it returns an empty DataFrame.
Example:
--------
import pandas as pd
data = {
'Name': ['alice', 'bob', 'charlie'],
'Age': [25, 30, 35],
'Country': ['usa', 'uk', 'canada'],
'Score': [90.5, 88.0, 92.3],
'IsActive': [True, False, True]
}
df = pd.DataFrame(data)
# Apply the function
result = meta_st(df)
print(result)
"""
import pandas as pd
if not df.empty:
# Convert string columns to uppercase
for i in df.columns:
try:
df[[i]].astype('float')
except:
df[[i]] = df[[i]].apply(lambda x: x.str.upper())
# Retain columns with unique values between 2 and 59
retained = df.loc[:, (df.nunique() > 1) & (df.nunique() < 60)]
else:
retained = df
# Return an empty DataFrame if the input DataFrame is empty
retained = pd.DataFrame()
return retained
# def parse(self):
# import pandas as pd
......
@@ -64,8 +64,9 @@ class LinearPCA:
Returns:
tuple: A tuple containing eigenvalues (eigvals) and the Lambda matrix (diagonal matrix of eigenvalues).
"""
eigvals = self.model.singular_values_**2
Lambda = np.diag(eigvals)
eigvals = self.model.singular_values_**2 /self.__x.shape[0]
labels= [f'PC{i+1}({100 * self.model.explained_variance_ratio_[i].round(2)}%)' for i in range(self.__ncp)]
Lambda = DataFrame(np.diag(eigvals), index = labels, columns = labels)
return eigvals, Lambda
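# Note added for clarity (not in the original commit): dividing the squared singular values
# by the number of samples gives, up to the n vs n-1 convention, the variance carried by each
# principal component, i.e. the eigenvalues of the covariance matrix of the centred data.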
@property
......
@@ -227,6 +227,14 @@ class TpeIpls(Regmodel):
########################################### LWPLSR #########################################
class LwplsObject:
def __init__(self, Reg_json = None, pred = None):
if Reg_json is not None and pred is not None:
from pandas import json_normalize
self.model_ = Reg_json['model']
self.best_hyperparams_ = Reg_json['best_lwplsr_params']
self.pred_data_ = [json_normalize(Reg_json[i]) for i in pred]
############################################ Pcr #########################################
class Pcr(Regmodel):
......
import numpy as np
def vip(x, y, model):
t = model.x_scores_
w = model.x_weights_
q = model.y_loadings_
m, p = x.shape
_, h = t.shape
vips = np.zeros((p,))
s = np.diag(t.T @ t @ q.T @ q).reshape(h, -1) # variance explained by each component
total_s = np.sum(s) # sum of the diagonal matrix s, i.e. the total variance explained by the model
for i in range(p):
weight = np.array([ (w[i,j] / np.linalg.norm(w[:,j]))**2 for j in range(h) ])
vips[i] = np.sqrt(p*(s.T @ weight)/total_s)
return vips
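# Hypothetical usage sketch (assumes a fitted scikit-learn PLSRegression, which exposes the
# x_scores_, x_weights_ and y_loadings_ attributes used above):
# from sklearn.cross_decomposition import PLSRegression
# pls = PLSRegression(n_components=5).fit(x, y)
# importance = vip(x, y, pls)      # one VIP score per predictor column
# selected = importance > 1        # common rule of thumb: keep variables with VIP > 1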
def sel_ratio(x, y, model):
pass
\ No newline at end of file
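# Hedged sketch, not the author's implementation: sel_ratio is left as a stub above. One common
# definition (selectivity ratio via target projection) compares, for each variable, the variance
# explained by the target-projection component with the residual variance; the names below are
# illustrative only.
# def sel_ratio_sketch(x, model):
#     b = np.ravel(model.coef_)                 # regression vector of the fitted PLS model
#     btp = b / np.linalg.norm(b)               # target-projection direction
#     ttp = x @ btp                             # target-projection scores
#     x_hat = np.outer(ttp, btp)                # part of x explained by the projection
#     ss_expl = np.sum(x_hat**2, axis=0)        # explained sum of squares per variable
#     ss_res = np.sum((x - x_hat)**2, axis=0)   # residual sum of squares per variable
#     return ss_expl / ss_res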