# hetu_luoshu_v5.py - 河图洛书智能体 V5(调子系统 + 老师批改 + 分类语素库)
# 架构说明:
# 火2:从语料提取1-4字语素,老师7批改(合法性+语义+词性),按词性分类存入火池
# 木3:从火池取语素,从木池取调子,生成句子,老师8批改后入库
# 水1:从木池取调子杂交,生成变体,老师6批改后入库
# 金4:从水池取变体,老师9批改后择优固化
# 道:真圆周率引擎,注入新奇信号
import os
import json
import random
import time
import requests
import re
import subprocess
from collections import Counter
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import quote
from datetime import datetime
from decimal import Decimal, getcontext
# 尝试导入 jieba 用于词性标注
try:
import jieba.posseg as pseg
JIEBA_AVAILABLE = True
except ImportError:
JIEBA_AVAILABLE = False
print("⚠️ jieba 未安装,将使用简单词性标注(建议安装: pip install jieba)")
# 尝试导入 gmpy2(真圆周率引擎)
try:
import gmpy2
GMPY2_AVAILABLE = True
except ImportError:
GMPY2_AVAILABLE = False
print("⚠️ gmpy2 未安装,将使用 BBP 引擎(精度有限)")
print(" 建议安装: pip install gmpy2")
# ==================== DeepSeek API 配置 ====================
# SECURITY(review): an API key is committed in source. The DEEPSEEK_API_KEY
# environment variable now takes precedence; the literal is kept only as a
# fallback so existing setups keep working. It should be revoked and removed.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY") or "sk-952a1c833f61473ca2fe38a1bb367e9e"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
# Number of consecutive failed API calls, and the count at which the API is
# reported as degraded by is_api_degraded().
_api_fail_count = 0
_api_fail_threshold = 5
def call_deepseek(prompt: str, max_tokens: int = 800, temperature: float = 0.5) -> Optional[str]:
    """Send a single-turn chat request to DeepSeek and return the reply text.

    Args:
        prompt: User message content.
        max_tokens: Value sent as ``max_tokens``.
        temperature: Value sent as ``temperature``.

    Returns:
        The content of the first choice, or ``None`` on a non-200 status or
        on any exception. Failures increment the module failure counter; a
        200 response resets it.
    """
    global _api_fail_count
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
    }
    payload = {
        "model": "deepseek-chat",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    try:
        reply = requests.post(DEEPSEEK_API_URL, json=payload, headers=request_headers, timeout=30)
        if reply.status_code != 200:
            _api_fail_count += 1
            return None
        # The counter is reset before the body is parsed; a malformed body
        # then counts as one fresh failure in the handler below.
        _api_fail_count = 0
        return reply.json()["choices"][0]["message"]["content"]
    except Exception as e:
        _api_fail_count += 1
        print(f"DeepSeek API 调用失败: {e}")
        return None
def is_api_degraded() -> bool:
    """Return True once the consecutive-failure counter has reached the threshold."""
    return not (_api_fail_count < _api_fail_threshold)
def web_search(keyword: str) -> Optional[str]:
    """Fetch the Baidu Baike page for ``keyword`` and return a short summary.

    The lemma-summary ``<div>`` is preferred (tags stripped); the page's
    ``description`` meta tag is the fallback. The result is capped at 500
    characters. Returns ``None`` on a non-200 status, when neither pattern
    matches, or on any exception.
    """
    try:
        page_url = f"https://baike.baidu.com/item/{quote(keyword)}"
        browser_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        page = requests.get(page_url, headers=browser_headers, timeout=15)
        if page.status_code != 200:
            return None
        html = page.text
        summary = re.search(r'<div class="lemma-summary">(.*?)</div>', html, re.DOTALL)
        if summary:
            return re.sub(r'<[^>]+>', '', summary.group(1))[:500]
        description = re.search(r'<meta name="description" content="(.*?)"', html)
        if description:
            return description.group(1)[:500]
        return None
    except Exception as e:
        print(f"网上搜索失败: {e}")
        return None
# ==================== 道的圆周率引擎 V3(真圆周率 + 流式) ====================
class DaoPi:
    """Dao V3: streams decimal digits of pi, one cached chunk at a time.

    Uses gmpy2 when the module-level ``GMPY2_AVAILABLE`` flag is set, and a
    ``Decimal``-based BBP series otherwise.
    """

    def __init__(self, chunk_size: int = 5000):
        """Create the engine and load the first chunk (digits 0..chunk_size)."""
        self.chunk_size = chunk_size
        self.pointer = 0
        self.cache = []
        self.current_chunk_start = 0
        if GMPY2_AVAILABLE:
            self._use_gmpy2 = True
            print(f" 📐 道引擎:gmpy2(真圆周率),chunk_size={chunk_size}")
            self._load_chunk_gmpy2(0)
        else:
            self._use_gmpy2 = False
            print(f" 📐 道引擎:BBP(备用),chunk_size={chunk_size},精度有限")
            self._load_chunk_bbp(0)

    def _load_chunk_gmpy2(self, start_pos: int) -> None:
        """Fill the cache with ``chunk_size`` fractional digits from ``start_pos`` (gmpy2)."""
        end_pos = start_pos + self.chunk_size
        # Four bits per decimal digit, plus a 100-digit margin.
        gmpy2.get_context().precision = (end_pos + 100) * 4
        rendered = format(gmpy2.const_pi(), f'.{end_pos + 50}f')
        fraction = rendered.split('.')[1] if '.' in rendered else ""
        # Slicing past the end yields "", which is then zero-padded.
        window = fraction[start_pos:end_pos] if start_pos < len(fraction) else ""
        window = window.ljust(self.chunk_size, "0")
        self.cache = [int(ch) for ch in window]
        self.current_chunk_start = start_pos
        print(f" 📐 道已加载新段: 位置 {start_pos} - {start_pos + self.chunk_size}(gmpy2)")

    def _load_chunk_bbp(self, start_pos: int) -> None:
        """Fill the cache with ``chunk_size`` fractional digits from ``start_pos`` (BBP series)."""
        end_pos = start_pos + self.chunk_size
        term_count = end_pos + 50
        getcontext().prec = term_count + 10
        total = Decimal(0)
        for k in range(term_count):
            scale = Decimal(1) / (16 ** k)
            bracket = (
                Decimal(4) / (8 * k + 1)
                - Decimal(2) / (8 * k + 4)
                - Decimal(1) / (8 * k + 5)
                - Decimal(1) / (8 * k + 6)
            )
            total += scale * bracket
        # Drop the leading "3." so index 0 is the first fractional digit.
        fraction = str(total)[2:]
        window = fraction[start_pos:end_pos] if start_pos < len(fraction) else ""
        window = window.ljust(self.chunk_size, "0")
        self.cache = [int(ch) for ch in window]
        self.current_chunk_start = start_pos
        print(f" 📐 道已加载新段: 位置 {start_pos} - {start_pos + self.chunk_size}(BBP)")

    def _ensure_cache(self, pos: int) -> None:
        """Reload the cache starting at ``pos`` unless ``pos`` is already cached."""
        chunk_end = self.current_chunk_start + self.chunk_size
        if self.current_chunk_start <= pos < chunk_end:
            return
        if self._use_gmpy2:
            self._load_chunk_gmpy2(pos)
        else:
            self._load_chunk_bbp(pos)

    def next_digit(self) -> int:
        """Return the digit at the pointer and advance the pointer by one."""
        self._ensure_cache(self.pointer)
        offset = self.pointer - self.current_chunk_start
        value = self.cache[offset]
        self.pointer += 1
        return value

    def novelty(self, length: int = 8) -> float:
        """Consume ``length`` digits and map them to a score capped at 0.99.

        The digits are read as the decimal fraction 0.d1d2...dn, divided by
        0.111111 and clamped to at most 0.99.
        """
        # NOTE(review): any fraction >= ~0.11 already exceeds the cap, so this
        # returns 0.99 for most digit runs - confirm that is intended.
        digits = [self.next_digit() for _ in range(length)]
        fraction = 0
        for place, digit in enumerate(digits, start=1):
            fraction += digit * (0.1 ** place)
        scaled = fraction / 0.111111
        return min(0.99, scaled)

    def get_pointer(self) -> int:
        """Return the index of the next digit to be served."""
        return self.pointer

    def reset_pointer(self):
        """Rewind to digit 0 and reload the first chunk."""
        self.pointer = 0
        reload_first = self._load_chunk_gmpy2 if self._use_gmpy2 else self._load_chunk_bbp
        reload_first(0)
# ==================== 小型策略网络 ====================
class SimpleStrategyNet:
    """Tiny rule-based policy that maps recent teacher scores to exploration knobs."""

    # Most recent experiences kept by update().
    _MAX_EXPERIENCES = 100

    def __init__(self, student_id: str):
        self.student_id = student_id
        # Each entry: {"time": float, "action": dict, "score": float}
        self.experiences = []

    def predict(self, recent_memory: List[Dict]) -> Dict[str, float]:
        """Return curiosity/intensity/variety settings from the last three scores.

        Each record's score is read from ``"teacher_score"`` and, when that
        key is absent, from ``"score"`` - the key used by the feedback records
        elsewhere in this file. Records with neither key count as 0.

        Args:
            recent_memory: Feedback records, oldest first.

        Returns:
            A dict with keys ``curiosity``, ``intensity`` and ``variety``.
            With fewer than three records a fixed default is returned.
        """
        if len(recent_memory) < 3:
            return {"curiosity": 0.3, "intensity": 0.5, "variety": 0.5}
        window = recent_memory[-3:]
        scores = [m.get("teacher_score", m.get("score", 0)) for m in window]
        avg_score = sum(scores) / len(scores)
        if avg_score > 0.8:
            # Doing well: explore less.
            return {"curiosity": 0.2, "intensity": 0.3, "variety": 0.4}
        if avg_score < 0.3:
            # Doing badly: explore more.
            return {"curiosity": 0.6, "intensity": 0.8, "variety": 0.7}
        return {"curiosity": 0.4, "intensity": 0.5, "variety": 0.5}

    def update(self, teacher_score: float, last_action: Dict):
        """Record one scored action, keeping only the most recent 100."""
        self.experiences.append({"time": time.time(), "action": last_action, "score": teacher_score})
        if len(self.experiences) > self._MAX_EXPERIENCES:
            self.experiences = self.experiences[-self._MAX_EXPERIENCES:]
# ==================== 火池(分类语素库:按词性+频次) ====================
class FirePool:
    """Fire pool: morpheme library grouped by part of speech.

    Each category holds a "high" and a "low" list of items shaped like
    ``{"unit": str, "frequency": int, "quality": float, "time": float}``.
    The pool is persisted to ``memories/fire_pool.json`` on every change.
    """

    # Part-of-speech flag -> storage category.
    POS_CATEGORIES = {
        'n': 'noun',       # noun
        'v': 'verb',       # verb
        'a': 'adj',        # adjective
        'm': 'num',        # numeral
        'q': 'num',        # measure word
        'r': 'pron',       # pronoun
        'p': 'particle',   # preposition
        'c': 'particle',   # conjunction
        'u': 'particle',   # auxiliary
        'd': 'particle',   # adverb
        'default': 'other'
    }

    def __init__(self, max_per_category: int = 1000):
        """Create empty categories, then load any persisted pool from disk."""
        self.max_per_category = max_per_category
        self.pools = {
            name: {"high": [], "low": []}
            for name in ('noun', 'verb', 'adj', 'num', 'pron', 'particle', 'other')
        }
        self._dirty = True
        self._load()

    def _get_filename(self) -> str:
        return "memories/fire_pool.json"

    def _load(self):
        """Merge the persisted pool into the default categories.

        Merging (rather than replacing) keeps every default category present
        even when the file lacks some of them. A failed load is reported and
        leaves the pool usable.
        """
        filename = self._get_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for category, sections in data.get("pools", {}).items():
                bucket = self.pools.setdefault(category, {"high": [], "low": []})
                bucket["high"] = list(sections.get("high", []))
                bucket["low"] = list(sections.get("low", []))
            total = sum(len(pool["high"]) + len(pool["low"]) for pool in self.pools.values())
            print(f" 🔥 加载火池,{total} 个语素(按词性分类)")
        except Exception as e:
            # Was a bare `except: pass`; report the problem instead of hiding it.
            print(f" ⚠️ 火池加载失败: {e}")

    def _save(self):
        """Persist the pool, writing a temp file first so a crash cannot truncate it."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_filename()
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump({"pools": self.pools}, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def _get_pos_category(self, pos_flag: str) -> str:
        """Map a part-of-speech flag to its category; unknown flags map to 'other'."""
        return self.POS_CATEGORIES.get(pos_flag, 'other')

    def add(self, unit: str, pos: str, frequency: int = 1, quality: float = 0.5):
        """Add a morpheme, or reinforce it if it is already stored.

        For an existing unit the frequency is accumulated and the quality is
        raised to the larger value. A new unit goes to "high" when
        ``quality >= 0.7`` and to "low" otherwise.
        """
        category = self._get_pos_category(pos)
        pool = self.pools[category]
        for section in ('high', 'low'):
            for item in pool[section]:
                if item["unit"] == unit:
                    # The original indexed the section list with string keys
                    # (TypeError); the matching item is what must be updated.
                    item["frequency"] = item.get("frequency", 0) + frequency
                    item["quality"] = max(item.get("quality", 0), quality)
                    self._dirty = True
                    self._save()
                    return
        new_item = {"unit": unit, "frequency": frequency, "quality": quality, "time": time.time()}
        pool["high" if quality >= 0.7 else "low"].append(new_item)
        self._dirty = True
        self._sort_and_trim(category)
        self._save()

    def _sort_and_trim(self, category: str):
        """Sort both sections by frequency*quality and cap each at half the capacity."""
        pool = self.pools[category]
        cap = self.max_per_category // 2
        for section in ('high', 'low'):
            pool[section].sort(key=lambda x: x["frequency"] * x["quality"], reverse=True)
            if len(pool[section]) > cap:
                pool[section] = pool[section][:cap]

    def get_by_pos(self, pos: str, high_ratio: float = 0.7, total_count: int = 5) -> List[str]:
        """Sample up to ``total_count`` units for ``pos``, mixing high and low sections.

        ``high_ratio`` is clamped to [0, 1] and ``total_count`` to >= 0 so the
        sample sizes can never be negative. The result is shuffled and may be
        shorter than ``total_count`` when the sections are small.
        """
        category = self._get_pos_category(pos)
        pool = self.pools[category]
        high_ratio = min(1.0, max(0.0, high_ratio))
        total_count = max(0, total_count)
        high_needed = int(total_count * high_ratio)
        low_needed = total_count - high_needed
        result = []
        if pool["high"]:
            high_items = random.sample(pool["high"], min(high_needed, len(pool["high"])))
            result.extend(item["unit"] for item in high_items)
        if pool["low"]:
            low_items = random.sample(pool["low"], min(low_needed, len(pool["low"])))
            result.extend(item["unit"] for item in low_items)
        random.shuffle(result)
        return result

    def get_stats(self) -> dict:
        """Return ``{category: {"high": n, "low": m}}`` item counts."""
        return {k: {"high": len(v["high"]), "low": len(v["low"])} for k, v in self.pools.items()}
# ==================== 木池(调子库:节奏+词性序列+比例) ====================
class WoodPool:
    """Wood pool: library of "tunes" (rhythm + POS sequence + high/low ratio).

    Persisted to ``memories/wood_pool.json`` on every change.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty pool, then load any persisted tunes from disk."""
        self.max_size = max_size
        # Each entry: {"rhythm": List[int], "pos_seq": List[List[str]],
        #              "high_ratio": float, "count": int, "quality": float, "time": float}
        self.pool = []
        self._dirty = True
        self._load()

    def _get_filename(self) -> str:
        return "memories/wood_pool.json"

    def _load(self):
        """Load the persisted pool; a failed load is reported and leaves the pool empty."""
        filename = self._get_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.pool = data.get("pool", [])
            print(f" 🌳 加载木池,{len(self.pool)} 个调子")
        except Exception as e:
            # Was a bare `except: pass`; report the problem instead of hiding it.
            print(f" ⚠️ 木池加载失败: {e}")

    def _save(self):
        """Persist the pool, writing a temp file first so a crash cannot truncate it."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_filename()
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump({"pool": self.pool}, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def add(self, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, quality: float = 0.5):
        """Add a tune, or reinforce the one with the same rhythm and POS sequence.

        The count weight is ``max(1, int(quality * 3))``, so higher-quality
        tunes accumulate count faster.
        """
        weight = max(1, int(quality * 3))
        for existing in self.pool:
            if existing["rhythm"] == rhythm and existing["pos_seq"] == pos_seq:
                existing["count"] = existing.get("count", 0) + weight
                existing["quality"] = max(existing.get("quality", 0), quality)
                existing["time"] = time.time()
                self._dirty = True
                self._save()
                return
        self.pool.append({
            "rhythm": rhythm,
            "pos_seq": pos_seq,
            "high_ratio": high_ratio,
            "count": weight,
            "quality": quality,
            "time": time.time()
        })
        self._dirty = True
        self._sort_and_trim()
        self._save()

    def _sort_and_trim(self):
        """Re-sort by count*quality when dirty, then cap the pool at ``max_size``."""
        if self._dirty:
            self.pool.sort(key=lambda x: x["count"] * x.get("quality", 0.5), reverse=True)
            self._dirty = False
        if len(self.pool) > self.max_size:
            self.pool = self.pool[:self.max_size]

    def get_random(self, num: int = 1) -> List[Dict]:
        """Return up to ``num`` tunes sampled from the 200 best (for exploration)."""
        if not self.pool:
            return []
        self._sort_and_trim()
        source = self.pool[:min(200, len(self.pool))]
        return random.sample(source, min(num, len(source)))

    def get_best(self, num: int = 1) -> List[Dict]:
        """Return the ``num`` highest-ranked tunes."""
        self._sort_and_trim()
        return self.pool[:num]

    def get_stats(self) -> dict:
        """Return the current and maximum pool sizes."""
        return {"size": len(self.pool), "max_size": self.max_size}
# ==================== 水池(变体库:存储水1产生的变体句子) ====================
class WaterPool:
    """Water pool: library of sentence variants.

    Persisted to ``memories/water_pool.json`` on every change.
    """

    def __init__(self, max_size: int = 2000):
        """Create an empty pool, then load any persisted variants from disk."""
        self.max_size = max_size
        # Each entry: {"text": str, "score": float, "rhythm": List[int],
        #              "pos_seq": List[List[str]], "high_ratio": float, "time": float}
        self.pool = []
        self._dirty = True
        self._load()

    def _get_filename(self) -> str:
        return "memories/water_pool.json"

    def _load(self):
        """Load the persisted pool; a failed load is reported and leaves the pool empty."""
        filename = self._get_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.pool = data.get("pool", [])
            print(f" 💧 加载水池,{len(self.pool)} 条变体")
        except Exception as e:
            # Was a bare `except: pass`; report the problem instead of hiding it.
            print(f" ⚠️ 水池加载失败: {e}")

    def _save(self):
        """Persist the pool, writing a temp file first so a crash cannot truncate it."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_filename()
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump({"pool": self.pool}, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def add(self, text: str, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, score: float = 0.5):
        """Add a variant, or refresh the existing one with the same text.

        For an existing text the score is raised to the larger value and the
        timestamp is reset; the other fields are left as stored.
        """
        for existing in self.pool:
            if existing["text"] == text:
                existing["score"] = max(existing["score"], score)
                existing["time"] = time.time()
                self._dirty = True
                self._save()
                return
        self.pool.append({
            "text": text,
            "score": score,
            "rhythm": rhythm,
            "pos_seq": pos_seq,
            "high_ratio": high_ratio,
            "time": time.time()
        })
        self._dirty = True
        self._sort_and_trim()
        self._save()

    def _sort_and_trim(self):
        """Re-rank by freshness-weighted score when dirty, then cap at ``max_size``.

        Freshness decays linearly from 1.0 to a floor of 0.3 over 7200
        seconds. The weighted value is stored on each item as
        ``_effective_score``.
        """
        if self._dirty:
            now = time.time()
            for item in self.pool:
                # Items without a timestamp are treated as brand new.
                age = now - item.get("time", now)
                freshness = max(0.3, 1.0 - age / 7200)
                item["_effective_score"] = item["score"] * freshness
            self.pool.sort(key=lambda x: x.get("_effective_score", x["score"]), reverse=True)
            self._dirty = False
        if len(self.pool) > self.max_size:
            self.pool = self.pool[:self.max_size]

    def get_best(self, num: int = 10, min_len: int = 10, exclude: Optional[set] = None) -> List[Dict]:
        """Return up to ``num`` top variants whose text length is in [min_len, 200].

        Args:
            num: Maximum number of items to return.
            min_len: Minimum text length.
            exclude: Texts to skip; ``None`` means skip nothing.
        """
        self._sort_and_trim()
        skipped = set() if exclude is None else exclude
        results = []
        for item in self.pool:
            text = item["text"]
            if min_len <= len(text) <= 200 and text not in skipped:
                results.append(item)
                if len(results) >= num:
                    break
        return results

    def update_score(self, text: str, score: float):
        """Overwrite the score of the variant with this text, then persist."""
        for item in self.pool:
            if item["text"] == text:
                item["score"] = score
                self._dirty = True
                break
        self._save()

    def clean_old(self, max_age_seconds: int = 7200):
        """Drop variants older than ``max_age_seconds`` and persist if any were removed."""
        now = time.time()
        old_count = len(self.pool)
        self.pool = [item for item in self.pool if now - item["time"] < max_age_seconds]
        if len(self.pool) != old_count:
            self._dirty = True
            self._save()

    def get_stats(self) -> dict:
        """Return the current and maximum pool sizes."""
        return {"size": len(self.pool), "max_size": self.max_size}
# ==================== 金池(作品库:存储金4固化的作品) ====================
class GoldenPool:
    """Golden pool: library of finished works.

    Persisted to ``memories/golden_pool.json`` on every change.
    """

    def __init__(self, max_size: int = 500):
        """Create an empty pool, then load any persisted works from disk."""
        self.max_size = max_size
        # Each entry: {"text": str, "score": float, "time": float}
        self.pool = []
        self._load()

    def _get_filename(self) -> str:
        return "memories/golden_pool.json"

    def _load(self):
        """Load the persisted pool; a failed load is reported and leaves the pool empty."""
        filename = self._get_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.pool = data.get("pool", [])
            print(f" 💎 加载金池,{len(self.pool)} 件作品")
        except Exception as e:
            # Was a bare `except: pass`; report the problem instead of hiding it.
            print(f" ⚠️ 金池加载失败: {e}")

    def _save(self):
        """Persist the pool, writing a temp file first so a crash cannot truncate it."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_filename()
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump({"pool": self.pool}, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def add(self, text: str, score: float):
        """Add a work, or raise the stored score of an identical text.

        New works are inserted in score order and the pool is capped at
        ``max_size``, dropping the lowest-scored entries.
        """
        for existing in self.pool:
            if existing["text"] == text:
                existing["score"] = max(existing["score"], score)
                self._save()
                return
        self.pool.append({
            "text": text,
            "score": score,
            "time": time.time()
        })
        self.pool.sort(key=lambda x: x["score"], reverse=True)
        if len(self.pool) > self.max_size:
            self.pool = self.pool[:self.max_size]
        self._save()

    def get_best(self, num: int = 10) -> List[str]:
        """Return the texts of the ``num`` highest-scored works."""
        self.pool.sort(key=lambda x: x["score"], reverse=True)
        return [item["text"] for item in self.pool[:num]]

    def get_random(self, num: int = 5) -> List[str]:
        """Return the texts of up to ``num`` randomly chosen works."""
        if not self.pool:
            return []
        return [item["text"] for item in random.sample(self.pool, min(num, len(self.pool)))]

    def get_stats(self) -> dict:
        """Return the current and maximum pool sizes."""
        return {"size": len(self.pool), "max_size": self.max_size}
# ==================== 同义词自学习模块 ====================
class SynonymLearner:
    """Learns word associations from co-occurrence and serves them as synonyms.

    Learned data is persisted to ``memories/synonyms_memory.json``; a small
    built-in table is used for words with no learned entry.
    """

    # Minimum co-occurrence count before two words are linked.
    _LINK_THRESHOLD = 3

    def __init__(self):
        self.synonyms = {}
        self.cooccurrence = Counter()
        self._load()
        self._init_fallback()

    def _init_fallback(self):
        """Install the built-in single-character fallback table."""
        self._fallback = {
            "好": ["棒", "优", "佳", "美"],
            "大": ["巨", "宏", "浩", "庞"],
            "小": ["微", "细", "精", "纤"],
            "是": ["乃", "即", "为", "系"],
            "有": ["具", "含", "拥", "备"],
            "无": ["缺", "失", "乏", "没"],
            "多": ["众", "繁", "丰", "盛"],
            "少": ["稀", "寡", "微", "欠"],
            "美": ["丽", "艳", "秀", "雅"],
            "真": ["实", "诚", "确", "正"],
        }

    def _get_filename(self) -> str:
        return "memories/synonyms_memory.json"

    def _load(self):
        """Load persisted data; a failed load is reported and leaves the tables empty."""
        filename = self._get_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.synonyms = data.get("synonyms", {})
            self.cooccurrence = Counter(data.get("cooccurrence", {}))
            print(f" 📖 加载同义词库,{len(self.synonyms)} 组")
        except Exception as e:
            # Was a bare `except: pass`; report the problem instead of hiding it.
            print(f" ⚠️ 同义词库加载失败: {e}")

    def _save(self):
        """Persist both tables, writing a temp file first so a crash cannot truncate them."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_filename()
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump({
                "synonyms": self.synonyms,
                "cooccurrence": dict(self.cooccurrence)
            }, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def _link(self, source: str, target: str):
        """Record ``target`` as an associate of ``source`` (no duplicates)."""
        associates = self.synonyms.setdefault(source, [])
        if target not in associates:
            associates.append(target)

    def learn_from_corpus(self, texts: List[str]):
        """Count nearby word pairs in ``texts`` and link pairs seen often enough.

        Words are runs of 2-4 CJK characters; each word is paired with the
        next two words in the same text. Pairs whose accumulated count
        reaches the threshold are linked in both directions.
        """
        for text in texts:
            words = re.findall(r'[\u4e00-\u9fff]{2,4}', text)
            for i, w1 in enumerate(words):
                for w2 in words[i + 1:i + 3]:
                    if w1 != w2:
                        self.cooccurrence[f"{w1}|{w2}"] += 1
                        self.cooccurrence[f"{w2}|{w1}"] += 1
        for pair, count in self.cooccurrence.items():
            if count < self._LINK_THRESHOLD:
                continue
            w1, sep, w2 = pair.partition('|')
            # Skip malformed keys (e.g. from a hand-edited file) instead of crashing.
            if not sep or not w1 or not w2 or '|' in w2:
                continue
            self._link(w1, w2)
            self._link(w2, w1)
        self._save()
        print(f" 📚 同义词学习完成,现有 {len(self.synonyms)} 组")

    def get(self, word: str) -> List[str]:
        """Return learned associates of ``word``, else the fallback list, else ``[]``."""
        learned = self.synonyms.get(word)
        if learned:
            return learned
        return self._fallback.get(word, [])
# ==================== 学生基类 ====================
class Student:
    """Base class for the four student agents.

    Holds feedback memory, skill level, and the blocked/learning/graduation
    state, and persists them to a per-student JSON file under ``memories/``.
    Subclasses implement ``_rule_based_execute`` and may override
    ``_try_think``.
    """

    def __init__(self, student_id: str, name: str):
        self.id = student_id
        self.name = name
        self.memory = []
        self.learning_memory = []
        self.strategy_net = SimpleStrategyNet(student_id)
        self.skill_level = 0.3
        self.blocked = False
        self.learning_rounds = 0
        self.consecutive_passes = 0
        self.is_graduated = False
        self.luoshu = None
        self.can_think = False
        self._load_individual_memory()

    def _get_memory_filename(self) -> str:
        """Return this student's memory file path (known names map to fixed stems)."""
        name_map = {
            "火2-化": "huo2_memory",
            "木3-生": "mu3_memory",
            "水1-变": "shui1_memory",
            "金4-成": "jin4_memory"
        }
        base = name_map.get(self.name, self.name.replace("-", "_"))
        return f"memories/{base}.json"

    def _atomic_write(self, filename: str, data: dict):
        """Write ``data`` as JSON to a temp file, then rename it over ``filename``."""
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def _load_individual_memory(self):
        """Restore persisted state if the memory file exists; report failures."""
        filename = self._get_memory_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.memory = data.get("memory", [])
            self.learning_memory = data.get("learning_memory", [])
            self.skill_level = data.get("skill_level", 0.3)
            self.blocked = data.get("blocked", False)
            self.learning_rounds = data.get("learning_rounds", 0)
            self.consecutive_passes = data.get("consecutive_passes", 0)
            self.is_graduated = data.get("is_graduated", False)
            # Only restored when the instance already defines `works`.
            if hasattr(self, "works"):
                self.works = data.get("works", [])
            print(f" 📂 [{self.name}] 加载记忆,技能={self.skill_level:.2f}, 阻塞={self.blocked}")
        except Exception as e:
            print(f" ⚠️ [{self.name}] 加载记忆失败: {e}")

    def _save_individual_memory(self):
        """Persist state, keeping the last 200 memory and 50 learning records."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_memory_filename()
        data = {
            "name": self.name,
            "memory": self.memory[-200:],
            "learning_memory": self.learning_memory[-50:],
            "skill_level": self.skill_level,
            "blocked": self.blocked,
            "learning_rounds": self.learning_rounds,
            "consecutive_passes": self.consecutive_passes,
            "is_graduated": self.is_graduated,
        }
        if hasattr(self, "works"):
            data["works"] = self.works
        self._atomic_write(filename, data)

    def _search_web(self, query: str) -> Optional[str]:
        print(f" 🌐 [{self.name}] 网上搜索: {query[:50]}...")
        return web_search(query)

    def _search_api(self, query: str) -> Optional[str]:
        print(f" 🤖 [{self.name}] API搜索: {query[:50]}...")
        return call_deepseek(f"请提供关于「{query}」的中文百科信息,200字以内。只返回客观事实。", max_tokens=500, temperature=0.3)

    def _get_masterpiece_from_common(self) -> Optional[str]:
        """Return the text of a random shared masterpiece, or ``None`` if there is none."""
        if self.luoshu:
            masterpieces = self.luoshu.common_memory.get("masterpieces", [])
            if masterpieces:
                return random.choice(masterpieces).get("text", "")
        return None

    def _learn_from_sources(self, topic: str):
        """Look ``topic`` up on the web and via the API, record it, and raise skill by 0.1."""
        print(f" 📚 [{self.name}] 开始学习: {topic[:50]}...")
        web_result = self._search_web(topic)
        time.sleep(0.3)
        api_result = self._search_api(topic)
        time.sleep(0.3)
        # The result is not used yet; the call is kept so the sequence of
        # side effects (including RNG use) is unchanged.
        self._get_masterpiece_from_common()
        self.learning_memory.append({
            "time": time.time(),
            "topic": topic,
            "web": web_result[:200] if web_result else "",
            "api": api_result[:200] if api_result else "",
        })
        self.skill_level = min(1.0, self.skill_level + 0.1)
        self._save_individual_memory()
        print(f" 📈 [{self.name}] 技能等级: {self.skill_level:.2f}")

    def learn(self, topic: str):
        """Run one learning round on ``topic``."""
        self.learning_rounds += 1
        print(f" 📚 [{self.name}] 学习轮次: {self.learning_rounds}/3")
        self._learn_from_sources(topic)

    def on_pass(self):
        """Record a pass; three consecutive passes mark the student as graduated."""
        self.consecutive_passes += 1
        graduated_now = self.consecutive_passes >= 3
        if graduated_now:
            self.is_graduated = True
        # Save after the flag is set; the original saved first, so the
        # graduation was not written to disk by this call.
        self._save_individual_memory()
        print(f" ✅ [{self.name}] 通过 ({self.consecutive_passes}/3)")
        if graduated_now:
            print(f" 🎉 [{self.name}] 毕业!")

    def on_fail(self, fail_reason: str = ""):
        """Record a failure: reset the pass streak and block until three rounds are learned."""
        self.consecutive_passes = 0
        self.is_graduated = False
        self.blocked = True
        self.learning_rounds = 0
        self._save_individual_memory()
        print(f" ❌ [{self.name}] 不通过,进入学习模式(需学习3轮)")

    def can_retry(self) -> bool:
        """Return True if not blocked; unblock once three learning rounds are done."""
        if not self.blocked:
            return True
        if self.learning_rounds >= 3:
            self.blocked = False
            self.learning_rounds = 0
            self._save_individual_memory()
            print(f" 🔓 [{self.name}] 学习完成,开始重做")
            return True
        return False

    def receive_feedback(self, teacher_score: float, teacher_comment: str = ""):
        """Store a teacher score and comment, feed the strategy net, and persist."""
        self.memory.append({"time": time.time(), "score": teacher_score, "comment": teacher_comment})
        self.strategy_net.update(teacher_score, {})
        if len(self.memory) > 200:
            self.memory = self.memory[-200:]
        self._save_individual_memory()

    def _get_mimic_ratio(self) -> float:
        """Return the probability of mimicking: 0.20 minus 0.1*skill, clamped to [0.10, 0.30]."""
        base_ratio = 0.20
        skill_penalty = self.skill_level * 0.1
        return max(0.10, min(0.30, base_ratio - skill_penalty))

    def _get_dao_effect(self) -> float:
        """Return the coordinator's current ``dao_novelty``, or 0.5 when unavailable."""
        if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
            return self.luoshu.dao_novelty
        return 0.5

    def _best_masterpieces(self) -> List[str]:
        """Return up to ten best golden-pool texts, or [] when there is no golden pool."""
        if hasattr(self.luoshu, 'golden_pool'):
            return self.luoshu.golden_pool.get_best(10)
        return []

    def _maybe_mimic(self, original_output: Dict, context: Dict) -> Dict:
        """With probability ``_get_mimic_ratio()``, alter the output in place to imitate good works.

        What is altered depends on which student this is, decided by
        substring match on ``self.name``: "units" for 火2, "sentence" for 木3
        and 水1, "final" for 金4. Returns the same dict that was passed in.
        """
        mimic_ratio = self._get_mimic_ratio()
        if random.random() >= mimic_ratio:
            return original_output
        print(f"🎭 [{self.name}] 触发模仿模式 (比例={mimic_ratio:.0%})...")
        if self.luoshu is None:
            return original_output

        if "火2" in self.name:
            masterpieces = self._best_masterpieces()
            if masterpieces:
                all_words = []
                for text in masterpieces:
                    all_words.extend(re.findall(r'[\u4e00-\u9fff]{2,4}', text))
                if all_words:
                    top_words = [w for w, c in Counter(all_words).most_common(5)]
                    current_units = original_output.get("units", [])
                    original_output["units"] = list(set(current_units + top_words))
                    print(f"🔥 [火2] 模仿了优秀词汇: {top_words[:3]}")
        elif "木3" in self.name:
            masterpieces = self._best_masterpieces()
            if masterpieces:
                sample = random.choice(masterpieces)
                sentences = re.findall(r'[^。!?;]*[。!?;]', sample)
                if sentences:
                    template = sentences[0]
                    punctuation = re.findall(r'[,。!?;、]', template)
                    # NOTE(review): the source lost its indentation; this
                    # `else` is paired with `if punctuation` as the closest
                    # match - confirm against the original layout.
                    if punctuation:
                        sentence = original_output.get("sentence", "")
                        if sentence:
                            connectors = ["因为", "所以", "但是", "然而", "于是", "因此"]
                            prefix = random.choice(connectors)
                            original_output["sentence"] = f"{prefix}{sentence}"
                            print(f"🌳 [木3] 模仿了句式逻辑: {prefix}...")
                    else:
                        sentence = original_output.get("sentence", "")
                        if sentence:
                            original_output["sentence"] = f"{sentence}。"
                            print(f"🌳 [木3] 模仿了句号结尾")
        elif "水1" in self.name:
            sentence = original_output.get("sentence", "")
            if sentence and len(sentence) > 5:
                techniques = [
                    ("加修辞前缀", lambda s: f"堪称完美的{s}" if len(s) < 30 else f"令人惊叹的{s}"),
                    ("加反问语气", lambda s: f"难道{s}不是真理吗?"),
                    ("加排比结构", lambda s: f"不仅{s},而且{s},更是{s}"[:100]),
                    ("加夸张表达", lambda s: f"毫无疑问,{s}"),
                    ("加古风表达", lambda s: f"夫{s},诚然也。"),
                ]
                tech_name, tech_func = random.choice(techniques)
                original_output["sentence"] = tech_func(sentence)
                print(f"💧 [水1] 模仿了修辞: {tech_name}")
        elif "金4" in self.name:
            masterpieces = self._best_masterpieces()
            # NOTE(review): the `else` (preset templates) is paired with
            # `if masterpieces`; the flattened source also allows pairing it
            # with the length check - confirm against the original layout.
            if masterpieces:
                best = max(masterpieces, key=len)
                if best and len(best) > 10:
                    new_text = best
                    if self.luoshu and hasattr(self.luoshu, 'synonym_learner'):
                        for word in re.findall(r'[\u4e00-\u9fff]{2,4}', new_text):
                            synonyms = self.luoshu.synonym_learner.get(word)
                            if synonyms and random.random() < 0.3:
                                new_text = new_text.replace(word, random.choice(synonyms), 1)
                    if new_text and new_text[-1] in "。!?":
                        new_text = new_text[:-1] + random.choice("。!?")
                    original_output["final"] = new_text
                    print(f"🏆 [金4] 模仿了满分作文并微调")
            else:
                templates = [
                    "天地之间,万物演化,河图洛书,揭示其理。",
                    "智能之形,源于结构,涌现于交互,成就于演化。",
                    "阴阳相生,五行相克,八卦相荡,万物相成。",
                ]
                original_output["final"] = random.choice(templates)
                print(f"🏆 [金4] 模仿了预设模板")
        return original_output

    def _try_think(self, context: Dict) -> Optional[Dict]:
        """Hook for model-based generation; the base class never thinks."""
        return None

    def execute(self, context: Optional[Dict] = None) -> Dict:
        """Produce this student's output for one round.

        Uses the result of ``_try_think`` when it reports ``valid``; otherwise
        runs ``_rule_based_execute`` and then possibly mimics.
        """
        if context is None:
            context = {}
        thinking_result = self._try_think(context)
        if thinking_result and thinking_result.get("valid"):
            print(f" 🧠 [{self.name}] 思考成功")
            return thinking_result["output"]
        print(f" ⚙️ [{self.name}] 使用规则")
        output = self._rule_based_execute(context)
        return self._maybe_mimic(output, context)

    def _rule_based_execute(self, context: Dict) -> Dict:
        """Rule-based generation; must be implemented by subclasses."""
        raise NotImplementedError
# ==================== 老师基类 ====================
class Teacher:
def __init__(self, teacher_id: str, student_name: str):
self.id = teacher_id
self.student_name = student_name
self.records = []
self.pass_threshold = 0.6
self.evolution_advice = ""
self.rule_description = self._get_default_rule()
self.luoshu = None
self.api_fail_count = 0
self.use_fallback = False
self._load_individual_memory()
def _get_memory_filename(self) -> str:
return f"memories/teacher_{self.id}_memory.json"
def _get_standard_filename(self) -> str:
return f"standards/teacher_{self.id}_standard.json"
def _atomic_write(self, filename: str, data: dict):
tmp_filename = filename + ".tmp"
with open(tmp_filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp_filename, filename)
def _load_individual_memory(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_memory_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.records = data.get("records", [])
self.pass_threshold = data.get("pass_threshold", 0.6)
self.rule_description = data.get("rule_description", self.rule_description)
print(f" 📂 老师{self.id} 加载记忆,{len(self.records)} 条记录")
except Exception as e:
print(f" ⚠️ 老师{self.id} 加载记忆失败: {e}")
def _save_individual_memory(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_memory_filename()
data = {
"teacher_id": self.id,
"student": self.student_name,
"records": self.records[-500:],
"pass_threshold": self.pass_threshold,
"rule_description": self.rule_description,
}
self._atomic_write(filename, data)
def _get_default_rule(self) -> str:
rules = {
"7": "检查语素:是否纯中文、是否有实际语义、是否合法",
"8": "检查句子:长度大于10且含标点为通顺,长度大于5为合适,否则太短。同时关注意境和文采。",
"6": "检查变体:生成2个以上不同变体为有效,1个为一般,0个为无效",
"9": "检查作品:长度大于10且含中文为合格。优先看句子是否有完整语义和逻辑连贯性。"
}
return rules.get(self.id, "根据学生输出判断是否通过")
def _save_standard(self, current_round: int):
os.makedirs("standards", exist_ok=True)
filename = self._get_standard_filename()
data = {
"teacher_id": self.id,
"student": self.student_name,
"timestamp": datetime.now().isoformat(),
"round": current_round,
"rule_description": self.rule_description,
"pass_threshold": self.pass_threshold,
"evolution_advice": self.evolution_advice,
"recent_pass_rate": self._get_recent_pass_rate(),
"use_fallback": self.use_fallback
}
history = []
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
history = json.load(f)
if isinstance(history, dict):
history = [history]
except:
pass
history.append(data)
if len(history) > 20:
history = history[-20:]
tmp_filename = filename + ".tmp"
with open(tmp_filename, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=2)
os.replace(tmp_filename, filename)
txt_filename = f"standards/teacher_{self.id}_standard.txt"
with open(txt_filename, 'w', encoding='utf-8') as f:
f.write(f"老师{self.id}({self.student_name})评判标准\n")
f.write(f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"当前轮数: {current_round}\n")
f.write(f"通过阈值: {self.pass_threshold}\n")
f.write(f"最近通过率: {self._get_recent_pass_rate():.1%}\n")
f.write(f"降级模式: {'是' if self.use_fallback else '否'}\n")
f.write("-" * 40 + "\n")
f.write(f"评判规则:\n{self.rule_description}\n")
f.write("-" * 40 + "\n")
f.write(f"进化建议:\n{self.evolution_advice}\n")
def _get_recent_pass_rate(self) -> float:
if len(self.records) < 20:
return 0.5
recent = self.records[-50:]
return sum(1 for r in recent if r.get("passed", False)) / len(recent)
def _get_dao_effect(self) -> float:
if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
return (self.luoshu.dao_novelty - 0.5) * 0.2
return 0.0
def evaluate(self, student_output: Dict) -> tuple:
score = 0.5
comment = ""
dao_effect = self._get_dao_effect()
# 老师7:批改语素
if self.id == "7":
unit = student_output.get("unit", "")
if not unit:
score = 0.0 + dao_effect
comment = "无语素"
elif not re.match(r'^[\u4e00-\u9fff]+$', unit):
score = 0.2 + dao_effect
comment = "包含非中文字符"
elif len(unit) > 4:
score = 0.5 + dao_effect
comment = "语素过长"
else:
# 基本合格
score = 0.7 + dao_effect
comment = "合格语素"
# 如果有jieba,进一步判断词性
if JIEBA_AVAILABLE and len(unit) >= 2:
words = list(pseg.cut(unit))
if words and words[0].flag:
comment = f"合格语素,词性{words[0].flag}"
# 老师8:批改句子
elif self.id == "8":
sentence = student_output.get("sentence", "")
if len(sentence) > 15 and any(p in sentence for p in "。!?;"):
score = 0.9 + dao_effect
comment = "句子通顺"
elif len(sentence) > 8:
score = 0.7 + dao_effect
comment = "句子长度合适"
else:
score = 0.3 + dao_effect
comment = "句子太短"
# 老师6:批改变体
elif self.id == "6":
variants = student_output.get("variants", [])
unique_variants = len(set(variants))
if unique_variants >= 2:
score = 0.8 + dao_effect
comment = f"生成{unique_variants}个有效变体"
elif unique_variants == 1:
score = 0.5 + dao_effect
comment = "只生成1个变体"
else:
score = 0.3 + dao_effect
comment = "生成0个变体"
# 老师9:批改作品
elif self.id == "9":
final = student_output.get("final", "")
if self.use_fallback:
if final and len(final) > 10:
score = 0.85 + dao_effect
comment = "作品合格(降级模式)"
else:
score = 0.2 + dao_effect
comment = "无作品"
else:
if final and len(final) > 10:
if any(kw in final for kw in ["故", "然", "所以", "因此", "则"]):
score = 0.95 + dao_effect
comment = "作品合格且有逻辑"
else:
score = 0.85 + dao_effect
comment = "作品合格"
else:
score = 0.2 + dao_effect
comment = "无作品"
score = max(0.0, min(1.0, score))
passed = score >= 0.6
self.records.append({
"time": time.time(),
"student_output": str(student_output)[:100],
"score": score,
"passed": passed,
"comment": comment
})
if len(self.records) > 500:
self.records = self.records[-500:]
self._save_individual_memory()
return passed, score, comment
def self_evolve(self, current_round: int):
if self.use_fallback:
print(f" ⏸️ 老师{self.id} 处于降级模式,跳过进化")
return
if len(self.records) < 10:
print(f" ⏭️ 老师{self.id} 记录不足10条,跳过进化")
return
recent_records = self.records[-30:]
pass_rate = sum(1 for r in recent_records if r.get("passed", False)) / len(recent_records) if recent_records else 0.5
passed_examples = [r.get("student_output", "")[:80] for r in recent_records if r.get("passed", False)][-2:]
failed_examples = [r.get("student_output", "")[:80] for r in recent_records if not r.get("passed", False)][-2:]
prompt = f"""你是老师{self.id},负责评判{self.student_name}。
当前规则:{self.rule_description}
最近30次评判通过率:{pass_rate:.1%}
通过示例:{passed_examples}
未通过示例:{failed_examples}
请给出改进建议。不要只看长度和数量,要关注意境的连贯性、逻辑的完整性、表达的自然度。一句话即可。
"""
print(f" 📖 老师{self.id} 开始自我进化...")
response = call_deepseek(prompt, max_tokens=200, temperature=0.6)
if response:
self.api_fail_count = 0
self.evolution_advice = response
self.rule_description = response[:300]
print(f" ✅ 老师{self.id} 进化完成: {self.rule_description[:80]}...")
else:
self.api_fail_count += 1
print(f" ⚠️ 老师{self.id} API无返回,连续失败 {self.api_fail_count} 次")
if self.api_fail_count >= 3:
self.use_fallback = True
print(f" 🔻 老师{self.id} 进入降级模式")
self._save_standard(current_round)
self._save_individual_memory()
def try_recover(self) -> bool:
    """Probe the API and leave fallback mode if it answers.

    Returns:
        True when the teacher was in fallback mode and the probe succeeded;
        False otherwise, including when the teacher was not in fallback
        mode (this path previously returned None).
    """
    if not self.use_fallback:
        return False
    # Minimal request: any non-empty reply counts as "API is back".
    if not call_deepseek("回复'OK'", max_tokens=5, temperature=0):
        return False
    self.use_fallback = False
    self.api_fail_count = 0
    print(f" 🔺 老师{self.id} 已恢复,退出降级模式")
    return True
# ==================== 语料读取器 ====================
class CorpusReader:
    """Scan corpus directories once and keep in-memory character/sentence pools.

    All scanning happens in the constructor: every matching file under
    ``target_dirs`` is read (first 8000 characters only) and folded into
    ``all_chars`` (capped at 100000 characters), ``all_text`` and
    ``sentence_pool`` (capped at 5000 entries, then de-duplicated).
    """

    # Suffixes of files that count as corpus text.
    CORPUS_SUFFIXES = ('.txt', '.md', '.json', '.csv')
    # Tried in order; latin-1 decodes any byte sequence, so it is the last resort.
    FALLBACK_ENCODINGS = ('utf-8', 'gbk', 'gb2312', 'gb18030', 'big5', 'latin-1')
    MAX_CHARS = 100000
    MAX_SENTENCES = 5000
    # Sentence terminators in both ASCII and full-width form.
    _SENTENCE_SPLIT_RE = re.compile(r'[。!?;!?;]+')
    _CJK_RE = re.compile(r'[\u4e00-\u9fff]')

    def __init__(self, target_dirs: List[str]):
        """Record the directories to scan and build the pools immediately."""
        self.target_dirs = target_dirs
        self.total_files = 0
        self.scanned_count = 0
        self.has_scanned_all_files = False
        self.all_chars = []
        self.all_text = ""
        self.sentence_pool = []
        self._build_pools()

    def _iter_corpus_files(self):
        """Yield the path of every corpus file under the existing target dirs."""
        for base_dir in self.target_dirs:
            if not os.path.exists(base_dir):
                continue
            for root, _dirs, files in os.walk(base_dir):
                for file_name in files:
                    if file_name.endswith(self.CORPUS_SUFFIXES):
                        yield os.path.join(root, file_name)

    def _read_file_with_fallback(self, file_path: str, max_chars: int = 8000) -> Optional[str]:
        """Return up to ``max_chars`` characters of the file, or None if unreadable.

        Decoding failures move on to the next candidate encoding; an OS-level
        failure (missing file, permissions) cannot be cured by another
        encoding, so it gives up straight away.
        """
        for enc in self.FALLBACK_ENCODINGS:
            try:
                with open(file_path, 'r', encoding=enc) as f:
                    return f.read(max_chars)
            except UnicodeError:
                continue
            except OSError:
                return None
        return None

    def _build_pools(self):
        """Walk the corpus in random order and fill the char/text/sentence pools."""
        print("📚 正在构建语料精华池...")
        file_paths = list(self._iter_corpus_files())
        self.total_files = len(file_paths)
        print(f" 发现 {self.total_files} 个文件")
        random.shuffle(file_paths)
        all_text_parts = []
        for file_path in file_paths:
            content = self._read_file_with_fallback(file_path, 8000)
            if content is None:
                continue
            try:
                # Once a pool is full, further input would only be truncated away.
                if len(self.all_chars) < self.MAX_CHARS:
                    self.all_chars.extend(ch for ch in content if ch.isprintable())
                    del self.all_chars[self.MAX_CHARS:]
                all_text_parts.append(content)
                if len(self.sentence_pool) < self.MAX_SENTENCES:
                    for raw in self._SENTENCE_SPLIT_RE.split(content):
                        sentence = raw.strip()
                        # Keep mid-length sentences containing at least one CJK character.
                        if 5 < len(sentence) < 100 and self._CJK_RE.search(sentence):
                            self.sentence_pool.append(sentence)
                    del self.sentence_pool[self.MAX_SENTENCES:]
                self.scanned_count += 1
                if self.scanned_count % 100 == 0:
                    print(f" 扫描进度: {self.scanned_count}/{self.total_files}")
            except Exception as e:
                print(f" ⚠️ 处理失败 {file_path}: {e}")
        self.all_text = ''.join(all_text_parts)
        self.has_scanned_all_files = True
        # dict.fromkeys de-duplicates while keeping first-seen order (set() would not).
        self.sentence_pool = list(dict.fromkeys(self.sentence_pool))
        print(f" ✅ 精华池构建完成!")
        print(f" 已扫描文件: {self.scanned_count}/{self.total_files}")
        print(f" 字符量: {len(self.all_chars)}")
        print(f" 句子量: {len(self.sentence_pool)}")

    def get_char_frequency(self, max_chars: int = 50000) -> Counter:
        """Count character frequencies over the first ``max_chars`` pooled characters."""
        return Counter(self.all_chars[:max_chars])

    def get_sample_sentences(self, num_sentences: int = 200) -> List[str]:
        """Return up to ``num_sentences`` random sentences without reordering the pool."""
        if not self.sentence_pool:
            return []
        wanted = max(0, min(num_sentences, len(self.sentence_pool)))
        return random.sample(self.sentence_pool, wanted)

    def get_all_text(self) -> str:
        """Return the concatenation of everything read during the scan."""
        return self.all_text

    def get_total_text_size(self) -> int:
        """Return the on-disk size in bytes of all corpus files (re-walks the dirs)."""
        total = 0
        for path in self._iter_corpus_files():
            try:
                total += os.path.getsize(path)
            except OSError:
                # File vanished or is unreadable between walk and stat; skip it.
                pass
        return total
# ==================== 火2:提取语素,老师7批改,分类存入火池 ====================
class Huo2(Student):
    """Fire-2 ("demolition crew"): extract morphemes from the corpus.

    Extracted 1-4 character n-grams are graded by teacher 7 and stored in
    the fire pool under their part-of-speech tag.
    """

    # A word made solely of CJK unified ideographs (used with fullmatch).
    _CJK_WORD_RE = re.compile(r'[\u4e00-\u9fff]+')
    # A word made solely of Chinese numeral characters (used with fullmatch).
    _CN_NUMERAL_RE = re.compile(r'[一二三四五六七八九十百千万亿]+')

    def __init__(self, corpus_reader: CorpusReader):
        super().__init__("2", "火2-化")
        self.corpus_reader = corpus_reader
        self.teacher7 = None

    def _get_dao_curiosity(self) -> float:
        """Return the centre's current novelty signal, or 0.5 when unavailable."""
        if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
            return self.luoshu.dao_novelty
        return 0.5

    def _get_pos(self, word: str) -> str:
        """Return a part-of-speech tag (jieba when available, else a simple guess)."""
        if JIEBA_AVAILABLE and len(word) >= 2:
            tagged = list(pseg.cut(word))
            if tagged:
                return tagged[0].flag
        # Fallback heuristic: pure numerals are 'm', everything else a noun.
        if self._CN_NUMERAL_RE.fullmatch(word):
            return 'm'
        return 'n'

    def _extract_ngrams_with_pos(self, text: str, max_len: int = 4) -> List[Tuple[str, str, int]]:
        """Extract all-CJK n-grams of length 1..max_len as (word, pos, frequency)."""
        counter = Counter(ch for ch in text if '\u4e00' <= ch <= '\u9fff')
        text_len = len(text)
        is_cjk_word = self._CJK_WORD_RE.fullmatch
        for length in range(2, max_len + 1):
            for start in range(text_len - length + 1):
                ngram = text[start:start + length]
                # fullmatch, not match(...$): '$' would also accept an n-gram
                # that ends with a newline.
                if is_cjk_word(ngram):
                    counter[ngram] += 1
        return [(word, self._get_pos(word), freq) for word, freq in counter.items()]

    def _rebuild_fire_pool(self):
        """Rebuild the fire pool from the corpus, letting teacher 7 grade each unit."""
        print(f" 🔥 [{self.name}] 正在重建火池(提取1-4字语素,老师7批改)...")
        if not self.luoshu:
            # Without a centre there is no pool to fill; previously this path
            # crashed with AttributeError after doing all the extraction work.
            print(f" ⚠️ [{self.name}] 未连接洛书中心,无法重建火池")
            return
        all_text = self.corpus_reader.get_all_text()
        if not all_text:
            print(f" ⚠️ [{self.name}] 语料为空,无法重建火池")
            return
        ngrams = self._extract_ngrams_with_pos(all_text)
        fire_pool = self.luoshu.fire_pool
        # Empty both tiers of every category before refilling.
        for category in fire_pool.pools:
            fire_pool.pools[category]["high"] = []
            fire_pool.pools[category]["low"] = []
        for word, pos, freq in ngrams:
            if self.teacher7:
                passed, score, comment = self.teacher7.evaluate({"unit": word})
                if passed and score >= 0.4:
                    fire_pool.add(word, pos, frequency=freq, quality=score)
            elif self._CJK_WORD_RE.fullmatch(word) and len(word) <= 4:
                # No teacher attached: simple filter with a neutral quality.
                fire_pool.add(word, pos, frequency=freq, quality=0.5)
        stats = fire_pool.get_stats()
        total = sum(v["high"] + v["low"] for v in stats.values())
        print(f" ✅ [{self.name}] 火池重建完成,共 {total} 个语素(按词性分类)")

    def _rule_based_execute(self, context: Dict) -> Dict:
        """Rebuild the fire pool and report its size plus a curiosity value."""
        self._rebuild_fire_pool()
        strategy = self.strategy_net.predict(self.memory[-10:])
        dao_curiosity = self._get_dao_curiosity()
        curiosity = strategy["curiosity"] * (1.2 - self.skill_level) * (0.5 + dao_curiosity)
        if self.luoshu:
            pool_size = sum(
                len(v["high"]) + len(v["low"])
                for v in self.luoshu.fire_pool.pools.values()
            )
        else:
            pool_size = 0
        return {"units": ["火池已重建"], "curiosity": curiosity, "count": pool_size}
# ==================== 木3:从火池取语素,从木池取调子,生成句子,老师8批改 ====================
class Mu3(Student):
    """Wood-3 ("construction crew"): compose sentences that follow a rhythm.

    A rhythm ("tune") is a list of clause lengths plus, per clause, a list
    of acceptable part-of-speech tags. Teacher 8 grades the result.
    """

    def __init__(self, corpus_reader: CorpusReader):
        super().__init__("3", "木3-生")
        self.corpus_reader = corpus_reader
        self.teacher8 = None

    def _get_dao_length(self) -> float:
        """Return the centre's current novelty signal, or 0.5 when unavailable."""
        if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
            return self.luoshu.dao_novelty
        return 0.5

    def _extract_rhythm_from_sentence(self, sentence: str) -> Tuple[List[int], List[List[str]], float]:
        """Derive (clause lengths, per-clause POS lists, high-frequency ratio).

        Returns ``([], [], 0.7)`` when the sentence is empty or no centre is
        attached.
        """
        if not sentence or not self.luoshu:
            return [], [], 0.7
        rhythm = []
        pos_seq = []
        # Clause boundaries: ASCII comma, full-width comma, enumeration comma.
        for raw_part in re.split(r'[,,、]', sentence):
            part = raw_part.strip()
            chinese_chars = re.findall(r'[\u4e00-\u9fff]', part)
            if not chinese_chars:
                continue
            rhythm.append(len(chinese_chars))
            # Tag the clause by the first token jieba finds in its first two characters.
            tag = 'n'
            if JIEBA_AVAILABLE and part:
                tagged = list(pseg.cut(part[:2]))
                if tagged:
                    tag = tagged[0].flag
            pos_seq.append([tag])
        if not rhythm:
            rhythm = [len(sentence)]
            pos_seq = [['n']]
        high_ratio = 0.7
        if self.luoshu.fire_pool:
            # Simplified placeholder: every CJK character is counted as
            # high-frequency, so the ratio is 1.0 whenever one is present.
            if any('\u4e00' <= ch <= '\u9fff' for ch in sentence):
                high_ratio = 1.0
        return rhythm, pos_seq, high_ratio

    def _compose_sentence(self, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, dao_length: float) -> str:
        """Fill each clause of the rhythm with fire-pool morphemes and punctuate."""
        if not self.luoshu or not rhythm:
            return ""
        fire_pool = self.luoshu.fire_pool
        parts = []
        for i, part_len in enumerate(rhythm):
            # POS candidates for THIS clause (the old code used the whole
            # pos_seq here, so a list was passed on as the tag).
            pos_list = pos_seq[i] if i < len(pos_seq) else ['n']
            target_pos = random.choice(pos_list) if pos_list else 'n'
            units = fire_pool.get_by_pos(target_pos, high_ratio, part_len)
            if not units:
                # Nothing for that tag: fall back to nouns.
                units = fire_pool.get_by_pos('n', high_ratio, part_len)
            if not units:
                # Still nothing: placeholder characters.
                units = ['?'] * part_len
            parts.append(''.join(units[:part_len]))
        if not parts:
            return ""
        punctuations = [",", ",", ",", "。", "!", "?"]
        last_index = len(parts) - 1
        pieces = []
        for i, part in enumerate(parts):
            closers = punctuations[3:] if i == last_index else punctuations[:3]
            pieces.append(part + random.choice(closers))
        result = ''.join(pieces)
        # Dao perturbation: high novelty appends a mark, low novelty truncates.
        if dao_length > 0.7 and len(result) > 10:
            result += random.choice(["!", "?", "……"])
        elif dao_length < 0.3 and len(result) > 20:
            result = result[:random.randint(10, len(result) - 5)] + result[-1]
        return result

    def _build_wood_pool_from_corpus(self):
        """Seed the wood pool with rhythms extracted from sampled corpus sentences."""
        print(f" 🌳 [{self.name}] 正在从语料中提取调子...")
        for sentence in self.corpus_reader.get_sample_sentences(200):
            rhythm, pos_seq, high_ratio = self._extract_rhythm_from_sentence(sentence)
            if rhythm:
                self.luoshu.wood_pool.add(rhythm, pos_seq, high_ratio, quality=0.6)
        print(f" ✅ [{self.name}] 调子库构建完成,共 {len(self.luoshu.wood_pool.pool)} 个调子")

    def _rule_based_execute(self, context: Dict) -> Dict:
        """Pick a rhythm, compose one sentence, and let teacher 8 grade it.

        Returns ``{"sentence": str, "length": int}``; the sentence is empty
        when no centre is attached or teacher 8 rejects it.
        """
        if not self.luoshu:
            return {"sentence": "", "length": 0}
        dao_length = self._get_dao_length()
        # Seed the wood pool from the corpus while it holds fewer than 10 rhythms.
        if len(self.luoshu.wood_pool.pool) < 10:
            self._build_wood_pool_from_corpus()
        rhythms = self.luoshu.wood_pool.get_random(3)
        if not rhythms:
            # Default rhythm when the pool yields nothing.
            rhythms = [{"rhythm": [4, 6], "pos_seq": [['n'], ['v', 'n']], "high_ratio": 0.7}]
        selected = random.choice(rhythms)
        rhythm = selected["rhythm"]
        pos_seq = selected.get("pos_seq", [['n']] * len(rhythm))
        high_ratio = selected.get("high_ratio", 0.7)
        sentence = self._compose_sentence(rhythm, pos_seq, high_ratio, dao_length)
        if self.teacher8:
            passed, score, comment = self.teacher8.evaluate({"sentence": sentence})
            if passed and score >= 0.6:
                # Approved: feed the rhythm back into the pool with its score.
                self.luoshu.wood_pool.add(rhythm, pos_seq, high_ratio, quality=score)
            else:
                # Rejected: nothing is stored and nothing is handed on.
                sentence = ""
        return {"sentence": sentence, "length": len(sentence)}
# ==================== 水1:杂交调子,生成变体,老师6批改 ====================
class Shui1(Student):
    """Water-1 ("innovation crew"): hybridise rhythms into sentence variants.

    Teacher 6 grades each variant before it is stored in the water pool.
    """

    def __init__(self, student_id: str, name: str):
        super().__init__(student_id, name)
        self.teacher6 = None

    def _get_dao_variety(self) -> float:
        """Return the centre's current novelty signal, or 0.5 when unavailable."""
        if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
            return self.luoshu.dao_novelty
        return 0.5

    def _hybrid_rhythm(self, rhythm1: List[int], pos_seq1: List[List[str]],
                       rhythm2: List[int], pos_seq2: List[List[str]],
                       dao_variety: float) -> Tuple[List[int], List[List[str]]]:
        """Cross two rhythms into a new one longer than either parent.

        One of three methods is picked at random: concatenate, interleave
        clause by clause, or join with a random 3-8 character clause between.
        """
        method = random.choice(["concat", "interleave", "extend"])
        if method == "concat":
            new_rhythm = rhythm1 + rhythm2
            new_pos_seq = pos_seq1 + pos_seq2
        elif method == "interleave":
            # Alternate clauses from both parents, then append whatever is
            # left of the longer one. (The old code appended both parents in
            # full and then the tail again, duplicating clauses.)
            clauses1 = list(zip(rhythm1, pos_seq1))
            clauses2 = list(zip(rhythm2, pos_seq2))
            shared = min(len(clauses1), len(clauses2))
            merged = []
            for first, second in zip(clauses1, clauses2):
                merged.append(first)
                merged.append(second)
            merged.extend(clauses1[shared:])
            merged.extend(clauses2[shared:])
            new_rhythm = [length for length, _tags in merged]
            new_pos_seq = [tags for _length, tags in merged]
        else:
            new_rhythm = rhythm1 + [random.randint(3, 8)] + rhythm2
            new_pos_seq = pos_seq1 + [['n']] + pos_seq2
        # Dao perturbation: high variety adds one more clause.
        if dao_variety > 0.6:
            new_rhythm.append(random.randint(3, 8))
            new_pos_seq.append(['n'])
        # Guarantee the child has more clauses than both parents.
        min_len = max(len(rhythm1), len(rhythm2))
        while len(new_rhythm) <= min_len:
            new_rhythm.append(random.randint(3, 8))
            new_pos_seq.append(['n'])
        return new_rhythm, new_pos_seq

    def _compose_sentence(self, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, dao_length: float) -> str:
        """Fill each clause of the rhythm with fire-pool morphemes and punctuate."""
        if not self.luoshu or not rhythm:
            return ""
        fire_pool = self.luoshu.fire_pool
        parts = []
        for i, part_len in enumerate(rhythm):
            # POS candidates for THIS clause (the old code used the whole
            # pos_seq here, so a list was passed on as the tag).
            pos_list = pos_seq[i] if i < len(pos_seq) else ['n']
            target_pos = random.choice(pos_list) if pos_list else 'n'
            units = fire_pool.get_by_pos(target_pos, high_ratio, part_len)
            if not units:
                units = fire_pool.get_by_pos('n', high_ratio, part_len)
            if not units:
                units = ['?'] * part_len
            parts.append(''.join(units[:part_len]))
        if not parts:
            return ""
        punctuations = [",", ",", ",", "。", "!", "?"]
        last_index = len(parts) - 1
        pieces = []
        for i, part in enumerate(parts):
            closers = punctuations[3:] if i == last_index else punctuations[:3]
            pieces.append(part + random.choice(closers))
        result = ''.join(pieces)
        if dao_length > 0.7 and len(result) > 10:
            result += random.choice(["!", "?", "……"])
        return result

    def _rule_based_execute(self, context: Dict) -> Dict:
        """Hybridise up to three adjacent rhythm pairs into sentence variants.

        Returns ``{"variants": [...], "original": ""}`` with at most five
        distinct variants in generation order.
        """
        if not self.luoshu:
            return {"variants": [], "original": ""}
        dao_variety = self._get_dao_variety()
        variants = []
        rhythms = self.luoshu.wood_pool.get_random(5)
        if len(rhythms) < 2:
            return {"variants": [], "original": ""}
        for i in range(min(3, len(rhythms) - 1)):
            # Adjacent pair (i, i+1); the old code bound r1 to the whole list,
            # which raised TypeError on r1["rhythm"].
            r1 = rhythms[i]
            r2 = rhythms[i + 1]
            new_rhythm, new_pos_seq = self._hybrid_rhythm(
                r1["rhythm"], r1.get("pos_seq", [['n']] * len(r1["rhythm"])),
                r2["rhythm"], r2.get("pos_seq", [['n']] * len(r2["rhythm"])),
                dao_variety
            )
            new_high_ratio = (r1.get("high_ratio", 0.7) + r2.get("high_ratio", 0.7)) / 2
            sentence = self._compose_sentence(new_rhythm, new_pos_seq, new_high_ratio, dao_variety)
            if not sentence or len(sentence) <= 10:
                continue
            variants.append(sentence)
            if self.teacher6:
                passed, score, comment = self.teacher6.evaluate({"variants": [sentence]})
                if passed and score >= 0.6:
                    self.luoshu.water_pool.add(sentence, new_rhythm, new_pos_seq, new_high_ratio, score=score)
            else:
                # No teacher attached: store with a neutral score.
                self.luoshu.water_pool.add(sentence, new_rhythm, new_pos_seq, new_high_ratio, score=0.5)
        # De-duplicate while keeping generation order (set() order is arbitrary).
        variants = list(dict.fromkeys(variants))
        return {"variants": variants[:5], "original": ""}
# ==================== 金4:从水池取变体,老师9批改后择优固化 ====================
class Jin4(Student):
    """Metal-4 ("acceptance crew"): pick the best water-pool variant and solidify it.

    Teacher 9 grades the candidate; accepted works are written to disk and
    registered with the centre.
    """

    def __init__(self):
        super().__init__("4", "金4-成")
        self.works = []
        self.teacher9 = None

    def _get_golden_subdir(self) -> str:
        """Return the per-1000-rounds directory, already prefixed with ``golden_works/``.

        Falls back to the first bucket when the centre has no ``system``
        attribute to read the round from.
        """
        subdir_num = (self.luoshu.system.round // 1000) if self.luoshu and hasattr(self.luoshu, 'system') else 0
        return f"golden_works/round_{subdir_num*1000}_{(subdir_num+1)*1000-1}"

    def _save_golden_work(self, work: str):
        """Write one solidified work to its bucket directory (best effort)."""
        # _get_golden_subdir() already includes the "golden_works" base; the
        # old code joined the base on again, producing golden_works/golden_works/...
        golden_dir = self._get_golden_subdir()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Strip characters that are unsafe in file names; collapse whitespace.
        safe_title = re.sub(r'[\\/*?:"<>|\n\r\t]', '', work[:30].strip())
        safe_title = re.sub(r'\s+', '_', safe_title)
        if not safe_title:
            safe_title = "unnamed"
        filename = f"{golden_dir}/{timestamp}_{safe_title}.txt"
        tmp_filename = filename + ".tmp"
        try:
            os.makedirs(golden_dir, exist_ok=True)
            with open(tmp_filename, 'w', encoding='utf-8') as f:
                f.write(f"# 固化时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"# 作品长度: {len(work)} 字符\n")
                f.write("="*60 + "\n\n")
                f.write(work)
                f.write("\n\n" + "="*60 + "\n")
            # Write-then-rename so readers never see a half-written file.
            os.replace(tmp_filename, filename)
            print(f" 💎 金作品已保存: {filename}")
        except Exception as e:
            print(f" ⚠️ 保存金作品失败: {e}")
            # Do not leave a stale temp file behind.
            try:
                os.remove(tmp_filename)
            except OSError:
                pass

    def _get_dao_threshold(self) -> float:
        """Return a threshold offset in [-0.1, 0.1] derived from the novelty signal."""
        if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
            return (self.luoshu.dao_novelty - 0.5) * 0.2
        return 0.0

    def _solidify(self, work: str, pool_score: float, detail: str) -> Dict:
        """Record, persist and register an accepted work; return the result dict."""
        self.works.append(work)
        self._save_golden_work(work)
        if self.luoshu:
            self.luoshu.add_masterpiece(work, "金4")
            self.luoshu.golden_pool.add(work, score=pool_score)
        print(f" ✅ 固化作品: {work[:50]}... ({detail})")
        return {"final": work, "works_count": len(self.works)}

    def _rule_based_execute(self, context: Dict) -> Dict:
        """Take the top water-pool candidate and solidify it if it is good enough.

        With teacher 9 attached the candidate must pass with score >= 0.7;
        without one it is accepted when a random draw exceeds the dynamic
        threshold. Returns ``{"final": str, "works_count": int}`` where
        ``final`` is empty if nothing was solidified.
        """
        teacher9_threshold = context.get("teacher9_threshold", 0.6)
        nothing = {"final": "", "works_count": len(self.works)}
        if not self.luoshu:
            return nothing
        candidates = self.luoshu.water_pool.get_best(num=20, min_len=10, exclude=set(self.works))
        if not candidates:
            return nothing
        dao_effect = self._get_dao_threshold()
        strategy = self.strategy_net.predict(self.memory[-10:])
        dynamic_threshold = teacher9_threshold + strategy["intensity"] * 0.1 - self.skill_level * 0.1 + dao_effect
        dynamic_threshold = max(0.3, min(0.95, dynamic_threshold))
        best = candidates[0]["text"]
        if self.teacher9:
            passed, score, comment = self.teacher9.evaluate({"final": best})
            if passed and score >= 0.7:
                return self._solidify(best, score, f"阈值={dynamic_threshold:.2f}, 分={score:.2f}")
            print(f" ⏸️ 作品不合格 (分={score:.2f})")
            return nothing
        # No teacher 9 attached: accept by random draw against the threshold.
        if random.random() > dynamic_threshold:
            return self._solidify(best, 0.8, f"阈值={dynamic_threshold:.2f}")
        print(f" ⏸️ 未固化新作品 (阈值={dynamic_threshold:.2f})")
        return nothing
# ==================== 洛书中心 ====================
class LuoShuCenter:
    """Central coordinator: shared memory, the four pools, the pi engine, stage control.

    Persists its memory to ``memories/luoshu_center_memory.json`` and its
    standard history (last 20 entries) to
    ``standards/luoshu_center_standard.json``.
    """

    # Stage progression order.
    STAGES_ORDER = ["小学", "中学", "大学", "研究生"]
    # Self-grown corpus required to leave a stage, as a multiple of the fed corpus size.
    STAGE_CORPUS_MULTIPLIER = {"小学": 0.9, "中学": 3, "大学": 9}

    def __init__(self):
        self.common_memory = {"masterpieces": [], "stats": {}, "stage": "小学", "weights": {"火2": 1.0, "木3": 1.0, "水1": 1.0, "金4": 1.0}}
        self.evolution_advice = ""
        self.quality_rule = "长度大于10且含中文,优先看语义连贯性"
        self.memory = []
        self.stage_start_round = 0
        self.min_rounds_per_stage = 500
        self.synonym_learner = SynonymLearner()
        # The four pools.
        self.fire_pool = FirePool(max_per_category=1000)  # morphemes by part of speech
        self.wood_pool = WoodPool(max_size=1000)  # rhythms
        self.water_pool = WaterPool(max_size=2000)  # variants
        self.golden_pool = GoldenPool(max_size=500)  # finished works
        # Pi engine supplying the novelty signal.
        self.dao = DaoPi(chunk_size=5000)
        self.dao_novelty = 0.5
        self._load_individual_memory()
        self._load_standard()

    def _atomic_write(self, filename: str, data: Any):
        """Dump ``data`` as JSON to a temp file, then rename it over ``filename``."""
        tmp_filename = filename + ".tmp"
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)

    def _get_memory_filename(self) -> str:
        return "memories/luoshu_center_memory.json"

    def _get_standard_filename(self) -> str:
        return "standards/luoshu_center_standard.json"

    def _load_individual_memory(self):
        """Restore ``memory`` and ``common_memory`` from disk if a snapshot exists."""
        os.makedirs("memories", exist_ok=True)
        filename = self._get_memory_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.memory = data.get("memory", [])
            loaded = data.get("common_memory")
            if isinstance(loaded, dict):
                # Overlay on the defaults so a snapshot missing a key cannot
                # strip "stats"/"weights"/... that later code indexes directly.
                self.common_memory = {**self.common_memory, **loaded}
            print(f"📂 洛书中心加载记忆,{len(self.memory)} 条记录")
        except Exception as e:
            print(f"⚠️ 洛书中心加载记忆失败: {e}")

    def _save_individual_memory(self):
        """Persist the last 200 memory entries and the whole common memory."""
        os.makedirs("memories", exist_ok=True)
        data = {
            "memory": self.memory[-200:],
            "common_memory": self.common_memory,
        }
        self._atomic_write(self._get_memory_filename(), data)

    def _load_standard(self):
        """Restore rule, stage, weights and stage start from the newest history entry."""
        filename = self._get_standard_filename()
        if not os.path.exists(filename):
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if isinstance(data, list) and len(data) > 0:
                latest = data[-1]
                self.quality_rule = latest.get("quality_rule", self.quality_rule)
                self.common_memory["stage"] = latest.get("stage", "小学")
                self.common_memory["weights"] = latest.get("weights", self.common_memory["weights"])
                self.stage_start_round = latest.get("stage_start_round", 0)
                print(f"📊 洛书中心加载标准: {self.quality_rule[:50]}...")
        except Exception as e:
            print(f"⚠️ 洛书中心加载标准失败: {e}")

    def _save_standard(self, system):
        """Append the current standard to the JSON history and rewrite the text summary."""
        os.makedirs("standards", exist_ok=True)
        filename = self._get_standard_filename()
        data = {
            "timestamp": datetime.now().isoformat(),
            "round": system.round,
            "stage": self.common_memory["stage"],
            "weights": self.common_memory["weights"],
            "quality_rule": self.quality_rule,
            "evolution_advice": self.evolution_advice,
            "stats": self.common_memory["stats"],
            "stage_start_round": self.stage_start_round,
            "min_rounds_per_stage": self.min_rounds_per_stage,
            "dao_pointer": self.dao.get_pointer()
        }
        history = []
        if os.path.exists(filename):
            try:
                with open(filename, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError):
                # Unreadable or corrupt history: start a fresh one.
                loaded = []
            if isinstance(loaded, dict):
                # Older single-entry format.
                history = [loaded]
            elif isinstance(loaded, list):
                history = loaded
        history.append(data)
        history = history[-20:]
        self._atomic_write(filename, history)
        txt_filename = f"standards/luoshu_center_standard.txt"
        with open(txt_filename, 'w', encoding='utf-8') as f:
            f.write(f"洛书中心(校长)调控标准\n")
            f.write(f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"当前轮数: {system.round}\n")
            f.write(f"当前阶段: {self.common_memory['stage']}\n")
            f.write(f"阶段开始轮数: {self.stage_start_round}\n")
            f.write(f"各部权重: {self.common_memory['weights']}\n")
            f.write(f"道指针位置: {self.dao.get_pointer()}\n")
            f.write("-" * 40 + "\n")
            f.write(f"作品质量规则:\n{self.quality_rule}\n")
            f.write("-" * 40 + "\n")
            f.write(f"进化建议:\n{self.evolution_advice}\n")

    def report(self, student_id: str, score: float, passed: bool):
        """Record one graded attempt for a student (last 100 scores kept) and save."""
        stat = self.common_memory["stats"].setdefault(
            student_id, {"scores": [], "pass_count": 0, "total_count": 0}
        )
        stat["scores"].append(score)
        stat["total_count"] += 1
        if passed:
            stat["pass_count"] += 1
        if len(stat["scores"]) > 100:
            stat["scores"] = stat["scores"][-100:]
        self._save_individual_memory()

    def add_masterpiece(self, text: str, source: str):
        """Store a masterpiece unless it duplicates one of the last 50; keep the last 100."""
        masterpieces = self.common_memory["masterpieces"]
        if any(existing.get("text", "") == text for existing in masterpieces[-50:]):
            return
        masterpieces.append({"text": text, "source": source, "time": time.time()})
        if len(masterpieces) > 100:
            self.common_memory["masterpieces"] = masterpieces[-100:]
        self._save_individual_memory()

    def evaluate_works_quality(self, works: List[str]) -> float:
        """Ask the LLM for a 0-10 average score of the last five works.

        Returns 5.0 when there are no works, the API gives no answer, or the
        answer contains no number.
        """
        if not works:
            return 5.0
        works_text = "\n".join([f"{i+1}. {w[:100]}" for i, w in enumerate(works[-5:])])
        prompt = f"评价以下作品质量,0-10分,10分最高。优先看语义连贯性和逻辑完整性。只输出平均分。\n{works_text}"
        response = call_deepseek(prompt, max_tokens=50, temperature=0.3)
        if response:
            # First number in the reply; the pattern guarantees float() succeeds.
            found = re.search(r'(\d+(?:\.\d+)?)', response)
            if found:
                return min(10, max(0, float(found.group(1))))
        return 5.0

    def self_evolve(self, system):
        """Refresh the novelty signal, re-derive the quality rule, save, and do periodic upkeep."""
        self.dao_novelty = self.dao.novelty(6)
        recent_works = system.jin4.works[-10:] if system.jin4.works else []
        work_quality = self.evaluate_works_quality(recent_works)
        adjusted_quality = work_quality * (0.8 + self.dao_novelty * 0.4)
        self.quality_rule = f"基于第{system.round}轮金4作品质量({adjusted_quality:.1f}/10)自动调整,道新奇度:{self.dao_novelty:.2f}"
        self.evolution_advice = f"洛书中心根据金4作品质量({adjusted_quality:.1f}/10)及道信号({self.dao_novelty:.2f})动态调整"
        self._save_standard(system)
        self._save_individual_memory()
        print(f" 📊 洛书中心标准已保存 (质量={adjusted_quality:.1f}, 道新奇={self.dao_novelty:.2f}, 道已算{self.dao.get_pointer()}位)")
        # Every 500 rounds: relearn synonyms from the last 50 masterpieces.
        if system.round % 500 == 0 and system.round > 0:
            texts = [mp.get("text", "") for mp in self.common_memory["masterpieces"][-50:]]
            if texts:
                self.synonym_learner.learn_from_corpus(texts)
                print(f" 📚 同义词库已更新,共 {len(self.synonym_learner.synonyms)} 组")
        # Every 100 rounds: drop water-pool entries older than two hours.
        if system.round % 100 == 0:
            self.water_pool.clean_old(max_age_seconds=7200)

    def _corpus_target_size(self, system, stage: str) -> Optional[int]:
        """Return the self-grown corpus size needed to leave ``stage``, or None if it has none."""
        multiplier = self.STAGE_CORPUS_MULTIPLIER.get(stage)
        if multiplier is None:
            return None
        return int(system.corpus_reader.get_total_text_size() * multiplier)

    def _check_corpus_requirement(self, system, current_stage: str) -> bool:
        """Return True when the crawler-grown corpus is large enough to leave the stage."""
        target_size = self._corpus_target_size(system, current_stage)
        if target_size is None:
            return False
        grown_size = system.get_crawler_size()
        if current_stage == "小学":
            # The first stage additionally requires the fed corpus to be fully scanned.
            return system.corpus_reader.has_scanned_all_files and grown_size >= target_size
        return grown_size >= target_size

    def get_pool_stats(self) -> dict:
        """Return the stats of all four pools plus masterpiece count and pi pointer."""
        return {
            "fire_pool": self.fire_pool.get_stats(),
            "wood_pool": self.wood_pool.get_stats(),
            "water_pool": self.water_pool.get_stats(),
            "golden_pool": self.golden_pool.get_stats(),
            "masterpieces": len(self.common_memory["masterpieces"]),
            "dao_pointer": self.dao.get_pointer()
        }

    def _pass_rate(self, student_id: str) -> float:
        """Return pass_count / total_count for a student (0.0 when never graded)."""
        stat = self.common_memory["stats"].get(student_id, {})
        return stat.get("pass_count", 0) / max(1, stat.get("total_count", 1))

    def update_weights_and_stage(self, system):
        """Rebalance student weights and promote the stage when every gate is met.

        Promotion requires all four students above a 70% pass rate, at least
        ``min_rounds_per_stage`` rounds spent in the current stage, and enough
        self-grown corpus. An unrecognised persisted stage name is treated
        as "no promotion possible" instead of raising.
        """
        weights = self.common_memory["weights"]
        for student_id, weight in list(weights.items()):
            pass_rate = self._pass_rate(student_id)
            # Strong students get less weight, weak ones more (bounded to [0.5, 2.0]).
            if pass_rate > 0.8:
                weights[student_id] = max(0.5, weight * 0.9)
            elif pass_rate < 0.3:
                weights[student_id] = min(2.0, weight * 1.2)
        all_pass = all(self._pass_rate(sid) > 0.7 for sid in ["火2", "木3", "水1", "金4"])
        if not all_pass:
            return
        if self.stage_start_round == 0:
            self.stage_start_round = system.round
        rounds_in_stage = system.round - self.stage_start_round
        if rounds_in_stage < self.min_rounds_per_stage:
            print(f"\n📚 四部已合格,但仍在当前阶段磨练中({rounds_in_stage}/{self.min_rounds_per_stage}轮)")
            return
        current_stage = self.common_memory["stage"]
        stages_order = self.STAGES_ORDER
        if current_stage not in stages_order:
            return
        current_idx = stages_order.index(current_stage)
        if current_idx >= len(stages_order) - 1:
            return
        if self._check_corpus_requirement(system, current_stage):
            self.common_memory["stage"] = stages_order[current_idx + 1]
            self.stage_start_round = system.round
            print(f"\n🎉 洛书中心:全体毕业!语料达标!升入{stages_order[current_idx + 1]}阶段!")
            print(f" 投喂语料: {system.corpus_reader.get_total_text_size()} 字节")
            print(f" 自取语料: {system.get_crawler_size()} 字节")
        else:
            target_size = self._corpus_target_size(system, current_stage)
            print(f"\n📚 四部已合格,但语料不足,暂不升级")
            print(f" 需自取语料: {system.get_crawler_size()}/{target_size} 字节")
        self._save_individual_memory()

    def get_dao_effect(self) -> float:
        """Return a score offset in [-0.15, 0.15] derived from the novelty signal."""
        return (self.dao_novelty - 0.5) * 0.3
# ==================== 主系统 ====================
class HeTuLuoShuSystem:
def __init__(self):
    """Build the whole system: corpus reader, four students, four teachers, centre.

    Wires each teacher to its student, attaches the shared centre to
    everyone, restores the global snapshot, creates the output directories,
    writes each teacher's initial standard and triggers an initial crawl
    when the fed corpus is too small.
    """
    print("🐉 河图洛书智能体 - V5(调子系统 + 老师批改 + 分类语素库)")
    print(" 火2:提取语素 → 老师7批改 → 火池(按词性分类)")
    print(" 木3:按调子造句 → 老师8批改 → 木池(调子库)")
    print(" 水1:杂交调子 → 老师6批改 → 水池(变体库)")
    print(" 金4:筛选固化 → 老师9批改 → 金池(作品库)")
    print(" 道:真圆周率引擎,注入新奇信号")
    print("="*60)
    target_dirs = ["corpus", "learning_material", "novels", "self_grown"]
    self.corpus_reader = CorpusReader(target_dirs)
    self.crawler_lock = False
    self.huo2 = Huo2(self.corpus_reader)
    self.mu3 = Mu3(self.corpus_reader)
    self.shui1 = Shui1("1", "水1-变")
    self.jin4 = Jin4()
    self.students = {"火2": self.huo2, "木3": self.mu3, "水1": self.shui1, "金4": self.jin4}
    self.teachers = {
        "7": Teacher("7", "火2"),
        "8": Teacher("8", "木3"),
        "6": Teacher("6", "水1"),
        "9": Teacher("9", "金4")
    }
    self.luoshu = LuoShuCenter()
    # Jin4 looks up ``luoshu.system.round`` to pick its output directory.
    # The old code put ``system`` on a throwaway placeholder that was
    # overwritten below before anything read it, so expose it on the real centre.
    self.luoshu.system = self
    # Pair each teacher with the student it grades.
    self.huo2.teacher7 = self.teachers["7"]
    self.mu3.teacher8 = self.teachers["8"]
    self.shui1.teacher6 = self.teachers["6"]
    self.jin4.teacher9 = self.teachers["9"]
    for s in self.students.values():
        s.luoshu = self.luoshu
    for t in self.teachers.values():
        t.luoshu = self.luoshu
    self.round = 0
    self.masterpiece_round = 0
    self._load_global_memory()
    self.output_dir = "masterpieces"
    os.makedirs(self.output_dir, exist_ok=True)
    os.makedirs("standards", exist_ok=True)
    os.makedirs("golden_works", exist_ok=True)
    os.makedirs("memories", exist_ok=True)
    for teacher in self.teachers.values():
        teacher._save_standard(0)
    self._ensure_initial_corpus()
def _ensure_initial_corpus(self):
total_size = self.corpus_reader.get_total_text_size()
if total_size < 10000:
print(f"\n⚠️ 投喂语料不足({total_size}字节),触发初始爬取...")
self._ensure_corpus_sufficient()
def _atomic_write(self, filename: str, data: dict):
tmp_filename = filename + ".tmp"
with open(tmp_filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp_filename, filename)
def get_crawler_size(self) -> int:
    """Ask ``smart_crawler.py status`` for the self-grown corpus size in bytes.

    Returns 0 when the crawler exits non-zero, its output lacks
    ``total_size``, or anything goes wrong while running/parsing it.
    """
    command = ["python", "smart_crawler.py", "status"]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=10)
        if completed.returncode != 0:
            return 0
        status = json.loads(completed.stdout)
        return status.get("total_size", 0)
    except Exception as e:
        print(f"⚠️ 获取爬虫状态失败: {e}")
        return 0
def _check_and_recover_teachers(self):
for tid, teacher in self.teachers.items():
teacher.try_recover()
def _ensure_corpus_sufficient(self):
if self.crawler_lock:
return
lock_file = "self_grown/crawler.lock"
if os.path.exists(lock_file):
try:
mtime = os.path.getmtime(lock_file)
if time.time() - mtime < 600:
print(f"⚠️ 爬虫正在运行,跳过本次检查")
return
else:
os.remove(lock_file)
except:
pass
current_stage = self.luoshu.common_memory["stage"]
current_size = self.get_crawler_size()
if current_stage == "小学":
target_size = int(self.corpus_reader.get_total_text_size() * 0.9)
if current_size < target_size:
print(f"\n⚠️ 洛书中心:语料不足,启动独立爬虫进程")
print(f" 当前语料: {current_size} 字节")
print(f" 目标语料: {target_size} 字节")
self.crawler_lock = True
result = subprocess.run(
["python", "smart_crawler.py", "fetch", str(target_size)],
capture_output=True, text=True, timeout=300
)
self.crawler_lock = False
if result.returncode == 0:
print(f"✅ 爬虫完成,语料已更新")
else:
print(f"⚠️ 爬虫执行失败: {result.stderr}")
elif current_stage in ["中学", "大学"]:
target_size = self.corpus_reader.get_total_text_size() * (3 if current_stage == "中学" else 9)
if current_size < target_size:
print(f"\n⚠️ 洛书中心:{current_stage}阶段语料不足,启动爬虫")
self.crawler_lock = True
subprocess.run(["python", "smart_crawler.py", "fetch", str(target_size)], timeout=600)
self.crawler_lock = False
def _get_global_filename(self) -> str:
return "memory_snapshot.json"
def _save_global_memory(self):
memory_data = {
"round": self.round,
"masterpiece_round": self.masterpiece_round,
"common_memory": self.luoshu.common_memory,
"teachers": {
tid: {
"pass_threshold": t.pass_threshold,
"rule_description": t.rule_description,
"use_fallback": t.use_fallback
} for tid, t in self.teachers.items()
}
}
self._atomic_write(self._get_global_filename(), memory_data)
def _load_global_memory(self):
filename = self._get_global_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.round = data.get("round", 0)
self.masterpiece_round = data.get("masterpiece_round", 0)
self.luoshu.common_memory = data.get("common_memory", self.luoshu.common_memory)
for tid, tdata in data.get("teachers", {}).items():
if tid in self.teachers:
self.teachers[tid].pass_threshold = tdata.get("pass_threshold", 0.6)
self.teachers[tid].rule_description = tdata.get("rule_description", self.teachers[tid].rule_description)
self.teachers[tid].use_fallback = tdata.get("use_fallback", False)
print(f"📂 恢复全局记忆,从第 {self.round} 轮继续")
except Exception as e:
print(f"⚠️ 加载全局记忆失败: {e}")
def _save_masterpiece(self):
if self.round - self.masterpiece_round >= 100 and self.round > 0:
self.masterpiece_round = self.round
if self.jin4.works:
latest = self.jin4.works[-1]
filename = f"{self.output_dir}/masterpiece_{self.round:06d}.txt"
tmp_filename = filename + ".tmp"
with open(tmp_filename, 'w', encoding='utf-8') as f:
f.write(f"# 河图洛书智能体 - 第{self.round}轮作品\n")
f.write(f"# 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"# 当前阶段: {self.luoshu.common_memory['stage']}\n")
f.write(f"# 道已计算: {self.luoshu.dao.get_pointer()} 位圆周率\n")
f.write(f"# 作品长度: {len(latest)} 字符\n")
f.write("="*60 + "\n\n")
f.write(latest)
f.write("\n\n" + "="*60 + "\n")
os.replace(tmp_filename, filename)
print(f"\n📖 已保存作品: {filename}\n")
else:
filename = f"{self.output_dir}/masterpiece_{self.round:06d}_none.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"# 河图洛书智能体 - 第{self.round}轮无新作品\n")
f.write(f"# 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"# 道已计算: {self.luoshu.dao.get_pointer()} 位圆周率\n")
print(f"\n📖 第{self.round}轮无新作品\n")
def _get_learning_topic(self, student_name: str) -> str:
topics = {
"火2-化": "中文文本特征提取和关键词抽取方法",
"木3-生": "如何生成通顺、有意义的完整中文句子",
"水1-变": "中文句子的变形技巧:反转、移位、重组",
"金4-成": "优秀文学作品的评判标准和语言艺术"
}
return topics.get(student_name, "中文语言学习")
def _run_student(self, student, teacher, prev_output=None):
student_name = student.name
if student.blocked:
print(f" 🚫 [{student_name}] 阻塞中,学习轮次: {student.learning_rounds}/3")
if student.can_retry():
print(f" 🔓 [{student_name}] 已解封,开始执行")
else:
topic = self._get_learning_topic(student_name)
student.learn(topic)
return None, False
context = {}
if student_name == "木3-生" and prev_output:
context["units"] = prev_output.get("units", [])
elif student_name == "水1-变" and prev_output:
context["sentence"] = prev_output.get("sentence", "")
elif student_name == "金4-成" and prev_output:
context["variants"] = prev_output.get("variants", [])
context["original_sentence"] = prev_output.get("sentence", "")
context["teacher9_threshold"] = teacher.pass_threshold
output = student.execute(context)
passed, score, comment = teacher.evaluate(output)
student.receive_feedback(score, comment)
self.luoshu.report(student_name.split("-")[0], score, passed)
print(f" 📋 老师{teacher.id}: {'✅通过' if passed else '❌不通过'} (分:{score:.2f}) {comment}")
if passed:
student.on_pass()
# 只有通过的学生才能把作品入库
if student_name == "火2-化":
pass # 火池已经在内部处理了
elif student_name == "木3-生":
sentence = output.get("sentence", "")
if sentence and self.luoshu:
rhythm, pos_seq, high_ratio = student._extract_rhythm_from_sentence(sentence)
if rhythm:
self.luoshu.wood_pool.add(rhythm, pos_seq, high_ratio, quality=score)
elif student_name == "水1-变":
for variant in output.get("variants", []):
self.luoshu.water_pool.update_score(variant, score)
else:
student.on_fail(comment)
return output, passed
def run_one_round(self):
    """Advance the system by one round: 火2 -> 木3 -> 水1 -> 金4, then housekeeping.

    Increments ``self.round``, prints a status header, and runs the students
    in order through ``_run_student`` with their teachers ("7", "8", "6",
    "9").  The round stops early, after saving global memory, as soon as a
    stage's student has not graduated.  Every 100 rounds the teachers and
    ``self.luoshu`` evolve; every 50 rounds pool statistics are printed.  A
    round that reaches the end saves global memory and calls
    ``_save_masterpiece``.

    NOTE(review): the nesting below is reconstructed. The graduation gates
    are assumed to sit after the 火2 ``if`` and after each ``for`` loop, and
    the 0.5 s sleep is assumed to be per teacher. Confirm against the
    original layout.
    """
    self.round += 1
    print(f"\n{'='*60}")
    print(f"第 {self.round} 轮")
    print(f"阶段: {self.luoshu.common_memory['stage']}")
    print(f"道已计算: {self.luoshu.dao.get_pointer()} 位圆周率")
    print(f" 火2毕业: {self.huo2.is_graduated} | 木3毕业: {self.mu3.is_graduated} | 水1毕业: {self.shui1.is_graduated} | 金4毕业: {self.jin4.is_graduated}")
    # `output` carries the previous call's result into the next _run_student call.
    output = None
    # 火2: rebuild the fire pool once every 200 rounds (and on the first round)
    if self.round == 1 or self.round % 200 == 0:
        output, passed = self._run_student(self.huo2, self.teachers["7"])
    if not self.huo2.is_graduated:
        self._save_global_memory()
        return
    # 木3: generate several sentences; each call receives the previous call's output
    for _ in range(10):
        output, passed = self._run_student(self.mu3, self.teachers["8"], output)
    if not self.mu3.is_graduated:
        self._save_global_memory()
        return
    # 水1: generate several variants
    for _ in range(5):
        output, passed = self._run_student(self.shui1, self.teachers["6"], output)
    if not self.shui1.is_graduated:
        self._save_global_memory()
        return
    # 金4: consolidate
    output, passed = self._run_student(self.jin4, self.teachers["9"], output)
    # Periodic evolution; self.round is already >= 1 here, so the `> 0` test never fails.
    if self.round % 100 == 0 and self.round > 0:
        print(f"\n{'='*40}")
        print("【系统进化】")
        for tid, teacher in self.teachers.items():
            teacher.self_evolve(self.round)
            # presumably spaces out the teachers' evolution calls; verify intent
            time.sleep(0.5)
        self.luoshu.self_evolve(self)
        self.luoshu.update_weights_and_stage(self)
        self._check_and_recover_teachers()
        self._ensure_corpus_sufficient()
        print("="*40)
    # Periodic pool statistics report.
    if self.round % 50 == 0:
        stats = self.luoshu.get_pool_stats()
        print(f"\n📊 统计: 金池作品 {stats['golden_pool']['size']}/{stats['golden_pool']['max_size']}")
        print(f" 火池分类: {stats['fire_pool']}")
        print(f" 木池调子: {stats['wood_pool']['size']}/{stats['wood_pool']['max_size']}")
        print(f" 水池变体: {stats['water_pool']['size']}/{stats['water_pool']['max_size']}")
        print(f" 道: 已计算 {stats['dao_pointer']} 位圆周率")
    self._save_global_memory()
    self._save_masterpiece()
def run_forever(self):
    """Run rounds in an endless loop until the user interrupts with Ctrl+C.

    Prints a banner, then repeatedly calls ``run_one_round`` with a 0.2 s
    pause between rounds.  On ``KeyboardInterrupt`` it prints a short
    summary, saves global memory, then the individual memory of every
    student, every teacher and ``self.luoshu``, and finally prints a
    farewell line.  Other exceptions propagate unchanged.
    """
    banner = (
        "\n🚀 进入永久学习模式...",
        " 数据流向:语料→火2→火池(分类语素)→木3→木池(调子)→水1→水池(变体)→金4→金池(作品)",
        " 老师批改:老师7批语素,老师8批句子,老师6批变体,老师9批作品",
        " 道层:真圆周率(gmpy2),流式加载,永不枯竭",
        " Ctrl+C 可安全中断,状态自动保存\n",
    )
    for banner_line in banner:
        print(banner_line)

    try:
        while True:
            self.run_one_round()
            time.sleep(0.2)
    except KeyboardInterrupt:
        print("\n\n⚠️ 用户中断,状态已保存")
        print(f" 当前轮数: {self.round}")
        print(f" 金4作品数: {len(self.jin4.works)}")
        print(f" 道已计算: {self.luoshu.dao.get_pointer()} 位圆周率")
        # Persist shared state first, then each participant's own memory.
        self._save_global_memory()
        for member in [*self.students.values(), *self.teachers.values()]:
            member._save_individual_memory()
        self.luoshu._save_individual_memory()
        print(" 下次运行将从中断处继续")

    print("\n🐉 河图洛书智能体 V5 已休眠")
# Script entry point: build the system and start the endless learning loop
# (run_forever only returns after a Ctrl+C interrupt has been handled).
if __name__ == "__main__":
    system = HeTuLuoShuSystem()
    system.run_forever()
|