splinter-keep/tools/engine.py

1138 lines
44 KiB
Python

#!/usr/bin/env python3
"""
engine.py — The Chaos Game Engine
Owns the LLM interaction, prompt assembly, response parsing, and game state
persistence. The TUI (run.py) calls this module — they do not depend on each
other, only on the shared session/ file layout.
"""
from __future__ import annotations
import json
import re
import sys
from dataclasses import dataclass, field
from datetime import date, datetime
from pathlib import Path
from string import Template
from typing import Iterator, Optional
# ── Paths ──────────────────────────────────────────────────────────────────
# All persistent game state lives in "<BASE_DIR>/session", where BASE_DIR is
# the directory two levels above this file.
BASE_DIR = Path(__file__).resolve().parent.parent
SESSION_DIR = BASE_DIR / 'session'
# LLM settings (model, api_key, api_base, temperature, max_tokens).
CONFIG_PATH = SESSION_DIR / 'config.json'
# Character sheet and world state; both are replaced wholesale by the
# character_update / world_update tools.
CHAR_PATH = SESSION_DIR / 'character.md'
WORLD_PATH = SESSION_DIR / 'world.md'
# Story book: one "## Turn — <timestamp>" section is appended per archived turn.
BOOK_PATH = SESSION_DIR / 'book.md'
# Journal with "## TODO" / "## DONE" sections.
JOURNAL_PATH = SESSION_DIR / 'journal.md'
# Name of the currently selected ambience (written by apply_state).
AMBIENCE_PATH = SESSION_DIR / 'ambience.md'
# Directory of per-day session logs named "<ISO date>.md".
LOG_DIR = SESSION_DIR / 'log'
# Raw LLM request/response trace for debugging.
LLM_LOG_PATH = SESSION_DIR / 'llm.log'
# Markdown table of ambience names and their audio files.
AMBIENCE_OPTIONS_PATH = SESSION_DIR / "ambience_options.md"
AUDIO_DIR = SESSION_DIR / "audio"
# NOTE(review): evaluated once at import time, so a process that runs past
# midnight keeps using the previous day's date for log file names.
TODAY = date.today().isoformat()
# ── Structured output ──────────────────────────────────────────────────────
@dataclass
class GenerationResult:
    """Result of the legacy single-call generation path.

    Produced by ``GameEngine.parse_response`` and consumed by the CLI
    ``main()``; kept for backward compatibility.
    """

    # Narrative text with any trailing JSON block removed.
    narrative: str
    # Optional metadata taken from the JSON block of the response.
    choices: list[str] = field(default_factory=list)
    log_entry: Optional[str] = None
    ambience: Optional[str] = None
    character_updates: Optional[str] = None
    world_updates: Optional[str] = None
    journal_add: list[str] = field(default_factory=list)
    journal_done: list[str] = field(default_factory=list)
    # Set instead of the fields above when generation failed.
    error: Optional[str] = None
@dataclass
class TurnResult:
    """Outcome of one complete turn produced by ``generate_with_tools``."""

    # Full narrative for the story book.
    book_log: str = ""
    # Short prompt shown to the player for the next action.
    user_prompt: str = ""
    # Soundscape name, if the model chose one.
    ambience: Optional[str] = None
    # One-line summary for the session log.
    log_entry: Optional[str] = None
    # Populated when the turn could not be generated.
    error: Optional[str] = None
    # Tool errors collected during the extraction phase ("; "-joined).
    debug_info: str = ""
# ── DM System Prompt Template ──────────────────────────────────────────────
# System prompt for the single-call path (GameEngine.build_system_prompt).
# The $character, $world, $log and $story placeholders are filled in with
# string.Template.substitute.
# NOTE(review): the text says "Wrap in ```tool" but the example fence is a bare
# ```; it also says "the full state above" although the State section follows
# below. Confirm whether either wording should be adjusted.
SYSTEM_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion.
## Rules
- **Odds**: 1d6, 4+ favourable, 3- trouble.
- **Traits**: 3d6, roll UNDER trait.
- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour.
- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed.
- **Modifiers**: Favourable +1, Risky -1, Desperate -2.
## Tools (action only)
Wrap in ```tool to perform an action:
```
{"tool": "roll", "args": {"dice": "1d6"}}
```
- **roll** — dice, modifier
- **player_roll** — dice, reason
- **character_update** — content: "full sheet" (if HP/cash/gear/stats change)
- **world_update** — content: "full world" (if NPCs/locations/threads change)
- **journal_update** — add: [...], done: [...]
You have the full state above — no need to look anything up. Just write the story and use tools when the player's action changes something.
## State
### Character
$character
### World
$world
### Log
$log
### Story
$story""")
# System prompt for phase 1 (prose) of GameEngine.generate_with_tools.
# Same rules and $character/$world/$log/$story placeholders as SYSTEM_PROMPT,
# but without the tool section; the die result announced here is appended to
# the user message by generate_with_tools.
PROSE_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion.
## Rules
- **Odds**: 1d6, 4+ favourable, 3- trouble.
- **Traits**: 3d6, roll UNDER trait.
- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour.
- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed.
- **Modifiers**: Favourable +1, Risky -1, Desperate -2.
A die is cast at the start of each turn — incorporate it into your narrative.
## State
### Character
$character
### World
$world
### Log
$log
### Story
$story""")
# ── Game Engine ────────────────────────────────────────────────────────────
class GameEngine:
"""Owns the LLM interaction and game state persistence."""
def __init__(self, session_dir: str | Path = SESSION_DIR):
    """Create an engine and load (or bootstrap) its configuration.

    ``session_dir`` is normalised to a ``Path`` and stored on the instance.
    NOTE(review): the methods in this file read and write through the
    module-level ``*_PATH`` constants, not through ``self.session_dir``.
    """
    self.config: dict = {}
    self.session_dir = Path(session_dir)
    self._load_config()
# ── Config ──────────────────────────────────────────────────────────
def _load_config(self) -> None:
    """Populate ``self.config`` from CONFIG_PATH, creating a default file if absent.

    When the file is missing, a notice is printed to stderr and a default
    ``llm`` section is written to disk. In every case a falsy ``api_key``
    (e.g. an empty string) is normalised to ``None``.
    """
    if CONFIG_PATH.exists():
        self.config = json.loads(CONFIG_PATH.read_text())
    else:
        print(
            "No session/config.json found. Creating default.\n"
            "Edit the model field (e.g. 'ollama/llama3.1', 'openai/gpt-4', "
            "'anthropic/claude-sonnet-4-20250514') and set api_key if needed.",
            file=sys.stderr,
        )
        default_llm = {
            "model": "ollama/llama3.1",
            "api_key": None,
            "api_base": None,
            "temperature": 0.8,
            "max_tokens": 300,
        }
        self.config = {"llm": default_llm}
        self._save_config()

    # Treat "" (or any other falsy value) as "no key configured".
    llm_section = self.config.get("llm", {})
    if not llm_section.get("api_key"):
        llm_section["api_key"] = None
def _save_config(self) -> None:
    """Write ``self.config`` to CONFIG_PATH as indented JSON, creating parent directories."""
    target_dir = CONFIG_PATH.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(self.config, indent=2) + "\n"
    CONFIG_PATH.write_text(payload)
@property
def model(self) -> str:
return self.config.get("llm", {}).get("model", "ollama/llama3.1")
@property
def api_key(self) -> str | None:
return self.config.get("llm", {}).get("api_key")
@property
def api_base(self) -> str | None:
return self.config.get("llm", {}).get("api_base")
@property
def temperature(self) -> float:
return self.config.get("llm", {}).get("temperature", 0.8)
@property
def max_tokens(self) -> int:
return self.config.get("llm", {}).get("max_tokens", 512)
def _set_llm_env(self) -> None:
"""Set provider-specific env vars for litellm."""
prefix = self.model.split("/")[0].upper()
import os
key = self.api_key or "sk-placeholder"
os.environ[f"{prefix}_API_KEY"] = key
if self.api_base:
os.environ[f"{prefix}_API_BASE"] = self.api_base
# ── Context Assembly ────────────────────────────────────────────────
def _read_file(self, path: Path) -> str:
return path.read_text().strip() if path.exists() else ""
def _read_recent_log(self, max_entries: int = 5) -> str:
    """Return the last ``max_entries`` bullet lines of the most recent session log.

    Looks for today's log file (named after the module-level ``TODAY``) and
    falls back to yesterday's. Only lines starting with "- " count as
    entries. Returns "*No recent events.*" when no file or no entry exists.
    """
    from datetime import timedelta

    placeholder = "*No recent events.*"

    log_path = LOG_DIR / f"{TODAY}.md"
    if not log_path.exists():
        previous_day = date.today() - timedelta(days=1)
        log_path = LOG_DIR / f"{previous_day.isoformat()}.md"
        if not log_path.exists():
            return placeholder

    bullet_lines = [
        row for row in log_path.read_text().splitlines()
        if row.strip().startswith("- ")
    ]
    tail = bullet_lines[-max_entries:]
    return "\n".join(tail) or placeholder
def _read_recent_book(self, max_turns: int = 1) -> str:
    """Return the last ``max_turns`` turns of the story book as prompt context.

    The book is split on "\\n## " (the heading that starts each archived
    turn). Splitting removes that marker from every chunk except the first,
    so when the returned excerpt does not start at the top of the book the
    leading "## " is restored; the first returned turn then keeps its
    heading like all the others.

    Returns "*No prior story.*" when the book is missing or empty.
    """
    text = self._read_file(BOOK_PATH)
    if not text:
        return "*No prior story.*"

    separator = "\n## "
    turns = text.split(separator)
    if len(turns) == 1:
        # No turn headings to split on: return the whole book unchanged.
        return turns[0]

    recent = turns[-max_turns:]
    excerpt = separator.join(recent)
    if len(recent) < len(turns):
        # The first chunk of the excerpt lost its "## " prefix to the split.
        excerpt = "## " + excerpt
    return excerpt
@staticmethod
def _truncate_world(text: str) -> str:
"""Extract key world context: NPCs, factions, active threads, rumours."""
if not text or text == "*No world state.*":
return text
sections = re.split(r"\n(?=## |### )", text)
parts = []
for sec in sections:
header = sec.split("\n")[0].strip() if sec else ""
if "Active Threads" in header:
parts.append(sec)
elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header:
parts.append(sec)
result = "\n\n".join(parts)
return result or text[:1500] + "\n_(world truncated)_"
def _get_valid_ambiences(self) -> set[str]:
    """Return the ambience names declared in ``ambience_options.md``.

    The file is scanned for markdown tables (lines that start and end with
    "|"). The first row of each table is its header, separator rows made of
    "-", ":", "|" and spaces are skipped, and every other row contributes
    its first cell (lower-cased) as a name, provided its second cell lists
    at least one non-empty, comma-separated filename. ``"silence"`` is
    always valid because it stops the music.

    A listed filename is sufficient; the file is not required to exist on
    disk. Rows with an empty files column are rejected. Previously such rows
    were accepted whenever the audio directory existed, because an empty
    filename resolved to the directory itself.
    """
    valid = {"silence"}
    if not AMBIENCE_OPTIONS_PATH.exists():
        return valid

    in_table = False
    for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
        row = line.strip()
        if not (row.startswith("|") and row.endswith("|")):
            # Any non-table line ends the current table.
            in_table = False
            continue
        if in_table and all(ch in "-:| " for ch in row):
            continue  # header/body separator row

        cells = [cell.strip() for cell in row.split("|") if cell.strip()]
        if not cells:
            continue
        if not in_table:
            # First row of a table is the header.
            in_table = True
            continue

        name = cells[0].lower()
        files_cell = cells[1] if len(cells) > 1 else ""
        listed_files = [f.strip() for f in files_cell.split(",") if f.strip()]
        if listed_files:
            valid.add(name)
    return valid
def build_system_prompt(self) -> str:
    """Fill SYSTEM_PROMPT with the current character, world, log and story.

    Missing character or world files are replaced by placeholder text; the
    world is condensed with ``_truncate_world`` first.
    """
    character_text = self._read_file(CHAR_PATH) or "*No character sheet.*"
    world_raw = self._read_file(WORLD_PATH) or ""
    world_text = self._truncate_world(world_raw) or "*No world state.*"
    state = {
        "character": character_text,
        "world": world_text,
        "log": self._read_recent_log(),
        "story": self._read_recent_book(),
    }
    return SYSTEM_PROMPT.substitute(state)
def build_user_message(
self,
player_action: str | None = None,
last_prompt: str | None = None,
**kwargs: str | None,
) -> str:
"""Build the user message for this turn's LLM call."""
if kwargs:
raise TypeError(
f"build_user_message() got unexpected keyword arguments: "
f"{set(kwargs)}. Did you mean 'last_prompt' instead of one of these?"
)
parts = []
if last_prompt:
parts.append(f"## Situation\n{last_prompt}")
if player_action:
parts.append(f"## Player's Request\n{player_action}")
has_existing_story = bool(
self._read_file(BOOK_PATH).strip()
) if not last_prompt else True
if not player_action and not last_prompt:
if has_existing_story:
raise RuntimeError(f"User action is required for every turn.")
else:
parts.append(
"## Instructions\n"
"This is a new story. Welcome the player and guide them through the game setup."
)
else:
parts.append(
"## Instructions\n"
"Advance the story based on the player's request. "
"All state is shown above — write the outcome directly."
)
return "\n\n".join(parts)
# ── LLM Call ────────────────────────────────────────────────────────
def generate(
    self,
    player_action: str | None = None,
    last_narrative: str | None = None,
) -> GenerationResult:
    """Run one blocking LLM call and parse it into a ``GenerationResult``.

    ``last_narrative`` is forwarded to ``build_user_message`` as
    ``last_prompt``. A missing ``litellm`` package or any exception from the
    completion call is reported through ``GenerationResult.error`` rather
    than raised.
    """
    conversation = [
        {"role": "system", "content": self.build_system_prompt()},
        {
            "role": "user",
            "content": self.build_user_message(
                player_action=player_action, last_prompt=last_narrative
            ),
        },
    ]

    try:
        import litellm
    except ImportError:
        return GenerationResult(
            narrative="",
            error="litellm is not installed. Run: pip install litellm",
        )

    # Export API key / base before talking to the provider.
    self._set_llm_env()

    try:
        completion = litellm.completion(
            model=self.model,
            messages=conversation,
            temperature=self.temperature,
            stream=False,
            timeout=60,
        )
        reply = completion.choices[0].message.content or ""
    except Exception as exc:
        return GenerationResult(narrative="", error=f"LLM call failed: {exc}")

    return self.parse_response(reply)
def generate_stream(
    self,
    player_action: str | None = None,
    last_narrative: str | None = None,
) -> Iterator[str]:
    """Stream an LLM response, yielding the text accumulated so far.

    Each yield is the cumulative text received up to that point, not the
    individual delta. After the stream ends the complete text is yielded
    once more for parsing. Failures are yielded as a JSON string of the form
    ``{"error": "..."}``.
    """
    conversation = [
        {"role": "system", "content": self.build_system_prompt()},
        {
            "role": "user",
            "content": self.build_user_message(
                player_action=player_action, last_prompt=last_narrative
            ),
        },
    ]

    try:
        import litellm
    except ImportError:
        yield json.dumps({
            "error": "litellm is not installed. Run: pip install litellm"
        })
        return

    self._set_llm_env()

    try:
        stream = litellm.completion(
            model=self.model,
            messages=conversation,
            temperature=self.temperature,
            stream=True,
            timeout=60,
        )
        accumulated = ""
        for chunk in stream:
            piece = chunk.choices[0].delta.content or ""
            if not piece:
                continue
            accumulated += piece
            yield accumulated  # partial narrative for streaming display
        # Final yield is the completed text
        yield accumulated
    except Exception as exc:
        yield json.dumps({"error": f"LLM call failed: {exc}"})
# ── Tool Infrastructure ────────────────────────────────────────────
# Catalogue of tools the DM model may call: name -> description + argument help.
# Only some of these are dispatched by _execute_tool; "player_roll" and
# "finalize_turn" are handled directly in generate_with_tools.
TOOL_REGISTRY: dict[str, dict] = {
    # ── Generic helpers ──
    "read_file": {
        "description": "Read a game state file.",
        "args": {"file": "character | world | book | log | journal"},
    },
    "roll": {
        "description": "Roll dice and return the outcome.",
        "args": {"dice": "e.g. 1d6, 2d6", "modifier": "optional +N or -N"},
    },
    "think": {
        "description": "Internal reasoning shown in the game status bar.",
        "args": {"thought": "Your reasoning."},
    },
    "player_roll": {
        "description": "Ask the player to physically roll dice and enter the result.",
        "args": {"dice": "e.g. 2d6+1", "reason": "Why the roll is needed (shown to player)"},
    },
    # ── Character sheet ──
    "character_get": {
        "description": "Read the full character sheet.",
        "args": {},
    },
    "character_update": {
        "description": "Replace the character sheet with a new full version (ONLY if HP/cash/gear/stats changed).",
        "args": {"content": "Full character sheet markdown"},
    },
    # ── World state ──
    "world_get": {
        "description": "Read the full world state.",
        "args": {},
    },
    "world_update": {
        "description": "Replace the world state with a new full version (ONLY if NPCs/locations/threads changed).",
        "args": {"content": "Full world state markdown"},
    },
    # ── Journal ──
    "journal_get": {
        "description": "Read the journal (TODO / DONE).",
        "args": {},
    },
    "journal_update": {
        "description": "Add or complete journal entries.",
        "args": {"add": "Optional: list of new TODO items", "done": "Optional: list of completed items"},
    },
    # ── Turn completion ──
    "finalize_turn": {
        "description": "Complete the turn.",
        "args": {
            "book_log": "[Required] Full narrative — appended to story book (permanent record)",
            "user_prompt": "[Required] Short prompt for player — NOT recorded, 1-3 sentences",
            "log_entry": "[Optional] One-sentence summary",
            "ambience": "[Optional] Soundscape name",
        },
    },
}
def _tool_think(self, args: dict) -> str:
"""Think tool — content is displayed via dm_status in the status bar."""
return ""
def _tool_read_file(self, args: dict) -> str:
    """Handle the ``read_file`` tool: return one of the known state files.

    ``args["file"]`` selects character, world, book, log (today's) or
    journal. Unknown names yield an explanatory message and empty files a
    "*<name> is empty.*" placeholder.
    """
    requested = (args or {}).get("file", "")
    known_files = {
        "character": CHAR_PATH,
        "world": WORLD_PATH,
        "book": BOOK_PATH,
        "log": LOG_DIR / f"{TODAY}.md",
        "journal": JOURNAL_PATH,
    }
    target = known_files.get(requested)
    if target:
        return self._read_file(target) or f"*{requested} is empty.*"
    return f"Unknown file: {requested}. Choose from: {', '.join(known_files)}"
def _tool_roll(self, args: dict) -> str:
import random
dice_str = (args or {}).get("dice", "1d6")
modifier_str = (args or {}).get("modifier", "0")
try:
count, sides = dice_str.lower().split("d")
count = int(count) if count else 1
sides = int(sides)
except (ValueError, TypeError):
return f"Invalid dice: {dice_str}. Use format like '2d6'."
mod = 0
if modifier_str:
try:
mod = int(modifier_str)
except ValueError:
pass
rolls = [random.randint(1, sides) for _ in range(count)]
total = sum(rolls) + mod
mod_str = f" {'+' if mod >= 0 else ''}{mod}" if mod != 0 else ""
return f"Roll: {dice_str}{mod_str} → [{', '.join(str(r) for r in rolls)}] = {total}"
def _tool_character_get(self, args: dict) -> str:
    """Handle ``character_get``: return the character sheet or an "empty" placeholder."""
    sheet = self._read_file(CHAR_PATH)
    if sheet:
        return sheet
    return "*Character sheet is empty.*"
def _tool_character_update(self, args: dict) -> str:
    """Handle ``character_update``: replace the character sheet with ``args["content"]``.

    Returns an "**Error:** ..." string when the content is missing or is
    rejected by ``_validate_update_size``; otherwise writes the stripped
    content plus a trailing newline and returns a confirmation.
    """
    new_sheet = (args or {}).get("content", "")
    if not new_sheet:
        return "**Error:** `content` is required."

    size_ok = self._validate_update_size("character", new_sheet, CHAR_PATH)
    if not size_ok:
        return "**Error:** Update rejected — content is too short (likely a partial paste)."

    CHAR_PATH.write_text(new_sheet.strip() + "\n")
    return "Character sheet updated."
def _tool_world_get(self, args: dict) -> str:
    """Handle ``world_get``: return the world state or an "empty" placeholder."""
    world = self._read_file(WORLD_PATH)
    if world:
        return world
    return "*World state is empty.*"
def _tool_world_update(self, args: dict) -> str:
    """Handle ``world_update``: replace the world state with ``args["content"]``.

    Returns an "**Error:** ..." string when the content is missing or is
    rejected by ``_validate_update_size``; otherwise writes the stripped
    content plus a trailing newline and returns a confirmation.
    """
    new_world = (args or {}).get("content", "")
    if not new_world:
        return "**Error:** `content` is required."

    size_ok = self._validate_update_size("world", new_world, WORLD_PATH)
    if not size_ok:
        return "**Error:** Update rejected — content is too short (likely a partial paste)."

    WORLD_PATH.write_text(new_world.strip() + "\n")
    return "World state updated."
def _tool_journal_get(self, args: dict) -> str:
    """Handle ``journal_get``: return the journal text or an "empty" placeholder."""
    journal = self._read_file(JOURNAL_PATH)
    if journal:
        return journal
    return "*Journal is empty.*"
def _tool_journal_update(self, args: dict) -> str:
    """Handle ``journal_update``: add TODO items and/or mark items done.

    ``add`` and ``done`` may each be a list or a single string (which is
    wrapped into a one-element list). At least one of them must be
    non-empty, otherwise an "**Error:** ..." string is returned.
    """
    params = args or {}
    to_add = params.get("add", [])
    to_complete = params.get("done", [])

    # The model sometimes sends a bare string instead of a list.
    to_add = [to_add] if isinstance(to_add, str) else to_add
    to_complete = [to_complete] if isinstance(to_complete, str) else to_complete

    if not (to_add or to_complete):
        return "**Error:** Provide at least one of `add` or `done`."

    self._update_journal(add=to_add, done=to_complete)
    return "Journal updated."
@staticmethod
def _describe_tool_action(tool_name: str, args: dict) -> str:
"""Return a user-facing status message for a tool call.
Prefer the LLM-provided dm_status — otherwise fall back to a generic description."""
dm_status = (args or {}).get("dm_status")
if dm_status:
return f"DM is {dm_status}..."
read_descriptions = {
"character": "reading the character sheet",
"world": "consulting the world map",
"book": "reviewing the story so far",
"log": "checking the session log",
"journal": "scanning the journal",
}
if tool_name == "read_file":
file = (args or {}).get("file", "")
desc = read_descriptions.get(file, f"reading {file}")
elif tool_name in ("character_get", "world_get", "journal_get"):
file = tool_name.replace("_get", "")
desc = read_descriptions.get(file, f"reading {file}")
elif tool_name in ("character_update", "world_update"):
desc = "updating the records"
elif tool_name == "journal_update":
desc = "updating the journal"
elif tool_name == "roll":
dice = (args or {}).get("dice", "1d6")
mod = (args or {}).get("modifier")
desc = f"rolling {dice}"
if mod:
desc += f" {mod}"
elif tool_name == "player_roll":
dice = (args or {}).get("dice", "1d6")
desc = f"asking you to roll {dice}"
else:
desc = f"using {tool_name}"
return f"DM is {desc}..."
def _execute_tool(self, tool_name: str, args: dict) -> str:
fn_map = {
"read_file": self._tool_read_file,
"roll": self._tool_roll,
"think": self._tool_think,
"character_get": self._tool_character_get,
"character_update": self._tool_character_update,
"world_get": self._tool_world_get,
"world_update": self._tool_world_update,
"journal_get": self._tool_journal_get,
"journal_update": self._tool_journal_update,
}
fn = fn_map.get(tool_name)
if not fn:
return f"Unknown tool: {tool_name}"
try:
return fn(args)
except Exception as e:
return f"Tool error ({tool_name}): {e}"
@staticmethod
def _extract_thoughts(text: str) -> list[str]:
pattern = r"```thought\s*\n?(.*?)```"
return re.findall(pattern, text, re.DOTALL)
@staticmethod
def _extract_tool_calls(text: str, *, round_num: int = 0, on_debug: callable = None) -> list[dict]:
"""Extract tool calls from ```tool and ```json blocks.
Uses json.JSONDecoder.raw_decode for strict parsing; falls back to
heuristics if the LLM produces unescaped newlines in string values.
"""
calls = []
seen = set()
def _try_parse(raw: str) -> dict | None:
try:
obj = json.loads(raw)
if isinstance(obj, dict) and "tool" in obj:
return obj
except json.JSONDecodeError:
pass
return None
for m in re.finditer(r"```(?:tool|json|finalize_turn)\s*\n?", text):
fence_type = m.group(0).strip("``` \n\r")
# 1) Strict: raw_decode from where the JSON should start
obj = None
try:
decoder = json.JSONDecoder()
obj, end = decoder.raw_decode(text, m.end())
except (json.JSONDecodeError, ValueError, StopIteration):
pass
if obj is None:
# 2) Fallback: find closing backticks and repair unescaped newlines in strings
close = text.find("```", m.end())
if close > 0:
raw = text[m.end():close].strip()
def _escape_in_strings(s: str) -> str:
return re.sub(r'"(?:[^"\\]|\\.)*"', lambda x: x.group(0).replace("\n", "\\n"), s, flags=re.DOTALL)
repaired = _escape_in_strings(raw)
obj = _try_parse(repaired)
if obj is not None and isinstance(obj, dict):
# Normalize: fence type "finalize_turn" means the JSON is the args directly
if fence_type == "finalize_turn":
obj = {"tool": "finalize_turn", "args": obj}
# If JSON has a "tool" key, keep as-is
if "tool" not in obj:
obj = None
if obj is not None:
key = (obj["tool"], json.dumps(obj.get("args", {}), sort_keys=True))
if key not in seen:
seen.add(key)
calls.append(obj)
elif on_debug:
preview = text[m.end():m.end() + 120].replace("\n", "\\n")
on_debug("parse_error", {"round": round_num, "content": preview})
return calls
@staticmethod
def _extract_final_json(text: str) -> dict | None:
pattern = r"```json\s*\n?(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
if not matches:
return None
try:
return json.loads(matches[-1].strip())
except json.JSONDecodeError:
return None
def _call_llm(self, messages: list[dict], *, label: str = "", max_tokens: int | None = None) -> str | None:
    """Run one non-streaming completion and return its text.

    The reply (or the error) is appended to the LLM log under ``label``.
    Returns ``None`` when ``litellm`` is not installed or the call fails.
    ``max_tokens`` overrides the configured limit when truthy.
    """
    try:
        import litellm
    except ImportError:
        return None

    try:
        completion = litellm.completion(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            stream=False,
            timeout=60,
            max_tokens=max_tokens or self.max_tokens,
        )
        reply = completion.choices[0].message.content or ""
        self._append_llm_log(f"\n--- {label} ---\n{reply}")
    except Exception as exc:
        self._append_llm_log(f"\n--- LLM ERROR ({label}) ---\n{exc}")
        return None
    return reply
def generate_with_tools(
    self,
    player_action: str | None = None,
    last_prompt: str | None = None,
    on_thought: callable = None,
    on_action: callable = None,
    on_player_roll: callable = None,
    on_debug: callable = None,
) -> TurnResult:
    """Generate one turn in three phases.

    1. **Prose** — LLM writes the full book_log from context + player action.
    2. **Summarize** — LLM condenses the book_log into one log line.
    3. **Extract** — LLM reads the book_log and outputs tool calls for
       state changes, which are executed immediately.

    Args:
        player_action: the player's request for this turn, if any.
        last_prompt: the previous situation prompt, if any.
        on_thought: accepted for interface compatibility; not called here.
        on_action: ``callable(str)`` receiving user-facing progress lines.
        on_player_roll: ``callable(dice, reason)`` returning the player's
            physical roll; used for ``player_roll`` tool calls.
        on_debug: ``callable(kind, payload)`` receiving diagnostics.

    Returns:
        A ``TurnResult``; ``error`` is set when prose generation failed
        three times. Tool errors from phase 3 are collected in
        ``debug_info`` and do not fail the turn.

    Notes:
        The extraction prompt shows the complete world text rather than the
        condensed view, because ``world_update`` replaces the whole file
        and the model must see every section it is asked to reproduce.
        Tool ``args`` that are not a JSON object are treated as empty.
        When an extraction attempt reports tool errors, the next attempt
        executes whatever tool calls the new response contains.
    """
    import random
    from datetime import datetime

    self._set_llm_env()

    self._append_llm_log(f"\n{'='*60}")
    self._append_llm_log(f"=== Turn — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
    self._append_llm_log(f"{'='*60}")
    if player_action:
        self._append_llm_log(f"Player: {player_action}")
    elif last_prompt:
        self._append_llm_log(f"Resume from: {last_prompt[:120]}")

    # ── Phase 1: Prose ────────────────────────────────────────────────
    die_roll = random.randint(1, 6)
    self._append_llm_log(f"Dice: {die_roll} (1d6)")
    if on_action:
        on_action(f"Phase 1/3: writing story (dice={die_roll})")
    if on_debug:
        on_debug("phase", {"phase": 1, "name": "prose", "status": "start", "dice": die_roll})

    # Nothing on disk changes between prose attempts, so build the prompt once.
    system = PROSE_PROMPT.substitute(
        character=self._read_file(CHAR_PATH) or "*No character sheet.*",
        world=self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*",
        log=self._read_recent_log(),
        story=self._read_recent_book(),
    )
    user = self.build_user_message(
        player_action=player_action,
        last_prompt=last_prompt,
    )
    user += f"\n\n*A die is cast: **{die_roll}** (1d6).*"
    prose_messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]

    book_log = None
    for attempt in range(3):
        text = self._call_llm(prose_messages, label=f"Prose attempt {attempt + 1}", max_tokens=1024)
        if not text or not text.strip():
            if on_debug:
                on_debug("phase", {"phase": 1, "status": "empty", "attempt": attempt + 1})
            continue
        book_log = text.strip()
        if on_debug:
            preview = book_log[:150].replace("\n", "\\n")
            on_debug("phase", {"phase": 1, "status": "done", "chars": len(book_log), "preview": preview})
        break
    if not book_log:
        return TurnResult(error="Prose generation failed after 3 attempts")

    # ── Phase 2: Summarize ────────────────────────────────────────────
    if on_action:
        on_action("Phase 2/3: summarizing story")
    if on_debug:
        on_debug("phase", {"phase": 2, "name": "summarize", "status": "start"})

    log_context = self._read_recent_log()
    summarize_messages = [
        {"role": "user", "content":
            f"Given the session log so far, summarize the new story in one line. "
            f"Focus on who was involved (character and NPC names):\n\n"
            f"## Session Log\n{log_context}\n\n"
            f"## New Story\n{book_log}"}
    ]
    log_entry = None
    for attempt in range(2):
        text = self._call_llm(summarize_messages, label=f"Summarize attempt {attempt + 1}")
        if text and text.strip():
            # Keep only the first line, capped at 120 characters.
            log_entry = text.strip().split("\n")[0][:120]
            if on_debug:
                on_debug("phase", {"phase": 2, "status": "done", "summary": log_entry})
            break
    if not log_entry:
        # Fall back to the opening line of the narrative itself.
        log_entry = book_log.split("\n")[0][:120]
        if on_debug:
            on_debug("phase", {"phase": 2, "status": "fallback", "summary": log_entry})

    # ── Phase 3: Extract state changes ────────────────────────────────
    if on_action:
        on_action("Phase 3/3: extracting state changes")
    if on_debug:
        on_debug("phase", {"phase": 3, "name": "extract", "status": "start"})

    user_prompt = self._auto_prompt(book_log)
    ambience = None
    debug_info = ""

    current_char = self._read_file(CHAR_PATH) or "*No character.*"
    # Full world text: world_update overwrites the entire file, so the model
    # must not be asked to rewrite a document it has only partially seen.
    current_world = self._read_file(WORLD_PATH) or "*No world.*"
    extract_messages = [
        {"role": "user", "content":
            f"Read the story and compare with current state. Output tool calls for changes:\n\n"
            f"## Current Character\n{current_char}\n\n"
            f"## Current World\n{current_world}\n\n"
            f"## Story\n{book_log}\n\n"
            f"Output tool blocks for changes only. Include the FULL updated content:\n"
            f"- character_update — content: full new sheet if HP/cash/gear/stats changed\n"
            f"- world_update — content: full new world if NPCs/locations/threads changed\n"
            f"- journal_update — add: [...], done: [...]\n"
            f"- finalize_turn — user_prompt (question for player), ambience (soundscape)\n\n"
            f"Wrap each in ```tool:\n"
            f"```tool\n{{\"tool\": \"character_update\", \"args\": {{\"content\": \"# Character\\n...\"}}}}\n```"}
    ]

    for attempt in range(3):
        text = self._call_llm(extract_messages, label=f"Extract attempt {attempt + 1}")
        if not text or not text.strip():
            if on_debug:
                on_debug("phase", {"phase": 3, "status": "empty", "attempt": attempt + 1})
            continue

        tool_calls = self._extract_tool_calls(
            text, round_num=attempt + 1, on_debug=on_debug
        )
        if on_debug and tool_calls:
            names = [tc.get("tool", "?") for tc in tool_calls if tc.get("tool") != "finalize_turn"]
            fin = any(tc.get("tool") == "finalize_turn" for tc in tool_calls)
            on_debug("phase", {"phase": 3, "status": "tools_found", "tools": names, "has_finalize": fin})

        errors = []
        for tc in tool_calls:
            name = tc.get("tool", "?")
            args = tc.get("args", {})
            if not isinstance(args, dict):
                # The model may emit null / a string / a list for "args".
                args = {}

            if name == "finalize_turn":
                # finalize_turn only carries metadata; nothing is executed.
                if args.get("user_prompt"):
                    user_prompt = args["user_prompt"]
                if args.get("ambience"):
                    ambience = args["ambience"]
                continue

            if on_action:
                on_action(f"State: {self._describe_tool_action(name, args)}")
            if on_debug:
                on_debug("tool_call", {"round": attempt + 1, "tool": name, "args": args})

            if name == "player_roll" and on_player_roll:
                dice = args.get("dice", "1d6")
                reason = args.get("reason", "a check")
                roll_val = on_player_roll(dice, reason)
                result = f"Player rolled {dice} for '{reason}': {roll_val}"
            else:
                result = self._execute_tool(name, args)

            if result.startswith(("**Error:", "Tool error", "Unknown")):
                errors.append(f"{name}: {result}")
            if on_debug:
                on_debug("tool_result", {"round": attempt + 1, "tool": name, "result": result})

        if not errors:
            if on_debug:
                applied = len([tc for tc in tool_calls if tc.get("tool") != "finalize_turn"])
                on_debug("phase", {"phase": 3, "status": "done", "applied": applied})
            break

        debug_info = "; ".join(errors)
        if on_debug:
            on_debug("phase", {"phase": 3, "status": "errors", "errors": errors, "attempt": attempt + 1})

    if on_action:
        on_action("Turn complete")
    if on_debug:
        on_debug("phase_done", {
            "book_log_chars": len(book_log),
            "log_entry": log_entry,
            "user_prompt": user_prompt,
            "ambience": ambience,
            "extract_errors": debug_info or None,
        })

    self._append_llm_log(
        f"\n--- FINAL ---\n"
        f"book_log: {book_log[:200]}\n"
        f"log_entry: {log_entry}\n"
        f"user_prompt: {user_prompt}\n"
        f"ambience: {ambience}\n"
    )

    return TurnResult(
        book_log=book_log,
        log_entry=log_entry,
        user_prompt=user_prompt,
        ambience=ambience,
        debug_info=debug_info,
    )
