Skip to content

3DoF Pdg Realtime

Interactive real-time visualization for 3-DOF powered descent guidance.

This module provides a Viser GUI that allows live updates of PDG problem parameters (dynamics, constraints, boundary conditions, and SCP weights).

File: examples/realtime/3DoF_pdg_realtime.py

import os
import sys
import threading
import time

import matplotlib
import numpy as np
import viser

current_dir = os.path.dirname(os.path.abspath(__file__))
grandparent_dir = os.path.dirname(os.path.dirname(current_dir))
sys.path.append(grandparent_dir)

import importlib.util

_base_path = os.path.join(current_dir, "base_problems", "3DoF_pdg_realtime_base.py")
_spec = importlib.util.spec_from_file_location("pdg3dof_realtime_base", _base_path)
if _spec is None or _spec.loader is None:
    raise ImportError(f"Unable to load PDG realtime base module: {_base_path}")
pdg = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(pdg)

from examples.plotting_viser import (
    build_scp_step_results,
    compute_velocity_colors_realtime,
    extract_multishoot_trajectory,
    format_metrics_markdown,
    get_print_queue_data,
)

_viridis_cmap = matplotlib.colormaps["viridis"]
VISER_SCENE_SCALE = 0.01

# Initialize once; updates are handled via parameter/state mutation + reset.
pdg.initialize_problem_quiet(pdg.problem)
pdg.print_scp_table_header_once(pdg.problem)


def _to_deg(rad: float) -> float:
    return float(rad) * 180.0 / np.pi


def _to_rad(deg: float) -> float:
    return float(deg) * np.pi / 180.0


def _generate_cone_mesh(
    apex: np.ndarray,
    height: float,
    half_angle_deg: float,
    n_segments: int = 32,
    axis: np.ndarray = np.array([0.0, 0.0, 1.0], dtype=np.float32),
) -> tuple[np.ndarray, np.ndarray]:
    """Generate a cone mesh for realtime glideslope visualization."""
    half_angle_rad = np.radians(half_angle_deg)
    base_radius = height * np.tan(half_angle_rad)

    axis = np.asarray(axis, dtype=np.float32)
    axis = axis / np.linalg.norm(axis)

    ref = (
        np.array([1.0, 0.0, 0.0], dtype=np.float32)
        if abs(axis[0]) < 0.9
        else np.array([0.0, 1.0, 0.0], dtype=np.float32)
    )
    u = ref - np.dot(ref, axis) * axis
    u = u / np.linalg.norm(u)
    v = np.cross(axis, u)

    vertices = [apex.copy()]
    base_center = apex + height * axis
    for i in range(n_segments):
        angle = 2.0 * np.pi * i / n_segments
        offset = base_radius * (np.cos(angle) * u + np.sin(angle) * v)
        vertices.append(base_center + offset)
    vertices.append(base_center.copy())
    vertices = np.array(vertices, dtype=np.float32)

    faces = []
    for i in range(n_segments):
        next_i = (i + 1) % n_segments
        faces.append([0, i + 1, next_i + 1])
    base_center_idx = n_segments + 1
    for i in range(n_segments):
        next_i = (i + 1) % n_segments
        faces.append([base_center_idx, next_i + 1, i + 1])
    faces = np.array(faces, dtype=np.int32)
    return vertices, faces


def _sync_problem_parameters_from_base() -> None:
    """Push current base parameter values into the active problem."""
    pdg.problem.parameters["I_sp"] = float(pdg.I_sp.value)
    pdg.problem.parameters["g"] = float(pdg.g.value)
    pdg.problem.parameters["theta"] = float(pdg.theta.value)
    pdg.problem.parameters["glideslope_angle"] = float(pdg.glideslope_angle.value)
    pdg.problem.parameters["thrust_pointing_angle"] = float(pdg.thrust_pointing_angle.value)
    pdg.problem.parameters["T1"] = float(pdg.T1.value)
    pdg.problem.parameters["T2"] = float(pdg.T2.value)
    pdg.problem.parameters["initial_position"] = np.array(
        pdg.initial_position.value, dtype=np.float64
    )
    pdg.problem.parameters["final_position"] = np.array(pdg.final_position.value, dtype=np.float64)


