"""
Base class for temporal networks
"""
from copy import deepcopy
import networkx as nx
import numpy as np
import pandas as pd
import phasik as pk
from phasik.utils.convert import (
convert_edge_timeseries_to_tedges,
convert_node_to_edge_timeseries,
)
__all__ = ["TemporalNetwork", "_process_input_tedges"]
[docs]class TemporalNetwork:
"""Base class for temporal networks
Temporal networks are networks with time-varying edges. They consist of nodes
and edges, and latter can have time-varying weights.
Attributes
----------
nodes : list of (str or int)
Sorted list of node names. Node names can be either strings or integers,
but they all need to be of the same type.
times : list of (int or float)
Sorted list of times for which we have temporal information
tedges : pandas.DataFrame
Dataframe containing tedges, also called timestamped data (potentially weighted).
Columns are ['i', 'j', 't', ('weight')] and each row represents a tedge.
snapshots : numpy array
Array of shape (T, N, N) storing the instantaneous values of
the adjacency matrix A_{ij}(t).
"""
[docs] def __init__(self):
self._nodes = []
self._times = []
self._tedges = pd.DataFrame()
self._snapshots = np.zeros((1,))
@property
def nodes(self):
"""Returns a list of nodes in the TemporalNetwork"""
return self._nodes
@property
def times(self):
"""Returns a list of times in the TemporalNetwork"""
return self._times
@property
def tedges(self):
"""Returns a DataFrame of tedges the TemporalNetwork"""
return self._tedges
@property
def snapshots(self):
"""Returns a numpy array of snapshots in the TemporalNetwork"""
return self._snapshots
def __len__(self):
"""Returns the number of nodes in the TemporalNetwork"""
return len(self._nodes)
def __str__(self):
"""Returns summary information about the TemporalNetwork"""
return f"{type(self)} with {self.N()} nodes and {self.T()} times"
def __iter__(self):
"""Returns an iterable over nodes"""
return iter(self._nodes)
def __contains__(self, node):
"""Returns True if node is a node, False otherwise. Use as 'node in TN'."""
try:
return node in self._nodes
except TypeError:
return False
[docs] def N(self):
"""Returns the number of nodes"""
return len(self._nodes)
[docs] def T(self):
"""Returns the number of times"""
return len(self._times)
[docs] def shape(self):
"""Returns the shape (N,T) of the TemporalNetwork"""
return self.N(), self.T()
[docs] def number_of_edges(self):
"""Returns the number of edges in the aggregated network"""
return len(self.edges_aggregated())
[docs] def is_weighted(self):
"""Returns True if tedges are weighted"""
return "weight" in self._tedges.columns
[docs] def has_node(self, node):
"""Returns True if node is in the TemporalNetwork"""
return node in self._nodes
[docs] def has_time(self, time):
"""Returns True if time is in the TemporalNetwork"""
return time in self._times
[docs] def has_tedge(self, tedge):
"""Returns True if tedge is in the TemporalNetwork, regardless of its weight"""
if len(tedge) == 3:
u, v, t = tedge
elif len(tedge) == 4:
u, v, t, _ = tedge
else:
raise Exception("Tedge must be of length 3 or 4")
# /!\ tedge must be sorted, only checking one orientation
return (
(self._tedges["i"] == u)
& (self._tedges["j"] == v)
& (self._tedges["t"] == t)
)
def _add_nodes(self, nodes_to_add):
"""Add multiple nodes to the TemporalNetwork, without updating it further
Parameters
----------
nodes_to_add : list of str or int
List of nodes to add
Returns
-------
None
"""
self._nodes += nodes_to_add
self._nodes = sorted(set(self._nodes))
def _add_times(self, times_to_add):
"""Add multiple times to the TemporalNetwork, without updating it further
Parameters
----------
nodes_to_add : list of str or int
List of nodes to add
Returns
-------
"""
self._times += times_to_add
self._times = sorted(set(self._times))
[docs] def add_tedges(self, tedges_to_add):
"""Adds multiple tedges (optionally weighted)
Parameters
----------
tedges_to_add : DataFrame or list of tuples
Returns
-------
None
"""
tedges_to_add = _process_input_tedges(tedges_to_add)
self._tedges = pd.concat([self._tedges, pd.DataFrame(tedges_to_add)])
self._tedges = self._tedges.drop_duplicates(subset=["i", "j", "t"], keep="last")
# sort?
# update nodes and times first to update their indices
times_to_add = tedges_to_add["t"].to_list()
nodes_to_add = tedges_to_add[["i", "j"]].to_numpy().flatten().tolist()
self._add_nodes(nodes_to_add)
self._add_times(times_to_add)
# update snapshots
self._compute_snapshots() # computes them from scratch
def _compute_snapshots(self):
"""Computes the snapshots from scratch from the tedges"""
snapshots = np.zeros((self.T(), self.N(), self.N()))
for row in self._tedges.itertuples(index=False):
tedge = (row.i, row.j, row.t)
if "weight" in self._tedges.columns:
weight = row.weight
else:
weight = 1
i, j, i_t = self._tedge_to_indices(tedge)
snapshots[i_t, i, j] = weight
snapshots[i_t, j, i] = weight # undirected edges
self._snapshots = snapshots
def _node_index(self, node):
"""Returns the index of node 'node'."""
return self._nodes.index(node)
def _time_index(self, time):
"""Returns the index of time 'time'."""
return self._times.index(time)
def _tedge_to_indices(self, tedge):
"""Returns the indices of the nodes and time in 'tedge'."""
u, v, t = tedge[:3] # discard potential weight
i = self._node_index(u)
j = self._node_index(v)
i_t = self._time_index(t)
return i, j, i_t
def _edge_to_indices(self, edge):
"""Returns the indices of the nodes in 'edge'."""
u, v = edge
i = self._node_index(u)
j = self._node_index(v)
return i, j
[docs] def neighbors(self):
"""
Returns a dictionary of neighboring nodes in the aggregate network.
Returns
-------
dict
A dictionary where keys represent the nodes in the aggregate network, and values are lists of neighboring nodes.
Notes
-----
This function relies on the `aggregated_network` method to obtain the aggregate network graph.
"""
G_agg = self.aggregated_network()
# get rid of weight dicts
return {
node: list(values.keys())
for node, values in dict(G_agg.adjacency()).items()
}
[docs] def edge_timeseries(self, edges=None):
"""Returns dict of edge time series. Keys are edge names and values are timeseries
Parameters
----------
edges : list of tuples
List of edges wanted, e.g. [('A, B')]. If None (default), all edges in the temporal network are used.
Returns
-------
all_series : dict
Dictionary with edge names as keys (as 'A-B'), and timeseries as values.
"""
if edges is None:
if isinstance(self, pk.PartiallyTemporalNetwork):
edges = self.temporal_edges
else:
edges = self.edges_aggregated()
all_series = {}
for edge in edges:
u, v = edge
i, j = self._edge_to_indices(edge)
edge_name = "-".join(edge)
all_series[edge_name] = self._snapshots[:, i, j]
return all_series
[docs] def tedges_of_edge(self, edge, return_mask=True, reverse=False):
"""Returns a filtered DataFrame containing only the tedges of edge 'edge'.
Optionally, return the boolean mask to filter the original DataFrame.
Parameters
----------
edge : tuple
Edge used to filter tedges
return_mask : bool, optional
If True (default), return boolean mask to filter the original DataFrame
reverse : bool, optional
If True, return the Dataframe obtained by filtered with logically opposite mask,
i.e. all tedges except those of edge 'edge'.
Returns
-------
None
"""
edge = tuple(sorted(edge))
u, v = edge
if edge not in self.edges_aggregated():
raise ValueError(f"Edge {edge} not an edge of the temporal network")
mask = (self._tedges["i"] == u) & (self._tedges["j"] == v)
if reverse:
mask = np.logical_not(mask)
if return_mask:
return self._tedges[mask], mask
else:
return self._tedges[mask]
[docs] def tedges_of_node(self, node, return_mask=True, reverse=False):
"""Returns a filtered DataFrame containing only the tedges of node 'node'.
Optionally, return the boolean mask to filter the original DataFrame.
Parameters
----------
node : str or int
Node used to filter tedges
return_mask : bool, optional
If True (default), return boolean mask to filter the original DataFrame
reverse : bool, optional
If True, return the Dataframe obtained by filtered with logically opposite mask,
i.e. all tedges except those of node 'node'.
Returns
-------
"""
if not self.has_node(node):
raise ValueError(f"Node {node} not a node of the temporal network")
mask = (self._tedges["i"] == node) | (self._tedges["j"] == node)
if reverse:
mask = np.logical_not(mask)
if return_mask:
return self._tedges[mask], mask
else:
return self._tedges[mask]
[docs] def aggregated_network(self, time_indices=None, output="weighted"):
"""Returns a time-aggregated network as a networkx.Graph
Parameters
----------
time_indices : list of int, optional
Indices of times over which to aggregate the network (default: all times).
output : {'weighted', 'averaged', 'binary', 'normalised'}, optional
Determines the type of output edge weights
Returns
-------
G_agg : networkx Graph
Aggregated network
"""
if time_indices is None:
time_indices = range(self.T())
adj_aggregated = self._snapshots[time_indices].sum(axis=0)
n_t = len(time_indices)
if output == "weighted":
pass
elif output == "averaged":
adj_aggregated /= n_t
elif output == "binary":
tol = 1e-3
adj_aggregated[adj_aggregated > tol] = 1
elif output == "normalised":
adj_aggregated /= np.max(adj_aggregated)
G_agg = nx.Graph(adj_aggregated)
G_agg = nx.relabel_nodes(G_agg, {i: node for i, node in enumerate(self._nodes)})
return G_agg
def network_at_time(self, time_index, output="weighted"):
"""Returns the temporal network at time 'time' as a networkx.Graph
Parameters
----------
time_index : int
Time index at which we want the temporal network
output : {'weighted', 'averaged', 'binary', 'normalised'}, optional
Determines the type of output edge weights
Returns
-------
networkx Graph
Network at time 'time'
"""
return self.aggregated_network(time_indices=[time_index], output=output)
[docs] def edges_aggregated(self):
"""Returns a list of edges in the aggregated network
Parameters
----------
None
Returns
-------
list of tuples
"""
# note : some class methods build complete networks, in which case all edges will be included
G_agg = self.aggregated_network()
return list(G_agg.edges)
[docs] def to_partially_temporal(self):
"""Returns a copy of the temporal network as a PartiallyTemporalNetwork"""
tedges = deepcopy(self._tedges)
return pk.PartiallyTemporalNetwork.from_tedges(tedges)
[docs] def discard_temporal_info_from_edge(self, edge, default_weight=1, reverse=False):
"""Discards temporal information from 'edge' by setting its weight to a constant
Returns a copy of the temporal network with the new edge weights
Parameters
----------
edge : tuple of int or str
Edge from which to discard temporal information
default_weight : float, optional
Value used for the edges with no temporal information
reverse : bool, optional
If True, discard temporal info from all edges except 'edge'.
Returns
-------
TN_modified : TemporalNetwork
"""
# after discarding temporal information, we need to have a PartiallyTemporalNetwork
if isinstance(self, pk.PartiallyTemporalNetwork):
TN_modified = deepcopy(self)
else:
TN_modified = self.to_partially_temporal()
if edge not in TN_modified.edges_aggregated():
raise ValueError(f"Edge {edge} not an edge in the temporal network.")
elif edge not in TN_modified.temporal_edges:
raise ValueError(f"Edge {edge} not a temporal edge.")
# udpate tedges
_, mask = TN_modified.tedges_of_edge(edge, reverse=reverse)
TN_modified._tedges.loc[mask, "weight"] = default_weight
# update snapshots
if not reverse:
i, j = TN_modified._edge_to_indices(edge) # one edge to modify
TN_modified.snapshots[:, i, j] = default_weight
TN_modified.snapshots[:, j, i] = default_weight
else:
for edge_to_modify in self.temporal_edges:
if edge_to_modify != edge:
i, j = TN_modified._edge_to_indices(
edge_to_modify
) # one edge to modify
TN_modified.snapshots[:, i, j] = default_weight
TN_modified.snapshots[:, j, i] = default_weight
# update temporal nodes and edges
if reverse:
TN_modified.temporal_nodes = sorted(edge)
TN_modified.temporal_edges = [edge]
else:
TN_modified.temporal_edges.remove(edge)
temporal_nodes = np.unique(TN_modified.temporal_edges)
TN_modified.temporal_nodes = list(temporal_nodes)
return TN_modified
[docs] def discard_temporal_info_from_node(self, node, default_weight=1, reverse=False):
"""Discards temporal information from 'node' by setting the weight of its edges to a constant
Returns a copy of the temporal network with the new edge weights
Parameters
----------
node : int or str
Node from which to discard temporal information
default_weight : float, optional
Value used for the edges with no temporal information
reverse : bool, optional
If True, discard temporal info from all nodes except 'node'.
Returns
-------
TN_modified : TemporalNetwork
"""
# after discarding temporal information, we need to have a PartiallyTemporalNetwork
if isinstance(self, pk.PartiallyTemporalNetwork):
TN_modified = deepcopy(self)
else:
TN_modified = self.to_partially_temporal()
if not TN_modified.has_node(node):
raise ValueError(f"Node {node} not an node in the temporal network.")
elif node not in TN_modified.temporal_nodes:
raise ValueError(f"Node {node} not a temporal node.")
# update tedges
_, mask = TN_modified.tedges_of_node(node, reverse=reverse)
TN_modified._tedges.loc[mask, "weight"] = default_weight
temporal_edges_selected = [edge for edge in self.temporal_edges if node in edge]
# update snapshots
if not reverse:
for edge_to_modify in temporal_edges_selected:
i, j = TN_modified._edge_to_indices(
edge_to_modify
) # one edge to modify
TN_modified.snapshots[:, i, j] = default_weight
TN_modified.snapshots[:, j, i] = default_weight
else:
TN_modified._compute_snapshots()
# update temporal nodes and edges
if reverse:
TN_modified.temporal_nodes = [node]
TN_modified.temporal_edges = temporal_edges_selected
else:
TN_modified.temporal_nodes.remove(node)
TN_modified.temporal_edges = [
edge
for edge in TN_modified.temporal_edges
if edge not in temporal_edges_selected
]
return TN_modified
[docs] @classmethod
def from_tedges(cls, tedges, normalise=None):
"""Creates a TemporalNetwork from a dataframe of tedges
Parameters
----------
tedges : pandas.DataFrame or list of tuples
List of tedges with 'i', 'j', 't', and optionally 'weight'
If DataFrame, these are the name of the columns, and each row contains a tedge
normalise : {'max', 'minmax', "standardise", None}
Choice of normalsation of the edge timeseries
Returns
-------
TN : TemporalNetwork
"""
tedges = _process_input_tedges(tedges)
if normalise:
if "weight" not in tedges.columns:
raise ValueError(
"Cannot normalise weights because edges are unweighted"
)
if normalise is None:
pass
elif normalise == "max":
grouped = tedges.groupby(["i", "j"])["weight"]
maxes = grouped.transform("max")
tedges["weight"] = tedges["weight"] / maxes
tedges["weight"] = tedges["weight"].fillna(1)
elif normalise == "minmax":
grouped = tedges.groupby(["i", "j"])["weight"]
maxes = grouped.transform("max")
mins = grouped.transform("min")
tedges["weight"] = (tedges["weight"] - mins) / (maxes - mins)
# In cases where max = min we'll have a division by zero error.
tedges["weight"] = tedges["weight"].fillna(0.5)
elif normalise == "standardise":
grouped = tedges.groupby(["i", "j"])["weight"]
stds = grouped.transform("std")
avgs = grouped.transform("mean")
tedges["weight"] = (tedges["weight"] - avgs) / stds
# In cases where cst we'll have a division by zero error.
tedges["weight"] = tedges["weight"].fillna(0)
else:
raise ValueError("Unknown value for 'normalise'")
TN = cls()
TN.add_tedges(tedges)
return TN
[docs] @classmethod
def from_edge_timeseries(cls, edge_timeseries, normalise="max"):
"""Creates a TemporalNetwork from a DataFrame of edge timeseries
All edges in the network are those of the timeseries, and nodes are extracted from edge names
Parameters
----------
edge_timeseries : pandas.DataFrame
Dataframe where each row is a timeseries, with index as edge names and columns as times
normalise : {'max', 'minmax', "standardise", None}
Choice of normalsation of the edge timeseries
Returns
-------
TemporalNetwork
"""
tedges = convert_edge_timeseries_to_tedges(edge_timeseries)
return cls.from_tedges(tedges, normalise)
[docs] @classmethod
def from_node_timeseries(cls, node_timeseries, normalise="max"):
"""Creates a temporal network by combining node timeseries into edge timeseries.
By construction, the underlying static network created is always fully connected.
Parameters
----------
node_timeseries : pandas.DataFrame
Timeseries of nodes, indexed by node name and times as columns
normalise : {'max', 'minmax', "standardise", None}
Choice of normalsation of the edge timeseries
Returns
-------
TemporalNetwork
"""
edge_series = convert_node_to_edge_timeseries(node_timeseries)
return cls.from_edge_timeseries(edge_series, normalise=normalise)
[docs] @classmethod
def from_static_network_and_tedges(
cls,
static_network,
tedges,
static_edge_default_weight=None,
normalise="max",
quiet=True,
):
"""Creates a temporal network by combining a static network with tedges
If all edges of the static network are represented in the tedges, create a temporal network
by setting time-varying edge weights from the tedges.
Raises an Exception if not all edges have temporal information.
Parameters
----------
static_network : networkx.Graph
Static network into which to integrate the temporal information
tedges : pandas.DataFrame or list of tuples
Tedges must be of the form (i, j, t, weight)
static_edge_default_weight : float
Weight to use for edges without temporal information
normalise : {'max', 'minmax', "standardise", None}
Choice of normalsation of the edge timeseries
quiet : bool
If True (default), print minimum informative messages
Returns
-------
"""
tedges = _process_input_tedges(tedges)
if "weight" not in tedges.columns:
tedges["weight"] = 1 # add column with weight 1
# convert static network's edges to DataFrame
static_network_edges = pd.DataFrame(static_network.edges)
static_network_edges.columns = ["static_i", "static_j"]
# sort nodes in each row, for undirected edges
static_network_edges[["static_i", "static_j"]] = np.sort(
static_network_edges[["static_i", "static_j"]], axis=1
)
tedges[["i", "j"]] = np.sort(tedges[["i", "j"]], axis=1)
# check that all static network edges have temporal info
edges_aggregated = set(tedges[["i", "j"]].itertuples(index=False, name=None))
static_network_edges_set = set(
static_network_edges[["static_i", "static_j"]].itertuples(
index=False, name=None
)
)
# missing_edges = set(static_network.edges).difference(edges_aggregated)
missing_edges = static_network_edges_set.difference(edges_aggregated)
if missing_edges == set():
tedges_merged = pd.merge(
static_network_edges,
tedges,
how="left",
left_on=["static_i", "static_j"],
right_on=["i", "j"],
)
tedges_merged = tedges_merged.drop(columns=["static_i", "static_j"])
return cls.from_tedges(tedges_merged, normalise=normalise)
else: # create a PartiallyTemporalNetwork
print(
f"WARNING: {len(missing_edges)}/{len(static_network_edges)} edges "
f"in the static network have no temporal information. \n"
f"A PartiallyTemporalNetwork is created instead."
)
if not quiet:
print("Edges with no temporal information:")
print(missing_edges)
return pk.PartiallyTemporalNetwork.from_static_network_and_tedges(
static_network,
tedges,
static_edge_default_weight,
normalise=normalise,
)
[docs] @classmethod
def from_static_network_and_edge_timeseries(
cls,
static_network,
edge_timeseries,
static_edge_default_weight=None,
normalise=None,
quiet=False,
):
"""Creates a temporal network by combining a static network with edge timeseries
If all edges of the static network are represented in the timeseries, create a temporal network
by setting time-varying edge weights from the tedges.
If not all edges of the static network, creates a partially temporal network.
Parameters
----------
static_network : nx.Graph
Static network into which to integrate the temporal information
edge_timeseries : Dataframe
Dataframe with indexed (rows) by edge names (formatted as 'A-B') and
with columns as times. Entries of the Dataframe represent the weight of
that edge at that time.
static_edge_default_weight : float
Weight to use for edges without temporal information
normalise : {'max', 'minmax', "standardise", None}
Choice of normalsation of the edge timeseries
quiet : bool
If True (default), print minimum informative messages
Returns
-------
TemporalNetwork
"""
tedges = convert_edge_timeseries_to_tedges(edge_timeseries)
return cls.from_static_network_and_tedges(
static_network,
tedges,
static_edge_default_weight,
normalise,
quiet,
)
[docs] @classmethod
def from_static_network_and_node_timeseries(
cls,
static_network,
node_timeseries,
combine_node_weights=lambda x, y: x * y,
static_edge_default_weight=None,
normalise=None,
quiet=False,
):
"""Creates a temporal network by combining a static network with node timeseries
Edge time series are generated for the subset of edges in the 'static_network'
that have both nodes in the 'node_timeseries', by combining their time series.
These edge times series are used to set the time-varying weights of the corresponding
edges in the temporal network.
If not all edges have temporal information, creates a partially temporal network.
Parameters
----------
static_network : nx.Graph
Static network into which to integrate the temporal information
node_timeseries : Dataframe
Dataframe with indexed (rows) by node names and
with columns as times. Entries of the Dataframe represent the value of
that node at that time.
combine_node_weights : function
Function that determines how two node timeseries are combined to generate
and edge timeseries. By default, the two node timeseries are multiplied.
static_edge_default_weight : float
Weight to use for edges without temporal information
normalise : {'max', 'minmax', "standardise", None}
Choice of normalsation of the edge timeseries
quiet : bool
If True (default), print minimum informative messages
Returns
-------
TemporalNetwork
"""
# only keep node timeseries from nodes that are in the static network
nodes_static = list(static_network.nodes)
nodes_temporal_all = list(node_timeseries.index)
nodes_temporal = [node for node in nodes_temporal_all if node in nodes_static]
node_series_in_network = node_timeseries[
node_timeseries.index.isin(nodes_temporal)
]
# combine node timeseries to obtain edge timeseries, only edges that a present in the static network
edge_series = convert_node_to_edge_timeseries(
node_series_in_network,
combine_node_weights,
static_edges=list(static_network.edges),
)
return cls.from_static_network_and_edge_timeseries(
static_network,
edge_series,
static_edge_default_weight,
normalise,
quiet,
)
def _process_input_tedges(tedges):
"""Check that input is valid and convert to DataFrame if needed
Parameters
----------
tedges : (list of tuples) or Dataframe
A list of (optionally weighted) tedges (i, j, t, weight) as tuples or in a Dataframe
Returns
-------
tedges : pandas DataFrame
"""
if isinstance(tedges, list): # convert to DataFrame
if all([isinstance(tedge, tuple) for tedge in tedges]):
if all([len(tedge) == 3 for tedge in tedges]):
columns = ["i", "j", "t"]
tedges_df = pd.DataFrame(data=tedges, columns=columns)
elif all([len(tedge) == 4 for tedge in tedges]):
columns = ["i", "j", "t", "weight"]
tedges_df = pd.DataFrame(data=tedges, columns=columns)
else:
raise ValueError(
"Tedges in list must have length 3 (i, j, t) or 4 (i, j, t, weight)"
)
else:
raise TypeError("All tedges in list should be tuples")
elif isinstance(tedges, pd.DataFrame): # check columns
tedges_df = tedges
if (list(tedges_df.columns) == ["i", "j", "t"]) or (
list(tedges_df.columns) == ["i", "j", "t", "weight"]
):
pass
else:
print(tedges_df)
raise ValueError(
"Tedge dataframe must have columns (i, j, t) or (i, j, t, weight)"
)
else:
raise TypeError(
"Invalid type of input tedges: should be a list of tuples or a DataFrame"
)
# remove self-edges
tedges_df = tedges_df[tedges_df["i"] != tedges_df["j"]]
tedges_df = tedges_df.sort_values(by=["i", "j", "t"])
return tedges_df