splinter-keep/tools/run.py
2026-06-28 18:53:33 +02:00

1173 lines
39 KiB
Python
Executable File

#!/usr/bin/env python3
"""
run.py — The Chaos TTRPG Session Client (Game Mode)
Owns the TUI and game loop. Layout:
PLAY (narrative + choices + input) | CHAR | LOG | BOOK tabs
"""
from __future__ import annotations
import json
import os
import random
import sys
import threading
from datetime import date
from pathlib import Path
from textual import on
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.screen import Screen
from textual.widgets import Button, Input, Static, TabbedContent, TabPane
from rich.markdown import Markdown as RichMarkdown
from rich.theme import Theme
# ── Game engine ─────────────────────────────────────────
from engine import GameEngine, GenerationResult, TurnResult
# ── Optional miniaudio ────────────────────────────────────
# miniaudio is optional: when the import fails the client still starts,
# HAS_AUDIO is False, and a one-line notice goes to stderr.
try:
    import miniaudio
    HAS_AUDIO = True
except ImportError:
    HAS_AUDIO = False
    print("Note: miniaudio not installed — no ambience music.", file=sys.stderr)
# ── Paths ────────────────────────────────────────────────
# Parent of the directory that contains this file; everything else hangs off it.
BASE = Path(__file__).resolve().parent.parent
SESSION = BASE / 'session'
LOG_DIR = SESSION / 'log'
CHAR_PATH = SESSION / 'character.md'
WORLD_PATH = SESSION / 'world.md'
JOURNAL_PATH = SESSION / 'journal.md'
# AMBIENCE_PATH holds the current ambience name; AMBIENCE_OPTIONS_PATH holds
# the markdown table that maps names to audio files under AUDIO_DIR.
AMBIENCE_PATH = SESSION / 'ambience.md'
AMBIENCE_OPTIONS_PATH = SESSION / 'ambience_options.md'
BOOK_PATH = SESSION / 'book.md'
LAST_PROMPT_PATH = SESSION / 'last_prompt.md'
CHANGES_PATH = SESSION / 'changes.md'
SETTINGS_PATH = SESSION / 'settings.json'
AUDIO_DIR = SESSION / 'audio'
# Evaluated once at import time, so TODAY and LOG_PATH do not roll over if the
# client keeps running past midnight.
TODAY = date.today().isoformat()
LOG_PATH = LOG_DIR / f'{TODAY}.md'
# Interval, in seconds, passed to set_interval() by the refreshing panels and
# by the ambience/book pollers.
REFRESH_SECS = 2
# Rich style overrides for markdown elements; ChaosTUI.on_mount installs this
# theme on the app console.
MARKDOWN_THEME = Theme({
    "markdown.h1": "bold #ff6b6b on #2a0000",
    "markdown.h2": "bold #ffd93d",
    "markdown.h3": "bold italic #6bcbff",
    "markdown.h4": "bold #95e1d3",
    "markdown.code": "bold #00d2d3 on #002222",
    "markdown.code_block": "on #1e1e2e",
    "markdown.block_quote": "dim italic #8395a7",
    "markdown.link": "underline #48dbfb",
    "markdown.item": "#ff9f43",
    "markdown.em": "italic #ff9ff3",
    "markdown.strong": "bold #feca57",
    "markdown.horizontal_rule": "dim #555555",
})
# ── Helpers (file reading, status, book, ambience) ───────
def ensure_log():
    """Create the log directory and today's log file, then seed it if blank."""
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    missing = not LOG_PATH.exists()
    if missing:
        # A brand-new log starts with just the dated header.
        LOG_PATH.write_text(f"# Session Log — {TODAY}\n\n")
    _populate_if_empty()
def _populate_if_empty():
    """Seed today's log from the previous log when today's is still blank.

    Today's log counts as blank when its stripped content has at most two
    lines. In that case the file returned by _previous_log() is copied over,
    with its first line replaced by today's header. Nothing is written when
    there is no previous log or when the previous log has no lines at all.
    """
    content = LOG_PATH.read_text().strip()
    if len(content.splitlines()) > 2:
        return
    prev = _previous_log()
    if prev is None:
        return
    lines = prev.read_text().splitlines()
    if not lines:
        # An empty previous log has no header line to replace; indexing
        # lines[0] would raise IndexError, so leave today's file as it is.
        return
    lines[0] = f"# Session Log — {TODAY}"
    LOG_PATH.write_text('\n'.join(lines) + '\n')
def _previous_log():
    """Return the last ``*.md`` file in LOG_DIR (sorted order) that is not today's.

    Returns None when no such file exists.
    """
    today_name = LOG_PATH.name
    others = [p for p in sorted(LOG_DIR.glob('*.md')) if p.name != today_name]
    return others[-1] if others else None
def read_todo():
    """Collect the non-blank lines of the journal's TODO section.

    A line whose text (after stripping whitespace and leading ``#``) starts
    with ``TODO`` opens the section; the next line starting with ``#`` closes
    it. Leading ``-`` and space characters are stripped from each item.
    Returns a one-element placeholder list when the journal is missing or the
    section yields no items.
    """
    if not JOURNAL_PATH.exists():
        return ["—— No journal yet ——"]
    items = []
    collecting = False
    for raw in JOURNAL_PATH.read_text().splitlines():
        text = raw.strip()
        if text.lstrip('#').strip().startswith('TODO'):
            collecting = True
        elif collecting:
            if text.startswith('#'):
                break
            if text:
                items.append(text.lstrip('- '))
    return items if items else ["—— All done! ——"]
def read_log_tail(n=200):
    """Return the last *n* log lines, skipping blank lines and ``#`` headings.

    Returns an empty list when today's log file does not exist.
    """
    if not LOG_PATH.exists():
        return []
    kept = []
    for line in LOG_PATH.read_text().splitlines():
        if not line.strip() or line.startswith('#'):
            continue
        kept.append(line)
    return kept[-n:]
def status_summary():
    """Build the name/health summary from the character sheet.

    Scans for lines starting with ``**Name:**``, ``**Current Health:**`` and
    ``**Max Health:**``. Max health is only used while no non-empty current
    health has been seen. Returns "no character" when the sheet is missing.
    """
    if not CHAR_PATH.exists():
        return "no character"

    def field_value(line):
        # Text after the first colon, minus surrounding markdown emphasis.
        return line.split(':', 1)[1].strip().strip('_').strip('*')

    name = "?"
    health = "?"
    for line in CHAR_PATH.read_text().splitlines():
        if line.startswith('**Name:**'):
            name = field_value(line)
        elif line.startswith('**Current Health:**'):
            current = field_value(line)
            if current:
                health = current
        elif line.startswith('**Max Health:**'):
            maximum = field_value(line)
            if maximum and health == '?':
                health = maximum
    return f"{name}{health}"
def log_count():
    """Return how many entries read_log_tail() yields with its default limit."""
    entries = read_log_tail()
    return len(entries)
def load_book_pages():
    """Split the book file into pages, one per ``## `` heading.

    Text before the first ``## `` heading becomes the first page; every later
    page keeps its ``## `` prefix. Returns a single placeholder page when the
    file is missing, unreadable, or blank.
    """
    placeholder = ["*The story has not begun.*"]
    try:
        # One read only: the file may be rewritten or removed between calls,
        # and this function runs on a repeating UI timer.
        text = BOOK_PATH.read_text().strip()
    except OSError:
        return placeholder
    if not text:
        return placeholder
    first, *rest = text.split('\n## ')
    return [first] + ['## ' + chunk for chunk in rest]
def parse_ambience_options():
    """Parse the ambience options markdown table into ``{name: [paths]}``.

    Only lines that start and end with ``|`` are table rows. The first row of
    each table is skipped as the header, and rows made up solely of
    ``-``, ``:``, ``|`` and spaces are skipped as separators. For the rest,
    the first cell (lower-cased) is the ambience name and the second cell is a
    comma-separated list of file names resolved against AUDIO_DIR.
    Returns an empty dict when the options file does not exist.
    """
    if not AMBIENCE_OPTIONS_PATH.exists():
        return {}
    table = {}
    header_seen = False
    separator_chars = set('-:| ')
    for raw in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
        row = raw.strip()
        if not (row.startswith('|') and row.endswith('|')):
            # Any non-table line ends the current table.
            header_seen = False
            continue
        cells = [c for c in (cell.strip() for cell in row.split('|')) if c]
        if len(cells) < 2:
            continue
        if not header_seen:
            header_seen = True
            continue
        if set(row) <= separator_chars:
            continue
        track_names = [t.strip() for t in cells[1].split(',') if t.strip()]
        table[cells[0].lower()] = [AUDIO_DIR / t for t in track_names]
    return table
# ── Ambience subsystem ───────────────────────────────────
class AmbiencePlayer:
    """Plays a background track selected by the name stored in AMBIENCE_PATH.

    The name is looked up in the table returned by parse_ambience_options();
    the name 'silence', or any name missing from the table, plays nothing.
    Muting stops playback but keeps the current name so unmuting can resume.
    """

    def __init__(self):
        self.current_ambience = 'silence'
        self._last_mtime = 0   # mtime of AMBIENCE_PATH when it was last handled
        self._options = {}     # ambience name -> list of candidate track paths
        self._device = None    # open playback device, or None when idle
        self._stream = None
        self._muted = False
        self.load_options()

    @property
    def available(self):
        """True when the optional miniaudio module was imported."""
        return HAS_AUDIO

    @property
    def ambience_name(self):
        """Name of the ambience currently selected (not necessarily audible)."""
        return self.current_ambience

    @property
    def is_muted(self):
        return self._muted

    def toggle_mute(self):
        """Flip the mute flag, stopping or resuming playback accordingly."""
        self._muted = not self._muted
        if self._muted:
            self._stop()
        else:
            self._load_current()

    def load_options(self):
        """Re-read the ambience options table from disk."""
        self._options = parse_ambience_options()

    def _stop(self):
        """Close the playback device, if any, and forget the stream."""
        if self._device:
            try:
                self._device.close()
            except Exception:
                # Best effort: a failing close must not take the UI timer down.
                pass
        self._device = None
        self._stream = None

    def poll(self):
        """Switch ambience when AMBIENCE_PATH's modification time changes."""
        if not HAS_AUDIO:
            return
        try:
            mtime = os.path.getmtime(AMBIENCE_PATH)
        except OSError:
            return
        if mtime == self._last_mtime:
            return
        self._last_mtime = mtime
        try:
            name = AMBIENCE_PATH.read_text().strip().lower()
        except OSError:
            return
        # Save the name even when muted — will play on unmute
        self.current_ambience = name
        self._stop()
        if not self._muted and name != 'silence' and name in self._options:
            self._play_current()

    def _switch_to(self, name):
        """Change to *name* unless it is already the current ambience."""
        if name == self.current_ambience:
            return
        self.current_ambience = name
        self._stop()
        if self._muted or name == 'silence' or name not in self._options:
            return
        self._play_current()

    def _play_current(self):
        """Start a randomly chosen existing track for the current ambience.

        Does nothing when no listed track exists on disk. If playback cannot
        be started, any half-opened device is released and the selection
        falls back to 'silence'.
        """
        tracks = self._options.get(self.current_ambience, [])
        valid = [t for t in tracks if t.exists()]
        if not valid:
            return
        track = random.choice(valid)
        try:
            self._stream = miniaudio.stream_file(str(track))
            self._device = miniaudio.PlaybackDevice()
            self._device.start(self._stream)
        except Exception:
            # The device may have been opened before start() failed; close it
            # instead of leaking it. Use the 'silence' sentinel rather than
            # None so ambience_name stays a displayable string.
            self._stop()
            self.current_ambience = 'silence'

    def _load_current(self):
        """Called on unmute — replay current ambience if not silence."""
        if self.current_ambience and self.current_ambience != 'silence':
            self._play_current()
