Data Corruption and Preparation

Data corruption and preparation utilities for simulating and handling real-world hardware sensor issues in robotics applications.

class pykal.data_change.clean[source]

Bases: object

Clean and prepare corrupted sensor data.

All methods are static and take NumPy arrays as input, returning cleaned versions. Designed to handle common hardware issues before feeding data to estimators/controllers.

static with_calibration(data, offset=0.0, scale=1.0)[source]

Apply calibration (remove bias, scale correction).

Parameters:
  • data (NDArray) – Input data array

  • offset (float, optional) – Offset to subtract (bias correction) (default 0.0)

  • scale (float, optional) – Scale factor applied after the offset is subtracted (default 1.0)

Returns:

Calibrated data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([2., 4., 6.])
>>> calibrated = clean.with_calibration(data, offset=1.0, scale=0.5)
>>> np.allclose(calibrated, [0.5, 1.5, 2.5])
True
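
The example above is consistent with subtracting the offset first and scaling afterwards. A minimal NumPy sketch of that order of operations follows; calibrate is a hypothetical stand-in written for illustration, not the library's implementation.

```python
import numpy as np

def calibrate(data, offset=0.0, scale=1.0):
    """Hypothetical helper: remove the bias, then apply the scale correction."""
    data = np.asarray(data, dtype=float)
    return (data - offset) * scale

raw = np.array([2.0, 4.0, 6.0])
calibrated = calibrate(raw, offset=1.0, scale=0.5)
```

Applying the scale before the offset would give different values, so the order matters when the two constants come from a sensor datasheet.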
static with_clipping_recovery(data, lower=None, upper=None, mark_invalid=False)[source]

Detect and handle clipped/saturated values.

Parameters:
  • data (NDArray) – Input data array

  • lower (float, optional) – Lower saturation limit (default: data.min())

  • upper (float, optional) – Upper saturation limit (default: data.max())

  • mark_invalid (bool, optional) – If True, replace clipped values with NaN (default False)

Returns:

Data with clipped values handled

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 5., 5., 5., 1.])  # clipped at 5
>>> recovered = clean.with_clipping_recovery(data, upper=5, mark_invalid=True)
>>> np.isnan(recovered[1:4]).all()  # clipped values marked
True
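
One way to mark saturated samples as invalid is sketched below. mark_clipped is a hypothetical helper, and it treats a limit of None as "no limit on that side", which is an assumption that differs from the documented data.min()/data.max() defaults.

```python
import numpy as np

def mark_clipped(data, lower=None, upper=None):
    """Hypothetical helper: replace samples at or beyond a limit with NaN."""
    data = np.asarray(data, dtype=float)
    clipped = np.zeros(data.shape, dtype=bool)
    if lower is not None:
        clipped |= data <= lower   # at or below the lower rail
    if upper is not None:
        clipped |= data >= upper   # at or above the upper rail
    out = data.copy()
    out[clipped] = np.nan
    return out

recovered = mark_clipped(np.array([1.0, 5.0, 5.0, 5.0, 1.0]), upper=5.0)
```

The NaN markers can then be passed to an interpolation or staleness step so that saturated readings are not mistaken for real measurements.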
static with_debounce(data, threshold=0.1, min_duration=2)[source]

Remove contact bounce from binary/step signals.

Requires signal to remain stable for min_duration samples before accepting the transition.

Parameters:
  • data (NDArray) – Input data array

  • threshold (float, optional) – Threshold for detecting state change (default 0.1)

  • min_duration (int, optional) – Minimum stable samples required (default 2)

Returns:

Debounced data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([0., 0., 1., 0., 1., 1., 1.])  # bouncing
>>> debounced = clean.with_debounce(data, min_duration=2)
>>> debounced.shape == data.shape
True
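
The stability requirement described above can be sketched as a small state machine. This is an illustrative interpretation only: debounce is a hypothetical function, and the library may count stable samples or use threshold differently.

```python
import numpy as np

def debounce(data, threshold=0.1, min_duration=2):
    """Hypothetical sketch: accept a new level only after it persists."""
    data = np.asarray(data, dtype=float)
    out = np.empty_like(data)
    state = data[0]       # currently accepted level
    candidate = data[0]   # level waiting to be accepted
    stable = 0            # consecutive samples seen at the candidate level
    for i, x in enumerate(data):
        if abs(x - state) <= threshold:
            stable = 0                 # back at the accepted level: drop candidate
        elif stable > 0 and abs(x - candidate) <= threshold:
            stable += 1                # candidate level persists
        else:
            candidate, stable = x, 1   # a new candidate level appears
        if stable >= min_duration:
            state, stable = candidate, 0
        out[i] = state
    return out

debounced = debounce(np.array([0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0]), min_duration=2)
```

In this sketch the single-sample bounce at index 2 is rejected, and the real transition is accepted one sample late, which is the usual cost of debouncing.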
static with_exponential_smoothing(data, alpha=0.3)[source]

Apply exponential smoothing filter (denoise, low-pass).

Gives more weight to recent data. With alpha=1 the data passes through unfiltered; with alpha=0 the smoothing is infinite and the output never responds to new samples.

Parameters:
  • data (NDArray) – Input data array

  • alpha (float, optional) – Smoothing factor between 0 and 1 (default 0.3)

Returns:

Smoothed data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 2., 3., 4., 5.])
>>> smoothed = clean.with_exponential_smoothing(data, alpha=0.5)
>>> smoothed.shape == data.shape
True
>>> smoothed[0] == data[0]  # first value unchanged
True
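
The behaviour described above (first value unchanged, alpha=1 meaning no filtering) matches the standard recurrence y[t] = alpha * x[t] + (1 - alpha) * y[t-1]. A minimal sketch under that assumption follows; exponential_smoothing is a hypothetical function and expects a non-empty 1-D array.

```python
import numpy as np

def exponential_smoothing(data, alpha=0.3):
    """Hypothetical sketch of the standard exponential smoothing recurrence."""
    data = np.asarray(data, dtype=float)
    out = np.empty_like(data)
    out[0] = data[0]  # seed the filter with the first sample
    for t in range(1, len(data)):
        out[t] = alpha * data[t] + (1.0 - alpha) * out[t - 1]
    return out

data = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
smoothed = exponential_smoothing(data, alpha=0.5)
```

On a ramp like this one the output lags behind the input, which is the price paid for noise attenuation.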
static with_interpolation(data, method='linear')[source]

Interpolate missing data (NaN values).

Useful for handling dropouts and missing sensor readings.

Parameters:
  • data (NDArray) – Input data array (may contain NaN)

  • method (str, optional) – Interpolation method: ‘linear’ or ‘nearest’ (default ‘linear’)

Returns:

Data with NaN values interpolated

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., np.nan, 3., np.nan, 5.])
>>> filled = clean.with_interpolation(data)
>>> np.isnan(filled).sum() == 0  # no NaN remaining
True
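
Linear interpolation over NaN gaps can be sketched with np.interp, using sample indices as the x-axis. interpolate_nans is a hypothetical helper; it assumes a 1-D array with at least one valid sample and evenly spaced readings.

```python
import numpy as np

def interpolate_nans(data):
    """Hypothetical helper: fill NaN entries by linear interpolation."""
    data = np.asarray(data, dtype=float)
    idx = np.arange(len(data))
    valid = ~np.isnan(data)
    out = data.copy()
    # np.interp holds the first/last valid value for gaps at the edges.
    out[~valid] = np.interp(idx[~valid], idx[valid], data[valid])
    return out

filled = interpolate_nans(np.array([1.0, np.nan, 3.0, np.nan, 5.0]))
```

Unlike a forward fill, this uses samples on both sides of a gap, so it suits offline processing better than real-time use.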
static with_low_pass_filter(data, alpha=0.2)[source]

Simple first-order low-pass filter (RC filter).

Attenuates high-frequency noise while preserving low-frequency signals.

Parameters:
  • data (NDArray) – Input data array

  • alpha (float, optional) – Filter coefficient (0=max filtering, 1=no filtering) (default 0.2)

Returns:

Filtered data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([0., 1., 0., 1., 0.])  # high freq
>>> filtered = clean.with_low_pass_filter(data, alpha=0.3)
>>> filtered.shape == data.shape
True
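
If the filter follows the usual discrete RC form y[n] = y[n-1] + alpha * (x[n] - y[n-1]), which the source does not state explicitly, then alpha can be derived from a desired cutoff frequency and the sample rate. The helper below is hypothetical and only illustrates that relationship.

```python
import math

def alpha_from_cutoff(cutoff_hz, sample_rate_hz):
    """Hypothetical helper: coefficient of a discrete first-order RC filter."""
    dt = 1.0 / sample_rate_hz                # sample period in seconds
    rc = 1.0 / (2.0 * math.pi * cutoff_hz)   # RC time constant for the cutoff
    return dt / (rc + dt)

# A 5 Hz cutoff on a 100 Hz sensor stream gives alpha of roughly 0.24.
alpha = alpha_from_cutoff(cutoff_hz=5.0, sample_rate_hz=100.0)
```

A lower cutoff gives a smaller alpha and therefore heavier filtering, consistent with the parameter description above.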
static with_median_filter(data, window=3)[source]

Apply median filter (remove spikes, outliers).

Highly effective for spike/impulse noise while preserving edges.

Parameters:
  • data (NDArray) – Input data array

  • window (int, optional) – Size of median window (default 3)

Returns:

Filtered data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 1., 100., 1., 1.])  # spike at index 2
>>> filtered = clean.with_median_filter(data, window=3)
>>> filtered[2] < 10  # spike removed
True
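
A centered median filter can be sketched with NumPy's sliding_window_view (NumPy 1.20 or later). median_filter is a hypothetical function; the odd window size and edge padding are assumptions, and the library may treat boundaries differently.

```python
import numpy as np

def median_filter(data, window=3):
    """Hypothetical sketch: centered median over an odd-sized window."""
    data = np.asarray(data, dtype=float)
    half = window // 2
    # Repeat the edge samples so the output keeps the input length.
    padded = np.pad(data, half, mode="edge")
    windows = np.lib.stride_tricks.sliding_window_view(padded, window)
    return np.median(windows, axis=-1)

filtered = median_filter(np.array([1.0, 1.0, 100.0, 1.0, 1.0]), window=3)
```

Because the median ignores a single extreme value in each window, the spike disappears completely instead of being smeared across its neighbours as an averaging filter would do.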
static with_moving_average(data, window=3)[source]

Apply moving average filter (denoise, smooth).

Simple low-pass filter effective for Gaussian noise.

Parameters:
  • data (NDArray) – Input data array

  • window (int, optional) – Size of moving average window (default 3)

Returns:

Smoothed data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 5., 2., 6., 3.])
>>> smoothed = clean.with_moving_average(data, window=3)
>>> smoothed.shape == data.shape
True
>>> abs(smoothed[2] - 2.6667) < 0.001  # average of [1, 5, 2]
True
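
The example above averages the current sample with the two before it, which suggests a trailing (causal) window. The sketch below implements that with a cumulative sum. trailing_moving_average is a hypothetical function, and using shorter windows for the first samples is an assumption about the start-up behaviour.

```python
import numpy as np

def trailing_moving_average(data, window=3):
    """Hypothetical sketch: mean of the current and previous window-1 samples."""
    data = np.asarray(data, dtype=float)
    csum = np.cumsum(np.insert(data, 0, 0.0))  # csum[k] = sum of the first k samples
    idx = np.arange(1, len(data) + 1)
    start = np.maximum(idx - window, 0)        # window is shorter near the start
    return (csum[idx] - csum[start]) / (idx - start)

smoothed = trailing_moving_average(np.array([1.0, 5.0, 2.0, 6.0, 3.0]), window=3)
```

A trailing window uses only past samples, so the same computation can run online inside a control loop.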
static with_outlier_removal(data, threshold=3.0, method='replace')[source]

Detect and handle outliers using z-score method.

Parameters:
  • data (NDArray) – Input data array

  • threshold (float, optional) – Z-score threshold for outlier detection (default 3.0)

  • method (str, optional) – How to handle outliers: ‘replace’ with median or ‘interpolate’ (default ‘replace’)

Returns:

Data with outliers handled

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 1., 100., 1., 1.])  # outlier at index 2
>>> cleaned = clean.with_outlier_removal(data, threshold=1.5)
>>> cleaned[2] < 10  # outlier replaced
True
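
The example above lowers the threshold to 1.5 for a reason worth spelling out: in a short array the outlier itself inflates the standard deviation. The sketch below assumes the population standard deviation (NumPy's default); replace_outliers is a hypothetical function, not the library's code.

```python
import numpy as np

def replace_outliers(data, threshold=3.0):
    """Hypothetical sketch: replace samples whose z-score exceeds threshold."""
    data = np.asarray(data, dtype=float)
    std = data.std()
    if std == 0:
        return data.copy()  # constant signal: nothing to flag
    z = np.abs(data - data.mean()) / std
    out = data.copy()
    out[z > threshold] = np.median(data)
    return out

data = np.array([1.0, 1.0, 100.0, 1.0, 1.0])
z_spike = abs(100.0 - data.mean()) / data.std()  # 2.0 for this array
kept = replace_outliers(data, threshold=3.0)      # spike survives
cleaned = replace_outliers(data, threshold=1.5)   # spike replaced by the median
```

With five samples and the population standard deviation, no z-score can exceed 2, so the default threshold of 3.0 would never trigger under these assumptions. For short windows a lower threshold or a median filter may be the better choice.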
static with_staleness_policy(data, policy='hold')[source]

Apply staleness policy to data with missing values (NaN).

Handles stale/missing sensor data according to different policies, matching the ROSNode staleness configuration in ros_node.py.

This is particularly useful for sensor fusion where different sensors update at different rates, or when dealing with intermittent communication.

Parameters:
  • data (NDArray) – Input data array (may contain NaN for stale/missing data)

  • policy (str, optional) – Staleness policy (default ‘hold’):

      - ‘zero’: Replace missing/stale data with zeros
      - ‘hold’: Hold last valid value (forward fill)
      - ‘drop’: Remove data points with NaN (returns shorter array)
      - ‘none’: Keep NaN values as-is (no processing)

Returns:

Processed data according to policy

Return type:

NDArray

Examples

Hold policy (forward fill - default):

>>> import numpy as np
>>> from pykal.data_change import clean
>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='hold')
>>> np.array_equal(filled, [1., 2., 2., 2., 5.])
True

Zero policy (replace with zeros):

>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='zero')
>>> np.array_equal(filled, [1., 2., 0., 0., 5.])
True

Drop policy (remove NaN entries):

>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='drop')
>>> np.array_equal(filled, [1., 2., 5.])
True

None policy (keep NaN as-is):

>>> data = np.array([1., 2., np.nan, np.nan, 5.])
>>> filled = clean.with_staleness_policy(data, policy='none')
>>> np.array_equal(filled, data, equal_nan=True)
True
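
The ‘hold’, ‘zero’, and ‘drop’ policies can each be sketched in a line or two of NumPy. forward_fill is a hypothetical helper; leaving a leading NaN in place, because there is no earlier value to hold, is an assumption.

```python
import numpy as np

def forward_fill(data):
    """Hypothetical helper: hold the last valid value across NaN gaps."""
    data = np.asarray(data, dtype=float)
    valid = ~np.isnan(data)
    # Index of the most recent valid sample at each position (0 before any).
    last = np.maximum.accumulate(np.where(valid, np.arange(len(data)), 0))
    return data[last]

data = np.array([1.0, 2.0, np.nan, np.nan, 5.0])
held = forward_fill(data)                      # 'hold'
zeroed = np.where(np.isnan(data), 0.0, data)   # 'zero'
dropped = data[~np.isnan(data)]                # 'drop' (shorter array)
```

Only ‘drop’ changes the array length, so it breaks alignment with timestamps or other sensor channels unless they are filtered with the same mask.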
class pykal.data_change.corrupt[source]

Bases: object

Simulate common hardware data corruption issues.

All methods are static and take NumPy arrays as input, returning corrupted versions. Useful for testing robustness of estimators and controllers before hardware deployment.

static with_bias(data, bias=0.5)[source]

Add constant offset/bias to data.

Common in uncalibrated sensors (IMUs, force sensors, etc.).

Parameters:
  • data (NDArray) – Input data array

  • bias (float, optional) – Constant bias to add (default 0.5)

Returns:

Data with added bias

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([1.0, 2.0, 3.0])
>>> biased = corrupt.with_bias(data, bias=1.5)
>>> np.allclose(biased - data, 1.5)
True
static with_bounce(data, duration=3, amplitude=0.5, seed=None)[source]

Simulate contact bounce on digital/binary signals.

Common in switches, encoders, limit switches. Creates rapid oscillations when signal changes state.

Parameters:
  • data (NDArray) – Input data array (typically binary or step changes)

  • duration (int, optional) – Number of samples to bounce (default 3)

  • amplitude (float, optional) – Amplitude of bounce oscillation (default 0.5)

  • seed (int, optional) – Random seed for reproducibility

Returns:

Data with bounce artifacts at transitions

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([0., 0., 1., 1., 1.])
>>> bounced = corrupt.with_bounce(data, duration=2, seed=42)
>>> bounced.shape == data.shape
True
static with_clipping(data, lower=None, upper=None)[source]

Clip data to saturation limits (sensor saturation).

Common when sensors reach their measurement range limits.

Parameters:
  • data (NDArray) – Input data array

  • lower (float, optional) – Lower clipping bound (default: data.min())

  • upper (float, optional) – Upper clipping bound (default: data.max())

Returns:

Clipped data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([-2., -1., 0., 1., 2.])
>>> clipped = corrupt.with_clipping(data, lower=-1, upper=1)
>>> np.allclose(clipped, [-1., -1., 0., 1., 1.])
True
static with_delay(data, delay=1, fill_value=0.0)[source]

Add time delay to data (latency, slow sensors).

Commonly caused by communication delays, slow sensors, or processing lag.

Parameters:
  • data (NDArray) – Input data array

  • delay (int, optional) – Number of samples to delay (default 1)

  • fill_value (float, optional) – Value to use for initial samples (default 0.0)

Returns:

Delayed data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([1., 2., 3., 4.])
>>> delayed = corrupt.with_delay(data, delay=2, fill_value=0)
>>> np.allclose(delayed, [0., 0., 1., 2.])
True
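
A fixed sample delay amounts to shifting the array and padding the front. The sketch below reproduces the example above; delay_signal is a hypothetical function and assumes a non-negative integer delay.

```python
import numpy as np

def delay_signal(data, delay=1, fill_value=0.0):
    """Hypothetical sketch: shift samples later in time by delay steps."""
    data = np.asarray(data, dtype=float)
    out = np.full_like(data, fill_value)   # leading samples have no data yet
    if delay < len(data):
        out[delay:] = data[:len(data) - delay]
    return out

delayed = delay_signal(np.array([1.0, 2.0, 3.0, 4.0]), delay=2, fill_value=0.0)
```

The last delay samples of the input fall off the end, so the output keeps the input length at the cost of losing the newest readings.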
static with_drift(data, drift_rate=0.01, drift_type='linear')[source]

Add time-dependent drift to data.

Common in sensors that warm up or degrade (temperature sensors, gyroscopes, pressure sensors).

Parameters:
  • data (NDArray) – Input data array

  • drift_rate (float, optional) – Rate of drift per sample (default 0.01)

  • drift_type (str, optional) – Type of drift: ‘linear’ or ‘exponential’ (default ‘linear’)

Returns:

Data with added drift

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.ones(5)
>>> drifted = corrupt.with_drift(data, drift_rate=0.1)
>>> drifted[-1] > drifted[0]  # drift increases over time
True
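
For the ‘linear’ case, a drift rate "per sample" can be read as an offset that grows by drift_rate at every step. The sketch below assumes the drift is zero at the first sample; add_linear_drift is a hypothetical function, and the ‘exponential’ variant is not covered because the source does not define its form.

```python
import numpy as np

def add_linear_drift(data, drift_rate=0.01):
    """Hypothetical sketch: offset that grows linearly with the sample index."""
    data = np.asarray(data, dtype=float)
    return data + drift_rate * np.arange(len(data))

drifted = add_linear_drift(np.ones(5), drift_rate=0.1)
```

Unlike a constant bias, this error keeps growing, so a one-time calibration offset cannot remove it.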
static with_dropouts(data, dropout_rate=0.1, fill_value=nan, seed=None)[source]

Randomly drop data points (packet loss, sensor failures).

Common in wireless communication, intermittent connections.

Parameters:
  • data (NDArray) – Input data array

  • dropout_rate (float, optional) – Fraction of data points to drop (default 0.1)

  • fill_value (float, optional) – Value to use for dropped points (default np.nan)

  • seed (int, optional) – Random seed for reproducibility

Returns:

Data with random dropouts

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.array([1., 2., 3., 4., 5.])
>>> dropped = corrupt.with_dropouts(data, dropout_rate=0.3, seed=42)
>>> dropped.shape == data.shape
True
>>> np.isnan(dropped).sum() > 0  # some data dropped
True
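
Seeded random dropouts can be sketched with NumPy's Generator API. add_dropouts is a hypothetical function that drops each sample independently with probability dropout_rate, so the dropped fraction is only approximately equal to the rate; the library may instead drop an exact number of samples.

```python
import numpy as np

def add_dropouts(data, dropout_rate=0.1, fill_value=np.nan, seed=None):
    """Hypothetical sketch: independently drop each sample with a given probability."""
    rng = np.random.default_rng(seed)
    out = np.asarray(data, dtype=float).copy()
    mask = rng.random(out.shape) < dropout_rate  # True where a sample is lost
    out[mask] = fill_value
    return out

signal = np.ones(1000)
first = add_dropouts(signal, dropout_rate=0.3, seed=42)
second = add_dropouts(signal, dropout_rate=0.3, seed=42)  # same seed, same mask
```

Fixing the seed makes a robustness test repeatable: the same samples are lost on every run, so estimator changes can be compared on identical corrupted input.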
static with_gaussian_noise(data, std=None, mean=None, cov=None, seed=None)[source]

Add Gaussian (normal) noise to data.

Supports scalar, list, or NDArray input and returns the result in the same type as the input.

Return type:

float, list, or NDArray
static with_quantization(data, levels=256)[source]

Quantize data to discrete levels (ADC quantization).

Simulates analog-to-digital conversion with limited bit depth.

Parameters:
  • data (NDArray) – Input data array

  • levels (int, optional) – Number of quantization levels (default 256 for 8-bit ADC)

Returns:

Quantized data

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.linspace(0, 1, 100)
>>> quantized = corrupt.with_quantization(data, levels=10)
>>> len(np.unique(quantized)) <= 10
True
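
Uniform quantization can be sketched by snapping each sample to the nearest of levels evenly spaced values. Spreading the levels between the observed minimum and maximum is an assumption, and quantize is a hypothetical function that expects levels of at least 2.

```python
import numpy as np

def quantize(data, levels=256):
    """Hypothetical sketch: round samples to evenly spaced levels over the data range."""
    data = np.asarray(data, dtype=float)
    lo, hi = data.min(), data.max()
    if hi == lo:
        return data.copy()             # constant signal: nothing to quantize
    step = (hi - lo) / (levels - 1)    # spacing between adjacent levels
    return lo + np.round((data - lo) / step) * step

data = np.linspace(0, 1, 100)
quantized = quantize(data, levels=10)
```

The rounding error is at most half a step, which is the resolution limit an estimator sees when a sensor is read through an ADC with few bits.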
static with_spikes(data, spike_rate=0.05, spike_magnitude=5.0, seed=None)[source]

Add random spikes/outliers to data.

Commonly caused by electromagnetic interference (EMI) and sensor glitches.

Parameters:
  • data (NDArray) – Input data array

  • spike_rate (float, optional) – Fraction of data points to spike (default 0.05)

  • spike_magnitude (float, optional) – Magnitude of spikes relative to data range (default 5.0)

  • seed (int, optional) – Random seed for reproducibility

Returns:

Data with random spikes

Return type:

NDArray

Examples

>>> import numpy as np
>>> from pykal.data_change import corrupt
>>> data = np.ones(100)
>>> spiked = corrupt.with_spikes(data, spike_rate=0.1, seed=42)
>>> (np.abs(spiked - data) > 1).sum() > 0  # some spikes present
True

Corruption Utilities

The corrupt class, documented above, provides methods to simulate common hardware sensor issues.


Preparation Utilities

The clean class, documented above, provides methods to clean and prepare corrupted sensor data.