# hetu_luoshu_v6.py - 河图洛书智能体 V6(集成感知系统,移除火2初始化)
import os
import json
import random
import time
import requests
import re
import subprocess
import gzip
from collections import Counter
from typing import List, Dict, Any, Optional, Tuple, Set
from urllib.parse import quote
from datetime import datetime
from decimal import Decimal, getcontext
# 尝试导入 jieba 用于词性标注
try:
import jieba
import jieba.posseg as pseg
JIEBA_AVAILABLE = True
except ImportError:
JIEBA_AVAILABLE = False
print("⚠️ jieba 未安装,将使用简单词性标注(建议安装: pip install jieba)")
# 尝试导入 gmpy2(真圆周率引擎)
try:
import gmpy2
GMPY2_AVAILABLE = True
except ImportError:
GMPY2_AVAILABLE = False
print("⚠️ gmpy2 未安装,将使用 BBP 引擎(精度有限)")
print(" 建议安装: pip install gmpy2")
# ==================== DeepSeek API 配置 ====================
DEEPSEEK_API_KEY = "sk-952a1c833f61473ca2fe38a1bb367e9e"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
_api_fail_count = 0
_api_fail_threshold = 5
def call_deepseek(prompt: str, max_tokens: int = 800, temperature: float = 0.5) -> Optional[str]:
global _api_fail_count
try:
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {DEEPSEEK_API_KEY}"}
data = {"model": "deepseek-chat", "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": temperature}
resp = requests.post(DEEPSEEK_API_URL, json=data, headers=headers, timeout=30)
if resp.status_code == 200:
_api_fail_count = 0
return resp.json()["choices"][0]["message"]["content"]
_api_fail_count += 1
return None
except Exception as e:
_api_fail_count += 1
print(f"DeepSeek API 调用失败: {e}")
return None
def is_api_degraded() -> bool:
return _api_fail_count >= _api_fail_threshold
def web_search(keyword: str) -> Optional[str]:
try:
url = f"https://baike.baidu.com/item/{quote(keyword)}"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
resp = requests.get(url, headers=headers, timeout=15)
if resp.status_code == 200:
match = re.search(r'<div class="lemma-summary">(.*?)</div>', resp.text, re.DOTALL)
if match:
text = re.sub(r'<[^>]+>', '', match.group(1))
return text[:500]
match = re.search(r'<meta name="description" content="(.*?)"', resp.text)
if match:
return match.group(1)[:500]
return None
except Exception as e:
print(f"网上搜索失败: {e}")
return None
# ==================== 道的圆周率引擎 V3 ====================
class DaoPi:
def __init__(self, chunk_size: int = 5000):
self.chunk_size = chunk_size
self.pointer = 0
self.cache = []
self.current_chunk_start = 0
if GMPY2_AVAILABLE:
self._use_gmpy2 = True
print(f" 📐 道引擎:gmpy2(真圆周率),chunk_size={chunk_size}")
self._load_chunk_gmpy2(0)
else:
self._use_gmpy2 = False
print(f" 📐 道引擎:BBP(备用),chunk_size={chunk_size},精度有限")
self._load_chunk_bbp(0)
def _load_chunk_gmpy2(self, start_pos: int) -> None:
precision_bits = (start_pos + self.chunk_size + 100) * 4
gmpy2.get_context().precision = precision_bits
pi = gmpy2.const_pi()
decimal_places = start_pos + self.chunk_size + 50
pi_str = format(pi, f'.{decimal_places}f')
if '.' in pi_str:
pi_str = pi_str.split('.')[1]
else:
pi_str = ""
if start_pos < len(pi_str):
segment = pi_str[start_pos:start_pos + self.chunk_size]
else:
segment = ""
while len(segment) < self.chunk_size:
segment += "0"
self.cache = [int(ch) for ch in segment]
self.current_chunk_start = start_pos
print(f" 📐 道已加载新段: 位置 {start_pos} - {start_pos + self.chunk_size}")
def _load_chunk_bbp(self, start_pos: int) -> None:
need_precision = start_pos + self.chunk_size + 50
getcontext().prec = need_precision + 10
pi = Decimal(0)
for k in range(need_precision):
pi += (Decimal(1)/(16**k)) * (
Decimal(4)/(8*k+1) -
Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) -
Decimal(1)/(8*k+6)
)
pi_str = str(pi)[2:]
if start_pos < len(pi_str):
segment = pi_str[start_pos:start_pos + self.chunk_size]
else:
segment = ""
while len(segment) < self.chunk_size:
segment += "0"
self.cache = [int(ch) for ch in segment]
self.current_chunk_start = start_pos
print(f" 📐 道已加载新段: 位置 {start_pos} - {start_pos + self.chunk_size}")
def _ensure_cache(self, pos: int) -> None:
if pos < self.current_chunk_start or pos >= self.current_chunk_start + self.chunk_size:
if self._use_gmpy2:
self._load_chunk_gmpy2(pos)
else:
self._load_chunk_bbp(pos)
def next_digit(self) -> int:
self._ensure_cache(self.pointer)
idx = self.pointer - self.current_chunk_start
digit = self.cache[idx]
self.pointer += 1
return digit
def novelty(self, length: int = 8) -> float:
segment = ''.join(str(self.next_digit()) for _ in range(length))
numeric = 0
for i, ch in enumerate(segment):
numeric += int(ch) * (0.1 ** (i+1))
novelty = numeric / 0.111111
return min(0.99, novelty)
def get_pointer(self) -> int:
return self.pointer
def reset_pointer(self):
self.pointer = 0
if self._use_gmpy2:
self._load_chunk_gmpy2(0)
else:
self._load_chunk_bbp(0)
# ==================== 感知系统(134项能力,所有学生共享) ====================
class HetuPerception:
"""河图感知系统 - 134项能力,永久固化,所有学生共享"""
def __init__(self, location: str = "襄阳"):
print("🧠 河图感知系统初始化(134项能力)")
self.location = location
self.location_desc = "湖北省襄阳市,三国文化发源地"
# 统计层
self.word_freq = Counter()
self.pair_freq = Counter()
self.sentence_pool = []
# 记忆层
self.history = {}
self.long_term_memory = []
self.short_term_memory = []
self.blacklist = set()
self.low_quality_pairs = []
# 待办事项
self.tasks = []
self.reminders = []
# 词典
self.positive_words = set()
self.negative_words = set()
self.abstract_words = set()
self.concrete_words = set()
self.easy_words = set()
self.hard_words = set()
self.synonyms = set()
self.antonyms = set()
self.causal_words = set()
self.nonsense_pairs = set()
self.person_names = set()
self.place_names = set()
self.org_names = set()
# 词性映射
self.pos_names = {
'n': '名词', 'v': '动词', 'a': '形容词', 'd': '副词',
'm': '数词', 'q': '量词', 'r': '代词', 'p': '介词',
'c': '连词', 'u': '助词', 'e': '叹词', 'o': '拟声词',
'nr': '人名', 'ns': '地名', 'nt': '组织名'
}
self.stopwords = {"的", "了", "在", "是", "有", "和", "与", "或", "但", "而",
"也", "之", "其", "于", "以", "为", "所", "不", "这", "那"}
# 统计
self.files_processed = 0
self.api_calls = 0
# 加载种子词典和持久化数据
self._load_seed_dicts()
self._load_dicts()
print(f" ✅ 感知系统初始化完成")
print(f" 地点: {self.location}")
print(f" 正面词: {len(self.positive_words)} | 负面词: {len(self.negative_words)}")
print(f" 词频表: {len(self.word_freq)} 个词")
def _load_seed_dicts(self):
"""加载种子词典"""
self.positive_words.update(["好", "美", "乐", "喜", "爱", "赞", "优", "真", "诚"])
self.negative_words.update(["坏", "恶", "痛", "苦", "悲", "哀", "愁", "恨", "怨"])
self.abstract_words.update(["思想", "感情", "精神", "文化", "价值", "意义", "真理"])
self.concrete_words.update(["桌子", "椅子", "水", "火", "山", "人", "书", "电脑"])
self.easy_words.update(["一", "二", "三", "人", "口", "手", "水", "火"])
self.hard_words.update(["哲学", "形而上学", "辩证法", "唯心主义"])
self.causal_words.update(["因为", "所以", "因此", "于是", "故", "则", "由于"])
self.synonyms.update([("美丽", "漂亮"), ("快乐", "开心"), ("快速", "迅速")])
self.antonyms.update([("大", "小"), ("多", "少"), ("高", "低"), ("好", "坏")])
self.nonsense_pairs.update([("太阳", "晚上"), ("月亮", "白天"), ("鱼", "天上")])
def _save_dicts(self):
os.makedirs("memories", exist_ok=True)
data = {
"positive_words": list(self.positive_words),
"negative_words": list(self.negative_words),
"abstract_words": list(self.abstract_words),
"concrete_words": list(self.concrete_words),
"easy_words": list(self.easy_words),
"hard_words": list(self.hard_words),
"synonyms": list(self.synonyms),
"antonyms": list(self.antonyms),
"causal_words": list(self.causal_words),
"person_names": list(self.person_names),
"place_names": list(self.place_names),
"org_names": list(self.org_names),
"word_freq": dict(self.word_freq),
"tasks": self.tasks,
"api_calls": self.api_calls
}
with open("memories/permanent_dict.json", "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _load_dicts(self):
fname = "memories/permanent_dict.json"
if os.path.exists(fname):
try:
with open(fname, "r", encoding="utf-8") as f:
data = json.load(f)
self.positive_words = set(data.get("positive_words", []))
self.negative_words = set(data.get("negative_words", []))
self.abstract_words = set(data.get("abstract_words", []))
self.concrete_words = set(data.get("concrete_words", []))
self.easy_words = set(data.get("easy_words", []))
self.hard_words = set(data.get("hard_words", []))
self.synonyms = set(tuple(p) for p in data.get("synonyms", []))
self.antonyms = set(tuple(p) for p in data.get("antonyms", []))
self.causal_words = set(data.get("causal_words", []))
self.person_names = set(data.get("person_names", []))
self.place_names = set(data.get("place_names", []))
self.org_names = set(data.get("org_names", []))
self.word_freq = Counter(data.get("word_freq", {}))
self.tasks = data.get("tasks", [])
self.api_calls = data.get("api_calls", 0)
print(f" 📂 加载感知词典: {len(self.word_freq)} 个词")
except:
pass
# ==================== 基础感知能力 ====================
def segment(self, text):
if not JIEBA_AVAILABLE or not text:
return []
return [w.strip() for w in jieba.cut(text) if w.strip()]
def get_pos(self, word):
for w, p in pseg.cut(word):
if w == word: return self.pos_names.get(p, p)
return "未知"
def is_chinese(self, ch): return '\u4e00' <= ch <= '\u9fff'
def char_type(self, ch):
if self.is_chinese(ch): return "汉字"
if 'a' <= ch <= 'z' or 'A' <= ch <= 'Z': return "英文"
if '0' <= ch <= '9': return "数字"
return "标点"
def split_sentences(self, text):
return [s.strip() for s in re.split(r'[。!?;\n]+', text) if s.strip()]
def char_count(self, text): return len(text)
def word_count(self, text): return len(self.segment(text))
def sentence_count(self, text): return len(self.split_sentences(text))
# ==================== 统计能力 ====================
def get_freq(self, word): return self.word_freq.get(word, 0)
def get_next_words(self, word, top_n=3):
candidates = [(pair.split('|')[1], cnt) for pair, cnt in self.pair_freq.items() if pair.startswith(f"{word}|")]
candidates.sort(key=lambda x: x[1], reverse=True)
return [w for w, c in candidates[:top_n]]
def train_from_text(self, text: str):
words = self.segment(text)
for i, w in enumerate(words):
if 1 <= len(w) <= 4 and self.is_chinese(w[0]) and w not in self.stopwords:
self.word_freq[w] += 1
if i > 0:
prev = words[i-1]
if 1 <= len(prev) <= 4 and self.is_chinese(prev[0]) and prev not in self.stopwords:
self.pair_freq[f"{prev}|{w}"] += 1
for s in self.split_sentences(text):
if 10 <= len(s) <= 200 and self.is_chinese(s[0]):
self.sentence_pool.append(s)
self.files_processed += 1
# ==================== 判断能力 ====================
def sentiment(self, text):
words = self.segment(text)
pos = sum(1 for w in words if w in self.positive_words)
neg = sum(1 for w in words if w in self.negative_words)
if pos > neg: return "正面"
if neg > pos: return "负面"
return "中性"
def sentiment_intensity(self, text):
words = self.segment(text)
return min(10, sum(1 for w in words if w in self.positive_words or w in self.negative_words))
def fluency(self, sentence):
words = self.segment(sentence)
if len(words) < 2: return 0.0
match = sum(1 for i in range(len(words)-1) if f"{words}|{words[i+1]}" in self.pair_freq)
return match / (len(words)-1)
def grammar_check(self, sentence):
words = self.segment(sentence)
return any(self.get_pos(w) in ["名词", "代词"] for w in words) and any(self.get_pos(w) == "动词" for w in words)
def novelty_score(self, text):
words = self.segment(text)
if len(words) < 2: return 0.0
new_pairs = sum(1 for i in range(len(words)-1) if f"{words}|{words[i+1]}" not in self.pair_freq)
return new_pairs / (len(words)-1)
def complexity_score(self, text):
words = self.segment(text)
if not words: return 0.0
avg_len = sum(len(w) for w in words) / len(words)
rare_ratio = sum(1 for w in words if self.word_freq.get(w, 0) <= 2) / len(words)
return min(1.0, (avg_len - 1) / 3 * 0.5 + rare_ratio * 0.5)
def coherence_score(self, text):
sents = self.split_sentences(text)
if len(sents) < 2: return 1.0
scores = []
for i in range(len(sents)-1):
w1 = set(self.segment(sents))
w2 = set(self.segment(sents[i+1]))
scores.append(min(1.0, len(w1 & w2) / 3))
return sum(scores) / len(scores) if scores else 0.5
def info_density(self, text):
words = self.segment(text)
if not words: return 0.0
content = [w for w in words if w not in self.stopwords and self.is_chinese(w[0])]
return len(content) / len(words)
def aesthetic_score(self, text):
sents = self.split_sentences(text)
if len(sents) < 2: return 0.5
last_chars = [s[-1] for s in sents if s]
if len(last_chars) < 2: return 0.5
rhyme = sum(1 for i in range(len(last_chars)-1) if last_chars == last_chars[i+1])
return rhyme / (len(last_chars)-1)
def confidence(self, output):
words = self.segment(output)
if not words: return 0.0
high = sum(1 for w in words if self.word_freq.get(w, 0) > 10) / len(words)
pair = sum(1 for i in range(len(words)-1) if f"{words}|{words[i+1]}" in self.pair_freq)
pair_ratio = pair / (len(words)-1) if len(words) > 1 else 1.0
return (high + pair_ratio) / 2
# ==================== 固化能力(核心) ====================
def validate_word(self, word: str, freq: int = 1) -> bool:
if word in self.word_freq:
return True
self.api_calls += 1
prompt = f"判断词语是否合法有意义。\n词语:{word}\n输出:true 或 false"
resp = call_deepseek(prompt, max_tokens=10, temperature=0.1)
is_valid = "true" in resp.lower()
if is_valid:
self.word_freq[word] = freq
self._save_dicts()
return is_valid
def abstraction(self, word: str) -> str:
if word in self.abstract_words:
return "抽象"
if word in self.concrete_words:
return "具体"
self.api_calls += 1
prompt = f"判断词语是抽象还是具体。\n词语:{word}\n输出:抽象 或 具体"
resp = call_deepseek(prompt, max_tokens=10, temperature=0.1)
if "抽象" in resp:
self.abstract_words.add(word)
self._save_dicts()
return "抽象"
if "具体" in resp:
self.concrete_words.add(word)
self._save_dicts()
return "具体"
return "中等"
def difficulty(self, word: str) -> str:
if word in self.easy_words:
return "简单"
if word in self.hard_words:
return "困难"
if self.word_freq.get(word, 0) > 50:
self.easy_words.add(word)
self._save_dicts()
return "简单"
self.api_calls += 1
prompt = f"评估词语学习难度。\n词语:{word}\n输出:简单 或 中等 或 困难"
resp = call_deepseek(prompt, max_tokens=10, temperature=0.1)
if "简单" in resp:
self.easy_words.add(word)
self._save_dicts()
return "简单"
if "困难" in resp:
self.hard_words.add(word)
self._save_dicts()
return "困难"
return "中等"
def get_api_calls(self): return self.api_calls
# ==================== 小型策略网络 ====================
class SimpleStrategyNet:
def __init__(self, student_id: str):
self.student_id = student_id
self.experiences = []
def predict(self, recent_memory: List[Dict]) -> Dict[str, float]:
if len(recent_memory) < 3:
return {"curiosity": 0.3, "intensity": 0.5, "variety": 0.5}
recent_scores = [m.get("teacher_score", 0) for m in recent_memory[-3:]]
avg_score = sum(recent_scores) / len(recent_scores) if recent_scores else 0.5
if avg_score > 0.8:
return {"curiosity": 0.2, "intensity": 0.3, "variety": 0.4}
elif avg_score < 0.3:
return {"curiosity": 0.6, "intensity": 0.8, "variety": 0.7}
else:
return {"curiosity": 0.4, "intensity": 0.5, "variety": 0.5}
def update(self, teacher_score: float, last_action: Dict):
self.experiences.append({"time": time.time(), "action": last_action, "score": teacher_score})
if len(self.experiences) > 100:
self.experiences = self.experiences[-100:]
# ==================== 火池(分类语素库) ====================
class FirePool:
POS_CATEGORIES = {
'n': 'noun', 'v': 'verb', 'a': 'adj', 'm': 'num', 'q': 'num',
'r': 'pron', 'p': 'particle', 'c': 'particle', 'u': 'particle',
'd': 'particle', 'default': 'other', 'eng': 'english'
}
def __init__(self, max_per_category: int = 1000):
self.max_per_category = max_per_category
self.pools = {
'noun': {"high": [], "low": []}, 'verb': {"high": [], "low": []},
'adj': {"high": [], "low": []}, 'num': {"high": [], "low": []},
'pron': {"high": [], "low": []}, 'particle': {"high": [], "low": []},
'other': {"high": [], "low": []}, 'english': {"high": [], "low": []}
}
self._dirty = True
self._load()
def _get_filename(self) -> str:
return "memories/fire_pool.json"
def _load(self):
filename = self._get_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.pools = data.get("pools", self.pools)
total = sum(len(pool["high"]) + len(pool["low"]) for pool in self.pools.values())
print(f" 🔥 加载火池,{total} 个语素")
except:
pass
def _save(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_filename()
with open(filename, 'w', encoding='utf-8') as f:
json.dump({"pools": self.pools}, f, ensure_ascii=False, indent=2)
def _get_pos_category(self, pos_flag: str) -> str:
return self.POS_CATEGORIES.get(pos_flag, 'other')
def exists(self, unit: str) -> bool:
for category in self.pools:
for section in ['high', 'low']:
for item in self.pools[category][section]:
if item["unit"] == unit:
return True
return False
def add(self, unit: str, pos: str, frequency: int = 1, quality: float = 0.5):
if self.exists(unit):
return
category = self._get_pos_category(pos)
pool = self.pools[category]
new_item = {"unit": unit, "frequency": frequency, "quality": quality, "time": time.time()}
if quality >= 0.7:
pool["high"].append(new_item)
else:
pool["low"].append(new_item)
self._dirty = True
self._sort_and_trim(category)
self._save()
def increase_frequency(self, unit: str, delta: int = 1):
for category in self.pools:
for section in ['high', 'low']:
for i, item in enumerate(self.pools[category][section]):
if item["unit"] == unit:
self.pools[category][section]["frequency"] += delta
self._dirty = True
self._save()
return
def _sort_and_trim(self, category: str):
pool = self.pools[category]
pool["high"].sort(key=lambda x: x["frequency"] * x["quality"], reverse=True)
pool["low"].sort(key=lambda x: x["frequency"] * x["quality"], reverse=True)
high_target = self.max_per_category // 2
if len(pool["high"]) > high_target:
pool["high"] = pool["high"][:high_target]
low_target = self.max_per_category // 2
if len(pool["low"]) > low_target:
pool["low"] = pool["low"][:low_target]
def get_by_pos(self, pos: str, high_ratio: float = 0.8, total_count: int = 5) -> List[str]:
category = self._get_pos_category(pos)
pool = self.pools[category]
high_needed = int(total_count * high_ratio)
low_needed = total_count - high_needed
result = []
if pool["high"]:
high_items = random.sample(pool["high"], min(high_needed, len(pool["high"])))
result.extend([item["unit"] for item in high_items])
if pool["low"]:
low_items = random.sample(pool["low"], min(low_needed, len(pool["low"])))
result.extend([item["unit"] for item in low_items])
random.shuffle(result)
return result[:total_count]
def apply_thunder(self, threshold_ratio: float = 10.0):
thunder_count = 0
for category in self.pools:
for section in ['high', 'low']:
items = self.pools[category][section]
if len(items) < 2:
continue
items.sort(key=lambda x: x["frequency"], reverse=True)
max_freq = items[0]["frequency"]
second_freq = items[1]["frequency"] if len(items) > 1 else 1
if max_freq > second_freq * threshold_ratio:
old_freq = max_freq
items[0]["frequency"] = max(1, int(max_freq / 2))
thunder_count += 1
print(f" ⚡ 雷霆之怒: {items[0]['unit']} 频次从 {old_freq} 降至 {items[0]['frequency']}")
self._dirty = True
if thunder_count > 0:
self._save()
return thunder_count
def get_stats(self) -> dict:
return {k: {"high": len(v["high"]), "low": len(v["low"])} for k, v in self.pools.items()}
# ==================== 木池 ====================
class WoodPool:
def __init__(self, max_size: int = 1000):
self.max_size = max_size
self.pool = []
self._dirty = True
self._load()
def _get_filename(self) -> str:
return "memories/wood_pool.json"
def _load(self):
filename = self._get_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.pool = data.get("pool", [])
print(f" 🌳 加载木池,{len(self.pool)} 个调子")
except:
pass
def _save(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_filename()
with open(filename, 'w', encoding='utf-8') as f:
json.dump({"pool": self.pool}, f, ensure_ascii=False, indent=2)
def add(self, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, quality: float = 0.5):
for existing in self.pool:
if existing["rhythm"] == rhythm and existing["pos_seq"] == pos_seq:
existing["count"] += max(1, int(quality * 3))
existing["quality"] = max(existing.get("quality", 0), quality)
existing["time"] = time.time()
self._dirty = True
self._save()
return
self.pool.append({
"rhythm": rhythm,
"pos_seq": pos_seq,
"high_ratio": high_ratio,
"count": max(1, int(quality * 3)),
"quality": quality,
"time": time.time()
})
self._dirty = True
self._sort_and_trim()
self._save()
def _sort_and_trim(self):
if self._dirty:
self.pool.sort(key=lambda x: x["count"] * x.get("quality", 0.5), reverse=True)
self._dirty = False
if len(self.pool) > self.max_size:
self.pool = self.pool[:self.max_size]
def get_random(self, num: int = 1) -> List[Dict]:
if not self.pool:
return []
self._sort_and_trim()
source = self.pool[:min(200, len(self.pool))]
return random.sample(source, min(num, len(source)))
def apply_thunder(self, threshold_ratio: float = 10.0):
if len(self.pool) < 2:
return 0
self._sort_and_trim()
max_count = self.pool[0]["count"]
second_count = self.pool[1]["count"] if len(self.pool) > 1 else 1
if max_count > second_count * threshold_ratio:
old = max_count
self.pool[0]["count"] = max(1, int(max_count / 2))
print(f" ⚡ 雷霆之怒: 调子使用次数降半 {old} -> {self.pool[0]['count']}")
self._dirty = True
self._save()
return 1
return 0
def get_stats(self) -> dict:
return {"size": len(self.pool), "max_size": self.max_size}
# ==================== 水池 ====================
class WaterPool:
def __init__(self, max_size: int = 2000):
self.max_size = max_size
self.pool = []
self._dirty = True
self._load()
def _get_filename(self) -> str:
return "memories/water_pool.json"
def _load(self):
filename = self._get_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.pool = data.get("pool", [])
print(f" 💧 加载水池,{len(self.pool)} 条变体")
except:
pass
def _save(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_filename()
with open(filename, 'w', encoding='utf-8') as f:
json.dump({"pool": self.pool}, f, ensure_ascii=False, indent=2)
def add(self, text: str, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, score: float = 0.5):
for existing in self.pool:
if existing["text"] == text:
existing["count"] = existing.get("count", 0) + 1
existing["score"] = max(existing["score"], score)
existing["time"] = time.time()
self._dirty = True
self._save()
return
self.pool.append({
"text": text,
"score": score,
"rhythm": rhythm,
"pos_seq": pos_seq,
"high_ratio": high_ratio,
"count": 1,
"time": time.time()
})
self._dirty = True
self._sort_and_trim()
self._save()
def _sort_and_trim(self):
if self._dirty:
now = time.time()
for item in self.pool:
age = now - item["time"]
freshness = max(0.3, 1.0 - age / 7200)
item["_effective_score"] = item["score"] * freshness * (1 + 0.1 * item.get("count", 1))
self.pool.sort(key=lambda x: x.get("_effective_score", x["score"]), reverse=True)
self._dirty = False
if len(self.pool) > self.max_size:
self.pool = self.pool[:self.max_size]
def get_best(self, num: int = 10, min_len: int = 10, exclude: set = None) -> List[Dict]:
self._sort_and_trim()
if exclude is None:
exclude = set()
results = []
for item in self.pool:
text = item["text"]
if min_len <= len(text) <= 200 and text not in exclude:
results.append(item)
if len(results) >= num:
break
return results
def apply_thunder(self, threshold_ratio: float = 10.0):
if len(self.pool) < 2:
return 0
self._sort_and_trim()
max_score = self.pool[0]["score"]
second_score = self.pool[1]["score"] if len(self.pool) > 1 else 0.1
if max_score > second_score * threshold_ratio:
old = max_score
self.pool[0]["score"] = max(0.1, max_score / 2)
print(f" ⚡ 雷霆之怒: 变体分数降半 {old:.2f} -> {self.pool[0]['score']:.2f}")
self._dirty = True
self._save()
return 1
return 0
def get_stats(self) -> dict:
return {"size": len(self.pool), "max_size": self.max_size}
# ==================== 金池 ====================
class GoldenPool:
def __init__(self, max_size: int = 500):
self.max_size = max_size
self.pool = []
self._load()
def _get_filename(self) -> str:
return "memories/golden_pool.json"
def _load(self):
filename = self._get_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.pool = data.get("pool", [])
print(f" 💎 加载金池,{len(self.pool)} 件作品")
except:
pass
def _save(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_filename()
with open(filename, 'w', encoding='utf-8') as f:
json.dump({"pool": self.pool}, f, ensure_ascii=False, indent=2)
def add(self, text: str, score: float):
for existing in self.pool:
if existing["text"] == text:
existing["count"] = existing.get("count", 0) + 1
existing["score"] = max(existing["score"], score)
self._save()
return
self.pool.append({
"text": text,
"score": score,
"count": 1,
"time": time.time()
})
self.pool.sort(key=lambda x: x["score"], reverse=True)
if len(self.pool) > self.max_size:
self.pool = self.pool[:self.max_size]
self._save()
def get_stats(self) -> dict:
return {"size": len(self.pool), "max_size": self.max_size}
# ==================== 同义词自学习模块 ====================
class SynonymLearner:
def __init__(self):
self.synonyms = {}
self.cooccurrence = Counter()
self._load()
self._init_fallback()
def _init_fallback(self):
self._fallback = {
"好": ["棒", "优", "佳", "美"],
"大": ["巨", "宏", "浩", "庞"],
"小": ["微", "细", "精", "纤"],
"是": ["乃", "即", "为", "系"],
"有": ["具", "含", "拥", "备"],
"无": ["缺", "失", "乏", "没"],
"多": ["众", "繁", "丰", "盛"],
"少": ["稀", "寡", "微", "欠"],
"美": ["丽", "艳", "秀", "雅"],
"真": ["实", "诚", "确", "正"],
}
def _get_filename(self) -> str:
return "memories/synonyms_memory.json"
def _load(self):
filename = self._get_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.synonyms = data.get("synonyms", {})
self.cooccurrence = Counter(data.get("cooccurrence", {}))
print(f" 📖 加载同义词库,{len(self.synonyms)} 组")
except:
pass
def _save(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_filename()
with open(filename, 'w', encoding='utf-8') as f:
json.dump({
"synonyms": self.synonyms,
"cooccurrence": dict(self.cooccurrence)
}, f, ensure_ascii=False, indent=2)
def learn_from_corpus(self, texts: List[str]):
for text in texts:
words = re.findall(r'[\u4e00-\u9fff]{2,4}', text)
for i, w1 in enumerate(words):
for w2 in words[i+1:i+3]:
if w1 != w2:
self.cooccurrence[f"{w1}|{w2}"] += 1
self.cooccurrence[f"{w2}|{w1}"] += 1
threshold = 3
for pair, count in self.cooccurrence.items():
if count >= threshold:
w1, w2 = pair.split('|')
if w1 not in self.synonyms:
self.synonyms[w1] = []
if w2 not in self.synonyms[w1]:
self.synonyms[w1].append(w2)
if w2 not in self.synonyms:
self.synonyms[w2] = []
if w1 not in self.synonyms[w2]:
self.synonyms[w2].append(w1)
self._save()
print(f" 📚 同义词学习完成,现有 {len(self.synonyms)} 组")
def get(self, word: str) -> List[str]:
if word in self.synonyms and self.synonyms[word]:
return self.synonyms[word]
return self._fallback.get(word, [])
# ==================== 学生基类 ====================
class Student:
def __init__(self, student_id: str, name: str, perception: HetuPerception):
self.id = student_id
self.name = name
self.perception = perception
self.memory = []
self.learning_memory = []
self.blacklist = set()
self.blacklist_count = {}
self.strategy_net = SimpleStrategyNet(student_id)
self.skill_level = 0.3
self.blocked = False
self.learning_rounds = 0
self.consecutive_passes = 0
self.is_graduated = False
self.luoshu = None
self.can_think = False
self._load_individual_memory()
def _get_memory_filename(self) -> str:
name_map = {
"火2-化": "huo2_memory",
"木3-生": "mu3_memory",
"水1-变": "shui1_memory",
"金4-成": "jin4_memory"
}
base = name_map.get(self.name, self.name.replace("-", "_"))
return f"memories/{base}.json"
def _atomic_write(self, filename: str, data: dict):
tmp = filename + ".tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp, filename)
def _load_individual_memory(self):
filename = self._get_memory_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.memory = data.get("memory", [])
self.learning_memory = data.get("learning_memory", [])
self.skill_level = data.get("skill_level", 0.3)
self.blocked = data.get("blocked", False)
self.learning_rounds = data.get("learning_rounds", 0)
self.consecutive_passes = data.get("consecutive_passes", 0)
self.is_graduated = data.get("is_graduated", False)
self.blacklist = set(data.get("blacklist", []))
self.blacklist_count = data.get("blacklist_count", {})
if hasattr(self, "works"):
self.works = data.get("works", [])
print(f" 📂 [{self.name}] 加载记忆,技能={self.skill_level:.2f}, 阻塞={self.blocked}, 黑名单={len(self.blacklist)}")
except:
pass
def _save_individual_memory(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_memory_filename()
data = {
"name": self.name,
"memory": self.memory[-200:],
"learning_memory": self.learning_memory[-50:],
"skill_level": self.skill_level,
"blocked": self.blocked,
"learning_rounds": self.learning_rounds,
"consecutive_passes": self.consecutive_passes,
"is_graduated": self.is_graduated,
"blacklist": list(self.blacklist),
"blacklist_count": self.blacklist_count,
}
if hasattr(self, "works"):
data["works"] = self.works
self._atomic_write(filename, data)
def add_to_blacklist(self, item: str, reason: str = ""):
if item in self.blacklist_count:
self.blacklist_count[item] += 1
else:
self.blacklist_count[item] = 1
self.blacklist.add(item)
self._save_individual_memory()
print(f" 📝 [{self.name}] 加入黑名单: {item[:30]} (次数:{self.blacklist_count[item]}) {reason}")
def is_blacklisted(self, item: str) -> bool:
return item in self.blacklist
def _search_web(self, query: str) -> Optional[str]:
print(f" 🌐 [{self.name}] 网上搜索: {query[:50]}...")
return web_search(query)
def _search_api(self, query: str) -> Optional[str]:
print(f" 🤖 [{self.name}] API搜索: {query[:50]}...")
return call_deepseek(f"请提供关于「{query}」的中文百科信息,200字以内。只返回客观事实。", max_tokens=500, temperature=0.3)
def _learn_from_sources(self, topic: str):
print(f" 📚 [{self.name}] 开始学习: {topic[:50]}...")
web_result = self._search_web(topic)
time.sleep(0.3)
api_result = self._search_api(topic)
time.sleep(0.3)
self.learning_memory.append({"time": time.time(), "topic": topic, "web": web_result[:200] if web_result else "", "api": api_result[:200] if api_result else ""})
self.skill_level = min(1.0, self.skill_level + 0.1)
self._save_individual_memory()
print(f" 📈 [{self.name}] 技能等级: {self.skill_level:.2f}")
def learn(self, topic: str):
self.learning_rounds += 1
print(f" 📚 [{self.name}] 学习轮次: {self.learning_rounds}/3")
self._learn_from_sources(topic)
def on_pass(self):
self.consecutive_passes += 1
self._save_individual_memory()
print(f" ✅ [{self.name}] 通过 ({self.consecutive_passes}/3)")
if self.consecutive_passes >= 3:
self.is_graduated = True
print(f" 🎉 [{self.name}] 毕业!")
def on_fail(self, fail_reason: str = ""):
self.consecutive_passes = 0
self.is_graduated = False
self.blocked = True
self.learning_rounds = 0
self._save_individual_memory()
print(f" ❌ [{self.name}] 不通过,进入学习模式(需学习3轮)")
def can_retry(self) -> bool:
if not self.blocked:
return True
if self.learning_rounds >= 3:
self.blocked = False
self.learning_rounds = 0
self._save_individual_memory()
print(f" 🔓 [{self.name}] 学习完成,开始重做")
return True
return False
def receive_feedback(self, teacher_score: float, teacher_comment: str = ""):
self.memory.append({"time": time.time(), "score": teacher_score, "comment": teacher_comment})
self.strategy_net.update(teacher_score, {})
if len(self.memory) > 200:
self.memory = self.memory[-200:]
self._save_individual_memory()
def _get_mimic_ratio(self) -> float:
base_ratio = 0.20
skill_penalty = self.skill_level * 0.1
return max(0.10, min(0.30, base_ratio - skill_penalty))
def _get_dao_effect(self) -> float:
if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
return self.luoshu.dao_novelty
return 0.5
def _maybe_mimic(self, original_output: Dict, context: Dict) -> Dict:
mimic_ratio = self._get_mimic_ratio()
if random.random() >= mimic_ratio:
return original_output
print(f"🎭 [{self.name}] 触发模仿模式 (比例={mimic_ratio:.0%})...")
if self.luoshu is None:
return original_output
if "火2" in self.name:
masterpieces = [item["text"] for item in self.luoshu.golden_pool.pool[:10]] if hasattr(self.luoshu, 'golden_pool') else []
if masterpieces:
all_words = []
for text in masterpieces:
words = re.findall(r'[\u4e00-\u9fff]{2,4}', text)
all_words.extend(words)
if all_words:
word_counter = Counter(all_words)
top_words = [w for w, c in word_counter.most_common(5)]
current_units = original_output.get("units", [])
new_units = list(set(current_units + top_words))
original_output["units"] = new_units
print(f"🔥 [火2] 模仿了优秀词汇: {top_words[:3]}")
elif "木3" in self.name:
masterpieces = [item["text"] for item in self.luoshu.golden_pool.pool[:10]] if hasattr(self.luoshu, 'golden_pool') else []
if masterpieces:
sample = random.choice(masterpieces)
sentences = re.findall(r'[^。!?;]*[。!?;]', sample)
if sentences:
template = sentences[0]
punctuation = re.findall(r'[,。!?;、]', template)
if punctuation:
sentence = original_output.get("sentence", "")
if sentence:
connectors = ["因为", "所以", "但是", "然而", "于是", "因此"]
prefix = random.choice(connectors)
original_output["sentence"] = f"{prefix}{sentence}"
print(f"🌳 [木3] 模仿了句式逻辑: {prefix}...")
else:
sentence = original_output.get("sentence", "")
if sentence:
original_output["sentence"] = f"{sentence}。"
print(f"🌳 [木3] 模仿了句号结尾")
elif "水1" in self.name:
sentence = original_output.get("sentence", "")
if sentence and len(sentence) > 5:
techniques = [
("加修辞前缀", lambda s: f"堪称完美的{s}" if len(s) < 30 else f"令人惊叹的{s}"),
("加反问语气", lambda s: f"难道{s}不是真理吗?"),
("加排比结构", lambda s: f"不仅{s},而且{s},更是{s}"[:100]),
("加夸张表达", lambda s: f"毫无疑问,{s}"),
("加古风表达", lambda s: f"夫{s},诚然也。"),
]
tech_name, tech_func = random.choice(techniques)
new_sentence = tech_func(sentence)
original_output["sentence"] = new_sentence
print(f"💧 [水1] 模仿了修辞: {tech_name}")
elif "金4" in self.name:
masterpieces = [item["text"] for item in self.luoshu.golden_pool.pool[:10]] if hasattr(self.luoshu, 'golden_pool') else []
if masterpieces:
best = max(masterpieces, key=len)
if best and len(best) > 10:
new_text = best
if self.luoshu and hasattr(self.luoshu, 'synonym_learner'):
words = re.findall(r'[\u4e00-\u9fff]{2,4}', new_text)
for word in words:
synonyms = self.luoshu.synonym_learner.get(word)
if synonyms and random.random() < 0.3:
new_text = new_text.replace(word, random.choice(synonyms), 1)
if new_text and new_text[-1] in "。!?":
new_text = new_text[:-1] + random.choice("。!?")
original_output["final"] = new_text
print(f"🏆 [金4] 模仿了满分作文并微调")
else:
templates = [
"天地之间,万物演化,河图洛书,揭示其理。",
"智能之形,源于结构,涌现于交互,成就于演化。",
"阴阳相生,五行相克,八卦相荡,万物相成。",
]
original_output["final"] = random.choice(templates)
print(f"🏆 [金4] 模仿了预设模板")
return original_output
def _try_think(self, context: Dict) -> Optional[Dict]:
return None
def execute(self, context: Dict = None) -> Dict:
if context is None:
context = {}
thinking_result = self._try_think(context)
if thinking_result and thinking_result.get("valid"):
print(f" 🧠 [{self.name}] 思考成功")
return thinking_result["output"]
print(f" ⚙️ [{self.name}] 使用规则")
output = self._rule_based_execute(context)
final_output = self._maybe_mimic(output, context)
return final_output
def _rule_based_execute(self, context: Dict) -> Dict:
raise NotImplementedError
# ==================== 老师基类 ====================
class Teacher:
def __init__(self, teacher_id: str, student_name: str, perception: HetuPerception):
self.id = teacher_id
self.student_name = student_name
self.perception = perception
self.records = []
self.pass_threshold = 0.6
self.evolution_advice = ""
self.rule_description = self._get_default_rule()
self.luoshu = None
self.api_fail_count = 0
self.use_fallback = False
self._load_individual_memory()
def _get_memory_filename(self) -> str:
return f"memories/teacher_{self.id}_memory.json"
def _get_standard_filename(self) -> str:
return f"standards/teacher_{self.id}_standard.json"
def _atomic_write(self, filename: str, data: dict):
tmp = filename + ".tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp, filename)
def _load_individual_memory(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_memory_filename()
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
self.records = data.get("records", [])
self.pass_threshold = data.get("pass_threshold", 0.6)
self.rule_description = data.get("rule_description", self.rule_description)
print(f" 📂 老师{self.id} 加载记忆,{len(self.records)} 条记录")
except:
pass
def _save_individual_memory(self):
os.makedirs("memories", exist_ok=True)
filename = self._get_memory_filename()
data = {
"teacher_id": self.id,
"student": self.student_name,
"records": self.records[-500:],
"pass_threshold": self.pass_threshold,
"rule_description": self.rule_description,
}
self._atomic_write(filename, data)
def _get_default_rule(self) -> str:
rules = {
"7": "检查语素:是否纯中文、是否有实际语义",
"8": "检查句子:通顺度、长度",
"6": "检查变体:有效性",
"9": "检查作品:逻辑连贯性"
}
return rules.get(self.id, "根据学生输出判断")
def _save_standard(self, current_round: int):
os.makedirs("standards", exist_ok=True)
filename = self._get_standard_filename()
data = {
"teacher_id": self.id,
"student": self.student_name,
"timestamp": datetime.now().isoformat(),
"round": current_round,
"rule_description": self.rule_description,
"pass_threshold": self.pass_threshold,
"evolution_advice": self.evolution_advice,
"recent_pass_rate": self._get_recent_pass_rate(),
"use_fallback": self.use_fallback
}
history = []
if os.path.exists(filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
history = json.load(f)
if isinstance(history, dict):
history = [history]
except:
pass
history.append(data)
if len(history) > 20:
history = history[-20:]
tmp = filename + ".tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=2)
os.replace(tmp, filename)
txt = f"standards/teacher_{self.id}_standard.txt"
with open(txt, 'w', encoding='utf-8') as f:
f.write(f"老师{self.id}({self.student_name})评判标准\n更新时间: {datetime.now()}\n")
f.write(f"当前轮数: {current_round}\n通过阈值: {self.pass_threshold}\n最近通过率: {self._get_recent_pass_rate():.1%}\n")
f.write(f"降级模式: {'是' if self.use_fallback else '否'}\n规则: {self.rule_description}\n")
def _get_recent_pass_rate(self) -> float:
if len(self.records) < 20:
return 0.5
recent = self.records[-50:]
return sum(1 for r in recent if r.get("passed", False)) / len(recent)
def _get_dao_effect(self) -> float:
if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
return (self.luoshu.dao_novelty - 0.5) * 0.2
return 0.0
def evaluate(self, student_output: Dict) -> tuple:
score = 0.5
comment = ""
dao_effect = self._get_dao_effect()
if self.id == "7":
unit = student_output.get("unit", "")
# 如果没有 unit,尝试从 units 列表获取
if not unit:
units = student_output.get("units", [])
if units:
unit = units[0]
if not unit:
score = 0.0
comment = "无语素"
elif not re.match(r'^[\u4e00-\u9fff]+$', unit) and not re.match(r'^[a-zA-Z]+$', unit):
score = 0.2 + dao_effect
comment = "包含非中英文字符"
elif len(unit) > 4:
score = 0.5 + dao_effect
comment = "语素过长"
else:
score = 0.7 + dao_effect
comment = "合格"
elif self.id == "8":
sentence = student_output.get("sentence", "")
if len(sentence) > 15 and any(p in sentence for p in "。!?;"):
score = 0.9 + dao_effect
comment = "句子通顺"
elif len(sentence) > 8:
score = 0.7 + dao_effect
comment = "长度合适"
else:
score = 0.3 + dao_effect
comment = "句子太短"
elif self.id == "6":
variants = student_output.get("variants", [])
unique = len(set(variants))
if unique >= 2:
score = 0.8 + dao_effect
comment = f"生成{unique}个变体"
elif unique == 1:
score = 0.5 + dao_effect
comment = "只生成1个变体"
else:
score = 0.3 + dao_effect
comment = "生成0个变体"
elif self.id == "9":
final = student_output.get("final", "")
if self.use_fallback:
if final and len(final) > 10:
score = 0.85 + dao_effect
comment = "作品合格(降级)"
else:
score = 0.2 + dao_effect
comment = "无作品"
else:
if final and len(final) > 10:
if any(kw in final for kw in ["故", "然", "所以", "因此", "则"]):
score = 0.95 + dao_effect
comment = "作品合格且有逻辑"
else:
score = 0.85 + dao_effect
comment = "作品合格"
else:
score = 0.2 + dao_effect
comment = "无作品"
score = max(0.0, min(1.0, score))
passed = score >= self.pass_threshold
self.records.append({
"time": time.time(),
"student_output": str(student_output)[:100],
"score": score,
"passed": passed,
"comment": comment
})
if len(self.records) > 500:
self.records = self.records[-500:]
self._save_individual_memory()
return passed, score, comment
def self_evolve(self, current_round: int):
if self.use_fallback:
print(f" ⏸️ 老师{self.id} 降级模式,跳过进化")
return
if len(self.records) < 10:
return
recent = self.records[-30:]
pass_rate = sum(1 for r in recent if r.get("passed", False)) / len(recent) if recent else 0.5
passed_ex = [r.get("student_output", "")[:80] for r in recent if r.get("passed", False)][-2:]
failed_ex = [r.get("student_output", "")[:80] for r in recent if not r.get("passed", False)][-2:]
prompt = f"""你是老师{self.id},负责评判{self.student_name}。
当前规则:{self.rule_description}
最近30次通过率:{pass_rate:.1%}
通过示例:{passed_ex}
未通过示例:{failed_ex}
请给出改进建议。一句话即可。
"""
print(f" 📖 老师{self.id} 自我进化...")
resp = call_deepseek(prompt, max_tokens=200, temperature=0.6)
if resp:
self.api_fail_count = 0
self.evolution_advice = resp
self.rule_description = resp[:300]
print(f" ✅ 老师{self.id} 进化完成")
else:
self.api_fail_count += 1
if self.api_fail_count >= 3:
self.use_fallback = True
print(f" 🔻 老师{self.id} 进入降级模式")
self._save_standard(current_round)
self._save_individual_memory()
def try_recover(self):
if not self.use_fallback:
return
resp = call_deepseek("回复'OK'", max_tokens=5, temperature=0)
if resp:
self.use_fallback = False
self.api_fail_count = 0
print(f" 🔺 老师{self.id} 已恢复")
return True
return False
# ==================== 语料读取器 ====================
class CorpusReader:
def __init__(self, target_dirs: List[str]):
self.target_dirs = target_dirs
self.total_files = 0
self.scanned_count = 0
self.has_scanned_all_files = False
self.all_text = ""
self.sentence_pool = []
self._build_pools()
def _read_file_with_fallback(self, file_path: str, max_chars: int = 8000) -> Optional[str]:
encodings = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5', 'latin-1']
for enc in encodings:
try:
with open(file_path, 'r', encoding=enc) as f:
return f.read(max_chars)
except:
continue
return None
def _get_text_from_dir(self, dir_path: str) -> str:
text_parts = []
for root, dirs, files in os.walk(dir_path):
for file in files:
if file.endswith(('.txt', '.md', '.json', '.csv')):
content = self._read_file_with_fallback(os.path.join(root, file), 8000)
if content:
text_parts.append(content)
return "".join(text_parts)
def _build_pools(self):
print("📚 正在构建语料精华池...")
all_text_parts = []
for base_dir in self.target_dirs:
if os.path.exists(base_dir):
text = self._get_text_from_dir(base_dir)
all_text_parts.append(text)
sentences = re.split(r'[。!?;]+', text)
for s in sentences:
s = s.strip()
if 5 < len(s) < 100 and re.search(r'[\u4e00-\u9fff]', s):
self.sentence_pool.append(s)
self.all_text = "".join(all_text_parts)
self.sentence_pool = list(set(self.sentence_pool))
self.has_scanned_all_files = True
print(f" ✅ 精华池构建完成!")
print(f" 文本长度: {len(self.all_text)}")
print(f" 句子量: {len(self.sentence_pool)}")
def get_all_text(self) -> str:
return self.all_text
def get_sample_sentences(self, num: int = 200) -> List[str]:
if not self.sentence_pool:
return []
random.shuffle(self.sentence_pool)
return self.sentence_pool[:num]
def get_total_text_size(self) -> int:
total = 0
for base_dir in self.target_dirs:
if os.path.exists(base_dir):
for root, dirs, files in os.walk(base_dir):
for file in files:
if file.endswith(('.txt', '.md', '.json', '.csv')):
try:
total += os.path.getsize(os.path.join(root, file))
except:
pass
return total
# ==================== 火2 ====================
class Huo2(Student):
def __init__(self, corpus_reader: CorpusReader, perception: HetuPerception):
super().__init__("2", "火2-化", perception)
self.corpus_reader = corpus_reader
self.teacher7 = None
self.initialized = True # 直接设为已初始化,跳过旧的火池建库
def _get_pos(self, word: str) -> str:
if JIEBA_AVAILABLE and len(word) >= 2:
words = list(pseg.cut(word))
if words:
return words[0].flag
if re.match(r'^[一二三四五六七八九十百千万亿]+$', word):
return 'm'
if re.match(r'^[a-zA-Z]+$', word):
return 'eng'
return 'n'
def _rule_based_execute(self, context: Dict) -> Dict:
# 从感知系统获取语素
units = []
if self.perception and self.perception.word_freq:
for word, freq in self.perception.word_freq.most_common(30):
if len(word) >= 2 and word not in self.perception.stopwords:
units.append(word)
if not units:
units = ["道", "德", "仁", "义", "礼", "智", "信", "天", "地", "人", "和", "气", "心", "性", "命"]
# 将语素添加到火池,供木3使用
if self.luoshu and self.luoshu.fire_pool:
for unit in units[:20]:
pos = self._get_pos(unit)
self.luoshu.fire_pool.add(unit, pos, frequency=1, quality=0.7)
# 输出 teacher7 需要的 unit 字段
return {"unit": units[0] if units else "道", "units": units, "count": len(units)}
# ==================== 木3 ====================
class Mu3(Student):
def __init__(self, corpus_reader: CorpusReader, perception: HetuPerception):
super().__init__("3", "木3-生", perception)
self.corpus_reader = corpus_reader
self.teacher8 = None
def _get_dao_length(self) -> float:
if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
return self.luoshu.dao_novelty
return 0.5
def _extract_rhythm(self, sentence: str) -> Tuple[List[int], List[List[str]], float]:
if not sentence:
return [], [], 0.7
parts = re.split(r'[,,、]', sentence)
rhythm = []
pos_seq = []
for part in parts:
part = part.strip()
chinese = re.findall(r'[\u4e00-\u9fff]', part)
if chinese:
rhythm.append(len(chinese))
# 简单的词性推断
pos_list = []
for char in chinese:
# 如果是常见名词特征
if char in "天地人山水日月光阴阳五行":
pos_list.append('n')
elif char in "爱恨喜怒哀乐愁":
pos_list.append('v')
else:
pos_list.append('n')
pos_seq.append(pos_list)
if not rhythm:
rhythm = [len(sentence)]
pos_seq = [['n']]
return rhythm, pos_seq, 0.7
def _compose(self, rhythm: List[int], pos_seq: List[List[str]], high_ratio: float, dao: float) -> str:
if not self.luoshu or not rhythm:
return ""
parts = []
for i, part_len in enumerate(rhythm):
# 确定需要的词性
target_pos = 'n'
if i < len(pos_seq) and pos_seq:
target_pos = pos_seq[0] if pos_seq else 'n'
units = self.luoshu.fire_pool.get_by_pos(target_pos, high_ratio, part_len)
if not units or len(units) < part_len:
# 如果获取的语素不足,补充默认语素
default_units = ["天", "地", "人", "和", "道", "德", "仁", "义", "礼", "智", "信", "心", "性", "命"]
needed = part_len - len(units)
while len(units) < part_len and default_units:
units.append(default_units[0])
default_units = default_units[1:]
while len(units) < part_len:
units.append("道")
parts.append(''.join(units[:part_len]))
if not parts:
return ""
punctuations = [",", "。", "!", "?"]
result = ""
for i, part in enumerate(parts):
result += part
if i < len(parts) - 1:
result += random.choice([",", "、"])
else:
result += random.choice(punctuations[1:])
# 确保句子长度足够
if len(result) < 10:
result = result + "。"
return result
def _build_wood_pool(self):
print(f" 🌳 [{self.name}] 从语料提取调子...")
sentences = self.corpus_reader.get_sample_sentences(200)
for s in sentences:
rhythm, pos_seq, high = self._extract_rhythm(s)
if rhythm:
self.luoshu.wood_pool.add(rhythm, pos_seq, high, 0.6)
print(f" ✅ 调子库构建完成,共 {len(self.luoshu.wood_pool.pool)} 个调子")
def _rule_based_execute(self, context: Dict) -> Dict:
if not self.luoshu:
return {"sentence": "", "length": 0}
# 确保火池有语素,如果没有则从感知系统补充
fire_stats = self.luoshu.fire_pool.get_stats()
total_units = sum(v["high"] + v["low"] for v in fire_stats.values())
if total_units < 10:
# 从感知系统补充语素到火池
if self.perception and self.perception.word_freq:
for word, freq in self.perception.word_freq.most_common(30):
if len(word) >= 2 and word not in self.perception.stopwords:
pos = 'n'
self.luoshu.fire_pool.add(word, pos, frequency=1, quality=0.6)
# 确保木池有调子
if len(self.luoshu.wood_pool.pool) < 10:
self._build_wood_pool()
dao = self._get_dao_length()
rhythms = self.luoshu.wood_pool.get_random(3)
if not rhythms:
# 默认调子:4字+6字,或单个5字
rhythms = [{"rhythm": [4, 6], "pos_seq": [['n'], ['n']], "high_ratio": 0.7},
{"rhythm": [5, 5], "pos_seq": [['n'], ['n']], "high_ratio": 0.7},
{"rhythm": [3, 4, 3], "pos_seq": [['n'], ['n'], ['n']], "high_ratio": 0.7}]
selected = random.choice(rhythms)
sentence = self._compose(selected["rhythm"], selected.get("pos_seq", [['n']]), selected.get("high_ratio", 0.7), dao)
if self.teacher8 and sentence and len(sentence) >= 10:
passed, score, comment = self.teacher8.evaluate({"sentence": sentence})
if passed:
# 将成功的句子提取调子加入木池
rhythm, pos_seq, high = self._extract_rhythm(sentence)
if rhythm:
self.luoshu.wood_pool.add(rhythm, pos_seq, high, score)
elif sentence and len(sentence) < 10:
# 句子太短,添加补充内容
sentence = sentence + "此乃真理。"
return {"sentence": sentence, "length": len(sentence)}
# ==================== 水1 ====================
class Shui1(Student):
def __init__(self, student_id: str, name: str, perception: HetuPerception):
super().__init__(student_id, name, perception)
self.teacher6 = None
def _get_dao(self) -> float:
if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
return self.luoshu.dao_novelty
return 0.5
def _hybrid(self, r1: List[int], r2: List[int], dao: float) -> List[int]:
method = random.choice(["concat", "interleave"])
if method == "concat":
nr = r1 + r2
else:
nr = []
for a, b in zip(r1, r2):
nr.append(a)
nr.append(b)
nr.extend(r1[len(r2):])
nr.extend(r2[len(r1):])
if dao > 0.6:
nr.append(random.randint(3, 8))
return nr[:20]
def _compose(self, rhythm: List[int], high_ratio: float, dao: float) -> str:
if not self.luoshu or not rhythm:
return ""
parts = []
for part_len in rhythm:
units = self.luoshu.fire_pool.get_by_pos('n', high_ratio, part_len)
if not units:
units = self.luoshu.fire_pool.get_by_pos('n', 0.8, part_len)
if not units or len(units) < part_len:
default_units = ["天", "地", "人", "和", "道", "德", "仁", "义", "礼", "智"]
needed = part_len - len(units)
while len(units) < part_len and default_units:
units.append(default_units[0])
default_units = default_units[1:]
while len(units) < part_len:
units.append("道")
parts.append(''.join(units[:part_len]))
if not parts:
return ""
result = ""
for i, part in enumerate(parts):
result += part
if i < len(parts) - 1:
result += random.choice([",", "、"])
else:
result += random.choice(["。", "!", "?"])
if dao > 0.7 and len(result) > 10:
result = result + "!"
return result
def _rule_based_execute(self, context: Dict) -> Dict:
if not self.luoshu:
return {"variants": [], "original": ""}
dao = self._get_dao()
variants = []
rhythms = self.luoshu.wood_pool.get_random(5)
if len(rhythms) >= 2:
for i in range(min(3, len(rhythms)-1)):
try:
r1 = rhythms.get("rhythm", [4, 4])
r2 = rhythms[i+1].get("rhythm", [4, 4])
nr = self._hybrid(r1, r2, dao)
high = (rhythms.get("high_ratio", 0.7) + rhythms[i+1].get("high_ratio", 0.7)) / 2
s = self._compose(nr, high, dao)
if s and len(s) > 10:
variants.append(s)
if self.teacher6:
passed, score, comment = self.teacher6.evaluate({"variants": })
if passed:
self.luoshu.water_pool.add(s, nr, [['n']]*len(nr), high, score)
except Exception as e:
continue
# 如果变体不足,使用简单的模板变体
if len(variants) < 2:
sentence = context.get("sentence", "")
if sentence and len(sentence) > 5:
# 简单变体1:加前缀
prefixes = ["因此", "所以", "然而", "而且", "总之"]
variants.append(random.choice(prefixes) + sentence)
# 简单变体2:加后缀
suffixes = [",此乃真理也。", ",毋庸置疑。", ",诚哉斯言。"]
if sentence.endswith("。"):
base = sentence[:-1]
else:
base = sentence
variants.append(base + random.choice(suffixes))
variants = list(set(variants))[:5]
return {"variants": variants, "original": context.get("sentence", "")}
# ==================== 金4 ====================
class Jin4(Student):
def __init__(self, perception: HetuPerception):
super().__init__("4", "金4-成", perception)
self.works = []
self.teacher9 = None
def _get_golden_subdir(self) -> str:
return "golden_works"
def _save_golden_work(self, work: str):
os.makedirs(self._get_golden_subdir(), exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
safe = re.sub(r'[\\/*?:"<>|\n\r\t]', '', work[:30].strip())
safe = re.sub(r'\s+', '_', safe) if safe else "unnamed"
fname = f"{self._get_golden_subdir()}/{ts}_{safe}.txt"
try:
with open(fname, 'w', encoding='utf-8') as f:
f.write(f"# 固化时间: {ts}\n# 长度: {len(work)}\n\n{work}\n")
print(f" 💎 金作品已保存: {fname}")
except:
pass
def _get_dao(self) -> float:
if self.luoshu and hasattr(self.luoshu, 'dao_novelty'):
return (self.luoshu.dao_novelty - 0.5) * 0.2
return 0.0
def _rule_based_execute(self, context: Dict) -> Dict:
if not self.luoshu:
return {"final": "", "works_count": len(self.works)}
candidates = self.luoshu.water_pool.get_best(20, 10, set(self.works))
# 如果没有候选,尝试从木池直接生成
if not candidates:
rhythms = self.luoshu.wood_pool.get_random(3)
if rhythms:
selected = random.choice(rhythms)
dao = self._get_dao()
# 直接生成一个句子
parts = []
for part_len in selected.get("rhythm", [4, 6]):
units = self.luoshu.fire_pool.get_by_pos('n', 0.8, part_len)
if not units:
units = ["天", "地", "人", "道", "德"] * (part_len // 2 + 1)
parts.append(''.join(units[:part_len]))
if parts:
candidate_text = ",".join(parts) + "。"
candidates = [{"text": candidate_text, "score": 0.5}]
if not candidates:
return {"final": "", "works_count": len(self.works)}
dao = self._get_dao()
strategy = self.strategy_net.predict(self.memory[-10:])
threshold = 0.6 + strategy["intensity"] * 0.1 - self.skill_level * 0.1 + dao
threshold = max(0.3, min(0.95, threshold))
best = candidates[0]["text"]
if self.teacher9:
passed, score, comment = self.teacher9.evaluate({"final": best})
if passed and score >= 0.7:
self.works.append(best)
self._save_golden_work(best)
self.luoshu.golden_pool.add(best, score)
print(f" ✅ 固化作品: {best[:40]}... (阈值={threshold:.2f})")
return {"final": best, "works_count": len(self.works)}
else:
print(f" ⏸️ 作品不合格 (分={score:.2f})")
return {"final": "", "works_count": len(self.works)}
if random.random() > threshold:
self.works.append(best)
self._save_golden_work(best)
self.luoshu.golden_pool.add(best, 0.8)
print(f" ✅ 固化作品: {best[:40]}...")
return {"final": best, "works_count": len(self.works)}
print(f" ⏸️ 未固化")
return {"final": "", "works_count": len(self.works)}
# ==================== 洛书中心 ====================
class LuoShuCenter:
def __init__(self, perception: HetuPerception):
self.common_memory = {"masterpieces": [], "stats": {}, "stage": "小学", "weights": {"火2": 1.0, "木3": 1.0, "水1": 1.0, "金4": 1.0}}
self.evolution_advice = ""
self.quality_rule = "长度大于10且含中文,优先看语义连贯性"
self.memory = []
self.stage_start_round = 0
self.min_rounds_per_stage = 500
self.synonym_learner = SynonymLearner()
self.fire_pool = FirePool(max_per_category=1000)
self.wood_pool = WoodPool(max_size=1000)
self.water_pool = WaterPool(max_size=2000)
self.golden_pool = GoldenPool(max_size=500)
self.dao = DaoPi(chunk_size=5000)
self.dao_novelty = 0.5
self.perception = perception
self._load_individual_memory()
self._load_standard()
def _atomic_write(self, filename: str, data: dict):
tmp = filename + ".tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp, filename)
def _get_memory_filename(self) -> str:
return "memories/luoshu_center_memory.json"
def _get_standard_filename(self) -> str:
return "standards/luoshu_center_standard.json"
def _load_individual_memory(self):
os.makedirs("memories", exist_ok=True)
fname = self._get_memory_filename()
if os.path.exists(fname):
try:
with open(fname, 'r', encoding='utf-8') as f:
data = json.load(f)
self.memory = data.get("memory", [])
self.common_memory = data.get("common_memory", self.common_memory)
print(f"📂 洛书中心加载记忆,{len(self.memory)} 条记录")
except:
pass
def _save_individual_memory(self):
os.makedirs("memories", exist_ok=True)
data = {"memory": self.memory[-200:], "common_memory": self.common_memory}
self._atomic_write(self._get_memory_filename(), data)
def _load_standard(self):
fname = self._get_standard_filename()
if os.path.exists(fname):
try:
with open(fname, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list) and data:
latest = data[-1]
self.quality_rule = latest.get("quality_rule", self.quality_rule)
self.common_memory["stage"] = latest.get("stage", "小学")
self.common_memory["weights"] = latest.get("weights", self.common_memory["weights"])
self.stage_start_round = latest.get("stage_start_round", 0)
print(f"📊 洛书中心加载标准: {self.quality_rule[:50]}...")
except:
pass
def _save_standard(self, system):
os.makedirs("standards", exist_ok=True)
data = {
"timestamp": datetime.now().isoformat(),
"round": system.round,
"stage": self.common_memory["stage"],
"weights": self.common_memory["weights"],
"quality_rule": self.quality_rule,
"evolution_advice": self.evolution_advice,
"stats": self.common_memory["stats"],
"stage_start_round": self.stage_start_round,
"min_rounds_per_stage": self.min_rounds_per_stage,
"dao_pointer": self.dao.get_pointer()
}
history = []
fname = self._get_standard_filename()
if os.path.exists(fname):
try:
with open(fname, 'r', encoding='utf-8') as f:
history = json.load(f)
if isinstance(history, dict):
history = [history]
except:
pass
history.append(data)
if len(history) > 20:
history = history[-20:]
self._atomic_write(fname, history)
txt = f"standards/luoshu_center_standard.txt"
with open(txt, 'w', encoding='utf-8') as f:
f.write(f"洛书中心标准\n更新时间: {datetime.now()}\n轮数: {system.round}\n")
f.write(f"阶段: {self.common_memory['stage']}\n规则: {self.quality_rule}\n")
def report(self, student_id: str, score: float, passed: bool):
if student_id not in self.common_memory["stats"]:
self.common_memory["stats"][student_id] = {"scores": [], "pass_count": 0, "total_count": 0}
self.common_memory["stats"][student_id]["scores"].append(score)
self.common_memory["stats"][student_id]["total_count"] += 1
if passed:
self.common_memory["stats"][student_id]["pass_count"] += 1
if len(self.common_memory["stats"][student_id]["scores"]) > 100:
self.common_memory["stats"][student_id]["scores"] = self.common_memory["stats"][student_id]["scores"][-100:]
self._save_individual_memory()
def add_masterpiece(self, text: str, source: str):
for existing in self.common_memory["masterpieces"][-50:]:
if existing.get("text") == text:
return
self.common_memory["masterpieces"].append({"text": text, "source": source, "time": time.time()})
if len(self.common_memory["masterpieces"]) > 100:
self.common_memory["masterpieces"] = self.common_memory["masterpieces"][-100:]
self._save_individual_memory()
def evaluate_works_quality(self, works: List[str]) -> float:
if not works:
return 5.0
txt = "\n".join([f"{i+1}. {w[:100]}" for i, w in enumerate(works[-5:])])
prompt = f"评价以下作品质量,0-10分,10分最高。优先看语义连贯性和逻辑完整性。只输出平均分。\n{txt}"
resp = call_deepseek(prompt, max_tokens=50, temperature=0.3)
if resp:
try:
return min(10, max(0, float(re.search(r'(\d+(?:\.\d+)?)', resp).group(1))))
except:
pass
return 5.0
def self_evolve(self, system):
self.dao_novelty = self.dao.novelty(6)
recent = system.jin4.works[-10:] if system.jin4.works else []
quality = self.evaluate_works_quality(recent)
adj = quality * (0.8 + self.dao_novelty * 0.4)
self.quality_rule = f"基于第{system.round}轮质量({adj:.1f}/10)调整,道新奇:{self.dao_novelty:.2f}"
self.evolution_advice = f"质量{adj:.1f}"
self._save_standard(system)
self._save_individual_memory()
print(f" 📊 洛书中心标准已保存 (质量={adj:.1f}, 道新奇={self.dao_novelty:.2f})")
if system.round % 500 == 0:
texts = [mp.get("text", "") for mp in self.common_memory["masterpieces"][-50:]]
if texts:
self.synonym_learner.learn_from_corpus(texts)
if system.round % 100 == 0:
self.water_pool._dirty = True
self.water_pool._sort_and_trim()
def get_pool_stats(self) -> dict:
return {
"fire_pool": self.fire_pool.get_stats(),
"wood_pool": self.wood_pool.get_stats(),
"water_pool": self.water_pool.get_stats(),
"golden_pool": self.golden_pool.get_stats(),
"masterpieces": len(self.common_memory["masterpieces"]),
"dao_pointer": self.dao.get_pointer()
}
def update_weights_and_stage(self, system):
for sid, w in self.common_memory["weights"].items():
stat = self.common_memory["stats"].get(sid, {"pass_count": 0, "total_count": 1})
rate = stat["pass_count"] / max(1, stat["total_count"])
if rate > 0.8:
self.common_memory["weights"][sid] = max(0.5, w * 0.9)
elif rate < 0.3:
self.common_memory["weights"][sid] = min(2.0, w * 1.2)
all_pass = all(self.common_memory["stats"].get(sid, {"pass_count": 0, "total_count": 1}).get("pass_count", 0) / max(1, self.common_memory["stats"].get(sid, {"total_count": 1}).get("total_count", 1)) > 0.7 for sid in ["火2", "木3", "水1", "金4"])
if not all_pass:
return
if self.stage_start_round == 0:
self.stage_start_round = system.round
rounds = system.round - self.stage_start_round
if rounds < self.min_rounds_per_stage:
print(f"\n📚 四部已合格,磨练中({rounds}/{self.min_rounds_per_stage}轮)")
return
stage = self.common_memory["stage"]
stages = ["小学", "中学", "大学", "研究生"]
idx = stages.index(stage)
if idx >= len(stages)-1:
return
self.common_memory["stage"] = stages[idx+1]
self.stage_start_round = system.round
print(f"\n🎉 升入{stages[idx+1]}阶段!")
self._save_individual_memory()
# ==================== 主系统 ====================
class HeTuLuoShuSystem:
def __init__(self):
print("🐉 河图洛书智能体 - V6(集成感知系统,移除旧火2初始化)")
print(" 火2:感知系统共享语素库")
print(" 木3:按调子造句 → 老师8批改")
print(" 水1:杂交调子生成变体 → 老师6批改")
print(" 金4:筛选固化 → 老师9批改")
print(" 感知系统:134项能力,所有学生共享,永久固化")
print("="*60)
# 创建感知系统(所有学生共享)
self.perception = HetuPerception(location="襄阳")
# 添加种子语料
seed_texts = [
"天地玄黄,宇宙洪荒。日月盈昃,辰宿列张。寒来暑往,秋收冬藏。",
"道可道,非常道;名可名,非常名。无名天地之始,有名万物之母。",
"河图者,阴阳之象也;洛书者,五行之数也。",
"易经曰:天行健,君子以自强不息;地势坤,君子以厚德载物。",
"阴阳相生,五行相克。万物负阴而抱阳,冲气以为和。",
"大学之道,在明明德,在亲民,在止于至善。",
"仁者爱人,义者循理,礼者敬人,智者知人,信者诚人。",
"天之道,损有余而补不足;人之道,损不足以奉有余。"
]
for text in seed_texts:
self.perception.train_from_text(text)
target_dirs = ["learning_material"]
self.corpus_reader = CorpusReader(target_dirs)
self.crawler_lock = False
self.huo2 = Huo2(self.corpus_reader, self.perception)
self.mu3 = Mu3(self.corpus_reader, self.perception)
self.shui1 = Shui1("1", "水1-变", self.perception)
self.jin4 = Jin4(self.perception)
self.students = {"火2": self.huo2, "木3": self.mu3, "水1": self.shui1, "金4": self.jin4}
self.teachers = {
"7": Teacher("7", "火2", self.perception),
"8": Teacher("8", "木3", self.perception),
"6": Teacher("6", "水1", self.perception),
"9": Teacher("9", "金4", self.perception)
}
self.luoshu = LuoShuCenter(self.perception)
self.huo2.teacher7 = self.teachers["7"]
self.mu3.teacher8 = self.teachers["8"]
self.shui1.teacher6 = self.teachers["6"]
self.jin4.teacher9 = self.teachers["9"]
for s in self.students.values():
s.luoshu = self.luoshu
for t in self.teachers.values():
t.luoshu = self.luoshu
self.round = 0
self.masterpiece_round = 0
self._load_global_memory()
os.makedirs("masterpieces", exist_ok=True)
os.makedirs("standards", exist_ok=True)
os.makedirs("golden_works", exist_ok=True)
os.makedirs("memories", exist_ok=True)
for t in self.teachers.values():
t._save_standard(0)
def _atomic_write(self, filename: str, data: dict):
tmp = filename + ".tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp, filename)
def get_crawler_size(self) -> int:
return 0
def _check_and_recover_teachers(self):
for t in self.teachers.values():
t.try_recover()
def _get_global_filename(self) -> str:
return "memory_snapshot.json"
def _save_global_memory(self):
data = {
"round": self.round,
"masterpiece_round": self.masterpiece_round,
"common_memory": self.luoshu.common_memory,
"teachers": {tid: {"pass_threshold": t.pass_threshold, "rule_description": t.rule_description, "use_fallback": t.use_fallback} for tid, t in self.teachers.items()}
}
self._atomic_write(self._get_global_filename(), data)
def _load_global_memory(self):
fname = self._get_global_filename()
if os.path.exists(fname):
try:
with open(fname, 'r', encoding='utf-8') as f:
data = json.load(f)
self.round = data.get("round", 0)
self.masterpiece_round = data.get("masterpiece_round", 0)
self.luoshu.common_memory = data.get("common_memory", self.luoshu.common_memory)
for tid, tdata in data.get("teachers", {}).items():
if tid in self.teachers:
self.teachers[tid].pass_threshold = tdata.get("pass_threshold", 0.6)
self.teachers[tid].rule_description = tdata.get("rule_description", self.teachers[tid].rule_description)
self.teachers[tid].use_fallback = tdata.get("use_fallback", False)
print(f"📂 恢复全局记忆,从第 {self.round} 轮继续")
except:
pass
def _save_masterpiece(self):
if self.round - self.masterpiece_round >= 100 and self.round > 0:
self.masterpiece_round = self.round
if self.jin4.works:
latest = self.jin4.works[-1]
fname = f"masterpieces/masterpiece_{self.round:06d}.txt"
with open(fname, 'w', encoding='utf-8') as f:
f.write(f"# 第{self.round}轮作品\n# {datetime.now()}\n# 道:{self.luoshu.dao.get_pointer()}位\n\n{latest}\n")
print(f"\n📖 已保存作品: {fname}\n")
def _get_learning_topic(self, name: str) -> str:
topics = {
"火2-化": "中文文本特征提取和关键词抽取方法",
"木3-生": "如何生成通顺、有意义的完整中文句子",
"水1-变": "中文句子的变形技巧:反转、移位、重组",
"金4-成": "优秀文学作品的评判标准和语言艺术"
}
return topics.get(name, "中文语言学习")
def _run_student(self, student, teacher, prev=None):
name = student.name
if student.blocked:
print(f" 🚫 [{name}] 阻塞中,学习轮次: {student.learning_rounds}/3")
if student.can_retry():
print(f" 🔓 [{name}] 已解封,开始执行")
else:
student.learn(self._get_learning_topic(name))
return None, False
ctx = {}
if name == "木3-生" and prev:
ctx["units"] = prev.get("units", [])
elif name == "水1-变" and prev:
ctx["sentence"] = prev.get("sentence", "")
elif name == "金4-成" and prev:
ctx["variants"] = prev.get("variants", [])
ctx["teacher9_threshold"] = teacher.pass_threshold
out = student.execute(ctx)
passed, score, comment = teacher.evaluate(out)
student.receive_feedback(score, comment)
self.luoshu.report(name.split("-")[0], score, passed)
print(f" 📋 老师{teacher.id}: {'✅通过' if passed else '❌不通过'} (分:{score:.2f}) {comment}")
if passed:
student.on_pass()
else:
student.on_fail(comment)
return out, passed
def run_one_round(self):
self.round += 1
print(f"\n{'='*60}")
print(f"第 {self.round} 轮")
stage = self.luoshu.common_memory.get("stage", "小学")
print(f"阶段: {stage} | 道已计算: {self.luoshu.dao.get_pointer()} 位圆周率")
print(f" 火2毕业: {self.huo2.is_graduated} | 木3毕业: {self.mu3.is_graduated} | 水1毕业: {self.shui1.is_graduated} | 金4毕业: {self.jin4.is_graduated}")
output = None
# 直接进入正常运作(不经过火2初始化)
output, passed = self._run_student(self.huo2, self.teachers["7"])
if not self.huo2.is_graduated:
self._save_global_memory()
return
for _ in range(10):
output, passed = self._run_student(self.mu3, self.teachers["8"], output)
if not self.mu3.is_graduated:
self._save_global_memory()
return
for _ in range(5):
output, passed = self._run_student(self.shui1, self.teachers["6"], output)
if not self.shui1.is_graduated:
self._save_global_memory()
return
output, passed = self._run_student(self.jin4, self.teachers["9"], output)
if self.round % 100 == 0 and self.round > 0:
print("\n【系统进化】")
for t in self.teachers.values():
t.self_evolve(self.round)
time.sleep(0.5)
self.luoshu.self_evolve(self)
self.luoshu.update_weights_and_stage(self)
self._check_and_recover_teachers()
self.luoshu.fire_pool.apply_thunder(10.0)
self.luoshu.wood_pool.apply_thunder(10.0)
self.luoshu.water_pool.apply_thunder(10.0)
if self.round % 50 == 0:
stats = self.luoshu.get_pool_stats()
print(f"\n📊 统计:")
print(f" 火池: {stats['fire_pool']}")
print(f" 木池: {stats['wood_pool']}")
print(f" 水池: {stats['water_pool']}")
print(f" 金池: {stats['golden_pool']}")
print(f" 道: {stats['dao_pointer']} 位")
print(f" 感知系统API调用: {self.perception.get_api_calls()}")
self._save_global_memory()
self._save_masterpiece()
def run_forever(self):
print("\n🚀 进入永久学习模式...")
print(" 感知系统:134项能力,所有学生共享")
print(" 知识固化:第一次学习,永久记住")
print(" Ctrl+C 可安全中断\n")
try:
while True:
self.run_one_round()
time.sleep(0.2)
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断,状态已保存")
print(f" 当前轮数: {self.round}")
print(f" 金4作品数: {len(self.jin4.works)}")
print(f" 道已计算: {self.luoshu.dao.get_pointer()} 位圆周率")
print(f" 感知系统API调用: {self.perception.get_api_calls()}")
self._save_global_memory()
for s in self.students.values():
s._save_individual_memory()
for t in self.teachers.values():
t._save_individual_memory()
self.luoshu._save_individual_memory()
self.perception._save_dicts()
print(" 下次运行将从中断处继续")
print("\n🐉 河图洛书智能体 V6 已休眠")
if __name__ == "__main__":
system = HeTuLuoShuSystem()
system.run_forever() |