# Module-level reference to the AmbiencePlayer. ChaosTUI.__init__ assigns it
# (None when music is disabled); StatusBar and the mute button read it.
app_ambience_player = None
# ── Roll Modal ───────────────────────────────────────────
class RollModal(Screen):
    """Screen that asks the player for a physical dice result.

    The screen is dismissed with the stripped text the player entered; blank
    input is ignored and keeps the dialog open.
    """

    CSS = """
RollModal {
align: center middle;
background: rgba(0, 0, 0, 0.75);
}
#roll-dialog {
width: 44;
height: auto;
padding: 2 3;
background: #2a2a3a;
border: thick #e0ad4c;
}
#roll-title {
text-style: bold;
color: #ffd93d;
text-align: center;
height: 3;
}
#roll-reason {
color: #c0b090;
text-align: center;
height: 3;
}
#roll-input {
margin: 1 0;
}
#roll-submit {
width: 100%;
}
#roll-hint {
color: #888888;
text-align: center;
height: 1;
}
"""

    def __init__(self, dice: str, reason: str) -> None:
        super().__init__()
        self.reason = reason
        self.dice = dice

    def compose(self) -> ComposeResult:
        """Build the dialog: title, reason, input box, submit button, hint."""
        with Vertical(id="roll-dialog"):
            yield Static(f"[bold]🎲 ROLL {self.dice}[/bold]", id="roll-title")
            yield Static(f"Reason: {self.reason}", id="roll-reason")
            yield Input(
                placeholder="Enter the number you rolled...",
                id="roll-input",
            )
            yield Button("Submit", id="roll-submit", variant="primary")
            yield Static("(or press Enter)", id="roll-hint")

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Enter pressed in the input box."""
        self._submit(event.value)

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Submit button clicked: use whatever is in the input box."""
        if event.button.id != "roll-submit":
            return
        entry = self.query_one("#roll-input", Input)
        self._submit(entry.value)

    def _submit(self, value: str) -> None:
        """Dismiss with the trimmed value; do nothing for blank input."""
        trimmed = value.strip()
        if not trimmed:
            return
        self.dismiss(trimmed)
# ── Auto-refreshing panels ───────────────────────────────
class AutoStatic(Static):
    """Static widget that calls its own load() on mount and every REFRESH_SECS."""

    def load(self):
        """Refresh the widget's content; subclasses must override."""
        raise NotImplementedError

    def on_mount(self):
        # Fill once immediately, then keep refreshing on a timer.
        refresh = self.load
        refresh()
        self.set_interval(REFRESH_SECS, refresh)
class TodoPane(AutoStatic):
    """Shows the journal's TODO items, one per line."""

    def load(self):
        rendered = [f"{entry}" for entry in read_todo()]
        self.update("\n".join(rendered))
class TranscriptPane(AutoStatic):
    """Shows the last 80 log entries followed by a NOW marker."""

    def load(self):
        tail = read_log_tail()
        body = "\n".join(tail[-80:])
        if tail:
            body = body + "\n\n>>--- NOW --->"
        self.update(body)
        # Scroll only after the new content has been laid out.
        self.call_after_refresh(self._scroll_bottom)

    def _scroll_bottom(self):
        """Scroll the parent container to its end, if it supports scrolling."""
        container = self.parent
        if container and hasattr(container, 'scroll_end'):
            container.scroll_end(animate=False)
class DebugPane(Static):
    """Scrolling log of LLM thoughts, tool calls, and results for this turn.

    At most MAX_LINES lines are retained; the newest 100 are displayed.
    """

    MAX_LINES = 200

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._lines: list[str] = []

    def append(self, text: str) -> None:
        """Add one line, drop the oldest if over the cap, and re-render."""
        history = self._lines
        history.append(text)
        if len(history) > self.MAX_LINES:
            del history[0]
        visible = history[-100:]
        self.update("\n".join(visible))
        self.call_after_refresh(self._scroll_bottom)

    def _scroll_bottom(self):
        """Scroll the parent container to its end, if it supports scrolling."""
        container = self.parent
        if container and hasattr(container, 'scroll_end'):
            container.scroll_end(animate=False)

    def clear(self) -> None:
        """Forget every stored line and blank the widget."""
        self._lines.clear()
        self.update("")
class CharPane(AutoStatic):
    """Renders the character sheet file as markdown."""

    def load(self):
        if CHAR_PATH.exists():
            sheet = CHAR_PATH.read_text().strip()
            self.update(RichMarkdown(sheet))
        else:
            self.update("*No character sheet*")
class StatusBar(AutoStatic):
    """One-line footer: character summary, log/todo counts, date and music."""

    def load(self):
        char = status_summary()
        count = log_count()
        todo = len(read_todo())
        if not HAS_AUDIO:
            music = " │ ♫ (install miniaudio)"
        elif app_ambience_player:
            music = f" │ ♫ {app_ambience_player.ambience_name}"
        else:
            # Audio is available but the player was disabled (--no-music).
            music = ""
        self.update(f"{char}{count} entries │ {todo} todo │ {TODAY}{music}")
# ── The App ──────────────────────────────────────────────
class ChaosTUI(App):
TITLE = "The Chaos"
CSS = """
Screen {
background: #121212;
}
#banner {
dock: top;
height: 1;
background: #2a2a2a;
color: #e0ad4c;
text-align: center;
}
#main {
height: 100%;
background: #111111;
}
#todo-header {
background: #3a2a1a;
color: #e0b060;
text-style: bold;
padding: 0 1;
height: 1;
}
#todo-content {
background: #1a1510;
color: #d0b080;
padding: 0 1;
height: 5;
max-height: 5;
overflow-y: auto;
scrollbar-size-vertical: 2;
}
#main-tabs {
height: 1fr;
}
TabbedContent {
background: #1a1a2a;
}
VerticalScroll {
overflow-y: auto;
scrollbar-size-vertical: 2;
scrollbar-color: #555555;
scrollbar-color-hover: #777777;
scrollbar-color-active: #999999;
}
#char-content {
background: #1e1e2a;
color: #c0c0c0;
padding: 0 1;
}
#transcript {
background: #1a2a1a;
color: #c8c8c8;
padding: 0 1;
}
#debug-content {
background: #1a1a1a;
color: #88b0a0;
padding: 0 1;
}
#debug-content .dm-thought {
color: #c0a060;
}
#debug-content .dm-tool {
color: #60a0c0;
}
#debug-content .dm-result {
color: #80a080;
}
/* Play tab */
#play-narrative {
background: #161616;
color: #d8d8d8;
padding: 1 2;
height: auto;
}
#play-status {
background: #1a1a2a;
color: #e0b060;
padding: 0 2;
height: 1;
text-style: bold italic;
text-align: center;
}
#play-status.processing {
background: #2a1a0a;
color: #ffd93d;
}
#play-input {
height: 3;
background: #222222;
color: #e0d0c0;
border: solid #555555;
padding: 0 1;
}
#play-input:focus {
border: solid #e0ad4c;
}
#play-input:disabled {
background: #1a1a1a;
color: #666666;
border: solid #333333;
}
/* Book tab */
#book-header {
background: #2d2d2d;
color: #e0c080;
text-style: bold;
padding: 0 1;
height: 1;
}
#book-nav {
height: 3;
background: #222222;
align: center middle;
}
#book-nav Button {
width: 10;
margin: 0 1;
}
#book-nav Button:disabled {
color: #444444;
}
#book-nav Button:hover {
text-style: bold;
}
#book-nav-center {
height: 3;
width: 1fr;
}
#book-page-label {
height: 1;
color: #c0b090;
text-style: bold;
padding: 0 2;
text-align: center;
}
#book-progress {
height: 1;
background: #1a1a1a;
color: #e0b060;
padding: 0 2;
text-align: center;
}
#book-scroll {
height: 1fr;
}
#book-content {
background: #161616;
color: #d8d8d8;
padding: 0 2;
}
#status-bar {
background: #222222;
color: #888888;
padding: 0 1;
height: 1;
text-style: italic;
}
#mute-btn {
dock: bottom;
width: 6;
height: 1;
background: #2a2a2a;
color: #888888;
border: none;
padding: 0 1;
min-width: 6;
margin: 0;
}
#mute-btn:hover {
background: #3a3a3a;
color: #cccccc;
}
#mute-btn.muted {
color: #ff6b6b;
text-style: bold;
}
"""
# Key bindings as (key, action, description); both keys trigger "quit".
BINDINGS = [
    ("ctrl+c", "quit", "Quit"),
    ("escape", "quit", "Quit"),
]
def __init__(self, *args, no_music=False, **kwargs):
    """Create the engine, the ambience player and all per-session UI state.

    Args:
        *args: Forwarded to ``App.__init__``.
        no_music: When true, no AmbiencePlayer is created and the
            module-level ``app_ambience_player`` is set to None.
        **kwargs: Forwarded to ``App.__init__``.
    """
    super().__init__(*args, **kwargs)
    global app_ambience_player
    app_ambience_player = None if no_music else AmbiencePlayer()
    # Game engine
    self.engine = GameEngine()
    # Game loop state
    self._last_prompt: str = ""
    self._last_result: TurnResult | None = None
    self._is_processing: bool = False
    # Thinking animation. The frames must differ visibly: with four empty
    # strings _tick_thinking() cycled through identical output and the
    # spinner never appeared.
    self._spinner_frames = ["◐", "◓", "◑", "◒"]
    self._thinking_frame = 0
    self._thinking_timer_handle = None
    self._dm_action = "DM is weaving the narrative"
    # Player roll state (thread-safe): the worker waits on the event, the UI
    # thread stores the result and sets it.
    self._roll_event = threading.Event()
    self._roll_result: str | None = None
    # Book viewer state
    self._book_page = 0
    self._book_pages = []
    self._prev_page_count = 0
    # Debug log
    self._debug_lines: list[str] = []
    # Settings guard — prevent save during init before restore
    self._settings_loaded = False
# ── Settings persistence ──────────────────────────────
def _load_settings(self) -> dict:
    """Return the persisted UI settings merged over the defaults.

    A missing, unreadable, undecodable or non-object settings file yields the
    defaults. Unknown keys from the file are kept. Known keys whose value has
    the wrong type fall back to their default, so a hand-edited file cannot
    feed a non-integer page index into the book viewer.
    """
    defaults = {"active_tab": "play-tab", "music_muted": False, "book_page": 0}
    try:
        data = json.loads(SETTINGS_PATH.read_text())
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return defaults
    if not isinstance(data, dict):
        return defaults
    settings = {**defaults, **data}
    if not isinstance(settings["active_tab"], str):
        settings["active_tab"] = defaults["active_tab"]
    if not isinstance(settings["music_muted"], bool):
        settings["music_muted"] = defaults["music_muted"]
    page = settings["book_page"]
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(page, bool) or not isinstance(page, int):
        settings["book_page"] = defaults["book_page"]
    return settings
