1074 lines
35 KiB
Python
Executable File
1074 lines
35 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
run.py — The Chaos TTRPG Session Client (Game Mode)
|
|
|
|
Owns the TUI and game loop. Layout:
|
|
PLAY (narrative + choices + input) | CHAR | LOG | BOOK tabs
|
|
"""
|
|
from __future__ import annotations

import json
import os
import random
import sys
import threading
from datetime import date, datetime
from pathlib import Path

from rich.markdown import Markdown as RichMarkdown
from rich.text import Text
from rich.theme import Theme
from textual import on
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.screen import Screen
from textual.widgets import Button, Input, Static, TabbedContent, TabPane

# ── Game engine ─────────────────────────────────────────
from engine import GameEngine, GenerationResult, TurnResult
|
|
|
|
# ── Optional miniaudio ────────────────────────────────────
|
|
# miniaudio is an optional dependency: when the import fails the module still
# loads, HAS_AUDIO is False, and a one-line notice is written to stderr.
try:
    import miniaudio
    HAS_AUDIO = True
except ImportError:
    HAS_AUDIO = False
    print("Note: miniaudio not installed — no ambience music.", file=sys.stderr)
|
|
|
|
|
|
# ── Paths ────────────────────────────────────────────────
|
|
# Directory two levels above this file; every session file lives under BASE/session.
BASE = Path(__file__).resolve().parent.parent
SESSION = BASE / 'session'
LOG_DIR = SESSION / 'log'
# Character sheet: parsed by status_summary() and rendered by CharPane.
CHAR_PATH = SESSION / 'character.md'
WORLD_PATH = SESSION / 'world.md'
# Journal: read_todo() extracts its TODO section.
JOURNAL_PATH = SESSION / 'journal.md'
# Holds the name of the current ambience; AmbiencePlayer.poll() watches its mtime.
AMBIENCE_PATH = SESSION / 'ambience.md'
# Markdown table mapping ambience names to audio files (see parse_ambience_options).
AMBIENCE_OPTIONS_PATH = SESSION / 'ambience_options.md'
# Story text, split into pages by load_book_pages().
BOOK_PATH = SESSION / 'book.md'
# Last prompt shown to the player, used to resume after a restart.
LAST_PROMPT_PATH = SESSION / 'last_prompt.md'
AUDIO_DIR = SESSION / 'audio'
# Computed once at import time, so TODAY and LOG_PATH do not roll over if the
# process keeps running past midnight.
TODAY = date.today().isoformat()
LOG_PATH = LOG_DIR / f'{TODAY}.md'

# Interval, in seconds, used for the auto-refreshing panes and the
# ambience/book polling timers.
REFRESH_SECS = 2
|
|
|
|
# Rich style overrides for the "markdown.*" style names used when rendering
# Markdown renderables (headings, code, quotes, links, list items, emphasis).
MARKDOWN_THEME = Theme({
    "markdown.h1": "bold #ff6b6b on #2a0000",
    "markdown.h2": "bold #ffd93d",
    "markdown.h3": "bold italic #6bcbff",
    "markdown.h4": "bold #95e1d3",
    "markdown.code": "bold #00d2d3 on #002222",
    "markdown.code_block": "on #1e1e2e",
    "markdown.block_quote": "dim italic #8395a7",
    "markdown.link": "underline #48dbfb",
    "markdown.item": "#ff9f43",
    "markdown.em": "italic #ff9ff3",
    "markdown.strong": "bold #feca57",
    "markdown.horizontal_rule": "dim #555555",
})
|
|
|
|
|
|
# ── Helpers (file reading, status, book, ambience) ───────
|
|
def ensure_log():
    """Make sure today's log file exists, then seed it if it is still empty.

    Creates the log directory (and parents) when missing, writes a
    ``# Session Log — <date>`` header into a brand-new file, and finally
    hands over to ``_populate_if_empty()``.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    already_there = LOG_PATH.exists()
    if not already_there:
        header = f"# Session Log — {TODAY}\n\n"
        LOG_PATH.write_text(header)
    _populate_if_empty()
|
|
|
|
def _populate_if_empty():
    """Seed today's log from the most recent earlier log when today's is empty.

    Today's log counts as empty when its stripped content has two lines or
    fewer (i.e. little more than the header). In that case the newest other
    log in ``LOG_DIR`` is copied over with its heading replaced by today's
    ``# Session Log — <date>`` heading. Nothing happens when there is no
    earlier log or when that log has no lines at all.
    """
    content = LOG_PATH.read_text().strip()
    if len(content.splitlines()) > 2:
        return

    prev = _previous_log()
    if prev is None:
        return

    lines = prev.read_text().splitlines()
    if not lines:
        # An empty previous log has nothing to carry over; indexing lines[0]
        # here would raise IndexError, so keep today's header-only file.
        return

    header = f"# Session Log — {TODAY}"
    if lines[0].lstrip().startswith('#'):
        lines[0] = header
    else:
        # The previous file does not start with a heading: keep its first
        # line as content instead of overwriting it.
        lines.insert(0, header)
    LOG_PATH.write_text('\n'.join(lines) + '\n')
|
|
|
|
def _previous_log():
    """Return the last ``*.md`` file in ``LOG_DIR`` (sorted by path) that is not today's log.

    Returns ``None`` when no other log file exists.
    """
    today_name = LOG_PATH.name
    earlier = [entry for entry in sorted(LOG_DIR.glob('*.md')) if entry.name != today_name]
    return earlier[-1] if earlier else None
|
|
|
|
def read_todo():
    """Return the entries of the journal's TODO section as a list of strings.

    The section starts at the first line whose text (ignoring leading ``#``
    characters and whitespace) begins with ``TODO`` and ends at the next line
    that starts with ``#``. Blank lines are skipped and leading ``-``/space
    characters are stripped from each entry. A one-element placeholder list
    is returned when the journal is missing or the section has no entries.
    """
    if not JOURNAL_PATH.exists():
        return ["—— No journal yet ——"]

    items = []
    collecting = False
    for raw in JOURNAL_PATH.read_text().splitlines():
        text = raw.strip()
        if text.lstrip('#').strip().startswith('TODO'):
            collecting = True
            continue
        if not collecting:
            continue
        if text.startswith('#'):
            # Next heading closes the TODO section.
            break
        if text:
            items.append(text.lstrip('- '))
    return items or ["—— All done! ——"]
|
|
|
|
def read_log_tail(n=200):
    """Return the last ``n`` entries of today's log.

    Blank lines and lines starting with ``#`` are not counted as entries.

    Args:
        n: Maximum number of entries to return. Zero or a negative number
            yields an empty list.

    Returns:
        A list of at most ``n`` lines, oldest first; empty when the log file
        does not exist.
    """
    if n <= 0:
        # lines[-0:] would be the whole list, not an empty tail.
        return []
    if not LOG_PATH.exists():
        return []
    entries = [
        line
        for line in LOG_PATH.read_text().splitlines()
        if line.strip() and not line.startswith('#')
    ]
    return entries[-n:]
|
|
|
|
def status_summary():
    """Return a short ``"<name> ❤ <health>"`` summary from the character sheet.

    Looks for lines starting with ``**Name:**``, ``**Current Health:**`` and
    ``**Max Health:**``. Health is the last non-empty current-health value,
    falling back to the first non-empty max-health value. Fields that are
    missing or empty are shown as ``?``.

    Returns:
        The summary string, or ``"no character"`` when the sheet is missing.
    """
    if not CHAR_PATH.exists():
        return "no character"

    def field_value(line):
        # Strip whitespace and markdown emphasis characters in one pass so the
        # space left behind after the closing "**" does not survive.
        return line.split(':', 1)[1].strip(' \t_*')

    name = ""
    current = ""
    maximum = ""
    for line in CHAR_PATH.read_text().splitlines():
        if line.startswith('**Name:**'):
            name = field_value(line)
        elif line.startswith('**Current Health:**'):
            current = field_value(line) or current
        elif line.startswith('**Max Health:**'):
            maximum = maximum or field_value(line)

    health = current or maximum or "?"
    return f"{name or '?'} ❤ {health}"
|
|
|
|
def log_count():
    """Return the number of entries in today's log.

    Asks ``read_log_tail`` for an effectively unbounded tail; relying on its
    default of 200 would silently cap the reported count.
    """
    return len(read_log_tail(sys.maxsize))
|
|
|
|
def load_book_pages():
    """Split the book file into pages, one per ``## `` section.

    Text before the first ``\\n## `` separator is the first page; each later
    page gets its ``## `` prefix restored. A single placeholder page is
    returned when the book file is missing or blank.
    """
    if not BOOK_PATH.exists():
        return ["*The story has not begun.*"]
    text = BOOK_PATH.read_text().strip()
    if not text:
        return ["*The story has not begun.*"]

    first, *sections = text.split('\n## ')
    pages = [first]
    pages.extend('## ' + section for section in sections)
    return pages or ["*The story has not begun.*"]
|
|
|
|
def parse_ambience_options():
    """Parse the ambience options file into ``{name: [audio paths]}``.

    Only lines that both start and end with ``|`` are treated as table rows.
    Within each run of such rows the first row with at least two non-empty
    cells is skipped as the header, and rows made up solely of ``-``, ``:``,
    ``|`` and spaces are skipped as separators. For the remaining rows the
    first cell (lower-cased) is the ambience name and the second cell is a
    comma-separated list of file names resolved against ``AUDIO_DIR``.

    Returns:
        The mapping, or an empty dict when the options file does not exist.
    """
    if not AMBIENCE_OPTIONS_PATH.exists():
        return {}

    table = {}
    header_seen = False
    separator_chars = set('-:| ')
    for raw in AMBIENCE_OPTIONS_PATH.read_text().splitlines():
        row = raw.strip()
        if not (row.startswith('|') and row.endswith('|')):
            # Any non-table line ends the current table.
            header_seen = False
            continue

        cells = [cell for cell in (piece.strip() for piece in row.split('|')) if cell]
        if len(cells) < 2:
            continue
        if not header_seen:
            header_seen = True
            continue
        if set(row) <= separator_chars:
            continue

        file_names = [part.strip() for part in cells[1].split(',') if part.strip()]
        table[cells[0].lower()] = [AUDIO_DIR / file_name for file_name in file_names]
    return table
