From 64b6416929d4f72e14088c7698a8d1b1104b45d6 Mon Sep 17 00:00:00 2001 From: Dejvino Date: Tue, 30 Jun 2026 18:47:44 +0200 Subject: [PATCH] Code split to allow small LLM to work on it --- AGENTS.md | 8 + pre-commit.sh | 9 + tools/engine.py | 1071 +++++----------------------------------- tools/llm.py | 63 +++ tools/models.py | 35 ++ tools/paths.py | 27 + tools/prompts.py | 86 ++++ tools/run.py | 13 +- tools/state.py | 210 ++++++++ tools/test_imports.py | 90 ++-- tools/test_runtime.py | 78 +-- tools/tools_handler.py | 463 +++++++++++++++++ 12 files changed, 1129 insertions(+), 1024 deletions(-) create mode 100755 pre-commit.sh create mode 100644 tools/llm.py create mode 100644 tools/models.py create mode 100644 tools/paths.py create mode 100644 tools/prompts.py create mode 100644 tools/state.py create mode 100644 tools/tools_handler.py diff --git a/AGENTS.md b/AGENTS.md index 5d2fe86..7592979 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -137,6 +137,14 @@ python3 tools/engine.py --action "I head to the market" ``` ## Testing Commands +nWhen committing, also use the pre-commit validator to check for oversized Python files. + +n## Pre-commit Validation +Before committing, run the pre-commit validator script to ensure no Python file is too large. + +```bash +./pre-commit.sh +``` Always run tests before making changes. This prevents runtime errors like missing imports. diff --git a/pre-commit.sh b/pre-commit.sh new file mode 100755 index 0000000..57f151c --- /dev/null +++ b/pre-commit.sh @@ -0,0 +1,9 @@ +#!/bin/bash +ERRORS=$(python3 -c "import os; [f for f in os.listdir('./tools') if f.endswith('.py') and os.path.getsize(os.path.join('./tools', f)) > 2048]") +if [ -z "$ERRORS" ]; then + echo "OK" +else + echo "You need to refactor this:" + echo "$ERRORS" + exit 1 +fi diff --git a/tools/engine.py b/tools/engine.py index 19d9144..6902aba 100644 --- a/tools/engine.py +++ b/tools/engine.py @@ -5,6 +5,8 @@ engine.py — The Chaos Game Engine Owns the LLM interaction, prompt assembly, response parsing, and game state persistence. The TUI (run.py) calls this module — they do not depend on each other, only on the shared session/ file layout. + +Split into sub-modules: paths, models, prompts, state, tools_handler, llm. """ from __future__ import annotations @@ -12,143 +14,31 @@ from __future__ import annotations import json import re import sys -from dataclasses import dataclass, field -import random -from datetime import date, datetime +from collections import Counter +from datetime import datetime from pathlib import Path -from string import Template from typing import Iterator, Optional - -# ── Paths ────────────────────────────────────────────────────────────────── -BASE_DIR = Path(__file__).resolve().parent.parent -SESSION_DIR = BASE_DIR / 'session' -CONFIG_PATH = SESSION_DIR / 'config.json' -CHAR_PATH = SESSION_DIR / 'character.md' -WORLD_PATH = SESSION_DIR / 'world.md' -BOOK_PATH = SESSION_DIR / 'book.md' -JOURNAL_PATH = SESSION_DIR / 'journal.md' -AMBIENCE_PATH = SESSION_DIR / 'ambience.md' -LOG_DIR = SESSION_DIR / 'log' -LLM_LOG_PATH = SESSION_DIR / 'llm.log' -AMBIENCE_OPTIONS_PATH = SESSION_DIR / "ambience_options.md" -CHANGES_PATH = SESSION_DIR / "changes.md" -AUDIO_DIR = SESSION_DIR / "audio" -TODAY = date.today().isoformat() - - -# ── Structured output ────────────────────────────────────────────────────── -@dataclass -class GenerationResult: - """Legacy result — kept for backward compat with CLI main().""" - narrative: str - choices: list[str] = field(default_factory=list) - log_entry: Optional[str] = None - ambience: Optional[str] = None - character_updates: Optional[str] = None - world_updates: Optional[str] = None - journal_add: list[str] = field(default_factory=list) - journal_done: list[str] = field(default_factory=list) - error: Optional[str] = None - - -@dataclass -class TurnResult: - """Output of a complete turn via finalize_turn tool.""" - book_log: str = "" - user_prompt: str = "" - ambience: Optional[str] = None - log_entry: Optional[str] = None - error: Optional[str] = None - debug_info: str = "" - changes: list[str] = field(default_factory=list) - - -# ── DM System Prompt Template ────────────────────────────────────────────── -SYSTEM_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion. - -## Rules -- **Odds**: 1d6, 4+ favourable, 3- trouble. -- **Traits**: 3d6, roll UNDER trait. -- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour. -- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed. -- **Modifiers**: Favourable +1, Risky -1, Desperate -2. - -## Tools (action only) -Wrap in ```tool to perform an action: -``` -{"tool": "roll", "args": {"dice": "1d6"}} -``` - -- **roll** — dice, modifier -- **player_roll** — dice, reason -- **character_update** — content: "full sheet" (if HP/cash/gear/stats change) -- **world_update** — content: "full world" (if NPCs/locations/threads change) -- **journal_update** — add: [...], done: [...] - -You have the full state above — no need to look anything up. Just write the story and use tools when the player's action changes something. - -## State - -### Character -$character - -### World -$world - -### Log -$log - -### Story -$story""") - -PROSE_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion. - -## Rules -- **Odds**: 1d6, 4+ favourable, 3- trouble. -- **Traits**: 3d6, roll UNDER trait. -- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour. -- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed. -- **Modifiers**: Favourable +1, Risky -1, Desperate -2. - -A die is cast at the start of each turn — incorporate it into your narrative. - -End your response with a `### Changes` block listing what changed: - -### Changes -- Current Health: 3 -- Cash: 45 silver -- Added to inventory: Silver key -- Removed from inventory: Torches (10) -- Replaced gear: Mace (1d6+1) → Mace (1d6+2) -- Note: Found a hidden passage -- Journal done: Defeat the demon -- Journal add: Investigate the mine - -Only include lines for things that actually changed. Omit unused lines entirely. - -## State - -### Character -$character - -### World -$world - -### Log -$log - -### Story -$story""") - +from paths import ( + CHAR_PATH, WORLD_PATH, BOOK_PATH, CONFIG_PATH, LOG_DIR, +) +from models import GenerationResult, TurnResult +from prompts import SYSTEM_PROMPT, PROSE_PROMPT +import state # read_file, read_recent_log, read_recent_book, truncate_world, append_llm_log +from tools_handler import ( + execute_tool, describe_tool_action, describe_change, + parse_changes_block, extract_tool_calls, +) +from llm import set_llm_env, call_llm # ── Game Engine ──────────────────────────────────────────────────────────── class GameEngine: """Owns the LLM interaction and game state persistence.""" - def __init__(self, session_dir: str | Path = SESSION_DIR): - self.session_dir = Path(session_dir) + def __init__(self, session_dir: str | Path | None = None): + from paths import SESSION_DIR + self.session_dir = Path(session_dir) if session_dir else SESSION_DIR self.config: dict = {} self._load_config() @@ -175,7 +65,6 @@ class GameEngine: else: raw = CONFIG_PATH.read_text() self.config = json.loads(raw) - # Ensure api_key is None not empty string llm = self.config.get("llm", {}) if not llm.get("api_key"): llm["api_key"] = None @@ -208,92 +97,14 @@ class GameEngine: def timeout(self) -> int: return self.config.get("llm", {}).get("timeout", 120) - def _set_llm_env(self) -> None: - """Set provider-specific env vars for litellm.""" - prefix = self.model.split("/")[0].upper() - import os - key = self.api_key or "sk-placeholder" - os.environ[f"{prefix}_API_KEY"] = key - if self.api_base: - os.environ[f"{prefix}_API_BASE"] = self.api_base - # ── Context Assembly ──────────────────────────────────────────────── - def _read_file(self, path: Path) -> str: - return path.read_text().strip() if path.exists() else "" - - def _read_recent_log(self, max_entries: int = 5) -> str: - """Read the latest log file and return the last N entries.""" - log_path = LOG_DIR / f"{TODAY}.md" - if not log_path.exists(): - from datetime import timedelta - yesterday = (date.today() - timedelta(days=1)).isoformat() - log_path = LOG_DIR / f"{yesterday}.md" - if not log_path.exists(): - return "*No recent events.*" - lines = log_path.read_text().splitlines() - entries = [l for l in lines if l.strip().startswith("- ")] - return "\n".join(entries[-max_entries:]) or "*No recent events.*" - - def _read_recent_book(self, max_turns: int = 1) -> str: - """Return the last N turns from the book as context.""" - text = self._read_file(BOOK_PATH) - if not text: - return "*No prior story.*" - turns = text.split("\n## ") - recent = turns[-max_turns:] - return "\n## ".join(recent) if len(turns) > 1 else recent[0] - - @staticmethod - def _truncate_world(text: str) -> str: - """Extract key world context: NPCs, factions, active threads, rumours.""" - if not text or text == "*No world state.*": - return text - sections = re.split(r"\n(?=## |### )", text) - parts = [] - for sec in sections: - header = sec.split("\n")[0].strip() if sec else "" - if "Active Threads" in header: - parts.append(sec) - elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header: - parts.append(sec) - result = "\n\n".join(parts) - return result or text[:1500] + "\n_(world truncated)_" - - def _get_valid_ambiences(self) -> set[str]: - """Parse ambience_options.md and return set of valid ambience names with associated audio files.""" - valid = {"silence"} # silence always valid (stops music) - if not AMBIENCE_OPTIONS_PATH.exists(): - return valid - in_table = False - for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines(): - s = line.strip() - if not s.startswith("|") or not s.endswith("|"): - in_table = False - continue - if in_table and all(c in "-:| " for c in s): - continue - parts = [p.strip() for p in s.split("|") if p.strip()] - if not parts: - continue - if not in_table: - in_table = True - continue - name = parts[0].lower() - files_str = parts[1] if len(parts) > 1 else "" - files = [f.strip() for f in files_str.split(",")] - # Only add if at least one file exists (or is listed) - has_files = any((AUDIO_DIR / f).exists() or f for f in files) - if has_files: - valid.add(name) - return valid - def build_system_prompt(self) -> str: """Assemble the system prompt with current game state.""" - char = self._read_file(CHAR_PATH) or "*No character sheet.*" - world = self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*" - log = self._read_recent_log() - story = self._read_recent_book() + char = state.read_file(CHAR_PATH) or "*No character sheet.*" + world = state.truncate_world(state.read_file(WORLD_PATH) or "") or "*No world state.*" + log = state.read_recent_log() + story = state.read_recent_book() return SYSTEM_PROMPT.substitute( character=char, world=world, log=log, story=story ) @@ -318,12 +129,12 @@ class GameEngine: parts.append(f"## Player's Request\n{player_action}") has_existing_story = bool( - self._read_file(BOOK_PATH).strip() + state.read_file(BOOK_PATH).strip() ) if not last_prompt else True if not player_action and not last_prompt: if has_existing_story: - raise RuntimeError(f"User action is required for every turn.") + raise RuntimeError(f"User action is required for every turn.") else: parts.append( "## Instructions\n" @@ -347,8 +158,6 @@ class GameEngine: """ Synchronous generation. Calls the LLM, parses the response, and returns a GenerationResult. - - The TUI calls this from a worker thread — see run.py. """ system = self.build_system_prompt() user = self.build_user_message( @@ -365,13 +174,10 @@ class GameEngine: except ImportError: return GenerationResult( narrative="", - error=( - "litellm is not installed. Run: pip install litellm" - ), + error="litellm is not installed. Run: pip install litellm", ) - # Set API key / base if provided - self._set_llm_env() + set_llm_env(self.model, self.api_key, self.api_base) try: response = litellm.completion( @@ -417,7 +223,7 @@ class GameEngine: }) return - self._set_llm_env() + set_llm_env(self.model, self.api_key, self.api_base) try: response = litellm.completion( @@ -432,476 +238,12 @@ class GameEngine: delta = chunk.choices[0].delta.content or "" if delta: full_text += delta - yield full_text # partial narrative for streaming display - # Final yield is the completed text + yield full_text yield full_text except Exception as e: yield json.dumps({"error": f"LLM call failed: {e}"}) - # ── Tool Infrastructure ──────────────────────────────────────────── - - TOOL_REGISTRY: dict[str, dict] = { - "roll": {"description": "Roll dice.", "args": {"dice": "1d6", "modifier": "+1"}}, - "player_roll": {"description": "Ask player to roll.", "args": {"dice": "1d6", "reason": "why"}}, - "modify_traits": {"description": "Change STR/DEX/WIL.", "args": {"str": "optional", "dex": "optional", "wil": "optional"}}, - "modify_vitals": {"description": "Change HP, cash, weapon, armour.", "args": {"current_hp": "optional", "max_hp": "optional", "cash": "optional", "weapon": "optional", "armour": "optional"}}, - "add_to_inventory": {"description": "Add item to gear.", "args": {"item": "item name and stats"}}, - "remove_from_inventory": {"description": "Remove item from gear.", "args": {"item": "exact item text"}}, - "replace_gear": {"description": "Replace gear by exact match.", "args": {"before": "exact text", "after": "new text"}}, - "add_note": {"description": "Add note to sheet.", "args": {"note": "note content"}}, - "replace_note": {"description": "Replace note by exact match.", "args": {"before": "exact text", "after": "new text"}}, - "world_update": {"description": "Replace world state.", "args": {"content": "full world markdown"}}, - "journal_update": {"description": "Update TODO/DONE.", "args": {"add": "[...]", "done": "[...]"}}, - "finalize_turn": {"description": "End turn.", "args": {"user_prompt": "question for player", "ambience": "soundscape name"}}, - } - - def _tool_think(self, args: dict) -> str: - """Think tool — content is displayed via dm_status in the status bar.""" - return "" - - def _tool_read_file(self, args: dict) -> str: - filename = (args or {}).get("file", "") - paths = { - "character": CHAR_PATH, - "world": WORLD_PATH, - "book": BOOK_PATH, - "log": LOG_DIR / f"{TODAY}.md", - "journal": JOURNAL_PATH, - } - path = paths.get(filename) - if not path: - return f"Unknown file: {filename}. Choose from: {', '.join(paths)}" - return self._read_file(path) or f"*{filename} is empty.*" - - def _tool_roll(self, args: dict) -> str: - import random - dice_str = (args or {}).get("dice", "1d6") - modifier_str = (args or {}).get("modifier", "0") - try: - count, sides = dice_str.lower().split("d") - count = int(count) if count else 1 - sides = int(sides) - except (ValueError, TypeError): - return f"Invalid dice: {dice_str}. Use format like '2d6'." - mod = 0 - if modifier_str: - try: - mod = int(modifier_str) - except ValueError: - pass - rolls = [random.randint(1, sides) for _ in range(count)] - total = sum(rolls) + mod - mod_str = f" {'+' if mod >= 0 else ''}{mod}" if mod != 0 else "" - return f"Roll: {dice_str}{mod_str} → [{', '.join(str(r) for r in rolls)}] = {total}" - - def _patch_character(self, pattern: str, repl: str, count: int = 1, flags: int = 0) -> str: - """Apply a regex replacement to character.md. Returns error msg or empty string.""" - text = CHAR_PATH.read_text() - new, n = re.subn(pattern, repl, text, count=count, flags=flags) - if n == 0: - return f"**Error:** pattern not found:\n{pattern}" - CHAR_PATH.write_text(new) - return "" - - def _tool_modify_traits(self, args: dict) -> str: - errors = [] - for stat in ("str", "dex", "wil"): - val = args.get(stat) - if val is not None: - err = self._patch_character( - rf"^(- \*\*{stat.upper()}:\*\*\s*)\d+", rf"\g<1>{val}", count=1, flags=re.MULTILINE - ) - if err: - errors.append(err) - return "; ".join(errors) if errors else "Traits updated." - - def _tool_modify_vitals(self, args: dict) -> str: - errors = [] - for field, label in [("current_hp", "Current Health"), ("max_hp", "Max Health"), - ("cash", "Cash"), ("weapon", "Weapon"), ("armour", "Armour")]: - val = args.get(field) - if val is not None: - err = self._patch_character( - rf"^(- \*\*{label}:\*\*\s*).*", rf"\g<1>{val}", count=1, flags=re.MULTILINE - ) - if err: - errors.append(err) - return "; ".join(errors) if errors else "Vitals updated." - - def _tool_add_to_inventory(self, args: dict) -> str: - item = (args or {}).get("item", "") - if not item: - return "**Error:** `item` is required." - text = CHAR_PATH.read_text() - if item in text: - return f"Item already in inventory: {item}" - # Insert after last gear item or after "## Gear" header - gear_section = re.search(r"^## Gear\n", text, re.MULTILINE) - if gear_section: - insert_at = gear_section.end() - text = text[:insert_at] + f"- {item}\n" + text[insert_at:] - else: - text += f"\n## Gear\n- {item}\n" - CHAR_PATH.write_text(text) - return f"Added to inventory: {item}" - - def _tool_remove_from_inventory(self, args: dict) -> str: - item = (args or {}).get("item", "") - if not item: - return "**Error:** `item` is required." - err = self._patch_character(rf"^- {re.escape(item)}\n?", "", count=1, flags=re.MULTILINE) - if err: - return f"**Error:** item not found: {item}" - return f"Removed from inventory: {item}" - - def _tool_replace_gear(self, args: dict) -> str: - before = (args or {}).get("before", "") - after = (args or {}).get("after", "") - if not before or not after: - return "**Error:** `before` and `after` are required." - err = self._patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE) - if err: - return f"**Error:** gear not found: {before}" - return f"Gear replaced: {before} → {after}" - - def _tool_add_note(self, args: dict) -> str: - note = (args or {}).get("note", "") - if not note: - return "**Error:** `note` is required." - text = CHAR_PATH.read_text() - notes_section = re.search(r"^## Notes & Scribbles\n", text, re.MULTILINE) - if notes_section: - text = text[:notes_section.end()] + f"- {note}\n" + text[notes_section.end():] - else: - text += f"\n## Notes & Scribbles\n- {note}\n" - CHAR_PATH.write_text(text) - return f"Note added: {note}" - - def _tool_replace_note(self, args: dict) -> str: - before = (args or {}).get("before", "") - after = (args or {}).get("after", "") - if not before or not after: - return "**Error:** `before` and `after` are required." - err = self._patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE) - if err: - return f"**Error:** note not found: {before}" - return f"Note replaced." - - def _tool_world_update(self, args: dict) -> str: - content = (args or {}).get("content", "") - if not content: - return "**Error:** `content` is required." - if not self._validate_update_size("world", content, WORLD_PATH): - return "**Error:** Update rejected — content is too short (likely a partial paste)." - WORLD_PATH.write_text(content.strip() + "\n") - return "World state updated." - - def _tool_journal_update(self, args: dict) -> str: - add = (args or {}).get("add", []) - done = (args or {}).get("done", []) - if isinstance(add, str): - add = [add] - if isinstance(done, str): - done = [done] - if not add and not done: - return "**Error:** Provide at least one of `add` or `done`." - self._update_journal(add=add, done=done) - return "Journal updated." - - @staticmethod - def _describe_tool_action(tool_name: str, args: dict) -> str: - """Return a user-facing status message for a tool call. - Prefer the LLM-provided dm_status — otherwise fall back to a generic description.""" - dm_status = (args or {}).get("dm_status") - if dm_status: - return f"DM is {dm_status}..." - - read_descriptions = { - "character": "reading the character sheet", - "world": "consulting the world map", - "book": "reviewing the story so far", - "log": "checking the session log", - "journal": "scanning the journal", - } - if tool_name == "read_file": - file = (args or {}).get("file", "") - desc = read_descriptions.get(file, f"reading {file}") - elif tool_name in ("character_get", "world_get", "journal_get"): - file = tool_name.replace("_get", "") - desc = read_descriptions.get(file, f"reading {file}") - elif tool_name in ("character_update", "world_update"): - desc = "updating the records" - elif tool_name == "journal_update": - desc = "updating the journal" - elif tool_name == "roll": - dice = (args or {}).get("dice", "1d6") - mod = (args or {}).get("modifier") - desc = f"rolling {dice}" - if mod: - desc += f" {mod}" - elif tool_name == "player_roll": - dice = (args or {}).get("dice", "1d6") - desc = f"asking you to roll {dice}" - elif tool_name == "modify_traits": - desc = "updating traits" - elif tool_name == "modify_vitals": - desc = "updating vitals" - elif tool_name == "add_to_inventory": - desc = "adding item to inventory" - elif tool_name == "remove_from_inventory": - desc = "removing item from inventory" - elif tool_name == "replace_gear": - desc = "replacing gear" - elif tool_name == "add_note": - desc = "adding note" - elif tool_name == "replace_note": - desc = "replacing note" - else: - desc = f"using {tool_name}" - return f"DM is {desc}..." - - @staticmethod - def _describe_change(tool_name: str, args: dict) -> str: - """Build a compact human-readable change description from a tool call.""" - if tool_name == "modify_vitals": - parts = [] - for k, v in args.items(): - label = k.replace("_", " ").title() - parts.append(f"{label}: {v}") - return f"⚡ {', '.join(parts)}" if parts else "" - elif tool_name == "modify_traits": - parts = [] - for k, v in args.items(): - parts.append(f"{k.upper()}: {v}") - return f"⚡ {', '.join(parts)}" - elif tool_name == "add_to_inventory": - return f"+ {args.get('item', '?')}" - elif tool_name == "remove_from_inventory": - return f"− {args.get('item', '?')}" - elif tool_name == "replace_gear": - return f"↻ {args.get('before', '?')} → {args.get('after', '?')}" - elif tool_name == "add_note": - note = args.get("note", "?") - return f"📝 {note[:60]}{'…' if len(note) > 60 else ''}" - elif tool_name == "replace_note": - return f"📝 {args.get('before', '?')[:40]} → {args.get('after', '?')[:40]}" - elif tool_name == "world_update": - return "🌍 World updated" - elif tool_name == "journal_update": - parts = [] - for a in args.get("add", []): - parts.append(f"📋 {a}") - for d in args.get("done", []): - parts.append(f"✅ {d}") - return "; ".join(parts) if parts else "" - return "" - - @staticmethod - def _parse_changes_block(changes_block: str) -> list[dict]: - """Parse a ### Changes block into tool call dicts. - - Handles the standard format: - - Current Health: N - - Cash: N - - Max Health: N - - Added to inventory: item1, item2 - - Removed from inventory: item1, item2 - - Replaced gear: X → Y - - Note: text - - Journal add: item1, item2 - - Journal done: item1, item2 - """ - calls = [] - for raw_line in changes_block.split("\n"): - line = raw_line.strip() - if not line.startswith("- "): - continue - content = line[2:].strip() - - m = re.match(r"Current Health:\s*(\d+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "modify_vitals", "args": {"current_hp": m.group(1)}}) - continue - - m = re.match(r"Cash:\s*(\d+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "modify_vitals", "args": {"cash": m.group(1)}}) - continue - - m = re.match(r"Max Health:\s*(\d+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "modify_vitals", "args": {"max_hp": m.group(1)}}) - continue - - m = re.match(r"Add(?:ed)? to inventory:\s*(.+)", content, re.IGNORECASE) - if m: - for item in [i.strip() for i in m.group(1).split(",") if i.strip()]: - calls.append({"tool": "add_to_inventory", "args": {"item": item}}) - continue - - m = re.match(r"Remov(?:e|ed) from inventory:\s*(.+)", content, re.IGNORECASE) - if m: - for item in [i.strip() for i in m.group(1).split(",") if i.strip()]: - calls.append({"tool": "remove_from_inventory", "args": {"item": item}}) - continue - - m = re.match(r"Replace(?:d)? gear:\s*(.+?)\s*[→➜]\s*(.+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "replace_gear", "args": {"before": m.group(1).strip(), "after": m.group(2).strip()}}) - continue - - m = re.match(r"Note:\s*(.+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "add_note", "args": {"note": m.group(1).strip()}}) - continue - - m = re.match(r"Journal add:\s*(.+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "journal_update", "args": {"add": [i.strip() for i in m.group(1).split(",") if i.strip()]}}) - continue - - m = re.match(r"Journal done:\s*(.+)", content, re.IGNORECASE) - if m: - calls.append({"tool": "journal_update", "args": {"done": [i.strip() for i in m.group(1).split(",") if i.strip()]}}) - continue - - # Looted from X: Y — narrative fallback, extract what we can - m = re.match(r"Looted from .+:\s*(.+)", content, re.IGNORECASE) - if m: - items_text = m.group(1).strip() - calls.append({"tool": "add_note", "args": {"note": f"Looted: {items_text}"}}) - continue - - return calls - - def _execute_tool(self, tool_name: str, args: dict) -> str: - fn_map = { - "roll": self._tool_roll, - "modify_traits": self._tool_modify_traits, - "modify_vitals": self._tool_modify_vitals, - "add_to_inventory": self._tool_add_to_inventory, - "remove_from_inventory": self._tool_remove_from_inventory, - "replace_gear": self._tool_replace_gear, - "add_note": self._tool_add_note, - "replace_note": self._tool_replace_note, - "world_update": self._tool_world_update, - "journal_update": self._tool_journal_update, - } - fn = fn_map.get(tool_name) - if not fn: - return f"Unknown tool: {tool_name}" - try: - return fn(args) - except Exception as e: - import traceback - tb = traceback.format_exc() - self._append_llm_log(f"\n--- TOOL ERROR ({tool_name}) ---\n{tb}") - return f"Tool error ({tool_name}): {e}" - - @staticmethod - def _extract_thoughts(text: str) -> list[str]: - pattern = r"```thought\s*\n?(.*?)```" - return re.findall(pattern, text, re.DOTALL) - - @staticmethod - def _extract_tool_calls(text: str, *, round_num: int = 0, on_debug: callable = None) -> list[dict]: - """Extract tool calls from ```tool and ```json blocks. - - Uses json.JSONDecoder.raw_decode for strict parsing; falls back to - heuristics if the LLM produces unescaped newlines in string values. - """ - calls = [] - seen = set() - - def _try_parse(raw: str) -> dict | None: - try: - obj = json.loads(raw) - if isinstance(obj, dict) and "tool" in obj: - return obj - except json.JSONDecodeError: - pass - return None - - for m in re.finditer(r"```(?:tool|json|finalize_turn)\s*\n?", text): - fence_type = m.group(0).strip("``` \n\r") - # 1) Strict: raw_decode from where the JSON should start - obj = None - try: - decoder = json.JSONDecoder() - obj, end = decoder.raw_decode(text, m.end()) - except (json.JSONDecodeError, ValueError, StopIteration): - pass - - if obj is None: - # 2) Fallback: find closing backticks and repair unescaped newlines in strings - close = text.find("```", m.end()) - if close > 0: - raw = text[m.end():close].strip() - def _escape_in_strings(s: str) -> str: - return re.sub(r'"(?:[^"\\]|\\.)*"', lambda x: x.group(0).replace("\n", "\\n"), s, flags=re.DOTALL) - repaired = _escape_in_strings(raw) - obj = _try_parse(repaired) - - if obj is not None and isinstance(obj, dict): - # Normalize: fence type "finalize_turn" means the JSON is the args directly - if fence_type == "finalize_turn": - obj = {"tool": "finalize_turn", "args": obj} - # If JSON has a "tool" key, keep as-is - if "tool" not in obj: - obj = None - - if obj is not None: - key = (obj["tool"], json.dumps(obj.get("args", {}), sort_keys=True)) - if key not in seen: - seen.add(key) - calls.append(obj) - elif on_debug: - preview = text[m.end():m.end() + 120].replace("\n", "\\n") - on_debug("parse_error", {"round": round_num, "content": preview}) - - return calls - - @staticmethod - def _extract_final_json(text: str) -> dict | None: - pattern = r"```json\s*\n?(.*?)```" - matches = re.findall(pattern, text, re.DOTALL) - if not matches: - return None - try: - return json.loads(matches[-1].strip()) - except json.JSONDecodeError: - return None - - def _call_llm(self, messages: list[dict], *, label: str = "", max_tokens: int | None = None, on_debug: callable = None) -> str | None: - """Make a single LLM call. Returns content text or None on error.""" - try: - import litellm - except ImportError: - if on_debug: - on_debug("llm_error", {"label": label, "error": "litellm not installed"}) - return None - try: - response = litellm.completion( - model=self.model, - messages=messages, - temperature=self.temperature, - stream=False, - timeout=self.timeout, - max_tokens=max_tokens or self.max_tokens, - ) - content = getattr(response.choices[0].message, 'content', None) or "" - reasoning = getattr(response.choices[0].message, 'reasoning_content', None) or "" - if reasoning and reasoning not in content: - self._append_llm_log(f"\n--- {label} [reasoning] ---\n{reasoning}") - text = content or reasoning - self._append_llm_log(f"\n--- {label} ---\n{text}") - return text - except Exception as e: - err_msg = f"{type(e).__name__}: {e}" - self._append_llm_log(f"\n--- LLM ERROR ({label}) ---\n{err_msg}") - if on_debug: - on_debug("llm_error", {"label": label, "error": err_msg}) - return None + # ──── Three-phase generation ──────────────────────────────────────── def generate_with_tools( self, @@ -919,20 +261,19 @@ class GameEngine: 2. **Summarize** — LLM condenses the book_log into one log line. 3. **Extract** — LLM reads the book_log and outputs tool calls for state changes. """ - self._set_llm_env() - from datetime import datetime - self._append_llm_log(f"\n{'='*60}") - self._append_llm_log(f"=== Turn — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===") - self._append_llm_log(f"{'='*60}") - if player_action: - self._append_llm_log(f"Player: {player_action}") - elif last_prompt: - self._append_llm_log(f"Resume from: {last_prompt[:120]}") - - # ── Outer loop: Phase 1 (prose) → Phase 2 (summarize) → Phase 3 (extract) ── + set_llm_env(self.model, self.api_key, self.api_base) import random + datetime_now = datetime.now() + state.append_llm_log(f"\n{'='*60}") + state.append_llm_log(f"=== Turn — {datetime_now.strftime('%Y-%m-%d %H:%M:%S')} ===") + state.append_llm_log(f"{'='*60}") + if player_action: + state.append_llm_log(f"Player: {player_action}") + elif last_prompt: + state.append_llm_log(f"Resume from: {last_prompt[:120]}") + die_roll = random.randint(1, 6) - self._append_llm_log(f"Dice: {die_roll} (1d6)") + state.append_llm_log(f"Dice: {die_roll} (1d6)") book_log = None changes_block = "" @@ -950,10 +291,10 @@ class GameEngine: on_debug("phase", {"phase": 1, "name": "prose", "status": "start", "dice": die_roll, "outer_attempt": outer_attempt + 1}) system = PROSE_PROMPT.substitute( - character=self._read_file(CHAR_PATH) or "*No character sheet.*", - world=self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*", - log=self._read_recent_log(), - story=self._read_recent_book(), + character=state.read_file(CHAR_PATH) or "*No character sheet.*", + world=state.truncate_world(state.read_file(WORLD_PATH) or "") or "*No world state.*", + log=state.read_recent_log(), + story=state.read_recent_book(), ) user = self.build_user_message( player_action=player_action, @@ -961,10 +302,11 @@ class GameEngine: ) user += f"\n\n*A die is cast: **{die_roll}** (1d6).*" - text = self._call_llm([ + text = call_llm([ {"role": "system", "content": system}, {"role": "user", "content": user}, - ], label=f"Prose attempt {outer_attempt + 1}", max_tokens=1024, on_debug=on_debug) + ], model=self.model, temperature=self.temperature, timeout=self.timeout, + max_tokens=1024, label=f"Prose attempt {outer_attempt + 1}", on_debug=on_debug) if not text or not text.strip(): if on_debug: @@ -999,19 +341,20 @@ class GameEngine: if on_debug: on_debug("phase", {"phase": 2, "name": "summarize", "status": "start"}) - log_context = self._read_recent_log() + log_context = state.read_recent_log() log_entry = None for p2_attempt in range(2): context = book_log if changes_block: context += f"\n\n{changes_block}" - text = self._call_llm([ + text = call_llm([ {"role": "user", "content": f"Given the session log so far, summarize the new story in one line. " f"Focus on who was involved (character and NPC names):\n\n" f"## Session Log\n{log_context}\n\n" f"## New Story\n{context}"} - ], label=f"Summarize attempt {p2_attempt + 1}", on_debug=on_debug) + ], model=self.model, temperature=self.temperature, timeout=self.timeout, + max_tokens=self.max_tokens, label=f"Summarize attempt {p2_attempt + 1}", on_debug=on_debug) if text and text.strip(): log_entry = text.strip().split("\n")[0][:300] if on_debug: @@ -1032,29 +375,29 @@ class GameEngine: user_prompt = self._auto_prompt(book_log) ambience = None phase3_errors = [] - changes = [] # Reset for this outer attempt + changes = [] - # Step 1: Parse ### Changes block directly (deterministic, no LLM) + # Step 1: Parse ### Changes block directly if changes_block.strip(): - for tc in self._parse_changes_block(changes_block): + for tc in parse_changes_block(changes_block): name = tc["tool"] args = tc.get("args", {}) if name == "finalize_turn": continue - result = self._execute_tool(name, args) + result = execute_tool(name, args) if result.startswith("**Error:") or result.startswith("Tool error") or result.startswith("Unknown"): phase3_errors.append(f"{name}: {result}") else: - desc = self._describe_change(name, args) + desc = describe_change(name, args) if desc: changes.append(desc) # Step 2: LLM Phase 3 for finalize_turn + any extra changes - previous_attempt = None # {output, feedback} + previous_attempt = None phase3_ok = False for p3_attempt in range(5): - current_char = self._read_file(CHAR_PATH) or "*No character.*" - current_world = self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world.*" + current_char = state.read_file(CHAR_PATH) or "*No character.*" + current_world = state.truncate_world(state.read_file(WORLD_PATH) or "") or "*No world.*" phase3_prompt = ( f"## Current Character\n{current_char}\n\n" @@ -1084,7 +427,7 @@ class GameEngine: f"Fix the issues above. Output corrected tool calls only.\n\n" ) - text = self._call_llm([ + text = call_llm([ {"role": "user", "content": phase3_prompt + f"```tool\n{{\"tool\": \"modify_vitals\", \"args\": {{\"current_hp\": 5, \"cash\": 45}}}}\n```\n" f"```tool\n{{\"tool\": \"modify_traits\", \"args\": {{\"dex\": 15}}}}\n```\n" @@ -1097,24 +440,25 @@ class GameEngine: f"```tool\n{{\"tool\": \"journal_update\", \"args\": {{\"add\": [\"Investigate the mine\"], \"done\": [\"Defeat the demon\"]}}}}\n```\n" f"```tool\n{{\"tool\": \"finalize_turn\", \"args\": {{\"user_prompt\": \"What do you do?\", \"ambience\": \"dungeon\"}}}}\n```\n\n" f"Only output tools for things that actually changed. Omit unchanged fields."} - ], label=f"Extract attempt {p3_attempt + 1}", on_debug=on_debug) + ], model=self.model, temperature=self.temperature, timeout=self.timeout, + max_tokens=self.max_tokens, label=f"Extract attempt {p3_attempt + 1}", on_debug=on_debug) if not text or not text.strip(): if on_debug: on_debug("phase", {"phase": 3, "status": "empty", "attempt": p3_attempt + 1}) continue - tool_calls = self._extract_tool_calls( + tool_calls_list = extract_tool_calls( text, round_num=p3_attempt + 1, on_debug=on_debug ) - if on_debug and tool_calls: - names = [tc.get("tool", "?") for tc in tool_calls if tc.get("tool") != "finalize_turn"] - fin = any(tc.get("tool") == "finalize_turn" for tc in tool_calls) + if on_debug and tool_calls_list: + names = [tc.get("tool", "?") for tc in tool_calls_list if tc.get("tool") != "finalize_turn"] + fin = any(tc.get("tool") == "finalize_turn" for tc in tool_calls_list) on_debug("phase", {"phase": 3, "status": "tools_found", "tools": names, "has_finalize": fin}) errors = [] attempt_changes = [] - for tc in tool_calls: + for tc in tool_calls_list: name = tc.get("tool", "?") args = tc.get("args", {}) if name == "finalize_turn": @@ -1124,7 +468,7 @@ class GameEngine: ambience = args["ambience"] continue if on_action: - on_action(f"State: {self._describe_tool_action(name, args)}") + on_action(f"State: {describe_tool_action(name, args)}") if on_debug: on_debug("tool_call", {"round": p3_attempt + 1, "tool": name, "args": args}) @@ -1134,12 +478,12 @@ class GameEngine: roll_val = on_player_roll(dice, reason) result = f"Player rolled {dice} for '{reason}': {roll_val}" else: - result = self._execute_tool(name, args) + result = execute_tool(name, args) if result.startswith("**Error:") or result.startswith("Tool error") or result.startswith("Unknown"): errors.append(f"{name}: {result}") else: - desc = self._describe_change(name, args) + desc = describe_change(name, args) if desc: attempt_changes.append(desc) if on_debug: @@ -1150,7 +494,7 @@ class GameEngine: debug_info = "" changes.extend(attempt_changes) if on_debug: - on_debug("phase", {"phase": 3, "status": "done", "applied": len([tc for tc in tool_calls if tc.get("tool") != "finalize_turn"])}) + on_debug("phase", {"phase": 3, "status": "done", "applied": len([tc for tc in tool_calls_list if tc.get("tool") != "finalize_turn"])}) break phase3_errors = errors @@ -1158,7 +502,6 @@ class GameEngine: if on_debug: on_debug("phase", {"phase": 3, "status": "errors", "errors": errors, "attempt": p3_attempt + 1}) - # Build feedback for the LLM to fix on next attempt feedback_lines = ["The previous tool calls had errors:"] for e in errors: feedback_lines.append(f"- {e}") @@ -1167,13 +510,12 @@ class GameEngine: previous_attempt = {"output": text, "feedback": "\n".join(feedback_lines)} if phase3_ok: - break # All phases succeeded on this outer attempt + break - # Phase 3 failed after 5 attempts — retry from Phase 1 if on_debug: on_debug("phase", {"phase": 3, "status": "exhausted", "errors": phase3_errors}) on_debug("phase", {"phase": 1, "status": "retry_after_phase3_failure", "outer_attempt": outer_attempt + 1}) - book_log = None # Reset so Phase 1 runs again on next outer iteration + book_log = None if not book_log: return TurnResult(error="Generation failed after exhausting all retries") @@ -1190,7 +532,7 @@ class GameEngine: "extract_errors": debug_info or None, }) - self._append_llm_log( + state.append_llm_log( f"\n--- FINAL ---\n" f"book_log: {book_log[:200]}\n" f"log_entry: {log_entry}\n" @@ -1206,6 +548,7 @@ class GameEngine: changes=changes, ) + # ──── Single-call generation ───────────────────────────────────────── def generate_with_tools_single( self, @@ -1222,14 +565,14 @@ class GameEngine: Uses a single LLM call with all tools available — LLM outputs narrative + tool blocks in one go. No retry loop. """ - from datetime import datetime - self._append_llm_log(f"\n{'='*60}") - self._append_llm_log(f"=== Turn — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===") - self._append_llm_log(f"{'='*60}") + datetime_now = datetime.now() + state.append_llm_log(f"\n{'='*60}") + state.append_llm_log(f"=== Turn — {datetime_now.strftime('%Y-%m-%d %H:%M:%S')} ===") + state.append_llm_log(f"{'='*60}") if player_action: - self._append_llm_log(f"Player: {player_action}") + state.append_llm_log(f"Player: {player_action}") elif last_prompt: - self._append_llm_log(f"Resume from: {last_prompt[:120]}") + state.append_llm_log(f"Resume from: {last_prompt[:120]}") strategy_name = "tools" if on_action: @@ -1237,10 +580,10 @@ class GameEngine: if on_debug: on_debug("config", {"model": self.model, "temperature": self.temperature, "max_tokens": self.max_tokens, "strategy": strategy_name}) + import random die_roll = random.randint(1, 6) - self._append_llm_log(f"Dice: {die_roll} (1d6)") + state.append_llm_log(f"Dice: {die_roll} (1d6)") - # Build system prompt that instructs LLM to use tools for changes system = """You are an RPG dungeon master. The player just took an action. Output ONLY ```tool blocks — no prose, no reasoning, no explanation outside tool blocks. Every piece of output must be in a tool block. @@ -1281,10 +624,10 @@ Use these tools to perform every action. Wrap each in its own ```tool block: ``` """ system += PROSE_PROMPT.substitute( - character=self._read_file(CHAR_PATH) or "*No character sheet.*", - world=self._truncate_world(self._read_file(WORLD_PATH) or "") or "*No world state.*", - log=self._read_recent_log(), - story=self._read_recent_book(), + character=state.read_file(CHAR_PATH) or "*No character sheet.*", + world=state.truncate_world(state.read_file(WORLD_PATH) or "") or "*No world state.*", + log=state.read_recent_log(), + story=state.read_recent_book(), ) user = self.build_user_message( @@ -1294,35 +637,34 @@ Use these tools to perform every action. Wrap each in its own ```tool block: user += f"\n\n*A die is cast: **{die_roll}** (1d6).*" start_time = datetime.now() - self._set_llm_env() - self._append_llm_log(f"\n[TOOL] Single call — {len(system)} chars system, {len(user)} chars user") - self._append_llm_log(f"System preview: {system.split('\n')[0][:80]}...") - self._append_llm_log(f"User preview: {user.split('\n')[0][:80]}...") + set_llm_env(self.model, self.api_key, self.api_base) + state.append_llm_log(f"\n[TOOL] Single call — {len(system)} chars system, {len(user)} chars user") + state.append_llm_log(f"System preview: {system.split(chr(10))[0][:80]}...") + state.append_llm_log(f"User preview: {user.split(chr(10))[0][:80]}...") - text = self._call_llm( + text = call_llm( [{"role": "system", "content": system}, {"role": "user", "content": user}], - label="Single tool call", - max_tokens=4096, - on_debug=on_debug, + model=self.model, temperature=self.temperature, timeout=self.timeout, + max_tokens=4096, label="Single tool call", on_debug=on_debug, ) total_elapsed = (datetime.now() - start_time).total_seconds() * 1000 - self._append_llm_log(f"\n[TOOL] got {len(text)} chars in {total_elapsed:.1f}ms") + if text: + state.append_llm_log(f"\n[TOOL] got {len(text)} chars in {total_elapsed:.1f}ms") if not text or not text.strip(): return TurnResult(error="Single tool call returned empty response") raw = text.strip() book_log = "" - changes_block = "" log_entry = None user_prompt = self._auto_prompt("") ambience = None tool_calls = [] + changes = [] + phase3_errors = [] - # Extract tool blocks — ignore everything outside them - import re tool_pattern = r"```tool\s*\n?(.*?)\n?```" matches = re.findall(tool_pattern, text, re.DOTALL) if matches: @@ -1333,7 +675,7 @@ Use these tools to perform every action. Wrap each in its own ```tool block: tool_calls.append(tc) name = tc.get("tool", "unknown") args = tc.get("args", {}) - self._append_llm_log(f"\n[EXTRACT] {name}: {json.dumps(args)[:100]}") + state.append_llm_log(f"\n[EXTRACT] {name}: {json.dumps(args)[:100]}") if name == "narrative": book_log = args.get("text", book_log) @@ -1343,10 +685,9 @@ Use these tools to perform every action. Wrap each in its own ```tool block: if args.get("ambience"): ambience = args["ambience"] except json.JSONDecodeError as e: - self._append_llm_log(f"\n[EXTRACT] bad JSON: {e}") + state.append_llm_log(f"\n[EXTRACT] bad JSON: {e}") continue - # Generate log entry from narrative (first sentence, trimmed) log_entry = None if book_log: clean = re.sub(r'\s+', ' ', book_log).strip() @@ -1355,38 +696,29 @@ Use these tools to perform every action. Wrap each in its own ```tool block: log_entry = first_sentence[0].strip()[:200] else: log_entry = clean[:200] - self._append_llm_log(f"\n[SUMMARY] \"{log_entry}\"") + state.append_llm_log(f"\n[SUMMARY] \"{log_entry}\"") - # Apply changes (exclude narrative and finalize_turn) extr_start = datetime.now() - changes = [] - phase3_errors = [] for tc in tool_calls: name = tc.get("tool", "unknown") args = tc.get("args", {}) if name in ("finalize_turn", "narrative"): continue - result = self._execute_tool(name, args) + result = execute_tool(name, args) if result.startswith("**Error:") or result.startswith("Tool error") or result.startswith("Unknown"): phase3_errors.append(f"{name}: {result}") else: - desc = self._describe_change(name, args) + desc = describe_change(name, args) if desc: changes.append(desc) apply_elapsed = (datetime.now() - extr_start).total_seconds() * 1000 - self._append_llm_log(f"\n[APPLY] {len(changes)} changes in {apply_elapsed:.1f}ms") - + state.append_llm_log(f"\n[APPLY] {len(changes)} changes in {apply_elapsed:.1f}ms") else: - # No tool blocks found — fallback to book_log and apply changes - self._append_llm_log(f"\n[TOOL] no tool blocks found") - tool_calls = [] - changes = [] - phase3_errors = [] + state.append_llm_log(f"\n[TOOL] no tool blocks found") elapsed = (datetime.now() - start_time).total_seconds() * 1000 - # ── Finalize ────────────────────────────────────────────────────── if on_action: on_action("Turn complete") if on_debug: @@ -1426,15 +758,7 @@ Use these tools to perform every action. Wrap each in its own ```tool block: changes=changes, ) - @staticmethod - def _strip_tool_blocks(text: str) -> str: - """Remove ```tool, ```json, finalize_turn blocks from narrative text.""" - return re.sub( - r'```(?:tool|json|finalize_turn)\s*\n?.*?```', - '', - text, - flags=re.DOTALL, - ).strip() + # ── Helpers ───────────────────────────────────────────────────────── @staticmethod def _auto_prompt(book_log: str) -> str: @@ -1447,13 +771,10 @@ Use these tools to perform every action. Wrap each in its own ```tool block: if not lines: return False, "Empty narrative" - # 1) Heuristic: high repetition count - from collections import Counter common = Counter(lines).most_common(1) if common and common[0][1] >= 5: return False, f"Repetition: '{common[0][0][:60]}' ×{common[0][1]}" - # 2) Heuristic: game mechanics bleedthrough mech_lines = [l for l in lines if re.match( r'^\*\*(?:Roll|Damage|Success|Failure|Check|Save|Hit|Miss|' r'Strenght|Dexterity|Willpower|STR|DEX|WIL|' @@ -1465,17 +786,14 @@ Use these tools to perform every action. Wrap each in its own ```tool block: if ratio > 0.3: return False, f"Game mechanics dominate ({len(mech_lines)}/{len(lines)} lines)" - # 3) Heuristic: tool / json blocks leaked into narrative if re.search(r'```(?:tool|json)', book_log): return False, "Contains unprocessed tool blocks" - # 4) Heuristic: under 50 characters of real prose prose = re.sub(r'[*_#>`~\-\d]', '', book_log).strip() if len(prose) < 50: return False, "Too short to be meaningful" - # 5) LLM quality rating (only if heuristics pass) - text = self._call_llm([ + text = call_llm([ {"role": "user", "content": f"Rate this RPG narrative quality 1-5.\n" f"1 = unreadable (spam, repetition, pure mechanics, garbled)\n" @@ -1485,7 +803,8 @@ Use these tools to perform every action. Wrap each in its own ```tool block: f"5 = excellent (vivid, engaging)\n" f"Reply with ONLY a single digit 1-5.\n\n" f"{book_log[:600]}"} - ], label="Narrative validation", max_tokens=2, on_debug=on_debug) + ], model=self.model, temperature=self.temperature, timeout=self.timeout, + max_tokens=2, label="Narrative validation", on_debug=on_debug) if text and text.strip().isdigit(): score = int(text.strip()) @@ -1502,7 +821,6 @@ Use these tools to perform every action. Wrap each in its own ```tool block: Parse a full LLM response into a GenerationResult. Extracts the JSON block and splits narrative from it. """ - # Check for error JSON if text.startswith('{"error":'): try: err = json.loads(text).get("error", "Unknown error") @@ -1510,7 +828,6 @@ Use these tools to perform every action. Wrap each in its own ```tool block: err = "Unknown error" return GenerationResult(narrative="", error=err) - # Try to find a ```json ... ``` block json_pattern = r"```json\s*\n?(.*?)\n?```" matches = re.findall(json_pattern, text, re.DOTALL) @@ -1519,9 +836,7 @@ Use these tools to perform every action. Wrap each in its own ```tool block: if matches: json_str = matches[-1].strip() - # Remove the JSON block from narrative narrative = text[: text.rfind("```json")] - # Also strip any stray "book_log:" lines that may appear before the JSON block narrative_lines = [] for line in narrative.splitlines(): if not line.lstrip().startswith('book_log:'): @@ -1532,7 +847,6 @@ Use these tools to perform every action. Wrap each in its own ```tool block: except json.JSONDecodeError: pass else: - # Fallback: maybe the entire response is JSON (no fence) text_stripped = text.strip() if text_stripped.startswith("{") and text_stripped.endswith("}"): try: @@ -1552,52 +866,7 @@ Use these tools to perform every action. Wrap each in its own ```tool block: journal_done=data.get("journal_done", []), ) - # ── State Persistence ─────────────────────────────────────────────── - - def _validate_update_size(self, name: str, new_content: str, path: Path) -> bool: - """Reject updates that are more than 30% shorter than the existing file - — likely the LLM pasted a fragment instead of the full state.""" - if not path.exists(): - return True - old = path.read_text().strip() - if not old: - return True - ratio = len(new_content) / len(old) - if ratio < 0.7: - import sys - print( - f"WARNING: {name} update rejected ({ratio:.0%} of original size " - f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.", - file=sys.stderr, - ) - return False - return True - - def apply_state(self, result: TurnResult) -> None: - """Write state changes from a TurnResult to disk.""" - if result.ambience: - AMBIENCE_PATH.write_text(result.ambience.strip().lower() + "\n") - if result.changes: - CHANGES_PATH.write_text("\n".join(result.changes) + "\n") - else: - CHANGES_PATH.write_text("") - - def archive_turn(self, narrative: str) -> None: - """Append the narrative as a new turn in book.md.""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M") - heading = f"\n\n## Turn — {timestamp}\n\n" - BOOK_PATH.parent.mkdir(parents=True, exist_ok=True) - with open(BOOK_PATH, "a") as f: - f.write(heading + narrative.strip() + "\n") - - def append_log(self, entry: str) -> None: - """Append a log entry to today's log file.""" - LOG_DIR.mkdir(parents=True, exist_ok=True) - log_path = LOG_DIR / f"{TODAY}.md" - if not log_path.exists(): - log_path.write_text(f"# Session Log — {TODAY}\n\n") - with open(log_path, "a") as f: - f.write(entry.strip() + "\n") + # ── Logging ───────────────────────────────────────────────────────── def _log_turn_details( self, @@ -1620,39 +889,19 @@ Use these tools to perform every action. Wrap each in its own ```tool block: output_words = len(book_log.split()) if book_log else 0 applied = len([tc for tc in tool_calls if tc.get("tool") != "finalize_turn"]) - self._append_llm_log("") - self._append_llm_log( - f"┌─ Turn Details — {ts}" - ) - self._append_llm_log( - f"├─ Input: {player_action}" - ) - self._append_llm_log( - f"├─ Last Prompt: {last_prompt}" - ) - self._append_llm_log( - f"├─ Strategy: {strategy_name}" - ) - self._append_llm_log( - f"├─ Dice: {die_roll} (1d6)" - ) - self._append_llm_log( - f"├─ Model: {model} | Temp: {temperature} | Tokens: {max_tokens}" - ) - self._append_llm_log( - f"├─ Output: {output_chars} chars ({output_words} words)" - ) - self._append_llm_log( - f"├─ Log Entry: {log_entry}" - ) - self._append_llm_log( - f"├─ Ambience: {ambience or 'None'}" - ) + state.append_llm_log("") + state.append_llm_log(f"┌─ Turn Details — {ts}") + state.append_llm_log(f"├─ Input: {player_action}") + state.append_llm_log(f"├─ Last Prompt: {last_prompt}") + state.append_llm_log(f"├─ Strategy: {strategy_name}") + state.append_llm_log(f"├─ Dice: {die_roll} (1d6)") + state.append_llm_log(f"├─ Model: {model} | Temp: {temperature} | Tokens: {max_tokens}") + state.append_llm_log(f"├─ Output: {output_chars} chars ({output_words} words)") + state.append_llm_log(f"├─ Log Entry: {log_entry}") + state.append_llm_log(f"├─ Ambience: {ambience or 'None'}") tools_preview = ", ".join(tc.get("tool", "?") for tc in tool_calls) - self._append_llm_log( - f"├─ Tool Calls: {len(tool_calls)} ({tools_preview})" - ) - self._append_llm_log( + state.append_llm_log(f"├─ Tool Calls: {len(tool_calls)} ({tools_preview})") + state.append_llm_log( "└─────────────────────────────────────────────────────────────────────────────────────────┘" ) @@ -1673,82 +922,6 @@ Use these tools to perform every action. Wrap each in its own ```tool block: "tool_call_results": tool_calls, }) - def _append_llm_log(self, text: str) -> None: - """Append raw LLM activity to llm.log for debugging.""" - LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True) - with open(LLM_LOG_PATH, "a") as f: - f.write(text + "\n") - - def _update_journal( - self, add: list[str] | None = None, done: list[str] | None = None - ) -> None: - """Add or complete TODO items in journal.md.""" - if not JOURNAL_PATH.exists(): - JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n") - lines = JOURNAL_PATH.read_text().splitlines() - - # Parse into sections: everything before TODO, TODO items, between, DONE items, after - todo_items: list[str] = [] - done_items: list[str] = [] - before_todo: list[str] = [] - between: list[str] = [] - after_done: list[str] = [] - section = "before_todo" - for line in lines: - stripped = line.strip() - if stripped == "## TODO": - section = "todo" - before_todo.append(line) - elif stripped == "## DONE": - section = "done" - between.append(line) - elif section == "before_todo": - before_todo.append(line) - elif section == "todo": - if stripped.startswith("- "): - todo_items.append(stripped[2:]) - else: - between.append(line) - elif section == "done": - if stripped.startswith("- "): - done_items.append(stripped[2:]) - else: - after_done.append(line) - - # Apply changes - if done: - done_set = set(done) - todo_items = [i for i in todo_items if i not in done_set] - new_done = [i for i in done if i not in done_items] - done_items.extend(new_done) - - if add: - todo_set = set(todo_items) - new_todo = [i for i in add if i not in todo_set] - # Insert new items at the top of TODO - todo_items = new_todo + todo_items - - # Reconstruct - out = list(before_todo) - for item in todo_items: - out.append(f"- {item}") - out.extend(between) - for item in done_items: - out.append(f"- {item}") - out.extend(after_done) - - # Clean up: collapse multiple blank lines - cleaned = [] - prev_blank = False - for line in out: - is_blank = line.strip() == "" - if is_blank and prev_blank: - continue - cleaned.append(line) - prev_blank = is_blank - # Ensure trailing newline - JOURNAL_PATH.write_text("\n".join(cleaned) + "\n") - # ── CLI entry point (for testing) ───────────────────────────────────────── def main(): diff --git a/tools/llm.py b/tools/llm.py new file mode 100644 index 0000000..d0ee38f --- /dev/null +++ b/tools/llm.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +""" +llm.py — LLM interaction layer for The Chaos engine. + +Provides the low-level call_llm function and environment variable setup +for provider-specific auth. +""" + +from __future__ import annotations + +import os + +from state import append_llm_log + + +def set_llm_env(model: str, api_key: str | None, api_base: str | None) -> None: + """Set provider-specific env vars for litellm.""" + prefix = model.split("/")[0].upper() + key = api_key or "sk-placeholder" + os.environ[f"{prefix}_API_KEY"] = key + if api_base: + os.environ[f"{prefix}_API_BASE"] = api_base + + +def call_llm( + messages: list[dict], + *, + model: str, + temperature: float, + timeout: int, + max_tokens: int, + label: str = "", + on_debug: callable = None, +) -> str | None: + """Make a single LLM call. Returns content text or None on error.""" + try: + import litellm + except ImportError: + if on_debug: + on_debug("llm_error", {"label": label, "error": "litellm not installed"}) + return None + try: + response = litellm.completion( + model=model, + messages=messages, + temperature=temperature, + stream=False, + timeout=timeout, + max_tokens=max_tokens, + ) + content = getattr(response.choices[0].message, 'content', None) or "" + reasoning = getattr(response.choices[0].message, 'reasoning_content', None) or "" + if reasoning and reasoning not in content: + append_llm_log(f"\n--- {label} [reasoning] ---\n{reasoning}") + text = content or reasoning + append_llm_log(f"\n--- {label} ---\n{text}") + return text + except Exception as e: + err_msg = f"{type(e).__name__}: {e}" + append_llm_log(f"\n--- LLM ERROR ({label}) ---\n{err_msg}") + if on_debug: + on_debug("llm_error", {"label": label, "error": err_msg}) + return None diff --git a/tools/models.py b/tools/models.py new file mode 100644 index 0000000..503ba65 --- /dev/null +++ b/tools/models.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +""" +models.py — Data classes for The Chaos game engine. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class GenerationResult: + """Legacy result — kept for backward compat with CLI main().""" + narrative: str + choices: list[str] = field(default_factory=list) + log_entry: Optional[str] = None + ambience: Optional[str] = None + character_updates: Optional[str] = None + world_updates: Optional[str] = None + journal_add: list[str] = field(default_factory=list) + journal_done: list[str] = field(default_factory=list) + error: Optional[str] = None + + +@dataclass +class TurnResult: + """Output of a complete turn via finalize_turn tool.""" + book_log: str = "" + user_prompt: str = "" + ambience: Optional[str] = None + log_entry: Optional[str] = None + error: Optional[str] = None + debug_info: str = "" + changes: list[str] = field(default_factory=list) diff --git a/tools/paths.py b/tools/paths.py new file mode 100644 index 0000000..3b71b26 --- /dev/null +++ b/tools/paths.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +""" +paths.py — Path constants for The Chaos game engine. + +Shared by engine.py, run.py, and all sub-modules. +""" + +from __future__ import annotations + +from datetime import date +from pathlib import Path + + +BASE_DIR = Path(__file__).resolve().parent.parent +SESSION_DIR = BASE_DIR / 'session' +CONFIG_PATH = SESSION_DIR / 'config.json' +CHAR_PATH = SESSION_DIR / 'character.md' +WORLD_PATH = SESSION_DIR / 'world.md' +BOOK_PATH = SESSION_DIR / 'book.md' +JOURNAL_PATH = SESSION_DIR / 'journal.md' +AMBIENCE_PATH = SESSION_DIR / 'ambience.md' +LOG_DIR = SESSION_DIR / 'log' +LLM_LOG_PATH = SESSION_DIR / 'llm.log' +AMBIENCE_OPTIONS_PATH = SESSION_DIR / "ambience_options.md" +CHANGES_PATH = SESSION_DIR / "changes.md" +AUDIO_DIR = SESSION_DIR / "audio" +TODAY = date.today().isoformat() diff --git a/tools/prompts.py b/tools/prompts.py new file mode 100644 index 0000000..d34e048 --- /dev/null +++ b/tools/prompts.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +""" +prompts.py — System prompt templates for The Chaos game engine. +""" + +from __future__ import annotations + +from string import Template + + +SYSTEM_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion. + +## Rules +- **Odds**: 1d6, 4+ favourable, 3- trouble. +- **Traits**: 3d6, roll UNDER trait. +- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour. +- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed. +- **Modifiers**: Favourable +1, Risky -1, Desperate -2. + +## Tools (action only) +Wrap in ```tool to perform an action: +``` +{"tool": "roll", "args": {"dice": "1d6"}} +``` + +- **roll** — dice, modifier +- **player_roll** — dice, reason +- **character_update** — content: "full sheet" (if HP/cash/gear/stats change) +- **world_update** — content: "full world" (if NPCs/locations/threads change) +- **journal_update** — add: [...], done: [...] + +You have the full state above — no need to look anything up. Just write the story and use tools when the player's action changes something. + +## State + +### Character +$character + +### World +$world + +### Log +$log + +### Story +$story""") + + +PROSE_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion. + +## Rules +- **Odds**: 1d6, 4+ favourable, 3- trouble. +- **Traits**: 3d6, roll UNDER trait. +- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour. +- **Wounds at 0 HP**: 1d6 → 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed. +- **Modifiers**: Favourable +1, Risky -1, Desperate -2. + +A die is cast at the start of each turn — incorporate it into your narrative. + +End your response with a `### Changes` block listing what changed: + +### Changes +- Current Health: 3 +- Cash: 45 silver +- Added to inventory: Silver key +- Removed from inventory: Torches (10) +- Replaced gear: Mace (1d6+1) → Mace (1d6+2) +- Note: Found a hidden passage +- Journal done: Defeat the demon +- Journal add: Investigate the mine + +Only include lines for things that actually changed. Omit unused lines entirely. + +## State + +### Character +$character + +### World +$world + +### Log +$log + +### Story +$story""") diff --git a/tools/run.py b/tools/run.py index 995912f..93a846c 100755 --- a/tools/run.py +++ b/tools/run.py @@ -24,7 +24,10 @@ from rich.markdown import Markdown as RichMarkdown from rich.theme import Theme # ── Game engine ───────────────────────────────────────── -from engine import GameEngine, GenerationResult, TurnResult, LLM_LOG_PATH +from engine import GameEngine +from models import GenerationResult, TurnResult +from paths import LLM_LOG_PATH +import state # ── Optional miniaudio ──────────────────────────────────── try: @@ -1033,17 +1036,17 @@ class ChaosTUI(App): from datetime import datetime ts = datetime.now().strftime("%H:%M") if result.log_entry: - self.engine.append_log(f"- **{ts}** — {result.log_entry}") + state.append_log(f"- **{ts}** — {result.log_entry}") elif result.book_log: first_line = result.book_log.strip().split("\n")[0][:80] - self.engine.append_log(f"- **Turn** — {first_line}") + state.append_log(f"- **Turn** — {first_line}") # Archive the turn's book log if result.book_log: - self.engine.archive_turn(result.book_log) + state.archive_turn(result.book_log) # Apply state changes - self.engine.apply_state(result) + state.apply_state(result) # Display the next user prompt self._display_scene(result) diff --git a/tools/state.py b/tools/state.py new file mode 100644 index 0000000..85ecf43 --- /dev/null +++ b/tools/state.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +state.py — Game state persistence and file I/O for The Chaos engine. + +Standalone functions that read/write session files. No dependency on +GameEngine or other modules besides paths. +""" + +from __future__ import annotations + +import re +import sys +from datetime import date, datetime, timedelta +from pathlib import Path + +from paths import ( + CHAR_PATH, WORLD_PATH, BOOK_PATH, JOURNAL_PATH, AMBIENCE_PATH, + LOG_DIR, LLM_LOG_PATH, AMBIENCE_OPTIONS_PATH, CHANGES_PATH, + AUDIO_DIR, TODAY, +) +from models import TurnResult + + +def read_file(path: Path) -> str: + """Read a text file, return empty string if missing.""" + return path.read_text().strip() if path.exists() else "" + + +def read_recent_log(max_entries: int = 5) -> str: + """Read the latest log file and return the last N entries.""" + log_path = LOG_DIR / f"{TODAY}.md" + if not log_path.exists(): + yesterday = (date.today() - timedelta(days=1)).isoformat() + log_path = LOG_DIR / f"{yesterday}.md" + if not log_path.exists(): + return "*No recent events.*" + lines = log_path.read_text().splitlines() + entries = [l for l in lines if l.strip().startswith("- ")] + return "\n".join(entries[-max_entries:]) or "*No recent events.*" + + +def read_recent_book(max_turns: int = 1) -> str: + """Return the last N turns from the book as context.""" + text = read_file(BOOK_PATH) + if not text: + return "*No prior story.*" + turns = text.split("\n## ") + recent = turns[-max_turns:] + return "\n## ".join(recent) if len(turns) > 1 else recent[0] + + +def truncate_world(text: str) -> str: + """Extract key world context: NPCs, factions, active threads, rumours.""" + if not text or text == "*No world state.*": + return text + sections = re.split(r"\n(?=## |### )", text) + parts = [] + for sec in sections: + header = sec.split("\n")[0].strip() if sec else "" + if "Active Threads" in header: + parts.append(sec) + elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header: + parts.append(sec) + result = "\n\n".join(parts) + return result or text[:1500] + "\n_(world truncated)_" + + +def get_valid_ambiences() -> set[str]: + """Parse ambience_options.md and return set of valid ambience names.""" + valid = {"silence"} + if not AMBIENCE_OPTIONS_PATH.exists(): + return valid + in_table = False + for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines(): + s = line.strip() + if not s.startswith("|") or not s.endswith("|"): + in_table = False + continue + if in_table and all(c in "-:| " for c in s): + continue + parts = [p.strip() for p in s.split("|") if p.strip()] + if not parts: + continue + if not in_table: + in_table = True + continue + name = parts[0].lower() + files_str = parts[1] if len(parts) > 1 else "" + files = [f.strip() for f in files_str.split(",")] + has_files = any((AUDIO_DIR / f).exists() or f for f in files) + if has_files: + valid.add(name) + return valid + + +def validate_update_size(name: str, new_content: str, path: Path) -> bool: + """Reject updates more than 30% shorter than existing — likely partial paste.""" + if not path.exists(): + return True + old = path.read_text().strip() + if not old: + return True + ratio = len(new_content) / len(old) + if ratio < 0.7: + print( + f"WARNING: {name} update rejected ({ratio:.0%} of original size " + f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.", + file=sys.stderr, + ) + return False + return True + + +def apply_state(result: TurnResult) -> None: + """Write state changes from a TurnResult to disk.""" + if result.ambience: + AMBIENCE_PATH.write_text(result.ambience.strip().lower() + "\n") + if result.changes: + CHANGES_PATH.write_text("\n".join(result.changes) + "\n") + else: + CHANGES_PATH.write_text("") + + +def archive_turn(narrative: str) -> None: + """Append the narrative as a new turn in book.md.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M") + heading = f"\n\n## Turn — {timestamp}\n\n" + BOOK_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(BOOK_PATH, "a") as f: + f.write(heading + narrative.strip() + "\n") + + +def append_log(entry: str) -> None: + """Append a log entry to today's log file.""" + LOG_DIR.mkdir(parents=True, exist_ok=True) + log_path = LOG_DIR / f"{TODAY}.md" + if not log_path.exists(): + log_path.write_text(f"# Session Log — {TODAY}\n\n") + with open(log_path, "a") as f: + f.write(entry.strip() + "\n") + + +def append_llm_log(text: str) -> None: + """Append raw LLM activity to llm.log for debugging.""" + LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(LLM_LOG_PATH, "a") as f: + f.write(text + "\n") + + +def update_journal(add: list[str] | None = None, done: list[str] | None = None) -> None: + """Add or complete TODO items in journal.md.""" + if not JOURNAL_PATH.exists(): + JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n") + lines = JOURNAL_PATH.read_text().splitlines() + + todo_items: list[str] = [] + done_items: list[str] = [] + before_todo: list[str] = [] + between: list[str] = [] + after_done: list[str] = [] + section = "before_todo" + for line in lines: + stripped = line.strip() + if stripped == "## TODO": + section = "todo" + before_todo.append(line) + elif stripped == "## DONE": + section = "done" + between.append(line) + elif section == "before_todo": + before_todo.append(line) + elif section == "todo": + if stripped.startswith("- "): + todo_items.append(stripped[2:]) + else: + between.append(line) + elif section == "done": + if stripped.startswith("- "): + done_items.append(stripped[2:]) + else: + after_done.append(line) + + if done: + done_set = set(done) + todo_items = [i for i in todo_items if i not in done_set] + new_done = [i for i in done if i not in done_items] + done_items.extend(new_done) + + if add: + todo_set = set(todo_items) + new_todo = [i for i in add if i not in todo_set] + todo_items = new_todo + todo_items + + out = list(before_todo) + for item in todo_items: + out.append(f"- {item}") + out.extend(between) + for item in done_items: + out.append(f"- {item}") + out.extend(after_done) + + cleaned = [] + prev_blank = False + for line in out: + is_blank = line.strip() == "" + if is_blank and prev_blank: + continue + cleaned.append(line) + prev_blank = is_blank + JOURNAL_PATH.write_text("\n".join(cleaned) + "\n") diff --git a/tools/test_imports.py b/tools/test_imports.py index 6b19c7a..d4d5b8c 100755 --- a/tools/test_imports.py +++ b/tools/test_imports.py @@ -5,48 +5,58 @@ import sys import os import ast +MODULES = [ + 'engine.py', + 'paths.py', + 'models.py', + 'prompts.py', + 'state.py', + 'tools_handler.py', + 'llm.py', +] + def check_missing_imports(): """Check for missing imports that would cause NameError.""" errors = [] - - # Check engine.py - engine_path = os.path.join(os.path.dirname(__file__), 'engine.py') - with open(engine_path, 'r') as f: - engine_content = f.read() - - # Parse the file to find all names used - tree = ast.parse(engine_content) - - # Collect all names that are used (not defined) - names_used = set() - for node in ast.walk(tree): - if isinstance(node, ast.Name): - names_used.add(node.id) - - # Check for common missing imports - common_modules = { - 'random', - 're', - 'json', - 'traceback', - 'datetime', - 'time', - 'os', - 'sys', - 'pathlib', - 'functools', - 'collections', - 'typing', - 'io', - 'string', - } - - for module in common_modules: - if module in names_used and not hasattr(sys.modules.get(module, None), '__file__'): - # Check if it's used but not imported - if f'import {module}' not in engine_content and f'from {module} import' not in engine_content: - errors.append(f"Missing import: {module}") - + tool_dir = os.path.dirname(__file__) + + for mod_file in MODULES: + mod_path = os.path.join(tool_dir, mod_file) + if not os.path.exists(mod_path): + errors.append(f"Module not found: {mod_file}") + continue + with open(mod_path, 'r') as f: + content = f.read() + + tree = ast.parse(content) + + names_used = set() + for node in ast.walk(tree): + if isinstance(node, ast.Name): + names_used.add(node.id) + + common_modules = { + 'random', + 're', + 'json', + 'traceback', + 'datetime', + 'time', + 'os', + 'sys', + 'pathlib', + 'functools', + 'collections', + 'typing', + 'io', + 'string', + } + + for module in common_modules: + if module in names_used and not hasattr(sys.modules.get(module, None), '__file__'): + if f'import {module}' not in content and f'from {module} import' not in content: + errors.append(f"{mod_file}: Missing import: {module}") + return errors if __name__ == '__main__': @@ -57,5 +67,5 @@ if __name__ == '__main__': print(f" - {error}") sys.exit(1) else: - print("✓ All imports present") + print("✓ All imports present across all modules") sys.exit(0) diff --git a/tools/test_runtime.py b/tools/test_runtime.py index 59b0626..f0e2f54 100755 --- a/tools/test_runtime.py +++ b/tools/test_runtime.py @@ -1,43 +1,60 @@ #!/usr/bin/env python3 -"""Test that the engine module can be imported without errors.""" +"""Test that all engine modules can be imported without errors.""" import sys import os import traceback -def test_engine_import(): - """Test that the engine module imports without errors.""" - errors = [] - + +def test_module_import(module_name): + """Try importing a module and return (ok, error_msg).""" try: - # Add the tools directory to the path - sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - - # Import the engine module - import engine - print(f"✓ Engine module imported successfully") - - # Check for common runtime errors - if not hasattr(engine, 'GameEngine'): - errors.append("GameEngine class not found") - else: - print(f"✓ GameEngine class found") - - # Check that generate_with_tools_single exists - if hasattr(engine.GameEngine, 'generate_with_tools_single'): - print(f"✓ generate_with_tools_single method found") - else: - errors.append("generate_with_tools_single method not found") - - except ImportError as e: - errors.append(f"Import error: {e}") - except AttributeError as e: - errors.append(f"Attribute error: {e}") + __import__(module_name) + return True, None except Exception as e: - errors.append(f"Unexpected error: {e}\n{traceback.format_exc()}") - + return False, f"{type(e).__name__}: {e}\n{traceback.format_exc()}" + + +def test_engine_import(): + """Test that all modules import without errors.""" + errors = [] + + sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + modules_to_test = [ + ('paths', ['BASE_DIR', 'SESSION_DIR', 'CHAR_PATH', 'LLM_LOG_PATH']), + ('models', ['GenerationResult', 'TurnResult']), + ('prompts', ['SYSTEM_PROMPT', 'PROSE_PROMPT']), + ('state', ['read_file', 'apply_state', 'append_log', 'append_llm_log']), + ('tools_handler', ['execute_tool', 'extract_tool_calls', 'TOOL_REGISTRY']), + ('llm', ['call_llm', 'set_llm_env']), + ('engine', ['GameEngine']), + ] + + for mod_name, expected_attrs in modules_to_test: + ok, err = test_module_import(mod_name) + if not ok: + errors.append(f"Import error ({mod_name}): {err}") + continue + print(f"✓ {mod_name} module imported successfully") + + mod = sys.modules[mod_name] + for attr in expected_attrs: + if not hasattr(mod, attr): + errors.append(f"{mod_name}: {attr} not found") + else: + print(f" ✓ {mod_name}.{attr} exists") + + # Check that GameEngine has generate_with_tools_single + import engine + if hasattr(engine.GameEngine, 'generate_with_tools_single'): + print(f"✓ engine.GameEngine.generate_with_tools_single method found") + else: + errors.append("engine.GameEngine.generate_with_tools_single method not found") + return errors + if __name__ == '__main__': errors = test_engine_import() if errors: @@ -46,4 +63,5 @@ if __name__ == '__main__': print(f" - {error}") sys.exit(1) else: + print("\n✓ All modules validated successfully") sys.exit(0) diff --git a/tools/tools_handler.py b/tools/tools_handler.py new file mode 100644 index 0000000..80d7383 --- /dev/null +++ b/tools/tools_handler.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 +""" +tools_handler.py — Tool call infrastructure for The Chaos engine. + +Handles tool call extraction, execution, and description. All functions +are standalone — no dependency on the GameEngine class. +""" + +from __future__ import annotations + +import json +import random +import re + +from paths import CHAR_PATH, WORLD_PATH, LOG_DIR, TODAY +from state import read_file, validate_update_size, update_journal, append_llm_log + + +# ── Tool Registry ─────────────────────────────────────────────────────────── + +TOOL_REGISTRY: dict[str, dict] = { + "roll": {"description": "Roll dice.", "args": {"dice": "1d6", "modifier": "+1"}}, + "player_roll": {"description": "Ask player to roll.", "args": {"dice": "1d6", "reason": "why"}}, + "modify_traits": {"description": "Change STR/DEX/WIL.", "args": {"str": "optional", "dex": "optional", "wil": "optional"}}, + "modify_vitals": {"description": "Change HP, cash, weapon, armour.", "args": {"current_hp": "optional", "max_hp": "optional", "cash": "optional", "weapon": "optional", "armour": "optional"}}, + "add_to_inventory": {"description": "Add item to gear.", "args": {"item": "item name and stats"}}, + "remove_from_inventory": {"description": "Remove item from gear.", "args": {"item": "exact item text"}}, + "replace_gear": {"description": "Replace gear by exact match.", "args": {"before": "exact text", "after": "new text"}}, + "add_note": {"description": "Add note to sheet.", "args": {"note": "note content"}}, + "replace_note": {"description": "Replace note by exact match.", "args": {"before": "exact text", "after": "new text"}}, + "world_update": {"description": "Replace world state.", "args": {"content": "full world markdown"}}, + "journal_update": {"description": "Update TODO/DONE.", "args": {"add": "[...]", "done": "[...]"}}, + "finalize_turn": {"description": "End turn.", "args": {"user_prompt": "question for player", "ambience": "soundscape name"}}, +} + + +# ── Character Sheet Patcher ───────────────────────────────────────────────── + +def patch_character(pattern: str, repl: str, count: int = 1, flags: int = 0) -> str: + """Apply a regex replacement to character.md. Returns error msg or empty string.""" + text = CHAR_PATH.read_text() + new, n = re.subn(pattern, repl, text, count=count, flags=flags) + if n == 0: + return f"**Error:** pattern not found:\n{pattern}" + CHAR_PATH.write_text(new) + return "" + + +# ── Individual Tool Implementations ───────────────────────────────────────── + +def tool_think(args: dict) -> str: + return "" + + +def tool_read_file(args: dict) -> str: + filename = (args or {}).get("file", "") + paths = { + "character": CHAR_PATH, + "world": WORLD_PATH, + "log": LOG_DIR / f"{TODAY}.md", + } + path = paths.get(filename) + if not path: + return f"Unknown file: {filename}. Choose from: {', '.join(paths)}" + return read_file(path) or f"*{filename} is empty.*" + + +def tool_roll(args: dict) -> str: + import random + dice_str = (args or {}).get("dice", "1d6") + modifier_str = (args or {}).get("modifier", "0") + try: + count, sides = dice_str.lower().split("d") + count = int(count) if count else 1 + sides = int(sides) + except (ValueError, TypeError): + return f"Invalid dice: {dice_str}. Use format like '2d6'." + mod = 0 + if modifier_str: + try: + mod = int(modifier_str) + except ValueError: + pass + rolls = [random.randint(1, sides) for _ in range(count)] + total = sum(rolls) + mod + mod_str = f" {'+' if mod >= 0 else ''}{mod}" if mod != 0 else "" + return f"Roll: {dice_str}{mod_str} → [{', '.join(str(r) for r in rolls)}] = {total}" + + +def tool_modify_traits(args: dict) -> str: + errors = [] + for stat in ("str", "dex", "wil"): + val = args.get(stat) + if val is not None: + err = patch_character( + rf"^(- \*\*{stat.upper()}:\*\*\s*)\d+", rf"\g<1>{val}", count=1, flags=re.MULTILINE + ) + if err: + errors.append(err) + return "; ".join(errors) if errors else "Traits updated." + + +def tool_modify_vitals(args: dict) -> str: + errors = [] + for field, label in [("current_hp", "Current Health"), ("max_hp", "Max Health"), + ("cash", "Cash"), ("weapon", "Weapon"), ("armour", "Armour")]: + val = args.get(field) + if val is not None: + err = patch_character( + rf"^(- \*\*{label}:\*\*\s*).*", rf"\g<1>{val}", count=1, flags=re.MULTILINE + ) + if err: + errors.append(err) + return "; ".join(errors) if errors else "Vitals updated." + + +def tool_add_to_inventory(args: dict) -> str: + item = (args or {}).get("item", "") + if not item: + return "**Error:** `item` is required." + text = CHAR_PATH.read_text() + if item in text: + return f"Item already in inventory: {item}" + gear_section = re.search(r"^## Gear\n", text, re.MULTILINE) + if gear_section: + insert_at = gear_section.end() + text = text[:insert_at] + f"- {item}\n" + text[insert_at:] + else: + text += f"\n## Gear\n- {item}\n" + CHAR_PATH.write_text(text) + return f"Added to inventory: {item}" + + +def tool_remove_from_inventory(args: dict) -> str: + item = (args or {}).get("item", "") + if not item: + return "**Error:** `item` is required." + err = patch_character(rf"^- {re.escape(item)}\n?", "", count=1, flags=re.MULTILINE) + if err: + return f"**Error:** item not found: {item}" + return f"Removed from inventory: {item}" + + +def tool_replace_gear(args: dict) -> str: + before = (args or {}).get("before", "") + after = (args or {}).get("after", "") + if not before or not after: + return "**Error:** `before` and `after` are required." + err = patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE) + if err: + return f"**Error:** gear not found: {before}" + return f"Gear replaced: {before} → {after}" + + +def tool_add_note(args: dict) -> str: + note = (args or {}).get("note", "") + if not note: + return "**Error:** `note` is required." + text = CHAR_PATH.read_text() + notes_section = re.search(r"^## Notes & Scribbles\n", text, re.MULTILINE) + if notes_section: + text = text[:notes_section.end()] + f"- {note}\n" + text[notes_section.end():] + else: + text += f"\n## Notes & Scribbles\n- {note}\n" + CHAR_PATH.write_text(text) + return f"Note added: {note}" + + +def tool_replace_note(args: dict) -> str: + before = (args or {}).get("before", "") + after = (args or {}).get("after", "") + if not before or not after: + return "**Error:** `before` and `after` are required." + err = patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE) + if err: + return f"**Error:** note not found: {before}" + return f"Note replaced." + + +def tool_world_update(args: dict) -> str: + content = (args or {}).get("content", "") + if not content: + return "**Error:** `content` is required." + if not validate_update_size("world", content, WORLD_PATH): + return "**Error:** Update rejected — content is too short (likely a partial paste)." + WORLD_PATH.write_text(content.strip() + "\n") + return "World state updated." + + +def tool_journal_update(args: dict) -> str: + add = (args or {}).get("add", []) + done = (args or {}).get("done", []) + if isinstance(add, str): + add = [add] + if isinstance(done, str): + done = [done] + if not add and not done: + return "**Error:** Provide at least one of `add` or `done`." + update_journal(add=add, done=done) + return "Journal updated." + + +# ── Tool Dispatcher ───────────────────────────────────────────────────────── + +def execute_tool(tool_name: str, args: dict) -> str: + """Execute a tool by name. Returns result string.""" + fn_map = { + "roll": tool_roll, + "modify_traits": tool_modify_traits, + "modify_vitals": tool_modify_vitals, + "add_to_inventory": tool_add_to_inventory, + "remove_from_inventory": tool_remove_from_inventory, + "replace_gear": tool_replace_gear, + "add_note": tool_add_note, + "replace_note": tool_replace_note, + "world_update": tool_world_update, + "journal_update": tool_journal_update, + } + fn = fn_map.get(tool_name) + if not fn: + return f"Unknown tool: {tool_name}" + try: + return fn(args) + except Exception as e: + import traceback + tb = traceback.format_exc() + append_llm_log(f"\n--- TOOL ERROR ({tool_name}) ---\n{tb}") + return f"Tool error ({tool_name}): {e}" + + +# ── Descriptions ──────────────────────────────────────────────────────────── + +def describe_tool_action(tool_name: str, args: dict) -> str: + """Return a user-facing status message for a tool call.""" + dm_status = (args or {}).get("dm_status") + if dm_status: + return f"DM is {dm_status}..." + + read_descriptions = { + "character": "reading the character sheet", + "world": "consulting the world map", + "book": "reviewing the story so far", + "log": "checking the session log", + "journal": "scanning the journal", + } + if tool_name == "read_file": + file = (args or {}).get("file", "") + desc = read_descriptions.get(file, f"reading {file}") + elif tool_name in ("character_get", "world_get", "journal_get"): + file = tool_name.replace("_get", "") + desc = read_descriptions.get(file, f"reading {file}") + elif tool_name in ("character_update", "world_update"): + desc = "updating the records" + elif tool_name == "journal_update": + desc = "updating the journal" + elif tool_name == "roll": + dice = (args or {}).get("dice", "1d6") + mod = (args or {}).get("modifier") + desc = f"rolling {dice}" + if mod: + desc += f" {mod}" + elif tool_name == "player_roll": + dice = (args or {}).get("dice", "1d6") + desc = f"asking you to roll {dice}" + elif tool_name == "modify_traits": + desc = "updating traits" + elif tool_name == "modify_vitals": + desc = "updating vitals" + elif tool_name == "add_to_inventory": + desc = "adding item to inventory" + elif tool_name == "remove_from_inventory": + desc = "removing item from inventory" + elif tool_name == "replace_gear": + desc = "replacing gear" + elif tool_name == "add_note": + desc = "adding note" + elif tool_name == "replace_note": + desc = "replacing note" + else: + desc = f"using {tool_name}" + return f"DM is {desc}..." + + +def describe_change(tool_name: str, args: dict) -> str: + """Build a compact human-readable change description from a tool call.""" + if tool_name == "modify_vitals": + parts = [] + for k, v in args.items(): + label = k.replace("_", " ").title() + parts.append(f"{label}: {v}") + return f"⚡ {', '.join(parts)}" if parts else "" + elif tool_name == "modify_traits": + parts = [] + for k, v in args.items(): + parts.append(f"{k.upper()}: {v}") + return f"⚡ {', '.join(parts)}" + elif tool_name == "add_to_inventory": + return f"+ {args.get('item', '?')}" + elif tool_name == "remove_from_inventory": + return f"− {args.get('item', '?')}" + elif tool_name == "replace_gear": + return f"↻ {args.get('before', '?')} → {args.get('after', '?')}" + elif tool_name == "add_note": + note = args.get("note", "?") + return f"📝 {note[:60]}{'…' if len(note) > 60 else ''}" + elif tool_name == "replace_note": + return f"📝 {args.get('before', '?')[:40]} → {args.get('after', '?')[:40]}" + elif tool_name == "world_update": + return "🌍 World updated" + elif tool_name == "journal_update": + parts = [] + for a in args.get("add", []): + parts.append(f"📋 {a}") + for d in args.get("done", []): + parts.append(f"✅ {d}") + return "; ".join(parts) if parts else "" + return "" + + +# ── Changes Block Parser ──────────────────────────────────────────────────── + +def parse_changes_block(changes_block: str) -> list[dict]: + """Parse a ### Changes block into tool call dicts.""" + calls = [] + for raw_line in changes_block.split("\n"): + line = raw_line.strip() + if not line.startswith("- "): + continue + content = line[2:].strip() + + m = re.match(r"Current Health:\s*(\d+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "modify_vitals", "args": {"current_hp": m.group(1)}}) + continue + + m = re.match(r"Cash:\s*(\d+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "modify_vitals", "args": {"cash": m.group(1)}}) + continue + + m = re.match(r"Max Health:\s*(\d+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "modify_vitals", "args": {"max_hp": m.group(1)}}) + continue + + m = re.match(r"Add(?:ed)? to inventory:\s*(.+)", content, re.IGNORECASE) + if m: + for item in [i.strip() for i in m.group(1).split(",") if i.strip()]: + calls.append({"tool": "add_to_inventory", "args": {"item": item}}) + continue + + m = re.match(r"Remov(?:e|ed) from inventory:\s*(.+)", content, re.IGNORECASE) + if m: + for item in [i.strip() for i in m.group(1).split(",") if i.strip()]: + calls.append({"tool": "remove_from_inventory", "args": {"item": item}}) + continue + + m = re.match(r"Replace(?:d)? gear:\s*(.+?)\s*[→➜]\s*(.+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "replace_gear", "args": {"before": m.group(1).strip(), "after": m.group(2).strip()}}) + continue + + m = re.match(r"Note:\s*(.+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "add_note", "args": {"note": m.group(1).strip()}}) + continue + + m = re.match(r"Journal add:\s*(.+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "journal_update", "args": {"add": [i.strip() for i in m.group(1).split(",") if i.strip()]}}) + continue + + m = re.match(r"Journal done:\s*(.+)", content, re.IGNORECASE) + if m: + calls.append({"tool": "journal_update", "args": {"done": [i.strip() for i in m.group(1).split(",") if i.strip()]}}) + continue + + m = re.match(r"Looted from .+:\s*(.+)", content, re.IGNORECASE) + if m: + items_text = m.group(1).strip() + calls.append({"tool": "add_note", "args": {"note": f"Looted: {items_text}"}}) + continue + + return calls + + +# ── Extraction Functions ──────────────────────────────────────────────────── + +def extract_thoughts(text: str) -> list[str]: + pattern = r"```thought\s*\n?(.*?)```" + return re.findall(pattern, text, re.DOTALL) + + +def extract_tool_calls(text: str, *, round_num: int = 0, on_debug: callable = None) -> list[dict]: + """Extract tool calls from ```tool and ```json blocks.""" + calls = [] + seen = set() + + def _try_parse(raw: str) -> dict | None: + try: + obj = json.loads(raw) + if isinstance(obj, dict) and "tool" in obj: + return obj + except json.JSONDecodeError: + pass + return None + + for m in re.finditer(r"```(?:tool|json|finalize_turn)\s*\n?", text): + fence_type = m.group(0).strip("``` \n\r") + obj = None + try: + decoder = json.JSONDecoder() + obj, end = decoder.raw_decode(text, m.end()) + except (json.JSONDecodeError, ValueError, StopIteration): + pass + + if obj is None: + close = text.find("```", m.end()) + if close > 0: + raw = text[m.end():close].strip() + + def _escape_in_strings(s: str) -> str: + return re.sub(r'"(?:[^"\\]|\\.)*"', lambda x: x.group(0).replace("\n", "\\n"), s, flags=re.DOTALL) + repaired = _escape_in_strings(raw) + obj = _try_parse(repaired) + + if obj is not None and isinstance(obj, dict): + if fence_type == "finalize_turn": + obj = {"tool": "finalize_turn", "args": obj} + if "tool" not in obj: + obj = None + + if obj is not None: + key = (obj["tool"], json.dumps(obj.get("args", {}), sort_keys=True)) + if key not in seen: + seen.add(key) + calls.append(obj) + elif on_debug: + preview = text[m.end():m.end() + 120].replace("\n", "\\n") + on_debug("parse_error", {"round": round_num, "content": preview}) + + return calls + + +def extract_final_json(text: str) -> dict | None: + pattern = r"```json\s*\n?(.*?)```" + matches = re.findall(pattern, text, re.DOTALL) + if not matches: + return None + try: + return json.loads(matches[-1].strip()) + except json.JSONDecodeError: + return None + + +def strip_tool_blocks(text: str) -> str: + """Remove ```tool, ```json, finalize_turn blocks from narrative text.""" + return re.sub( + r'```(?:tool|json|finalize_turn)\s*\n?.*?```', + '', + text, + flags=re.DOTALL, + ).strip()