265 lines
8.4 KiB
Python
265 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
state.py — Game state persistence and file I/O for The Chaos engine.
|
|
|
|
Standalone functions that read/write session files. No dependency on
|
|
GameEngine or other modules besides paths.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from .paths import (
|
|
CHAR_PATH, WORLD_PATH, BOOK_PATH, JOURNAL_PATH, AMBIENCE_PATH,
|
|
LOG_PATH, LLM_LOG_PATH, AMBIENCE_OPTIONS_PATH, CHANGES_PATH,
|
|
AUDIO_DIR, SESSION_DIR, ARCHIVE_DIR,
|
|
)
|
|
from .models import TurnResult
|
|
|
|
|
|
def read_file(path: Path) -> str:
|
|
"""Read a text file, return empty string if missing."""
|
|
return path.read_text().strip() if path.exists() else ""
|
|
|
|
|
|
def read_recent_log(max_entries: int = 5) -> str:
|
|
"""Return the last N entries from the session log."""
|
|
if not LOG_PATH.exists():
|
|
return "*No recent events.*"
|
|
lines = LOG_PATH.read_text().splitlines()
|
|
entries = [l for l in lines if l.strip().startswith("- ")]
|
|
return "\n".join(entries[-max_entries:]) or "*No recent events.*"
|
|
|
|
|
|
def read_recent_book(max_turns: int = 1) -> str:
|
|
"""Return the last N turns from the book as context."""
|
|
text = read_file(BOOK_PATH)
|
|
if not text:
|
|
return "*No prior story.*"
|
|
turns = text.split("\n## ")
|
|
if len(turns) <= 1:
|
|
return turns[0]
|
|
recent = turns[-max_turns:]
|
|
if not recent[0].startswith("## "):
|
|
recent[0] = "## " + recent[0]
|
|
return "\n## ".join(recent)
|
|
|
|
|
|
def truncate_world(text: str) -> str:
|
|
"""Extract key world context: NPCs, factions, active threads, rumours."""
|
|
if not text or text == "*No world state.*":
|
|
return text
|
|
sections = re.split(r"\n(?=## |### )", text)
|
|
parts = []
|
|
for sec in sections:
|
|
header = sec.split("\n")[0].strip() if sec else ""
|
|
if "Active Threads" in header:
|
|
parts.append(sec)
|
|
elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header:
|
|
parts.append(sec)
|
|
result = "\n\n".join(parts)
|
|
return result or text[:1500] + "\n_(world truncated)_"
|
|
|
|
|
|
def get_valid_ambiences() -> set[str]:
|
|
"""Parse ambience_options.md and return set of valid ambience names."""
|
|
valid = {"silence"}
|
|
if not AMBIENCE_OPTIONS_PATH.exists():
|
|
return valid
|
|
in_table = False
|
|
for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
|
|
s = line.strip()
|
|
if not s.startswith("|") or not s.endswith("|"):
|
|
in_table = False
|
|
continue
|
|
if in_table and all(c in "-:| " for c in s):
|
|
continue
|
|
parts = [p.strip() for p in s.split("|") if p.strip()]
|
|
if not parts:
|
|
continue
|
|
if not in_table:
|
|
in_table = True
|
|
continue
|
|
name = parts[0].lower()
|
|
files_str = parts[1] if len(parts) > 1 else ""
|
|
files = [f.strip() for f in files_str.split(",")]
|
|
has_files = any((AUDIO_DIR / f).exists() or f for f in files)
|
|
if has_files:
|
|
valid.add(name)
|
|
return valid
|
|
|
|
|
|
def validate_update_size(name: str, new_content: str, path: Path) -> bool:
|
|
"""Reject updates more than 30% shorter than existing — likely partial paste."""
|
|
if not path.exists():
|
|
return True
|
|
old = path.read_text().strip()
|
|
if not old:
|
|
return True
|
|
ratio = len(new_content) / len(old)
|
|
if ratio < 0.7:
|
|
print(
|
|
f"WARNING: {name} update rejected ({ratio:.0%} of original size "
|
|
f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.",
|
|
file=sys.stderr,
|
|
)
|
|
return False
|
|
return True
|
|
|
|
|
|
def apply_state(result: TurnResult) -> None:
|
|
"""Write state changes from a TurnResult to disk."""
|
|
if result.changes:
|
|
CHANGES_PATH.write_text("\n".join(result.changes) + "\n")
|
|
else:
|
|
CHANGES_PATH.write_text("")
|
|
|
|
|
|
def next_turn_number() -> int:
|
|
"""Return the next turn number based on existing turns in book.md."""
|
|
_migrate_turn_headers()
|
|
text = read_file(BOOK_PATH)
|
|
if not text:
|
|
return 1
|
|
return len(re.findall(r"\n## Turn \d", text)) + 1
|
|
|
|
|
|
def _migrate_turn_headers():
|
|
"""Rewrite old date-based turn headers (## Turn — YYYY-MM-DD) to numbered format."""
|
|
text = read_file(BOOK_PATH)
|
|
if not text:
|
|
return
|
|
if not re.search(r"\n## Turn — \d{4}", text):
|
|
return
|
|
turns = re.split(r"\n(?=## Turn )", text)
|
|
migrated = []
|
|
for i, t in enumerate(turns, 1):
|
|
t = re.sub(r"^## Turn — \d{4}-\d{2}-\d{2}", f"## Turn {i}", t)
|
|
migrated.append(t)
|
|
BOOK_PATH.write_text("\n".join(migrated))
|
|
|
|
|
|
def archive_turn(narrative: str) -> int:
|
|
"""Append the narrative as a new turn in book.md. Returns the turn number."""
|
|
num = next_turn_number()
|
|
heading = f"\n\n## Turn {num}\n\n"
|
|
BOOK_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(BOOK_PATH, "a") as f:
|
|
f.write(heading + narrative.strip() + "\n")
|
|
return num
|
|
|
|
|
|
def append_log(entry: str) -> None:
|
|
"""Append a log entry to the session log."""
|
|
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(LOG_PATH, "a") as f:
|
|
f.write(entry.strip() + "\n")
|
|
|
|
|
|
def append_llm_log(text: str) -> None:
|
|
"""Append raw LLM activity to llm.log for debugging."""
|
|
LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(LLM_LOG_PATH, "a") as f:
|
|
f.write(text + "\n")
|
|
|
|
|
|
def update_journal(add: list[str] | None = None, done: list[str] | None = None) -> None:
|
|
"""Add or complete TODO items in journal.md."""
|
|
if not JOURNAL_PATH.exists():
|
|
JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n")
|
|
lines = JOURNAL_PATH.read_text().splitlines()
|
|
|
|
todo_items: list[str] = []
|
|
done_items: list[str] = []
|
|
before_todo: list[str] = []
|
|
between: list[str] = []
|
|
after_done: list[str] = []
|
|
section = "before_todo"
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
if stripped == "## TODO":
|
|
section = "todo"
|
|
before_todo.append(line)
|
|
elif stripped == "## DONE":
|
|
section = "done"
|
|
between.append(line)
|
|
elif section == "before_todo":
|
|
before_todo.append(line)
|
|
elif section == "todo":
|
|
if stripped.startswith("- "):
|
|
todo_items.append(stripped[2:])
|
|
else:
|
|
between.append(line)
|
|
elif section == "done":
|
|
if stripped.startswith("- "):
|
|
done_items.append(stripped[2:])
|
|
else:
|
|
after_done.append(line)
|
|
|
|
if done:
|
|
done_set = set(done)
|
|
todo_items = [i for i in todo_items if i not in done_set]
|
|
new_done = [i for i in done if i not in done_items]
|
|
done_items.extend(new_done)
|
|
|
|
if add:
|
|
todo_set = set(todo_items)
|
|
new_todo = [i for i in add if i not in todo_set]
|
|
todo_items = new_todo + todo_items
|
|
|
|
out = list(before_todo)
|
|
for item in todo_items:
|
|
out.append(f"- {item}")
|
|
out.extend(between)
|
|
for item in done_items:
|
|
out.append(f"- {item}")
|
|
out.extend(after_done)
|
|
|
|
cleaned = []
|
|
prev_blank = False
|
|
for line in out:
|
|
is_blank = line.strip() == ""
|
|
if is_blank and prev_blank:
|
|
continue
|
|
cleaned.append(line)
|
|
prev_blank = is_blank
|
|
JOURNAL_PATH.write_text("\n".join(cleaned) + "\n")
|
|
|
|
|
|
def extract_hero_name() -> str:
|
|
"""Extract the hero's name from character.md."""
|
|
text = read_file(CHAR_PATH)
|
|
if not text:
|
|
return "unknown-hero"
|
|
for line in text.splitlines():
|
|
if line.strip().startswith("**Name:**"):
|
|
name = line.split(":", 1)[1].strip().strip("*_ \t")
|
|
return name.lower().replace(" ", "-") or "unknown-hero"
|
|
return "unknown-hero"
|
|
|
|
|
|
def archive_session() -> str:
|
|
"""Archive the current session folder to archive/<hero-name>-<timestamp>/ and clear session state for a fresh start.
|
|
Returns the archive path as a string."""
|
|
hero = extract_hero_name()
|
|
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
|
|
archive_dir = ARCHIVE_DIR / f"{hero}-{ts}"
|
|
archive_dir.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
if SESSION_DIR.exists():
|
|
shutil.copytree(SESSION_DIR, archive_dir, dirs_exist_ok=True)
|
|
|
|
# Clear session state
|
|
for child in SESSION_DIR.iterdir():
|
|
if child.is_file():
|
|
child.unlink()
|
|
elif child.is_dir() and child.name != "__pycache__":
|
|
shutil.rmtree(child)
|
|
|
|
return str(archive_dir)
|