{ "cells": [ { "cell_type": "markdown", "id": "933381a6", "metadata": {}, "source": [ "[← Control Systems as Dynamical Systems](../../../getting_started/theory_to_python/control_systems_as_dynamical_systems.rst)" ] }, { "cell_type": "markdown", "id": "121971fc", "metadata": { "lines_to_next_cell": 0 }, "source": [ "# Example: Car Cruise Control" ] }, { "cell_type": "markdown", "id": "bc110efa", "metadata": {}, "source": [ "Suppose we want to build a robot. If we were hobbyists, we might want a weekend project; if we were engineers, we might need an extra arm on the assembly line; if we were academics, we might dream of a quadruped with a cannon-head to threaten the government for grant money.\n", "\n", "Of course, we must begin somewhere. In this notebook, we take a classic example of a feedback system -- a car with cruise control -- and use ``pykal`` to take it from a block diagram to a composition of dynamical systems." ] }, { "cell_type": "markdown", "id": "8016da19", "metadata": {}, "source": [ "## System Overview\n", " We consider here a gross simplification: our car will only ever drive on a perfectly flat plane with minimal drag.\n", "\n", "We are given an acceleromoter (which will give us a measurement of our current speed), a button on the steering wheel (which will enable us to modify the cruise control speed), and the car's CPU (i.e. we can implement any algorithm we want)." ] }, { "cell_type": "markdown", "id": "422665ce", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## Dynamical System Graph\n", ":::{warning}\n", "This section assumes you have already gone through the [Dynamical System](./dynamical_system.ipynb) notebook. If you have not, please do so now.\n", ":::\n", "\n", " Recall the following block diagram for a simple feedback system:\n", "\n", "\n", "\n", "where $r$ is the **setpoint** (or **reference point**),\n", "$u$ is the **control input**,\n", "$x$ is the (hidden) **state** of the plant,\n", "$\\hat{x}$ is the **state estimate**,\n", "and $e$ is the **error** term.\n", "\n", "In the case of our car cruise control system, we can leverage this diagram with a simple relabling:\n", "\n", "\n", "\n", "where the **PID** is a proportional-integrative-differential control algorithm, **KF** is a Kalman filter, and the **car** is, well, the car.\n", ":::{note} If you were to ask five control theorists to design a block diagram, you would get six different diagrams. Don't fret the particulars; the diagram is simply a tool to organize our thinking.\n", ":::\n", "\n", "We now proceed with casting the block diagram defined above as **dynamical system graph**:\n", "\n", "\n", "\n", "We construct this graph block-by-block in the sections that follow.\n", "\n", ":::{note} Although the signals between blocks have remained, for the most part, unchanged, the **summing junction** and **inverter** have dissapeared. Indeed, they've been absorbed into the PID block, as our [implementation of the algorithm](../../algorithm_library/pid_pykal.ipynb) computes the error term internally.\n", ":::" ] }, { "cell_type": "markdown", "id": "27d3fd3f", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Block 1: Setpoint Generator\n", "\n", "
\n", "\n", "
\n", "\n", "The setpoint generator maintains the cruise control speed reference based on the driver's button inputs.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "9481b4a4", "metadata": { "lines_to_next_cell": 0 }, "outputs": [], "source": [ "# ---- SETPOINT BLOCK ---------------------\n", "\n", "#### ARROWS IN ####\n", "bk = int # button input; bk in {-1, 0, 1} (decrement, neutral, increment)\n", "\n", "#### PARAMETERS ############\n", "sk = int\n", "bk = int # Current setpoint state (cruise control setpoint in mph)\n", "\n", "#### ARROWS OUT ####\n", "rk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "\n", "from pykal import DynamicalSystem\n", "\n", "\n", "def setpoint_f(sk: float, bk: int) -> float:\n", " \"\"\"Increment or decrement setpoint based on button input.\"\"\"\n", " return sk + bk\n", "\n", "\n", "def setpoint_h(\n", " sk: float,\n", ") -> float:\n", " \"\"\"Return the current setpoint speed.\"\"\"\n", " rk = sk\n", " return rk\n", "\n", "\n", "setpoint_block = DynamicalSystem(f=setpoint_f, h=setpoint_h)" ] }, { "cell_type": "markdown", "id": "500e1e38", "metadata": { "lines_to_next_cell": 0 }, "source": [ " To get a feel for our setpoint dynamical system, we simulate the following scenario: we press up on the button once a second for 30 seconds." ] }, { "cell_type": "code", "execution_count": null, "id": "da9d515d", "metadata": { "lines_to_next_cell": 0 }, "outputs": [], "source": [ "import numpy as np\n", "\n", "# ---- SETPOINT GENERATOR BLOCK -------------------\n", "\n", "#### ARROWS IN #########################\n", "bk = 1\n", "\n", "#### PARAMETERS #########################\n", "sk = 20\n", "bk = bk\n", "\n", "#### ARROWS OUT #########################\n", "rk = None\n", "# ---- -------------------\n", "\n", "### SIMULATION VARIABLES ############\n", "dt = 1\n", "sim_time = np.arange(0, 30, dt)\n", "rk_hist = []\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", " sk_next, rk = setpoint_block.step(params={\"sk\": sk, \"bk\": bk})\n", " sk = sk_next\n", "\n", " rk_hist.append(rk)" ] }, { "cell_type": "code", "execution_count": null, "id": "51ff1106", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# plotting\n", "from matplotlib import pyplot as plt\n", "\n", "plt.plot(sim_time, rk_hist)\n", "plt.xlabel(\"time (seconds)\")\n", "plt.ylabel(\"rk (setpoint speed)\")\n", "plt.title(\"Setpoint Speed Over Time\")\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "4ac58458", "metadata": { "lines_to_next_cell": 0 }, "source": [ "#### Simulating Different Scenarios\n", "\n", "\n", "We are limited only by our creativity. Here, we simulate the following scenario:\n", "\n", "\"Our initial setpoint speed is **20 mph**. After **five seconds**, we feel bold and hold \"up\" on the cruise control button. Since we are holding the button, the button does not increment the speed setpoint once per second but instead at a rate of **three times a second** (many buttons have a such feature).\n", "\n", "Out cruise control maxes out at **80 mph**. Once we reach our maximum setpoint speed, we maintain it for **five seconds** until we wiener out -- at which point, we hold \"down\" on the cruise control button, which decrements **three times a second** until our setpoint is back at **20 mph.**\"\n", "\n", "We can model this scenario easily (note that we are still focusing on the setpoint speed, not the actual speed of the car; that will come later)." ] }, { "cell_type": "code", "execution_count": null, "id": "9ab6f1f9", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# ---- SETPOINT GENERATOR BLOCK -------------------\n", "\n", "#### ARROWS IN #########################\n", "bk = 0\n", "\n", "#### PARAMETERS #########################\n", "sk = 20\n", "bk = bk\n", "\n", "#### ARROWS OUT #########################\n", "rk = None\n", "# --------------------------------------\n", "\n", "### SIMULATION VARIABLES ##############################\n", "# these are old\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "rk_hist = []\n", "\n", "# these are new\n", "button_mode = \"neutral\" # neutral | ramp_up | hold_max | ramp_down | done\n", "button_rate = 3\n", "max_rk = 80\n", "final_rk = 20\n", "seconds_at_which_we_ramp_up = 5\n", "seconds_to_spend_at_max_rk = 5\n", "seconds_passed_after_hitting_max_rk = 0\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", "\n", " # --- Decide bk ---\n", " if tk >= seconds_at_which_we_ramp_up and button_mode == \"neutral\":\n", " button_mode = \"ramp_up\"\n", "\n", " if button_mode == \"ramp_up\":\n", " bk = 1 * button_rate\n", " if sk >= max_rk:\n", " button_mode = \"hold_max\"\n", "\n", " if button_mode == \"hold_max\":\n", " bk = 0.0 * button_rate\n", " seconds_passed_after_hitting_max_rk += dt\n", "\n", " if seconds_passed_after_hitting_max_rk >= seconds_to_spend_at_max_rk:\n", " button_mode = \"ramp_down\"\n", "\n", " if button_mode == \"ramp_down\":\n", " bk = -1 * button_rate\n", " if sk <= final_rk:\n", " button_mode = \"done\"\n", " bk = 0.0 * button_rate\n", "\n", " sk_next, rk = setpoint_block.step(params={\"sk\": sk, \"bk\": bk, \"max_rk\": max_rk})\n", "\n", " rk_hist.append(rk)\n", "\n", " sk = np.clip(sk_next, 0, max_rk) # set a max setpoint speed, 0 <= sk <= max_rk" ] }, { "cell_type": "code", "execution_count": null, "id": "b00d107c", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "from matplotlib import pyplot as plt\n", "\n", "plt.plot(sim_time, rk_hist)\n", "plt.xlabel(\"time (seconds)\")\n", "plt.ylabel(\"rk (setpoint speed)\")\n", "plt.title(\"Setpoint Generator Simulation\")\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "771881d2", "metadata": {}, "source": [ "\n", "Not too shabby. This seems like a fun scenario to play with, so we will keep it around. But for the sake of convenience, we can compose the button logic above into *another* `DynamicalSystem` object." ] }, { "cell_type": "markdown", "id": "6948f54f", "metadata": { "lines_to_next_cell": 0 }, "source": [ "\n", "#### Composing Dynamical Systems\n", "\n", "We will call the scenario defined above, somewhate unimaginatively, as \"scenario_1\". After some thinking, we can cast the button logic in the scenario in an $(f,h)$-representation.\n", "\n", "We define a \"Scenario 1\" block and update our diagram as follows:\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "127d2a88", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ], "title": "# %%" }, "outputs": [], "source": [ "# ---- SCENARIO 1 BLOCK -------------------\n", "\n", "#### ARROWS IN ####\n", "# None\n", "\n", "#### PARAMETERS ############\n", "lk = {\n", " \"button_mode\": str,\n", " \"time_since_hit_max_rk\": float,\n", " \"bk\": float,\n", "} # scenario 1 state\n", "\n", "tk = float # current sim time\n", "sk = float # current setpoint state\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": float,\n", " \"max_rk\": float,\n", " \"final_rk\": float,\n", " \"time_at_which_we_ramp_up\": float,\n", " \"time_at_max_rk\": float,\n", "}\n", "#### ARROWS OUT ####\n", "bk = int\n", "\n", "# ---- SETPOINT BLOCK ---------------------\n", "#### ARROWS IN ####\n", "bk = int\n", "\n", "#### PARAMETERS ############\n", "sk = int\n", "bk = int\n", "\n", "#### ARROWS OUT ####\n", "sk = float\n", "rk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "\n", "from typing import Dict\n", "\n", "# This is the first $(f,h)$- representation that came to my mind, but there are plenty of others! Do whatever makes sense for you.\n", "\n", "\n", "def scenario_1_f(lk: Dict, tk: float, sk: float, **scen_1_const_params) -> Dict:\n", " \"\"\"\n", " Update scenario 1 state based on button logic.\n", "\n", " Implements a multi-stage button press scenario: neutral → ramp_up → hold_max → ramp_down → done.\n", " \"\"\"\n", " # extract elements of state\n", " button_mode = lk[\"button_mode\"]\n", " time_since_hit_max_rk = lk[\"time_since_hit_max_rk\"]\n", " bk = lk[\"bk\"]\n", "\n", " # extract constant params\n", " button_rate = scen_1_const_params[\"button_rate\"]\n", " max_rk = scen_1_const_params[\"max_rk\"]\n", " final_rk = scen_1_const_params[\"final_rk\"]\n", " time_at_which_we_ramp_up = scen_1_const_params[\"time_at_which_we_ramp_up\"]\n", " time_at_max_rk = scen_1_const_params[\"time_at_max_rk\"]\n", "\n", " # --- Literally the same code from earlier, just copy-pasted ---\n", " if tk >= time_at_which_we_ramp_up and button_mode == \"neutral\":\n", " button_mode = \"ramp_up\"\n", "\n", " if button_mode == \"ramp_up\":\n", " bk = 1 * button_rate\n", " if sk >= max_rk:\n", " button_mode = \"hold_max\"\n", "\n", " if button_mode == \"hold_max\":\n", " bk = 0.0 * button_rate\n", " time_since_hit_max_rk += dt\n", "\n", " if time_since_hit_max_rk >= time_at_max_rk:\n", " button_mode = \"ramp_down\"\n", "\n", " if button_mode == \"ramp_down\":\n", " bk = -1 * button_rate\n", " if sk <= final_rk:\n", " button_mode = \"done\"\n", " bk = 0.0 * button_rate\n", " # ------\n", "\n", " # update state dictionary\n", " lk_next = lk\n", " lk_next[\"button_mode\"] = button_mode\n", " lk_next[\"time_since_hit_max_rk\"] = time_since_hit_max_rk\n", " lk_next[\"bk\"] = bk\n", "\n", " return lk_next\n", "\n", "\n", "def scenario_1_h(lk: Dict) -> float:\n", " \"\"\"Extract button input from scenario 1 state.\"\"\"\n", " return lk[\"bk\"]\n", "\n", "\n", "scenario_1_block = DynamicalSystem(f=scenario_1_f, h=scenario_1_h)\n", "\n", "# ---- SETPOINT BLOCK ---------------------\n", "\n", "#### ARROWS IN ####\n", "bk = int # button input; bk in {-1, 0, 1} (decrement, neutral, increment)\n", "\n", "#### PARAMETERS ############\n", "sk = int\n", "bk = int # Current setpoint state (cruise control setpoint in mph)\n", "\n", "#### ARROWS OUT ####\n", "rk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "\n", "from pykal import DynamicalSystem\n", "\n", "\n", "def setpoint_f(sk: float, bk: int) -> float:\n", " \"\"\"Increment or decrement setpoint based on button input.\"\"\"\n", " return sk + bk\n", "\n", "\n", "def setpoint_h(\n", " sk: float,\n", ") -> float:\n", " \"\"\"Return the current setpoint speed.\"\"\"\n", " rk = sk\n", " return rk\n", "\n", "\n", "setpoint_block = DynamicalSystem(f=setpoint_f, h=setpoint_h)" ] }, { "cell_type": "markdown", "id": "19390134", "metadata": {}, "source": [ "\n", "Now we have a *much* cleaner simulation of our scenario above.\n", "\n", ":::{note}\n", "We immediately become less formal; going forward, we only will declare and initialize variables which **must** be defined prior to the simulation, and we will only define them once. This is to reduce code clutter and improve readability.\n", ":::" ] }, { "cell_type": "code", "execution_count": null, "id": "c4623dc1", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# ---- SCENARIO 1 BLOCK -------------------\n", "#### PARAMETERS ############\n", "lk = {\n", " \"button_mode\": \"neutral\",\n", " \"time_since_hit_max_rk\": 0,\n", " \"bk\": 0,\n", "}\n", "\n", "sk = 20 # initial setpoint sk\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": 3,\n", " \"max_rk\": 80,\n", " \"final_rk\": 20,\n", " \"time_at_which_we_ramp_up\": 5,\n", " \"time_at_max_rk\": 5,\n", "}\n", "\n", "# ---- SETPOINT GENERATOR BLOCK -------------------\n", "\n", "# no initialization needed\n", "\n", "### SIMULATION VARIABLES ##############################\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "rk_hist = []\n", "\n", "### SIMULATION ###################################################\n", "\n", "for tk in sim_time:\n", " lk_next, bk = scenario_1_block.step(\n", " params={\"lk\": lk, \"sk\": sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", " sk_next, rk = setpoint_block.step(params={\"sk\": sk, \"bk\": bk})\n", "\n", " rk_hist.append(rk)\n", "\n", " sk = sk_next\n", " lk = lk_next" ] }, { "cell_type": "code", "execution_count": null, "id": "c6561a1f", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "from matplotlib import pyplot as plt\n", "\n", "plt.plot(sim_time, rk_hist)\n", "plt.xlabel(\"time (seconds)\")\n", "plt.ylabel(\"rk (setpoint speed)\")\n", "plt.title(\"Setpoint Generator with Callback Policy\")\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "6d74039c", "metadata": {}, "source": [ "To make this even more succinct, we can wrap the composition of dynamical systems above into yet *another* dynamical system, which we will call the \"scenario_1_setpoint_generator\"." ] }, { "cell_type": "markdown", "id": "57b13629", "metadata": { "lines_to_next_cell": 0 }, "source": [ "\n", "
\n", "\n", "
\n", "\n", ":::{note}\n", "The blue box represents wrapped dynamical system blocks. Internally, it contains two subsystems that communicate via signals, but externally it presents a simpler interface\n", ":::\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4daaeac1", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "\n", "#### ARROWS IN ####\n", "# None\n", "\n", "#### PARAMETERS ############\n", "Sk = {\n", " \"lk\": Dict,\n", " \"sk\": float,\n", " \"rk\": float,\n", "} # scenario 1 state and setpoint generator state w/output\n", "\n", "tk = float # current sim time\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": float,\n", " \"max_rk\": float,\n", " \"final_rk\": float,\n", " \"time_at_which_we_ramp_up\": float,\n", " \"time_at_max_rk\": float,\n", "}\n", "\n", "#### ARROWS OUT ####\n", "rk = int\n", "\n", "# ---- $(f,h)$-representation --------------\n", "from typing import Dict\n", "\n", "\n", "def scen_1_setpoint_block_f(Sk: Dict, tk: float, **scen_1_const_params) -> Dict:\n", " \"\"\"\n", " Evolve wrapped scenario 1 setpoint system.\n", "\n", " Internally steps scenario_1_block and setpoint_block in series.\n", " \"\"\"\n", " # extract elements of state\n", " lk = Sk[\"lk\"]\n", " sk = Sk[\"sk\"]\n", "\n", " # --- Literally just copy-pasted ---\n", "\n", " lk_next, bk = scenario_1_block.step(\n", " params={\n", " \"lk\": lk,\n", " \"sk\": sk,\n", " \"tk\": tk,\n", " **scen_1_const_params,\n", " }\n", " )\n", " sk_next, rk = setpoint_block.step(params={\"sk\": sk, \"bk\": bk})\n", "\n", " # ------\n", "\n", " # update state dictionary\n", " Sk[\"lk\"] = lk_next\n", " Sk[\"sk\"] = sk_next\n", " Sk[\"rk\"] = rk\n", "\n", " Sk_next = Sk\n", " return Sk_next\n", "\n", "\n", "def scen_1_setpoint_block_h(Sk: Dict) -> float:\n", " \"\"\"Extract setpoint reference from wrapped system state.\"\"\"\n", " return Sk[\"rk\"]\n", "\n", "\n", "scen_1_setpoint_block = DynamicalSystem(\n", " f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h\n", ")" ] }, { "cell_type": "markdown", "id": "eed988ba", "metadata": {}, "source": [ "We now have a wonderfully minimal interface to play with!" ] }, { "cell_type": "code", "execution_count": null, "id": "b09db950", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# ---- SCENARIO 1 SETPOINT BLOCK -------------------\n", "\n", "#### PARAMETERS ####\n", "\n", "Sk = {\n", " \"lk\": {\n", " \"button_mode\": \"neutral\",\n", " \"time_since_hit_max_rk\": 0,\n", " \"bk\": 0,\n", " },\n", " \"sk\": 20,\n", " \"rk\": None,\n", "}\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": 3,\n", " \"max_rk\": 80,\n", " \"final_rk\": 20,\n", " \"time_at_which_we_ramp_up\": 5,\n", " \"time_at_max_rk\": 5,\n", "}\n", "\n", "# ----------------------------------------------\n", "\n", "### SIMULATION VARIABLES ############\n", "rk_hist = []\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", " Sk_next, rk = scen_1_setpoint_block.step(\n", " params={\"Sk\": Sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", " Sk = Sk_next\n", "\n", " rk_hist.append(rk)" ] }, { "cell_type": "code", "execution_count": null, "id": "7ba23ce5", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Plot\n", "plt.plot(sim_time, rk_hist)\n", "plt.xlabel(\"time (seconds)\")\n", "plt.ylabel(\"rk (setpoint speed)\")\n", "plt.title(\"Setpoint Generator with Button (Wrapped System)\")\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(\"✓ Wrapped system produces the same result with much simpler code!\")" ] }, { "cell_type": "markdown", "id": "ac9ff6de", "metadata": {}, "source": [ "### Block 2: PID\n", "\n", "
\n", "\n", "
\n", "\n", "The [PID controller](../../algorithm_library/pid_pykal.ipynb) compares the setpoint to the estimated state and computes a control command to minimize tracking error. To see the derivation of the evolution and output functions, click on the notebook link in the preceding sentence." ] }, { "cell_type": "code", "execution_count": null, "id": "0e3e12ad", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "from typing import List\n", "\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "\n", "#### ARROWS IN ####\n", "# None\n", "\n", "#### PARAMETERS ############\n", "Sk = {\n", " \"lk\": Dict,\n", " \"sk\": float,\n", " \"rk\": float,\n", "} # scenario 1 state and setpoint generator state w/output\n", "\n", "tk = float # current sim time\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": float,\n", " \"max_rk\": float,\n", " \"final_rk\": float,\n", " \"time_at_which_we_ramp_up\": float,\n", " \"time_at_max_rk\": float,\n", "}\n", "\n", "#### ARROWS OUT ####\n", "rk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "scen_1_setpoint_block = DynamicalSystem(\n", " f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h\n", ")\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "#### ARROWS IN ####\n", "rk = float\n", "\n", "#### PARAMETERS ############\n", "ck = List # controller state ck = [ek, Ik, ek_prev]\n", "rk = float # reference speed\n", "xhatk = float # state estimate\n", "pid_const_params = {\n", " \"K_P\": float, # Proportional gain\n", " \"K_I\": float, # Integral gain (eliminates steady-state error)\n", " \"K_D\": float, # Derivative gain (reduces overshoot)\n", "}\n", "\n", "#### ARROWS OUT ####\n", "uk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "from pykal.algorithm_library.controllers import pid\n", "\n", "pid_block = DynamicalSystem(f=pid.f, h=pid.h)" ] }, { "cell_type": "markdown", "id": "626975c2", "metadata": {}, "source": [ "To get a feel for the PID controller, we create a simple mock system and simulate scenario_1." ] }, { "cell_type": "code", "execution_count": null, "id": "1fc8cadc", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "#### PARAMETERS #######\n", "Sk = {\n", " \"lk\": {\n", " \"button_mode\": \"neutral\",\n", " \"time_since_hit_max_rk\": 0,\n", " \"bk\": 0,\n", " },\n", " \"sk\": 20,\n", " \"rk\": None,\n", "}\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": 3,\n", " \"max_rk\": 80,\n", " \"final_rk\": 20,\n", " \"time_at_which_we_ramp_up\": 5,\n", " \"time_at_max_rk\": 5,\n", "}\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "#### PARAMETERS #############\n", "\n", "ck = (0.0, 0.0, 0.0) # (ek, Ik, ek_prev)\n", "pid_const_params = {\"KP\": 100.0, \"KI\": 10, \"KD\": 20}\n", "\n", "xhatk = Sk[\"sk\"] # our initial guess is spot on\n", "\n", "# --------------------------------------------------\n", "\n", "### SIMULATION VARIABLES ############\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "rk_hist = []\n", "uk_hist = []\n", "xhatk_hist = []\n", "error_hist = []\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", " Sk_next, rk = scen_1_setpoint_block.step(\n", " params={\"Sk\": Sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", "\n", " ck_next, uk = pid_block.step(\n", " params={\"ck\": ck, \"rk\": rk, \"xhatk\": xhatk, **pid_const_params}\n", " )\n", "\n", " # the real plant dynamics come later\n", " xhatk += uk * dt * 0.01 # \"some delay\" via scaling factor\n", "\n", " rk_hist.append(rk)\n", " uk_hist.append(uk)\n", " xhatk_hist.append(xhatk)\n", " error_hist.append(ck[0]) # ck[0] is the current error\n", "\n", " Sk = Sk_next\n", " ck = ck_next" ] }, { "cell_type": "code", "execution_count": null, "id": "541ad0a5", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(1, 3, figsize=(15, 4))\n", "\n", "# Plot 1: Setpoint and State Estimate\n", "axs[0].plot(sim_time, rk_hist, \"k--\", linewidth=2, label=\"Setpoint (r)\")\n", "axs[0].plot(sim_time, xhatk_hist, \"b-\", linewidth=1.5, label=\"State Estimate (x̂)\")\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[0].set_ylabel(\"Speed (mph)\", fontsize=11)\n", "axs[0].set_title(\"Setpoint Tracking\", fontsize=12, fontweight=\"bold\")\n", "axs[0].legend(loc=\"best\")\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Plot 2: Control Input\n", "axs[1].plot(sim_time, uk_hist, \"g-\", linewidth=1.5)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1].set_ylabel(\"Control Input (u)\", fontsize=11)\n", "axs[1].set_title(\"Controller Output\", fontsize=12, fontweight=\"bold\")\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "# Plot 3: Tracking Error\n", "axs[2].plot(sim_time, error_hist, \"m-\", linewidth=1.5)\n", "axs[2].axhline(y=0, color=\"k\", linestyle=\":\", alpha=0.5)\n", "axs[2].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[2].set_ylabel(\"Error (mph)\", fontsize=11)\n", "axs[2].set_title(\"Tracking Error\", fontsize=12, fontweight=\"bold\")\n", "axs[2].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "0797ceac", "metadata": { "lines_to_next_cell": 0 }, "source": [ "Yikes! Looks like our PID gains are incorrect. Can you fix them? And if you're able to fix them, see what happens when you change some parameters for `scen_1_setpoint_block`. Is your fix robust?" ] }, { "cell_type": "markdown", "id": "60a5d024", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Block 3: Car (Plant)\n", "\n", "
\n", "\n", "
\n", "\n", "The car dynamics are modeled with **position and velocity** as the state, and linear drag acting on the velocity.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "68779583", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "\n", "#### ARROWS IN ####\n", "# None\n", "\n", "#### PARAMETERS ############\n", "Sk = {\n", " \"lk\": Dict,\n", " \"sk\": float,\n", " \"rk\": float,\n", "} # scenario 1 state and setpoint generator state w/output\n", "\n", "tk = float # current sim time\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": float,\n", " \"max_rk\": float,\n", " \"final_rk\": float,\n", " \"time_at_which_we_ramp_up\": float,\n", " \"time_at_max_rk\": float,\n", "}\n", "\n", "#### ARROWS OUT ####\n", "rk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "scen_1_setpoint_block = DynamicalSystem(\n", " f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h\n", ")\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "from typing import List\n", "\n", "#### ARROWS IN ####\n", "rk = float\n", "\n", "#### PARAMETERS ############\n", "ck = List # controller state ck = [ek, Ik, ek_prev]\n", "rk = float # reference speed\n", "xhatk = float # state estimate\n", "pid_const_params = {\n", " \"K_P\": float, # Proportional gain\n", " \"K_I\": float, # Integral gain (eliminates steady-state error)\n", " \"K_D\": float, # Derivative gain (reduces overshoot)\n", "}\n", "\n", "#### ARROWS OUT ####\n", "uk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "from pykal.algorithm_library.controllers import pid\n", "\n", "pid_block = DynamicalSystem(f=pid.f, h=pid.h)\n", "\n", "# ---- PLANT BLOCK -------------------\n", "from numpy.typing import NDArray\n", "\n", "#### ARROWS IN ####\n", "uk = float\n", "\n", "#### PARAMETERS ############\n", "pk = NDArray # pk.shape=(2,1), pk[0,0] = pos, pk[1,0] = vel\n", "uk = float\n", "car_const_params = {\"m\": float, \"b\": float, \"dt\": float} # mass # drag # dt (duh)\n", "\n", "#### ARROWS OUT ####\n", "yk = float # vel measurement\n", "\n", "\n", "# ---- $(f,h)$-representation --------------\n", "def plant_f(\n", " pk: NDArray,\n", " uk: float,\n", " m: float,\n", " b: float,\n", " dt: float,\n", ") -> NDArray:\n", " \"\"\"\n", " State evolution.\n", "\n", " Parameters\n", " ----------\n", " pk : (2, 1) ndarray\n", " State column vector [position; velocity].\n", "\n", " Returns\n", " -------\n", " (2, 1) ndarray\n", " Next state column vector.\n", " \"\"\"\n", "\n", " if pk.shape != (2, 1):\n", " raise ValueError(f\"pk must have shape (2,1); got {pk.shape}\")\n", "\n", " p_k, v_k = pk[0, 0], pk[1, 0]\n", "\n", " p_next = p_k + dt * v_k\n", " v_next = v_k + dt * (-b / m * v_k + uk / m)\n", "\n", " return np.array([[p_next], [v_next]])\n", "\n", "\n", "def plant_h(\n", " pk: NDArray,\n", ") -> NDArray:\n", " \"\"\"\n", " Measurement model.\n", "\n", " Parameters\n", " ----------\n", " pk : (2, 1) ndarray\n", " State column vector [position; velocity].\n", "\n", " Returns\n", " -------\n", " (1, 1) ndarray\n", " Velocity measurement.\n", " \"\"\"\n", "\n", " if pk.shape != (2, 1):\n", " raise ValueError(f\"pk must have shape (2,1); got {pk.shape}\")\n", "\n", " v_k = pk[1, 0]\n", "\n", " return v_k\n", "\n", "\n", "plant_block = DynamicalSystem(f=plant_f, h=plant_h)" ] }, { "cell_type": "markdown", "id": "ed43698b", "metadata": {}, "source": [ "We can now update our previous PID controller simulation with the actual system we're controlling!" ] }, { "cell_type": "code", "execution_count": null, "id": "f091f578", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "import numpy as np\n", "\n", "rng = np.random.default_rng()\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "#### PARAMETERS #######\n", "Sk = {\n", " \"lk\": {\n", " \"button_mode\": \"neutral\",\n", " \"time_since_hit_max_rk\": 0,\n", " \"bk\": 0,\n", " },\n", " \"sk\": 20,\n", " \"rk\": None,\n", "}\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": 3,\n", " \"max_rk\": 80,\n", " \"final_rk\": 20,\n", " \"time_at_which_we_ramp_up\": 5,\n", " \"time_at_max_rk\": 5,\n", "}\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "#### PARAMETERS #############\n", "\n", "ck = (0.0, 0.0, 0.0) # (ek, Ik, ek_prev)\n", "pid_const_params = {\"KP\": 100.0, \"KI\": 10, \"KD\": 20}\n", "\n", "xhatk = Sk[\"sk\"] # our initial guess is spot on\n", "\n", "# ---------PLANT BLOCK---------------\n", "pk = np.array([[0], [20]])\n", "car_const_params = {\"m\": 1500, \"b\": 50, \"dt\": 1}\n", "\n", "### SIMULATION VARIABLES ############\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "\n", "Qk = np.array([[1e-1, 0], [0, 2e-1]]) # add process and measurement noise\n", "Rk = np.array([[3]])\n", "\n", "rk_hist = []\n", "uk_hist = []\n", "error_hist = []\n", "xhatk_hist = []\n", "xk_hist = [] # true velocity\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", " Sk_next, rk = scen_1_setpoint_block.step(\n", " params={\"Sk\": Sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", "\n", " ck_next, uk = pid_block.step(\n", " params={\"ck\": ck, \"rk\": rk, \"xhatk\": xhatk, **pid_const_params}\n", " )\n", "\n", " pk_next, yk = plant_block.step(params={\"pk\": pk, \"uk\": uk, **car_const_params})\n", "\n", " # Apply process noise and measurement noise\n", " process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(\n", " -1, 1\n", " ) # (2,1) vector\n", " measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[\n", " 0\n", " ] # scalar\n", "\n", " pk_next = pk_next + process_noise\n", " yk = yk + measurement_noise\n", "\n", " xhatk = yk # no smoothing; our measurement is raw\n", "\n", " rk_hist.append(rk)\n", " uk_hist.append(uk)\n", " xhatk_hist.append(xhatk)\n", " xk_hist.append(pk[1, 0]) # true velocity\n", " error_hist.append(ck[0])\n", "\n", " Sk = Sk_next\n", " ck = ck_next\n", " pk = pk_next" ] }, { "cell_type": "code", "execution_count": null, "id": "9ec3d38a", "metadata": { "lines_to_next_cell": 0, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(1, 3, figsize=(15, 4))\n", "\n", "# Plot 1: Setpoint and State Estimate\n", "axs[0].plot(sim_time, rk_hist, \"k--\", linewidth=2, label=\"Setpoint (r)\")\n", "axs[0].plot(sim_time, xhatk_hist, \"b-\", linewidth=1.5, label=\"State Estimate (x̂)\")\n", "axs[0].plot(\n", " sim_time, xk_hist, \"r-\", linewidth=1.5, alpha=0.5, label=\"True Velocity (x)\"\n", ")\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[0].set_ylabel(\"Speed (mph)\", fontsize=11)\n", "axs[0].set_title(\"Setpoint Tracking\", fontsize=12, fontweight=\"bold\")\n", "axs[0].legend(loc=\"best\")\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Plot 2: Control Input\n", "axs[1].plot(sim_time, uk_hist, \"g-\", linewidth=1.5)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1].set_ylabel(\"Control Input (u)\", fontsize=11)\n", "axs[1].set_title(\"Controller Output\", fontsize=12, fontweight=\"bold\")\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "# Plot 3: Tracking Error\n", "axs[2].plot(sim_time, error_hist, \"m-\", linewidth=1.5)\n", "axs[2].axhline(y=0, color=\"k\", linestyle=\":\", alpha=0.5)\n", "axs[2].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[2].set_ylabel(\"Error (mph)\", fontsize=11)\n", "axs[2].set_title(\"Tracking Error\", fontsize=12, fontweight=\"bold\")\n", "axs[2].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "ad2f9ef7", "metadata": { "lines_to_next_cell": 0 }, "source": [ "Alas! Can you tune the gains here? And how does changing the mass or drag of the car affect the optimal gains?" ] }, { "cell_type": "markdown", "id": "2a91efd2", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Block 4: KF (Observer)\n", "\n", "
\n", "\n", "
\n", "\n", "The [Kalman filter](../../algorithm_library/kf_pykal.ipynb) estimates the car's velocity by fusing the motion model with noisy measurements.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5488c37c", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "\n", "#### ARROWS IN ####\n", "# None\n", "\n", "#### PARAMETERS ############\n", "Sk = {\n", " \"lk\": Dict,\n", " \"sk\": float,\n", " \"rk\": float,\n", "} # scenario 1 state and setpoint generator state w/output\n", "\n", "tk = float # current sim time\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": float,\n", " \"max_rk\": float,\n", " \"final_rk\": float,\n", " \"time_at_which_we_ramp_up\": float,\n", " \"time_at_max_rk\": float,\n", "}\n", "\n", "#### ARROWS OUT ####\n", "rk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "scen_1_setpoint_block = DynamicalSystem(\n", " f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h\n", ")\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "from typing import List\n", "\n", "#### ARROWS IN ####\n", "rk = float\n", "\n", "#### PARAMETERS ############\n", "ck = List # controller state ck = [ek, Ik, ek_prev]\n", "rk = float # reference speed\n", "xhatk = float # state estimate\n", "pid_const_params = {\n", " \"K_P\": float, # Proportional gain\n", " \"K_I\": float, # Integral gain (eliminates steady-state error)\n", " \"K_D\": float, # Derivative gain (reduces overshoot)\n", "}\n", "\n", "#### ARROWS OUT ####\n", "uk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "from pykal.algorithm_library.controllers import pid\n", "\n", "pid_block = DynamicalSystem(f=pid.f, h=pid.h)\n", "\n", "# ---- PLANT BLOCK -------------------\n", "from numpy.typing import NDArray\n", "\n", "#### ARROWS IN ####\n", "uk = float\n", "\n", "#### PARAMETERS ############\n", "pk = NDArray # pk.shape=(2,1), pk[0,0] = pos, pk[1,0] = vel\n", "uk = float\n", "car_const_params = {\"m\": float, \"b\": float, \"dt\": float} # mass # drag # dt (duh)\n", "\n", "#### ARROWS OUT ####\n", "yk = float # vel measurement\n", "\n", "\n", "# ---- $(f,h)$-representation --------------\n", "def plant_f(\n", " pk: NDArray,\n", " uk: float,\n", " m: float,\n", " b: float,\n", " dt: float,\n", ") -> NDArray:\n", " \"\"\"\n", " State evolution.\n", "\n", " Parameters\n", " ----------\n", " pk : (2, 1) ndarray\n", " State column vector [position; velocity].\n", "\n", " Returns\n", " -------\n", " (2, 1) ndarray\n", " Next state column vector.\n", " \"\"\"\n", "\n", " if pk.shape != (2, 1):\n", " raise ValueError(f\"pk must have shape (2,1); got {pk.shape}\")\n", "\n", " p_k, v_k = pk[0, 0], pk[1, 0]\n", "\n", " p_next = p_k + dt * v_k\n", " v_next = v_k + dt * (-b / m * v_k + uk / m)\n", "\n", " return np.array([[p_next], [v_next]])\n", "\n", "\n", "def plant_h(\n", " pk: NDArray,\n", ") -> NDArray:\n", " \"\"\"\n", " Measurement model.\n", "\n", " Parameters\n", " ----------\n", " pk : (2, 1) ndarray\n", " State column vector [position; velocity].\n", "\n", " Returns\n", " -------\n", " (1, 1) ndarray\n", " Velocity measurement.\n", " \"\"\"\n", "\n", " if pk.shape != (2, 1):\n", " raise ValueError(f\"pk must have shape (2,1); got {pk.shape}\")\n", "\n", " v_k = pk[1, 0]\n", "\n", " return v_k\n", "\n", "\n", "plant_block = DynamicalSystem(f=plant_f, h=plant_h)\n", "\n", "# ---- KF BLOCK -------------------\n", "from numpy.typing import NDArray\n", "from typing import Callable\n", "\n", "#### ARROWS IN ####\n", "uk = float\n", "yk = NDArray # (since kf expects an array and yk is a float from plant_block, we will recast yk when we call the kf block)\n", "\n", "#### PARAMETERS ############\n", "zk = List # zk[0]=pk, zk[1] = Pk (covariance matrix)\n", "\n", "kf_const_params = {\n", " \"f\": Callable, # plant evolution function\n", " \"h\": Callable, # plant output function\n", " \"Fk\": NDArray, # plant evolution function Jacobian\n", " \"Hk\": NDArray, # plant output function Jacobian\n", " \"Qk\": NDArray, # process noise covariance matrix\n", " \"Rk\": NDArray, # measurement noise covariance matrix\n", "}\n", "\n", "f_h_params = Dict\n", "\n", "\n", "#### ARROWS OUT ####\n", "xhatk = NDArray # vel prediction measurement\n", "\n", "\n", "# ---- $(f,h)$-representation --------------\n", "\n", "from pykal.algorithm_library.estimators import kf\n", "\n", "kf_block = DynamicalSystem(f=kf.f, h=kf.h)" ] }, { "cell_type": "markdown", "id": "ac7a8cfe", "metadata": { "lines_to_next_cell": 0 }, "source": [ "We can now simulate the full system." ] }, { "cell_type": "code", "execution_count": null, "id": "8bdae25b", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "import numpy as np\n", "\n", "rng = np.random.default_rng()\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "#### PARAMETERS #######\n", "Sk = {\n", " \"lk\": {\n", " \"button_mode\": \"neutral\",\n", " \"time_since_hit_max_rk\": 0,\n", " \"bk\": 0,\n", " },\n", " \"sk\": 20,\n", " \"rk\": None,\n", "}\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": 3,\n", " \"max_rk\": 80,\n", " \"final_rk\": 20,\n", " \"time_at_which_we_ramp_up\": 5,\n", " \"time_at_max_rk\": 5,\n", "}\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "#### PARAMETERS #############\n", "\n", "ck = (0.0, 0.0, 0.0) # (ek, Ik, ek_prev)\n", "pid_const_params = {\"KP\": 100.0, \"KI\": 10, \"KD\": 20}\n", "\n", "xhatk = Sk[\"sk\"] # our initial guess is spot on\n", "\n", "# ---------PLANT BLOCK---------------\n", "#### PARAMETERS #############\n", "pk = np.array([[0], [20]])\n", "car_const_params = {\"m\": 1500, \"b\": 50, \"dt\": 1}\n", "\n", "# ---------KF BLOCK---------------\n", "#### PARAMETERS #############\n", "zk = [\n", " pk, # [xhatk,Pk] - full state estimate [position, velocity]\n", " np.array([[0.5, 0.0], [0.0, 1.0]]),\n", "]\n", "\n", "\n", "def plant_F(**car_const_params) -> np.ndarray:\n", " \"\"\"\n", " Compute Jacobian of plant_f with respect to state x = [x_k, v_k].\n", " \"\"\"\n", " m = car_const_params[\"m\"]\n", " b = car_const_params[\"b\"]\n", " dt = car_const_params[\"dt\"]\n", " return np.array(\n", " [\n", " [1.0, dt],\n", " [0.0, 1.0 - (b / m) * dt],\n", " ]\n", " )\n", "\n", "\n", "def plant_H() -> np.ndarray:\n", " \"\"\"\n", " Compute Jacobian of plant_h with respect to state x = [x_k, v_k].\n", " \"\"\"\n", " return np.array([[0.0, 1.0]])\n", "\n", "\n", "Fk = plant_F(**car_const_params)\n", "Hk = plant_H()\n", "Qk = np.array([[1e-1, 0], [0, 2e-1]])\n", "Rk = np.array([[3]])\n", "\n", "kf_const_params = {\n", " \"f\": plant_block.f,\n", " \"h\": plant_block.h,\n", " \"Fk\": Fk,\n", " \"Hk\": Hk,\n", "}\n", "# ----------------------------------------\n", "\n", "### SIMULATION VARIABLES ############\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "\n", "Qk = np.array([[1e-1, 0], [0, 2e-1]]) # add process and measurement noise\n", "Rk = np.array([[3]])\n", "\n", "rk_hist = []\n", "uk_hist = []\n", "error_hist = []\n", "xhatk_hist = []\n", "xk_hist = []\n", "yk_hist = []\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", " Sk_next, rk = scen_1_setpoint_block.step(\n", " params={\"Sk\": Sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", "\n", " ck_next, uk = pid_block.step(\n", " params={\"ck\": ck, \"rk\": rk, \"xhatk\": xhatk, **pid_const_params}\n", " )\n", "\n", " pk_next, yk = plant_block.step(params={\"pk\": pk, \"uk\": uk, **car_const_params})\n", "\n", " # Apply process noise and measurement noise\n", " process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(\n", " -1, 1\n", " ) # (2,1) vector\n", " measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[\n", " 0\n", " ] # scalar\n", "\n", " pk_next = pk_next + process_noise\n", " yk = yk + measurement_noise\n", "\n", " zk_next, phatk = kf_block.step(\n", " params={\n", " \"zk\": zk,\n", " \"uk\": uk,\n", " \"yk\": np.array([[yk]]),\n", " \"Qk\": Qk,\n", " \"Rk\": Rk,\n", " **kf_const_params,\n", " \"f_h_params\": {\n", " \"pk\": pk,\n", " \"uk\": uk,\n", " **car_const_params,\n", " },\n", " }\n", " )\n", "\n", " xhatk = phatk[1, 0]\n", "\n", " rk_hist.append(rk)\n", " uk_hist.append(uk)\n", " xhatk_hist.append(xhatk)\n", " xk_hist.append(pk[1, 0]) # true velocity\n", " error_hist.append(ck[0])\n", " yk_hist.append(yk)\n", "\n", " Sk = Sk_next\n", " ck = ck_next\n", " pk = pk_next\n", " zk = zk_next" ] }, { "cell_type": "code", "execution_count": null, "id": "d46d14f7", "metadata": { "lines_to_next_cell": 0, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(1, 3, figsize=(15, 4))\n", "\n", "# Plot 1: Setpoint and State Estimate\n", "axs[0].plot(sim_time, rk_hist, \"k--\", linewidth=2, label=\"Setpoint (r)\")\n", "axs[0].plot(sim_time, xhatk_hist, \"b-\", linewidth=1.5, label=\"State Estimate (x̂)\")\n", "axs[0].plot(\n", " sim_time, xk_hist, \"r-\", linewidth=1.5, alpha=0.5, label=\"True Velocity (x)\"\n", ")\n", "axs[0].scatter(\n", " sim_time[::10], yk_hist[::10], c=\"gray\", s=10, alpha=0.3, label=\"Measurements (y)\"\n", ")\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[0].set_ylabel(\"Speed (mph)\", fontsize=11)\n", "axs[0].set_title(\"Velocity Tracking\", fontsize=12, fontweight=\"bold\")\n", "axs[0].legend(loc=\"best\")\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Plot 2: Control Input\n", "axs[1].plot(sim_time, uk_hist, \"g-\", linewidth=1.5)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1].set_ylabel(\"Control Input (u)\", fontsize=11)\n", "axs[1].set_title(\"Controller Output\", fontsize=12, fontweight=\"bold\")\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "# Plot 3: Tracking Error\n", "axs[2].plot(sim_time, error_hist, \"m-\", linewidth=1.5)\n", "axs[2].axhline(y=0, color=\"k\", linestyle=\":\", alpha=0.5)\n", "axs[2].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[2].set_ylabel(\"Error (mph)\", fontsize=11)\n", "axs[2].set_title(\"Tracking Error\", fontsize=12, fontweight=\"bold\")\n", "axs[2].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "ff9e78bf", "metadata": {}, "source": [ "### Wrapping the Complete System\n", "\n", "Just as we wrapped the button logic into a `DynamicalSystem` earlier, we can wrap the **entire cruise control system** into a single `DynamicalSystem` object. All system parameters can be configured at initialization, and the simulation loop becomes trivial." ] }, { "cell_type": "markdown", "id": "f1869f5b", "metadata": { "lines_to_next_cell": 0 }, "source": [ "
\n", "\n", "
\n", "\n", "The diagram above shows the complete cruise control system in two views. The **top** view shows all five subsystems (Button, Setpoint, PID, Plant, and Observer) wrapped inside a blue box, with all internal signals visible. The **bottom** view shows the same system from the outside—a single `DynamicalSystem` block with a clean interface that accepts parameters and returns the desired variables.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "6ec18fe1", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# --------- CRUISE CONTROL BLOCK---------------\n", "#### ARROWS IN ####\n", "# None\n", "\n", "#### PARAMETERS #############\n", "Ck = {\n", " \"Sk\": Dict,\n", " \"zk\": List,\n", " \"pk\": NDArray,\n", " \"ck\": List,\n", " \"xhatk\": float,\n", " \"rk\": float,\n", " \"uk\": float,\n", " \"xk\": float,\n", " \"err\": float,\n", " \"yk\": float,\n", "} # whole system state\n", "tk = float # current time\n", "scen_1_const_params = Dict\n", "car_const_params = Dict\n", "kf_const_params = Dict\n", "pid_const_params = Dict\n", "Qk = NDArray\n", "Rk = NDArray\n", "\n", "#### ARROWS OUT ####\n", "xhatk = float\n", "\n", "# ---- $(f,h)$-representation --------------\n", "\n", "\n", "def cruise_control_f(\n", " Ck: Dict,\n", " tk: float,\n", " Qk: NDArray,\n", " Rk: NDArray,\n", " scen_1_const_params: Dict,\n", " pid_const_params: Dict,\n", " car_const_params: Dict,\n", " kf_const_params: Dict,\n", ") -> Dict:\n", " Sk = Ck[\"Sk\"]\n", " zk = Ck[\"zk\"]\n", " pk = Ck[\"pk\"]\n", " ck = Ck[\"ck\"]\n", " xhatk = Ck[\"xhatk\"]\n", "\n", " Sk_next, rk = scen_1_setpoint_block.step(\n", " params={\"Sk\": Sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", "\n", " ck_next, uk = pid_block.step(\n", " params={\"ck\": ck, \"rk\": rk, \"xhatk\": xhatk, **pid_const_params}\n", " )\n", "\n", " pk_next, yk = plant_block.step(params={\"pk\": pk, \"uk\": uk, **car_const_params})\n", "\n", " # Apply process noise and measurement noise\n", " process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(\n", " -1, 1\n", " ) # (2,1) vector\n", " measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[\n", " 0\n", " ] # scalar\n", "\n", " pk_next = pk_next + process_noise\n", " yk = yk + measurement_noise\n", "\n", " zk_next, phatk = kf_block.step(\n", " params={\n", " \"zk\": zk,\n", " \"uk\": uk,\n", " \"yk\": np.array([[yk]]),\n", " \"Qk\": Qk,\n", " \"Rk\": Rk,\n", " **kf_const_params,\n", " \"f_h_params\": {\n", " \"pk\": pk,\n", " \"uk\": uk,\n", " **car_const_params,\n", " },\n", " }\n", " )\n", "\n", " xhatk = phatk[1, 0]\n", "\n", " Ck_next = Ck\n", " Ck_next[\"Sk\"] = Sk_next\n", " Ck_next[\"zk\"] = zk_next\n", " Ck_next[\"pk\"] = pk_next\n", " Ck_next[\"ck\"] = ck_next\n", " Ck_next[\"xhatk\"] = xhatk\n", " Ck_next[\"rk\"] = rk\n", " Ck_next[\"uk\"] = uk\n", " Ck_next[\"xk\"] = pk[1, 0]\n", " Ck_next[\"err\"] = ck[0]\n", " Ck_next[\"yk\"] = yk\n", "\n", " return Ck_next\n", "\n", "\n", "def cruise_control_h(Ck: Dict) -> Dict:\n", " output_dict = {\n", " \"xhatk\": Ck[\"xhatk\"],\n", " \"rk\": Ck[\"rk\"],\n", " \"uk\": Ck[\"uk\"],\n", " \"xk\": Ck[\"xk\"],\n", " \"err\": Ck[\"err\"],\n", " \"yk\": Ck[\"yk\"],\n", " }\n", " return output_dict\n", "\n", "\n", "cruise_control_block = DynamicalSystem(f=cruise_control_f, h=cruise_control_h)" ] }, { "cell_type": "markdown", "id": "0f4d747a", "metadata": {}, "source": [ "With the state as we've defined it, initialization might seem tricky. Here we introduce a neat trick to make initialization trivial: we can run the previous simulation once and set the appropriate values in $C_k$!" ] }, { "cell_type": "code", "execution_count": null, "id": "d70ef896", "metadata": { "lines_to_next_cell": 0, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "import numpy as np\n", "\n", "rng = np.random.default_rng()\n", "# ---- SCENARIO 1 SETPOINTBLOCK -------------------\n", "#### PARAMETERS #######\n", "Sk = {\n", " \"lk\": {\n", " \"button_mode\": \"neutral\",\n", " \"time_since_hit_max_rk\": 0,\n", " \"bk\": 0,\n", " },\n", " \"sk\": 20,\n", " \"rk\": None,\n", "}\n", "\n", "scen_1_const_params = {\n", " \"button_rate\": 3,\n", " \"max_rk\": 80,\n", " \"final_rk\": 20,\n", " \"time_at_which_we_ramp_up\": 5,\n", " \"time_at_max_rk\": 5,\n", "}\n", "\n", "# ---- PID BLOCK -------------------\n", "\n", "#### PARAMETERS #############\n", "\n", "ck = (0.0, 0.0, 0.0) # (ek, Ik, ek_prev)\n", "pid_const_params = {\"KP\": 100.0, \"KI\": 10, \"KD\": 20}\n", "\n", "xhatk = Sk[\"sk\"] # our initial guess is spot on\n", "\n", "# ---------PLANT BLOCK---------------\n", "#### PARAMETERS #############\n", "pk = np.array([[0], [20]])\n", "car_const_params = {\"m\": 1500, \"b\": 50, \"dt\": 1}\n", "\n", "# ---------KF BLOCK---------------\n", "#### PARAMETERS #############\n", "zk = [\n", " pk, # [xhatk,Pk] - full state estimate [position, velocity]\n", " np.array([[0.5, 0.0], [0.0, 1.0]]),\n", "]\n", "\n", "\n", "def plant_F(**car_const_params) -> np.ndarray:\n", " \"\"\"\n", " Compute Jacobian of plant_f with respect to state x = [x_k, v_k].\n", " \"\"\"\n", " m = car_const_params[\"m\"]\n", " b = car_const_params[\"b\"]\n", " dt = car_const_params[\"dt\"]\n", " return np.array(\n", " [\n", " [1.0, dt],\n", " [0.0, 1.0 - (b / m) * dt],\n", " ]\n", " )\n", "\n", "\n", "def plant_H() -> np.ndarray:\n", " \"\"\"\n", " Compute Jacobian of plant_h with respect to state x = [x_k, v_k].\n", " \"\"\"\n", " return np.array([[0.0, 1.0]])\n", "\n", "\n", "Fk = plant_F(**car_const_params)\n", "Hk = plant_H()\n", "Qk = np.array([[1e-1, 0], [0, 2e-1]])\n", "Rk = np.array([[3]])\n", "\n", "kf_const_params = {\n", " \"f\": plant_block.f,\n", " \"h\": plant_block.h,\n", " \"Fk\": Fk,\n", " \"Hk\": Hk,\n", "}\n", "### SIMULATION VARIABLES ############\n", "dt = 1\n", "sim_time = np.arange(0, 1, dt) # just tk = 0\n", "\n", "for tk in sim_time:\n", " Sk_next, rk = scen_1_setpoint_block.step(\n", " params={\"Sk\": Sk, \"tk\": tk, **scen_1_const_params}\n", " )\n", "\n", " ck_next, uk = pid_block.step(\n", " params={\"ck\": ck, \"rk\": rk, \"xhatk\": xhatk, **pid_const_params}\n", " )\n", "\n", " pk_next, yk = plant_block.step(params={\"pk\": pk, \"uk\": uk, **car_const_params})\n", "\n", " # Apply process noise and measurement noise\n", " process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(\n", " -1, 1\n", " ) # (2,1) vector\n", " measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[\n", " 0\n", " ] # scalar\n", "\n", " pk_next = pk_next + process_noise\n", " yk = yk + measurement_noise\n", "\n", " zk_next, phatk = kf_block.step(\n", " params={\n", " \"zk\": zk,\n", " \"uk\": uk,\n", " \"yk\": np.array([[yk]]),\n", " \"Qk\": Qk,\n", " \"Rk\": Rk,\n", " **kf_const_params,\n", " \"f_h_params\": {\n", " \"pk\": pk,\n", " \"uk\": uk,\n", " **car_const_params,\n", " },\n", " }\n", " )\n", "\n", " xhatk = phatk[1, 0]\n", "\n", " # initialize Ck\n", " Ck = {\n", " \"Sk\": Sk,\n", " \"zk\": zk,\n", " \"pk\": pk,\n", " \"ck\": ck,\n", " \"xhatk\": xhatk,\n", " \"rk\": rk,\n", " \"uk\": uk,\n", " \"xk\": pk[1, 0],\n", " \"err\": ck[0],\n", " \"yk\": yk,\n", " }\n", "\n", "# %%[markdown]\n", "# Now we can simulate the entire system within a single dynamical system." ] }, { "cell_type": "code", "execution_count": null, "id": "8db829e2", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "rng = np.random.default_rng()\n", "\n", "### SIMULATION VARIABLES ############\n", "dt = 1\n", "sim_time = np.arange(0, 60, dt)\n", "\n", "Qk = np.array([[1e-1, 0], [0, 2e-1]]) # add process and measurement noise\n", "Rk = np.array([[3]])\n", "\n", "rk_hist = []\n", "uk_hist = []\n", "error_hist = []\n", "xhatk_hist = []\n", "xk_hist = []\n", "yk_hist = []\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", " Ck_next, output_dict = cruise_control_block.step(\n", " params={\n", " \"Ck\": Ck,\n", " \"tk\": tk,\n", " \"Qk\": Qk,\n", " \"Rk\": Rk,\n", " \"scen_1_const_params\": scen_1_const_params,\n", " \"pid_const_params\": pid_const_params,\n", " \"car_const_params\": car_const_params,\n", " \"kf_const_params\": kf_const_params,\n", " }\n", " )\n", "\n", " rk_hist.append(output_dict[\"rk\"])\n", " uk_hist.append(output_dict[\"uk\"])\n", " xhatk_hist.append(output_dict[\"xhatk\"])\n", " xk_hist.append(output_dict[\"xk\"]) # true velocity\n", " error_hist.append(output_dict[\"err\"])\n", " yk_hist.append(output_dict[\"yk\"])" ] }, { "cell_type": "code", "execution_count": null, "id": "0ccf3b51", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(1, 3, figsize=(15, 4))\n", "\n", "# Plot 1: Setpoint and State Estimate\n", "axs[0].plot(sim_time, rk_hist, \"k--\", linewidth=2, label=\"Setpoint (r)\")\n", "axs[0].plot(sim_time, xhatk_hist, \"b-\", linewidth=1.5, label=\"State Estimate (x̂)\")\n", "axs[0].plot(\n", " sim_time, xk_hist, \"r-\", linewidth=1.5, alpha=0.5, label=\"True Velocity (x)\"\n", ")\n", "axs[0].scatter(\n", " sim_time[::10], yk_hist[::10], c=\"gray\", s=10, alpha=0.3, label=\"Measurements (y)\"\n", ")\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[0].set_ylabel(\"Speed (mph)\", fontsize=11)\n", "axs[0].set_title(\"Velocity Tracking\", fontsize=12, fontweight=\"bold\")\n", "axs[0].legend(loc=\"best\")\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Plot 2: Control Input\n", "axs[1].plot(sim_time, uk_hist, \"g-\", linewidth=1.5)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1].set_ylabel(\"Control Input (u)\", fontsize=11)\n", "axs[1].set_title(\"Controller Output\", fontsize=12, fontweight=\"bold\")\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "# Plot 3: Tracking Error\n", "axs[2].plot(sim_time, error_hist, \"m-\", linewidth=1.5)\n", "axs[2].axhline(y=0, color=\"k\", linestyle=\":\", alpha=0.5)\n", "axs[2].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[2].set_ylabel(\"Error (mph)\", fontsize=11)\n", "axs[2].set_title(\"Tracking Error\", fontsize=12, fontweight=\"bold\")\n", "axs[2].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "b512359d", "metadata": {}, "source": [ "## Experimentation\n", "\n", "**Controller tuning**:\n", "- Adjust PID gains (KP, KI, KD) to reduce overshoot\n", "- What happens if you increase KP too much? (oscillations)\n", "- What happens if KD is too small? (slow settling)\n", "- Find optimal gains for minimal overshoot and settling time\n", "\n", "**Setpoint policy**:\n", "- Change button policy parameters (button_rate, hold_seconds_at_max)\n", "- Try different target speeds and ramp rates\n", "- Design a smooth sinusoidal setpoint trajectory\n", "\n", "**Plant variations**:\n", "- Modify physical parameters (m, b) to simulate different vehicles\n", " - Heavy truck: m=5000 kg, b=100 kg/s\n", " - Sports car: m=1000 kg, b=30 kg/s\n", "- Add measurement noise to make the problem more realistic\n", "- Tune Kalman filter covariances (Q, R) for different noise scenarios\n", "\n", "**Challenge**: Can you design PID gains that work well for both a heavy truck and a sports car?" ] }, { "cell_type": "markdown", "id": "94a1c4ca", "metadata": {}, "source": [ "[← Control Systems as Dynamical Systems](../../../getting_started/theory_to_python/control_systems_as_dynamical_systems.rst)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }