diff --git a/src/Class_Mod/Ap.py b/src/Class_Mod/Ap.py
deleted file mode 100644
index 2084d2563ba70720c0dad371d066b11f4ba2d5c9..0000000000000000000000000000000000000000
--- a/src/Class_Mod/Ap.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from Packages import *
-
-class AP:
-    def __init__(self, X):
-        ## input matrix
-        self.__x = np.array(X)
-
-        # Fit the Affinity Propagation clustering model
-        self.M = AffinityPropagation(damping=0.5, max_iter=200, convergence_iter=15, copy=True, preference=None,
-                                     affinity='euclidean', verbose=False, random_state=None)
-        self.M.fit(self.__x)
-        self.yp = self.M.predict(self.__x)+1
-    @property
-    def fit_optimal_(self):
-        clu = [f'cluster#{i}' for i in self.yp]
-        return self.__x, clu, self.M.cluster_centers_
\ No newline at end of file
diff --git a/src/Class_Mod/DATA_HANDLING.py b/src/Class_Mod/DATA_HANDLING.py
deleted file mode 100644
index 7f73676037f807782b933c9638d1ac7afb0a384d..0000000000000000000000000000000000000000
--- a/src/Class_Mod/DATA_HANDLING.py
+++ /dev/null
@@ -1,205 +0,0 @@
-from Packages import *
-from .Evaluation_Metrics import metrics
-
-## try to automatically detect the field separator within the CSV
-def find_delimiter(filename):
-    import clevercsv
-    with open(filename, newline='') as csvfile:
-        delimiter = clevercsv.Sniffer().sniff(csvfile.read(100)).delimiter
-    # sniffer = csv.Sniffer()
-    # with open(filename) as fp:
-    #     delimiter = sniffer.sniff(fp.read(200)).delimiter
-    return delimiter
-
-def find_col_index(filename):
-    with open(filename) as fp:
-        lines = pd.read_csv(fp, skiprows=3, nrows=3, index_col=False, sep=find_delimiter(filename))
-    col_index = 'yes' if lines.iloc[:,0].dtypes != np.float64 else 'no'
-    return col_index
-
-
-# detection of columns categories and scaling
-def col_cat(data_import):
-    """detect numerical and categorical columns in the csv"""
-    # set first column as sample names
-    name_col = pd.DataFrame(list(data_import.index), index = list(data_import.index))
-    # name_col=name_col.rename(columns = {0:'name'})
-    numerical_columns_list = []
-    categorical_columns_list = []
-    for i in data_import.columns:
-        if data_import[i].dtype == np.dtype("float64") or data_import[i].dtype == np.dtype("int64"):
-            numerical_columns_list.append(data_import[i])
-        else:
-            categorical_columns_list.append(data_import[i])
-    if len(numerical_columns_list) == 0:
-        empty = [0 for x in range(len(data_import))]
-        numerical_columns_list.append(empty)
-    if len(categorical_columns_list) > 0:
-        categorical_data = pd.concat(categorical_columns_list, axis=1)
-        categorical_data.insert(0, 'name', name_col)
-    if len(categorical_columns_list) == 0:
-        categorical_data = pd.DataFrame
-    # Create numerical data matrix from the numerical columns list and fill na with the mean of the column
-    numerical_data = pd.concat(numerical_columns_list, axis=1)
-    numerical_data = numerical_data.apply(lambda x: x.fillna(x.mean())) #np.mean(x)))
-
-    return numerical_data, categorical_data
-
-
-
-def list_files(mypath, import_type):
-    list_files = [f for f in listdir(mypath) if isfile(join(mypath, f)) and f.endswith(import_type + '.pkl')]
-    if list_files == []:
-        list_files = ['Please, create a model before - no model available yet']
-    return list_files
-
-
-
-def standardize(X, center = True, scale = False):
-    sk = StandardScaler(with_mean=center, with_std = scale)
-    sc = pd.DataFrame(sk.fit_transform(X), index = X.index, columns = X.columns)
-    return sc
-
-def MinMaxScale(X):
-    t = X
-    sk = MinMaxScaler(feature_range=(0,1))
-    sc = pd.DataFrame(sk.fit_transform(X), index = t.index, columns = t.columns)
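# --- Editor's note: illustrative aside, not part of the deleted DATA_HANDLING.py shown above/below.
# A minimal, self-contained sketch of what the standardize()/MinMaxScale() helpers are meant to do:
# wrap a scikit-learn scaler but hand back a DataFrame that keeps the original sample index and
# wavelength columns. The imports below are assumptions; the deleted module pulled them from Packages.py.
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler

spectra = pd.DataFrame(np.random.default_rng(0).random((5, 3)),
                       index=[f'sample{i}' for i in range(5)], columns=['1100', '1102', '1104'])
centred = pd.DataFrame(StandardScaler(with_mean=True, with_std=False).fit_transform(spectra),
                       index=spectra.index, columns=spectra.columns)   # behaves like standardize(spectra)
scaled = pd.DataFrame(MinMaxScaler(feature_range=(0, 1)).fit_transform(spectra),
                      index=spectra.index, columns=spectra.columns)    # behaves like MinMaxScale(spectra)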
return sc - -######################################## Spectral preprocessing -def Detrend(X): - c = detrend(X, axis=-1, type='linear', bp=0, overwrite_data=False) - return c - -def Snv(X): - xt = np.array(X).T - c = (xt-xt.mean())/xt.std() - return pd.DataFrame(c.T, index=X.index, columns= X.columns) - -def No_transformation(X): - return X - - -######################################## Cross val split ############################ -class KF_CV: - ### method for generating test sets index - ### KFCV(dict) returns a testset indices/Fold - @staticmethod - def CV(x, y, n_folds:int): - test_folds = {} - folds_name = [f'Fold{i+1}' for i in range(n_folds)] - kf = ks.KFold(n_splits=n_folds, device='cpu') - for i in range(n_folds): - d = [] - for _, i_test in kf.split(x, y): - d.append(i_test) - test_folds[folds_name[i]] = d[i] - return test_folds ## returns a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with indices of test set - - ### Cross validate the model and return the predictions and samples index - @staticmethod - def cross_val_predictor(model, folds, x, y): - """" model: the object to be cross-validated, - folds: a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with indices of test set(from CV method) - x and y: the data used for CV""" - x = np.array(x) - y = np.array(y) - - yp = {} - key = list(folds.keys()) - n_folds = len(folds.keys()) - - for i in range(n_folds): - model.fit(np.delete(x, folds[key[i]], axis=0), np.delete(y, folds[key[i]], axis=0)) - yp[key[i]] = model.predict(x[folds[key[i]]]) #### predictions/fold - return yp # returns a tuple with keys are names of folds and the corresponding values are the predicted Y/fold - @staticmethod - def meas_pred_eq(y, ypcv, folds): - """" y: the target variable, - ypcv: a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with predictions/fold (from cross_val_predictor method) - folds: a tuple where keys are the name of each fold, and the corresponding values is a 1d numpy array filled with indices of test set(from CV method) - x and y: the data used for CV - - returns: - two dataframe: - - a n x 4 dataframe containing measured values, predicted values, ols reg equation, and index (n is the total number of samples) - - a 2 x k dataframe containing ols regression coefficients(k is the number of folds) - """ - cvcv = {} - coeff = {} - y = np.array(y) - for i, Fname in enumerate(folds.keys()): - r = pd.DataFrame() - r['Predicted'] = ypcv[Fname] - r['Measured'] = y[folds[Fname]] - ols = LinearRegression().fit(pd.DataFrame(y[folds[Fname]]), ypcv[Fname].reshape(-1,1)) - r.index = folds[Fname] - r['Folds'] = [f'{Fname} (Predicted = {np.round(ols.intercept_[0], 2)} + {np.round(ols.coef_[0][0],2)} x Measured'] * r.shape[0] - cvcv[i] = r - coeff[Fname] = [ols.coef_[0][0], ols.intercept_[0]] - - data = pd.concat(cvcv, axis = 0) - data['index'] = [data.index[i][1] for i in range(data.shape[0])] - data.index = data['index'] - coeff = pd.DataFrame(coeff, index = ['Slope', 'Intercept']) - return data, coeff ## returns values predicted in cross validation, ,coefficients of regression - - @staticmethod - def metrics_cv(y, ypcv, folds): - y = np.array(y) - e = {} - for i in folds.keys(): - e[i] = metrics().reg_(y[folds[i]],ypcv[i]) - r = pd.DataFrame(e) - r_print = r.copy() - r_print['mean'] = r.mean(axis = 1) - r_print['sd'] = r.std(axis = 1) - r_print['cv'] = 100*r.std(axis = 1)/r.mean(axis = 1) - 
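# --- Editor's note: illustrative aside, not part of the deleted DATA_HANDLING.py. A self-contained
# sketch of the cross-validation scheme the KF_CV helpers above implement: CV() builds a
# {fold_name: test_indices} dict, and cross_val_predictor() refits the model on the complement of
# each test fold and predicts that fold. The deleted code splits with ks.KFold (apparently the
# kennard-stone package); sklearn's KFold and synthetic data are used here only to keep the sketch runnable.
import numpy as np
from sklearn.model_selection import KFold
from sklearn.cross_decomposition import PLSRegression

rng = np.random.default_rng(1)
x, y = rng.random((60, 50)), rng.random(60)

folds = {f'Fold{i+1}': test_idx
         for i, (_, test_idx) in enumerate(KFold(n_splits=3).split(x, y))}   # same shape as KF_CV.CV()

yp = {}
for name, test_idx in folds.items():                                         # as in cross_val_predictor()
    model = PLSRegression(n_components=5)
    model.fit(np.delete(x, test_idx, axis=0), np.delete(y, test_idx, axis=0))
    yp[name] = model.predict(x[test_idx]).ravel()                            # predictions per fold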
return r.T, r_print.T - - ### compute metrics for each fold - @staticmethod - def cv_scores(y, ypcv, folds): - """ Takes as input the Y vactor, the tuple of preducted values/fold(from cross_val_predictor method), and the index/fold(from CV method) - and returns two dataframes, the first is containing metrics scores/fold and the second is similar to the first by with additional mean, sd, and rsd variables - """ - y = np.array(y) - e = {} - for i in folds.keys(): - e[i] = metrics().reg_(y[folds[i]],ypcv[i]) - r = pd.DataFrame(e) - r_print = r - r_print['mean'] = r.mean(axis = 1) - r_print['sd'] = r.std(axis = 1) - r_print['cv'] = 100*r.std(axis = 1)/r.mean(axis = 1) - return r.T, r_print.T - - - # ### Return ycv - # @staticmethod - # def ycv(model, x, y, n_folds:int): - # ycv = np.zeros(y.shape[0]) - # f, idx,_,_ = KF_CV.cross_val_predictor(model, x,y, n_folds) - # for i in f.keys(): - # ycv[idx[i]] = f[i] - # return ycv - - -### Selectivity ratio -def sel_ratio(model, x ): - from scipy.stats import f - - x = pd.DataFrame(x) - wtp = model.coef_.T/ np.linalg.norm(model.coef_.T) - ttp = np.array(x @ wtp) - ptp = np.array(x.T) @ np.array(ttp)/(ttp.T @ ttp) - qexpi = np.linalg.norm(ttp @ ptp.T, axis = 0)**2 - e = np.array(x-x.mean()) - ttp @ ptp.T - qres = np.linalg.norm(e, axis = 0)**2 - sr = pd.DataFrame(qexpi/qres, index = x.columns, columns = ['sr']) - - fcr = f.ppf(0.05, sr.shape[0]-2, sr.shape[0]-3) - c = sr > fcr - sr.index = np.arange(x.shape[1]) - SR = sr.iloc[c.to_numpy(),:] - return SR \ No newline at end of file diff --git a/src/Class_Mod/DxReader.py b/src/Class_Mod/DxReader.py deleted file mode 100644 index 973372738142a6d6a1233146fef512d6c5f86461..0000000000000000000000000000000000000000 --- a/src/Class_Mod/DxReader.py +++ /dev/null @@ -1,103 +0,0 @@ -from Packages import * -import jcamp as jc - -class DxRead: - - '''This module is designed to help retrieve spectral data as well as metadata of smaples from jcamp file''' - def __init__(self, path): - #self.__path = path.replace('\\','/') - self.__path = path - self.__dxfile = jc.jcamp_readfile(self.__path) - - # Access samples data - self.__nb = self.__dxfile['blocks'] # Get the total number of blocks = The total number of scanned samples - self.__list_of_blocks = self.__dxfile['children'] # Store all blocks within a a list - self.__wl = self.__list_of_blocks[0]["x"] # Wavelengths/frequencies/range - - # Start retreiving the data - specs = np.zeros((self.__nb, len(self.__list_of_blocks[0]["y"])), dtype=float) # preallocate a np matrix for sotoring spectra - self.idx = np.arange(self.__nb) # This list is designed to store samples name - self.__met = {} - for i in range(self.__nb): # Loop over the blocks - specs[i] = self.__list_of_blocks[i]['y'] - block = self.__list_of_blocks[i] - block_met = { 'name': block['title'], - 'origin': block['origin'], - 'date': block['date'], - #'time': block['time'], - 'spectrometer': block['spectrometer/data system'].split('\n$$')[0], - 'n_scans':block['spectrometer/data system'].split('\n$$')[6].split('=')[1], - 'resolution': block['spectrometer/data system'].split('\n$$')[8].split('=')[1], - #'instrumental parameters': block['instrumental parameters'], - 'xunits': block['xunits'], - 'yunits': block['yunits'], - #'xfactor': block['xfactor'], - #'yfactor': block['yfactor'], - 'firstx': block['firstx'], - 'lastx': block['lastx'], - #'firsty':block['firsty'], - #'miny': block['miny'], - #'maxy': block['maxy'], - 'npoints': block['npoints'], - 'concentrations':block['concentrations'], - 
#'deltax':block['deltax'] - } - - self.__met[f'{i}'] = block_met - self.metadata_ = pd.DataFrame(self.__met).T - self.spectra = pd.DataFrame(np.fliplr(specs), columns= self.__wl[::-1], index = self.metadata_['name']) # Storing spectra in a pd.dataframe - - - - #### Concentrarions - self.pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)" - aa = self.__list_of_blocks[0]['concentrations'] - a = '\n'.join(line for line in aa.split('\n') if "NCU" not in line and "<<undef>>" not in line) - n_elements = a.count('(') - - ## Get the name of analyzed chamical elements - elements_name = [] - for match in re.findall(self.pattern, a): - elements_name.append(match[0]) - - ## Retrieve concentrationds - df = self.metadata_['concentrations'] - cc = {} - for i in range(self.metadata_.shape[0]): - cc[df.index[i]] = self.conc(df[str(i)]) - - ### dataframe conntaining chemical data - self.chem_data = pd.DataFrame(cc, index=elements_name).T.astype(float) - self.chem_data.index = self.metadata_['name'] - - ### Method for retrieving the concentration of a single sample - def conc(self,sample): - prep = '\n'.join(line for line in sample.split('\n') if "NCU" not in line and "<<undef>>" not in line) - c = [] - for match in re.findall(self.pattern, prep): - c.append(match[1]) - concentration = np.array(c) - return concentration - - @property - def specs_df_(self): - return self.spectra - @property - def md_df_(self): - me = self.metadata_.drop("concentrations", axis = 1) - me = me.drop(me.columns[(me == '').all()], axis = 1) - return me - @property - def md_df_st_(self): - rt = ['origin','date'] - cl = self.metadata_.loc[:,rt] - return cl - - @property - def chem_data_(self): - return self.chem_data - -@st.cache_data -def read_dx(file): - M = DxRead(file) - return M.chem_data, M.specs_df_, M.md_df_, M.md_df_st_ \ No newline at end of file diff --git a/src/Class_Mod/Evaluation_Metrics.py b/src/Class_Mod/Evaluation_Metrics.py deleted file mode 100644 index ebe94c0fa613913c426ce373bb649f7e5315ae8b..0000000000000000000000000000000000000000 --- a/src/Class_Mod/Evaluation_Metrics.py +++ /dev/null @@ -1,56 +0,0 @@ -from Packages import * - -class metrics: - def __init__(self, c:Optional[float] = None, cv:Optional[List] = None, t:Optional[List] = None, method = 'regression')-> pd.DataFrame: - phase = [c, cv, t] - index = np.array(["train", "cv", "test"]) - notnone = [i for i in range(3) if phase[i] != None] - met_index = index[notnone] - methods = ['regression', 'classification'] - perf = {} - for i in notnone: - if method == 'regression': - perf[index[i]] = metrics.reg_(phase[i][0], phase[i][1]) - - elif method == 'classification': - perf[index[i]] = metrics.class_(phase[i][0], phase[i][1]) - - if notnone == 1: - self.ret = perf.T - else: - self.ret = pd.DataFrame(perf).T - - @staticmethod - def reg_(meas, pred): - meas = np.array(meas) - pred = np.array(pred) - xbar = np.mean(meas) # the average of measured values - e = np.subtract(meas , pred) - e2 = e**2# the squared error - - # Sum of squared: - # TOTAL - sst = np.sum((meas - xbar)**2) - # RESIDUAL - ssr = np.sum(e2) - # REGRESSION OR MODEL - ssm = np.sum(pred - xbar) - - - # Compute statistical metrics - metr = {} - metr['r'] = np.corrcoef(meas, pred)[0, 1] - metr['r2'] = 1-ssr/sst - metr['rmse'] = np.sqrt(np.mean(e2)) - metr['mae'] = np.mean(np.abs(e2)) - metr['rpd'] = np.std(meas)/np.sqrt(np.mean(e2)) - metr['rpiq'] = (np.quantile(meas, .75) - np.quantile(meas, .25))/np.sqrt(np.mean(e2)) - return metr - - @staticmethod - def class_(meas, pred): - pass - - @property - def 
scores_(self): - return self.ret \ No newline at end of file diff --git a/src/Class_Mod/HDBSCAN_Clustering.py b/src/Class_Mod/HDBSCAN_Clustering.py deleted file mode 100644 index a5d3bc04794b45231dbd802ece5ce19f5ba97ba8..0000000000000000000000000000000000000000 --- a/src/Class_Mod/HDBSCAN_Clustering.py +++ /dev/null @@ -1,335 +0,0 @@ -from Packages import * - -class Hdbscan: - """Runs an automatically optimized sklearn.HDBSCAN clustering on dimensionality reduced space. - - The HDBSCAN_scores_ @Property returns the cluster number of each sample (_labels) and the DBCV best score. - - Returns: - _labels (pd.DataFrame): DataFrame with the cluster belonging number for each sample - _hdbscan_score (float): a float with the best DBCV score after optimization - - Examples: - - clustering = HDBSCAN((data) - - scores = clustering.HDBSCAN_scores_ - - """ - def __init__(self, data): - """Initiate the HDBSCAN calculation - - Args: - data (pd.DataFrame): the Dimensionality reduced space, raw result of the UMAP.fit() - param_dist (dictionary): the HDBSCAN optimization parameters to test - _score (pd.DataFrame): is a dataframe with the DBCV value for each combination of param_dist. We search for the higher value to then compute an HDBSCAN with the best parameters. - """ - # Really fast - self._param_dist = {'min_samples': [8], - 'min_cluster_size':[10], - 'metric' : ['euclidean'],#,'manhattan'], - } - # Medium - # self._param_dist = {'min_samples': [1,10], - # 'min_cluster_size':[5,50], - # 'metric' : ['euclidean','manhattan'], - # } - # Complete - # self._param_dist = {'min_samples': [1,5,10,], - # 'min_cluster_size':[5,25,50,], - # 'metric' : ['euclidean','manhattan'], - # } - - self._clusterable_embedding = data - - # RandomizedSearchCV not working... - # def scoring(model, clusterable_embedding): - # label = HDBSCAN().fit_predict(clusterable_embedding) - # hdbscan_score = DBCV(clusterable_embedding, label, dist_function=euclidean) - # return hdbscan_score - # tunning = RandomizedSearchCV(estimator=HDBSCAN(), param_distributions=param_dist, scoring=scoring) - # tunning.fit(clusterable_embedding) - # return tunning - - # compute optimization. Test each combination of parameters and store DBCV score into _score. 
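# --- Editor's note: illustrative aside, not part of the deleted HDBSCAN_Clustering.py. With the
# "Really fast" parameter set above, the wrapper ends up fitting a single HDBSCAN and exposing its
# labels_ and medoids_. A self-contained sketch of that behaviour, assuming scikit-learn >= 1.3
# (which provides sklearn.cluster.HDBSCAN and store_centers='medoid'); in the deleted module the
# HDBSCAN class came from the star-import in Packages.py.
from sklearn.cluster import HDBSCAN
from sklearn.datasets import make_blobs

data, _ = make_blobs(n_samples=200, centers=3, random_state=0)
clusterer = HDBSCAN(min_samples=8, min_cluster_size=10, metric='euclidean', store_centers='medoid')
labels = clusterer.fit_predict(data)                   # -1 marks samples left out of every cluster
named = [f'cluster#{i+1}' if i != -1 else 'Non clustered' for i in labels]   # as in labels_ below
medoids = clusterer.medoids_                           # as in centers_ below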
- # self._score = pd.DataFrame() - # for i in self._param_dist.get('min_samples'): - # for j in self._param_dist.get('min_cluster_size'): - # self._ij_label = HDBSCAN(min_samples=i, min_cluster_size=j).fit_predict(self._clusterable_embedding) - # self._ij_hdbscan_score = self.DBCV(self._clusterable_embedding, self._ij_label,)# dist_function=euclidean) - # self._score.at[i,j] = self._ij_hdbscan_score - # get the best DBCV score - # self._hdbscan_bscore = max(self._score.max()) - # find the coordinates of the best clustering parameters and run HDBSCAN below - # self._bparams = np.where(self._score == self._hdbscan_bscore) - # run HDBSCAN with best params - - # self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][self._bparams[0][0]], min_cluster_size=self._param_dist['min_cluster_size'][self._bparams[1][0]], metric=self._param_dist['metric'][self._bparams[1][0]], store_centers="medoid", ) - self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][0], min_cluster_size=self._param_dist['min_cluster_size'][0], metric=self._param_dist['metric'][0], store_centers="medoid", ) - self.best_hdbscan.fit_predict(self._clusterable_embedding) - self._labels = self.best_hdbscan.labels_ - self._centers = self.best_hdbscan.medoids_ - - - # def DBCV(self, X, labels, dist_function=euclidean): - # """ - # Implimentation of Density-Based Clustering Validation "DBCV" - # - # Citation: Moulavi, Davoud, et al. "Density-based clustering validation." - # Proceedings of the 2014 SIAM International Conference on Data Mining. - # Society for Industrial and Applied Mathematics, 2014. - # - # Density Based clustering validation - # - # Args: - # X (np.ndarray): ndarray with dimensions [n_samples, n_features] - # data to check validity of clustering - # labels (np.array): clustering assignments for data X - # dist_dunction (func): function to determine distance between objects - # func args must be [np.array, np.array] where each array is a point - # - # Returns: - # cluster_validity (float): score in range[-1, 1] indicating validity of clustering assignments - # """ - # graph = self._mutual_reach_dist_graph(X, labels, dist_function) - # mst = self._mutual_reach_dist_MST(graph) - # cluster_validity = self._clustering_validity_index(mst, labels) - # return cluster_validity - # - # - # def _core_dist(self, point, neighbors, dist_function): - # """ - # Computes the core distance of a point. - # Core distance is the inverse density of an object. - # - # Args: - # point (np.array): array of dimensions (n_features,) - # point to compute core distance of - # neighbors (np.ndarray): array of dimensions (n_neighbors, n_features): - # array of all other points in object class - # dist_dunction (func): function to determine distance between objects - # func args must be [np.array, np.array] where each array is a point - # - # Returns: core_dist (float) - # inverse density of point - # """ - # n_features = np.shape(point)[0] - # n_neighbors = np.shape(neighbors)[0] - # - # distance_vector = cdist(point.reshape(1, -1), neighbors) - # distance_vector = distance_vector[distance_vector != 0] - # numerator = ((1/distance_vector)**n_features).sum() - # core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features) - # return core_dist - # - # def _mutual_reachability_dist(self, point_i, point_j, neighbors_i, - # neighbors_j, dist_function): - # """. 
- # Computes the mutual reachability distance between points - # - # Args: - # point_i (np.array): array of dimensions (n_features,) - # point i to compare to point j - # point_j (np.array): array of dimensions (n_features,) - # point i to compare to point i - # neighbors_i (np.ndarray): array of dims (n_neighbors, n_features): - # array of all other points in object class of point i - # neighbors_j (np.ndarray): array of dims (n_neighbors, n_features): - # array of all other points in object class of point j - # dist_function (func): function to determine distance between objects - # func args must be [np.array, np.array] where each array is a point - # - # Returns: - # mutual_reachability (float) - # mutual reachability between points i and j - # - # """ - # core_dist_i = self._core_dist(point_i, neighbors_i, dist_function) - # core_dist_j = self._core_dist(point_j, neighbors_j, dist_function) - # dist = dist_function(point_i, point_j) - # mutual_reachability = np.max([core_dist_i, core_dist_j, dist]) - # return mutual_reachability - # - # - # def _mutual_reach_dist_graph(self, X, labels, dist_function): - # """ - # Computes the mutual reach distance complete graph. - # Graph of all pair-wise mutual reachability distances between points - # - # Args: - # X (np.ndarray): ndarray with dimensions [n_samples, n_features] - # data to check validity of clustering - # labels (np.array): clustering assignments for data X - # dist_dunction (func): function to determine distance between objects - # func args must be [np.array, np.array] where each array is a point - # - # Returns: graph (np.ndarray) - # array of dimensions (n_samples, n_samples) - # Graph of all pair-wise mutual reachability distances between points. - # - # """ - # n_samples = np.shape(X)[0] - # graph = [] - # counter = 0 - # for row in range(n_samples): - # graph_row = [] - # for col in range(n_samples): - # point_i = X[row] - # point_j = X[col] - # class_i = labels[row] - # class_j = labels[col] - # members_i = self._get_label_members(X, labels, class_i) - # members_j = self._get_label_members(X, labels, class_j) - # dist = self._mutual_reachability_dist(point_i, point_j, - # members_i, members_j, - # dist_function) - # graph_row.append(dist) - # counter += 1 - # graph.append(graph_row) - # graph = np.array(graph) - # return graph - # - # - # def _mutual_reach_dist_MST(self, dist_tree): - # """ - # Computes minimum spanning tree of the mutual reach distance complete graph - # - # Args: - # dist_tree (np.ndarray): array of dimensions (n_samples, n_samples) - # Graph of all pair-wise mutual reachability distances - # between points. - # - # Returns: minimum_spanning_tree (np.ndarray) - # array of dimensions (n_samples, n_samples) - # minimum spanning tree of all pair-wise mutual reachability - # distances between points. - # """ - # mst = minimum_spanning_tree(dist_tree).toarray() - # return mst + np.transpose(mst) - # - # - # def _cluster_density_sparseness(self, MST, labels, cluster): - # """ - # Computes the cluster density sparseness, the minimum density - # within a cluster - # - # Args: - # MST (np.ndarray): minimum spanning tree of all pair-wise - # mutual reachability distances between points. 
- # labels (np.array): clustering assignments for data X - # cluster (int): cluster of interest - # - # Returns: cluster_density_sparseness (float) - # value corresponding to the minimum density within a cluster - # """ - # indices = np.where(labels == cluster)[0] - # cluster_MST = MST[indices][:, indices] - # cluster_density_sparseness = np.max(cluster_MST) - # return cluster_density_sparseness - # - # - # def _cluster_density_separation(self, MST, labels, cluster_i, cluster_j): - # """ - # Computes the density separation between two clusters, the maximum - # density between clusters. - # - # Args: - # MST (np.ndarray): minimum spanning tree of all pair-wise - # mutual reachability distances between points. - # labels (np.array): clustering assignments for data X - # cluster_i (int): cluster i of interest - # cluster_j (int): cluster j of interest - # - # Returns: density_separation (float): - # value corresponding to the maximum density between clusters - # """ - # indices_i = np.where(labels == cluster_i)[0] - # indices_j = np.where(labels == cluster_j)[0] - # shortest_paths = csgraph.dijkstra(MST, indices=indices_i) - # relevant_paths = shortest_paths[:, indices_j] - # density_separation = np.min(relevant_paths) - # return density_separation - # - # - # def _cluster_validity_index(self, MST, labels, cluster): - # """ - # Computes the validity of a cluster (validity of assignmnets) - # - # Args: - # MST (np.ndarray): minimum spanning tree of all pair-wise - # mutual reachability distances between points. - # labels (np.array): clustering assignments for data X - # cluster (int): cluster of interest - # - # Returns: cluster_validity (float) - # value corresponding to the validity of cluster assignments - # """ - # min_density_separation = np.inf - # for cluster_j in np.unique(labels): - # if cluster_j != cluster: - # cluster_density_separation = self._cluster_density_separation(MST, - # labels, - # cluster, - # cluster_j) - # if cluster_density_separation < min_density_separation: - # min_density_separation = cluster_density_separation - # cluster_density_sparseness = self._cluster_density_sparseness(MST, - # labels, - # cluster) - # numerator = min_density_separation - cluster_density_sparseness - # denominator = np.max([min_density_separation, cluster_density_sparseness]) - # cluster_validity = numerator / denominator - # return cluster_validity - # - # - # def _clustering_validity_index(self, MST, labels): - # """ - # Computes the validity of all clustering assignments for a - # clustering algorithm - # - # Args: - # MST (np.ndarray): minimum spanning tree of all pair-wise - # mutual reachability distances between points. - # labels (np.array): clustering assignments for data X - # - # Returns: validity_index (float): - # score in range[-1, 1] indicating validity of clustering assignments - # """ - # n_samples = len(labels) - # validity_index = 0 - # for label in np.unique(labels): - # fraction = np.sum(labels == label) / float(n_samples) - # cluster_validity = self._cluster_validity_index(MST, labels, label) - # validity_index += fraction * cluster_validity - # return validity_index - # - # - # def _get_label_members(self, X, labels, cluster): - # """ - # Helper function to get samples of a specified cluster. 
- # - # Args: - # X (np.ndarray): ndarray with dimensions [n_samples, n_features] - # data to check validity of clustering - # labels (np.array): clustering assignments for data X - # cluster (int): cluster of interest - # - # Returns: members (np.ndarray) - # array of dimensions (n_samples, n_features) of samples of the - # specified cluster. - # """ - # indices = np.where(labels == cluster)[0] - # members = X[indices] - # return members - - @property - def centers_(self): - # return self._labels, self._hdbscan_bscore, self._centers - return self._centers - @property - def labels_(self): - labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels] - return labels - @property - def non_clustered(self): - labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels] - non_clustered = np.where(np.array(labels) == 'Non clustered')[0] - return non_clustered diff --git a/src/Class_Mod/Hash.py b/src/Class_Mod/Hash.py deleted file mode 100644 index fb4138405efa4abbffcb4377cf1062df9758290f..0000000000000000000000000000000000000000 --- a/src/Class_Mod/Hash.py +++ /dev/null @@ -1,30 +0,0 @@ -from Packages import * - -def create_hash(to_hash): - #using the md5 hash function. - hash_func = hashlib.md5() - to_hash = str(to_hash) - encoded_to_hash = to_hash.encode() - hash_func.update(encoded_to_hash) - hash = hash_func.hexdigest() - return hash - -def check_hash(hash, hash_type): - # path to hash file and grep/cat functions for Win - subprocess_path = Path("src/data/hash/") - # run a grep from the hash onto the hash file - nb_hash = subprocess.run([subprocess_path / 'grep.exe', '-c', hash, subprocess_path / str(hash_type + ".txt")], shell=True) - # if hash present - if 'returncode=0' in str(nb_hash): - return 'existing hash' - # if hash not present - else: - return 'missing hash' - -def add_hash(hash, hash_type): - # add it to the file with cat function - add_hash = subprocess.run(['echo', str(hash) + '>>', subprocess_path / str(hash_type + ".txt")], shell=True) - if 'returncode=0' in str(add_hash): - return 'hash added' - else: - return 'error while adding the new hash' \ No newline at end of file diff --git a/src/Class_Mod/KMEANS_.py b/src/Class_Mod/KMEANS_.py deleted file mode 100644 index 78cb732f07aa961e38056cb4e6e070ad7588fb0c..0000000000000000000000000000000000000000 --- a/src/Class_Mod/KMEANS_.py +++ /dev/null @@ -1,52 +0,0 @@ -from Packages import * -class Sk_Kmeans: - """K-Means clustering for Samples selection. - - Returns: - inertia_ (pd.DataFrame): DataFrame with ... - x (pd.DataFrame): Initial data - clu (pd.DataFrame): Cluster name for each sample - model.cluster_centers_ (pd.DataFrame): Coordinates of the center of each cluster - """ - def __init__(self, x, max_clusters): - """Initiate the KMeans class. - - Args: - x (pd.DataFrame): the original reduced data to cluster - max_cluster (Int): the max number of desired clusters. 
- """ - self.x = x - self.max_clusters = max_clusters - - self.inertia = pd.DataFrame() - for i in range(1, max_clusters+1): - model = KMeans(n_clusters = i, init = 'k-means++', random_state = 42) - model.fit(x) - self.inertia[f'{i}_clust']= [model.inertia_] - self.inertia.index = ['inertia'] - - @property - def inertia_(self): - return self.inertia - - @property - def suggested_n_clusters_(self): - idxidx = [] - values = [] - - s = self.inertia.to_numpy().ravel() - for i in range(self.max_clusters-1): - idxidx.append(f'{i+1}_clust') - values.append((s[i] - s[i+1])*100 / s[i]) - - id = np.max(np.where(np.array(values) > 5))+2 - return id - - @property - def fit_optimal_(self): - model = KMeans(n_clusters = self.suggested_n_clusters_, init = 'k-means++', random_state = 42) - model.fit(self.x) - yp = model.predict(self.x)+1 - clu = [f'cluster#{i}' for i in yp] - - return self.x, clu, model.cluster_centers_ \ No newline at end of file diff --git a/src/Class_Mod/KennardStone.py b/src/Class_Mod/KennardStone.py deleted file mode 100644 index 3ad6c9179dbe92882666876c29ef2a3cf4f8a17c..0000000000000000000000000000000000000000 --- a/src/Class_Mod/KennardStone.py +++ /dev/null @@ -1,25 +0,0 @@ -from Packages import * -from typing import Sequence, Dict, Optional, Union - -class KS: - def __init__(self, x:Optional[Union[np.ndarray|pd.DataFrame]], rset:Optional[Union[float|int]]): - self.x = x - self.ratio = rset - self._train, self._test = ks.train_test_split(self.x, train_size = self.ratio) - - @property - def calset(self): - clu = self._train.index.tolist() - return self.x, clu - -class RDM: - def __init__(self, x:Optional[Union[np.ndarray|pd.DataFrame]], rset:Optional[Union[float|int]]): - self.x = x - self.ratio = rset - self._train, self._test = train_test_split(self.x, train_size = self.ratio) - - @property - def calset(self): - clu = self._train.index.tolist() - - return self.x, clu \ No newline at end of file diff --git a/src/Class_Mod/Kmedoids.py b/src/Class_Mod/Kmedoids.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/Class_Mod/LWPLSR_.py b/src/Class_Mod/LWPLSR_.py deleted file mode 100644 index 2e6c7a7f074b5a205c7648f6b880967c5570d3bb..0000000000000000000000000000000000000000 --- a/src/Class_Mod/LWPLSR_.py +++ /dev/null @@ -1,211 +0,0 @@ -from juliacall import Main as jl -import numpy as np -import pandas as pd - -class LWPLSR: - """The lwpls regression model from Jchemo (M. Lesnoff) - - Returns: - self.scores (DataFrame): various metrics and scores - self.predicted_results (Dictionary): Dict containing all predicted results (train, test, cross-validation) - self.mod (Julia model): the prepared model - """ - def __init__(self, dataset, preT): - """Initiate the LWPLSR and prepare data for Julia computing.""" - # get train / test data from dataset - self.x_train, self.y_train, self.x_test, self.y_test = [dataset[i] for i in range(4)] - # calculate number of KFolds and get CV data from dataset - self.nb_fold = int((len(dataset)-4)/4) - for i in range(self.nb_fold): - setattr(self, "xtr_fold"+str(i+1), dataset[i+7]) - setattr(self, "ytr_fold"+str(i+1), dataset[i+13]) - setattr(self, "xte_fold"+str(i+1), dataset[i+4]) - setattr(jl, "xtr_fold"+str(i+1), dataset[i+7]) - setattr(jl, "ytr_fold"+str(i+1), dataset[i+13]) - setattr(jl, "xte_fold"+str(i+1), dataset[i+4]) - - # prepare to send dataframes to julia and Jchemo (with the jl. 
prefix) - jl.x_train, jl.y_train, jl.x_test, jl.y_test = self.x_train, self.y_train, self.x_test, self.y_test - # Get parameters for preTreatment of the spectra (acquired from a global PLSR) - self.preT = preT - - # initialize vars from the class - y_shape = self.y_test.shape - self.pred_test = np.zeros(shape=(y_shape[0], 1)) - self.pred_train = np.zeros(shape=(y_shape[0], 1)) - self.mod = "" - self.best_lwplsr_params = np.zeros(shape=(5, 1)) - self.predicted_results = {} - - def Jchemo_lwplsr_fit(self): - """Send data to Julia to fit lwplsr. - - Args: - self.x_train (DataFrame): - self.y_train (DataFrame): - self.x_test (DataFrame): - self.y_test (DataFrame): - - Returns: - self.mod (Julia model): the prepared model - """ - # launch Julia Jchemo lwplsr and convert DataFrames from Python Pandas DataFrame to Julia DataFrame - jl.seval(""" - using DataFrames - using Pandas - using Jchemo - x_train |> Pandas.DataFrame |> DataFrames.DataFrame - y_train |> Pandas.DataFrame |> DataFrames.DataFrame - x_test |> Pandas.DataFrame |> DataFrames.DataFrame - y_test |> Pandas.DataFrame |> DataFrames.DataFrame - """) - # apply pre-treatments on X data - print('LWPLSR - preTreatment') - # apply pre-treatments to X data before working with - jl.npoint = self.preT['window_length'] - jl.deriv = self.preT['deriv'] - jl.degree = self.preT['polyorder'] - if self.preT['polyorder'] > 0: - jl.seval(""" - mod1 = model(snv; centr = true, scal = true) - mod2 = model(savgol; npoint = npoint, deriv = deriv, degree = degree) - """) - if self.preT['normalization'] == "No_transformation": - jl.seval(""" - preMod = mod2 - """) - elif self.preT['normalization'] == 'Snv': - jl.seval(""" - preMod = pip(mod1, mod2) - """) - jl.seval(""" - fit!(preMod, x_train) - x_train = transf(preMod, x_train) - x_test = transf(preMod, x_test) - """) - # LWPLSR tuning - print('LWPLSR - tuning') - # set tuning parameters to test - jl.seval(""" - nlvdis = [5; 10; 15] ; metric = [:eucl; :mah] - h = [1; 2; 6; Inf] ; k = [30; 80; 200] - nlv = 5:15 - pars = Jchemo.mpar(nlvdis = nlvdis, metric = metric, h = h, k = k) - """) - # split Train data into Cal/Val for tuning - jl.seval(""" - pct = .3 - ntrain = Jchemo.nro(x_train) - nval = Int(round(pct * ntrain)) - s = Jchemo.samprand(ntrain, nval) - Xcal = x_train[s.train, :] - ycal = y_train[s.train] - Xval = x_train[s.test, :] - yval = y_train[s.test] - ncal = ntrain - nval - """) - - # Create LWPLSR model and tune with GridScore - jl.seval(""" - mod = Jchemo.model(Jchemo.lwplsr) - res = gridscore(mod, Xcal, ycal, Xval, yval; score = Jchemo.rmsep, pars, nlv, verbose = false) - u = findall(res.y1 .== minimum(res.y1))[1] #best parameters combination - """) - # save best lwplsr parameters - self.best_lwplsr_params = {'nlvdis' : jl.res.nlvdis[jl.u], 'metric' : str(jl.res.metric[jl.u]), 'h' : jl.res.h[jl.u], 'k' : jl.res.k[jl.u], 'nlv' : jl.res.nlv[jl.u]} - print('best lwplsr params ' + str(self.best_lwplsr_params)) - # run LWPLSR model with best parameters - jl.seval(""" - mod = Jchemo.model(Jchemo.lwplsr; nlvdis = res.nlvdis[u], metric = res.metric[u], h = res.h[u], k = res.k[u], nlv = res.nlv[u]) - # Fit model - Jchemo.fit!(mod, x_train, y_train) - """) - # save Julia Jchemo model - self.mod = jl.mod - - def Jchemo_lwplsr_predict(self): - """Send data to Julia to predict with lwplsr. 
- - Args: - self.mod (Julia model): the prepared model - self.x_train (DataFrame): - self.y_train (DataFrame): - self.x_test (DataFrame): - self.y_test (DataFrame): - - Returns: - self.pred_test (Julia DataFrame): predicted values on x_test - self.pred_train (Julia DataFrame): predicted values on x_train - """ - # Predictions on x_test and store in self.pred - self.pred_test = jl.seval(""" - println("LWPLSR - start test predict") - res = Jchemo.predict(mod, x_test) - res.pred - """) - self.pred_train = jl.seval(""" - println("LWPLSR - start train predict") - res = Jchemo.predict(mod, x_train) - res.pred - """) - print('LWPLSR - end') - - def Jchemo_lwplsr_cv(self): - """Send Cross-Validation data to Julia to fit & predict with lwplsr. - - Args: - self.best_lwplsr_params: the best parameters to use (from tuning) for CV - self.xtr_fold1 (DataFrame): - self.ytr_fold1 (DataFrame): - self.xte_fold1 (DataFrame): - - Returns: - self.pred_cv (Julia DataFrame): predicted values on x_train with Cross-Validation - """ - for i in range(self.nb_fold): - jl.Xtr = getattr(self, "xtr_fold"+str(i+1)) - jl.Ytr = getattr(self, "ytr_fold"+str(i+1)) - jl.Xte = getattr(self, "xte_fold"+str(i+1)) - # convert Python Pandas DataFrame to Julia DataFrame - jl.seval(""" - using DataFrames - using Pandas - using Jchemo - Xtr |> Pandas.DataFrame |> DataFrames.DataFrame - Ytr |> Pandas.DataFrame |> DataFrames.DataFrame - Xte |> Pandas.DataFrame |> DataFrames.DataFrame - """) - # set lwplsr parameters as the best one from tuning - jl.nlvdis = int(self.best_lwplsr_params['nlvdis']) - jl.metric = self.best_lwplsr_params['metric'] - jl.h = self.best_lwplsr_params['h'] - jl.k = int(self.best_lwplsr_params['k']) - jl.nlv = int(self.best_lwplsr_params['nlv']) - jl.seval(""" - println("LWPLSR - start CV mod") - mod_cv = Jchemo.model(Jchemo.lwplsr; nlvdis = nlvdis, metric = Symbol(metric), h = h, k = k, nlv = nlv) - # Fit model - Jchemo.fit!(mod_cv, Xtr, Ytr) - """) - pred_cv = jl.seval(""" - println("LWPLSR - start CV predict") - res = Jchemo.predict(mod_cv, Xte) - res.pred - """) - # save predicted values for each KFold in the predicted_results dictionary - self.predicted_results["CV" + str(i+1)] = pd.DataFrame(pred_cv) - - @property - def pred_data_(self): - # convert predicted data from x_test to Pandas DataFrame - self.predicted_results["pred_data_train"] = pd.DataFrame(self.pred_train) - self.predicted_results["pred_data_test"] = pd.DataFrame(self.pred_test) - return self.predicted_results - - @property - def model_(self): - return self.mod - - @property - def best_lwplsr_params_(self): - return self.best_lwplsr_params diff --git a/src/Class_Mod/LWPLSR_Call.py b/src/Class_Mod/LWPLSR_Call.py deleted file mode 100644 index a0a8c30af7f38cb84e3d62ce6b7eba58621599a2..0000000000000000000000000000000000000000 --- a/src/Class_Mod/LWPLSR_Call.py +++ /dev/null @@ -1,53 +0,0 @@ -import numpy as np -from pathlib import Path -import json -from LWPLSR_ import LWPLSR -import os - -# loading the lwplsr_inputs.json -temp_path = Path("temp/") -data_to_work_with = ['x_train_np', 'y_train_np', 'x_test_np', 'y_test_np'] -# check data for cross-validation depending on KFold number -temp_files_list = os.listdir(temp_path) -nb_fold = 0 -for i in temp_files_list: - if 'fold' in i: - # add CV file name to data_to_work_with - data_to_work_with.append(str(i)[:-4]) - # and count the number of KFold - nb_fold += 1 -# Import data from csv files in the temp/ folder -dataset = [] -for i in data_to_work_with: - dataset.append(np.genfromtxt(temp_path / 
str(i + ".csv"), delimiter=',')) -print('CSV imported') - -# Get parameters for preTreatment of the spectra (acquired from a global PLSR) -with open(temp_path / "lwplsr_preTreatments.json", "r") as outfile: - preT = json.load(outfile) - -# launch LWPLSR Class from LWPLSR_.py in Class_Mod -print('start model creation') -Reg = LWPLSR(dataset, preT) -print('model created. \nnow fit') -LWPLSR.Jchemo_lwplsr_fit(Reg) -print('now predict') -LWPLSR.Jchemo_lwplsr_predict(Reg) -print('now CV') -LWPLSR.Jchemo_lwplsr_cv(Reg) - -# Export results in a json file to bring data back to 2-model_creation.py and streamlit interface -print('export to json') -pred = ['pred_data_train', 'pred_data_test'] -# add KFold results to predicted data -for i in range(int(nb_fold/4)): - pred.append("CV" + str(i+1)) -json_export = {} -for i in pred: - json_export[i] = Reg.pred_data_[i].to_dict() -# add the lwplsr global model to the json -json_export['model'] = str(Reg.model_) -# add the best parameters for the lwplsr obtained from GridScore tuning -json_export['best_lwplsr_params'] = Reg.best_lwplsr_params_ -with open(temp_path / "lwplsr_outputs.json", "w+") as outfile: - json.dump(json_export, outfile) diff --git a/src/Class_Mod/Miscellaneous.py b/src/Class_Mod/Miscellaneous.py deleted file mode 100644 index 788744990fc7078c0ed1df118ad8112fcb66b47e..0000000000000000000000000000000000000000 --- a/src/Class_Mod/Miscellaneous.py +++ /dev/null @@ -1,172 +0,0 @@ -from Packages import * - -# local CSS -## load the custom CSS in the style folder -@st.cache_data -def local_css(file_name): - with open(file_name) as f: - st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True) - -# predict module -def prediction(NIRS_csv, qsep, qhdr, model): - # hdr var correspond to column header True or False in the CSV - if qhdr == 'yes': - col = 0 - else: - col = False - X_test = pd.read_csv(NIRS_csv, sep=qsep, index_col=col) - Y_preds = model.predict(X_test) - # Y_preds = X_test - return Y_preds - - -@st.cache_data -def reg_plot( meas, pred, train_idx, test_idx): - a0 = np.ones(2) - a1 = np.ones(2) - - for i in range(len(meas)): - meas[i] = np.array(meas[i]).reshape(-1, 1) - pred[i] = np.array(pred[i]).reshape(-1, 1) - - M = LinearRegression() - M.fit(meas[i], pred[i]) - a1[i] = np.round(M.coef_[0][0],2) - a0[i] = np.round(M.intercept_[0],2) - - ec = np.subtract(np.array(meas[0]).reshape(-1), np.array(pred[0]).reshape(-1)) - et = np.subtract(np.array(meas[1]).reshape(-1), np.array(pred[1]).reshape(-1)) - - fig, ax = plt.subplots(figsize = (12,4)) - sns.regplot(x = meas[0] , y = pred[0], color='blue', label = f'Calib (Predicted = {a0[0]} + {a1[0]} x Measured)') - sns.regplot(x = meas[1], y = pred[1], color='green', label = f'Test (Predicted = {a0[1]} + {a1[1]} x Measured)') - plt.plot([np.min(meas[0]) - 0.05, np.max([meas[0]]) + 0.05], [np.min(meas[0]) - 0.05, np.max([meas[0]]) + 0.05], color = 'black') - - for i, txt in enumerate(train_idx): - #plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i],ec[i])) - if np.abs(ec[i])> np.mean(ec)+ 3*np.std(ec): - plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i], np.array(pred[0]).reshape(-1)[i])) - - for i, txt in enumerate(test_idx): - if np.abs(et[i])> np.mean(et)+ 3*np.std(et): - plt.annotate(txt ,(np.array(meas[1]).reshape(-1)[i], np.array(pred[1]).reshape(-1)[i])) - - ax.set_ylabel('Predicted values') - ax.set_xlabel('Measured values') - plt.legend() - plt.margins(0) - # fig.savefig('./Report/figures/measured_vs_predicted.png') - return fig - -@st.cache_data -def resid_plot( meas, pred, 
train_idx, test_idx): - a0 = np.ones(2) - a1 = np.ones(2) - e = [np.subtract(meas[0] ,pred[0]), np.subtract(meas[1], pred[1])] - - for i in range(len(meas)): - M = LinearRegression() - M.fit( np.array(meas[i]).reshape(-1,1), np.array(e[i]).reshape(-1,1)) - a1[i] = np.round(M.coef_[0],2) - a0[i] = np.round(M.intercept_,2) - - - fig, ax = plt.subplots(figsize = (12,4)) - sns.scatterplot(x = pred[0], y = e[0], color='blue', label = f'Calib (Residual = {a0[0]} + {a1[0]} * Predicted)') - sns.scatterplot(x = pred[1], y = e[1], color='green', label = f'Test (Residual = {a0[1]} + {a1[1]} * Predicted)') - plt.axhline(y= 0, c ='black', linestyle = ':') - lim = np.max(abs(np.concatenate([e[0], e[1]], axis = 0)))*1.1 - plt.ylim(- lim, lim ) - - - for i in range(2): - e[i] = np.array(e[i]).reshape(-1,1) - - for i, txt in enumerate(train_idx): - #plt.annotate(txt ,(np.array(meas[0]).reshape(-1)[i],ec[i])) - if np.abs(e[0][i])> np.mean(e[0])+ 3*np.std(e[0]): - plt.annotate(txt ,(np.array(pred[0]).reshape(-1)[i],e[0][i])) - - for i, txt in enumerate(test_idx): - if np.abs(e[1][i])> np.mean(e[1])+ 3*np.std(e[1]): - plt.annotate(txt ,(np.array(pred[1]).reshape(-1)[i],e[1][i])) - ax.set_xlabel(f'{ train_idx.shape}') - ax.set_ylabel('Residuals') - ax.set_xlabel('Predicted values') - plt.legend() - plt.margins(0) - # fig.savefig('./Report/figures/residuals_plot.png') - return fig - - - -# function that create a download button - needs the data to save and the file name to store to -def download_results(data, export_name): - with open(data) as f: - st.download_button('Download', f, export_name, type='primary') - -@st.cache_resource -def plot_spectra(df, xunits, yunits): - fig, ax = plt.subplots(figsize = (30,7)) - if isinstance(df.columns[0], str): - df.T.plot(legend=False, ax = ax, color = 'blue') - min = 0 - else: - min = np.max(df.columns) - df.T.plot(legend=False, ax = ax, color = 'blue').invert_xaxis() - - ax.set_xlabel(xunits, fontsize=18) - ax.set_ylabel(yunits, fontsize=18) - plt.margins(x = 0) - plt.tight_layout() - - return fig - - -## descriptive stat -def desc_stats(x): - a = {} - a['N samples'] = x.shape[0] - a['Min'] = np.min(x) - a['Max'] = np.max(x) - a['Mean'] = np.mean(x) - a['Median'] = np.median(x) - a['S'] = np.std(x) - a['RSD'] = np.std(x)*100/np.mean(x) - a['Skew'] = skew(x, axis=0, bias=True) - a['Kurt'] = kurtosis(x, axis=0, bias=True) - return a - - -def hash_data(data): - import xxhash - """Hash various data types using MD5.""" - - # Convert to a string representation - if isinstance(data, pd.DataFrame): - data_str = data.to_string() - elif isinstance(data, pd.Series): - data_str = data.to_string() - elif isinstance(data, np.ndarray): - data_str = np.array2string(data, separator=',') - elif isinstance(data, (list, tuple)): - data_str = str(data) - elif isinstance(data, dict): - # Ensure consistent order for dict items - data_str = str(sorted(data.items())) - elif isinstance(data, (int, float, str, bool)): - data_str = str(data) - elif isinstance(data, bytes): - data_str = data.decode('utf-8', 'ignore') # Decode bytes to string - elif isinstance(data, str): # Check if it's a string representing file content - data_str = data - else: - raise TypeError(f"Unsupported data type: {type(data)}") - - # Encode the string to bytes - data_bytes = data_str.encode() - - # Compute the MD5 hash - md5_hash = xxhash.xxh32(data_bytes).hexdigest() - - return str(md5_hash) \ No newline at end of file diff --git a/src/Class_Mod/NMF_.py b/src/Class_Mod/NMF_.py deleted file mode 100644 index 
fead5eb4f82b256d0591fc16b44fd5ca0acc4114..0000000000000000000000000000000000000000 --- a/src/Class_Mod/NMF_.py +++ /dev/null @@ -1,28 +0,0 @@ -from Packages import * - - -class Nmf: - def __init__(self, X, Ncomp=3): - ## input matrix - if np.min(X)<0: - self.__x = np.array(X-np.min(X)) - else: - self.__x = np.array(X) - ## set the number of components to compute and fit the model - self.__ncp = Ncomp - - # Fit PCA model - Mo = NMF(n_components=self.__ncp, init=None, solver='cd', beta_loss='frobenius', - tol=0.0001, max_iter=300, random_state=None, alpha_W=0.0, alpha_H='same', - l1_ratio=0.0, verbose=0, shuffle=False) - Mo.fit(self.__x) - # Results - self._p = Mo.components_.T - self._t = Mo.transform(self.__x) - @property - def scores_(self): - return pd.DataFrame(self._t) - - @property - def loadings_(self): - return pd.DataFrame(self._p) \ No newline at end of file diff --git a/src/Class_Mod/PCA_.py b/src/Class_Mod/PCA_.py deleted file mode 100644 index 0d2afdb2d00add778fdfdd2f1a56e34e57886e5f..0000000000000000000000000000000000000000 --- a/src/Class_Mod/PCA_.py +++ /dev/null @@ -1,53 +0,0 @@ -from Packages import * - -class LinearPCA: - def __init__(self, X, Ncomp=10): - ## input matrix - self.__x = np.array(X) - ## set the number of components to compute and fit the model - self.__ncp = Ncomp - - # Fit PCA model - M = PCA(n_components = self.__ncp) - M.fit(self.__x) - - ######## results ######## - # Results - self.__pcnames = [f'PC{i+1}({100 * M.explained_variance_ratio_[i].round(2)}%)' for i in range(self.__ncp)] - self._Qexp_ratio = pd.DataFrame(100 * M.explained_variance_ratio_, columns = ["Qexp"], index= [f'PC{i+1}' for i in range(self.__ncp)]) - - self._p = M.components_.T - self._t = M.transform(self.__x) - self.eigvals = M.singular_values_**2 - self.Lambda = np.diag(self.eigvals) - - # Matrix reconstruction or prediction making - self.T2 = {} - self._xp = {} - self._qres = {} - self.leverage = {} - - # - for i in range(self.__ncp): - # Matrix reconstruction- prediction - self._xp[i] = np.dot(self._t[:,:i+1], self._p.T[:i+1,:]) - - - #self.T2[i] = np.diag(self._t[:,:i+1] @ np.transpose(self._t[:,:i+1])) - - - - - @property - def scores_(self): - return pd.DataFrame(self._t, columns= self.__pcnames) - - @property - def loadings_(self): - return pd.DataFrame(self._p, columns=self.__pcnames) - - @property - def residuals_(self): - res = pd.DataFrame(self._qres) - res.columns=self.__pcnames - return res \ No newline at end of file diff --git a/src/Class_Mod/PLSR_.py b/src/Class_Mod/PLSR_.py deleted file mode 100644 index 062f17026d3ee8a6db86b869537b6989b39cc729..0000000000000000000000000000000000000000 --- a/src/Class_Mod/PLSR_.py +++ /dev/null @@ -1,51 +0,0 @@ -from Packages import * -from Class_Mod.Miscellaneous import * -from Class_Mod.Evaluation_Metrics import metrics - -class PinardPlsr: - def __init__(self, x_train, y_train, x_test, y_test): - self.x_train = x_train - self.x_test = x_test - self.y_train = y_train - self.y_test = y_test - - # create model module with PINARD - # Declare preprocessing pipeline - svgolay = [ ('_sg1',pp.SavitzkyGolay()), - ('_sg2',pp.SavitzkyGolay()) # nested pipeline to perform the Savitzky-Golay method twice for 2nd order preprocessing - ] - preprocessing = [ ('id', pp.IdentityTransformer()), # Identity transformer, no change to the data - ('savgol', pp.SavitzkyGolay()), # Savitzky-Golay smoothing filter - ('derivate', pp.Derivate()), # Calculate the first derivative of the data - ('SVG', FeatureUnion(svgolay)) - ] - # Declare complete pipeline - 
pipeline = Pipeline([ - ('scaler', MinMaxScaler()), # scaling the data - ('preprocessing', FeatureUnion(preprocessing)), # preprocessing - ('PLS', PLSRegression(n_components=14))]) - # Estimator including y values scaling - estimator = TransformedTargetRegressor(regressor = pipeline, transformer = MinMaxScaler()) - # Training - self.trained = estimator.fit(self.x_train, self.y_train) - - - # fit scores - # Predictions on test set - self.yc = pd.DataFrame(self.trained.predict(self.x_train)) # make predictions on test data and assign to Y_preds variable - self.ycv = pd.DataFrame(cross_val_predict(self.trained, self.x_train, self.y_train, cv = 3)) # make predictions on test data and assign to Y_preds variable - self.yt = pd.DataFrame(self.trained.predict(self.x_test)) # make predictions on test data and assign to Y_preds variable - - ################################################################################################################ - - - ################################################################################################################ - - @property - def model_(self): - return self.trained - - @property - def pred_data_(self): - - return self.yc, self.ycv, self.yt \ No newline at end of file diff --git a/src/Class_Mod/PLSR_Preprocess.py b/src/Class_Mod/PLSR_Preprocess.py deleted file mode 100644 index aeb006617b37609c2f3a32f65338a9e88c701622..0000000000000000000000000000000000000000 --- a/src/Class_Mod/PLSR_Preprocess.py +++ /dev/null @@ -1,100 +0,0 @@ -from Packages import * -from Class_Mod import metrics -from Class_Mod.DATA_HANDLING import * - -class PlsProcess: - SCORE = 100000000 - index_export = pd.DataFrame() - def __init__(self, x_train, x_test, y_train, y_test, scale, Kfold): - - PlsProcess.SCORE = 10000 - self.xtrain = x_train - self.xtest = x_test - self.y_train = y_train - self.y_test = y_test - self.scale = scale - self.Kfold = Kfold - self.model = None - self.p = self.xtrain.shape[1] - self.PLS_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]), - 'deriv': hp.choice('deriv', [0, 1, 2]), - 'window_length': hp.choice('window_length', [15, 19, 23, 27]), - 'scatter': hp.choice('scatter', ['Snv', 'Non'])} - self.PLS_params['n_components'] = hp.randint("n_components", 2, 20) - - def objective(self, params): - # Train the model - self.xtrain = eval(f'{params['scatter']}(self.xtrain)') - self.xtest = eval( f'{params['scatter']}(self.xtest)') - - - - if params['deriv'] > params['polyorder'] or params['polyorder'] > params['window_length']: - params['deriv'] = 0 - params['polyorder'] = 0 - params['window_length'] = 1 - self.x_train = self.xtrain - self.x_test = self.xtest - else: - self.x_train = pd.DataFrame(eval(f'savgol_filter(self.xtrain, polyorder={params['polyorder']}, deriv={params['deriv']}, window_length = {params['window_length']})'), - columns = self.xtrain.columns, index= self.xtrain.index) - self.x_test = pd.DataFrame(eval(f'savgol_filter(self.xtest, polyorder={params['polyorder']}, deriv={params['deriv']}, window_length = {params['window_length']})'), columns = self.xtest.columns , index= self.xtest.index) - - - try: - Model = PLSRegression(scale = self.scale, n_components = params['n_components']) - Model.fit(self.x_train, self.y_train) - - except ValueError as ve: - params["n_components"] = 1 - Model = PLSRegression(scale = self.scale, n_components = params["n_components"]) - Model.fit(self.x_train, self.y_train) - - ## make prediction - yc = Model.predict(self.x_train).reshape(-1) - ycv = cross_val_predict(Model, self.x_train, 
self.y_train, cv=self.Kfold, n_jobs=-1).reshape(-1) - yt = Model.predict(self.x_test).reshape(-1) - #################### - rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv)) - rmsec = np.sqrt(mean_squared_error(self.y_train, yc)) - rmset = np.sqrt(mean_squared_error(self.y_test, yt)) - - - score = rmsecv/rmsec*np.round(rmset/rmsecv)*rmsecv*100/self.y_train.mean()*rmset*1000/self.y_test.mean() - if score < PlsProcess.SCORE-0.5 : - PlsProcess.SCORE = score - self.nlv = params['n_components'] - self.best = params - self.model = Model - self.yc = yc - self.ycv = ycv - self.yt = yt - return score - - - ############################################## - - def tune(self, n_iter): - trials = Trials() - - best_params = fmin(fn=self.objective, - space=self.PLS_params, - algo=tpe.suggest, # Tree of Parzen Estimators’ (tpe) which is a Bayesian approach - max_evals=n_iter, - trials=trials, - verbose=0) - - @property - def best_hyperparams(self): - self.b = {'Scatter':self.best['scatter'], 'Saitzky-Golay derivative parameters':{'polyorder':self.best['polyorder'], - 'deriv':self.best['deriv'], - 'window_length':self.best['window_length']}} - return self.b - - @property - def model_(self): - return self.model - - @property - def pred_data_(self): - return self.yc, self.ycv, self.yt \ No newline at end of file diff --git a/src/Class_Mod/RegModels.py b/src/Class_Mod/RegModels.py deleted file mode 100644 index 2762618af282453d547e86be2ce42ed967a35e23..0000000000000000000000000000000000000000 --- a/src/Class_Mod/RegModels.py +++ /dev/null @@ -1,229 +0,0 @@ -from Packages import * -from Class_Mod import metrics, Snv, No_transformation, KF_CV, sel_ratio - - -class Regmodel(object): - - def __init__(self, train, test, n_iter, add_hyperparams = None, nfolds = 3, **kwargs): - self.SCORE = 100000000 - self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1] - self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1] - self._model, self._best = None, None - self._yc, self._ycv, self._yt = None, None, None - self._cv_df = pd.DataFrame() - self._sel_ratio = pd.DataFrame() - self._nfolds = nfolds - self._selected_bands = pd.DataFrame(index = ['from', 'to']) - self.important_features = None - self._hyper_params = {'polyorder': hp.choice('polyorder', [0, 1, 2]), - 'deriv': hp.choice('deriv', [0, 1, 2]), - 'window_length': hp.choice('window_length', [15, 21, 27, 33]), - 'normalization': hp.choice('normalization', ['Snv', 'No_transformation'])} - if add_hyperparams is not None: - self._hyper_params.update(add_hyperparams) - self._best = None - - trials = Trials() - best_params = fmin(fn=self.objective, - space=self._hyper_params, - algo=tpe.suggest, # Tree of Parzen Estimators’ (tpe) which is a Bayesian approach - max_evals=n_iter, - trials=trials, - verbose=1) - - @property - def train_data_(self): - return [self._xc, self._ytrain] - - @property - def test_data_(self): - return [self._xt, self._ytest] - - @property - def pretreated_spectra_(self): - return self.pretreated - - @property - def get_params_(self):### This method return the search space where the optimization algorithm will search for optimal subset of hyperparameters - return self._hyper_params - - def objective(self, params): - pass - - @property - def best_hyperparams_(self): ### This method returns the subset of selected hyperparametes - return self._best - @property - def best_hyperparams_print(self):### This method returns a sentence telling what signal preprocessing method was applied - if 
self._best['normalization'] == 'Snv': - a = 'Standard Normal Variate (SNV)' - - elif self._best['normalization'] == 'No_transformation': - a = " No transformation was performed" - - SG = f'- Savitzky-Golay derivative parameters \:(Window_length:{self._best['window_length']}; polynomial order: {self._best['polyorder']}; Derivative order : {self._best['deriv']})' - Norm = f'- Spectral Normalization \: {a}' - return SG+"\n"+Norm - - @property - def model_(self): # This method returns the developed model - return self._model - - @property - def pred_data_(self): ## this method returns the predicted data in training and testing steps - return self._yc, self._yt - - @property - def cv_data_(self): ## Cross validation data - return self._ycv - - @property - def CV_results_(self): - return self._cv_df - @property - def important_features_(self): - return self.important_features - @property - def selected_features_(self): - return self._selected_bands - - @property - def sel_ratio_(self): - return self._sel_ratio - -########################################### PLSR ######################################### -class Plsr(Regmodel): - def __init__(self, train, test, n_iter = 10, cv = 3): - super().__init__(train, test, n_iter, nfolds = cv, add_hyperparams = {'n_components': hp.randint('n_components', 1,20)}) - ### parameters in common - - def objective(self, params): - params['n_components'] = int(params['n_components']) - x0 = [self._xc, self._xt] - - x1 = [eval(str(params['normalization'])+"(x0[i])") for i in range(2)] - - a, b, c = params['deriv'], params['polyorder'], params['window_length'] - if a > b or b > c: - if self._best is not None: - a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length'] - - else: - a, b, c = 0, 0, 1 - - params['deriv'], params['polyorder'], params['window_length'] = a, b, c - x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'], window_length = params['window_length']) for i in range(2)] - - model = PLSRegression(scale = False, n_components = params['n_components']) - folds = KF_CV().CV(x = x2[0], y = np.array(self._ytrain), n_folds = self._nfolds) - yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = x2[0], y = np.array(self._ytrain)) - self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds =folds)[1] - - score = self._cv_df.loc["cv",'rmse'] - - Model = PLSRegression(scale = False, n_components = params['n_components']) - Model.fit(x2[0], self._ytrain) - - if self.SCORE > score: - self.SCORE = score - self._ycv = KF_CV().meas_pred_eq(y = np.array(self._ytrain), ypcv=yp, folds=folds) - self._yc = Model.predict(x2[0]) - self._yt = Model.predict(x2[1]) - self._model = Model - for key,value in params.items(): - try: params[key] = int(value) - except (TypeError, ValueError): params[key] = value - - self._best = params - self.pretreated = pd.DataFrame(x2[0]) - self._sel_ratio = sel_ratio(Model, x2[0]) - return score - - - ############################################ iplsr ######################################### -class TpeIpls(Regmodel): - def __init__(self, train, test, n_iter = 10, n_intervall = 5, cv = 3): - self.n_intervall = n_intervall - self.n_arrets = self.n_intervall*2 - - - r = {'n_components': hp.randint('n_components', 1,20)} - r.update({f'v{i}': hp.randint(f'v{i}', 0, train[0].shape[1]) for i in range(1,self.n_arrets+1)}) - - super().__init__(train, test, n_iter, add_hyperparams = r, nfolds = cv) - - ### parameters in common - - def objective(self, params): - ### 
wevelengths index - self.idx = [params[f'v{i}'] for i in range(1,self.n_arrets+1)] - self.idx.sort() - arrays = [np.arange(self.idx[2*i],self.idx[2*i+1]+1) for i in range(self.n_intervall)] - id = np.unique(np.concatenate(arrays, axis=0), axis=0) - - ### Preprocessing - x0 = [self._xc, self._xt] - x1 = [eval(str(params['normalization'])+"(x0[i])") for i in range(2)] - - a, b, c = params['deriv'], params['polyorder'], params['window_length'] - if a > b or b > c: - if self._best is not None: - a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length'] - - else: - a, b, c = 0, 0, 1 - - params['deriv'], params['polyorder'], params['window_length'] = a, b, c - x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'], window_length = params['window_length']) for i in range(2)] - - - prepared_data = [x2[i][:,id] for i in range(2)] - - - ### Modelling - folds = KF_CV().CV(x = prepared_data[0], y = np.array(self._ytrain), n_folds = self._nfolds) - try: - model = PLSRegression(scale = False, n_components = params['n_components']) - yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = prepared_data[0], y = np.array(self._ytrain)) - self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds =folds)[1] - except ValueError as ve: - params["n_components"] = 1 - model = PLSRegression(scale = False, n_components = params["n_components"]) - yp = KF_CV().cross_val_predictor(model = model, folds = folds, x = prepared_data[0], y = np.array(self._ytrain)) - self._cv_df = KF_CV().metrics_cv(y = np.array(self._ytrain), ypcv = yp, folds =folds)[1] - - - score = self._cv_df.loc['cv','rmse'] - - Model = PLSRegression(scale = False, n_components = model.n_components) - Model.fit(prepared_data[0], self._ytrain) - - if self.SCORE > score: - self.SCORE = score - self._ycv = KF_CV().meas_pred_eq(y = np.array(self._ytrain), ypcv=yp, folds=folds) - - self._yc = Model.predict(prepared_data[0]) - self._yt = Model.predict(prepared_data[1]) - self._model = Model - for key,value in params.items(): - try: params[key] = int(value) - except (TypeError, ValueError): params[key] = value - self._best = params - - self.pretreated = pd.DataFrame(x2[0]) - self.segments = arrays - - for i in range(len(self.segments)): - self._selected_bands[f'band{i+1}'] = [self.segments[i][0], self.segments[i][self.segments[i].shape[0]-1]] - self._selected_bands.index = ['from','to'] - - return score - - - ########################################### LWPLSR ######################################### - ############################################ Pcr ######################################### - -class Pcr(Regmodel): - def __init__(self, train, test, n_iter = 10, n_val = 5): - super.__init__() - {f'pc{i}': hp.randint(f'pc{i+1}', 0, train[0].shape[1]) for i in range(self.n_val)} diff --git a/src/Class_Mod/SK_PLSR_.py b/src/Class_Mod/SK_PLSR_.py deleted file mode 100644 index 83ea85533bd2c4d9d7bbfbae94063e139aff464c..0000000000000000000000000000000000000000 --- a/src/Class_Mod/SK_PLSR_.py +++ /dev/null @@ -1,118 +0,0 @@ -from Packages import * -from Class_Mod.Miscellaneous import * -from Class_Mod.Evaluation_Metrics import metrics -from Class_Mod.DATA_HANDLING import Snv - -class PlsR: - SCORE = 100000000 - - def __init__(self, x_train, y_train, x_test, y_test): - self.PLS_params = {} - a = [0, 1, 2] - if min(a)==0: - b = [0] - elif min(a)==1: - b= [0,1] - elif min(a) ==2: - b = [0, 1, 2] - - self.PLS_params['Preprocess'] = {'Scatter':hp.choice('Scatter',['Snv', None]), - 
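# Savitzky-Golay settings searched by hyperopt: window length, polynomial order and derivative order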
'window_length_sg':hp.choice('window_length_sg', [9, 13, 17, 21]), - 'polyorder_sg':hp.choice('polyorder_sg',a), - 'deriv_sg':hp.choice('deriv_sg', b)} - - self.PLS_params['n_components'] = hp.choice("n_components", list(np.arange(1,21))) - - self.x_train = x_train - self.x_test = x_test - self.y_train = y_train - self.y_test = y_test - self.p = self.x_train.shape[1] - - trials = Trials() - best_params = fmin(fn=self.objective, - space=self.PLS_params, - algo=tpe.suggest, # Tree of Parzen Estimators’ (tpe) which is a Bayesian approach - max_evals=100, - trials=trials, - verbose=0) - ##################################################################################################### - if self.best['Preprocess']['Scatter'] is None: - xtrain = self.x_train - xtest = self.x_test - elif self.best_hyperparams['Preprocess']['Scatter'] == 'Snv': - xtrain = Snv(self.x_train) - xtest = Snv(self.x_test) - - x_train = savgol_filter(xtrain, window_length = self.best['Preprocess']['window_length_sg'], - polyorder = self.best['Preprocess']['polyorder_sg'], - deriv=self.best['Preprocess']['deriv_sg']) - - x_test = savgol_filter(xtest, window_length = self.best['Preprocess']['window_length_sg'], - polyorder = self.best['Preprocess']['polyorder_sg'], - deriv=self.best['Preprocess']['deriv_sg']) - - - - ###################################################################################################### - self.trained = PLSRegression(n_components= self.best['n_components'], scale = False) - self.trained.fit(x_train, self.y_train) - - self.yc = pd.DataFrame(self.trained.predict(x_train)) # make predictions on test data and assign to Y_preds variable - self.ycv = pd.DataFrame(cross_val_predict(self.trained, x_train, self.y_train, cv = 3)) # make predictions on test data and assign to Y_preds variable - self.yt = pd.DataFrame(self.trained.predict(x_test)) # make predictions on test data and assign to Y_preds variable - ####################################################################################################### - - def objective(self, params): - ws = params['Preprocess']['window_length_sg'] - po = params['Preprocess']['polyorder_sg'] - dr = params['Preprocess']['deriv_sg'] - - if params['Preprocess']['Scatter'] is None: - xtrain = self.x_train - xtest = self.x_test - elif params['Preprocess']['Scatter'] == 'Snv': - xtrain = Snv(self.x_train) - xtest = Snv(self.x_test) - - x_train = savgol_filter(xtrain, window_length = params['Preprocess']['window_length_sg'], - polyorder = params['Preprocess']['polyorder_sg'], - deriv=params['Preprocess']['deriv_sg']) - - x_test = savgol_filter(xtest, window_length = params['Preprocess']['window_length_sg'], - polyorder = params['Preprocess']['polyorder_sg'], - deriv=params['Preprocess']['deriv_sg']) - - - m = PLSRegression( n_components= params['n_components'], scale = False ) - m.fit(x_train, self.y_train) - - yc = m.predict(x_train) - ycv = cross_val_predict( m, x_train, self.y_train, cv = 5) - yt = m.predict(x_test) - - rmsec = mean_squared_error(self.y_train, yc) - rmsecv = mean_squared_error(self.y_train, ycv) - rmset = mean_squared_error(self.y_test, yt) - - SCORE = (rmsecv/rmset) + (rmsecv/rmsec) + (rmset/rmsec) - if SCORE < PlsR.SCORE: - PlsR.SCORE = SCORE - self.best = params - return SCORE - - - @property - def model_(self): - return self.trained - - @property - def best_hyperparams(self): - self.b = {'Scatter':self.best['Preprocess']['Scatter'], 'Saitzky-Golay derivative parameters':{'polyorder':self.best['Preprocess']['polyorder_sg'], - 
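# Savitzky-Golay parameters retained from the best hyperopt trial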
'deriv':self.best['Preprocess']['deriv_sg'], - 'window_length':self.best['Preprocess']['window_length_sg']}} - return self.b - - @property - def pred_data_(self): - return np.array(self.yc).reshape(-1), np.array(self.ycv).reshape(-1), np.array(self.yt).reshape(-1) \ No newline at end of file diff --git a/src/Class_Mod/SQL_connect.py b/src/Class_Mod/SQL_connect.py deleted file mode 100644 index aeda61c59f7a62bfd080cdf97362e2702dd7d6b1..0000000000000000000000000000000000000000 --- a/src/Class_Mod/SQL_connect.py +++ /dev/null @@ -1,36 +0,0 @@ -from Packages import pyodbc, json - -class SQL_Database(): - - def __init__(self): - config_path = Path("../config/") - with open(config_path / 'config.json', 'r') as fh: - config = json.load(fh) - - self.driver = config['DRIVER'] - self.server = config['SERVER'] - self.database = config['DATABASE'] - self.uid = config['UID'] - self.pwd = config['PWD'] - self.port = config['PORT'] - self.encrypt = config['ENCRYPT'] - - def connect(self): - connection = pyodbc.connect( - f'Driver={self.driver};' - f'Server={self.server};' - f'Database={self.database};' - f'uid={self.uid};' - f'pwd={self.pwd};' - f'port={self.port};' - f'Encrypt={self.encrypt};' - ) - return connection - -# How to connect to the db? -# con = SQL_Database().connect() -# quest = con.execute("SELECT table_schema || '.' || table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('pg_catalog', 'information_schema');") -# row = quest.fetchone() -# print(row) -# quest.close() -# con.close() diff --git a/src/Class_Mod/UMAP_.py b/src/Class_Mod/UMAP_.py deleted file mode 100644 index 75c874639a0510fb48dac87a89c17d45ba8f5d7a..0000000000000000000000000000000000000000 --- a/src/Class_Mod/UMAP_.py +++ /dev/null @@ -1,31 +0,0 @@ -# UMAP function for the Sample Selection module -from Packages import * -from Class_Mod.DATA_HANDLING import * - -class Umap: - """ - The UMAP dimension reduction algorithm from scikit learn - """ - def __init__(self, numerical_data, cat_data): - self.numerical_data = numerical_data - if cat_data is None: - self.categorical_data_encoded = cat_data - elif len(cat_data) > 0: - self.categorical_data = cat_data - self.le = LabelEncoder() - self.categorical_data_encoded = self.le.fit_transform(self.categorical_data) - else: - self.categorical_data_encoded = None - - self.model = UMAP(n_neighbors=20, n_components=3, min_dist=0.0, )#random_state=42,) - self.model.fit(self.numerical_data, y = self.categorical_data_encoded) - self.scores_raw = self.model.transform(self.numerical_data) - self.scores = pd.DataFrame(self.scores_raw) - self.scores.columns = [f'axis_{i+1}' for i in range(self.scores_raw.shape[1])] - - @property - def scores_(self): - return self.scores - @property - def scores_raw_(self): - return self.scores_raw \ No newline at end of file diff --git a/src/Class_Mod/VarSel.py b/src/Class_Mod/VarSel.py deleted file mode 100644 index 9f60fc4a73294660fb31517afe74c112430ab12c..0000000000000000000000000000000000000000 --- a/src/Class_Mod/VarSel.py +++ /dev/null @@ -1,163 +0,0 @@ -from Packages import * -from Class_Mod import metrics -from Class_Mod import * -from scipy.signal import savgol_filter -class TpeIpls: - ''' - This framework is added to the clan of wavelengths selection algorithms.It was introduced as an improvement - to the forward and backward intervall selection algorithms. 
This framework combines - the partial least squares algorithm and the tree-parzen structed estimatior, which is a bayesian optimization algorithm - that was first introduced in 2011. This combination provides a wrapper method for intervall-PLS. - This work keeps the integrity of the spectral data. by treating the data as a sequential data rather than using - descrete optimization (point to point selection) - ''' - - '''Optimization algorithms can be used to find the subset of variables that optimize a certain criterion - (e.g., maximize predictive performance, minimize overfitting)''' - SCORE = 100000000 - index_export = pd.DataFrame() - def __init__(self, x_train, x_test, y_train, y_test, - scale, Kfold, n_intervall): - TpeIpls.SCORE = 10000 - self.xtrain = x_train - self.xtest = x_test - self.y_train= y_train - self.y_test = y_test - self.scale = scale - self.Kfold = Kfold - self.p = self.xtrain.shape[1] - self.n_intervall = n_intervall - self.n_arrets = self.n_intervall*2 - self.PLS_params = {f'v{i}': hp.randint(f'v{i}', 0, self.p) for i in range(1,self.n_arrets+1)} - self.PLS_params['n_components'] = hp.randint("n_components", 1, 10) - self.PLS_params['Preprocess'] = {'Scatter':hp.choice('Scatter',['Snv', None]), - 'window_length_sg':hp.choice('window_length_sg', [9, 13, 17, 21]), - 'polyorder_sg':hp.choice('polyorder_sg',[2]), - 'deriv_sg':hp.choice('deriv_sg', [1])} - def objective(self, params): - self.idx = [params[f'v{i}'] for i in range(1,self.n_arrets+1)] - self.idx.sort() - - arrays = [np.arange(self.idx[2*i],self.idx[2*i+1]+1) for i in range(self.n_intervall)] - - id = np.unique(np.concatenate(arrays, axis=0), axis=0) - - ## first preprocessing method - if params['Preprocess']['Scatter'] =='Snv': - xtrain1 = Snv(self.xtrain) - xtest1 = Snv(self.xtest) - else: - xtrain1 = self.xtrain - xtest1 = self.xtest - - ## Second first preprocessing method - if params['Preprocess']['deriv_sg'] > params['Preprocess']['polyorder_sg'] or params['Preprocess']['polyorder_sg'] > params['Preprocess']['window_length_sg']: - params['Preprocess']['deriv_sg'] = 0 - params['Preprocess']['polyorder_sg'] = 0 - params['Preprocess']['window_length_sg'] = 1 - - - pt = params['Preprocess'] - self.x_train = pd.DataFrame(eval(f"savgol_filter(xtrain1, polyorder=pt['deriv_sg'], deriv=pt['deriv_sg'], window_length = pt['window_length_sg'], delta=1.0, axis=-1, mode='interp', cval=0.0)") , - columns = self.xtrain.columns, index= self.xtrain.index) - - self.x_test = pd.DataFrame(eval(f"savgol_filter(xtest1, polyorder=pt['deriv_sg'], deriv=pt['deriv_sg'], window_length = pt['window_length_sg'], delta=1.0, axis=-1, mode='interp', cval=0.0)") , - columns = self.xtest.columns, index= self.xtest.index) - - - # Train the model - try: - Model = PLSRegression(scale = self.scale,n_components = params['n_components']) - Model.fit(self.x_train.iloc[:,id], self.y_train) - except ValueError as ve: - Model = PLSRegression(scale = self.scale,n_components = 1) - Model.fit(self.x_train.iloc[:,id], self.y_train) - params['n_components'] = 1 - - - ## make prediction - yc = Model.predict(self.x_train.iloc[:,id]).ravel() - ycv = cross_val_predict(Model, self.x_train.iloc[:,id], self.y_train, cv=self.Kfold, n_jobs=-1).ravel() - yt = Model.predict(self.x_test.iloc[:, id]).ravel() - - ### compute r-squared - #r2c = r2_score(self.y_train, yc) - #r2cv = r2_score(self.y_train, ycv) - #r2t = r2_score(self.y_test, yt) - rmsecv = np.sqrt(mean_squared_error(self.y_train, ycv)) - rmsec = np.sqrt(mean_squared_error(self.y_train, yc)) - - score = 
np.round(rmsecv/rmsec + rmsecv*100/self.y_train.mean()) - if score < TpeIpls.SCORE-0.5: - TpeIpls.SCORE = score - self.nlv = params['n_components'] - - - TpeIpls.index_export = pd.DataFrame() - TpeIpls.index_export["Vars"] = self.x_test.columns[id] - TpeIpls.index_export.index = id - self.best = params - - - self.segments = arrays - return score - - - - - ############################################## - - def BandSelect(self, n_iter): - trials = Trials() - - best_params = fmin(fn=self.objective, - space=self.PLS_params, - algo=tpe.suggest, # Tree of Parzen Estimators’ (tpe) which is a Bayesian approach - max_evals=n_iter, - trials=trials, - verbose=0) - - ban = {} - if self.segments:####### test - for i in range(len(self.segments)): - ban[f'band{i+1}'] = [self.segments[i][0], self.segments[i][self.segments[i].shape[0]-1]] - - self.bands = pd.DataFrame(ban).T - self.bands.columns = ['from', 'to'] - - - f = [] - for i in range(self.bands.shape[0]): - f.extend(np.arange(self.bands["from"][i], self.bands["to"][i]+1)) - variables_idx = list(set(f)) - - - - ############################################ - for i in range(self.bands.shape[0]): - f.extend(np.arange(self.bands["from"][i], self.bands["to"][i]+1)) - variables_idx = list(set(f)) - - self.pls = PLSRegression(n_components=self.nlv, scale= self.scale) - self.pls.fit(self.x_train.iloc[:,variables_idx], self.y_train) - - self.yc = self.pls.predict(self.x_train.iloc[:,variables_idx]).ravel() - self.ycv = cross_val_predict(self.pls, self.x_train.iloc[:,variables_idx], self.y_train, cv=self.Kfold, n_jobs=-1).ravel() - self.yt = self.pls.predict(self.x_test.iloc[:,variables_idx]).ravel() - - return self.bands, variables_idx - - @property - def best_hyperparams(self): - self.b = {'Scatter':self.best['Preprocess']['Scatter'], 'Saitzky-Golay derivative parameters':{'polyorder':self.best['Preprocess']['polyorder_sg'], - 'deriv':self.best['Preprocess']['deriv_sg'], - 'window_length':self.best['Preprocess']['window_length_sg']}} - return self.b - - @property - def model_(self): - return self.pls - @property - def pred_data_(self): - return self.yc, self.ycv, self.yt - \ No newline at end of file diff --git a/src/Class_Mod/__init__.py b/src/Class_Mod/__init__.py deleted file mode 100644 index c5434f5f66a91df7d2634dd82077d24df06676ce..0000000000000000000000000000000000000000 --- a/src/Class_Mod/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Here are all the classes to perform your analysis -""" -from .PCA_ import * -from .KMEANS_ import Sk_Kmeans -from .UMAP_ import Umap -from .DATA_HANDLING import * -from .PLSR_ import PinardPlsr -from .LWPLSR_ import LWPLSR -from .Evaluation_Metrics import metrics -#from .VarSel import TpeIpls -from .Miscellaneous import resid_plot, reg_plot, desc_stats, hash_data -from .DxReader import DxRead, read_dx -from .HDBSCAN_Clustering import Hdbscan -from .SK_PLSR_ import PlsR -from .PLSR_Preprocess import PlsProcess -from .NMF_ import Nmf -from .Ap import AP -from .RegModels import Plsr, TpeIpls -from .KennardStone import KS, RDM
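Reviewer note: the removed Class_Mod package exposed each regression model through the properties shown above (model_, pred_data_, best_hyperparams_, CV_results_). Below is a minimal sketch, based only on the signatures visible in this diff, of how the deleted Plsr class from src/Class_Mod/RegModels.py was driven; the synthetic data, shapes and variable names are hypothetical and only illustrate the call pattern.

# Hypothetical usage sketch of the removed Plsr model (synthetic data, illustrative only).
import numpy as np
import pandas as pd
from Class_Mod import Plsr  # re-exported by the deleted src/Class_Mod/__init__.py

# Synthetic spectra: 100 samples x 200 wavelengths, one response value per sample.
X = pd.DataFrame(np.random.rand(100, 200))
y = pd.Series(np.random.rand(100))
train, test = [X.iloc[:80], y.iloc[:80]], [X.iloc[80:], y.iloc[80:]]

# The hyperopt/TPE search over preprocessing and n_components runs inside __init__.
model = Plsr(train, test, n_iter=10, cv=3)
print(model.best_hyperparams_)   # retained normalization, Savitzky-Golay settings and n_components
yc, yt = model.pred_data_        # predictions on the training and test sets
cv_metrics = model.CV_results_   # per-fold metrics table produced by KF_CV.metrics_cv

The same pattern applies to the TpeIpls class kept in RegModels.py, which additionally reports the retained wavelength bands through its selected_features_ property.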