def _save_settings(self) -> None:
    """Write the active tab, mute state and book page to SETTINGS_PATH.

    Does nothing until _apply_settings() has finished; write errors are
    ignored.
    """
    if not self._settings_loaded:
        return
    tabs = self.query_one("#main-tabs", TabbedContent)
    player = app_ambience_player
    payload = json.dumps(
        {
            "active_tab": tabs.active if tabs else "play-tab",
            "music_muted": player.is_muted if player else False,
            "book_page": self._book_page,
        },
        indent=2,
    )
    try:
        SETTINGS_PATH.write_text(payload + "\n")
    except OSError:
        # Persisting UI state is best effort.
        pass
# ── Compose ──────────────────────────────────────────
def compose(self):
    """Build the widget tree: banner, TODO strip, five tabs, status bar, mute button.

    NOTE(review): the container nesting below is inferred from the widget ids
    and the stylesheet — confirm, in particular, that the status line and
    input sit outside the PLAY scroll area and where the mute button lives.
    """
    yield Static(f"⚔ The Chaos ╎ {TODAY}", id="banner")
    with Vertical(id="main"):
        yield Static("TODO", id="todo-header")
        yield TodoPane(id="todo-content")
        with TabbedContent(initial="play-tab", id="main-tabs"):
            # PLAY: scrolling narrative, one-line status, action input.
            with TabPane("PLAY", id="play-tab"):
                with VerticalScroll(id="play-scroll"):
                    yield Static("*Awaiting the fates...*", id="play-narrative")
                yield Static("", id="play-status")
                yield Input(
                    placeholder="Type your action and press Enter...",
                    id="play-input",
                )
            with TabPane("CHARACTER", id="char-tab"):
                with VerticalScroll():
                    yield CharPane(id="char-content")
            with TabPane("LOG", id="log-tab"):
                with VerticalScroll():
                    yield TranscriptPane(id="transcript")
            # BOOK: prev/next navigation above a scrolling page.
            with TabPane("BOOK", id="book-tab"):
                yield Static("BOOK", id="book-header")
                with Vertical(id="book-section"):
                    with Horizontal(id="book-nav"):
                        yield Button("<< Prev", id="book-prev")
                        with Vertical(id="book-nav-center"):
                            yield Static("", id="book-page-label")
                            yield Static("", id="book-progress")
                        yield Button("Next >>", id="book-next")
                    with VerticalScroll(id="book-scroll"):
                        yield Static("", id="book-content")
            with TabPane("DEBUG", id="debug-tab"):
                with VerticalScroll():
                    yield DebugPane("", id="debug-content")
        yield StatusBar(id="status-bar")
    yield Button("", id="mute-btn", classes="mute-button")
def on_mount(self):
    """Prepare the log, theme, book viewer and timers, then queue start-up steps."""
    ensure_log()
    # Replaces the console theme through its private attribute; presumably so
    # markdown renders with MARKDOWN_THEME — confirm against the Rich version.
    self.console._theme = MARKDOWN_THEME
    self._init_book()
    # Poll the ambience file and the book file on the shared refresh interval.
    self.set_interval(REFRESH_SECS, self._check_ambience)
    self.set_interval(REFRESH_SECS, self._reload_book)
    # The remaining steps are deferred until after the next refresh.
    self.call_after_refresh(self._update_mute_button)
    self.call_after_refresh(self._apply_settings)
    # Start the game
    self.call_after_refresh(self._begin_game)
def _apply_settings(self) -> None:
    """Load and apply persisted UI settings."""
    settings = self._load_settings()
    self._book_page = settings.get("book_page", 0)
    # Sync the page count first so _reload_book() does not see "new pages"
    # and jump to the last page instead of the restored one.
    self._prev_page_count = len(self._book_pages)
    self._reload_book()
    if settings.get("music_muted") and app_ambience_player and not app_ambience_player.is_muted:
        app_ambience_player.toggle_mute()
        self._update_mute_button()
    try:
        tabs = self.query_one("#main-tabs", TabbedContent)
        tabs.active = settings.get("active_tab", "play-tab")
    except Exception:
        # A stale or invalid tab id in the settings file is ignored.
        pass
    # From here on _save_settings() is allowed to write.
    self._settings_loaded = True
def _begin_game(self):
    """Resume from the saved prompt if there is one, else ask for an opening scene.

    When resuming, the narrative shows the last book page, the non-blank
    lines of the changes file (if any) as a quote block, and the saved prompt.
    """
    saved = LAST_PROMPT_PATH.read_text().strip() if LAST_PROMPT_PATH.exists() else ""
    if not saved:
        self._call_llm()
        return

    self._last_prompt = saved
    sections = []
    pages = load_book_pages()
    if pages:
        sections.append(pages[-1])
    if CHANGES_PATH.exists():
        changes = [ln for ln in CHANGES_PATH.read_text().splitlines() if ln.strip()]
        if changes:
            quoted = "\n".join(f"> {c}" for c in changes)
            sections.append(f"> **Last turn changes:**\n{quoted}")
    sections.append(f"---\n\n{saved}")
    self._set_narrative("\n\n".join(sections))
    self._enable_input()
# ── Ambience ─────────────────────────────────────────
def _check_ambience(self):
    """Timer callback: let the ambience player, if any, check for a change."""
    player = app_ambience_player
    if player:
        player.poll()
def on_button_pressed(self, event: Button.Pressed) -> None:
    """Handle the mute button; presses of any other button are ignored here."""
    if event.button.id != "mute-btn":
        return
    if app_ambience_player:
        app_ambience_player.toggle_mute()
    self._update_mute_button()
    self._save_settings()
