Skip to content
Snippets Groups Projects
Commit 01cef6b4 authored by DIANE's avatar DIANE
Browse files

update app files

parent 8d8d165e
No related branches found
No related tags found
No related merge requests found
Showing with 30 additions and 1244 deletions
from Packages import *
from utils import Plsr, LinearPCA, Umap, find_col_index, PinardPlsr, Nmf, AP
from utils import LWPLSR, list_files, metrics, TpeIpls, reg_plot, resid_plot, Sk_Kmeans, DxRead, Hdbscan, read_dx, PlsProcess
from utils.DATA_HANDLING import *
from utils.Miscellaneous import prediction, download_results, plot_spectra, local_css, desc_stats, hash_data,data_split, pred_hist
from utils.Hash import create_hash, check_hash
from report import report
css_file = Path("style/")
pages_folder = Path("pages/")
from style.header import add_header, add_sidebar
from config.config import pdflatex_path
local_css(css_file / "style.css")
from utils import KS, RDM
"""This package provides a complete workflow to users how want to proced to NIRS analysis without particular knowledge.
This is a webapp with Streamlit.
GUI shows whatever is needed for Samples Selection based on NIRS spectra and then, to compute a model to predict
chemical values on your samples.
streamlit run ./
\ No newline at end of file
from common import * from common import *
st.set_page_config(page_title="NIRS Utils", page_icon=":goat:", layout="wide")
# """This package provides a complete workflow to users how want to proced to NIRS analysis without particular knowledge.
# This is a webapp with Streamlit.
# GUI shows whatever is needed for Samples Selection based on NIRS spectra and then, to compute a model to predict
# chemical values on your samples.
# Examples:
# streamlit run ./
# """
# ##
import streamlit as st
from pathlib import Path
css_file = Path("style/")
pages_folder = Path("pages/")
from utils.data_parsing import JcampParser, CsvParser
from style.layout import BackgroundImg, add_header, add_sidebar, local_css
from utils.data_handling import *
from utils.data_parsing import *
from utils.hash import *
from utils.visualize import *
from utils.miscellaneous import ObjectHash
from utils.samsel import RDM, KS
from report import report
\ No newline at end of file
"""This package provides a complete workflow to users how want to proced to NIRS analysis without particular knowledge.
This is a webapp with Streamlit.
GUI shows whatever is needed for Samples Selection based on NIRS spectra and then, to compute a model to predict
chemical values on your samples.
streamlit run ./
from Packages import *
# from utils import read_dx, DxRead, Plsr, LinearPCA, Umap, find_col_index, PinardPlsr, Nmf, AP
# from utils import LWPLSR, list_files, metrics, TpeIpls, reg_plot, resid_plot, Sk_Kmeans, DxRead, Hdbscan, read_dx, PlsProcess, PinardPlsr, Plsr
from utils.DATA_HANDLING import *
from utils.Miscellaneous import prediction, download_results, plot_spectra, local_css, desc_stats, hash_data, hist,data_split, pred_hist,background_img
from utils.Hash import create_hash, check_hash
from report import report
css_file = Path("style/")
pages_folder = Path("pages/")
from style import add_header, add_sidebar
# from style.header import add_header, add_sidebar
from config.config import pdflatex_path
local_css(css_file / "style.css")
\ No newline at end of file
from common import * from common import *
st.set_page_config(page_title="NIRS Utils", page_icon=":goat:", layout="wide")
from Packages import *
st.set_page_config(page_title="NIRS Utils", page_icon=":goat:", layout="wide",)
st.session_state["interface"] = st.session_state.get('interface')
#""" if st.session_state["interface"] == 'simple':
# hide_pages("Predictions") """
# from Modules import *
from mod import *
from utils.DATA_HANDLING import *
#Import Header
pages_folder = Path("pages/")
# Initialize session state
if 'form_submitted' not in st.session_state:
st.session_state['form_submitted'] = False
with st.container():
# Text input fields
st.subheader("Complete and save the following form with the data context:",divider="blue")
st.warning('Make sure that the form is well completed, because the reliability of the results depends mainly on it !', icon="⚠️")
with st.form(key = 'my_form'):
_,col1, col3,col2 = st.columns((0.1, 1.4,0.5,2))
with col1:
############## Project information ###########
st.subheader("Project information", divider="blue")
meta_project = st.text_input('Project name :')
meta_machine_ID = st.text_input('NIRS ID :',)
meta_scan_place_options = ["Pace", "Other"]
meta_scan_place ="Analysis Laboratory :", meta_scan_place_options)
meta_sample_species = st.text_input('Samples species (If relevant, provide the sample species; otherwise insert No):')
with col2:
clo3,_, col4,_ = st.columns([1,0.2,1,0.3])
with clo3:
############## The Nature of the Samples ###########
if '' in [meta_project, meta_machine_ID,meta_sample_species]: disabled1 = True
else: disabled1 = False
st.subheader("The Nature of the Samples",divider="blue")
meta_sample_category_options = ["Soil", "Plant", "Animal", "Other"]
meta_sample_category ="Samples category :", [""] + meta_sample_category_options)
meta_sample_sub_category_options = ["Green leaves", "Leaf litter", "Litter", "Humus", "Soil", "Animal part", "Animal Powder", "Fungal sample", "Other"]
meta_sample_sub_category ="Sample category description :", [""] + meta_sample_sub_category_options)
with col4:
st.subheader("The Physical State of the Samples",divider="blue")
meta_sample_humidity_options = ["Dry", "Fresh", "Wet"]
meta_sample_humidity ="Humidity state of the sample :", [""] + meta_sample_humidity_options)
meta_sample_pretreatment_options = ["Powder", "Pastile", "Liquid"]
meta_sample_pretreatment ="Type of sample pre-treatment :", [""] + meta_sample_pretreatment_options)
# Build the dictionary holding the form data
form_data = {
"meta_project": meta_project,
"meta_sample_species": meta_sample_species,
"meta_sample_category": meta_sample_category,
"meta_sample_pretreatment": meta_sample_pretreatment,
"meta_machine_ID": meta_machine_ID,
"meta_sample_sub_category": meta_sample_sub_category,
"meta_sample_humidity": meta_sample_humidity,
"meta_scan_place": meta_scan_place
submitted = st.form_submit_button(label='Save')
if submitted:
if '' not in form_data.values():
# Save the form data here
st.session_state['form_submitted'] = True
st.success('Form was saved successfully!', icon="")
# Save the data to a JSON file
with open('form_data.json', 'w') as json_file:
json.dump(form_data, json_file)
if st.session_state['interface'] == 'simple':
header3, header4 = st.columns(2)
if header3.button("Samples Selection"):
st.switch_page(pages_folder / '')
if header4.button("Model Creation"):
st.switch_page(pages_folder / '')
elif st.session_state['interface'] == 'advanced':
header3, header4, header5 = st.columns(3)
if header3.button("Samples Selection"):
st.switch_page(pages_folder / '')
if header4.button("Model Creation"):
st.switch_page(pages_folder / '')
if header5.button("Prediction"):
st.switch_page(pages_folder / '')
st.error('Error: The form was not saved, please ensure the required fields are filled!')
from Packages import *
def add_header():
<div style="width: 100%;height: 130px;background-color: rgb(0,0,0,0);border: 4px solid rgb(122,176,199);padding: 1px;margin-bottom: 0px;border-radius: 20%; ">
<h2 style="font-family: 'Arial',d; text-align: center;color: #39bf55;">PACE - MEEB / CEFE</h1>
<h3 style="font-family: 'Arial';text-align: center; color: #2cb048;">NIRS Utils</h2>
.block-container {padding-top: 3rem;padding-bottom: 0rem;padding-left: 5rem;padding-right: 5rem;}
""", unsafe_allow_html=True)
def add_sidebar(pages_folder):
if 'interface' not in st.session_state:
st.session_state['interface'] = 'simple'
st.session_state["interface"] = st.session_state.get('interface')
# # TOC menu on the left
[Page("", "Home"),
Page(str(pages_folder / ""), "Inputs"),
Page(str(pages_folder / ""), "Samples Selection"),
Page(str(pages_folder / ""), "Models Creation & Predictions"),
with st.sidebar:
interface ="Interface", options=['simple', 'advanced'], key='interface')
# st.page_link(str(pages_folder / ''))
if st.session_state['interface'] == 'simple':
# st.page_link(str(pages_folder / ''))
# if advanced interface, split Models Creation and Predictions
elif st.session_state['interface'] == 'advanced':
[Page("", "Home"),
Page(str(pages_folder / ""), "Inputs"),
Page(str(pages_folder / ""), "Samples Selection"),
Page(str(pages_folder / ""), "Models Creation"),
Page(str(pages_folder / ""), "Predictions"),
# st.page_link(str(pages_folder / ''))
# st.page_link(str(pages_folder / ''))
from Packages import *
class AP:
def __init__(self, X):
    """Cluster the rows of ``X`` with Affinity Propagation.

    Args:
        X: array-like converted with ``np.array``; presumably laid out as
            (n_samples, n_features) -- TODO confirm against callers.

    Attributes set:
        M: the fitted AffinityPropagation estimator.
        yp: label predicted for every row of ``X``, shifted by +1.
    """
    # Input matrix
    self.__x = np.array(X)
    # Fit the Affinity Propagation model; the estimator has to be fitted
    # before predict() can be called on it.
    self.M = AffinityPropagation(damping=0.5, max_iter=200, convergence_iter=15, copy=True, preference=None,
                                 affinity='euclidean', verbose=False, random_state=None).fit(self.__x)
    # Labels are shifted by one so that cluster numbering starts at 1
    self.yp = self.M.predict(self.__x) + 1
def fit_optimal_(self):
clu = [f'cluster#{i}' for i in self.yp]
return self.__x, clu, self.M.cluster_centers_
\ No newline at end of file
from Packages import *
import jcamp as jc
class DxRead:
'''This module is designed to help retrieve spectral data as well as metadata of samples from a JCAMP file'''
def __init__(self, path):
#self.__path = path.replace('\\','/')
self.__path = path
self.__dxfile = jc.jcamp_readfile(self.__path)
# Access samples data
self.__nb = self.__dxfile['blocks'] # Get the total number of blocks = The total number of scanned samples
self.__list_of_blocks = self.__dxfile['children'] # Store all blocks within a a list
self.__wl = self.__list_of_blocks[0]["x"] # Wavelengths/frequencies/range
# Start retreiving the data
specs = np.zeros((self.__nb, len(self.__list_of_blocks[0]["y"])), dtype=float) # preallocate a np matrix for sotoring spectra
self.idx = np.arange(self.__nb) # This list is designed to store samples name
self.__met = {}
for i in range(self.__nb): # Loop over the blocks
specs[i] = self.__list_of_blocks[i]['y']
block = self.__list_of_blocks[i]
block_met = { 'name': block['title'],
'origin': block['origin'],
'date': block['date'],
#'time': block['time'],
'spectrometer': block['spectrometer/data system'].split('\n$$')[0],
'n_scans':block['spectrometer/data system'].split('\n$$')[6].split('=')[1],
'resolution': block['spectrometer/data system'].split('\n$$')[8].split('=')[1],
#'instrumental parameters': block['instrumental parameters'],
'xunits': block['xunits'],
'yunits': block['yunits'],
#'xfactor': block['xfactor'],
#'yfactor': block['yfactor'],
'firstx': block['firstx'],
'lastx': block['lastx'],
#'miny': block['miny'],
#'maxy': block['maxy'],
'npoints': block['npoints'],
self.__met[f'{i}'] = block_met
self.metadata_ = DataFrame(self.__met).T
self.spectra = DataFrame(np.fliplr(specs), columns= self.__wl[::-1], index = self.metadata_['name']) # Storing spectra in a dataframe
#### Concentrarions
self.pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)"
aa = self.__list_of_blocks[0]['concentrations']
a = '\n'.join(line for line in aa.split('\n') if "NCU" not in line and "<<undef>>" not in line)
n_elements = a.count('(')
## Get the name of analyzed chamical elements
elements_name = []
for match in re.findall(self.pattern, a):
## Retrieve concentrationds
df = self.metadata_['concentrations']
cc = {}
for i in range(self.metadata_.shape[0]):
cc[df.index[i]] = self.conc(df[str(i)])
### dataframe conntaining chemical data
self.chem_data = DataFrame(cc, index=elements_name).T.astype(float)
self.chem_data.index = self.metadata_['name']
### Method for retrieving the concentration of a single sample
def conc(self,sample):
prep = '\n'.join(line for line in sample.split('\n') if "NCU" not in line and "<<undef>>" not in line)
c = []
for match in re.findall(self.pattern, prep):
concentration = np.array(c)
return concentration
def specs_df_(self):
return self.spectra
def md_df_(self):
me = self.metadata_.drop("concentrations", axis = 1)
me = me.drop(me.columns[(me == '').all()], axis = 1)
return me
def md_df_st_(self):
rt = ['origin','date']
cl = self.metadata_.loc[:,rt]
return cl
def chem_data_(self):
return self.chem_data
def read_dx(file):
# Convenience wrapper: parse `file` with DxRead and hand back its chemical
# data together with the three DxRead accessors, in that order.
M = DxRead(file)
# NOTE(review): specs_df_, md_df_ and md_df_st_ are referenced without being
# called. The caller only receives their results if they are properties on
# DxRead; otherwise it receives bound methods -- confirm against DxRead.
return M.chem_data, M.specs_df_, M.md_df_, M.md_df_st_
\ No newline at end of file
from Packages import *
class metrics:
def __init__(self, c:Optional[List] = None, cv:Optional[List] = None, t:Optional[List] = None, method = 'regression') -> None:
    """Compute performance metrics for every modelling phase that is supplied.

    Args:
        c: ``(measured, predicted)`` pair for the training phase, or None.
        cv: ``(measured, predicted)`` pair for cross-validation, or None.
        t: ``(measured, predicted)`` pair for the test phase, or None.
        method: 'regression' or 'classification'; selects the scorer.

    Raises:
        ValueError: if ``method`` is not one of the two supported values.

    The result is stored in ``self.ret`` as a DataFrame with one row per
    supplied phase ("train", "cv", "test") and one column per metric.
    """
    if method not in ('regression', 'classification'):
        raise ValueError(f"method must be 'regression' or 'classification', got {method!r}")

    phases = {'train': c, 'cv': cv, 'test': t}
    perf = {}
    for name, pair in phases.items():
        # Identity test: `!= None` would be evaluated element-wise if a
        # phase were ever passed as a numpy array.
        if pair is None:
            continue
        if method == 'regression':
            perf[name] = metrics.reg_(pair[0], pair[1])
        else:
            perf[name] = metrics.class_(pair[0], pair[1])

    # Phases as rows, metrics as columns
    self.ret = DataFrame(perf).T
def reg_(meas, pred):
    """Return regression performance metrics for measured vs. predicted values.

    Args:
        meas: measured (reference) values, array-like.
        pred: predicted values, array-like of the same length.

    Returns:
        dict with the keys 'r' (Pearson correlation), 'r2', 'rmse', 'mae',
        'rpd' (std of ``meas`` / RMSE) and 'rpiq' (interquartile range of
        ``meas`` / RMSE), in that order.
    """
    meas = np.array(meas)
    pred = np.array(pred)

    e = meas - pred                      # residuals
    e2 = e ** 2                          # squared residuals
    rmse = np.sqrt(np.mean(e2))

    sst = np.sum((meas - np.mean(meas)) ** 2)   # total sum of squares
    ssr = np.sum(e2)                            # residual sum of squares
    iqr = np.quantile(meas, .75) - np.quantile(meas, .25)

    return {
        'r': np.corrcoef(meas, pred)[0, 1],
        'r2': 1 - ssr / sst,
        'rmse': rmse,
        # Mean absolute error is taken on the residuals themselves, not on
        # their squares (which would just repeat the MSE).
        'mae': np.mean(np.abs(e)),
        'rpd': np.std(meas) / rmse,
        'rpiq': iqr / rmse,
    }
def class_(meas, pred):
def scores_(self):
return self.ret
\ No newline at end of file
from Packages import *
class Hdbscan:
"""Runs an automatically optimized sklearn.HDBSCAN clustering on dimensionality reduced space.
The HDBSCAN_scores_ @Property returns the cluster number of each sample (_labels) and the DBCV best score.
_labels (DataFrame): DataFrame with the cluster belonging number for each sample
_hdbscan_score (float): a float with the best DBCV score after optimization
- clustering = HDBSCAN((data)
- scores = clustering.HDBSCAN_scores_
def __init__(self, data):
"""Initiate the HDBSCAN calculation
data (DataFrame): the Dimensionality reduced space, raw result of the
param_dist (dictionary): the HDBSCAN optimization parameters to test
_score (DataFrame): is a dataframe with the DBCV value for each combination of param_dist. We search for the higher value to then compute an HDBSCAN with the best parameters.
# Really fast
self._param_dist = {'min_samples': [8],
'metric' : ['euclidean'],#,'manhattan'],
# Medium
# self._param_dist = {'min_samples': [1,10],
# 'min_cluster_size':[5,50],
# 'metric' : ['euclidean','manhattan'],
# }
# Complete
# self._param_dist = {'min_samples': [1,5,10,],
# 'min_cluster_size':[5,25,50,],
# 'metric' : ['euclidean','manhattan'],
# }
self._clusterable_embedding = data
# RandomizedSearchCV not working...
# def scoring(model, clusterable_embedding):
# label = HDBSCAN().fit_predict(clusterable_embedding)
# hdbscan_score = DBCV(clusterable_embedding, label, dist_function=euclidean)
# return hdbscan_score
# tunning = RandomizedSearchCV(estimator=HDBSCAN(), param_distributions=param_dist, scoring=scoring)
# return tunning
# compute optimization. Test each combination of parameters and store DBCV score into _score.
# self._score = DataFrame()
# for i in self._param_dist.get('min_samples'):
# for j in self._param_dist.get('min_cluster_size'):
# self._ij_label = HDBSCAN(min_samples=i, min_cluster_size=j).fit_predict(self._clusterable_embedding)
# self._ij_hdbscan_score = self.DBCV(self._clusterable_embedding, self._ij_label,)# dist_function=euclidean)
#[i,j] = self._ij_hdbscan_score
# get the best DBCV score
# self._hdbscan_bscore = max(self._score.max())
# find the coordinates of the best clustering parameters and run HDBSCAN below
# self._bparams = np.where(self._score == self._hdbscan_bscore)
# run HDBSCAN with best params
# self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][self._bparams[0][0]], min_cluster_size=self._param_dist['min_cluster_size'][self._bparams[1][0]], metric=self._param_dist['metric'][self._bparams[1][0]], store_centers="medoid", )
self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][0], min_cluster_size=self._param_dist['min_cluster_size'][0], metric=self._param_dist['metric'][0], store_centers="medoid", )
self._labels = self.best_hdbscan.labels_
self._centers = self.best_hdbscan.medoids_
# def DBCV(self, X, labels, dist_function=euclidean):
# """
# Implimentation of Density-Based Clustering Validation "DBCV"
# Citation: Moulavi, Davoud, et al. "Density-based clustering validation."
# Proceedings of the 2014 SIAM International Conference on Data Mining.
# Society for Industrial and Applied Mathematics, 2014.
# Density Based clustering validation
# Args:
# X (np.ndarray): ndarray with dimensions [n_samples, n_features]
# data to check validity of clustering
# labels (np.array): clustering assignments for data X
# dist_dunction (func): function to determine distance between objects
# func args must be [np.array, np.array] where each array is a point
# Returns:
# cluster_validity (float): score in range[-1, 1] indicating validity of clustering assignments
# """
# graph = self._mutual_reach_dist_graph(X, labels, dist_function)
# mst = self._mutual_reach_dist_MST(graph)
# cluster_validity = self._clustering_validity_index(mst, labels)
# return cluster_validity
# def _core_dist(self, point, neighbors, dist_function):
# """
# Computes the core distance of a point.
# Core distance is the inverse density of an object.
# Args:
# point (np.array): array of dimensions (n_features,)
# point to compute core distance of
# neighbors (np.ndarray): array of dimensions (n_neighbors, n_features):
# array of all other points in object class
# dist_dunction (func): function to determine distance between objects
# func args must be [np.array, np.array] where each array is a point
# Returns: core_dist (float)
# inverse density of point
# """
# n_features = np.shape(point)[0]
# n_neighbors = np.shape(neighbors)[0]
# distance_vector = cdist(point.reshape(1, -1), neighbors)
# distance_vector = distance_vector[distance_vector != 0]
# numerator = ((1/distance_vector)**n_features).sum()
# core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features)
# return core_dist
# def _mutual_reachability_dist(self, point_i, point_j, neighbors_i,
# neighbors_j, dist_function):
# """.
# Computes the mutual reachability distance between points
# Args:
# point_i (np.array): array of dimensions (n_features,)
# point i to compare to point j
# point_j (np.array): array of dimensions (n_features,)
# point i to compare to point i
# neighbors_i (np.ndarray): array of dims (n_neighbors, n_features):
# array of all other points in object class of point i
# neighbors_j (np.ndarray): array of dims (n_neighbors, n_features):
# array of all other points in object class of point j
# dist_function (func): function to determine distance between objects
# func args must be [np.array, np.array] where each array is a point
# Returns:
# mutual_reachability (float)
# mutual reachability between points i and j
# """
# core_dist_i = self._core_dist(point_i, neighbors_i, dist_function)
# core_dist_j = self._core_dist(point_j, neighbors_j, dist_function)
# dist = dist_function(point_i, point_j)
# mutual_reachability = np.max([core_dist_i, core_dist_j, dist])
# return mutual_reachability
# def _mutual_reach_dist_graph(self, X, labels, dist_function):
# """
# Computes the mutual reach distance complete graph.
# Graph of all pair-wise mutual reachability distances between points
# Args:
# X (np.ndarray): ndarray with dimensions [n_samples, n_features]
# data to check validity of clustering
# labels (np.array): clustering assignments for data X
# dist_dunction (func): function to determine distance between objects
# func args must be [np.array, np.array] where each array is a point
# Returns: graph (np.ndarray)
# array of dimensions (n_samples, n_samples)
# Graph of all pair-wise mutual reachability distances between points.
# """
# n_samples = np.shape(X)[0]
# graph = []
# counter = 0
# for row in range(n_samples):
# graph_row = []
# for col in range(n_samples):
# point_i = X[row]
# point_j = X[col]
# class_i = labels[row]
# class_j = labels[col]
# members_i = self._get_label_members(X, labels, class_i)
# members_j = self._get_label_members(X, labels, class_j)
# dist = self._mutual_reachability_dist(point_i, point_j,
# members_i, members_j,
# dist_function)
# graph_row.append(dist)
# counter += 1
# graph.append(graph_row)
# graph = np.array(graph)
# return graph
# def _mutual_reach_dist_MST(self, dist_tree):
# """
# Computes minimum spanning tree of the mutual reach distance complete graph
# Args:
# dist_tree (np.ndarray): array of dimensions (n_samples, n_samples)
# Graph of all pair-wise mutual reachability distances
# between points.
# Returns: minimum_spanning_tree (np.ndarray)
# array of dimensions (n_samples, n_samples)
# minimum spanning tree of all pair-wise mutual reachability
# distances between points.
# """
# mst = minimum_spanning_tree(dist_tree).toarray()
# return mst + np.transpose(mst)
# def _cluster_density_sparseness(self, MST, labels, cluster):
# """
# Computes the cluster density sparseness, the minimum density
# within a cluster
# Args:
# MST (np.ndarray): minimum spanning tree of all pair-wise
# mutual reachability distances between points.
# labels (np.array): clustering assignments for data X
# cluster (int): cluster of interest
# Returns: cluster_density_sparseness (float)
# value corresponding to the minimum density within a cluster
# """
# indices = np.where(labels == cluster)[0]
# cluster_MST = MST[indices][:, indices]
# cluster_density_sparseness = np.max(cluster_MST)
# return cluster_density_sparseness
# def _cluster_density_separation(self, MST, labels, cluster_i, cluster_j):
# """
# Computes the density separation between two clusters, the maximum
# density between clusters.
# Args:
# MST (np.ndarray): minimum spanning tree of all pair-wise
# mutual reachability distances between points.
# labels (np.array): clustering assignments for data X
# cluster_i (int): cluster i of interest
# cluster_j (int): cluster j of interest
# Returns: density_separation (float):
# value corresponding to the maximum density between clusters
# """
# indices_i = np.where(labels == cluster_i)[0]
# indices_j = np.where(labels == cluster_j)[0]
# shortest_paths = csgraph.dijkstra(MST, indices=indices_i)
# relevant_paths = shortest_paths[:, indices_j]
# density_separation = np.min(relevant_paths)
# return density_separation
# def _cluster_validity_index(self, MST, labels, cluster):
# """
# Computes the validity of a cluster (validity of assignmnets)
# Args:
# MST (np.ndarray): minimum spanning tree of all pair-wise
# mutual reachability distances between points.
# labels (np.array): clustering assignments for data X
# cluster (int): cluster of interest
# Returns: cluster_validity (float)
# value corresponding to the validity of cluster assignments
# """
# min_density_separation = np.inf
# for cluster_j in np.unique(labels):
# if cluster_j != cluster:
# cluster_density_separation = self._cluster_density_separation(MST,
# labels,
# cluster,
# cluster_j)
# if cluster_density_separation < min_density_separation:
# min_density_separation = cluster_density_separation
# cluster_density_sparseness = self._cluster_density_sparseness(MST,
# labels,
# cluster)
# numerator = min_density_separation - cluster_density_sparseness
# denominator = np.max([min_density_separation, cluster_density_sparseness])
# cluster_validity = numerator / denominator
# return cluster_validity
# def _clustering_validity_index(self, MST, labels):
# """
# Computes the validity of all clustering assignments for a
# clustering algorithm
# Args:
# MST (np.ndarray): minimum spanning tree of all pair-wise
# mutual reachability distances between points.
# labels (np.array): clustering assignments for data X
# Returns: validity_index (float):
# score in range[-1, 1] indicating validity of clustering assignments
# """
# n_samples = len(labels)
# validity_index = 0
# for label in np.unique(labels):
# fraction = np.sum(labels == label) / float(n_samples)
# cluster_validity = self._cluster_validity_index(MST, labels, label)
# validity_index += fraction * cluster_validity
# return validity_index
# def _get_label_members(self, X, labels, cluster):
# """
# Helper function to get samples of a specified cluster.
# Args:
# X (np.ndarray): ndarray with dimensions [n_samples, n_features]
# data to check validity of clustering
# labels (np.array): clustering assignments for data X
# cluster (int): cluster of interest
# Returns: members (np.ndarray)
# array of dimensions (n_samples, n_features) of samples of the
# specified cluster.
# """
# indices = np.where(labels == cluster)[0]
# members = X[indices]
# return members
def centers_(self):
# return self._labels, self._hdbscan_bscore, self._centers
return self._centers
def labels_(self):
# One display label per entry of self._labels: the value -1 is mapped to
# 'Non clustered', any other value i is mapped to 'cluster#<i+1>'.
labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels]
return labels
def non_clustered(self):
    """Return the positions of the samples that were left unclustered.

    A sample counts as unclustered when its raw label in ``self._labels``
    is -1, i.e. the value that ``labels_`` renders as 'Non clustered'.

    Returns:
        np.ndarray: 1-D integer array of positions in ascending order.
    """
    # Compare the raw integer labels directly instead of formatting one
    # string per sample and then comparing strings.
    return np.flatnonzero(np.asarray(self._labels) == -1)
from Packages import *
class Sk_Kmeans:
"""K-Means clustering for Samples selection.
inertia_ (DataFrame): DataFrame with ...
x (DataFrame): Initial data
clu (DataFrame): Cluster name for each sample
model.cluster_centers_ (DataFrame): Coordinates of the center of each cluster
def __init__(self, x, max_clusters):
    """Fit K-Means for 1..max_clusters clusters and record each inertia.

    Args:
        x (DataFrame): the original reduced data to cluster
        max_clusters (int): the max number of desired clusters.

    Attributes set:
        inertia (DataFrame): a single row named 'inertia' with one column
            '<k>_clust' per evaluated number of clusters k.
    """
    self.x = x
    self.max_clusters = max_clusters
    # inertia_ only exists on a fitted estimator, so each model is fitted
    # on x before its inertia is read.
    inertias = {
        f'{k}_clust': [KMeans(n_clusters = k, init = 'k-means++', random_state = 42).fit(x).inertia_]
        for k in range(1, max_clusters + 1)
    }
    self.inertia = DataFrame(inertias, index=['inertia'])
def inertia_(self):
return self.inertia
def suggested_n_clusters_(self):
    """Suggest a number of clusters from the recorded inertia curve.

    For every step from k to k+1 clusters the relative inertia drop is
    computed, in percent of the inertia at k. The suggestion is the largest
    k+1 whose step still reduced the inertia by more than 5 %.

    Returns:
        int: the suggested number of clusters; 1 when no step exceeds the
        threshold (this includes the case where only one cluster count was
        evaluated), instead of failing on an empty selection.
    """
    min_drop_pct = 5
    s = np.asarray(self.inertia.to_numpy(), dtype=float).ravel()[:self.max_clusters]
    # A zero inertia makes the ratio undefined; the resulting nan/-inf
    # simply fails the threshold test below.
    with np.errstate(divide='ignore', invalid='ignore'):
        drops = (s[:-1] - s[1:]) * 100 / s[:-1]
    significant = np.flatnonzero(drops > min_drop_pct)
    if significant.size == 0:
        return 1
    # drops[i] describes the step from i+1 to i+2 clusters
    return int(significant[-1]) + 2
def fit_optimal_(self):
    """Fit K-Means with the suggested number of clusters and label ``self.x``.

    Returns:
        tuple: ``(self.x, clu, centers)`` where ``clu`` is a list of
        'cluster#<n>' names (numbering starts at 1) with one entry per
        sample, and ``centers`` is the fitted model's ``cluster_centers_``.
    """
    n_clusters = self.suggested_n_clusters_
    # The accessor may be exposed as a plain method rather than a property;
    # KMeans needs the integer, not the bound method.
    if callable(n_clusters):
        n_clusters = n_clusters()
    # The model must be fitted before predict() / cluster_centers_ are usable
    model = KMeans(n_clusters = n_clusters, init = 'k-means++', random_state = 42).fit(self.x)
    yp = model.predict(self.x) + 1
    clu = [f'cluster#{i}' for i in yp]
    return self.x, clu, model.cluster_centers_
\ No newline at end of file
from Packages import *
from typing import Sequence, Dict, Optional, Union
class KS:
# Calibration-set selection delegated to ks_train_test_split
# (presumably a Kennard-Stone splitter -- confirm where it is imported).
def __init__(self, x:Optional[Union[np.ndarray|DataFrame]], rset:Optional[Union[float|int]]):
# x: data to split; rset: forwarded to the splitter as train_size.
self.x = x
self.ratio = rset
# The split is computed once, at construction time
self._train, self._test = ks_train_test_split(self.x, train_size = self.ratio)
def calset(self):
# Returns the original data together with the index labels of the rows
# that ended up in the training split.
# NOTE(review): relies on self._train exposing `.index`, so ndarray input
# presumably fails here despite the annotation -- confirm.
clu = self._train.index.tolist()
return self.x, clu
class RDM:
# Calibration-set selection delegated to train_test_split
# (presumably scikit-learn's random splitter -- confirm where it is imported).
def __init__(self, x:Optional[Union[np.ndarray|DataFrame]], rset:Optional[Union[float|int]]):
# x: data to split; rset: forwarded to the splitter as train_size.
self.x = x
self.ratio = rset
# No random_state is passed, so the split is not pinned to a seed here
self._train, self._test = train_test_split(self.x, train_size = self.ratio)
def calset(self):
# Returns the original data together with the index labels of the rows
# that ended up in the training split.
# NOTE(review): relies on self._train exposing `.index`, so ndarray input
# presumably fails here despite the annotation -- confirm.
clu = self._train.index.tolist()
return self.x, clu
\ No newline at end of file
from Packages import *
class Nmf:
def __init__(self, X, Ncomp=3):
    """Decompose ``X`` with Non-negative Matrix Factorization.

    Args:
        X: array-like input matrix; presumably (n_samples, n_features)
            -- TODO confirm against callers.
        Ncomp: number of components to compute.

    Attributes set:
        _p: transposed ``components_`` of the fitted model (loadings).
        _t: the input matrix transformed by the fitted model (scores).
    """
    # Input matrix. Converting first makes np.min a scalar over the whole
    # matrix regardless of the container type that was passed in.
    data = np.array(X)
    lowest = np.min(data)
    # When the data has negative entries, shift the whole matrix so its
    # minimum becomes 0; otherwise keep it as is.
    self.__x = data - lowest if lowest < 0 else data
    ## set the number of components to compute and fit the model
    self.__ncp = Ncomp
    # Fit the NMF model; components_ / transform() need a fitted estimator
    Mo = NMF(n_components=self.__ncp, init=None, solver='cd', beta_loss='frobenius',
             tol=0.0001, max_iter=300, random_state=None, alpha_W=0.0, alpha_H='same',
             l1_ratio=0.0, verbose=0, shuffle=False).fit(self.__x)
    # Results
    self._p = Mo.components_.T
    self._t = Mo.transform(self.__x)
def scores_(self):
return DataFrame(self._t)
def loadings_(self):
return DataFrame(self._p)
\ No newline at end of file
from Packages import *
class LinearPCA:
def __init__(self, X, Ncomp=10):
## input matrix
self.__x = np.array(X)
## set the number of components to compute and fit the model
self.__ncp = Ncomp
# Fit PCA model
M = PCA(n_components = self.__ncp)
######## results ########
# Results
self.__pcnames = [f'PC{i+1}({100 * M.explained_variance_ratio_[i].round(2)}%)' for i in range(self.__ncp)]
self._Qexp_ratio = DataFrame(100 * M.explained_variance_ratio_, columns = ["Qexp"], index= [f'PC{i+1}' for i in range(self.__ncp)])
self._p = M.components_.T
self._t = M.transform(self.__x)
self.eigvals = M.singular_values_**2
self.Lambda = np.diag(self.eigvals)
# Matrix reconstruction or prediction making
self.T2 = {}
self._xp = {}
self._qres = {}
self.leverage = {}
for i in range(self.__ncp):
# Matrix reconstruction- prediction
self._xp[i] =[:,:i+1], self._p.T[:i+1,:])
#self.T2[i] = np.diag(self._t[:,:i+1] @ np.transpose(self._t[:,:i+1]))
def scores_(self):
return DataFrame(self._t, columns= self.__pcnames)
def loadings_(self):
return DataFrame(self._p, columns=self.__pcnames)
def residuals_(self):
res = DataFrame(self._qres)
return res
\ No newline at end of file
from Packages import *
from utils.Miscellaneous import *
from utils.Evaluation_Metrics import metrics
class PinardPlsr:
def __init__(self, x_train, y_train, x_test, y_test):
self.x_train = x_train
self.x_test = x_test
self.y_train = y_train
self.y_test = y_test
# create model module with PINARD
# Declare preprocessing pipeline
svgolay = [ ('_sg1',pp.SavitzkyGolay()),
('_sg2',pp.SavitzkyGolay()) # nested pipeline to perform the Savitzky-Golay method twice for 2nd order preprocessing
preprocessing = [ ('id', pp.IdentityTransformer()), # Identity transformer, no change to the data
('savgol', pp.SavitzkyGolay()), # Savitzky-Golay smoothing filter
('derivate', pp.Derivate()), # Calculate the first derivative of the data
('SVG', FeatureUnion(svgolay))
# Declare complete pipeline
pipeline = Pipeline([
('scaler', MinMaxScaler()), # scaling the data
('preprocessing', FeatureUnion(preprocessing)), # preprocessing
('PLS', PLSRegression(n_components=14))])
# Estimator including y values scaling
estimator = TransformedTargetRegressor(regressor = pipeline, transformer = MinMaxScaler())
# Training
self.trained =, self.y_train)
# fit scores
# Predictions on test set
self.yc = DataFrame(self.trained.predict(self.x_train)) # make predictions on test data and assign to Y_preds variable
self.ycv = DataFrame(cross_val_predict(self.trained, self.x_train, self.y_train, cv = 3)) # make predictions on test data and assign to Y_preds variable = DataFrame(self.trained.predict(self.x_test)) # make predictions on test data and assign to Y_preds variable
def model_(self):
return self.trained
def pred_data_(self):
return self.yc, self.ycv,
\ No newline at end of file
from Packages import *
from utils import metrics
from utils.DATA_HANDLING import *
class PlsProcess:
    """Tune a PLS regression together with its spectral preprocessing.

    The search space (``self.PLS_params``) covers the Savitzky-Golay
    parameters, the scatter correction and the number of PLS components.
    ``tune`` drives ``objective`` through hyperopt's TPE algorithm and the
    best trial's model and predictions are kept on the instance.
    """

    # Lowest score seen so far. It is a class attribute, reset by every
    # ``__init__`` call, so it is shared by all instances.
    SCORE = 100000000
    index_export = DataFrame()

    def __init__(self, x_train, x_test, y_train, y_test, scale, Kfold):
        """Store the data splits and build the hyperopt search space.

        Args:
            x_train: training spectra (``.shape[1]`` is read here).
            x_test: test spectra.
            y_train: training reference values.
            y_test: test reference values.
            scale: forwarded to ``PLSRegression(scale=...)``.
            Kfold: forwarded to ``cross_val_predict(cv=...)``.
        """
        PlsProcess.SCORE = 10000
        self.xtrain = x_train
        self.xtest = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.scale = scale
        self.Kfold = Kfold
        self.model = None
        self.p = self.xtrain.shape[1]
        self.PLS_params = {
            'polyorder': hp.choice('polyorder', [0, 1, 2]),
            'deriv': hp.choice('deriv', [0, 1, 2]),
            'window_length': hp.choice('window_length', [15, 19, 23, 27]),
            'scatter': hp.choice('scatter', ['Snv', 'Non']),
        }
        self.PLS_params['n_components'] = hp.randint("n_components", 2, 20)

    @staticmethod
    def _apply_scatter(name, spectra):
        """Apply the scatter correction called ``name`` to ``spectra``.

        Replaces the former ``eval`` of the method name. The functions are
        looked up lazily, exactly as the evaluated expression did.
        """
        if name == 'Snv':
            return Snv(spectra)
        if name == 'Non':
            return Non(spectra)
        raise ValueError(f"Unknown scatter correction: {name!r}")

    def objective(self, params):
        """Fit and score one hyperparameter combination.

        Args:
            params: one sample of ``self.PLS_params``; it is corrected in
                place when the combination is not usable.

        Returns:
            The score minimised by hyperopt.
        """
        # Work on local copies: the raw spectra on ``self.xtrain`` /
        # ``self.xtest`` must stay untouched, otherwise a trial using 'Snv'
        # would leak its correction into every later trial.
        xtrain = self._apply_scatter(params['scatter'], self.xtrain)
        xtest = self._apply_scatter(params['scatter'], self.xtest)

        if params['deriv'] > params['polyorder'] or params['polyorder'] > params['window_length']:
            # Invalid Savitzky-Golay combination: skip the filter.
            params['deriv'] = 0
            params['polyorder'] = 0
            params['window_length'] = 1
            self.x_train = xtrain
            self.x_test = xtest
        else:
            sg_kwargs = {
                'polyorder': params['polyorder'],
                'deriv': params['deriv'],
                'window_length': params['window_length'],
            }
            self.x_train = DataFrame(savgol_filter(xtrain, **sg_kwargs),
                                     columns=xtrain.columns, index=xtrain.index)
            self.x_test = DataFrame(savgol_filter(xtest, **sg_kwargs),
                                    columns=xtest.columns, index=xtest.index)

        try:
            Model = PLSRegression(scale=self.scale, n_components=params['n_components'])
            Model.fit(self.x_train, self.y_train)
        except ValueError:
            # Too many components for this data: fall back to one.
            params["n_components"] = 1
            Model = PLSRegression(scale=self.scale, n_components=params["n_components"])
            Model.fit(self.x_train, self.y_train)

        ## make prediction
        yc = Model.predict(self.x_train).reshape(-1)
        ycv = cross_val_predict(Model, self.x_train, self.y_train, cv=self.Kfold, n_jobs=-1).reshape(-1)
        yt = Model.predict(self.x_test).reshape(-1)

        rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv))
        rmsec = np.sqrt(mean_squared_error(self.y_train, yc))
        rmset = np.sqrt(mean_squared_error(self.y_test, yt))
        score = rmsecv/rmsec*np.round(rmset/rmsecv)*rmsecv*100/self.y_train.mean()*rmset*1000/self.y_test.mean()

        # Keep the trial only when it improves the best score by more than 0.5.
        if score < PlsProcess.SCORE - 0.5:
            PlsProcess.SCORE = score
            self.nlv = params['n_components']
            self.best = params
            self.model = Model
            self.yc = yc
            self.ycv = ycv
            self.yt = yt
        return score

    def tune(self, n_iter):
        """Run ``n_iter`` TPE evaluations of ``objective``."""
        trials = Trials()
        # NOTE(review): every argument after ``algo`` was lost in the source
        # this was recovered from; ``space``, ``max_evals`` and ``trials`` are
        # restored from the surrounding code - confirm any extra options.
        fmin(fn=self.objective,
             space=self.PLS_params,
             algo=tpe.suggest,  # Tree of Parzen Estimators (tpe), a Bayesian approach
             max_evals=n_iter,
             trials=trials)

    def best_hyperparams(self):
        """Return a summary of the best trial's preprocessing parameters."""
        # NOTE(review): only the 'Scatter' and 'polyorder' entries were
        # visible in the recovered source; the other Savitzky-Golay entries
        # are restored by analogy and further keys may be missing - confirm.
        self.b = {
            'Scatter': self.best['scatter'],
            'Saitzky-Golay derivative parameters': {
                'polyorder': self.best['polyorder'],
                'deriv': self.best['deriv'],
                'window_length': self.best['window_length'],
            },
        }
        return self.b

    def model_(self):
        """Return the model of the best trial (``None`` before tuning)."""
        return self.model

    def pred_data_(self):
        """Return the best trial's calibration, cross-validation and test predictions."""
        return self.yc, self.ycv, self.yt
\ No newline at end of file
from Packages import *
from utils import metrics, Snv, No_transformation, KF_CV, sel_ratio
class Regmodel(object):
    """Base class for regression models tuned with hyperopt (TPE).

    The constructor stores the data, builds the preprocessing search space,
    merges the model-specific hyperparameters and immediately runs the
    optimisation by calling ``self.objective``, which subclasses must
    implement. Results are exposed through the ``*_`` accessor methods.
    """

    def __init__(self, train, test, n_iter, add_hyperparams = None, nfolds = 3, **kwargs):
        """Store the data and run the hyperparameter search.

        Args:
            train: ``[x, y]`` pair for calibration; ``x`` must expose ``.shape``.
            test: ``[x, y]`` pair for testing.
            n_iter: number of hyperopt evaluations.
            add_hyperparams: optional dict of extra search-space entries.
            nfolds: number of cross-validation folds.
            **kwargs: accepted but not used here.
        """
        self.SCORE = 100000000
        self._xc, self._xt = train[0], test[0]
        self._ytrain, self._ytest = train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        self._selected_bands = DataFrame(index = ['from', 'to'])
        self.important_features = None
        # Defined up front so pretreated_spectra_() never hits a missing attribute.
        self.pretreated = None
        self._hyper_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]),
                              'deriv': hp.choice('deriv', [0, 1, 2]),
                              'window_length': hp.choice('window_length', [15, 21, 27, 33]),
                              'normalization': hp.choice('normalization', ['Snv', 'No_transformation'])}
        if add_hyperparams is not None:
            # Model-specific entries extend the shared preprocessing space.
            self._hyper_params.update(add_hyperparams)
        self._best = None

        trials = Trials()
        # NOTE(review): the arguments after ``algo`` were lost in the source
        # this was recovered from; ``space``, ``max_evals`` and ``trials`` are
        # restored from the surrounding code - confirm any extra options.
        fmin(fn=self.objective,
             space=self._hyper_params,
             algo=tpe.suggest,  # Tree of Parzen Estimators (tpe), a Bayesian approach
             max_evals=n_iter,
             trials=trials)

    def train_data_(self):
        """Return ``[x, y]`` of the calibration set."""
        return [self._xc, self._ytrain]

    def test_data_(self):
        """Return ``[x, y]`` of the test set."""
        return [self._xt, self._ytest]

    def pretreated_spectra_(self):
        """Return the pretreated spectra stored by ``objective`` (``None`` if unset)."""
        return self.pretreated

    def get_params_(self):
        """Return the search space explored by the optimisation algorithm."""
        return self._hyper_params

    def objective(self, params):
        """Score one hyperparameter sample; must be overridden by subclasses."""
        raise NotImplementedError("Subclasses must implement objective(params)")

    def best_hyperparams_(self):
        """Return the selected subset of hyperparameters."""
        return self._best

    def best_hyperparams_print(self):
        """Return a two-line sentence describing the applied signal preprocessing."""
        labels = {
            'Snv': 'Standard Normal Variate (SNV)',
            'No_transformation': " No transformation was performed",
        }
        normalization = self._best['normalization']
        # An unknown name is reported verbatim instead of raising.
        a = labels.get(normalization, str(normalization))
        # "\\:" produces the same backslash-colon text as before without the
        # invalid-escape warning; double quotes keep this valid before 3.12.
        SG = (f"- Savitzky-Golay derivative parameters \\:(Window_length:{self._best['window_length']};"
              f" polynomial order: {self._best['polyorder']}; Derivative order : {self._best['deriv']})")
        Norm = f"- Spectral Normalization \\: {a}"
        return SG + "\n" + Norm

    def model_(self):
        """Return the developed model."""
        return self._model

    def pred_data_(self):
        """Return the predictions of the training and testing steps."""
        return self._yc, self._yt

    def cv_data_(self):
        """Return the cross-validation data."""
        return self._ycv

    def CV_results_(self):
        """Return the cross-validation metrics table."""
        return self._cv_df

    def important_features_(self):
        """Return ``self.important_features``."""
        return self.important_features

    def selected_features_(self):
        """Return the table of selected bands ('from' / 'to' rows)."""
        return self._selected_bands

    def sel_ratio_(self):
        """Return the stored selectivity-ratio result."""
        return self._sel_ratio
########################################### PLSR #########################################
class Plsr(Regmodel):
    """PLS regression whose preprocessing and component count are tuned by TPE."""

    def __init__(self, train, test, n_iter = 10, cv = 3):
        """Add ``n_components`` to the search space and run the optimisation.

        Args:
            train: ``[x, y]`` pair for calibration.
            test: ``[x, y]`` pair for testing.
            n_iter: number of hyperopt evaluations.
            cv: number of cross-validation folds.
        """
        super().__init__(train, test, n_iter, nfolds = cv,
                         add_hyperparams = {'n_components': hp.randint('n_components', 1, 20)})

    def objective(self, params):
        """Preprocess, cross-validate and score one hyperparameter sample.

        Args:
            params: one sample of the search space; corrected in place when
                the Savitzky-Golay combination is not usable.

        Returns:
            The cross-validation RMSE, which hyperopt minimises.
        """
        params['n_components'] = int(params['n_components'])

        # Explicit dispatch instead of eval() on the method name.
        normalizers = {'Snv': Snv, 'No_transformation': No_transformation}
        normalize = normalizers[str(params['normalization'])]
        x1 = [normalize(x) for x in (self._xc, self._xt)]

        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b > c:
            # Invalid combination: reuse the best known one, else skip the filter.
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c
        x2 = [savgol_filter(x, polyorder=b, deriv=a, window_length=c) for x in x1]

        y_train = np.array(self._ytrain)
        model = PLSRegression(scale = False, n_components = params['n_components'])
        folds = KF_CV().CV(x = x2[0], y = y_train, n_folds = self._nfolds)
        yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = x2[0], y = y_train)
        cv_df = KF_CV().metrics_cv(y = y_train, ypcv = yp, folds = folds)[1]
        score = cv_df.loc["cv", 'rmse']

        Model = PLSRegression(scale = False, n_components = params['n_components']).fit(x2[0], self._ytrain)

        if self.SCORE > score:
            self.SCORE = score
            # The CV table is stored only for the retained trial so that
            # CV_results_() always describes the model returned by model_().
            self._cv_df = cv_df
            self._ycv = KF_CV().meas_pred_eq(y = y_train, ypcv = yp, folds = folds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model
            # Cast numeric values to plain ints; leave the others untouched.
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score
############################################ iplsr #########################################
class TpeIpls(Regmodel):
    """Interval PLS: TPE picks the spectral intervals, preprocessing and components."""

    def __init__(self, train, test, n_iter = 10, n_intervall = 5, cv = 3):
        """Build the interval search space and run the optimisation.

        Args:
            train: ``[x, y]`` pair for calibration.
            test: ``[x, y]`` pair for testing.
            n_iter: number of hyperopt evaluations.
            n_intervall: number of intervals; each one has two bounds.
            cv: number of cross-validation folds.
        """
        # Must be set before super().__init__, which already calls objective().
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall*2

        r = {'n_components': hp.randint('n_components', 1, 20)}
        r.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1]) for i in range(1, self.n_arrets+1)})
        super().__init__(train, test, n_iter, add_hyperparams = r, nfolds = cv)

    def objective(self, params):
        """Select the intervals, preprocess, cross-validate and score one sample.

        Args:
            params: one sample of the search space; corrected in place when
                a value is not usable.

        Returns:
            The cross-validation RMSE, which hyperopt minimises.
        """
        ### wavelength indices
        self.idx = [params[f'v{i}'] for i in range(1, self.n_arrets+1)]
        arrays = []
        for k in range(self.n_intervall):
            # The two bounds are sampled independently, so order them: a
            # reversed pair would give an empty segment and break the band
            # report below.
            low, high = sorted((self.idx[2*k], self.idx[2*k+1]))
            arrays.append(np.arange(low, high+1))
        selected = np.unique(np.concatenate(arrays, axis=0), axis=0)

        ### Preprocessing
        # Explicit dispatch instead of eval() on the method name.
        normalizers = {'Snv': Snv, 'No_transformation': No_transformation}
        normalize = normalizers[str(params['normalization'])]
        x1 = [normalize(x) for x in (self._xc, self._xt)]

        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        if a > b or b > c:
            # Invalid combination: reuse the best known one, else skip the filter.
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1
        params['deriv'], params['polyorder'], params['window_length'] = a, b, c
        x2 = [savgol_filter(x, polyorder=b, deriv=a, window_length=c) for x in x1]
        prepared_data = [x[:, selected] for x in x2]

        ### Modelling
        y_train = np.array(self._ytrain)
        folds = KF_CV().CV(x = prepared_data[0], y = y_train, n_folds = self._nfolds)
        try:
            model = PLSRegression(scale = False, n_components = params['n_components'])
            yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = prepared_data[0], y = y_train)
            cv_df = KF_CV().metrics_cv(y = y_train, ypcv = yp, folds = folds)[1]
        except ValueError:
            # Too many components for the selected bands: fall back to one.
            params["n_components"] = 1
            model = PLSRegression(scale = False, n_components = params["n_components"])
            yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = prepared_data[0], y = y_train)
            cv_df = KF_CV().metrics_cv(y = y_train, ypcv = yp, folds = folds)[1]

        score = cv_df.loc['cv', 'rmse']

        Model = PLSRegression(scale = False, n_components = model.n_components).fit(prepared_data[0], self._ytrain)

        if self.SCORE > score:
            self.SCORE = score
            # The CV table is stored only for the retained trial so that
            # CV_results_() always describes the model returned by model_().
            self._cv_df = cv_df
            self._ycv = KF_CV().meas_pred_eq(y = y_train, ypcv = yp, folds = folds)
            self._yc = Model.predict(prepared_data[0])
            self._yt = Model.predict(prepared_data[1])
            self._model = Model
            # Cast numeric values to plain ints; leave the others untouched.
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value
            self._best = params
            self.pretreated = DataFrame(x2[0])
            self.segments = arrays
            # First and last index of every retained interval.
            for i, segment in enumerate(self.segments):
                self._selected_bands[f'band{i+1}'] = [segment[0], segment[-1]]
            self._selected_bands.index = ['from', 'to']
        return score
########################################### LWPLSR #########################################
############################################ Pcr #########################################
class Pcr(Regmodel):
def __init__(self, train, test, n_iter = 10, n_val = 5):
{f'pc{i}': hp.randint(f'pc{i+1}', 0, train[0].shape[1]) for i in range(self.n_val)}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment