splinter-keep/tools/engine.py
2026-06-28 18:40:52 +02:00

1387 lines
57 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
engine.py — The Chaos Game Engine
Owns the LLM interaction, prompt assembly, response parsing, and game state
persistence. The TUI (run.py) calls this module — they do not depend on each
other, only on the shared session/ file layout.
"""
from __future__ import annotations
import json
import re
import sys
from dataclasses import dataclass, field
from datetime import date, datetime
from pathlib import Path
from string import Template
from typing import Iterator, Optional
# ── Paths ──────────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent.parent
SESSION_DIR = BASE_DIR / 'session'
CONFIG_PATH = SESSION_DIR / 'config.json'
CHAR_PATH = SESSION_DIR / 'character.md'
WORLD_PATH = SESSION_DIR / 'world.md'
BOOK_PATH = SESSION_DIR / 'book.md'
JOURNAL_PATH = SESSION_DIR / 'journal.md'
AMBIENCE_PATH = SESSION_DIR / 'ambience.md'
LOG_DIR = SESSION_DIR / 'log'
LLM_LOG_PATH = SESSION_DIR / 'llm.log'
AMBIENCE_OPTIONS_PATH = SESSION_DIR / "ambience_options.md"
CHANGES_PATH = SESSION_DIR / "changes.md"
AUDIO_DIR = SESSION_DIR / "audio"
TODAY = date.today().isoformat()
# ── Structured output ──────────────────────────────────────────────────────
@dataclass
class GenerationResult:
"""Legacy result — kept for backward compat with CLI main()."""
narrative: str
choices: list[str] = field(default_factory=list)
log_entry: Optional[str] = None
ambience: Optional[str] = None
character_updates: Optional[str] = None
world_updates: Optional[str] = None
journal_add: list[str] = field(default_factory=list)
journal_done: list[str] = field(default_factory=list)
error: Optional[str] = None
@dataclass
class TurnResult:
"""Output of a complete turn via finalize_turn tool."""
book_log: str = ""
user_prompt: str = ""
ambience: Optional[str] = None
log_entry: Optional[str] = None
error: Optional[str] = None
debug_info: str = ""
changes: list[str] = field(default_factory=list)
# ── DM System Prompt Template ──────────────────────────────────────────────
SYSTEM_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion.
## Rules
- **Odds**: 1d6, 4+ favourable, 3- trouble.
- **Traits**: 3d6, roll UNDER trait.
- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour.
- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed.
- **Modifiers**: Favourable +1, Risky -1, Desperate -2.
## Tools (action only)
Wrap in ```tool to perform an action:
```
{"tool": "roll", "args": {"dice": "1d6"}}
```
- **roll** — dice, modifier
- **player_roll** — dice, reason
- **character_update** — content: "full sheet" (if HP/cash/gear/stats change)
- **world_update** — content: "full world" (if NPCs/locations/threads change)
- **journal_update** — add: [...], done: [...]
You have the full state above — no need to look anything up. Just write the story and use tools when the player's action changes something.
## State
### Character
$character
### World
$world
### Log
$log
### Story
$story""")
PROSE_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion.
## Rules
- **Odds**: 1d6, 4+ favourable, 3- trouble.
- **Traits**: 3d6, roll UNDER trait.
- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour.
- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed.
- **Modifiers**: Favourable +1, Risky -1, Desperate -2.
A die is cast at the start of each turn — incorporate it into your narrative.
End your response with a `### Changes` block listing what changed:
### Changes
- Current Health: 3
- Cash: 45 silver
- Added to inventory: Silver key
- Removed from inventory: Torches (10)
- Replaced gear: Mace (1d6+1) → Mace (1d6+2)
- Note: Found a hidden passage
- Journal done: Defeat the demon
- Journal add: Investigate the mine
Only include lines for things that actually changed. Omit unused lines entirely.
## State
### Character
$character
### World
$world
### Log
$log
### Story
$story""")
# ── Game Engine ────────────────────────────────────────────────────────────
class GameEngine:
"""Owns the LLM interaction and game state persistence."""
def __init__(self, session_dir: str | Path = SESSION_DIR):
self.session_dir = Path(session_dir)
self.config: dict = {}
self._load_config()
# ── Config ──────────────────────────────────────────────────────────
def _load_config(self) -> None:
if not CONFIG_PATH.exists():
print(
"No session/config.json found. Creating default.\n"
"Edit the model field (e.g. 'ollama/llama3.1', 'openai/gpt-4', "
"'anthropic/claude-sonnet-4-20250514') and set api_key if needed.",
file=sys.stderr,
)
self.config = {
"llm": {
"model": "ollama/llama3.1",
"api_key": None,
"api_base": None,
"temperature": 0.8,
"max_tokens": 300,
}
}
self._save_config()
else:
raw = CONFIG_PATH.read_text()
self.config = json.loads(raw)
# Ensure api_key is None not empty string
llm = self.config.get("llm", {})
if not llm.get("api_key"):
llm["api_key"] = None
def _save_config(self) -> None:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(self.config, indent=2) + "\n")
@property
def model(self) -> str:
return self.config.get("llm", {}).get("model", "ollama/llama3.1")
@property
def api_key(self) -> str | None:
return self.config.get("llm", {}).get("api_key")
@property
def api_base(self) -> str | None:
return self.config.get("llm", {}).get("api_base")
@property
def temperature(self) -> float:
return self.config.get("llm", {}).get("temperature", 0.8)
@property
def max_tokens(self) -> int:
return self.config.get("llm", {}).get("max_tokens", 512)
def _set_llm_env(self) -> None:
"""Set provider-specific env vars for litellm."""
prefix = self.model.split("/")[0].upper()
import os
key = self.api_key or "sk-placeholder"
os.environ[f"{prefix}_API_KEY"] = key
if self.api_base:
os.environ[f"{prefix}_API_BASE"] = self.api_base
# ── Context Assembly ────────────────────────────────────────────────
def _read_file(self, path: Path) -> str:
return path.read_text().strip() if path.exists() else ""
def _read_recent_log(self, max_entries: int = 5) -> str:
"""Read the latest log file and return the last N entries."""
log_path = LOG_DIR / f"{TODAY}.md"
if not log_path.exists():
from datetime import timedelta
yesterday = (date.today() - timedelta(days=1)).isoformat()
log_path = LOG_DIR / f"{yesterday}.md"
if not log_path.exists():
return "*No recent events.*"
lines = log_path.read_text().splitlines()
entries = [l for l in lines if l.strip().startswith("- ")]
return "\n".join(entries[-max_entries:]) or "*No recent events.*"
def _read_recent_book(self, max_turns: int = 1) -> str:
"""Return the last N turns from the book as context."""
text = self._read_file(BOOK_PATH)
if not text:
return "*No prior story.*"
turns = text.split("\n## ")
recent = turns[-max_turns:]
return "\n## ".join(recent) if len(turns) > 1 else recent[0]
@staticmethod
def _truncate_world(text: str) -> str:
"""Extract key world context: NPCs, factions, active threads, rumours."""
if not text or text == "*No world state.*":
return text
sections = re.split(r"\n(?=## |### )", text)
parts = []
for sec in sections:
header = sec.split("\n")[0].strip() if sec else ""
if "Active Threads" in header:
parts.append(sec)
elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header:
parts.append(sec)
result = "\n\n".join(parts)
return result or text[:1500] + "\n_(world truncated)_"
def _get_valid_ambiences(self) -> set[str]:
"""Parse ambience_options.md and return set of valid ambience names with associated audio files."""
valid = {"silence"} # silence always valid (stops music)
if not AMBIENCE_OPTIONS_PATH.exists():
return valid
in_table = False
for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
s = line.strip()
if not s.startswith("|") or not s.endswith("|"):
in_table = False
continue
if in_table and all(c in "-:| " for c in s):
continue
parts = [p.strip() for p in s.split("|") if p.strip()]
if not parts:
continue
if not in_table:
in_table = True
continue
name = parts[0].lower()
files_str = parts[1] if len(parts) > 1 else ""
files = [f.strip() for f in files_str.split(",")]
# Only add if at least one file exists (or is listed)
has_files = any((AUDIO_DIR / f).exists() or f for f in files)
if has_files:
valid.add(name)
return valid
def build_system_prompt(self) -> str:
"""Assemble the system prompt with current game state."""
char = self._read_file(CHAR_PATH) or "*No character sheet.*"
world = self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*"
log = self._read_recent_log()
story = self._read_recent_book()
return SYSTEM_PROMPT.substitute(
character=char, world=world, log=log, story=story
)
def build_user_message(
self,
player_action: str | None = None,
last_prompt: str | None = None,
**kwargs: str | None,
) -> str:
"""Build the user message for this turn's LLM call."""
if kwargs:
raise TypeError(
f"build_user_message() got unexpected keyword arguments: "
f"{set(kwargs)}. Did you mean 'last_prompt' instead of one of these?"
)
parts = []
if last_prompt:
parts.append(f"## Situation\n{last_prompt}")
if player_action:
parts.append(f"## Player's Request\n{player_action}")
has_existing_story = bool(
self._read_file(BOOK_PATH).strip()
) if not last_prompt else True
if not player_action and not last_prompt:
if has_existing_story:
raise RuntimeError(f"User action is required for every turn.")
else:
parts.append(
"## Instructions\n"
"This is a new story. Welcome the player and guide them through the game setup."
)
else:
parts.append(
"## Instructions\n"
"Advance the story based on the player's request. "
"All state is shown above — write the outcome directly."
)
return "\n\n".join(parts)
# ── LLM Call ────────────────────────────────────────────────────────
def generate(
self,
player_action: str | None = None,
last_narrative: str | None = None,
) -> GenerationResult:
"""
Synchronous generation. Calls the LLM, parses the response,
and returns a GenerationResult.
The TUI calls this from a worker thread — see run.py.
"""
system = self.build_system_prompt()
user = self.build_user_message(
player_action=player_action, last_prompt=last_narrative
)
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
try:
import litellm
except ImportError:
return GenerationResult(
narrative="",
error=(
"litellm is not installed. Run: pip install litellm"
),
)
# Set API key / base if provided
self._set_llm_env()
try:
response = litellm.completion(
model=self.model,
messages=messages,
temperature=self.temperature,
stream=False,
timeout=60,
)
text = response.choices[0].message.content or ""
except Exception as e:
return GenerationResult(
narrative="",
error=f"LLM call failed: {e}",
)
return self.parse_response(text)
def generate_stream(
self,
player_action: str | None = None,
last_narrative: str | None = None,
) -> Iterator[str]:
"""
Streaming generator. Yields text chunks as they arrive from the LLM.
On completion, the final yield is the FULL text (for parsing).
"""
system = self.build_system_prompt()
user = self.build_user_message(
player_action=player_action, last_prompt=last_narrative
)
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
try:
import litellm
except ImportError:
yield json.dumps({
"error": "litellm is not installed. Run: pip install litellm"
})
return
self._set_llm_env()
try:
response = litellm.completion(
model=self.model,
messages=messages,
temperature=self.temperature,
stream=True,
timeout=60,
)
full_text = ""
for chunk in response:
delta = chunk.choices[0].delta.content or ""
if delta:
full_text += delta
yield full_text # partial narrative for streaming display
# Final yield is the completed text
yield full_text
except Exception as e:
yield json.dumps({"error": f"LLM call failed: {e}"})
# ── Tool Infrastructure ────────────────────────────────────────────
TOOL_REGISTRY: dict[str, dict] = {
"roll": {"description": "Roll dice.", "args": {"dice": "1d6", "modifier": "+1"}},
"player_roll": {"description": "Ask player to roll.", "args": {"dice": "1d6", "reason": "why"}},
"modify_traits": {"description": "Change STR/DEX/WIL.", "args": {"str": "optional", "dex": "optional", "wil": "optional"}},
"modify_vitals": {"description": "Change HP, cash, weapon, armour.", "args": {"current_hp": "optional", "max_hp": "optional", "cash": "optional", "weapon": "optional", "armour": "optional"}},
"add_to_inventory": {"description": "Add item to gear.", "args": {"item": "item name and stats"}},
"remove_from_inventory": {"description": "Remove item from gear.", "args": {"item": "exact item text"}},
"replace_gear": {"description": "Replace gear by exact match.", "args": {"before": "exact text", "after": "new text"}},
"add_note": {"description": "Add note to sheet.", "args": {"note": "note content"}},
"replace_note": {"description": "Replace note by exact match.", "args": {"before": "exact text", "after": "new text"}},
"world_update": {"description": "Replace world state.", "args": {"content": "full world markdown"}},
"journal_update": {"description": "Update TODO/DONE.", "args": {"add": "[...]", "done": "[...]"}},
"finalize_turn": {"description": "End turn.", "args": {"user_prompt": "question for player", "ambience": "soundscape name"}},
}
def _tool_think(self, args: dict) -> str:
"""Think tool — content is displayed via dm_status in the status bar."""
return ""
def _tool_read_file(self, args: dict) -> str:
filename = (args or {}).get("file", "")
paths = {
"character": CHAR_PATH,
"world": WORLD_PATH,
"book": BOOK_PATH,
"log": LOG_DIR / f"{TODAY}.md",
"journal": JOURNAL_PATH,
}
path = paths.get(filename)
if not path:
return f"Unknown file: {filename}. Choose from: {', '.join(paths)}"
return self._read_file(path) or f"*{filename} is empty.*"
def _tool_roll(self, args: dict) -> str:
import random
dice_str = (args or {}).get("dice", "1d6")
modifier_str = (args or {}).get("modifier", "0")
try:
count, sides = dice_str.lower().split("d")
count = int(count) if count else 1
sides = int(sides)
except (ValueError, TypeError):
return f"Invalid dice: {dice_str}. Use format like '2d6'."
mod = 0
if modifier_str:
try:
mod = int(modifier_str)
except ValueError:
pass
rolls = [random.randint(1, sides) for _ in range(count)]
total = sum(rolls) + mod
mod_str = f" {'+' if mod >= 0 else ''}{mod}" if mod != 0 else ""
return f"Roll: {dice_str}{mod_str} → [{', '.join(str(r) for r in rolls)}] = {total}"
def _patch_character(self, pattern: str, repl: str, count: int = 1, flags: int = 0) -> str:
"""Apply a regex replacement to character.md. Returns error msg or empty string."""
text = CHAR_PATH.read_text()
new, n = re.subn(pattern, repl, text, count=count, flags=flags)
if n == 0:
return f"**Error:** pattern not found:\n{pattern}"
CHAR_PATH.write_text(new)
return ""
def _tool_modify_traits(self, args: dict) -> str:
errors = []
for stat in ("str", "dex", "wil"):
val = args.get(stat)
if val is not None:
err = self._patch_character(
rf"^(- \*\*{stat.upper()}:\*\*\s*)\d+", rf"\g<1>{val}", count=1, flags=re.MULTILINE
)
if err:
errors.append(err)
return "; ".join(errors) if errors else "Traits updated."
def _tool_modify_vitals(self, args: dict) -> str:
errors = []
for field, label in [("current_hp", "Current Health"), ("max_hp", "Max Health"),
("cash", "Cash"), ("weapon", "Weapon"), ("armour", "Armour")]:
val = args.get(field)
if val is not None:
err = self._patch_character(
rf"^(- \*\*{label}:\*\*\s*).*", rf"\g<1>{val}", count=1, flags=re.MULTILINE
)
if err:
errors.append(err)
return "; ".join(errors) if errors else "Vitals updated."
def _tool_add_to_inventory(self, args: dict) -> str:
item = (args or {}).get("item", "")
if not item:
return "**Error:** `item` is required."
text = CHAR_PATH.read_text()
if item in text:
return f"Item already in inventory: {item}"
# Insert after last gear item or after "## Gear" header
gear_section = re.search(r"^## Gear\n", text, re.MULTILINE)
if gear_section:
insert_at = gear_section.end()
text = text[:insert_at] + f"- {item}\n" + text[insert_at:]
else:
text += f"\n## Gear\n- {item}\n"
CHAR_PATH.write_text(text)
return f"Added to inventory: {item}"
def _tool_remove_from_inventory(self, args: dict) -> str:
item = (args or {}).get("item", "")
if not item:
return "**Error:** `item` is required."
err = self._patch_character(rf"^- {re.escape(item)}\n?", "", count=1, flags=re.MULTILINE)
if err:
return f"**Error:** item not found: {item}"
return f"Removed from inventory: {item}"
def _tool_replace_gear(self, args: dict) -> str:
before = (args or {}).get("before", "")
after = (args or {}).get("after", "")
if not before or not after:
return "**Error:** `before` and `after` are required."
err = self._patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE)
if err:
return f"**Error:** gear not found: {before}"
return f"Gear replaced: {before}{after}"
def _tool_add_note(self, args: dict) -> str:
note = (args or {}).get("note", "")
if not note:
return "**Error:** `note` is required."
text = CHAR_PATH.read_text()
notes_section = re.search(r"^## Notes & Scribbles\n", text, re.MULTILINE)
if notes_section:
text = text[:notes_section.end()] + f"- {note}\n" + text[notes_section.end():]
else:
text += f"\n## Notes & Scribbles\n- {note}\n"
CHAR_PATH.write_text(text)
return f"Note added: {note}"
def _tool_replace_note(self, args: dict) -> str:
before = (args or {}).get("before", "")
after = (args or {}).get("after", "")
if not before or not after:
return "**Error:** `before` and `after` are required."
err = self._patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE)
if err:
return f"**Error:** note not found: {before}"
return f"Note replaced."
def _tool_world_update(self, args: dict) -> str:
content = (args or {}).get("content", "")
if not content:
return "**Error:** `content` is required."
if not self._validate_update_size("world", content, WORLD_PATH):
return "**Error:** Update rejected — content is too short (likely a partial paste)."
WORLD_PATH.write_text(content.strip() + "\n")
return "World state updated."
def _tool_journal_update(self, args: dict) -> str:
add = (args or {}).get("add", [])
done = (args or {}).get("done", [])
if isinstance(add, str):
add = [add]
if isinstance(done, str):
done = [done]
if not add and not done:
return "**Error:** Provide at least one of `add` or `done`."
self._update_journal(add=add, done=done)
return "Journal updated."
@staticmethod
def _describe_tool_action(tool_name: str, args: dict) -> str:
"""Return a user-facing status message for a tool call.
Prefer the LLM-provided dm_status — otherwise fall back to a generic description."""
dm_status = (args or {}).get("dm_status")
if dm_status:
return f"DM is {dm_status}..."
read_descriptions = {
"character": "reading the character sheet",
"world": "consulting the world map",
"book": "reviewing the story so far",
"log": "checking the session log",
"journal": "scanning the journal",
}
if tool_name == "read_file":
file = (args or {}).get("file", "")
desc = read_descriptions.get(file, f"reading {file}")
elif tool_name in ("character_get", "world_get", "journal_get"):
file = tool_name.replace("_get", "")
desc = read_descriptions.get(file, f"reading {file}")
elif tool_name in ("character_update", "world_update"):
desc = "updating the records"
elif tool_name == "journal_update":
desc = "updating the journal"
elif tool_name == "roll":
dice = (args or {}).get("dice", "1d6")
mod = (args or {}).get("modifier")
desc = f"rolling {dice}"
if mod:
desc += f" {mod}"
elif tool_name == "player_roll":
dice = (args or {}).get("dice", "1d6")
desc = f"asking you to roll {dice}"
elif tool_name == "modify_traits":
desc = "updating traits"
elif tool_name == "modify_vitals":
desc = "updating vitals"
elif tool_name == "add_to_inventory":
desc = "adding item to inventory"
elif tool_name == "remove_from_inventory":
desc = "removing item from inventory"
elif tool_name == "replace_gear":
desc = "replacing gear"
elif tool_name == "add_note":
desc = "adding note"
elif tool_name == "replace_note":
desc = "replacing note"
else:
desc = f"using {tool_name}"
return f"DM is {desc}..."
@staticmethod
def _describe_change(tool_name: str, args: dict) -> str:
"""Build a compact human-readable change description from a tool call."""
if tool_name == "modify_vitals":
parts = []
for k, v in args.items():
label = k.replace("_", " ").title()
parts.append(f"{label}: {v}")
return f"{', '.join(parts)}" if parts else ""
elif tool_name == "modify_traits":
parts = []
for k, v in args.items():
parts.append(f"{k.upper()}: {v}")
return f"{', '.join(parts)}"
elif tool_name == "add_to_inventory":
return f"+ {args.get('item', '?')}"
elif tool_name == "remove_from_inventory":
return f" {args.get('item', '?')}"
elif tool_name == "replace_gear":
return f"{args.get('before', '?')}{args.get('after', '?')}"
elif tool_name == "add_note":
note = args.get("note", "?")
return f"📝 {note[:60]}{'' if len(note) > 60 else ''}"
elif tool_name == "replace_note":
return f"📝 {args.get('before', '?')[:40]}{args.get('after', '?')[:40]}"
elif tool_name == "world_update":
return "🌍 World updated"
elif tool_name == "journal_update":
parts = []
for a in args.get("add", []):
parts.append(f"📋 {a}")
for d in args.get("done", []):
parts.append(f"{d}")
return "; ".join(parts) if parts else ""
return ""
def _execute_tool(self, tool_name: str, args: dict) -> str:
fn_map = {
"roll": self._tool_roll,
"modify_traits": self._tool_modify_traits,
"modify_vitals": self._tool_modify_vitals,
"add_to_inventory": self._tool_add_to_inventory,
"remove_from_inventory": self._tool_remove_from_inventory,
"replace_gear": self._tool_replace_gear,
"add_note": self._tool_add_note,
"replace_note": self._tool_replace_note,
"world_update": self._tool_world_update,
"journal_update": self._tool_journal_update,
}
fn = fn_map.get(tool_name)
if not fn:
return f"Unknown tool: {tool_name}"
try:
return fn(args)
except Exception as e:
import traceback
tb = traceback.format_exc()
self._append_llm_log(f"\n--- TOOL ERROR ({tool_name}) ---\n{tb}")
return f"Tool error ({tool_name}): {e}"
@staticmethod
def _extract_thoughts(text: str) -> list[str]:
pattern = r"```thought\s*\n?(.*?)```"
return re.findall(pattern, text, re.DOTALL)
@staticmethod
def _extract_tool_calls(text: str, *, round_num: int = 0, on_debug: callable = None) -> list[dict]:
"""Extract tool calls from ```tool and ```json blocks.
Uses json.JSONDecoder.raw_decode for strict parsing; falls back to
heuristics if the LLM produces unescaped newlines in string values.
"""
calls = []
seen = set()
def _try_parse(raw: str) -> dict | None:
try:
obj = json.loads(raw)
if isinstance(obj, dict) and "tool" in obj:
return obj
except json.JSONDecodeError:
pass
return None
for m in re.finditer(r"```(?:tool|json|finalize_turn)\s*\n?", text):
fence_type = m.group(0).strip("``` \n\r")
# 1) Strict: raw_decode from where the JSON should start
obj = None
try:
decoder = json.JSONDecoder()
obj, end = decoder.raw_decode(text, m.end())
except (json.JSONDecodeError, ValueError, StopIteration):
pass
if obj is None:
# 2) Fallback: find closing backticks and repair unescaped newlines in strings
close = text.find("```", m.end())
if close > 0:
raw = text[m.end():close].strip()
def _escape_in_strings(s: str) -> str:
return re.sub(r'"(?:[^"\\]|\\.)*"', lambda x: x.group(0).replace("\n", "\\n"), s, flags=re.DOTALL)
repaired = _escape_in_strings(raw)
obj = _try_parse(repaired)
if obj is not None and isinstance(obj, dict):
# Normalize: fence type "finalize_turn" means the JSON is the args directly
if fence_type == "finalize_turn":
obj = {"tool": "finalize_turn", "args": obj}
# If JSON has a "tool" key, keep as-is
if "tool" not in obj:
obj = None
if obj is not None:
key = (obj["tool"], json.dumps(obj.get("args", {}), sort_keys=True))
if key not in seen:
seen.add(key)
calls.append(obj)
elif on_debug:
preview = text[m.end():m.end() + 120].replace("\n", "\\n")
on_debug("parse_error", {"round": round_num, "content": preview})
return calls
@staticmethod
def _extract_final_json(text: str) -> dict | None:
pattern = r"```json\s*\n?(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
if not matches:
return None
try:
return json.loads(matches[-1].strip())
except json.JSONDecodeError:
return None
def _call_llm(self, messages: list[dict], *, label: str = "", max_tokens: int | None = None, on_debug: callable = None) -> str | None:
"""Make a single LLM call. Returns content text or None on error."""
try:
import litellm
except ImportError:
if on_debug:
on_debug("llm_error", {"label": label, "error": "litellm not installed"})
return None
try:
response = litellm.completion(
model=self.model,
messages=messages,
temperature=self.temperature,
stream=False,
timeout=60,
max_tokens=max_tokens or self.max_tokens,
)
text = response.choices[0].message.content or ""
self._append_llm_log(f"\n--- {label} ---\n{text}")
return text
except Exception as e:
err_msg = f"{type(e).__name__}: {e}"
self._append_llm_log(f"\n--- LLM ERROR ({label}) ---\n{err_msg}")
if on_debug:
on_debug("llm_error", {"label": label, "error": err_msg})
return None
def generate_with_tools(
self,
player_action: str | None = None,
last_prompt: str | None = None,
on_thought: callable = None,
on_action: callable = None,
on_player_roll: callable = None,
on_debug: callable = None,
) -> TurnResult:
"""
Three-phase generation:
1. **Prose** — LLM writes the full book_log from context + player action.
2. **Summarize** — LLM condenses the book_log into one log line.
3. **Extract** — LLM reads the book_log and outputs tool calls for state changes.
"""
self._set_llm_env()
from datetime import datetime
self._append_llm_log(f"\n{'='*60}")
self._append_llm_log(f"=== Turn — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
self._append_llm_log(f"{'='*60}")
if player_action:
self._append_llm_log(f"Player: {player_action}")
elif last_prompt:
self._append_llm_log(f"Resume from: {last_prompt[:120]}")
# ── Outer loop: Phase 1 (prose) → Phase 2 (summarize) → Phase 3 (extract) ──
import random
die_roll = random.randint(1, 6)
self._append_llm_log(f"Dice: {die_roll} (1d6)")
book_log = None
changes_block = ""
log_entry = None
user_prompt = self._auto_prompt("")
ambience = None
debug_info = ""
changes = []
for outer_attempt in range(3):
# ── Phase 1: Prose ────────────────────────────────────────────
if on_action:
on_action(f"Phase 1/3: writing story (dice={die_roll})")
if on_debug:
on_debug("phase", {"phase": 1, "name": "prose", "status": "start", "dice": die_roll, "outer_attempt": outer_attempt + 1})
system = PROSE_PROMPT.substitute(
character=self._read_file(CHAR_PATH) or "*No character sheet.*",
world=self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*",
log=self._read_recent_log(),
story=self._read_recent_book(),
)
user = self.build_user_message(
player_action=player_action,
last_prompt=last_prompt,
)
user += f"\n\n*A die is cast: **{die_roll}** (1d6).*"
text = self._call_llm([
{"role": "system", "content": system},
{"role": "user", "content": user},
], label=f"Prose attempt {outer_attempt + 1}", max_tokens=1024, on_debug=on_debug)
if not text or not text.strip():
if on_debug:
on_debug("phase", {"phase": 1, "status": "empty", "attempt": outer_attempt + 1})
continue
raw = text.strip()
changes_block = ""
if "### Changes" in raw:
parts = raw.split("### Changes", 1)
book_log = parts[0].strip()
changes_block = "### Changes" + parts[1]
else:
book_log = raw
if on_debug:
preview = book_log[:150].replace("\n", "\\n")
on_debug("phase", {"phase": 1, "status": "done", "chars": len(book_log), "changes": bool(changes_block), "preview": preview})
# ── Validation ────────────────────────────────────────────────
if on_debug:
on_debug("phase", {"phase": 1, "name": "validation", "status": "start"})
valid, reason = self._validate_narrative(book_log, on_debug=on_debug)
if not valid:
if on_debug:
on_debug("phase", {"phase": 1, "status": "validation_failed", "reason": reason, "outer_attempt": outer_attempt + 1})
book_log = None
continue
# ── Phase 2: Summarize ────────────────────────────────────────
if on_action:
on_action("Phase 2/3: summarizing story")
if on_debug:
on_debug("phase", {"phase": 2, "name": "summarize", "status": "start"})
log_context = self._read_recent_log()
log_entry = None
for p2_attempt in range(2):
context = book_log
if changes_block:
context += f"\n\n{changes_block}"
text = self._call_llm([
{"role": "user", "content":
f"Given the session log so far, summarize the new story in one line. "
f"Focus on who was involved (character and NPC names):\n\n"
f"## Session Log\n{log_context}\n\n"
f"## New Story\n{context}"}
], label=f"Summarize attempt {p2_attempt + 1}", on_debug=on_debug)
if text and text.strip():
log_entry = text.strip().split("\n")[0][:300]
if on_debug:
on_debug("phase", {"phase": 2, "status": "done", "summary": log_entry})
break
if not log_entry:
log_entry = book_log.split("\n")[0][:120]
if on_debug:
on_debug("phase", {"phase": 2, "status": "fallback", "summary": log_entry})
# ── Phase 3: Extract state changes ────────────────────────────
if on_action:
on_action("Phase 3/3: extracting state changes")
if on_debug:
on_debug("phase", {"phase": 3, "name": "extract", "status": "start"})
user_prompt = self._auto_prompt(book_log)
ambience = None
phase3_errors = []
previous_attempt = None # {output, feedback}
phase3_ok = False
for p3_attempt in range(5):
current_char = self._read_file(CHAR_PATH) or "*No character.*"
current_world = self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world.*"
phase3_prompt = (
f"## Current Character\n{current_char}\n\n"
f"## Current World\n{current_world}\n\n"
f"## Story\n{book_log}\n\n"
)
if changes_block.strip():
phase3_prompt += (
f"## Changes to apply\n{changes_block}\n\n"
f"Convert the listed changes into tool calls:\n\n"
)
else:
phase3_prompt += (
f"Read the story and compare with current state. Output tool calls for any changes:\n\n"
)
phase3_prompt += (
f"Output ```tool blocks for changes only. Examples:\n\n"
)
if previous_attempt:
phase3_prompt += (
f"--- PREVIOUS ATTEMPT (had errors) ---\n"
f"{previous_attempt['output']}\n\n"
f"--- FEEDBACK ---\n"
f"{previous_attempt['feedback']}\n\n"
f"Fix the issues above. Output corrected tool calls only.\n\n"
)
text = self._call_llm([
{"role": "user", "content": phase3_prompt +
f"```tool\n{{\"tool\": \"modify_vitals\", \"args\": {{\"current_hp\": 5, \"cash\": 45}}}}\n```\n"
f"```tool\n{{\"tool\": \"modify_traits\", \"args\": {{\"dex\": 15}}}}\n```\n"
f"```tool\n{{\"tool\": \"add_to_inventory\", \"args\": {{\"item\": \"Silver key\"}}}}\n```\n"
f"```tool\n{{\"tool\": \"remove_from_inventory\", \"args\": {{\"item\": \"Torches (10)\"}}}}\n```\n"
f"```tool\n{{\"tool\": \"replace_gear\", \"args\": {{\"before\": \"Mace (1d6+1)\", \"after\": \"Mace (1d6+2, sharpened)\"}}}}\n```\n"
f"```tool\n{{\"tool\": \"add_note\", \"args\": {{\"note\": \"Found a hidden passage under the temple\"}}}}\n```\n"
f"```tool\n{{\"tool\": \"replace_note\", \"args\": {{\"before\": \"Old note text\", \"after\": \"New note text\"}}}}\n```\n"
f"```tool\n{{\"tool\": \"world_update\", \"args\": {{\"content\": \"# The World\\n\\n...full new world state...\"}}}}\n```\n"
f"```tool\n{{\"tool\": \"journal_update\", \"args\": {{\"add\": [\"Investigate the mine\"], \"done\": [\"Defeat the demon\"]}}}}\n```\n"
f"```tool\n{{\"tool\": \"finalize_turn\", \"args\": {{\"user_prompt\": \"What do you do?\", \"ambience\": \"dungeon\"}}}}\n```\n\n"
f"Only output tools for things that actually changed. Omit unchanged fields."}
], label=f"Extract attempt {p3_attempt + 1}", on_debug=on_debug)
if not text or not text.strip():
if on_debug:
on_debug("phase", {"phase": 3, "status": "empty", "attempt": p3_attempt + 1})
continue
tool_calls = self._extract_tool_calls(
text, round_num=p3_attempt + 1, on_debug=on_debug
)
if on_debug and tool_calls:
names = [tc.get("tool", "?") for tc in tool_calls if tc.get("tool") != "finalize_turn"]
fin = any(tc.get("tool") == "finalize_turn" for tc in tool_calls)
on_debug("phase", {"phase": 3, "status": "tools_found", "tools": names, "has_finalize": fin})
errors = []
attempt_changes = []
for tc in tool_calls:
name = tc.get("tool", "?")
args = tc.get("args", {})
if name == "finalize_turn":
if args.get("user_prompt"):
user_prompt = args["user_prompt"]
if args.get("ambience"):
ambience = args["ambience"]
continue
if on_action:
on_action(f"State: {self._describe_tool_action(name, args)}")
if on_debug:
on_debug("tool_call", {"round": p3_attempt + 1, "tool": name, "args": args})
if name == "player_roll" and on_player_roll:
dice = args.get("dice", "1d6")
reason = args.get("reason", "a check")
roll_val = on_player_roll(dice, reason)
result = f"Player rolled {dice} for '{reason}': {roll_val}"
else:
result = self._execute_tool(name, args)
if result.startswith("**Error:") or result.startswith("Tool error") or result.startswith("Unknown"):
errors.append(f"{name}: {result}")
else:
desc = self._describe_change(name, args)
if desc:
attempt_changes.append(desc)
if on_debug:
on_debug("tool_result", {"round": p3_attempt + 1, "tool": name, "result": result})
if not errors:
phase3_ok = True
debug_info = ""
changes = attempt_changes
if on_debug:
on_debug("phase", {"phase": 3, "status": "done", "applied": len([tc for tc in tool_calls if tc.get("tool") != "finalize_turn"])})
break
phase3_errors = errors
debug_info = "; ".join(errors)
if on_debug:
on_debug("phase", {"phase": 3, "status": "errors", "errors": errors, "attempt": p3_attempt + 1})
# Build feedback for the LLM to fix on next attempt
feedback_lines = ["The previous tool calls had errors:"]
for e in errors:
feedback_lines.append(f"- {e}")
feedback_lines.append("")
feedback_lines.append("Fix ALL issues above. Use correct tool names, valid JSON, and reasonable values.")
previous_attempt = {"output": text, "feedback": "\n".join(feedback_lines)}
if phase3_ok:
break # All phases succeeded on this outer attempt
# Phase 3 failed after 5 attempts — retry from Phase 1
if on_debug:
on_debug("phase", {"phase": 3, "status": "exhausted", "errors": phase3_errors})
on_debug("phase", {"phase": 1, "status": "retry_after_phase3_failure", "outer_attempt": outer_attempt + 1})
book_log = None # Reset so Phase 1 runs again on next outer iteration
if not book_log:
return TurnResult(error="Generation failed after exhausting all retries")
# ── Finalize ──────────────────────────────────────────────────────
if on_action:
on_action("Turn complete")
if on_debug:
on_debug("phase_done", {
"book_log_chars": len(book_log),
"log_entry": log_entry,
"user_prompt": user_prompt,
"ambience": ambience,
"extract_errors": debug_info or None,
})
self._append_llm_log(
f"\n--- FINAL ---\n"
f"book_log: {book_log[:200]}\n"
f"log_entry: {log_entry}\n"
f"user_prompt: {user_prompt}\n"
f"ambience: {ambience}\n"
)
return TurnResult(
book_log=book_log,
log_entry=log_entry,
user_prompt=user_prompt,
ambience=ambience,
debug_info=debug_info,
changes=changes,
)
@staticmethod
def _strip_tool_blocks(text: str) -> str:
"""Remove ```tool, ```json, finalize_turn blocks from narrative text."""
return re.sub(
r'```(?:tool|json|finalize_turn)\s*\n?.*?```',
'',
text,
flags=re.DOTALL,
).strip()
@staticmethod
def _auto_prompt(book_log: str) -> str:
"""Fallback player prompt."""
return "**What do you do?**"
def _validate_narrative(self, book_log: str, *, on_debug: callable = None) -> tuple[bool, str]:
"""Check if book_log is acceptable narrative. Returns (ok, reason)."""
lines = book_log.strip().split("\n")
if not lines:
return False, "Empty narrative"
# 1) Heuristic: high repetition count
from collections import Counter
common = Counter(lines).most_common(1)
if common and common[0][1] >= 5:
return False, f"Repetition: '{common[0][0][:60]}' ×{common[0][1]}"
# 2) Heuristic: game mechanics bleedthrough
mech_lines = [l for l in lines if re.match(
r'^\*\*(?:Roll|Damage|Success|Failure|Check|Save|Hit|Miss|'
r'Strenght|Dexterity|Willpower|STR|DEX|WIL|'
r'(?:[A-Z][a-z]+(?: \(\w+\))?:))',
l
)]
if mech_lines:
ratio = len(mech_lines) / len(lines)
if ratio > 0.3:
return False, f"Game mechanics dominate ({len(mech_lines)}/{len(lines)} lines)"
# 3) Heuristic: tool / json blocks leaked into narrative
if re.search(r'```(?:tool|json)', book_log):
return False, "Contains unprocessed tool blocks"
# 4) Heuristic: under 50 characters of real prose
prose = re.sub(r'[*_#>`~\-\d]', '', book_log).strip()
if len(prose) < 50:
return False, "Too short to be meaningful"
# 5) LLM quality rating (only if heuristics pass)
text = self._call_llm([
{"role": "user", "content":
f"Rate this RPG narrative quality 1-5.\n"
f"1 = unreadable (spam, repetition, pure mechanics, garbled)\n"
f"2 = poor (mostly mechanics, little story)\n"
f"3 = acceptable (some narrative but rough)\n"
f"4 = good (solid prose, minor issues)\n"
f"5 = excellent (vivid, engaging)\n"
f"Reply with ONLY a single digit 1-5.\n\n"
f"{book_log[:600]}"}
], label="Narrative validation", max_tokens=2, on_debug=on_debug)
if text and text.strip().isdigit():
score = int(text.strip())
if score < 3:
return False, f"Quality score: {score}/5"
return True, ""
# ── Response Parsing ────────────────────────────────────────────────
@staticmethod
def parse_response(text: str) -> GenerationResult:
"""
Parse a full LLM response into a GenerationResult.
Extracts the JSON block and splits narrative from it.
"""
# Check for error JSON
if text.startswith('{"error":'):
try:
err = json.loads(text).get("error", "Unknown error")
except json.JSONDecodeError:
err = "Unknown error"
return GenerationResult(narrative="", error=err)
# Try to find a ```json ... ``` block
json_pattern = r"```json\s*\n?(.*?)\n?```"
matches = re.findall(json_pattern, text, re.DOTALL)
narrative = text
data = {}
if matches:
json_str = matches[-1].strip()
# Remove the JSON block from narrative
narrative = text[: text.rfind("```json")]
# Also strip any stray "book_log:" lines that may appear before the JSON block
narrative_lines = []
for line in narrative.splitlines():
if not line.lstrip().startswith('book_log:'):
narrative_lines.append(line)
narrative = "\n".join(narrative_lines).strip()
try:
data = json.loads(json_str)
except json.JSONDecodeError:
pass
else:
# Fallback: maybe the entire response is JSON (no fence)
text_stripped = text.strip()
if text_stripped.startswith("{") and text_stripped.endswith("}"):
try:
data = json.loads(text_stripped)
narrative = data.get("narrative", "")
except json.JSONDecodeError:
pass
return GenerationResult(
narrative=narrative or text,
choices=data.get("choices", []),
log_entry=data.get("log_entry"),
ambience=data.get("ambience"),
character_updates=data.get("character_updates"),
world_updates=data.get("world_updates"),
journal_add=data.get("journal_add", []),
journal_done=data.get("journal_done", []),
)
# ── State Persistence ───────────────────────────────────────────────
def _validate_update_size(self, name: str, new_content: str, path: Path) -> bool:
"""Reject updates that are more than 30% shorter than the existing file
— likely the LLM pasted a fragment instead of the full state."""
if not path.exists():
return True
old = path.read_text().strip()
if not old:
return True
ratio = len(new_content) / len(old)
if ratio < 0.7:
import sys
print(
f"WARNING: {name} update rejected ({ratio:.0%} of original size "
f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.",
file=sys.stderr,
)
return False
return True
def apply_state(self, result: TurnResult) -> None:
"""Write state changes from a TurnResult to disk."""
if result.ambience:
AMBIENCE_PATH.write_text(result.ambience.strip().lower() + "\n")
if result.changes:
CHANGES_PATH.write_text("\n".join(result.changes) + "\n")
else:
CHANGES_PATH.write_text("")
def archive_turn(self, narrative: str) -> None:
"""Append the narrative as a new turn in book.md."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
heading = f"\n\n## Turn — {timestamp}\n\n"
BOOK_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(BOOK_PATH, "a") as f:
f.write(heading + narrative.strip() + "\n")
def append_log(self, entry: str) -> None:
"""Append a log entry to today's log file."""
LOG_DIR.mkdir(parents=True, exist_ok=True)
log_path = LOG_DIR / f"{TODAY}.md"
if not log_path.exists():
log_path.write_text(f"# Session Log — {TODAY}\n\n")
with open(log_path, "a") as f:
f.write(entry.strip() + "\n")
def _append_llm_log(self, text: str) -> None:
"""Append raw LLM activity to llm.log for debugging."""
LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(LLM_LOG_PATH, "a") as f:
f.write(text + "\n")
def _update_journal(
self, add: list[str] | None = None, done: list[str] | None = None
) -> None:
"""Add or complete TODO items in journal.md."""
if not JOURNAL_PATH.exists():
JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n")
lines = JOURNAL_PATH.read_text().splitlines()
# Parse into sections: everything before TODO, TODO items, between, DONE items, after
todo_items: list[str] = []
done_items: list[str] = []
before_todo: list[str] = []
between: list[str] = []
after_done: list[str] = []
section = "before_todo"
for line in lines:
stripped = line.strip()
if stripped == "## TODO":
section = "todo"
before_todo.append(line)
elif stripped == "## DONE":
section = "done"
between.append(line)
elif section == "before_todo":
before_todo.append(line)
elif section == "todo":
if stripped.startswith("- "):
todo_items.append(stripped[2:])
else:
between.append(line)
elif section == "done":
if stripped.startswith("- "):
done_items.append(stripped[2:])
else:
after_done.append(line)
# Apply changes
if done:
done_set = set(done)
todo_items = [i for i in todo_items if i not in done_set]
new_done = [i for i in done if i not in done_items]
done_items.extend(new_done)
if add:
todo_set = set(todo_items)
new_todo = [i for i in add if i not in todo_set]
# Insert new items at the top of TODO
todo_items = new_todo + todo_items
# Reconstruct
out = list(before_todo)
for item in todo_items:
out.append(f"- {item}")
out.extend(between)
for item in done_items:
out.append(f"- {item}")
out.extend(after_done)
# Clean up: collapse multiple blank lines
cleaned = []
prev_blank = False
for line in out:
is_blank = line.strip() == ""
if is_blank and prev_blank:
continue
cleaned.append(line)
prev_blank = is_blank
# Ensure trailing newline
JOURNAL_PATH.write_text("\n".join(cleaned) + "\n")
# ── CLI entry point (for testing) ─────────────────────────────────────────
def main():
"""Generate a turn from the command line (debug/testing)."""
import argparse
parser = argparse.ArgumentParser(description="The Chaos Game Engine (CLI)")
parser.add_argument("--action", "-a", help="Player action text")
parser.add_argument("--last", "-l", help="Last narrative text")
args = parser.parse_args()
engine = GameEngine()
result = engine.generate(
player_action=args.action,
last_narrative=args.last,
)
if result.error:
print(f"ERROR: {result.error}", file=sys.stderr)
sys.exit(1)
print(result.narrative)
if result.choices:
print("\n--- Choices ---")
for c in result.choices:
print(f" [{c}]")
if result.log_entry:
print(f"\n[Log] {result.log_entry}")
if result.ambience:
print(f"[Ambience] {result.ambience}")
if __name__ == "__main__":
main()