import datetime
import re
import threading
import time
from typing import Any, Dict, List, Optional
import requests # type: ignore
from ray.util.metrics import Gauge
class Monitor:
    """Record timing, network and memory metrics for pretrain/train phases.

    Per-pod values are read from a Prometheus server via instant queries for
    ``ray_node_network_sent`` and ``ray_node_mem_used``. They are exported
    through Ray ``Gauge`` metrics tagged with the pod alias and echoed to
    stdout as ``//Log ... //end`` lines.

    Pod aliases:
      * pods whose name contains ``"large"`` become ``Large1``, ``Large2``, ...
        (assigned on first sight and stable for the lifetime of the instance);
      * pods whose name contains ``"head"`` become ``Server``;
      * every other pod keeps its own name.
    """

    DEFAULT_PROMETHEUS_URL = (
        "http://prometheus-kube-prometheus-prometheus.prometheus-system:9090"
    )

    def __init__(
        self,
        prometheus_url: str = DEFAULT_PROMETHEUS_URL,
        request_timeout: float = 10.0,
    ) -> None:
        """Create the gauges and start the background memory sampler.

        Args:
            prometheus_url: Base URL of the Prometheus server to query.
            request_timeout: Seconds to wait for each Prometheus request, so a
                hung server cannot block a phase boundary forever.
        """
        self.prometheus_url = prometheus_url.rstrip("/")
        self.request_timeout = request_timeout
        self.pretrain_time_cost_gauge = Gauge(
            "pretrain_time_cost", description="Latencies of pretrain_time_cost in ms."
        )
        self.train_time_cost_gauge = Gauge(
            "train_time_cost", description="Latencies of train_time_cost in ms."
        )
        # Per-pod gauges carry a "pod" tag; without it every .set() call would
        # overwrite the previous pod's value and only the last pod would survive.
        self.pretrain_node_network_gauge = Gauge(
            "pretrain_node_network",
            description="Network data sent during pretraining per pod.",
            tag_keys=("pod",),
        )
        self.train_node_network_gauge = Gauge(
            "train_node_network",
            description="Network data sent during training per pod.",
            tag_keys=("pod",),
        )
        self.pretrain_memory_gauge = Gauge(
            "pretrain_memory_usage",
            description="Memory usage during pretraining.",
            tag_keys=("pod",),
        )
        self.train_memory_gauge = Gauge(
            "train_memory_usage",
            description="Memory usage during training.",
            tag_keys=("pod",),
        )
        self.pretrain_start_time: Optional[datetime.datetime] = None
        self.pretrain_end_time: Optional[datetime.datetime] = None
        self.train_start_time: Optional[datetime.datetime] = None
        self.train_end_time: Optional[datetime.datetime] = None
        self.current_round = 0
        self.initial_network_data: Dict[str, float] = {}
        self.final_network_data: Dict[str, float] = {}
        # One {pod alias: memory value} dict per sampling interval.
        self.memory_usage_list: List[Dict[str, float]] = []
        # Raw pod name -> "LargeN" alias. Mutated by both the sampler thread
        # and the caller's thread, hence the lock.
        self.large_pod_mapping: Dict[str, str] = {}
        self._mapping_lock = threading.Lock()
        # Start the sampler only after every attribute it touches exists.
        self.memory_thread = threading.Thread(
            target=self.collect_memory, daemon=True, name="monitor-memory"
        )
        self.memory_thread.start()

    def collect_memory(self, interval_seconds: float = 30) -> None:
        """Poll per-pod memory forever, appending one sample per interval.

        Runs on the daemon thread started in ``__init__``. A failed poll is
        reported and skipped, so a transient Prometheus outage does not stop
        memory collection for the rest of the run.

        Args:
            interval_seconds: Delay between consecutive polls.
        """
        while True:
            try:
                memory_data = self._fetch_memory_usage()
            except (requests.RequestException, KeyError, TypeError, ValueError) as err:
                print(f"Memory collection failed: {err}")
            else:
                # Resolve the attribute on every append: the *_time_start
                # methods rebind it to a fresh list at each phase start.
                self.memory_usage_list.append(memory_data)
            time.sleep(interval_seconds)

    def _large_alias(self, pod_name: str) -> str:
        """Return the stable ``LargeN`` alias for *pod_name*, assigning one if new."""
        with self._mapping_lock:
            alias = self.large_pod_mapping.get(pod_name)
            if alias is None:
                # Derive N from the mapping size so a pod first seen on a
                # later poll never reuses an alias already handed out.
                alias = f"Large{len(self.large_pod_mapping) + 1}"
                self.large_pod_mapping[pod_name] = alias
            return alias

    def _parse_pod_values(self, result: List[Dict[str, Any]]) -> Dict[str, float]:
        """Map a Prometheus instant-query ``result`` list to {pod alias: value}.

        Items without a ``pod`` label are skipped. When several pods map to
        the same key (e.g. more than one ``head`` pod), the last one wins.
        """
        values: Dict[str, float] = {}
        for item in result:
            pod_name = item.get("metric", {}).get("pod")
            if pod_name is None:
                continue
            value = float(item["value"][1])
            if "large" in pod_name:
                values[self._large_alias(pod_name)] = value
            elif "head" in pod_name:
                values["Server"] = value
            else:
                values[pod_name] = value
        return values

    def _query_pod_metric(self, metric: str) -> Dict[str, float]:
        """Run an instant query for *metric* and return per-pod values.

        Raises:
            requests.RequestException: on connection failure, timeout, or an
                HTTP error status.
        """
        response = requests.get(
            f"{self.prometheus_url}/api/v1/query",
            params={"query": metric},
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return self._parse_pod_values(response.json()["data"]["result"])

    def _get_network_data(self) -> Dict[str, float]:
        """Return the current ``ray_node_network_sent`` value per pod alias."""
        return self._query_pod_metric("ray_node_network_sent")

    def _fetch_memory_usage(self) -> Dict[str, float]:
        """Return the current ``ray_node_mem_used`` value per pod alias."""
        return self._query_pod_metric("ray_node_mem_used")

    def _report_phase_end(self, network_gauge: "Gauge", memory_gauge: "Gauge") -> None:
        """Print and export the peak memory and network delta for each pod.

        Reads the memory samples collected since the phase started, plus
        ``initial_network_data`` and ``final_network_data``.
        """
        # Snapshot shared state, since the sampler thread keeps appending.
        samples = list(self.memory_usage_list)
        with self._mapping_lock:
            large_pods = list(self.large_pod_mapping.values())

        # Peak memory for large pods.
        for pod_name in large_pods:
            values = [sample[pod_name] for sample in samples if pod_name in sample]
            if values:
                peak = max(values)
                memory_gauge.set(peak, tags={"pod": pod_name})
                print(f"//Log Max memory for {pod_name}: {peak} //end")
            else:
                print(f"No memory values found for {pod_name}.")

        # Peak memory for the Server (head) pod.
        server_values = [sample["Server"] for sample in samples if "Server" in sample]
        if server_values:
            peak = max(server_values)
            memory_gauge.set(peak, tags={"pod": "Server"})
            print(f"//Log Max memory for Server: {peak} //end")
        else:
            print("No memory values found for Server.")

        # Network delta for large pods, in the order Prometheus returned them.
        large_aliases = set(large_pods)
        for pod_name, pod_value in self.final_network_data.items():
            if pod_name in large_aliases:
                network_diff = pod_value - self.initial_network_data.get(pod_name, 0)
                network_gauge.set(network_diff, tags={"pod": pod_name})
                print(f"//Log {pod_name} network: {network_diff} //end")

        # Network delta for the Server pod.
        if "Server" in self.final_network_data:
            network_diff = self.final_network_data[
                "Server"
            ] - self.initial_network_data.get("Server", 0)
            network_gauge.set(network_diff, tags={"pod": "Server"})
            print(f"//Log Server network: {network_diff} //end")

    def pretrain_time_start(self) -> None:
        """Mark the pretrain start, snapshot network counters, reset memory samples."""
        self.pretrain_start_time = datetime.datetime.now()
        self.initial_network_data = self._get_network_data()
        print("Pretrain start time recorded and initial network data collected.")
        self.memory_usage_list = []

    def pretrain_time_end(self, interval_seconds: float = 30) -> None:
        """Record the pretrain duration and report per-pod memory/network.

        Does nothing if ``pretrain_time_start`` has not been called.

        Args:
            interval_seconds: Seconds to wait before reading the final network
                counters (presumably so Prometheus scrapes the last values).
        """
        if self.pretrain_start_time is None:
            return
        self.pretrain_end_time = datetime.datetime.now()
        pretrain_duration = (
            self.pretrain_end_time - self.pretrain_start_time
        ).total_seconds() * 1000
        self.pretrain_time_cost_gauge.set(pretrain_duration)
        print(f"//pretrain_time: {pretrain_duration} //end")
        time.sleep(interval_seconds)
        self.final_network_data = self._get_network_data()
        self._report_phase_end(
            self.pretrain_node_network_gauge, self.pretrain_memory_gauge
        )
        print("Pretrain end time recorded and duration set to gauge.")

    def train_time_start(self) -> None:
        """Start a new training round: bump the round counter and take snapshots."""
        self.current_round += 1
        self.train_start_time = datetime.datetime.now()
        self.initial_network_data = self._get_network_data()
        print(self.initial_network_data)
        self.memory_usage_list = []
        print("Train start time recorded and initial network data collected.")

    def train_time_end(self, interval_seconds: float = 30) -> None:
        """Record the training-round duration and report per-pod memory/network.

        Does nothing if ``train_time_start`` has not been called.

        Args:
            interval_seconds: Seconds to wait before reading the final network
                counters (presumably so Prometheus scrapes the last values).
        """
        if self.train_start_time is None:
            return
        self.train_end_time = datetime.datetime.now()
        train_duration = (
            self.train_end_time - self.train_start_time
        ).total_seconds() * 1000
        self.train_time_cost_gauge.set(train_duration)
        print(f"//Log train_time: {train_duration} //end")
        time.sleep(interval_seconds)
        self.final_network_data = self._get_network_data()
        self._report_phase_end(self.train_node_network_gauge, self.train_memory_gauge)
        print(
            "Train end time recorded, duration set to gauge, and network data difference calculated."
        )