|
|
|
|
|
|
# ── Ambience subsystem ───────────────────────────────────
|
|
class AmbiencePlayer:
    """Plays a randomly chosen track for the ambience named in ``AMBIENCE_PATH``.

    ``poll()`` watches the ambience file's modification time; whenever it
    changes, the file content (stripped, lower-cased) becomes the current
    ambience and playback is restarted from the tracks configured for that
    name. ``'silence'`` and unknown names stop playback.
    """

    def __init__(self):
        self.current_ambience = 'silence'
        self._last_mtime = 0
        self._options = {}
        self._device = None
        self._stream = None
        self._muted = False
        self.load_options()

    @property
    def available(self):
        """Whether the miniaudio import succeeded."""
        return HAS_AUDIO

    @property
    def ambience_name(self):
        """Name of the current ambience (``None`` after a failed playback start)."""
        return self.current_ambience

    @property
    def is_muted(self):
        """Whether playback is currently muted."""
        return self._muted

    def toggle_mute(self):
        """Flip the mute flag: stop playback on mute, replay the current ambience on unmute."""
        self._muted = not self._muted
        if self._muted:
            self._stop()
        else:
            self._load_current()

    def load_options(self):
        """(Re)load the ambience-name → tracks mapping from the options file."""
        self._options = parse_ambience_options()

    def _stop(self):
        """Close the playback device, if any, and drop the device and stream."""
        if self._device:
            try:
                self._device.close()
            except Exception:
                # Best effort: a device that fails to close is dropped anyway.
                pass
            self._device = None
        self._stream = None

    def poll(self):
        """Check the ambience file and restart playback when it has changed.

        Does nothing without audio support, when the file cannot be stat'ed
        or read, or when its modification time is unchanged since the last
        poll.
        """
        if not HAS_AUDIO:
            return
        try:
            mtime = os.path.getmtime(AMBIENCE_PATH)
        except OSError:
            return
        if mtime == self._last_mtime:
            return
        self._last_mtime = mtime
        try:
            name = AMBIENCE_PATH.read_text().strip().lower()
        except OSError:
            return
        # Save the name even when muted — will play on unmute
        self.current_ambience = name
        self._stop()
        if not self._muted and name != 'silence' and name in self._options:
            self._play_current()

    def _switch_to(self, name):
        """Switch to ``name`` unless it is already the current ambience."""
        if name == self.current_ambience:
            return
        self.current_ambience = name
        self._stop()
        if self._muted or name == 'silence' or name not in self._options:
            return
        self._play_current()

    def _play_current(self):
        """Start a random existing track of the current ambience.

        On any failure while opening the stream or device, whatever was
        opened is released again and ``current_ambience`` is set to ``None``.
        """
        tracks = self._options.get(self.current_ambience, [])
        valid = [t for t in tracks if t.exists()]
        if not valid or not HAS_AUDIO:
            return
        track = random.choice(valid)
        # Never leave an earlier device running underneath the new one.
        self._stop()
        try:
            self._stream = miniaudio.stream_file(str(track))
            self._device = miniaudio.PlaybackDevice()
            self._device.start(self._stream)
        except Exception:
            # Release a half-initialised device/stream instead of keeping it
            # attached to the player.
            self._stop()
            self.current_ambience = None

    def _load_current(self):
        """Called on unmute — replay current ambience if not silence."""
        if self.current_ambience and self.current_ambience != 'silence':
            self._play_current()
|
|
|
|
|
|
# Module-level reference to the active AmbiencePlayer. ChaosTUI.__init__
# assigns it (or None when music is disabled); StatusBar and the app's
# ambience/mute handlers read it.
app_ambience_player = None
|
|
|
|
|
|
# ── Roll Modal ───────────────────────────────────────────
|
|
class RollModal(Screen):
    """Overlay asking the player to roll physical dice and enter the result.

    The screen is dismissed with the stripped text the player entered; blank
    input is ignored and keeps the dialog open.
    """

    CSS = """
    RollModal {
        align: center middle;
        background: rgba(0, 0, 0, 0.75);
    }
    #roll-dialog {
        width: 44;
        height: auto;
        padding: 2 3;
        background: #2a2a3a;
        border: thick #e0ad4c;
    }
    #roll-title {
        text-style: bold;
        color: #ffd93d;
        text-align: center;
        height: 3;
    }
    #roll-reason {
        color: #c0b090;
        text-align: center;
        height: 3;
    }
    #roll-input {
        margin: 1 0;
    }
    #roll-submit {
        width: 100%;
    }
    #roll-hint {
        color: #888888;
        text-align: center;
        height: 1;
    }
    """

    def __init__(self, dice: str, reason: str) -> None:
        """Store the dice expression and the reason shown in the dialog."""
        super().__init__()
        self.dice = dice
        self.reason = reason

    def compose(self) -> ComposeResult:
        with Vertical(id="roll-dialog"):
            # dice/reason are passed in by the caller and may contain square
            # brackets; Text renders them literally instead of parsing them as
            # console markup. The title stays bold through the #roll-title rule.
            yield Static(Text(f"🎲 ROLL {self.dice}"), id="roll-title")
            yield Static(Text(f"Reason: {self.reason}"), id="roll-reason")
            yield Input(
                placeholder="Enter the number you rolled...",
                id="roll-input",
            )
            yield Button("Submit", id="roll-submit", variant="primary")
            yield Static("(or press Enter)", id="roll-hint")

    def on_input_submitted(self, event: Input.Submitted) -> None:
        # Handled here; keep it from bubbling up to the app's input handler.
        event.stop()
        self._submit(event.value)

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "roll-submit":
            event.stop()
            inp = self.query_one("#roll-input", Input)
            self._submit(inp.value)

    def _submit(self, value: str) -> None:
        """Dismiss the modal with ``value`` stripped, unless it is blank."""
        val = value.strip()
        if val:
            self.dismiss(val)
|
|
|
|
|
|
# ── Auto-refreshing panels ───────────────────────────────
|
|
class AutoStatic(Static):
    """Static widget that reloads its content on a fixed interval.

    Subclasses implement ``load()``; it is called once when the widget is
    mounted and then every ``REFRESH_SECS`` seconds.
    """

    def load(self):
        """Refresh the widget's content. Must be overridden."""
        raise NotImplementedError

    def on_mount(self):
        refresh = self.load
        refresh()
        self.set_interval(REFRESH_SECS, refresh)
|
|
|
|
|
|
class TodoPane(AutoStatic):
    """Shows the journal's TODO entries, one checkbox line per entry."""

    def load(self):
        items = read_todo()
        # Journal text is shown literally: wrapping it in Text keeps square
        # brackets in an entry from being parsed as console markup.
        self.update(Text("\n".join(f" ☐ {i}" for i in items)))
|
|
|
|
|
|
class TranscriptPane(AutoStatic):
    """Shows the last 80 log entries followed by a "NOW" marker."""

    def load(self):
        lines = read_log_tail()
        display = "\n".join(lines[-80:])
        if lines:
            display += "\n\n>>--- NOW --->"
        # Log entries contain free-form player and DM text; render it
        # literally so a stray "[/tag]" cannot raise a markup error.
        self.update(Text(display))
        self.call_after_refresh(self._scroll_bottom)

    def _scroll_bottom(self):
        """Scroll the parent container to its end, if it supports scrolling."""
        if self.parent and hasattr(self.parent, 'scroll_end'):
            self.parent.scroll_end(animate=False)
|
|
|
|
|
|
class DebugPane(Static):
    """Scrolling log of LLM thoughts, tool calls, and results for this turn.

    Keeps at most ``MAX_LINES`` lines in memory and displays the newest 100.
    """

    MAX_LINES = 200

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._lines: list[str] = []

    def append(self, text: str) -> None:
        """Add one line, trim the buffer, redraw, and scroll to the bottom."""
        self._lines.append(text)
        overflow = len(self._lines) - self.MAX_LINES
        if overflow > 0:
            del self._lines[:overflow]
        # Lines carry raw LLM output and JSON; render them literally so text
        # such as "[/INST]" is not parsed as (invalid) console markup.
        self.update(Text("\n".join(self._lines[-100:])))
        self.call_after_refresh(self._scroll_bottom)

    def _scroll_bottom(self):
        """Scroll the parent container to its end, if it supports scrolling."""
        if self.parent and hasattr(self.parent, 'scroll_end'):
            self.parent.scroll_end(animate=False)

    def clear(self) -> None:
        """Drop all buffered lines and blank the pane."""
        self._lines.clear()
        self.update("")
|
|
|
|
|
|
class CharPane(AutoStatic):
    """Renders the character sheet file as Markdown."""

    def load(self):
        if CHAR_PATH.exists():
            sheet = CHAR_PATH.read_text().strip()
            self.update(RichMarkdown(sheet))
        else:
            self.update("*No character sheet*")
|
|
|
|
|
|
class StatusBar(AutoStatic):
    """One-line summary: character, log entry count, todo count, date, music."""

    def load(self):
        char = status_summary()
        count = log_count()

        todo_items = read_todo()
        # read_todo() returns a single "—— … ——" placeholder row when the
        # journal is missing or the TODO section is empty; that row is not a
        # real todo and must not be counted.
        is_placeholder = len(todo_items) == 1 and todo_items[0].startswith('——')
        todo = 0 if is_placeholder else len(todo_items)

        music = ""
        if not HAS_AUDIO:
            music = " │ ♫ (install miniaudio)"
        elif app_ambience_player:
            # ambience_name is None after a failed playback start; nothing is
            # playing then, so show it as silence rather than "None".
            name = app_ambience_player.ambience_name or 'silence'
            music = f" │ ♫ {name}"
        # Text renders the character name literally (no markup parsing).
        self.update(Text(f"{char} │ {count} entries │ {todo} todo │ {TODAY}{music}"))