def _update_mute_button(self) -> None:
    """Sync the mute button's label, tooltip and ``muted`` class with the player."""
    btn = self.query_one("#mute-btn", Button)
    muted = bool(app_ambience_player and app_ambience_player.is_muted)
    # Toggle only the "muted" class. Assigning btn.classes replaced the whole
    # class set and silently dropped the "mute-button" class from compose().
    btn.set_class(muted, "muted")
    if muted:
        btn.label = "♪ muted"
        btn.tooltip = "Unmute music"
    else:
        btn.label = ""
        btn.tooltip = "Mute music"
# ── Game Loop ─────────────────────────────────────────
def _call_llm(self, player_action: str | None = None):
    """Start a turn: lock the input, reset the debug pane, spawn the worker.

    Ignored while a turn is already in progress. ``player_action`` is None
    for the opening turn.
    """
    if self._is_processing:
        return
    self._is_processing = True

    box = self.query_one("#play-input", Input)
    box.disabled = True
    box.placeholder = "DM is at work..."
    self._show_thinking()

    # Clear debug for new turn
    self.query_one("#debug-content", DebugPane).clear()
    opening = f"▶ player action: {player_action}" if player_action else "▶ starting new turn"
    self._append_debug(opening)

    # Generation runs on a daemon thread so the UI stays responsive.
    worker = threading.Thread(
        target=self._run_generation,
        args=(player_action,),
        daemon=True,
    )
    worker.start()
def _run_generation(self, player_action: str | None) -> None:
    """Worker thread: calls engine.generate_with_tools() and posts result back.

    The thought/action/debug callbacks are forwarded to the UI thread with
    call_from_thread. The roll callback is passed through directly because
    it has to block this worker until the player answers.
    """
    import traceback
    # An empty saved prompt is passed to the engine as None.
    last_prompt = self._last_prompt if self._last_prompt else None

    def on_thought(thought: str) -> None:
        self.call_from_thread(self._on_thought, thought)

    def on_action(action: str) -> None:
        self.call_from_thread(self._on_action, action)

    def on_debug(event_type: str, data: dict) -> None:
        self.call_from_thread(self._on_debug, event_type, data)

    try:
        result = self.engine.generate_with_tools(
            player_action=player_action,
            last_prompt=last_prompt,
            on_thought=on_thought,
            on_action=on_action,
            on_player_roll=self._on_player_roll,
            on_debug=on_debug,
        )
    except Exception as e:
        # Capture the traceback here; it is not available on the UI thread.
        tb = traceback.format_exc()
        self.call_from_thread(self._on_generation_error, e, tb)
        return
    self.call_from_thread(self._on_generation_done, result, player_action)
def _append_debug(self, text: str) -> None:
    """Add *text* to the debug pane, prefixed with the current HH:MM:SS time."""
    from datetime import datetime
    stamp = f"{datetime.now():%H:%M:%S}"
    self.query_one("#debug-content", DebugPane).append(f"[{stamp}] {text}")
def _show_thinking(self) -> None:
    """Reset the status line to the default DM message and start the spinner."""
    self._thinking_frame = 0
    self._dm_action = "DM is weaving the narrative"
    indicator = self.query_one("#play-status", Static)
    indicator.add_class("processing")
    first_frame = self._spinner_frames[0]
    indicator.update(f"{first_frame} {self._dm_action}")
    # The spinner advances every half second until _hide_thinking() stops it.
    self._thinking_timer_handle = self.set_interval(0.5, self._tick_thinking)
def _hide_thinking(self) -> None:
    """Stop the spinner timer, if running, and blank the status line."""
    timer = self._thinking_timer_handle
    if timer:
        timer.stop()
        self._thinking_timer_handle = None
    indicator = self.query_one("#play-status", Static)
    indicator.remove_class("processing")
    indicator.update("")
def _on_thought(self, thought: str) -> None:
    """Display a thought from the DM in the status bar.

    Thoughts longer than 60 characters are cut to 60 and marked with an
    ellipsis. The shortened text also goes to the debug pane.
    """
    limit = 60
    # The truncation branch used to append an empty string, so a cut-off
    # thought was indistinguishable from a complete one.
    display = f"{thought[:limit]}…" if len(thought) > limit else thought
    self._dm_action = display
    self._thinking_frame = 0
    status = self.query_one("#play-status", Static)
    status.add_class("processing")
    spinner = self._spinner_frames[0]
    status.update(f"{spinner} {display}")
    self._append_debug(f"{display}")
def _on_action(self, action: str) -> None:
    """Show a DM action (tool call) on the status line and in the debug pane."""
    self._thinking_frame = 0
    self._dm_action = action
    indicator = self.query_one("#play-status", Static)
    indicator.add_class("processing")
    indicator.update(f"{self._spinner_frames[0]} {action}")
    self._append_debug(action)
