Data Corruption and Preparation
Data corruption and preparation utilities for simulating and handling real-world hardware sensor issues in robotics applications.
- class pykal.data_change.clean[source]
Bases: object
Clean and prepare corrupted sensor data.
All methods are static and take NumPy arrays as input, returning cleaned versions. Designed to handle common hardware issues before feeding data to estimators/controllers.
- static with_calibration(data, offset=0.0, scale=1.0)[source]
Apply calibration (remove bias, scale correction).
- Parameters:
- Returns:
Calibrated data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([2., 4., 6.])
>>> calibrated = clean.with_calibration(data, offset=1.0, scale=0.5)
>>> np.allclose(calibrated, [0.5, 1.5, 2.5])
True
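The expected values in the doctest are consistent with subtracting the offset first and then multiplying by the scale. The following is a minimal NumPy sketch under that assumption; calibrate_sketch is a hypothetical helper used for illustration, not pykal's implementation.

```python
import numpy as np

def calibrate_sketch(data, offset=0.0, scale=1.0):
    """Hypothetical stand-in: remove a constant bias, then apply a scale correction."""
    data = np.asarray(data, dtype=float)
    return (data - offset) * scale

calibrated = calibrate_sketch([2.0, 4.0, 6.0], offset=1.0, scale=0.5)
print(calibrated)  # [0.5 1.5 2.5]
```

Under this ordering, the offset is expressed in raw sensor units, before any scale correction is applied.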
- static with_clipping_recovery(data, lower=None, upper=None, mark_invalid=False)[source]
Detect and handle clipped/saturated values.
- Parameters:
- Returns:
Data with clipped values handled
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 5., 5., 5., 1.])  # clipped at 5
>>> recovered = clean.with_clipping_recovery(data, upper=5, mark_invalid=True)
>>> np.isnan(recovered[1:4]).all()  # clipped values marked
True
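One reading of mark_invalid=True that agrees with the doctest is that samples sitting at or beyond a saturation limit are replaced with NaN, so later stages can interpolate over them or ignore them. A small sketch of that idea follows; mark_clipped_sketch is a hypothetical name, and the behavior for mark_invalid=False is not covered here because the source does not describe it.

```python
import numpy as np

def mark_clipped_sketch(data, lower=None, upper=None):
    """Hypothetical stand-in: flag saturated samples by replacing them with NaN."""
    data = np.asarray(data, dtype=float)
    clipped = np.zeros(data.shape, dtype=bool)
    if lower is not None:
        clipped |= data <= lower   # samples pinned at the lower limit
    if upper is not None:
        clipped |= data >= upper   # samples pinned at the upper limit
    return np.where(clipped, np.nan, data)

marked = mark_clipped_sketch([1.0, 5.0, 5.0, 5.0, 1.0], upper=5)
```

Here the three samples at the upper limit become NaN while the first and last samples are returned unchanged.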
- static with_debounce(data, threshold=0.1, min_duration=2)[source]
Remove contact bounce from binary/step signals.
Requires signal to remain stable for min_duration samples before accepting the transition.
- Parameters:
- Returns:
Debounced data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([0., 0., 1., 0., 1., 1., 1.])  # bouncing
>>> debounced = clean.with_debounce(data, min_duration=2)
>>> debounced.shape == data.shape
True
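The stability rule described above can be sketched as a small state machine: the output holds the last accepted level, and a new level is accepted only after it has persisted for min_duration consecutive samples. This is an illustrative sketch, not pykal's code; debounce_sketch is a hypothetical name, and treating threshold as the tolerance for "same level" is an assumption.

```python
import numpy as np

def debounce_sketch(data, threshold=0.1, min_duration=2):
    """Hypothetical stand-in: accept a level change only after it has persisted."""
    data = np.asarray(data, dtype=float)
    out = np.empty_like(data)
    state = data[0]       # currently accepted level
    candidate = data[0]   # level waiting to be accepted
    run = 0               # consecutive samples seen at the candidate level
    for i, x in enumerate(data):
        if abs(x - state) <= threshold:
            run = 0                       # back at the accepted level: drop the candidate
        elif run > 0 and abs(x - candidate) <= threshold:
            run += 1                      # candidate level persists
        else:
            candidate, run = x, 1         # a new candidate level appears
        if run >= min_duration:
            state, run = candidate, 0     # transition accepted
        out[i] = state
    return out

debounced = debounce_sketch([0., 0., 1., 0., 1., 1., 1.], min_duration=2)
print(debounced)  # [0. 0. 0. 0. 0. 1. 1.]
```

The isolated 1 at index 2 is rejected as bounce, and the transition is accepted at index 5, once the high level has been seen twice in a row.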
- static with_exponential_smoothing(data, alpha=0.3)[source]
Apply exponential smoothing filter (denoise, low-pass).
Gives more weight to recent samples. With alpha=1 the output equals the input (no filtering); smoothing increases as alpha approaches 0.
- Parameters:
data (NDArray) – Input data array
alpha (float, optional) – Smoothing factor between 0 and 1 (default 0.3)
- Returns:
Smoothed data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 2., 3., 4., 5.])
>>> smoothed = clean.with_exponential_smoothing(data, alpha=0.5)
>>> smoothed.shape == data.shape
True
>>> smoothed[0] == data[0]  # first value unchanged
True
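The standard exponential smoothing recursion is s[0] = x[0] and s[t] = alpha * x[t] + (1 - alpha) * s[t-1], which matches the doctest's observation that the first value is unchanged. The sketch below assumes that recursion; exp_smooth_sketch is a hypothetical helper, not the library's implementation.

```python
import numpy as np

def exp_smooth_sketch(data, alpha=0.3):
    """Hypothetical stand-in for first-order exponential smoothing."""
    data = np.asarray(data, dtype=float)
    out = np.empty_like(data)
    out[0] = data[0]  # the first sample seeds the filter
    for t in range(1, len(data)):
        out[t] = alpha * data[t] + (1.0 - alpha) * out[t - 1]
    return out

smoothed = exp_smooth_sketch([1., 2., 3., 4., 5.], alpha=0.5)
# values: 1, 1.5, 2.25, 3.125, 4.0625
```

The output lags the ramp input, which is the usual cost of this kind of smoothing.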
- static with_interpolation(data, method='linear')[source]
Interpolate missing data (NaN values).
Useful for handling dropouts and missing sensor readings.
- Parameters:
data (NDArray) – Input data array (may contain NaN)
method (str, optional) – Interpolation method: ‘linear’ or ‘nearest’ (default ‘linear’)
- Returns:
Data with NaN values interpolated
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., np.nan, 3., np.nan, 5.])
>>> filled = clean.with_interpolation(data)
>>> np.isnan(filled).sum() == 0  # no NaN remaining
True
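Linear interpolation over NaN gaps can be sketched with np.interp by treating the sample index as the x-axis. This is an illustration for 1-D data only; interpolate_nan_sketch is a hypothetical name, and how pykal treats NaN values at the start or end of the array is not stated in the source (np.interp holds the nearest valid value there).

```python
import numpy as np

def interpolate_nan_sketch(data):
    """Hypothetical stand-in: fill NaN gaps in a 1-D array by linear interpolation."""
    data = np.asarray(data, dtype=float)
    idx = np.arange(data.size)
    valid = ~np.isnan(data)
    # Evaluate the piecewise-linear curve through the valid samples at every index.
    return np.interp(idx, idx[valid], data[valid])

filled = interpolate_nan_sketch([1., np.nan, 3., np.nan, 5.])
# values: 1, 2, 3, 4, 5
```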
- static with_low_pass_filter(data, alpha=0.2)[source]
Simple first-order low-pass filter (RC filter).
Attenuates high-frequency noise while preserving low-frequency signals.
- Parameters:
data (NDArray) – Input data array
alpha (float, optional) – Filter coefficient (0=max filtering, 1=no filtering) (default 0.2)
- Returns:
Filtered data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([0., 1., 0., 1., 0.])  # high freq
>>> filtered = clean.with_low_pass_filter(data, alpha=0.3)
>>> filtered.shape == data.shape
True
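For a discretized RC filter of the form y[n] = y[n-1] + alpha * (x[n] - y[n-1]), the coefficient is commonly derived from the cutoff frequency as alpha = dt / (RC + dt) with RC = 1 / (2 * pi * f_c). The sketch below applies that textbook relation to pick a value for alpha; alpha_from_cutoff is a hypothetical helper, and whether pykal interprets alpha exactly this way is an assumption.

```python
import math

def alpha_from_cutoff(cutoff_hz, sample_rate_hz):
    """Hypothetical helper: map a cutoff frequency to a first-order filter coefficient."""
    dt = 1.0 / sample_rate_hz                 # sample period in seconds
    rc = 1.0 / (2.0 * math.pi * cutoff_hz)    # RC time constant in seconds
    return dt / (rc + dt)

# A 5 Hz cutoff on a 100 Hz sensor stream gives alpha of roughly 0.24.
alpha = alpha_from_cutoff(cutoff_hz=5.0, sample_rate_hz=100.0)
```

A lower cutoff yields a smaller alpha and therefore heavier filtering, in line with the parameter description above.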
- static with_median_filter(data, window=3)[source]
Apply median filter (remove spikes, outliers).
Highly effective for spike/impulse noise while preserving edges.
- Parameters:
data (NDArray) – Input data array
window (int, optional) – Size of median window (default 3)
- Returns:
Filtered data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 1., 100., 1., 1.])  # spike at index 2
>>> filtered = clean.with_median_filter(data, window=3)
>>> filtered[2] < 10  # spike removed
True
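A sliding median replaces each sample with the median of its neighborhood, so a single-sample spike never survives a window of three. The following sketch assumes a centered, odd-sized window and edge padding; both choices, and the name median_filter_sketch, are assumptions made for illustration.

```python
import numpy as np

def median_filter_sketch(data, window=3):
    """Hypothetical stand-in: centered sliding median with edge padding (odd window)."""
    data = np.asarray(data, dtype=float)
    half = window // 2
    padded = np.pad(data, half, mode="edge")  # repeat the end values at the borders
    return np.array([np.median(padded[i:i + window]) for i in range(data.size)])

filtered = median_filter_sketch([1., 1., 100., 1., 1.], window=3)
print(filtered)  # [1. 1. 1. 1. 1.]
```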
- static with_moving_average(data, window=3)[source]
Apply moving average filter (denoise, smooth).
Simple low-pass filter effective for Gaussian noise.
- Parameters:
data (NDArray) – Input data array
window (int, optional) – Size of moving average window (default 3)
- Returns:
Smoothed data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 5., 2., 6., 3.])
>>> smoothed = clean.with_moving_average(data, window=3)
>>> smoothed.shape == data.shape
True
>>> abs(smoothed[2] - 2.6667) < 0.001  # average of [1, 5, 2]
True
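The doctest value at index 2 is the mean of samples 0 through 2, which points to a trailing (causal) window rather than a centered one. A sketch of a trailing average follows; trailing_average_sketch is a hypothetical name, and averaging over the samples available so far for the first window - 1 outputs is an assumption the source does not confirm.

```python
import numpy as np

def trailing_average_sketch(data, window=3):
    """Hypothetical stand-in: mean of the current sample and the window - 1 before it."""
    data = np.asarray(data, dtype=float)
    out = np.empty_like(data)
    for i in range(data.size):
        start = max(0, i - window + 1)     # shorter window at the start of the array
        out[i] = data[start:i + 1].mean()
    return out

smoothed = trailing_average_sketch([1., 5., 2., 6., 3.], window=3)
# values: 1, 3, 2.6667, 4.3333, 3.6667
```

A trailing window uses only past samples, which suits online filtering where future readings are not yet available.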
- static with_outlier_removal(data, threshold=3.0, method='replace')[source]
Detect and handle outliers using z-score method.
- Parameters:
- Returns:
Data with outliers handled
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 1., 100., 1., 1.])  # outlier at index 2
>>> cleaned = clean.with_outlier_removal(data, threshold=1.5)
>>> cleaned[2] < 10  # outlier replaced
True
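The doctest lowers the threshold to 1.5 for a reason worth spelling out: with only five samples, the z-score of a single outlier cannot exceed 2 when the population standard deviation is used, so the default threshold of 3.0 would flag nothing. The sketch below works through the numbers; replace_outliers_sketch is a hypothetical helper, and replacing flagged samples with the median of the remaining ones is an assumption about what method='replace' does.

```python
import numpy as np

def replace_outliers_sketch(data, threshold=3.0):
    """Hypothetical stand-in: flag samples by z-score and replace them with the inlier median."""
    data = np.asarray(data, dtype=float)
    std = data.std()                      # population standard deviation (ddof=0)
    if std == 0:
        return data.copy()                # constant signal: nothing to flag
    z = np.abs(data - data.mean()) / std
    flagged = z > threshold
    out = data.copy()
    out[flagged] = np.median(data[~flagged])
    return out

data = np.array([1., 1., 100., 1., 1.])
z_scores = np.abs(data - data.mean()) / data.std()   # approximately 0.5, 0.5, 2.0, 0.5, 0.5
cleaned = replace_outliers_sketch(data, threshold=1.5)
untouched = replace_outliers_sketch(data, threshold=3.0)
```

With threshold=1.5 the spike at index 2 is replaced by 1.0; with threshold=3.0 the array comes back unchanged.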
- static with_staleness_policy(data, policy='hold')[source]
Apply staleness policy to data with missing values (NaN).
Handles stale/missing sensor data according to different policies, matching the ROSNode staleness configuration in ros_node.py.
This is particularly useful for sensor fusion where different sensors update at different rates, or when dealing with intermittent communication.
- Parameters:
data (NDArray) – Input data array (may contain NaN for stale/missing data)
policy (str, optional) – Staleness policy (default ‘hold’):
- ‘zero’: Replace missing/stale data with zeros
- ‘hold’: Hold last valid value (forward fill)
- ‘drop’: Remove data points with NaN (returns shorter array)
- ‘none’: Keep NaN values as-is (no processing)
- Returns:
Processed data according to policy
- Return type:
NDArray
Examples
Hold policy (forward fill, the default):
>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='hold')
>>> np.array_equal(filled, [1., 2., 2., 2., 5.])
True
Zero policy (replace with zeros):
>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='zero')
>>> np.array_equal(filled, [1., 2., 0., 0., 5.])
True
Drop policy (remove NaN entries):
>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='drop')
>>> np.array_equal(filled, [1., 2., 5.])
True
None policy (keep NaN as-is):
>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='none')
>>> np.array_equal(filled, data, equal_nan=True)
True
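The 'hold' policy is the least obvious of the four to vectorize. One common NumPy idiom is to record, for every position, the index of the most recent valid sample and then index the data with it. The sketch below illustrates that idiom for 1-D data; forward_fill_sketch is a hypothetical name, and leaving a leading NaN untouched (because there is no earlier value to hold) is an assumption.

```python
import numpy as np

def forward_fill_sketch(data):
    """Hypothetical stand-in for the 'hold' policy on a 1-D array."""
    data = np.asarray(data, dtype=float)
    valid = ~np.isnan(data)
    # Valid positions keep their own index; NaN positions start at 0.
    last = np.where(valid, np.arange(data.size), 0)
    # A running maximum turns this into "index of the latest valid sample so far".
    last = np.maximum.accumulate(last)
    return data[last]

stale = np.array([1., 2., np.nan, np.nan, 5.])
held = forward_fill_sketch(stale)        # 1, 2, 2, 2, 5
dropped = stale[~np.isnan(stale)]        # 'drop' equivalent: 1, 2, 5
```

Note that 'drop' changes the array length, so it cannot be used where the result must stay aligned with a time axis or with other sensor streams.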
Corruption Utilities
The corrupt class provides methods to simulate common hardware sensor issues:
- class pykal.data_change.corrupt[source]
Bases: object
Simulate common hardware data corruption issues.
All methods are static and take NumPy arrays as input, returning corrupted versions. Useful for testing robustness of estimators and controllers before hardware deployment.
- static with_bias(data, bias=0.5)[source]
Add constant offset/bias to data.
Common in uncalibrated sensors (IMUs, force sensors, etc.).
- Parameters:
data (NDArray) – Input data array
bias (float, optional) – Constant bias to add (default 0.5)
- Returns:
Data with added bias
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([1.0, 2.0, 3.0])
>>> biased = corrupt.with_bias(data, bias=1.5)
>>> np.allclose(biased - data, 1.5)
True
- static with_bounce(data, duration=3, amplitude=0.5, seed=None)[source]
Simulate contact bounce on digital/binary signals.
Common in switches, encoders, limit switches. Creates rapid oscillations when signal changes state.
- Parameters:
- Returns:
Data with bounce artifacts at transitions
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([0., 0., 1., 1., 1.])
>>> bounced = corrupt.with_bounce(data, duration=2, seed=42)
>>> bounced.shape == data.shape
True
- static with_clipping(data, lower=None, upper=None)[source]
Clip data to saturation limits (sensor saturation).
Common when sensors reach their measurement range limits.
- Parameters:
- Returns:
Clipped data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([-2., -1., 0., 1., 2.])
>>> clipped = corrupt.with_clipping(data, lower=-1, upper=1)
>>> np.allclose(clipped, [-1., -1., 0., 1., 1.])
True
- static with_delay(data, delay=1, fill_value=0.0)[source]
Add time delay to data (latency, slow sensors).
Common in communication delays, slow sensors, processing lag.
- Parameters:
- Returns:
Delayed data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([1., 2., 3., 4.])
>>> delayed = corrupt.with_delay(data, delay=2, fill_value=0)
>>> np.allclose(delayed, [0., 0., 1., 2.])
True
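The doctest shows the delay as a shift by whole samples, with the vacated leading positions set to fill_value and the trailing samples discarded. A minimal sketch of that behavior, assuming a non-negative integer delay and 1-D data; delay_sketch is a hypothetical name.

```python
import numpy as np

def delay_sketch(data, delay=1, fill_value=0.0):
    """Hypothetical stand-in: shift samples later in time by `delay` positions."""
    data = np.asarray(data, dtype=float)
    out = np.full_like(data, fill_value)          # leading samples have no source yet
    if delay < data.size:
        out[delay:] = data[:data.size - delay]    # the last `delay` samples fall off the end
    return out

delayed = delay_sketch([1., 2., 3., 4.], delay=2, fill_value=0)
print(delayed)  # [0. 0. 1. 2.]
```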
- static with_drift(data, drift_rate=0.01, drift_type='linear')[source]
Add time-dependent drift to data.
Common in sensors that warm up or degrade (temperature sensors, gyroscopes, pressure sensors).
- Parameters:
- Returns:
Data with added drift
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.ones(5)
>>> drifted = corrupt.with_drift(data, drift_rate=0.1)
>>> drifted[-1] > drifted[0]  # drift increases over time
True
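For drift_type='linear', a natural model is an error that grows by a fixed amount per sample. The sketch below assumes that drift_rate is expressed per sample and that drift starts at zero on the first sample; neither detail is confirmed by the source, and linear_drift_sketch is a hypothetical name.

```python
import numpy as np

def linear_drift_sketch(data, drift_rate=0.01):
    """Hypothetical stand-in: add an error that grows by drift_rate each sample."""
    data = np.asarray(data, dtype=float)
    return data + drift_rate * np.arange(data.size)

drifted = linear_drift_sketch(np.ones(5), drift_rate=0.1)
# values: 1.0, 1.1, 1.2, 1.3, 1.4
```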
- static with_dropouts(data, dropout_rate=0.1, fill_value=nan, seed=None)[source]
Randomly drop data points (packet loss, sensor failures).
Common in wireless communication, intermittent connections.
- Parameters:
- Returns:
Data with random dropouts
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([1., 2., 3., 4., 5.])
>>> dropped = corrupt.with_dropouts(data, dropout_rate=0.3, seed=42)
>>> dropped.shape == data.shape
True
>>> np.isnan(dropped).sum() > 0  # some data dropped
True
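Random dropouts can be modeled as an independent coin flip per sample, with a seeded generator so that a test run is reproducible. The sketch below uses NumPy's Generator API for this; dropouts_sketch is a hypothetical name, and the random stream will not match pykal's for the same seed.

```python
import numpy as np

def dropouts_sketch(data, dropout_rate=0.1, fill_value=np.nan, seed=None):
    """Hypothetical stand-in: replace each sample with fill_value with probability dropout_rate."""
    data = np.asarray(data, dtype=float)
    rng = np.random.default_rng(seed)
    dropped = rng.random(data.shape) < dropout_rate   # True where the sample is lost
    return np.where(dropped, fill_value, data)

signal = np.arange(10_000, dtype=float)
lossy = dropouts_sketch(signal, dropout_rate=0.1, seed=0)
loss_fraction = np.isnan(lossy).mean()   # close to 0.1 for a long signal
```

On short arrays the number of dropped samples varies a lot between seeds, so robustness tests are better run on long signals or across several seeds.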
- static with_gaussian_noise(data, std=None, mean=None, cov=None, seed=None)[source]
Add Gaussian (normal) noise to data. Supports scalar, list, or NDArray input. Returns data in the same type it was passed in.
- static with_quantization(data, levels=256)[source]
Quantize data to discrete levels (ADC quantization).
Simulates analog-to-digital conversion with limited bit depth.
- Parameters:
data (NDArray) – Input data array
levels (int, optional) – Number of quantization levels (default 256 for 8-bit ADC)
- Returns:
Quantized data
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.linspace(0, 1, 100)
>>> quantized = corrupt.with_quantization(data, levels=10)
>>> len(np.unique(quantized)) <= 10
True
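Uniform quantization snaps each sample to the nearest of a fixed number of evenly spaced levels. The sketch below assumes the levels span the minimum to the maximum of the input, which the source does not specify (a real ADC would use a fixed reference range instead); quantize_sketch is a hypothetical name.

```python
import numpy as np

def quantize_sketch(data, levels=256):
    """Hypothetical stand-in: round samples to `levels` evenly spaced values over the data range."""
    data = np.asarray(data, dtype=float)
    lo, hi = data.min(), data.max()
    if hi == lo:
        return data.copy()                 # constant signal: nothing to quantize
    step = (hi - lo) / (levels - 1)        # spacing between adjacent levels
    return lo + np.round((data - lo) / step) * step

ramp = np.linspace(0, 1, 100)
quantized = quantize_sketch(ramp, levels=10)
max_error = np.max(np.abs(quantized - ramp))   # at most half a step, here 1/18
```

The worst-case error of half a step is the usual quantization-error bound and is a useful sanity check when choosing levels for a simulated sensor.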
- static with_spikes(data, spike_rate=0.05, spike_magnitude=5.0, seed=None)[source]
Add random spikes/outliers to data.
Common in EMI, electrical interference, sensor glitches.
- Parameters:
- Returns:
Data with random spikes
- Return type:
NDArray
Examples
>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.ones(100)
>>> spiked = corrupt.with_spikes(data, spike_rate=0.1, seed=42)
>>> (np.abs(spiked - data) > 1).sum() > 0  # some spikes present
True
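Impulse noise of this kind can be sketched by picking a random subset of samples and adding a large offset of random sign. In the sketch below, the fixed spike height and the equal chance of positive and negative spikes are assumptions, and spikes_sketch is a hypothetical name; the library may scale or distribute spikes differently.

```python
import numpy as np

def spikes_sketch(data, spike_rate=0.05, spike_magnitude=5.0, seed=None):
    """Hypothetical stand-in: add +/- spike_magnitude to a random fraction of the samples."""
    data = np.asarray(data, dtype=float)
    rng = np.random.default_rng(seed)
    hit = rng.random(data.shape) < spike_rate           # which samples receive a spike
    sign = rng.choice([-1.0, 1.0], size=data.shape)     # random spike direction
    return data + hit * sign * spike_magnitude

baseline = np.ones(1000)
spiked = spikes_sketch(baseline, spike_rate=0.1, spike_magnitude=5.0, seed=0)
n_spikes = int((np.abs(spiked - baseline) > 1).sum())   # roughly 100 for these settings
```

Isolated spikes like these are the case a median filter handles well, whereas a moving average would smear each spike across its window.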
Preparation Utilities
The clean class provides methods to clean and prepare corrupted sensor data: