energy_fault_detector.data_preprocessing.data_preprocessor

Generic class for building a preprocessing pipeline.

class DataPreprocessor(steps=None, **params)

Bases: Pipeline, SaveLoadMixin

A data preprocessing pipeline that allows for configurable steps based on the extended pipeline.

If both steps and legacy params are provided, steps take precedence and a warning is emitted. When neither steps nor legacy params are provided, a default “old-style” pipeline is created which removes features that are constant or binary, as well as features that contain more than 5% missing values. Afterwards, remaining missing values are imputed with the mean and the features are scaled with the StandardScaler.
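
As a rough illustration of that default behavior (not the library's actual code), mean imputation followed by standardization of a single column might look like:

```python
import statistics

def impute_and_scale(column):
    """Impute missing values (None here) with the column mean, then
    standardize to zero mean and unit standard deviation."""
    observed = [v for v in column if v is not None]
    fill = statistics.mean(observed)
    imputed = [v if v is not None else fill for v in column]
    mu = statistics.mean(imputed)
    sigma = statistics.pstdev(imputed)  # population std, as in StandardScaler
    return [(v - mu) / sigma for v in imputed]

print(impute_and_scale([1.0, None, 3.0]))
```

The imputed value equals the column mean, so it lands exactly at 0 after scaling.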

Parameters:
  • steps (Optional[List[Dict[str, Any]]]) –

    Optional list of step specifications. Each item is a dict with:

    • name: registered step name (see STEP_REGISTRY).

    • enabled: optional bool (default True).

    • params: dict of constructor arguments for the step.

    • step_name: optional explicit pipeline name (defaults to name).

  • **params (Any) – Legacy parameters used when steps is None (see _legacy_keys()).
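
For instance, a steps specification can be written as a plain list of dicts; the snippet below also applies the documented defaults (enabled defaults to True, step_name defaults to name). The parameter values are illustrative only:

```python
# Illustrative steps specification; each dict mirrors the fields described above.
steps = [
    {'name': 'column_selector',
     'params': {'max_nan_frac_per_col': 0.05}},
    {'name': 'duplicate_to_nan',
     'enabled': False,                       # disabled steps are skipped
     'params': {}},
    {'name': 'counter_diff_transformer',
     'step_name': 'counter_flow',            # explicit pipeline name
     'params': {'counters': ['flow_total_m3'], 'compute_rate': True}},
]

# Documented defaults: 'enabled' -> True, 'step_name' -> 'name'.
for spec in steps:
    spec.setdefault('enabled', True)
    spec.setdefault('step_name', spec['name'])

print([s['step_name'] for s in steps if s['enabled']])
```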

Notes

Enforced ordering in steps mode:

  1. NaN-introducing steps first (DuplicateValuesToNan, CounterDiffTransformer),

  2. ColumnSelector (if present),

  3. other steps,

  4. SimpleImputer placed before the scaler (always present; mean strategy by default),

  5. scaler always last (StandardScaler by default).
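
One way to sketch this ordering is a stable priority sort, where a stable sort preserves the user-given order within each bucket. This is an assumption about the mechanism, not the class's actual implementation:

```python
# Hypothetical priority buckets matching the ordering rules above.
PRIORITY = {
    'duplicate_to_nan': 0,           # NaN-introducing steps first
    'counter_diff_transformer': 0,
    'column_selector': 1,
    # any unlisted step falls into the "other" bucket (priority 2)
    'simple_imputer': 3,             # imputer before the scaler
    'standard_scaler': 4,            # scaler always last
    'minmax_scaler': 4,
}

def ordered(step_names):
    """Stable-sort step names into the enforced pipeline order."""
    return sorted(step_names, key=lambda name: PRIORITY.get(name, 2))

print(ordered(['standard_scaler', 'angle_transformer', 'simple_imputer',
               'column_selector', 'counter_diff_transformer']))
```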

Configuration example:

train:
  data_preprocessor:
    steps:
    - name: column_selector
      params:
        max_nan_frac_per_col: 0.05
        features_to_exclude: ['exclude_this_feature']
    - name: counter_diff_transformer
      step_name: counter_flow
      params:
        counters: ['flow_total_m3']
        compute_rate: True
        fill_first: 'zero'
    - name: counter_diff_transformer
      step_name: counter_energy
      params:
        counters: ['energy_total_kwh']
        compute_rate: False
        fill_first: 'zero'
        reset_strategy: 'rollover'
        rollover_values:
          'energy_total_kwh': 100000.0

NAME_ALIASES: Dict[str, str] = {
    'angle_transform': 'angle_transformer',
    'counter_diff': 'counter_diff_transformer',
    'counter_diff_transform': 'counter_diff_transformer',
    'duplicate_value_to_nan': 'duplicate_to_nan',
    'duplicate_values_to_nan': 'duplicate_to_nan',
    'imputer': 'simple_imputer',
    'minmax': 'minmax_scaler',
    'standard': 'standard_scaler',
    'standardize': 'standard_scaler',
    'standardscaler': 'standard_scaler'}

STEP_REGISTRY = {
    'angle_transformer': <class 'energy_fault_detector.data_preprocessing.angle_transformer.AngleTransformer'>,
    'column_selector': <class 'energy_fault_detector.data_preprocessing.column_selector.ColumnSelector'>,
    'counter_diff_transformer': <class 'energy_fault_detector.data_preprocessing.counter_diff_transformer.CounterDiffTransformer'>,
    'duplicate_to_nan': <class 'energy_fault_detector.data_preprocessing.duplicate_value_to_nan.DuplicateValuesToNan'>,
    'low_unique_value_filter': <class 'energy_fault_detector.data_preprocessing.low_unique_value_filter.LowUniqueValueFilter'>,
    'minmax_scaler': sklearn.preprocessing.MinMaxScaler,
    'simple_imputer': sklearn.impute.SimpleImputer,
    'standard_scaler': sklearn.preprocessing.StandardScaler}
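
Name resolution via the aliases can be pictured as a dict lookup followed by a registry check; this is a minimal reimplementation for illustration (using a subset of the keys above), not the class's internals:

```python
NAME_ALIASES = {'standard': 'standard_scaler', 'standardize': 'standard_scaler',
                'imputer': 'simple_imputer', 'counter_diff': 'counter_diff_transformer'}
STEP_KEYS = {'standard_scaler', 'simple_imputer',
             'counter_diff_transformer', 'column_selector'}

def resolve(name):
    """Map an alias to its canonical registry key, or raise if unknown."""
    canonical = NAME_ALIASES.get(name, name)
    if canonical not in STEP_KEYS:
        raise KeyError(f'Unknown preprocessing step: {name!r}')
    return canonical

print(resolve('standardize'))      # -> 'standard_scaler'
print(resolve('column_selector'))  # already canonical -> unchanged
```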
fit_transform(x, **kwargs)

Fit and transform in one step.

Parameters:

x (DataFrame) – Input DataFrame.

Return type:

DataFrame

Returns:

Transformed DataFrame with the same index as input.

inverse_transform(x, **kwargs)

Inverse-transform scaler and angles (other transforms are not reversed).

Parameters:

x (DataFrame) – The transformed data.

Return type:

DataFrame

Returns:

DataFrame with inverse scaling and angle back-transformation.
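
For the scaler part, undoing a StandardScaler amounts to x = z * scale + mean per feature. A pure-Python sketch under that assumption:

```python
def inverse_standard_scale(z, mean, scale):
    """Undo standardization: recover original values from z-scores."""
    return [v * scale + mean for v in z]

print(inverse_standard_scale([-1.0, 0.0, 1.0], mean=10.0, scale=2.0))
# -> [8.0, 10.0, 12.0]
```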

transform(x, **kwargs)

Apply pipeline steps to the input DataFrame.

Parameters:

x (DataFrame) – Input DataFrame.

Return type:

DataFrame

Returns:

DataFrame with the same index as input.