"""
Functions to visualize the results of temporal clusters.
"""
from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np
import scipy.cluster.hierarchy as sch
from sklearn.metrics import adjusted_rand_score
from phasik.utils.clusters import rand_index_over_methods_and_sizes
__all__ = [
"plot_randindex_bars_over_methods_and_sizes",
"plot_cluster_set",
"plot_cluster_sets",
"plot_dendrogram",
"plot_average_silhouettes",
"plot_ns_clusters",
"relabel_next_clusterset_sorted",
"relabel_clusters_sorted",
"relabel_clustersets",
"relabel_clustersets_from_dict",
]
[docs]def plot_randindex_bars_over_methods_and_sizes(
valid_cluster_sets, reference_method="ward", ax=None, plot_ref=False, **kwargs
):
"""
Plot Rand Index as bars, to compare any method to a reference method.
This compares all combinations of methods and number of clusters.
Parameters
----------
valid_cluster_sets : list
A list of tuples representing valid cluster sets.
Each tuple contains the ClusterSet and the clustering method name.
reference_method : str, optional
The reference method to compare other methods to. Defaults to "ward".
ax : matplotlib.axes.Axes, optional
The axes to plot the bars on. If not provided, the current axes will be used.
plot_ref : bool, optional
Determines whether to plot the reference method bars (will have height one). Defaults to False.
**kwargs :
Other parameters to pass to matpotlib's bar.
Returns
-------
matplotlib.axes.Axes
The axis object to draw on
Examples
--------
>>> import phasik as pk
>>> clustering_methods = ["k_means", "centroid","average", "ward"]
>>> valid_cluster_sets = []
>>> for clustering_method in clustering_methods:
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_sets = pk.ClusterSets.from_distance_matrix(
>>> distance_matrix, "maxclust", range(2, 12), clustering_method
>>> )
>>> valid_cluster_sets.append((cluster_sets, clustering_method))
>>> pk.plot_randindex_bars_over_methods_and_sizes(valid_cluster_sets, reference_method="ward")
>>> ax.set_ylabel("Rand index")
>>> ax.set_xlabel("# clusters")
"""
if ax is None:
ax = plt.gca()
valid_methods = [sets[1] for sets in valid_cluster_sets]
i_ref = valid_methods.index(reference_method)
clusters_ref = valid_cluster_sets[i_ref][0]
rand_index = rand_index_over_methods_and_sizes(valid_cluster_sets, reference_method)
n_sizes, n_methods = rand_index.shape
if not plot_ref:
n_methods -= 1
width = 1 # bar width
width_size = n_methods * width # width of all bars for a given # of clusters
width_inter_size = 4 * width # width space between two # of clusters
xlabels = clusters_ref.n_clusters
xticks = np.arange(n_sizes) * (width_size + width_inter_size) # the label locations
for i, method in enumerate(valid_methods):
heights = rand_index[:, i]
if not plot_ref and i == i_ref:
pass
else: # don't plot i_ref if plot_ref is False
ax.bar(
xticks + i * width - width_size / 2,
heights,
width,
label=method,
**kwargs,
)
ax.set_xticks(xticks)
ax.set_xticklabels(xlabels)
return ax
[docs]def plot_cluster_set(
cluster_set,
colors=None,
cmap="tab10",
vmin=None,
vmax=None,
y_height=0,
ax=None,
**kwargs,
):
"""
Visualize the clusters in `cluster_set`.
For each time point, a marker is drawn with a color corresponding to
the cluster to which it belongs.
Parameters
----------
cluster_set : ClusterSet
ClusterSet object
colors: list of int, optional
If None (default), cluster label 0 is assigned its automatic color "C0"
and so on. If `colors` is a list (e.g. [3,1,2]), it relabels the clusters in that order
and assigns them the new corresponding colors.
cmap : colormap, optional
Desired colormap (default 'tab10').
vmin/vmax : float, optional
Min and max values to use for the color mapping. If None (default), computed
from the data in `colors`.
y_height : int or float, optional
Vertical value at which to draw the markers (default 0). If a single cluster
is drawn this value does not matter.
ax : matplotlib.Axes, optional
Axes on which to plot
**kwargs :
Other parameters to pass to matplotlib's scatter.
Returns
-------
matplotlib.axes.Axes
The axis object to draw on
See Also
--------
plot_cluster_sets
plot_average_silhouettes
plot_ns_clusters
Examples
--------
>>> import phasik as pk
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_set = pk.ClusterSet.from_distance_matrix(
>>> distance_matrix, "maxclust", 5, "ward"
>>> )
>>> pk.plot_cluster_set(cluster_set)
"""
if ax is None:
ax = plt.gca()
y = np.ones(len(cluster_set.times)) * y_height
if isinstance(colors, list):
clusters_plot = relabel_clusters_sorted(
cluster_set.clusters, final_labels=colors
)
else:
clusters_plot = cluster_set.clusters
# check that the clustering has not changed
assert adjusted_rand_score(clusters_plot, cluster_set.clusters) == 1
im = ax.scatter(
cluster_set.times,
y,
c=clusters_plot,
cmap=cmap,
vmin=vmin,
vmax=vmax,
**kwargs,
)
return ax
[docs]def plot_cluster_sets(
cluster_sets,
axs=None,
cmap="tab10",
vmin=None,
vmax=None,
coloring="consistent",
translation=None,
with_silhouettes=False,
with_n_clusters=False,
**kwargs,
):
"""Visualize the clusters in `cluster_sets`.
For each time point, a marker is drawn with a color corresponding to
the cluster to which it belongs. Clusterings for different numbers of clusters
are drawn at different heights on the vertical axis.
Parameters
----------
cluster_sets : phasik ClusterSets
ClusterSets object containing partitions to plot
axs : matplotlib.Axes, optional
Matplotlib axes on which to plot. If None (default), creates a single axis.
cmap : colormap, optional
Desired colormap (default 'tab10').
vmin/vmax : float, optional
Min and max values to use for the color mapping. If None (default), computed
from the data in `colors`.
coloring : {'ascending', 'consistent', None}, optional
The method to use to obtain consistent coloring across cluster sets.
See `relabel_clustersets` for details. By default, "consistent"
translation : dict, optional
If None (default), has no effect. Elsee, dictionary that determines which label
should be replaced by which other label
For example {1: 2, 2: 3, 3: 1}
It is applied after the order relabling from `method`.
with_silhouettes : bool, optional
Whether to draw the corresponding silhouette scores on a second axis.
See `plot_average_silhouettes` for details. Default: False.
with_n_clusters : bool, optional
Whether to draw the corresponding number of clusters on a third axis.
See `plot_ns_clusters` for details. Default: False.
Returns
-------
tuple of matplotlib.axes.Axes
The axis object to draw on
See Also
--------
plot_cluster_set
plot_average_silhouettes
plot_ns_clusters
Examples
--------
>>> import phasik as pk
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_sets = pk.ClusterSets.from_distance_matrix(
>>> distance_matrix, "maxclust", range(2, 12), "ward"
>>> )
>>> pk.plot_cluster_sets(cluster_sets)
"""
if axs is None:
assert not with_silhouettes
assert not with_n_clusters
ax1 = plt.gca()
ax2, ax3 = None, None
else:
if with_silhouettes:
if with_n_clusters:
ax1, ax2, ax3 = axs
else:
ax1, ax2 = axs
ax3 = None
else:
if isinstance(axs, tuple):
ax1 = axs[0]
else:
ax1 = axs
ax2, ax3 = None, None
if coloring is not None:
cluster_sets = relabel_clustersets(
cluster_sets, method=coloring, translation=translation
)
for cluster_set in cluster_sets:
# (cmap, number_of_colors) = ('tab20', 20) if cluster_set.size > 10 else ('tab10', 10)
# replace by single colour palette with 20 colours such that first 10 colours are same as tab10
# cmap = palette_20_ordered(as_map=True)
plot_cluster_set(
cluster_set,
cmap=cmap,
vmin=vmin,
vmax=vmax,
y_height=cluster_set.n_max,
ax=ax1,
)
if with_silhouettes:
plot_average_silhouettes(cluster_sets, ax=ax2)
if with_n_clusters:
plot_ns_clusters(cluster_sets, ax=ax3)
if with_n_clusters:
return (ax1, ax2, ax3)
if with_silhouettes:
return (ax1, ax2)
if ax1 is not None:
return ax1
[docs]def plot_dendrogram(
cluster_set,
ax=None,
distance_threshold=None,
leaf_rotation=90,
leaf_font_size=6,
):
"""
Draw the results of hierarchical clustering as a dendrogram.
The particular clustering passed as argument is the result of
choosing a specific threshold in this dendrogram.
Parameters
----------
cluster_set : ClusterSet
Cluster set for which to draw a dendrogram
ax : matplotlib.Axes, optional
Axes on which to plot
distance_threshold : float, optional
Threshold at which to draw a horizontal line and above which
to use different colors for different branches.
leaf_rotation : int or float, optional
Rotation to apply to the x-axis (leaf) labels (default 90)
leaf_font_size : int or str, optional
Desired size of the x-axis (leaf) labels (default 6)
Returns
-------
matplotlib.axes.Axes
The axis object to draw on
Examples
--------
>>> import phasik as pk
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_set = pk.ClusterSet.from_distance_matrix(
>>> distance_matrix, "maxclust", 5, "ward"
>>> )
>>> pk.plot_dendrogram(cluster_set)
"""
if ax is None:
ax = plt.gca()
if cluster_set.linkage is None:
raise ValueError(
"Cannot compute the threshold of a non-hierarchical clustering"
)
# Calculate the distance threshold at which clusters stop, so that below this threshold we plot the
# dendrogram in colour, while above it we plot in black.
if distance_threshold is None:
distance_threshold = cluster_set.distance_threshold()
sch.dendrogram(
cluster_set.linkage,
leaf_rotation=leaf_rotation,
leaf_font_size=leaf_font_size,
color_threshold=distance_threshold,
above_threshold_color="black",
ax=ax,
)
ax.axhline(y=distance_threshold, c="grey", ls="--", zorder=1)
return ax
[docs]def plot_average_silhouettes(
cluster_sets, ax=None, c="k", marker="o", ls="-", **kwargs
):
"""Draw the average silhouette score for each cluster set in `cluster_sets`.
The silhouette score is a measure of the quality of a clustering.
Parameters
----------
cluster_sets : ClusterSets
Cluster sets for which to draw the silhouette scores
ax : matplotlib.Axes, optional
Axes on which to plot
c : color, optional
Color to use for the curve. Default: black.
marker : str, optional
Markers to use for the curve. Default: "o".
ls : str, optional
Linestyle to use for the cruve. Default: "-".
**kwargs :
Other parameters to pass to matplotlib's plot.
Returns
-------
matplotlib.axes.Axes
The axis object to draw on
See Also
--------
plot_cluster_set
plot_cluster_sets
plot_ns_clusters
Examples
--------
>>> import phasik as pk
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_sets = pk.ClusterSets.from_distance_matrix(
>>> distance_matrix, "maxclust", range(2, 12), "ward"
>>> )
>>> pk.plot_average_silhouettes(cluster_sets)
"""
if ax is None:
ax = plt.gca()
ax.plot(
cluster_sets.silhouettes_average,
cluster_sets.ns_max,
c=c,
marker=marker,
ls=ls,
**kwargs,
)
ax.set_xlabel("Average silhouette")
return ax
[docs]def plot_ns_clusters(cluster_sets, ax=None, c="k", marker="o", ls="-", **kwargs):
"""Plot the actual number of clusters against the requested number of clusters.
These numbers are plotted for each cluster set in `cluster_sets`.
Parameters
----------
cluster_sets : ClusterSets
Cluster sets information to plot
ax : matplotlib.Axes, optional
Axes on which to plot
c : color, optional
Color or the markers and line. Default: "k".
marker : string, optional
Marker to use, default: "o".
ls : str, optional
Style of the line. Default: "-".
**kwargs :
Other parameters to pass to matplotlib's plot.
Returns
-------
matplotlib.axes.Axes
The axis object to draw on
See Also
--------
plot_cluster_set
plot_cluster_sets
plot_average_silhouettes
Examples
--------
>>> import phasik as pk
>>> distance_matrix = pk.DistanceMatrix.from_temporal_network(
>>> temporal_network, "euclidean"
>>> )
>>> cluster_sets = pk.ClusterSets.from_distance_matrix(
>>> distance_matrix, "maxclust", range(2, 12), "ward"
>>> )
>>> pk.plot_ns_clusters(cluster_sets)
"""
if ax is None:
ax = plt.gca()
ax.plot(
cluster_sets.n_clusters,
cluster_sets.ns_max,
c=c,
marker=marker,
ls=ls,
**kwargs,
)
return ax
[docs]def relabel_clustersets_from_dict(cluster_sets, translation):
"""Relabels clusters in each cluster set, so that clusters are labeled according to the
translation dictionary
This is especially useful when plotting cluster sets, to have consistent colouring between
different figures with cluster sets.
Parameters
----------
cluster_sets : ClusterSets
translation : dict
Dictionary that determines which label should be replaced by which other label
For example {1: 2, 2: 3, 3: 1}
Returns
-------
cluster_sets_sorted: ClusterSets
See Also
--------
relabel_clustersets
relabel_clusters_sorted
relabel_next_clusterset_sorted
Examples
--------
>>> print(cluster_sets.clusters)
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[2 1 1 3 3 4]]
>>> translation = {1: 2, 2: 3, 3: 4, 4: 1}
>>> clustersets_new = pk.relabel_clustersets_from_dict(cluster_sets, translation)
>>> print(clustersets_new.clusters)
[[2 2 2 3 3 3]
[2 2 2 3 3 4]
[3 2 2 4 4 1]]
"""
if set(translation.keys()) != set(translation.values()):
raise ValueError(
"`translation` is not a valid dict: it does not preserve the original cluster structure."
)
cluster_sets_relabled = deepcopy(cluster_sets)
# swap label values in summary array
for k, v in translation.items():
cluster_sets_relabled.clusters[cluster_sets.clusters == k] = v
# swap label values in each clusterset
for i, clusters in enumerate(cluster_sets_relabled.clusters):
cluster_sets_relabled.clusters_sets[i].clusters = clusters
return cluster_sets_relabled
[docs]def relabel_clustersets(cluster_sets, method="consistent", translation=None):
"""Relabels clusters in each cluster set, for consistency across different numbers
of clusters.
This is especially useful when plotting cluster sets, to have consistent colouring.
This function iterates over the different partitions in the cluster set and relabels
them using `relabel_next_clusterset_sorted` or `relabel_clusters_sorted` depending
on the `method`.
Parameters
----------
cluster_sets : ClusterSets
method : {'consistent', 'ascending'}, optional
translation : dict, optional
If None (default), has no effect. Else, dictionary that determines which label
should be replaced by which other label
For example {1: 2, 2: 3, 3: 1}
It is applied after the order relabling from `method`.
Returns
-------
cluster_sets_sorted: ClusterSets
See Also
--------
relabel_clustersets_from_dict
relabel_clusters_sorted
relabel_next_clusterset_sorted
Examples
--------
>>> print(clusterset.clusters)
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[2 1 1 3 3 4]]
>>> clusterset_sorted = pk.cluster_sets, method="consistent")
>>> print(clusterset_sorted.clusters) # unchanged because consistent
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[4 1 1 2 2 3]]
>>> clusterset_sorted = pk.cluster_sets, method="ascending")
>>> print(clust_sorted.clusters)
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[1 2 2 3 3 4]]
"""
if method not in ["consistent", "ascending"]:
raise ValueError("Method should be one of ['consistent', 'ascending'].")
n = len(cluster_sets.n_clusters)
cluster_sets_sorted = deepcopy(cluster_sets)
if method == "ascending" or method == "consistent":
cluster_sets_sorted.clusters[0] = relabel_clusters_sorted(
cluster_sets_sorted.clusters[0]
)
cluster_sets_sorted[0].clusters = relabel_clusters_sorted(
cluster_sets_sorted.clusters[0]
)
# compute without modifying original
for i in range(n - 1):
if method == "consistent":
cluster_sets_sorted = relabel_next_clusterset_sorted(
cluster_sets, cluster_sets_sorted, i
)
elif method == "ascending":
cluster_sets_sorted.clusters[i + 1] = relabel_clusters_sorted(
cluster_sets_sorted.clusters[i + 1]
)
cluster_sets_sorted[i + 1].clusters = relabel_clusters_sorted(
cluster_sets_sorted.clusters[i + 1]
)
else:
raise KeyError("Unknown sorting method")
if translation is not None:
cluster_sets_sorted = relabel_clustersets_from_dict(
cluster_sets_sorted, translation
)
return cluster_sets_sorted
[docs]def relabel_clusters_sorted(clusters, final_labels=None):
"""Returns array of cluster labels sorted in order of appearance, with clusters unchanged
Parameters
----------
clusters : array of int
Cluster labels
final_labels : array of int
Cluster labels in expected order (has size of the number of clusters)
Returns
-------
arr : np.ndarray
Resulting clusters
See Also
--------
relabel_clustersets_from_dict
relabel_clustersets
relabel_clusters_sorted
Examples
--------
>>> clusters = np.array([2, 2, 2, 3, 3, 1, 1, 1])
>>> relabel_clusters_sorted(clusters)
[ 1 1 1 2 2 3 3 3 ]
"""
clusters = np.array(clusters)
arr = -clusters
i = 1
for j, el in enumerate(arr):
if el >= 0:
pass
else:
arr[arr == el] = i
i += 1
if final_labels is not None:
if len(set(clusters)) != len(set(final_labels)):
raise ValueError("The length of final_labels must the number of clusters")
if isinstance(final_labels, list):
arr = list(map(lambda k: final_labels[k - 1], arr))
# check that the clustering has not changed
assert adjusted_rand_score(clusters, arr) == 1
return np.array(arr)
[docs]def relabel_next_clusterset_sorted(cluster_sets, cluster_sets_sorted, i):
"""Relabels the clusters in i+1-th cluster set so that it is consistent with i-th cluster set.
This is especially useful when plotting cluster sets, to have consistent colouring.
Parameters
----------
cluster_sets : ClusterSets
Original cluster sets
cluster_sets_sorted : ClusterSets
Cluster sets being sorted, already sorted up to i-1
i : int
Index of reference cluster set
Returns
-------
cluster_sets_sorted : ClusterSets
See Also
--------
relabel_clustersets_from_dict
relabel_clustersets
relabel_clusters_sorted
Examples
--------
>>> print(clusterset.clusters)
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[2 1 1 3 3 4]]
>>> clusterset_sorted = deepcopy(clusterset)
>>> pk.relabel_next_clusterset_sorted(clust, clust_sorted, 0)
>>> print(clusterset_sorted.clusters) # unchanged because consistent
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[2 1 1 3 3 4]]
>>> pk.relabel_next_clusterset_sorted(clust, clust_sorted, 1)
>>> print(clust_sorted.clusters)
[[1 1 1 2 2 2]
[1 1 1 2 2 3]
[4 1 1 2 2 3]]
# note that the clusters at index 2 were relabeled
"""
# first we need the original clusters
# to determine which cluster was split going from i to i+1 clusters
clusters_ref = cluster_sets.clusters[i] # i clusters
clusters_up = cluster_sets.clusters[i + 1] # i+1 clusters
n_ref = cluster_sets.n_clusters[i]
n_up = cluster_sets.n_clusters[i + 1]
# those labels that changed between ref and up
diff = clusters_ref[clusters_ref != clusters_up]
if diff.size == 0: # empty array, no difference between i and i+1
# print("pass, empty array")
pass
else: # otherwise, sort
# label of reference cluster that was split in up
label_split = min(diff)
# size of cluster before splitting (in ref)
len_ref = len(clusters_ref[clusters_ref == label_split])
# size of cluster after splitting (in up)
len_up = len(clusters_up[clusters_up == label_split])
# the cluster is split into two clusters: they have labels label_split and label_split+1.
# we keep the same colour for the bigger of the two, i.e. we assign it label label_split
# the smaller one is assigned the new colour, i.e. label n_up
# we need to shift the other labels accordingly
clusters_ref_sorted = cluster_sets_sorted.clusters[i]
clusters_up_sorted = cluster_sets_sorted.clusters[i + 1]
n_diff = n_up - n_ref # number of additional clusters between i and i+1
if n_diff == 1:
if (
len_up >= len_ref / 2
): # split cluster with old label is bigger than new label: old label stays unchanged
clusters_up_sorted[
clusters_up == label_split + 1
] = -1 # flag new cluster
unchanged = clusters_up_sorted != -1
clusters_up_sorted[unchanged] = clusters_ref_sorted[unchanged]
clusters_up_sorted[
clusters_up_sorted == -1
] = n_up # assign new colour to new cluster
else:
clusters_up_sorted[clusters_up == label_split] = -1 # flag old cluster
unchanged = clusters_up_sorted != -1
clusters_up_sorted[unchanged] = clusters_ref_sorted[unchanged]
clusters_up_sorted[
clusters_up_sorted == -1
] = n_up # assign new colour to old cluster
else: # more than 1, then cluster is split into labels label_split, label_split+1, label_split+2, ...
lens_new = [
len(clusters_up[clusters_up == label_split + j])
for j in range(n_diff + 1)
]
j_max = np.argmax(lens_new) - 1
if (
j_max == -1
): # split cluster with old label is bigger than new label: old label stays unchanged
for j in range(n_diff):
clusters_up_sorted[clusters_up == label_split + 1 + j] = (
-1 - j
) # flag new cluster
unchanged = clusters_up_sorted > 0
clusters_up_sorted[unchanged] = clusters_ref_sorted[unchanged]
for j in range(n_diff):
clusters_up_sorted[clusters_up_sorted == -1 - j] = (
n_up - n_diff + 1 + j
) # assign new colour to new cluster
else: # swap old cluster label_split with j_max
clusters_up_sorted[
clusters_up == label_split
] = -label_split # flag old cluster
for j in range(n_diff):
clusters_up_sorted[clusters_up == label_split + 1 + j] = (
-label_split - 1 - j
) # flag new clusters
unchanged = clusters_up_sorted > 0
clusters_up_sorted[unchanged] = clusters_ref_sorted[unchanged]
clusters_up_sorted[
clusters_up_sorted == -label_split - 1 - j_max
] = label_split
for j in range(n_diff):
if j != j_max:
clusters_up_sorted[
clusters_up_sorted == -label_split - 1 - j
] = (
n_up - n_diff + 1 + j
) # assign new colour to new cluster
clusters_up_sorted[clusters_up_sorted == -label_split] = (
n_up - n_diff + 1 + j_max
) # assign new colour to old cluster
# update clusters also in cluster_set instance
cluster_sets_sorted[i + 1].clusters = clusters_up_sorted
# check that the clustering has not changed
assert adjusted_rand_score(clusters_up_sorted, clusters_up) == 1
return cluster_sets_sorted