|
|
|
|
|
|
# ── The App ──────────────────────────────────────────────
|
|
class ChaosTUI(App):
    """Textual application that owns the TUI and drives the game loop.

    Each turn runs ``GameEngine.generate_with_tools`` on a daemon worker
    thread. Progress callbacks and the final result are marshalled back onto
    the UI thread with ``call_from_thread``.
    """

    TITLE = "The Chaos"
    CSS = """
    Screen {
        background: #121212;
    }
    #banner {
        dock: top;
        height: 1;
        background: #2a2a2a;
        color: #e0ad4c;
        text-align: center;
    }
    #main {
        height: 100%;
        background: #111111;
    }
    #todo-header {
        background: #3a2a1a;
        color: #e0b060;
        text-style: bold;
        padding: 0 1;
        height: 1;
    }
    #todo-content {
        background: #1a1510;
        color: #d0b080;
        padding: 0 1;
        height: 5;
        max-height: 5;
        overflow-y: auto;
        scrollbar-size-vertical: 2;
    }
    #main-tabs {
        height: 1fr;
    }
    TabbedContent {
        background: #1a1a2a;
    }
    VerticalScroll {
        overflow-y: auto;
        scrollbar-size-vertical: 2;
        scrollbar-color: #555555;
        scrollbar-color-hover: #777777;
        scrollbar-color-active: #999999;
    }
    #char-content {
        background: #1e1e2a;
        color: #c0c0c0;
        padding: 0 1;
    }
    #transcript {
        background: #1a2a1a;
        color: #c8c8c8;
        padding: 0 1;
    }
    #debug-content {
        background: #1a1a1a;
        color: #88b0a0;
        padding: 0 1;
    }
    #debug-content .dm-thought {
        color: #c0a060;
    }
    #debug-content .dm-tool {
        color: #60a0c0;
    }
    #debug-content .dm-result {
        color: #80a080;
    }

    /* Play tab */
    #play-narrative {
        background: #161616;
        color: #d8d8d8;
        padding: 1 2;
        height: auto;
    }
    #play-status {
        background: #1a1a2a;
        color: #e0b060;
        padding: 0 2;
        height: 1;
        text-style: bold italic;
        text-align: center;
    }
    #play-status.processing {
        background: #2a1a0a;
        color: #ffd93d;
    }
    #play-input {
        height: 3;
        background: #222222;
        color: #e0d0c0;
        border: solid #555555;
        padding: 0 1;
    }
    #play-input:focus {
        border: solid #e0ad4c;
    }
    #play-input:disabled {
        background: #1a1a1a;
        color: #666666;
        border: solid #333333;
    }

    /* Book tab */
    #book-header {
        background: #2d2d2d;
        color: #e0c080;
        text-style: bold;
        padding: 0 1;
        height: 1;
    }
    #book-nav {
        height: 3;
        background: #222222;
        align: center middle;
    }
    #book-nav Button {
        width: 10;
        margin: 0 1;
    }
    #book-nav Button:disabled {
        color: #444444;
    }
    #book-nav Button:hover {
        text-style: bold;
    }
    #book-nav-center {
        height: 3;
        width: 1fr;
    }
    #book-page-label {
        height: 1;
        color: #c0b090;
        text-style: bold;
        padding: 0 2;
        text-align: center;
    }
    #book-progress {
        height: 1;
        background: #1a1a1a;
        color: #e0b060;
        padding: 0 2;
        text-align: center;
    }
    #book-scroll {
        height: 1fr;
    }
    #book-content {
        background: #161616;
        color: #d8d8d8;
        padding: 0 2;
    }

    #status-bar {
        background: #222222;
        color: #888888;
        padding: 0 1;
        height: 1;
        text-style: italic;
    }
    #mute-btn {
        dock: bottom;
        width: 6;
        height: 1;
        background: #2a2a2a;
        color: #888888;
        border: none;
        padding: 0 1;
        min-width: 6;
        margin: 0;
    }
    #mute-btn:hover {
        background: #3a3a3a;
        color: #cccccc;
    }
    #mute-btn.muted {
        color: #ff6b6b;
        text-style: bold;
    }
    """

    BINDINGS = [
        ("ctrl+c", "quit", "Quit"),
        ("escape", "quit", "Quit"),
    ]

    def __init__(self, *args, no_music=False, **kwargs):
        """Create the app.

        Args:
            no_music: When true, no ``AmbiencePlayer`` is created and the
                module-level ``app_ambience_player`` is set to ``None``.
        """
        super().__init__(*args, **kwargs)
        global app_ambience_player
        app_ambience_player = None if no_music else AmbiencePlayer()

        # Game engine
        self.engine = GameEngine()

        # Game loop state
        self._last_prompt: str = ""
        self._last_result: TurnResult | None = None
        self._is_processing: bool = False

        # Thinking animation
        self._spinner_frames = ["◴", "◷", "◶", "◵"]
        self._thinking_frame = 0
        self._thinking_timer_handle = None
        self._dm_action = "DM is weaving the narrative"

        # Player roll state (thread-safe)
        self._roll_event = threading.Event()
        self._roll_result: str | None = None

        # Book viewer state
        self._book_page = 0
        self._book_pages = []
        self._prev_page_count = 0

        # Debug log
        self._debug_lines: list[str] = []

    # ── Compose ──────────────────────────────────────────
    def compose(self):
        # NOTE(review): widget nesting below was reconstructed from a copy of
        # this file whose indentation was lost — confirm against the layout.
        yield Static(f"⚔ The Chaos ╎ {TODAY}", id="banner")
        with Vertical(id="main"):
            yield Static("TODO", id="todo-header")
            yield TodoPane(id="todo-content")
            with TabbedContent(initial="play-tab", id="main-tabs"):
                with TabPane("PLAY", id="play-tab"):
                    with VerticalScroll(id="play-scroll"):
                        yield Static("*Awaiting the fates...*", id="play-narrative")
                    yield Static("", id="play-status")
                    yield Input(
                        placeholder="Type your action and press Enter...",
                        id="play-input",
                    )
                with TabPane("CHARACTER", id="char-tab"):
                    with VerticalScroll():
                        yield CharPane(id="char-content")
                with TabPane("LOG", id="log-tab"):
                    with VerticalScroll():
                        yield TranscriptPane(id="transcript")
                with TabPane("BOOK", id="book-tab"):
                    yield Static("BOOK", id="book-header")
                    with Vertical(id="book-section"):
                        with Horizontal(id="book-nav"):
                            yield Button("<< Prev", id="book-prev")
                            with Vertical(id="book-nav-center"):
                                yield Static("", id="book-page-label")
                                yield Static("", id="book-progress")
                            yield Button("Next >>", id="book-next")
                        with VerticalScroll(id="book-scroll"):
                            yield Static("", id="book-content")
                with TabPane("DEBUG", id="debug-tab"):
                    with VerticalScroll():
                        yield DebugPane("", id="debug-content")
            yield StatusBar(id="status-bar")
            yield Button("♫", id="mute-btn", classes="mute-button")

    def on_mount(self):
        ensure_log()
        # Register the markdown styles through Rich's public theme stack;
        # assigning a private `_theme` attribute on the console has no effect
        # on style lookup.
        self.console.push_theme(MARKDOWN_THEME)
        self._init_book()
        self.set_interval(REFRESH_SECS, self._check_ambience)
        self.set_interval(REFRESH_SECS, self._reload_book)
        self.call_after_refresh(self._update_mute_button)
        # Start the game
        self.call_after_refresh(self._begin_game)

    def _begin_game(self):
        """Resume from last saved prompt or generate an opening scene."""
        if LAST_PROMPT_PATH.exists():
            saved = LAST_PROMPT_PATH.read_text().strip()
            if saved:
                self._last_prompt = saved
                pages = load_book_pages()
                parts = []
                if pages:
                    parts.append(pages[-1])
                parts.append(f"---\n\n{saved}")
                self._set_narrative("\n\n".join(parts))
                self._enable_input()
                return
        self._call_llm()

    # ── Ambience ─────────────────────────────────────────
    def _check_ambience(self):
        """Timer callback: let the ambience player check its file."""
        if app_ambience_player:
            app_ambience_player.poll()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "mute-btn":
            if app_ambience_player:
                app_ambience_player.toggle_mute()
            self._update_mute_button()

    def _update_mute_button(self) -> None:
        """Sync the mute button's label, ``muted`` class and tooltip."""
        btn = self.query_one("#mute-btn", Button)
        muted = bool(app_ambience_player and app_ambience_player.is_muted)
        # Toggle only the "muted" class so the button keeps its other classes.
        btn.set_class(muted, "muted")
        if muted:
            btn.label = "♪ muted"
            btn.tooltip = "Unmute music"
        else:
            btn.label = "♫"
            btn.tooltip = "Mute music"

    # ── Game Loop ─────────────────────────────────────────
    def _call_llm(self, player_action: str | None = None):
        """Start a turn on a worker thread.

        Args:
            player_action: The text the player typed, or ``None`` when the
                turn is started without a player action (opening scene).

        Ignored while a turn is already being processed.
        """
        if self._is_processing:
            return
        self._is_processing = True

        input_widget = self.query_one("#play-input", Input)
        input_widget.disabled = True
        input_widget.placeholder = "DM is at work..."

        self._show_thinking()

        # Clear debug for new turn
        pane = self.query_one("#debug-content", DebugPane)
        pane.clear()
        if player_action:
            self._append_debug(f"▶ player action: {player_action}")
        else:
            self._append_debug("▶ starting new turn")

        # Run generation in a daemon thread so it doesn't block the UI
        t = threading.Thread(
            target=self._run_generation,
            args=(player_action,),
            daemon=True,
        )
        t.start()

    def _run_generation(self, player_action: str | None) -> None:
        """Worker thread: calls engine.generate_with_tools() and posts result back."""
        last_prompt = self._last_prompt if self._last_prompt else None

        def on_thought(thought: str) -> None:
            self.call_from_thread(self._on_thought, thought)

        def on_action(action: str) -> None:
            self.call_from_thread(self._on_action, action)

        def on_debug(event_type: str, data: dict) -> None:
            self.call_from_thread(self._on_debug, event_type, data)

        try:
            result = self.engine.generate_with_tools(
                player_action=player_action,
                last_prompt=last_prompt,
                on_thought=on_thought,
                on_action=on_action,
                on_player_roll=self._on_player_roll,
                on_debug=on_debug,
            )
        except Exception as exc:
            # Thread boundary: without this, an exception would end the
            # worker silently and leave the UI stuck in the processing state
            # with the input disabled.
            self.call_from_thread(self._on_generation_failed, exc)
            return

        self.call_from_thread(self._on_generation_done, result, player_action)

    def _on_generation_failed(self, exc: Exception) -> None:
        """Reset the UI after the worker thread raised instead of returning."""
        self._is_processing = False
        self._hide_thinking()
        message = f"{type(exc).__name__}: {exc}"
        self._show_error(message)
        self._append_debug(f"✖ error: {message}")

    def _append_debug(self, text: str) -> None:
        """Append a line to the debug pane."""
        ts = datetime.now().strftime("%H:%M:%S")
        pane = self.query_one("#debug-content", DebugPane)
        pane.append(f"[{ts}] {text}")

    def _render_status(self) -> None:
        """Draw the current spinner frame and DM action into the status line."""
        spinner = self._spinner_frames[self._thinking_frame]
        status = self.query_one("#play-status", Static)
        # Text renders the DM-supplied action literally (no markup parsing).
        status.update(Text(f"✦ {spinner} {self._dm_action} ✦"))

    def _show_thinking(self) -> None:
        """Show the thinking indicator and start the animation timer."""
        self._dm_action = "DM is weaving the narrative"
        self._thinking_frame = 0
        self.query_one("#play-status", Static).add_class("processing")
        self._render_status()
        self._thinking_timer_handle = self.set_interval(
            0.5, self._tick_thinking
        )

    def _hide_thinking(self) -> None:
        """Stop the animation and hide the thinking indicator."""
        if self._thinking_timer_handle:
            self._thinking_timer_handle.stop()
            self._thinking_timer_handle = None
        status = self.query_one("#play-status", Static)
        status.remove_class("processing")
        status.update("")

    def _on_thought(self, thought: str) -> None:
        """Display a thought from the DM in the status bar."""
        display = thought[:60] + "…" if len(thought) > 60 else thought
        self._dm_action = display
        self._thinking_frame = 0
        self.query_one("#play-status", Static).add_class("processing")
        self._render_status()
        self._append_debug(f"✦ {display}")

    def _on_action(self, action: str) -> None:
        """Display a DM action (tool call) in the status bar."""
        self._dm_action = action
        self._thinking_frame = 0
        self.query_one("#play-status", Static).add_class("processing")
        self._render_status()
        self._append_debug(action)

    def _on_debug(self, event_type: str, data: dict) -> None:
        """Structured debug entry: visible description + technical detail."""
        if event_type == "llm_response":
            text = data.get("text", "")
            if text.strip():
                preview = text[:200].replace("\n", "\\n").strip() + ("…" if len(text) > 200 else "")
                self._append_debug(f" LLM response: {preview}")
            else:
                self._append_debug(" LLM response: (empty)")
        elif event_type == "thought":
            thought = data.get("text", "")
            display = thought[:60] + "…" if len(thought) > 60 else thought
            self._append_debug(f" 💭 {display}")
        elif event_type == "tool_call":
            tool = data.get("tool", "?")
            args = data.get("args", {})
            desc = args.get("dm_status", tool)
            self._append_debug(f" 🔧 {desc}")
            # default=str keeps a non-JSON-serialisable argument from raising
            # inside a debug-only code path.
            self._append_debug(f" {tool}({json.dumps(args, default=str)})")
        elif event_type == "tool_result":
            # Coerce to str so slicing/replace also work for non-string results.
            result = str(data.get("result", ""))
            preview = result[:80].replace("\n", " ").strip() + ("…" if len(result) > 80 else "")
            self._append_debug(f" → {preview}")
        elif event_type == "validation_error":
            err_type = data.get("type", "")
            if err_type == "finalize_turn":
                self._append_debug(f" ✖ finalize_turn missing: {', '.join(data.get('errors', []))}")
            elif err_type == "mixed_get_finalize":
                tools = data.get("tools", [])
                self._append_debug(f" ✖ mixed get tools {tools} with finalize_turn — rejected")
            else:
                tool = data.get("tool", "?")
                self._append_debug(f" ✖ {tool} missing dm_status")
        elif event_type == "finalize":
            self._append_debug(" ✔ finalize_turn")
        elif event_type == "no_tool_calls":
            self._append_debug(" ⚠ no tool calls — reminded to use tools")
        elif event_type == "parse_error":
            self._append_debug(f" ⚠ failed to parse tool block: {data.get('content', '')}")
        elif event_type == "empty_response":
            self._append_debug(" ⚠ empty response — waiting 2s, retrying without reminder")
        elif event_type == "llm_error":
            self._append_debug(f" ✖ LLM error: {data.get('error', '')}")

    def _on_player_roll(self, dice: str, reason: str) -> str:
        """Called from worker thread. Shows roll popup, blocks until player responds.

        Returns:
            The text the player entered, or ``"0"`` when there was none.
        """
        self.call_from_thread(self._append_debug, f"🎲 asks player to roll {dice} ({reason})")
        self.call_from_thread(self._show_roll_modal, dice, reason)
        self._roll_event.wait()
        self._roll_event.clear()
        result = self._roll_result
        self._roll_result = None
        return result or "0"

    def _show_roll_modal(self, dice: str, reason: str) -> None:
        """Push the RollModal screen (runs on main thread)."""
        self._roll_event.clear()
        self._roll_result = None

        def on_dismiss(value: str) -> None:
            # Hand the value to the waiting worker thread and wake it.
            self._roll_result = value
            self._roll_event.set()

        self.push_screen(RollModal(dice, reason), on_dismiss)

    def _tick_thinking(self) -> None:
        """Animate the spinner on the current DM action."""
        if not self._is_processing:
            return
        self._thinking_frame = (self._thinking_frame + 1) % len(self._spinner_frames)
        self._render_status()

    def _on_generation_done(
        self, result: TurnResult, player_action: str | None
    ) -> None:
        """Handle the completed turn on the main thread."""
        self._is_processing = False
        self._hide_thinking()

        if result.error:
            self._show_error(result.error, result.debug_info)
            self._append_debug(f"✖ error: {result.error}")
            return

        # Log only after successful finalize — failed turns produce no side effects
        ts = datetime.now().strftime("%H:%M")
        if player_action:
            time_of_day = self._guess_time_of_day()
            self.engine.append_log(f"- **{time_of_day}** — {player_action}")
        if result.log_entry:
            self.engine.append_log(f"- **{ts}** — {result.log_entry}")
        elif result.book_log:
            first_line = result.book_log.strip().split("\n")[0][:80]
            self.engine.append_log(f"- **Turn** — {first_line}")

        # Archive the turn's book log
        if result.book_log:
            self.engine.archive_turn(result.book_log)

        # Apply state changes
        self.engine.apply_state(result)

        # Display the next user prompt
        self._display_scene(result)

        # Persist the prompt so the game resumes here on restart
        if result.user_prompt:
            LAST_PROMPT_PATH.write_text(result.user_prompt.strip())

        # Store for next turn; keep _last_prompt a str even without a prompt.
        self._last_prompt = result.user_prompt or ""
        self._last_result = result
        self._append_debug("✔ turn complete")

    def _display_scene(self, result: TurnResult) -> None:
        """Update the UI with the last story entry followed by the DM prompt."""
        parts = []
        if result.book_log:
            parts.append(result.book_log)
        if result.user_prompt:
            parts.append(f"---\n\n{result.user_prompt}")
        self._set_narrative("\n\n".join(parts) if parts else "")
        self._enable_input()

    def _enable_input(self) -> None:
        """Re-enable, clear and focus the action input."""
        input_widget = self.query_one("#play-input", Input)
        input_widget.disabled = False
        input_widget.placeholder = "Type your action and press Enter..."
        input_widget.value = ""
        input_widget.focus()

    def _set_narrative(self, text: str) -> None:
        """Render ``text`` as Markdown in the narrative pane and scroll to its top."""
        widget = self.query_one("#play-narrative", Static)
        widget.update(RichMarkdown(text))
        # Scroll to top
        scroll = self.query_one("#play-scroll", VerticalScroll)
        scroll.scroll_home(animate=False)

    def _show_error(self, error: str, debug_info: str = "") -> None:
        """Show an error (plus optional debug info) in the narrative pane."""
        text = f"**Error:** {error}\n\n"
        if debug_info:
            text += f"**Debug Info:**\n\n{debug_info}\n\n"
        text += "Check your session/config.json and ensure your LLM provider is running."
        self._set_narrative(text)
        self._enable_input()

    # ── Input handling ────────────────────────────────────
    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Player pressed Enter in the input widget."""
        event.stop()
        action = event.value.strip()
        if not action or self._is_processing:
            return
        self._handle_player_action(action)

    def _handle_player_action(self, action: str) -> None:
        """Handle a player action typed in the input."""
        self._call_llm(player_action=action)

    def _guess_time_of_day(self) -> str:
        """Simple time-of-day label based on hour."""
        h = datetime.now().hour
        if h < 6:
            return "Night"
        elif h < 12:
            return "Morning"
        elif h < 14:
            return "Midday"
        elif h < 18:
            return "Afternoon"
        elif h < 21:
            return "Evening"
        else:
            return "Night"

    # ── Book viewer ───────────────────────────────────────
    def _init_book(self):
        """Load the book and draw the current page."""
        # _reload_book() already renders the page, so no second render here.
        self._reload_book()

    def _reload_book(self):
        """Re-read the book; jump to the last page when new pages appeared."""
        self._book_pages = load_book_pages()
        if len(self._book_pages) > self._prev_page_count:
            self._book_page = len(self._book_pages) - 1
        self._prev_page_count = len(self._book_pages)
        # Clamp in case the book shrank since the last reload.
        self._book_page = max(0, min(self._book_page, len(self._book_pages) - 1))
        self._render_book_page()

    def _render_book_page(self):
        """Draw the current page, its label, the progress bar and nav button state."""
        if not self._book_pages:
            return
        self.query_one("#book-content").update(
            RichMarkdown(self._book_pages[self._book_page])
        )
        total = len(self._book_pages)
        self.query_one("#book-page-label").update(
            f"Page {self._book_page + 1} of {total}"
        )
        pct = (self._book_page + 1) / total if total else 1
        fill = round(pct * 20)
        bar = "█" * fill + "░" * (20 - fill)
        self.query_one("#book-progress").update(f"[{bar}]")
        self.query_one("#book-prev").disabled = (self._book_page == 0)
        self.query_one("#book-next").disabled = (self._book_page == total - 1)

    def action_prev_page(self):
        """Go back one book page, if there is one."""
        if self._book_page > 0:
            self._book_page -= 1
            self._render_book_page()
            self.query_one("#book-scroll").scroll_home(animate=False)

    def action_next_page(self):
        """Go forward one book page, if there is one."""
        if self._book_page < len(self._book_pages) - 1:
            self._book_page += 1
            self._render_book_page()
            self.query_one("#book-scroll").scroll_home(animate=False)

    @on(Button.Pressed, "#book-prev")
    def on_book_prev(self):
        self.action_prev_page()

    @on(Button.Pressed, "#book-next")
    def on_book_next(self):
        self.action_next_page()
|
|
|
|
|
|
def main():
    """Parse the command line and run the TUI.

    ``--no-music`` disables ambience music.
    """
    import argparse

    cli = argparse.ArgumentParser(description="The Chaos TUI")
    cli.add_argument('--no-music', action='store_true', help='Disable ambience music')
    options = cli.parse_args()
    ChaosTUI(no_music=options.no_music).run()
|
|
|
|
|
|
# Run the TUI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|