diff --git a/src/style/layout.py b/src/style/layout.py new file mode 100644 index 0000000000000000000000000000000000000000..f4fe35f5bc2f2b9e89fe8f8a53754c0ccdb6f290 --- /dev/null +++ b/src/style/layout.py @@ -0,0 +1,104 @@ +# from packages import * +import streamlit as st + +## load the custom CSS in the style folder +@st.cache_data +def local_css(file_name): + import streamlit as st + with open(file_name) as f: + st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True) + #~~~~~~~~~~~~~~~~~~~~~~~~~~~ background image ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +@st.cache_data +def BackgroundImg(change): + import base64 + import streamlit as st + image_path = './images/img-sky.jpg' + with open(image_path, "rb") as image_file: + base64_image= base64.b64encode(image_file.read()).decode('utf-8') + + + # CSS code to set the background image + # Get the base64-encoded image + + # CSS code to set the background image + background_image_style = f""" + <style> + .stApp {{ + background-image: url("data:image/jpeg;base64,{base64_image}"); + background-size: cover; + background-repeat: no-repeat; + background-attachment: fixed; + }} + </style> + """ + + # Inject the CSS style + st.markdown(background_image_style, unsafe_allow_html=True) + + + #~~~~~~~~~~~~~~~~~~~~~~~~~~~ add header ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +def add_header(): + import streamlit as st + st.markdown( + """ + <div style="width: 100%;height: 170px; background-color: rgb(0,0,0,0);border: 4px solid rgb(122,176,199); padding: 0px; margin-bottom: 10px;border-radius: 20%; "> + <h1 style="font-family: 'Arial',d;text-align: center; color: #39bf55;">PACE - MEEB / CEFE</h1> + <h2 style="font-family: 'Arial';text-align: center; color: #2cb048;">NIRS Utils</h2> + </div> + """, + unsafe_allow_html=True, + ) + + st.markdown(""" + <style> + .block-container { + padding-top: 3rem; + padding-bottom: 0rem; + padding-left: 5rem; + padding-right: 5rem; + } + </style> + """, unsafe_allow_html=True) + + #~~~~~~~~~~~~~~~~~~~~~~~~~~~ add sidebar ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +def add_sidebar(pages_folder): + import streamlit as st + from st_pages import Page, show_pages + # from st_pages import Page, Section, show_pages, add_page_title, hide_pages + + + if 'interface' not in st.session_state: + st.session_state['interface'] = 'simple' + else: + st.session_state["interface"] = st.session_state.get('interface') + + # # TOC menu on the left + show_pages( + [Page("app.py", "Home"), + Page(str(pages_folder / "0-inputs.py"), "Inputs"), + Page(str(pages_folder / "1-samples_selection.py"), "Samples Selection"), + Page(str(pages_folder / "2-model_creation.py"), "Models Creation & Predictions"), + + ] + ) + + with st.sidebar: + interface = st.radio(label="Interface", options=['simple', 'advanced'], key='interface') + # st.page_link(str(pages_folder / '1-samples_selection.py')) + if st.session_state['interface'] == 'simple': + # st.page_link(str(pages_folder / '2-model_creation.py')) + pass + # if advanced interface, split Models Creation and Predictions + elif st.session_state['interface'] == 'advanced': + show_pages( + [Page("app.py", "Home"), + Page(str(pages_folder / "0-inputs.py"), "Inputs"), + Page(str(pages_folder / "1-samples_selection.py"), "Samples Selection"), + Page(str(pages_folder / "2-model_creation.py"), "Models Creation"), + Page(str(pages_folder / "3-prediction.py"), "Predictions"), + ] + ) + # st.page_link(str(pages_folder / '2-model_creation.py')) + # st.page_link(str(pages_folder / '3-prediction.py')) diff --git a/src/utils/Miscellaneous.py 
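Usage note: a minimal sketch of how an entry-point script might wire the layout helpers above together. The import path, stylesheet location and pages folder are assumptions for illustration, not part of this patch.

# hypothetical app.py entry point (paths and import layout are assumed)
from pathlib import Path
import streamlit as st
from style.layout import local_css, BackgroundImg, add_header, add_sidebar

st.set_page_config(page_title="NIRS Utils", layout="wide")
local_css("./style/style.css")     # inject the custom stylesheet
BackgroundImg(change=None)         # 'change' only acts as a cache key for st.cache_data
add_header()                       # banner and block-container padding
add_sidebar(Path("./src/pages"))   # folder holding the numbered page scripts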
b/src/utils/Miscellaneous.py index 6f09fc10112f690e30530ece61a3a3379e311276..70f03b331278b452e922319e7d886373d8d35adc 100644 --- a/src/utils/Miscellaneous.py +++ b/src/utils/Miscellaneous.py @@ -1,11 +1,5 @@ -from Packages import * +from packages import * -# local CSS -## load the custom CSS in the style folder -@st.cache_data -def local_css(file_name): - with open(file_name) as f: - st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True) # predict module def prediction(NIRS_csv, qsep, qhdr, model): @@ -20,149 +14,11 @@ def prediction(NIRS_csv, qsep, qhdr, model): return Y_preds -@st.cache_data -def reg_plot( meas, pred, train_idx, test_idx): - a0 = np.ones(2) - a1 = np.ones(2) - - for i in range(len(meas)): - meas[i] = np.array(meas[i]).reshape(-1, 1) - pred[i] = np.array(pred[i]).reshape(-1, 1) - - M = LinearRegression() - M.fit(meas[i], pred[i]) - a1[i] = np.round(M.coef_[0][0],2) - a0[i] = np.round(M.intercept_[0],2) - - ec = np.subtract(np.array(meas[0]).reshape(-1), np.array(pred[0]).reshape(-1)) - et = np.subtract(np.array(meas[1]).reshape(-1), np.array(pred[1]).reshape(-1)) - - fig, ax = plt.subplots(figsize = (12,4)) - sns.regplot(x = meas[0] , y = pred[0], color="#2C6B6F", label = f'Cal (Predicted = {a0[0]} + {a1[0]} x Measured)', scatter_kws={'edgecolor': 'black'}) - sns.regplot(x = meas[1], y = pred[1], color='#d0f7be', label = f'Val (Predicted = {a0[1]} + {a1[1]} x Measured)', scatter_kws={'edgecolor': 'black'}) - plt.plot([np.min(meas[0]) - 0.05, np.max([meas[0]]) + 0.05], [np.min(meas[0]) - 0.05, np.max([meas[0]]) + 0.05], color = 'black') - - for i, txt in enumerate(train_idx): - #plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i],ec[i])) - if np.abs(ec[i])> np.mean(ec)+ 3*np.std(ec): - plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i], np.array(pred[0]).reshape(-1)[i])) - - for i, txt in enumerate(test_idx): - if np.abs(et[i])> np.mean(et)+ 3*np.std(et): - plt.annotate(txt ,(np.array(meas[1]).reshape(-1)[i], np.array(pred[1]).reshape(-1)[i])) - - ax.set_ylabel('Predicted values') - ax.set_xlabel('Measured values') - plt.legend() - plt.margins(0) - # fig.savefig('./report/figures/measured_vs_predicted.png') - return fig - -@st.cache_data -def resid_plot( meas, pred, train_idx, test_idx): - a0 = np.ones(2) - a1 = np.ones(2) - e = [np.subtract(meas[0] ,pred[0]), np.subtract(meas[1], pred[1])] - - for i in range(len(meas)): - M = LinearRegression() - M.fit( np.array(meas[i]).reshape(-1,1), np.array(e[i]).reshape(-1,1)) - a1[i] = np.round(M.coef_[0],2) - a0[i] = np.round(M.intercept_,2) - - - fig, ax = plt.subplots(figsize = (12,4)) - sns.scatterplot(x = pred[0], y = e[0], color="#2C6B6F", label = f'Cal', edgecolor="black") - sns.scatterplot(x = pred[1], y = e[1], color="#d0f7be", label = f'Val', edgecolor="black") - - # sns.scatterplot(x = pred[0], y = e[0], color='blue', label = f'Cal (Residual = {a0[0]} + {a1[0]} * Predicted)') - # sns.scatterplot(x = pred[1], y = e[1], color='green', label = f'Val (Residual = {a0[1]} + {a1[1]} * Predicted)') - plt.axhline(y= 0, c ='black', linestyle = ':') - lim = np.max(abs(np.concatenate([e[0], e[1]], axis = 0)))*1.1 - plt.ylim(- lim, lim ) - - - for i in range(2): - e[i] = np.array(e[i]).reshape(-1,1) - - for i, txt in enumerate(train_idx): - #plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i],ec[i])) - if np.abs(e[0][i])> np.mean(e[0])+ 3*np.std(e[0]): - plt.annotate(txt ,(np.array(pred[0]).reshape(-1)[i],e[0][i])) - - for i, txt in enumerate(test_idx): - if np.abs(e[1][i])> np.mean(e[1])+ 3*np.std(e[1]): - 
plt.annotate(txt ,(np.array(pred[1]).reshape(-1)[i],e[1][i])) - ax.set_xlabel(f'{ train_idx.shape}') - ax.set_ylabel('Residuals') - ax.set_xlabel('Predicted values') - plt.legend() - plt.margins(0) - # fig.savefig('./report/figures/residuals_plot.png') - return fig - - - # function that create a download button - needs the data to save and the file name to store to def download_results(data, export_name): with open(data) as f: st.download_button('Download', f, export_name, type='primary') -@st.cache_data -def plot_spectra(specdf, xunits, yunits): - fig, ax = plt.subplots(figsize = (30,7)) - if isinstance(specdf.columns[0], str): - specdf.T.plot(legend=False, ax = ax, color = '#2474b4') - min = 0 - else: - min = np.max(specdf.columns) - specdf.T.plot(legend=False, ax = ax, color = '#2474b4').invert_xaxis() - - ax.set_xlabel(xunits, fontsize=30) - ax.set_ylabel(yunits, fontsize=30) - plt.margins(x = 0) - plt.tight_layout() - return fig - -@st.cache_data -def hist(y, y_train, y_test, target_name = 'y'): - fig, ax = plt.subplots(figsize = (12,3)) - sns.histplot(y, color = "#004e9e", kde = True, label = str(target_name), ax = ax, fill = True) - sns.histplot(y_train, color = "#2C6B6F", kde = True, label = str(target_name)+" (Cal)", ax = ax, fill = True) - sns.histplot(y_test, color = "#d0f7be", kde = True, label = str(target_name)+" (Val)", ax = ax, fill = True) - ax.set_xlabel(str(target_name)) - plt.legend() - plt.tight_layout() - return fig - - -@st.cache_data -def pred_hist(pred): - # Creating histogram - hist, axs = plt.subplots(1, 1, figsize =(15, 3), - tight_layout = True) - - # Add x, y gridlines - axs.grid( color ='grey', linestyle ='-.', linewidth = 0.5, alpha = 0.6) - # Remove axes splines - for s in ['top', 'bottom', 'left', 'right']: - axs.spines[s].set_visible(False) - # Remove x, y ticks - axs.xaxis.set_ticks_position('none') - axs.yaxis.set_ticks_position('none') - # Add padding between axes and labels - axs.xaxis.set_tick_params(pad = 5) - axs.yaxis.set_tick_params(pad = 10) - # Creating histogram - N, bins, patches = axs.hist(pred, bins = 12) - return hist - -@st.cache_data -def fig_export(): - pass - - - @st.cache_data(show_spinner =True) def data_split(x, y): # Split data into training and test sets using the kennard_stone method and correlation metric, 25% of data is used for testing @@ -188,66 +44,59 @@ def desc_stats(x): return a -def hash_data(data): - import xxhash - """Hash various data types using MD5.""" - - # Convert to a string representation - if isinstance(data, DataFrame): - data_str = data.to_string() - elif isinstance(data, Series): - data_str = data.to_string() - elif isinstance(data, np.ndarray): - data_str = np.array2string(data, separator=',') - elif isinstance(data, (list, tuple)): - data_str = str(data) - elif isinstance(data, dict): - # Ensure consistent order for dict items - data_str = str(sorted(data.items())) - elif isinstance(data, (int, float, str, bool)): - data_str = str(data) - elif isinstance(data, bytes): - data_str = data.decode('utf-8', 'ignore') # Decode bytes to string - elif isinstance(data, str): # Check if it's a string representing file content - data_str = data - else: - raise TypeError(f"Unsupported data type: {type(data)}") - - # Encode the string to bytes - data_bytes = data_str.encode() + +def ObjectHash(current = None, add = None): + def DatatoStr(data): + from pandas import DataFrame, Series + import numpy as np + """Hash various data types using MD5.""" + + # Convert to a string representation + if isinstance(data, DataFrame): 
+ data_str = data.to_string() + elif isinstance(data, Series): + data_str = data.to_string() + elif isinstance(data, np.ndarray): + data_str = np.array2string(data, separator=',') + elif isinstance(data, (list, tuple)): + data_str = str(data) + elif isinstance(data, dict): + # Ensure consistent order for dict items + data_str = str(sorted(data.items())) + elif isinstance(data, (int, float, str, bool)): + data_str = str(data) + elif isinstance(data, bytes): + data_str = data.decode('utf-8', 'ignore') # Decode bytes to string + elif isinstance(data, str): # Check if it's a string representing file content + data_str = data + else: + raise TypeError(f"Unsupported data type: {type(data)}") + + # Encode the string to bytes + data_bytes = data_str.encode() + return str(data_bytes) - # Compute the MD5 hash - md5_hash = xxhash.xxh32(data_bytes).hexdigest() + + import xxhash + if current == None and add == None: + object = "None" + print('Insert the object for which you want to compute the hash value.') + elif current != None and add != None: + object = DatatoStr(current)+ DatatoStr(add) + elif current == None and add != None: + object = DatatoStr(add) + elif current != None and add == None: + object = DatatoStr(current) + + # Compute the MD5 hash + md5_hash = xxhash.xxh32(object).hexdigest() return str(md5_hash) - -#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ style test - -@st.cache_data -def background_img(change): - import base64 - image_path = './images/img-sky.jpg' - with open(image_path, "rb") as image_file: - base64_image= base64.b64encode(image_file.read()).decode('utf-8') - - - # CSS code to set the background image - # Get the base64-encoded image - - # CSS code to set the background image - background_image_style = f""" - <style> - .stApp {{ - background-image: url("data:image/jpeg;base64,{base64_image}"); - background-size: cover; - background-repeat: no-repeat; - background-attachment: fixed; - }} - </style> - """ - - # Inject the CSS style - st.markdown(background_image_style, unsafe_allow_html=True) +def JointoMain(): + import os + for i in ['utils','style']: + import sys + sys.path.append(os.path.join(os.path.dirname(__file__), i)) diff --git a/src/utils/clustering.py b/src/utils/clustering.py new file mode 100644 index 0000000000000000000000000000000000000000..09a67a5d75e815c9322887acaa147bb591f11e53 --- /dev/null +++ b/src/utils/clustering.py @@ -0,0 +1,413 @@ +from packages import * + + + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ kmeans ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class Sk_Kmeans: + """K-Means clustering for Samples selection. + + Returns: + inertia_ (DataFrame): DataFrame with ... + x (DataFrame): Initial data + clu (DataFrame): Cluster name for each sample + model.cluster_centers_ (DataFrame): Coordinates of the center of each cluster + """ + def __init__(self, x, max_clusters): + """Initiate the KMeans class. + + Args: + x (DataFrame): the original reduced data to cluster + max_cluster (Int): the max number of desired clusters. 
+ """ + self.x = x + self.max_clusters = max_clusters + + self.inertia = DataFrame() + for i in range(1, max_clusters+1): + model = KMeans(n_clusters = i, init = 'k-means++', random_state = 42) + model.fit(x) + self.inertia[f'{i}_clust']= [model.inertia_] + self.inertia.index = ['inertia'] + + @property + def inertia_(self): + return self.inertia + + @property + def suggested_n_clusters_(self): + idxidx = [] + values = [] + + s = self.inertia.to_numpy().ravel() + for i in range(self.max_clusters-1): + idxidx.append(f'{i+1}_clust') + values.append((s[i] - s[i+1])*100 / s[i]) + + id = np.max(np.where(np.array(values) > 5))+2 + return id + + @property + def fit_optimal_(self): + model = KMeans(n_clusters = self.suggested_n_clusters_, init = 'k-means++', random_state = 42) + model.fit(self.x) + yp = model.predict(self.x)+1 + clu = [f'cluster#{i}' for i in yp] + + return self.x, clu, model.cluster_centers_ + + + + + + #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~hdbscan ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class Hdbscan: + """Runs an automatically optimized sklearn.HDBSCAN clustering on dimensionality reduced space. + + The HDBSCAN_scores_ @Property returns the cluster number of each sample (_labels) and the DBCV best score. + + Returns: + _labels (DataFrame): DataFrame with the cluster belonging number for each sample + _hdbscan_score (float): a float with the best DBCV score after optimization + + Examples: + - clustering = HDBSCAN((data) + - scores = clustering.HDBSCAN_scores_ + + """ + def __init__(self, data): + """Initiate the HDBSCAN calculation + + Args: + data (DataFrame): the Dimensionality reduced space, raw result of the UMAP.fit() + param_dist (dictionary): the HDBSCAN optimization parameters to test + _score (DataFrame): is a dataframe with the DBCV value for each combination of param_dist. We search for the higher value to then compute an HDBSCAN with the best parameters. + """ + # Really fast + self._param_dist = {'min_samples': [8], + 'min_cluster_size':[10], + 'metric' : ['euclidean'],#,'manhattan'], + } + # Medium + # self._param_dist = {'min_samples': [1,10], + # 'min_cluster_size':[5,50], + # 'metric' : ['euclidean','manhattan'], + # } + # Complete + # self._param_dist = {'min_samples': [1,5,10,], + # 'min_cluster_size':[5,25,50,], + # 'metric' : ['euclidean','manhattan'], + # } + + self._clusterable_embedding = data + + # RandomizedSearchCV not working... + # def scoring(model, clusterable_embedding): + # label = HDBSCAN().fit_predict(clusterable_embedding) + # hdbscan_score = DBCV(clusterable_embedding, label, dist_function=euclidean) + # return hdbscan_score + # tunning = RandomizedSearchCV(estimator=HDBSCAN(), param_distributions=param_dist, scoring=scoring) + # tunning.fit(clusterable_embedding) + # return tunning + + # compute optimization. Test each combination of parameters and store DBCV score into _score. 
+ # self._score = DataFrame() + # for i in self._param_dist.get('min_samples'): + # for j in self._param_dist.get('min_cluster_size'): + # self._ij_label = HDBSCAN(min_samples=i, min_cluster_size=j).fit_predict(self._clusterable_embedding) + # self._ij_hdbscan_score = self.DBCV(self._clusterable_embedding, self._ij_label,)# dist_function=euclidean) + # self._score.at[i,j] = self._ij_hdbscan_score + # get the best DBCV score + # self._hdbscan_bscore = max(self._score.max()) + # find the coordinates of the best clustering parameters and run HDBSCAN below + # self._bparams = np.where(self._score == self._hdbscan_bscore) + # run HDBSCAN with best params + + # self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][self._bparams[0][0]], min_cluster_size=self._param_dist['min_cluster_size'][self._bparams[1][0]], metric=self._param_dist['metric'][self._bparams[1][0]], store_centers="medoid", ) + self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][0], min_cluster_size=self._param_dist['min_cluster_size'][0], metric=self._param_dist['metric'][0], store_centers="medoid", ) + self.best_hdbscan.fit_predict(self._clusterable_embedding) + self._labels = self.best_hdbscan.labels_ + self._centers = self.best_hdbscan.medoids_ + + + # def DBCV(self, X, labels, dist_function=euclidean): + # """ + # Implimentation of Density-Based Clustering Validation "DBCV" + # + # Citation: Moulavi, Davoud, et al. "Density-based clustering validation." + # Proceedings of the 2014 SIAM International Conference on Data Mining. + # Society for Industrial and Applied Mathematics, 2014. + # + # Density Based clustering validation + # + # Args: + # X (np.ndarray): ndarray with dimensions [n_samples, n_features] + # data to check validity of clustering + # labels (np.array): clustering assignments for data X + # dist_dunction (func): function to determine distance between objects + # func args must be [np.array, np.array] where each array is a point + # + # Returns: + # cluster_validity (float): score in range[-1, 1] indicating validity of clustering assignments + # """ + # graph = self._mutual_reach_dist_graph(X, labels, dist_function) + # mst = self._mutual_reach_dist_MST(graph) + # cluster_validity = self._clustering_validity_index(mst, labels) + # return cluster_validity + # + # + # def _core_dist(self, point, neighbors, dist_function): + # """ + # Computes the core distance of a point. + # Core distance is the inverse density of an object. + # + # Args: + # point (np.array): array of dimensions (n_features,) + # point to compute core distance of + # neighbors (np.ndarray): array of dimensions (n_neighbors, n_features): + # array of all other points in object class + # dist_dunction (func): function to determine distance between objects + # func args must be [np.array, np.array] where each array is a point + # + # Returns: core_dist (float) + # inverse density of point + # """ + # n_features = np.shape(point)[0] + # n_neighbors = np.shape(neighbors)[0] + # + # distance_vector = cdist(point.reshape(1, -1), neighbors) + # distance_vector = distance_vector[distance_vector != 0] + # numerator = ((1/distance_vector)**n_features).sum() + # core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features) + # return core_dist + # + # def _mutual_reachability_dist(self, point_i, point_j, neighbors_i, + # neighbors_j, dist_function): + # """. 
+ # Computes the mutual reachability distance between points + # + # Args: + # point_i (np.array): array of dimensions (n_features,) + # point i to compare to point j + # point_j (np.array): array of dimensions (n_features,) + # point i to compare to point i + # neighbors_i (np.ndarray): array of dims (n_neighbors, n_features): + # array of all other points in object class of point i + # neighbors_j (np.ndarray): array of dims (n_neighbors, n_features): + # array of all other points in object class of point j + # dist_function (func): function to determine distance between objects + # func args must be [np.array, np.array] where each array is a point + # + # Returns: + # mutual_reachability (float) + # mutual reachability between points i and j + # + # """ + # core_dist_i = self._core_dist(point_i, neighbors_i, dist_function) + # core_dist_j = self._core_dist(point_j, neighbors_j, dist_function) + # dist = dist_function(point_i, point_j) + # mutual_reachability = np.max([core_dist_i, core_dist_j, dist]) + # return mutual_reachability + # + # + # def _mutual_reach_dist_graph(self, X, labels, dist_function): + # """ + # Computes the mutual reach distance complete graph. + # Graph of all pair-wise mutual reachability distances between points + # + # Args: + # X (np.ndarray): ndarray with dimensions [n_samples, n_features] + # data to check validity of clustering + # labels (np.array): clustering assignments for data X + # dist_dunction (func): function to determine distance between objects + # func args must be [np.array, np.array] where each array is a point + # + # Returns: graph (np.ndarray) + # array of dimensions (n_samples, n_samples) + # Graph of all pair-wise mutual reachability distances between points. + # + # """ + # n_samples = np.shape(X)[0] + # graph = [] + # counter = 0 + # for row in range(n_samples): + # graph_row = [] + # for col in range(n_samples): + # point_i = X[row] + # point_j = X[col] + # class_i = labels[row] + # class_j = labels[col] + # members_i = self._get_label_members(X, labels, class_i) + # members_j = self._get_label_members(X, labels, class_j) + # dist = self._mutual_reachability_dist(point_i, point_j, + # members_i, members_j, + # dist_function) + # graph_row.append(dist) + # counter += 1 + # graph.append(graph_row) + # graph = np.array(graph) + # return graph + # + # + # def _mutual_reach_dist_MST(self, dist_tree): + # """ + # Computes minimum spanning tree of the mutual reach distance complete graph + # + # Args: + # dist_tree (np.ndarray): array of dimensions (n_samples, n_samples) + # Graph of all pair-wise mutual reachability distances + # between points. + # + # Returns: minimum_spanning_tree (np.ndarray) + # array of dimensions (n_samples, n_samples) + # minimum spanning tree of all pair-wise mutual reachability + # distances between points. + # """ + # mst = minimum_spanning_tree(dist_tree).toarray() + # return mst + np.transpose(mst) + # + # + # def _cluster_density_sparseness(self, MST, labels, cluster): + # """ + # Computes the cluster density sparseness, the minimum density + # within a cluster + # + # Args: + # MST (np.ndarray): minimum spanning tree of all pair-wise + # mutual reachability distances between points. 
+ # labels (np.array): clustering assignments for data X + # cluster (int): cluster of interest + # + # Returns: cluster_density_sparseness (float) + # value corresponding to the minimum density within a cluster + # """ + # indices = np.where(labels == cluster)[0] + # cluster_MST = MST[indices][:, indices] + # cluster_density_sparseness = np.max(cluster_MST) + # return cluster_density_sparseness + # + # + # def _cluster_density_separation(self, MST, labels, cluster_i, cluster_j): + # """ + # Computes the density separation between two clusters, the maximum + # density between clusters. + # + # Args: + # MST (np.ndarray): minimum spanning tree of all pair-wise + # mutual reachability distances between points. + # labels (np.array): clustering assignments for data X + # cluster_i (int): cluster i of interest + # cluster_j (int): cluster j of interest + # + # Returns: density_separation (float): + # value corresponding to the maximum density between clusters + # """ + # indices_i = np.where(labels == cluster_i)[0] + # indices_j = np.where(labels == cluster_j)[0] + # shortest_paths = csgraph.dijkstra(MST, indices=indices_i) + # relevant_paths = shortest_paths[:, indices_j] + # density_separation = np.min(relevant_paths) + # return density_separation + # + # + # def _cluster_validity_index(self, MST, labels, cluster): + # """ + # Computes the validity of a cluster (validity of assignmnets) + # + # Args: + # MST (np.ndarray): minimum spanning tree of all pair-wise + # mutual reachability distances between points. + # labels (np.array): clustering assignments for data X + # cluster (int): cluster of interest + # + # Returns: cluster_validity (float) + # value corresponding to the validity of cluster assignments + # """ + # min_density_separation = np.inf + # for cluster_j in np.unique(labels): + # if cluster_j != cluster: + # cluster_density_separation = self._cluster_density_separation(MST, + # labels, + # cluster, + # cluster_j) + # if cluster_density_separation < min_density_separation: + # min_density_separation = cluster_density_separation + # cluster_density_sparseness = self._cluster_density_sparseness(MST, + # labels, + # cluster) + # numerator = min_density_separation - cluster_density_sparseness + # denominator = np.max([min_density_separation, cluster_density_sparseness]) + # cluster_validity = numerator / denominator + # return cluster_validity + # + # + # def _clustering_validity_index(self, MST, labels): + # """ + # Computes the validity of all clustering assignments for a + # clustering algorithm + # + # Args: + # MST (np.ndarray): minimum spanning tree of all pair-wise + # mutual reachability distances between points. + # labels (np.array): clustering assignments for data X + # + # Returns: validity_index (float): + # score in range[-1, 1] indicating validity of clustering assignments + # """ + # n_samples = len(labels) + # validity_index = 0 + # for label in np.unique(labels): + # fraction = np.sum(labels == label) / float(n_samples) + # cluster_validity = self._cluster_validity_index(MST, labels, label) + # validity_index += fraction * cluster_validity + # return validity_index + # + # + # def _get_label_members(self, X, labels, cluster): + # """ + # Helper function to get samples of a specified cluster. 
+ # + # Args: + # X (np.ndarray): ndarray with dimensions [n_samples, n_features] + # data to check validity of clustering + # labels (np.array): clustering assignments for data X + # cluster (int): cluster of interest + # + # Returns: members (np.ndarray) + # array of dimensions (n_samples, n_features) of samples of the + # specified cluster. + # """ + # indices = np.where(labels == cluster)[0] + # members = X[indices] + # return members + + @property + def centers_(self): + # return self._labels, self._hdbscan_bscore, self._centers + return self._centers + @property + def labels_(self): + labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels] + return labels + @property + def non_clustered(self): + labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels] + non_clustered = np.where(np.array(labels) == 'Non clustered')[0] + return non_clustered + + + + # ~~~~~~~~~~~~~~~~~~~~~~~~~ ap ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class AP: + def __init__(self, X): + ## input matrix + self.__x = np.array(X) + + # Fit PCA model + self.M = AffinityPropagation(damping=0.5, max_iter=200, convergence_iter=15, copy=True, preference=None, + affinity='euclidean', verbose=False, random_state=None) + self.M.fit(self.__x) + self.yp = self.M.predict(self.__x)+1 + @property + def fit_optimal_(self): + clu = [f'cluster#{i}' for i in self.yp] + return self.__x, clu, self.M.cluster_centers_ \ No newline at end of file diff --git a/src/utils/data_parsing.py b/src/utils/data_parsing.py new file mode 100644 index 0000000000000000000000000000000000000000..a39c87dc7cf51d70eb741100e81ef3ef0943b284 --- /dev/null +++ b/src/utils/data_parsing.py @@ -0,0 +1,104 @@ +from packages import * +import jcamp as jc + +class JcampParser: + '''This module is designed to help retrieve spectral data as well as metadata of smaples from jcamp file''' + def __init__(self, path): + #self.__path = path.replace('\\','/') + self.__path = path + self.__dxfile = jc.jcamp_readfile(self.__path) + + # Access samples data + self.__nb = self.__dxfile['blocks'] # Get the total number of blocks = The total number of scanned samples + self.__list_of_blocks = self.__dxfile['children'] # Store all blocks within a a list + self.__wl = self.__list_of_blocks[0]["x"] # Wavelengths/frequencies/range + + # Start retreiving the data + specs = np.zeros((self.__nb, len(self.__list_of_blocks[0]["y"])), dtype=float) # preallocate a np matrix for sotoring spectra + self.idx = np.arange(self.__nb) # This list is designed to store samples name + self.__met = {} + for i in range(self.__nb): # Loop over the blocks + specs[i] = self.__list_of_blocks[i]['y'] + block = self.__list_of_blocks[i] + block_met = { 'name': block['title'], + 'origin': block['origin'], + 'date': block['date'], + #'time': block['time'], + 'spectrometer': block['spectrometer/data system'].split('\n$$')[0], + 'n_scans':block['spectrometer/data system'].split('\n$$')[6].split('=')[1], + 'resolution': block['spectrometer/data system'].split('\n$$')[8].split('=')[1], + #'instrumental parameters': block['instrumental parameters'], + 'xunits': block['xunits'], + 'yunits': block['yunits'], + #'xfactor': block['xfactor'], + #'yfactor': block['yfactor'], + 'firstx': block['firstx'], + 'lastx': block['lastx'], + #'firsty':block['firsty'], + #'miny': block['miny'], + #'maxy': block['maxy'], + 'npoints': block['npoints'], + 'concentrations':block['concentrations'], + #'deltax':block['deltax'] + } + + self.__met[f'{i}'] = block_met + self.metadata_ = 
DataFrame(self.__met).T + self.spectra = DataFrame(np.fliplr(specs), columns= self.__wl[::-1], index = self.metadata_['name']) # Storing spectra in a dataframe + + + + #### Concentrarions + self.pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)" + aa = self.__list_of_blocks[0]['concentrations'] + a = '\n'.join(line for line in aa.split('\n') if "NCU" not in line and "<<undef>>" not in line) + n_elements = a.count('(') + + ## Get the name of analyzed chamical elements + elements_name = [] + for match in re.findall(self.pattern, a): + elements_name.append(match[0]) + + ## Retrieve concentrationds + df = self.metadata_['concentrations'] + cc = {} + for i in range(self.metadata_.shape[0]): + cc[df.index[i]] = self.conc(df[str(i)]) + + ### dataframe conntaining chemical data + self.chem_data = DataFrame(cc, index=elements_name).T.astype(float) + self.chem_data.index = self.metadata_['name'] + + ### Method for retrieving the concentration of a single sample + def conc(self,sample): + prep = '\n'.join(line for line in sample.split('\n') if "NCU" not in line and "<<undef>>" not in line) + c = [] + for match in re.findall(self.pattern, prep): + c.append(match[1]) + concentration = np.array(c) + return concentration + + @property + def specs_df_(self): + return self.spectra + @property + def md_df_(self): + me = self.metadata_.drop("concentrations", axis = 1) + me = me.drop(me.columns[(me == '').all()], axis = 1) + return me + @property + def md_df_st_(self): + rt = ['origin','date'] + cl = self.metadata_.loc[:,rt] + return cl + + @property + def chem_data_(self): + return self.chem_data + + + +class CsvParser: + def __init__(self) -> None: + pass + \ No newline at end of file diff --git a/src/utils/dim_reduction.py b/src/utils/dim_reduction.py new file mode 100644 index 0000000000000000000000000000000000000000..1da233740dc858bcb6396e50712d81589d03a83f --- /dev/null +++ b/src/utils/dim_reduction.py @@ -0,0 +1,114 @@ +from packages import * +from utils.data_handling import * + + + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ pca ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +class LinearPCA: + def __init__(self, X, Ncomp=10): + ## input matrix + self.__x = np.array(X) + ## set the number of components to compute and fit the model + self.__ncp = Ncomp + + # Fit PCA model + M = PCA(n_components = self.__ncp) + M.fit(self.__x) + + ######## results ######## + # Results + self.__pcnames = [f'PC{i+1}({100 * M.explained_variance_ratio_[i].round(2)}%)' for i in range(self.__ncp)] + self._Qexp_ratio = DataFrame(100 * M.explained_variance_ratio_, columns = ["Qexp"], index= [f'PC{i+1}' for i in range(self.__ncp)]) + + self._p = M.components_.T + self._t = M.transform(self.__x) + self.eigvals = M.singular_values_**2 + self.Lambda = np.diag(self.eigvals) + + # Matrix reconstruction or prediction making + self.T2 = {} + self._xp = {} + self._qres = {} + self.leverage = {} + + # + for i in range(self.__ncp): + # Matrix reconstruction- prediction + self._xp[i] = np.dot(self._t[:,:i+1], self._p.T[:i+1,:]) + + + #self.T2[i] = np.diag(self._t[:,:i+1] @ np.transpose(self._t[:,:i+1])) + + + + + @property + def scores_(self): + return DataFrame(self._t, columns= self.__pcnames) + + @property + def loadings_(self): + return DataFrame(self._p, columns=self.__pcnames) + + @property + def residuals_(self): + res = DataFrame(self._qres) + res.columns=self.__pcnames + return res + + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ umap ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +class Umap: + """ + The 
UMAP dimension reduction algorithm from scikit learn + """ + def __init__(self, numerical_data, cat_data): + self.numerical_data = numerical_data + if cat_data is None: + self.categorical_data_encoded = cat_data + elif len(cat_data) > 0: + self.categorical_data = cat_data + self.le = LabelEncoder() + self.categorical_data_encoded = self.le.fit_transform(self.categorical_data) + else: + self.categorical_data_encoded = None + + self.model = UMAP(n_neighbors=20, n_components=3, min_dist=0.0, )#random_state=42,) + self.model.fit(self.numerical_data, y = self.categorical_data_encoded) + self.scores_raw = self.model.transform(self.numerical_data) + self.scores = DataFrame(self.scores_raw) + self.scores.columns = [f'axis_{i+1}' for i in range(self.scores_raw.shape[1])] + + @property + def scores_(self): + return self.scores + @property + def scores_raw_(self): + return self.scores_raw + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ nmf ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +class Nmf: + def __init__(self, X, Ncomp=3): + ## input matrix + if np.min(X)<0: + self.__x = np.array(X-np.min(X)) + else: + self.__x = np.array(X) + ## set the number of components to compute and fit the model + self.__ncp = Ncomp + + # Fit PCA model + Mo = NMF(n_components=self.__ncp, init=None, solver='cd', beta_loss='frobenius', + tol=0.0001, max_iter=300, random_state=None, alpha_W=0.0, alpha_H='same', + l1_ratio=0.0, verbose=0, shuffle=False) + Mo.fit(self.__x) + # Results + self._p = Mo.components_.T + self._t = Mo.transform(self.__x) + @property + def scores_(self): + return DataFrame(self._t) + + @property + def loadings_(self): + return DataFrame(self._p) \ No newline at end of file diff --git a/src/utils/eval_metrics.py b/src/utils/eval_metrics.py new file mode 100644 index 0000000000000000000000000000000000000000..08fc4928bd3333d39ef039167ffd99124e27497b --- /dev/null +++ b/src/utils/eval_metrics.py @@ -0,0 +1,56 @@ +from packages import * + +class metrics: + def __init__(self, c:Optional[float] = None, cv:Optional[List] = None, t:Optional[List] = None, method = 'regression')-> DataFrame: + phase = [c, cv, t] + index = np.array(["train", "cv", "test"]) + notnone = [i for i in range(3) if phase[i] != None] + met_index = index[notnone] + methods = ['regression', 'classification'] + perf = {} + for i in notnone: + if method == 'regression': + perf[index[i]] = metrics.reg_(phase[i][0], phase[i][1]) + + elif method == 'classification': + perf[index[i]] = metrics.class_(phase[i][0], phase[i][1]) + + if notnone == 1: + self.ret = perf.T + else: + self.ret = DataFrame(perf).T + + @staticmethod + def reg_(meas, pred): + meas = np.array(meas) + pred = np.array(pred) + xbar = np.mean(meas) # the average of measured values + e = np.subtract(meas , pred) + e2 = e**2# the squared error + + # Sum of squared: + # TOTAL + sst = np.sum((meas - xbar)**2) + # RESIDUAL + ssr = np.sum(e2) + # REGRESSION OR MODEL + ssm = np.sum(pred - xbar) + + + # Compute statistical metrics + metr = {} + metr['r'] = np.corrcoef(meas, pred)[0, 1] + metr['r2'] = 1-ssr/sst + metr['rmse'] = np.sqrt(np.mean(e2)) + metr['mae'] = np.mean(np.abs(e2)) + metr['rpd'] = np.std(meas)/np.sqrt(np.mean(e2)) + metr['rpiq'] = (np.quantile(meas, .75) - np.quantile(meas, .25))/np.sqrt(np.mean(e2)) + return metr + + @staticmethod + def class_(meas, pred): + pass + + @property + def scores_(self): + return self.ret \ No newline at end of file diff --git a/src/utils/regress.py b/src/utils/regress.py new file mode 100644 index 
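Usage note: a small hedged sketch of the metrics class defined in eval_metrics.py above. The import path assumes src/ is on sys.path, and the numbers are purely illustrative.

import numpy as np
from utils.eval_metrics import metrics

y_meas = np.array([2.1, 3.4, 4.0, 5.2, 6.8])   # measured reference values
y_pred = np.array([2.0, 3.6, 3.9, 5.5, 6.5])   # model predictions
scores = metrics(c=[y_meas, y_pred], method='regression').scores_
print(scores)   # a single 'train' row with r, r2, rmse, mae, rpd and rpiq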
0000000000000000000000000000000000000000..03d0f16620d8c205ec517b84fa0054b04bcb7614 --- /dev/null +++ b/src/utils/regress.py @@ -0,0 +1,229 @@ +from packages import * +from utils import metrics, Snv, No_transformation, KF_CV, sel_ratio + + +class Regmodel(object): + + def __init__(self, train, test, n_iter, add_hyperparams = None, nfolds = 3, **kwargs): + self.SCORE = 100000000 + self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1] + self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1] + self._model, self._best = None, None + self._yc, self._ycv, self._yt = None, None, None + self._cv_df = DataFrame() + self._sel_ratio = DataFrame() + self._nfolds = nfolds + self._selected_bands = DataFrame(index = ['from', 'to']) + self.important_features = None + self._hyper_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]), + 'deriv': hp.choice('deriv', [0, 1, 2]), + 'window_length': hp.choice('window_length', [15, 21, 27, 33]), + 'normalization': hp.choice('normalization', ['Snv', 'No_transformation'])} + if add_hyperparams is not None: + self._hyper_params.update(add_hyperparams) + self._best = None + + trials = Trials() + best_params = fmin(fn=self.objective, + space=self._hyper_params, + algo=tpe.suggest, # Tree of Parzen Estimators’ (tpe) which is a Bayesian approach + max_evals=n_iter, + trials=trials, + verbose=1) + + @property + def train_data_(self): + return [self._xc, self._ytrain] + + @property + def test_data_(self): + return [self._xt, self._ytest] + + @property + def pretreated_spectra_(self): + return self.pretreated + + @property + def get_params_(self):### This method return the search space where the optimization algorithm will search for optimal subset of hyperparameters + return self._hyper_params + + def objective(self, params): + pass + + @property + def best_hyperparams_(self): ### This method returns the subset of selected hyperparametes + return self._best + @property + def best_hyperparams_print(self):### This method returns a sentence telling what signal preprocessing method was applied + if self._best['normalization'] == 'Snv': + a = 'Standard Normal Variate (SNV)' + + elif self._best['normalization'] == 'No_transformation': + a = " No transformation was performed" + + SG = f'- Savitzky-Golay derivative parameters \:(Window_length:{self._best['window_length']}; polynomial order: {self._best['polyorder']}; Derivative order : {self._best['deriv']})' + Norm = f'- Spectral Normalization \: {a}' + return SG+"\n"+Norm + + @property + def model_(self): # This method returns the developed model + return self._model + + @property + def pred_data_(self): ## this method returns the predicted data in training and testing steps + return self._yc, self._yt + + @property + def cv_data_(self): ## Cross validation data + return self._ycv + + @property + def CV_results_(self): + return self._cv_df + @property + def important_features_(self): + return self.important_features + @property + def selected_features_(self): + return self._selected_bands + + @property + def sel_ratio_(self): + return self._sel_ratio + +########################################### PLSR ######################################### +class Plsr(Regmodel): + def __init__(self, train, test, n_iter = 10, cv = 3): + super().__init__(train, test, n_iter, nfolds = cv, add_hyperparams = {'n_components': hp.randint('n_components', 1,20)}) + ### parameters in common + + def objective(self, params): + params['n_components'] = int(params['n_components']) + x0 = 
[self._xc, self._xt] + + x1 = [eval(str(params['normalization'])+"(x0[i])") for i in range(2)] + + a, b, c = params['deriv'], params['polyorder'], params['window_length'] + if a > b or b > c: + if self._best is not None: + a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length'] + + else: + a, b, c = 0, 0, 1 + + params['deriv'], params['polyorder'], params['window_length'] = a, b, c + x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'], window_length = params['window_length']) for i in range(2)] + + model = PLSRegression(scale = False, n_components = params['n_components']) + folds = KF_CV().CV(x = x2[0], y = np.array(self._ytrain), n_folds = self._nfolds) + yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = x2[0], y = np.array(self._ytrain)) + self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds =folds)[1] + + score = self._cv_df.loc["cv",'rmse'] + + Model = PLSRegression(scale = False, n_components = params['n_components']) + Model.fit(x2[0], self._ytrain) + + if self.SCORE > score: + self.SCORE = score + self._ycv = KF_CV().meas_pred_eq(y = np.array(self._ytrain), ypcv=yp, folds=folds) + self._yc = Model.predict(x2[0]) + self._yt = Model.predict(x2[1]) + self._model = Model + for key,value in params.items(): + try: params[key] = int(value) + except (TypeError, ValueError): params[key] = value + + self._best = params + self.pretreated = DataFrame(x2[0]) + self._sel_ratio = sel_ratio(Model, x2[0]) + return score + + + ############################################ iplsr ######################################### +class TpeIpls(Regmodel): + def __init__(self, train, test, n_iter = 10, n_intervall = 5, cv = 3): + self.n_intervall = n_intervall + self.n_arrets = self.n_intervall*2 + + + r = {'n_components': hp.randint('n_components', 1,20)} + r.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1]) for i in range(1,self.n_arrets+1)}) + + super().__init__(train, test, n_iter, add_hyperparams = r, nfolds = cv) + + ### parameters in common + + def objective(self, params): + ### wevelengths index + self.idx = [params[f'v{i}'] for i in range(1,self.n_arrets+1)] + self.idx.sort() + arrays = [np.arange(self.idx[2*i],self.idx[2*i+1]+1) for i in range(self.n_intervall)] + id = np.unique(np.concatenate(arrays, axis=0), axis=0) + + ### Preprocessing + x0 = [self._xc, self._xt] + x1 = [eval(str(params['normalization'])+"(x0[i])") for i in range(2)] + + a, b, c = params['deriv'], params['polyorder'], params['window_length'] + if a > b or b > c: + if self._best is not None: + a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length'] + + else: + a, b, c = 0, 0, 1 + + params['deriv'], params['polyorder'], params['window_length'] = a, b, c + x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'], window_length = params['window_length']) for i in range(2)] + + + prepared_data = [x2[i][:,id] for i in range(2)] + + + ### Modelling + folds = KF_CV().CV(x = prepared_data[0], y = np.array(self._ytrain), n_folds = self._nfolds) + try: + model = PLSRegression(scale = False, n_components = params['n_components']) + yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = prepared_data[0], y = np.array(self._ytrain)) + self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds =folds)[1] + except ValueError as ve: + params["n_components"] = 1 + model = PLSRegression(scale = False, n_components = params["n_components"]) + yp = 
KF_CV().cross_val_predictor(model = model, folds = folds, x = prepared_data[0], y = np.array(self._ytrain)) + self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds =folds)[1] + + + score = self._cv_df.loc['cv','rmse'] + + Model = PLSRegression(scale = False, n_components = model.n_components) + Model.fit(prepared_data[0], self._ytrain) + + if self.SCORE > score: + self.SCORE = score + self._ycv = KF_CV().meas_pred_eq(y = np.array(self._ytrain), ypcv=yp, folds=folds) + + self._yc = Model.predict(prepared_data[0]) + self._yt = Model.predict(prepared_data[1]) + self._model = Model + for key,value in params.items(): + try: params[key] = int(value) + except (TypeError, ValueError): params[key] = value + self._best = params + + self.pretreated = DataFrame(x2[0]) + self.segments = arrays + + for i in range(len(self.segments)): + self._selected_bands[f'band{i+1}'] = [self.segments[i][0], self.segments[i][self.segments[i].shape[0]-1]] + self._selected_bands.index = ['from','to'] + + return score + + + ########################################### LWPLSR ######################################### + ############################################ Pcr ######################################### + +class Pcr(Regmodel): + def __init__(self, train, test, n_iter = 10, n_val = 5): + super.__init__() + {f'pc{i}': hp.randint(f'pc{i+1}', 0, train[0].shape[1]) for i in range(self.n_val)} diff --git a/src/utils/samsel.py b/src/utils/samsel.py new file mode 100644 index 0000000000000000000000000000000000000000..2d02a8caeb6c6c0ea1f7d53cc3713c24cd0475b2 --- /dev/null +++ b/src/utils/samsel.py @@ -0,0 +1,25 @@ +from packages import * +from typing import Sequence, Dict, Optional, Union + +class KS: + def __init__(self, x:Optional[Union[np.ndarray|DataFrame]], rset:Optional[Union[float|int]]): + self.x = x + self.ratio = rset + self._train, self._test = ks_train_test_split(self.x, train_size = self.ratio) + + @property + def calset(self): + clu = self._train.index.tolist() + return self.x, clu + +class RDM: + def __init__(self, x:Optional[Union[np.ndarray|DataFrame]], rset:Optional[Union[float|int]]): + self.x = x + self.ratio = rset + self._train, self._test = train_test_split(self.x, train_size = self.ratio) + + @property + def calset(self): + clu = self._train.index.tolist() + + return self.x, clu \ No newline at end of file diff --git a/src/utils/visualize.py b/src/utils/visualize.py new file mode 100644 index 0000000000000000000000000000000000000000..1ef4288dd1f18cc9f213c7677ce580d8592a6b4b --- /dev/null +++ b/src/utils/visualize.py @@ -0,0 +1,139 @@ +from packages import * + + +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ predictions histogram ~~~~~~~~~~~~~~~~~~~~~~~~~~ +@st.cache_data +def pred_hist(pred): + # Creating histogram + hist, axs = plt.subplots(1, 1, figsize =(15, 3), + tight_layout = True) + + # Add x, y gridlines + axs.grid( color ='grey', linestyle ='-.', linewidth = 0.5, alpha = 0.6) + # Remove axes splines + for s in ['top', 'bottom', 'left', 'right']: + axs.spines[s].set_visible(False) + # Remove x, y ticks + axs.xaxis.set_ticks_position('none') + axs.yaxis.set_ticks_position('none') + # Add padding between axes and labels + axs.xaxis.set_tick_params(pad = 5) + axs.yaxis.set_tick_params(pad = 10) + # Creating histogram + N, bins, patches = axs.hist(pred, bins = 12) + return hist + + +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ predictions histogram ~~~~~~~~~~~~~~~~~~~~~~~~~~ +@st.cache_data +def plot_spectra(specdf, xunits, yunits): + fig, ax = plt.subplots(figsize = (30,7)) + if 
isinstance(specdf.columns[0], str): + specdf.T.plot(legend=False, ax = ax, color = '#2474b4') + min = 0 + else: + min = np.max(specdf.columns) + specdf.T.plot(legend=False, ax = ax, color = '#2474b4').invert_xaxis() + + ax.set_xlabel(xunits, fontsize=30) + ax.set_ylabel(yunits, fontsize=30) + plt.margins(x = 0) + plt.tight_layout() + return fig + + +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Cal/val hist ~~~~~~~~~~~~~~~~~~~~~~~~~~ +@st.cache_data +def hist(y, y_train, y_test, target_name = 'y'): + fig, ax = plt.subplots(figsize = (12,3)) + sns.histplot(y, color = "#004e9e", kde = True, label = str(target_name), ax = ax, fill = True) + sns.histplot(y_train, color = "#2C6B6F", kde = True, label = str(target_name)+" (Cal)", ax = ax, fill = True) + sns.histplot(y_test, color = "#d0f7be", kde = True, label = str(target_name)+" (Val)", ax = ax, fill = True) + ax.set_xlabel(str(target_name)) + plt.legend() + plt.tight_layout() + return fig + + + +@st.cache_data +def reg_plot( meas, pred, train_idx, test_idx): + a0 = np.ones(2) + a1 = np.ones(2) + + for i in range(len(meas)): + meas[i] = np.array(meas[i]).reshape(-1, 1) + pred[i] = np.array(pred[i]).reshape(-1, 1) + + M = LinearRegression() + M.fit(meas[i], pred[i]) + a1[i] = np.round(M.coef_[0][0],2) + a0[i] = np.round(M.intercept_[0],2) + + ec = np.subtract(np.array(meas[0]).reshape(-1), np.array(pred[0]).reshape(-1)) + et = np.subtract(np.array(meas[1]).reshape(-1), np.array(pred[1]).reshape(-1)) + + fig, ax = plt.subplots(figsize = (12,4)) + sns.regplot(x = meas[0] , y = pred[0], color="#2C6B6F", label = f'Cal (Predicted = {a0[0]} + {a1[0]} x Measured)', scatter_kws={'edgecolor': 'black'}) + sns.regplot(x = meas[1], y = pred[1], color='#d0f7be', label = f'Val (Predicted = {a0[1]} + {a1[1]} x Measured)', scatter_kws={'edgecolor': 'black'}) + plt.plot([np.min(meas[0]) - 0.05, np.max([meas[0]]) + 0.05], [np.min(meas[0]) - 0.05, np.max([meas[0]]) + 0.05], color = 'black') + + for i, txt in enumerate(train_idx): + #plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i],ec[i])) + if np.abs(ec[i])> np.mean(ec)+ 3*np.std(ec): + plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i], np.array(pred[0]).reshape(-1)[i])) + + for i, txt in enumerate(test_idx): + if np.abs(et[i])> np.mean(et)+ 3*np.std(et): + plt.annotate(txt ,(np.array(meas[1]).reshape(-1)[i], np.array(pred[1]).reshape(-1)[i])) + + ax.set_ylabel('Predicted values') + ax.set_xlabel('Measured values') + plt.legend() + plt.margins(0) + # fig.savefig('./report/figures/measured_vs_predicted.png') + return fig + +# Resid plot +@st.cache_data +def resid_plot( meas, pred, train_idx, test_idx): + a0 = np.ones(2) + a1 = np.ones(2) + e = [np.subtract(meas[0] ,pred[0]), np.subtract(meas[1], pred[1])] + + for i in range(len(meas)): + M = LinearRegression() + M.fit( np.array(meas[i]).reshape(-1,1), np.array(e[i]).reshape(-1,1)) + a1[i] = np.round(M.coef_[0],2) + a0[i] = np.round(M.intercept_,2) + + + fig, ax = plt.subplots(figsize = (12,4)) + sns.scatterplot(x = pred[0], y = e[0], color="#2C6B6F", label = f'Cal', edgecolor="black") + sns.scatterplot(x = pred[1], y = e[1], color="#d0f7be", label = f'Val', edgecolor="black") + + # sns.scatterplot(x = pred[0], y = e[0], color='blue', label = f'Cal (Residual = {a0[0]} + {a1[0]} * Predicted)') + # sns.scatterplot(x = pred[1], y = e[1], color='green', label = f'Val (Residual = {a0[1]} + {a1[1]} * Predicted)') + plt.axhline(y= 0, c ='black', linestyle = ':') + lim = np.max(abs(np.concatenate([e[0], e[1]], axis = 0)))*1.1 + plt.ylim(- lim, lim ) + + + for i in range(2): + 
e[i] = np.array(e[i]).reshape(-1,1) + + for i, txt in enumerate(train_idx): + #plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i],ec[i])) + if np.abs(e[0][i])> np.mean(e[0])+ 3*np.std(e[0]): + plt.annotate(txt ,(np.array(pred[0]).reshape(-1)[i],e[0][i])) + + for i, txt in enumerate(test_idx): + if np.abs(e[1][i])> np.mean(e[1])+ 3*np.std(e[1]): + plt.annotate(txt ,(np.array(pred[1]).reshape(-1)[i],e[1][i])) + ax.set_ylabel('Residuals') + ax.set_xlabel('Predicted values') + plt.legend() + plt.margins(0) + # fig.savefig('./report/figures/residuals_plot.png') + return fig \ No newline at end of file
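Usage note: a hedged, self-contained sketch showing how the plotting helpers in visualize.py might be called with synthetic calibration/validation data. The import path assumes src/ is on sys.path; none of the data below comes from this patch.

import numpy as np
from utils.visualize import reg_plot, resid_plot

rng = np.random.default_rng(42)
y_cal = rng.uniform(5, 15, 40)                     # measured values, calibration set
y_val = rng.uniform(5, 15, 15)                     # measured values, validation set
yc_hat = y_cal + rng.normal(0, 0.5, y_cal.size)    # toy predictions for the calibration set
yv_hat = y_val + rng.normal(0, 0.7, y_val.size)    # toy predictions for the validation set
train_idx = np.arange(y_cal.size)                  # sample labels used when annotating outliers
test_idx = np.arange(y_val.size)

fig1 = reg_plot([y_cal, y_val], [yc_hat, yv_hat], train_idx, test_idx)
fig2 = resid_plot([y_cal, y_val], [yc_hat, yv_hat], train_idx, test_idx)
fig1.savefig('measured_vs_predicted.png')          # or st.pyplot(fig1) inside the Streamlit app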