pykal.data_change.corrupt

class pykal.data_change.corrupt[source]

Bases: object

Simulate common hardware data corruption issues.

All methods are static and take NumPy arrays as input, returning corrupted versions. Useful for testing robustness of estimators and controllers before hardware deployment.

__init__()

Methods

__init__()

with_bias(data[, bias])

Add constant offset/bias to data.

with_bounce(data[, duration, amplitude, seed])

Simulate contact bounce on digital/binary signals.

with_clipping(data[, lower, upper])

Clip data to saturation limits (sensor saturation).

with_delay(data[, delay, fill_value])

Add time delay to data (latency, slow sensors).

with_drift(data[, drift_rate, drift_type])

Add time-dependent drift to data.

with_dropouts(data[, dropout_rate, ...])

Randomly drop data points (packet loss, sensor failures).

with_gaussian_noise(data[, std, mean, cov, seed])

Add Gaussian (normal) noise to data.

with_quantization(data[, levels])

Quantize data to discrete levels (ADC quantization).

with_spikes(data[, spike_rate, ...])

Add random spikes/outliers to data.

static with_bias(data, bias=0.5)[source]

Add constant offset/bias to data.

Common in uncalibrated sensors (IMUs, force sensors, etc.).

Parameters:
  • data (NDArray) – Input data array

  • bias (float, optional) – Constant bias to add (default 0.5)

Returns:

Data with added bias

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.array([1.0, 2.0, 3.0])
>>> biased = corrupt.with_bias(data, bias=1.5)
>>> np.allclose(biased - data, 1.5)
True
static with_bounce(data, duration=3, amplitude=0.5, seed=None)[source]

Simulate contact bounce on digital/binary signals.

Common in switches, encoders, limit switches. Creates rapid oscillations when signal changes state.

Parameters:
  • data (NDArray) – Input data array (typically binary or step changes)

  • duration (int, optional) – Number of samples to bounce (default 3)

  • amplitude (float, optional) – Amplitude of bounce oscillation (default 0.5)

  • seed (int, optional) – Random seed for reproducibility

Returns:

Data with bounce artifacts at transitions

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.array([0., 0., 1., 1., 1.])
>>> bounced = corrupt.with_bounce(data, duration=2, seed=42)
>>> bounced.shape == data.shape
True
static with_clipping(data, lower=None, upper=None)[source]

Clip data to saturation limits (sensor saturation).

Common when sensors reach their measurement range limits.

Parameters:
  • data (NDArray) – Input data array

  • lower (float, optional) – Lower clipping bound (default: data.min())

  • upper (float, optional) – Upper clipping bound (default: data.max())

Returns:

Clipped data

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.array([-2., -1., 0., 1., 2.])
>>> clipped = corrupt.with_clipping(data, lower=-1, upper=1)
>>> np.allclose(clipped, [-1., -1., 0., 1., 1.])
True
static with_delay(data, delay=1, fill_value=0.0)[source]

Add time delay to data (latency, slow sensors).

Common in communication delays, slow sensors, processing lag.

Parameters:
  • data (NDArray) – Input data array

  • delay (int, optional) – Number of samples to delay (default 1)

  • fill_value (float, optional) – Value to use for initial samples (default 0.0)

Returns:

Delayed data

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.array([1., 2., 3., 4.])
>>> delayed = corrupt.with_delay(data, delay=2, fill_value=0)
>>> np.allclose(delayed, [0., 0., 1., 2.])
True
static with_drift(data, drift_rate=0.01, drift_type='linear')[source]

Add time-dependent drift to data.

Common in sensors that warm up or degrade (temperature sensors, gyroscopes, pressure sensors).

Parameters:
  • data (NDArray) – Input data array

  • drift_rate (float, optional) – Rate of drift per sample (default 0.01)

  • drift_type (str, optional) – Type of drift: ‘linear’ or ‘exponential’ (default ‘linear’)

Returns:

Data with added drift

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.ones(5)
>>> drifted = corrupt.with_drift(data, drift_rate=0.1)
>>> drifted[-1] > drifted[0]  # drift increases over time
True
static with_dropouts(data, dropout_rate=0.1, fill_value=nan, seed=None)[source]

Randomly drop data points (packet loss, sensor failures).

Common in wireless communication, intermittent connections.

Parameters:
  • data (NDArray) – Input data array

  • dropout_rate (float, optional) – Fraction of data points to drop (default 0.1)

  • fill_value (float, optional) – Value to use for dropped points (default np.nan)

  • seed (int, optional) – Random seed for reproducibility

Returns:

Data with random dropouts

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.array([1., 2., 3., 4., 5.])
>>> dropped = corrupt.with_dropouts(data, dropout_rate=0.3, seed=42)
>>> dropped.shape == data.shape
True
>>> np.isnan(dropped).sum() > 0  # some data dropped
True
static with_gaussian_noise(data, std=None, mean=None, cov=None, seed=None)[source]

Add Gaussian (normal) noise to data. Supports scalar, list, or NDArray input. Returns data in the same type it was passed in.

Return type:

Union[float, list, ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Parameters:
static with_quantization(data, levels=256)[source]

Quantize data to discrete levels (ADC quantization).

Simulates analog-to-digital conversion with limited bit depth.

Parameters:
  • data (NDArray) – Input data array

  • levels (int, optional) – Number of quantization levels (default 256 for 8-bit ADC)

Returns:

Quantized data

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.linspace(0, 1, 100)
>>> quantized = corrupt.with_quantization(data, levels=10)
>>> len(np.unique(quantized)) <= 10
True
static with_spikes(data, spike_rate=0.05, spike_magnitude=5.0, seed=None)[source]

Add random spikes/outliers to data.

Common in EMI, electrical interference, sensor glitches.

Parameters:
  • data (NDArray) – Input data array

  • spike_rate (float, optional) – Fraction of data points to spike (default 0.05)

  • spike_magnitude (float, optional) – Magnitude of spikes relative to data range (default 5.0)

  • seed (int, optional) – Random seed for reproducibility

Returns:

Data with random spikes

Return type:

NDArray

Examples

>>> import numpy as np
>>> data = np.ones(100)
>>> spiked = corrupt.with_spikes(data, spike_rate=0.1, seed=42)
>>> (np.abs(spiked - data) > 1).sum() > 0  # some spikes present
True