@staticmethod
def _strip_tool_blocks(text: str) -> str:
"""Remove ```tool, ```json, finalize_turn blocks from narrative text."""
return re.sub(
r'```(?:tool|json|finalize_turn)\s*\n?.*?```',
'',
text,
flags=re.DOTALL,
).strip()
@staticmethod
def _auto_prompt(book_log: str) -> str:
"""Extract a player prompt from the narrative. Uses the last sentence."""
lines = book_log.strip().splitlines()
sentences = []
for line in reversed(lines):
line = line.strip()
if not line:
continue
# Take last substantive line as the prompt
return f"**What do you do?**\n\n{line}"
return "**What do you do?**"
# ── Response Parsing ────────────────────────────────────────────────
@staticmethod
def parse_response(text: str) -> GenerationResult:
    """Parse a full LLM response into a ``GenerationResult``.

    Handles three shapes:

    * an error payload starting with ``{"error":`` (as yielded by
      ``generate_stream``), returned as a result with ``error`` set;
    * narrative followed by a ```json block: the last such block supplies
      the metadata, everything before it (minus stray ``book_log:`` lines)
      is the narrative;
    * a response that is a bare JSON object: its ``narrative`` key is used.

    Metadata that is not a JSON object (for example a list) is ignored
    rather than raising, and null list fields fall back to empty lists.
    """
    # Check for error JSON
    if text.startswith('{"error":'):
        try:
            err = json.loads(text).get("error", "Unknown error")
        except (json.JSONDecodeError, AttributeError):
            err = "Unknown error"
        return GenerationResult(narrative="", error=err)

    narrative = text
    data: dict = {}

    # Try to find a ```json ... ``` block
    matches = re.findall(r"```json\s*\n?(.*?)\n?```", text, re.DOTALL)
    if matches:
        json_str = matches[-1].strip()
        # Remove the JSON block from narrative
        narrative = text[: text.rfind("```json")]
        # Also strip any stray "book_log:" lines that may appear before the JSON block
        narrative = "\n".join(
            line for line in narrative.splitlines()
            if not line.lstrip().startswith('book_log:')
        ).strip()
        try:
            parsed = json.loads(json_str)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            data = parsed
    else:
        # Fallback: maybe the entire response is JSON (no fence)
        text_stripped = text.strip()
        if text_stripped.startswith("{") and text_stripped.endswith("}"):
            try:
                parsed = json.loads(text_stripped)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                data = parsed
                narrative = data.get("narrative", "")

    return GenerationResult(
        narrative=narrative or text,
        choices=data.get("choices") or [],
        log_entry=data.get("log_entry"),
        ambience=data.get("ambience"),
        character_updates=data.get("character_updates"),
        world_updates=data.get("world_updates"),
        journal_add=data.get("journal_add") or [],
        journal_done=data.get("journal_done") or [],
    )
