from Packages import * from scipy.spatial.distance import euclidean, cdist from scipy.sparse.csgraph import minimum_spanning_tree from scipy.sparse import csgraph def DBCV(X, labels, dist_function=euclidean): """ Implimentation of Density-Based Clustering Validation "DBCV" Citation: Moulavi, Davoud, et al. "Density-based clustering validation." Proceedings of the 2014 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2014. Density Based clustering validation Args: X (np.ndarray): ndarray with dimensions [n_samples, n_features] data to check validity of clustering labels (np.array): clustering assignments for data X dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: cluster_validity (float) score in range[-1, 1] indicating validity of clustering assignments """ graph = _mutual_reach_dist_graph(X, labels, dist_function) mst = _mutual_reach_dist_MST(graph) cluster_validity = _clustering_validity_index(mst, labels) return cluster_validity def _core_dist(point, neighbors, dist_function): """ Computes the core distance of a point. Core distance is the inverse density of an object. Args: point (np.array): array of dimensions (n_features,) point to compute core distance of neighbors (np.ndarray): array of dimensions (n_neighbors, n_features): array of all other points in object class dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: core_dist (float) inverse density of point """ n_features = np.shape(point)[0] n_neighbors = np.shape(neighbors)[0] distance_vector = cdist(point.reshape(1, -1), neighbors) distance_vector = distance_vector[distance_vector != 0] numerator = ((1/distance_vector)**n_features).sum() core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features) return core_dist def _mutual_reachability_dist(point_i, point_j, neighbors_i, neighbors_j, dist_function): """. Computes the mutual reachability distance between points Args: point_i (np.array): array of dimensions (n_features,) point i to compare to point j point_j (np.array): array of dimensions (n_features,) point i to compare to point i neighbors_i (np.ndarray): array of dims (n_neighbors, n_features): array of all other points in object class of point i neighbors_j (np.ndarray): array of dims (n_neighbors, n_features): array of all other points in object class of point j dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: mutual_reachability (float) mutual reachability between points i and j """ core_dist_i = _core_dist(point_i, neighbors_i, dist_function) core_dist_j = _core_dist(point_j, neighbors_j, dist_function) dist = dist_function(point_i, point_j) mutual_reachability = np.max([core_dist_i, core_dist_j, dist]) return mutual_reachability def _mutual_reach_dist_graph(X, labels, dist_function): """ Computes the mutual reach distance complete graph. Graph of all pair-wise mutual reachability distances between points Args: X (np.ndarray): ndarray with dimensions [n_samples, n_features] data to check validity of clustering labels (np.array): clustering assignments for data X dist_dunction (func): function to determine distance between objects func args must be [np.array, np.array] where each array is a point Returns: graph (np.ndarray) array of dimensions (n_samples, n_samples) Graph of all pair-wise mutual reachability distances between points. """ n_samples = np.shape(X)[0] graph = [] counter = 0 for row in range(n_samples): graph_row = [] for col in range(n_samples): point_i = X[row] point_j = X[col] class_i = labels[row] class_j = labels[col] members_i = _get_label_members(X, labels, class_i) members_j = _get_label_members(X, labels, class_j) dist = _mutual_reachability_dist(point_i, point_j, members_i, members_j, dist_function) graph_row.append(dist) counter += 1 graph.append(graph_row) graph = np.array(graph) return graph def _mutual_reach_dist_MST(dist_tree): """ Computes minimum spanning tree of the mutual reach distance complete graph Args: dist_tree (np.ndarray): array of dimensions (n_samples, n_samples) Graph of all pair-wise mutual reachability distances between points. Returns: minimum_spanning_tree (np.ndarray) array of dimensions (n_samples, n_samples) minimum spanning tree of all pair-wise mutual reachability distances between points. """ mst = minimum_spanning_tree(dist_tree).toarray() return mst + np.transpose(mst) def _cluster_density_sparseness(MST, labels, cluster): """ Computes the cluster density sparseness, the minimum density within a cluster Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X cluster (int): cluster of interest Returns: cluster_density_sparseness (float) value corresponding to the minimum density within a cluster """ indices = np.where(labels == cluster)[0] cluster_MST = MST[indices][:, indices] cluster_density_sparseness = np.max(cluster_MST) return cluster_density_sparseness def _cluster_density_separation(MST, labels, cluster_i, cluster_j): """ Computes the density separation between two clusters, the maximum density between clusters. Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X cluster_i (int): cluster i of interest cluster_j (int): cluster j of interest Returns: density_separation (float): value corresponding to the maximum density between clusters """ indices_i = np.where(labels == cluster_i)[0] indices_j = np.where(labels == cluster_j)[0] shortest_paths = csgraph.dijkstra(MST, indices=indices_i) relevant_paths = shortest_paths[:, indices_j] density_separation = np.min(relevant_paths) return density_separation def _cluster_validity_index(MST, labels, cluster): """ Computes the validity of a cluster (validity of assignmnets) Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X cluster (int): cluster of interest Returns: cluster_validity (float) value corresponding to the validity of cluster assignments """ min_density_separation = np.inf for cluster_j in np.unique(labels): if cluster_j != cluster: cluster_density_separation = _cluster_density_separation(MST, labels, cluster, cluster_j) if cluster_density_separation < min_density_separation: min_density_separation = cluster_density_separation cluster_density_sparseness = _cluster_density_sparseness(MST, labels, cluster) numerator = min_density_separation - cluster_density_sparseness denominator = np.max([min_density_separation, cluster_density_sparseness]) cluster_validity = numerator / denominator return cluster_validity def _clustering_validity_index(MST, labels): """ Computes the validity of all clustering assignments for a clustering algorithm Args: MST (np.ndarray): minimum spanning tree of all pair-wise mutual reachability distances between points. labels (np.array): clustering assignments for data X Returns: validity_index (float): score in range[-1, 1] indicating validity of clustering assignments """ n_samples = len(labels) validity_index = 0 for label in np.unique(labels): fraction = np.sum(labels == label) / float(n_samples) cluster_validity = _cluster_validity_index(MST, labels, label) validity_index += fraction * cluster_validity return validity_index def _get_label_members(X, labels, cluster): """ Helper function to get samples of a specified cluster. Args: X (np.ndarray): ndarray with dimensions [n_samples, n_features] data to check validity of clustering labels (np.array): clustering assignments for data X cluster (int): cluster of interest Returns: members (np.ndarray) array of dimensions (n_samples, n_features) of samples of the specified cluster. """ indices = np.where(labels == cluster)[0] members = X[indices] return members def HDBSCAN_function(data, min_cluster_size): # param_dist = {'min_samples': [1,5,10,30], # 'min_cluster_size':[5,10,20,30,50,75,100], # # 'cluster_selection_method' : ['eom','leaf'], # # 'metric' : ['euclidean','manhattan'] # } param_dist = {'min_samples': [1,5], 'min_cluster_size':[5,10], } clusterable_embedding = UMAP( n_neighbors=20, min_dist=0.0, n_components=5, random_state=42, ).fit_transform(data) min_score = pd.DataFrame() for i in param_dist.get('min_samples'): for j in param_dist.get('min_cluster_size'): ij_label = HDBSCAN(min_samples=i, min_cluster_size=j).fit_predict(clusterable_embedding) ij_hdbscan_score = DBCV(clusterable_embedding, ij_label, dist_function=euclidean) min_score.at[i,j] = ij_hdbscan_score hdbscan_score = max(min_score.max()) # get the coordinates of the best clustering paramters and run HDBSCAN below labels = HDBSCAN(min_samples=1, min_cluster_size=min_cluster_size).fit_predict(clusterable_embedding) return labels, hdbscan_score