def _on_debug(self, event_type: str, data: dict) -> None:
    """Structured debug entry: visible description + technical detail.

    Dispatches on ``event_type`` ("phase", "phase_done", "tool_call",
    "tool_result", "parse_error", "llm_error") and, for "phase", on
    ``data["status"]``. Unknown event types and statuses are ignored.
    """
    if event_type == "phase":
        p = data.get("phase", 0)
        status = data.get("status", "")
        if status == "start":
            name = data.get("name", "")
            dice = data.get("dice")
            outer = data.get("outer_attempt")
            # Optional suffixes, only shown when the engine supplied them.
            d = f" dice={dice}" if dice else ""
            o = f" [attempt {outer}/3]" if outer else ""
            self._append_debug(f"▸ Phase {p}: {name}{o} {d}")
        elif status == "done":
            # Each phase reports a different payload on completion.
            if p == 1:
                self._append_debug(f" ✔ prose: {data.get('chars', 0)} chars")
            elif p == 2:
                self._append_debug(f" ✔ summary: {data.get('summary', '')}")
            elif p == 3:
                n = data.get("applied", 0)
                self._append_debug(f" ✔ extract: {n} state changes applied")
        elif status == "empty":
            self._append_debug(f" ⚠ phase {p} attempt {data.get('attempt', '?')} empty — retry")
        elif status == "fallback":
            self._append_debug(f" ⚠ phase {p} used fallback: {data.get('summary', '')}")
        elif status == "tools_found":
            tools = data.get("tools", [])
            fin = data.get("has_finalize", False)
            t = ", ".join(tools) if tools else "none"
            self._append_debug(f" 🔧 tools found: {t}" + (" + finalize_turn" if fin else ""))
        elif status == "errors":
            errs = data.get("errors", [])
            for e in errs:
                self._append_debug(f"{e}")
            self._append_debug(f" ⟳ retry (attempt {data.get('attempt', '?')})")
        elif status == "exhausted":
            errs = data.get("errors", [])
            self._append_debug(f" ✖ Phase 3 exhausted all retries — state changes may be missing!")
            for e in errs:
                self._append_debug(f" {e}")
        elif status == "retry_after_phase3_failure":
            self._append_debug(f" ⟳ Phase 3 failed — retrying from Phase 1 (attempt {data.get('outer_attempt', '?')}/3)")
        elif status == "validation_failed":
            self._append_debug(f" ✖ narrative rejected: {data.get('reason', '?')} (attempt {data.get('outer_attempt', '?')}/3)")
    elif event_type == "phase_done":
        self._append_debug(f" ✔ turn complete — book_log: {data.get('book_log_chars', 0)} chars")
        # The following details are optional and only logged when present.
        if data.get("log_entry"):
            self._append_debug(f" log: {data['log_entry']}")
        if data.get("ambience"):
            self._append_debug(f" ambience: {data['ambience']}")
        if data.get("extract_errors"):
            self._append_debug(f" extract errors: {data['extract_errors']}")
    elif event_type == "tool_call":
        tool = data.get("tool", "?")
        args = data.get("args", {})
        self._append_debug(f" 🔧 {tool}({json.dumps(args)})")
    elif event_type == "tool_result":
        result = data.get("result", "")
        # Single-line preview of at most the first 80 characters.
        # NOTE(review): both branches of the suffix are empty strings —
        # presumably a truncation marker was intended; confirm.
        preview = result[:80].replace("\n", " ").strip() + ("" if len(result) > 80 else "")
        self._append_debug(f"{preview}")
    elif event_type == "parse_error":
        self._append_debug(f" ⚠ bad tool block: {data.get('content', '')}")
    elif event_type == "llm_error":
        label = data.get("label", "")
        err = data.get("error", "")
        self._append_debug(f" ✖ LLM error [{label}]: {err}")
def _on_player_roll(self, dice: str, reason: str) -> str:
    """Called from worker thread. Shows roll popup, blocks until player responds.

    Returns the value the roll modal was dismissed with, or "0" when that
    value is empty or None.
    """
    self.call_from_thread(self._append_debug, f"🎲 asks player to roll {dice} ({reason})")
    self.call_from_thread(self._show_roll_modal, dice, reason)
    # Block this worker until the modal's dismiss callback sets the event.
    self._roll_event.wait()
    # Re-arm the event and consume the result so the next roll starts clean.
    self._roll_event.clear()
    result = self._roll_result
    self._roll_result = None
    return result or "0"
def _show_roll_modal(self, dice: str, reason: str) -> None:
    """Push the RollModal screen (runs on main thread)."""
    # Reset any leftover state from a previous roll before showing the dialog.
    self._roll_event.clear()
    self._roll_result = None

    def on_dismiss(value: str) -> None:
        # Store the answer first, then wake the worker waiting in _on_player_roll.
        self._roll_result = value
        self._roll_event.set()

    self.push_screen(RollModal(dice, reason), on_dismiss)
def _tick_thinking(self) -> None:
    """Timer callback: advance the spinner one frame while a turn is running."""
    if not self._is_processing:
        return
    frames = self._spinner_frames
    self._thinking_frame = (self._thinking_frame + 1) % len(frames)
    glyph = frames[self._thinking_frame]
    self.query_one("#play-status", Static).update(f"{glyph} {self._dm_action}")
def _on_generation_done(
    self, result: TurnResult, player_action: str | None
) -> None:
    """Handle the completed turn on the main thread.

    On error the message is shown and nothing is logged, archived or
    applied. Otherwise the turn is logged, archived, applied to state,
    displayed, and its prompt is persisted for the next start.
    ``player_action`` is accepted but not used here.
    """
    self._is_processing = False
    self._hide_thinking()
    if result.error:
        self._show_error(result.error, result.debug_info)
        self._append_debug(f"✖ error: {result.error}")
        return
    # Log only after successful finalize — failed turns produce no side effects
    from datetime import datetime
    ts = datetime.now().strftime("%H:%M")
    if result.log_entry:
        self.engine.append_log(f"- **{ts}** — {result.log_entry}")
    elif result.book_log:
        # No explicit log entry: fall back to the first 80 characters of the
        # first line of the book text.
        first_line = result.book_log.strip().split("\n")[0][:80]
        self.engine.append_log(f"- **Turn** — {first_line}")
    # Archive the turn's book log
    if result.book_log:
        self.engine.archive_turn(result.book_log)
    # Apply state changes
    self.engine.apply_state(result)
    # Display the next user prompt
    self._display_scene(result)
    # Persist the prompt so the game resumes here on restart
    if result.user_prompt:
        LAST_PROMPT_PATH.write_text(result.user_prompt.strip())
    # Store for next turn
    self._last_prompt = result.user_prompt
    self._last_result = result
    self._append_debug("✔ turn complete")
