Code split to allow small LLM to work on it

This commit is contained in:
Dejvino 2026-06-30 18:47:44 +02:00
parent 91b1b35cfa
commit 64b6416929
12 changed files with 1129 additions and 1024 deletions

View File

@ -137,6 +137,14 @@ python3 tools/engine.py --action "I head to the market"
``` ```
## Testing Commands ## Testing Commands
nWhen committing, also use the pre-commit validator to check for oversized Python files.
n## Pre-commit Validation
Before committing, run the pre-commit validator script to ensure no Python file is too large.
```bash
./pre-commit.sh
```
Always run tests before making changes. This prevents runtime errors like missing imports. Always run tests before making changes. This prevents runtime errors like missing imports.

9
pre-commit.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/bash
ERRORS=$(python3 -c "import os; [f for f in os.listdir('./tools') if f.endswith('.py') and os.path.getsize(os.path.join('./tools', f)) > 2048]")
if [ -z "$ERRORS" ]; then
echo "OK"
else
echo "You need to refactor this:"
echo "$ERRORS"
exit 1
fi

File diff suppressed because it is too large Load Diff

63
tools/llm.py Normal file
View File

@ -0,0 +1,63 @@
#!/usr/bin/env python3
"""
llm.py LLM interaction layer for The Chaos engine.
Provides the low-level call_llm function and environment variable setup
for provider-specific auth.
"""
from __future__ import annotations
import os
from state import append_llm_log
def set_llm_env(model: str, api_key: str | None, api_base: str | None) -> None:
"""Set provider-specific env vars for litellm."""
prefix = model.split("/")[0].upper()
key = api_key or "sk-placeholder"
os.environ[f"{prefix}_API_KEY"] = key
if api_base:
os.environ[f"{prefix}_API_BASE"] = api_base
def call_llm(
messages: list[dict],
*,
model: str,
temperature: float,
timeout: int,
max_tokens: int,
label: str = "",
on_debug: callable = None,
) -> str | None:
"""Make a single LLM call. Returns content text or None on error."""
try:
import litellm
except ImportError:
if on_debug:
on_debug("llm_error", {"label": label, "error": "litellm not installed"})
return None
try:
response = litellm.completion(
model=model,
messages=messages,
temperature=temperature,
stream=False,
timeout=timeout,
max_tokens=max_tokens,
)
content = getattr(response.choices[0].message, 'content', None) or ""
reasoning = getattr(response.choices[0].message, 'reasoning_content', None) or ""
if reasoning and reasoning not in content:
append_llm_log(f"\n--- {label} [reasoning] ---\n{reasoning}")
text = content or reasoning
append_llm_log(f"\n--- {label} ---\n{text}")
return text
except Exception as e:
err_msg = f"{type(e).__name__}: {e}"
append_llm_log(f"\n--- LLM ERROR ({label}) ---\n{err_msg}")
if on_debug:
on_debug("llm_error", {"label": label, "error": err_msg})
return None

35
tools/models.py Normal file
View File

@ -0,0 +1,35 @@
#!/usr/bin/env python3
"""
models.py Data classes for The Chaos game engine.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class GenerationResult:
"""Legacy result — kept for backward compat with CLI main()."""
narrative: str
choices: list[str] = field(default_factory=list)
log_entry: Optional[str] = None
ambience: Optional[str] = None
character_updates: Optional[str] = None
world_updates: Optional[str] = None
journal_add: list[str] = field(default_factory=list)
journal_done: list[str] = field(default_factory=list)
error: Optional[str] = None
@dataclass
class TurnResult:
"""Output of a complete turn via finalize_turn tool."""
book_log: str = ""
user_prompt: str = ""
ambience: Optional[str] = None
log_entry: Optional[str] = None
error: Optional[str] = None
debug_info: str = ""
changes: list[str] = field(default_factory=list)

27
tools/paths.py Normal file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python3
"""
paths.py Path constants for The Chaos game engine.
Shared by engine.py, run.py, and all sub-modules.
"""
from __future__ import annotations
from datetime import date
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
SESSION_DIR = BASE_DIR / 'session'
CONFIG_PATH = SESSION_DIR / 'config.json'
CHAR_PATH = SESSION_DIR / 'character.md'
WORLD_PATH = SESSION_DIR / 'world.md'
BOOK_PATH = SESSION_DIR / 'book.md'
JOURNAL_PATH = SESSION_DIR / 'journal.md'
AMBIENCE_PATH = SESSION_DIR / 'ambience.md'
LOG_DIR = SESSION_DIR / 'log'
LLM_LOG_PATH = SESSION_DIR / 'llm.log'
AMBIENCE_OPTIONS_PATH = SESSION_DIR / "ambience_options.md"
CHANGES_PATH = SESSION_DIR / "changes.md"
AUDIO_DIR = SESSION_DIR / "audio"
TODAY = date.today().isoformat()

86
tools/prompts.py Normal file
View File

@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
prompts.py System prompt templates for The Chaos game engine.
"""
from __future__ import annotations
from string import Template
SYSTEM_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion.
## Rules
- **Odds**: 1d6, 4+ favourable, 3- trouble.
- **Traits**: 3d6, roll UNDER trait.
- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour.
- **Wounds at 0 HP**: 1d6 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed.
- **Modifiers**: Favourable +1, Risky -1, Desperate -2.
## Tools (action only)
Wrap in ```tool to perform an action:
```
{"tool": "roll", "args": {"dice": "1d6"}}
```
- **roll** dice, modifier
- **player_roll** dice, reason
- **character_update** content: "full sheet" (if HP/cash/gear/stats change)
- **world_update** content: "full world" (if NPCs/locations/threads change)
- **journal_update** add: [...], done: [...]
You have the full state above no need to look anything up. Just write the story and use tools when the player's action changes something.
## State
### Character
$character
### World
$world
### Log
$log
### Story
$story""")
PROSE_PROMPT = Template("""You are the DM for "The Chaos". Narrate in 2nd person ("You"), vivid but concise. Player: Dillion.
## Rules
- **Odds**: 1d6, 4+ favourable, 3- trouble.
- **Traits**: 3d6, roll UNDER trait.
- **Combat**: 1d6, 4+ hits. Damage: 1d6 + mod - armour.
- **Wounds at 0 HP**: 1d6 1-2 die, 3-4 -1 max HP, 5-6 -1 all rolls until healed.
- **Modifiers**: Favourable +1, Risky -1, Desperate -2.
A die is cast at the start of each turn incorporate it into your narrative.
End your response with a `### Changes` block listing what changed:
### Changes
- Current Health: 3
- Cash: 45 silver
- Added to inventory: Silver key
- Removed from inventory: Torches (10)
- Replaced gear: Mace (1d6+1) Mace (1d6+2)
- Note: Found a hidden passage
- Journal done: Defeat the demon
- Journal add: Investigate the mine
Only include lines for things that actually changed. Omit unused lines entirely.
## State
### Character
$character
### World
$world
### Log
$log
### Story
$story""")

View File

@ -24,7 +24,10 @@ from rich.markdown import Markdown as RichMarkdown
from rich.theme import Theme from rich.theme import Theme
# ── Game engine ───────────────────────────────────────── # ── Game engine ─────────────────────────────────────────
from engine import GameEngine, GenerationResult, TurnResult, LLM_LOG_PATH from engine import GameEngine
from models import GenerationResult, TurnResult
from paths import LLM_LOG_PATH
import state
# ── Optional miniaudio ──────────────────────────────────── # ── Optional miniaudio ────────────────────────────────────
try: try:
@ -1033,17 +1036,17 @@ class ChaosTUI(App):
from datetime import datetime from datetime import datetime
ts = datetime.now().strftime("%H:%M") ts = datetime.now().strftime("%H:%M")
if result.log_entry: if result.log_entry:
self.engine.append_log(f"- **{ts}** — {result.log_entry}") state.append_log(f"- **{ts}** — {result.log_entry}")
elif result.book_log: elif result.book_log:
first_line = result.book_log.strip().split("\n")[0][:80] first_line = result.book_log.strip().split("\n")[0][:80]
self.engine.append_log(f"- **Turn** — {first_line}") state.append_log(f"- **Turn** — {first_line}")
# Archive the turn's book log # Archive the turn's book log
if result.book_log: if result.book_log:
self.engine.archive_turn(result.book_log) state.archive_turn(result.book_log)
# Apply state changes # Apply state changes
self.engine.apply_state(result) state.apply_state(result)
# Display the next user prompt # Display the next user prompt
self._display_scene(result) self._display_scene(result)

210
tools/state.py Normal file
View File

@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""
state.py Game state persistence and file I/O for The Chaos engine.
Standalone functions that read/write session files. No dependency on
GameEngine or other modules besides paths.
"""
from __future__ import annotations
import re
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
from paths import (
CHAR_PATH, WORLD_PATH, BOOK_PATH, JOURNAL_PATH, AMBIENCE_PATH,
LOG_DIR, LLM_LOG_PATH, AMBIENCE_OPTIONS_PATH, CHANGES_PATH,
AUDIO_DIR, TODAY,
)
from models import TurnResult
def read_file(path: Path) -> str:
"""Read a text file, return empty string if missing."""
return path.read_text().strip() if path.exists() else ""
def read_recent_log(max_entries: int = 5) -> str:
"""Read the latest log file and return the last N entries."""
log_path = LOG_DIR / f"{TODAY}.md"
if not log_path.exists():
yesterday = (date.today() - timedelta(days=1)).isoformat()
log_path = LOG_DIR / f"{yesterday}.md"
if not log_path.exists():
return "*No recent events.*"
lines = log_path.read_text().splitlines()
entries = [l for l in lines if l.strip().startswith("- ")]
return "\n".join(entries[-max_entries:]) or "*No recent events.*"
def read_recent_book(max_turns: int = 1) -> str:
"""Return the last N turns from the book as context."""
text = read_file(BOOK_PATH)
if not text:
return "*No prior story.*"
turns = text.split("\n## ")
recent = turns[-max_turns:]
return "\n## ".join(recent) if len(turns) > 1 else recent[0]
def truncate_world(text: str) -> str:
"""Extract key world context: NPCs, factions, active threads, rumours."""
if not text or text == "*No world state.*":
return text
sections = re.split(r"\n(?=## |### )", text)
parts = []
for sec in sections:
header = sec.split("\n")[0].strip() if sec else ""
if "Active Threads" in header:
parts.append(sec)
elif "Notable NPCs" in header or "Factions at Play" in header or "### Rumours" in header:
parts.append(sec)
result = "\n\n".join(parts)
return result or text[:1500] + "\n_(world truncated)_"
def get_valid_ambiences() -> set[str]:
"""Parse ambience_options.md and return set of valid ambience names."""
valid = {"silence"}
if not AMBIENCE_OPTIONS_PATH.exists():
return valid
in_table = False
for line in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
s = line.strip()
if not s.startswith("|") or not s.endswith("|"):
in_table = False
continue
if in_table and all(c in "-:| " for c in s):
continue
parts = [p.strip() for p in s.split("|") if p.strip()]
if not parts:
continue
if not in_table:
in_table = True
continue
name = parts[0].lower()
files_str = parts[1] if len(parts) > 1 else ""
files = [f.strip() for f in files_str.split(",")]
has_files = any((AUDIO_DIR / f).exists() or f for f in files)
if has_files:
valid.add(name)
return valid
def validate_update_size(name: str, new_content: str, path: Path) -> bool:
"""Reject updates more than 30% shorter than existing — likely partial paste."""
if not path.exists():
return True
old = path.read_text().strip()
if not old:
return True
ratio = len(new_content) / len(old)
if ratio < 0.7:
print(
f"WARNING: {name} update rejected ({ratio:.0%} of original size "
f"= {len(new_content)} vs {len(old)} chars) — likely a partial paste.",
file=sys.stderr,
)
return False
return True
def apply_state(result: TurnResult) -> None:
"""Write state changes from a TurnResult to disk."""
if result.ambience:
AMBIENCE_PATH.write_text(result.ambience.strip().lower() + "\n")
if result.changes:
CHANGES_PATH.write_text("\n".join(result.changes) + "\n")
else:
CHANGES_PATH.write_text("")
def archive_turn(narrative: str) -> None:
"""Append the narrative as a new turn in book.md."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
heading = f"\n\n## Turn — {timestamp}\n\n"
BOOK_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(BOOK_PATH, "a") as f:
f.write(heading + narrative.strip() + "\n")
def append_log(entry: str) -> None:
"""Append a log entry to today's log file."""
LOG_DIR.mkdir(parents=True, exist_ok=True)
log_path = LOG_DIR / f"{TODAY}.md"
if not log_path.exists():
log_path.write_text(f"# Session Log — {TODAY}\n\n")
with open(log_path, "a") as f:
f.write(entry.strip() + "\n")
def append_llm_log(text: str) -> None:
"""Append raw LLM activity to llm.log for debugging."""
LLM_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(LLM_LOG_PATH, "a") as f:
f.write(text + "\n")
def update_journal(add: list[str] | None = None, done: list[str] | None = None) -> None:
"""Add or complete TODO items in journal.md."""
if not JOURNAL_PATH.exists():
JOURNAL_PATH.write_text("# Journal\n\n## TODO\n\n## DONE\n\n")
lines = JOURNAL_PATH.read_text().splitlines()
todo_items: list[str] = []
done_items: list[str] = []
before_todo: list[str] = []
between: list[str] = []
after_done: list[str] = []
section = "before_todo"
for line in lines:
stripped = line.strip()
if stripped == "## TODO":
section = "todo"
before_todo.append(line)
elif stripped == "## DONE":
section = "done"
between.append(line)
elif section == "before_todo":
before_todo.append(line)
elif section == "todo":
if stripped.startswith("- "):
todo_items.append(stripped[2:])
else:
between.append(line)
elif section == "done":
if stripped.startswith("- "):
done_items.append(stripped[2:])
else:
after_done.append(line)
if done:
done_set = set(done)
todo_items = [i for i in todo_items if i not in done_set]
new_done = [i for i in done if i not in done_items]
done_items.extend(new_done)
if add:
todo_set = set(todo_items)
new_todo = [i for i in add if i not in todo_set]
todo_items = new_todo + todo_items
out = list(before_todo)
for item in todo_items:
out.append(f"- {item}")
out.extend(between)
for item in done_items:
out.append(f"- {item}")
out.extend(after_done)
cleaned = []
prev_blank = False
for line in out:
is_blank = line.strip() == ""
if is_blank and prev_blank:
continue
cleaned.append(line)
prev_blank = is_blank
JOURNAL_PATH.write_text("\n".join(cleaned) + "\n")

