"""Visual Novel Gradio App - Main application with UI and handlers."""
from __future__ import annotations
import os
import urllib.parse
import numpy as np
import logging
from typing import List, Optional
import gradio as gr
from fastrtc import WebRTC
from engine import SceneState, POSITION_OFFSETS, Choice, InputRequest
from story import build_sample_story
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def passthrough_stream(frame):
"""Return the incoming frame untouched so the user sees their feed."""
return frame
def camera_hint_text(show_camera: bool) -> str:
if show_camera:
return "๐ฅ Webcam overlay is active for this scene."
return "๐น๏ธ Webcam is hidden for this scene."
def voice_hint_text(show_voice: bool) -> str:
if show_voice:
return "๐ค Voice capture is available in this scene."
return "๐ Voice capture is hidden for this scene."
def motor_hint_text(show_motors: bool) -> str:
if show_motors:
return "๐ค Motor control is available in this scene."
return "๐ Motor control hidden for this scene."
def robot_hint_text(show_robot: bool) -> str:
if show_robot:
return "๐ค Robot control is available in this scene."
return "๐ Robot control hidden for this scene."
# Dynamixel control functions using Python protocol implementation
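# Packet flow is a three-step round trip: Python builds the raw packet bytes,
# browser-side JavaScript ships them over Web Serial, and Python parses the
# status packet that comes back (see the button wiring at the end of the file).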
def dxl_build_ping_packet(motor_id: int) -> list[int]:
"""Build a ping packet and return as list of bytes."""
import dynamixel
packet = dynamixel.ping_packet(motor_id)
return list(packet)
def dxl_build_torque_packet(motor_id: int, enable: bool) -> list[int]:
"""Build a torque enable/disable packet and return as list of bytes."""
import dynamixel
packet = dynamixel.torque_enable_packet(motor_id, enable)
return list(packet)
def dxl_build_goal_position_packet(motor_id: int, degrees: float) -> list[int]:
"""Build a goal position packet and return as list of bytes."""
import dynamixel
# Convert degrees to ticks (0-360ยฐ -> 0-4095)
clamped_deg = max(0.0, min(360.0, degrees))
ticks = int((clamped_deg / 360.0) * 4095)
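    # Example: 180.0 degrees -> int((180.0 / 360.0) * 4095) = 2047 ticks.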
packet = dynamixel.goal_position_packet(motor_id, ticks)
return list(packet)
def dxl_parse_response(response_bytes: list[int]) -> str:
"""Parse a status packet response and return human-readable result."""
import dynamixel
if not response_bytes:
return "โ No response received"
success, message = dynamixel.parse_status_packet(bytes(response_bytes))
if success:
return f"โ
{message}"
else:
return f"โ {message}"
def get_scene_motor_packets(story_state: dict) -> list[list[int]]:
"""Extract motor commands from current scene and build packets."""
scenes = story_state["scenes"]
current_index = story_state["index"]
if 0 <= current_index < len(scenes):
scene = scenes[current_index]
# Build packet for each motor command
packets = []
for cmd in scene.motor_commands:
packet = dxl_build_goal_position_packet(cmd.motor_id, cmd.position)
packets.append(packet)
return packets
return []
def get_scene_audio(story_state: dict) -> Optional[str]:
"""Extract audio file from current scene."""
scenes = story_state["scenes"]
current_index = story_state["index"]
if 0 <= current_index < len(scenes):
scene = scenes[current_index]
return scene.audio_file
return None
def get_scene_robot_pose(story_state: dict) -> Optional[dict]:
"""Extract robot pose from current scene."""
scenes = story_state["scenes"]
current_index = story_state["index"]
if 0 <= current_index < len(scenes):
scene = scenes[current_index]
if scene.robot_pose:
return {
"target_head_pose": {
"x": scene.robot_pose.head_x,
"y": scene.robot_pose.head_y,
"z": scene.robot_pose.head_z,
"roll": scene.robot_pose.head_roll,
"pitch": scene.robot_pose.head_pitch,
"yaw": scene.robot_pose.head_yaw,
},
"target_body_yaw": scene.robot_pose.body_yaw,
"target_antennas": [scene.robot_pose.antenna_left, scene.robot_pose.antenna_right],
}
return None
def synthesize_tone(sample_rate: int = 16000, duration: float = 1.25) -> tuple[int, np.ndarray]:
"""Generate a short confirmation tone to play back as the AI voice."""
samples = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
carrier = np.sin(2 * np.pi * 520 * samples) + 0.4 * np.sin(2 * np.pi * 880 * samples)
fade_len = int(sample_rate * 0.08)
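    # Linear fade-in/out over the first and last 80 ms to avoid audible clicks.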
envelope = np.ones_like(carrier)
envelope[:fade_len] *= np.linspace(0.0, 1.0, fade_len)
envelope[-fade_len:] *= np.linspace(1.0, 0.0, fade_len)
tone = 0.18 * carrier * envelope
return sample_rate, tone.astype(np.float32)
def describe_audio_clip(audio: Optional[tuple[int, np.ndarray]]) -> str:
if audio is None:
return "No audio captured yet. Hit record to speak with the companion."
sample_rate, samples = audio
num_samples = len(samples) if samples is not None else 0
if num_samples == 0:
return "Audio appears empty. Please re-record."
duration = num_samples / float(sample_rate or 1)
rms = float(np.sqrt(np.mean(np.square(samples))))
return f"Captured {duration:.2f}s of audio (RMS ~{rms:.3f}). Ready for the AI."
def process_voice_interaction(
audio: Optional[tuple[int, np.ndarray]], prompt: str
) -> tuple[str, Optional[tuple[int, np.ndarray]], str, tuple[int, np.ndarray]]:
summary = describe_audio_clip(audio)
user_prompt = (prompt or "React to the current scene.").strip()
if audio is None:
ai_line = (
"AI response pending: record or upload an audio clip so the agent can react."
)
response_audio = synthesize_tone()
return summary, None, ai_line, response_audio
ai_line = (
"Imaginary AI companion: I'm using your latest microphone input "
f"and the prompt \"{user_prompt}\" to craft a response."
)
response_audio = synthesize_tone()
return summary, audio, ai_line, response_audio
def render_scene(
scene: SceneState, index: int, total: int, variables: dict
) -> tuple[str, str, str, bool, bool, bool, bool, Optional[List[Choice]], Optional[InputRequest]]:
"""Generate the HTML stage, dialogue text, and metadata."""
char_layers = []
for sprite in scene.characters.values():
if not sprite.visible:
continue
offset = POSITION_OFFSETS.get(sprite.position, "50%")
# Build class names with animation
class_names = "character"
if sprite.animation:
class_names += f" anim-{sprite.animation}"
# Apply scale using CSS variable (so animations can use it)
        # Sprite field names below (image_url, scale) are assumed to match the
        # engine module's sprite dataclass; adjust if they differ there.
        char_layers.append(
            f"""<div class="{class_names}"
                style="left: {offset}; --char-scale: {sprite.scale};
                       background-image: url('{sprite.image_url}');"></div>"""
        )
    # The speech bubble already renders the scene text, so keep the markdown
    # area below the stage empty to avoid duplicating it.
    dialogue_markdown = ""
metadata = f"{scene.background_label or 'Scene'} ยท {index + 1} / {total}"
bubble_html = ""
text_content = (scene.text or "").strip()
# Substitute variables in text (e.g., {player_name})
for var_name, var_value in variables.items():
text_content = text_content.replace(f"{{{var_name}}}", str(var_value))
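    # e.g. "Hello, {player_name}!" with variables {"player_name": "Ada"}
    # renders as "Hello, Ada!" ("Ada" here is just an illustrative value).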
if text_content:
        # Rebuild the bubble markup; class names match CUSTOM_CSS below.
        speaker_html = (
            f'<div class="bubble-speaker">{scene.speaker}</div>'
            if scene.speaker
            else ""
        )
        bubble_html = f"""<div class="speech-bubble">
            {speaker_html}
            <div class="bubble-text">{text_content}</div>
        </div>"""
# Apply blur filters to background and stage
bg_blur_style = f"filter: blur({scene.background_blur}px);" if scene.background_blur > 0 else ""
stage_blur_style = f"filter: blur({scene.stage_blur}px);" if scene.stage_blur > 0 else ""
# Build stage layer HTML if stage image is set
stage_layer_html = ""
if scene.stage_url:
        stage_layer_html = (
            f'<div class="stage-layer" '
            f'style="background-image: url(\'{scene.stage_url}\'); {stage_blur_style}"></div>'
        )
stage_html = f"""
{stage_layer_html}
{''.join(char_layers)}
{bubble_html}
"""
return (
stage_html,
dialogue_markdown,
metadata,
scene.show_camera,
scene.show_voice,
scene.show_motors,
scene.show_robot,
scene.choices,
scene.input_request,
)
def is_scene_accessible(scene: SceneState, active_paths: set) -> bool:
"""Check if a scene is accessible given the active story paths."""
# Scenes with no path are always accessible (main path)
if scene.path is None:
return True
# Scenes with a specific path are only accessible if that path is active
return scene.path in active_paths
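# Example: a scene tagged with a hypothetical path "branch_a" stays hidden
# until handle_choice() routes the player there and adds "branch_a" to
# story_state["active_paths"].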
def change_scene(
story_state: dict, direction: int
) -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]:
scenes: List[SceneState] = story_state["scenes"]
variables = story_state.get("variables", {})
active_paths = story_state.get("active_paths", set())
if not scenes:
return (
story_state,
"",
"No scenes available.",
"",
camera_hint_text(False),
gr.update(visible=False),
voice_hint_text(False),
gr.update(visible=False),
motor_hint_text(False),
gr.update(visible=False),
robot_hint_text(False),
gr.update(visible=False),
            gr.update(visible=False, choices=[]),
            "",  # input prompt markdown
            gr.update(visible=False),
gr.update(interactive=True),
gr.update(interactive=True),
gr.update(visible=False), # right_column
)
total = len(scenes)
current_index = story_state["index"]
# Find the next accessible scene in the given direction
new_index = current_index
search_index = current_index + direction
while 0 <= search_index < total:
if is_scene_accessible(scenes[search_index], active_paths):
new_index = search_index
break
search_index += direction
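    # If no accessible scene exists in that direction, new_index keeps the
    # current position and the current scene simply re-renders.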
story_state["index"] = new_index
html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene(
scenes[story_state["index"]], story_state["index"], total, variables
)
# Disable navigation when choices or input are present
nav_enabled = not bool(choices) and not bool(input_req)
# Show right column if any feature is active
right_column_visible = show_camera or show_voice or show_motors or show_robot
return (
story_state,
html,
dialogue,
meta,
camera_hint_text(show_camera),
gr.update(visible=show_camera),
voice_hint_text(show_voice),
gr.update(visible=show_voice),
motor_hint_text(show_motors),
gr.update(visible=show_motors),
robot_hint_text(show_robot),
gr.update(visible=show_robot),
gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None),
f"### {input_req.prompt}" if input_req else "",
gr.update(visible=bool(input_req)),
gr.update(interactive=nav_enabled),
gr.update(interactive=nav_enabled),
gr.update(visible=right_column_visible), # right_column
)
def handle_choice(story_state: dict, choice_index: int) -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]:
"""Navigate to the scene selected by the choice."""
scenes: List[SceneState] = story_state["scenes"]
variables = story_state.get("variables", {})
active_paths = story_state.get("active_paths", set())
current_scene = scenes[story_state["index"]]
if current_scene.choices and 0 <= choice_index < len(current_scene.choices):
chosen = current_scene.choices[choice_index]
story_state["index"] = chosen.next_scene_index
# Activate the path of the chosen scene
target_scene = scenes[chosen.next_scene_index]
if target_scene.path:
active_paths = set(active_paths) # Copy the set
active_paths.add(target_scene.path)
story_state["active_paths"] = active_paths
html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene(
scenes[story_state["index"]], story_state["index"], len(scenes), variables
)
nav_enabled = not bool(choices) and not bool(input_req)
right_column_visible = show_camera or show_voice or show_motors or show_robot
return (
story_state,
html,
dialogue,
meta,
camera_hint_text(show_camera),
gr.update(visible=show_camera),
voice_hint_text(show_voice),
gr.update(visible=show_voice),
motor_hint_text(show_motors),
gr.update(visible=show_motors),
robot_hint_text(show_robot),
gr.update(visible=show_robot),
gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None),
f"### {input_req.prompt}" if input_req else "",
gr.update(visible=bool(input_req)),
gr.update(interactive=nav_enabled),
gr.update(interactive=nav_enabled),
gr.update(visible=right_column_visible), # right_column
)
return change_scene(story_state, 0)
def handle_input(story_state: dict, user_input: str) -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]:
"""Store user input and advance to next scene."""
scenes: List[SceneState] = story_state["scenes"]
variables = story_state.get("variables", {})
current_scene = scenes[story_state["index"]]
if current_scene.input_request and user_input:
variables[current_scene.input_request.variable_name] = user_input
story_state["variables"] = variables
# Advance to next scene
story_state["index"] = min(story_state["index"] + 1, len(scenes) - 1)
html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene(
scenes[story_state["index"]], story_state["index"], len(scenes), variables
)
nav_enabled = not bool(choices) and not bool(input_req)
right_column_visible = show_camera or show_voice or show_motors or show_robot
return (
story_state,
html,
dialogue,
meta,
camera_hint_text(show_camera),
gr.update(visible=show_camera),
voice_hint_text(show_voice),
gr.update(visible=show_voice),
motor_hint_text(show_motors),
gr.update(visible=show_motors),
robot_hint_text(show_robot),
gr.update(visible=show_robot),
gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None),
f"### {input_req.prompt}" if input_req else "",
gr.update(visible=bool(input_req)),
gr.update(interactive=nav_enabled),
gr.update(interactive=nav_enabled),
gr.update(visible=right_column_visible), # right_column
)
def load_initial_state() -> tuple[dict, str, str, str, str, dict, str, dict, str, dict, str, dict, dict, str, dict, dict, dict, dict]:
scenes = build_sample_story()
story_state = {"scenes": scenes, "index": 0, "variables": {}, "active_paths": set()}
if scenes:
html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = render_scene(
scenes[0], 0, len(scenes), {}
)
else:
html, dialogue, meta, show_camera, show_voice, show_motors, show_robot, choices, input_req = (
"",
"No scenes available.",
"",
False,
False,
False,
False,
None,
None,
)
nav_enabled = not bool(choices) and not bool(input_req)
right_column_visible = show_camera or show_voice or show_motors or show_robot
return (
story_state,
html,
dialogue,
meta,
camera_hint_text(show_camera),
gr.update(visible=show_camera),
voice_hint_text(show_voice),
gr.update(visible=show_voice),
motor_hint_text(show_motors),
gr.update(visible=show_motors),
robot_hint_text(show_robot),
gr.update(visible=show_robot),
gr.update(visible=bool(choices), choices=[(c.text, i) for i, c in enumerate(choices)] if choices else [], value=None),
f"### {input_req.prompt}" if input_req else "",
gr.update(visible=bool(input_req)),
gr.update(interactive=nav_enabled),
gr.update(interactive=nav_enabled),
gr.update(visible=right_column_visible), # right_column
)
CUSTOM_CSS = """
/* Override Gradio's height constraints for stage container */
#stage-container {
height: auto !important;
max-height: none !important;
}
#stage-container > div {
height: auto !important;
}
.stage {
width: 100%;
height: 80vh;
min-height: 600px;
border-radius: 0;
position: relative;
overflow: hidden;
box-shadow: 0 12px 32px rgba(15,23,42,0.45);
display: flex;
align-items: flex-end;
justify-content: center;
}
/* Ensure background layers fill the stage */
.stage-background,
.stage-layer {
max-height: none !important;
}
.stage-background {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-size: contain;
background-position: center;
background-repeat: no-repeat;
z-index: 0;
}
.stage-layer {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-size: contain;
background-position: center;
background-repeat: no-repeat;
z-index: 5;
}
.character {
position: absolute;
bottom: 0;
width: 200px;
height: 380px;
background-size: contain;
background-repeat: no-repeat;
--char-scale: 1.0;
transform: translateX(-50%) scale(var(--char-scale));
transition: transform 0.4s ease;
z-index: 10;
}
/* Character animations */
.character.anim-idle {
animation: anim-idle 4s ease-in-out infinite;
}
.character.anim-shake {
animation: anim-shake 0.5s ease-in-out;
}
.character.anim-bounce {
animation: anim-bounce 0.6s ease-in-out;
}
.character.anim-pulse {
animation: anim-pulse 1s ease-in-out infinite;
}
.speech-bubble {
position: absolute;
bottom: 18px;
left: 50%;
transform: translateX(-50%);
min-width: 60%;
max-width: 90%;
padding: 20px 24px;
border-radius: 20px;
background: rgba(15,23,42,0.88);
color: #f8fafc;
font-family: "Atkinson Hyperlegible", system-ui, sans-serif;
box-shadow: 0 10px 28px rgba(0,0,0,0.35);
z-index: 20;
}
.speech-bubble::after {
content: "";
position: absolute;
bottom: -16px;
left: 50%;
transform: translateX(-50%);
border-width: 16px 12px 0 12px;
border-style: solid;
border-color: rgba(15,23,42,0.88) transparent transparent transparent;
}
.bubble-speaker {
font-size: 0.85rem;
letter-spacing: 0.08em;
font-weight: 700;
text-transform: uppercase;
color: #facc15;
margin-bottom: 6px;
}
.bubble-text {
font-size: 1.05rem;
line-height: 1.5;
}
.camera-column {
position: relative;
min-height: 360px;
gap: 0.75rem;
}
.camera-hint {
font-size: 0.85rem;
color: #cbd5f5;
margin-bottom: 0.4rem;
}
#camera-wrapper {
width: 100%;
max-width: 320px;
}
#camera-wrapper > div {
border-radius: 18px;
background: rgba(15,23,42,0.88);
padding: 6px;
box-shadow: 0 12px 26px rgba(15,23,42,0.55);
}
#camera-wrapper video {
border-radius: 14px;
object-fit: cover;
box-shadow: 0 10px 30px rgba(0,0,0,0.4);
}
.dxl-card {
margin-top: 0.5rem;
padding: 1rem 1.2rem;
border-radius: 14px;
background: rgba(15,23,42,0.85);
color: #e2e8f0;
box-shadow: 0 10px 26px rgba(0,0,0,0.45);
}
.dxl-card h3 {
margin: 0 0 0.35rem 0;
}
.dxl-row {
display: flex;
gap: 0.6rem;
align-items: center;
margin-bottom: 0.5rem;
flex-wrap: wrap;
}
.dxl-row label {
font-size: 0.9rem;
color: #cbd5e1;
}
.dxl-row input[type="number"],
.dxl-row select,
.dxl-row input[type="range"] {
flex: 1;
min-width: 120px;
}
.dxl-btn {
padding: 0.5rem 0.8rem;
border-radius: 10px;
border: 1px solid rgba(148,163,184,0.4);
background: rgba(255,255,255,0.05);
color: #e2e8f0;
cursor: pointer;
transition: transform 0.1s ease, background 0.15s ease;
}
.dxl-btn.primary {
background: linear-gradient(120deg, #06b6d4, #2563eb);
border-color: rgba(59,130,246,0.5);
}
.dxl-btn:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.dxl-btn:not(:disabled):hover {
transform: translateY(-1px);
}
.dxl-status {
font-size: 0.9rem;
color: #a5b4fc;
min-height: 1.2rem;
}
.input-prompt {
font-size: 1.1rem;
font-weight: 600;
color: #1e293b;
margin-bottom: 0.5rem;
}
@keyframes anim-idle {
0% { transform: translate(-50%, 0px) scale(var(--char-scale)); }
50% { transform: translate(-50%, 12px) scale(var(--char-scale)); }
100% { transform: translate(-50%, 0px) scale(var(--char-scale)); }
}
@keyframes anim-shake {
0%, 100% { transform: translate(-50%, 0) rotate(0deg) scale(var(--char-scale)); }
10%, 30%, 50%, 70%, 90% { transform: translate(-52%, 0) rotate(-2deg) scale(var(--char-scale)); }
20%, 40%, 60%, 80% { transform: translate(-48%, 0) rotate(2deg) scale(var(--char-scale)); }
}
@keyframes anim-bounce {
0%, 100% { transform: translate(-50%, 0) scale(var(--char-scale)); }
25% { transform: translate(-50%, -30px) scale(var(--char-scale)); }
50% { transform: translate(-50%, 0) scale(var(--char-scale)); }
75% { transform: translate(-50%, -15px) scale(var(--char-scale)); }
}
@keyframes anim-pulse {
0%, 100% { transform: translate(-50%, 0) scale(var(--char-scale)); }
50% { transform: translate(-50%, 0) scale(calc(var(--char-scale) * 1.05)); }
}
"""
ENUMERATE_CAMERAS_JS = """
async (currentDevices) => {
if (!navigator.mediaDevices?.enumerateDevices) {
return currentDevices || [];
}
try {
const devices = await navigator.mediaDevices.enumerateDevices();
return devices
.filter((device) => device.kind === "videoinput")
.map((device, index) => ({
label: device.label || `Camera ${index + 1}`,
deviceId: device.deviceId || null,
}));
} catch (error) {
console.warn("enumerateDevices failed", error);
return currentDevices || [];
}
}
"""
def load_dxl_script_js() -> str:
    """Inline the DXL script content directly."""
    import json

    script_dir = os.path.dirname(os.path.abspath(__file__))
    js_path = os.path.join(script_dir, "web", "dxl_webserial.js")
    try:
        with open(js_path, "r", encoding="utf-8") as f:
            js_content = f.read()
    except Exception as e:
        js_content = f"console.error('[DXL] Failed to load script: {e}');"
    # json.dumps yields a valid JavaScript string literal, so the script body
    # is embedded safely no matter which quotes or backslashes it contains.
    js_literal = json.dumps(js_content)
    return f"""
    () => {{
        try {{
            // Execute the inlined script body
            const scriptFn = new Function({js_literal});
            scriptFn();
            console.log('[DXL] Script loaded inline');
        }} catch(e) {{
            console.error('[DXL] Failed to execute:', e);
        }}
    }}
    """
def dxl_send_and_receive_js() -> str:
"""JavaScript to send packet bytes and receive response via Web Serial."""
return """
async (packet_bytes) => {
// Check if dxlSerial is available and connected
if (typeof window.dxlSerial === 'undefined' || !window.dxlSerial) {
console.error("[DXL] Serial not available - connect first");
return [];
}
if (!window.dxlSerial.connected) {
console.error("[DXL] Not connected to serial port");
return [];
}
try {
await window.dxlSerial.writeBytes(packet_bytes);
const response = await window.dxlSerial.readPacket(800);
return response;
} catch (err) {
console.error("[DXL] Communication error:", err.message);
return [];
}
}
"""
def execute_motor_packets_js() -> str:
"""JavaScript to execute pre-built motor packets."""
return """
async (packets) => {
if (!packets || packets.length === 0) {
return; // No packets to execute
}
// Check if serial is available
if (typeof window.dxlSerial === 'undefined' || !window.dxlSerial || !window.dxlSerial.connected) {
return; // Silently skip if not connected
}
// Execute each packet sequentially
for (const pkt of packets) {
try {
await window.dxlSerial.writeBytes(pkt);
await window.dxlSerial.readPacket(800);
} catch (err) {
console.error(`[Motors] Error:`, err.message);
}
}
}
"""
def play_scene_audio_js() -> str:
"""JavaScript to play audio file."""
return """
(audio_path) => {
if (!audio_path || audio_path === '') {
return; // No audio to play
}
// Create or reuse audio element
let audio = document.getElementById('scene-audio-player');
if (!audio) {
audio = new Audio();
audio.id = 'scene-audio-player';
}
console.log('[Audio] Playing:', audio_path);
audio.src = audio_path;
audio.play().catch(err => console.error('[Audio] Playback failed:', err));
}
"""
def load_robot_ws_script_js() -> str:
"""JavaScript to initialize WebSocket connection to Reachy Mini robot."""
return """
() => {
console.log('[Robot] Initializing WebSocket connection...');
// Define global initialization function if not already defined
if (!window.loadRobotWebSocket) {
window.loadRobotWebSocket = function() {
const hostDiv = document.getElementById('robot-ws-host');
if (!hostDiv) {
console.error('[Robot] Cannot initialize - host div not found');
return;
}
const ROBOT_URL = 'localhost:8000';
const WS_URL = `ws://${ROBOT_URL}/api/move/ws/set_target`;
console.log('[Robot] Connecting to:', WS_URL);
// Global robot state
window.reachyRobot = {
ws: null,
connected: false
};
// Create UI
                    // Minimal status banner; element ids must match updateStatus() below.
                    hostDiv.innerHTML = `
                        <div id="robot-connection-status" style="display:flex;align-items:center;gap:8px;padding:8px 12px;border-radius:8px;background:#f8d7da;color:#721c24;">
                            <span id="robot-status-dot" style="width:10px;height:10px;border-radius:50%;background:#dc3545;"></span>
                            <span id="robot-status-text">Disconnected - Trying to connect...</span>
                        </div>
                    `;
function updateStatus(connected) {
const statusDiv = document.getElementById('robot-connection-status');
const dot = document.getElementById('robot-status-dot');
const text = document.getElementById('robot-status-text');
if (connected) {
statusDiv.style.background = '#d4edda';
statusDiv.style.color = '#155724';
dot.style.background = '#28a745';
dot.style.boxShadow = '0 0 10px #28a745';
text.textContent = 'Connected to robot';
} else {
statusDiv.style.background = '#f8d7da';
statusDiv.style.color = '#721c24';
dot.style.background = '#dc3545';
dot.style.boxShadow = 'none';
text.textContent = 'Disconnected - Reconnecting...';
}
}
function connectWebSocket() {
console.log('[Robot] Connecting to WebSocket:', WS_URL);
window.reachyRobot.ws = new WebSocket(WS_URL);
window.reachyRobot.ws.onopen = () => {
console.log('[Robot] WebSocket connected');
window.reachyRobot.connected = true;
updateStatus(true);
};
window.reachyRobot.ws.onclose = () => {
console.log('[Robot] WebSocket disconnected');
window.reachyRobot.connected = false;
updateStatus(false);
// Reconnect after 2 seconds
setTimeout(connectWebSocket, 2000);
};
window.reachyRobot.ws.onerror = (error) => {
console.error('[Robot] WebSocket error:', error);
};
window.reachyRobot.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
if (message.status === 'error') {
console.error('[Robot] Server error:', message.detail);
}
} catch (e) {
console.error('[Robot] Failed to parse message:', e);
}
};
}
connectWebSocket();
}; // End of window.loadRobotWebSocket definition
}
// Try to initialize (with multiple retries)
let retryCount = 0;
const maxRetries = 10;
function tryInit() {
const hostDiv = document.getElementById('robot-ws-host');
if (!hostDiv) {
retryCount++;
if (retryCount <= maxRetries) {
console.warn(`[Robot] Host div not found, retry ${retryCount}/${maxRetries} in 1 second`);
setTimeout(tryInit, 1000);
} else {
console.warn('[Robot] Gave up waiting for robot widget div. Will initialize on first use.');
}
return;
}
if (window.reachyRobot) {
console.log('[Robot] Already initialized');
return;
}
// Initialize now
console.log('[Robot] Found host div, initializing...');
window.loadRobotWebSocket();
}
tryInit();
}
"""
def send_robot_pose_js() -> str:
"""JavaScript to send robot pose via WebSocket."""
return """
async (pose_data) => {
if (!pose_data) {
return; // No pose to send
}
// Initialize WebSocket if not already done (lazy initialization)
if (!window.reachyRobot) {
console.log('[Robot] Lazy initialization on first pose send');
if (window.loadRobotWebSocket) {
window.loadRobotWebSocket();
// Wait a bit for connection to establish
await new Promise(resolve => setTimeout(resolve, 500));
}
}
if (!window.reachyRobot || !window.reachyRobot.connected || !window.reachyRobot.ws || window.reachyRobot.ws.readyState !== WebSocket.OPEN) {
console.warn('[Robot] WebSocket not connected, skipping pose command');
return;
}
try {
console.log('[Robot] Sending pose:', pose_data);
window.reachyRobot.ws.send(JSON.stringify(pose_data));
} catch (error) {
console.error('[Robot] Failed to send pose:', error);
}
}
"""
def build_app() -> gr.Blocks:
with gr.Blocks(title="Gradio Visual Novel") as demo:
gr.HTML(f"", elem_id="vn-styles")
story_state = gr.State()
with gr.Row():
with gr.Column(scale=3, min_width=640):
stage = gr.HTML(label="Stage", elem_id="stage-container")
dialogue = gr.Markdown(label="Dialogue")
meta = gr.Markdown(label="Scene Info", elem_id="scene-info")
# Choice selection
choice_radio = gr.Radio(label="Make a choice", visible=False)
# Text input
with gr.Group(visible=False) as input_group:
input_prompt = gr.Markdown("", elem_classes=["input-prompt"])
with gr.Row():
user_input = gr.Textbox(label="Your answer", scale=4)
input_submit_btn = gr.Button("Submit", variant="primary", scale=1)
with gr.Row():
prev_btn = gr.Button("โต Back", variant="secondary")
next_btn = gr.Button("Next โถ", variant="primary")
with gr.Column(scale=1, min_width=320, elem_classes=["camera-column"], visible=False) as right_column:
gr.Markdown("### Live Camera (WebRTC)")
camera_hint = gr.Markdown(
camera_hint_text(False), elem_classes=["camera-hint"]
)
gr.Markdown(
"Allow camera access when prompted. The webcam appears only in scenes that request it.",
elem_classes=["camera-hint"],
)
with gr.Group(elem_id="camera-wrapper"):
webrtc_component = WebRTC(
label="Webcam Stream",
mode="send-receive",
modality="video",
full_screen=False,
visible=False,
)
webrtc_component.stream(
fn=passthrough_stream,
inputs=[webrtc_component],
outputs=[webrtc_component],
)
voice_hint = gr.Markdown(
voice_hint_text(False), elem_classes=["camera-hint"]
)
with gr.Group(visible=False, elem_id="voice-wrapper") as voice_section:
with gr.Accordion("Voice & Audio Agent", open=True):
gr.Markdown(
"Record a short line to pass to your AI companion. "
"We play back your clip and a synthetic confirmation tone.",
elem_classes=["camera-hint"],
)
voice_prompt = gr.Textbox(
label="Prompt/context",
value="React to the current scene with a friendly reply.",
lines=2,
)
mic = gr.Audio(
sources=["microphone", "upload"],
type="numpy",
label="Record or upload audio",
)
send_voice_btn = gr.Button(
"Send to voice agent", variant="secondary"
)
voice_summary = gr.Markdown("No audio captured yet.")
playback = gr.Audio(label="Your recording", interactive=False)
ai_voice_text = gr.Markdown("AI response will appear here.")
ai_voice_audio = gr.Audio(
label="AI voice reply (synthetic tone)", interactive=False
)
send_voice_btn.click(
fn=process_voice_interaction,
inputs=[mic, voice_prompt],
outputs=[
voice_summary,
playback,
ai_voice_text,
ai_voice_audio,
],
)
motor_hint = gr.Markdown(
motor_hint_text(False), elem_classes=["camera-hint"]
)
with gr.Group(visible=False, elem_id="dxl-panel-container") as motor_group:
with gr.Accordion("Dynamixel XL330 Control", open=True):
gr.Markdown(
"**Web Serial Control** - Use Chrome/Edge desktop. Connect to serial port, then control motors.",
elem_classes=["camera-hint"],
)
# Serial connection panel (still handled by JavaScript)
gr.HTML('', elem_id="dxl-panel-host-wrapper")
# Motor control inputs (Python-based)
with gr.Row():
motor_id_input = gr.Number(
label="Motor ID",
value=1,
minimum=0,
maximum=252,
precision=0,
)
with gr.Row():
goal_slider = gr.Slider(
label="Goal Position (degrees)",
minimum=0,
maximum=360,
value=90,
step=1,
)
with gr.Row():
ping_btn = gr.Button("Ping", size="sm")
torque_on_btn = gr.Button("Torque ON", size="sm", variant="secondary")
torque_off_btn = gr.Button("Torque OFF", size="sm")
with gr.Row():
send_goal_btn = gr.Button("Send Goal Position", variant="primary")
motor_status = gr.Markdown("Status: Ready")
# Robot Control (Reachy Mini via WebSocket)
robot_hint = gr.Markdown(
robot_hint_text(False), elem_classes=["camera-hint"]
)
with gr.Group(visible=False, elem_id="robot-panel-container") as robot_group:
with gr.Accordion("Reachy Mini Robot Control", open=True):
gr.Markdown(
"**WebSocket Control** - Connects to localhost:8000 for real-time robot control.",
elem_classes=["camera-hint"],
)
# WebSocket connection area (will be managed by JavaScript)
# Status is shown dynamically by JavaScript inside this div
gr.HTML('', elem_id="robot-ws-host-wrapper")
# Wire up event handlers
all_outputs = [
story_state,
stage,
dialogue,
meta,
camera_hint,
webrtc_component,
voice_hint,
voice_section,
motor_hint,
motor_group,
robot_hint,
robot_group,
choice_radio,
input_prompt,
input_group,
prev_btn,
next_btn,
right_column,
]
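        # This order must match the 18-tuple returned by load_initial_state,
        # change_scene, handle_choice, and handle_input.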
# Hidden JSON for passing packet bytes between Python and JavaScript
# Note: gr.State doesn't work well with JavaScript, so we use JSON
packet_bytes_json = gr.JSON(visible=False, value=[])
response_bytes_json = gr.JSON(visible=False, value=[])
motor_packets_json = gr.JSON(visible=False, value=[]) # For scene motor commands
# Hidden textbox for passing audio path to JavaScript
audio_path_box = gr.Textbox(visible=False, value="")
# Hidden JSON for passing robot pose to JavaScript
robot_pose_json = gr.JSON(visible=False, value=None)
# Load initialization scripts
combined_init_js = f"""
() => {{
// Initialize Dynamixel
({load_dxl_script_js()})();
// Initialize Robot WebSocket
({load_robot_ws_script_js()})();
}}
"""
demo.load(
fn=load_initial_state,
inputs=None,
outputs=all_outputs,
js=combined_init_js,
)
# Navigation buttons with automatic motor command execution, audio playback, and robot control
# Create parallel chains for audio, motors, and robot to ensure all get the updated state
# Previous button
prev_event = prev_btn.click(
fn=lambda state: change_scene(state, -1),
inputs=story_state,
outputs=all_outputs,
)
# Audio chain
prev_event.then(
fn=get_scene_audio,
inputs=[story_state],
outputs=[audio_path_box],
).then(
fn=None,
inputs=[audio_path_box],
outputs=[],
js=play_scene_audio_js(),
)
# Motor chain (parallel)
prev_event.then(
fn=get_scene_motor_packets,
inputs=[story_state],
outputs=[motor_packets_json],
).then(
fn=None,
inputs=[motor_packets_json],
outputs=[],
js=execute_motor_packets_js(),
)
# Robot chain (parallel)
prev_event.then(
fn=get_scene_robot_pose,
inputs=[story_state],
outputs=[robot_pose_json],
).then(
fn=None,
inputs=[robot_pose_json],
outputs=[],
js=send_robot_pose_js(),
)
# Next button
next_event = next_btn.click(
fn=lambda state: change_scene(state, 1),
inputs=story_state,
outputs=all_outputs,
)
# Audio chain
next_event.then(
fn=get_scene_audio,
inputs=[story_state],
outputs=[audio_path_box],
).then(
fn=None,
inputs=[audio_path_box],
outputs=[],
js=play_scene_audio_js(),
)
# Motor chain (parallel)
next_event.then(
fn=get_scene_motor_packets,
inputs=[story_state],
outputs=[motor_packets_json],
).then(
fn=None,
inputs=[motor_packets_json],
outputs=[],
js=execute_motor_packets_js(),
)
# Robot chain (parallel)
next_event.then(
fn=get_scene_robot_pose,
inputs=[story_state],
outputs=[robot_pose_json],
).then(
fn=None,
inputs=[robot_pose_json],
outputs=[],
js=send_robot_pose_js(),
)
# Choice handler
choice_event = choice_radio.change(
fn=handle_choice,
inputs=[story_state, choice_radio],
outputs=all_outputs,
)
# Audio chain
choice_event.then(
fn=get_scene_audio,
inputs=[story_state],
outputs=[audio_path_box],
).then(
fn=None,
inputs=[audio_path_box],
outputs=[],
js=play_scene_audio_js(),
)
# Motor chain (parallel)
choice_event.then(
fn=get_scene_motor_packets,
inputs=[story_state],
outputs=[motor_packets_json],
).then(
fn=None,
inputs=[motor_packets_json],
outputs=[],
js=execute_motor_packets_js(),
)
# Robot chain (parallel)
choice_event.then(
fn=get_scene_robot_pose,
inputs=[story_state],
outputs=[robot_pose_json],
).then(
fn=None,
inputs=[robot_pose_json],
outputs=[],
js=send_robot_pose_js(),
)
# Input submit button
input_submit_event = input_submit_btn.click(
fn=handle_input,
inputs=[story_state, user_input],
outputs=all_outputs,
)
# Audio chain
input_submit_event.then(
fn=get_scene_audio,
inputs=[story_state],
outputs=[audio_path_box],
).then(
fn=None,
inputs=[audio_path_box],
outputs=[],
js=play_scene_audio_js(),
)
# Motor chain (parallel)
input_submit_event.then(
fn=get_scene_motor_packets,
inputs=[story_state],
outputs=[motor_packets_json],
).then(
fn=None,
inputs=[motor_packets_json],
outputs=[],
js=execute_motor_packets_js(),
)
# Robot chain (parallel)
input_submit_event.then(
fn=get_scene_robot_pose,
inputs=[story_state],
outputs=[robot_pose_json],
).then(
fn=None,
inputs=[robot_pose_json],
outputs=[],
js=send_robot_pose_js(),
)
# Input enter key
input_enter_event = user_input.submit(
fn=handle_input,
inputs=[story_state, user_input],
outputs=all_outputs,
)
# Audio chain
input_enter_event.then(
fn=get_scene_audio,
inputs=[story_state],
outputs=[audio_path_box],
).then(
fn=None,
inputs=[audio_path_box],
outputs=[],
js=play_scene_audio_js(),
)
# Motor chain (parallel)
input_enter_event.then(
fn=get_scene_motor_packets,
inputs=[story_state],
outputs=[motor_packets_json],
).then(
fn=None,
inputs=[motor_packets_json],
outputs=[],
js=execute_motor_packets_js(),
)
# Robot chain (parallel)
input_enter_event.then(
fn=get_scene_robot_pose,
inputs=[story_state],
outputs=[robot_pose_json],
).then(
fn=None,
inputs=[robot_pose_json],
outputs=[],
js=send_robot_pose_js(),
)
# Motor control event handlers
# Pattern: Python builds packet -> JS sends/receives -> Python parses
# Ping button
ping_btn.click(
fn=dxl_build_ping_packet,
inputs=[motor_id_input],
outputs=[packet_bytes_json],
).then(
fn=None,
inputs=[packet_bytes_json],
outputs=[response_bytes_json],
js=dxl_send_and_receive_js(),
).then(
fn=dxl_parse_response,
inputs=[response_bytes_json],
outputs=[motor_status],
)
# Torque ON button
torque_on_btn.click(
fn=lambda motor_id: dxl_build_torque_packet(motor_id, True),
inputs=[motor_id_input],
outputs=[packet_bytes_json],
).then(
fn=None,
inputs=[packet_bytes_json],
outputs=[response_bytes_json],
js=dxl_send_and_receive_js(),
).then(
fn=dxl_parse_response,
inputs=[response_bytes_json],
outputs=[motor_status],
)
# Torque OFF button
torque_off_btn.click(
fn=lambda motor_id: dxl_build_torque_packet(motor_id, False),
inputs=[motor_id_input],
outputs=[packet_bytes_json],
).then(
fn=None,
inputs=[packet_bytes_json],
outputs=[response_bytes_json],
js=dxl_send_and_receive_js(),
).then(
fn=dxl_parse_response,
inputs=[response_bytes_json],
outputs=[motor_status],
)
# Send goal position button
send_goal_btn.click(
fn=dxl_build_goal_position_packet,
inputs=[motor_id_input, goal_slider],
outputs=[packet_bytes_json],
).then(
fn=None,
inputs=[packet_bytes_json],
outputs=[response_bytes_json],
js=dxl_send_and_receive_js(),
).then(
fn=dxl_parse_response,
inputs=[response_bytes_json],
outputs=[motor_status],
)
return demo
def main() -> None:
"""Launch the Visual Novel Gradio app."""
logger.info("=== Visual Novel App Startup ===")
logger.info("Using HuggingFace repo URLs for assets")
# Build Gradio app
demo = build_app()
# Launch with SSR disabled
demo.launch(
server_name="0.0.0.0",
server_port=7860,
ssr_mode=False,
show_error=True,
)
if __name__ == "__main__":
main()