Skip to content
Snippets Groups Projects
clustering.py 17.5 KiB
Newer Older
DIANE's avatar
DIANE committed
import numpy as np
from pandas import DataFrame
from sklearn.cluster import KMeans


DIANE's avatar
DIANE committed

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~  kmeans ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
class Sk_Kmeans:
    """K-Means clustering for Samples selection.

    Returns:
        inertia_ (DataFrame): DataFrame with ...
        x (DataFrame): Initial data
        clu (DataFrame): Cluster name for each sample
        model.cluster_centers_ (DataFrame): Coordinates of the center of each cluster
    """
    def __init__(self, x, max_clusters):
        """Initiate the KMeans class.

        Args:
            x (DataFrame): the original reduced data to cluster
            max_cluster (Int): the max number of desired clusters.
        """
        self.x = x
        self.max_clusters = max_clusters

        self.inertia = DataFrame()
        for i in range(1, max_clusters+1):
            model = KMeans(n_clusters = i, init = 'k-means++', random_state = 42)
            model.fit(x)
            self.inertia[f'{i}_clust']= [model.inertia_]
        self.inertia.index = ['inertia']

    @property
    def inertia_(self):
        return self.inertia
    
    @property
    def suggested_n_clusters_(self):
        idxidx = []
        values = []

        s = self.inertia.to_numpy().ravel()
        for i in range(self.max_clusters-1):
            idxidx.append(f'{i+1}_clust')
            values.append((s[i] - s[i+1])*100 / s[i])

        id = np.max(np.where(np.array(values) > 5))+2
        return id
    
    @property
    def fit_optimal_(self):
        model = KMeans(n_clusters = self.suggested_n_clusters_, init = 'k-means++', random_state = 42)
        model.fit(self.x)
        yp = model.predict(self.x)+1
        clu = [f'cluster#{i}' for i in yp]

        return self.x, clu, model.cluster_centers_
    




    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~hdbscan ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class Hdbscan:
    """Runs an automatically optimized sklearn.HDBSCAN clustering on dimensionality reduced space.

    The HDBSCAN_scores_ @Property returns the cluster number of each sample (_labels) and the DBCV best score.

    Returns:
        _labels (DataFrame): DataFrame with the cluster belonging number for each sample
        _hdbscan_score (float): a float with the best DBCV score after optimization

    Examples:
        - clustering = HDBSCAN((data)
        - scores = clustering.HDBSCAN_scores_

    """
    def __init__(self, data):
DIANE's avatar
DIANE committed
        from sklearn.cluster import HDBSCAN

