Network Status and Performance
Current Network Overview
Real-Time Statistics
Active nodes: 1,247
Deployed LLM models: 89
Total models: 312
Average response time: 47 ms
Network uptime: 99.97%
Requests processed per day: 2,847,392
Current TPS: 147 requests/second
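Note that the daily request volume and the current TPS measure different things: the former is a full-day total, the latter an instantaneous rate. A quick back-of-the-envelope check, purely illustrative and using only the figures above:

daily_requests = 2_847_392
avg_tps = daily_requests / 86_400          # ≈ 33 requests/second averaged over 24 hours
current_tps = 147                          # instantaneous rate reported above
peak_to_avg_ratio = current_tps / avg_tps  # ≈ 4.5x, i.e. load is bursty rather than flat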
Network Distribution
class NetworkStats:
    def __init__(self):
        self.regional_distribution = {
            "North America": {"nodes": 387, "percentage": 31.0},
            "Europe": {"nodes": 298, "percentage": 23.9},
            "Asia-Pacific": {"nodes": 423, "percentage": 33.9},
            "Other": {"nodes": 139, "percentage": 11.2}
        }
        self.node_types = {
            "full_node": 456,     # full nodes
            "light_node": 623,    # light nodes
            "compute_node": 134,  # compute nodes
            "storage_node": 34    # storage nodes
        }


def get_network_health():
    """Return the current network health summary."""
    return {
        "overall_health": "healthy",
        "active_connections": 15234,
        "failed_nodes": 12,
        "recovery_time": "< 30 s",
        "network_latency": {
            "p50": 45,   # 50th percentile latency (ms)
            "p95": 120,  # 95th percentile latency (ms)
            "p99": 250   # 99th percentile latency (ms)
        }
    }
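A minimal usage sketch of the snippet above, assuming both definitions live in the same module; the printed figures simply echo the hard-coded sample data:

stats = NetworkStats()
for region, info in stats.regional_distribution.items():
    print(f"{region}: {info['nodes']} nodes ({info['percentage']}%)")

health = get_network_health()
print(f"Overall health: {health['overall_health']}, "
      f"p95 latency: {health['network_latency']['p95']} ms")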
Performance Monitoring
Real-Time Performance Metrics
import asyncio
import time
from collections import deque


class PerformanceMonitor:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.response_times = deque(maxlen=window_size)
        self.throughput_data = deque(maxlen=window_size)
        self.error_rates = deque(maxlen=window_size)

    async def record_request(self, start_time, end_time, success=True):
        """Record the outcome of a single request."""
        response_time = (end_time - start_time) * 1000  # convert to milliseconds
        self.response_times.append(response_time)
        # Record the error flag (1 = failed, 0 = succeeded)
        self.error_rates.append(0 if success else 1)
        # Record the minute bucket the request fell into, for throughput tracking
        current_minute = int(time.time() // 60)
        self.throughput_data.append(current_minute)

    def get_performance_metrics(self):
        """Return aggregated performance metrics over the sliding window."""
        if not self.response_times:
            return None
        response_times = sorted(self.response_times)
        n = len(response_times)
        return {
            "avg_response_time": sum(response_times) / n,
            "median_response_time": response_times[n // 2],
            "p95_response_time": response_times[int(n * 0.95)],
            "p99_response_time": response_times[int(n * 0.99)],
            "error_rate": sum(self.error_rates) / len(self.error_rates) if self.error_rates else 0,
            "current_tps": self.calculate_current_tps()
        }

    def calculate_current_tps(self):
        """Approximate the current transactions per second from the minute buckets."""
        if len(self.throughput_data) < 2:
            return 0
        current_minute = int(time.time() // 60)
        requests_this_minute = sum(1 for t in self.throughput_data if t == current_minute)
        return requests_this_minute / 60  # convert requests/minute to requests/second
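A short, self-contained sketch of how the monitor might be driven. The simulated request loop, its timings, and its 2% failure rate are invented for illustration only:

import asyncio
import random
import time

async def simulate_traffic():
    monitor = PerformanceMonitor(window_size=1000)
    for _ in range(50):
        start = time.time()
        await asyncio.sleep(random.uniform(0.01, 0.05))  # stand-in for real request work
        await monitor.record_request(start, time.time(), success=random.random() > 0.02)
    print(monitor.get_performance_metrics())

asyncio.run(simulate_traffic())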
Network Topology Analysis
import time


class NetworkTopology:
    def __init__(self):
        self.nodes = {}
        self.connections = {}

    def add_node(self, node_id, node_info):
        """Register a node in the topology."""
        self.nodes[node_id] = {
            "id": node_id,
            "region": node_info.get("region"),
            "node_type": node_info.get("type"),
            "capacity": node_info.get("capacity"),
            "connections": set(),
            "last_seen": time.time()
        }

    def add_connection(self, node1_id, node2_id, latency):
        """Register a connection between two known nodes."""
        if node1_id in self.nodes and node2_id in self.nodes:
            self.nodes[node1_id]["connections"].add(node2_id)
            self.nodes[node2_id]["connections"].add(node1_id)
            connection_key = tuple(sorted([node1_id, node2_id]))
            self.connections[connection_key] = {
                "latency": latency,
                "bandwidth": None,
                "last_tested": time.time()
            }

    def analyze_network_efficiency(self):
        """Analyze how efficiently the network is connected."""
        total_nodes = len(self.nodes)
        total_connections = len(self.connections)
        # Average degree: each connection contributes to two nodes
        avg_degree = (total_connections * 2) / total_nodes if total_nodes > 0 else 0
        # Network density: actual connections vs. the maximum possible
        max_possible_connections = total_nodes * (total_nodes - 1) / 2
        network_density = total_connections / max_possible_connections if max_possible_connections > 0 else 0
        # Regional distribution of nodes
        regional_stats = {}
        for node in self.nodes.values():
            region = node["region"]
            if region not in regional_stats:
                regional_stats[region] = 0
            regional_stats[region] += 1
        return {
            "total_nodes": total_nodes,
            "total_connections": total_connections,
            "average_degree": avg_degree,
            "network_density": network_density,
            "regional_distribution": regional_stats,
            "connectivity_score": min(avg_degree / 10, 1.0)  # normalized to the 0-1 range
        }
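For illustration, a tiny three-node topology built with the API above; the node IDs, regions, and latencies are made-up values:

topo = NetworkTopology()
topo.add_node("node_a", {"region": "North America", "type": "full_node", "capacity": 100})
topo.add_node("node_b", {"region": "Europe", "type": "compute_node", "capacity": 80})
topo.add_node("node_c", {"region": "Asia-Pacific", "type": "light_node", "capacity": 40})
topo.add_connection("node_a", "node_b", latency=85)
topo.add_connection("node_b", "node_c", latency=140)

report = topo.analyze_network_efficiency()
# With 3 nodes and 2 connections: average degree ≈ 1.33, density ≈ 0.67
print(report["average_degree"], report["network_density"])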
Model Performance Statistics
Model Usage Statistics
import time


class ModelUsageStats:
    def __init__(self):
        self.model_stats = {}

    def record_model_usage(self, model_id, request_type, response_time, tokens_processed):
        """Record a single model invocation."""
        if model_id not in self.model_stats:
            self.model_stats[model_id] = {
                "total_requests": 0,
                "total_tokens": 0,
                "total_response_time": 0,
                "request_types": {},
                "hourly_usage": {},
                "error_count": 0
            }
        stats = self.model_stats[model_id]
        stats["total_requests"] += 1
        stats["total_tokens"] += tokens_processed
        stats["total_response_time"] += response_time
        # Tally by request type
        if request_type not in stats["request_types"]:
            stats["request_types"][request_type] = 0
        stats["request_types"][request_type] += 1
        # Tally by hour bucket
        current_hour = int(time.time() // 3600)
        if current_hour not in stats["hourly_usage"]:
            stats["hourly_usage"][current_hour] = 0
        stats["hourly_usage"][current_hour] += 1

    def get_top_models(self, limit=10):
        """Return the most heavily used models."""
        model_rankings = []
        for model_id, stats in self.model_stats.items():
            avg_response_time = (stats["total_response_time"] / stats["total_requests"]
                                 if stats["total_requests"] > 0 else 0)
            model_rankings.append({
                "model_id": model_id,
                "total_requests": stats["total_requests"],
                "avg_response_time": avg_response_time,
                "total_tokens": stats["total_tokens"],
                "popularity_score": stats["total_requests"] * 0.7 + stats["total_tokens"] * 0.3
            })
        # Sort by popularity score, highest first
        model_rankings.sort(key=lambda x: x["popularity_score"], reverse=True)
        return model_rankings[:limit]
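A brief usage sketch of the tracker; the request types ("chat_completion", "embedding") and the token counts are placeholder values, while the model IDs match the leaderboard entries below:

usage = ModelUsageStats()
usage.record_model_usage("gpt4t_001", "chat_completion", response_time=45, tokens_processed=512)
usage.record_model_usage("llama3_70b_003", "chat_completion", response_time=38, tokens_processed=256)
usage.record_model_usage("gpt4t_001", "embedding", response_time=12, tokens_processed=128)

for entry in usage.get_top_models(limit=5):
    print(entry["model_id"], entry["total_requests"], round(entry["avg_response_time"], 1))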
Popular Model Leaderboard
def generate_model_leaderboard():
    """Generate the model leaderboard."""
    # Simulated data; in a real deployment this would be pulled from a database
    leaderboard_data = [
        {
            "rank": 1,
            "model_name": "ChatGPT-4-Turbo",
            "model_id": "gpt4t_001",
            "total_requests": 234567,
            "avg_response_time": 45,
            "success_rate": 99.2,
            "user_rating": 4.8,
            "monthly_revenue": 12543.67  # MESH
        },
        {
            "rank": 2,
            "model_name": "Claude-3-Sonnet",
            "model_id": "claude3s_002",
            "total_requests": 198432,
            "avg_response_time": 52,
            "success_rate": 98.9,
            "user_rating": 4.7,
            "monthly_revenue": 9876.32
        },
        {
            "rank": 3,
            "model_name": "Llama-3-70B",
            "model_id": "llama3_70b_003",
            "total_requests": 156789,
            "avg_response_time": 38,
            "success_rate": 97.8,
            "user_rating": 4.5,
            "monthly_revenue": 7234.55
        }
    ]
    return leaderboard_data


# Generate the leaderboard report
def print_leaderboard_report():
    """Print the leaderboard report."""
    leaderboard = generate_model_leaderboard()
    print("🏆 LLMESH Model Leaderboard (this month)")
    print("=" * 90)
    print(f"{'Rank':<6} {'Model':<20} {'Requests':<12} {'Avg Latency':<12} "
          f"{'Success':<10} {'Rating':<8} {'Monthly Revenue':<16}")
    print("-" * 90)
    for model in leaderboard:
        print(f"{model['rank']:<6} {model['model_name']:<20} {model['total_requests']:<12} "
              f"{str(model['avg_response_time']) + ' ms':<12} "
              f"{str(model['success_rate']) + '%':<10} "
              f"{model['user_rating']:<8} {model['monthly_revenue']:.2f} MESH")
Network Health Monitoring
Health Check System
import time


class NetworkHealthMonitor:
    def __init__(self):
        self.health_checks = {
            "node_connectivity": self.check_node_connectivity,
            "response_time": self.check_response_time,
            "error_rate": self.check_error_rate,
            "resource_utilization": self.check_resource_utilization,
            "token_circulation": self.check_token_circulation
        }
        self.alert_thresholds = {
            "max_response_time": 500,   # 500 ms
            "max_error_rate": 0.05,     # 5%
            "min_active_nodes": 1000,   # at least 1,000 active nodes
            "max_cpu_usage": 0.85       # 85% CPU utilization
        }

    async def run_health_checks(self):
        """Run every registered health check and assemble a report."""
        health_report = {
            "timestamp": time.time(),
            "overall_status": "healthy",
            "checks": {},
            "alerts": []
        }
        for check_name, check_function in self.health_checks.items():
            try:
                result = await check_function()
                health_report["checks"][check_name] = result
                # Evaluate whether this check should raise any alerts
                alerts = self.evaluate_alerts(check_name, result)
                health_report["alerts"].extend(alerts)
            except Exception as e:
                health_report["checks"][check_name] = {
                    "status": "error",
                    "error": str(e)
                }
                health_report["overall_status"] = "degraded"
        # Update the overall status based on how many alerts fired
        if health_report["alerts"]:
            health_report["overall_status"] = "warning" if len(health_report["alerts"]) < 3 else "critical"
        return health_report

    async def check_node_connectivity(self):
        """Check node connectivity."""
        # Simulated check logic
        active_nodes = 1247
        failed_connections = 23
        connectivity_rate = (active_nodes - failed_connections) / active_nodes
        return {
            "status": "healthy" if connectivity_rate > 0.95 else "warning",
            "active_nodes": active_nodes,
            "failed_connections": failed_connections,
            "connectivity_rate": connectivity_rate
        }
    async def check_response_time(self):
        """Check response times against the alert threshold."""
        # Simulated response-time figures (mirroring the latency percentiles quoted
        # earlier); a real implementation would read live PerformanceMonitor metrics
        p50, p95, p99 = 45, 120, 250
        return {
            "status": "healthy" if p95 < self.alert_thresholds["max_response_time"] else "warning",
            "p50_response_time": p50,
            "p95_response_time": p95,
            "p99_response_time": p99
        }
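The listing above references evaluate_alerts and several further check methods that are not shown in this section. Purely as an illustrative sketch, one way evaluate_alerts could map check results onto the alert_thresholds defined earlier looks like the method below; the result keys it inspects (p95_response_time, error_rate, active_nodes) are assumptions based on the checks shown so far, not part of the original listing:

    def evaluate_alerts(self, check_name, result):
        """Turn a single check result into zero or more alert records (illustrative sketch)."""
        alerts = []
        if check_name == "response_time" and result.get("p95_response_time", 0) > self.alert_thresholds["max_response_time"]:
            alerts.append({"check": check_name, "severity": "warning",
                           "message": f"p95 latency {result['p95_response_time']} ms exceeds "
                                      f"{self.alert_thresholds['max_response_time']} ms"})
        if check_name == "error_rate" and result.get("error_rate", 0) > self.alert_thresholds["max_error_rate"]:
            alerts.append({"check": check_name, "severity": "critical",
                           "message": f"error rate {result['error_rate']:.1%} exceeds threshold"})
        if check_name == "node_connectivity" and result.get("active_nodes", 0) < self.alert_thresholds["min_active_nodes"]:
            alerts.append({"check": check_name, "severity": "critical",
                           "message": "active node count below required minimum"})
        return alerts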