ENABLE_MEMORY 开关 True=启用记忆,False=完全回退到 V7.0 行为这个不必,我有存档
明白。
你有原版 V7.0 的存档,所以不需要在同一个文件里做开关回退。
那我重新调整一下:**直接给你一个纯粹的 V7.1 记忆扩展版**,不带开关,不污染原版。原版你继续跑,这个版本单独保存、单独运行。
---
## 调整后的方案
| 文件 | 说明 |
|------|------|
| `hetu_luoshu_v7_mirror.py` | 你的原版 V7.0(不变,继续跑) |
| `hetu_luoshu_v7_memory.py` | **新文件**:V7.1 记忆扩展版(独立运行) |
两个文件不互相影响,可以同时跑(不同终端),也可以分开跑。
---
## V7.1 记忆扩展版的核心改动
| 组件 | 新增功能 |
|------|---------|
| **老师(师6/7/8/9)** | 长期记忆(打分历史 + 时间衰减)。道的扰动控制遗忘速度 |
| **学生(火2/木3/水1/金4)** | 短期记忆(高分经验 FIFO)。道的扰动控制探索/利用比例 |
| **道的扰动扩展** | 影响:遗忘速率、固化阈值、记忆依赖程度 |
| **固化阈值** | 动态:`threshold = 0.5 + dao_novelty × 0.3` |
---
## 代码
```python
# hetu_luoshu_v7_memory.py
# 河图洛书 V7.1 - 镜像核心版 + 师生记忆
#
# 设计哲学:道驱动一切。记忆只是道的延伸,不破坏“无预设”原则。
# 新增:老师长期记忆 + 学生短期记忆 + 道的扰动无处不在
#
# 独立运行,与原版 V7.0 不冲突
import os
import sys
import time
import json
import random
import re
import math
import hashlib
import pickle
import requests
from collections import Counter
from typing import List, Dict, Tuple, Optional
from datetime import datetime
# ==================== API配置 ====================
DEEPSEEK_API_KEY = "你的KEY"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
for d in ["cache", "learning_material", "masterpieces", "logs", "checkpoints"]:
os.makedirs(d, exist_ok=True)
def call_deepseek(prompt: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
cache_key = hashlib.md5(prompt.encode()).hexdigest()
cache_file = f"cache/{cache_key}.json"
if os.path.exists(cache_file):
try:
with open(cache_file, 'r', encoding='utf-8') as f:
return json.load(f)["response"]
except:
pass
try:
headers = {"Authorization": f"Bearer {DEEPSEEK_API_KEY}", "Content-Type": "application/json"}
data = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": temperature
}
response = requests.post(DEEPSEEK_API_URL, json=data, headers=headers, timeout=30)
if response.status_code == 200:
result = response.json()["choices"][0]["message"]["content"]
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump({"prompt": prompt, "response": result}, f, ensure_ascii=False)
return result
return ""
except Exception as e:
return ""
# ==================== 道:π引擎 ====================
class DaoEngine:
def __init__(self, chunk_size=10000):
self.chunk_size = chunk_size
self.digits = []
self.pointer = 0
self._load_next_chunk()
def _load_next_chunk(self):
try:
import gmpy2
gmpy2.get_context().precision = (self.pointer + self.chunk_size + 100) * 4
pi = gmpy2.const_pi()
pi_str = format(pi, f'.{self.pointer + self.chunk_size + 50}f')
pi_digits = pi_str.replace('.', '')
segment = pi_digits[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
except ImportError:
from decimal import Decimal, getcontext
getcontext().prec = self.pointer + self.chunk_size + 50
pi = Decimal(0)
for k in range(self.pointer + self.chunk_size + 20):
pi += (Decimal(1)/(16**k)) * (
Decimal(4)/(8*k+1) - Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) - Decimal(1)/(8*k+6)
)
pi_str = str(pi)[2:]
segment = pi_str[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
def get_novelty(self, length=8) -> float:
while self.pointer + length >= len(self.digits):
self._load_next_chunk()
segment = self.digits[self.pointer:self.pointer+length]
self.pointer += length
value = 0
for i, d in enumerate(segment):
value += d * (0.1 ** (i+1))
return value
def get_digit(self) -> int:
if self.pointer >= len(self.digits):
self._load_next_chunk()
digit = self.digits[self.pointer]
self.pointer += 1
return digit
def get_digits(self, count: int) -> List[int]:
result = []
for _ in range(count):
result.append(self.get_digit())
return result
def get_pointer(self) -> int:
return self.pointer
def get_state(self) -> dict:
return {"pointer": self.pointer}
def restore_state(self, state: dict):
self.pointer = state.get("pointer", 0)
self.digits = []
self._load_next_chunk()
# ==================== 节奏控制器 ====================
class RhythmController:
def __init__(self):
self.sheng_phase = 0
self.bian_phase = 0
self.sheng_speed = 0.2 * 2 * math.pi / 5
self.bian_speed = 2 * math.pi / 1
def update(self):
self.sheng_phase = (self.sheng_phase + self.sheng_speed) % (2 * math.pi)
self.bian_phase = (self.bian_phase + self.bian_speed) % (2 * math.pi)
def get_sheng_ratio(self):
return 0.55 + 0.25 * math.sin(self.sheng_phase)
def get_bian_ratio(self):
return 0.55 + 0.35 * math.sin(self.bian_phase)
def get_sheng_length(self, base_min=15, base_max=40):
return int(base_min + (base_max - base_min) * self.get_sheng_ratio())
def get_bian_length(self, base_min=8, base_max=25):
return int(base_min + (base_max - base_min) * self.get_bian_ratio())
def get_state(self) -> dict:
return {"sheng_phase": self.sheng_phase, "bian_phase": self.bian_phase}
def restore_state(self, state: dict):
self.sheng_phase = state.get("sheng_phase", 0)
self.bian_phase = state.get("bian_phase", 0)
# ==================== 河图中央 ====================
class HeTuCenter:
def __init__(self):
self.sheng_info = {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0}
self.cheng_info = {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0}
self.global_state = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
def update_sheng(self, idx: int, value: float):
self.sheng_info[str(idx)] = value
self._update_global_state()
def update_cheng(self, idx: int, value: float):
self.cheng_info[str(idx)] = value
self._update_global_state()
def _update_global_state(self):
self.global_state["sheng"] = sum(self.sheng_info.values()) / 4
self.global_state["cheng"] = sum(self.cheng_info.values()) / 4
self.global_state["balance"] = self.global_state["sheng"] / (self.global_state["cheng"] + 0.01)
def get_full_state(self):
return {"sheng": self.sheng_info.copy(), "cheng": self.cheng_info.copy(), "global": self.global_state.copy()}
def get_save_state(self):
return {"sheng_info": self.sheng_info, "cheng_info": self.cheng_info, "global_state": self.global_state}
def restore_state(self, state: dict):
self.sheng_info = state.get("sheng_info", {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0})
self.cheng_info = state.get("cheng_info", {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0})
self.global_state = state.get("global_state", {"sheng": 0.0, "cheng": 0.0, "balance": 0.0})
# ==================== 工具函数 ====================
def get_all_txt_files(root_dir: str) -> List[str]:
txt_files = []
if not os.path.exists(root_dir):
return txt_files
for dirpath, dirnames, filenames in os.walk(root_dir):
for filename in filenames:
if filename.endswith('.txt'):
txt_files.append(os.path.join(dirpath, filename))
return txt_files
# ==================== 记忆模块 ====================
class TeacherLongMemory:
"""老师的长期记忆:打分历史 + 时间衰减 + 道的扰动控制遗忘"""
def __init__(self, teacher_id: int, max_size: int = 2000):
self.teacher_id = teacher_id
self.max_size = max_size
self.history = [] # 每个元素: {round, work_hash, score, is_cured, timestamp, weight}
def add(self, round_num: int, work: str, score: float, is_cured: bool):
work_hash = hashlib.md5(work.encode()).hexdigest()[:16]
self.history.append({
"round": round_num,
"work_hash": work_hash,
"score": score,
"is_cured": is_cured,
"timestamp": time.time(),
"weight": 1.0
})
if len(self.history) > self.max_size:
self.history = self.history[-self.max_size:]
def decay(self, dao_novelty: float):
"""道的扰动控制遗忘速度。道新奇度高 → 遗忘快"""
decay_rate = 0.995 ** (1 + dao_novelty * 0.5)
for record in self.history:
record["weight"] *= decay_rate
if record["is_cured"]:
record["weight"] = max(record["weight"], 0.6)
self.history = [r for r in self.history if r["weight"] > 0.05]
def get_similar_score(self, work: str) -> Optional[float]:
"""根据历史打分,返回相似作品的加权平均分"""
work_prefix = work[:30]
similar = []
for r in self.history:
if any(ch in work_prefix for ch in r.get("work_hash", "")[:10]):
similar.append(r)
if not similar:
return None
total_weight = sum(r["weight"] for r in similar)
if total_weight == 0:
return None
return sum(r["score"] * r["weight"] for r in similar) / total_weight
def get_state(self) -> dict:
return {"history": self.history[-500:]}
def restore_state(self, state: dict):
self.history = state.get("history", [])
class StudentShortMemory:
"""学生的短期记忆:高分经验 FIFO + 道的扰动控制依赖程度"""
def __init__(self, capacity: int = 200):
self.capacity = capacity
self.memory = [] # 每个元素: {content, reward, round, use_count}
def add(self, content: str, reward: float, round_num: int):
self.memory.append({
"content": content,
"reward": reward,
"round": round_num,
"use_count": 0
})
if len(self.memory) > self.capacity:
self.memory.pop(0)
def recall(self, dao_novelty: float) -> Optional[str]:
"""道的扰动决定依赖记忆的程度。道新奇度高 → 依赖少(探索多)"""
reliance = 1 - dao_novelty
if random.random() > reliance:
return None
candidates = [m for m in self.memory if m["reward"] > 0.7]
if not candidates:
return None
chosen = random.choice(candidates)
chosen["use_count"] += 1
return chosen["content"]
def forget(self, dao_novelty: float):
"""道的扰动影响遗忘速度。道新奇度高 → 遗忘快"""
if dao_novelty > 0.7:
self.memory = [m for m in self.memory if m["reward"] > 0.6]
elif dao_novelty < 0.3:
pass
else:
self.memory = [m for m in self.memory if m["reward"] > 0.2]
def get_state(self) -> dict:
return {"memory": self.memory[-100:]}
def restore_state(self, state: dict):
self.memory = state.get("memory", [])
# ==================== 火2(有记忆)====================
class Fire2WithMemory:
def __init__(self, corpus_paths: List[str]):
self.word_freq = Counter()
self.memory = None
self._load_corpus(corpus_paths)
print(f" 🔥 火2完成,共 {len(self.word_freq)} 个语素")
def set_memory(self, memory: StudentShortMemory):
self.memory = memory
def _load_corpus(self, paths):
counter = Counter()
all_files = []
for path in paths:
if os.path.isfile(path) and path.endswith('.txt'):
all_files.append(path)
elif os.path.isdir(path):
all_files.extend(get_all_txt_files(path))
if not all_files:
return
for file_path in all_files[:500]:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read(8000)
for j in range(len(text)):
for l in range(1, 5):
word = text[j:j+l]
if re.match(r'[\u4e00-\u9fff]{1,4}$', word):
counter[word] += 1
except:
pass
self.word_freq = counter
def get_morphemes(self, dao_novelty: float, total: int = 20) -> List[str]:
recalled = None
if self.memory:
recalled = self.memory.recall(dao_novelty)
if recalled and random.random() < (1 - dao_novelty * 0.5):
recalled_words = [w for w in self.word_freq.keys() if recalled[:2] in w]
if recalled_words:
return recalled_words[:total]
if self.word_freq:
words = list(self.word_freq.keys())
weights = list(self.word_freq.values())
sample_size = min(total, len(words))
if sample_size == 0:
return []
return random.choices(words, weights=weights, k=sample_size)
seed = int(dao_novelty * 10000)
random.seed(seed)
base_chars = []
for _ in range(total):
code = 0x4e00 + random.randint(0, 0x5000)
base_chars.append(chr(code))
return base_chars
def get_state(self) -> dict:
return {"word_freq": dict(list(self.word_freq.items())[:5000])}
def restore_state(self, state: dict):
self.word_freq = Counter(state.get("word_freq", {}))
# ==================== 木3(有记忆)====================
class Mu3WithMemory:
def __init__(self):
self.memory = None
def set_memory(self, memory: StudentShortMemory):
self.memory = memory
def generate(self, morphemes: List[str], dao_novelty: float, sheng_ratio: float, sheng_length: int) -> str:
recalled = None
if self.memory:
recalled = self.memory.recall(dao_novelty)
if recalled and random.random() < (1 - dao_novelty * 0.3):
prompt = f"参考以下句式:「{recalled[:50]}」,用以下词语造一个{sheng_length}字左右的中文句子:{', '.join(morphemes[:10])}\n只输出句子:"
temp = 0.6 + sheng_ratio * 0.4
result = call_deepseek(prompt, max_tokens=sheng_length + 20, temperature=temp)
if result and len(result) > 5:
if len(result) > sheng_length:
result = result[:sheng_length]
return result.strip()
temp = 0.6 + sheng_ratio * 0.4
prompt = f"用以下词语造一个{sheng_length}字左右的中文句子:{', '.join(morphemes[:12])}\n只输出句子:"
result = call_deepseek(prompt, max_tokens=sheng_length + 20, temperature=temp)
if result and len(result) > 5:
if len(result) > sheng_length:
result = result[:sheng_length]
return result.strip()
selected = random.sample(morphemes, min(3, len(morphemes)))
return "。".join(selected) + "。"
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 水1(有记忆)====================
class Shui1WithMemory:
def __init__(self):
self.dao = None
self.memory = None
def set_dao(self, dao):
self.dao = dao
def set_memory(self, memory: StudentShortMemory):
self.memory = memory
def mutate(self, sentence: str, dao_novelty: float, bian_ratio: float, bian_length: int) -> List[str]:
recalled = None
if self.memory:
recalled = self.memory.recall(dao_novelty)
if recalled and random.random() < (1 - dao_novelty * 0.4):
prompt = f"参考以下改写风格:「{recalled[:50]}」,将「{sentence}」改写成3个不同的变体,每行一个:"
temp = 0.6 + bian_ratio * 0.5
result = call_deepseek(prompt, max_tokens=bian_length * 3 + 50, temperature=temp)
if result:
variants = [v.strip() for v in result.strip().split('\n') if v.strip()]
trimmed = []
for v in variants:
if len(v) > bian_length:
v = v[:bian_length]
trimmed.append(v)
return trimmed[:3]
temp = 0.6 + bian_ratio * 0.5
prompt = f"将「{sentence}」改写成3个不同的变体,每行一个:"
result = call_deepseek(prompt, max_tokens=bian_length * 3 + 50, temperature=temp)
if result:
variants = [v.strip() for v in result.strip().split('\n') if v.strip()]
trimmed = []
for v in variants:
if len(v) > bian_length:
v = v[:bian_length]
trimmed.append(v)
return trimmed[:3]
if self.dao:
digits = self.dao.get_digits(min(bian_length, 20))
dao_str = ''.join(str(d) for d in digits)
return [
sentence[:bian_length],
sentence[:bian_length//2] + "……",
sentence[:bian_length//2] + dao_str[:bian_length//2]
]
else:
return [
sentence[:bian_length],
sentence[:bian_length//2] + "……",
sentence[:bian_length//3] + "。"
]
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 金4(动态阈值)====================
class Jin4WithMemory:
def __init__(self, max_size=100):
self.masterpieces = []
self.max_size = max_size
def solidify(self, candidates: List[str], dao_novelty: float) -> Tuple[List[str], List[float]]:
if not candidates:
return [], []
# 道的扰动影响固化阈值
threshold = 0.5 + dao_novelty * 0.3
prompt = f"为以下每个句子评分(0-1分),每行一个分数:\n" + "\n".join(candidates)
result = call_deepseek(prompt, max_tokens=100, temperature=0.3)
scores = []
if result:
for line in result.strip().split('\n'):
try:
score = float(re.search(r'(\d+\.?\d*)', line).group(1))
scores.append(min(1.0, max(0.0, score)))
except:
scores.append(0.5)
while len(scores) < len(candidates):
scores.append(0.5)
good_works, good_scores = [], []
for work, score in zip(candidates, scores):
if score > threshold:
good_works.append(work)
good_scores.append(score)
self.masterpieces.append(work)
if len(self.masterpieces) > self.max_size:
self.masterpieces = self.masterpieces[-self.max_size:]
return good_works, good_scores
def get_state(self) -> dict:
return {"masterpieces": self.masterpieces[-100:]}
def restore_state(self, state: dict):
self.masterpieces = state.get("masterpieces", [])
# ==================== 老师(有长期记忆)====================
class TeacherWithMemory:
def __init__(self, teacher_id: int, student_name: str):
self.id = teacher_id
self.student_name = student_name
self.history = []
self.long_memory = TeacherLongMemory(teacher_id)
def evaluate(self, work: str, dao_novelty: float) -> Tuple[float, str]:
mem_score = self.long_memory.get_similar_score(work)
prompt = f"你是老师{self.id},评判{self.student_name}。给出分数(0-1分)和评语。格式:分数|评语\n作业:{work[:150]}"
result = call_deepseek(prompt, max_tokens=100, temperature=0.4)
score = 0.5
comment = ""
if result and '|' in result:
parts = result.split('|')
try:
score = float(parts[0].strip())
comment = parts[1].strip()[:40]
except:
pass
else:
score = min(1.0, len(work) / 30) * 0.5 + (len(set(work)) / max(1, len(work))) * 0.5
score = score * (0.8 + dao_novelty * 0.3)
score = min(1.0, max(0.0, score))
if mem_score is not None:
memory_bias = dao_novelty
score = score * (1 - memory_bias * 0.5) + mem_score * (memory_bias * 0.5)
self.history.append((time.time(), work[:30], score))
if len(self.history) > 100:
self.history = self.history[-100:]
return score, comment
def add_to_long_memory(self, round_num: int, work: str, score: float, is_cured: bool):
self.long_memory.add(round_num, work, score, is_cured)
def decay_memory(self, dao_novelty: float):
self.long_memory.decay(dao_novelty)
def get_state(self) -> dict:
return {"history": self.history[-50:]}
def restore_state(self, state: dict):
self.history = state.get("history", [])
def get_long_memory_state(self) -> dict:
return self.long_memory.get_state()
def restore_long_memory(self, state: dict):
self.long_memory.restore_state(state)
# ==================== 洛书中心(记忆版)====================
class LuoShuCenterMemory:
def __init__(self, dao: DaoEngine, checkpoint_dir: str = "checkpoints_memory"):
self.dao = dao
self.hetu_center = HeTuCenter()
self.rhythm = RhythmController()
self.checkpoint_dir = checkpoint_dir
os.makedirs(checkpoint_dir, exist_ok=True)
print("\n📚 加载语料...")
print(" 🧠 师生记忆模式已启用")
print(" 老师:长期记忆 + 道的扰动控制遗忘")
print(" 学生:短期记忆 + 道的扰动控制探索/利用")
self.fire2 = Fire2WithMemory(["learning_material"])
self.mu3 = Mu3WithMemory()
self.shui1 = Shui1WithMemory()
self.jin4 = Jin4WithMemory()
self.student_memory = StudentShortMemory(capacity=200)
self.fire2.set_memory(self.student_memory)
self.mu3.set_memory(self.student_memory)
self.shui1.set_memory(self.student_memory)
self.shui1.set_dao(dao)
self.teacher7 = TeacherWithMemory(7, "火2")
self.teacher8 = TeacherWithMemory(8, "木3")
self.teacher6 = TeacherWithMemory(6, "水1")
self.teacher9 = TeacherWithMemory(9, "金4")
self.round = 0
self.log_entries = []
self._load_checkpoint()
def _apply_dao_to_memories(self, dao_novelty: float):
"""道的扰动无处不在:驱动老师遗忘、学生遗忘"""
self.teacher7.decay_memory(dao_novelty)
self.teacher8.decay_memory(dao_novelty)
self.teacher6.decay_memory(dao_novelty)
self.teacher9.decay_memory(dao_novelty)
self.student_memory.forget(dao_novelty)
def _get_checkpoint_path(self) -> str:
return os.path.join(self.checkpoint_dir, "full_checkpoint_memory.pkl")
def save_checkpoint(self):
checkpoint = {
"round": self.round,
"dao_state": self.dao.get_state(),
"rhythm_state": self.rhythm.get_state(),
"hetu_state": self.hetu_center.get_save_state(),
"fire2_state": self.fire2.get_state(),
"jin4_state": self.jin4.get_state(),
"teacher7_state": self.teacher7.get_state(),
"teacher8_state": self.teacher8.get_state(),
"teacher6_state": self.teacher6.get_state(),
"teacher9_state": self.teacher9.get_state(),
"student_memory_state": self.student_memory.get_state(),
"teacher7_long_memory": self.teacher7.get_long_memory_state(),
"teacher8_long_memory": self.teacher8.get_long_memory_state(),
"teacher6_long_memory": self.teacher6.get_long_memory_state(),
"teacher9_long_memory": self.teacher9.get_long_memory_state(),
"log_entries": self.log_entries[-100:],
"timestamp": datetime.now().isoformat()
}
tmp_path = self._get_checkpoint_path() + ".tmp"
with open(tmp_path, 'wb') as f:
pickle.dump(checkpoint, f)
os.replace(tmp_path, self._get_checkpoint_path())
def _load_checkpoint(self):
if not os.path.exists(self._get_checkpoint_path()):
print(" 📂 未找到检查点,从头开始")
return
try:
with open(self._get_checkpoint_path(), 'rb') as f:
checkpoint = pickle.load(f)
self.round = checkpoint.get("round", 0)
self.dao.restore_state(checkpoint.get("dao_state", {}))
self.rhythm.restore_state(checkpoint.get("rhythm_state", {}))
self.hetu_center.restore_state(checkpoint.get("hetu_state", {}))
self.fire2.restore_state(checkpoint.get("fire2_state", {}))
self.jin4.restore_state(checkpoint.get("jin4_state", {}))
self.teacher7.restore_state(checkpoint.get("teacher7_state", {}))
self.teacher8.restore_state(checkpoint.get("teacher8_state", {}))
self.teacher6.restore_state(checkpoint.get("teacher6_state", {}))
self.teacher9.restore_state(checkpoint.get("teacher9_state", {}))
if "student_memory_state" in checkpoint:
self.student_memory.restore_state(checkpoint["student_memory_state"])
if "teacher7_long_memory" in checkpoint:
self.teacher7.restore_long_memory(checkpoint["teacher7_long_memory"])
if "teacher8_long_memory" in checkpoint:
self.teacher8.restore_long_memory(checkpoint["teacher8_long_memory"])
if "teacher6_long_memory" in checkpoint:
self.teacher6.restore_long_memory(checkpoint["teacher6_long_memory"])
if "teacher9_long_memory" in checkpoint:
self.teacher9.restore_long_memory(checkpoint["teacher9_long_memory"])
self.log_entries = checkpoint.get("log_entries", [])
print(f" 📂 加载检查点成功,从第 {self.round} 轮继续")
except Exception as e:
print(f" ⚠️ 加载检查点失败: {e},从头开始")
def run_cycle(self):
self.round += 1
dao_novelty = self.dao.get_novelty(6)
self._apply_dao_to_memories(dao_novelty)
self.rhythm.update()
sheng_ratio = self.rhythm.get_sheng_ratio()
bian_ratio = self.rhythm.get_bian_ratio()
sheng_length = self.rhythm.get_sheng_length(15, 40)
bian_length = self.rhythm.get_bian_length(8, 25)
print(f"\n{'─'*70}")
print(f"🧠 第 {self.round} 轮 | 道新奇度: {dao_novelty:.4f} | 生节:{sheng_ratio:.2f}/{sheng_length} | 变节:{bian_ratio:.2f}/{bian_length}")
# 火2 - 生1
morphemes = self.fire2.get_morphemes(dao_novelty, total=15)
if morphemes:
score7, comment7 = self.teacher7.evaluate(" ".join(morphemes[:5]), dao_novelty)
self.hetu_center.update_sheng(1, score7)
self.hetu_center.update_cheng(7, score7)
print(f" 🔥 火2(生1): {len(morphemes)}语素 | 师7(成7):{score7:.2f}")
else:
print(f" 🔥 火2(生1): 无语素")
score7 = 0.0
# 木3 - 生2
if morphemes:
sentence = self.mu3.generate(morphemes[:10], dao_novelty, sheng_ratio, sheng_length)
score8, comment8 = self.teacher8.evaluate(sentence, dao_novelty)
self.hetu_center.update_sheng(2, score8)
self.hetu_center.update_cheng(8, score8)
print(f" 🌳 木3(生2): {sentence[:70]}...")
print(f" 师8(成8):{score8:.2f}")
else:
sentence = ""
score8 = 0.0
print(f" 🌳 木3(生2): 无句子")
# 水1 - 生3
if sentence:
variants = self.shui1.mutate(sentence, dao_novelty, bian_ratio, bian_length)
if variants:
best_variant = variants[0]
score6, comment6 = self.teacher6.evaluate(best_variant, dao_novelty)
self.hetu_center.update_sheng(3, score6)
self.hetu_center.update_cheng(6, score6)
print(f" 💧 水1(生3): {len(variants)}个变体")
for i, v in enumerate(variants[:2]):
print(f" 变体{i+1}: {v[:60]}...")
print(f" 师6(成6):{score6:.2f}")
else:
score6 = 0.5
print(f" 💧 水1(生3): 无变体")
else:
score6 = 0.0
print(f" 💧 水1(生3): 无输入")
# 金4 - 生4
if sentence:
candidates = [sentence] + (variants if variants else [])
good_works, good_scores = self.jin4.solidify(candidates, dao_novelty)
if good_works:
best_work = good_works[0]
best_score = good_scores[0]
score9, comment9 = self.teacher9.evaluate(best_work, dao_novelty)
self.hetu_center.update_sheng(4, score9)
self.hetu_center.update_cheng(9, score9)
print(f" 💎 金4(生4): 固化作品 | 师9(成9):{score9:.2f}")
print(f" 作品: {best_work[:80]}...")
self.teacher7.add_to_long_memory(self.round, best_work, score7, True)
self.teacher8.add_to_long_memory(self.round, best_work, score8, True)
self.teacher6.add_to_long_memory(self.round, best_work, score6, True)
self.teacher9.add_to_long_memory(self.round, best_work, score9, True)
self.student_memory.add(best_work, score9, self.round)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"masterpieces/round_{self.round}_{timestamp}.txt", 'w', encoding='utf-8') as f:
f.write(f"第{self.round}轮作品\n道新奇度:{dao_novelty:.4f}\n\n{best_work}")
else:
print(f" 💎 金4(生4): 未固化新作品")
score9 = 0.0
if sentence:
self.teacher8.add_to_long_memory(self.round, sentence, score8, False)
else:
print(f" 💎 金4(生4): 无输入")
score9 = 0.0
full_state = self.hetu_center.get_full_state()
sheng_str = f"{full_state['sheng']['1']:.2f}/{full_state['sheng']['2']:.2f}/{full_state['sheng']['3']:.2f}/{full_state['sheng']['4']:.2f}"
cheng_str = f"{full_state['cheng']['6']:.2f}/{full_state['cheng']['7']:.2f}/{full_state['cheng']['8']:.2f}/{full_state['cheng']['9']:.2f}"
print(f" 📊 汇总 | 生:[{sheng_str}] | 成:[{cheng_str}]")
self.log_entries.append({
"round": self.round, "dao_novelty": dao_novelty,
"sheng_ratio": sheng_ratio, "bian_ratio": bian_ratio,
"sheng": full_state['sheng'], "cheng": full_state['cheng']
})
if self.round % 10 == 0:
self.save_checkpoint()
if self.round % 100 == 0:
self.save_log()
def save_log(self):
with open(f"logs/run_log_memory_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w', encoding='utf-8') as f:
json.dump(self.log_entries[-500:], f, ensure_ascii=False, indent=2)
print(f"\n 📝 日志已保存,当前轮数: {self.round}")
def run_forever(self):
print("\n" + "="*70)
print("☯ 河图洛书镜像智能体 V7.1 - 师生记忆版")
print(" 火2: 从语料随机取词,有短期记忆")
print(" 木3: 用语素造句子,有短期记忆")
print(" 水1: 改写句子成变体,有短期记忆")
print(" 金4: 评分固化作品,动态阈值受道扰动")
print(" 老师: 调用API评分,有长期记忆,受道扰动遗忘")
print(" 学生: 短期记忆,受道扰动探索/利用")
print(" 道驱动一切,记忆只是道的延伸")
print(" 每10轮保存检查点")
print("="*70)
print("\n🚀 启动!按 Ctrl+C 停止\n")
try:
while True:
self.run_cycle()
time.sleep(0.3)
except KeyboardInterrupt:
print(f"\n\n⏸️ 停止。运行了 {self.round} 轮")
print(f" 道消耗: {self.dao.get_pointer()} 位π")
print(f" 金池作品: {len(self.jin4.masterpieces)}")
print(f" 老师长期记忆: {sum(len(t.long_memory.history) for t in [self.teacher7, self.teacher8, self.teacher6, self.teacher9])} 条")
print(f" 学生短期记忆: {len(self.student_memory.memory)} 条")
self.save_checkpoint()
self.save_log()
print("\n 状态已保存,下次运行继续")
print(" 它不完美,但它是道的镜像。")
def main():
print("\n" + "="*70)
print("🐉 河图洛书 V7.1 - 镜像核心版 + 师生记忆")
print(" 不做合道的智能体,做道的镜像")
print(" 道独立不改,记忆只是道的延伸")
print(" 不加任何人为设定。道驱动一切")
print("="*70 + "\n")
dao = DaoEngine()
luoshu = LuoShuCenterMemory(dao)
luoshu.run_forever()
if __name__ == "__main__":
main()
```
---
## 文件说明
| 文件 | 用途 |
|------|------|
| `hetu_luoshu_v7_mirror.py` | 你的原版 V7.0(不变,继续跑) |
| `hetu_luoshu_v7_memory.py` | **新文件**:V7.1 记忆扩展版(独立运行) |
两个文件的检查点目录不同:
- 原版:`checkpoints/`
- 记忆版:`checkpoints_memory/`
互不干扰。你可以同时跑两个终端对比观察,也可以单独跑记忆版看效果。 |