def _on_generation_error(
self, error: Exception, traceback_str: str
) -> None:
"""Handle an unhandled exception from the worker thread."""
import traceback
self._is_processing = False
self._hide_thinking()
err_msg = f"{type(error).__name__}: {error}"
self._append_debug(f"✖ UNHANDLED EXCEPTION: {err_msg}")
for line in traceback_str.rstrip().split("\n")[-10:]:
self._append_debug(f" {line}")
self._show_error(err_msg, traceback_str)
def _display_scene(self, result: TurnResult) -> None:
"""Update the UI with the last story entry followed by the DM prompt."""
parts = []
if result.book_log:
parts.append(result.book_log)
if result.changes:
changes_text = "\n".join(f"> {c}" for c in result.changes)
parts.append(f"> **Changes:**\n{changes_text}")
if result.user_prompt:
parts.append(f"---\n\n{result.user_prompt}")
self._set_narrative("\n\n".join(parts) if parts else "")
self._enable_input()
def _enable_input(self) -> None:
    """Unlock, clear and focus the action input."""
    box = self.query_one("#play-input", Input)
    box.disabled = False
    box.placeholder = "Type your action and press Enter..."
    box.value = ""
    box.focus()
def _set_narrative(self, text: str) -> None:
    """Render *text* as markdown in the narrative pane and scroll it to the top."""
    self.query_one("#play-narrative", Static).update(RichMarkdown(text))
    self.query_one("#play-scroll", VerticalScroll).scroll_home(animate=False)
def _show_error(self, error: str, debug_info: str = "") -> None:
text = f"**Error:** {error}\n\n"
if debug_info:
text += f"**Debug Info:**\n\n{debug_info}\n\n"
text += "Check your session/config.json and ensure your LLM provider is running."
self._set_narrative(text)
self._enable_input()
# ── Input handling ────────────────────────────────────
def on_input_submitted(self, event: Input.Submitted) -> None:
    """Player pressed Enter: forward non-blank input unless a turn is running.

    The event is always stopped, whether or not the input is acted on.
    """
    action = event.value.strip()
    event.stop()
    if action and not self._is_processing:
        self._handle_player_action(action)
def _handle_player_action(self, action: str) -> None:
"""Handle a player action typed in the input."""
self._call_llm(player_action=action)
def _guess_time_of_day(self) -> str:
"""Simple time-of-day label based on hour."""
from datetime import datetime
h = datetime.now().hour
if h < 6:
return "Night"
elif h < 12:
return "Morning"
elif h < 14:
return "Midday"
elif h < 18:
return "Afternoon"
elif h < 21:
return "Evening"
else:
return "Night"
# ── Book viewer ───────────────────────────────────────
def _init_book(self):
    """Load the book pages and render the current page (called from on_mount)."""
    self._reload_book()
    # _reload_book() already renders; this second render is redundant but harmless.
    self._render_book_page()
def _reload_book(self):
    """Re-read the book; jump to the last page when pages were added, then render."""
    pages = load_book_pages()
    self._book_pages = pages
    count = len(pages)
    if count > self._prev_page_count:
        self._book_page = count - 1
    self._prev_page_count = count
    # Keep the page index inside the valid range.
    self._book_page = max(0, min(self._book_page, count - 1))
    self._render_book_page()
def _render_book_page(self):
    """Render the current book page, its label, progress bar and nav buttons.

    Does nothing when there are no pages.
    """
    if not self._book_pages:
        return
    total = len(self._book_pages)
    current = self._book_page
    self.query_one("#book-content").update(
        RichMarkdown(self._book_pages[current])
    )
    self.query_one("#book-page-label").update(
        f"Page {current + 1} of {total}"
    )
    # 20-cell bar. Both glyphs used to be empty strings, so the bar always
    # rendered as "[]" regardless of the page.
    width = 20
    fill = round((current + 1) / total * width)
    bar = "█" * fill + "░" * (width - fill)
    self.query_one("#book-progress").update(f"[{bar}]")
    # Disable navigation at either end of the book.
    self.query_one("#book-prev").disabled = (current == 0)
    self.query_one("#book-next").disabled = (current == total - 1)
def action_prev_page(self):
    """Go back one book page, if not already on the first."""
    if self._book_page <= 0:
        return
    self._book_page -= 1
    self._render_book_page()
    self.query_one("#book-scroll").scroll_home(animate=False)
def action_next_page(self):
    """Advance one book page, if not already on the last."""
    last_index = len(self._book_pages) - 1
    if self._book_page >= last_index:
        return
    self._book_page += 1
    self._render_book_page()
    self.query_one("#book-scroll").scroll_home(animate=False)
@on(Button.Pressed, "#book-prev")
def on_book_prev(self):
    """Prev button: turn back a page and persist the new page index."""
    self.action_prev_page()
    self._save_settings()
@on(Button.Pressed, "#book-next")
def on_book_next(self):
    """Next button: turn forward a page and persist the new page index."""
    self.action_next_page()
    self._save_settings()
def on_tabbed_content_tab_activated(self, event: TabbedContent.TabActivated) -> None:
    """Persist settings whenever a tab is activated; the event itself is unused."""
    self._save_settings()
def main():
    """Parse the command line and run the TUI."""
    import argparse
    cli = argparse.ArgumentParser(description="The Chaos TUI")
    cli.add_argument('--no-music', action='store_true', help='Disable ambience music')
    opts = cli.parse_args()
    ChaosTUI(no_music=opts.no_music).run()


if __name__ == '__main__':
    main()