View File

@ -5,47 +5,57 @@ import sys
import os import os
import ast import ast
MODULES = [
'engine.py',
'paths.py',
'models.py',
'prompts.py',
'state.py',
'tools_handler.py',
'llm.py',
]
def check_missing_imports(): def check_missing_imports():
"""Check for missing imports that would cause NameError.""" """Check for missing imports that would cause NameError."""
errors = [] errors = []
tool_dir = os.path.dirname(__file__)
# Check engine.py for mod_file in MODULES:
engine_path = os.path.join(os.path.dirname(__file__), 'engine.py') mod_path = os.path.join(tool_dir, mod_file)
with open(engine_path, 'r') as f: if not os.path.exists(mod_path):
engine_content = f.read() errors.append(f"Module not found: {mod_file}")
continue
with open(mod_path, 'r') as f:
content = f.read()
# Parse the file to find all names used tree = ast.parse(content)
tree = ast.parse(engine_content)
# Collect all names that are used (not defined) names_used = set()
names_used = set() for node in ast.walk(tree):
for node in ast.walk(tree): if isinstance(node, ast.Name):
if isinstance(node, ast.Name): names_used.add(node.id)
names_used.add(node.id)
# Check for common missing imports common_modules = {
common_modules = { 'random',
'random', 're',
're', 'json',
'json', 'traceback',
'traceback', 'datetime',
'datetime', 'time',
'time', 'os',
'os', 'sys',
'sys', 'pathlib',
'pathlib', 'functools',
'functools', 'collections',
'collections', 'typing',
'typing', 'io',
'io', 'string',
'string', }
}
for module in common_modules: for module in common_modules:
if module in names_used and not hasattr(sys.modules.get(module, None), '__file__'): if module in names_used and not hasattr(sys.modules.get(module, None), '__file__'):
# Check if it's used but not imported if f'import {module}' not in content and f'from {module} import' not in content:
if f'import {module}' not in engine_content and f'from {module} import' not in engine_content: errors.append(f"{mod_file}: Missing import: {module}")
errors.append(f"Missing import: {module}")
return errors return errors
@ -57,5 +67,5 @@ if __name__ == '__main__':
print(f" - {error}") print(f" - {error}")
sys.exit(1) sys.exit(1)
else: else:
print("✓ All imports present") print("✓ All imports present across all modules")
sys.exit(0) sys.exit(0)

View File

@ -1,43 +1,60 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Test that the engine module can be imported without errors.""" """Test that all engine modules can be imported without errors."""
import sys import sys
import os import os
import traceback import traceback
def test_module_import(module_name):
"""Try importing a module and return (ok, error_msg)."""
try:
__import__(module_name)
return True, None
except Exception as e:
return False, f"{type(e).__name__}: {e}\n{traceback.format_exc()}"
def test_engine_import(): def test_engine_import():
"""Test that the engine module imports without errors.""" """Test that all modules import without errors."""
errors = [] errors = []
try: sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Add the tools directory to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Import the engine module modules_to_test = [
import engine ('paths', ['BASE_DIR', 'SESSION_DIR', 'CHAR_PATH', 'LLM_LOG_PATH']),
print(f"✓ Engine module imported successfully") ('models', ['GenerationResult', 'TurnResult']),
('prompts', ['SYSTEM_PROMPT', 'PROSE_PROMPT']),
('state', ['read_file', 'apply_state', 'append_log', 'append_llm_log']),
('tools_handler', ['execute_tool', 'extract_tool_calls', 'TOOL_REGISTRY']),
('llm', ['call_llm', 'set_llm_env']),
('engine', ['GameEngine']),
]
# Check for common runtime errors for mod_name, expected_attrs in modules_to_test:
if not hasattr(engine, 'GameEngine'): ok, err = test_module_import(mod_name)
errors.append("GameEngine class not found") if not ok:
else: errors.append(f"Import error ({mod_name}): {err}")
print(f"✓ GameEngine class found") continue
print(f"{mod_name} module imported successfully")
# Check that generate_with_tools_single exists mod = sys.modules[mod_name]
if hasattr(engine.GameEngine, 'generate_with_tools_single'): for attr in expected_attrs:
print(f"✓ generate_with_tools_single method found") if not hasattr(mod, attr):
else: errors.append(f"{mod_name}: {attr} not found")
errors.append("generate_with_tools_single method not found") else:
print(f"{mod_name}.{attr} exists")
except ImportError as e: # Check that GameEngine has generate_with_tools_single
errors.append(f"Import error: {e}") import engine
except AttributeError as e: if hasattr(engine.GameEngine, 'generate_with_tools_single'):
errors.append(f"Attribute error: {e}") print(f"✓ engine.GameEngine.generate_with_tools_single method found")
except Exception as e: else:
errors.append(f"Unexpected error: {e}\n{traceback.format_exc()}") errors.append("engine.GameEngine.generate_with_tools_single method not found")
return errors return errors
if __name__ == '__main__': if __name__ == '__main__':
errors = test_engine_import() errors = test_engine_import()
if errors: if errors:
@ -46,4 +63,5 @@ if __name__ == '__main__':
print(f" - {error}") print(f" - {error}")
sys.exit(1) sys.exit(1)
else: else:
print("\n✓ All modules validated successfully")
sys.exit(0) sys.exit(0)

463
tools/tools_handler.py Normal file
View File

@ -0,0 +1,463 @@
#!/usr/bin/env python3
"""
tools_handler.py Tool call infrastructure for The Chaos engine.
Handles tool call extraction, execution, and description. All functions
are standalone no dependency on the GameEngine class.
"""
from __future__ import annotations
import json
import random
import re
from paths import CHAR_PATH, WORLD_PATH, LOG_DIR, TODAY
from state import read_file, validate_update_size, update_journal, append_llm_log
# ── Tool Registry ───────────────────────────────────────────────────────────
TOOL_REGISTRY: dict[str, dict] = {
"roll": {"description": "Roll dice.", "args": {"dice": "1d6", "modifier": "+1"}},
"player_roll": {"description": "Ask player to roll.", "args": {"dice": "1d6", "reason": "why"}},
"modify_traits": {"description": "Change STR/DEX/WIL.", "args": {"str": "optional", "dex": "optional", "wil": "optional"}},
"modify_vitals": {"description": "Change HP, cash, weapon, armour.", "args": {"current_hp": "optional", "max_hp": "optional", "cash": "optional", "weapon": "optional", "armour": "optional"}},
"add_to_inventory": {"description": "Add item to gear.", "args": {"item": "item name and stats"}},
"remove_from_inventory": {"description": "Remove item from gear.", "args": {"item": "exact item text"}},
"replace_gear": {"description": "Replace gear by exact match.", "args": {"before": "exact text", "after": "new text"}},
"add_note": {"description": "Add note to sheet.", "args": {"note": "note content"}},
"replace_note": {"description": "Replace note by exact match.", "args": {"before": "exact text", "after": "new text"}},
"world_update": {"description": "Replace world state.", "args": {"content": "full world markdown"}},
"journal_update": {"description": "Update TODO/DONE.", "args": {"add": "[...]", "done": "[...]"}},
"finalize_turn": {"description": "End turn.", "args": {"user_prompt": "question for player", "ambience": "soundscape name"}},
}
# ── Character Sheet Patcher ─────────────────────────────────────────────────
def patch_character(pattern: str, repl: str, count: int = 1, flags: int = 0) -> str:
"""Apply a regex replacement to character.md. Returns error msg or empty string."""
text = CHAR_PATH.read_text()
new, n = re.subn(pattern, repl, text, count=count, flags=flags)
if n == 0:
return f"**Error:** pattern not found:\n{pattern}"
CHAR_PATH.write_text(new)
return ""
# ── Individual Tool Implementations ─────────────────────────────────────────
def tool_think(args: dict) -> str:
return ""
def tool_read_file(args: dict) -> str:
filename = (args or {}).get("file", "")
paths = {
"character": CHAR_PATH,
"world": WORLD_PATH,
"log": LOG_DIR / f"{TODAY}.md",
}
path = paths.get(filename)
if not path:
return f"Unknown file: {filename}. Choose from: {', '.join(paths)}"
return read_file(path) or f"*{filename} is empty.*"
def tool_roll(args: dict) -> str:
import random
dice_str = (args or {}).get("dice", "1d6")
modifier_str = (args or {}).get("modifier", "0")
try:
count, sides = dice_str.lower().split("d")
count = int(count) if count else 1
sides = int(sides)
except (ValueError, TypeError):
return f"Invalid dice: {dice_str}. Use format like '2d6'."
mod = 0
if modifier_str:
try:
mod = int(modifier_str)
except ValueError:
pass
rolls = [random.randint(1, sides) for _ in range(count)]
total = sum(rolls) + mod
mod_str = f" {'+' if mod >= 0 else ''}{mod}" if mod != 0 else ""
return f"Roll: {dice_str}{mod_str} → [{', '.join(str(r) for r in rolls)}] = {total}"
def tool_modify_traits(args: dict) -> str:
errors = []
for stat in ("str", "dex", "wil"):
val = args.get(stat)
if val is not None:
err = patch_character(
rf"^(- \*\*{stat.upper()}:\*\*\s*)\d+", rf"\g<1>{val}", count=1, flags=re.MULTILINE
)
if err:
errors.append(err)
return "; ".join(errors) if errors else "Traits updated."
def tool_modify_vitals(args: dict) -> str:
errors = []
for field, label in [("current_hp", "Current Health"), ("max_hp", "Max Health"),
("cash", "Cash"), ("weapon", "Weapon"), ("armour", "Armour")]:
val = args.get(field)
if val is not None:
err = patch_character(
rf"^(- \*\*{label}:\*\*\s*).*", rf"\g<1>{val}", count=1, flags=re.MULTILINE
)
if err:
errors.append(err)
return "; ".join(errors) if errors else "Vitals updated."
def tool_add_to_inventory(args: dict) -> str:
item = (args or {}).get("item", "")
if not item:
return "**Error:** `item` is required."
text = CHAR_PATH.read_text()
if item in text:
return f"Item already in inventory: {item}"
gear_section = re.search(r"^## Gear\n", text, re.MULTILINE)
if gear_section:
insert_at = gear_section.end()
text = text[:insert_at] + f"- {item}\n" + text[insert_at:]
else:
text += f"\n## Gear\n- {item}\n"
CHAR_PATH.write_text(text)
return f"Added to inventory: {item}"
def tool_remove_from_inventory(args: dict) -> str:
item = (args or {}).get("item", "")
if not item:
return "**Error:** `item` is required."
err = patch_character(rf"^- {re.escape(item)}\n?", "", count=1, flags=re.MULTILINE)
if err:
return f"**Error:** item not found: {item}"
return f"Removed from inventory: {item}"
def tool_replace_gear(args: dict) -> str:
before = (args or {}).get("before", "")
after = (args or {}).get("after", "")
if not before or not after:
return "**Error:** `before` and `after` are required."
err = patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE)
if err:
return f"**Error:** gear not found: {before}"
return f"Gear replaced: {before}{after}"
def tool_add_note(args: dict) -> str:
note = (args or {}).get("note", "")
if not note:
return "**Error:** `note` is required."
text = CHAR_PATH.read_text()
notes_section = re.search(r"^## Notes & Scribbles\n", text, re.MULTILINE)
if notes_section:
text = text[:notes_section.end()] + f"- {note}\n" + text[notes_section.end():]
else:
text += f"\n## Notes & Scribbles\n- {note}\n"
CHAR_PATH.write_text(text)
return f"Note added: {note}"
def tool_replace_note(args: dict) -> str:
before = (args or {}).get("before", "")
after = (args or {}).get("after", "")
if not before or not after:
return "**Error:** `before` and `after` are required."
err = patch_character(rf"^- {re.escape(before)}", f"- {after}", count=1, flags=re.MULTILINE)
if err:
return f"**Error:** note not found: {before}"
return f"Note replaced."
def tool_world_update(args: dict) -> str:
content = (args or {}).get("content", "")
if not content:
return "**Error:** `content` is required."
if not validate_update_size("world", content, WORLD_PATH):
return "**Error:** Update rejected — content is too short (likely a partial paste)."
WORLD_PATH.write_text(content.strip() + "\n")
return "World state updated."
def tool_journal_update(args: dict) -> str:
add = (args or {}).get("add", [])
done = (args or {}).get("done", [])
if isinstance(add, str):
add = [add]
if isinstance(done, str):
done = [done]
if not add and not done:
return "**Error:** Provide at least one of `add` or `done`."
update_journal(add=add, done=done)
return "Journal updated."
# ── Tool Dispatcher ─────────────────────────────────────────────────────────
def execute_tool(tool_name: str, args: dict) -> str:
"""Execute a tool by name. Returns result string."""
fn_map = {
"roll": tool_roll,
"modify_traits": tool_modify_traits,
"modify_vitals": tool_modify_vitals,
"add_to_inventory": tool_add_to_inventory,
"remove_from_inventory": tool_remove_from_inventory,
"replace_gear": tool_replace_gear,
"add_note": tool_add_note,
"replace_note": tool_replace_note,
"world_update": tool_world_update,
"journal_update": tool_journal_update,
}
fn = fn_map.get(tool_name)
if not fn:
return f"Unknown tool: {tool_name}"
try:
return fn(args)
except Exception as e:
import traceback
tb = traceback.format_exc()
append_llm_log(f"\n--- TOOL ERROR ({tool_name}) ---\n{tb}")
return f"Tool error ({tool_name}): {e}"
# ── Descriptions ────────────────────────────────────────────────────────────
def describe_tool_action(tool_name: str, args: dict) -> str:
"""Return a user-facing status message for a tool call."""
dm_status = (args or {}).get("dm_status")
if dm_status:
return f"DM is {dm_status}..."
read_descriptions = {
"character": "reading the character sheet",
"world": "consulting the world map",
"book": "reviewing the story so far",
"log": "checking the session log",
"journal": "scanning the journal",
}
if tool_name == "read_file":
file = (args or {}).get("file", "")
desc = read_descriptions.get(file, f"reading {file}")
elif tool_name in ("character_get", "world_get", "journal_get"):
file = tool_name.replace("_get", "")
desc = read_descriptions.get(file, f"reading {file}")
elif tool_name in ("character_update", "world_update"):
desc = "updating the records"
elif tool_name == "journal_update":
desc = "updating the journal"
elif tool_name == "roll":
dice = (args or {}).get("dice", "1d6")
mod = (args or {}).get("modifier")
desc = f"rolling {dice}"
if mod:
desc += f" {mod}"
elif tool_name == "player_roll":
dice = (args or {}).get("dice", "1d6")
desc = f"asking you to roll {dice}"
elif tool_name == "modify_traits":
desc = "updating traits"
elif tool_name == "modify_vitals":
desc = "updating vitals"
elif tool_name == "add_to_inventory":
desc = "adding item to inventory"
elif tool_name == "remove_from_inventory":
desc = "removing item from inventory"
elif tool_name == "replace_gear":
desc = "replacing gear"
elif tool_name == "add_note":
desc = "adding note"
elif tool_name == "replace_note":
desc = "replacing note"
else:
desc = f"using {tool_name}"
return f"DM is {desc}..."
def describe_change(tool_name: str, args: dict) -> str:
"""Build a compact human-readable change description from a tool call."""
if tool_name == "modify_vitals":
parts = []
for k, v in args.items():
label = k.replace("_", " ").title()
parts.append(f"{label}: {v}")
return f"{', '.join(parts)}" if parts else ""
elif tool_name == "modify_traits":
parts = []
for k, v in args.items():
parts.append(f"{k.upper()}: {v}")
return f"{', '.join(parts)}"
elif tool_name == "add_to_inventory":
return f"+ {args.get('item', '?')}"
elif tool_name == "remove_from_inventory":
return f" {args.get('item', '?')}"
elif tool_name == "replace_gear":
return f"{args.get('before', '?')}{args.get('after', '?')}"
elif tool_name == "add_note":
note = args.get("note", "?")
return f"📝 {note[:60]}{'' if len(note) > 60 else ''}"
elif tool_name == "replace_note":
return f"📝 {args.get('before', '?')[:40]}{args.get('after', '?')[:40]}"
elif tool_name == "world_update":
return "🌍 World updated"
elif tool_name == "journal_update":
parts = []
for a in args.get("add", []):
parts.append(f"📋 {a}")
for d in args.get("done", []):
parts.append(f"{d}")
return "; ".join(parts) if parts else ""
return ""
# ── Changes Block Parser ────────────────────────────────────────────────────
def parse_changes_block(changes_block: str) -> list[dict]:
"""Parse a ### Changes block into tool call dicts."""
calls = []
for raw_line in changes_block.split("\n"):
line = raw_line.strip()
if not line.startswith("- "):
continue
content = line[2:].strip()
m = re.match(r"Current Health:\s*(\d+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "modify_vitals", "args": {"current_hp": m.group(1)}})
continue
m = re.match(r"Cash:\s*(\d+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "modify_vitals", "args": {"cash": m.group(1)}})
continue
m = re.match(r"Max Health:\s*(\d+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "modify_vitals", "args": {"max_hp": m.group(1)}})
continue
m = re.match(r"Add(?:ed)? to inventory:\s*(.+)", content, re.IGNORECASE)
if m:
for item in [i.strip() for i in m.group(1).split(",") if i.strip()]:
calls.append({"tool": "add_to_inventory", "args": {"item": item}})
continue
m = re.match(r"Remov(?:e|ed) from inventory:\s*(.+)", content, re.IGNORECASE)
if m:
for item in [i.strip() for i in m.group(1).split(",") if i.strip()]:
calls.append({"tool": "remove_from_inventory", "args": {"item": item}})
continue
m = re.match(r"Replace(?:d)? gear:\s*(.+?)\s*[→➜]\s*(.+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "replace_gear", "args": {"before": m.group(1).strip(), "after": m.group(2).strip()}})
continue
m = re.match(r"Note:\s*(.+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "add_note", "args": {"note": m.group(1).strip()}})
continue
m = re.match(r"Journal add:\s*(.+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "journal_update", "args": {"add": [i.strip() for i in m.group(1).split(",") if i.strip()]}})
continue
m = re.match(r"Journal done:\s*(.+)", content, re.IGNORECASE)
if m:
calls.append({"tool": "journal_update", "args": {"done": [i.strip() for i in m.group(1).split(",") if i.strip()]}})
continue
m = re.match(r"Looted from .+:\s*(.+)", content, re.IGNORECASE)
if m:
items_text = m.group(1).strip()
calls.append({"tool": "add_note", "args": {"note": f"Looted: {items_text}"}})
continue
return calls
# ── Extraction Functions ────────────────────────────────────────────────────
def extract_thoughts(text: str) -> list[str]:
pattern = r"```thought\s*\n?(.*?)```"
return re.findall(pattern, text, re.DOTALL)
def extract_tool_calls(text: str, *, round_num: int = 0, on_debug: callable = None) -> list[dict]:
"""Extract tool calls from ```tool and ```json blocks."""
calls = []
seen = set()
def _try_parse(raw: str) -> dict | None:
try:
obj = json.loads(raw)
if isinstance(obj, dict) and "tool" in obj:
return obj
except json.JSONDecodeError:
pass
return None
for m in re.finditer(r"```(?:tool|json|finalize_turn)\s*\n?", text):
fence_type = m.group(0).strip("``` \n\r")
obj = None
try:
decoder = json.JSONDecoder()
obj, end = decoder.raw_decode(text, m.end())
except (json.JSONDecodeError, ValueError, StopIteration):
pass
if obj is None:
close = text.find("```", m.end())
if close > 0:
raw = text[m.end():close].strip()
def _escape_in_strings(s: str) -> str:
return re.sub(r'"(?:[^"\\]|\\.)*"', lambda x: x.group(0).replace("\n", "\\n"), s, flags=re.DOTALL)
repaired = _escape_in_strings(raw)
obj = _try_parse(repaired)
if obj is not None and isinstance(obj, dict):
if fence_type == "finalize_turn":
obj = {"tool": "finalize_turn", "args": obj}
if "tool" not in obj:
obj = None
if obj is not None:
key = (obj["tool"], json.dumps(obj.get("args", {}), sort_keys=True))
if key not in seen:
seen.add(key)
calls.append(obj)
elif on_debug:
preview = text[m.end():m.end() + 120].replace("\n", "\\n")
on_debug("parse_error", {"round": round_num, "content": preview})
return calls
def extract_final_json(text: str) -> dict | None:
pattern = r"```json\s*\n?(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
if not matches:
return None
try:
return json.loads(matches[-1].strip())
except json.JSONDecodeError:
return None
def strip_tool_blocks(text: str) -> str:
"""Remove ```tool, ```json, finalize_turn blocks from narrative text."""
return re.sub(
r'```(?:tool|json|finalize_turn)\s*\n?.*?```',
'',
text,
flags=re.DOTALL,
).strip()