splinter-keep/tools/run.py
Dejvino 326c8b7ba8 fix journal_update bugs, persist last prompt across restarts
- Coerce string add/done to list in journal_update tool
- Rewrite _update_journal with section-based parsing (no broken index tracking)
- Add duplicate prevention, blank line collapsing
- Save last DM prompt to session/last_prompt.md so game resumes
  from last scene on restart instead of regenerating
2026-06-25 19:53:57 +02:00

892 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
"""
run.py — The Chaos TTRPG Session Client (Game Mode)
Owns the TUI and game loop. Layout:
PLAY (narrative + choices + input) | CHAR | LOG | BOOK tabs
"""
from __future__ import annotations
import os
import random
import sys
import threading
from datetime import date
from pathlib import Path
from textual import on
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.screen import Screen
from textual.widgets import Button, Input, Static, TabbedContent, TabPane
from rich.markdown import Markdown as RichMarkdown
from rich.theme import Theme
# ── Game engine ─────────────────────────────────────────
from engine import GameEngine, GenerationResult, TurnResult
# ── Optional miniaudio ────────────────────────────────────
try:
import miniaudio
HAS_AUDIO = True
except ImportError:
HAS_AUDIO = False
print("Note: miniaudio not installed — no ambience music.", file=sys.stderr)
# ── Paths ────────────────────────────────────────────────
BASE = Path(__file__).resolve().parent.parent
SESSION = BASE / 'session'
LOG_DIR = SESSION / 'log'
CHAR_PATH = SESSION / 'character.md'
WORLD_PATH = SESSION / 'world.md'
JOURNAL_PATH = SESSION / 'journal.md'
AMBIENCE_PATH = SESSION / 'ambience.md'
AMBIENCE_OPTIONS_PATH = SESSION / 'ambience_options.md'
BOOK_PATH = SESSION / 'book.md'
LAST_PROMPT_PATH = SESSION / 'last_prompt.md'
AUDIO_DIR = SESSION / 'audio'
TODAY = date.today().isoformat()
LOG_PATH = LOG_DIR / f'{TODAY}.md'
REFRESH_SECS = 2
MARKDOWN_THEME = Theme({
"markdown.h1": "bold #ff6b6b on #2a0000",
"markdown.h2": "bold #ffd93d",
"markdown.h3": "bold italic #6bcbff",
"markdown.h4": "bold #95e1d3",
"markdown.code": "bold #00d2d3 on #002222",
"markdown.code_block": "on #1e1e2e",
"markdown.block_quote": "dim italic #8395a7",
"markdown.link": "underline #48dbfb",
"markdown.item": "#ff9f43",
"markdown.em": "italic #ff9ff3",
"markdown.strong": "bold #feca57",
"markdown.horizontal_rule": "dim #555555",
})
# ── Helpers (file reading, status, book, ambience) ───────
def ensure_log():
LOG_DIR.mkdir(parents=True, exist_ok=True)
if not LOG_PATH.exists():
LOG_PATH.write_text(f"# Session Log — {TODAY}\n\n")
_populate_if_empty()
def _populate_if_empty():
content = LOG_PATH.read_text().strip()
if content and len(content.splitlines()) > 2:
return
prev = _previous_log()
if prev:
lines = prev.read_text().splitlines()
lines[0] = f"# Session Log — {TODAY}"
LOG_PATH.write_text('\n'.join(lines) + '\n')
def _previous_log():
entries = sorted(LOG_DIR.glob('*.md'))
today_name = LOG_PATH.name
for e in reversed(entries):
if e.name != today_name:
return e
return None
def read_todo():
if not JOURNAL_PATH.exists():
return ["—— No journal yet ——"]
lines = JOURNAL_PATH.read_text().splitlines()
in_todo = False
todo = []
for l in lines:
if l.strip().lstrip('#').strip().startswith('TODO'):
in_todo = True
continue
if l.strip().startswith('#') and in_todo:
break
if in_todo and l.strip():
todo.append(l.strip().lstrip('- '))
return todo or ["—— All done! ——"]
def read_log_tail(n=200):
if not LOG_PATH.exists():
return []
lines = LOG_PATH.read_text().splitlines()
return [l for l in lines if l.strip() and not l.startswith('#')][-n:]
def status_summary():
if not CHAR_PATH.exists():
return "no character"
lines = CHAR_PATH.read_text().splitlines()
name = "?"
health = "?"
for l in lines:
if l.startswith('**Name:**'):
name = l.split(':', 1)[1].strip().strip('_').strip('*')
if l.startswith('**Current Health:**'):
h = l.split(':', 1)[1].strip().strip('_').strip('*')
if h:
health = h
if l.startswith('**Max Health:**'):
m = l.split(':', 1)[1].strip().strip('_').strip('*')
if m and health == '?':
health = m
return f"{name}{health}"
def log_count():
return len(read_log_tail())
def load_book_pages():
if not BOOK_PATH.exists() or not BOOK_PATH.read_text().strip():
return ["*The story has not begun.*"]
text = BOOK_PATH.read_text().strip()
turns = text.split('\n## ')
pages = []
for i, t in enumerate(turns):
pages.append(t if i == 0 else '## ' + t)
return pages or ["*The story has not begun.*"]
def parse_ambience_options():
if not AMBIENCE_OPTIONS_PATH.exists():
return {}
options = {}
lines = AMBIENCE_OPTIONS_PATH.read_text().splitlines()
in_table = False
for line in lines:
s = line.strip()
if not s.startswith('|') or not s.endswith('|'):
in_table = False
continue
parts = [p.strip() for p in s.split('|')]
parts = [p for p in parts if p]
if len(parts) < 2:
continue
if not in_table:
in_table = True
continue
if all(c in '-:| ' for c in s):
continue
name = parts[0].lower()
files = [f.strip() for f in parts[1].split(',') if f.strip()]
paths = [AUDIO_DIR / f for f in files]
options[name] = paths
return options
# ── Ambience subsystem ───────────────────────────────────
class AmbiencePlayer:
def __init__(self):
self.current_ambience = 'silence'
self._last_mtime = 0
self._options = {}
self._device = None
self._stream = None
self.load_options()
@property
def available(self):
return HAS_AUDIO
@property
def ambience_name(self):
return self.current_ambience
def load_options(self):
self._options = parse_ambience_options()
def _stop(self):
if self._device:
try:
self._device.close()
except Exception:
pass
self._device = None
self._stream = None
def poll(self):
if not HAS_AUDIO:
return
try:
mtime = os.path.getmtime(AMBIENCE_PATH)
except OSError:
return
if mtime == self._last_mtime:
return
self._last_mtime = mtime
try:
name = AMBIENCE_PATH.read_text().strip().lower()
except OSError:
return
self._switch_to(name)
def _switch_to(self, name):
if name == self.current_ambience:
return
self.current_ambience = name
self._stop()
if name == 'silence' or name not in self._options:
return
tracks = self._options.get(name, [])
valid = [t for t in tracks if t.exists()]
if not valid:
return
track = random.choice(valid)
try:
self._stream = miniaudio.stream_file(str(track))
self._device = miniaudio.PlaybackDevice()
self._device.start(self._stream)
except Exception:
self.current_ambience = None
# module-level ref
app_ambience_player = None
# ── Roll Modal ───────────────────────────────────────────
class RollModal(Screen):
"""Overlay asking the player to roll physical dice and enter the result."""
CSS = """
RollModal {
align: center middle;
background: rgba(0, 0, 0, 0.75);
}
#roll-dialog {
width: 44;
height: auto;
padding: 2 3;
background: #2a2a3a;
border: thick #e0ad4c;
}
#roll-title {
text-style: bold;
color: #ffd93d;
text-align: center;
height: 3;
}
#roll-reason {
color: #c0b090;
text-align: center;
height: 3;
}
#roll-input {
margin: 1 0;
}
#roll-submit {
width: 100%;
}
#roll-hint {
color: #888888;
text-align: center;
height: 1;
}
"""
def __init__(self, dice: str, reason: str) -> None:
super().__init__()
self.dice = dice
self.reason = reason
def compose(self) -> ComposeResult:
with Vertical(id="roll-dialog"):
yield Static(f"[bold]🎲 ROLL {self.dice}[/bold]", id="roll-title")
yield Static(f"Reason: {self.reason}", id="roll-reason")
yield Input(
placeholder="Enter the number you rolled...",
id="roll-input",
)
yield Button("Submit", id="roll-submit", variant="primary")
yield Static("(or press Enter)", id="roll-hint")
def on_input_submitted(self, event: Input.Submitted) -> None:
self._submit(event.value)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "roll-submit":
inp = self.query_one("#roll-input", Input)
self._submit(inp.value)
def _submit(self, value: str) -> None:
val = value.strip()
if val:
self.dismiss(val)
# ── Auto-refreshing panels ───────────────────────────────
class AutoStatic(Static):
def load(self):
raise NotImplementedError
def on_mount(self):
self.load()
self.set_interval(REFRESH_SECS, self.load)
class TodoPane(AutoStatic):
def load(self):
items = read_todo()
self.update("\n".join(f"{i}" for i in items))
class TranscriptPane(AutoStatic):
def load(self):
lines = read_log_tail()
display = "\n".join(lines[-80:])
if lines:
display += "\n\n>>--- NOW --->"
self.update(display)
self.call_after_refresh(self._scroll_bottom)
def _scroll_bottom(self):
if self.parent and hasattr(self.parent, 'scroll_end'):
self.parent.scroll_end(animate=False)
class CharPane(AutoStatic):
def load(self):
if not CHAR_PATH.exists():
self.update("*No character sheet*")
return
self.update(RichMarkdown(CHAR_PATH.read_text().strip()))
class StatusBar(AutoStatic):
def load(self):
char = status_summary()
count = log_count()
todo = len(read_todo())
music = ""
if not HAS_AUDIO:
music = " │ ♫ (install miniaudio)"
elif app_ambience_player:
name = app_ambience_player.ambience_name
music = f" │ ♫ {name}"
self.update(f"{char}{count} entries │ {todo} todo │ {TODAY}{music}")
# ── The App ──────────────────────────────────────────────
class ChaosTUI(App):
TITLE = "The Chaos"
CSS = """
Screen {
background: #121212;
}
#banner {
dock: top;
height: 1;
background: #2a2a2a;
color: #e0ad4c;
text-align: center;
}
#main {
height: 100%;
background: #111111;
}
#todo-header {
background: #3a2a1a;
color: #e0b060;
text-style: bold;
padding: 0 1;
height: 1;
}
#todo-content {
background: #1a1510;
color: #d0b080;
padding: 0 1;
height: 5;
max-height: 5;
overflow-y: auto;
scrollbar-size-vertical: 2;
}
#main-tabs {
height: 1fr;
}
TabbedContent {
background: #1a1a2a;
}
VerticalScroll {
overflow-y: auto;
scrollbar-size-vertical: 2;
scrollbar-color: #555555;
scrollbar-color-hover: #777777;
scrollbar-color-active: #999999;
}
#char-content {
background: #1e1e2a;
color: #c0c0c0;
padding: 0 1;
}
#transcript {
background: #1a2a1a;
color: #c8c8c8;
padding: 0 1;
}
/* Play tab */
#play-narrative {
background: #161616;
color: #d8d8d8;
padding: 1 2;
height: auto;
}
#play-status {
background: #1a1a2a;
color: #e0b060;
padding: 0 2;
height: 1;
text-style: bold italic;
text-align: center;
}
#play-status.processing {
background: #2a1a0a;
color: #ffd93d;
}
#play-input {
height: 3;
background: #222222;
color: #e0d0c0;
border: solid #555555;
padding: 0 1;
}
#play-input:focus {
border: solid #e0ad4c;
}
#play-input:disabled {
background: #1a1a1a;
color: #666666;
border: solid #333333;
}
/* Book tab */
#book-header {
background: #2d2d2d;
color: #e0c080;
text-style: bold;
padding: 0 1;
height: 1;
}
#book-nav {
height: 3;
background: #222222;
align: center middle;
}
#book-nav Button {
width: 10;
margin: 0 1;
}
#book-nav Button:disabled {
color: #444444;
}
#book-nav Button:hover {
text-style: bold;
}
#book-nav-center {
height: 3;
width: 1fr;
}
#book-page-label {
height: 1;
color: #c0b090;
text-style: bold;
padding: 0 2;
text-align: center;
}
#book-progress {
height: 1;
background: #1a1a1a;
color: #e0b060;
padding: 0 2;
text-align: center;
}
#book-scroll {
height: 1fr;
}
#book-content {
background: #161616;
color: #d8d8d8;
padding: 0 2;
}
#status-bar {
background: #222222;
color: #888888;
padding: 0 1;
height: 1;
text-style: italic;
}
"""
BINDINGS = [
("ctrl+c", "quit", "Quit"),
("escape", "quit", "Quit"),
]
def __init__(self, *args, no_music=False, **kwargs):
super().__init__(*args, **kwargs)
global app_ambience_player
if not no_music:
app_ambience_player = AmbiencePlayer()
else:
app_ambience_player = None
# Game engine
self.engine = GameEngine()
# Game loop state
self._last_prompt: str = ""
self._last_result: TurnResult | None = None
self._is_processing: bool = False
# Thinking animation
self._spinner_frames = ["", "", "", ""]
self._thinking_frame = 0
self._thinking_timer_handle = None
self._dm_action = "DM is weaving the narrative"
# Player roll state (thread-safe)
self._roll_event = threading.Event()
self._roll_result: str | None = None
# Book viewer state
self._book_page = 0
self._book_pages = []
self._prev_page_count = 0
# ── Compose ──────────────────────────────────────────
def compose(self):
yield Static(f"⚔ The Chaos ╎ {TODAY}", id="banner")
with Vertical(id="main"):
yield Static("TODO", id="todo-header")
yield TodoPane(id="todo-content")
with TabbedContent(initial="play-tab", id="main-tabs"):
with TabPane("PLAY", id="play-tab"):
with VerticalScroll(id="play-scroll"):
yield Static("*Awaiting the fates...*", id="play-narrative")
yield Static("", id="play-status")
yield Input(
placeholder="Type your action and press Enter...",
id="play-input",
)
with TabPane("CHARACTER", id="char-tab"):
with VerticalScroll():
yield CharPane(id="char-content")
with TabPane("LOG", id="log-tab"):
with VerticalScroll():
yield TranscriptPane(id="transcript")
with TabPane("BOOK", id="book-tab"):
yield Static("BOOK", id="book-header")
with Vertical(id="book-section"):
with Horizontal(id="book-nav"):
yield Button("<< Prev", id="book-prev")
with Vertical(id="book-nav-center"):
yield Static("", id="book-page-label")
yield Static("", id="book-progress")
yield Button("Next >>", id="book-next")
with VerticalScroll(id="book-scroll"):
yield Static("", id="book-content")
yield StatusBar(id="status-bar")
def on_mount(self):
ensure_log()
self.console._theme = MARKDOWN_THEME
self._init_book()
self.set_interval(REFRESH_SECS, self._check_ambience)
self.set_interval(REFRESH_SECS, self._reload_book)
# Start the game
self.call_after_refresh(self._begin_game)
def _begin_game(self):
"""Resume from last saved prompt or generate an opening scene."""
if LAST_PROMPT_PATH.exists():
saved = LAST_PROMPT_PATH.read_text().strip()
if saved:
self._last_prompt = saved
pages = load_book_pages()
parts = []
if pages:
parts.append(pages[-1])
parts.append(f"---\n\n{saved}")
self._set_narrative("\n\n".join(parts))
self._enable_input()
return
self._call_llm()
# ── Ambience ─────────────────────────────────────────
def _check_ambience(self):
if app_ambience_player:
app_ambience_player.poll()
# ── Game Loop ─────────────────────────────────────────
def _call_llm(self, player_action: str | None = None):
"""Called when the player has acted — sends their action to the LLM."""
if self._is_processing:
return
self._is_processing = True
input_widget = self.query_one("#play-input", Input)
input_widget.disabled = True
input_widget.placeholder = "DM is at work..."
self._show_thinking()
# Run generation in a daemon thread so it doesn't block the UI
t = threading.Thread(
target=self._run_generation,
args=(player_action,),
daemon=True,
)
t.start()
def _run_generation(self, player_action: str | None) -> None:
"""Worker thread: calls engine.generate_with_tools() and posts result back."""
last_prompt = self._last_prompt if self._last_prompt else None
def on_thought(thought: str) -> None:
self.call_from_thread(self._on_thought, thought)
def on_action(action: str) -> None:
self.call_from_thread(self._on_action, action)
result = self.engine.generate_with_tools(
player_action=player_action,
last_prompt=last_prompt,
on_thought=on_thought,
on_action=on_action,
on_player_roll=self._on_player_roll,
)
self.call_from_thread(self._on_generation_done, result, player_action)
def _show_thinking(self) -> None:
"""Show the thinking indicator and start the animation timer."""
self._dm_action = "DM is weaving the narrative"
self._thinking_frame = 0
status = self.query_one("#play-status", Static)
status.add_class("processing")
spinner = self._spinner_frames[0]
status.update(f"{spinner} {self._dm_action}")
self._thinking_timer_handle = self.set_interval(
0.5, self._tick_thinking
)
def _hide_thinking(self) -> None:
"""Stop the animation and hide the thinking indicator."""
if self._thinking_timer_handle:
self._thinking_timer_handle.stop()
self._thinking_timer_handle = None
status = self.query_one("#play-status", Static)
status.remove_class("processing")
status.update("")
def _on_thought(self, thought: str) -> None:
"""Display a thought from the DM in the status bar."""
display = thought[:60] + "" if len(thought) > 60 else thought
self._dm_action = display
self._thinking_frame = 0
status = self.query_one("#play-status", Static)
status.add_class("processing")
spinner = self._spinner_frames[0]
status.update(f"{spinner} {display}")
def _on_action(self, action: str) -> None:
"""Display a DM action (tool call) in the status bar."""
self._dm_action = action
self._thinking_frame = 0
status = self.query_one("#play-status", Static)
status.add_class("processing")
spinner = self._spinner_frames[0]
status.update(f"{spinner} {action}")
def _on_player_roll(self, dice: str, reason: str) -> str:
"""Called from worker thread. Shows roll popup, blocks until player responds."""
self.call_from_thread(self._show_roll_modal, dice, reason)
self._roll_event.wait()
self._roll_event.clear()
result = self._roll_result
self._roll_result = None
return result or "0"
def _show_roll_modal(self, dice: str, reason: str) -> None:
"""Push the RollModal screen (runs on main thread)."""
self._roll_event.clear()
self._roll_result = None
def on_dismiss(value: str) -> None:
self._roll_result = value
self._roll_event.set()
self.push_screen(RollModal(dice, reason), on_dismiss)
def _tick_thinking(self) -> None:
"""Animate the spinner on the current DM action."""
if not self._is_processing:
return
self._thinking_frame = (self._thinking_frame + 1) % len(self._spinner_frames)
spinner = self._spinner_frames[self._thinking_frame]
status = self.query_one("#play-status", Static)
status.update(f"{spinner} {self._dm_action}")
def _on_generation_done(
self, result: TurnResult, player_action: str | None
) -> None:
"""Handle the completed turn on the main thread."""
self._is_processing = False
self._hide_thinking()
if result.error:
self._show_error(result.error, result.debug_info)
return
# Archive the turn's book log
if result.book_log:
self.engine.archive_turn(result.book_log)
# Apply state changes
self.engine.apply_state(result)
# Display the next user prompt
self._display_scene(result)
# Persist the prompt so the game resumes here on restart
if result.user_prompt:
LAST_PROMPT_PATH.write_text(result.user_prompt.strip())
# Store for next turn
self._last_prompt = result.user_prompt
self._last_result = result
def _display_scene(self, result: TurnResult) -> None:
"""Update the UI with the last story entry followed by the DM prompt."""
parts = []
if result.book_log:
parts.append(result.book_log)
if result.user_prompt:
parts.append(f"---\n\n{result.user_prompt}")
self._set_narrative("\n\n".join(parts) if parts else "")
self._enable_input()
def _enable_input(self) -> None:
input_widget = self.query_one("#play-input", Input)
input_widget.disabled = False
input_widget.placeholder = "Type your action and press Enter..."
input_widget.value = ""
input_widget.focus()
def _set_narrative(self, text: str) -> None:
widget = self.query_one("#play-narrative", Static)
widget.update(RichMarkdown(text))
# Scroll to top
scroll = self.query_one("#play-scroll", VerticalScroll)
scroll.scroll_home(animate=False)
def _show_error(self, error: str, debug_info: str = "") -> None:
text = f"**Error:** {error}\n\n"
if debug_info:
text += f"**Debug Info:**\n\n{debug_info}\n\n"
text += "Check your session/config.json and ensure your LLM provider is running."
self._set_narrative(text)
self._enable_input()
# ── Input handling ────────────────────────────────────
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Player pressed Enter in the input widget."""
action = event.value.strip()
if not action or self._is_processing:
event.stop()
return
event.stop()
self._handle_player_action(action)
def _handle_player_action(self, action: str) -> None:
"""Handle a player action typed in the input."""
# Log the action
from datetime import datetime
timestamp = datetime.now().strftime("%H:%M")
time_of_day = self._guess_time_of_day()
log_entry = f"- **{time_of_day}** — {action}"
self.engine.append_log(log_entry)
# Call LLM to resolve
self._call_llm(player_action=action)
def _guess_time_of_day(self) -> str:
"""Simple time-of-day label based on hour."""
from datetime import datetime
h = datetime.now().hour
if h < 6:
return "Night"
elif h < 12:
return "Morning"
elif h < 14:
return "Midday"
elif h < 18:
return "Afternoon"
elif h < 21:
return "Evening"
else:
return "Night"
# ── Book viewer ───────────────────────────────────────
def _init_book(self):
self._reload_book()
self._render_book_page()
def _reload_book(self):
self._book_pages = load_book_pages()
if len(self._book_pages) > self._prev_page_count:
self._book_page = len(self._book_pages) - 1
self._prev_page_count = len(self._book_pages)
self._book_page = max(0, min(self._book_page, len(self._book_pages) - 1))
self._render_book_page()
def _render_book_page(self):
if not self._book_pages:
return
self.query_one("#book-content").update(
RichMarkdown(self._book_pages[self._book_page])
)
total = len(self._book_pages)
self.query_one("#book-page-label").update(
f"Page {self._book_page + 1} of {total}"
)
pct = (self._book_page + 1) / total if total else 1
fill = round(pct * 20)
bar = "" * fill + "" * (20 - fill)
self.query_one("#book-progress").update(f"[{bar}]")
self.query_one("#book-prev").disabled = (self._book_page == 0)
self.query_one("#book-next").disabled = (self._book_page == total - 1)
def action_prev_page(self):
if self._book_page > 0:
self._book_page -= 1
self._render_book_page()
self.query_one("#book-scroll").scroll_home(animate=False)
def action_next_page(self):
if self._book_page < len(self._book_pages) - 1:
self._book_page += 1
self._render_book_page()
self.query_one("#book-scroll").scroll_home(animate=False)
@on(Button.Pressed, "#book-prev")
def on_book_prev(self):
self.action_prev_page()
@on(Button.Pressed, "#book-next")
def on_book_next(self):
self.action_next_page()
def main():
import argparse
parser = argparse.ArgumentParser(description="The Chaos TUI")
parser.add_argument('--no-music', action='store_true', help='Disable ambience music')
args = parser.parse_args()
app = ChaosTUI(no_music=args.no_music)
app.run()
if __name__ == '__main__':
main()