Join datasets
Combining datasets from various feature groups is crucial for building comprehensive and scalable data processing pipelines. The framework automatically handles data merging in the background to accommodate the following scenarios:
- Different Compute Frameworks: Merging data from feature groups that utilize different underlying compute technologies.
- Same Compute Framework, Different Sources: Combining datasets that use the same compute framework but originate from different data sources.
- Same Feature Group, Different Feature Options: Integrating data from the same feature group configured with different feature options.
If a feature depends on data from one of the setups above,
- a Link must be defined,
- and the ComputeFramework must support the corresponding merge.
We first discuss the building blocks of a Link (Index, JoinType), then the Link itself, and finally the merge in the ComputeFramework.
Index
The Index class defines the keys (columns) used to merge datasets. An index is a tuple of strings.
Properties:
- Multi-Index Support: Supports multi-column keys for complex merges.
- Comparison Operations: Methods to compare indexes and determine subset relationships.
- Utility Methods: Provides methods to check if the index is a multi-index.
Example:
from mloda.user import Index
# Create an Index with a single column
single_column_index = Index(('user_id',))
# Create an Index with multiple columns (multi-index)
multi_column_index = Index(('user_id', 'timestamp'))
# Check if an index is a multi-index
is_multi = multi_column_index.is_multi_index() # Returns True
# Check if single column is a part of a composite index
is_a_part_of = single_column_index.is_a_part_of_(multi_column_index) # Returns True
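To make the properties above concrete, here is a minimal stand-in in plain Python. SketchIndex is hypothetical and not the mloda class. It assumes that is_a_part_of_ means column-set containment; the real implementation may differ, for example by also taking column order into account.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SketchIndex:
    """Hypothetical stand-in for Index: an immutable tuple of column names."""

    index: Tuple[str, ...]

    def is_multi_index(self) -> bool:
        # More than one column means a composite (multi-column) key
        return len(self.index) > 1

    def is_a_part_of_(self, other: "SketchIndex") -> bool:
        # Assumption: "part of" means every column of self also appears in other
        return set(self.index).issubset(other.index)


single = SketchIndex(("user_id",))
multi = SketchIndex(("user_id", "timestamp"))
print(multi.is_multi_index(), single.is_a_part_of_(multi), multi.is_a_part_of_(single))
```

Making the stand-in a frozen dataclass gives value equality and hashability for free, which is what lets two indexes built from the same columns compare equal.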
JoinType
Join types specify how two datasets are merged based on their keys. The framework supports the following join types:
- Inner Join,
- Left Join,
- Outer Join,
- Right Join (use sparingly; prefer left joins when possible),
- Append,
- Union.
from enum import Enum
class JoinType(Enum):
    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"
    OUTER = "outer"
    APPEND = "append"
    UNION = "union"
join_type = JoinType.INNER
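A quick way to see how the four relational join types differ is to look only at which key values survive in the result. The helper surviving_keys below is a hypothetical illustration, not part of mloda. JoinType is redefined with just the four relational members, and APPEND and UNION are left out because their row-level semantics are not described here.

```python
from enum import Enum
from typing import Set


class JoinType(Enum):
    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"
    OUTER = "outer"


def surviving_keys(left_keys: Set[str], right_keys: Set[str], join_type: JoinType) -> Set[str]:
    """Return the key values that appear in the joined result (hypothetical helper)."""
    if join_type is JoinType.INNER:
        return left_keys & right_keys  # only keys present on both sides
    if join_type is JoinType.LEFT:
        return set(left_keys)  # every left key, matched or not
    if join_type is JoinType.RIGHT:
        return set(right_keys)  # every right key, matched or not
    if join_type is JoinType.OUTER:
        return left_keys | right_keys  # keys from either side
    raise ValueError(f"Unsupported join type: {join_type}")


left = {"u1", "u2", "u3"}
right = {"u2", "u3", "u4"}
for jt in JoinType:
    print(jt.value, sorted(surviving_keys(left, right, jt)))
```

This also shows why right joins are rarely needed: a right join of A and B keeps the same keys as a left join of B and A, so swapping the operands usually expresses the same intent more readably.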
Link
A Link specifies how two feature groups are joined:
- the join type,
- the feature groups involved,
- and the indexes to use for the join.
from mloda.user import Index, JoinSpec, Link
from mloda.provider import FeatureGroup
# Assume FeatureGroupA and FeatureGroupB are defined feature groups
class FeatureGroupA(FeatureGroup):
    ...
class FeatureGroupB(FeatureGroup):
    ...
# Pass the feature group classes, as in the examples further below
feature_group_a = FeatureGroupA
feature_group_b = FeatureGroupB
# JoinSpec accepts multiple index formats:
# - String for single column: "id"
# - Tuple for single/multiple columns: ("id",) or ("col1", "col2")
# - Explicit Index: Index(("id",))
# Simple single-column join using string
link = Link.inner(
    left=JoinSpec(feature_group_a, "id"),
    right=JoinSpec(feature_group_b, "feature_a_id"),
)
# Multi-column join using tuple
link = Link.inner(
    left=JoinSpec(feature_group_a, ("id", "timestamp")),
    right=JoinSpec(feature_group_b, ("ref_id", "ref_time")),
)
# Explicit Index syntax (equivalent to the single-column string join above)
link = Link.inner(
    left=JoinSpec(feature_group_a, Index(("id",))),
    right=JoinSpec(feature_group_b, Index(("feature_a_id",))),
)
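Because the three index formats are interchangeable, each is presumably normalized to a single canonical form before the join is planned. The sketch below shows such a normalization step under that assumption. normalize_index and SketchIndex are hypothetical names, not mloda API, and rejecting an empty tuple is likewise an assumption of this sketch.

```python
from typing import Tuple, Union


class SketchIndex:
    """Hypothetical stand-in for mloda's Index (not the real class)."""

    def __init__(self, index: Tuple[str, ...]) -> None:
        self.index = index


IndexLike = Union[str, Tuple[str, ...], SketchIndex]


def normalize_index(value: IndexLike) -> SketchIndex:
    """Convert any accepted index format into one canonical representation."""
    if isinstance(value, SketchIndex):
        return value  # already explicit, pass through unchanged
    if isinstance(value, str):
        return SketchIndex((value,))  # "id" -> ("id",)
    if isinstance(value, tuple) and value and all(isinstance(c, str) for c in value):
        return SketchIndex(value)  # ("col1", "col2") is kept as-is
    # Empty tuples and non-string columns are rejected (assumption)
    raise TypeError(f"Unsupported index format: {value!r}")


print(normalize_index("id").index)
print(normalize_index(("id", "timestamp")).index)
print(normalize_index(SketchIndex(("id",))).index)
```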
Convenience _on Methods
For feature groups that define index_columns(), you can use the convenience _on methods to automatically derive the join index:
from mloda.user import Index, JoinSpec, Link
from mloda.provider import FeatureGroup
# Define feature groups with index_columns
class UserFeatureGroup(FeatureGroup):
    @classmethod
    def index_columns(cls):
        return [Index(("user_id",))]
class OrderFeatureGroup(FeatureGroup):
    @classmethod
    def index_columns(cls):
        return [Index(("user_id",)), Index(("order_id",))]
# Instead of verbose JoinSpec construction:
link = Link.inner(
    JoinSpec(UserFeatureGroup, UserFeatureGroup.index_columns()[0]),
    JoinSpec(OrderFeatureGroup, OrderFeatureGroup.index_columns()[0]),
)
# Use the convenient _on method:
link = Link.inner_on(UserFeatureGroup, OrderFeatureGroup)
# Select a specific index when a feature group defines several (0-based position):
link = Link.inner_on(UserFeatureGroup, OrderFeatureGroup, left_index=0, right_index=1)
Available _on methods: inner_on, left_on, right_on, outer_on, append_on, union_on
Note: The _on methods raise ValueError if a feature group does not define index_columns() or if it returns an empty list, and IndexError if the specified index position is out of range.
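The index selection described in the note can be sketched as a small resolver. resolve_on_index and SketchFeatureGroup are hypothetical names, and plain tuples stand in for Index objects to keep the example self-contained. Treating negative positions as out of range is an assumption; Python lists would otherwise accept them silently.

```python
from typing import List, Optional, Tuple, Type


class SketchFeatureGroup:
    """Hypothetical base class; index_columns() returns None unless overridden."""

    @classmethod
    def index_columns(cls) -> Optional[List[Tuple[str, ...]]]:
        return None


def resolve_on_index(fg: Type[SketchFeatureGroup], position: int = 0) -> Tuple[str, ...]:
    """Pick the join index following the rules in the note (hypothetical helper)."""
    candidates = fg.index_columns()
    if not candidates:
        # Covers both "not defined" (None) and "empty list"
        raise ValueError(f"{fg.__name__} does not define index_columns()")
    if not 0 <= position < len(candidates):
        # Assumption: negative positions count as out of range
        raise IndexError(f"Index position {position} out of range for {fg.__name__}")
    return candidates[position]


class UserFG(SketchFeatureGroup):
    @classmethod
    def index_columns(cls):
        return [("user_id",)]


class OrderFG(SketchFeatureGroup):
    @classmethod
    def index_columns(cls):
        return [("user_id",), ("order_id",)]


print(resolve_on_index(UserFG), resolve_on_index(OrderFG, 1))
```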
Self-Joins with Alias Fields
When joining a feature group with itself, you need to distinguish between the left and right instances using alias fields (similar to SQL table aliases in self-joins).
Alias fields are optional dictionary parameters (self_left_alias and self_right_alias) that match against feature options:
from mloda.user import Feature, Index, JoinSpec, Link
from mloda.provider import FeatureGroup
# Define an example feature group for demonstration
class UserFeatureGroup(FeatureGroup):
    @classmethod
    def index_columns(cls):
        return [Index(("user_id",))]
# 1. Define the self-join link
# Option 1: Using _on method (recommended when index_columns is defined)
link = Link.inner_on(UserFeatureGroup, UserFeatureGroup,
                     self_left_alias={"side": "left"},
                     self_right_alias={"side": "right"})
# Option 2: Using explicit JoinSpec
left = JoinSpec(UserFeatureGroup, "user_id")
right = JoinSpec(UserFeatureGroup, "user_id")
link = Link("inner", left, right,
            self_left_alias={"side": "left"},
            self_right_alias={"side": "right"})
# 2. Tag features with matching options
# Feature names reference the feature from the feature group
features = {
    Feature("age", options={"side": "left"}),
    Feature("age", options={"side": "right"}),
}
The execution planner validates that the alias key-value pairs exist in the corresponding feature's options to correctly identify which instance belongs to which side of the join.
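The validation described above amounts to a dictionary containment test: every alias key-value pair must appear in the feature's options. The sketch below illustrates that check. matches_alias and assign_side are hypothetical helpers, and the rule that a feature must match exactly one side (otherwise an error is raised) is an assumption made for illustration, not documented planner behavior.

```python
from typing import Any, Dict, Optional


def matches_alias(options: Dict[str, Any], alias: Optional[Dict[str, Any]]) -> bool:
    """True if every alias key-value pair appears in the feature's options."""
    if alias is None:
        return True  # no alias given, so there is nothing to check
    return all(key in options and options[key] == value for key, value in alias.items())


def assign_side(options: Dict[str, Any], left_alias: Dict[str, Any], right_alias: Dict[str, Any]) -> str:
    """Decide which side of a self-join a feature belongs to (hypothetical helper)."""
    is_left = matches_alias(options, left_alias)
    is_right = matches_alias(options, right_alias)
    if is_left == is_right:
        # Either ambiguous (matches both) or unmatched (matches neither)
        raise ValueError(f"Cannot assign a unique join side for options {options}")
    return "left" if is_left else "right"


left_alias, right_alias = {"side": "left"}, {"side": "right"}
print(assign_side({"side": "left"}, left_alias, right_alias))
print(assign_side({"side": "right", "window": 7}, left_alias, right_alias))
```

Extra options such as "window" do not interfere, because only the alias keys are compared.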
Polymorphic Link Matching
Links support inheritance-based matching, allowing a link defined with base classes to automatically apply to subclasses. This enables defining generic join relationships that work across feature group hierarchies.
Matching Rules:
- Exact match first: If a link's feature groups exactly match the classes being joined, it takes priority over any polymorphic matches.
- Balanced inheritance: For polymorphic matches, both sides must have the same inheritance distance from the link's defined classes. This prevents sibling class mismatches.
- Most specific wins: Among valid balanced matches, the link closest in the inheritance hierarchy is selected.
Example:
# Define a base feature group hierarchy
class BaseUserFeatureGroup(FeatureGroup):
    pass
class PremiumUserFeatureGroup(BaseUserFeatureGroup):
    pass
class StandardUserFeatureGroup(BaseUserFeatureGroup):
    pass
# Define a link using base classes
link = Link.inner(
    left=JoinSpec(BaseUserFeatureGroup, "user_id"),
    right=JoinSpec(BaseUserFeatureGroup, "user_id"),
)
# This link will match:
# - (PremiumUserFeatureGroup, PremiumUserFeatureGroup) ✓
# - (StandardUserFeatureGroup, StandardUserFeatureGroup) ✓
# - (BaseUserFeatureGroup, BaseUserFeatureGroup) ✓
# This link will NOT match (sibling mismatch):
# - (PremiumUserFeatureGroup, StandardUserFeatureGroup) ✗
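The balanced inheritance rule ensures that joins only occur between "parallel" subclasses: both sides must be at the same level in the inheritance hierarchy relative to the link definition. As the example shows, a link defined with the same base class on both sides also does not join two different sibling subclasses, even though both siblings sit at the same level.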
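The three matching rules can be sketched with Python's method resolution order. This is a hypothetical model, not mloda's implementation: distance is taken as the position of the link class in the joined class's __mro__, and the sibling rule is modeled as an extra assumption (a link with the same class on both sides only joins identical classes), chosen because it reproduces the outcomes listed in the example. Plain classes stand in for feature groups.

```python
from typing import List, Optional, Tuple

LinkPair = Tuple[type, type]


def inheritance_distance(cls: type, base: type) -> Optional[int]:
    """Steps from cls up to base along the MRO, or None if cls does not inherit from base."""
    mro = cls.__mro__
    return mro.index(base) if base in mro else None


def match_distance(link: LinkPair, pair: LinkPair) -> Optional[int]:
    """Balanced distance if the link applies to the pair being joined, else None."""
    left_d = inheritance_distance(pair[0], link[0])
    right_d = inheritance_distance(pair[1], link[1])
    if left_d is None or right_d is None or left_d != right_d:
        return None  # unrelated classes or unbalanced inheritance
    # Assumption: a link with the same class on both sides only joins identical classes
    if link[0] is link[1] and pair[0] is not pair[1]:
        return None
    return left_d  # 0 means an exact match


def select_link(links: List[LinkPair], pair: LinkPair) -> Optional[LinkPair]:
    """Exact matches (distance 0) win; otherwise the closest balanced match wins."""
    best: Optional[LinkPair] = None
    best_distance: Optional[int] = None
    for link in links:
        distance = match_distance(link, pair)
        if distance is not None and (best_distance is None or distance < best_distance):
            best, best_distance = link, distance
    return best


class BaseUser: ...
class PremiumUser(BaseUser): ...
class StandardUser(BaseUser): ...


links = [(BaseUser, BaseUser)]
print(select_link(links, (PremiumUser, PremiumUser)) is not None)
print(select_link(links, (PremiumUser, StandardUser)) is not None)
```

Because an exact match has distance 0, "exact match first" falls out of "most specific wins" in this model; a separate priority check is not needed.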
mlodaAPI
from mloda.user import mloda
set_of_links = {link}
mloda.run_all(
    features=["Feature_of_FeatureGroupA", "Feature_of_FeatureGroupB"],
    links=set_of_links,
)
mloda then uses the given links, together with the merge implementation of the compute framework, to join datasets where needed.
The following section shows what this can look like.
Merging Data in the Compute Framework
Each compute framework declares its merge engine, a subclass of BaseMergeEngine, through its merge_engine() method. This example uses the PandasMergeEngine.
from typing import Type
class PandasDataFrame(ComputeFramework):
    def merge_engine(self) -> Type[BaseMergeEngine]:
        return PandasMergeEngine
A merge engine can implement the following methods:
- merge_inner
- merge_left
- merge_right
- merge_full_outer
- merge_append
- merge_union
These methods are dispatched by the final merge() method of the abstract class BaseMergeEngine (excerpt):
@final
def merge(self, left_data: Any, right_data: Any, jointype: JoinType, left_index: Index, right_index: Index) -> Any:
    if jointype == JoinType.INNER:
        return self.merge_inner(left_data, right_data, left_index, right_index)
    if jointype == JoinType.LEFT:
        return self.merge_left(left_data, right_data, left_index, right_index)
    ...
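The if-chain in merge() can equally be expressed as a lookup table from join type to method name. The sketch below is a hypothetical re-implementation, not mloda code. It assumes that OUTER maps to merge_full_outer (following the method list above) and that a join type whose method an engine does not implement should raise NotImplementedError; the real BaseMergeEngine may handle that case differently.

```python
from enum import Enum
from typing import Any, List, Tuple


class JoinType(Enum):
    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"
    OUTER = "outer"
    APPEND = "append"
    UNION = "union"


# Assumed mapping from join type to merge method name
_DISPATCH = {
    JoinType.INNER: "merge_inner",
    JoinType.LEFT: "merge_left",
    JoinType.RIGHT: "merge_right",
    JoinType.OUTER: "merge_full_outer",
    JoinType.APPEND: "merge_append",
    JoinType.UNION: "merge_union",
}


class SketchMergeEngine:
    """Hypothetical engine: merge() routes each join type to one method."""

    def merge(
        self,
        left_data: Any,
        right_data: Any,
        jointype: JoinType,
        left_index: Tuple[str, ...],
        right_index: Tuple[str, ...],
    ) -> Any:
        name = _DISPATCH[jointype]
        method = getattr(self, name, None)
        if method is None:
            raise NotImplementedError(f"{type(self).__name__} does not implement {name}")
        return method(left_data, right_data, left_index, right_index)


class RecordingEngine(SketchMergeEngine):
    """Implements only two join types and records which method was called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def merge_inner(self, left_data, right_data, left_index, right_index):
        self.calls.append("merge_inner")
        return "inner result"

    def merge_full_outer(self, left_data, right_data, left_index, right_index):
        self.calls.append("merge_full_outer")
        return "outer result"


engine = RecordingEngine()
print(engine.merge([], [], JoinType.OUTER, ("id",), ("id",)))
```

A subclass only has to provide the methods for the join types its compute framework supports; everything else fails loudly at dispatch time in this sketch.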
A simplified MergeEngine implementation looks like this:
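from typing import Any
class PandasMergeEngine(BaseMergeEngine):
    def merge_inner(self, left_data: Any, right_data: Any, left_index: Index, right_index: Index) -> Any:
        return self.join_logic("inner", left_data, right_data, left_index, right_index, JoinType.INNER)
    def merge_left(self, left_data: Any, right_data: Any, left_index: Index, right_index: Index) -> Any:
        return self.join_logic("left", left_data, right_data, left_index, right_index, JoinType.LEFT)
    def merge_right(self, left_data: Any, right_data: Any, left_index: Index, right_index: Index) -> Any:
        return self.join_logic("right", left_data, right_data, left_index, right_index, JoinType.RIGHT)
    def merge_full_outer(self, left_data: Any, right_data: Any, left_index: Index, right_index: Index) -> Any:
        return self.join_logic("outer", left_data, right_data, left_index, right_index, JoinType.OUTER)
    def join_logic(
        self, join_type: str, left_data: Any, right_data: Any, left_index: Index, right_index: Index, jointype: JoinType
    ) -> Any:
        # This simplified engine supports single-column keys only
        if left_index.is_multi_index() or right_index.is_multi_index():
            raise ValueError(f"MultiIndex is not yet implemented {self.__class__.__name__}")
        # ...and only keys with the same column name on both sides
        if left_index == right_index:
            left_idx = left_index.index[0]
            right_idx = right_index.index[0]
            # join_type is the pandas "how" string ("inner", "left", "right", "outer")
            left_data = self.pd_merge()(left_data, right_data, left_on=left_idx, right_on=right_idx, how=join_type)
            return left_data
        else:
            raise ValueError(
                f"JoinType {join_type} {left_index} {right_index} are not yet implemented {self.__class__.__name__}"
            )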
Key Components:
- left_data: Left dataset for the join.
- right_data: Right dataset for the join.
- left_index and right_index: Indexes specifying join keys.
- jointype: Instance of JoinType; join_type is the matching string passed to pandas as how.
By implementing these merge methods, the compute framework automatically handles data merging in the background, following the relationships defined by Index, JoinType, and Link.
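For readers without pandas at hand, the control flow of the simplified join_logic can be traced with plain Python lists of dictionaries. This is a hypothetical illustration, not mloda code. It covers only inner and left joins, keeps the two restrictions of the simplified engine (no multi-column keys, identical key names on both sides), and, unlike pandas.merge, does not pad unmatched left rows with missing values.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def join_logic(
    how: str, left: List[Row], right: List[Row], left_index: Tuple[str, ...], right_index: Tuple[str, ...]
) -> List[Row]:
    """Pure-Python analogue of the simplified join_logic (hypothetical)."""
    if how not in ("inner", "left"):
        raise ValueError(f"Join type {how!r} is not covered by this sketch")
    if len(left_index) > 1 or len(right_index) > 1:
        raise ValueError("MultiIndex is not yet implemented")
    if left_index != right_index:
        raise ValueError(f"{how} join on {left_index} / {right_index} is not yet implemented")
    key = left_index[0]
    result: List[Row] = []
    for l_row in left:
        matches = [r_row for r_row in right if r_row[key] == l_row[key]]
        if matches:
            # One output row per matching right row, combining both sides' columns
            result.extend({**l_row, **r_row} for r_row in matches)
        elif how == "left":
            result.append(dict(l_row))  # keep unmatched left rows as they are
    return result


users = [{"user_id": 1, "age": 30}, {"user_id": 2, "age": 41}]
orders = [{"user_id": 1, "order_id": "a"}, {"user_id": 1, "order_id": "b"}]
print(join_logic("inner", users, orders, ("user_id",), ("user_id",)))
print(join_logic("left", users, orders, ("user_id",), ("user_id",)))
```

The nested loop is quadratic and only meant to make the semantics visible; a real engine delegates this work to its compute framework, as the pandas example above does.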