Skip to content

3DoF Pdg Realtime Base

3-DOF Powered Descent Guidance (PDG) for planetary landing.

This example demonstrates optimal trajectory generation for a rocket performing powered descent guidance, similar to SpaceX Falcon 9 or Blue Origin landings. The problem includes:

  • 3D position and velocity dynamics
  • Fuel-optimal mass minimization
  • Thrust magnitude and pointing constraints
  • Glideslope constraint for safe landing approach

File: examples/realtime/base_problems/3DoF_pdg_realtime_base.py

import contextlib
import io
import os
import queue
import sys
import time as pytime

import numpy as np

# Add repo root to path so `examples.*` imports resolve when run as a script.
current_dir = os.path.dirname(os.path.abspath(__file__))
repo_root_dir = os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))
sys.path.append(repo_root_dir)

import openscvx as ox
from examples.plotting_viser import (
    create_pdg_animated_plotting_server,
    create_scp_animated_plotting_server,
)
from openscvx import Free, Problem
from openscvx.utils import printing as _openscvx_printing


def _silence_openscvx_console_printing() -> None:
    """Disable ASCII banner, problem box, and post-process results box for this example."""
    _openscvx_printing.intro = lambda: None
    _openscvx_printing.print_problem_summary = lambda *args, **kwargs: None
    _openscvx_printing.print_results_summary = lambda *args, **kwargs: None


def _scp_intermediate_drain_only(problem: Problem):
    """Consume iteration emits without printing rows (keeps queue from growing)."""

    def _run(print_queue, params, columns) -> None:
        hz = 30.0
        while True:
            t_start = pytime.time()
            try:
                data = print_queue.get(timeout=1.0 / hz)
                problem._scp_last_emit = data
            except queue.Empty:
                pass
            pytime.sleep(max(0.0, 1.0 / hz - (pytime.time() - t_start)))

    return _run


_gpq_patched = False


def _patch_get_print_queue_data_for_pdg() -> None:
    """Prefer latest SCP emit stored on the problem (drain thread) for Viser metrics."""
    global _gpq_patched
    if _gpq_patched:
        return
    import examples.plotting_viser as _pv

    _orig_gpq = _pv.get_print_queue_data

    def _gpq(problem):
        emit = getattr(problem, "_scp_last_emit", None)
        if emit is not None:
            return {
                "dis_time": emit.get("dis_time", 0.0),
                "prob_stat": emit.get("prob_stat", "--"),
                "cost": emit.get("cost", 0.0),
            }
        return _orig_gpq(problem)

    _pv.get_print_queue_data = _gpq
    _gpq_patched = True


def install_pdg_scp_console_style(problem: Problem) -> None:
    """Show SCP column header once; suppress per-iteration table rows."""
    problem.settings.dev.printing = True
    _openscvx_printing.intermediate = _scp_intermediate_drain_only(problem)
    _patch_get_print_queue_data_for_pdg()


def initialize_problem_quiet(problem: Problem) -> None:
    """Call ``initialize()`` without writing OpenSCvx banner/timing lines to stdout."""
    with contextlib.redirect_stdout(io.StringIO()):
        problem.initialize()


def print_scp_table_header_once(problem: Problem) -> None:
    """Print the SCP iteration table header (column names) once."""
    cols = getattr(problem, "_columns", None)
    if cols is not None:
        _openscvx_printing.header(cols)


_silence_openscvx_console_printing()

n = 10
total_time = 95.0  # Total simulation time

# Define state components
position = ox.State("position", shape=(3,))  # 3D position [x, y, z]
v_max = 500 * 1e3 / 3600  # Maximum velocity in m/s (800 km/h converted to m/s)
position.max = np.array([3000, 3000, 3000])
position.min = np.array([-3000, -3000, 0])
position.initial = [Free(2000), Free(0), Free(1500)]
position.final = [Free(0), Free(0), 0]
position.guess = np.linspace(position.initial, position.final, n)

velocity = ox.State("velocity", shape=(3,))  # 3D velocity [vx, vy, vz]
velocity.max = np.array([v_max, v_max, v_max])
velocity.min = np.array([-v_max, -v_max, -v_max])
velocity.initial = np.array([80, 30, -75])
velocity.final = np.array([0, 0, 0])
velocity.guess = np.linspace(velocity.initial, velocity.final, n)

mass = ox.State("mass", shape=(1,))  # Vehicle mass
mass.max = np.array([1905])
mass.min = np.array([1505])
mass.initial = np.array([1905])
mass.final = [("maximize", 1700)]
mass.scaling_min = np.array([1700])
# mass.scaling_max = np.array([1700])
mass.guess = np.linspace(mass.initial, 1690, n).reshape(-1, 1)

