"""
Clustering operations.
elbow_method,
gap_statistic,
dbscan_clustering
"""
from __future__ import annotations
from typing import Callable
import numpy as np
from pandas import DataFrame
from sklearn.cluster import DBSCAN, KMeans
from pymove.utils.constants import EARTH_RADIUS, LATITUDE, LONGITUDE, N_CLUSTER
from pymove.utils.conversions import meters_to_eps
from pymove.utils.log import logger, progress_bar, timer_decorator
[docs]@timer_decorator
def elbow_method(
move_data: DataFrame,
k_initial: int = 1,
max_clusters: int = 15,
k_iteration: int = 1,
random_state: int | None = None
) -> dict:
"""
Determines the optimal number of clusters.
In the range set by the user using the elbow method.
Parameters
----------
move_data : dataframe
The input trajectory data.
k_initial: int, optional
The initial value used in the interaction of the elbow method.
Represents the maximum numbers of clusters, by default 1
max_clusters: int, optional
The maximum value used in the interaction of the elbow method.
Maximum number of clusters to test for, by default 15
k_iteration: int, optional
Increment value of the sequence used by the elbow method, by default 1
random_state: int, RandomState instance
Determines random number generation for centroid initialization.
Use an int to make the randomness deterministic, by default None
Returns
-------
dict
The inertia values for the different numbers of clusters
Example
-------
clustering.elbow_method(move_data=move_df, k_iteration=3)
{
1: 55084.15957839036,
4: 245.68365592382938,
7: 92.31472644640075,
10: 62.618599956870355,
13: 45.59653757292055,
}
"""
message = 'Executing Elbow Method for {} to {} clusters at {} steps\n'.format(
k_initial, max_clusters, k_iteration
)
logger.debug(message)
inertia_dic = {}
for k in progress_bar(
range(k_initial, max_clusters + 1, k_iteration), desc='Running KMeans'
):
km = KMeans(n_clusters=k, random_state=random_state)
inertia_dic[k] = km.fit(move_data[[LATITUDE, LONGITUDE]]).inertia_
return inertia_dic
[docs]@timer_decorator
def gap_statistic(
move_data: DataFrame,
nrefs: int = 3,
k_initial: int = 1,
max_clusters: int = 15,
k_iteration: int = 1,
random_state: int | None = None
) -> dict:
"""
Calculates optimal clusters numbers using Gap Statistic.
From Tibshirani, Walther, Hastie.
Parameters
----------
move_data: ndarray of shape (n_samples, n_features).
The input trajectory data.
nrefs: int, optional
number of sample reference datasets to create, by default 3
k_initial: int, optional.
The initial value used in the interaction of the elbow method, by default 1
Represents the maximum numbers of clusters.
max_clusters: int, optional
Maximum number of clusters to test for, by default 15
k_iteration:int, optional
Increment value of the sequence used by the elbow method, by default 1
random_state: int, RandomState instance
Determines random number generation for centroid initialization.
Use an int to make the randomness deterministic, by default None
Returns
-------
dict
The error value for each cluster number
Notes
-----
https://anaconda.org/milesgranger/gap-statistic/notebook
"""
message = 'Executing Gap Statistic for {} to {} clusters at {} steps\n'.format(
k_initial, max_clusters, k_iteration
)
logger.debug(message)
gaps = {}
np.random.seed(random_state)
for k in progress_bar(
range(k_initial, max_clusters + 1, k_iteration), desc='Running KMeans'
):
# Holder for reference dispersion results
ref_disps = np.zeros(nrefs)
# For n references, generate random sample and perform kmeans
# getting resulting dispersion of each loop
for i in range(nrefs):
# Create new random reference set
random_reference = np.random.random_sample(size=move_data.shape)
# Fit to it
km = KMeans(n_clusters=k, random_state=random_state)
ref_disps[i] = km.fit(random_reference).inertia_
# Fit cluster to original data and create dispersion
km = KMeans(k).fit(move_data[[LATITUDE, LONGITUDE]])
orig_disp = km.inertia_
# Calculate gap statistic
gap = np.log(np.mean(ref_disps)) - np.log(orig_disp)
# Assign this loop gap statistic to gaps
gaps[k] = gap
return gaps
[docs]@timer_decorator
def dbscan_clustering(
move_data: DataFrame,
cluster_by: str,
meters: int = 10,
min_sample: float = 1680 / 2,
earth_radius: float = EARTH_RADIUS,
metric: str | Callable = 'euclidean',
inplace: bool = False
) -> DataFrame | None:
"""
Performs density based clustering on the move_dataframe according to cluster_by.
Parameters
----------
move_data : dataframe
the input trajectory
cluster_by : str
the colum to cluster
meters : int, optional
distance to use in the clustering, by default 10
min_sample : float, optional
the minimum number of samples to consider a cluster, by default 1680/2
earth_radius : int
Y offset from your original position in meters, by default EARTH_RADIUS
metric: string, or callable, optional
The metric to use when calculating distance between instances in a feature array
by default 'euclidean'
inplace : bool, optional
Whether to return a new DataFrame, by default False
Returns
-------
DataFrame
Clustered dataframe or None
"""
if not inplace:
move_data = move_data[:]
move_data.reset_index(drop=True, inplace=True)
move_data[N_CLUSTER] = -1
for cluster_id in progress_bar(move_data[cluster_by].unique(), desc='Clustering'):
df_filter = move_data[move_data[cluster_by] == cluster_id]
dbscan = DBSCAN(
eps=meters_to_eps(meters, earth_radius),
min_samples=min_sample,
metric=metric
)
dbscan_result = dbscan.fit(df_filter[[LATITUDE, LONGITUDE]].to_numpy())
idx = df_filter.index
res = dbscan_result.labels_ + move_data[N_CLUSTER].max() + 1
move_data.at[idx, N_CLUSTER] = res
if not inplace:
return move_data