from Packages import * class Hdbscan: """ Runs an automatic optimized sklearn.HDBSCAN clustering on Dimensionality reduced space. Vars: data: the Dimensionality reduced space, raw result of the UMAP.fit() param_dist: the HDBSCAN optimization parameters to test Density-Based Clustering Validation - DBCV (https://github.com/christopherjenness/DBCV/tree/master ; Moulavi, Davoud, et al. "Density-based clustering validation." Proceedings of the 2014 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2014.) is used as a metric to optimize HDBSCAN algorithm. Functions DBCV, _core_dist, _mutual_reachability_dist, _mutual_reach_dist_graph, _mutual_reach_dist_graph, _mutual_reach_dist_MST, _cluster_density_sparseness, _cluster_density_separation, _cluster_validity_index, _clustering_validity_index and _get_label_members aim at DBCV computing. _score is a dataframe with the DBCV value for each combination of param_dist. We search for the higher value and compute an HDBSCAN with the best parameters. The HDBSCAN_scores_ @property return the cluster number of each sample (_labels) and the DBCV best score. """ def __init__(self, data): # Really fast # self._param_dist = {'min_samples': [1], # 'min_cluster_size':[5], # 'metric' : ['euclidean','manhattan'], # } # Medium self._param_dist = {'min_samples': [1,10], 'min_cluster_size':[5,50], 'metric' : ['euclidean','manhattan'], } # Complete # self._param_dist = {'min_samples': [1,5,10,], # 'min_cluster_size':[5,25,50,], # 'metric' : ['euclidean','manhattan'], # } self._clusterable_embedding = data # RandomizedSearchCV not working... # def scoring(model, clusterable_embedding): # label = HDBSCAN().fit_predict(clusterable_embedding) # hdbscan_score = DBCV(clusterable_embedding, label, dist_function=euclidean) # return hdbscan_score # tunning = RandomizedSearchCV(estimator=HDBSCAN(), param_distributions=param_dist, scoring=scoring) # tunning.fit(clusterable_embedding) # return tunning # compute optimization. Test each combination of parameters and store DBCV score into _score. self._score = pd.DataFrame() for i in self._param_dist.get('min_samples'): for j in self._param_dist.get('min_cluster_size'): self._ij_label = HDBSCAN(min_samples=i, min_cluster_size=j).fit_predict(self._clusterable_embedding) self._ij_hdbscan_score = self.DBCV(self._clusterable_embedding, self._ij_label,)# dist_function=euclidean) self._score.at[i,j] = self._ij_hdbscan_score # get the best DBCV score self._hdbscan_score = max(self._score.max()) # find the coordinates of the best clustering parameters and run HDBSCAN below self._bparams = np.where(self._score == self._hdbscan_score) # run HDBSCAN with best params self._labels = HDBSCAN(min_samples=self._param_dist['min_samples'][self._bparams[0][0]], min_cluster_size=self._param_dist['min_cluster_size'][self._bparams[1][0]], metric=self._param_dist['metric'][self._bparams[1][0]]).fit_predict(self._clusterable_embedding) def DBCV(self, X, labels, dist_function=euclidean): """ Implimentation of Density-Based Clustering Validation "DBCV" Citation: Moulavi, Davoud, et al. "Density-based clustering validation." Proceedings of the 2014 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2014. Density Based clustering validation Args: X (np.ndarray): ndarray with dimensions [n_samples, n_features] data to check validity of clustering labels (np.array): clustering assignments for data X dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: cluster_validity (float) score in range[-1, 1] indicating validity of clustering assignments """ graph = self._mutual_reach_dist_graph(X, labels, dist_function) mst = self._mutual_reach_dist_MST(graph) cluster_validity = self._clustering_validity_index(mst, labels) return cluster_validity def _core_dist(self, point, neighbors, dist_function): """ Computes the core distance of a point. Core distance is the inverse density of an object. Args: point (np.array): array of dimensions (n_features,) point to compute core distance of neighbors (np.ndarray): array of dimensions (n_neighbors, n_features): array of all other points in object class dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: core_dist (float) inverse density of point """ n_features = np.shape(point)[0] n_neighbors = np.shape(neighbors)[0] distance_vector = cdist(point.reshape(1, -1), neighbors) distance_vector = distance_vector[distance_vector != 0] numerator = ((1/distance_vector)**n_features).sum() core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features) return core_dist def _mutual_reachability_dist(self, point_i, point_j, neighbors_i, neighbors_j, dist_function): """. Computes the mutual reachability distance between points Args: point_i (np.array): array of dimensions (n_features,) point i to compare to point j point_j (np.array): array of dimensions (n_features,) point i to compare to point i neighbors_i (np.ndarray): array of dims (n_neighbors, n_features): array of all other points in object class of point i neighbors_j (np.ndarray): array of dims (n_neighbors, n_features): array of all other points in object class of point j dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: mutual_reachability (float) mutual reachability between points i and j """ core_dist_i = self._core_dist(point_i, neighbors_i, dist_function) core_dist_j = self._core_dist(point_j, neighbors_j, dist_function) dist = dist_function(point_i, point_j) mutual_reachability = np.max([core_dist_i, core_dist_j, dist]) return mutual_reachability def _mutual_reach_dist_graph(self, X, labels, dist_function): """ Computes the mutual reach distance complete graph. Graph of all pair-wise mutual reachability distances between points Args: X (np.ndarray): ndarray with dimensions [n_samples, n_features] data to check validity of clustering labels (np.array): clustering assignments for data X dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: graph (np.ndarray) array of dimensions (n_samples, n_samples) Graph of all pair-wise mutual reachability distances between points. """ n_samples = np.shape(X)[0] graph = [] counter = 0 for row in range(n_samples): graph_row = [] for col in range(n_samples): point_i = X[row] point_j = X[col] class_i = labels[row] class_j = labels[col] members_i = self._get_label_members(X, labels, class_i) members_j = self._get_label_members(X, labels, class_j) dist = self._mutual_reachability_dist(point_i, point_j, members_i, members_j, dist_function) graph_row.append(dist) counter += 1 graph.append(graph_row) graph = np.array(graph) return graph def _mutual_reach_dist_MST(self, dist_tree): """ Computes minimum spanning tree of the mutual reach distance complete graph Args: dist_tree (np.ndarray): array of dimensions (n_samples, n_samples) Graph of all pair-wise mutual reachability distances between points. Returns: minimum_spanning_tree (np.ndarray) array of dimensions (n_samples, n_samples) minimum spanning tree of all pair-wise mutual reachability distances between points. """ mst = minimum_spanning_tree(dist_tree).toarray() return mst + np.transpose(mst) def _cluster_density_sparseness(self, MST, labels, cluster): """ Computes the cluster density sparseness, the minimum density within a cluster Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X cluster (int): cluster of interest Returns: cluster_density_sparseness (float) value corresponding to the minimum density within a cluster """ indices = np.where(labels == cluster)[0] cluster_MST = MST[indices][:, indices] cluster_density_sparseness = np.max(cluster_MST) return cluster_density_sparseness def _cluster_density_separation(self, MST, labels, cluster_i, cluster_j): """ Computes the density separation between two clusters, the maximum density between clusters. Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X cluster_i (int): cluster i of interest cluster_j (int): cluster j of interest Returns: density_separation (float): value corresponding to the maximum density between clusters """ indices_i = np.where(labels == cluster_i)[0] indices_j = np.where(labels == cluster_j)[0] shortest_paths = csgraph.dijkstra(MST, indices=indices_i) relevant_paths = shortest_paths[:, indices_j] density_separation = np.min(relevant_paths) return density_separation def _cluster_validity_index(self, MST, labels, cluster): """ Computes the validity of a cluster (validity of assignmnets) Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X cluster (int): cluster of interest Returns: cluster_validity (float) value corresponding to the validity of cluster assignments """ min_density_separation = np.inf for cluster_j in np.unique(labels): if cluster_j != cluster: cluster_density_separation = self._cluster_density_separation(MST, labels, cluster, cluster_j) if cluster_density_separation < min_density_separation: min_density_separation = cluster_density_separation cluster_density_sparseness = self._cluster_density_sparseness(MST, labels, cluster) numerator = min_density_separation - cluster_density_sparseness denominator = np.max([min_density_separation, cluster_density_sparseness]) cluster_validity = numerator / denominator return cluster_validity def _clustering_validity_index(self, MST, labels): """ Computes the validity of all clustering assignments for a clustering algorithm Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X Returns: validity_index (float): score in range[-1, 1] indicating validity of clustering assignments """ n_samples = len(labels) validity_index = 0 for label in np.unique(labels): fraction = np.sum(labels == label) / float(n_samples) cluster_validity = self._cluster_validity_index(MST, labels, label) validity_index += fraction * cluster_validity return validity_index def _get_label_members(self, X, labels, cluster): """ Helper function to get samples of a specified cluster. Args: X (np.ndarray): ndarray with dimensions [n_samples, n_features] data to check validity of clustering labels (np.array): clustering assignments for data X cluster (int): cluster of interest Returns: members (np.ndarray) array of dimensions (n_samples, n_features) of samples of the specified cluster. """ indices = np.where(labels == cluster)[0] members = X[indices] return members @property def HDBSCAN_scores_(self): return self._labels, self._hdbscan_score