import community.community_louvain as community_louvain
import networkx as nx
import numpy as np
from sklearn import preprocessing, metrics
dcp = 3 # decimal precision
def ARI(G, clustering, clustering_label):
    """
    Compute the Adjusted Rand Index (clustering accuracy) of "clustering", with "clustering_label" as ground truth.

    :param G: A graph with node attribute "clustering_label" as ground truth.
    :type G: networkx.Graph
    :param clustering: Predicted community clustering.
    :type clustering: dict, list, or list of sets
    :param clustering_label: Node attribute name for ground truth.
    :type clustering_label: str
    :returns: Adjusted Rand Index of the predicted clustering.
    :rtype: float
    """
    complex_list = nx.get_node_attributes(G, clustering_label)
    le = preprocessing.LabelEncoder()
    y_true = le.fit_transform(list(complex_list.values()))

    if isinstance(clustering, dict):
        # python-louvain partition format
        y_pred = np.array([clustering[v] for v in complex_list.keys()])
    elif isinstance(clustering[0], set):
        # networkx partition format
        predict_dict = {c: idx for idx, comp in enumerate(clustering) for c in comp}
        y_pred = np.array([predict_dict[v] for v in complex_list.keys()])
    elif isinstance(clustering, list):
        # sklearn partition format
        y_pred = clustering
    else:
        print("\nINVALID CLUSTERING TYPE, couldn't compute ARI successfully.\n")
        return -1

    return metrics.adjusted_rand_score(y_true, y_pred)
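
# Illustrative usage sketch (not part of the original module). The karate club
# graph ships with a "club" node attribute that can serve as ground truth, so
# ARI can be exercised with the two most common clustering formats; the helper
# name below is only for demonstration.
def _demo_ari():
    G = nx.karate_club_graph()
    # python-louvain dict format: node -> community id
    partition = community_louvain.best_partition(G)
    print("Louvain ARI:", ARI(G, partition, clustering_label="club"))
    # networkx-style list of sets, built here from the ground truth itself, so ARI is 1.0
    truth = [{n for n, d in G.nodes(data=True) if d["club"] == c} for c in ("Mr. Hi", "Officer")]
    print("Self ARI:", ARI(G, truth, clustering_label="club"))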
def check_accuracy(
    G_origin, weight="weight", clustering_label="community", eval_cut=False
):
    """
    Evaluate the clustering quality obtained by cutting edges on the given weight attribute at different thresholds.

    This function iteratively removes edges based on a weight threshold (Ricci flow metric) and evaluates
    the clustering results using modularity and the Adjusted Rand Index (ARI). If `eval_cut` is enabled,
    it also estimates a "good" cut threshold by detecting significant drops in modularity.

    :param G_origin: A graph with ``weight`` as the Ricci flow metric used for edge removal.
    :type G_origin: networkx.Graph
    :param weight: The edge weight attribute used as the Ricci flow metric. Defaults to ``"weight"``.
    :type weight: str
    :param clustering_label: Node attribute name for ground truth communities. Defaults to ``"community"``.
    :type clustering_label: str
    :param eval_cut: Whether to compute an estimated optimal cut threshold based on modularity drops. Defaults to ``False``.
    :type eval_cut: bool
    :returns: A tuple containing:

        - **maxw** (*float*): Maximum edge weight value in the graph.
        - **cutoff_range** (*numpy.ndarray*): Array of tested cutoff values for edge removal.
        - **modularity** (*list[float]*): Modularity values at each cutoff.
        - **ari** (*list[float]*): ARI values at each cutoff.
        - (**Optional**) **good_cut** (*float*): Estimated best cutoff based on modularity drop (only if ``eval_cut=True``).
        - (**Optional**) **best_ari** (*float*): Highest ARI value achieved (only if ``eval_cut=True``).
        - (**Optional**) **best_mod** (*float*): Modularity value at the detected drop (only if ``eval_cut=True``).
    :rtype: tuple

        - If ``eval_cut=False`` → ``(float, numpy.ndarray, list[float], list[float])``
        - If ``eval_cut=True`` → ``(float, numpy.ndarray, list[float], list[float], float, float, float)``
    """
    G = G_origin.copy()
    modularity, ari = [], []
    maxw = max(nx.get_edge_attributes(G, weight).values())
    cutoff_range = np.arange(maxw, 1, -0.025)
    best_ari = 0
    best_cutoff = 0

    for cutoff in cutoff_range:
        edge_trim_list = []
        for n1, n2 in G.edges():
            if G[n1][n2][weight] > cutoff:
                edge_trim_list.append((n1, n2))
        G.remove_edges_from(edge_trim_list)

        # Get connected components after the cut as clustering
        clustering = {
            c: idx for idx, comp in enumerate(nx.connected_components(G)) for c in comp
        }

        # Compute modularity and ARI
        modularity.append(community_louvain.modularity(clustering, G, weight))
        current_ari = ARI(G, clustering, clustering_label=clustering_label)
        ari.append(current_ari)

        # Update best ARI and corresponding cutoff
        if current_ari > best_ari:
            best_ari = current_ari
            best_cutoff = cutoff
    if eval_cut:  # Search for a good cut by looking at drops in modularity
        good_cut = -1
        best_mod = -1
        mod_last = modularity[-1]
        # Treat a step as a significant drop when modularity at the larger cutoff
        # falls below this fraction of the modularity at the next smaller cutoff.
        drop_threshold = 0.01

        # Search for drops in modularity, scanning from the smallest cutoff back up
        drops = []
        for i in range(len(modularity) - 1, 0, -1):
            mod_now = modularity[i]
            if mod_last > 1e-4 and (mod_now / mod_last) < drop_threshold:
                drops.append((cutoff_range[i + 1], mod_last))
            mod_last = mod_now

        if drops:
            # Keep the drop with the highest modularity value and extract its
            # cutoff and modularity; otherwise the defaults of -1 are returned.
            best_drop = max(drops, key=lambda x: x[1])
            good_cut, best_mod = best_drop

        print(
            f"\nBest ARI: {best_ari:.{dcp+1}f}, with cutoff = {best_cutoff:.{dcp}f}\nGuessed cutoff = {good_cut:.{dcp}f}"
        )
        return maxw, cutoff_range, modularity, ari, good_cut, best_ari, best_mod
    else:
        print(f"\nBest ARI: {best_ari:.{dcp+1}f}, with cutoff = {best_cutoff:.{dcp}f}")
        return maxw, cutoff_range, modularity, ari
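
# Illustrative sketch of check_accuracy (not part of the original module). In the
# intended pipeline the "weight" attribute would hold a Ricci-flow metric; here a
# synthetic barbell graph stands in, with a heavy weight on the bridge edge to
# mimic what Ricci flow does to inter-community edges. Names and numbers are
# assumptions for demonstration only.
def _demo_check_accuracy():
    G = nx.barbell_graph(5, 0)                # two 5-cliques joined by one bridge edge
    nx.set_edge_attributes(G, 1.0, "weight")  # low weight inside the communities
    G[4][5]["weight"] = 3.0                   # high weight on the bridge edge
    nx.set_node_attributes(G, {n: int(n >= 5) for n in G.nodes()}, "community")
    maxw, cutoffs, mods, aris = check_accuracy(
        G, weight="weight", clustering_label="community", eval_cut=False
    )
    print(f"max weight = {maxw}, best ARI over {len(cutoffs)} cutoffs = {max(aris)}")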
def get_best_cut(G_origin, weight="weight", clustering_label="value"):
    """
    Determine the best edge removal cutoff threshold to maximize clustering accuracy (ARI).

    This function iteratively removes edges based on a weight threshold (e.g., Ricci flow metric)
    and evaluates the resulting clustering accuracy using the Adjusted Rand Index (ARI).
    The best cutoff is selected as the one that yields the highest ARI.

    :param G_origin: A graph where edges have an attribute ``weight`` used as a removal criterion.
    :type G_origin: networkx.Graph
    :param weight: The edge weight attribute used as the removal metric. Defaults to ``"weight"``.
    :type weight: str
    :param clustering_label: Node attribute name for ground truth communities. Defaults to ``"value"``.
    :type clustering_label: str
    :returns: The cutoff threshold that results in the highest ARI.
    :rtype: float
    """
    G = G_origin.copy()
    maxw = max(nx.get_edge_attributes(G, weight).values())
    cutoff_range = np.arange(maxw, 1, -0.025)
    best_ari = 0
    best_cutoff = 0

    for cutoff in cutoff_range:
        edge_trim_list = []
        for n1, n2 in G.edges():
            if G[n1][n2][weight] > cutoff:
                edge_trim_list.append((n1, n2))
        G.remove_edges_from(edge_trim_list)

        # Get connected components after the cut as clustering
        clustering = {
            c: idx for idx, comp in enumerate(nx.connected_components(G)) for c in comp
        }
        current_ari = ARI(G, clustering, clustering_label=clustering_label)

        # Update best ARI & best cutoff
        if current_ari > best_ari:
            best_ari = current_ari
            best_cutoff = cutoff

    return best_cutoff
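
# Illustrative sketch of get_best_cut (not part of the original module); the same
# synthetic barbell graph as above stands in for a Ricci-flow-weighted graph, and
# the ground truth is stored under the function's default label "value".
def _demo_get_best_cut():
    G = nx.barbell_graph(5, 0)                # two 5-cliques joined by one bridge edge
    nx.set_edge_attributes(G, 1.0, "weight")  # low weight inside the communities
    G[4][5]["weight"] = 3.0                   # high weight on the bridge edge
    nx.set_node_attributes(G, {n: int(n >= 5) for n in G.nodes()}, "value")
    cut = get_best_cut(G, weight="weight", clustering_label="value")
    # Apply the cut: edges heavier than the returned threshold separate the communities
    G.remove_edges_from([(u, v) for u, v in G.edges() if G[u][v]["weight"] > cut])
    print(f"best cutoff = {cut:.3f}, communities = {list(nx.connected_components(G))}")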