Dubins Car Realtime¶
Interactive real-time visualization for Dubins car path planning using Viser.
Web-based GUI mirroring dubins_car_realtime.py (PyQt): continuous SCP steps,
draggable circular obstacle, metrics, and λ weights. Run the script and open the
printed URL in your browser.
File: examples/realtime/dubins_car_realtime.py
import os
import sys
import threading
import time
import numpy as np
import viser
_this_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(_this_dir, ".."))
sys.path.append(os.path.dirname(os.path.dirname(_this_dir)))
from realtime.base_problems.dubins_car_realtime_base import problem
from examples.plotting_viser import (
build_scp_step_results,
extract_multishoot_trajectory,
format_metrics_markdown,
get_print_queue_data,
)
problem.initialize()
def create_realtime_server(optimization_problem) -> viser.ViserServer:
"""Create a viser server for real-time Dubins car visualization."""
server = viser.ViserServer()
server.gui.configure_theme(dark_mode=True)
obs_center_init = np.asarray(optimization_problem.parameters["obs_center"], dtype=np.float64)
obs_radius = float(optimization_problem.parameters["obs_radius"])
# --- Scene (XY plane at z=0) ---
server.scene.add_grid(
"/grid",
width=8,
height=8,
position=(0.0, 0.0, 0.0),
)
server.scene.add_frame(
"/origin",
wxyz=(1.0, 0.0, 0.0, 0.0),
position=(0.0, 0.0, 0.0),
axes_length=0.5,
)
trajectory_handle = server.scene.add_point_cloud(
"/trajectory",
points=np.zeros((1, 3), dtype=np.float32),
colors=(0, 100, 255),
point_size=0.06,
)
obstacle_handle = server.scene.add_icosphere(
"/obstacle",
radius=obs_radius,
color=(100, 255, 100),
position=(float(obs_center_init[0]), float(obs_center_init[1]), 0.0),
)
obstacle_drag = server.scene.add_transform_controls(
"/obstacle_drag",
position=(float(obs_center_init[0]), float(obs_center_init[1]), 0.0),
scale=1.2,
disable_rotations=True,
visible=False,
)
with server.gui.add_folder("Obstacle", expand_by_default=True):
server.gui.add_markdown("*Click the green obstacle to show the drag gizmo*")
obs_vector = server.gui.add_vector3(
"Center (x, y, z)",
initial_value=(float(obs_center_init[0]), float(obs_center_init[1]), 0.0),
step=0.1,
)
selected = {"on": False}
def select_obstacle(on: bool) -> None:
obstacle_drag.visible = on
obstacle_handle.color = (150, 255, 150) if on else (100, 255, 100)
selected["on"] = on
@obstacle_handle.on_click
def _(_) -> None:
select_obstacle(not selected["on"])
@obstacle_drag.on_update
def _(_) -> None:
cx, cy, _ = obstacle_drag.position
c = np.array([cx, cy], dtype=np.float64)
optimization_problem.parameters["obs_center"] = c
obs_vector.value = (float(cx), float(cy), 0.0)
obstacle_handle.position = (cx, cy, 0.0)
@obs_vector.on_update
def _(_) -> None:
cx, cy, _ = obs_vector.value
c = np.array([cx, cy], dtype=np.float64)
optimization_problem.parameters["obs_center"] = c
obstacle_drag.position = (cx, cy, 0.0)
obstacle_handle.position = (cx, cy, 0.0)
state = {
"running": True,
"reset_requested": False,
}
# --- GUI ---
with server.gui.add_folder("Optimization Metrics"):
metrics_text = server.gui.add_markdown(
format_metrics_markdown(
{
"iter": 0,
"J_tr": 0.0,
"J_vb": 0.0,
"J_vc": 0.0,
"cost": 0.0,
"dis_time": 0.0,
"solve_time": 0.0,
"prob_stat": "--",
}
)
+ (
f"\n**λ_cost:** {optimization_problem.algorithm.lam_cost:.2E}"
f"\n**λ_tr:** {optimization_problem.algorithm.lam_prox:.2E}"
)
)
with server.gui.add_folder("Optimization Weights"):
lam_cost_input = server.gui.add_number(
"λ_cost",
initial_value=optimization_problem.algorithm.lam_cost,
min=1e-6,
max=1e6,
step=0.01,
)
lam_tr_input = server.gui.add_number(
"λ_tr (lam_prox)",
initial_value=optimization_problem.algorithm.lam_prox,
min=1e-6,
max=1e6,
step=0.1,
)
@lam_cost_input.on_update
def _(_) -> None:
optimization_problem.algorithm.lam_cost = lam_cost_input.value
@lam_tr_input.on_update
def _(_) -> None:
optimization_problem.algorithm.lam_prox = lam_tr_input.value
with server.gui.add_folder("Problem Control"):
reset_button = server.gui.add_button("Reset Problem")
@reset_button.on_click
def _(_) -> None:
state["reset_requested"] = True
print("Problem reset requested")
def sync_obstacle_from_params() -> None:
c = np.asarray(optimization_problem.parameters["obs_center"], dtype=np.float64)
obstacle_handle.position = (float(c[0]), float(c[1]), 0.0)
obstacle_drag.position = (float(c[0]), float(c[1]), 0.0)
obs_vector.value = (float(c[0]), float(c[1]), 0.0)
def update_metrics(results: dict) -> None:
base = format_metrics_markdown(results)
lam = (
f"\n**λ_cost:** {optimization_problem.algorithm.lam_cost:.2E}\n"
f"**λ_tr:** {optimization_problem.algorithm.lam_prox:.2E}"
)
metrics_text.content = base + lam
def update_trajectory(V_multi_shoot: np.ndarray) -> None:
try:
n_x = optimization_problem.settings.sim.n_states
n_u = optimization_problem.settings.sim.n_controls
pos_xy, _ = extract_multishoot_trajectory(
V_multi_shoot,
n_x,
n_u,
position_slice=slice(0, 2),
velocity_slice=None,
)
if len(pos_xy) > 0:
z = np.zeros((pos_xy.shape[0], 1), dtype=np.float32)
positions = np.hstack([pos_xy.astype(np.float32), z])
n = positions.shape[0]
colors = np.tile(np.array([[0, 100, 255]], dtype=np.uint8), (n, 1))
trajectory_handle.points = positions
trajectory_handle.colors = colors
except Exception as e:
print(f"Trajectory update error: {e}")
x_traj = np.asarray(optimization_problem.state.x)
if x_traj.size and x_traj.shape[1] >= 2:
xy = x_traj[:, :2].astype(np.float32)
z = np.zeros((xy.shape[0], 1), dtype=np.float32)
positions = np.hstack([xy, z])
n = positions.shape[0]
colors = np.tile(np.array([[0, 100, 255]], dtype=np.uint8), (n, 1))
trajectory_handle.points = positions
trajectory_handle.colors = colors
def optimization_loop() -> None:
while state["running"]:
try:
if state["reset_requested"]:
optimization_problem.reset()
state["reset_requested"] = False
sync_obstacle_from_params()
print("Problem reset to initial conditions")
t0 = time.time()
step_result = optimization_problem.step()
solve_time_ms = (time.time() - t0) * 1000.0
results = build_scp_step_results(step_result, solve_time_ms)
results.update(get_print_queue_data(optimization_problem))
update_metrics(results)
if optimization_problem.state.V_history:
V_multi_shoot = np.asarray(optimization_problem.state.V_history[-1])
update_trajectory(V_multi_shoot)
time.sleep(0.05)
except Exception as e:
print(f"Optimization error: {e}")
time.sleep(1.0)
threading.Thread(target=optimization_loop, daemon=True).start()
return server
if __name__ == "__main__":
server = create_realtime_server(problem)
print("Viser server started. Open the URL in your browser.")
server.sleep_forever()