Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
NIRS_Workflow
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
CEFE
PACE
NIRS_Workflow
Commits
90d465b1
Commit
90d465b1
authored
11 months ago
by
BARTHES Nicolas
Browse files
Options
Downloads
Patches
Plain Diff
added HDBSCAN clustering with optimization
parent
edd165e7
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
hdbscan.py
+285
-0
285 additions, 0 deletions
hdbscan.py
with
285 additions
and
0 deletions
hdbscan.py
0 → 100644
+
285
−
0
View file @
90d465b1
from
Packages
import
*
from
scipy.spatial.distance
import
euclidean
,
cdist
from
scipy.sparse.csgraph
import
minimum_spanning_tree
from
scipy.sparse
import
csgraph
def
DBCV
(
X
,
labels
,
dist_function
=
euclidean
):
"""
Implimentation of Density-Based Clustering Validation
"
DBCV
"
Citation:
Moulavi, Davoud, et al.
"
Density-based clustering validation.
"
Proceedings of the 2014 SIAM International Conference on Data Mining.
Society for Industrial and Applied Mathematics, 2014.
Density Based clustering validation
Args:
X (np.ndarray): ndarray with dimensions [n_samples, n_features]
data to check validity of clustering
labels (np.array): clustering assignments for data X
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point
Returns: cluster_validity (float)
score in range[-1, 1] indicating validity of clustering assignments
"""
graph
=
_mutual_reach_dist_graph
(
X
,
labels
,
dist_function
)
mst
=
_mutual_reach_dist_MST
(
graph
)
cluster_validity
=
_clustering_validity_index
(
mst
,
labels
)
return
cluster_validity
def
_core_dist
(
point
,
neighbors
,
dist_function
):
"""
Computes the core distance of a point.
Core distance is the inverse density of an object.
Args:
point (np.array): array of dimensions (n_features,)
point to compute core distance of
neighbors (np.ndarray): array of dimensions (n_neighbors, n_features):
array of all other points in object class
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point
Returns: core_dist (float)
inverse density of point
"""
n_features
=
np
.
shape
(
point
)[
0
]
n_neighbors
=
np
.
shape
(
neighbors
)[
0
]
distance_vector
=
cdist
(
point
.
reshape
(
1
,
-
1
),
neighbors
)
distance_vector
=
distance_vector
[
distance_vector
!=
0
]
numerator
=
((
1
/
distance_vector
)
**
n_features
).
sum
()
core_dist
=
(
numerator
/
(
n_neighbors
-
1
))
**
(
-
1
/
n_features
)
return
core_dist
def
_mutual_reachability_dist
(
point_i
,
point_j
,
neighbors_i
,
neighbors_j
,
dist_function
):
"""
.
Computes the mutual reachability distance between points
Args:
point_i (np.array): array of dimensions (n_features,)
point i to compare to point j
point_j (np.array): array of dimensions (n_features,)
point i to compare to point i
neighbors_i (np.ndarray): array of dims (n_neighbors, n_features):
array of all other points in object class of point i
neighbors_j (np.ndarray): array of dims (n_neighbors, n_features):
array of all other points in object class of point j
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point
Returns: mutual_reachability (float)
mutual reachability between points i and j
"""
core_dist_i
=
_core_dist
(
point_i
,
neighbors_i
,
dist_function
)
core_dist_j
=
_core_dist
(
point_j
,
neighbors_j
,
dist_function
)
dist
=
dist_function
(
point_i
,
point_j
)
mutual_reachability
=
np
.
max
([
core_dist_i
,
core_dist_j
,
dist
])
return
mutual_reachability
def
_mutual_reach_dist_graph
(
X
,
labels
,
dist_function
):
"""
Computes the mutual reach distance complete graph.
Graph of all pair-wise mutual reachability distances between points
Args:
X (np.ndarray): ndarray with dimensions [n_samples, n_features]
data to check validity of clustering
labels (np.array): clustering assignments for data X
dist_dunction (func): function to determine distance between objects
func args must be [np.array, np.array] where each array is a point
Returns: graph (np.ndarray)
array of dimensions (n_samples, n_samples)
Graph of all pair-wise mutual reachability distances between points.
"""
n_samples
=
np
.
shape
(
X
)[
0
]
graph
=
[]
counter
=
0
for
row
in
range
(
n_samples
):
graph_row
=
[]
for
col
in
range
(
n_samples
):
point_i
=
X
[
row
]
point_j
=
X
[
col
]
class_i
=
labels
[
row
]
class_j
=
labels
[
col
]
members_i
=
_get_label_members
(
X
,
labels
,
class_i
)
members_j
=
_get_label_members
(
X
,
labels
,
class_j
)
dist
=
_mutual_reachability_dist
(
point_i
,
point_j
,
members_i
,
members_j
,
dist_function
)
graph_row
.
append
(
dist
)
counter
+=
1
graph
.
append
(
graph_row
)
graph
=
np
.
array
(
graph
)
return
graph
def
_mutual_reach_dist_MST
(
dist_tree
):
"""
Computes minimum spanning tree of the mutual reach distance complete graph
Args:
dist_tree (np.ndarray): array of dimensions (n_samples, n_samples)
Graph of all pair-wise mutual reachability distances
between points.
Returns: minimum_spanning_tree (np.ndarray)
array of dimensions (n_samples, n_samples)
minimum spanning tree of all pair-wise mutual reachability
distances between points.
"""
mst
=
minimum_spanning_tree
(
dist_tree
).
toarray
()
return
mst
+
np
.
transpose
(
mst
)
def
_cluster_density_sparseness
(
MST
,
labels
,
cluster
):
"""
Computes the cluster density sparseness, the minimum density
within a cluster
Args:
MST (np.ndarray): minimum spanning tree of all pair-wise
mutual reachability distances between points.
labels (np.array): clustering assignments for data X
cluster (int): cluster of interest
Returns: cluster_density_sparseness (float)
value corresponding to the minimum density within a cluster
"""
indices
=
np
.
where
(
labels
==
cluster
)[
0
]
cluster_MST
=
MST
[
indices
][:,
indices
]
cluster_density_sparseness
=
np
.
max
(
cluster_MST
)
return
cluster_density_sparseness
def
_cluster_density_separation
(
MST
,
labels
,
cluster_i
,
cluster_j
):
"""
Computes the density separation between two clusters, the maximum
density between clusters.
Args:
MST (np.ndarray): minimum spanning tree of all pair-wise
mutual reachability distances between points.
labels (np.array): clustering assignments for data X
cluster_i (int): cluster i of interest
cluster_j (int): cluster j of interest
Returns: density_separation (float):
value corresponding to the maximum density between clusters
"""
indices_i
=
np
.
where
(
labels
==
cluster_i
)[
0
]
indices_j
=
np
.
where
(
labels
==
cluster_j
)[
0
]
shortest_paths
=
csgraph
.
dijkstra
(
MST
,
indices
=
indices_i
)
relevant_paths
=
shortest_paths
[:,
indices_j
]
density_separation
=
np
.
min
(
relevant_paths
)
return
density_separation
def
_cluster_validity_index
(
MST
,
labels
,
cluster
):
"""
Computes the validity of a cluster (validity of assignmnets)
Args:
MST (np.ndarray): minimum spanning tree of all pair-wise
mutual reachability distances between points.
labels (np.array): clustering assignments for data X
cluster (int): cluster of interest
Returns: cluster_validity (float)
value corresponding to the validity of cluster assignments
"""
min_density_separation
=
np
.
inf
for
cluster_j
in
np
.
unique
(
labels
):
if
cluster_j
!=
cluster
:
cluster_density_separation
=
_cluster_density_separation
(
MST
,
labels
,
cluster
,
cluster_j
)
if
cluster_density_separation
<
min_density_separation
:
min_density_separation
=
cluster_density_separation
cluster_density_sparseness
=
_cluster_density_sparseness
(
MST
,
labels
,
cluster
)
numerator
=
min_density_separation
-
cluster_density_sparseness
denominator
=
np
.
max
([
min_density_separation
,
cluster_density_sparseness
])
cluster_validity
=
numerator
/
denominator
return
cluster_validity
def
_clustering_validity_index
(
MST
,
labels
):
"""
Computes the validity of all clustering assignments for a
clustering algorithm
Args:
MST (np.ndarray): minimum spanning tree of all pair-wise
mutual reachability distances between points.
labels (np.array): clustering assignments for data X
Returns: validity_index (float):
score in range[-1, 1] indicating validity of clustering assignments
"""
n_samples
=
len
(
labels
)
validity_index
=
0
for
label
in
np
.
unique
(
labels
):
fraction
=
np
.
sum
(
labels
==
label
)
/
float
(
n_samples
)
cluster_validity
=
_cluster_validity_index
(
MST
,
labels
,
label
)
validity_index
+=
fraction
*
cluster_validity
return
validity_index
def
_get_label_members
(
X
,
labels
,
cluster
):
"""
Helper function to get samples of a specified cluster.
Args:
X (np.ndarray): ndarray with dimensions [n_samples, n_features]
data to check validity of clustering
labels (np.array): clustering assignments for data X
cluster (int): cluster of interest
Returns: members (np.ndarray)
array of dimensions (n_samples, n_features) of samples of the
specified cluster.
"""
indices
=
np
.
where
(
labels
==
cluster
)[
0
]
members
=
X
[
indices
]
return
members
def
HDBSCAN_function
(
data
,
min_cluster_size
):
# param_dist = {'min_samples': [1,5,10,30],
# 'min_cluster_size':[5,10,20,30,50,75,100],
# # 'cluster_selection_method' : ['eom','leaf'],
# # 'metric' : ['euclidean','manhattan']
# }
param_dist
=
{
'
min_samples
'
:
[
1
,
5
],
'
min_cluster_size
'
:[
5
,
10
],
}
clusterable_embedding
=
UMAP
(
n_neighbors
=
20
,
min_dist
=
0.0
,
n_components
=
5
,
random_state
=
42
,
).
fit_transform
(
data
)
min_score
=
pd
.
DataFrame
()
for
i
in
param_dist
.
get
(
'
min_samples
'
):
for
j
in
param_dist
.
get
(
'
min_cluster_size
'
):
ij_label
=
HDBSCAN
(
min_samples
=
i
,
min_cluster_size
=
j
).
fit_predict
(
clusterable_embedding
)
ij_hdbscan_score
=
DBCV
(
clusterable_embedding
,
ij_label
,
dist_function
=
euclidean
)
min_score
.
at
[
i
,
j
]
=
ij_hdbscan_score
hdbscan_score
=
max
(
min_score
.
max
())
# get the coordinates of the best clustering paramters and run HDBSCAN below
labels
=
HDBSCAN
(
min_samples
=
1
,
min_cluster_size
=
min_cluster_size
).
fit_predict
(
clusterable_embedding
)
return
labels
,
hdbscan_score
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment