"""Visual Novel Gradio App - Main application with UI and handlers.""" from __future__ import annotations import os import urllib.parse import numpy as np import logging from typing import Optional import gradio as gr from fastrtc import WebRTC from engine import SceneState, POSITION_OFFSETS, Choice, InputRequest from story import build_sample_story # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def passthrough_stream(frame): """Return the incoming frame untouched so the user sees their feed.""" return frame def camera_hint_text(show_camera: bool) -> str: if show_camera: return "๐ŸŽฅ Webcam overlay is active for this scene." return "๐Ÿ•น๏ธ Webcam is hidden for this scene." def voice_hint_text(show_voice: bool) -> str: if show_voice: return "๐ŸŽค Voice capture is available in this scene." return "๐Ÿ”‡ Voice capture is hidden for this scene." def motor_hint_text(show_motors: bool) -> str: if show_motors: return "๐Ÿค– Motor control is available in this scene." return "๐Ÿ›‘ Motor control hidden for this scene." def robot_hint_text(show_robot: bool) -> str: if show_robot: return "๐Ÿค– Robot control is available in this scene." return "๐Ÿ”’ Robot control hidden for this scene." # Dynamixel control functions using Python protocol implementation def dxl_build_ping_packet(motor_id: int) -> list[int]: """Build a ping packet and return as list of bytes.""" import dynamixel packet = dynamixel.ping_packet(motor_id) return list(packet) def dxl_build_torque_packet(motor_id: int, enable: bool) -> list[int]: """Build a torque enable/disable packet and return as list of bytes.""" import dynamixel packet = dynamixel.torque_enable_packet(motor_id, enable) return list(packet) def dxl_build_goal_position_packet(motor_id: int, degrees: float) -> list[int]: """Build a goal position packet and return as list of bytes.""" import dynamixel # Convert degrees to ticks (0-360ยฐ -> 0-4095) clamped_deg = max(0.0, min(360.0, degrees)) ticks = int((clamped_deg / 360.0) * 4095) packet = dynamixel.goal_position_packet(motor_id, ticks) return list(packet) def dxl_parse_response(response_bytes: list[int]) -> str: """Parse a status packet response and return human-readable result.""" import dynamixel if not response_bytes: return "โŒ No response received" success, message = dynamixel.parse_status_packet(bytes(response_bytes)) if success: return f"โœ… {message}" else: return f"โŒ {message}" def get_scene_motor_packets(story_state: dict) -> list: """Extract motor commands from current scene and build packets.""" scenes = story_state["scenes"] current_index = story_state["index"] if 0 <= current_index < len(scenes): scene = scenes[current_index] # Build packet for each motor command packets = [] for cmd in scene.motor_commands: packet = dxl_build_goal_position_packet(cmd.motor_id, cmd.position) packets.append(packet) return packets return [] def get_scene_audio(story_state: dict) -> Optional[str]: """Extract audio file from current scene.""" scenes = story_state["scenes"] current_index = story_state["index"] if 0 <= current_index < len(scenes): scene = scenes[current_index] return scene.audio_file return None def get_scene_robot_pose(story_state: dict) -> Optional[dict]: """Extract robot pose from current scene.""" scenes = story_state["scenes"] current_index = story_state["index"] if 0 <= current_index < len(scenes): scene = scenes[current_index] if scene.robot_pose: return { "target_head_pose": { "x": scene.robot_pose.head_x, "y": scene.robot_pose.head_y, "z": scene.robot_pose.head_z, "roll": scene.robot_pose.head_roll, "pitch": scene.robot_pose.head_pitch, "yaw": scene.robot_pose.head_yaw, }, "target_body_yaw": scene.robot_pose.body_yaw, "target_antennas": [scene.robot_pose.antenna_left, scene.robot_pose.antenna_right], } return None def synthesize_tone(sample_rate: int = 16000, duration: float = 1.25) -> tuple[int, np.ndarray]: """Generate a short confirmation tone to play back as the AI voice.""" samples = np.linspace(0, duration, int(sample_rate * duration), endpoint=False) carrier = np.sin(2 * np.pi * 520 * samples) + 0.4 * np.sin(2 * np.pi * 880 * samples) fade_len = int(sample_rate * 0.08) envelope = np.ones_like(carrier) envelope[:fade_len] *= np.linspace(0.0, 1.0, fade_len) envelope[-fade_len:] *= np.linspace(1.0, 0.0, fade_len) tone = 0.18 * carrier * envelope return sample_rate, tone.astype(np.float32) def describe_audio_clip(audio: Optional[tuple[int, np.ndarray]]) -> str: if audio is None: return "No audio captured yet. Hit record to speak with the companion." sample_rate, samples = audio num_samples = len(samples) if samples is not None else 0 if num_samples == 0: return "Audio appears empty. Please re-record." duration = num_samples / float(sample_rate or 1) rms = float(np.sqrt(np.mean(np.square(samples)))) return f"Captured {duration:.2f}s of audio (RMS ~{rms:.3f}). Ready for the AI." def process_voice_interaction( audio: Optional[tuple[int, np.ndarray]], prompt: str ) -> tuple[str, Optional[tuple[int, np.ndarray]], str, tuple[int, np.ndarray]]: summary = describe_audio_clip(audio) user_prompt = (prompt or "React to the current scene.").strip() if audio is None: ai_line = ( "AI response pending: record or upload an audio clip so the agent can react." ) response_audio = synthesize_tone() return summary, None, ai_line, response_audio ai_line = ( "Imaginary AI companion: I'm using your latest microphone input " f"and the prompt \"{user_prompt}\" to craft a response." ) response_audio = synthesize_tone() return summary, audio, ai_line, response_audio def render_scene( scene: SceneState, index: int, total: int, variables: dict ) -> tuple[str, str, str, bool, bool, bool, bool, Optional[List[Choice]], Optional[InputRequest]]: """Generate the HTML stage, dialogue text, and metadata.""" char_layers = [] for sprite in scene.characters.values(): if not sprite.visible: continue offset = POSITION_OFFSETS.get(sprite.position, "50%") # Build class names with animation class_names = "character" if sprite.animation: class_names += f" anim-{sprite.animation}" # Apply scale using CSS variable (so animations can use it) char_layers.append( f"""
""" ) dialogue_markdown = ( "" if scene.text else "" ) # Avoid duplicating the speech bubble content below the stage. metadata = f"{scene.background_label or 'Scene'} ยท {index + 1} / {total}" bubble_html = "" text_content = (scene.text or "").strip() # Substitute variables in text (e.g., {player_name}) for var_name, var_value in variables.items(): text_content = text_content.replace(f"{{{var_name}}}", str(var_value)) if text_content: speaker_html = ( f'
{scene.speaker}
' if scene.speaker else "" ) bubble_html = f"""
{speaker_html}
{text_content}
""" # Apply blur filters to background and stage bg_blur_style = f"filter: blur({scene.background_blur}px);" if scene.background_blur > 0 else "" stage_blur_style = f"filter: blur({scene.stage_blur}px);" if scene.stage_blur > 0 else "" # Build stage layer HTML if stage image is set stage_layer_html = "" if scene.stage_url: stage_layer_html = f'
' stage_html = f"""
{stage_layer_html} {''.join(char_layers)} {bubble_html}
""" return ( stage_html, dialogue_markdown, metadata, scene.show_camera, scene.show_voice, scene.show_motors, scene.show_robot, scene.choices, scene.input_request, ) def is_scene_accessible(scene: SceneState, active_paths: set) -> bool: """Check if a scene is accessible given the active story paths.""" # Scenes with no path are always accessible (main path) if scene.path is None: return True # Scenes with a specific path are only accessible if that path is active return scene.path in active_paths def change_scene( story_state: dict, direction: int ) -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]: scenes: List[SceneState] = story_state["scenes"] variables = story_state.get("variables", {}) active_paths = story_state.get("active_paths", set()) if not scenes: return ( story_state, "", "No scenes available.", "", camera_hint_text(False), gr.update(visible=False), voice_hint_text(False), gr.update(visible=False), motor_hint_text(False), gr.update(visible=False), robot_hint_text(False), gr.update(visible=False), gr.update(visible=False, choices=[]), gr.update(visible=False), gr.update(interactive=True), gr.update(interactive=True), gr.update(visible=False), # right_column ) total = len(scenes) current_index = story_state["index"] # Find the next accessible scene in the given direction new_index = current_index search_index = current_index + direction while 0 <= search_index < total: if is_scene_accessible(scenes[search_index], active_paths): new_index = search_index break search_index += direction story_state["index"] = new_index html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene( scenes[story_state["index"]], story_state["index"], total, variables ) # Disable navigation when choices or input are present nav_enabled = not bool(choices) and not bool(input_req) # Show right column if any feature is active right_column_visible = show_camera or show_voice or show_motors or show_robot return ( story_state, html, dialogue, meta, camera_hint_text(show_camera), gr.update(visible=show_camera), voice_hint_text(show_voice), gr.update(visible=show_voice), motor_hint_text(show_motors), gr.update(visible=show_motors), robot_hint_text(show_robot), gr.update(visible=show_robot), gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None), f"### {input_req.prompt}" if input_req else "", gr.update(visible=bool(input_req)), gr.update(interactive=nav_enabled), gr.update(interactive=nav_enabled), gr.update(visible=right_column_visible), # right_column ) def handle_choice(story_state: dict, choice_index: int) -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]: """Navigate to the scene selected by the choice.""" scenes: List[SceneState] = story_state["scenes"] variables = story_state.get("variables", {}) active_paths = story_state.get("active_paths", set()) current_scene = scenes[story_state["index"]] if current_scene.choices and 0 <= choice_index < len(current_scene.choices): chosen = current_scene.choices[choice_index] story_state["index"] = chosen.next_scene_index # Activate the path of the chosen scene target_scene = scenes[chosen.next_scene_index] if target_scene.path: active_paths = set(active_paths) # Copy the set active_paths.add(target_scene.path) story_state["active_paths"] = active_paths html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene( scenes[story_state["index"]], story_state["index"], len(scenes), variables ) nav_enabled = not bool(choices) and not bool(input_req) right_column_visible = show_camera or show_voice or show_motors or show_robot return ( story_state, html, dialogue, meta, camera_hint_text(show_camera), gr.update(visible=show_camera), voice_hint_text(show_voice), gr.update(visible=show_voice), motor_hint_text(show_motors), gr.update(visible=show_motors), robot_hint_text(show_robot), gr.update(visible=show_robot), gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None), f"### {input_req.prompt}" if input_req else "", gr.update(visible=bool(input_req)), gr.update(interactive=nav_enabled), gr.update(interactive=nav_enabled), gr.update(visible=right_column_visible), # right_column ) return change_scene(story_state, 0) def handle_input(story_state: dict, user_input: str) -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]: """Store user input and advance to next scene.""" scenes: List[SceneState] = story_state["scenes"] variables = story_state.get("variables", {}) current_scene = scenes[story_state["index"]] if current_scene.input_request and user_input: variables[current_scene.input_request.variable_name] = user_input story_state["variables"] = variables # Advance to next scene story_state["index"] = min(story_state["index"] + 1, len(scenes) - 1) html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene( scenes[story_state["index"]], story_state["index"], len(scenes), variables ) nav_enabled = not bool(choices) and not bool(input_req) right_column_visible = show_camera or show_voice or show_motors or show_robot return ( story_state, html, dialogue, meta, camera_hint_text(show_camera), gr.update(visible=show_camera), voice_hint_text(show_voice), gr.update(visible=show_voice), motor_hint_text(show_motors), gr.update(visible=show_motors), robot_hint_text(show_robot), gr.update(visible=show_robot), gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None), f"### {input_req.prompt}" if input_req else "", gr.update(visible=bool(input_req)), gr.update(interactive=nav_enabled), gr.update(interactive=nav_enabled), gr.update(visible=right_column_visible), # right_column ) def load_initial_state() -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]: scenes = build_sample_story() story_state = {"scenes": scenes, "index": 0, "variables": {}, "active_paths": set()} if scenes: html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene( scenes[0], 0, len(scenes), {} ) else: html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = ( "", "No scenes available.", "", False, False, False, False, None, None, ) nav_enabled = not bool(choices) and not bool(input_req) right_column_visible = show_camera or show_voice or show_motors or show_robot return ( story_state, html, dialogue, meta, camera_hint_text(show_camera), gr.update(visible=show_camera), voice_hint_text(show_voice), gr.update(visible=show_voice), motor_hint_text(show_motors), gr.update(visible=show_motors), robot_hint_text(show_robot), gr.update(visible=show_robot), gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None), f"### {input_req.prompt}" if input_req else "", gr.update(visible=bool(input_req)), gr.update(interactive=nav_enabled), gr.update(interactive=nav_enabled), gr.update(visible=right_column_visible), # right_column ) CUSTOM_CSS = """ /* Override Gradio's height constraints for stage container */ #stage-container { height: auto !important; max-height: none !important; } #stage-container > div { height: auto !important; } .stage { width: 100%; height: 80vh; min-height: 600px; border-radius: 0; position: relative; overflow: hidden; box-shadow: 0 12px 32px rgba(15,23,42,0.45); display: flex; align-items: flex-end; justify-content: center; } /* Ensure background layers fill the stage */ .stage-background, .stage-layer { max-height: none !important; } .stage-background { position: absolute; top: 0; left: 0; width: 100%; height: 100%; background-size: contain; background-position: center; background-repeat: no-repeat; z-index: 0; } .stage-layer { position: absolute; top: 0; left: 0; width: 100%; height: 100%; background-size: contain; background-position: center; background-repeat: no-repeat; z-index: 5; } .character { position: absolute; bottom: 0; width: 200px; height: 380px; background-size: contain; background-repeat: no-repeat; --char-scale: 1.0; transform: translateX(-50%) scale(var(--char-scale)); transition: transform 0.4s ease; z-index: 10; } /* Character animations */ .character.anim-idle { animation: anim-idle 4s ease-in-out infinite; } .character.anim-shake { animation: anim-shake 0.5s ease-in-out; } .character.anim-bounce { animation: anim-bounce 0.6s ease-in-out; } .character.anim-pulse { animation: anim-pulse 1s ease-in-out infinite; } .speech-bubble { position: absolute; bottom: 18px; left: 50%; transform: translateX(-50%); min-width: 60%; max-width: 90%; padding: 20px 24px; border-radius: 20px; background: rgba(15,23,42,0.88); color: #f8fafc; font-family: "Atkinson Hyperlegible", system-ui, sans-serif; box-shadow: 0 10px 28px rgba(0,0,0,0.35); z-index: 20; } .speech-bubble::after { content: ""; position: absolute; bottom: -16px; left: 50%; transform: translateX(-50%); border-width: 16px 12px 0 12px; border-style: solid; border-color: rgba(15,23,42,0.88) transparent transparent transparent; } .bubble-speaker { font-size: 0.85rem; letter-spacing: 0.08em; font-weight: 700; text-transform: uppercase; color: #facc15; margin-bottom: 6px; } .bubble-text { font-size: 1.05rem; line-height: 1.5; } .camera-column { position: relative; min-height: 360px; gap: 0.75rem; } .camera-hint { font-size: 0.85rem; color: #cbd5f5; margin-bottom: 0.4rem; } #camera-wrapper { width: 100%; max-width: 320px; } #camera-wrapper > div { border-radius: 18px; background: rgba(15,23,42,0.88); padding: 6px; box-shadow: 0 12px 26px rgba(15,23,42,0.55); } #camera-wrapper video { border-radius: 14px; object-fit: cover; box-shadow: 0 10px 30px rgba(0,0,0,0.4); } .dxl-card { margin-top: 0.5rem; padding: 1rem 1.2rem; border-radius: 14px; background: rgba(15,23,42,0.85); color: #e2e8f0; box-shadow: 0 10px 26px rgba(0,0,0,0.45); } .dxl-card h3 { margin: 0 0 0.35rem 0; } .dxl-row { display: flex; gap: 0.6rem; align-items: center; margin-bottom: 0.5rem; flex-wrap: wrap; } .dxl-row label { font-size: 0.9rem; color: #cbd5e1; } .dxl-row input[type="number"], .dxl-row select, .dxl-row input[type="range"] { flex: 1; min-width: 120px; } .dxl-btn { padding: 0.5rem 0.8rem; border-radius: 10px; border: 1px solid rgba(148,163,184,0.4); background: rgba(255,255,255,0.05); color: #e2e8f0; cursor: pointer; transition: transform 0.1s ease, background 0.15s ease; } .dxl-btn.primary { background: linear-gradient(120deg, #06b6d4, #2563eb); border-color: rgba(59,130,246,0.5); } .dxl-btn:disabled { opacity: 0.5; cursor: not-allowed; } .dxl-btn:not(:disabled):hover { transform: translateY(-1px); } .dxl-status { font-size: 0.9rem; color: #a5b4fc; min-height: 1.2rem; } .input-prompt { font-size: 1.1rem; font-weight: 600; color: #1e293b; margin-bottom: 0.5rem; } @keyframes anim-idle { 0% { transform: translate(-50%, 0px) scale(var(--char-scale)); } 50% { transform: translate(-50%, 12px) scale(var(--char-scale)); } 100% { transform: translate(-50%, 0px) scale(var(--char-scale)); } } @keyframes anim-shake { 0%, 100% { transform: translate(-50%, 0) rotate(0deg) scale(var(--char-scale)); } 10%, 30%, 50%, 70%, 90% { transform: translate(-52%, 0) rotate(-2deg) scale(var(--char-scale)); } 20%, 40%, 60%, 80% { transform: translate(-48%, 0) rotate(2deg) scale(var(--char-scale)); } } @keyframes anim-bounce { 0%, 100% { transform: translate(-50%, 0) scale(var(--char-scale)); } 25% { transform: translate(-50%, -30px) scale(var(--char-scale)); } 50% { transform: translate(-50%, 0) scale(var(--char-scale)); } 75% { transform: translate(-50%, -15px) scale(var(--char-scale)); } } @keyframes anim-pulse { 0%, 100% { transform: translate(-50%, 0) scale(var(--char-scale)); } 50% { transform: translate(-50%, 0) scale(calc(var(--char-scale) * 1.05)); } } """ ENUMERATE_CAMERAS_JS = """ async (currentDevices) => { if (!navigator.mediaDevices?.enumerateDevices) { return currentDevices || []; } try { const devices = await navigator.mediaDevices.enumerateDevices(); return devices .filter((device) => device.kind === "videoinput") .map((device, index) => ({ label: device.label || `Camera ${index + 1}`, deviceId: device.deviceId || null, })); } catch (error) { console.warn("enumerateDevices failed", error); return currentDevices || []; } } """ def load_dxl_script_js() -> str: """Inline the DXL script content directly.""" script_dir = os.path.dirname(os.path.abspath(__file__)) js_path = os.path.join(script_dir, "web", "dxl_webserial.js") try: with open(js_path, 'r', encoding='utf-8') as f: js_content = f.read() except Exception as e: js_content = f"console.error('[DXL] Failed to load script: {e}');" # Properly escape for JavaScript template literal js_content_escaped = js_content.replace('\\', '\\\\').replace('`', '\\`').replace('${', '\\${') return f""" () => {{ try {{ // Execute inline script const scriptFn = new Function({repr(js_content)}); scriptFn(); console.log('[DXL] Script loaded inline'); }} catch(e) {{ console.error('[DXL] Failed to execute:', e); }} }} """ def dxl_send_and_receive_js() -> str: """JavaScript to send packet bytes and receive response via Web Serial.""" return """ async (packet_bytes) => { // Check if dxlSerial is available and connected if (typeof window.dxlSerial === 'undefined' || !window.dxlSerial) { console.error("[DXL] Serial not available - connect first"); return []; } if (!window.dxlSerial.connected) { console.error("[DXL] Not connected to serial port"); return []; } try { await window.dxlSerial.writeBytes(packet_bytes); const response = await window.dxlSerial.readPacket(800); return response; } catch (err) { console.error("[DXL] Communication error:", err.message); return []; } } """ def execute_motor_packets_js() -> str: """JavaScript to execute pre-built motor packets.""" return """ async (packets) => { if (!packets || packets.length === 0) { return; // No packets to execute } // Check if serial is available if (typeof window.dxlSerial === 'undefined' || !window.dxlSerial || !window.dxlSerial.connected) { return; // Silently skip if not connected } // Execute each packet sequentially for (const pkt of packets) { try { await window.dxlSerial.writeBytes(pkt); await window.dxlSerial.readPacket(800); } catch (err) { console.error(`[Motors] Error:`, err.message); } } } """ def play_scene_audio_js() -> str: """JavaScript to play audio file.""" return """ (audio_path) => { if (!audio_path || audio_path === '') { return; // No audio to play } // Create or reuse audio element let audio = document.getElementById('scene-audio-player'); if (!audio) { audio = new Audio(); audio.id = 'scene-audio-player'; } console.log('[Audio] Playing:', audio_path); audio.src = audio_path; audio.play().catch(err => console.error('[Audio] Playback failed:', err)); } """ def load_robot_ws_script_js() -> str: """JavaScript to initialize WebSocket connection to Reachy Mini robot.""" return """ () => { console.log('[Robot] Initializing WebSocket connection...'); // Define global initialization function if not already defined if (!window.loadRobotWebSocket) { window.loadRobotWebSocket = function() { const hostDiv = document.getElementById('robot-ws-host'); if (!hostDiv) { console.error('[Robot] Cannot initialize - host div not found'); return; } const ROBOT_URL = 'localhost:8000'; const WS_URL = `ws://${ROBOT_URL}/api/move/ws/set_target`; console.log('[Robot] Connecting to:', WS_URL); // Global robot state window.reachyRobot = { ws: null, connected: false }; // Create UI hostDiv.innerHTML = `
Disconnected - Trying to connect...
`; function updateStatus(connected) { const statusDiv = document.getElementById('robot-connection-status'); const dot = document.getElementById('robot-status-dot'); const text = document.getElementById('robot-status-text'); if (connected) { statusDiv.style.background = '#d4edda'; statusDiv.style.color = '#155724'; dot.style.background = '#28a745'; dot.style.boxShadow = '0 0 10px #28a745'; text.textContent = 'Connected to robot'; } else { statusDiv.style.background = '#f8d7da'; statusDiv.style.color = '#721c24'; dot.style.background = '#dc3545'; dot.style.boxShadow = 'none'; text.textContent = 'Disconnected - Reconnecting...'; } } function connectWebSocket() { console.log('[Robot] Connecting to WebSocket:', WS_URL); window.reachyRobot.ws = new WebSocket(WS_URL); window.reachyRobot.ws.onopen = () => { console.log('[Robot] WebSocket connected'); window.reachyRobot.connected = true; updateStatus(true); }; window.reachyRobot.ws.onclose = () => { console.log('[Robot] WebSocket disconnected'); window.reachyRobot.connected = false; updateStatus(false); // Reconnect after 2 seconds setTimeout(connectWebSocket, 2000); }; window.reachyRobot.ws.onerror = (error) => { console.error('[Robot] WebSocket error:', error); }; window.reachyRobot.ws.onmessage = (event) => { try { const message = JSON.parse(event.data); if (message.status === 'error') { console.error('[Robot] Server error:', message.detail); } } catch (e) { console.error('[Robot] Failed to parse message:', e); } }; } connectWebSocket(); }; // End of window.loadRobotWebSocket definition } // Try to initialize (with multiple retries) let retryCount = 0; const maxRetries = 10; function tryInit() { const hostDiv = document.getElementById('robot-ws-host'); if (!hostDiv) { retryCount++; if (retryCount <= maxRetries) { console.warn(`[Robot] Host div not found, retry ${retryCount}/${maxRetries} in 1 second`); setTimeout(tryInit, 1000); } else { console.warn('[Robot] Gave up waiting for robot widget div. Will initialize on first use.'); } return; } if (window.reachyRobot) { console.log('[Robot] Already initialized'); return; } // Initialize now console.log('[Robot] Found host div, initializing...'); window.loadRobotWebSocket(); } tryInit(); } """ def send_robot_pose_js() -> str: """JavaScript to send robot pose via WebSocket.""" return """ async (pose_data) => { if (!pose_data) { return; // No pose to send } // Initialize WebSocket if not already done (lazy initialization) if (!window.reachyRobot) { console.log('[Robot] Lazy initialization on first pose send'); if (window.loadRobotWebSocket) { window.loadRobotWebSocket(); // Wait a bit for connection to establish await new Promise(resolve => setTimeout(resolve, 500)); } } if (!window.reachyRobot || !window.reachyRobot.connected || !window.reachyRobot.ws || window.reachyRobot.ws.readyState !== WebSocket.OPEN) { console.warn('[Robot] WebSocket not connected, skipping pose command'); return; } try { console.log('[Robot] Sending pose:', pose_data); window.reachyRobot.ws.send(JSON.stringify(pose_data)); } catch (error) { console.error('[Robot] Failed to send pose:', error); } } """ def build_app() -> gr.Blocks: with gr.Blocks(title="Gradio Visual Novel") as demo: gr.HTML(f"", elem_id="vn-styles") story_state = gr.State() with gr.Row(): with gr.Column(scale=3, min_width=640): stage = gr.HTML(label="Stage", elem_id="stage-container") dialogue = gr.Markdown(label="Dialogue") meta = gr.Markdown(label="Scene Info", elem_id="scene-info") # Choice selection choice_radio = gr.Radio(label="Make a choice", visible=False) # Text input with gr.Group(visible=False) as input_group: input_prompt = gr.Markdown("", elem_classes=["input-prompt"]) with gr.Row(): user_input = gr.Textbox(label="Your answer", scale=4) input_submit_btn = gr.Button("Submit", variant="primary", scale=1) with gr.Row(): prev_btn = gr.Button("โŸต Back", variant="secondary") next_btn = gr.Button("Next โŸถ", variant="primary") with gr.Column(scale=1, min_width=320, elem_classes=["camera-column"], visible=False) as right_column: gr.Markdown("### Live Camera (WebRTC)") camera_hint = gr.Markdown( camera_hint_text(False), elem_classes=["camera-hint"] ) gr.Markdown( "Allow camera access when prompted. The webcam appears only in scenes that request it.", elem_classes=["camera-hint"], ) with gr.Group(elem_id="camera-wrapper"): webrtc_component = WebRTC( label="Webcam Stream", mode="send-receive", modality="video", full_screen=False, visible=False, ) webrtc_component.stream( fn=passthrough_stream, inputs=[webrtc_component], outputs=[webrtc_component], ) voice_hint = gr.Markdown( voice_hint_text(False), elem_classes=["camera-hint"] ) with gr.Group(visible=False, elem_id="voice-wrapper") as voice_section: with gr.Accordion("Voice & Audio Agent", open=True): gr.Markdown( "Record a short line to pass to your AI companion. " "We play back your clip and a synthetic confirmation tone.", elem_classes=["camera-hint"], ) voice_prompt = gr.Textbox( label="Prompt/context", value="React to the current scene with a friendly reply.", lines=2, ) mic = gr.Audio( sources=["microphone", "upload"], type="numpy", label="Record or upload audio", ) send_voice_btn = gr.Button( "Send to voice agent", variant="secondary" ) voice_summary = gr.Markdown("No audio captured yet.") playback = gr.Audio(label="Your recording", interactive=False) ai_voice_text = gr.Markdown("AI response will appear here.") ai_voice_audio = gr.Audio( label="AI voice reply (synthetic tone)", interactive=False ) send_voice_btn.click( fn=process_voice_interaction, inputs=[mic, voice_prompt], outputs=[ voice_summary, playback, ai_voice_text, ai_voice_audio, ], ) motor_hint = gr.Markdown( motor_hint_text(False), elem_classes=["camera-hint"] ) with gr.Group(visible=False, elem_id="dxl-panel-container") as motor_group: with gr.Accordion("Dynamixel XL330 Control", open=True): gr.Markdown( "**Web Serial Control** - Use Chrome/Edge desktop. Connect to serial port, then control motors.", elem_classes=["camera-hint"], ) # Serial connection panel (still handled by JavaScript) gr.HTML('
', elem_id="dxl-panel-host-wrapper") # Motor control inputs (Python-based) with gr.Row(): motor_id_input = gr.Number( label="Motor ID", value=1, minimum=0, maximum=252, precision=0, ) with gr.Row(): goal_slider = gr.Slider( label="Goal Position (degrees)", minimum=0, maximum=360, value=90, step=1, ) with gr.Row(): ping_btn = gr.Button("Ping", size="sm") torque_on_btn = gr.Button("Torque ON", size="sm", variant="secondary") torque_off_btn = gr.Button("Torque OFF", size="sm") with gr.Row(): send_goal_btn = gr.Button("Send Goal Position", variant="primary") motor_status = gr.Markdown("Status: Ready") # Robot Control (Reachy Mini via WebSocket) robot_hint = gr.Markdown( robot_hint_text(False), elem_classes=["camera-hint"] ) with gr.Group(visible=False, elem_id="robot-panel-container") as robot_group: with gr.Accordion("Reachy Mini Robot Control", open=True): gr.Markdown( "**WebSocket Control** - Connects to localhost:8000 for real-time robot control.", elem_classes=["camera-hint"], ) # WebSocket connection area (will be managed by JavaScript) # Status is shown dynamically by JavaScript inside this div gr.HTML('
', elem_id="robot-ws-host-wrapper") # Wire up event handlers all_outputs = [ story_state, stage, dialogue, meta, camera_hint, webrtc_component, voice_hint, voice_section, motor_hint, motor_group, robot_hint, robot_group, choice_radio, input_prompt, input_group, prev_btn, next_btn, right_column, ] # Hidden JSON for passing packet bytes between Python and JavaScript # Note: gr.State doesn't work well with JavaScript, so we use JSON packet_bytes_json = gr.JSON(visible=False, value=[]) response_bytes_json = gr.JSON(visible=False, value=[]) motor_packets_json = gr.JSON(visible=False, value=[]) # For scene motor commands # Hidden textbox for passing audio path to JavaScript audio_path_box = gr.Textbox(visible=False, value="") # Hidden JSON for passing robot pose to JavaScript robot_pose_json = gr.JSON(visible=False, value=None) # Load initialization scripts combined_init_js = f""" () => {{ // Initialize Dynamixel ({load_dxl_script_js()})(); // Initialize Robot WebSocket ({load_robot_ws_script_js()})(); }} """ demo.load( fn=load_initial_state, inputs=None, outputs=all_outputs, js=combined_init_js, ) # Navigation buttons with automatic motor command execution, audio playback, and robot control # Create parallel chains for audio, motors, and robot to ensure all get the updated state # Previous button prev_event = prev_btn.click( fn=lambda state: change_scene(state, -1), inputs=story_state, outputs=all_outputs, ) # Audio chain prev_event.then( fn=get_scene_audio, inputs=[story_state], outputs=[audio_path_box], ).then( fn=None, inputs=[audio_path_box], outputs=[], js=play_scene_audio_js(), ) # Motor chain (parallel) prev_event.then( fn=get_scene_motor_packets, inputs=[story_state], outputs=[motor_packets_json], ).then( fn=None, inputs=[motor_packets_json], outputs=[], js=execute_motor_packets_js(), ) # Robot chain (parallel) prev_event.then( fn=get_scene_robot_pose, inputs=[story_state], outputs=[robot_pose_json], ).then( fn=None, inputs=[robot_pose_json], outputs=[], js=send_robot_pose_js(), ) # Next button next_event = next_btn.click( fn=lambda state: change_scene(state, 1), inputs=story_state, outputs=all_outputs, ) # Audio chain next_event.then( fn=get_scene_audio, inputs=[story_state], outputs=[audio_path_box], ).then( fn=None, inputs=[audio_path_box], outputs=[], js=play_scene_audio_js(), ) # Motor chain (parallel) next_event.then( fn=get_scene_motor_packets, inputs=[story_state], outputs=[motor_packets_json], ).then( fn=None, inputs=[motor_packets_json], outputs=[], js=execute_motor_packets_js(), ) # Robot chain (parallel) next_event.then( fn=get_scene_robot_pose, inputs=[story_state], outputs=[robot_pose_json], ).then( fn=None, inputs=[robot_pose_json], outputs=[], js=send_robot_pose_js(), ) # Choice handler choice_event = choice_radio.change( fn=handle_choice, inputs=[story_state, choice_radio], outputs=all_outputs, ) # Audio chain choice_event.then( fn=get_scene_audio, inputs=[story_state], outputs=[audio_path_box], ).then( fn=None, inputs=[audio_path_box], outputs=[], js=play_scene_audio_js(), ) # Motor chain (parallel) choice_event.then( fn=get_scene_motor_packets, inputs=[story_state], outputs=[motor_packets_json], ).then( fn=None, inputs=[motor_packets_json], outputs=[], js=execute_motor_packets_js(), ) # Robot chain (parallel) choice_event.then( fn=get_scene_robot_pose, inputs=[story_state], outputs=[robot_pose_json], ).then( fn=None, inputs=[robot_pose_json], outputs=[], js=send_robot_pose_js(), ) # Input submit button input_submit_event = input_submit_btn.click( fn=handle_input, inputs=[story_state, user_input], outputs=all_outputs, ) # Audio chain input_submit_event.then( fn=get_scene_audio, inputs=[story_state], outputs=[audio_path_box], ).then( fn=None, inputs=[audio_path_box], outputs=[], js=play_scene_audio_js(), ) # Motor chain (parallel) input_submit_event.then( fn=get_scene_motor_packets, inputs=[story_state], outputs=[motor_packets_json], ).then( fn=None, inputs=[motor_packets_json], outputs=[], js=execute_motor_packets_js(), ) # Robot chain (parallel) input_submit_event.then( fn=get_scene_robot_pose, inputs=[story_state], outputs=[robot_pose_json], ).then( fn=None, inputs=[robot_pose_json], outputs=[], js=send_robot_pose_js(), ) # Input enter key input_enter_event = user_input.submit( fn=handle_input, inputs=[story_state, user_input], outputs=all_outputs, ) # Audio chain input_enter_event.then( fn=get_scene_audio, inputs=[story_state], outputs=[audio_path_box], ).then( fn=None, inputs=[audio_path_box], outputs=[], js=play_scene_audio_js(), ) # Motor chain (parallel) input_enter_event.then( fn=get_scene_motor_packets, inputs=[story_state], outputs=[motor_packets_json], ).then( fn=None, inputs=[motor_packets_json], outputs=[], js=execute_motor_packets_js(), ) # Robot chain (parallel) input_enter_event.then( fn=get_scene_robot_pose, inputs=[story_state], outputs=[robot_pose_json], ).then( fn=None, inputs=[robot_pose_json], outputs=[], js=send_robot_pose_js(), ) # Motor control event handlers # Pattern: Python builds packet -> JS sends/receives -> Python parses # Ping button ping_btn.click( fn=dxl_build_ping_packet, inputs=[motor_id_input], outputs=[packet_bytes_json], ).then( fn=None, inputs=[packet_bytes_json], outputs=[response_bytes_json], js=dxl_send_and_receive_js(), ).then( fn=dxl_parse_response, inputs=[response_bytes_json], outputs=[motor_status], ) # Torque ON button torque_on_btn.click( fn=lambda motor_id: dxl_build_torque_packet(motor_id, True), inputs=[motor_id_input], outputs=[packet_bytes_json], ).then( fn=None, inputs=[packet_bytes_json], outputs=[response_bytes_json], js=dxl_send_and_receive_js(), ).then( fn=dxl_parse_response, inputs=[response_bytes_json], outputs=[motor_status], ) # Torque OFF button torque_off_btn.click( fn=lambda motor_id: dxl_build_torque_packet(motor_id, False), inputs=[motor_id_input], outputs=[packet_bytes_json], ).then( fn=None, inputs=[packet_bytes_json], outputs=[response_bytes_json], js=dxl_send_and_receive_js(), ).then( fn=dxl_parse_response, inputs=[response_bytes_json], outputs=[motor_status], ) # Send goal position button send_goal_btn.click( fn=dxl_build_goal_position_packet, inputs=[motor_id_input, goal_slider], outputs=[packet_bytes_json], ).then( fn=None, inputs=[packet_bytes_json], outputs=[response_bytes_json], js=dxl_send_and_receive_js(), ).then( fn=dxl_parse_response, inputs=[response_bytes_json], outputs=[motor_status], ) return demo def main() -> None: """Launch the Visual Novel Gradio app.""" logger.info("=== Visual Novel App Startup ===") logger.info("Using HuggingFace repo URLs for assets") # Build Gradio app demo = build_app() # Launch with SSR disabled demo.launch( server_name="0.0.0.0", server_port=7860, ssr_mode=False, show_error=True, ) if __name__ == "__main__": main()