#!/usr/bin/env python3 """ run.py — The Chaos TTRPG Session Client (Game Mode) Owns the TUI and game loop. Layout: PLAY (narrative + choices + input) | CHAR | LOG | BOOK tabs """ from __future__ import annotations import os import random import sys import threading from datetime import date from pathlib import Path from textual import on from textual.app import App, ComposeResult from textual.containers import Horizontal, Vertical, VerticalScroll from textual.widgets import Button, Input, Static, TabbedContent, TabPane from rich.markdown import Markdown as RichMarkdown from rich.theme import Theme # ── Game engine ───────────────────────────────────────── from engine import GameEngine, GenerationResult # ── Optional miniaudio ──────────────────────────────────── try: import miniaudio HAS_AUDIO = True except ImportError: HAS_AUDIO = False print("Note: miniaudio not installed — no ambience music.", file=sys.stderr) # ── Paths ──────────────────────────────────────────────── BASE = Path(__file__).resolve().parent.parent SESSION = BASE / 'session' LOG_DIR = SESSION / 'log' CHAR_PATH = SESSION / 'character.md' WORLD_PATH = SESSION / 'world.md' JOURNAL_PATH = SESSION / 'journal.md' AMBIENCE_PATH = SESSION / 'ambience.md' AMBIENCE_OPTIONS_PATH = SESSION / 'ambience_options.md' BOOK_PATH = SESSION / 'book.md' AUDIO_DIR = SESSION / 'audio' TODAY = date.today().isoformat() LOG_PATH = LOG_DIR / f'{TODAY}.md' REFRESH_SECS = 2 MARKDOWN_THEME = Theme({ "markdown.h1": "bold #ff6b6b on #2a0000", "markdown.h2": "bold #ffd93d", "markdown.h3": "bold italic #6bcbff", "markdown.h4": "bold #95e1d3", "markdown.code": "bold #00d2d3 on #002222", "markdown.code_block": "on #1e1e2e", "markdown.block_quote": "dim italic #8395a7", "markdown.link": "underline #48dbfb", "markdown.item": "#ff9f43", "markdown.em": "italic #ff9ff3", "markdown.strong": "bold #feca57", "markdown.horizontal_rule": "dim #555555", }) # ── Helpers (file reading, status, book, ambience) ─────── def ensure_log(): LOG_DIR.mkdir(parents=True, exist_ok=True) if not LOG_PATH.exists(): LOG_PATH.write_text(f"# Session Log — {TODAY}\n\n") _populate_if_empty() def _populate_if_empty(): content = LOG_PATH.read_text().strip() if content and len(content.splitlines()) > 2: return prev = _previous_log() if prev: lines = prev.read_text().splitlines() lines[0] = f"# Session Log — {TODAY}" LOG_PATH.write_text('\n'.join(lines) + '\n') def _previous_log(): entries = sorted(LOG_DIR.glob('*.md')) today_name = LOG_PATH.name for e in reversed(entries): if e.name != today_name: return e return None def read_todo(): if not JOURNAL_PATH.exists(): return ["—— No journal yet ——"] lines = JOURNAL_PATH.read_text().splitlines() in_todo = False todo = [] for l in lines: if l.strip().lstrip('#').strip().startswith('TODO'): in_todo = True continue if l.strip().startswith('#') and in_todo: break if in_todo and l.strip(): todo.append(l.strip().lstrip('- ')) return todo or ["—— All done! ——"] def read_log_tail(n=200): if not LOG_PATH.exists(): return [] lines = LOG_PATH.read_text().splitlines() return [l for l in lines if l.strip() and not l.startswith('#')][-n:] def status_summary(): if not CHAR_PATH.exists(): return "no character" lines = CHAR_PATH.read_text().splitlines() name = "?" health = "?" for l in lines: if l.startswith('**Name:**'): name = l.split(':', 1)[1].strip().strip('_').strip('*') if l.startswith('**Current Health:**'): h = l.split(':', 1)[1].strip().strip('_').strip('*') if h: health = h if l.startswith('**Max Health:**'): m = l.split(':', 1)[1].strip().strip('_').strip('*') if m and health == '?': health = m return f"{name} ❤ {health}" def log_count(): return len(read_log_tail()) def load_book_pages(): if not BOOK_PATH.exists() or not BOOK_PATH.read_text().strip(): return ["*The story has not begun.*"] text = BOOK_PATH.read_text().strip() turns = text.split('\n## ') pages = [] for i, t in enumerate(turns): pages.append(t if i == 0 else '## ' + t) return pages or ["*The story has not begun.*"] def parse_ambience_options(): if not AMBIENCE_OPTIONS_PATH.exists(): return {} options = {} lines = AMBIENCE_OPTIONS_PATH.read_text().splitlines() in_table = False for line in lines: s = line.strip() if not s.startswith('|') or not s.endswith('|'): in_table = False continue parts = [p.strip() for p in s.split('|')] parts = [p for p in parts if p] if len(parts) < 2: continue if not in_table: in_table = True continue if all(c in '-:| ' for c in s): continue name = parts[0].lower() files = [f.strip() for f in parts[1].split(',') if f.strip()] paths = [AUDIO_DIR / f for f in files] options[name] = paths return options # ── Ambience subsystem ─────────────────────────────────── class AmbiencePlayer: def __init__(self): self.current_ambience = 'silence' self._last_mtime = 0 self._options = {} self._device = None self._stream = None self.load_options() @property def available(self): return HAS_AUDIO @property def ambience_name(self): return self.current_ambience def load_options(self): self._options = parse_ambience_options() def _stop(self): if self._device: try: self._device.close() except Exception: pass self._device = None self._stream = None def poll(self): if not HAS_AUDIO: return try: mtime = os.path.getmtime(AMBIENCE_PATH) except OSError: return if mtime == self._last_mtime: return self._last_mtime = mtime try: name = AMBIENCE_PATH.read_text().strip().lower() except OSError: return self._switch_to(name) def _switch_to(self, name): if name == self.current_ambience: return self.current_ambience = name self._stop() if name == 'silence' or name not in self._options: return tracks = self._options.get(name, []) valid = [t for t in tracks if t.exists()] if not valid: return track = random.choice(valid) try: self._stream = miniaudio.stream_file(str(track)) self._device = miniaudio.PlaybackDevice() self._device.start(self._stream) except Exception: self.current_ambience = None # module-level ref app_ambience_player = None # ── Auto-refreshing panels ─────────────────────────────── class AutoStatic(Static): def load(self): raise NotImplementedError def on_mount(self): self.load() self.set_interval(REFRESH_SECS, self.load) class TodoPane(AutoStatic): def load(self): items = read_todo() self.update("\n".join(f" ☐ {i}" for i in items)) class TranscriptPane(AutoStatic): def load(self): lines = read_log_tail() display = "\n".join(lines[-80:]) if lines: display += "\n\n>>--- NOW --->" self.update(display) self.call_after_refresh(self._scroll_bottom) def _scroll_bottom(self): if self.parent and hasattr(self.parent, 'scroll_end'): self.parent.scroll_end(animate=False) class CharPane(AutoStatic): def load(self): if not CHAR_PATH.exists(): self.update("*No character sheet*") return self.update(RichMarkdown(CHAR_PATH.read_text().strip())) class StatusBar(AutoStatic): def load(self): char = status_summary() count = log_count() todo = len(read_todo()) music = "" if not HAS_AUDIO: music = " │ ♫ (install miniaudio)" elif app_ambience_player: name = app_ambience_player.ambience_name music = f" │ ♫ {name}" self.update(f"{char} │ {count} entries │ {todo} todo │ {TODAY}{music}") # ── The App ────────────────────────────────────────────── class ChaosTUI(App): TITLE = "The Chaos" CSS = """ Screen { background: #121212; } #banner { dock: top; height: 1; background: #2a2a2a; color: #e0ad4c; text-align: center; } #main { height: 100%; background: #111111; } #todo-header { background: #3a2a1a; color: #e0b060; text-style: bold; padding: 0 1; height: 1; } #todo-content { background: #1a1510; color: #d0b080; padding: 0 1; height: 5; max-height: 5; overflow-y: auto; scrollbar-size-vertical: 2; } #main-tabs { height: 1fr; } TabbedContent { background: #1a1a2a; } VerticalScroll { overflow-y: auto; scrollbar-size-vertical: 2; scrollbar-color: #555555; scrollbar-color-hover: #777777; scrollbar-color-active: #999999; } #char-content { background: #1e1e2a; color: #c0c0c0; padding: 0 1; } #transcript { background: #1a2a1a; color: #c8c8c8; padding: 0 1; } /* Play tab */ #play-narrative { background: #161616; color: #d8d8d8; padding: 1 2; height: auto; } #play-choices { height: auto; min-height: 3; background: #1e1e2a; padding: 0 1; align: center middle; } #play-choices Button { margin: 0 1; min-width: 12; } #play-status { background: #1a1a2a; color: #e0b060; padding: 0 2; height: 1; text-style: bold italic; text-align: center; } #play-status.processing { background: #2a1a0a; color: #ffd93d; } #play-input { height: 3; background: #222222; color: #e0d0c0; border: solid #555555; padding: 0 1; } #play-input:focus { border: solid #e0ad4c; } #play-input:disabled { background: #1a1a1a; color: #666666; border: solid #333333; } /* Book tab */ #book-header { background: #2d2d2d; color: #e0c080; text-style: bold; padding: 0 1; height: 1; } #book-nav { height: 3; background: #222222; align: center middle; } #book-nav Button { width: 10; margin: 0 1; } #book-nav Button:disabled { color: #444444; } #book-nav Button:hover { text-style: bold; } #book-nav-center { height: 3; width: 1fr; } #book-page-label { height: 1; color: #c0b090; text-style: bold; padding: 0 2; text-align: center; } #book-progress { height: 1; background: #1a1a1a; color: #e0b060; padding: 0 2; text-align: center; } #book-scroll { height: 1fr; } #book-content { background: #161616; color: #d8d8d8; padding: 0 2; } #status-bar { background: #222222; color: #888888; padding: 0 1; height: 1; text-style: italic; } """ BINDINGS = [ ("ctrl+c", "quit", "Quit"), ("escape", "quit", "Quit"), ] def __init__(self, *args, no_music=False, **kwargs): super().__init__(*args, **kwargs) global app_ambience_player if not no_music: app_ambience_player = AmbiencePlayer() else: app_ambience_player = None # Game engine self.engine = GameEngine() # Game loop state self._last_narrative: str = "" self._last_result: GenerationResult | None = None self._is_processing: bool = False # Thinking animation self._thinking_dots = 0 self._thinking_timer_handle = None # Book viewer state self._book_page = 0 self._book_pages = [] self._prev_page_count = 0 # ── Compose ────────────────────────────────────────── def compose(self): yield Static(f"⚔ The Chaos ╎ {TODAY}", id="banner") with Vertical(id="main"): yield Static("TODO", id="todo-header") yield TodoPane(id="todo-content") with TabbedContent(initial="play-tab", id="main-tabs"): with TabPane("PLAY", id="play-tab"): with VerticalScroll(id="play-scroll"): yield Static("*Awaiting the fates...*", id="play-narrative") yield Horizontal(id="play-choices") yield Static("", id="play-status") yield Input( placeholder="Type your action and press Enter...", id="play-input", ) with TabPane("CHARACTER", id="char-tab"): with VerticalScroll(): yield CharPane(id="char-content") with TabPane("LOG", id="log-tab"): with VerticalScroll(): yield TranscriptPane(id="transcript") with TabPane("BOOK", id="book-tab"): yield Static("BOOK", id="book-header") with Vertical(id="book-section"): with Horizontal(id="book-nav"): yield Button("<< Prev", id="book-prev") with Vertical(id="book-nav-center"): yield Static("", id="book-page-label") yield Static("", id="book-progress") yield Button("Next >>", id="book-next") with VerticalScroll(id="book-scroll"): yield Static("", id="book-content") yield StatusBar(id="status-bar") def on_mount(self): ensure_log() self.console._theme = MARKDOWN_THEME self._init_book() self.set_interval(REFRESH_SECS, self._check_ambience) self.set_interval(REFRESH_SECS, self._reload_book) # Start the game self.call_after_refresh(self._begin_game) def _begin_game(self): """Generate the first scene of the game.""" self._call_llm() # ── Ambience ───────────────────────────────────────── def _check_ambience(self): if app_ambience_player: app_ambience_player.poll() # ── Game Loop ───────────────────────────────────────── def _call_llm(self, player_action: str | None = None): """Called when we need new content from the LLM (scene or resolution).""" if self._is_processing: return self._is_processing = True input_widget = self.query_one("#play-input", Input) input_widget.disabled = True input_widget.placeholder = "LLM is thinking..." self._clear_choices() self._show_thinking() # Run generation in a daemon thread so it doesn't block the UI t = threading.Thread( target=self._run_generation, args=(player_action,), daemon=True, ) t.start() def _run_generation(self, player_action: str | None) -> None: """Worker thread: calls engine.generate() and posts result back.""" # Provide previous narrative as context on subsequent calls last_narrative = self._last_narrative if self._last_narrative else None result = self.engine.generate( player_action=player_action, last_narrative=last_narrative, ) self.call_from_thread(self._on_generation_done, result, player_action) def _show_thinking(self) -> None: """Show the thinking indicator and start the animation timer.""" self._thinking_dots = 0 status = self.query_one("#play-status", Static) status.add_class("processing") status.update("✦ LLM is weaving the narrative ✦") self._thinking_timer_handle = self.set_interval( 0.5, self._tick_thinking ) def _hide_thinking(self) -> None: """Stop the animation and hide the thinking indicator.""" if self._thinking_timer_handle: self._thinking_timer_handle.stop() self._thinking_timer_handle = None status = self.query_one("#play-status", Static) status.remove_class("processing") status.update("") def _tick_thinking(self) -> None: """Animate the thinking dots.""" if not self._is_processing: return self._thinking_dots = (self._thinking_dots + 1) % 4 dots = "." * self._thinking_dots status = self.query_one("#play-status", Static) status.update(f"✦ LLM is weaving the narrative{dots} ✦") def _on_generation_done( self, result: GenerationResult, player_action: str | None ) -> None: """Handle the completed generation on the main thread.""" self._is_processing = False self._hide_thinking() if result.error: self._show_error(result.error) return # If this was a resolution (player acted), archive the previous turn if player_action and self._last_narrative: archive_text = ( f"{self._last_narrative}\n\n" f"---\n\n" f"**Player chose:** {player_action}\n\n" f"{result.narrative}" ) self.engine.archive_turn(archive_text) # Apply state changes if result.character_updates or result.world_updates: self.engine.apply_state(result) # Display the scene self._display_scene(result) # Store for next turn self._last_narrative = result.narrative self._last_result = result def _display_scene(self, result: GenerationResult) -> None: """Update the UI with a new scene.""" self._set_narrative(result.narrative) self._set_choices(result.choices) self._enable_input() def _enable_input(self) -> None: input_widget = self.query_one("#play-input", Input) input_widget.disabled = False input_widget.placeholder = "Type your action and press Enter..." input_widget.value = "" input_widget.focus() def _set_narrative(self, text: str) -> None: widget = self.query_one("#play-narrative", Static) widget.update(RichMarkdown(text)) # Scroll to top scroll = self.query_one("#play-scroll", VerticalScroll) scroll.scroll_home(animate=False) def _clear_choices(self) -> None: container = self.query_one("#play-choices", Horizontal) container.remove_children() def _set_choices(self, choices: list[str]) -> None: container = self.query_one("#play-choices", Horizontal) container.remove_children() for choice in choices: btn = Button(choice, classes="choice-btn") container.mount(btn) def _show_error(self, error: str) -> None: self._set_narrative( f"**Error:** {error}\n\n" "Check your session/config.json and ensure your LLM provider is running." ) self._enable_input() # ── Input handling ──────────────────────────────────── def on_input_submitted(self, event: Input.Submitted) -> None: """Player pressed Enter in the input widget.""" action = event.value.strip() if not action or self._is_processing: event.stop() return event.stop() self._handle_player_action(action) @on(Button.Pressed, ".choice-btn") def on_choice_clicked(self, event: Button.Pressed) -> None: """Player clicked a choice button.""" if self._is_processing: return action = event.button.label self._handle_player_action(str(action)) def _handle_player_action(self, action: str) -> None: """Common handler for player actions from input or buttons.""" # Log the action from datetime import datetime timestamp = datetime.now().strftime("%H:%M") time_of_day = self._guess_time_of_day() log_entry = f"- **{time_of_day}** — {action}" self.engine.append_log(log_entry) # Call LLM to resolve self._call_llm(player_action=action) def _guess_time_of_day(self) -> str: """Simple time-of-day label based on hour.""" from datetime import datetime h = datetime.now().hour if h < 6: return "Night" elif h < 12: return "Morning" elif h < 14: return "Midday" elif h < 18: return "Afternoon" elif h < 21: return "Evening" else: return "Night" # ── Book viewer ─────────────────────────────────────── def _init_book(self): self._reload_book() self._render_book_page() def _reload_book(self): self._book_pages = load_book_pages() if len(self._book_pages) > self._prev_page_count: self._book_page = len(self._book_pages) - 1 self._prev_page_count = len(self._book_pages) self._book_page = max(0, min(self._book_page, len(self._book_pages) - 1)) self._render_book_page() def _render_book_page(self): if not self._book_pages: return self.query_one("#book-content").update( RichMarkdown(self._book_pages[self._book_page]) ) total = len(self._book_pages) self.query_one("#book-page-label").update( f"Page {self._book_page + 1} of {total}" ) pct = (self._book_page + 1) / total if total else 1 fill = round(pct * 20) bar = "█" * fill + "░" * (20 - fill) self.query_one("#book-progress").update(f"[{bar}]") self.query_one("#book-prev").disabled = (self._book_page == 0) self.query_one("#book-next").disabled = (self._book_page == total - 1) def action_prev_page(self): if self._book_page > 0: self._book_page -= 1 self._render_book_page() self.query_one("#book-scroll").scroll_home(animate=False) def action_next_page(self): if self._book_page < len(self._book_pages) - 1: self._book_page += 1 self._render_book_page() self.query_one("#book-scroll").scroll_home(animate=False) @on(Button.Pressed, "#book-prev") def on_book_prev(self): self.action_prev_page() @on(Button.Pressed, "#book-next") def on_book_next(self): self.action_next_page() def main(): import argparse parser = argparse.ArgumentParser(description="The Chaos TUI") parser.add_argument('--no-music', action='store_true', help='Disable ambience music') args = parser.parse_args() app = ChaosTUI(no_music=args.no_music) app.run() if __name__ == '__main__': main()