#!/usr/bin/env python3 """ engine.py — The Chaos Game Engine Owns the LLM interaction, prompt assembly, response parsing, and game state persistence. The TUI (run.py) calls this module — they do not depend on each other, only on the shared session/ file layout. """ from __future__ import annotations import json import re import sys from dataclasses import dataclass, field from datetime import date, datetime from pathlib import Path from string import Template from typing import Iterator, Optional # ── Paths ────────────────────────────────────────────────────────────────── BASE_DIR = Path(__file__).resolve().parent.parent SESSION_DIR = BASE_DIR / 'session' CONFIG_PATH = SESSION_DIR / 'config.json' CHAR_PATH = SESSION_DIR / 'character.md' WORLD_PATH = SESSION_DIR / 'world.md' BOOK_PATH = SESSION_DIR / 'book.md' JOURNAL_PATH = SESSION_DIR / 'journal.md' AMBIENCE_PATH = SESSION_DIR / 'ambience.md' LOG_DIR = SESSION_DIR / 'log' LLM_LOG_PATH = SESSION_DIR / 'llm.log' AMBIENCE_OPTIONS_PATH = SESSION_DIR / "ambience_options.md" AUDIO_DIR = SESSION_DIR / "audio" TODAY = date.today().isoformat() # ── Structured output ────────────────────────────────────────────────────── @dataclass class GenerationResult: """Legacy result — kept for backward compat with CLI main().""" narrative: str choices: list[str] = field(default_factory=list) log_entry: Optional[str] = None ambience: Optional[str] = None character_updates: Optional[str] = None world_updates: Optional[str] = None journal_add: list[str] = field(default_factory=list) journal_done: list[str] = field(default_factory=list) error: Optional[str] = None @dataclass class TurnResult: """Output of a complete turn via finalize_turn tool.""" book_log: str = "" user_prompt: str = "" ambience: Optional[str] = None log_entry: Optional[str] = None error: Optional[str] = None debug_info: str = "" # ── DM System Prompt Template ────────────────────────────────────────────── SYSTEM_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion. ## Rules - **Odds**: 1d6, 4+ favourable, 3- trouble. - **Traits**: 3d6, roll UNDER trait. - **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour. - **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed. - **Modifiers**: Favourable +1, Risky -1, Desperate -2. ## Tools (action only) Wrap in ```tool to perform an action: ``` {"tool": "roll", "args": {"dice": "1d6"}} ``` - **roll** — dice, modifier - **player_roll** — dice, reason - **character_update** — content: "full sheet" (if HP/cash/gear/stats change) - **world_update** — content: "full world" (if NPCs/locations/threads change) - **journal_update** — add: [...], done: [...] You have the full state above — no need to look anything up. Just write the story and use tools when the player's action changes something. ## State ### Character $character ### World $world ### Log $log ### Story $story""") PROSE_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion. ## Rules - **Odds**: 1d6, 4+ favourable, 3- trouble. - **Traits**: 3d6, roll UNDER trait. - **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour. - **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed. - **Modifiers**: Favourable +1, Risky -1, Desperate -2. A die is cast at the start of each turn — incorporate it into your narrative. ## State ### Character $character ### World $world ### Log $log ### Story $story""") # ── Game Engine ──────────────────────────────────────────────────────────── class GameEngine: """Owns the LLM interaction and game state persistence.""" def __init__(self, session_dir: str | Path = SESSION_DIR): self.session_dir = Path(session_dir) self.config: dict = {} self._load_config() # ── Config ────────────────────────────────────────────────────────── def _load_config(self) -> None: if not CONFIG_PATH.exists(): print( "No session/config.json found. Creating default.\n" "Edit the model field (e.g. 'ollama/llama3.1', 'openai/gpt-4', " "'anthropic/claude-sonnet-4-20250514') and set api_key if needed.", file=sys.stderr, ) self.config = { "llm": { "model": "ollama/llama3.1", "api_key": None, "api_base": None, "temperature": 0.8, "max_tokens": 300, } } self._save_config() else: raw = CONFIG_PATH.read_text() self.config = json.loads(raw) # Ensure api_key is None not empty string llm = self.config.get("llm", {}) if not llm.get("api_key"): llm["api_key"] = None def _save_config(self) -> None: CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) CONFIG_PATH.write_text(json.dumps(self.config, indent=2) + "\n") @property def model(self) -> str: return self.config.get("llm", {}).get("model", "ollama/llama3.1") @property def api_key(self) -> str | None: return self.config.get("llm", {}).get("api_key") @property def api_base(self) -> str | None: return self.config.get("llm", {}).get("api_base") @property def temperature(self) -> float: return self.config.get("llm", {}).get("temperature", 0.8) @property def max_tokens(self) -> int: return self.config.get("llm", {}).get("max_tokens", 512) def _set_llm_env(self) -> None: """Set provider-specific env vars for litellm.""" prefix = self.model.split("/")[0].upper() import os key = self.api_key or "sk-placeholder" os.environ[f"{prefix}_API_KEY"] = key if self.api_base: os.environ[f"{prefix}_API_BASE"] = self.api_base # ── Context Assembly ──────────────────────────────────────────────── def _read_file(self, path: Path) -> str: return path.read_text().strip() if path.exists() else "" def _read_recent_log(self, max_entries: int = 5) -> str: """Read the latest log file and return the last N entries.""" log_path = LOG_DIR / f"{TODAY}.md" if not log_path.exists(): from datetime import timedelta yesterday = (date.today() - timedelta(days=1)).isoformat() log_path = LOG_DIR / f"{yesterday}.md" if not log_path.exists(): return "*No recent events.*" lines = log_path.read_text().splitlines() entries = [l for l in lines if l.strip().startswith("- ")] return "\n".join(entries[-max_entries:]) or "*No recent events.*" def _read_recent_book(self, max_turns: int = 1) -> str: """Return the last N turns from the book as context.""" text = self._read_file(BOOK_PATH) if not text: return "*No prior story.*" turns = text.split("\n## ") recent = turns[-max_turns:] return "\n## ".join(recent) if len(turns) > 1 else recent[0] @staticmethod def _truncate_world(text: str) -> str: """Extract key world context: NPCs, factions, active threads, rumours.""" if not text or text == "*No world state.*": return text sections = re.split(r"\n(?=## |### )", text) parts = [] for sec in sections: header = sec.split("\n")[0].strip() if sec else "" if "Active Threads" in header: parts.append(sec) elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header: parts.append(sec) result = "\n\n".join(parts) return result or text[:1500] + "\n_(world truncated)_" def _get_valid_ambiences(self) -> set[str]: """Parse ambience_options.md and return set of valid ambience names with associated audio files.""" valid = {"silence"} # silence always valid (stops music) if not AMBIENCE_OPTIONS_PATH.exists(): return valid in_table = False for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines(): s = line.strip() if not s.startswith("|") or not s.endswith("|"): in_table = False continue if in_table and all(c in "-:| " for c in s): continue parts = [p.strip() for p in s.split("|") if p.strip()] if not parts: continue if not in_table: in_table = True continue name = parts[0].lower() files_str = parts[1] if len(parts) > 1 else "" files = [f.strip() for f in files_str.split(",")] # Only add if at least one file exists (or is listed) has_files = any((AUDIO_DIR / f).exists() or f for f in files) if has_files: valid.add(name) return valid def build_system_prompt(self) -> str: """Assemble the system prompt with current game state.""" char = self._read_file(CHAR_PATH) or "*No character sheet.*" world = self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*" log = self._read_recent_log() story = self._read_recent_book() return SYSTEM_PROMPT.substitute( character=char, world=world, log=log, story=story ) def build_user_message( self, player_action: str | None = None, last_prompt: str | None = None, **kwargs: str | None, ) -> str: """Build the user message for this turn's LLM call.""" if kwargs: raise TypeError( f"build_user_message() got unexpected keyword arguments: " f"{set(kwargs)}. Did you mean 'last_prompt' instead of one of these?" ) parts = [] if last_prompt: parts.append(f"## Situation\n{last_prompt}") if player_action: parts.append(f"## Player's Request\n{player_action}") has_existing_story = bool( self._read_file(BOOK_PATH).strip() ) if not last_prompt else True if not player_action and not last_prompt: if has_existing_story: raise RuntimeError(f"User action is required for every turn.") else: parts.append( "## Instructions\n" "This is a new story. Welcome the player and guide them through the game setup." ) else: parts.append( "## Instructions\n" "Advance the story based on the player's request. " "All state is shown above — write the outcome directly." ) return "\n\n".join(parts) # ── LLM Call ──────────────────────────────────────────────────────── def generate( self, player_action: str | None = None, last_narrative: str | None = None, ) -> GenerationResult: """ Synchronous generation. Calls the LLM, parses the response, and returns a GenerationResult. The TUI calls this from a worker thread — see run.py. """ system = self.build_system_prompt() user = self.build_user_message( player_action=player_action, last_prompt=last_narrative ) messages = [ {"role": "system", "content": system}, {"role": "user", "content": user}, ] try: import litellm except ImportError: return GenerationResult( narrative="", error=( "litellm is not installed. Run: pip install litellm" ), ) # Set API key / base if provided self._set_llm_env() try: response = litellm.completion( model=self.model, messages=messages, temperature=self.temperature, stream=False, timeout=60, ) text = response.choices[0].message.content or "" except Exception as e: return GenerationResult( narrative="", error=f"LLM call failed: {e}", ) return self.parse_response(text) def generate_stream( self, player_action: str | None = None, last_narrative: str | None = None, ) -> Iterator[str]: """ Streaming generator. Yields text chunks as they arrive from the LLM. On completion, the final yield is the FULL text (for parsing). """ system = self.build_system_prompt() user = self.build_user_message( player_action=player_action, last_prompt=last_narrative ) messages = [ {"role": "system", "content": system}, {"role": "user", "content": user}, ] try: import litellm except ImportError: yield json.dumps({ "error": "litellm is not installed. Run: pip install litellm" }) return self._set_llm_env() try: response = litellm.completion( model=self.model, messages=messages, temperature=self.temperature, stream=True, timeout=60, ) full_text = "" for chunk in response: delta = chunk.choices[0].delta.content or "" if delta: full_text += delta yield full_text # partial narrative for streaming display # Final yield is the completed text yield full_text except Exception as e: yield json.dumps({"error": f"LLM call failed: {e}"}) # ── Tool Infrastructure ──────────────────────────────────────────── TOOL_REGISTRY: dict[str, dict] = { "roll": {"description": "Roll dice.", "args": {"dice": "1d6", "modifier": "+1"}}, "player_roll": {"description": "Ask player to roll.", "args": {"dice": "1d6", "reason": "why"}}, "modify_traits": {"description": "Change STR/DEX/WIL.", "args": {"str": "optional", "dex": "optional", "wil": "optional"}}, "modify_vitals": {"description": "Change HP, cash, weapon, armour.", "args": {"current_hp": "optional", "max_hp": "optional", "cash": "optional", "weapon": "optional", "armour": "optional"}}, "add_to_inventory": {"description": "Add item to gear.", "args": {"item": "item name and stats"}}, "remove_from_inventory": {"description": "Remove item from gear.", "args": {"item": "exact item text"}}, "replace_gear": {"description": "Replace gear by exact match.", "args": {"before": "exact text", "after": "new text"}}, "add_note": {"description": "Add note to sheet.", "args": {"note": "note content"}}, "replace_note": {"description": "Replace note by exact match.", "args": {"before": "exact text", "after": "new text"}}, "world_update": {"description": "Replace world state.", "args": {"content": "full world markdown"}}, "journal_update": {"description": "Update TODO/DONE.", "args": {"add": "[...]", "done": "[...]"}}, "finalize_turn": {"description": "End turn.", "args": {"user_prompt": "question for player", "ambience": "soundscape name"}}, } def _tool_think(self, args: dict) -> str: """Think tool — content is displayed via dm_status in the status bar.""" return "" def _tool_read_file(self, args: dict) -> str: filename = (args or {}).get("file", "") paths = { "character": CHAR_PATH, "world": WORLD_PATH, "book": BOOK_PATH, "log": LOG_DIR / f"{TODAY}.md", "journal": JOURNAL_PATH, } path = paths.get(filename) if not path: return f"Unknown file: {filename}. Choose from: {', '.join(paths)}" return self._read_file(path) or f"*{filename} is empty.*" def _tool_roll(self, args: dict) -> str: import random dice_str = (args or {}).get("dice", "1d6") modifier_str = (args or {}).get("modifier", "0") try: count, sides = dice_str.lower().split("d") count = int(count) if count else 1 sides = int(sides) except (ValueError, TypeError): return f"Invalid dice: {dice_str}. Use format like '2d6'." mod = 0 if modifier_str: try: mod = int(modifier_str) except ValueError: pass rolls = [random.randint(1, sides) for _ in range(count)] total = sum(rolls) + mod mod_str = f" {'+' if mod >= 0 else ''}{mod}" if mod != 0 else "" return f"Roll: {dice_str}{mod_str} → [{', '.join(str(r) for r in rolls)}] = {total}" def _patch_character(self, pattern: str, repl: str, count: int = 1, flags: int = 0) -> str: """Apply a regex replacement to character.md. Returns error msg or empty string.""" text = CHAR_PATH.read_text() new, n = re.subn(pattern, repl, text, count=count, flags=flags) if n == 0: return f"**Error:** pattern not found:\n{pattern}" CHAR_PATH.write_text(new) return "" def _tool_modify_traits(self, args: dict) -> str: errors = [] for stat in ("str", "dex", "wil"): val = args.get(stat) if val is not None: err = self._patch_character( rf"^(- \*\*{stat.upper()}:\*\*\s*)\d+", rf"\g<1>{val}", count=1, flags=re.MULTILINE ) if err: errors.append(err) return "; ".join(errors) if errors else "Traits updated." def _tool_modify_vitals(self, args: dict) -> str: errors = [] for field, label in [("current_hp", "Current Health"), ("max_hp", "Max Health"), ("cash", "Cash"), ("weapon", "Weapon"), ("armour", "Armour")]: val = args.get(field) if val is not None: err = self._patch_character( rf"^(- \*\*{label}:\*\*\s*).*", rf"\g<1>{val}", count=1, flags=re.MULTILINE ) if err: errors.append(err) return "; ".join(errors) if errors else "Vitals updated." def _tool_add_to_inventory(self, args: dict) -> str: item = (args or {}).get("item", "") if not item: return "**Error:** `item` is required." text = CHAR_PATH.read_text() if item in text: return f"Item already in inventory: {item}" # Insert after last gear item or after "## Gear" header gear_section = re.search(r"^## Gear\n", text, re.MULTILINE) if gear_section: insert_at = gear_section.end() text = text[:insert_at] + f"- {item}\n" + text[insert_at:] else: text += f"\n## Gear\n- {item}\n" CHAR_PATH.write_text(text) return f"Added to inventory: {item}" def _tool_remove_from_inventory(self, args: dict) -> str: item = (args or {}).get("item", "") if not item: return "**Error:** `item` is required." err = self._patch_character(rf"^- {re.escape(item)}\n?", "", count=1, flags=re.MULTILINE) if err: return f"**Error:** item not found: {item}" return f"Removed from inventory: {item}" def _tool_replace_gear(self, args: dict) -> str: before = (args or {}).get("before", "") after = (args or {}).get("after", "") if not before or not after: return "**Error:** `before` and `after` are required." err = self._patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE) if err: return f"**Error:** gear not found: {before}" return f"Gear replaced: {before} → {after}" def _tool_add_note(self, args: dict) -> str: note = (args or {}).get("note", "") if not note: return "**Error:** `note` is required." text = CHAR_PATH.read_text() notes_section = re.search(r"^## Notes & Scribbles\n", text, re.MULTILINE) if notes_section: text = text[:notes_section.end()] + f"- {note}\n" + text[notes_section.end():] else: text += f"\n## Notes & Scribbles\n- {note}\n" CHAR_PATH.write_text(text) return f"Note added: {note}" def _tool_replace_note(self, args: dict) -> str: before = (args or {}).get("before", "") after = (args or {}).get("after", "") if not before or not after: return "**Error:** `before` and `after` are required." err = self._patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE) if err: return f"**Error:** note not found: {before}" return f"Note replaced." def _tool_world_update(self, args: dict) -> str: content = (args or {}).get("content", "") if not content: return "**Error:** `content` is required." if not self._validate_update_size("world", content, WORLD_PATH): return "**Error:** Update rejected — content is too short (likely a partial paste)." WORLD_PATH.write_text(content.strip() + "\n") return "World state updated." def _tool_journal_update(self, args: dict) -> str: add = (args or {}).get("add", []) done = (args or {}).get("done", []) if isinstance(add, str): add = [add] if isinstance(done, str): done = [done] if not add and not done: return "**Error:** Provide at least one of `add` or `done`." self._update_journal(add=add, done=done) return "Journal updated." @staticmethod def _describe_tool_action(tool_name: str, args: dict) -> str: """Return a user-facing status message for a tool call. Prefer the LLM-provided dm_status — otherwise fall back to a generic description.""" dm_status = (args or {}).get("dm_status") if dm_status: return f"DM is {dm_status}..." read_descriptions = { "character": "reading the character sheet", "world": "consulting the world map", "book": "reviewing the story so far", "log": "checking the session log", "journal": "scanning the journal", } if tool_name == "read_file": file = (args or {}).get("file", "") desc = read_descriptions.get(file, f"reading {file}") elif tool_name in ("character_get", "world_get", "journal_get"): file = tool_name.replace("_get", "") desc = read_descriptions.get(file, f"reading {file}") elif tool_name in ("character_update", "world_update"): desc = "updating the records" elif tool_name == "journal_update": desc = "updating the journal" elif tool_name == "roll": dice = (args or {}).get("dice", "1d6") mod = (args or {}).get("modifier") desc = f"rolling {dice}" if mod: desc += f" {mod}" elif tool_name == "player_roll": dice = (args or {}).get("dice", "1d6") desc = f"asking you to roll {dice}" elif tool_name == "modify_traits": desc = "updating traits" elif tool_name == "modify_vitals": desc = "updating vitals" elif tool_name == "add_to_inventory": desc = "adding item to inventory" elif tool_name == "remove_from_inventory": desc = "removing item from inventory" elif tool_name == "replace_gear": desc = "replacing gear" elif tool_name == "add_note": desc = "adding note" elif tool_name == "replace_note": desc = "replacing note" else: desc = f"using {tool_name}" return f"DM is {desc}..." def _execute_tool(self, tool_name: str, args: dict) -> str: fn_map = { "roll": self._tool_roll, "modify_traits": self._tool_modify_traits, "modify_vitals": self._tool_modify_vitals, "add_to_inventory": self._tool_add_to_inventory, "remove_from_inventory": self._tool_remove_from_inventory, "replace_gear": self._tool_replace_gear, "add_note": self._tool_add_note, "replace_note": self._tool_replace_note, "world_update": self._tool_world_update, "journal_update": self._tool_journal_update, } fn = fn_map.get(tool_name) if not fn: return f"Unknown tool: {tool_name}" try: return fn(args) except Exception as e: return f"Tool error ({tool_name}): {e}" @staticmethod def _extract_thoughts(text: str) -> list[str]: pattern = r"```thought\s*\n?(.*?)```" return re.findall(pattern, text, re.DOTALL) @staticmethod def _extract_tool_calls(text: str, *, round_num: int = 0, on_debug: callable = None) -> list[dict]: """Extract tool calls from ```tool and ```json blocks. Uses json.JSONDecoder.raw_decode for strict parsing; falls back to heuristics if the LLM produces unescaped newlines in string values. """ calls = [] seen = set() def _try_parse(raw: str) -> dict | None: try: obj = json.loads(raw) if isinstance(obj, dict) and "tool" in obj: return obj except json.JSONDecodeError: pass return None for m in re.finditer(r"```(?:tool|json|finalize_turn)\s*\n?", text): fence_type = m.group(0).strip("``` \n\r") # 1) Strict: raw_decode from where the JSON should start obj = None try: decoder = json.JSONDecoder() obj, end = decoder.raw_decode(text, m.end()) except (json.JSONDecodeError, ValueError, StopIteration): pass if obj is None: # 2) Fallback: find closing backticks and repair unescaped newlines in strings close = text.find("```", m.end()) if close > 0: raw = text[m.end():close].strip() def _escape_in_strings(s: str) -> str: return re.sub(r'"(?:[^"\\]|\\.)*"', lambda x: x.group(0).replace("\n", "\\n"), s, flags=re.DOTALL) repaired = _escape_in_strings(raw) obj = _try_parse(repaired) if obj is not None and isinstance(obj, dict): # Normalize: fence type "finalize_turn" means the JSON is the args directly if fence_type == "finalize_turn": obj = {"tool": "finalize_turn", "args": obj} # If JSON has a "tool" key, keep as-is if "tool" not in obj: obj = None if obj is not None: key = (obj["tool"], json.dumps(obj.get("args", {}), sort_keys=True)) if key not in seen: seen.add(key) calls.append(obj) elif on_debug: preview = text[m.end():m.end() + 120].replace("\n", "\\n") on_debug("parse_error", {"round": round_num, "content": preview}) return calls @staticmethod def _extract_final_json(text: str) -> dict | None: pattern = r"```json\s*\n?(.*?)```" matches = re.findall(pattern, text, re.DOTALL) if not matches: return None try: return json.loads(matches[-1].strip()) except json.JSONDecodeError: return None def _call_llm(self, messages: list[dict], *, label: str = "", max_tokens: int | None = None) -> str | None: """Make a single LLM call. Returns content text or None on error.""" try: import litellm except ImportError: return None try: response = litellm.completion( model=self.model, messages=messages, temperature=self.temperature, stream=False, timeout=60, max_tokens=max_tokens or self.max_tokens, ) text = response.choices[0].message.content or "" self._append_llm_log(f"\n--- {label} ---\n{text}") return text except Exception as e: self._append_llm_log(f"\n--- LLM ERROR ({label}) ---\n{e}") return None def generate_with_tools( self, player_action: str | None = None, last_prompt: str | None = None, on_thought: callable = None, on_action: callable = None, on_player_roll: callable = None, on_debug: callable = None, ) -> TurnResult: """ Three-phase generation: 1. **Prose** — LLM writes the full book_log from context + player action. 2. **Summarize** — LLM condenses the book_log into one log line. 3. **Extract** — LLM reads the book_log and outputs tool calls for state changes. """ self._set_llm_env() from datetime import datetime self._append_llm_log(f"\n{'='*60}") self._append_llm_log(f"=== Turn — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===") self._append_llm_log(f"{'='*60}") if player_action: self._append_llm_log(f"Player: {player_action}") elif last_prompt: self._append_llm_log(f"Resume from: {last_prompt[:120]}") # ── Phase 1: Prose ──────────────────────────────────────────────── import random die_roll = random.randint(1, 6) self._append_llm_log(f"Dice: {die_roll} (1d6)") if on_action: on_action(f"Phase 1/3: writing story (dice={die_roll})") if on_debug: on_debug("phase", {"phase": 1, "name": "prose", "status": "start", "dice": die_roll}) book_log = None for attempt in range(3): system = PROSE_PROMPT.substitute( character=self._read_file(CHAR_PATH) or "*No character sheet.*", world=self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*", log=self._read_recent_log(), story=self._read_recent_book(), ) user = self.build_user_message( player_action=player_action, last_prompt=last_prompt, ) user += f"\n\n*A die is cast: **{die_roll}** (1d6).*" text = self._call_llm([ {"role": "system", "content": system}, {"role": "user", "content": user}, ], label=f"Prose attempt {attempt + 1}", max_tokens=1024) if not text or not text.strip(): if on_debug: on_debug("phase", {"phase": 1, "status": "empty", "attempt": attempt + 1}) continue book_log = text.strip() if on_debug: preview = book_log[:150].replace("\n", "\\n") on_debug("phase", {"phase": 1, "status": "done", "chars": len(book_log), "preview": preview}) break if not book_log: return TurnResult(error="Prose generation failed after 3 attempts") # ── Phase 2: Summarize ──────────────────────────────────────────── if on_action: on_action("Phase 2/3: summarizing story") if on_debug: on_debug("phase", {"phase": 2, "name": "summarize", "status": "start"}) log_context = self._read_recent_log() log_entry = None for attempt in range(2): text = self._call_llm([ {"role": "user", "content": f"Given the session log so far, summarize the new story in one line. " f"Focus on who was involved (character and NPC names):\n\n" f"## Session Log\n{log_context}\n\n" f"## New Story\n{book_log}"} ], label=f"Summarize attempt {attempt + 1}") if text and text.strip(): log_entry = text.strip().split("\n")[0][:120] if on_debug: on_debug("phase", {"phase": 2, "status": "done", "summary": log_entry}) break if not log_entry: log_entry = book_log.split("\n")[0][:120] if on_debug: on_debug("phase", {"phase": 2, "status": "fallback", "summary": log_entry}) # ── Phase 3: Extract state changes ──────────────────────────────── if on_action: on_action("Phase 3/3: extracting state changes") if on_debug: on_debug("phase", {"phase": 3, "name": "extract", "status": "start"}) user_prompt = self._auto_prompt(book_log) ambience = None debug_info = "" current_char = self._read_file(CHAR_PATH) or "*No character.*" current_world = self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world.*" for attempt in range(3): text = self._call_llm([ {"role": "user", "content": f"Read the story and compare with current state. Output tool calls for any changes:\n\n" f"## Current Character\n{current_char}\n\n" f"## Current World\n{current_world}\n\n" f"## Story\n{book_log}\n\n" f"Output ```tool blocks for changes only. Examples:\n\n" f"```tool\n{{\"tool\": \"modify_vitals\", \"args\": {{\"current_hp\": 5, \"cash\": 45}}}}\n```\n" f"```tool\n{{\"tool\": \"modify_traits\", \"args\": {{\"dex\": 15}}}}\n```\n" f"```tool\n{{\"tool\": \"add_to_inventory\", \"args\": {{\"item\": \"Silver key\"}}}}\n```\n" f"```tool\n{{\"tool\": \"remove_from_inventory\", \"args\": {{\"item\": \"Torches (10)\"}}}}\n```\n" f"```tool\n{{\"tool\": \"replace_gear\", \"args\": {{\"before\": \"Mace (1d6+1)\", \"after\": \"Mace (1d6+2, sharpened)\"}}}}\n```\n" f"```tool\n{{\"tool\": \"add_note\", \"args\": {{\"note\": \"Found a hidden passage under the temple\"}}}}\n```\n" f"```tool\n{{\"tool\": \"replace_note\", \"args\": {{\"before\": \"Old note text\", \"after\": \"New note text\"}}}}\n```\n" f"```tool\n{{\"tool\": \"world_update\", \"args\": {{\"content\": \"# The World\\n\\n...full new world state...\"}}}}\n```\n" f"```tool\n{{\"tool\": \"journal_update\", \"args\": {{\"add\": [\"Investigate the mine\"], \"done\": [\"Defeat the demon\"]}}}}\n```\n" f"```tool\n{{\"tool\": \"finalize_turn\", \"args\": {{\"user_prompt\": \"What do you do?\", \"ambience\": \"dungeon\"}}}}\n```\n\n" f"Only output tools for things that actually changed. Omit unchanged fields."} ], label=f"Extract attempt {attempt + 1}") if not text or not text.strip(): if on_debug: on_debug("phase", {"phase": 3, "status": "empty", "attempt": attempt + 1}) continue tool_calls = self._extract_tool_calls( text, round_num=attempt + 1, on_debug=on_debug ) if on_debug and tool_calls: names = [tc.get("tool", "?") for tc in tool_calls if tc.get("tool") != "finalize_turn"] fin = any(tc.get("tool") == "finalize_turn" for tc in tool_calls) on_debug("phase", {"phase": 3, "status": "tools_found", "tools": names, "has_finalize": fin}) errors = [] for tc in tool_calls: name = tc.get("tool", "?") args = tc.get("args", {}) if name == "finalize_turn": if args.get("user_prompt"): user_prompt = args["user_prompt"] if args.get("ambience"): ambience = args["ambience"] continue if on_action: on_action(f"State: {self._describe_tool_action(name, args)}") if on_debug: on_debug("tool_call", {"round": attempt + 1, "tool": name, "args": args}) if name == "player_roll" and on_player_roll: dice = args.get("dice", "1d6") reason = args.get("reason", "a check") roll_val = on_player_roll(dice, reason) result = f"Player rolled {dice} for '{reason}': {roll_val}" else: result = self._execute_tool(name, args) if result.startswith("**Error:") or result.startswith("Tool error") or result.startswith("Unknown"): errors.append(f"{name}: {result}") if on_debug: on_debug("tool_result", {"round": attempt + 1, "tool": name, "result": result}) if not errors: if on_debug: on_debug("phase", {"phase": 3, "status": "done", "applied": len([tc for tc in tool_calls if tc.get("tool") != "finalize_turn"])}) break debug_info = "; ".join(errors) if on_debug: on_debug("phase", {"phase": 3, "status": "errors", "errors": errors, "attempt": attempt + 1}) if on_action: on_action("Turn complete") if on_debug: on_debug("phase_done", { "book_log_chars": len(book_log), "log_entry": log_entry, "user_prompt": user_prompt, "ambience": ambience, "extract_errors": debug_info or None, }) self._append_llm_log( f"\n--- FINAL ---\n" f"book_log: {book_log[:200]}\n" f"log_entry: {log_entry}\n" f"user_prompt: {user_prompt}\n" f"ambience: {ambience}\n" ) return TurnResult( book_log=book_log, log_entry=log_entry, user_prompt=user_prompt, ambience=ambience, debug_info=debug_info, ) @staticmethod def _strip_tool_blocks(text: str) -> str: """Remove ```tool, ```json, finalize_turn blocks from narrative text.""" return re.sub( r'```(?:tool|json|finalize_turn)\s*\n?.*?```', '', text, flags=re.DOTALL, ).strip() @staticmethod def _auto_prompt(book_log: str) -> str: """Fallback player prompt.""" return "**What do you do?**" # ── Response Parsing ──────────────────────────────────────────────── @staticmethod def parse_response(text: str) -> GenerationResult: """ Parse a full LLM response into a GenerationResult. Extracts the JSON block and splits narrative from it. """ # Check for error JSON if text.startswith('{"error":'): try: err = json.loads(text).get("error", "Unknown error") except json.JSONDecodeError: err = "Unknown error" return GenerationResult(narrative="", error=err) # Try to find a ```json ... ``` block json_pattern = r"```json\s*\n?(.*?)\n?```" matches = re.findall(json_pattern, text, re.DOTALL) narrative = text data = {} if matches: json_str = matches[-1].strip() # Remove the JSON block from narrative narrative = text[: text.rfind("```json")] # Also strip any stray "book_log:" lines that may appear before the JSON block narrative_lines = [] for line in narrative.splitlines(): if not line.lstrip().startswith('book_log:'): narrative_lines.append(line) narrative = "\n".join(narrative_lines).strip() try: data = json.loads(json_str) except json.JSONDecodeError: pass else: # Fallback: maybe the entire response is JSON (no fence) text_stripped = text.strip() if text_stripped.startswith("{") and text_stripped.endswith("}"): try: data = json.loads(text_stripped) narrative = data.get("narrative", "") except json.JSONDecodeError: pass return GenerationResult( narrative=narrative or text, choices=data.get("choices", []), log_entry=data.get("log_entry"), ambience=data.get("ambience"), character_updates=data.get("character_updates"), world_updates=data.get("world_updates"), journal_add=data.get("journal_add", []), journal_done=data.get("journal_done", []), ) # ── State Persistence ─────────────────────────────────────────────── def _validate_update_size(self, name: str, new_content: str, path: Path) -> bool: """Reject updates that are more than 30% shorter than the existing file — likely the LLM pasted a fragment instead of the full state.""" if not path.exists(): return True old = path.read_text().strip() if not old: return True ratio = len(new_content) / len(old) if ratio < 0.7: import sys print( f"WARNING: {name} update rejected ({ratio:.0%} of original size " f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.", file=sys.stderr, ) return False return True def apply_state(self, result: TurnResult) -> None: """Write state changes from a TurnResult to disk.""" if result.ambience: AMBIENCE_PATH.write_text(result.ambience.strip().lower() + "\n") def archive_turn(self, narrative: str) -> None: """Append the narrative as a new turn in book.md.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M") heading = f"\n\n## Turn — {timestamp}\n\n" BOOK_PATH.parent.mkdir(parents=True, exist_ok=True) with open(BOOK_PATH, "a") as f: f.write(heading + narrative.strip() + "\n") def append_log(self, entry: str) -> None: """Append a log entry to today's log file.""" LOG_DIR.mkdir(parents=True, exist_ok=True) log_path = LOG_DIR / f"{TODAY}.md" if not log_path.exists(): log_path.write_text(f"# Session Log — {TODAY}\n\n") with open(log_path, "a") as f: f.write(entry.strip() + "\n") def _append_llm_log(self, text: str) -> None: """Append raw LLM activity to llm.log for debugging.""" LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True) with open(LLM_LOG_PATH, "a") as f: f.write(text + "\n") def _update_journal( self, add: list[str] | None = None, done: list[str] | None = None ) -> None: """Add or complete TODO items in journal.md.""" if not JOURNAL_PATH.exists(): JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n") lines = JOURNAL_PATH.read_text().splitlines() # Parse into sections: everything before TODO, TODO items, between, DONE items, after todo_items: list[str] = [] done_items: list[str] = [] before_todo: list[str] = [] between: list[str] = [] after_done: list[str] = [] section = "before_todo" for line in lines: stripped = line.strip() if stripped == "## TODO": section = "todo" before_todo.append(line) elif stripped == "## DONE": section = "done" between.append(line) elif section == "before_todo": before_todo.append(line) elif section == "todo": if stripped.startswith("- "): todo_items.append(stripped[2:]) else: between.append(line) elif section == "done": if stripped.startswith("- "): done_items.append(stripped[2:]) else: after_done.append(line) # Apply changes if done: done_set = set(done) todo_items = [i for i in todo_items if i not in done_set] new_done = [i for i in done if i not in done_items] done_items.extend(new_done) if add: todo_set = set(todo_items) new_todo = [i for i in add if i not in todo_set] # Insert new items at the top of TODO todo_items = new_todo + todo_items # Reconstruct out = list(before_todo) for item in todo_items: out.append(f"- {item}") out.extend(between) for item in done_items: out.append(f"- {item}") out.extend(after_done) # Clean up: collapse multiple blank lines cleaned = [] prev_blank = False for line in out: is_blank = line.strip() == "" if is_blank and prev_blank: continue cleaned.append(line) prev_blank = is_blank # Ensure trailing newline JOURNAL_PATH.write_text("\n".join(cleaned) + "\n") # ── CLI entry point (for testing) ───────────────────────────────────────── def main(): """Generate a turn from the command line (debug/testing).""" import argparse parser = argparse.ArgumentParser(description="The Chaos Game Engine (CLI)") parser.add_argument("--action", "-a", help="Player action text") parser.add_argument("--last", "-l", help="Last narrative text") args = parser.parse_args() engine = GameEngine() result = engine.generate( player_action=args.action, last_narrative=args.last, ) if result.error: print(f"ERROR: {result.error}", file=sys.stderr) sys.exit(1) print(result.narrative) if result.choices: print("\n--- Choices ---") for c in result.choices: print(f" [{c}]") if result.log_entry: print(f"\n[Log] {result.log_entry}") if result.ambience: print(f"[Ambience] {result.ambience}") if __name__ == "__main__": main()