import glob
import logging
import re
import time
from io import BytesIO
from pathlib import Path
import attridict
import numpy as np
import scipy.sparse as sp
import torch
import torch_geometric
# from huggingface_hub import HfApi, HfFolder, hf_hub_download, upload_file
[docs]
def normalize(mx: sp.csc_matrix) -> sp.csr_matrix:
"""
This function is to row-normalize sparse matrix for efficient computation of the graph
Parameters
----------
mx : sparse matrix
Input sparse matrix to row-normalize.
Returns
-------
mx : sparse matrix
Row-normalized sparse matrix.
Note
----
Row-normalizing is usually done in graph algorithms to enable equal node contributions
regardless of the node's degree and to stabilize, ease numerical computations.
"""
rowsum = np.array(mx.sum(1))
r_inv = np.power(rowsum, -1).flatten()
r_inv[np.isinf(r_inv)] = 0.0
r_mat_inv = sp.diags(r_inv)
mx = r_mat_inv.dot(mx)
return mx
[docs]
def intersect1d(t1: torch.Tensor, t2: torch.Tensor) -> torch.Tensor:
"""
Concatenates the two input tensors, finding common elements between these two
Parameters
----------
t1 : torch.Tensor
The first input tensor for the operation.
t2 : torch.Tensor
The second input tensor for the operation.
Returns
-------
intersection : torch.Tensor
Intersection of the two input tensors.
"""
combined = torch.cat((t1, t2))
uniques, counts = combined.unique(return_counts=True)
intersection = uniques[counts > 1]
return intersection
[docs]
def setdiff1d(t1: torch.Tensor, t2: torch.Tensor) -> torch.Tensor:
"""
Computes the set difference between the two input tensors
Parameters
----------
t1 : torch.Tensor
The first input tensor for the operation.
t2 : torch.Tensor
The second input tensor for the operation.
Returns
-------
difference : torch.Tensor
Difference in elements of the two input tensors.
"""
combined = torch.cat((t1, t2))
uniques, counts = combined.unique(return_counts=True)
difference = uniques[counts == 1]
return difference
[docs]
def label_dirichlet_partition(
labels: np.array,
N: int,
K: int,
n_parties: int,
beta: float,
distribution_type: str = "average",
) -> list:
# logger.info(
# f"Starting label_dirichlet_partition with {n_parties} parties and {K} classes"
# )
start_time = time.time()
min_require_size = max(
1, min(10, N // (n_parties * K))
) # Adjust minimum size based on dataset
# Generate weights
if distribution_type == "lognormal":
weights = np.random.lognormal(mean=0, sigma=2, size=n_parties)
elif distribution_type == "powerlaw":
weights = np.random.power(a=0.31653612251668856, size=n_parties)
elif distribution_type == "exponential":
weights = np.random.exponential(scale=1.0, size=n_parties)
else:
weights = np.ones(n_parties)
weights /= weights.sum()
# logger.info(f"Generated weights using {distribution_type} distribution")
# Pre-compute label indices
label_indices = [np.where(labels == k)[0] for k in range(K)]
attempts = 0
max_attempts = 1000 # increased // can revoke later
while attempts < max_attempts:
attempts += 1
idx_batch = [[] for _ in range(n_parties)]
for k in range(K):
idx_k = label_indices[k]
np.random.shuffle(idx_k)
proportions = np.random.dirichlet(np.repeat(beta, n_parties))
if distribution_type == "average":
proportions = np.array(
[
p * (len(idx_j) < N / n_parties)
for p, idx_j in zip(proportions, idx_batch)
]
)
proportions *= weights
proportions /= proportions.sum()
proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
idx_batch = [
idx_j + idx.tolist()
for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))
]
min_size = min(len(idx_j) for idx_j in idx_batch)
if min_size >= min_require_size:
break
# if attempts % 10 == 0:
# logger.warning(
# f"Attempt {attempts}: min_size ({min_size}) < min_require_size ({min_require_size})"
# )
# if attempts >= max_attempts:
# logger.warning(
# f"Failed to meet min_require_size after {max_attempts} attempts. Using best attempt."
# )
# logger.info(f"Partitioning completed after {attempts} attempts")
split_data_indexes = [np.random.permutation(idx_j).tolist() for idx_j in idx_batch]
# logger.info(
# f"label_dirichlet_partition completed in {time.time() - start_time:.2f} seconds"
# )
return split_data_indexes
[docs]
def get_in_comm_indexes(
edge_index: torch.Tensor,
split_node_indexes: list,
num_clients: int,
L_hop: int,
idx_train: torch.Tensor,
idx_test: torch.Tensor,
) -> tuple:
"""
Extract and preprocess data indices and edge information. It determines the nodes that each client
will communicate with, based on the L-hop neighborhood, and aggregates the edge information accordingly.
It also determines the indices of training and test data points that are available to each client.
Parameters
----------
edge_index : torch.Tensor
A tensor representing the edge information (connections between nodes) of the graph dataset.
split_node_indexes : list
A list of node indices. Each list element corresponds to a subset of nodes assigned to a specific client
after data partitioning.
num_clients : int
The total number of clients.
L_hop : int
The number of hops to consider when determining the neighborhood of each node. For example, if L_hop=1,
the 1-hop neighborhood of a node includes the node itself and all of its immediate neighbors.
idx_train : torch.Tensor
Tensor containing indices of training data in the graph.
idx_test : torch.Tensor
Tensor containing indices of test data in the graph.
Returns
-------
communicate_node_indexes : list
A list of node indices for each client, representing nodes involved in communication.
in_com_train_node_indexes : list
A list of tensors, where each tensor contains the indices of training data points available to each client.
in_com_test_node_indexes : list
A list of tensors, where each tensor contains the indices of test data points available to each client.
edge_indexes_clients : list
A list of tensors representing the edges between nodes within each client's subgraph.
"""
communicate_node_indexes = []
in_com_train_node_indexes = []
edge_indexes_clients = []
for i in range(num_clients):
communicate_node_index = split_node_indexes[i]
if L_hop == 0:
(
communicate_node_index,
current_edge_index,
_,
__,
) = torch_geometric.utils.k_hop_subgraph(
communicate_node_index, 0, edge_index, relabel_nodes=True
)
del _
del __
elif L_hop == 1 or L_hop == 2:
(
communicate_node_index,
current_edge_index,
_,
__,
) = torch_geometric.utils.k_hop_subgraph(
communicate_node_index, 1, edge_index, relabel_nodes=False
)
del _
del __
# Assert that the number of distinct elements are equal
# distinct_communicate_node_index = torch.unique(communicate_node_index)
# # Flatten the 2D current_edge_index to get the unique node indices involved in edges
# distinct_current_edge_nodes = torch.unique(current_edge_index.flatten())
# assert len(distinct_communicate_node_index) == len(
# distinct_current_edge_nodes
# ), f"Distinct counts do not match: communicate_node_index ({len(distinct_communicate_node_index)}) != current_edge_nodes ({len(distinct_current_edge_nodes)})"
communicate_node_index = communicate_node_index.to("cpu")
current_edge_index = current_edge_index.to("cpu")
communicate_node_indexes.append(communicate_node_index)
"""
current_edge_index = torch_sparse.SparseTensor(
row=current_edge_index[0],
col=current_edge_index[1],
sparse_sizes=(len(communicate_node_index), len(communicate_node_index)),
)
"""
edge_indexes_clients.append(current_edge_index)
inter = intersect1d(
split_node_indexes[i], idx_train
) # only count the train data of nodes in current server(not communicate nodes)
in_com_train_node_indexes.append(
torch.searchsorted(communicate_node_indexes[i], inter).clone()
) # local id in block matrix
in_com_test_node_indexes = []
for i in range(num_clients):
inter = intersect1d(split_node_indexes[i], idx_test)
in_com_test_node_indexes.append(
torch.searchsorted(communicate_node_indexes[i], inter).clone()
)
return (
communicate_node_indexes,
in_com_train_node_indexes,
in_com_test_node_indexes,
edge_indexes_clients,
)
[docs]
def get_1hop_feature_sum(
node_features: torch.Tensor,
edge_index: torch.Tensor,
device: str,
include_self: bool = True,
) -> torch.Tensor:
"""
Computes the sum of features of 1-hop neighbors for each node in a graph. The function
can be used to iterate over each node, identifying its neighbors based on the `edge_index`.
Parameters
----------
node_features : torch.Tensor
A 2D tensor containing the features of each node in the graph. Each row corresponds to a node,
and each column corresponds to a feature.
edge_index : torch.Tensor
A 2D tensor representing the adjacency information of the graph which has the size of (2, num_edges),
where the first row represents the source node, and the second row represents the target node.
include_self : bool, optional (default=True)
A flag to include the node's own features in the sum. If True, the features of the node itself
are included in the summation. If False, only the features of the neighboring nodes are summed.
Returns
-------
(tensor) : torch.Tensor
A 2D tensor where each row represents the summed features of the 1-hop neighbors for each node.
The tensor has the same number of rows as `node_features` and the same number of columns as the
number of features per node.
"""
source_nodes = edge_index[0]
target_nodes = edge_index[1]
num_nodes, num_features = node_features.shape
summed_features = torch.zeros((num_nodes, num_features)).to(device)
# encryption
# encrypted_node_features = [ts.ckks_vector(context, node_features[i].tolist()) for i in range(num_nodes)]
if include_self:
# print("using spare matrix method")
adjacency_matrix = torch.sparse_coo_tensor(
edge_index,
torch.ones_like(source_nodes, dtype=torch.float32),
(num_nodes, num_nodes),
).to(device)
summed_features = torch.sparse.mm(adjacency_matrix.float(), node_features)
else:
for node in range(num_nodes):
neighbor_indices = torch.where(
(source_nodes == node) & (target_nodes != node)
) # exclude self-loop
neighbor_features = node_features[target_nodes[neighbor_indices]]
summed_features[node] = torch.sum(neighbor_features, dim=0)
return summed_features
[docs]
def increment_dir(dir: str, comment: str = "") -> str:
"""
This function is used to create a new directory path by incrementing a numeric suffix in the original
directory path.
Parameters
----------
dir : str
The original directory path.
comment : str, optional)
An optional comment that can be appended to the directory name.
Returns
-------
(str) : str
Returns a string with the path of the new directory.
"""
# Increments a directory runs/exp1 --> runs/exp2_comment
n = 0 # number
dir = str(Path(dir)) # os-agnostic
dirs = sorted(glob.glob(dir + "*")) # directories
if dirs:
matches = [re.search(r"exp(\d+)", d) for d in dirs]
idxs = [int(m.groups()[0]) for m in matches if m]
if idxs:
n = max(idxs) + 1 # increment
return dir + str(n) + ("_" + comment if comment else "")
[docs]
def save_trainer_data_to_hugging_face(
trainer_id,
local_node_index,
communicate_node_global_index,
global_edge_index_client,
train_labels,
test_labels,
features,
in_com_train_node_local_indexes,
in_com_test_node_local_indexes,
args,
):
repo_name = f"FedGraph/fedgraph_{args.dataset}_{args.n_trainer}trainer_{args.num_hops}hop_iid_beta_{args.iid_beta}_trainer_id_{trainer_id}"
user = HfFolder.get_token()
api = HfApi()
try:
api.create_repo(
repo_id=repo_name, token=user, repo_type="dataset", exist_ok=True
)
except Exception as e:
print(f"Failed to create or access the repository: {str(e)}")
return
def save_tensor_to_hf(tensor, file_name):
buffer = BytesIO()
torch.save(tensor, buffer)
buffer.seek(0)
api.upload_file(
path_or_fileobj=buffer,
path_in_repo=file_name,
repo_id=repo_name,
repo_type="dataset",
token=user,
)
save_tensor_to_hf(local_node_index, "local_node_index.pt")
save_tensor_to_hf(communicate_node_global_index, "communicate_node_index.pt")
save_tensor_to_hf(global_edge_index_client, "adj.pt")
save_tensor_to_hf(train_labels, "train_labels.pt")
save_tensor_to_hf(test_labels, "test_labels.pt")
save_tensor_to_hf(features, "features.pt")
save_tensor_to_hf(in_com_train_node_local_indexes, "idx_train.pt")
save_tensor_to_hf(in_com_test_node_local_indexes, "idx_test.pt")
print(f"Uploaded data for trainer {trainer_id}")
[docs]
def save_all_trainers_data(
split_node_indexes,
communicate_node_global_indexes,
global_edge_indexes_clients,
labels,
features,
in_com_train_node_local_indexes,
in_com_test_node_local_indexes,
n_trainer,
args,
):
for i in range(n_trainer):
save_trainer_data_to_hugging_face(
trainer_id=i,
local_node_index=split_node_indexes[i],
communicate_node_global_index=communicate_node_global_indexes[i],
global_edge_index_client=global_edge_indexes_clients[i],
train_labels=labels[communicate_node_global_indexes[i]][
in_com_train_node_local_indexes[i]
],
test_labels=labels[communicate_node_global_indexes[i]][
in_com_test_node_local_indexes[i]
],
features=features[split_node_indexes[i]],
in_com_train_node_local_indexes=in_com_train_node_local_indexes[i],
in_com_test_node_local_indexes=in_com_test_node_local_indexes[i],
args=args,
)