"""
Functions to manipulate and sort clusters
"""
from copy import deepcopy
import numpy as np
from sklearn.metrics import adjusted_rand_score
__all__ = [
"aggregate_network_by_cluster",
"convert_cluster_labels_to_dict",
"rand_index_over_methods_and_sizes",
"cluster_sort",
]
[docs]def aggregate_network_by_cluster(
temporal_network, clusters, sort_clusters=None, output="averaged"
):
"""
Aggregates the temporal network over eacher cluster in a cluster set
Parameters
----------
temporal_network : phasik.TemporalNetwork
Temporal network to aggregate
clusters : array of int
Cluster labels of length that is the number of time points
in the temporal network.
sort_clusters : bool
If True, sort cluster labels based on ascending times
output : {'weighted', 'averaged', 'binary', 'normalised'}, optional
Determines the type of output edge weights
Returns
-------
aggregates : dict
Dict each key is a cluster label and each value is a tuple
of the form (networkx.Graph, list of time indices of cluster).
Examples
--------
>>> import phasik as pk
>>> clusters = [1, 1, 1, 2, 2, 3]
>>> pk.aggregate_network_by_cluster(temporal_network, clusters, output="averaged")
{1: (<networkx.classes.graph.Graph at 0x177665df0>, [0, 1, 2]),
2: (<networkx.classes.graph.Graph at 0x177668580>, [3, 4]),
3: (<networkx.classes.graph.Graph at 0x177668e20>, [5])}
"""
aggregates = {}
clusters = np.array(clusters)
if sort_clusters is True: # sort by ascending times
clusters = cluster_sort(clusters)
elif (sort_clusters is False) or (sort_clusters is None):
pass
elif isinstance(sort_clusters, list): # sort by specified order
clusters = cluster_sort(clusters, final_labels=sort_clusters)
else:
raise ValueError(
"Invalid value for 'sort_clusters': must be True or a list of cluster labels"
)
cluster_time_indices = convert_cluster_labels_to_dict(clusters)
for cluster_label, time_indices in cluster_time_indices.items():
aggregates[cluster_label] = (
temporal_network.aggregated_network(
time_indices=time_indices, output=output
),
time_indices,
)
return aggregates
[docs]def convert_cluster_labels_to_dict(clusters):
"""Returns dictionary where each key is a cluster label and each
value is list of the time indices composing the cluster.
Parameters
----------
clusters : list of int
List of cluster labels
Returns
-------
cluster_times : dict
Examples
--------
>>> import phasik as pk
>>> pk.convert_cluster_labels_to_dict([1, 1, 1, 2, 2, 3])
{1: [0, 1, 2], 2: [3, 4], 3: [5]}
"""
n_max = max(clusters)
clusters = np.array(clusters)
cluster_times = {n: list(np.where(clusters == n)[0]) for n in range(1, n_max + 1)}
return cluster_times
[docs]def rand_index_over_methods_and_sizes(valid_cluster_sets, reference_method="ward"):
"""
Compute the Rand Index to compare any clustering method to a reference method, for all combinations of methods
and number of clusters.
Parameters
----------
valid_cluster_sets : list
List of tuples (cluster_object, method_name) representing the clustering object and the name of the
clustering method used to obtain it.
reference_method : str, optional
The name of the reference method to compare against. The default is "ward".
Returns
-------
rand_scores : ndarray
Array of dimension (n_sizes, n_methods) with Rand Index scores.
Notes
-----
The Rand Index is a measure of the similarity between two clusterings. It is based on the number of pairs of
samples that are assigned to the same or different clusters in the two clusterings. The adjusted Rand Index is a
modification of the Rand Index that takes into account chance agreements.
Examples
--------
>>> import phasik as pk
>>> clustering_methods = ["k_means", "centroid","average", "ward"]
>>> valid_cluster_sets = []
>>> for clustering_method in clustering_methods:
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_sets = pk.ClusterSets.from_distance_matrix(
>>> distance_matrix, "maxclust", range(2, 12), clustering_method
>>> )
>>> valid_cluster_sets.append((cluster_sets, clustering_method))
>>> pk.rand_index_over_methods_and_sizes(valid_cluster_sets, reference_method="ward")
"""
# Extract the list of methods used to obtain the clusters and the reference method
valid_methods = [sets[1] for sets in valid_cluster_sets]
# Find the index of the reference method
i_ref = valid_methods.index(reference_method)
# Extract the clusters obtained with the reference method
clusters_ref = valid_cluster_sets[i_ref][0]
# Compute Rand Index to compare each method with reference method, for each number of clusters
n_sizes = len(clusters_ref.n_clusters)
n_methods = len(valid_cluster_sets)
rand_scores = np.zeros((n_sizes, n_methods))
for i_size, size in enumerate(clusters_ref.n_clusters):
for i_method, method in enumerate(valid_methods):
# Extract the clusters obtained with the current method
clusters2 = valid_cluster_sets[i_method][0]
# Compute the Rand Index between the two clusterings
rand_index = adjusted_rand_score(
clusters_ref.clusters[i_size], clusters2.clusters[i_size]
)
# Store the Rand Index in the results array
rand_scores[i_size, i_method] = rand_index
return rand_scores
[docs]def cluster_sort(clusters, final_labels=None):
"""
Sorts an array of cluster labels in order of appearance, and returns the sorted array while leaving the original clusters unchanged.
Parameters
----------
clusters : numpy.ndarray
An array of cluster labels.
final_labels : list or None, optional
A list of final labels (as integers) to replace the original cluster labels, by default None.
Returns
-------
numpy.ndarray or list
An array of cluster labels sorted in order of appearance. If `final_labels` is not None, it will return a list of final labels with the same length as `clusters`.
Examples
--------
>>> clusters = np.array([2, 2, 2, 3, 3, 1, 1, 1])
>>> cluster_sort(clusters)
array([1, 1, 1, 2, 2, 3, 3, 3])
>>> final_labels = [4, 5, 6]
>>> cluster_sort(clusters, final_labels)
[4, 4, 4, 5, 5, 6, 6, 6]
"""
# give temporary negative values to labels
arr = -clusters
i = 1
for j, el in enumerate(arr):
if el >= 0: # already sorted
pass
else: # give new label
arr[arr == el] = i
i += 1
# if final_labels is a list, replace each element of the array with the corresponding label
if isinstance(final_labels, list):
arr = list(map(lambda i: final_labels[i - 1], arr))
# check that the clustering has not changed
assert adjusted_rand_score(clusters, arr) == 1
return arr