splinter-keep/tools/engine_lib/state.py
2026-07-05 12:50:22 +02:00

463 lines
15 KiB
Python

#!/usr/bin/env python3
"""
state.py — Game state persistence and file I/O for The Chaos engine.
Standalone functions that read/write session files. No dependency on
GameEngine or other modules besides paths.
"""
from __future__ import annotations
import re
import shutil
import sys
from datetime import datetime
from pathlib import Path
from .paths import (
CHAR_PATH, WORLD_PATH, BOOK_PATH, JOURNAL_PATH, AMBIENCE_PATH,
LOG_PATH, LLM_LOG_PATH, AMBIENCE_OPTIONS_PATH, CHANGES_PATH,
META_LOG_PATH, AUDIO_DIR, SESSION_DIR, ARCHIVE_DIR, THEMES_DIR,
ACTIVE_THEME_PATH,
)
from .models import TurnResult
def read_file(path: Path) -> str:
"""Read a text file, return empty string if missing."""
return path.read_text().strip() if path.exists() else ""
def read_recent_log(max_entries: int = 5) -> str:
"""Return the last N entries from the session log."""
if not LOG_PATH.exists():
return "*No recent events.*"
lines = LOG_PATH.read_text().splitlines()
entries = [l for l in lines if l.strip().startswith("- ")]
return "\n".join(entries[-max_entries:]) or "*No recent events.*"
def read_recent_book(max_turns: int = 1) -> str:
"""Return the last N turns from the book as context."""
text = read_file(BOOK_PATH)
if not text:
return "*No prior story.*"
turns = text.split("\n## ")
if len(turns) <= 1:
return turns[0]
recent = turns[-max_turns:]
if not recent[0].startswith("## "):
recent[0] = "## " + recent[0]
return "\n## ".join(recent)
def truncate_world(text: str) -> str:
"""Extract key world context: NPCs, factions, active threads, rumours."""
if not text or text == "*No world state.*":
return text
sections = re.split(r"\n(?=## |### )", text)
parts = []
for sec in sections:
header = sec.split("\n")[0].strip() if sec else ""
if "Active Threads" in header:
parts.append(sec)
elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header:
parts.append(sec)
result = "\n\n".join(parts)
return result or text[:1500] + "\n_(world truncated)_"
def get_valid_ambiences() -> set[str]:
"""Parse ambience_options.md and return set of valid ambience names."""
valid = {"silence"}
if not AMBIENCE_OPTIONS_PATH.exists():
return valid
in_table = False
for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
s = line.strip()
if not s.startswith("|") or not s.endswith("|"):
in_table = False
continue
if in_table and all(c in "-:| " for c in s):
continue
parts = [p.strip() for p in s.split("|") if p.strip()]
if not parts:
continue
if not in_table:
in_table = True
continue
name = parts[0].lower()
files_str = parts[1] if len(parts) > 1 else ""
files = [f.strip() for f in files_str.split(",")]
has_files = any((AUDIO_DIR / f).exists() or f for f in files)
if has_files:
valid.add(name)
return valid
def validate_update_size(name: str, new_content: str, path: Path) -> bool:
"""Reject updates more than 30% shorter than existing — likely partial paste."""
if not path.exists():
return True
old = path.read_text().strip()
if not old:
return True
ratio = len(new_content) / len(old)
if ratio < 0.7:
print(
f"WARNING: {name} update rejected ({ratio:.0%} of original size "
f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.",
file=sys.stderr,
)
return False
return True
def apply_state(result: TurnResult) -> None:
"""Write state changes from a TurnResult to disk."""
if result.changes:
CHANGES_PATH.write_text("\n".join(result.changes) + "\n")
else:
CHANGES_PATH.write_text("")
def next_turn_number() -> int:
"""Return the next turn number based on existing turns in book.md."""
_migrate_turn_headers()
text = read_file(BOOK_PATH)
if not text:
return 1
return len(re.findall(r"\n## Turn \d", text)) + 1
def _migrate_turn_headers():
"""Rewrite old date-based turn headers (## Turn — YYYY-MM-DD) to numbered format."""
text = read_file(BOOK_PATH)
if not text:
return
if not re.search(r"\n## Turn — \d{4}", text):
return
turns = re.split(r"\n(?=## Turn )", text)
migrated = []
for i, t in enumerate(turns, 1):
t = re.sub(r"^## Turn — \d{4}-\d{2}-\d{2}", f"## Turn {i}", t)
migrated.append(t)
BOOK_PATH.write_text("\n".join(migrated))
def archive_turn(narrative: str) -> int:
"""Append the narrative as a new turn in book.md. Returns the turn number."""
num = next_turn_number()
heading = f"\n\n## Turn {num}\n\n"
BOOK_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(BOOK_PATH, "a") as f:
f.write(heading + narrative.strip() + "\n")
return num
def append_log(entry: str) -> None:
"""Append a log entry to the session log."""
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(LOG_PATH, "a") as f:
f.write(entry.strip() + "\n")
def append_llm_log(text: str) -> None:
"""Append raw LLM activity to llm.log for debugging."""
LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(LLM_LOG_PATH, "a") as f:
f.write(text + "\n")
def append_meta_log(turn_num: int, entry: str) -> None:
"""Append a meta_log entry to meta_log.md with turn number."""
META_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(META_LOG_PATH, "a") as f:
f.write(f"- **Turn {turn_num}** — {entry.strip()}\n")
def read_last_meta_log() -> str:
"""Return the last meta_log entry, or empty string if none."""
if not META_LOG_PATH.exists():
return ""
lines = [l.strip() for l in META_LOG_PATH.read_text().splitlines() if l.strip()]
return lines[-1] if lines else ""
def update_journal(add: list[str] | None = None, done: list[str] | None = None) -> None:
"""Add or complete TODO items in journal.md."""
if not JOURNAL_PATH.exists():
JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n")
lines = JOURNAL_PATH.read_text().splitlines()
todo_items: list[str] = []
done_items: list[str] = []
before_todo: list[str] = []
between: list[str] = []
after_done: list[str] = []
section = "before_todo"
for line in lines:
stripped = line.strip()
if stripped == "## TODO":
section = "todo"
before_todo.append(line)
elif stripped == "## DONE":
section = "done"
between.append(line)
elif section == "before_todo":
before_todo.append(line)
elif section == "todo":
if stripped.startswith("- "):
todo_items.append(stripped[2:])
else:
between.append(line)
elif section == "done":
if stripped.startswith("- "):
done_items.append(stripped[2:])
else:
after_done.append(line)
if done:
done_set = set(done)
todo_items = [i for i in todo_items if i not in done_set]
new_done = [i for i in done if i not in done_items]
done_items.extend(new_done)
if add:
todo_set = set(todo_items)
new_todo = [i for i in add if i not in todo_set]
todo_items = new_todo + todo_items
out = list(before_todo)
for item in todo_items:
out.append(f"- {item}")
out.extend(between)
for item in done_items:
out.append(f"- {item}")
out.extend(after_done)
cleaned = []
prev_blank = False
for line in out:
is_blank = line.strip() == ""
if is_blank and prev_blank:
continue
cleaned.append(line)
prev_blank = is_blank
JOURNAL_PATH.write_text("\n".join(cleaned) + "\n")
def extract_hero_name() -> str:
"""Extract the hero's name from character.md."""
text = read_file(CHAR_PATH)
if not text:
return "unknown-hero"
for line in text.splitlines():
if line.strip().startswith("**Name:**"):
name = line.split(":", 1)[1].strip().strip("*_ \t")
return name.lower().replace(" ", "-") or "unknown-hero"
return "unknown-hero"
def archive_session() -> str:
"""Archive the current session folder to archive/<hero-name>-<timestamp>/ and clear session state for a fresh start.
Returns the archive path as a string."""
hero = extract_hero_name()
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
archive_dir = ARCHIVE_DIR / f"{hero}-{ts}"
archive_dir.parent.mkdir(parents=True, exist_ok=True)
if SESSION_DIR.exists():
shutil.copytree(SESSION_DIR, archive_dir, dirs_exist_ok=True)
# Clear session state
for child in SESSION_DIR.iterdir():
if child.is_file():
child.unlink()
elif child.is_dir() and child.name != "__pycache__":
shutil.rmtree(child)
return str(archive_dir)
# ── Theme management ─────────────────────────────────────────
def list_themes() -> list[dict]:
"""Scan themes/ and return metadata for each available theme."""
if not THEMES_DIR.exists():
return [{"id": "default", "name": "Default", "description": "Built-in theme"}]
themes = []
for d in sorted(THEMES_DIR.iterdir()):
if not d.is_dir():
continue
meta_path = d / "theme.json"
if meta_path.exists():
try:
import json
meta = json.loads(meta_path.read_text())
except Exception:
meta = {}
else:
meta = {}
themes.append({
"id": meta.get("id", d.name),
"name": meta.get("name", d.name),
"version": meta.get("version", "0.0.0"),
"description": meta.get("description", ""),
})
return themes
def get_active_theme() -> str:
"""Read the active theme id from session/current_theme."""
if ACTIVE_THEME_PATH.exists():
return ACTIVE_THEME_PATH.read_text().strip() or "default"
return "default"
def set_active_theme(theme_id: str) -> None:
"""Write the active theme id to session/current_theme."""
ACTIVE_THEME_PATH.parent.mkdir(parents=True, exist_ok=True)
ACTIVE_THEME_PATH.write_text(theme_id.strip() + "\n")
def init_session_from_theme(theme_id: str | None = None) -> None:
"""Seed session files from a theme. Only copies files that don't exist yet."""
from .paths import (
get_theme_dir, get_character_template_path, get_theme_audio_dir,
get_theme_ambience_options_path,
)
tid = theme_id or get_active_theme()
theme_dir = get_theme_dir(tid)
set_active_theme(tid)
# Character template
tmpl = get_character_template_path(tid)
if tmpl.exists() and not CHAR_PATH.exists():
shutil.copy2(tmpl, CHAR_PATH)
# World template (optional)
world_tmpl = theme_dir / "world_template.md"
if world_tmpl.exists() and not WORLD_PATH.exists():
shutil.copy2(world_tmpl, WORLD_PATH)
# Ambience options baseline
ao = get_theme_ambience_options_path(tid)
if ao.exists() and not AMBIENCE_OPTIONS_PATH.exists():
shutil.copy2(ao, AMBIENCE_OPTIONS_PATH)
# Audio files (seed if session/audio/ is empty)
theme_audio = get_theme_audio_dir(tid)
if theme_audio.exists():
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
existing = set(f.name for f in AUDIO_DIR.iterdir()) if AUDIO_DIR.exists() else set()
for f in theme_audio.iterdir():
if f.is_file() and f.name not in existing:
shutil.copy2(f, AUDIO_DIR)
# ── Save / Load ─────────────────────────────────────────────
SAVES_DIR = Path(__file__).resolve().parent.parent.parent / 'saves'
def save_game(slot_name: str) -> str:
"""Save current session to saves/<slot_name>/.
Excludes audio/ (seeded from theme). Returns the save path."""
slot_dir = SAVES_DIR / slot_name
slot_dir.mkdir(parents=True, exist_ok=True)
# Copy session files (excluding audio/)
for child in SESSION_DIR.iterdir():
if child.name == "audio":
continue
if child.is_file():
shutil.copy2(child, slot_dir / child.name)
elif child.is_dir():
shutil.copytree(child, slot_dir / child.name, dirs_exist_ok=True)
# Write metadata
hero = extract_hero_name()
theme = get_active_theme()
narrative = read_recent_book(1)
preview = narrative.strip()[:200] if narrative else ""
meta = {
"hero": hero,
"theme": theme,
"timestamp": datetime.now().isoformat(),
"preview": preview,
}
import json
(slot_dir / "metadata.json").write_text(json.dumps(meta, indent=2) + "\n")
return str(slot_dir)
def load_game(slot_name: str) -> str:
"""Load session from saves/<slot_name>/ back into session/.
Returns the hero name for UI reload."""
slot_dir = SAVES_DIR / slot_name
if not slot_dir.exists():
raise FileNotFoundError(f"Save slot not found: {slot_name}")
# Clear current session (same as archive_session)
for child in list(SESSION_DIR.iterdir()):
if child.is_file():
child.unlink()
elif child.is_dir() and child.name != "__pycache__":
shutil.rmtree(child)
# Restore save files
for child in slot_dir.iterdir():
if child.name == "metadata.json":
continue
if child.is_file():
shutil.copy2(child, SESSION_DIR / child.name)
elif child.is_dir():
shutil.copytree(child, SESSION_DIR / child.name, dirs_exist_ok=True)
# Re-seed audio from theme (audio is never in saves)
theme = get_active_theme()
from .paths import get_theme_audio_dir
theme_audio = get_theme_audio_dir(theme)
if theme_audio.exists():
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
for f in theme_audio.iterdir():
if f.is_file() and not (AUDIO_DIR / f.name).exists():
shutil.copy2(f, AUDIO_DIR)
return extract_hero_name()
def list_saves() -> list[dict]:
"""Return list of save slots with metadata."""
if not SAVES_DIR.exists():
return []
slots = []
import json
for d in sorted(SAVES_DIR.iterdir()):
if not d.is_dir():
continue
meta_path = d / "metadata.json"
meta = {}
if meta_path.exists():
try:
meta = json.loads(meta_path.read_text())
except Exception:
pass
slots.append({
"slot": d.name,
"hero": meta.get("hero", "?"),
"theme": meta.get("theme", "?"),
"timestamp": meta.get("timestamp", ""),
"preview": meta.get("preview", ""),
})
return slots
def delete_save(slot_name: str) -> None:
"""Remove a save slot."""
slot_dir = SAVES_DIR / slot_name
if slot_dir.exists():
shutil.rmtree(slot_dir)