"""
河图洛书多智能体系统 v3.1 - 自动演化版(修复版)
"""
import os
import time
import random
import json
import threading
from typing import Dict, Any, List, Optional, Tuple
from collections import deque
from dataclasses import dataclass, field
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
try:
import torchvision
import torchvision.transforms as transforms
HAS_TORCHVISION = True
except ImportError:
HAS_TORCHVISION = False
print("警告: torchvision未安装,将使用模拟数据")
# ==================== 配置 ====================
@dataclass
class EvolutionConfig:
    """Tunable parameters of the evolutionary multi-agent run.

    Field order is part of the constructor interface (positional
    arguments), so it is kept exactly as declared.
    """

    # --- population size -------------------------------------------------
    min_agents: int = 3      # scheduler spawns new agents when the live count is below this
    max_agents: int = 8      # no new generation is started at or above this count
    target_agents: int = 5   # not read by the code visible in this file

    # --- reproduction ----------------------------------------------------
    generation_size: int = 2        # agents created per new generation
    elite_count: int = 2            # top agents (by best accuracy) eligible as parents
    mutation_rate: float = 0.05     # per-parameter-tensor probability of being perturbed
    mutation_strength: float = 0.1  # scale of the Gaussian noise added on mutation

    # --- retirement ------------------------------------------------------
    min_epochs_before_retire: int = 20  # agents with fewer epochs never retire
    accuracy_threshold: float = 0.99    # best accuracy at which an agent retires
    stagnation_epochs: int = 8          # epochs without improvement before retirement

    # --- scheduling (not read by the code visible in this file) ----------
    election_interval: float = 15.0
    heartbeat_timeout: float = 30.0
    max_concurrent_training: int = 2
# ==================== 洛书核心 ====================
class LuoShuCoreV3(nn.Module):
    """Nine-cell "Luo Shu" state core (V3).

    Holds a 9-element parameter vector ``states`` and a fixed 9x9
    inhibition matrix ``ke_matrix``. ``forward`` only *records* a feedback
    delta; the recorded deltas are folded into ``states`` later by
    ``apply_feedback``. Index 4 is treated as the centre cell.
    """

    def __init__(self):
        super().__init__()
        self.states = nn.Parameter(torch.randn(9) * 0.1)

        # Ring of pairwise inhibition: 8->7->...->0->8. The forward
        # direction is weighted -0.5 and the reverse direction -0.3.
        ke_matrix = torch.zeros(9, 9)
        ke_pairs = [(8, 7), (7, 6), (6, 5), (5, 4), (4, 3), (3, 2), (2, 1), (1, 0), (0, 8)]
        for i, j in ke_pairs:
            ke_matrix[i, j] = -0.5
            ke_matrix[j, i] = -0.3
        self.register_buffer('ke_matrix', ke_matrix)

        # Keeps only the 10 most recent deltas; deque drops the oldest
        # entry itself, replacing the manual list.pop(0) bookkeeping.
        self._feedback_buffer = deque(maxlen=10)

    def forward(self, feedback: torch.Tensor) -> torch.Tensor:
        """Record a feedback delta and return the current activations.

        Args:
            feedback: 9-element tensor, combined with the inhibition term.

        Returns:
            ``sigmoid(states)``. Note that ``states`` itself is not
            modified here; see ``apply_feedback``.
        """
        inhibition = torch.matmul(self.ke_matrix, torch.sigmoid(self.states))
        delta = feedback + inhibition * 0.1
        self._feedback_buffer.append(delta.detach().cpu())
        return torch.sigmoid(self.states)

    def apply_feedback(self):
        """Fold the mean of the (up to) 5 newest deltas into ``states``.

        The new state is ``tanh(states + 0.1 * mean_delta)``. The buffer is
        emptied afterwards. Does nothing when no delta was recorded.
        """
        if not self._feedback_buffer:
            return
        recent = list(self._feedback_buffer)[-5:]
        avg_delta = torch.stack(recent).mean(dim=0)
        with torch.no_grad():
            updated = self.states + avg_delta.to(self.states.device) * 0.1
            # In-place copy keeps the parameter's dtype and device.
            self.states.copy_(torch.tanh(updated))
        self._feedback_buffer.clear()

    def extract_knowledge(self) -> Dict[str, torch.Tensor]:
        """Return a detached copy of the state vector under key ``'states'``."""
        return {'states': self.states.data.clone()}

    def inject_knowledge(self, knowledge: Dict[str, torch.Tensor]):
        """Overwrite ``states`` with ``knowledge['states']`` if present.

        The values are copied *into* the existing parameter, so its dtype
        and device are preserved even when the source tensor lives on
        another device or uses another dtype (previously ``.data`` was
        rebound to the foreign tensor).

        Raises:
            ValueError: if the provided tensor does not have the same shape
                as ``states``.
        """
        incoming = knowledge.get('states')
        if incoming is None:
            return
        if tuple(incoming.shape) != tuple(self.states.shape):
            raise ValueError(
                f"states shape mismatch: expected {tuple(self.states.shape)}, "
                f"got {tuple(incoming.shape)}"
            )
        with torch.no_grad():
            self.states.copy_(incoming)

    def get_center_state(self) -> float:
        """Return ``sigmoid(states[4])`` as a Python float."""
        return torch.sigmoid(self.states[4]).item()

    def set_center_state(self, value: float) -> None:
        """Store ``value`` as the raw (pre-sigmoid) centre state.

        Note the asymmetry with ``get_center_state``, which returns the
        sigmoid of the stored value.
        """
        with torch.no_grad():
            self.states[4] = float(value)

    def get_all_noncenter(self) -> torch.Tensor:
        """Return the detached sigmoid of the 8 non-centre states (index 4 removed)."""
        return torch.sigmoid(torch.cat([self.states[:4], self.states[5:]])).detach()
