Source code for phasik.classes.clustering.ClusterSet

"""
Base class for a cluster set of temporal network snapshots
"""

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import scipy.cluster.hierarchy as sch
import seaborn as sb
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_samples, silhouette_score

from phasik.classes import DistanceMatrix
from phasik.drawing.drawing_clusters import plot_cluster_set, plot_dendrogram

__all__ = ["ClusterSet"]


[docs]class ClusterSet: """Base class for a set of clusters (partition) of timepoints Attributes ----------- clusters : list of int Clusters as a list of cluster labels times : list of (int or float) Sorted list of time associated to each clustered snapshot n_clusters : int Number of clusters in the cluster set (partition) cluster_method : float Method used to cluster the snapshots . Examples : 'k_means', 'centroid', 'average', 'complete', 'weighted', 'median', 'single', 'ward' n_max_type : float Method that was used to determine when to stop clustering when creating this cluster set. e.g. A cluster set can be created by clustering until a particular number of clusters has been reached ('maxclust'), or until every cluster is at least a certain distance away from each other ('distance'). n_max : int Value corresponding to the n_max_type described above. distance_metric : str Distance metric used to compute the distance between snapshots, e.g. 'euclidean', with sklearn.metrics.pairwise.paired_distances. It must be one of the options allowed by scipy.spatial.distance.pdist for its metric parameter (e.g. 'chebyshev', 'cityblock', 'correlation', 'cosine', 'euclidean', 'hamming', 'jaccard', etc.), or a metric listed in pairwise.PAIRWISE_DISTANCE_FUNCTIONS. """ def __init__( self, clusters, times, linkage, distance_matrix, distance_metric, cluster_method, n_clusters_max, n_max_type, ): """ Parameters ---------- clusters : list of int Clusters as a list of cluster labels times : list of (int or float) Sorted list of time associated to each clustered snapshot linkage : Linkage of the clustering distance_matrix : phasik.DistanceMatrix Distance matrix from which the clusters were computed cluster_method : float Method used to cluster the snapshots . Examples : k_means', 'centroid', 'average', 'complete', 'weighted', 'median', 'single', 'ward' n_max_type : float Method that was used to determine when to stop clustering when creating this cluster set. e.g. A cluster set can be created by clustering until a particular number of clusters has been reached ('maxclust'), or until every cluster is at least a certain distance away from each other ('distance'). n_clusters_max : int Value corresponding to the n_max_type described above. distance_metric : str Distance metric used to compute the distance between snapshots, e.g. 'euclidean', with sklearn.metrics.pairwise.paired_distances. It must be one of the options allowed by scipy.spatial.distance.pdist for its metric parameter (e.g. 'chebyshev', 'cityblock', 'correlation', 'cosine', 'euclidean', 'hamming', 'jaccard', etc.), or a metric listed in pairwise.PAIRWISE_DISTANCE_FUNCTIONS. """ self._clusters = clusters self._times = times self.n_clusters = len(set(clusters)) self._cluster_method = cluster_method self._n_max = n_clusters_max self._n_max_type = n_max_type self._distance_metric = distance_metric try: self.silhouette_average = silhouette_score( distance_matrix.distance_matrix, clusters, metric="precomputed" ) self.silhouette_samples = silhouette_samples( distance_matrix.distance_matrix, clusters, metric="precomputed" ) except ValueError as error: # Often the number of clusters is 1, which sklearn does not like. print( f"WARNING: unable to compute silhouette for cluster set. Error is: {error}" ) self.silhouette_average = 0 self.silhouette_samples = np.array([]) self.linkage = linkage @property def clusters(self): """Returns the clusters, i.e. a list of cluster labels (int)""" return self._clusters @clusters.setter def clusters(self, arr): self._clusters = arr @property def times(self): """Returns the list of times corresponding to datapoints clustered""" return self._times @property def cluster_method(self): """Returns the clustering method used to cluster the temporal data""" return self._cluster_method @property def n_max_type(self): """Returns the method (str) that determines when to stop clustering""" return self._n_max_type @property def n_max(self): """Returns the value corresponding to the n_max_type described above.""" return self._n_max @property def distance_metric(self): """Returns the distance metric used to compute the distance between snapshots, e.g. 'euclidean'""" return self._distance_metric
[docs] @classmethod def from_distance_matrix( cls, distance_matrix, n_max_type, n_clusters_max, cluster_method ): """Generates a ClusterSet from a distance matrix Parameters ---------- distance_matrix : phasik.DistanceMatrix Distance matrix from which to cluster n_max_type : str The method that determines when to stop clustering. For example, cluster set can be created by clustering until a particular number of clusters has been reached ('maxclust'), or until every cluster is at least a certain distance away from each other ('distance'). n_clusters_max : int Value corresponding to the n_max_type described above. cluster_method : str Clustering method used to cluster the temporal network snapshots. Examples : 'k_means', 'centroid', 'average', 'complete', 'weighted', 'median', 'single', 'ward' Returns ------- ClusterSet """ times = distance_matrix.times distance_metric = distance_matrix.distance_metric if cluster_method == "k_means": # k-means clustering is only applicable for n_max_type of 'maxclust' if n_max_type != "maxclust": raise ValueError( f"With {cluster_method}, the n_max_type must be 'maxclust'." ) k_means = KMeans(n_clusters=n_clusters_max, random_state=None) clusters = k_means.fit_predict(distance_matrix.snapshots_flat) + 1 linkage = None else: # hierarchical clustering TODO specify allowed methods # From scipy's documentation: # "Methods ‘centroid’, ‘median’, and ‘ward’ are correctly defined # only if Euclidean pairwise metric is used. If 'y' is passed as precomputed pairwise distances, # then it is the user’s responsibility to assure that these distances are in fact Euclidean, # otherwise the produced result will be incorrect." if cluster_method in ["ward", "centroid", "median"]: if distance_metric != "euclidean": raise ValueError( f"With {cluster_method}-linkage, the distance metric must be 'euclidean'." ) # if len(distance_matrix.distance_matrix_flat) > 0 : # TODO check what it checks linkage = sch.linkage( distance_matrix.distance_matrix_flat, method=cluster_method ) clusters = sch.fcluster(linkage, n_clusters_max, criterion=n_max_type) return cls( clusters, times, linkage, distance_matrix, distance_metric, cluster_method, n_clusters_max, n_max_type, )
[docs] @classmethod def from_temporal_network( cls, temporal_network, distance_metric, cluster_method, n_max_type, n_clusters_max, ): """Generates a ClusterSet from a temporal network Parameters ---------- temporal_network : TemporalNetwork Temporal network from which to compute the distance matrix distance_metric : str Distance metric used to compute the distance between snapshots, e.g. 'euclidean', with sklearn.metrics.pairwise.paired_distances. It must be one of the options allowed by scipy.spatial.distance.pdist for its metric parameter (e.g. 'chebyshev', 'cityblock', 'correlation', 'cosine', 'euclidean', 'hamming', 'jaccard', etc.), or a metric listed in pairwise.PAIRWISE_DISTANCE_FUNCTIONS. cluster_method : str Clustering method used to cluster the temporal network snapshots. Examples : 'k_means', 'centroid', 'average', 'complete', 'weighted', 'median', 'single', 'ward' n_max_type : str The method that determines when to stop clustering. For example, cluster set can be created by clustering until a particular number of clusters has been reached ('maxclust'), or until every cluster is at least a certain distance away from each other ('distance'). n_clusters_max : int Value corresponding to the n_max_type described above. Returns ------- ClusterSet """ distance_matrix = DistanceMatrix.from_temporal_network( temporal_network, distance_metric ) return cls.from_distance_matrix( distance_matrix, n_max_type, n_clusters_max, cluster_method )
[docs] def distance_threshold(self): """Calculate the distance at which clustering stops Returns ------- int Smallest number d such that the distance between any two clusters is < d. """ if self.linkage is None: raise ValueError( "Cannot compute the threshold of a non-hierarchical clustering" ) number_of_observations = self.linkage.shape[0] + 1 if self.n_clusters >= number_of_observations: return 0 elif self.n_clusters <= 1: return self.linkage[-1, 2] * 1.001 else: return self.linkage[-self.n_clusters, 2] * 1.001
[docs] def plot_dendrogram( self, ax=None, distance_threshold=None, leaf_rotation=90, leaf_font_size=6, ): """Plot this cluster set as a dendrogram Parameters ---------- ax : matplotlib.Axes, optional Axes on which to plot distance_threshold : float, optional Threshold at which to draw a horizontal line and above which to use different colors for different branches. leaf_rotation : int or float, optional Rotation to apply to the x-axis (leaf) labels (default 90) leaf_font_size : int or str, optional Desired size of the x-axis (leaf) labels (default 6) Returns ------- None """ return plot_dendrogram( self, ax, distance_threshold, leaf_rotation, leaf_font_size )
[docs] def plot( self, colors=None, cmap="tab10", vmin=None, vmax=None, y_height=0, ax=None, **kwargs, ): """ Visualize the clusters in `cluster_set`. For each time point, a marker is drawn with a color corresponding to the cluster to which it belongs. Parameters ---------- colors: list of int, optional If None (default), cluster label 0 is assigned its automatic color "C0" and so on. If `colors` is a list (e.g. [3,1,2]), it relabels the clusters in that order and assigns them the new corresponding colors. cmap : colormap, optional Desired colormap (default 'tab10'). vmin/vmax : float, optional Min and max values to use for the color mapping. If None (default), computed from the data in `colors`. y_height : int or float, optional Vertical value at which to draw the markers (default 0). If a single cluster is drawn this value does not matter. ax : matplotlib.Axes, optional Axes on which to plot **kwargs : Other parameters to pass to matplotlib's scatter. Returns ------- None """ return plot_cluster_set( self, ax=ax, colors=colors, cmap=cmap, vmin=vmin, vmax=vmax, y_height=y_height, **kwargs, )
[docs] def plot_silhouette_samples(self, ax=None): """Plot the silhouette samples from this cluster set Parameters ---------- ax : matplotlib.Axes, optional Axes on which to plot Returns ------- None """ if ax is None: ax = plt.gca() # If there are more than 10 clusters in this cluster set, we'll need to use more colours in our plot. # sb.set_palette("tab20" if self.size > 10 else "tab10") # replace by single colour palette with 20 colours such that first 10 colours are same as tab10 pal = sb.color_palette("tab20", n_colors=20) pal2_arr = np.append(pal[::2], pal[1::2], axis=0) pal2 = sb.color_palette(pal2_arr) sb.set_palette(pal2) if self.silhouette_samples.size > 0: y_lower = 0 for i, cluster in enumerate(np.unique(self.clusters)): # Aggregate the silhouette scores for samples belonging to each cluster, and sort them silhouette_values = self.silhouette_samples[self.clusters == cluster] silhouette_values.sort() silhouette_size = silhouette_values.shape[0] # Calculate height of this cluster y_upper = y_lower + silhouette_size y = np.arange(y_lower, y_upper) ax.fill_betweenx( y, 0, silhouette_values, facecolor=f"C{i}", edgecolor=f"C{i}", alpha=1, ) # Compute the new y_lower for next cluster vertical_padding = 0 y_lower = y_upper + vertical_padding ax.axvline(x=self.silhouette_average, c="k", ls="--")