{ "cells": [ { "cell_type": "markdown", "id": "30d19eb0", "metadata": {}, "source": [ "[← Modules](../../../getting_started/theory_to_python/modules.rst)" ] }, { "cell_type": "markdown", "id": "120f94dd", "metadata": {}, "source": [ "# DataChange\n", "\n", "The `data_change` module provides tools to model realistic sensor data. In particular, it allows you to:\n", "\n", "- **Corrupt ideal signals** to simulate realistic hardware behavior via the `data_change.corrupt` subclass\n", "- **Clean corrupted signals** for downstream use in estimators and controllers via the `data_change.clean` subclass\n", "\n", "This notebook demonstrates all corruption and preparation methods available in `pykal.data_change`, using visual examples to illustrate how each transformation affects the underlying signal." ] }, { "cell_type": "markdown", "id": "706cdbea", "metadata": {}, "source": [ "## References and Examples\n", "\n", " We first generate a representative set of reference signals using the `DynamicalSystem` class. In the remaining sections, we demonstrate corrupting these signals via\n", "\n", "- **Gaussian noise** (thermal noise / ADC noise)\n", "- **Bias offsets** (uncalibrated zero errors)\n", "- **Drift** (time-varying bias from warm-up / temperature)\n", "- **Quantization** (finite ADC resolution)\n", "- **Spikes / outliers** (EMI glitches, electrical transients)\n", "- **Clipping / saturation** (sensor range limits)\n", "- **Packet loss / dropouts** (missing samples, intermittent links)\n", "- **Contact bounce** (encoders, switches, digital edges)\n", "- **Latency / delay** (communication + processing lag)\n", "\n", "and then, when cleaning operations are available, demonstrating how to clean the corrupted signals, using such methods as\n", "\n", "- **moving average / low-pass filtering** (noise suppression)\n", "- **median filtering** (outlier rejection without blurring edges)\n", "- **exponential smoothing** (real-time filtering)\n", "- **debouncing** (stable digital transitions)\n", "- **outlier handling** (replace or interpolate)\n", "- **interpolation** (filling missing samples)\n", "- **staleness policies** for **asynchronous sensors** (hold/zero/drop/none)\n", "- **calibration** (bias + scale correction)" ] }, { "cell_type": "markdown", "id": "463b736e", "metadata": {}, "source": [ "### Generate Reference Signals" ] }, { "cell_type": "code", "execution_count": null, "id": "8ddbefc5", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from pykal import DynamicalSystem\n", "\n", "# -----------------------------\n", "# Reference signals (stateless)\n", "# -----------------------------\n", "\n", "\n", "def sine_wave(tk: float, f_hz: float, amplitude: float, phase: float) -> float:\n", " \"\"\"y(t) = A sin(2π f t + φ)\"\"\"\n", " return amplitude * np.sin(2.0 * np.pi * f_hz * tk + phase)\n", "\n", "\n", "def ramp(tk: float, t0: float, t1: float, y0: float, y1: float) -> float:\n", " \"\"\"Linear ramp from (t0, y0) to (t1, y1), clamped outside interval.\"\"\"\n", " if tk <= t0:\n", " return y0\n", " if tk >= t1:\n", " return y1\n", " s = (tk - t0) / (t1 - t0)\n", " return (1.0 - s) * y0 + s * y1\n", "\n", "\n", "def constant(c: float) -> float:\n", " \"\"\"y(t) = c\"\"\"\n", " return c\n", "\n", "\n", "def pwm(tk: float, f_hz: float, duty: float, low: float, high: float) -> float:\n", " \"\"\"\n", " Pulse-width modulation (PWM).\n", " \"\"\"\n", " T = 1.0 / f_hz\n", " phase = tk % T\n", " return high if phase < duty * T else low\n", "\n", "\n", "# Wrapping signals in DynamicalSystem objects\n", "ref_sine_wave = DynamicalSystem(h=sine_wave)\n", "ref_constant = DynamicalSystem(h=constant)\n", "ref_ramp = DynamicalSystem(h=ramp)\n", "ref_pwm = DynamicalSystem(h=pwm)\n", "\n", "# Simulating signals over time\n", "\n", "T = np.linspace(0, 10, 1000)\n", "\n", "Y_sin = []\n", "Y_const = []\n", "Y_ramp = []\n", "Y_pwm = []\n", "\n", "for tk in T:\n", " Y_sin.append(\n", " ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " )\n", " Y_const.append(ref_constant.step(params={\"c\": 1}))\n", " Y_ramp.append(\n", " ref_ramp.step(params={\"tk\": tk, \"t0\": 2.0, \"t1\": 8.0, \"y0\": 0.0, \"y1\": 1.0})\n", " )\n", " Y_pwm.append(\n", " ref_pwm.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"duty\": 0.5, \"low\": 0.0, \"high\": 1.0}\n", " )\n", " )" ] }, { "cell_type": "code", "execution_count": null, "id": "370467eb", "metadata": { "lines_to_next_cell": 0, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# -----------------------------\n", "# Visualize signals\n", "# -----------------------------\n", "fig, axs = plt.subplots(2, 2, figsize=(14, 8))\n", "\n", "axs[0, 0].plot(T, Y_sin, linewidth=2)\n", "axs[0, 0].set_title(\"Sine Wave\", fontweight=\"bold\")\n", "axs[0, 0].set_ylabel(\"Amplitude\")\n", "axs[0, 0].grid(True, alpha=0.3)\n", "\n", "axs[0, 1].plot(T, Y_pwm, linewidth=2)\n", "axs[0, 1].set_title(\"PWM Signal\", fontweight=\"bold\")\n", "axs[0, 1].grid(True, alpha=0.3)\n", "\n", "axs[1, 0].plot(T, Y_ramp, linewidth=2)\n", "axs[1, 0].set_title(\"Ramp Signal\", fontweight=\"bold\")\n", "axs[1, 0].set_xlabel(\"Time (s)\")\n", "axs[1, 0].set_ylabel(\"Amplitude\")\n", "axs[1, 0].grid(True, alpha=0.3)\n", "\n", "axs[1, 1].plot(T, Y_const, linewidth=2)\n", "axs[1, 1].set_title(\"Constant Signal\", fontweight=\"bold\")\n", "axs[1, 1].set_xlabel(\"Time (s)\")\n", "axs[1, 1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "a1a5d3d0", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Gaussian Noise\n", "Thanks to the [Central Limit Theorem](https://en.wikipedia.org/wiki/Central_limit_theorem#Applications_and_examples), sensor noise can often be modeled as the true sensor value plus zero-mean Gaussian noise. That is, the noisy output of the sensor, which we denote $\\tilde{y}_k$, is\n", "\n", "$$\n", "\\tilde{y}_k = y_k + r_k\n", "$$\n", "\n", "where $r_k \\sim \\mathcal{N}\\bigl(0,\\, \\sigma \\bigr)$.\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_gaussian_noise`." ] }, { "cell_type": "code", "execution_count": null, "id": "1a722e6e", "metadata": {}, "outputs": [], "source": [ "from pykal.data_change import corrupt, clean\n", "\n", "Y_clean_sin = []\n", "Y_noisy_sin = []\n", "\n", "for tk in T:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_corrupted = corrupt.with_gaussian_noise(yk, mean=0.0, std=0.2)\n", "\n", " Y_clean_sin.append(yk)\n", " Y_noisy_sin.append(yk_corrupted)" ] }, { "cell_type": "code", "execution_count": null, "id": "99793fc1", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Convert to NumPy arrays\n", "Y_clean_sin = np.asarray(Y_clean_sin)\n", "Y_noisy_sin = np.asarray(Y_noisy_sin)\n", "\n", "# -----------------------------\n", "# Plot clean vs noisy\n", "# -----------------------------\n", "plt.figure(figsize=(12, 5))\n", "\n", "plt.plot(\n", " T,\n", " Y_clean_sin,\n", " \"b-\",\n", " label=\"Clean Signal\",\n", " linewidth=2,\n", " alpha=0.7,\n", ")\n", "\n", "plt.plot(\n", " T,\n", " Y_noisy_sin,\n", " \"r-\",\n", " label=r\"With Gaussian Noise ($\\sigma=0.2$)\",\n", " linewidth=1,\n", " alpha=0.6,\n", ")\n", "\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Gaussian Noise Corruption\", fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "# -----------------------------\n", "# Noise statistics\n", "# -----------------------------\n", "print(f\"Original std: {np.std(Y_clean_sin):.4f}\")\n", "print(f\"Corrupted std: {np.std(Y_noisy_sin):.4f}\")\n", "print(f\"Noise added: {np.std(Y_noisy_sin - Y_clean_sin):.4f}\")" ] }, { "cell_type": "markdown", "id": "4ec3cc60", "metadata": {}, "source": [ "**Multivariate Gaussian Noise** (with covariance matrix):\n", "\n", "For vector-valued sensors (e.g., 3-axis IMU), noise is often correlated across channels. The noisy measurement is\n", "\n", "$$\n", "\\tilde{\\mathbf{y}}_k = \\mathbf{y}_k + \\mathbf{r}_k\n", "$$\n", "\n", "where $\\mathbf{r}_k \\sim \\mathcal{N}(\\mathbf{0}, \\mathbf{Q})$ and $\\mathbf{Q}$ is the covariance matrix capturing both individual channel noise levels and cross-channel correlations.\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_gaussian_noise` (with `Q` parameter)." ] }, { "cell_type": "code", "execution_count": null, "id": "2caaa09d", "metadata": {}, "outputs": [], "source": [ "# 3D IMU example: accelerometer with different noise per axis\n", "# Covariance: x low noise, y medium, z high (with small correlations)\n", "Q = np.array([[0.01, 0.002, 0.0], [0.002, 0.05, 0.01], [0.0, 0.01, 0.1]])\n", "\n", "# Generate 100 samples from three reference signals\n", "T_imu = np.linspace(0, 10, 100)\n", "IMU_clean = []\n", "IMU_noisy = []\n", "\n", "for i, tk in enumerate(T_imu):\n", " # Generate clean 3D signal (sine, ramp, constant)\n", " x_clean = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " y_clean = ref_ramp.step(\n", " params={\"tk\": tk, \"t0\": 2.0, \"t1\": 8.0, \"y0\": 0.0, \"y1\": 1.0}\n", " )\n", " z_clean = ref_constant.step(params={\"c\": 1})\n", "\n", " clean_vec = np.array([x_clean, y_clean, z_clean])\n", " # Apply multivariate Gaussian noise in the loop\n", " noisy_vec = corrupt.with_gaussian_noise(clean_vec, Q=Q, seed=i)\n", "\n", " IMU_clean.append(clean_vec)\n", " IMU_noisy.append(noisy_vec)" ] }, { "cell_type": "code", "execution_count": null, "id": "1bcc76d6", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "\n", "# Convert to NumPy arrays for plotting\n", "imu_data = np.array(IMU_clean)\n", "noisy_imu = np.array(IMU_noisy)\n", "\n", "fig, axs = plt.subplots(1, 3, figsize=(15, 4))\n", "axes_names = [\"X Axis (low noise)\", \"Y Axis (medium noise)\", \"Z Axis (high noise)\"]\n", "\n", "for i in range(3):\n", " axs[i].plot(imu_data[:, i], \"b-\", label=\"Clean\", linewidth=2, alpha=0.7)\n", " axs[i].plot(noisy_imu[:, i], \"r-\", label=\"Noisy\", linewidth=1, alpha=0.6)\n", " axs[i].set_title(axes_names[i], fontsize=12, fontweight=\"bold\")\n", " axs[i].set_xlabel(\"Sample\", fontsize=10)\n", " axs[i].legend()\n", " axs[i].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "f1aad81f", "metadata": {}, "source": [ "#### Moving Average Filter\n", "\n", "The moving average filter smooths noisy signals by replacing each sample with the average of its neighbors:\n", "\n", "$$\n", "\\hat{y}_k = \\frac{1}{w} \\sum_{i=0}^{w-1} \\tilde{y}_{k-i}\n", "$$\n", "\n", "where $w$ is the window size. This reduces high-frequency noise but introduces phase lag proportional to $w/2$.\n", "\n", "**Best for**: reducing Gaussian noise\n", "**Tradeoff**: introduces lag, reduces bandwidth\n", "\n", "API reference: {func}`pykal.data_change.clean.with_moving_average`." ] }, { "cell_type": "code", "execution_count": null, "id": "fdc94868", "metadata": {}, "outputs": [], "source": [ "T_ma = np.linspace(0, 10, 1000)\n", "Y_original = []\n", "Y_noisy = []\n", "Y_smoothed = []\n", "\n", "for tk in T_ma:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_noisy = corrupt.with_gaussian_noise(yk, std=0.3)\n", "\n", " Y_original.append(yk)\n", " Y_noisy.append(yk_noisy)\n", "\n", "# Apply moving average after generation\n", "Y_noisy_arr = np.array(Y_noisy)\n", "Y_smoothed = clean.with_moving_average(Y_noisy_arr, window=10)" ] }, { "cell_type": "code", "execution_count": null, "id": "4f3e7787", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Convert to arrays for plotting\n", "sine_wave = np.array(Y_original)\n", "noisy_sine_2 = np.array(Y_noisy)\n", "smoothed_ma = Y_smoothed\n", "t = T_ma\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean Signal\", linewidth=2, alpha=0.5)\n", "plt.plot(\n", " t,\n", " noisy_sine_2,\n", " \"gray\",\n", " label=\"Corrupted (Gaussian noise)\",\n", " linewidth=0.5,\n", " alpha=0.5,\n", ")\n", "plt.plot(t, smoothed_ma, \"r-\", label=\"Prepared (Moving Avg, window=10)\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Moving Average Denoising\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Corrupted signal std: {np.std(noisy_sine_2):.4f}\")\n", "print(f\"Prepared signal std: {np.std(smoothed_ma):.4f}\")\n", "print(f\"Recovery error (RMS): {np.sqrt(np.mean((smoothed_ma - sine_wave)**2)):.4f}\")" ] }, { "cell_type": "markdown", "id": "54cca937", "metadata": {}, "source": [ "#### Exponential Smoothing\n", "\n", "Exponential smoothing is a recursive filter ideal for real-time applications:\n", "\n", "$$\n", "\\hat{y}_k = \\alpha \\tilde{y}_k + (1 - \\alpha) \\hat{y}_{k-1}\n", "$$\n", "\n", "where $\\alpha \\in [0,1]$ controls the smoothing strength. Larger $\\alpha$ responds faster but smooths less; smaller $\\alpha$ provides more smoothing but reacts slower to changes.\n", "\n", "**Best for**: real-time filtering\n", "**Parameter**: $\\alpha \\in [0,1]$\n", "\n", "API reference: {func}`pykal.data_change.clean.with_exponential_smoothing`." ] }, { "cell_type": "code", "execution_count": null, "id": "78586491", "metadata": {}, "outputs": [], "source": [ "T_exp = np.linspace(0, 10, 1000)\n", "Y_clean_exp = []\n", "Y_noisy_exp = []\n", "Y_smoothed_03 = []\n", "Y_smoothed_05 = []\n", "Y_smoothed_08 = []\n", "\n", "# Initialize exponential smoothing state for each alpha\n", "y_prev_03 = 0.0\n", "y_prev_05 = 0.0\n", "y_prev_08 = 0.0\n", "\n", "for tk in T_exp:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_noisy = corrupt.with_gaussian_noise(yk, std=0.25)\n", "\n", " # Apply exponential smoothing in the loop (recursive)\n", " y_smooth_03 = 0.3 * yk_noisy + (1 - 0.3) * y_prev_03\n", " y_smooth_05 = 0.5 * yk_noisy + (1 - 0.5) * y_prev_05\n", " y_smooth_08 = 0.8 * yk_noisy + (1 - 0.8) * y_prev_08\n", "\n", " # Update previous values\n", " y_prev_03 = y_smooth_03\n", " y_prev_05 = y_smooth_05\n", " y_prev_08 = y_smooth_08\n", "\n", " Y_clean_exp.append(yk)\n", " Y_noisy_exp.append(yk_noisy)\n", " Y_smoothed_03.append(y_smooth_03)\n", " Y_smoothed_05.append(y_smooth_05)\n", " Y_smoothed_08.append(y_smooth_08)\n", "\n", "# Convert to arrays\n", "smoothed_alpha_03 = np.array(Y_smoothed_03)\n", "smoothed_alpha_05 = np.array(Y_smoothed_05)\n", "smoothed_alpha_08 = np.array(Y_smoothed_08)" ] }, { "cell_type": "code", "execution_count": null, "id": "b735cd8d", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "sine_wave = np.array(Y_clean_exp)\n", "noisy_sine_3 = np.array(Y_noisy_exp)\n", "t = T_exp\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "plt.plot(t, noisy_sine_3, \"gray\", label=\"Corrupted\", linewidth=0.5, alpha=0.3)\n", "plt.plot(\n", " t,\n", " smoothed_alpha_03,\n", " \"r-\",\n", " label=\"α=0.3 (heavy smoothing)\",\n", " linewidth=1.5,\n", " alpha=0.8,\n", ")\n", "plt.plot(t, smoothed_alpha_05, \"g-\", label=\"α=0.5 (moderate)\", linewidth=1.5, alpha=0.8)\n", "plt.plot(t, smoothed_alpha_08, \"m-\", label=\"α=0.8 (light)\", linewidth=1.5, alpha=0.8)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Exponential Smoothing with Different α\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "8ebd5a55", "metadata": {}, "source": [ "#### Low-Pass Filter (First-Order)\n", "\n", "A first-order low-pass filter attenuates high-frequency components while passing low frequencies:\n", "\n", "$$\n", "\\hat{y}_k = \\alpha \\tilde{y}_k + (1 - \\alpha) \\hat{y}_{k-1}\n", "$$\n", "\n", "Equivalent to exponential smoothing, but interpreted in the frequency domain with cutoff frequency $f_c = \\alpha f_s / (2\\pi)$ where $f_s$ is the sampling rate.\n", "\n", "**Best for**: attenuating high-frequency noise\n", "**Parameter**: $\\alpha \\in [0,1]$\n", "\n", "API reference: {func}`pykal.data_change.clean.with_low_pass_filter`." ] }, { "cell_type": "code", "execution_count": null, "id": "d9b0f7dd", "metadata": {}, "outputs": [], "source": [ "T_lp = np.linspace(0, 10, 1000)\n", "Y_clean_lp = []\n", "Y_noisy_lp = []\n", "Y_filtered = []\n", "\n", "# Initialize low-pass filter state\n", "y_prev = 0.0\n", "alpha = 0.1\n", "\n", "for tk in T_lp:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " yk_noisy = corrupt.with_gaussian_noise(yk, std=0.3)\n", "\n", " # Apply low-pass filter in the loop (recursive)\n", " y_filtered = alpha * yk_noisy + (1 - alpha) * y_prev\n", " y_prev = y_filtered\n", "\n", " Y_clean_lp.append(yk)\n", " Y_noisy_lp.append(yk_noisy)\n", " Y_filtered.append(y_filtered)\n", "\n", "# Convert to array\n", "filtered_lp = np.array(Y_filtered)" ] }, { "cell_type": "code", "execution_count": null, "id": "4bd67e6b", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "sine_wave = np.array(Y_clean_lp)\n", "noisy_sine_4 = np.array(Y_noisy_lp)\n", "t = T_lp\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "plt.plot(t, noisy_sine_4, \"gray\", label=\"Corrupted (noise)\", linewidth=0.5, alpha=0.5)\n", "plt.plot(t, filtered_lp, \"r-\", label=\"Prepared (low-pass α=0.1)\", linewidth=2)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Low-Pass Filter (First-Order)\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "702660c2", "metadata": {}, "source": [ "### Contact Bounce\n", "\n", "Mechanical switches and encoders exhibit contact bounce—rapid oscillations near transitions before settling to a stable value:\n", "\n", "$$\n", "\\tilde{y}_k = \\begin{cases}\n", "y_k + A \\cdot \\text{noise}_k & \\text{if } |k - k_{\\text{transition}}| < d \\\\\n", "y_k & \\text{otherwise}\n", "\\end{cases}\n", "$$\n", "\n", "where $A$ is the bounce amplitude and $d$ is the bounce duration in samples. This non-ideal behavior occurs due to mechanical spring forces causing contacts to vibrate.\n", "\n", "**Hardware source**: mechanical switches and encoders that chatter near transitions\n", "**Common in**: digital inputs, rotary encoders, limit switches\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_bounce`." ] }, { "cell_type": "code", "execution_count": null, "id": "cb4b4265", "metadata": {}, "outputs": [], "source": [ "T_bounce = np.linspace(0, 4, 400)\n", "Y_step = []\n", "Y_bounced = []\n", "\n", "for tk in T_bounce:\n", " # Generate step signal: 0 before t=2, 1 after\n", " yk = 0.0 if tk < 2.0 else 1.0\n", "\n", " Y_step.append(yk)\n", "\n", "# Apply bounce corruption after generation\n", "step_signal = np.array(Y_step)\n", "bounced_step = corrupt.with_bounce(step_signal, duration=5, amplitude=0.3, seed=42)" ] }, { "cell_type": "code", "execution_count": null, "id": "d78095cd", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_bounce\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(t, step_signal, \"b-\", label=\"Clean Step\", linewidth=2)\n", "axs[0].plot(t, bounced_step, \"r-\", label=\"With Bounce\", linewidth=1.5, alpha=0.8)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Contact Bounce (Full Signal)\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "zoom_start, zoom_end = 195, 215\n", "axs[1].plot(\n", " t[zoom_start:zoom_end],\n", " step_signal[zoom_start:zoom_end],\n", " \"b-\",\n", " label=\"Clean Step\",\n", " linewidth=3,\n", ")\n", "axs[1].plot(\n", " t[zoom_start:zoom_end],\n", " bounced_step[zoom_start:zoom_end],\n", " \"r-\",\n", " label=\"With Bounce\",\n", " linewidth=2,\n", " alpha=0.8,\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Contact Bounce (Zoomed Transition)\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "51217db8", "metadata": {}, "source": [ "#### Debounce\n", "\n", "Debouncing removes contact bounce by requiring signal stability for a minimum duration before accepting a transition:\n", "\n", "$$\n", "\\hat{y}_k = \\begin{cases}\n", "\\tilde{y}_k & \\text{if } |\\tilde{y}_j - \\tilde{y}_{j-1}| < \\epsilon \\text{ for all } j \\in [k-d, k] \\\\\n", "\\hat{y}_{k-1} & \\text{otherwise}\n", "\\end{cases}\n", "$$\n", "\n", "where $\\epsilon$ is the stability threshold and $d$ is the minimum stable duration. This implements a form of temporal hysteresis.\n", "\n", "**Best for**: removing contact bounce from digital signals\n", "**Idea**: require stability for a minimum duration before accepting transitions\n", "\n", "API reference: {func}`pykal.data_change.clean.with_debounce`." ] }, { "cell_type": "code", "execution_count": null, "id": "748fa218", "metadata": {}, "outputs": [], "source": [ "T_debounce = np.linspace(0, 4, 400)\n", "Y_step_debounce = []\n", "\n", "for tk in T_debounce:\n", " yk = 0.0 if tk < 2.0 else 1.0\n", " Y_step_debounce.append(yk)\n", "\n", "# Apply bounce and debounce\n", "step_signal = np.array(Y_step_debounce)\n", "bounced_step_2 = corrupt.with_bounce(step_signal, duration=8, amplitude=0.4, seed=46)\n", "debounced = clean.with_debounce(bounced_step_2, threshold=0.2, min_duration=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "2ec1a68b", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_debounce\n", "zoom_start, zoom_end = 195, 230\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(\n", " t[zoom_start:zoom_end],\n", " step_signal[zoom_start:zoom_end],\n", " \"b-\",\n", " label=\"Original Clean\",\n", " linewidth=3,\n", " alpha=0.5,\n", ")\n", "plt.plot(\n", " t[zoom_start:zoom_end],\n", " bounced_step_2[zoom_start:zoom_end],\n", " \"gray\",\n", " label=\"Corrupted (bounce)\",\n", " linewidth=1.5,\n", " alpha=0.7,\n", ")\n", "plt.plot(\n", " t[zoom_start:zoom_end],\n", " debounced[zoom_start:zoom_end],\n", " \"r-\",\n", " label=\"Prepared (debounced)\",\n", " linewidth=2,\n", ")\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Debounce Filter (Zoomed Transition)\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "0336c6a2", "metadata": {}, "source": [ "### Dropouts (Packet Loss)\n", "\n", "Wireless communication and intermittent sensor connections cause random samples to be missing:\n", "\n", "$$\n", "\\tilde{y}_k = \\begin{cases}\n", "y_k & \\text{with probability } (1 - p) \\\\\n", "\\text{NaN} & \\text{with probability } p\n", "\\end{cases}\n", "$$\n", "\n", "where $p$ is the dropout rate. Missing samples are marked as NaN and must be handled by downstream processing.\n", "\n", "**Hardware source**: wireless links, intermittent buses, sensor hiccups\n", "**Common in**: WiFi sensors, BLE devices, multi-robot comms\n", "\n", "API reference: {func}`pykal.data_change.corrupt.with_dropouts`." ] }, { "cell_type": "code", "execution_count": null, "id": "360aa4f5", "metadata": {}, "outputs": [], "source": [ "T_dropout = np.linspace(0, 10, 1000)\n", "Y_sine_dropout = []\n", "Y_dropped = []\n", "\n", "np.random.seed(42)\n", "dropout_rate = 0.2\n", "\n", "for tk in T_dropout:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", "\n", " # Apply dropout in the loop (probabilistic per-sample)\n", " if np.random.rand() < dropout_rate:\n", " yk_dropped = np.nan\n", " else:\n", " yk_dropped = yk\n", "\n", " Y_sine_dropout.append(yk)\n", " Y_dropped.append(yk_dropped)\n", "\n", "# Convert to arrays\n", "sine_wave = np.array(Y_sine_dropout)\n", "dropped_sine = np.array(Y_dropped)" ] }, { "cell_type": "code", "execution_count": null, "id": "c00e028a", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_dropout\n", "\n", "plt.figure(figsize=(12, 5))\n", "plt.plot(t, sine_wave, \"b-\", label=\"Clean Signal\", linewidth=2, alpha=0.5)\n", "plt.plot(t, dropped_sine, \"r.\", label=\"With 20% Dropouts (NaN)\", markersize=3)\n", "plt.xlabel(\"Time (s)\", fontsize=12)\n", "plt.ylabel(\"Amplitude\", fontsize=12)\n", "plt.title(\"Packet Loss / Dropouts\", fontsize=14, fontweight=\"bold\")\n", "plt.legend(fontsize=11)\n", "plt.grid(True, alpha=0.3)\n", "plt.show()\n", "\n", "print(f\"Total samples: {len(sine_wave)}\")\n", "print(f\"Dropped samples: {np.isnan(dropped_sine).sum()}\")\n", "print(f\"Dropout rate: {np.isnan(dropped_sine).sum() / len(sine_wave):.2%}\")" ] }, { "cell_type": "markdown", "id": "eb0d2eb3", "metadata": {}, "source": [ "#### Interpolation (Fill Missing Data)\n", "\n", "Interpolation fills missing samples (NaN values) using neighboring valid data:\n", "\n", "**Linear interpolation**:\n", "$$\n", "\\hat{y}_k = y_i + \\frac{k - i}{j - i}(y_j - y_i)\n", "$$\n", "where $i < k < j$ and $y_i, y_j$ are the nearest valid samples.\n", "\n", "**Nearest-neighbor interpolation**:\n", "$$\n", "\\hat{y}_k = y_{\\arg\\min_i |k - i|}\n", "$$\n", "\n", "**Best for**: dropouts (NaN values)\n", "**Methods**: linear (smooth), nearest-neighbor (preserves steps)\n", "\n", "API reference: {func}`pykal.data_change.clean.with_interpolation`." ] }, { "cell_type": "code", "execution_count": null, "id": "a027a7a3", "metadata": {}, "outputs": [], "source": [ "T_interp = np.linspace(0, 10, 1000)\n", "Y_sine_interp = []\n", "\n", "for tk in T_interp:\n", " yk = ref_sine_wave.step(\n", " params={\"tk\": tk, \"f_hz\": 1.0, \"amplitude\": 1.0, \"phase\": 0.0}\n", " )\n", " Y_sine_interp.append(yk)\n", "\n", "# Apply dropouts and interpolation\n", "sine_wave = np.array(Y_sine_interp)\n", "dropped_sine_2 = corrupt.with_dropouts(sine_wave, dropout_rate=0.15, seed=48)\n", "filled_linear = clean.with_interpolation(dropped_sine_2, method=\"linear\")\n", "filled_nearest = clean.with_interpolation(dropped_sine_2, method=\"nearest\")" ] }, { "cell_type": "code", "execution_count": null, "id": "48d6a8c6", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "t = T_interp\n", "\n", "fig, axs = plt.subplots(1, 2, figsize=(14, 5))\n", "\n", "axs[0].plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "axs[0].plot(t, dropped_sine_2, \"r.\", label=\"Corrupted (dropouts)\", markersize=3)\n", "axs[0].plot(t, filled_linear, \"g-\", label=\"Prepared (linear)\", linewidth=1.5, alpha=0.8)\n", "axs[0].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Linear Interpolation\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "axs[1].plot(t, sine_wave, \"b-\", label=\"Original Clean\", linewidth=2, alpha=0.5)\n", "axs[1].plot(t, dropped_sine_2, \"r.\", label=\"Corrupted (dropouts)\", markersize=3)\n", "axs[1].plot(\n", " t, filled_nearest, \"m-\", label=\"Prepared (nearest)\", linewidth=1.5, alpha=0.8\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Nearest-Neighbor Interpolation\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Dropped samples: {np.isnan(dropped_sine_2).sum()}\")\n", "print(f\"Remaining NaN after linear interpolation: {np.isnan(filled_linear).sum()}\")" ] }, { "cell_type": "markdown", "id": "732e0aaf", "metadata": {}, "source": [ "### Asynchronous / Multi-Rate Data\n", "\n", "**Hardware source**: sensors sampling at different rates, non-uniform timing\n", "**Common in**: sensor fusion (IMU @ 100Hz, GPS @ 10Hz, camera @ 30Hz)" ] }, { "cell_type": "code", "execution_count": null, "id": "d6fb5335", "metadata": {}, "outputs": [], "source": [ "# Simulate three sensors with different sampling rates\n", "t_fast = np.arange(0, 2, 0.01) # 100 Hz\n", "t_medium = np.arange(0, 2, 0.033) # ~30 Hz\n", "t_slow = np.arange(0, 2, 0.1) # 10 Hz\n", "\n", "# Generate fast sensor data\n", "signal_fast = []\n", "for tk in t_fast:\n", " yk = np.sin(2 * np.pi * 1.5 * tk)\n", " signal_fast.append(yk)\n", "signal_fast = np.array(signal_fast)\n", "signal_fast_noisy = corrupt.with_gaussian_noise(signal_fast, std=0.05, seed=60)\n", "\n", "# Generate medium sensor data\n", "signal_medium = []\n", "for tk in t_medium:\n", " yk = np.sin(2 * np.pi * 1.5 * tk)\n", " signal_medium.append(yk)\n", "signal_medium = np.array(signal_medium)\n", "signal_medium_noisy = corrupt.with_gaussian_noise(signal_medium, std=0.08, seed=61)\n", "\n", "# Generate slow sensor data\n", "signal_slow = []\n", "for tk in t_slow:\n", " yk = np.sin(2 * np.pi * 1.5 * tk)\n", " signal_slow.append(yk)\n", "signal_slow = np.array(signal_slow)\n", "signal_slow_noisy = corrupt.with_gaussian_noise(signal_slow, std=0.1, seed=62)" ] }, { "cell_type": "code", "execution_count": null, "id": "66a98b02", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(2, 1, figsize=(12, 8))\n", "\n", "# Top: show different sampling rates\n", "axs[0].plot(t_fast, signal_fast, \"b-\", linewidth=2, alpha=0.3, label=\"True Signal\")\n", "axs[0].plot(\n", " t_fast,\n", " signal_fast_noisy,\n", " \"r.\",\n", " markersize=3,\n", " label=\"Fast Sensor (100Hz)\",\n", " alpha=0.6,\n", ")\n", "axs[0].plot(\n", " t_medium,\n", " signal_medium_noisy,\n", " \"g^\",\n", " markersize=5,\n", " label=\"Medium Sensor (~30Hz)\",\n", " alpha=0.7,\n", ")\n", "axs[0].plot(\n", " t_slow, signal_slow_noisy, \"mo\", markersize=7, label=\"Slow Sensor (10Hz)\", alpha=0.8\n", ")\n", "axs[0].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[0].set_title(\"Asynchronous Multi-Rate Sensors\", fontsize=14, fontweight=\"bold\")\n", "axs[0].legend(fontsize=11)\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Bottom: zoom in to see timing differences\n", "zoom_end = 0.3\n", "mask_fast = t_fast <= zoom_end\n", "mask_medium = t_medium <= zoom_end\n", "mask_slow = t_slow <= zoom_end\n", "\n", "axs[1].plot(\n", " t_fast[mask_fast],\n", " signal_fast[mask_fast],\n", " \"b-\",\n", " linewidth=2,\n", " alpha=0.3,\n", " label=\"True Signal\",\n", ")\n", "axs[1].plot(\n", " t_fast[mask_fast],\n", " signal_fast_noisy[mask_fast],\n", " \"r.\",\n", " markersize=6,\n", " label=\"Fast (100Hz)\",\n", " alpha=0.6,\n", ")\n", "axs[1].plot(\n", " t_medium[mask_medium],\n", " signal_medium_noisy[mask_medium],\n", " \"g^\",\n", " markersize=8,\n", " label=\"Medium (~30Hz)\",\n", " alpha=0.7,\n", ")\n", "axs[1].plot(\n", " t_slow[mask_slow],\n", " signal_slow_noisy[mask_slow],\n", " \"mo\",\n", " markersize=10,\n", " label=\"Slow (10Hz)\",\n", " alpha=0.8,\n", ")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=12)\n", "axs[1].set_ylabel(\"Amplitude\", fontsize=12)\n", "axs[1].set_title(\"Zoomed: Different Sample Timings\", fontsize=14, fontweight=\"bold\")\n", "axs[1].legend(fontsize=11)\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Fast sensor samples: {len(t_fast)} @ 100Hz\")\n", "print(f\"Medium sensor samples: {len(t_medium)} @ ~30Hz\")\n", "print(f\"Slow sensor samples: {len(t_slow)} @ 10Hz\")\n", "print(f\"\\nChallenge: How to fuse these into a single unified estimate?\")" ] }, { "cell_type": "markdown", "id": "7b31a3b6", "metadata": {}, "source": [ "#### Kalman Filter Multi-Rate Sensor Fusion\n", "\n", "The optimal solution for fusing asynchronous sensors with different noise characteristics is the **Kalman filter**. Here we demonstrate:\n", "\n", "- **Fast sensor** (100 Hz): High rate but noisy (σ = 0.3)\n", "- **Slow sensor** (10 Hz): Low rate but accurate (σ = 0.05)\n", "- **Kalman filter**: Optimally weights both based on their noise characteristics\n", "\n", "