splinter-keep/tools/state.py

211 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
state.py — Game state persistence and file I/O for The Chaos engine.
Standalone functions that read/write session files. No dependency on
GameEngine; the only project modules used are paths and models (TurnResult).
"""
from __future__ import annotations
import re
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
from paths import (
CHAR_PATH, WORLD_PATH, BOOK_PATH, JOURNAL_PATH, AMBIENCE_PATH,
LOG_DIR, LLM_LOG_PATH, AMBIENCE_OPTIONS_PATH, CHANGES_PATH,
AUDIO_DIR, TODAY,
)
from models import TurnResult
def read_file(path: Path) -> str:
    """Return the whitespace-stripped text of *path*, or "" if it is missing."""
    if not path.exists():
        return ""
    contents = path.read_text()
    return contents.strip()
def read_recent_log(max_entries: int = 5) -> str:
    """Return the last *max_entries* bullet entries from the latest log file.

    The log for ``TODAY`` is preferred; if it does not exist, yesterday's log
    (relative to ``date.today()``) is tried. Only lines whose stripped form
    starts with ``"- "`` count as entries.

    Returns the placeholder ``"*No recent events.*"`` when neither log file
    exists, when the chosen file has no entries, or when *max_entries* is
    not positive.
    """
    placeholder = "*No recent events.*"
    # A slice of [-0:] would return *every* entry and a negative count would
    # drop leading entries instead, so treat non-positive requests as "none".
    if max_entries <= 0:
        return placeholder

    log_path = LOG_DIR / f"{TODAY}.md"
    if not log_path.exists():
        yesterday = (date.today() - timedelta(days=1)).isoformat()
        log_path = LOG_DIR / f"{yesterday}.md"
        if not log_path.exists():
            return placeholder

    entries = [
        line
        for line in log_path.read_text().splitlines()
        if line.strip().startswith("- ")
    ]
    return "\n".join(entries[-max_entries:]) or placeholder
def read_recent_book(max_turns: int = 1) -> str:
    """Return the last *max_turns* turns from the book as context.

    The book text is split on lines beginning with ``"## "``. Any text before
    the first such heading is treated as the first chunk and is only included
    when *max_turns* reaches back that far.

    Returns ``"*No prior story.*"`` when the book is empty or missing, or when
    *max_turns* is not positive.
    """
    placeholder = "*No prior story.*"
    # turns[-0:] would hand back the entire book, so reject non-positive
    # counts before slicing.
    if max_turns <= 0:
        return placeholder

    text = read_file(BOOK_PATH)
    if not text:
        return placeholder

    chunks = text.split("\n## ")
    if len(chunks) == 1:
        return text

    # split() consumes the "## " marker of every chunk but the first; put it
    # back so each returned turn keeps its heading regardless of where the
    # slice starts.
    turns = [chunks[0]] + [f"## {chunk}" for chunk in chunks[1:]]
    return "\n".join(turns[-max_turns:])
def truncate_world(text: str) -> str:
    """Condense world state to the sections useful as prompt context.

    Keeps every ``##``/``###`` section whose heading line mentions active
    threads, notable NPCs, factions at play, or a ``###`` rumours heading.
    Empty input and the ``"*No world state.*"`` placeholder pass through
    unchanged. If no section matches, the first 1500 characters are returned
    with a truncation marker appended.
    """
    if not text or text == "*No world state.*":
        return text

    wanted = ("Active Threads", "Notable NPCs", "Factions at Play", "### Rumours")
    kept = []
    for section in re.split(r"\n(?=## |### )", text):
        # Only the heading (first line) decides whether a section is kept.
        heading = section.split("\n")[0].strip() if section else ""
        if any(marker in heading for marker in wanted):
            kept.append(section)

    condensed = "\n\n".join(kept)
    if condensed:
        return condensed
    return text[:1500] + "\n_(world truncated)_"
def get_valid_ambiences() -> set[str]:
    """Parse ambience_options.md and return the set of valid ambience names.

    ``"silence"`` is always valid. Every other name comes from markdown table
    rows (lines that start and end with ``|``): the first row of each table
    is skipped as a header, separator rows made only of ``-:| `` characters
    are skipped, and for the remaining rows the first non-empty cell is the
    lower-cased name and the second is a comma-separated list of files.

    A row contributes its name only when it lists at least one non-empty
    file entry.
    """
    valid = {"silence"}
    if not AMBIENCE_OPTIONS_PATH.exists():
        return valid

    in_table = False
    for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
        s = line.strip()
        if not s.startswith("|") or not s.endswith("|"):
            # Any non-table line ends the current table.
            in_table = False
            continue
        if in_table and all(c in "-:| " for c in s):
            continue  # header/body separator row
        parts = [p.strip() for p in s.split("|") if p.strip()]
        if not parts:
            continue
        if not in_table:
            # First row of a table is its header.
            in_table = True
            continue

        name = parts[0].lower()
        files_str = parts[1] if len(parts) > 1 else ""
        # Drop empty entries: previously "" was joined onto AUDIO_DIR, which
        # resolves to the audio directory itself and made rows without any
        # files count as valid whenever that directory existed.
        files = [f.strip() for f in files_str.split(",") if f.strip()]
        # NOTE(review): as before, a listed file name is accepted even when
        # the file is not present on disk (the original test was
        # "exists() or name"). Confirm whether existence should be enforced.
        if files:
            valid.add(name)
    return valid
def validate_update_size(name: str, new_content: str, path: Path) -> bool:
    """Guard against partial pastes by comparing new content to the file on disk.

    Returns True when *path* is missing or holds only whitespace, or when
    *new_content* is at least 70% as long as the stripped existing text.
    Otherwise prints a warning naming *name* to stderr and returns False.
    """
    existing = path.read_text().strip() if path.exists() else ""
    if not existing:
        return True

    new_len = len(new_content)
    old_len = len(existing)
    ratio = new_len / old_len
    if ratio >= 0.7:
        return True

    print(
        f"WARNING: {name} update rejected ({ratio:.0%} of original size "
        f"= {new_len} vs {old_len} chars) — likely a partial paste.",
        file=sys.stderr,
    )
    return False
def apply_state(result: TurnResult) -> None:
    """Persist the ambience and change list carried by *result*.

    The ambience file is rewritten only when ``result.ambience`` is truthy
    (stored stripped and lower-cased, one line). The changes file is always
    rewritten: one change per line, or emptied when there are no changes.
    """
    ambience = result.ambience
    if ambience:
        AMBIENCE_PATH.write_text(f"{ambience.strip().lower()}\n")

    changes = result.changes
    payload = "\n".join(changes) + "\n" if changes else ""
    CHANGES_PATH.write_text(payload)
def archive_turn(narrative: str) -> None:
    """Append *narrative* to book.md under a timestamped ``## Turn`` heading.

    The book's parent directory is created if it does not exist yet.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    heading = f"\n\n## Turn — {stamp}\n\n"
    BOOK_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(BOOK_PATH, "a") as book:
        book.write(f"{heading}{narrative.strip()}\n")