def create_realtime_server(optimization_problem) -> viser.ViserServer:
    server = viser.ViserServer()
    server.gui.configure_theme(dark_mode=True)

    server.scene.add_grid(
        "/grid",
        width=7000 * VISER_SCENE_SCALE,
        height=7000 * VISER_SCENE_SCALE,
        position=(0.0, 0.0, 0.0),
    )
    server.scene.add_frame(
        "/origin",
        wxyz=(1.0, 0.0, 0.0, 0.0),
        position=(0.0, 0.0, 0.0),
        axes_length=200.0 * VISER_SCENE_SCALE,
    )

    trajectory_handle = server.scene.add_point_cloud(
        "/trajectory",
        points=np.zeros((1, 3), dtype=np.float32),
        colors=(255, 255, 0),
        point_size=30.0 * VISER_SCENE_SCALE,
    )
    start_handle = server.scene.add_icosphere(
        "/start",
        radius=40.0 * VISER_SCENE_SCALE,
        color=(100, 255, 100),
        position=tuple(np.asarray(pdg.position.initial, dtype=np.float64) * VISER_SCENE_SCALE),
    )
    target_handle = server.scene.add_icosphere(
        "/target",
        radius=40.0 * VISER_SCENE_SCALE,
        color=(255, 100, 100),
        position=(
            float(pdg.final_position.value[0]) * VISER_SCENE_SCALE,
            float(pdg.final_position.value[1]) * VISER_SCENE_SCALE,
            0.0,
        ),
    )
    start_drag = server.scene.add_transform_controls(
        "/start_drag",
        position=tuple(np.asarray(pdg.position.initial, dtype=np.float64) * VISER_SCENE_SCALE),
        scale=40.0 * VISER_SCENE_SCALE,
        disable_rotations=True,
        visible=True,
    )
    target_drag = server.scene.add_transform_controls(
        "/target_drag",
        position=(
            float(pdg.final_position.value[0]) * VISER_SCENE_SCALE,
            float(pdg.final_position.value[1]) * VISER_SCENE_SCALE,
            0.0,
        ),
        scale=40.0 * VISER_SCENE_SCALE,
        disable_rotations=True,
        visible=True,
    )

    def _cone_height_scaled() -> float:
        init_z = float(np.asarray(pdg.initial_position.value, dtype=np.float64)[2])
        return max(abs(init_z), 100.0) * VISER_SCENE_SCALE

    def _update_glideslope_cone() -> None:
        apex = np.array(
            [
                float(pdg.final_position.value[0]) * VISER_SCENE_SCALE,
                float(pdg.final_position.value[1]) * VISER_SCENE_SCALE,
                0.0,
            ],
            dtype=np.float32,
        )
        cone_vertices, cone_faces = _generate_cone_mesh(
            apex=apex,
            height=_cone_height_scaled(),
            half_angle_deg=_to_deg(float(pdg.glideslope_angle.value)),
            n_segments=48,
        )
        glideslope_cone_handle.vertices = cone_vertices
        glideslope_cone_handle.faces = cone_faces

    cone_vertices, cone_faces = _generate_cone_mesh(
        apex=np.array(
            [
                float(pdg.final_position.value[0]) * VISER_SCENE_SCALE,
                float(pdg.final_position.value[1]) * VISER_SCENE_SCALE,
                0.0,
            ],
            dtype=np.float32,
        ),
        height=_cone_height_scaled(),
        half_angle_deg=_to_deg(float(pdg.glideslope_angle.value)),
        n_segments=48,
    )
    glideslope_cone_handle = server.scene.add_mesh_simple(
        "/constraints/glideslope_cone",
        vertices=cone_vertices,
        faces=cone_faces,
        color=(80, 200, 120),
        wireframe=False,
        opacity=0.2,
    )

    state = {"running": True, "reset_requested": False}

    with server.gui.add_folder("Optimization Metrics"):
        metrics_text = server.gui.add_markdown(
            format_metrics_markdown(
                {
                    "iter": 0,
                    "J_tr": 0.0,
                    "J_vb": 0.0,
                    "J_vc": 0.0,
                    "cost": 0.0,
                    "dis_time": 0.0,
                    "solve_time": 0.0,
                    "prob_stat": "--",
                }
            )
        )

    with server.gui.add_folder("Problem Control", expand_by_default=True):
        reset_button = server.gui.add_button("Apply Changes + Reset Problem")

        @reset_button.on_click
        def _(_) -> None:
            state["reset_requested"] = True

    with server.gui.add_folder("Algorithm Weights"):
        lam_cost = server.gui.add_number(
            "lam_cost",
            initial_value=optimization_problem.algorithm.lam_cost,
            min=1e-8,
            max=1e5,
            step=0.01,
        )
        lam_vc = server.gui.add_number(
            "lam_vc",
            initial_value=optimization_problem.algorithm.lam_vc,
            min=1e-8,
            max=1e5,
            step=0.01,
        )
        lam_prox = server.gui.add_number(
            "lam_prox",
            initial_value=optimization_problem.algorithm.lam_prox,
            min=1e-8,
            max=1e5,
            step=0.01,
        )

        @lam_cost.on_update
        def _(_) -> None:
            optimization_problem.algorithm.lam_cost = float(lam_cost.value)

        @lam_vc.on_update
        def _(_) -> None:
            optimization_problem.algorithm.lam_vc = float(lam_vc.value)

        @lam_prox.on_update
        def _(_) -> None:
            optimization_problem.algorithm.lam_prox = float(lam_prox.value)

    with server.gui.add_folder("Dynamics / Constraint Parameters"):
        isp_input = server.gui.add_number(
            "I_sp", initial_value=float(pdg.I_sp.value), min=50.0, max=450.0, step=1.0
        )
        g_input = server.gui.add_number(
            "g (m/s^2)", initial_value=float(pdg.g.value), min=0.0, max=20.0, step=0.01
        )
        theta_deg_input = server.gui.add_number(
            "theta (deg)", initial_value=_to_deg(pdg.theta.value), min=0.0, max=80.0, step=0.1
        )
        glideslope_deg_input = server.gui.add_number(
            "glideslope (deg)",
            initial_value=_to_deg(pdg.glideslope_angle.value),
            min=1.0,
            max=89.0,
            step=0.1,
        )
        thrust_pointing_deg_input = server.gui.add_number(
            "thrust_pointing (deg)",
            initial_value=_to_deg(pdg.thrust_pointing_angle.value),
            min=0.0,
            max=89.0,
            step=0.1,
        )
        t1_input = server.gui.add_number(
            "T1 per-engine (N)", initial_value=float(pdg.T1.value), min=0.0, max=10000.0, step=10.0
        )
        t2_input = server.gui.add_number(
            "T2 per-engine (N)", initial_value=float(pdg.T2.value), min=0.0, max=10000.0, step=10.0
        )
        initial_pos_input = server.gui.add_vector3(
            "initial_position (m)",
            initial_value=tuple(np.asarray(pdg.initial_position.value, dtype=np.float64)),
            step=10.0,
        )
        final_xy_input = server.gui.add_vector2(
            "final_position [x,y] (m)",
            initial_value=tuple(np.asarray(pdg.final_position.value, dtype=np.float64)),
            step=10.0,
        )

        @isp_input.on_update
        def _(_) -> None:
            pdg.I_sp.value = float(isp_input.value)
            optimization_problem.parameters["I_sp"] = float(isp_input.value)

        @g_input.on_update
        def _(_) -> None:
            pdg.g.value = float(g_input.value)
            optimization_problem.parameters["g"] = float(g_input.value)

        @theta_deg_input.on_update
        def _(_) -> None:
            val = _to_rad(theta_deg_input.value)
            pdg.theta.value = val
            optimization_problem.parameters["theta"] = val

        @glideslope_deg_input.on_update
        def _(_) -> None:
            val = _to_rad(glideslope_deg_input.value)
            pdg.glideslope_angle.value = val
            optimization_problem.parameters["glideslope_angle"] = val
            _update_glideslope_cone()

        @thrust_pointing_deg_input.on_update
        def _(_) -> None:
            val = _to_rad(thrust_pointing_deg_input.value)
            pdg.thrust_pointing_angle.value = val
            optimization_problem.parameters["thrust_pointing_angle"] = val

        @t1_input.on_update
        def _(_) -> None:
            val = float(t1_input.value)
            pdg.T1.value = val
            optimization_problem.parameters["T1"] = val

        @t2_input.on_update
        def _(_) -> None:
            val = float(t2_input.value)
            pdg.T2.value = val
            optimization_problem.parameters["T2"] = val

        @initial_pos_input.on_update
        def _(_) -> None:
            new_initial = np.array(initial_pos_input.value, dtype=np.float64)
            pdg.initial_position.value = new_initial
            optimization_problem.parameters["initial_position"] = new_initial
            pdg.position.initial = new_initial
            start_handle.position = tuple(new_initial * VISER_SCENE_SCALE)
            start_drag.position = tuple(new_initial * VISER_SCENE_SCALE)
            _update_glideslope_cone()

        @final_xy_input.on_update
        def _(_) -> None:
            vec = np.array(final_xy_input.value, dtype=np.float64)
            pdg.final_position.value = vec
            optimization_problem.parameters["final_position"] = vec
            target_handle.position = (
                float(vec[0]) * VISER_SCENE_SCALE,
                float(vec[1]) * VISER_SCENE_SCALE,
                0.0,
            )
            _update_glideslope_cone()
            target_drag.position = (
                float(vec[0]) * VISER_SCENE_SCALE,
                float(vec[1]) * VISER_SCENE_SCALE,
                0.0,
            )

    @start_drag.on_update
    def _(_) -> None:
        new_initial = np.array(start_drag.position, dtype=np.float64) / VISER_SCENE_SCALE
        pdg.initial_position.value = new_initial
        optimization_problem.parameters["initial_position"] = new_initial
        pdg.position.initial = new_initial
        start_handle.position = tuple(start_drag.position)
        initial_pos_input.value = tuple(new_initial)
        _update_glideslope_cone()

    @target_drag.on_update
    def _(_) -> None:
        x, y, _ = target_drag.position
        target_drag.position = (x, y, 0.0)
        new_final_xy = np.array([x, y], dtype=np.float64) / VISER_SCENE_SCALE
        pdg.final_position.value = new_final_xy
        optimization_problem.parameters["final_position"] = new_final_xy
        target_handle.position = (x, y, 0.0)
        final_xy_input.value = tuple(new_final_xy)
        _update_glideslope_cone()

    def apply_all_state_edits() -> None:
        # Drive state initial condition from the explicit initial_position parameter.
        pdg.position.initial = np.array(pdg.initial_position.value, dtype=np.float64)
        pdg.position.guess = np.linspace(pdg.position.initial, np.array([0.0, 0.0, 0.0]), pdg.n)
        pdg.velocity.guess = np.linspace(pdg.velocity.initial, np.array([0.0, 0.0, 0.0]), pdg.n)
        pdg.mass.guess = np.linspace(pdg.mass.initial, np.array([1690.0]), pdg.n).reshape(-1, 1)
        _sync_problem_parameters_from_base()

        rho_max = pdg.n_eng * float(pdg.T2.value) * np.cos(float(pdg.theta.value))
        pdg.thrust.min = pdg.n_eng * np.array(
            [-pdg.T_bar, -pdg.T_bar, -pdg.T_bar], dtype=np.float64
        )
        pdg.thrust.max = pdg.n_eng * np.array([pdg.T_bar, pdg.T_bar, pdg.T_bar], dtype=np.float64)
        pdg.plotting_dict["rho_min"] = (
            pdg.n_eng * float(pdg.T1.value) * np.cos(float(pdg.theta.value))
        )
        pdg.plotting_dict["rho_max"] = rho_max
        pdg.plotting_dict["glideslope_angle_deg"] = _to_deg(float(pdg.glideslope_angle.value))

    def update_trajectory(V_multi_shoot: np.ndarray) -> None:
        try:
            n_x = optimization_problem.settings.sim.n_states
            n_u = optimization_problem.settings.sim.n_controls
            positions, velocities = extract_multishoot_trajectory(V_multi_shoot, n_x, n_u)
            if len(positions) > 0:
                # Reuse helper for speed-colored trajectory.
                colors = compute_velocity_colors_realtime(velocities, _viridis_cmap)
                trajectory_handle.points = (positions * VISER_SCENE_SCALE).astype(np.float32)
                trajectory_handle.colors = colors
        except Exception:
            x_traj = np.asarray(optimization_problem.state.x)
            if x_traj.size and x_traj.shape[1] >= 3:
                points = (x_traj[:, :3] * VISER_SCENE_SCALE).astype(np.float32)
                trajectory_handle.points = points
                trajectory_handle.colors = np.tile(
                    np.array([[255, 255, 0]], dtype=np.uint8), (points.shape[0], 1)
                )

    def optimization_loop() -> None:
        while state["running"]:
            try:
                if state["reset_requested"]:
                    apply_all_state_edits()
                    optimization_problem.reset()
                    state["reset_requested"] = False

                t0 = time.time()
                step_result = optimization_problem.step()
                solve_time_ms = (time.time() - t0) * 1000.0

                results = build_scp_step_results(step_result, solve_time_ms)
                results.update(get_print_queue_data(optimization_problem))
                metrics_text.content = format_metrics_markdown(results)

                if optimization_problem.state.V_history:
                    V_multi_shoot = np.asarray(optimization_problem.state.V_history[-1])
                    update_trajectory(V_multi_shoot)

                time.sleep(0.05)
            except Exception as e:
                print(f"Optimization error: {e}")
                time.sleep(0.5)

    threading.Thread(target=optimization_loop, daemon=True).start()
    return server


if __name__ == "__main__":
    print("Starting 3DoF PDG realtime optimization.")
    print("Open the URL shown below in your browser.\n")
    server = create_realtime_server(pdg.problem)
    server.sleep_forever()