# ── State Persistence ───────────────────────────────────────────────
def _validate_update_size(self, name: str, new_content: str, path: Path) -> bool:
"""Reject updates that are more than 30% shorter than the existing file
— likely the LLM pasted a fragment instead of the full state."""
if not path.exists():
return True
old = path.read_text().strip()
if not old:
return True
ratio = len(new_content) / len(old)
if ratio < 0.7:
import sys
print(
f"WARNING: {name} update rejected ({ratio:.0%} of original size "
f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.",
file=sys.stderr,
)
return False
return True
def apply_state(self, result: TurnResult) -> None:
    """Persist the turn's ambience choice.

    Writes the stripped, lower-cased ``result.ambience`` to AMBIENCE_PATH;
    does nothing when no ambience was chosen.
    """
    chosen = result.ambience
    if not chosen:
        return
    AMBIENCE_PATH.write_text(chosen.strip().lower() + "\n")
def archive_turn(self, narrative: str) -> None:
    """Append ``narrative`` to book.md under a new "## Turn — <timestamp>" heading."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    section = f"\n\n## Turn — {stamp}\n\n" + narrative.strip() + "\n"
    BOOK_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(BOOK_PATH, "a") as book:
        book.write(section)
def append_log(self, entry: str) -> None:
    """Append the stripped ``entry`` as one line to today's log file.

    The log directory is created on demand, and a new file starts with a
    "# Session Log — <date>" header.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    todays_log = LOG_DIR / f"{TODAY}.md"
    if not todays_log.exists():
        todays_log.write_text(f"# Session Log — {TODAY}\n\n")
    with open(todays_log, "a") as handle:
        handle.write(entry.strip() + "\n")
