import numpy as np
from numpy.typing import NDArray
from typing import Tuple, Optional
[docs]
class MPC:
"""
Model Predictive Control (MPC) for constrained optimal control.
Based on Mayne et al. (2000), "Constrained model predictive control:
Stability and optimality", Automatica, Vol. 36, No. 6, pp. 789-814.
MPC solves a finite-horizon optimal control problem at each time step,
applying only the first control input and resolving (receding horizon).
"""
[docs]
@staticmethod
def f(
*,
mpc_state: Tuple[NDArray, NDArray, float],
xhat_k: NDArray,
xref_traj: NDArray,
A: NDArray,
B: NDArray,
Q: NDArray,
R: NDArray,
P: NDArray,
N: int,
u_min: Optional[NDArray] = None,
u_max: Optional[NDArray] = None,
x_min: Optional[NDArray] = None,
x_max: Optional[NDArray] = None
) -> Tuple[NDArray, NDArray, float]:
"""
Solve the MPC optimization problem for linear discrete-time systems.
Parameters
----------
mpc_state : Tuple[NDArray, NDArray, float]
Previous MPC solution ``(u_opt, x_pred, cost)`` containing:
- ``u_opt`` : optimal control sequence from last solve, shape (N, m)
- ``x_pred`` : predicted state trajectory, shape (N+1, n)
- ``cost`` : optimal cost value from last solve
xhat_k : NDArray
Current state estimate, shape (n,)
xref_traj : NDArray
Reference state trajectory, shape (N+1, n)
A : NDArray
State transition matrix, shape (n, n)
B : NDArray
Control input matrix, shape (n, m)
Q : NDArray
Stage cost state weight (positive semi-definite), shape (n, n)
R : NDArray
Stage cost control weight (positive definite), shape (m, m)
P : NDArray
Terminal cost weight (positive semi-definite), shape (n, n)
Typically set to LQR DARE solution for stability
N : int
Prediction horizon length
u_min : NDArray, optional
Lower bound on control inputs, shape (m,)
u_max : NDArray, optional
Upper bound on control inputs, shape (m,)
x_min : NDArray, optional
Lower bound on states, shape (n,)
x_max : NDArray, optional
Upper bound on states, shape (n,)
Returns
-------
u_opt : NDArray
Optimal control sequence over horizon, shape (N, m)
x_pred : NDArray
Predicted state trajectory, shape (N+1, n)
cost : float
Optimal cost value
Notes
-----
The MPC optimization problem at time k is:
.. math::
\\min_{u_0, ..., u_{N-1}} \\sum_{i=0}^{N-1} \\left( ||x_i - x_{ref,i}||_Q^2 + ||u_i||_R^2 \\right) + ||x_N - x_{ref,N}||_P^2
subject to:
.. math::
x_{i+1} &= A x_i + B u_i, \\quad i = 0, ..., N-1 \\\\
x_0 &= \\hat{x}_k \\quad \\text{(initial condition)} \\\\
u_{min} \\leq u_i &\\leq u_{max}, \\quad i = 0, ..., N-1 \\quad \\text{(input constraints)} \\\\
x_{min} \\leq x_i &\\leq x_{max}, \\quad i = 1, ..., N \\quad \\text{(state constraints)}
**Receding Horizon Principle:**
Only the first control input ``u_0`` is applied. At the next time step,
the optimization is resolved with updated state and reference.
**Stability:**
For unconstrained MPC with terminal cost P equal to the LQR DARE solution,
the MPC controller is equivalent to LQR and guarantees closed-loop stability.
With constraints, stability can be ensured via terminal constraint sets.
**Solver:**
Uses CVXPY with OSQP backend for fast quadratic programming.
"""
# Lazy import cvxpy (only when MPC is actually used)
try:
import cvxpy as cp
except ImportError:
raise ImportError(
"cvxpy is required for MPC but not installed. "
"Install it with: pip install cvxpy>=1.3"
)
n = A.shape[0] # State dimension
m = B.shape[1] # Control dimension
# Flatten current state
x0 = np.atleast_1d(xhat_k).flatten()
# Create optimization variables
x = cp.Variable((N + 1, n))
u = cp.Variable((N, m))
# Build cost function
cost_expr = 0
for i in range(N):
# Stage cost
x_err = x[i, :] - xref_traj[i, :]
cost_expr += cp.quad_form(x_err, Q) + cp.quad_form(u[i, :], R)
# Terminal cost
x_err_terminal = x[N, :] - xref_traj[N, :]
cost_expr += cp.quad_form(x_err_terminal, P)
# Build constraints
constraints = []
# Initial condition
constraints.append(x[0, :] == x0)
# Dynamics constraints
for i in range(N):
constraints.append(x[i + 1, :] == A @ x[i, :] + B @ u[i, :])
# Input constraints
if u_min is not None:
for i in range(N):
constraints.append(u[i, :] >= u_min)
if u_max is not None:
for i in range(N):
constraints.append(u[i, :] <= u_max)
# State constraints
if x_min is not None:
for i in range(1, N + 1):
constraints.append(x[i, :] >= x_min)
if x_max is not None:
for i in range(1, N + 1):
constraints.append(x[i, :] <= x_max)
# Formulate and solve QP
problem = cp.Problem(cp.Minimize(cost_expr), constraints)
try:
problem.solve(solver=cp.OSQP, verbose=False)
if problem.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
# Solver failed, return safe fallback (zero control)
print(f"Warning: MPC solver status: {problem.status}. Using fallback control.")
u_opt = np.zeros((N, m))
x_pred = np.tile(x0, (N + 1, 1))
cost_val = np.inf
else:
# Extract solution
u_opt = u.value
x_pred = x.value
cost_val = problem.value
except Exception as e:
print(f"Warning: MPC solver exception: {e}. Using fallback control.")
u_opt = np.zeros((N, m))
x_pred = np.tile(x0, (N + 1, 1))
cost_val = np.inf
return u_opt, x_pred, cost_val
[docs]
@staticmethod
def h(mpc_state: Tuple[NDArray, NDArray, float]) -> NDArray:
"""
Extract the first control input (receding horizon principle).
Parameters
----------
mpc_state : Tuple[NDArray, NDArray, float]
MPC solution ``(u_opt, x_pred, cost)``
Returns
-------
uk : NDArray
Control input to apply at current time, shape (m,)
Notes
-----
The receding horizon principle:
- Solve optimization over N-step horizon
- Apply only first control input ``u_0``
- At next time step, shift horizon and resolve
This is the key to MPC's ability to handle constraints and
time-varying references/disturbances.
Examples
--------
>>> import numpy as np
>>> u_opt = np.array([[1.0], [0.8], [0.6]]) # 3-step horizon
>>> x_pred = np.zeros((4, 2)) # Predicted trajectory
>>> cost = 5.2
>>> mpc_state = (u_opt, x_pred, cost)
>>> uk = MPC.h(mpc_state)
>>> uk
array([1.])
"""
u_opt, _, _ = mpc_state
# Return first control input
if u_opt.ndim == 1:
return u_opt
else:
return u_opt[0, :].flatten()
[docs]
@staticmethod
def solve_single_step(
*,
xk: NDArray,
xref: NDArray,
A: NDArray,
B: NDArray,
Q: NDArray,
R: NDArray,
P: NDArray,
N: int,
u_min: Optional[NDArray] = None,
u_max: Optional[NDArray] = None,
x_min: Optional[NDArray] = None,
x_max: Optional[NDArray] = None,
warm_start: Optional[Tuple[NDArray, NDArray, float]] = None
) -> Tuple[NDArray, NDArray]:
"""
Simplified MPC interface for single-step solving.
Parameters
----------
xk : NDArray
Current state, shape (n,)
xref : NDArray
Reference state (constant over horizon), shape (n,)
A, B, Q, R, P : NDArray
System matrices and cost weights
N : int
Prediction horizon
u_min, u_max, x_min, x_max : NDArray, optional
Constraint bounds
warm_start : Tuple, optional
Previous MPC solution for warm-starting
Returns
-------
u_opt : NDArray
Optimal control sequence, shape (N, m) or (N,) for scalar
x_pred : NDArray
Predicted trajectory, shape (N+1, n)
"""
# Build constant reference trajectory
n = len(xk)
xref_traj = np.tile(xref, (N + 1, 1)) if xref.ndim == 1 else np.tile(xref.reshape(1, -1), (N + 1, 1))
# Initialize or use warm start
if warm_start is None:
m = B.shape[1]
u_init = np.zeros((N, m))
x_init = np.zeros((N + 1, n))
cost_init = 0.0
mpc_state = (u_init, x_init, cost_init)
else:
mpc_state = warm_start
# Solve MPC
u_opt, x_pred, cost = MPC.f(
mpc_state=mpc_state,
xhat_k=xk,
xref_traj=xref_traj,
A=A,
B=B,
Q=Q,
R=R,
P=P,
N=N,
u_min=u_min,
u_max=u_max,
x_min=x_min,
x_max=x_max
)
# Return first control input (receding horizon principle)
u_first = u_opt[0, 0] if u_opt.ndim == 2 and u_opt.shape[1] == 1 else u_opt[0]
return u_first, x_pred
[docs]
@staticmethod
def simple_f(
*,
xk: NDArray,
xref: NDArray,
A: NDArray,
B: NDArray,
Q: NDArray,
R: NDArray,
P: NDArray,
N: int,
u_min: Optional[NDArray] = None,
u_max: Optional[NDArray] = None,
x_min: Optional[NDArray] = None,
x_max: Optional[NDArray] = None,
mpc_output: Optional[Tuple[NDArray, NDArray]] = None
) -> Tuple[NDArray, NDArray]:
"""
Simplified stateless MPC for DynamicalSystem integration.
This is a wrapper around standard_f that handles state initialization.
Returns (u_opt, x_pred) for compatibility with DynamicalSystem.
Parameters are same as solve_single_step.
"""
u_opt, x_pred = MPC.solve_single_step(
xk=xk,
xref=xref,
A=A,
B=B,
Q=Q,
R=R,
P=P,
N=N,
u_min=u_min,
u_max=u_max,
x_min=x_min,
x_max=x_max
)
return u_opt, x_pred
[docs]
@staticmethod
def simple_h(mpc_output: Tuple[NDArray, NDArray]) -> NDArray:
"""Extract first control from simple_f output."""
u_opt, x_pred = mpc_output
return u_opt
# Module-level aliases for convenience
# Allows usage like: from pykal.algorithm_library.controllers import mpc; mpc.f(...)
f = MPC.f
h = MPC.h
solve_single_step = MPC.solve_single_step
simple_f = MPC.simple_f
simple_h = MPC.simple_h