def append_log(entry: str) -> None:
    """Append the stripped *entry* as one line to today's session log.

    Creates the log directory when needed and seeds a new log file with a
    ``# Session Log`` title line before the first entry.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    target = LOG_DIR / f"{TODAY}.md"
    if not target.exists():
        target.write_text(f"# Session Log — {TODAY}\n\n")
    with open(target, "a") as handle:
        handle.write(f"{entry.strip()}\n")
def append_llm_log(text: str) -> None:
    """Write *text* plus a trailing newline to the end of llm.log.

    The log's parent directory is created first if it is missing.
    """
    log_dir = LLM_LOG_PATH.parent
    log_dir.mkdir(parents=True, exist_ok=True)
    line = text + "\n"
    with open(LLM_LOG_PATH, "a") as sink:
        sink.write(line)
def update_journal(add: list[str] | None = None, done: list[str] | None = None) -> None:
    """Add or complete TODO items in journal.md.

    The journal is created with empty ``## TODO`` and ``## DONE`` sections if
    it does not exist. Bullet lines (``- text``) under each heading are the
    items; all other lines are preserved in place.

    Args:
        add: Items to put at the top of the TODO list. Items already in the
            TODO list, and repeats within *add*, are ignored.
        done: Items to remove from the TODO list and append to the DONE list.
            Items already in the DONE list, and repeats within *done*, are
            not appended again.

    Completions are applied before additions, so an item present in both
    arguments ends up in both lists. Consecutive blank lines are collapsed
    to a single blank line when the file is rewritten.
    """
    if not JOURNAL_PATH.exists():
        JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n")
    lines = JOURNAL_PATH.read_text().splitlines()

    todo_items: list[str] = []
    done_items: list[str] = []
    before_todo: list[str] = []  # everything up to and including "## TODO"
    between: list[str] = []  # non-bullet TODO lines plus the "## DONE" heading
    after_done: list[str] = []  # non-bullet lines after "## DONE"

    section = "before_todo"
    for line in lines:
        stripped = line.strip()
        if stripped == "## TODO":
            section = "todo"
            before_todo.append(line)
        elif stripped == "## DONE":
            section = "done"
            between.append(line)
        elif section == "before_todo":
            before_todo.append(line)
        elif section == "todo":
            if stripped.startswith("- "):
                todo_items.append(stripped[2:])
            else:
                between.append(line)
        elif section == "done":
            if stripped.startswith("- "):
                done_items.append(stripped[2:])
            else:
                after_done.append(line)

    if done:
        done_set = set(done)
        todo_items = [i for i in todo_items if i not in done_set]
        # Track what is already recorded so a repeat inside `done` is not
        # appended twice.
        known_done = set(done_items)
        for item in done:
            if item not in known_done:
                known_done.add(item)
                done_items.append(item)

    if add:
        # Same order-preserving de-duplication for new TODO items.
        known_todo = set(todo_items)
        new_todo: list[str] = []
        for item in add:
            if item not in known_todo:
                known_todo.add(item)
                new_todo.append(item)
        todo_items = new_todo + todo_items

    out = list(before_todo)
    out.extend(f"- {item}" for item in todo_items)
    out.extend(between)
    out.extend(f"- {item}" for item in done_items)
    out.extend(after_done)

    # Collapse runs of blank lines left behind by moved or removed items.
    cleaned: list[str] = []
    prev_blank = False
    for line in out:
        is_blank = line.strip() == ""
        if is_blank and prev_blank:
            continue
        cleaned.append(line)
        prev_blank = is_blank

    JOURNAL_PATH.write_text("\n".join(cleaned) + "\n")