def _append_llm_log(self, text: str) -> None:
    """Append ``text`` plus a newline to llm.log, creating its directory if needed."""
    LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(LLM_LOG_PATH, "a") as trace:
        trace.write(f"{text}\n")
def _update_journal(
    self, add: list[str] | None = None, done: list[str] | None = None
) -> None:
    """Add TODO items and/or move items to DONE in journal.md.

    The file (created with empty "## TODO" / "## DONE" sections if missing)
    is parsed into five parts: lines up to and including "## TODO", TODO
    bullets, the lines between the two lists (including "## DONE"), DONE
    bullets, and remaining lines. ``done`` items are removed from TODO and
    appended to DONE if not already there; ``add`` items not already in
    TODO are inserted at the top. Consecutive blank lines are collapsed on
    write.
    """
    if not JOURNAL_PATH.exists():
        JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n")

    before_todo: list[str] = []
    todo_items: list[str] = []
    between: list[str] = []
    done_items: list[str] = []
    after_done: list[str] = []

    # Parse: a tiny state machine driven by the two section headings.
    section = "before_todo"
    for raw in JOURNAL_PATH.read_text().splitlines():
        stripped = raw.strip()
        if stripped == "## TODO":
            section = "todo"
            before_todo.append(raw)
            continue
        if stripped == "## DONE":
            section = "done"
            between.append(raw)
            continue

        is_item = stripped.startswith("- ")
        if section == "before_todo":
            before_todo.append(raw)
        elif section == "todo":
            if is_item:
                todo_items.append(stripped[2:])
            else:
                between.append(raw)
        else:  # section == "done"
            if is_item:
                done_items.append(stripped[2:])
            else:
                after_done.append(raw)

    # Apply changes
    if done:
        completed = set(done)
        todo_items = [item for item in todo_items if item not in completed]
        done_items.extend([item for item in done if item not in done_items])
    if add:
        already_listed = set(todo_items)
        fresh = [item for item in add if item not in already_listed]
        # New items go to the top of the TODO list.
        todo_items = fresh + todo_items

    # Reconstruct
    rebuilt = (
        before_todo
        + [f"- {item}" for item in todo_items]
        + between
        + [f"- {item}" for item in done_items]
        + after_done
    )

    # Collapse runs of blank lines into a single blank line.
    cleaned = []
    previous_blank = False
    for row in rebuilt:
        blank = row.strip() == ""
        if not (blank and previous_blank):
            cleaned.append(row)
            previous_blank = blank

    # Ensure trailing newline
    JOURNAL_PATH.write_text("\n".join(cleaned) + "\n")
# ── CLI entry point (for testing) ─────────────────────────────────────────
def main():
    """Generate a single turn from the command line (debug/testing).

    Prints the narrative, then any choices, log entry and ambience. On a
    generation error the message goes to stderr and the process exits with
    status 1.
    """
    import argparse

    cli = argparse.ArgumentParser(description="The Chaos Game Engine (CLI)")
    cli.add_argument("--action", "-a", help="Player action text")
    cli.add_argument("--last", "-l", help="Last narrative text")
    options = cli.parse_args()

    outcome = GameEngine().generate(
        player_action=options.action,
        last_narrative=options.last,
    )
    if outcome.error:
        print(f"ERROR: {outcome.error}", file=sys.stderr)
        sys.exit(1)

    print(outcome.narrative)
    if outcome.choices:
        print("\n--- Choices ---")
        for choice in outcome.choices:
            print(f" [{choice}]")
    if outcome.log_entry:
        print(f"\n[Log] {outcome.log_entry}")
    if outcome.ambience:
        print(f"[Ambience] {outcome.ambience}")


if __name__ == "__main__":
    main()