# Define control — thrust force vector [Tx, Ty, Tz]
thrust = ox.Control("thrust", shape=(3,), parameterization="ZOH")

T_bar = 3.1 * 1e3
# T1 = 0.3 * T_bar
T2_init = 0.8 * T_bar
n_eng = 6

# Set bounds on control
thrust.min = n_eng * np.array([-T_bar, -T_bar, -T_bar])
thrust.max = n_eng * np.array([T_bar, T_bar, T_bar])

# Set initial control guess
thrust.guess = np.repeat(np.expand_dims(np.array([0, 0, n_eng * (T2_init) / 2]), axis=0), n, axis=0)

# Define list of all states and controls
states = [position, velocity, mass]
controls = [thrust]


# Define Parameters for physical constants
g_e = 9.807  # Gravitational acceleration on Earth in m/s^2

# Create parameters for the problem
I_sp = ox.Parameter("I_sp", value=225.0)
g = ox.Parameter("g", value=3.7114)
theta = ox.Parameter("theta", value=27 * np.pi / 180)
glideslope_angle = ox.Parameter("glideslope_angle", value=86 * np.pi / 180)
thrust_pointing_angle = ox.Parameter("thrust_pointing_angle", value=40 * np.pi / 180)
T1 = ox.Parameter("T1", value=0.3 * T_bar)
T2 = ox.Parameter("T2", value=0.8 * T_bar)
initial_position = ox.Parameter("initial_position", shape=(3,), value=np.array([2000, 0, 1500]))
final_position = ox.Parameter("final_position", shape=(2,), value=np.array([0, 0]))

# Keep these symbolic so live parameter updates are reflected in constraints.
rho_min_expr = n_eng * T1 * ox.Cos(theta)  # Minimum thrust bound
rho_max_expr = n_eng * T2 * ox.Cos(theta)  # Maximum thrust bound

# Generate box constraints for all states
constraints = []
for state in states:
    constraints.extend(
        [
            ox.ctcs(state <= state.max, idx=0),
            ox.ctcs(state.min <= state, idx=0),
        ]
    )

# Thrust magnitude constraints
constraints.extend(
    [
        ox.ctcs(rho_min_expr <= ox.linalg.Norm(thrust), idx=1),
        ox.ctcs(ox.linalg.Norm(thrust) <= rho_max_expr, idx=1),
    ]
)

# Thrust pointing constraint (thrust cant angle)
constraints.append(
    ox.ctcs(ox.Cos(thrust_pointing_angle) <= thrust[2] / ox.linalg.Norm(thrust), idx=2)
)

# Glideslope constraint
constraints.append(
    ox.ctcs(ox.linalg.Norm(position[:2]) <= ox.Tan(glideslope_angle) * position[2], idx=3)
)

# Initial position constraint
constraints.append((position == initial_position).convex().at([0]))

# Terminal position constraint
constraints.append((position[0:2] == final_position).convex().at([n - 1]))


# Define dynamics as dictionary mapping state names to their derivatives
g_vec = np.array([0, 0, 1], dtype=np.float64) * g  # Gravitational acceleration vector

dynamics = {
    "position": velocity,
    "velocity": thrust / mass[0] - g_vec,
    "mass": -ox.linalg.Norm(thrust) / (I_sp * g_e * ox.Cos(theta)),
}

# Build the problem
time = ox.Time(
    initial=0.0,
    final=("free", total_time),
    min=0.0,
    max=2e2,
)

problem = Problem(
    dynamics=dynamics,
    states=states,
    controls=controls,
    time=time,
    constraints=constraints,
    N=n,
    algorithm={
        "autotuner": ox.ConstantProximalWeight(),
        "lam_cost": 1 / 3,
        "lam_vc": 2e0,
        "lam_prox": 1e0,
    },
    discretizer={"ode_solver": "Dopri8"},
)

install_pdg_scp_console_style(problem)

plotting_dict = {
    "rho_min": n_eng * T1.value * np.cos(theta.value),
    "rho_max": n_eng * T2.value * np.cos(theta.value),
    "glideslope_angle_deg": 86.0,
}

if __name__ == "__main__":
    initialize_problem_quiet(problem)
    problem.solve()
    results = problem.post_process()
    results.update(plotting_dict)

    # Create PDG trajectory visualization
    scene_scale = 100
    traj_server = create_pdg_animated_plotting_server(
        results,
        thrust_key="thrust",
        glideslope_angle_deg=86.0,
        scene_scale=1.0,
    )

    # Create SCP iteration visualization
    scp_server = create_scp_animated_plotting_server(
        results,
        frame_duration_ms=200,
        scene_scale=100.0,
    )

    # Keep servers running
    traj_server.sleep_forever()