{ "cells": [ { "cell_type": "markdown", "id": "30d19eb0", "metadata": {}, "source": [ "[← Modules](../../../getting_started/theory_to_python/modules.rst)" ] }, { "cell_type": "markdown", "id": "120f94dd", "metadata": {}, "source": [ "# DataChange\n", "\n", "The `data_change` module provides tools to model realistic sensor data. In particular, it allows you to:\n", "\n", "- **Corrupt ideal signals** to simulate realistic hardware behavior via the `data_change.corrupt` subclass\n", "- **Clean corrupted signals** for downstream use in estimators and controllers via the `data_change.clean` subclass\n", "\n", "This notebook demonstrates all corruption and preparation methods available in `pykal.data_change`, using visual examples to illustrate how each transformation affects the underlying signal." ] }, { "cell_type": "markdown", "id": "706cdbea", "metadata": {}, "source": [ "## References and Examples\n", "\n", " We first generate a representative set of reference signals using the `DynamicalSystem` class. In the remaining sections, we demonstrate corrupting these signals via\n", "\n", "- **Gaussian noise** (thermal noise / ADC noise)\n", "- **Bias offsets** (uncalibrated zero errors)\n", "- **Drift** (time-varying bias from warm-up / temperature)\n", "- **Quantization** (finite ADC resolution)\n", "- **Spikes / outliers** (EMI glitches, electrical transients)\n", "- **Clipping / saturation** (sensor range limits)\n", "- **Packet loss / dropouts** (missing samples, intermittent links)\n", "- **Contact bounce** (encoders, switches, digital edges)\n", "- **Latency / delay** (communication + processing lag)\n", "\n", "and then, when cleaning operations are available, demonstrating how to clean the corrupted signals, using such methods as\n", "\n", "- **moving average / low-pass filtering** (noise suppression)\n", "- **median filtering** (outlier rejection without blurring edges)\n", "- **exponential smoothing** (real-time filtering)\n", "- **debouncing** (stable digital transitions)\n", "- **outlier handling** (replace or interpolate)\n", "- **interpolation** (filling missing samples)\n", "- **staleness policies** for **asynchronous sensors** (hold/zero/drop/none)\n", "- **calibration** (bias + scale correction)" ] }, { "cell_type": "markdown", "id": "463b736e", "metadata": {}, "source": [ "### Generate Reference Signals" ] }, { "cell_type": "code", "execution_count": null, "id": "8ddbefc5", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from pykal import DynamicalSystem\n", "\n", "# -----------------------------\n", "# Reference signals (stateless)\n", "# -----------------------------\n", "\n", "\n", "def sine_wave(tk: float, f_hz: float, amplitude: float, phase: float) -> float:\n", " \"\"\"y(t) = A sin(2π f t + φ)\"\"\"\n", " return amplitude * np.sin(2.0 * np.pi * f_hz * tk + phase)\n", "\n", "\n", "def ramp(tk: float, t0: float, t1: float, y0: float, y1: float) -> float:\n", " \"\"\"Linear ramp from (t0, y0) to (t1, y1), clamped outside interval.\"\"\"\n", " if tk <= t0:\n", " return y0\n", " if tk >= t1:\n", " return y1\n", " s = (tk - t0) / (t1 - t0)\n", " return (1.0 - s) * y0 + s * y1\n", "\n", "\n", "def constant(c: float) -> float:\n", " \"\"\"y(t) = c\"\"\"\n", " return c\n", "\n", "\n", "def pwm(tk: float, f_hz: float, duty: float, low: float, high: float) -> float:\n", " \"\"\"\n", " Pulse-width modulation (PWM).\n", " \"\"\"\n", " T = 1.0 / f_hz\n", " phase = tk % T\n", " return high if phase < duty * T else low\n", "\n", "\n", "# Wrapping signals in DynamicalSystem objects\n", "ref_sine_wave = DynamicalSystem(h=sine_wave)\n", "ref_constant = DynamicalSystem(h=constant)\n", "ref_ramp = DynamicalSystem(h=ramp)\n", "ref_pwm = DynamicalSystem(h=pwm)\n", "\n", "# Simulating signals over time\n", "\n", "T = np.linspace(0, 10, 1000)\n", "\n", "Y_sin = []\n", "Y_const = []\n", "Y_ramp = []\n", "Y_pwm = []\n", "\n", "for tk in T:\n", " Y_sin.append(\n", " ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " )\n", " Y_const.append(ref_constant.step(params={\"c\": 1}))\n", " Y_ramp.append(\n", " ref_ramp.step(params={\"tk\": tk, \"t0\": 2.0, \"t1\": 8.0, \"y0\": 0.0, \"y1\": 1.0})\n", " )\n", " Y_pwm.append(\n", " ref_pwm.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"duty\": 0.5, \"low\": 0.0, \"high\": 1.0}\n", " )\n", " )" ] }, { "cell_type": "code", "execution_count": null, "id": "370467eb", "metadata": { "lines_to_next_cell": 0, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# -----------------------------\n", "# Visualize signals\n", "# -----------------------------\n", "fig, axs = plt.subplots(2, 2, figsize=(14, 8))\n", "\n", "axs[0, 0].plot(T, Y_sin, linewidth=2)\n", "axs[0, 0].set_title(\"Sine Wave\", fontweight=\"bold\")\n", "axs[0, 0].set_ylabel(\"Amplitude\")\n", "axs[0, 0].grid(True, alpha=0.3)\n", "\n", "axs[0, 1].plot(T, Y_pwm, linewidth=2)\n", "axs[0, 1].set_title(\"PWM Signal\", fontweight=\"bold\")\n", "axs[0, 1].grid(True, alpha=0.3)\n", "\n", "axs[1, 0].plot(T, Y_ramp, linewidth=2)\n", "axs[1, 0].set_title(\"Ramp Signal\", fontweight=\"bold\")\n", "axs[1, 0].set_xlabel(\"Time (s)\")\n", "axs[1, 0].set_ylabel(\"Amplitude\")\n", "axs[1, 0].grid(True, alpha=0.3)\n", "\n", "axs[1, 1].plot(T, Y_const, linewidth=2)\n", "axs[1, 1].set_title(\"Constant Signal\", fontweight=\"bold\")\n", "axs[1, 1].set_xlabel(\"Time (s)\")\n", "axs[1, 1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "a1a5d3d0", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Gaussian Noise\n", "Thanks to the [Central Limit Theorem](https://en.wikipedia.org/wiki/Central_limit_theorem#Applications_and_examples), sensor noise can often be modeled as the true sensor value plus zero-mean Gaussian noise. That is, the noisy output of the sensor, which we denote $\\tilde{y}_k$, is\n", "\n", "$$\n", "\\tilde{y}_k = y_k + r_k\n", "$$\n", "\n", "where $r_k \\sim \\mathcal{N}\\bigl(0,\\, \\sigma \\bigr)$.\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_gaussian_noise`." ] }, { "cell_type": "code", "execution_count": null, "id": "1a722e6e", "metadata": {}, "outputs": [], "source": [ "from pykal.data_change import corrupt, clean\n", "\n", "Y_clean_sin = []\n", "Y_noisy_sin = []\n", "\n", "for tk in T:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_corrupted = corrupt.with_gaussian_noise(yk, mean=0.0, std=0.2)\n", "\n", " Y_clean_sin.append(yk)\n", " Y_noisy_sin.append(yk_corrupted)" ] }, { "cell_type": "code", "execution_count": null, "id": "99793fc1", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Convert to NumPy arrays\n", "Y_clean_sin = np.asarray(Y_clean_sin)\n", "Y_noisy_sin = np.asarray(Y_noisy_sin)\n", "\n", "# -----------------------------\n", "# Plot clean vs noisy\n", "# -----------------------------\n", "plt.figure(figsize=(12, 5))\n", "\n", "plt.plot(\n", " T,\n", " Y_clean_sin,\n", " \"b-\",\n", " label=\"Clean Signal\",\n", " linewidth=2,\n", " alpha=0.7,\n", ")\n", "\n", "plt.plot(\n", " T,\n", " Y_noisy_sin,\n", " \"r-\",\n", " label=r\"With Gaussian Noise ($\\sigma=0.2$)\",\n", " linewidth=1,\n", " alpha=0.6,\n", ")\n", "\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Gaussian Noise Corruption\", fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "# -----------------------------\n", "# Noise statistics\n", "# -----------------------------\n", "print(f\"Original std: {np.std(Y_clean_sin):.4f}\")\n", "print(f\"Corrupted std: {np.std(Y_noisy_sin):.4f}\")\n", "print(f\"Noise added: {np.std(Y_noisy_sin - Y_clean_sin):.4f}\")" ] }, { "cell_type": "markdown", "id": "4ec3cc60", "metadata": {}, "source": [ "**Multivariate Gaussian Noise** (with covariance matrix):\n", "\n", "For vector-valued sensors (e.g., 3-axis IMU), noise is often correlated across channels. The noisy measurement is\n", "\n", "$$\n", "\\tilde{\\mathbf{y}}_k = \\mathbf{y}_k + \\mathbf{r}_k\n", "$$\n", "\n", "where $\\mathbf{r}_k \\sim \\mathcal{N}(\\mathbf{0}, \\mathbf{Q})$ and $\\mathbf{Q}$ is the covariance matrix capturing both individual channel noise levels and cross-channel correlations.\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_gaussian_noise` (with `Q` parameter)." ] }, { "cell_type": "code", "execution_count": null, "id": "2caaa09d", "metadata": {}, "outputs": [], "source": [ "# 3D IMU example: accelerometer with different noise per axis\n", "# Covariance: x low noise, y medium, z high (with small correlations)\n", "Q = np.array([[0.01, 0.002, 0.0], [0.002, 0.05, 0.01], [0.0, 0.01, 0.1]])\n", "\n", "# Generate 100 samples from three reference signals\n", "T_imu = np.linspace(0, 10, 100)\n", "IMU_clean = []\n", "IMU_noisy = []\n", "\n", "for i, tk in enumerate(T_imu):\n", " # Generate clean 3D signal (sine, ramp, constant)\n", " x_clean = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " y_clean = ref_ramp.step(\n", " params={\"tk\": tk, \"t0\": 2.0, \"t1\": 8.0, \"y0\": 0.0, \"y1\": 1.0}\n", " )\n", " z_clean = ref_constant.step(params={\"c\": 1})\n", "\n", " clean_vec = np.array([x_clean, y_clean, z_clean])\n", " # Apply multivariate Gaussian noise in the loop\n", " noisy_vec = corrupt.with_gaussian_noise(clean_vec, Q=Q, seed=i)\n", "\n", " IMU_clean.append(clean_vec)\n", " IMU_noisy.append(noisy_vec)" ] }, { "cell_type": "code", "execution_count": null, "id": "1bcc76d6", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# Convert to NumPy arrays for plotting\n", "imu_data = np.array(IMU_clean)\n", "noisy_imu = np.array(IMU_noisy)\n", "\n", "fig, axs = plt.subplots(1, 3, figsize=(15, 4))\n", "axes_names = [\"X Axis (low noise)\", \"Y Axis (medium noise)\", \"Z Axis (high noise)\"]\n", "\n", "for i in range(3):\n", " axs[i].plot(imu_data[:, i], \"b-\", label=\"Clean\", linewidth=2, alpha=0.7)\n", " axs[i].plot(noisy_imu[:, i], \"r-\", label=\"Noisy\", linewidth=1, alpha=0.6)\n", " axs[i].set_title(axes_names[i], fontsize=12, fontweight=\"bold\")\n", " axs[i].set_xlabel(\"Sample\", fontsize=10)\n", " axs[i].legend()\n", " axs[i].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "f1aad81f", "metadata": {}, "source": [ "#### Moving Average Filter\n", "\n", "The moving average filter smooths noisy signals by replacing each sample with the average of its neighbors:\n", "\n", "$$\n", "\\hat{y}_k = \\frac{1}{w} \\sum_{i=0}^{w-1} \\tilde{y}_{k-i}\n", "$$\n", "\n", "where $w$ is the window size. This reduces high-frequency noise but introduces phase lag proportional to $w/2$.\n", "\n", "**Best for**: reducing Gaussian noise\n", "**Tradeoff**: introduces lag, reduces bandwidth\n", "\n", "API reference: {func}`pykal.data_change.clean.with_moving_average`." ] }, { "cell_type": "code", "execution_count": null, "id": "fdc94868", "metadata": {}, "outputs": [], "source": [ "T_ma = np.linspace(0, 10, 1000)\n", "Y_original = []\n", "Y_noisy = []\n", "Y_smoothed = []\n", "\n", "for tk in T_ma:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_noisy = corrupt.with_gaussian_noise(yk, std=0.3)\n", "\n", " Y_original.append(yk)\n", " Y_noisy.append(yk_noisy)\n", "\n", "# Apply moving average after generation\n", "Y_noisy_arr = np.array(Y_noisy)\n", "Y_smoothed = clean.with_moving_average(Y_noisy_arr, window=10)" ] }, { "cell_type": "code", "execution_count": null, "id": "4f3e7787", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Convert to arrays for plotting\n", "sine_wave = np.array(Y_original)\n", "noisy_sine_2 = np.array(Y_noisy)\n", "smoothed_ma = Y_smoothed\n", "t = T_ma\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean Signal\", linewidth=2, alpha=0.5)\n", "plt.plot(\n", " t,\n", " noisy_sine_2,\n", " \"gray\",\n", " label=\"Corrupted (Gaussian noise)\",\n", " linewidth=0.5,\n", " alpha=0.5,\n", ")\n", "plt.plot(t, smoothed_ma, \"r-\", label=\"Prepared (Moving Avg, window=10)\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Moving Average Denoising\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Corrupted signal std: {np.std(noisy_sine_2):.4f}\")\n", "print(f\"Prepared signal std: {np.std(smoothed_ma):.4f}\")\n", "print(f\"Recovery error (RMS): {np.sqrt(np.mean((smoothed_ma - sine_wave)**2)):.4f}\")" ] }, { "cell_type": "markdown", "id": "54cca937", "metadata": {}, "source": [ "#### Exponential Smoothing\n", "\n", "Exponential smoothing is a recursive filter ideal for real-time applications:\n", "\n", "$$\n", "\\hat{y}_k = \\alpha \\tilde{y}_k + (1 - \\alpha) \\hat{y}_{k-1}\n", "$$\n", "\n", "where $\\alpha \\in [0,1]$ controls the smoothing strength. Larger $\\alpha$ responds faster but smooths less; smaller $\\alpha$ provides more smoothing but reacts slower to changes.\n", "\n", "**Best for**: real-time filtering\n", "**Parameter**: $\\alpha \\in [0,1]$\n", "\n", "API reference: {func}`pykal.data_change.clean.with_exponential_smoothing`." ] }, { "cell_type": "code", "execution_count": null, "id": "78586491", "metadata": {}, "outputs": [], "source": [ "T_exp = np.linspace(0, 10, 1000)\n", "Y_clean_exp = []\n", "Y_noisy_exp = []\n", "Y_smoothed_03 = []\n", "Y_smoothed_05 = []\n", "Y_smoothed_08 = []\n", "\n", "# Initialize exponential smoothing state for each alpha\n", "y_prev_03 = 0.0\n", "y_prev_05 = 0.0\n", "y_prev_08 = 0.0\n", "\n", "for tk in T_exp:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_noisy = corrupt.with_gaussian_noise(yk, std=0.25)\n", "\n", " # Apply exponential smoothing in the loop (recursive)\n", " y_smooth_03 = 0.3 * yk_noisy + (1 - 0.3) * y_prev_03\n", " y_smooth_05 = 0.5 * yk_noisy + (1 - 0.5) * y_prev_05\n", " y_smooth_08 = 0.8 * yk_noisy + (1 - 0.8) * y_prev_08\n", "\n", " # Update previous values\n", " y_prev_03 = y_smooth_03\n", " y_prev_05 = y_smooth_05\n", " y_prev_08 = y_smooth_08\n", "\n", " Y_clean_exp.append(yk)\n", " Y_noisy_exp.append(yk_noisy)\n", " Y_smoothed_03.append(y_smooth_03)\n", " Y_smoothed_05.append(y_smooth_05)\n", " Y_smoothed_08.append(y_smooth_08)\n", "\n", "# Convert to arrays\n", "smoothed_alpha_03 = np.array(Y_smoothed_03)\n", "smoothed_alpha_05 = np.array(Y_smoothed_05)\n", "smoothed_alpha_08 = np.array(Y_smoothed_08)" ] }, { "cell_type": "code", "execution_count": null, "id": "b735cd8d", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "sine_wave = np.array(Y_clean_exp)\n", "noisy_sine_3 = np.array(Y_noisy_exp)\n", "t = T_exp\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "plt.plot(t, noisy_sine_3, \"gray\", label=\"Corrupted\", linewidth=0.5, alpha=0.3)\n", "plt.plot(\n", " t,\n", " smoothed_alpha_03,\n", " \"r-\",\n", " label=\"α=0.3 (heavy smoothing)\",\n", " linewidth=1.5,\n", " alpha=0.8,\n", ")\n", "plt.plot(t, smoothed_alpha_05, \"g-\", label=\"α=0.5 (moderate)\", linewidth=1.5, alpha=0.8)\n", "plt.plot(t, smoothed_alpha_08, \"m-\", label=\"α=0.8 (light)\", linewidth=1.5, alpha=0.8)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Exponential Smoothing with Different α\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "8ebd5a55", "metadata": {}, "source": [ "#### Low-Pass Filter (First-Order)\n", "\n", "A first-order low-pass filter attenuates high-frequency components while passing low frequencies:\n", "\n", "$$\n", "\\hat{y}_k = \\alpha \\tilde{y}_k + (1 - \\alpha) \\hat{y}_{k-1}\n", "$$\n", "\n", "Equivalent to exponential smoothing, but interpreted in the frequency domain with cutoff frequency $f_c = \\alpha f_s / (2\\pi)$ where $f_s$ is the sampling rate.\n", "\n", "**Best for**: attenuating high-frequency noise\n", "**Parameter**: $\\alpha \\in [0,1]$\n", "\n", "API reference: {func}`pykal.data_change.clean.with_low_pass_filter`." ] }, { "cell_type": "code", "execution_count": null, "id": "d9b0f7dd", "metadata": {}, "outputs": [], "source": [ "T_lp = np.linspace(0, 10, 1000)\n", "Y_clean_lp = []\n", "Y_noisy_lp = []\n", "Y_filtered = []\n", "\n", "# Initialize low-pass filter state\n", "y_prev = 0.0\n", "alpha = 0.1\n", "\n", "for tk in T_lp:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_noisy = corrupt.with_gaussian_noise(yk, std=0.3)\n", "\n", " # Apply low-pass filter in the loop (recursive)\n", " y_filtered = alpha * yk_noisy + (1 - alpha) * y_prev\n", " y_prev = y_filtered\n", "\n", " Y_clean_lp.append(yk)\n", " Y_noisy_lp.append(yk_noisy)\n", " Y_filtered.append(y_filtered)\n", "\n", "# Convert to array\n", "filtered_lp = np.array(Y_filtered)" ] }, { "cell_type": "code", "execution_count": null, "id": "4bd67e6b", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "sine_wave = np.array(Y_clean_lp)\n", "noisy_sine_4 = np.array(Y_noisy_lp)\n", "t = T_lp\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "plt.plot(t, noisy_sine_4, \"gray\", label=\"Corrupted (noise)\", linewidth=0.5, alpha=0.5)\n", "plt.plot(t, filtered_lp, \"r-\", label=\"Prepared (low-pass α=0.1)\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Low-Pass Filter (First-Order)\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "702660c2", "metadata": {}, "source": [ "### Contact Bounce\n", "\n", "Mechanical switches and encoders exhibit contact bounce—rapid oscillations near transitions before settling to a stable value:\n", "\n", "$$\n", "\\tilde{y}_k = \\begin{cases}\n", "y_k + A \\cdot \\text{noise}_k & \\text{if } |k - k_{\\text{transition}}| < d \\\\\n", "y_k & \\text{otherwise}\n", "\\end{cases}\n", "$$\n", "\n", "where $A$ is the bounce amplitude and $d$ is the bounce duration in samples. This non-ideal behavior occurs due to mechanical spring forces causing contacts to vibrate.\n", "\n", "**Hardware source**: mechanical switches and encoders that chatter near transitions\n", "**Common in**: digital inputs, rotary encoders, limit switches\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_bounce`." ] }, { "cell_type": "code", "execution_count": null, "id": "cb4b4265", "metadata": {}, "outputs": [], "source": [ "T_bounce = np.linspace(0, 4, 400)\n", "Y_step = []\n", "Y_bounced = []\n", "\n", "for tk in T_bounce:\n", " # Generate step signal: 0 before t=2, 1 after\n", " yk = 0.0 if tk < 2.0 else 1.0\n", "\n", " Y_step.append(yk)\n", "\n", "# Apply bounce corruption after generation\n", "step_signal = np.array(Y_step)\n", "bounced_step = corrupt.with_bounce(step_signal, duration=5, amplitude=0.3, seed=42)" ] }, { "cell_type": "code", "execution_count": null, "id": "d78095cd", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_bounce\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(t, step_signal, \"b-\", label=\"Clean Step\", linewidth=2)\n", "axs[0].plot(t, bounced_step, \"r-\", label=\"With Bounce\", linewidth=1.5, alpha=0.8)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Contact Bounce (Full Signal)\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "zoom_start, zoom_end = 195, 215\n", "axs[1].plot(\n", " t[zoom_start:zoom_end],\n", " step_signal[zoom_start:zoom_end],\n", " \"b-\",\n", " label=\"Clean Step\",\n", " linewidth=3,\n", ")\n", "axs[1].plot(\n", " t[zoom_start:zoom_end],\n", " bounced_step[zoom_start:zoom_end],\n", " \"r-\",\n", " label=\"With Bounce\",\n", " linewidth=2,\n", " alpha=0.8,\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Contact Bounce (Zoomed Transition)\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "51217db8", "metadata": {}, "source": [ "#### Debounce\n", "\n", "Debouncing removes contact bounce by requiring signal stability for a minimum duration before accepting a transition:\n", "\n", "$$\n", "\\hat{y}_k = \\begin{cases}\n", "\\tilde{y}_k & \\text{if } |\\tilde{y}_j - \\tilde{y}_{j-1}| < \\epsilon \\text{ for all } j \\in [k-d, k] \\\\\n", "\\hat{y}_{k-1} & \\text{otherwise}\n", "\\end{cases}\n", "$$\n", "\n", "where $\\epsilon$ is the stability threshold and $d$ is the minimum stable duration. This implements a form of temporal hysteresis.\n", "\n", "**Best for**: removing contact bounce from digital signals\n", "**Idea**: require stability for a minimum duration before accepting transitions\n", "\n", "API reference: {func}`pykal.data_change.clean.with_debounce`." ] }, { "cell_type": "code", "execution_count": null, "id": "748fa218", "metadata": {}, "outputs": [], "source": [ "T_debounce = np.linspace(0, 4, 400)\n", "Y_step_debounce = []\n", "\n", "for tk in T_debounce:\n", " yk = 0.0 if tk < 2.0 else 1.0\n", " Y_step_debounce.append(yk)\n", "\n", "# Apply bounce and debounce\n", "step_signal = np.array(Y_step_debounce)\n", "bounced_step_2 = corrupt.with_bounce(step_signal, duration=8, amplitude=0.4, seed=46)\n", "debounced = clean.with_debounce(bounced_step_2, threshold=0.2, min_duration=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "2ec1a68b", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_debounce\n", "zoom_start, zoom_end = 195, 230\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(\n", " t[zoom_start:zoom_end],\n", " step_signal[zoom_start:zoom_end],\n", " \"b-\",\n", " label=\"Original Clean\",\n", " linewidth=3,\n", " alpha=0.5,\n", ")\n", "plt.plot(\n", " t[zoom_start:zoom_end],\n", " bounced_step_2[zoom_start:zoom_end],\n", " \"gray\",\n", " label=\"Corrupted (bounce)\",\n", " linewidth=1.5,\n", " alpha=0.7,\n", ")\n", "plt.plot(\n", " t[zoom_start:zoom_end],\n", " debounced[zoom_start:zoom_end],\n", " \"r-\",\n", " label=\"Prepared (debounced)\",\n", " linewidth=2,\n", ")\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Debounce Filter (Zoomed Transition)\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "0336c6a2", "metadata": {}, "source": [ "### Dropouts (Packet Loss)\n", "\n", "Wireless communication and intermittent sensor connections cause random samples to be missing:\n", "\n", "$$\n", "\\tilde{y}_k = \\begin{cases}\n", "y_k & \\text{with probability } (1 - p) \\\\\n", "\\text{NaN} & \\text{with probability } p\n", "\\end{cases}\n", "$$\n", "\n", "where $p$ is the dropout rate. Missing samples are marked as NaN and must be handled by downstream processing.\n", "\n", "**Hardware source**: wireless links, intermittent buses, sensor hiccups\n", "**Common in**: WiFi sensors, BLE devices, multi-robot comms\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_dropouts`." ] }, { "cell_type": "code", "execution_count": null, "id": "360aa4f5", "metadata": {}, "outputs": [], "source": [ "T_dropout = np.linspace(0, 10, 1000)\n", "Y_sine_dropout = []\n", "Y_dropped = []\n", "\n", "np.random.seed(42)\n", "dropout_rate = 0.2\n", "\n", "for tk in T_dropout:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply dropout in the loop (probabilistic per-sample)\n", " if np.random.rand() < dropout_rate:\n", " yk_dropped = np.nan\n", " else:\n", " yk_dropped = yk\n", "\n", " Y_sine_dropout.append(yk)\n", " Y_dropped.append(yk_dropped)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_dropout)\n", "dropped_sine = np.array(Y_dropped)" ] }, { "cell_type": "code", "execution_count": null, "id": "c00e028a", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_dropout\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Clean Signal\", linewidth=2, alpha=0.5)\n", "plt.plot(t, dropped_sine, \"r.\", label=\"With 20% Dropouts (NaN)\", markersize=3)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Packet Loss / Dropouts\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Total samples: {len(sine_wave)}\")\n", "print(f\"Dropped samples: {np.isnan(dropped_sine).sum()}\")\n", "print(f\"Dropout rate: {np.isnan(dropped_sine).sum() / len(sine_wave):.2%}\")" ] }, { "cell_type": "markdown", "id": "eb0d2eb3", "metadata": {}, "source": [ "#### Interpolation (Fill Missing Data)\n", "\n", "Interpolation fills missing samples (NaN values) using neighboring valid data:\n", "\n", "**Linear interpolation**:\n", "$$\n", "\\hat{y}_k = y_i + \\frac{k - i}{j - i}(y_j - y_i)\n", "$$\n", "where $i < k < j$ and $y_i, y_j$ are the nearest valid samples.\n", "\n", "**Nearest-neighbor interpolation**:\n", "$$\n", "\\hat{y}_k = y_{\\arg\\min_i |k - i|}\n", "$$\n", "\n", "**Best for**: dropouts (NaN values)\n", "**Methods**: linear (smooth), nearest-neighbor (preserves steps)\n", "\n", "API reference: {func}`pykal.data_change.clean.with_interpolation`." ] }, { "cell_type": "code", "execution_count": null, "id": "a027a7a3", "metadata": {}, "outputs": [], "source": [ "T_interp = np.linspace(0, 10, 1000)\n", "Y_sine_interp = []\n", "\n", "for tk in T_interp:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " Y_sine_interp.append(yk)\n", "\n", "# Apply dropouts and interpolation\n", "sine_wave = np.array(Y_sine_interp)\n", "dropped_sine_2 = corrupt.with_dropouts(sine_wave, dropout_rate=0.15, seed=48)\n", "filled_linear = clean.with_interpolation(dropped_sine_2, method=\"linear\")\n", "filled_nearest = clean.with_interpolation(dropped_sine_2, method=\"nearest\")" ] }, { "cell_type": "code", "execution_count": null, "id": "48d6a8c6", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_interp\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "axs[0].plot(t, dropped_sine_2, \"r.\", label=\"Corrupted (dropouts)\", markersize=3)\n", "axs[0].plot(t, filled_linear, \"g-\", label=\"Prepared (linear)\", linewidth=1.5, alpha=0.8)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Linear Interpolation\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "axs[1].plot(t, dropped_sine_2, \"r.\", label=\"Corrupted (dropouts)\", markersize=3)\n", "axs[1].plot(\n", " t, filled_nearest, \"m-\", label=\"Prepared (nearest)\", linewidth=1.5, alpha=0.8\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Nearest-Neighbor Interpolation\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Dropped samples: {np.isnan(dropped_sine_2).sum()}\")\n", "print(f\"Remaining NaN after linear interpolation: {np.isnan(filled_linear).sum()}\")" ] }, { "cell_type": "markdown", "id": "732e0aaf", "metadata": {}, "source": [ "### Asynchronous / Multi-Rate Data\n", "\n", "**Hardware source**: sensors sampling at different rates, non-uniform timing\n", "**Common in**: sensor fusion (IMU @ 100Hz, GPS @ 10Hz, camera @ 30Hz)" ] }, { "cell_type": "code", "execution_count": null, "id": "d6fb5335", "metadata": {}, "outputs": [], "source": [ "# Simulate three sensors with different sampling rates\n", "t_fast = np.arange(0, 2, 0.01) # 100 Hz\n", "t_medium = np.arange(0, 2, 0.033) # ~30 Hz\n", "t_slow = np.arange(0, 2, 0.1) # 10 Hz\n", "\n", "# Generate fast sensor data\n", "signal_fast = []\n", "for tk in t_fast:\n", " yk = np.sin(2 * np.pi * 1.5 * tk)\n", " signal_fast.append(yk)\n", "signal_fast = np.array(signal_fast)\n", "signal_fast_noisy = corrupt.with_gaussian_noise(signal_fast, std=0.05, seed=60)\n", "\n", "# Generate medium sensor data\n", "signal_medium = []\n", "for tk in t_medium:\n", " yk = np.sin(2 * np.pi * 1.5 * tk)\n", " signal_medium.append(yk)\n", "signal_medium = np.array(signal_medium)\n", "signal_medium_noisy = corrupt.with_gaussian_noise(signal_medium, std=0.08, seed=61)\n", "\n", "# Generate slow sensor data\n", "signal_slow = []\n", "for tk in t_slow:\n", " yk = np.sin(2 * np.pi * 1.5 * tk)\n", " signal_slow.append(yk)\n", "signal_slow = np.array(signal_slow)\n", "signal_slow_noisy = corrupt.with_gaussian_noise(signal_slow, std=0.1, seed=62)" ] }, { "cell_type": "code", "execution_count": null, "id": "66a98b02", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(2, 1, figsize=(12, 8))\n", "\n", "# Top: show different sampling rates\n", "axs[0].plot(t_fast, signal_fast, \"b-\", linewidth=2, alpha=0.3, label=\"True Signal\")\n", "axs[0].plot(\n", " t_fast,\n", " signal_fast_noisy,\n", " \"r.\",\n", " markersize=3,\n", " label=\"Fast Sensor (100Hz)\",\n", " alpha=0.6,\n", ")\n", "axs[0].plot(\n", " t_medium,\n", " signal_medium_noisy,\n", " \"g^\",\n", " markersize=5,\n", " label=\"Medium Sensor (~30Hz)\",\n", " alpha=0.7,\n", ")\n", "axs[0].plot(\n", " t_slow, signal_slow_noisy, \"mo\", markersize=7, label=\"Slow Sensor (10Hz)\", alpha=0.8\n", ")\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Asynchronous Multi-Rate Sensors\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Bottom: zoom in to see timing differences\n", "zoom_end = 0.3\n", "mask_fast = t_fast <= zoom_end\n", "mask_medium = t_medium <= zoom_end\n", "mask_slow = t_slow <= zoom_end\n", "\n", "axs[1].plot(\n", " t_fast[mask_fast],\n", " signal_fast[mask_fast],\n", " \"b-\",\n", " linewidth=2,\n", " alpha=0.3,\n", " label=\"True Signal\",\n", ")\n", "axs[1].plot(\n", " t_fast[mask_fast],\n", " signal_fast_noisy[mask_fast],\n", " \"r.\",\n", " markersize=6,\n", " label=\"Fast (100Hz)\",\n", " alpha=0.6,\n", ")\n", "axs[1].plot(\n", " t_medium[mask_medium],\n", " signal_medium_noisy[mask_medium],\n", " \"g^\",\n", " markersize=8,\n", " label=\"Medium (~30Hz)\",\n", " alpha=0.7,\n", ")\n", "axs[1].plot(\n", " t_slow[mask_slow],\n", " signal_slow_noisy[mask_slow],\n", " \"mo\",\n", " markersize=10,\n", " label=\"Slow (10Hz)\",\n", " alpha=0.8,\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Zoomed: Different Sample Timings\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Fast sensor samples: {len(t_fast)} @ 100Hz\")\n", "print(f\"Medium sensor samples: {len(t_medium)} @ ~30Hz\")\n", "print(f\"Slow sensor samples: {len(t_slow)} @ 10Hz\")\n", "print(f\"\\nChallenge: How to fuse these into a single unified estimate?\")" ] }, { "cell_type": "markdown", "id": "7b31a3b6", "metadata": {}, "source": [ "#### Kalman Filter Multi-Rate Sensor Fusion\n", "\n", "The optimal solution for fusing asynchronous sensors with different noise characteristics is the **Kalman filter**. Here we demonstrate:\n", "\n", "- **Fast sensor** (100 Hz): High rate but noisy (σ = 0.3)\n", "- **Slow sensor** (10 Hz): Low rate but accurate (σ = 0.05)\n", "- **Kalman filter**: Optimally weights both based on their noise characteristics\n", "\n", "
\n", "\n", "
" ] }, { "cell_type": "code", "execution_count": null, "id": "5e2a0677", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "from pykal.algorithm_library.estimators.kf import KF\n", "\n", "# True state evolution: simple 1D constant velocity\n", "# State: [position, velocity]\n", "dt_sim = 0.01 # Simulation timestep (100 Hz)\n", "t_total = 2.0\n", "t_sim = np.arange(0, t_total, dt_sim)\n", "\n", "# True state trajectory\n", "true_pos = np.sin(2 * np.pi * 0.5 * t_sim) # Sinusoidal position\n", "true_vel = 2 * np.pi * 0.5 * np.cos(2 * np.pi * 0.5 * t_sim) # Derivative\n", "\n", "# Generate sensor measurements\n", "# Fast sensor: measures position every 0.01s (100 Hz) with high noise\n", "fast_noise_std = 0.3\n", "np.random.seed(100)\n", "fast_measurements = true_pos + np.random.normal(0, fast_noise_std, len(t_sim))\n", "\n", "# Slow sensor: measures position every 0.1s (10 Hz) with low noise\n", "slow_rate = 10 # Every 10 samples\n", "slow_noise_std = 0.05\n", "slow_measurements = np.full_like(true_pos, np.nan)\n", "slow_measurements[::slow_rate] = true_pos[::slow_rate] + np.random.normal(\n", " 0, slow_noise_std, len(true_pos[::slow_rate])\n", ")\n", "\n", "# Kalman filter setup - constant velocity model\n", "# State: x = [position, velocity]^T\n", "# Dynamics: position_{k+1} = position_k + velocity_k * dt\n", "# velocity_{k+1} = velocity_k\n", "\n", "\n", "# State transition matrix (constant for linear system)\n", "def get_F(dt):\n", " return np.array([[1.0, dt], [0.0, 1.0]])\n", "\n", "\n", "# Measurement matrix (observe position only)\n", "H = np.array([[1.0, 0.0]])\n", "\n", "# Process noise covariance\n", "Q = np.diag([0.01, 0.01])\n", "\n", "# Measurement noise covariances\n", "R_fast = np.array([[fast_noise_std**2]])\n", "R_slow = np.array([[slow_noise_std**2]])\n", "\n", "# Initialize state estimate\n", "x_est = np.array([[0.0], [0.0]]) # Start at origin with zero velocity\n", "P_est = np.diag([1.0, 1.0]) # Initial uncertainty\n", "\n", "# Storage\n", "kf_positions = []\n", "kf_velocities = []\n", "\n", "# Run Kalman filter\n", "for i in range(len(t_sim)):\n", " # Predict step (always done)\n", " F = get_F(dt_sim)\n", " x_pred = F @ x_est # State prediction\n", " P_pred = F @ P_est @ F.T + Q # Covariance prediction\n", "\n", " # Update step (if measurement available)\n", " has_slow = not np.isnan(slow_measurements[i])\n", " has_fast = not np.isnan(fast_measurements[i])\n", "\n", " if has_slow:\n", " # Use slow (accurate) sensor\n", " z = np.array([[slow_measurements[i]]])\n", " R = R_slow\n", " elif has_fast:\n", " # Use fast (noisy) sensor\n", " z = np.array([[fast_measurements[i]]])\n", " R = R_fast\n", " else:\n", " # No measurement - use prediction\n", " x_est = x_pred\n", " P_est = P_pred\n", " kf_positions.append(x_est[0, 0])\n", " kf_velocities.append(x_est[1, 0])\n", " continue\n", "\n", " # Kalman gain\n", " S = H @ P_pred @ H.T + R\n", " K = P_pred @ H.T @ np.linalg.inv(S)\n", "\n", " # Update estimate\n", " y_pred = H @ x_pred\n", " innovation = z - y_pred\n", " x_est = x_pred + K @ innovation\n", " P_est = (np.eye(2) - K @ H) @ P_pred\n", "\n", " kf_positions.append(x_est[0, 0])\n", " kf_velocities.append(x_est[1, 0])\n", "\n", "kf_positions = np.array(kf_positions)\n", "kf_velocities = np.array(kf_velocities)\n", "\n", "# Compute errors\n", "fast_error_rms = np.sqrt(np.mean((fast_measurements - true_pos) ** 2))\n", "slow_error_rms = np.sqrt(np.nanmean((slow_measurements - true_pos) ** 2))\n", "kf_error_rms = np.sqrt(np.mean((kf_positions - true_pos) ** 2))\n", "\n", "print(f\"Fast sensor RMS error: {fast_error_rms:.4f}\")\n", "print(f\"Slow sensor RMS error: {slow_error_rms:.4f}\")\n", "print(f\"Kalman filter RMS error: {kf_error_rms:.4f}\")\n", "print(f\"\\nImprovement over fast sensor: {(1 - kf_error_rms/fast_error_rms)*100:.1f}%\")\n", "print(f\"Improvement over slow sensor: {(1 - kf_error_rms/slow_error_rms)*100:.1f}%\")" ] }, { "cell_type": "code", "execution_count": null, "id": "3f9a6d54", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Visualization\n", "fig, axs = plt.subplots(2, 1, figsize=(14, 9))\n", "\n", "# Top: Position estimates\n", "axs[0].plot(t_sim, true_pos, \"k-\", linewidth=2.5, label=\"True Position\", alpha=0.7)\n", "axs[0].plot(\n", " t_sim,\n", " fast_measurements,\n", " \"r.\",\n", " markersize=2,\n", " label=f\"Fast Sensor (100Hz, σ={fast_noise_std})\",\n", " alpha=0.3,\n", ")\n", "axs[0].plot(\n", " t_sim,\n", " slow_measurements,\n", " \"go\",\n", " markersize=6,\n", " label=f\"Slow Sensor (10Hz, σ={slow_noise_std})\",\n", " alpha=0.7,\n", ")\n", "axs[0].plot(\n", " t_sim, kf_positions, \"b-\", linewidth=2, label=\"Kalman Filter Estimate\", alpha=0.9\n", ")\n", "axs[0].set_ylabel(\"Position\", fontsize=12)\n", "axs[0].set_title(\n", " \"Multi-Rate Kalman Filter Sensor Fusion\", fontsize=14, fontweight=\"bold\"\n", ")\n", "axs[0].legend(fontsize=11, loc=\"upper right\")\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Bottom: Velocity estimate (velocity is not directly measured!)\n", "axs[1].plot(t_sim, true_vel, \"k-\", linewidth=2.5, label=\"True Velocity\", alpha=0.7)\n", "axs[1].plot(\n", " t_sim, kf_velocities, \"b-\", linewidth=2, label=\"KF Velocity Estimate\", alpha=0.9\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Velocity\", fontsize=12)\n", "axs[1].set_title(\n", " \"Velocity Estimation (Not Directly Measured)\", fontsize=14, fontweight=\"bold\"\n", ")\n", "axs[1].legend(fontsize=11, loc=\"upper right\")\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(\"\\nKey insight: KF optimally fuses noisy-fast and accurate-slow sensors!\")\n", "print(\"Bonus: KF estimates velocity even though it's never directly measured.\")" ] }, { "cell_type": "markdown", "id": "ecb5467d", "metadata": {}, "source": [ "#### Staleness Policy (Asynchronous Sensors)\n", "\n", "Asynchronous sensors arrive at different times, requiring policies for handling missing data:\n", "\n", "**Policies**:\n", "- **`zero`**: $\\hat{y}_k = 0$ if NaN (assumes baseline state)\n", "- **`hold`**: $\\hat{y}_k = \\hat{y}_{k-1}$ if NaN (zero-order hold)\n", "- **`drop`**: remove NaN samples entirely (variable-rate output)\n", "- **`none`**: keep NaN (downstream must handle)\n", "\n", "$$\n", "\\hat{y}_k = \\begin{cases}\n", "\\tilde{y}_k & \\text{if } \\tilde{y}_k \\neq \\text{NaN} \\\\\n", "\\text{policy}(\\hat{y}_{k-1}) & \\text{if } \\tilde{y}_k = \\text{NaN}\n", "\\end{cases}\n", "$$\n", "\n", "This is critical for robotics where sensor fusion often happens across streams without a shared clock.\n", "\n", "**Best for**: multi-rate or intermittent sensors\n", "\n", "API reference: {func}`pykal.data_change.clean.with_staleness_policy`." ] }, { "cell_type": "code", "execution_count": null, "id": "d0b8ba86", "metadata": {}, "outputs": [], "source": [ "T_stale = np.linspace(0, 2, 200)\n", "Y_sine_stale = []\n", "Y_intermittent = []\n", "Y_handled_zero = []\n", "Y_handled_hold = []\n", "\n", "np.random.seed(49)\n", "dropout_rate = 0.3\n", "y_prev_hold = 0.0 # Initialize hold policy state\n", "\n", "for tk in T_stale:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply dropouts\n", " if np.random.rand() < dropout_rate:\n", " yk_intermittent = np.nan\n", " else:\n", " yk_intermittent = yk\n", "\n", " # Apply staleness policies in the loop\n", " # Zero policy: replace NaN with 0\n", " if np.isnan(yk_intermittent):\n", " yk_zero = 0.0\n", " else:\n", " yk_zero = yk_intermittent\n", "\n", " # Hold policy: forward-fill last valid value\n", " if np.isnan(yk_intermittent):\n", " yk_hold = y_prev_hold\n", " else:\n", " yk_hold = yk_intermittent\n", " y_prev_hold = yk_hold\n", "\n", " Y_sine_stale.append(yk)\n", " Y_intermittent.append(yk_intermittent)\n", " Y_handled_zero.append(yk_zero)\n", " Y_handled_hold.append(yk_hold)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_stale)\n", "intermittent_signal = np.array(Y_intermittent)\n", "handled_zero = np.array(Y_handled_zero)\n", "handled_hold = np.array(Y_handled_hold)\n", "\n", "# Drop policy requires removing NaN, so use array-based approach\n", "handled_drop = clean.with_staleness_policy(intermittent_signal, policy=\"drop\")" ] }, { "cell_type": "code", "execution_count": null, "id": "07a54237", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_stale\n", "fig, axs = plt.subplots(2, 2, figsize=(14, 10))\n", "\n", "axs[0, 0].plot(t, sine_wave, \"b-\", label=\"Clean\", linewidth=2, alpha=0.5)\n", "axs[0, 0].plot(\n", " t, intermittent_signal, \"r.\", label=\"Intermittent (30% dropout)\", markersize=4\n", ")\n", "axs[0, 0].set_title(\"Original: Intermittent Sensor\", fontsize=12, fontweight=\"bold\")\n", "axs[0, 0].legend()\n", "axs[0, 0].grid(True, alpha=0.3)\n", "\n", "axs[0, 1].plot(t, sine_wave, \"b-\", label=\"Clean\", linewidth=2, alpha=0.5)\n", "axs[0, 1].plot(t, handled_zero, \"g-\", label=\"Policy: 'zero'\", linewidth=1.5)\n", "axs[0, 1].set_title(\"Zero Policy\", fontsize=12, fontweight=\"bold\")\n", "axs[0, 1].legend()\n", "axs[0, 1].grid(True, alpha=0.3)\n", "\n", "axs[1, 0].plot(t, sine_wave, \"b-\", label=\"Clean\", linewidth=2, alpha=0.5)\n", "axs[1, 0].plot(t, handled_hold, \"m-\", label=\"Policy: 'hold'\", linewidth=1.5)\n", "axs[1, 0].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1, 0].set_title(\"Hold Policy\", fontsize=12, fontweight=\"bold\")\n", "axs[1, 0].legend()\n", "axs[1, 0].grid(True, alpha=0.3)\n", "\n", "t_drop = np.linspace(0, 2, len(handled_drop))\n", "sine_drop = sine_wave[: len(handled_drop)]\n", "axs[1, 1].plot(t_drop, sine_drop, \"b-\", label=\"Clean\", linewidth=2, alpha=0.5)\n", "axs[1, 1].plot(\n", " t_drop,\n", " handled_drop,\n", " \"c-\",\n", " label=\"Policy: 'drop'\",\n", " linewidth=1.5,\n", " marker=\"o\",\n", " markersize=2,\n", ")\n", "axs[1, 1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1, 1].set_title(\"Drop Policy\", fontsize=12, fontweight=\"bold\")\n", "axs[1, 1].legend()\n", "axs[1, 1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Original length: {len(intermittent_signal)}\")\n", "print(f\"After 'drop' policy: {len(handled_drop)} samples\")" ] }, { "cell_type": "markdown", "id": "b45e4be6", "metadata": {}, "source": [ "### Constant Bias\n", "\n", "Constant bias offsets arise from uncalibrated zero-errors or systematic measurement offsets:\n", "\n", "$$\n", "\\tilde{y}_k = y_k + b\n", "$$\n", "\n", "where $b$ is the constant bias. Common causes include ADC offset voltage, sensor mounting misalignment, or gravitational effects on accelerometers.\n", "\n", "**Hardware source**: zero-offset error, poor calibration\n", "**Common in**: IMUs, force/torque sensors, pressure sensors\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_bias`." ] }, { "cell_type": "code", "execution_count": null, "id": "48c8ce0d", "metadata": {}, "outputs": [], "source": [ "T_bias = np.linspace(0, 10, 1000)\n", "Y_sine_bias = []\n", "Y_biased = []\n", "\n", "bias = 1.5\n", "\n", "for tk in T_bias:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply constant bias in the loop\n", " yk_biased = yk + bias\n", "\n", " Y_sine_bias.append(yk)\n", " Y_biased.append(yk_biased)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_bias)\n", "biased_sine = np.array(Y_biased)" ] }, { "cell_type": "code", "execution_count": null, "id": "98e852b6", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_bias\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Clean Signal (zero-mean)\", linewidth=2, alpha=0.7)\n", "plt.plot(t, biased_sine, \"r-\", label=\"With Bias (+1.5)\", linewidth=2, alpha=0.7)\n", "plt.axhline(y=0, color=\"k\", linestyle=\":\", alpha=0.5, label=\"Zero Line\")\n", "plt.axhline(y=1.5, color=\"gray\", linestyle=\":\", alpha=0.5, label=\"Bias Level\")\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Constant Bias Offset\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Original mean: {np.mean(sine_wave):.4f}\")\n", "print(f\"Biased mean: {np.mean(biased_sine):.4f}\")\n", "print(f\"Bias: {np.mean(biased_sine) - np.mean(sine_wave):.4f}\")" ] }, { "cell_type": "markdown", "id": "3741b7e4", "metadata": {}, "source": [ "#### Calibration (Remove Bias and Scale)\n", "\n", "Calibration corrects both bias (additive error) and scale (multiplicative error):\n", "\n", "$$\n", "\\hat{y}_k = s \\cdot (\\tilde{y}_k - b)\n", "$$\n", "\n", "where $b$ is the bias offset and $s$ is the scale factor. These parameters are typically determined from known reference measurements during a calibration procedure.\n", "\n", "**Best for**: correcting systematic sensor errors after calibration data is available\n", "\n", "API reference: {func}`pykal.data_change.clean.with_calibration`." ] }, { "cell_type": "code", "execution_count": null, "id": "b51eb5a1", "metadata": {}, "outputs": [], "source": [ "T_calib = np.linspace(0, 10, 1000)\n", "Y_sine_calib = []\n", "Y_miscalibrated = []\n", "Y_calibrated = []\n", "\n", "# Calibration parameters\n", "scale_error = 2.0\n", "bias_error = 3.0\n", "# Correction parameters\n", "offset = 3.0\n", "scale = 0.5\n", "\n", "for tk in T_calib:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply miscalibration in the loop\n", " yk_miscal = yk * scale_error + bias_error\n", "\n", " # Apply calibration correction in the loop\n", " yk_calib = scale * (yk_miscal - offset)\n", "\n", " Y_sine_calib.append(yk)\n", " Y_miscalibrated.append(yk_miscal)\n", " Y_calibrated.append(yk_calib)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_calib)\n", "miscalibrated = np.array(Y_miscalibrated)\n", "calibrated = np.array(Y_calibrated)" ] }, { "cell_type": "code", "execution_count": null, "id": "82c19a93", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_calib\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"True Signal (calibrated)\", linewidth=2, alpha=0.5)\n", "plt.plot(t, miscalibrated, \"r-\", label=\"Miscalibrated (2× + 3)\", linewidth=2, alpha=0.7)\n", "plt.plot(t, calibrated, \"g--\", label=\"After Calibration\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\n", " \"Sensor Calibration (Bias and Scale Correction)\", fontsize=14, fontweight=\"bold\"\n", ")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Calibration error (RMS): {np.sqrt(np.mean((calibrated - sine_wave)**2)):.6f}\")" ] }, { "cell_type": "markdown", "id": "76d23279", "metadata": {}, "source": [ "### Drift (Time-Dependent Bias)\n", "\n", "Drift is a slowly varying bias that changes over time, often due to thermal effects or sensor warm-up:\n", "\n", "**Linear drift**:\n", "$$\n", "\\tilde{y}_k = y_k + r \\cdot k \\cdot \\Delta t\n", "$$\n", "\n", "**Exponential drift**:\n", "$$\n", "\\tilde{y}_k = y_k + r \\cdot (1 - e^{-k \\cdot \\Delta t / \\tau})\n", "$$\n", "\n", "where $r$ is the drift rate, $\\Delta t$ is the sample period, and $\\tau$ is the time constant. Gyroscopes are particularly susceptible to drift due to bias instability.\n", "\n", "**Hardware source**: warm-up, temperature dependence, slow degradation\n", "**Common in**: gyros, barometers, thermal sensors\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_drift`." ] }, { "cell_type": "code", "execution_count": null, "id": "2932312d", "metadata": {}, "outputs": [], "source": [ "T_drift = np.linspace(0, 10, 1000)\n", "Y_const_drift = []\n", "Y_drifted_linear = []\n", "Y_drifted_exp = []\n", "\n", "dt = T_drift[1] - T_drift[0] # Sample period\n", "drift_rate_linear = 0.1\n", "drift_rate_exp = 0.05\n", "\n", "for k, tk in enumerate(T_drift):\n", " yk = ref_constant.step(params={\"c\": 1})\n", "\n", " # Apply linear drift in the loop\n", " yk_drift_linear = yk + drift_rate_linear * k * dt\n", "\n", " # Apply exponential drift in the loop\n", " yk_drift_exp = yk + drift_rate_exp * (1 - np.exp(-k * dt))\n", "\n", " Y_const_drift.append(yk)\n", " Y_drifted_linear.append(yk_drift_linear)\n", " Y_drifted_exp.append(yk_drift_exp)\n", "\n", "# Convert to arrays\n", "constant_signal = np.array(Y_const_drift)\n", "drifted_constant_linear = np.array(Y_drifted_linear)\n", "drifted_constant_exp = np.array(Y_drifted_exp)" ] }, { "cell_type": "code", "execution_count": null, "id": "80fd2721", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_drift\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(t, constant_signal, \"b-\", label=\"Clean Constant\", linewidth=2, alpha=0.7)\n", "axs[0].plot(t, drifted_constant_linear, \"r-\", label=\"With Linear Drift\", linewidth=2)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Linear Drift (sensor warm-up)\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(t, constant_signal, \"b-\", label=\"Clean Constant\", linewidth=2, alpha=0.7)\n", "axs[1].plot(t, drifted_constant_exp, \"g-\", label=\"With Exponential Drift\", linewidth=2)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Exponential Drift (thermal effects)\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "112f340a", "metadata": {}, "source": [ "### Quantization (ADC Resolution Limits)\n", "\n", "Analog-to-digital converters have finite resolution, mapping continuous voltages to discrete levels:\n", "\n", "$$\n", "\\tilde{y}_k = \\Delta \\cdot \\left\\lfloor \\frac{y_k}{\\Delta} + 0.5 \\right\\rfloor\n", "$$\n", "\n", "where $\\Delta = (y_{\\max} - y_{\\min}) / (L - 1)$ is the quantization step and $L$ is the number of levels ($L = 2^n$ for an $n$-bit ADC). This introduces quantization error $|y_k - \\tilde{y}_k| \\leq \\Delta/2$.\n", "\n", "**Hardware source**: finite-bit ADCs\n", "**Common in**: essentially all analog sensors\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_quantization`." ] }, { "cell_type": "code", "execution_count": null, "id": "650840af", "metadata": {}, "outputs": [], "source": [ "T_quant = np.linspace(0, 2, 200)\n", "Y_sine_quant = []\n", "Y_quant_8bit = []\n", "Y_quant_4bit = []\n", "\n", "for tk in T_quant:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply 8-bit quantization in the loop (256 levels)\n", " y_min, y_max = -1.0, 1.0\n", " delta_8 = (y_max - y_min) / (256 - 1)\n", " yk_8bit = delta_8 * np.floor((yk - y_min) / delta_8 + 0.5) + y_min\n", "\n", " # Apply 4-bit quantization in the loop (16 levels)\n", " delta_4 = (y_max - y_min) / (16 - 1)\n", " yk_4bit = delta_4 * np.floor((yk - y_min) / delta_4 + 0.5) + y_min\n", "\n", " Y_sine_quant.append(yk)\n", " Y_quant_8bit.append(yk_8bit)\n", " Y_quant_4bit.append(yk_4bit)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_quant)\n", "quantized_sine_8bit = np.array(Y_quant_8bit)\n", "quantized_sine_4bit = np.array(Y_quant_4bit)" ] }, { "cell_type": "code", "execution_count": null, "id": "ea040001", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_quant\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(\n", " t,\n", " sine_wave,\n", " \"b-\",\n", " label=\"Clean (infinite resolution)\",\n", " linewidth=2,\n", " alpha=0.7,\n", ")\n", "axs[0].plot(t, quantized_sine_8bit, \"r-\", label=\"8-bit (256 levels)\", linewidth=1.5)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"8-bit ADC Quantization\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(\n", " t,\n", " sine_wave,\n", " \"b-\",\n", " label=\"Clean (infinite resolution)\",\n", " linewidth=2,\n", " alpha=0.7,\n", ")\n", "axs[1].plot(t, quantized_sine_4bit, \"g-\", label=\"4-bit (16 levels)\", linewidth=1.5)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"4-bit ADC Quantization (Severe)\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Unique values in clean signal: {len(np.unique(sine_wave))}\")\n", "print(f\"Unique values in 8-bit signal: {len(np.unique(quantized_sine_8bit))}\")\n", "print(f\"Unique values in 4-bit signal: {len(np.unique(quantized_sine_4bit))}\")" ] }, { "cell_type": "markdown", "id": "9f562490", "metadata": {}, "source": [ "### Spikes / Outliers\n", "\n", "Electromagnetic interference and electrical transients cause sporadic large-magnitude errors:\n", "\n", "$$\n", "\\tilde{y}_k = \\begin{cases}\n", "y_k + A \\cdot \\text{randn}() & \\text{with probability } p \\\\\n", "y_k & \\text{with probability } (1-p)\n", "\\end{cases}\n", "$$\n", "\n", "where $p$ is the spike rate and $A$ is the spike magnitude. Unlike Gaussian noise which affects every sample, spikes are rare but severe deviations.\n", "\n", "**Hardware source**: EMI, electrical transients, sensor glitches\n", "**Common in**: unshielded sensors, motors switching, noisy power rails\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_spikes`." ] }, { "cell_type": "code", "execution_count": null, "id": "4fe9492a", "metadata": {}, "outputs": [], "source": [ "T_spike = np.linspace(0, 10, 1000)\n", "Y_sine_spike = []\n", "Y_spiked = []\n", "\n", "np.random.seed(42)\n", "spike_rate = 0.02\n", "spike_magnitude = 3.0\n", "\n", "for tk in T_spike:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply spikes in the loop (probabilistic per-sample)\n", " if np.random.rand() < spike_rate:\n", " yk_spiked = yk + spike_magnitude * np.random.randn()\n", " else:\n", " yk_spiked = yk\n", "\n", " Y_sine_spike.append(yk)\n", " Y_spiked.append(yk_spiked)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_spike)\n", "spiked_sine = np.array(Y_spiked)" ] }, { "cell_type": "code", "execution_count": null, "id": "6c76e075", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_spike\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Clean Signal\", linewidth=2, alpha=0.7)\n", "plt.plot(\n", " t, spiked_sine, \"r-\", label=\"With Spikes (2% rate, 3× magnitude)\", linewidth=1.5\n", ")\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"EMI Spikes / Outliers\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "spike_mask = np.abs(spiked_sine - sine_wave) > 0.5\n", "print(f\"Number of spikes: {spike_mask.sum()}\")\n", "print(f\"Spike rate: {spike_mask.sum() / len(sine_wave):.2%}\")" ] }, { "cell_type": "markdown", "id": "7258bbd6", "metadata": {}, "source": [ "#### Median Filter\n", "\n", "The median filter replaces each sample with the median of its neighbors, providing robust outlier rejection:\n", "\n", "$$\n", "\\hat{y}_k = \\text{median}\\{\\tilde{y}_{k-w/2}, \\ldots, \\tilde{y}_k, \\ldots, \\tilde{y}_{k+w/2}\\}\n", "$$\n", "\n", "where $w$ is the window size. Unlike mean-based filters, the median is insensitive to outliers (up to 50% contamination) and preserves edges better than moving average.\n", "\n", "**Best for**: removing spikes/outliers while preserving edges\n", "\n", "API reference: {func}`pykal.data_change.clean.with_median_filter`." ] }, { "cell_type": "code", "execution_count": null, "id": "990a2df2", "metadata": {}, "outputs": [], "source": [ "T_median = np.linspace(0, 10, 1000)\n", "Y_sine_median = []\n", "\n", "for tk in T_median:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " Y_sine_median.append(yk)\n", "\n", "# Apply spikes and median filter\n", "sine_wave = np.array(Y_sine_median)\n", "spiked_sine_2 = corrupt.with_spikes(\n", " sine_wave, spike_rate=0.03, spike_magnitude=4.0, seed=44\n", ")\n", "cleaned_median = clean.with_median_filter(spiked_sine_2, window=5)" ] }, { "cell_type": "code", "execution_count": null, "id": "8ec19275", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_median\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean Signal\", linewidth=2, alpha=0.5)\n", "plt.plot(\n", " t, spiked_sine_2, \"gray\", label=\"Corrupted (with spikes)\", linewidth=0.5, alpha=0.5\n", ")\n", "plt.plot(t, cleaned_median, \"r-\", label=\"Prepared (Median, window=5)\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Median Filter: Spike Removal\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Recovery error (RMS): {np.sqrt(np.mean((cleaned_median - sine_wave)**2)):.4f}\")" ] }, { "cell_type": "markdown", "id": "fea3030d", "metadata": {}, "source": [ "#### Outlier Removal (Z-Score)\n", "\n", "Statistical outlier detection flags samples exceeding a threshold number of standard deviations from the mean:\n", "\n", "$$\n", "\\text{outlier}_k = |\\tilde{y}_k - \\mu| > \\lambda \\sigma\n", "$$\n", "\n", "where $\\mu$ and $\\sigma$ are robust estimates (median and MAD) and $\\lambda$ is the threshold (typically 2.5-3.0).\n", "\n", "**Strategies**:\n", "- **Replace**: substitute with $\\mu$\n", "- **Interpolate**: fill using neighboring valid samples\n", "\n", "**Best for**: detecting statistical outliers\n", "**Strategies**: replace with robust statistic or interpolate neighbors\n", "\n", "API reference: {func}`pykal.data_change.clean.with_outlier_removal`." ] }, { "cell_type": "code", "execution_count": null, "id": "68a495f3", "metadata": {}, "outputs": [], "source": [ "T_outlier = np.linspace(0, 10, 1000)\n", "Y_sine_outlier = []\n", "\n", "for tk in T_outlier:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " Y_sine_outlier.append(yk)\n", "\n", "# Apply spikes and outlier removal\n", "sine_wave = np.array(Y_sine_outlier)\n", "spiked_sine_3 = corrupt.with_spikes(\n", " sine_wave, spike_rate=0.05, spike_magnitude=5.0, seed=47\n", ")\n", "cleaned_replace = clean.with_outlier_removal(\n", " spiked_sine_3, threshold=2.5, method=\"replace\"\n", ")\n", "cleaned_interp = clean.with_outlier_removal(\n", " spiked_sine_3, threshold=2.5, method=\"interpolate\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "9afd9fee", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_outlier\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "axs[0].plot(\n", " t, spiked_sine_3, \"gray\", label=\"Corrupted (outliers)\", linewidth=0.5, alpha=0.5\n", ")\n", "axs[0].plot(t, cleaned_replace, \"r-\", label=\"Prepared (replace)\", linewidth=1.5)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Outlier Removal: Replace\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "axs[1].plot(\n", " t, spiked_sine_3, \"gray\", label=\"Corrupted (outliers)\", linewidth=0.5, alpha=0.5\n", ")\n", "axs[1].plot(t, cleaned_interp, \"g-\", label=\"Prepared (interpolate)\", linewidth=1.5)\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Outlier Removal: Interpolate\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "a07cf55c", "metadata": {}, "source": [ "### Saturation / Clipping\n", "\n", "Sensors and amplifiers have finite measurement ranges, causing large signals to saturate:\n", "\n", "$$\n", "\\tilde{y}_k = \\begin{cases}\n", "y_{\\min} & \\text{if } y_k < y_{\\min} \\\\\n", "y_k & \\text{if } y_{\\min} \\leq y_k \\leq y_{\\max} \\\\\n", "y_{\\max} & \\text{if } y_k > y_{\\max}\n", "\\end{cases}\n", "$$\n", "\n", "Clipped measurements provide no information about the true signal magnitude beyond the saturation limit, making them unsuitable for control or estimation.\n", "\n", "**Hardware source**: sensor range limits, amplifier saturation\n", "**Common in**: force sensors, ADC rails, mechanical stops\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_clipping`." ] }, { "cell_type": "code", "execution_count": null, "id": "cc0290d7", "metadata": {}, "outputs": [], "source": [ "T_clip = np.linspace(0, 10, 1000)\n", "Y_sine_clip = []\n", "Y_large = []\n", "Y_clipped = []\n", "\n", "lower = -1.0\n", "upper = 1.0\n", "\n", "for tk in T_clip:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Scale signal to exceed limits\n", " yk_large = yk * 2.0\n", "\n", " # Apply clipping in the loop\n", " yk_clipped = np.clip(yk_large, lower, upper)\n", "\n", " Y_sine_clip.append(yk)\n", " Y_large.append(yk_large)\n", " Y_clipped.append(yk_clipped)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_clip)\n", "large_sine = np.array(Y_large)\n", "clipped_sine = np.array(Y_clipped)" ] }, { "cell_type": "code", "execution_count": null, "id": "f3763264", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_clip\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(\n", " t, large_sine, \"b-\", label=\"Clean Signal (exceeds limits)\", linewidth=2, alpha=0.7\n", ")\n", "plt.plot(t, clipped_sine, \"r-\", label=\"Clipped to [-1, 1]\", linewidth=2)\n", "plt.axhline(y=1.0, color=\"k\", linestyle=\"--\", alpha=0.5, label=\"Saturation Limits\")\n", "plt.axhline(y=-1.0, color=\"k\", linestyle=\"--\", alpha=0.5)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Sensor Saturation / Clipping\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "saturated_samples = np.sum((np.abs(clipped_sine) >= 0.99))\n", "print(\n", " f\"Saturated samples: {saturated_samples} ({saturated_samples/len(clipped_sine):.1%})\"\n", ")" ] }, { "cell_type": "markdown", "id": "dbefd37d", "metadata": {}, "source": [ "#### Clipping Recovery\n", "\n", "Clipping recovery detects saturated values and marks them as invalid (NaN):\n", "\n", "$$\n", "\\hat{y}_k = \\begin{cases}\n", "\\text{NaN} & \\text{if } |\\tilde{y}_k - y_{\\min}| < \\epsilon \\text{ or } |\\tilde{y}_k - y_{\\max}| < \\epsilon \\\\\n", "\\tilde{y}_k & \\text{otherwise}\n", "\\end{cases}\n", "$$\n", "\n", "where $\\epsilon$ is a small tolerance. This prevents saturated measurements from corrupting estimators or controllers. Often the right move is not to \"recover\" clipped values, but to **refuse to trust them**.\n", "\n", "**Best for**: detecting saturated values and marking them invalid for estimators\n", "\n", "API reference: {func}`pykal.data_change.clean.with_clipping_recovery`." ] }, { "cell_type": "code", "execution_count": null, "id": "b6faa26a", "metadata": {}, "outputs": [], "source": [ "T_clip_rec = np.linspace(0, 10, 1000)\n", "Y_sine_clip_rec = []\n", "Y_large_2 = []\n", "Y_clipped_2 = []\n", "Y_recovered = []\n", "\n", "lower = -1.0\n", "upper = 1.0\n", "epsilon = 0.01 # Tolerance for clipping detection\n", "\n", "for tk in T_clip_rec:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Scale signal to exceed limits\n", " yk_large = yk * 2.5\n", "\n", " # Apply clipping\n", " yk_clipped = np.clip(yk_large, lower, upper)\n", "\n", " # Apply clipping recovery in the loop\n", " if abs(yk_clipped - lower) < epsilon or abs(yk_clipped - upper) < epsilon:\n", " yk_recovered = np.nan # Mark as invalid\n", " else:\n", " yk_recovered = yk_clipped\n", "\n", " Y_sine_clip_rec.append(yk)\n", " Y_large_2.append(yk_large)\n", " Y_clipped_2.append(yk_clipped)\n", " Y_recovered.append(yk_recovered)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_clip_rec)\n", "large_sine_2 = np.array(Y_large_2)\n", "clipped_sine_2 = np.array(Y_clipped_2)\n", "recovered = np.array(Y_recovered)" ] }, { "cell_type": "code", "execution_count": null, "id": "5d749099", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_clip_rec\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(\n", " t, large_sine_2, \"b-\", label=\"True Signal (exceeds limits)\", linewidth=2, alpha=0.5\n", ")\n", "plt.plot(t, clipped_sine_2, \"r-\", label=\"Clipped Signal\", linewidth=2, alpha=0.7)\n", "plt.plot(t, recovered, \"g.\", label=\"Valid Data (clipped → NaN)\", markersize=2)\n", "plt.axhline(y=1.0, color=\"k\", linestyle=\"--\", alpha=0.5, label=\"Saturation Limits\")\n", "plt.axhline(y=-1.0, color=\"k\", linestyle=\"--\", alpha=0.5)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Clipping Detection and Recovery\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Total samples: {len(clipped_sine_2)}\")\n", "print(f\"Clipped samples detected: {np.isnan(recovered).sum()}\")\n", "print(f\"Valid samples remaining: {(~np.isnan(recovered)).sum()}\")" ] }, { "cell_type": "markdown", "id": "f6daa308", "metadata": {}, "source": [ "### Time Delay / Latency\n", "\n", "Communication and processing introduce delays between measurement and availability:\n", "\n", "$$\n", "\\tilde{y}_k = y_{k-d}\n", "$$\n", "\n", "where $d$ is the delay in samples. In continuous time, this is $\\tilde{y}(t) = y(t - \\tau)$ where $\\tau = d \\cdot \\Delta t$. Delays cause phase lag and can destabilize feedback control systems if not compensated.\n", "\n", "**Hardware source**: comms lag, processing delay, slow sensors\n", "**Common in**: networked sensors, cameras, heavy filtering pipelines\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_delay`." ] }, { "cell_type": "code", "execution_count": null, "id": "08c62c0e", "metadata": {}, "outputs": [], "source": [ "T_delay = np.linspace(0, 4, 400)\n", "Y_step_delay = []\n", "\n", "for tk in T_delay:\n", " yk = 0.0 if tk < 2.0 else 1.0\n", " Y_step_delay.append(yk)\n", "\n", "# Apply delay\n", "step_signal = np.array(Y_step_delay)\n", "delayed_step = corrupt.with_delay(step_signal, delay=50, fill_value=0.0)" ] }, { "cell_type": "code", "execution_count": null, "id": "4c895e45", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_delay\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, step_signal, \"b-\", label=\"Clean Signal\", linewidth=2, alpha=0.7)\n", "plt.plot(t, delayed_step, \"r-\", label=\"Delayed by 50 samples\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Time Delay / Latency\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "86e849c0", "metadata": {}, "source": [ "### Realistic Hardware Pipelines\n", "\n", "The point of `data_change` is not to apply one operator at a time—it's to build pipelines that match real sensors." ] }, { "cell_type": "markdown", "id": "4a0e82d8", "metadata": {}, "source": [ "### Scenario 1: Noisy IMU Accelerometer\n", "\n", "**Issues**: bias + Gaussian noise + occasional EMI spikes\n", "**Pipeline**: calibration → median filter → low-pass filter" ] }, { "cell_type": "code", "execution_count": null, "id": "3c80352b", "metadata": {}, "outputs": [], "source": [ "T_imu = np.linspace(0, 10, 1000)\n", "Y_imu_true = []\n", "Y_imu_raw = []\n", "Y_imu_step1 = []\n", "\n", "# Set random seed for reproducibility\n", "np.random.seed(51)\n", "\n", "# Parameters\n", "bias = 0.5\n", "noise_std = 0.15\n", "spike_rate = 0.01\n", "spike_magnitude = 2.0\n", "calib_offset = 0.5\n", "calib_scale = 1.0\n", "\n", "# Generate clean signal, apply per-sample corruptions, and apply calibration in loop\n", "for tk in T_imu:\n", " # Generate clean signal\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply bias in the loop\n", " yk_biased = yk + bias\n", "\n", " # Apply Gaussian noise in the loop\n", " yk_noisy = yk_biased + np.random.randn() * noise_std\n", "\n", " # Apply spikes in the loop\n", " if np.random.rand() < spike_rate:\n", " yk_raw = yk_noisy + spike_magnitude * np.random.randn()\n", " else:\n", " yk_raw = yk_noisy\n", "\n", " # Apply calibration in the loop\n", " yk_calibrated = calib_scale * (yk_raw - calib_offset)\n", "\n", " Y_imu_true.append(yk)\n", " Y_imu_raw.append(yk_raw)\n", " Y_imu_step1.append(yk_calibrated)\n", "\n", "# Convert to arrays for array-based operations\n", "imu_true = np.array(Y_imu_true)\n", "imu_raw = np.array(Y_imu_raw)\n", "imu_step1 = np.array(Y_imu_step1)\n", "\n", "# Apply median filter (requires window, must be array-based)\n", "imu_step2 = clean.with_median_filter(imu_step1, window=3)\n", "\n", "# Apply low-pass filter in a second loop (recursive operation)\n", "Y_imu_clean = []\n", "y_prev = imu_step2[0] # Initialize with first value\n", "alpha = 0.15\n", "\n", "for yk in imu_step2:\n", " y_filtered = alpha * yk + (1 - alpha) * y_prev\n", " y_prev = y_filtered\n", " Y_imu_clean.append(y_filtered)\n", "\n", "imu_clean = np.array(Y_imu_clean)" ] }, { "cell_type": "code", "execution_count": null, "id": "9225853c", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_imu\n", "fig, axs = plt.subplots(2, 2, figsize=(14, 10))\n", "\n", "axs[0, 0].plot(t, imu_true, \"b-\", label=\"True Signal\", linewidth=2)\n", "axs[0, 0].set_title(\"1. True IMU Reading\", fontsize=12, fontweight=\"bold\")\n", "axs[0, 0].legend()\n", "axs[0, 0].grid(True, alpha=0.3)\n", "\n", "axs[0, 1].plot(\n", " t, imu_raw, \"r-\", label=\"Raw (bias+noise+spikes)\", linewidth=1, alpha=0.7\n", ")\n", "axs[0, 1].set_title(\"2. Raw Corrupted Sensor\", fontsize=12, fontweight=\"bold\")\n", "axs[0, 1].legend()\n", "axs[0, 1].grid(True, alpha=0.3)\n", "\n", "axs[1, 0].plot(\n", " t,\n", " imu_step2,\n", " \"g-\",\n", " label=\"After calibration + spike removal\",\n", " linewidth=1.5,\n", " alpha=0.8,\n", ")\n", "axs[1, 0].set_title(\"3. Intermediate (spikes removed)\", fontsize=12, fontweight=\"bold\")\n", "axs[1, 0].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1, 0].legend()\n", "axs[1, 0].grid(True, alpha=0.3)\n", "\n", "axs[1, 1].plot(t, imu_true, \"b-\", label=\"True Signal\", linewidth=2, alpha=0.5)\n", "axs[1, 1].plot(t, imu_clean, \"m-\", label=\"Final Prepared\", linewidth=2)\n", "axs[1, 1].set_title(\"4. Final Prepared Signal\", fontsize=12, fontweight=\"bold\")\n", "axs[1, 1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1, 1].legend()\n", "axs[1, 1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Raw signal RMS error: {np.sqrt(np.mean((imu_raw - imu_true)**2)):.4f}\")\n", "print(f\"Prepared signal RMS error: {np.sqrt(np.mean((imu_clean - imu_true)**2)):.4f}\")" ] }, { "cell_type": "markdown", "id": "d213440f", "metadata": {}, "source": [ "### Scenario 2: Bouncing Rotary Encoder\n", "\n", "**Issues**: bounce at transitions\n", "**Pipeline**: debounce" ] }, { "cell_type": "code", "execution_count": null, "id": "0e8acc18", "metadata": {}, "outputs": [], "source": [ "T_encoder = np.linspace(0, 4, 400)\n", "Y_encoder_true = []\n", "\n", "for tk in T_encoder:\n", " yk = 0.0 if tk < 2.0 else 1.0\n", " Y_encoder_true.append(yk)\n", "\n", "# Apply encoder corruptions and cleaning\n", "encoder_true = np.array(Y_encoder_true)\n", "encoder_raw = corrupt.with_bounce(encoder_true, duration=10, amplitude=0.4, seed=53)\n", "encoder_clean = clean.with_debounce(encoder_raw, threshold=0.2, min_duration=4)" ] }, { "cell_type": "code", "execution_count": null, "id": "b3ff350e", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_encoder\n", "fig, axs = plt.subplots(3, 1, figsize=(12, 10))\n", "\n", "axs[0].plot(t, encoder_true, \"b-\", label=\"True Encoder State\", linewidth=2)\n", "axs[0].set_title(\"1. True Encoder Signal\", fontsize=12, fontweight=\"bold\")\n", "axs[0].set_ylabel(\"State\", fontsize=11)\n", "axs[0].legend()\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(t, encoder_raw, \"r-\", label=\"Raw with Bounce\", linewidth=1.5)\n", "axs[1].set_title(\"2. Corrupted (Contact Bounce)\", fontsize=12, fontweight=\"bold\")\n", "axs[1].set_ylabel(\"State\", fontsize=11)\n", "axs[1].legend()\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "axs[2].plot(t, encoder_true, \"b-\", label=\"True\", linewidth=2, alpha=0.5)\n", "axs[2].plot(t, encoder_clean, \"g-\", label=\"Prepared (Debounced)\", linewidth=2)\n", "axs[2].set_title(\"3. Prepared (Debounced)\", fontsize=12, fontweight=\"bold\")\n", "axs[2].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[2].set_ylabel(\"State\", fontsize=11)\n", "axs[2].legend()\n", "axs[2].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "65354439", "metadata": {}, "source": [ "### Scenario 3: Wireless Sensor with Packet Loss\n", "\n", "**Issues**: missing samples (NaN)\n", "**Pipelines**:\n", "- offline reconstruction: interpolation\n", "- real-time robustness: hold staleness policy" ] }, { "cell_type": "code", "execution_count": null, "id": "e3888731", "metadata": {}, "outputs": [], "source": [ "T_wireless = np.linspace(0, 10, 1000)\n", "Y_wireless_true = []\n", "Y_wireless_raw = []\n", "\n", "# Set random seed for reproducibility\n", "np.random.seed(54)\n", "\n", "# Parameters\n", "dropout_rate = 0.25\n", "\n", "# Generate clean signal and apply dropouts in the loop\n", "for tk in T_wireless:\n", " # Generate clean signal\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply dropout in the loop (probabilistic per-sample)\n", " if np.random.rand() < dropout_rate:\n", " yk_dropped = np.nan\n", " else:\n", " yk_dropped = yk\n", "\n", " Y_wireless_true.append(yk)\n", " Y_wireless_raw.append(yk_dropped)\n", "\n", "# Convert to arrays\n", "wireless_true = np.array(Y_wireless_true)\n", "wireless_raw = np.array(Y_wireless_raw)\n", "\n", "# Apply interpolation (requires neighbors, must be array-based)\n", "wireless_interpolated = clean.with_interpolation(wireless_raw, method=\"linear\")\n", "\n", "# Apply staleness policy (hold) in a second loop\n", "Y_wireless_hold = []\n", "y_prev_hold = 0.0 # Initialize hold state\n", "\n", "for yk in wireless_raw:\n", " # Hold policy: forward-fill last valid value\n", " if np.isnan(yk):\n", " yk_hold = y_prev_hold\n", " else:\n", " yk_hold = yk\n", " y_prev_hold = yk_hold\n", "\n", " Y_wireless_hold.append(yk_hold)\n", "\n", "wireless_hold = np.array(Y_wireless_hold)" ] }, { "cell_type": "code", "execution_count": null, "id": "051e7614", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_wireless\n", "fig, axs = plt.subplots(3, 1, figsize=(12, 10))\n", "\n", "axs[0].plot(t, wireless_true, \"b-\", label=\"True Signal\", linewidth=2)\n", "axs[0].set_title(\"1. True Sensor Reading\", fontsize=12, fontweight=\"bold\")\n", "axs[0].legend()\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(t, wireless_raw, \"r.\", label=\"Received (25% packet loss)\", markersize=3)\n", "axs[1].set_title(\"2. Corrupted (Wireless Dropouts)\", fontsize=12, fontweight=\"bold\")\n", "axs[1].legend()\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "axs[2].plot(t, wireless_true, \"b-\", label=\"True\", linewidth=2, alpha=0.3)\n", "axs[2].plot(\n", " t,\n", " wireless_interpolated,\n", " \"g-\",\n", " label=\"Prepared (interpolated)\",\n", " linewidth=1.5,\n", " alpha=0.8,\n", ")\n", "axs[2].plot(\n", " t, wireless_hold, \"m--\", label=\"Prepared (hold policy)\", linewidth=1.5, alpha=0.8\n", ")\n", "axs[2].set_title(\"3. Prepared (Two Strategies)\", fontsize=12, fontweight=\"bold\")\n", "axs[2].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[2].legend()\n", "axs[2].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(\n", " f\"Interpolated RMS error: {np.sqrt(np.mean((wireless_interpolated - wireless_true)**2)):.4f}\"\n", ")\n", "print(\n", " f\"Hold policy RMS error: {np.sqrt(np.mean((wireless_hold - wireless_true)**2)):.4f}\"\n", ")" ] }, { "cell_type": "markdown", "id": "5d4096f1", "metadata": {}, "source": [ "[← Modules](../../../getting_started/theory_to_python/modules.rst)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }