Skip to content

Double Integrator Discrete

MPCC example: 3D double integrator tracking a circular reference trajectory (discrete).

Based on the MPCC formulation from Romero et al. (2022).

Demonstrates model-predictive contouring control (MPCC) with:

  • 3D double integrator (point mass) dynamics (position + velocity, force control)
  • Discrete reference path via Cinterp (sampled from a periodic function arbitrary 3D paths)
  • Lag/contour error decomposition following Romero 2022
  • Receding horizon closed-loop simulation

File: examples/mpc/double_integrator_discrete.py

import os
import sys

import numpy as np

# Add grandparent directory to path to import openscvx
current_dir = os.path.dirname(os.path.abspath(__file__))
grandparent_dir = os.path.dirname(os.path.dirname(current_dir))
sys.path.append(grandparent_dir)

import openscvx as ox
from openscvx import Problem
from openscvx.plotting import plot_scp_iterations, plot_states
from openscvx.plotting.viser import (
    add_animated_trail,
    add_animation_controls,
    add_ghost_trajectory,
    add_position_marker,
    compute_velocity_colors,
    create_server,
)

###############################################################################
# Reference path shape
###############################################################################


def wavy_circle(M: int, R: float = 3.0, z_amp: float = 0.5, z_freq: int = 3) -> np.ndarray:
    """Circle in the xy-plane with a sinusoidal z-component."""
    theta = np.linspace(0, 2 * np.pi, M, endpoint=False)
    return np.column_stack(
        [
            R * np.cos(theta),
            R * np.sin(theta),
            z_amp * np.sin(z_freq * theta),
        ]
    )


def torus_knot(M: int, R: float = 3.0, r: float = 1.0, p: int = 2, q: int = 3) -> np.ndarray:
    """(p, q) torus knot. p=2, q=3 gives a trefoil knot."""
    theta = np.linspace(0, 2 * np.pi, M, endpoint=False)
    return np.column_stack(
        [
            (R + r * np.cos(q * theta)) * np.cos(p * theta),
            (R + r * np.cos(q * theta)) * np.sin(p * theta),
            r * np.sin(q * theta),
        ]
    )


def lissajous_3d(
    M: int,
    R: float = 3.0,
    a: int = 1,
    b: int = 2,
    c: int = 3,
    phi: float = np.pi / 2,
    z_amp: float = 1.0,
) -> np.ndarray:
    """3D Lissajous curve. Periodic when a, b, c are integers."""
    theta = np.linspace(0, 2 * np.pi, M, endpoint=False)
    return np.column_stack(
        [
            R * np.cos(a * theta),
            R * np.sin(b * theta + phi),
            z_amp * np.sin(c * theta),
        ]
    )


def figure_eight(M: int, R: float = 3.0, z_amp: float = 1.0) -> np.ndarray:
    """Figure-eight knot curve."""
    theta = np.linspace(0, 2 * np.pi, M, endpoint=False)
    scale = R / 3.0  # Normalize so max radial extent ~ R
    return np.column_stack(
        [
            scale * (2 + np.cos(2 * theta)) * np.cos(3 * theta),
            scale * (2 + np.cos(2 * theta)) * np.sin(3 * theta),
            z_amp * np.sin(4 * theta),
        ]
    )


###############################################################################
# Reference path parameters
###############################################################################
R_circle = 3.0  # Radius of the reference circle
center = np.array([0.0, 0.0, 0.0])  # Center of the reference path (3D)
tilt_angle = np.radians(45)  # Tilt out of the xy-plane (rotation about x-axis)

# Rotation matrix: tilt about x-axis by tilt_angle
R_tilt = np.array(
    [
        [1, 0, 0],
        [0, np.cos(tilt_angle), -np.sin(tilt_angle)],
        [0, np.sin(tilt_angle), np.cos(tilt_angle)],
    ]
)

###############################################################################
# Discrete reference path
###############################################################################
M = 30  # Number of samples per lap
path_fn = wavy_circle  # Swap this for any (M) -> (M, 3) function

# Sample one lap in the local frame, then rotate into 3D
path_local = path_fn(M, R=R_circle)
path_rotated = (R_tilt @ path_local.T).T + center
px_lap = path_rotated[:, 0]
py_lap = path_rotated[:, 1]
pz_lap = path_rotated[:, 2]

# Arc-length parametrization via cumulative chord lengths
dp = np.diff(path_rotated, axis=0, append=path_rotated[:1])  # wrap to start
chord_lengths = np.linalg.norm(dp, axis=1)
total_arc_length = np.sum(chord_lengths)  # Recompute from actual path
s_lap = np.concatenate([[0.0], np.cumsum(chord_lengths[:-1])])

# Tile to cover [progress.min, progress.max] = [-0.5L, 1.5L]
s_min, s_max = -0.5 * total_arc_length, 1.5 * total_arc_length
n_before = int(np.ceil(-s_min / total_arc_length))
n_after = int(np.ceil(s_max / total_arc_length))
laps = range(-n_before, n_after + 1)

s_data = np.concatenate([s_lap + k * total_arc_length for k in laps])
px_data = np.tile(px_lap, len(laps))
py_data = np.tile(py_lap, len(laps))
pz_data = np.tile(pz_lap, len(laps))


###############################################################################
# MPCC parameters
###############################################################################
n_mpc = 10  # Horizon nodes
horizon_duration = 1.0  # Horizon length [s]

Q_LAG = 1e-1  # Lag error weight (high -> accurate progress tracking)
Q_CONTOUR = 1e-2  # Contour error weight
Q_PROGRESS = 1e-2

###############################################################################
# MPCC problem definition
###############################################################################

# --- States ---
position = ox.State("position", shape=(3,))
position.min = np.array([-10.0, -10.0, -10.0])
position.max = np.array([10.0, 10.0, 10.0])
position.initial = path_rotated[0]  # Start at first sample point
position.final = [ox.Free(0.0), ox.Free(0.0), ox.Free(0.0)]

velocity = ox.State("velocity", shape=(3,))
velocity.min = np.array([-20.0, -20.0, -20.0])
velocity.max = np.array([20.0, 20.0, 20.0])
velocity.initial = np.array([0.0, 0.0, 0.0])
velocity.final = [ox.Free(0.0), ox.Free(0.0), ox.Free(0.0)]

progress = ox.State("progress", shape=(1,))  # Arc-length progress theta_hat
progress.min = np.array([-0.5 * total_arc_length])
progress.max = np.array([1.5 * total_arc_length])
progress.initial = np.array([0.0])
progress.final = [ox.Maximize(0.0)]

lag_sum = ox.State("lag_sum", shape=(1,))  # Integrated lag cost
lag_sum.min = np.array([0.0])
lag_sum.max = np.array([1e-3])
lag_sum.initial = np.array([0.0])
lag_sum.final = [ox.Minimize(0.0)]

contour_sum = ox.State("contour_sum", shape=(1,))  # Integrated contour cost
contour_sum.min = np.array([0.0])
contour_sum.max = np.array([5e-3])
contour_sum.initial = np.array([0.0])
contour_sum.final = [ox.Minimize(0.0)]

# --- Controls ---
force = ox.Control("force", shape=(3,))
f_max = 20.0
force.min = np.array([-f_max, -f_max, -f_max])
force.max = np.array([f_max, f_max, f_max])
force.guess = np.zeros((n_mpc, 3))

progress_rate = ox.Control("progress_rate", shape=(1,))  # d(theta_hat)/dt
progress_rate.min = np.array([1.5])  # Forward only
progress_rate.max = np.array([10.0])
progress_rate.guess = np.full((n_mpc, 1), 5.0)

m = 1.0  # Mass

# --- Reference trajectory (discrete, via Cinterp) ---
p_ref = ox.Concat(
    ox.Cinterp(progress[0], s_data, px_data),
    ox.Cinterp(progress[0], s_data, py_data),
    ox.Cinterp(progress[0], s_data, pz_data),
)

# Tangent: derivative of the position spline, sampled at breakpoints and
# re-interpolated with a second Cinterp for a smooth tangent field.
from scipy.interpolate import CubicSpline as _CS

_dpx = _CS(s_data, px_data)(s_data, 1)
_dpy = _CS(s_data, py_data)(s_data, 1)
_dpz = _CS(s_data, pz_data)(s_data, 1)
_tnorm = np.sqrt(_dpx**2 + _dpy**2 + _dpz**2)
tx_data = _dpx / _tnorm
ty_data = _dpy / _tnorm
tz_data = _dpz / _tnorm

tangent = ox.Concat(
    ox.Cinterp(progress[0], s_data, tx_data),
    ox.Cinterp(progress[0], s_data, ty_data),
    ox.Cinterp(progress[0], s_data, tz_data),
)

# --- Error decomposition (position-only, per Romero 2022 Fig. 2) ---
e = position - p_ref  # Position error vector (3,)

# Lag: projection of error onto tangent direction
lag_scalar = ox.Sum(e * tangent)  # Dot product (scalar)
lag_cost = lag_scalar**2

# Contour: Pythagorean decomposition  |e_c|^2 = |e|^2 - |e_l|^2
# Use Sum(e*e) instead of Norm(e)**2 to avoid d(Norm)/de = e/||e|| singularity at e=0
contour_cost = ox.Max(ox.Sum(e * e) - lag_scalar**2, 0.0)

# --- Dynamics ---
dynamics = {
    "position": velocity,
    "velocity": (1 / m) * force,
    "progress": progress_rate,
    "lag_sum": lag_cost,
    "contour_sum": contour_cost,
}

# --- Constraints ---
states = [position, velocity, progress, lag_sum, contour_sum]
controls = [force, progress_rate]

constraints = []
for state in [position, velocity]:
    constraints.extend([ox.ctcs(state <= state.max), ox.ctcs(state.min <= state)])

# --- Time ---
t = ox.Time(
    initial=0.0, final=horizon_duration, min=0.0, max=horizon_duration, uniform_time_grid=True
)

# --- Problem ---
problem_mpc = Problem(
    dynamics=dynamics,
    states=states,
    controls=controls,
    time=t,
    constraints=constraints,
    N=n_mpc,
    algorithm={
        "autotuner": ox.ConstantProximalWeight(),
        "lam_cost": {"lag_sum": Q_LAG, "contour_sum": Q_CONTOUR, "progress": Q_PROGRESS},
    },
)

problem_mpc.settings.dev.printing = False


###############################################################################
# Initial guesses
###############################################################################
def set_initial_guess(
    theta_start: float = 0.0,
    ref_speed: float = 5.0,
):
    """Set guesses by interpolating the discrete reference path.

    Only requires (s_data, px_data, py_data, pz_data) and a nominal speed — no
    analytical path knowledge or precomputed tangent data.
    """
    arc_guess = np.linspace(theta_start, theta_start + ref_speed * horizon_duration, n_mpc)

    # Position: interpolate from reference sample nodes
    pos_guess = np.column_stack(
        [
            np.interp(arc_guess, s_data, px_data),
            np.interp(arc_guess, s_data, py_data),
            np.interp(arc_guess, s_data, pz_data),
        ]
    )
    position.guess = pos_guess

    # Velocity: finite-difference of position guess
    dt = horizon_duration / (n_mpc - 1)
    vel_guess = np.gradient(pos_guess, dt, axis=0)
    velocity.guess = vel_guess
    velocity.initial = vel_guess[0]

    # Force: finite-difference of velocity (mass * acceleration)
    acc_guess = np.gradient(vel_guess, dt, axis=0)
    force.guess = m * acc_guess

    progress.guess = arc_guess.reshape(-1, 1)
    lag_sum.guess = np.zeros((n_mpc, 1))
    contour_sum.guess = np.zeros((n_mpc, 1))

    progress_rate.guess = np.full((n_mpc, 1), ref_speed)


###############################################################################
# Closed-loop simulation
###############################################################################
def shift_guess(nodes: dict):
    """Shift previous solution by one node for warm-starting."""
    dt = horizon_duration / (n_mpc - 1)

    # Extrapolate a new final node
    pos_last = nodes["position"][-1]
    vel_last = nodes["velocity"][-1]
    force_last = nodes["force"][-1]
    pr_last = nodes["progress_rate"][-1, 0]

    ext_pos = pos_last + dt * vel_last
    ext_vel = vel_last + dt * (1 / m) * force_last
    ext_prog = nodes["progress"][-1, 0] + dt * pr_last

    shifted_progress = np.vstack([nodes["progress"][1:], [[ext_prog]]])
    wrap_offset = (nodes["progress"][1, 0] // total_arc_length) * total_arc_length
    shifted_progress -= wrap_offset

    position.guess = np.vstack([nodes["position"][1:], [ext_pos]])
    velocity.guess = np.vstack([nodes["velocity"][1:], [ext_vel]])
    progress.guess = shifted_progress

    lag_offset = nodes["lag_sum"][1]
    lag_sum.guess = np.maximum(
        np.vstack([nodes["lag_sum"][1:] - lag_offset, nodes["lag_sum"][-1:] - lag_offset]),
        0.0,
    )

    contour_offset = nodes["contour_sum"][1]
    contour_sum.guess = np.maximum(
        np.vstack(
            [
                nodes["contour_sum"][1:] - contour_offset,
                nodes["contour_sum"][-1:] - contour_offset,
            ]
        ),
        0.0,
    )

    force.guess = np.vstack([nodes["force"][1:], [0, 0, 0]])
    progress_rate.guess = np.vstack([nodes["progress_rate"][1:], nodes["progress_rate"][-1:]])


def update_initial_conditions(nodes: dict):
    """Set initial conditions from node 1 of previous solution (simulate one step)."""
    position.initial = nodes["position"][1]
    velocity.initial = nodes["velocity"][1]

    wrap_offset = (nodes["progress"][1, 0] // total_arc_length) * total_arc_length
    progress.initial = np.array([nodes["progress"][1, 0] - wrap_offset])

    # Cost integrators always restart from zero each horizon
    lag_sum.initial = np.array([0.0])
    contour_sum.initial = np.array([0.0])


###############################################################################
# Main
###############################################################################
if __name__ == "__main__":
    set_initial_guess(theta_start=0.0)

    problem_mpc.initialize()

    max_steps = 100
    dt_mpc = horizon_duration / (n_mpc - 1)  # Time between MPC steps
    node1_time = dt_mpc  # Time of node 1 in each horizon

    # --- Run MPC loop, collecting data ---
    actual_segments = []
    actual_vel_segments = []
    actual_time_segments = []
    horizon_trajectories = []
    horizon_velocities = []

    for step in range(max_steps):
        problem_mpc.reset()
        results = problem_mpc.solve()
        results = problem_mpc.post_process()
        nodes = results.nodes

        # Slice trajectory from node 0 to node 1 (the executed segment)
        traj_time = results.trajectory["time"].flatten()
        seg_end = np.searchsorted(traj_time, node1_time, side="right")
        actual_segments.append(results.trajectory["position"][:seg_end].copy())
        actual_vel_segments.append(results.trajectory["velocity"][:seg_end].copy())
        actual_time_segments.append(traj_time[:seg_end].copy())

        horizon_trajectories.append(results.trajectory["position"].copy())
        horizon_velocities.append(results.trajectory["velocity"].copy())

        cur_pos = nodes["position"][0]
        cur_progress = nodes["progress"][0, 0]
        cur_lag = nodes["lag_sum"][-1, 0]
        cur_contour = nodes["contour_sum"][-1, 0]

        laps = cur_progress / total_arc_length
        print(
            f"step {step:3d}: progress={cur_progress:7.2f} "
            f"({laps:.2f} laps), "
            f"lag_cost={cur_lag:.4f}, contour_cost={cur_contour:.4f}, "
            f"pos=[{cur_pos[0]:+6.2f}, {cur_pos[1]:+6.2f}, {cur_pos[2]:+6.2f}]"
        )

        update_initial_conditions(nodes)
        shift_guess(nodes)

    # --- Build dense actual trajectory and horizon lookup ---
    actual_path = np.concatenate(actual_segments, axis=0)
    actual_vel = np.concatenate(actual_vel_segments, axis=0)
    actual_colors = compute_velocity_colors(actual_vel)

    frames_per_step = np.array([len(seg) for seg in actual_segments])
    step_boundaries = np.cumsum(frames_per_step)  # frame index where each step ends

    # Dense time array: offset each segment's local time by the MPC step
    actual_time = np.concatenate(
        [seg_t + i * dt_mpc for i, seg_t in enumerate(actual_time_segments)]
    )

    # Horizon rollout colors (single global viridis mapping across all steps)
    all_horizon_vel = np.concatenate(horizon_velocities, axis=0)
    all_horizon_colors = compute_velocity_colors(all_horizon_vel)
    cumulative_horizon_pts = np.cumsum([len(hv) for hv in horizon_velocities])
    horizon_colors = np.split(all_horizon_colors, cumulative_horizon_pts[:-1])

    # --- Viser visualization ---
    server = create_server(actual_path)

    # Reference path (static)
    ref_dense = path_fn(400, R=R_circle)
    ref_dense = (R_tilt @ ref_dense.T).T + center
    ref_dense = np.vstack([ref_dense, ref_dense[:1]])
    ref_colors = np.full((len(ref_dense), 3), fill_value=[255, 80, 80], dtype=np.uint8)
    server.scene.add_point_cloud(
        "/reference",
        points=ref_dense.astype(np.float32),
        colors=(ref_colors * 0.5).astype(np.uint8),
        point_size=0.02,
    )

    # Ghost of all MPC horizons (faint background)
    all_horizon_points = np.concatenate(horizon_trajectories, axis=0)
    add_ghost_trajectory(server, all_horizon_points, all_horizon_colors, point_size=0.005)

    # Animated actual trail (grows as drone flies, uses library primitive)
    _, update_trail = add_animated_trail(server, actual_path, actual_colors, point_size=0.03)

    # Position marker at current drone position
    _, update_marker = add_position_marker(server, actual_path, radius=0.05)

    # Horizon rollout pop-in: shows the current planned horizon
    horizon_handle = server.scene.add_point_cloud(
        "/horizon_rollout",
        points=horizon_trajectories[0].astype(np.float32),
        colors=horizon_colors[0],
        point_size=0.015,
    )

    def update_horizon(frame_idx):
        """Swap in the horizon rollout for the current MPC step."""
        step = int(np.searchsorted(step_boundaries, frame_idx, side="right"))
        step = min(step, max_steps - 1)
        horizon_handle.points = horizon_trajectories[step].astype(np.float32)
        horizon_handle.colors = horizon_colors[step]

    add_animation_controls(
        server,
        actual_time,
        [update_trail, update_marker, update_horizon],
    )

    plot_states(results).show()
    plot_scp_iterations(results).show()

    server.sleep_forever()