DIANE's avatar
DIANE committed
        """Initiate the HDBSCAN calculation

        Args:
            data (DataFrame): the Dimensionality reduced space, raw result of the UMAP.fit()
            param_dist (dictionary): the HDBSCAN optimization parameters to test
            _score (DataFrame): is a dataframe with the DBCV value for each combination of param_dist. We search for the higher value to then compute an HDBSCAN with the best parameters.
        """
        # Really fast
        self._param_dist = {'min_samples': [8],
                      'min_cluster_size':[10],
                      'metric' : ['euclidean'],#,'manhattan'],
                      }
        # Medium
        # self._param_dist = {'min_samples': [1,10],
        #     'min_cluster_size':[5,50],
        #     'metric' : ['euclidean','manhattan'],
        #     }
        # Complete
        # self._param_dist = {'min_samples': [1,5,10,],
        #       'min_cluster_size':[5,25,50,],
        #       'metric' : ['euclidean','manhattan'],
        #       }

        self._clusterable_embedding = data

        # RandomizedSearchCV not working...
        # def scoring(model, clusterable_embedding):
        #     label = HDBSCAN().fit_predict(clusterable_embedding)
        #     hdbscan_score = DBCV(clusterable_embedding, label, dist_function=euclidean)
        #     return hdbscan_score
        # tunning = RandomizedSearchCV(estimator=HDBSCAN(), param_distributions=param_dist,  scoring=scoring)
        # tunning.fit(clusterable_embedding)
        # return tunning

        # compute optimization. Test each combination of parameters and store DBCV score into _score.
        # self._score = DataFrame()
        # for i in self._param_dist.get('min_samples'):
        #     for j in self._param_dist.get('min_cluster_size'):
        #         self._ij_label = HDBSCAN(min_samples=i, min_cluster_size=j).fit_predict(self._clusterable_embedding)
        #         self._ij_hdbscan_score = self.DBCV(self._clusterable_embedding, self._ij_label,)# dist_function=euclidean)
        #         self._score.at[i,j] = self._ij_hdbscan_score
        # get the best DBCV score
        # self._hdbscan_bscore  = max(self._score.max())
        # find the coordinates of the best clustering parameters and run HDBSCAN below
        # self._bparams = np.where(self._score == self._hdbscan_bscore)
        # run HDBSCAN with best params

        # self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][self._bparams[0][0]], min_cluster_size=self._param_dist['min_cluster_size'][self._bparams[1][0]], metric=self._param_dist['metric'][self._bparams[1][0]], store_centers="medoid", )
        self.best_hdbscan = HDBSCAN(min_samples=self._param_dist['min_samples'][0], min_cluster_size=self._param_dist['min_cluster_size'][0], metric=self._param_dist['metric'][0], store_centers="medoid", )
        self.best_hdbscan.fit_predict(self._clusterable_embedding)
        self._labels = self.best_hdbscan.labels_
        self._centers = self.best_hdbscan.medoids_


    # def DBCV(self, X, labels, dist_function=euclidean):
    #     """
    #     Implimentation of Density-Based Clustering Validation "DBCV"
    #
    #     Citation: Moulavi, Davoud, et al. "Density-based clustering validation."
    #     Proceedings of the 2014 SIAM International Conference on Data Mining.
    #     Society for Industrial and Applied Mathematics, 2014.
    #
    #     Density Based clustering validation
    #
    #     Args:
    #         X (np.ndarray): ndarray with dimensions [n_samples, n_features]
    #             data to check validity of clustering
    #         labels (np.array): clustering assignments for data X
    #         dist_dunction (func): function to determine distance between objects
    #             func args must be [np.array, np.array] where each array is a point
    #
    #     Returns:
    #         cluster_validity (float): score in range[-1, 1] indicating validity of clustering assignments
    #     """
    #     graph = self._mutual_reach_dist_graph(X, labels, dist_function)
    #     mst = self._mutual_reach_dist_MST(graph)
    #     cluster_validity = self._clustering_validity_index(mst, labels)
    #     return cluster_validity
    #
    #
    # def _core_dist(self, point, neighbors, dist_function):
    #     """
    #     Computes the core distance of a point.
    #     Core distance is the inverse density of an object.
    #
    #     Args:
    #         point (np.array): array of dimensions (n_features,)
    #             point to compute core distance of
    #         neighbors (np.ndarray): array of dimensions (n_neighbors, n_features):
    #             array of all other points in object class
    #         dist_dunction (func): function to determine distance between objects
    #             func args must be [np.array, np.array] where each array is a point
    #
    #     Returns: core_dist (float)
    #         inverse density of point
    #     """
    #     n_features = np.shape(point)[0]
    #     n_neighbors = np.shape(neighbors)[0]
    #
    #     distance_vector = cdist(point.reshape(1, -1), neighbors)
    #     distance_vector = distance_vector[distance_vector != 0]
    #     numerator = ((1/distance_vector)**n_features).sum()
    #     core_dist = (numerator / (n_neighbors - 1)) ** (-1/n_features)
    #     return core_dist
    #
    # def _mutual_reachability_dist(self, point_i, point_j, neighbors_i,
    #                               neighbors_j, dist_function):
    #     """.
    #     Computes the mutual reachability distance between points
    #
    #     Args:
    #         point_i (np.array): array of dimensions (n_features,)
    #             point i to compare to point j
    #         point_j (np.array): array of dimensions (n_features,)
    #             point i to compare to point i
    #         neighbors_i (np.ndarray): array of dims (n_neighbors, n_features):
    #             array of all other points in object class of point i
    #         neighbors_j (np.ndarray): array of dims (n_neighbors, n_features):
    #             array of all other points in object class of point j
    #         dist_function (func): function to determine distance between objects
    #             func args must be [np.array, np.array] where each array is a point
    #
    #     Returns:
    #         mutual_reachability (float)
    #         mutual reachability between points i and j
    #
    #     """
    #     core_dist_i = self._core_dist(point_i, neighbors_i, dist_function)
    #     core_dist_j = self._core_dist(point_j, neighbors_j, dist_function)
    #     dist = dist_function(point_i, point_j)
    #     mutual_reachability = np.max([core_dist_i, core_dist_j, dist])
    #     return mutual_reachability
    #
    #
    # def _mutual_reach_dist_graph(self, X, labels, dist_function):
    #     """
    #     Computes the mutual reach distance complete graph.
    #     Graph of all pair-wise mutual reachability distances between points
    #
    #     Args:
    #         X (np.ndarray): ndarray with dimensions [n_samples, n_features]
    #             data to check validity of clustering
    #         labels (np.array): clustering assignments for data X
    #         dist_dunction (func): function to determine distance between objects
    #             func args must be [np.array, np.array] where each array is a point
    #
    #     Returns: graph (np.ndarray)
    #         array of dimensions (n_samples, n_samples)
    #         Graph of all pair-wise mutual reachability distances between points.
    #
    #     """
    #     n_samples = np.shape(X)[0]
    #     graph = []
    #     counter = 0
    #     for row in range(n_samples):
    #         graph_row = []
    #         for col in range(n_samples):
    #             point_i = X[row]
    #             point_j = X[col]
    #             class_i = labels[row]
    #             class_j = labels[col]
    #             members_i = self._get_label_members(X, labels, class_i)
    #             members_j = self._get_label_members(X, labels, class_j)
    #             dist = self._mutual_reachability_dist(point_i, point_j,
    #                                              members_i, members_j,
    #                                              dist_function)
    #             graph_row.append(dist)
    #         counter += 1
    #         graph.append(graph_row)
    #     graph = np.array(graph)
    #     return graph
    #
    #
    # def _mutual_reach_dist_MST(self, dist_tree):
    #     """
    #     Computes minimum spanning tree of the mutual reach distance complete graph
    #
    #     Args:
    #         dist_tree (np.ndarray): array of dimensions (n_samples, n_samples)
    #             Graph of all pair-wise mutual reachability distances
    #             between points.
    #
    #     Returns: minimum_spanning_tree (np.ndarray)
    #         array of dimensions (n_samples, n_samples)
    #         minimum spanning tree of all pair-wise mutual reachability
    #             distances between points.
    #     """
    #     mst = minimum_spanning_tree(dist_tree).toarray()
    #     return mst + np.transpose(mst)
    #
    #
    # def _cluster_density_sparseness(self, MST, labels, cluster):
    #     """
    #     Computes the cluster density sparseness, the minimum density
    #         within a cluster
    #
    #     Args:
    #         MST (np.ndarray): minimum spanning tree of all pair-wise
    #             mutual reachability distances between points.
    #         labels (np.array): clustering assignments for data X
    #         cluster (int): cluster of interest
    #
    #     Returns: cluster_density_sparseness (float)
    #         value corresponding to the minimum density within a cluster
    #     """
    #     indices = np.where(labels == cluster)[0]
    #     cluster_MST = MST[indices][:, indices]
    #     cluster_density_sparseness = np.max(cluster_MST)
    #     return cluster_density_sparseness
    #
    #
    # def _cluster_density_separation(self, MST, labels, cluster_i, cluster_j):
    #     """
    #     Computes the density separation between two clusters, the maximum
    #         density between clusters.
    #
    #     Args:
    #         MST (np.ndarray): minimum spanning tree of all pair-wise
    #             mutual reachability distances between points.
    #         labels (np.array): clustering assignments for data X
    #         cluster_i (int): cluster i of interest
    #         cluster_j (int): cluster j of interest
    #
    #     Returns: density_separation (float):
    #         value corresponding to the maximum density between clusters
    #     """
    #     indices_i = np.where(labels == cluster_i)[0]
    #     indices_j = np.where(labels == cluster_j)[0]
    #     shortest_paths = csgraph.dijkstra(MST, indices=indices_i)
    #     relevant_paths = shortest_paths[:, indices_j]
    #     density_separation = np.min(relevant_paths)
    #     return density_separation
    #
    #
    # def _cluster_validity_index(self, MST, labels, cluster):
    #     """
    #     Computes the validity of a cluster (validity of assignmnets)
    #
    #     Args:
    #         MST (np.ndarray): minimum spanning tree of all pair-wise
    #             mutual reachability distances between points.
    #         labels (np.array): clustering assignments for data X
    #         cluster (int): cluster of interest
    #
    #     Returns: cluster_validity (float)
    #         value corresponding to the validity of cluster assignments
    #     """
    #     min_density_separation = np.inf
    #     for cluster_j in np.unique(labels):
    #         if cluster_j != cluster:
    #             cluster_density_separation = self._cluster_density_separation(MST,
    #                                                                      labels,
    #                                                                      cluster,
    #                                                                      cluster_j)
    #             if cluster_density_separation < min_density_separation:
    #                 min_density_separation = cluster_density_separation
    #     cluster_density_sparseness = self._cluster_density_sparseness(MST,
    #                                                              labels,
    #                                                              cluster)
    #     numerator = min_density_separation - cluster_density_sparseness
    #     denominator = np.max([min_density_separation, cluster_density_sparseness])
    #     cluster_validity = numerator / denominator
    #     return cluster_validity
    #
    #
    # def _clustering_validity_index(self, MST, labels):
    #     """
    #     Computes the validity of all clustering assignments for a
    #     clustering algorithm
    #
    #     Args:
    #         MST (np.ndarray): minimum spanning tree of all pair-wise
    #             mutual reachability distances between points.
    #         labels (np.array): clustering assignments for data X
    #
    #     Returns: validity_index (float):
    #         score in range[-1, 1] indicating validity of clustering assignments
    #     """
    #     n_samples = len(labels)
    #     validity_index = 0
    #     for label in np.unique(labels):
    #         fraction = np.sum(labels == label) / float(n_samples)
    #         cluster_validity = self._cluster_validity_index(MST, labels, label)
    #         validity_index += fraction * cluster_validity
    #     return validity_index
    #
    #
    # def _get_label_members(self, X, labels, cluster):
    #     """
    #     Helper function to get samples of a specified cluster.
    #
    #     Args:
    #         X (np.ndarray): ndarray with dimensions [n_samples, n_features]
    #             data to check validity of clustering
    #         labels (np.array): clustering assignments for data X
    #         cluster (int): cluster of interest
    #
    #     Returns: members (np.ndarray)
    #         array of dimensions (n_samples, n_features) of samples of the
    #         specified cluster.
    #     """
    #     indices = np.where(labels == cluster)[0]
    #     members = X[indices]
    #     return members

    @property
    def centers_(self):
        # return self._labels, self._hdbscan_bscore, self._centers
        return self._centers
    @property
    def labels_(self):
        labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels]
        return labels
    @property
    def non_clustered(self):
        labels = [f'cluster#{i+1}' if i !=-1 else 'Non clustered' for i in self._labels]
        non_clustered = np.where(np.array(labels) == 'Non clustered')[0]
        return non_clustered



    # ~~~~~~~~~~~~~~~~~~~~~~~~~ ap  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class AP:
    def __init__(self, X):
        ## input matrix
        self.__x = np.array(X)

        # Fit PCA model
DIANE's avatar
DIANE committed
        from sklearn.cluster import AffinityPropagation
DIANE's avatar
DIANE committed
        self.M = AffinityPropagation(damping=0.5, max_iter=200, convergence_iter=15, copy=True, preference=None,
                                 affinity='euclidean', verbose=False, random_state=None)
        self.M.fit(self.__x)
        self.yp = self.M.predict(self.__x)+1
    @property
    def fit_optimal_(self):
        clu = [f'cluster#{i}' for i in self.yp]
        return self.__x, clu, self.M.cluster_centers_