Mask Engine

The mask engine provides boolean masking primitives for use inside calculate_feature(). It lets a FeatureGroup selectively include or exclude values without removing rows from the output.

For user-provided filter propagation (GlobalFilter, SingleFilter, row elimination), see Filter Data.

How the mask engine is wired

Each ComputeFramework provides a BaseMaskEngine subclass via its mask_engine() classmethod. The framework sets features.mask_engine on the FeatureSet before calculate_feature() is called. This wiring is independent of features.filters and final_filters().

@classmethod
def calculate_feature(cls, data, features: FeatureSet):
    engine = features.mask_engine   # always available when the framework provides one
    mask = engine.equal(data, "status", "active")
    # ... apply mask

Primitives

Every BaseMaskEngine subclass implements these abstract classmethods:

Method Returns mask where...
equal(data, column, value) data[column] == value
greater_equal(data, column, value) data[column] >= value
greater_than(data, column, value) data[column] > value
less_equal(data, column, value) data[column] <= value
less_than(data, column, value) data[column] < value
is_in(data, column, values) data[column] is in values
combine(mask1, mask2) logical AND of two masks
all_true(data) all True (no filtering)

Convenience methods

Built from the primitives above. No per-engine implementation needed.

between(data, column, min_value, max_value, *, min_exclusive=False, max_exclusive=False)

Range check combining a lower and upper bound:

# Inclusive: value in [10, 100]
mask = engine.between(data, "value", 10, 100)

# Exclusive bounds: value in (10, 100)
mask = engine.between(data, "value", 10, 100, min_exclusive=True, max_exclusive=True)

# Half-open: value in [10, 100)
mask = engine.between(data, "value", 10, 100, max_exclusive=True)

all_of(data, masks)

AND-combine a list of masks. Returns an all-True mask if the list is empty.

mask = engine.all_of(data, [
    engine.equal(data, "status", "active"),
    engine.between(data, "value", 10, 100),
])

Masking patterns

Inline masking, skip row elimination

Set final_filters() = False to tell the framework not to remove rows. Use the mask engine to selectively null out values before aggregation.

class MaskedRegionSum(FeatureGroup):
    @classmethod
    def final_filters(cls) -> bool:
        return False  # skip row elimination

    @classmethod
    def calculate_feature(cls, data, features: FeatureSet):
        engine = features.mask_engine
        mask = engine.equal(data, "status", "active")
        masked = pc.if_else(mask, data["sales"], None)
        # aggregate masked values, broadcast back to all rows
        return pa.table({cls.get_class_name(): broadcast_sum(masked, data["region"])})

Result: all rows preserved, but only matching values contributed to the sum.

Inline masking + row elimination

Use the mask engine for conditional logic and return final_filters() = True so the framework also eliminates non-matching rows afterward. You must preserve the filter column in your output (see the overlap contract in the filter docs).

class MaskedRegionSumActiveOnly(FeatureGroup):
    @classmethod
    def final_filters(cls) -> bool:
        return True  # also eliminate non-matching rows

    @classmethod
    def calculate_feature(cls, data, features: FeatureSet):
        engine = features.mask_engine
        mask = engine.equal(data, "status", "active")
        masked = pc.if_else(mask, data["sales"], None)
        sums = broadcast_sum(masked, data["region"])
        # Return all rows; the framework will remove non-matching ones
        return pa.table({
            cls.get_class_name(): sums,
            "status": data["status"],  # preserve the filter column
        })

Result: only matching rows remain, with aggregated values computed from masked data.

Pipeline data flow (inline masking)

Input:
    region | status   | value
    A      | active   |  10
    A      | inactive |  20
    B      | active   |  30
    B      | inactive |  40

Mask predicate: status == "active"
Stage Data
Use features.mask_engine engine.equal(data, "status", "active")
Build mask [True, False, True, False]
Apply mask to value column masked_value = [10, NULL, 30, NULL]
Aggregate (sum by region, broadcast) Region A: 10, Region B: 30
Final result [10, 10, 30, 30] (4 rows, all preserved)

The framework skips run_final_filter() because final_filters() = False.