Filter
In the data and machine learning world, a filter is a technique used to narrow down or preprocess data by removing irrelevant or unwanted information based on specific conditions, improving the quality and relevance of the dataset for analysis or modeling.
In practice, a data project usually needs several filters. This project therefore uses the GlobalFilter as a container for multiple SingleFilters.
SingleFilter
A SingleFilter is defined by three attributes:
- filter_feature: a Feature or a feature name as a string.
- parameter: a dictionary of filter parameters, for example {"min": 2, "max": 3}.
- filter_type: a string or a FilterType.
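To make the three attributes concrete, here is a minimal, self-contained sketch of how such a filter object could normalize its inputs. It is not mloda's implementation: `Feature` and `SketchSingleFilter` are hypothetical stand-ins, and freezing the parameters into a sorted tuple of pairs is an assumption based on the `parameters=(('value', 1),)` form printed in the examples further down.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Tuple, Union


class FilterType(Enum):
    EQUAL = "equal"
    RANGE = "range"


@dataclass(frozen=True)
class Feature:
    # Hypothetical stand-in for a feature object that carries a name.
    name: str


@dataclass(frozen=True)
class SketchSingleFilter:
    # Hypothetical filter object; not mloda's SingleFilter.
    feature_name: str
    filter_type: FilterType
    parameters: Tuple[Tuple[str, Any], ...]

    @classmethod
    def create(
        cls,
        filter_feature: Union[Feature, str],
        filter_type: Union[FilterType, str],
        parameter: Dict[str, Any],
    ) -> "SketchSingleFilter":
        # Accept either a Feature or a plain feature name.
        name = filter_feature.name if isinstance(filter_feature, Feature) else filter_feature
        # Accept either an enum member or its string value.
        ftype = filter_type if isinstance(filter_type, FilterType) else FilterType(filter_type)
        # Freeze the dict into sorted pairs so the filter is hashable and can live in a set.
        frozen = tuple(sorted(parameter.items()))
        return cls(name, ftype, frozen)


a = SketchSingleFilter.create("example_order_id", "equal", {"value": 1})
b = SketchSingleFilter.create(Feature("example_order_id"), FilterType.EQUAL, {"value": 1})
print(a == b, len({a, b}))  # True 1
```

Both spellings of the same filter collapse to one entry, which is what a set-based container needs.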
FilterType
This enum defines a common set of filter types, so the framework and its users refer to filters consistently.
```python
from enum import Enum


class FilterType(Enum):
    MIN = "min"
    MAX = "max"
    EQUAL = "equal"
    RANGE = "range"
    REGEX = "regex"
    CATEGORICAL_INCLUSION = "categorical_inclusion"
```
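Because each member's value is the plain string users pass in, a string maps to its member through the standard `Enum` value lookup, and an unknown string fails early. The sketch below repeats the enum and adds a hypothetical `to_filter_type` helper; whether mloda validates strings exactly this way is an assumption.

```python
from enum import Enum
from typing import Union


class FilterType(Enum):
    MIN = "min"
    MAX = "max"
    EQUAL = "equal"
    RANGE = "range"
    REGEX = "regex"
    CATEGORICAL_INCLUSION = "categorical_inclusion"


def to_filter_type(value: Union[FilterType, str]) -> FilterType:
    # Hypothetical helper: pass enum members through, look strings up by value.
    if isinstance(value, FilterType):
        return value
    try:
        return FilterType(value)
    except ValueError:
        allowed = ", ".join(member.value for member in FilterType)
        raise ValueError(f"Unknown filter type {value!r}; expected one of: {allowed}") from None


print(to_filter_type("range"))  # FilterType.RANGE
```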
GlobalFilter
The GlobalFilter provides methods to add filters to the collection. The preferred way to use the GlobalFilter is through the following methods.
add_filter: Adds a single filter to the GlobalFilter object.
- filter_feature
- filter_type
- parameter
add_time_and_time_travel_filters: Adds event-time and time-travel (validity) filters to the GlobalFilter. This is a convenience method; because handling time in data/ML/AI projects is complex, prefer it over building these filters by hand.
This method is useful for filtering data based on time ranges (event) and validity periods (valid).
Event Time Filter: For historical data (e.g., checking if a customer had a valid contract at the event time), only the event time filter is needed.
Time Travel Filter: If prior actions (e.g., payments made before the event) are relevant, the time travel filter is required.
Typically, valid_to matches the event timestamp. In cases like payment plans, however, where payments occur after the plan is created, some payments may be excluded by the valid_to bound.
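The distinction can be sketched as two independent interval checks on a single row. This is an illustration, not mloda's filter engine: `in_range` and `passes_time_filters` are hypothetical, and the sketch assumes inclusive lower bounds and exclusive upper bounds (the `max_exclusive=True` behavior).

```python
from datetime import datetime, timezone
from typing import Optional


def in_range(value: datetime, lower: datetime, upper: datetime, max_exclusive: bool = True) -> bool:
    # Lower bound is inclusive; the upper bound is exclusive unless max_exclusive is False.
    return lower <= value < upper if max_exclusive else lower <= value <= upper


def passes_time_filters(
    event_time: datetime,
    event_from: datetime,
    event_to: datetime,
    valid_time: Optional[datetime] = None,
    valid_from: Optional[datetime] = None,
    valid_to: Optional[datetime] = None,
) -> bool:
    # Event time filter: always applied.
    if not in_range(event_time, event_from, event_to):
        return False
    # Time travel filter: only applied when a validity period is given.
    if valid_from is not None and valid_to is not None:
        return valid_time is not None and in_range(valid_time, valid_from, valid_to)
    return True


utc = timezone.utc
event = datetime(2023, 6, 1, tzinfo=utc)
window = dict(event_from=datetime(2023, 1, 1, tzinfo=utc), event_to=datetime(2024, 1, 1, tzinfo=utc))
validity = dict(valid_from=datetime(2023, 1, 1, tzinfo=utc), valid_to=event)

# A payment recorded before the event passes; one recorded after valid_to is excluded.
before = passes_time_filters(event, valid_time=datetime(2023, 5, 1, tzinfo=utc), **window, **validity)
after = passes_time_filters(event, valid_time=datetime(2023, 7, 1, tzinfo=utc), **window, **validity)
print(before, after)  # True False
```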
Parameters:
- event_from (datetime): Start of the time range (with timezone).
- event_to (datetime): End of the time range (with timezone).
- valid_from (Optional[datetime]): Start of the validity period (optional, with timezone).
- valid_to (Optional[datetime]): End of the validity period (optional, with timezone).
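- max_exclusive (bool): If True, the upper bounds (event_to, valid_to) are treated as exclusive. The example below does not pass this argument and yields max_exclusive=True, so True appears to be the default.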
- event_time_column: The column name containing event timestamps. Default is "reference_time".
- validity_time_column: The column name containing validity timestamps. Default is "time_travel".
The resulting SingleFilters store these bounds as ISO 8601 strings converted to UTC. This keeps comparisons consistent across time zones and avoids ambiguity when processing time-based data.
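The conversion itself is standard-library behavior. The sketch below, a hypothetical `to_utc_iso` helper rather than mloda's code, shows how a timezone-aware datetime becomes the UTC ISO 8601 string format that appears in the results below; rejecting naive datetimes is an assumption consistent with the "with timezone" requirement above.

```python
from datetime import datetime, timedelta, timezone


def to_utc_iso(value: datetime) -> str:
    # Naive datetimes are ambiguous, so refuse them instead of guessing a zone.
    if value.tzinfo is None or value.utcoffset() is None:
        raise ValueError("datetime must be timezone-aware")
    return value.astimezone(timezone.utc).isoformat()


utc_plus_one = timezone(timedelta(hours=1))
print(to_utc_iso(datetime(2023, 1, 1, 1, 0, tzinfo=utc_plus_one)))
# 2023-01-01T00:00:00+00:00
```

01:00 at UTC+1 and 00:00 at UTC are the same instant, so both produce the same string.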
How to create a collection of single filters (GlobalFilter)
In this example, we simply instantiate a GlobalFilter and add a SingleFilter.
```python
from mloda.user import GlobalFilter

global_filter = GlobalFilter()
global_filter.add_filter("example_order_id", "equal", {"value": 1})
global_filter.filters
```
Result
```text
{<SingleFilter(feature_name=example_order_id, type=equal, parameters=(('value', 1),))>}
```
How to deal with time filters
This example shows how to add event-time and time-travel filters using timezone-aware datetimes.
```python
from datetime import datetime, timezone

event_from = datetime(2023, 1, 1, tzinfo=timezone.utc)
event_to = datetime(2023, 12, 31, tzinfo=timezone.utc)
valid_from = datetime(2022, 1, 1, tzinfo=timezone.utc)
valid_to = datetime(2022, 12, 31, tzinfo=timezone.utc)

global_filter.add_time_and_time_travel_filters(event_from, event_to, valid_from, valid_to)
global_filter.filters
```
Result
```text
{<SingleFilter(feature_name=time_travel, type=range, parameters=(('max', '2022-12-31T00:00:00+00:00'), ('max_exclusive', True), ('min', '2022-01-01T00:00:00+00:00')))>,
 <SingleFilter(feature_name=reference_time, type=range, parameters=(('max', '2023-12-31T00:00:00+00:00'), ('max_exclusive', True), ('min', '2023-01-01T00:00:00+00:00')))>}
```
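Each time bound becomes a range filter whose parameters combine min, max and max_exclusive. The sketch below, using a hypothetical `range_parameters` helper, reproduces the parameter tuple printed for reference_time above; that the printed order comes from sorting the keys is an inference from the output, not documented behavior.

```python
from datetime import datetime, timezone
from typing import Any, Tuple


def range_parameters(start: datetime, end: datetime, max_exclusive: bool = True) -> Tuple[Tuple[str, Any], ...]:
    # Hypothetical helper: convert bounds to UTC ISO strings, then freeze the dict as sorted pairs.
    params = {
        "min": start.astimezone(timezone.utc).isoformat(),
        "max": end.astimezone(timezone.utc).isoformat(),
        "max_exclusive": max_exclusive,
    }
    return tuple(sorted(params.items()))


event_params = range_parameters(
    datetime(2023, 1, 1, tzinfo=timezone.utc),
    datetime(2023, 12, 31, tzinfo=timezone.utc),
)
print(event_params)
# (('max', '2023-12-31T00:00:00+00:00'), ('max_exclusive', True), ('min', '2023-01-01T00:00:00+00:00'))
```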
Example access to Filters in the Feature Group
Filters only take effect in a FeatureGroup that supports them. This example shows where a FeatureGroup can access its SingleFilters. How a concrete filter is implemented depends on the feature group; this example is fairly involved because it filters a plain Python dictionary.
The feature group is also a data creator, so it creates its own data.
```python
from typing import Any, Optional

from mloda.user import mloda
from mloda.provider import FeatureGroup, FeatureSet, ComputeFramework, BaseInputData, DataCreator
from mloda_plugins.compute_framework.base_implementations.pyarrow.table import PyArrowTable


class ExampleOrderFilter(FeatureGroup):
    @classmethod
    def input_data(cls) -> Optional[BaseInputData]:
        # This feature group creates its own data, including the filter column.
        return DataCreator({cls.get_class_name(), "example_order_id"})

    @classmethod
    def calculate_feature(cls, data: Any, features: FeatureSet) -> Any:
        _data_creator = {
            cls.get_class_name(): [1, 2, 3],
            "example_order_id": [2, 1, 1],
        }

        # The following algorithm is naive and illustrates access to filters, not a typical use case.
        # The filter implementation highly depends on the feature group!
        if not features.filters:
            return _data_creator

        # Extract the filter value and the filtered column name from the first filter.
        single_filter = next(iter(features.filters))
        filter_value = single_filter.parameter.value
        filter_name = single_filter.filter_feature.name

        # Indices of the rows whose order_id matches the filter value
        order_id_filter = [i for i, order_id in enumerate(_data_creator[filter_name]) if order_id == filter_value]

        # Keep only the matching rows and drop the filter column itself
        filtered_data = {
            key: [value[i] for i in order_id_filter]
            for key, value in _data_creator.items()
            if key != filter_name
        }
        return filtered_data

    @classmethod
    def compute_framework_rule(cls) -> set[type[ComputeFramework]]:
        return {PyArrowTable}


result = mloda.run_all(
    ["ExampleOrderFilter"],
    global_filter=global_filter,
)
result[0]
```
Expected Output
```text
ExampleOrderFilter: [[2,3]]
```
Although this example is complex, note that the framework treats filters as features: the filtered column (example_order_id) is declared in input_data like any other feature.
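Stripped of the framework, the dictionary filtering inside calculate_feature can be checked in isolation. The sketch below is a hypothetical standalone function, `filter_dict_columns`, that mirrors that logic for an equality filter, so you can see where the expected [2, 3] comes from.

```python
from typing import Any, Dict, List


def filter_dict_columns(data: Dict[str, List[Any]], column: str, value: Any) -> Dict[str, List[Any]]:
    # Row indices where the filter column equals the requested value.
    keep = [i for i, item in enumerate(data[column]) if item == value]
    # Keep only those rows and drop the filter column, as the FeatureGroup above does.
    return {key: [values[i] for i in keep] for key, values in data.items() if key != column}


data = {"ExampleOrderFilter": [1, 2, 3], "example_order_id": [2, 1, 1]}
print(filter_dict_columns(data, "example_order_id", 1))
# {'ExampleOrderFilter': [2, 3]}
```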
Summary
This filtering system improves data preprocessing through GlobalFilter and SingleFilters, allowing flexible, condition-based refinement, including time-based filtering. It maintains consistency using FilterType and supports complex machine learning use cases. If you encounter a commonly used filter not yet included, feel free to open an issue or submit a pull request.
Handling Filters in a FeatureGroup
The previous sections show how users create filters. This section explains how FeatureGroup authors work with those filters during calculation.
How filters reach your FeatureGroup
When a user passes a GlobalFilter, the framework matches each SingleFilter to the
FeatureGroups that declare the filter's column as an input. Each matched filter is
deep-copied before delivery. If two FeatureGroups both match the same original
filter, they each receive independent copies. This means a single GlobalFilter can
be processed differently by different FeatureGroups in the same pipeline: one may use
it for inline masking while another uses it for row elimination.
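The effect of the per-FeatureGroup deep copy can be illustrated with the standard `copy` module. `FilterStandIn` is a hypothetical stand-in for a SingleFilter; the point is only that mutating one delivered copy leaves the original and the other copy untouched.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FilterStandIn:
    # Hypothetical stand-in for a SingleFilter with a mutable parameter dict.
    column: str
    parameter: Dict[str, Any] = field(default_factory=dict)


original = FilterStandIn("status", {"value": "active"})

# Each matching FeatureGroup receives its own deep copy.
copy_for_masking = copy.deepcopy(original)
copy_for_elimination = copy.deepcopy(original)

# One FeatureGroup changes its copy, for example while processing it.
copy_for_masking.parameter["value"] = "inactive"

print(original.parameter, copy_for_elimination.parameter)
# {'value': 'active'} {'value': 'active'}
```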
Matched filters are attached to the FeatureSet before calculate_feature() is
called. Inside your calculation you can access them via features.filters:
```python
@classmethod
def calculate_feature(cls, data, features: FeatureSet):
    if features.filters is not None:
        for single_filter in features.filters:
            column = single_filter.name  # e.g. "status"
            value = single_filter.parameter.value  # e.g. "active"
            # ... use however you need
```
features.filters is always available, regardless of any other setting.
Two independent concerns
Filters involve two decisions that are independent of each other:
| Concern | Who decides | When it happens |
|---|---|---|
| Inline reading: should the FeatureGroup read features.filters during calculation? | The FeatureGroup author (you) | During calculate_feature() |
| Row elimination: should the framework remove non-matching rows after calculation? | final_filters() return value | After calculate_feature() |
A FeatureGroup can do either, both, or neither. The two concerns are decoupled.
final_filters() reference
Override this classmethod on your FeatureGroup to control post-calculation row elimination:
```python
@classmethod
def final_filters(cls) -> bool | None:
    return None  # default
```
| Return value | Meaning |
|---|---|
| None | Defer to the FilterEngine tied to the ComputeFramework. Most engines (Pandas, PyArrow, Polars, Spark) default to True (eliminate rows). Iceberg defaults to False (predicate pushdown handles it). |
| False | Skip row elimination. Use this when your FeatureGroup fully handles the filter itself. |
| True | Force row elimination, even if the FilterEngine would skip it. |
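The resolution rule in this table amounts to a few lines. `should_eliminate_rows` and `ENGINE_DEFAULTS` are hypothetical names written for illustration; the default values (True for most engines, False for Iceberg) come from the table.

```python
from typing import Optional


def should_eliminate_rows(final_filters: Optional[bool], engine_default: bool) -> bool:
    # None defers to the FilterEngine; an explicit bool from the FeatureGroup wins.
    return engine_default if final_filters is None else final_filters


# Engine defaults as described in the table (illustrative mapping, not an API).
ENGINE_DEFAULTS = {"pandas": True, "pyarrow": True, "polars": True, "spark": True, "iceberg": False}

print(should_eliminate_rows(None, ENGINE_DEFAULTS["pyarrow"]))  # True
print(should_eliminate_rows(None, ENGINE_DEFAULTS["iceberg"]))  # False
print(should_eliminate_rows(False, ENGINE_DEFAULTS["pandas"]))  # False
print(should_eliminate_rows(True, ENGINE_DEFAULTS["iceberg"]))  # True
```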
This method does not affect whether features.filters is populated. Filters are
always available for inline reading.
final_filters() is a semantic flag that controls when in the pipeline the
filter is applied, not where the computation runs physically:
| Framework category | Examples | Physical behavior |
|---|---|---|
| Eager | Pandas, PyArrow | Post-hoc filter in memory after full materialization |
| Lazy (SQL) | DuckDB, SQLite | .filter() adds WHERE to query plan; optimizer may push to scan time |
| Lazy (dataframe) | Polars, Spark | .filter() adds node to lazy plan; optimizer decides physical order |
| Scan-time | Iceberg | Predicates pushed into scan expressions (final_filters()=False) |
Whenever row elimination is enabled (final_filters() resolves to True), all frameworks produce the same logical result (non-matching rows are absent from the output), but the physical execution path differs.
Usage patterns
Pattern 1: Let the framework handle everything (default)
The most common case. Your FeatureGroup ignores filters entirely, and the framework removes non-matching rows after calculation. No override needed.
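```python
import pyarrow as pa

from mloda.provider import FeatureGroup, FeatureSet


class SalesTotal(FeatureGroup):
    @classmethod
    def calculate_feature(cls, data, features: FeatureSet):
        # Just compute; the framework handles filtering afterward.
        # compute_totals is a placeholder for your own calculation.
        return pa.table({cls.get_class_name(): compute_totals(data)})
```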
Pattern 2: Inline masking, skip row elimination
Use FilterMask.build() to turn features.filters into a boolean mask for a given
column. This replaces the boilerplate of iterating SingleFilter objects and building
masks manually. The correct engine is wired automatically by the ComputeFramework.
Import it from mloda.provider:
```python
from mloda.provider import FilterMask
```
Example: "Sum of active sales per region, broadcast back to every row."
```python
import pyarrow as pa
import pyarrow.compute as pc

from mloda.provider import FeatureGroup, FeatureSet, FilterMask


class MaskedRegionSum(FeatureGroup):
    @classmethod
    def final_filters(cls) -> bool:
        return False  # skip row elimination

    @classmethod
    def calculate_feature(cls, data, features: FeatureSet):
        mask = FilterMask.build(data, features, column="status")
        masked = pc.if_else(mask, data["sales"], None)
        # Aggregate masked values and broadcast them back to all rows.
        # broadcast_sum is a placeholder for your own aggregation helper.
        return pa.table({cls.get_class_name(): broadcast_sum(masked, data["region"])})
```
Result: all rows preserved, but only matching values contributed to the sum.
FilterMask.build() supports equal, min, max, range, and
categorical_inclusion filter types. When multiple filters target the same column
they are AND-combined. When no filters match the column, an all-True mask is returned.
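To clarify those semantics, here is a pure-Python sketch of a mask builder over one column. It is not FilterMask.build(): `build_mask`, `_matches` and the dict-based filter records are hypothetical, and the parameter keys for min, max and categorical_inclusion (`value`, `values`) are assumptions; only equal's `{"value": ...}` and range's min/max/max_exclusive appear elsewhere in this document.

```python
from typing import Any, Dict, List


def _matches(item: Any, filter_type: str, params: Dict[str, Any]) -> bool:
    # Evaluate one filter against one value; the parameter keys are assumptions.
    if filter_type == "equal":
        return item == params["value"]
    if filter_type == "min":
        return item >= params["value"]
    if filter_type == "max":
        return item <= params["value"]
    if filter_type == "range":
        upper_ok = item < params["max"] if params.get("max_exclusive", False) else item <= params["max"]
        return params["min"] <= item and upper_ok
    if filter_type == "categorical_inclusion":
        return item in params["values"]
    raise ValueError(f"unsupported filter type: {filter_type}")


def build_mask(values: List[Any], filters: List[Dict[str, Any]], column: str) -> List[bool]:
    # Only filters that target this column take part.
    relevant = [f for f in filters if f["column"] == column]
    # No relevant filters: all() of nothing is True. Several filters: AND-combined.
    return [all(_matches(v, f["type"], f["params"]) for f in relevant) for v in values]


filters = [
    {"column": "status", "type": "equal", "params": {"value": "active"}},
    {"column": "value", "type": "range", "params": {"min": 10, "max": 40, "max_exclusive": True}},
    {"column": "value", "type": "min", "params": {"value": 20}},
]
print(build_mask(["active", "inactive", "active", "inactive"], filters, "status"))  # [True, False, True, False]
print(build_mask([10, 20, 30, 40], filters, "value"))  # [False, True, True, False]
print(build_mask(["A", "A", "B", "B"], filters, "region"))  # [True, True, True, True]
```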
Pattern 3: Inline masking + row elimination
Read filters for conditional logic and request row elimination afterward. Useful when you need filter-aware computation (masking, weighting, branching) but also want non-matching rows removed from the final output.
Example: "Sum of active sales per region, only for active rows."
```python
import pyarrow as pa
import pyarrow.compute as pc

from mloda.provider import FeatureGroup, FeatureSet, FilterMask


class MaskedRegionSumActiveOnly(FeatureGroup):
    @classmethod
    def final_filters(cls) -> bool:
        return True  # also eliminate non-matching rows

    @classmethod
    def calculate_feature(cls, data, features: FeatureSet):
        mask = FilterMask.build(data, features, column="status")
        masked = pc.if_else(mask, data["sales"], None)
        sums = broadcast_sum(masked, data["region"])  # placeholder aggregation helper
        # Return all rows; the framework will remove non-matching ones
        return pa.table({
            cls.get_class_name(): sums,
            "status": data["status"],  # preserve the filter column
        })
```
Result: only matching rows remain, with aggregated values computed from masked data.
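The helper broadcast_sum used in patterns 2 and 3 is not defined in this document. A plain-Python version of what it is assumed to do (sum the unmasked values per region and repeat the total on every row) shows that the two patterns differ only in the final elimination step. Both `broadcast_sum` and `eliminate_rows` here are hypothetical illustrations, using the same four-row data as the pipeline flows below.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def broadcast_sum(values: List[Optional[int]], groups: List[str]) -> List[int]:
    # Sum non-null values per group, then write each group's total back to every row.
    totals: Dict[str, int] = defaultdict(int)
    for value, group in zip(values, groups):
        if value is not None:
            totals[group] += value
    return [totals[group] for group in groups]


def eliminate_rows(result: List[int], status: List[str], keep: str) -> List[int]:
    # Post-calculation row elimination on the preserved filter column.
    return [r for r, s in zip(result, status) if s == keep]


region = ["A", "A", "B", "B"]
status = ["active", "inactive", "active", "inactive"]
sales = [10, 20, 30, 40]

mask = [s == "active" for s in status]
masked = [v if m else None for v, m in zip(sales, mask)]  # [10, None, 30, None]

pattern_2 = broadcast_sum(masked, region)  # all rows kept
pattern_3 = eliminate_rows(pattern_2, status, "active")  # non-matching rows removed
print(pattern_2, pattern_3)  # [10, 10, 30, 30] [10, 30]
```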
Pattern 4: Force elimination on a non-eliminating engine
Some engines skip row elimination by default (e.g. Iceberg, which uses predicate pushdown
at scan time). If your FeatureGroup computes derived columns that the scan could not
filter, override final_filters() to force elimination:
```python
from mloda.provider import FeatureGroup


class DerivedIcebergFeature(FeatureGroup):
    @classmethod
    def final_filters(cls) -> bool:
        return True  # override Iceberg's default of False
```
The overlap contract
When a FeatureGroup reads filters inline and returns final_filters() = True, the
same filters are processed twice: once by your code during calculation, and once by the
framework's FilterEngine afterward.
This is safe as long as you follow one rule: preserve the filter column with its original values in your output.
The framework's row elimination works by matching against the filter column in your
returned data. If your FeatureGroup drops that column, the framework raises a
ValueError with a clear message naming the missing column. If your FeatureGroup
changes the column's type (e.g., mapping strings to integers), the framework detects
the dtype mismatch and raises a ValueError. Value-level mutations within the same
type (e.g., remapping "active" to "yes") are not detected and may produce wrong
results silently.
```python
# Correct: filter column preserved with original values
return pa.table({
    cls.get_class_name(): computed_values,
    "status": original_status_column,  # framework can filter on this
})

# Wrong: filter column type changed; raises ValueError (dtype mismatch)
return pa.table({
    cls.get_class_name(): computed_values,
    "status": [1, 0, 1, 0],  # mapped to ints; framework detects string-vs-numeric mismatch
})

# Wrong: filter column omitted; raises ValueError at runtime
return pa.table({
    cls.get_class_name(): computed_values,
    # "status" missing; framework raises: "missing filter column 'status'"
})
```
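The checks described in the overlap contract can be approximated in a few lines. `check_filter_column` is a hypothetical function, not the framework's code, and comparing Python element types is a simplification of a real dtype check; it raises for a missing column or a changed type and, as the contract warns, cannot notice a same-type value remap.

```python
from typing import Any, Dict, List


def check_filter_column(output: Dict[str, List[Any]], column: str, expected_type: type) -> None:
    # A dropped filter column makes row elimination impossible.
    if column not in output:
        raise ValueError(f"missing filter column '{column}'")
    # A changed element type means the filter value can no longer be compared safely.
    wrong = {type(v).__name__ for v in output[column] if v is not None and not isinstance(v, expected_type)}
    if wrong:
        raise ValueError(
            f"filter column '{column}' changed type: expected {expected_type.__name__}, got {sorted(wrong)}"
        )


ok = {"feature": [1.0, 2.0], "status": ["active", "inactive"]}
remapped = {"feature": [1.0, 2.0], "status": ["yes", "no"]}  # same type, different values
check_filter_column(ok, "status", str)
check_filter_column(remapped, "status", str)  # passes silently: value remaps are not detectable

for bad in ({"feature": [1.0]}, {"feature": [1.0], "status": [1]}):
    try:
        check_filter_column(bad, "status", str)
    except ValueError as err:
        print(err)
```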
Quick reference
| Your FeatureGroup... | final_filters() | Reads features.filters? | Must preserve filter column? |
|---|---|---|---|
| Ignores filters | None (default) | No | Yes (it is in input_data) |
| Handles everything inline | False | Yes | No (elimination skipped) |
| Uses inline logic + elimination | True | Yes | Yes |
| Just forces elimination | True | No | Yes |
Pipeline data flows
The following examples show what happens to the data at each stage in four scenarios. All flows use the same input and filter:
Input:
| region | status | value |
|---|---|---|
| A | active | 10 |
| A | inactive | 20 |
| B | active | 30 |
| B | inactive | 40 |
Filter: status == "active"
Eager framework + final filters (Pandas, PyArrow)
| Stage | Data |
|---|---|
| calculate_feature() output | [10, 20, 30, 40] with status = [active, inactive, active, inactive] |
| run_final_filter() | Creates mask [True, False, True, False], applies table.filter(mask) |
| Final result | [10, 30] with status = [active, active] (2 rows) |
Lazy framework + final filters (DuckDB, SQLite, Polars, Spark)
| Stage | Data |
|---|---|
| calculate_feature() output | Lazy relation: SELECT value, status FROM source |
| run_final_filter() | Appends filter: WHERE "status" = 'active' |
| Materialized result | [10, 30] with status = [active, active] (2 rows) |
The result is identical to eager row elimination, but no intermediate full-table materialization occurs.
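The equivalence can be checked with the standard library's `sqlite3` module standing in for a lazy SQL framework. This illustrates the principle only, not mloda's DuckDB or SQLite integration: the eager path filters an in-memory list with a mask, while the lazy path appends a WHERE clause to the query before anything is fetched.

```python
import sqlite3

rows = [("A", "active", 10), ("A", "inactive", 20), ("B", "active", 30), ("B", "inactive", 40)]

# Eager: materialize everything, build a mask, then filter in memory.
mask = [status == "active" for _, status, _ in rows]
eager = [value for (_, _, value), keep in zip(rows, mask) if keep]

# Lazy: the filter becomes part of the query and runs inside the database engine.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (region TEXT, status TEXT, value INTEGER)")
conn.executemany("INSERT INTO source VALUES (?, ?, ?)", rows)
query = "SELECT value, status FROM source"
query += " WHERE status = ?"  # appended filter, mirroring the WHERE step in the table above
lazy = [value for value, _ in conn.execute(query + " ORDER BY rowid", ("active",))]
conn.close()

print(eager, lazy)  # [10, 30] [10, 30]
```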
Inline masking (all rows preserved)
| Stage | Data |
|---|---|
| Read features.filters | Extract: status == "active" |
| Build mask | [True, False, True, False] |
| Apply mask to value column | masked_value = [10, NULL, 30, NULL] |
| Aggregate (sum by region, broadcast) | Region A: 10, Region B: 30 |
| Final result | [10, 10, 30, 30] (4 rows, all preserved) |
The framework skips run_final_filter() because final_filters() = False.
Masking and elimination (same filter, two FeatureGroups)
A pipeline may need the same filter for two purposes in separate steps. Because filters are deep-copied independently for each FeatureGroup, both steps run without interference:
| Stage | Step 1 (inline mask FG) | Step 2 (regular FG) |
|---|---|---|
| final_filters() | False | None (engine default: True) |
| calculate_feature() | Reads filters, masks values, aggregates | Computes raw values, ignores filters |
| run_final_filter() | Skipped | Applies row elimination |
| Result | [10, 10, 30, 30] (4 rows) | [10, 30] (2 rows) |