# ==================== 可演化的智能体 ====================
class EvolvableAgent(nn.Module):
    """CNN classifier agent that can inherit from, and mutate, a parent.

    The network is three conv/batch-norm/max-pool stages followed by two
    linear layers. ``fc1`` expects ``128 * 3 * 3`` features, which matches
    single-channel 28x28 inputs after three 2x poolings. Each agent also
    owns a ``LuoShuCoreV3`` that receives entropy-scaled feedback during
    training.
    """

    def __init__(
        self,
        agent_id: int,
        num_classes: int = 10,
        base_lr: float = 0.001,
        parent: Optional['EvolvableAgent'] = None
    ):
        """Build the network and, if ``parent`` is given, inherit from it.

        Args:
            agent_id: Identifier used in logs and by the scheduler.
            num_classes: Output width of the final linear layer.
            base_lr: Learning rate used when the optimizer is created.
            parent: Optional agent whose weights are copied and mutated.
        """
        super().__init__()
        self.agent_id = agent_id
        self.num_classes = num_classes
        self.base_lr = base_lr
        self.current_lr = base_lr
        self.generation = 0
        self.parent_id = -1
        self.birth_time = time.time()
        self.total_epochs = 0
        self.trained_epochs = 0  # epochs actually trained

        # Luo Shu core
        self.luoshu = LuoShuCoreV3()

        # Network layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2)
        self.dropout = nn.Dropout(0.25)
        self.fc1 = nn.Linear(128 * 3 * 3, 256)
        self.fc2 = nn.Linear(256, num_classes)

        # Explicit None test: do not depend on the truthiness of a module.
        if parent is not None:
            self.inherit_from(parent)

        # Training state
        self._role = "pu"
        self.entropy_history = deque(maxlen=200)
        self.loss_history = deque(maxlen=200)
        self.accuracy_history = deque(maxlen=100)
        self.current_epoch = 0
        self.best_accuracy = 0.0
        self.current_accuracy = 0.0
        self.stagnation_counter = 0
        self._optimizer = None
        print(f"[Agent {agent_id}] 诞生 (Gen={self.generation}, Parent={self.parent_id})")

    def inherit_from(self, parent: 'EvolvableAgent'):
        """Copy the parent's knowledge and weights, then mutate.

        The parent may live on a different device (e.g. it was trained on
        a GPU while this agent was just created on the CPU), so the Luo Shu
        knowledge is moved to this agent's device before injection.
        """
        own_device = self.luoshu.states.device
        knowledge = {
            name: tensor.to(own_device)
            for name, tensor in parent.luoshu.extract_knowledge().items()
        }
        self.luoshu.inject_knowledge(knowledge)

        # Network weights (copied in place into this agent's tensors).
        self.load_state_dict(parent.state_dict(), strict=False)

        # Lineage metadata
        self.generation = parent.generation + 1
        self.parent_id = parent.agent_id

        self.mutate()

    def mutate(self, rate: Optional[float] = None, strength: Optional[float] = None):
        """Add Gaussian noise to a random subset of parameter tensors.

        Args:
            rate: Probability that a given parameter tensor is perturbed.
                Defaults to ``EvolutionConfig.mutation_rate``.
            strength: Multiplier applied to the standard-normal noise.
                Defaults to ``EvolutionConfig.mutation_strength``.
        """
        if rate is None:
            rate = EvolutionConfig.mutation_rate
        if strength is None:
            strength = EvolutionConfig.mutation_strength
        with torch.no_grad():
            for param in self.parameters():
                if random.random() < rate:
                    param.add_(torch.randn_like(param) * strength)

    def extract_dna(self) -> Dict[str, Any]:
        """Return a snapshot of this agent's lineage, score and weights.

        ``model_state`` holds detached copies, so the snapshot does not
        change when the agent keeps training.
        """
        snapshot = {
            key: value.detach().clone()
            for key, value in self.state_dict().items()
        }
        return {
            'agent_id': self.agent_id,
            'generation': self.generation,
            'parent_id': self.parent_id,
            'best_accuracy': self.best_accuracy,
            'total_epochs': self.total_epochs,
            'model_state': snapshot,
            'luoshu_knowledge': self.luoshu.extract_knowledge()
        }

    def should_retire(self, config: 'EvolutionConfig') -> Tuple[bool, str]:
        """Decide whether this agent should retire.

        Returns:
            ``(retire, reason)``. An agent never retires before
            ``config.min_epochs_before_retire`` epochs; after that it
            retires on reaching ``config.accuracy_threshold`` or after
            ``config.stagnation_epochs`` epochs without improvement.
        """
        if self.total_epochs < config.min_epochs_before_retire:
            return False, "训练不足"
        if self.best_accuracy >= config.accuracy_threshold:
            return True, f"达到目标准确率 {self.best_accuracy:.3f}"
        if self.stagnation_counter >= config.stagnation_epochs:
            return True, f"停滞 {self.stagnation_counter} 轮无进步"
        return False, "继续训练"

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, Dict[str, Any]]:
        """Classify a batch and report the mean prediction entropy.

        Returns:
            ``(logits, aux)`` where ``aux['entropy']`` is the batch-mean
            softmax entropy (float) and ``aux['states']`` the 8 non-centre
            Luo Shu activations.

        NOTE(review): the entropy is appended to ``entropy_history`` on
        every call, including calls made by ``evaluate``.
        """
        states = self.luoshu.get_all_noncenter()
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = x.view(x.size(0), -1)
        x = self.dropout(F.relu(self.fc1(x)))
        logits = self.fc2(x)
        probs = F.softmax(logits, dim=-1)
        entropy = -(probs * torch.log(probs + 1e-8)).sum(dim=-1).mean()
        entropy_value = entropy.item()  # read once; .item() syncs with the device
        self.entropy_history.append(entropy_value)
        return logits, {'entropy': entropy_value, 'states': states}

    def get_optimizer(self):
        """Return the AdamW optimizer, creating it on first use."""
        if self._optimizer is None:
            self._optimizer = optim.AdamW(self.parameters(), lr=self.current_lr, weight_decay=1e-4)
        return self._optimizer

    def get_current_role(self) -> str:
        """Return the current role string (initially ``"pu"``)."""
        return self._role

    def set_role(self, role: str):
        """Set the role string."""
        self._role = role

    def compute_progress_rate(self) -> float:
        """Return the least-squares slope of the last 20 entropy values.

        A negative slope means entropy is falling. Returns 0.0 when fewer
        than 10 entropy values have been recorded.
        """
        if len(self.entropy_history) < 10:
            return 0.0
        recent = list(self.entropy_history)[-20:]
        x = np.arange(len(recent))
        y = np.array(recent)
        x_centered = x - x.mean()
        denominator = (x_centered ** 2).sum()
        if denominator < 1e-8:
            return 0.0
        numerator = (x_centered * (y - y.mean())).sum()
        return float(numerator / denominator)

    def train_one_epoch(self, dataloader: DataLoader) -> Dict[str, float]:
        """Train for one pass over ``dataloader`` and update bookkeeping.

        Returns:
            Dict with ``loss`` (mean per step), ``accuracy``,
            ``best_accuracy`` and ``avg_entropy`` (mean over this epoch's
            steps; 0.0 if the loader yielded no batch).
        """
        self.train()
        total_loss = 0.0
        correct = 0
        total = 0
        steps = 0
        optimizer = self.get_optimizer()
        criterion = nn.CrossEntropyLoss()
        device = next(self.parameters()).device

        for x, y in dataloader:
            x = x.to(device)
            y = y.to(device).long()

            optimizer.zero_grad()
            logits, aux = self(x)
            loss = criterion(logits, y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0)
            optimizer.step()

            # NOTE(review): `states` holds the 8 non-centre cells, but they
            # are written to feedback slots 0..7, so slot 4 (the centre)
            # receives a non-centre value and slot 8 stays zero. Confirm
            # whether the centre slot was meant to be skipped.
            feedback = torch.zeros(9, device=device)
            states = aux['states']
            feedback[:8] = aux['entropy'] * states[:8]
            self.luoshu(feedback)

            total_loss += loss.item()
            pred = logits.argmax(dim=1)
            correct += (pred == y).sum().item()
            total += y.size(0)
            steps += 1

        self.luoshu.apply_feedback()

        accuracy = correct / max(total, 1)
        self.current_accuracy = accuracy
        self.accuracy_history.append(accuracy)
        self.total_epochs += 1
        self.trained_epochs += 1
        self.current_epoch += 1

        # Track improvement vs. stagnation.
        if accuracy > self.best_accuracy:
            self.best_accuracy = accuracy
            self.stagnation_counter = 0
        else:
            self.stagnation_counter += 1

        # Mean entropy over this epoch's steps. With steps == 0 the slice
        # [-0:] would select the whole history, so that case is explicit.
        recent_entropy = list(self.entropy_history)[-steps:] if steps else []
        avg_entropy = sum(recent_entropy) / len(recent_entropy) if recent_entropy else 0.0

        return {
            'loss': total_loss / max(steps, 1),
            'accuracy': accuracy,
            'best_accuracy': self.best_accuracy,
            'avg_entropy': avg_entropy
        }

    @torch.no_grad()
    def evaluate(self, dataloader: DataLoader) -> Dict[str, float]:
        """Return ``{'accuracy': ...}`` over ``dataloader`` in eval mode."""
        self.eval()
        correct = 0
        total = 0
        device = next(self.parameters()).device
        for x, y in dataloader:
            x = x.to(device)
            y = y.to(device).long()
            logits, _ = self(x)
            pred = logits.argmax(dim=1)
            correct += (pred == y).sum().item()
            total += y.size(0)
        return {'accuracy': correct / max(total, 1)}

    def report_status(self) -> Dict[str, Any]:
        """Return a status snapshot (ids, scores, entropy, role, timestamp)."""
        return {
            'agent_id': self.agent_id,
            'generation': self.generation,
            'timestamp': time.time(),
            'entropy': self.entropy_history[-1] if self.entropy_history else 1.0,
            'accuracy': self.current_accuracy,
            'best_accuracy': self.best_accuracy,
            'progress_rate': self.compute_progress_rate(),
            'total_epochs': self.total_epochs,
            'current_role': self._role
        }
# ==================== 进化调度器 ====================
class EvolutionScheduler:
    """Keeps the agent population and decides who trains, retires or breeds."""

    def __init__(self, config: Optional['EvolutionConfig'] = None):
        """Create an empty scheduler.

        Args:
            config: Evolution settings; a default ``EvolutionConfig`` is
                used when omitted.
        """
        self.config = config if config is not None else EvolutionConfig()
        self.agents: Dict[int, 'EvolvableAgent'] = {}
        self.current_evolution_id: Optional[int] = None
        self.running = True
        self.election_timer = time.time()
        self.generation_stats = []
        self.total_born = 0
        self.total_retired = 0
        # Entropy scales maintained by update_scales().
        self.center = 0.5
        self.best = float('inf')
        self.worst = 0.0
        self.agent_queue = []
        self.next_id = 0
        print("[进化调度器] 初始化完成")
        # Read from self.config: the raw argument may be None.
        print(f"配置: 智能体范围 [{self.config.min_agents}-{self.config.max_agents}], "
              f"每代{self.config.generation_size}个, 保留{self.config.elite_count}个精英")

    def get_next_id(self) -> int:
        """Return the next sequential id (0, 1, 2, ...)."""
        allocated = self.next_id
        self.next_id += 1
        return allocated

    def register_agent(self, agent: 'EvolvableAgent'):
        """Add ``agent`` to the population and to the training queue.

        Registration no longer spawns a new generation by itself. Doing so
        made seeding recursive: registering the first of ``min_agents``
        seed agents already bred the population up to the minimum, and the
        remaining seeds were then added on top. Replenishment is handled by
        ``check_and_retire``.
        """
        self.agents[agent.agent_id] = agent
        self.agent_queue.append(agent.agent_id)
        self.total_born += 1
        print(f"[调度器] 注册智能体 {agent.agent_id} (Gen={agent.generation})")

    def unregister_agent(self, agent_id: int):
        """Remove an agent; unknown ids are ignored."""
        agent = self.agents.pop(agent_id, None)
        if agent is None:
            return
        print(f"[调度器] 智能体 {agent_id} 退役 "
              f"(Gen={agent.generation}, BestAcc={agent.best_accuracy:.3f})")
        self.total_retired += 1
        if self.current_evolution_id == agent_id:
            self.current_evolution_id = None

    def create_new_generation(self):
        """Breed up to ``generation_size`` agents from the current elites.

        Parents are drawn at random from the ``elite_count`` agents with
        the highest ``best_accuracy``; with no agents alive, children are
        created without a parent. The number of children is capped so the
        population never exceeds ``max_agents``.
        """
        room = self.config.max_agents - len(self.agents)
        if room <= 0:
            return
        print(f"\n🌱 创建新一代智能体...")
        elites = sorted(
            self.agents.values(),
            key=lambda a: a.best_accuracy,
            reverse=True
        )[:self.config.elite_count]

        # Build all children first so they are bred from the existing
        # elites only, then register them.
        new_agents = []
        for _ in range(min(self.config.generation_size, room)):
            new_id = self.get_next_id() + 1000
            parent = random.choice(elites) if elites else None
            new_agents.append(EvolvableAgent(new_id, 10, parent=parent))
        for agent in new_agents:
            self.register_agent(agent)
        print(f" 创建了 {len(new_agents)} 个新智能体")
        print(f" 当前总数: {len(self.agents)}")

    def check_and_retire(self):
        """Retire agents that qualify, then refill up to ``min_agents``."""
        to_retire = []
        for agent_id, agent in self.agents.items():
            should_retire, reason = agent.should_retire(self.config)
            if should_retire:
                to_retire.append((agent_id, reason))
        for agent_id, reason in to_retire:
            print(f"[调度器] 智能体 {agent_id} 满足退役条件: {reason}")
            self.unregister_agent(agent_id)

        # One generation may not be enough to reach the minimum; stop as
        # soon as a round adds nobody (e.g. generation_size == 0).
        while len(self.agents) < self.config.min_agents:
            before = len(self.agents)
            self.create_new_generation()
            if len(self.agents) <= before:
                break

    def elect_evolution(self):
        """Pick the "evolution" agent and demote the previous one.

        Candidates need at least 5 recorded entropies and a latest entropy
        more than 0.05 above the best entropy seen so far. The candidate
        with the lowest progress rate (steepest entropy decline) wins. When
        nobody qualifies, a random agent is chosen.
        """
        candidates = []
        for agent_id, agent in self.agents.items():
            if len(agent.entropy_history) < 5:
                continue
            entropy = agent.entropy_history[-1]
            progress = agent.compute_progress_rate()
            if entropy <= self.best + 0.05:
                continue
            candidates.append({
                'agent_id': agent_id,
                'agent': agent,
                'progress': progress,
                'entropy': entropy
            })

        if not candidates and self.agents:
            random_agent = random.choice(list(self.agents.values()))
            candidates = [{
                'agent_id': random_agent.agent_id,
                'agent': random_agent,
                'progress': 0,
                'entropy': 1.0
            }]
        if not candidates:
            return

        candidates.sort(key=lambda x: x['progress'])
        winner = candidates[0]
        if self.current_evolution_id == winner['agent_id']:
            return

        if self.current_evolution_id is not None:
            old_agent = self.agents.get(self.current_evolution_id)
            if old_agent:
                old_agent.set_role('pu')
                print(f"[调度器] Agent {self.current_evolution_id} 降级为普通态")
        self.current_evolution_id = winner['agent_id']
        winner['agent'].set_role('evolution')
        print(f"[调度器] Agent {self.current_evolution_id} 成为主进化态")

    def update_scales(self, entropy: float):
        """Update the three entropy scales.

        ``center`` is an exponential moving average (weight 0.01);
        ``best`` and ``worst`` track the minimum and maximum seen.
        """
        self.center = 0.99 * self.center + 0.01 * entropy
        if entropy < self.best:
            self.best = entropy
        if entropy > self.worst:
            self.worst = entropy

    def get_agent_to_train(self) -> Optional['EvolvableAgent']:
        """Return the next agent to train, or ``None`` if there is none.

        Priority: the current evolution agent, then the registration queue
        (ids of agents that no longer exist are skipped), then a random
        live agent.
        """
        # `is not None`, not truthiness: agent id 0 is a valid id.
        if self.current_evolution_id is not None:
            agent = self.agents.get(self.current_evolution_id)
            if agent:
                return agent
        while self.agent_queue:
            agent_id = self.agent_queue.pop(0)
            agent = self.agents.get(agent_id)
            if agent:
                return agent
        if self.agents:
            return random.choice(list(self.agents.values()))
        return None

    def get_status(self) -> Dict[str, Any]:
        """Return population counters, entropy scales and per-agent summaries."""
        return {
            'agent_count': len(self.agents),
            'evolution_agent': self.current_evolution_id,
            'total_born': self.total_born,
            'total_retired': self.total_retired,
            'scales': {'center': round(self.center, 4),
                       'best': round(self.best, 4) if self.best != float('inf') else None,
                       'worst': round(self.worst, 4)},
            'agents': [
                {
                    'id': aid,
                    'generation': a.generation,
                    'role': a.get_current_role(),
                    'best_acc': round(a.best_accuracy, 3),
                    'epochs': a.total_epochs
                }
                for aid, a in self.agents.items()
            ]
        }
# ==================== 主演化系统 ====================
class EvolvingMultiAgentSystem:
    """Top-level driver: prepares data, seeds agents and runs the loop."""

    def __init__(self, config: Optional['EvolutionConfig'] = None):
        """Create the system and its scheduler.

        Args:
            config: Evolution settings; a default ``EvolutionConfig`` is
                used when omitted.
        """
        self.config = config if config is not None else EvolutionConfig()
        # Hand over the resolved config so both objects share one instance
        # and the scheduler never receives None.
        self.scheduler = EvolutionScheduler(self.config)
        self.train_loader = None
        self.test_loader = None
        self.running = False
        print("\n" + "=" * 60)
        print("河图洛书多智能体系统 v3.1 - 自动演化版")
        print("=" * 60)

    def prepare_data(self):
        """Build the train/test loaders (batch size 64).

        Uses MNIST when torchvision is available and the dataset can be
        loaded; otherwise falls back to random tensors shaped like MNIST
        (2000 training and 500 test samples).
        """
        print("\n准备数据...")
        train_dataset = None
        test_dataset = None

        # torchvision names are only touched behind the availability flag;
        # referencing `transforms` unconditionally raised NameError when
        # torchvision was not installed.
        if HAS_TORCHVISION:
            try:
                transform = transforms.Compose([
                    transforms.ToTensor(),
                    transforms.Normalize((0.5,), (0.5,))
                ])
                train_dataset = torchvision.datasets.MNIST(
                    root='./data', train=True, download=True, transform=transform
                )
                test_dataset = torchvision.datasets.MNIST(
                    root='./data', train=False, download=True, transform=transform
                )
                print("使用MNIST数据集")
            except Exception as exc:
                # Download or disk failures fall back to synthetic data;
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                print(f"MNIST加载失败: {exc}")
                train_dataset = None
                test_dataset = None

        if train_dataset is None or test_dataset is None:
            train_dataset = torch.utils.data.TensorDataset(
                torch.randn(2000, 1, 28, 28),
                torch.randint(0, 10, (2000,))
            )
            test_dataset = torch.utils.data.TensorDataset(
                torch.randn(500, 1, 28, 28),
                torch.randint(0, 10, (500,))
            )
            print("使用模拟数据")

        self.train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        self.test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    def train_single_agent(self, agent: 'EvolvableAgent', max_epochs: int = 5):
        """Train ``agent`` for up to ``max_epochs`` epochs.

        Every third epoch and on the last epoch the agent is evaluated, the
        scheduler's entropy scales are updated and a progress line is
        printed. Training stops early when ``self.running`` turns False.
        """
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        agent.to(device)
        for epoch in range(max_epochs):
            if not self.running:
                break
            train_stats = agent.train_one_epoch(self.train_loader)

            is_report_epoch = epoch % 3 == 0 or epoch == max_epochs - 1
            if not is_report_epoch:
                continue

            eval_stats = agent.evaluate(self.test_loader)
            entropy = agent.entropy_history[-1] if agent.entropy_history else 1.0
            self.scheduler.update_scales(entropy)
            print(f"[Agent {agent.agent_id}] Gen={agent.generation} | "
                  f"Epoch {agent.total_epochs} | "
                  f"Loss={train_stats['loss']:.3f} | "
                  f"Train={train_stats['accuracy']:.3f} | "
                  f"Test={eval_stats['accuracy']:.3f} | "
                  f"Best={agent.best_accuracy:.3f} | "
                  f"Role={agent.get_current_role()}")

    def run_evolution_loop(self):
        """Seed ``min_agents`` agents and run train/retire/elect cycles.

        Runs until ``self.running`` is cleared, the user interrupts, or the
        population has been empty after more than 200 cycles. ``stop`` is
        always called on exit.
        """
        print("\n🚀 启动演化系统...")
        print("=" * 60)
        self.running = True

        # Allow calling this method directly without start().
        if self.train_loader is None or self.test_loader is None:
            self.prepare_data()

        # Initial population
        for i in range(self.config.min_agents):
            agent = EvolvableAgent(i, 10)
            self.scheduler.register_agent(agent)

        cycle_count = 0
        last_status_time = time.time()
        try:
            while self.running:
                agent = self.scheduler.get_agent_to_train()
                if agent:
                    self.train_single_agent(agent, max_epochs=5)
                self.scheduler.check_and_retire()
                self.scheduler.elect_evolution()

                # Periodic status report, at most every 30 seconds.
                if time.time() - last_status_time > 30:
                    self.print_status()
                    last_status_time = time.time()

                cycle_count += 1
                if cycle_count > 200 and len(self.scheduler.agents) == 0:
                    print("\n所有智能体已退役,系统停止")
                    break
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\n用户中断...")
        finally:
            self.stop()

    def print_status(self):
        """Print agent count, evolution agent and birth/retirement totals."""
        status = self.scheduler.get_status()
        print("\n" + "=" * 60)
        print(f"📊 系统状态")
        print(f" 活跃智能体: {status['agent_count']}")
        print(f" 主进化态: {status['evolution_agent']}")
        print(f" 总诞生: {status['total_born']} | 总退役: {status['total_retired']}")
        print("=" * 60)

    def stop(self):
        """Clear the running flag and print a final status report."""
        self.running = False
        print("\n系统已停止")
        self.print_status()

    def start(self):
        """Prepare the data, then run the evolution loop."""
        self.prepare_data()
        self.run_evolution_loop()
# ==================== 主函数 ====================
def main():
    """Build the demo configuration and run the evolving system."""
    settings = {
        # population
        'min_agents': 3,
        'max_agents': 6,
        'target_agents': 4,
        # reproduction
        'generation_size': 1,
        'elite_count': 1,
        'mutation_rate': 0.05,
        'mutation_strength': 0.1,
        # retirement
        'min_epochs_before_retire': 15,
        'accuracy_threshold': 0.99,
        'stagnation_epochs': 6,
        # scheduling
        'election_interval': 15.0,
    }
    config = EvolutionConfig(**settings)
    EvolvingMultiAgentSystem(config).start()
# Run the demo only when executed as a script, not when imported.
# (A stray trailing "|" after the call made this line a syntax error.)
if __name__ == '__main__':
    main()