API
Docs

Aggregate

Operator to do continuous window aggregations. Aggregate operator must always be preceded by a groupby operator.

Parameters

aggregates:List[Aggregation]

Positional argument specifying the list of aggregations to apply on the grouped dataset. This list can be passed either as an unpacked *args or as an explicit list as the first position argument.

See aggregations for the full list of aggregate functions.

along:Optional[str]

Keyword argument indicating the time axis to aggregate along. If along is None, Fennel will aggregate along the timestamp of the input dataset.

Returns

Dataset

Returns a dataset where all columns passed to groupby become the key columns, the timestamp column stays as it is and one column is created for each aggregation.

The type of each aggregated column depends on the aggregate and the type of the corresponding column in the input dataset.

Note

Aggregate is the terminal operator - no other operator can follow it and no other datasets can be derived from the dataset containing this pipeline.

1from fennel.datasets import (
2    dataset,
3    field,
4    pipeline,
5    Dataset,
6    Count,
7    Sum,
8)
9from fennel.dtypes import Continuous
10from fennel.lib import inputs
11from fennel.connectors import source, Webhook
12
13webhook = Webhook(name="webhook")
14
15@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
16@dataset
17class Transaction:
18    uid: int
19    amount: int
20    timestamp: datetime = field(timestamp=True)
21    transaction_time: datetime
22
23@dataset(index=True)
24class Aggregated:
25    # groupby field becomes the key field
26    uid: int = field(key=True)
27    # new fields are added to the dataset by the aggregate operation
28    total: int
29    count_1d: int
30    timestamp: datetime = field(timestamp=True)
31
32    @pipeline
33    @inputs(Transaction)
34    def aggregate_pipeline(cls, ds: Dataset):
35        return ds.groupby("uid").aggregate(
36            count_1d=Count(window=Continuous("forever")),
37            total=Sum(of="amount", window=Continuous("forever")),
38            along="transaction_time",
39        )
Aggregate count & sum of transactions in rolling windows along transaction time

python

Assign

Operator to add a new column to a dataset - the added column is neither a key column nor a timestamp column.

Parameters

name:str

The name of the new column to be added - must not conflict with any existing name on the dataset.

dtype:Type

The data type of the new column to be added - must be a valid Fennel supported data type.

func:Callable[pd.Dataframe, pd.Series[T]]

The function, which when given a subset of the dataset as a dataframe, returns the value of the new column for each row in the dataframe.

Fennel verifies at runtime that the returned series matches the declared dtype.

**kwargs:TypedExpression

Assign can also be given one or more expressions instead of Python lambdas - it can either have expressions or lambdas but not both. Expected types must also be present along with each expression (see example).

Unlike lambda based assign, all type validation and many other errors can be verified at the commit time itself (vs incurring runtime errors).

Returns

Dataset

Returns a dataset with one additional column of the given name and type same as dtype. This additional column is neither a key-column or the timestamp column.

Errors

Invalid series at runtime:

Runtime error if the value returned from the lambda isn't a pandas Series of the declared type and the same length as the input dataframe.

Invalid expression at import/commit time:

When using expressions, errors may be raised during the import or commit if types don't match and/or there are other validation errors related to the expressions.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10    uid: int = field(key=True)
11    amount: int
12    timestamp: datetime
13
14@dataset(index=True)
15class WithSquare:
16    uid: int = field(key=True)
17    amount: int
18    amount_sq: int
19    timestamp: datetime
20
21    @pipeline
22    @inputs(Transaction)
23    def my_pipeline(cls, ds: Dataset):
24        return ds.assign("amount_sq", int, lambda df: df["amount"] ** 2)
Adding new column 'amount_sq' of type int

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5from fennel.expr import col
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
10@dataset
11class Transaction:
12    uid: int = field(key=True)
13    amount: int
14    timestamp: datetime
15
16@dataset(index=True)
17class WithSquare:
18    uid: int = field(key=True)
19    amount: int
20    amount_sq: int
21    amount_half: float
22    timestamp: datetime
23
24    @pipeline
25    @inputs(Transaction)
26    def my_pipeline(cls, ds: Dataset):
27        return ds.assign(
28            amount_sq=(col("amount") * col("amount")).astype(int),
29            amount_half=(col("amount") / 2).astype(float),
30        )
Adding two new columns using expressions

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10    uid: int = field(key=True)
11    amount: int
12    timestamp: datetime
13
14@dataset
15class WithHalf:
16    uid: int = field(key=True)
17    amount: int
18    amount_sq: int
19    timestamp: datetime
20
21    @pipeline
22    @inputs(Transaction)
23    def my_pipeline(cls, ds: Dataset):
24        return ds.assign(
25            "amount_sq", int, lambda df: df["amount"] * 0.5
26        )
Runtime error: returns float, not int

python

1with pytest.raises(Exception):
2
3    from fennel.datasets import dataset, field, pipeline, Dataset
4    from fennel.lib import inputs
5    from fennel.connectors import source, Webhook
6    from fennel.expr import col
7
8    webhook = Webhook(name="webhook")
9
10    @source(webhook.endpoint("txn"), disorder="14d", cdc="upsert")
11    @dataset
12    class Transaction:
13        uid: int = field(key=True)
14        amount: int
15        timestamp: datetime
16
17    @dataset
18    class WithHalf:
19        uid: int = field(key=True)
20        amount: int
21        amount_sq: int
22        amount_half: int
23        timestamp: datetime
24
25        @pipeline
26        @inputs(Transaction)
27        def my_pipeline(cls, ds: Dataset):
28            return ds.assign(
29                amount_sq=(col("amount") * col("amount")).astype(int),
30                amount_half=(col("amount") / 2).astype(int),
31            )
Import error: age_half is expected to be int but expr evaluates to float

python

Changelog

Operator to convert a keyed dataset into a CDC changelog stream. All key fields are converted into normal fields, and an additional column is added, indicating the type of change (insert or delete) for the delta.

Parameters

delete:str

Kwarg that specifies the name of a boolean column which stores whether a delta was a delete kind in the original dataset. Exactly one of this or insert kwarg should be set.

insert:str

Kwarg that specifies the name of a boolean column which stores whether a delta was an insert kind in the original dataset. Exactly one of this or delete kwarg should be set.

Returns

Dataset

Returns a dataset with input keyed dataset into an append only CDC changelog stream. All key fields converted into normal fields, and an additional column is added, which contains the type of change (insert or delete) for the delta.

Errors

Neither insert nor delete kwarg is set:

Error if neither of insert or delete kwarg is set.

Both insert and delete kwargs are set:

Error if both insert and delete kwargs are set.

Dedup

Operator to dedup keyless datasets (e.g. event streams).

Parameters

by:Optional[List[str]]

Default: None

The list of columns to use for identifying duplicates. If not specified, all the columns are used for identifying duplicates.

If window is specified, two rows of the input dataset are considered duplicates when they are in the same window and have the same value for the by columns.

If window is not specified, two rows are considered duplicates when they have the exact same values for the timestamp column and all the by columns.

window:Optional[Tumbling | Session]

Default: None

The window to group rows for deduping. If not specified, the rows will be deduped only by the by columns and the timestamp.

Returns

Dataset

Returns a keyless dataset having the same schema as the input dataset but with some duplicated rows filtered out.

Errors

Dedup on dataset with key columns:

Commit error to apply dedup on a keyed dataset.

Dedup on hopping window or tumbling window with lookback:

Dedup on hopping window or tumbling window with lookback is not supported.

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10    txid: int
11    uid: int
12    amount: int
13    timestamp: datetime
14
15@dataset
16class Deduped:
17    txid: int
18    uid: int
19    amount: int
20    timestamp: datetime
21
22    @pipeline
23    @inputs(Transaction)
24    def dedup_pipeline(cls, ds: Dataset):
25        return ds.dedup(by="txid")
Dedup using txid and timestamp

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10    txid: int
11    uid: int
12    amount: int
13    timestamp: datetime
14
15@dataset
16class Deduped:
17    txid: int
18    uid: int
19    amount: int
20    timestamp: datetime
21
22    @pipeline
23    @inputs(Transaction)
24    def dedup_by_all_pipeline(cls, ds: Dataset):
25        return ds.dedup()
Dedup using all the fields

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4from fennel.dtypes import Session
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11    txid: int
12    uid: int
13    amount: int
14    timestamp: datetime
15
16@dataset
17class Deduped:
18    txid: int
19    uid: int
20    amount: int
21    timestamp: datetime
22
23    @pipeline
24    @inputs(Transaction)
25    def dedup_by_all_pipeline(cls, ds: Dataset):
26        return ds.dedup(by="txid", window=Session(gap="10s"))
Dedup using session window

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4from fennel.dtypes import Tumbling
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11    txid: int
12    uid: int
13    amount: int
14    timestamp: datetime
15
16@dataset
17class Deduped:
18    txid: int
19    uid: int
20    amount: int
21    timestamp: datetime
22
23    @pipeline
24    @inputs(Transaction)
25    def dedup_by_all_pipeline(cls, ds: Dataset):
26        return ds.dedup(by="txid", window=Tumbling(duration="10s"))
Dedup using tumbling window

python

Drop

Operator to drop one or more non-key non-timestamp columns from a dataset.

Parameters

columns:List[str]

List of columns in the incoming dataset that should be dropped. This can be passed either as unpacked *args or as a Python list.

Returns

Dataset

Returns a dataset with the same schema as the input dataset but with some columns (as specified by columns) removed.

Errors

Dropping key/timestamp columns:

Commit error on removing any key columns or the timestamp column.

Dropping non-existent columns:

Commit error on removing any column that doesn't exist in the input dataset.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: str
12    country: str
13    weight: float
14    height: float
15    gender: str
16    timestamp: datetime
17
18@dataset(index=True)
19class Dropped:
20    uid: int = field(key=True)
21    gender: str
22    timestamp: datetime
23
24    @pipeline
25    @inputs(User)
26    def drop_pipeline(cls, user: Dataset):
27        return user.drop("height", "weight").drop(
28            columns=["city", "country"]
29        )
Can pass names via *args or kwarg columns

python

1@source(webhook.endpoint("User"))
2@dataset
3class User:
4    uid: int = field(key=True)
5    city: str
6    timestamp: datetime
7
8@dataset
9class Dropped:
10    city: str
11    timestamp: datetime
12
13    @pipeline
14    @inputs(User)
15    def pipeline(cls, user: Dataset):
16        return user.drop("uid")
Can not drop key or timestamp columns

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: str
12    timestamp: datetime
13
14@dataset
15class Dropped:
16    uid: int = field(key=True)
17    city: str
18    timestamp: datetime
19
20    @pipeline
21    @inputs(User)
22    def bad_pipeline(cls, user: Dataset):
23        return user.drop("random")
Can not drop a non-existent column

python

Dropnull

Operator to drop rows containing null values (aka None in Python speak) in the given columns.

Parameters

columns:Optional[List[str]]

List of columns in the incoming dataset that should be checked for presence of None values - if any such column has None for a row, the row will be filtered out from the output dataset. This can be passed either as unpacked *args or as a Python list.

If no arguments are given, columns will be all columns with the type Optional[T] in the dataset.

Returns

Dataset

Returns a dataset with the same name & number of columns as the input dataset but with the type of some columns modified from Optional[T] -> T.

Errors

Dropnull on non-optional columns:

Commit error to pass a column without an optional type.

Dropnull on non-existent columns:

Commit error to pass a column that doesn't exist in the input dataset.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    dob: str
12    city: Optional[str]
13    country: Optional[str]
14    gender: Optional[str]
15    timestamp: datetime
16
17@dataset(index=True)
18class Derived:
19    uid: int = field(key=True)
20    dob: str
21    # dropnull changes the type of the columns to non-optional
22    city: str
23    country: str
24    gender: Optional[str]
25    timestamp: datetime
26
27    @pipeline
28    @inputs(User)
29    def dropnull_pipeline(cls, user: Dataset):
30        return user.dropnull("city", "country")
Dropnull on city & country, but not gender

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    dob: str
12    city: Optional[str]
13    country: Optional[str]
14    gender: Optional[str]
15    timestamp: datetime
16
17@dataset(index=True)
18class Derived:
19    uid: int = field(key=True)
20    dob: str
21    # dropnull changes the type of all optional columns to non-optional
22    city: str
23    country: str
24    gender: str
25    timestamp: datetime
26
27    @pipeline
28    @inputs(User)
29    def dropnull_pipeline(cls, user: Dataset):
30        return user.dropnull()
Applies to all optional columns if none is given explicitly

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: Optional[str]
12    timestamp: datetime
13
14@dataset
15class Derived:
16    uid: int = field(key=True)
17    city: str
18    timestamp: datetime
19
20    @pipeline
21    @inputs(User)
22    def bad_pipeline(cls, user: Dataset):
23        return user.select("random")
Dropnull on a non-existent column

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10    uid: int = field(key=True)
11    # dropnull can only be used on optional columns
12    city: str
13    timestamp: datetime
14
15@dataset
16class Derived:
17    uid: int = field(key=True)
18    timestamp: datetime
19
20    @pipeline
21    @inputs(User)
22    def bad_pipeline(cls, user: Dataset):
23        return user.select("city")
Dropnull on a non-optional column

python

Explode

Operator to explode lists in a single row to form multiple rows, analogous to to the explodefunction in Pandas.

Only applicable to keyless datasets.

Parameters

columns:List[str]

The list of columns to explode. This list can be passed either as unpacked *args or kwarg columns mapping to an explicit list.

All the columns should be of type List[T] for some T in the input dataset and after explosion, they get converted to a column of type Optional[T].

Returns

Dataset

Returns a dataset with the same number & name of columns as the input dataset but with the type of exploded columns modified from List[T] to Optional[T].

Empty lists are converted to None values (hence the output types need to be Optional[T]).

Errors

Exploding keyed datasets:

Commit error to apply explode on an input dataset with key columns.

Exploding non-list columns:

Commit error to explode using a column that is not of the type List[T].

Exploding non-existent columns:

Commit error to explode using a column that is not present in the input dataset.

Unequal size lists in multi-column explode:

For a given row, all the columns getting exploded must have lists of the same length, otherwise a runtime error is raised. Note that the lists can be of different type across rows.

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10    uid: int
11    skus: List[int]
12    prices: List[float]
13    timestamp: datetime
14
15@dataset
16class Derived:
17    uid: int
18    sku: Optional[int]
19    price: Optional[float]
20    timestamp: datetime
21
22    @pipeline
23    @inputs(Orders)
24    def explode_pipeline(cls, ds: Dataset):
25        return (
26            ds
27            .explode("skus", "prices").rename(
28                {"skus": "sku", "prices": "price"}
29            )
30        )
Exploding skus and prices together

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10    uid: int
11    price: float
12    timestamp: datetime
13
14@dataset
15class Derived:
16    uid: int
17    price: float
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Orders)
22    def bad_pipeline(cls, ds: Dataset):
23        return ds.explode("price")
Exploding a non-list column

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10    uid: int
11    price: List[float]
12    timestamp: datetime
13
14@dataset
15class Derived:
16    uid: int
17    price: float
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Orders)
22    def bad_pipeline(cls, ds: Dataset):
23        return ds.explode("price", "random")
Exploding a non-existent column

python

Filter

Operator to selectively filter out rows from a dataset.

Parameters

func:Callable[pd.Dataframe, pd.Series[bool]] | Expression

The actual filter function - takes a pandas dataframe containing a batch of rows from the input dataset and is expected to return a series of booleans of the same length. Only rows corresponding to True are retained in the output dataset.

Alternatively, can also be a Fennel expression.

Returns

Dataset

Returns a dataset with the same schema as the input dataset, just with some rows potentially filtered out.

Errors

Invalid series:

Runtime error if the value returned from the lambda isn't a pandas Series of the bool and of the same length as the input dataframe. When using expressions, any type and many other kinds of errors are caught at import or commit time statically.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: str
12    signup_time: datetime
13
14@dataset(index=True)
15class Filtered:
16    uid: int = field(key=True)
17    city: str
18    signup_time: datetime
19
20    @pipeline
21    @inputs(User)
22    def my_pipeline(cls, user: Dataset):
23        return user.filter(lambda df: df["city"] != "London")
Filtering out rows where city is London

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5from fennel.expr import col
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
10@dataset
11class User:
12    uid: int = field(key=True)
13    city: str
14    signup_time: datetime
15
16@dataset(index=True)
17class Filtered:
18    uid: int = field(key=True)
19    city: str
20    signup_time: datetime
21
22    @pipeline
23    @inputs(User)
24    def my_pipeline(cls, user: Dataset):
25        return user.filter(col("city") != "London")
Filtering out rows where city is London using expression

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: str
12    signup_time: datetime
13
14@dataset
15class Filtered:
16    uid: int = field(key=True)
17    city: str
18    signup_time: datetime
19
20    @pipeline
21    @inputs(User)
22    def my_pipeline(cls, user: Dataset):
23        return user.filter(lambda df: df["city"] + "London")
Runtime Error: Lambda returns str, not bool

python

First

Operator to find the first element of a group by the row timestamp. First operator must always be preceded by a groupby operator.

Parameters

The first operator does not take any parameters.

Returns

Dataset

The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.

For each group formed by grouping, one row is chosen having the lowest value in the timestamp field. In case of ties, the first seen row wins.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10    uid: int
11    amount: int
12    timestamp: datetime
13
14@dataset(index=True)
15class FirstOnly:
16    uid: int = field(key=True)
17    amount: int
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Transaction)
22    def first_pipeline(cls, ds: Dataset):
23        return ds.groupby("uid").first()
Dataset with just the first transaction of each user

python

Groupby

Operator to group rows of incoming datasets to be processed by the next operator.

Technically, groupby isn't a standalone operator by itself since its output isn't a valid dataset. Instead, it becomes a valid operator when followed by first, latest, or aggregate.

Parameters

keys:List[str]

List of keys in the incoming dataset along which the rows should be grouped. This can be passed as unpacked *args or a Python list.

window:Optional[Union[Tumbling, Hopping, Session]]

Optional field to specify the default window for all the aggregations in the following aggregate operator. If window parameter is used then the operator can only be followed by an aggregate operator and window will become a key field in the output schema.

Errors

Grouping by non-existent columns:

Commit error if trying to group by columns that don't exist in the input dataset.

Grouping by timestamp column:

Commit error if trying to do a groupby via the timestamp column of the input dataset.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10    uid: int
11    category: str
12    timestamp: datetime
13
14@dataset
15class FirstInCategory:
16    category: str = field(key=True)
17    uid: int
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Transaction)
22    def groupby_pipeline(cls, transactions: Dataset):
23        return transactions.groupby("category").first()
Groupby category before using first

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(
8    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
9)
10@dataset
11class Transaction:
12    uid: int
13    category: str
14    timestamp: datetime
15
16@dataset
17class FirstInCategory:
18    category: str = field(key=True)
19    uid: int
20    timestamp: datetime
21
22    @pipeline
23    @inputs(Transaction)
24    def bad_pipeline(cls, transactions: Dataset):
25        return transactions.groupby("non_existent_column").first()
Groupby using a non-existent column

python

Join

Operator to join two datasets. The right hand side dataset must have one or more key columns and the join operation is performed on these columns.

Parameters

dataset:Dataset

The right hand side dataset to join this dataset with. RHS dataset must be a keyed dataset and must also be an input to the pipeline (vs being an intermediary dataset derived within a pipeline itself).

how:"inner" | "left"

Required kwarg indicating whether the join should be an inner join (how="inner") or a left-outer join (how="left"). With "left", the output dataset may have a row even if there is no matching row on the right side.

on:Optional[List[str]]

Default: None

Kwarg that specifies the list of fields along which join should happen. If present, both left and right side datasets must have fields with these names and matching data types (data types on left hand side can be optional). This list must be identical to the names of all key columns of the right hand side.

If this isn't set, left_on and right_on must be set instead.

left_on:Optional[List[str]]

Default: None

Kwarg that specifies the list of fields from the left side dataset that should be used for joining. If this kwarg is set, right_on must also be set. Note that right_on must be identical to the names of all the key columns of the right side.

right_on:Optional[List[str]]

Default: None

Kwarg that specifies the list of fields from the right side dataset that should be used for joining. If this kwarg is setup, left_on must also be set. The length of left_on and right_on must be the same and corresponding fields on both sides must have the same data types.

within:Tuple[Duration, Duration]

Default: ("forever", "0s")

Optional kwarg specifying the time window relative to the left side timestamp within which the join should be performed. This can be seen as adding another condition to join like WHERE left_time - d1 < right_time AND right_time < left_time + d2 where (d1, d2) = within.

  • The first value in the tuple represents how far back in time should a join happen. The term "forever" means that we can go infinitely back in time when searching for an event to join from the left-hand side data.
  • The second value in the tuple represents how far ahead in time we can go to perform a join. This is useful in cases when the corresponding RHS data of the join can come later. The default value for this parameter is ("forever", "0s") which means that we can go infinitely back in time and the RHS data should be available for the event time of the LHS data.
fields:Optional[List[str]]

Default: None

Optional kwarg that specifies the list of (non-key) fields of the right dataset that should be included in the output dataset. If this kwarg is not set, all such fields are included in the output dataset. If right dataset's timestamp field is included in fields, then it is included as a normal field in the output dataset, with left dataset's timestamp field as the output dataset's timestamp field.

Returns

Dataset

Returns a dataset representing the joined dataset having the same keys & timestamp columns as the LHS dataset.

The output dataset has all the columns from the left dataset and all non-key non-timestamp columns from the right dataset.

If the join was of type inner, the type of a joined RHS column of type T stays T but if the join was of type left, the type in the output dataset becomes Optional[T] if it was T on the RHS side.

For LHS columns, the type is the same as the type in the LHS dataset if join type is left. If the join type is inner, if a join column on the LHS is Optional[T], then the type in the output dataset is T (i.e., the Optional is dropped).

Errors

Join with non-key dataset on the right side:

Commit error to do a join with a dataset that doesn't have key columns.

Join with intermediate dataset:

Commit error to do a join with a dataset that is not an input to the pipeline but instead is an intermediate dataset derived during the pipeline itself.

Post-join column name conflict:

Commit error if join will result in a dataset having two columns of the same name. A common way to work-around this is to rename columns via the rename operator before the join.

Mismatch in columns to be joined:

Commit error if the number/type of the join columns on the left and right side don't match.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10    uid: int
11    merchant: int
12    amount: int
13    timestamp: datetime
14
15@source(
16    webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
17)
18@dataset(index=True)
19class MerchantCategory:
20    # right side of the join can only be on key fields
21    merchant: int = field(key=True)
22    category: str
23    updated_at: datetime  # won't show up in joined dataset
24
25@dataset
26class WithCategory:
27    uid: int
28    merchant: int
29    amount: int
30    timestamp: datetime
31    category: str
32
33    @pipeline
34    @inputs(Transaction, MerchantCategory)
35    def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
36        return tx.join(merchant_category, on=["merchant"], how="inner")
Inner join on 'merchant'

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4from typing import Optional
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11    uid: int
12    merchant: Optional[int]
13    amount: int
14    timestamp: datetime
15
16@source(
17    webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
18)
19@dataset(index=True)
20class MerchantCategory:
21    # right side of the join can only be on key fields
22    merchant: int = field(key=True)
23    category: str
24    updated_at: datetime  # won't show up in joined dataset
25
26@dataset
27class WithCategory:
28    uid: int
29    merchant: Optional[int]
30    amount: int
31    timestamp: datetime
32    category: Optional[str]
33
34    @pipeline
35    @inputs(Transaction, MerchantCategory)
36    def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
37        return tx.join(merchant_category, on=["merchant"], how="left")
Left join on 'merchant' with optional LHS fields

python

Latest

Operator to find the latest element of a group by the row timestamp. Latest operator must always be preceded by a groupby operator.

Latest operator is a good way to effectively convert a stream of only append to a time-aware upsert stream.

Parameters

The latest operator does not take any parameters.

Returns

Dataset

The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.

The row with the maximum timestamp is chosen for each group. In case of ties, the last seen row wins.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10    uid: int
11    amount: int
12    timestamp: datetime
13
14@dataset(index=True)
15class LatestOnly:
16    uid: int = field(key=True)
17    amount: int
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Transaction)
22    def latest_pipeline(cls, ds: Dataset):
23        return ds.groupby("uid").latest()
Dataset with just the latest transaction of each user

python

Rename

Operator to rename columns of a dataset.

Parameters

columns:Dict[str, str]

Dictionary mapping from old column names to their new names.

All columns should still have distinct and valid names post renaming.

Returns

Dataset

Returns a dataset with the same schema as the input dataset, just with the columns renamed.

Errors

Renaming non-existent column:

Commit error if there is no existing column with name matching each of the keys in the rename dictionary.

Conflicting column names post-rename:

Commit error if after renaming, there will be two columns in the dataset having the same name.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    weight: float
12    height: float
13    timestamp: datetime
14
15@dataset(index=True)
16class Derived:
17    uid: int = field(key=True)
18    # rename changes the name of the columns
19    weight_lb: float
20    height_in: float
21    timestamp: datetime
22
23    @pipeline
24    @inputs(User)
25    def rename_pipeline(cls, user: Dataset):
26        return user.rename(
27            {"weight": "weight_lb", "height": "height_in"}
28        )
Rename weight -> weight_lb & height -> height_in

python

Select

Operator to select some columns from a dataset.

Parameters

columns:List[str]

List of columns in the incoming dataset that should be selected into the output dataset. This can be passed either as unpacked *args or as kwarg set to a Python list.

Returns

Dataset

Returns a dataset containing only the selected columns. Timestamp field is automatically included whether explicitly provided in the select or not.

Errors

Not selecting all key columns:

Select, like most other operators, can not change the key or timestamp columns. As a result, not selecting all the key columns is a commit error.

Selecting non-existent column:

Commit error to select a column that is not present in the input dataset.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10    uid: int = field(key=True)
11    weight: float
12    height: float
13    city: str
14    country: str
15    gender: str
16    timestamp: datetime
17
18@dataset(index=True)
19class Selected:
20    uid: int = field(key=True)
21    weight: float
22    height: float
23    timestamp: datetime
24
25    @pipeline
26    @inputs(User)
27    def select_pipeline(cls, user: Dataset):
28        return user.select("uid", "height", "weight")
Selecting uid, height & weight columns

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: str
12    timestamp: datetime
13
14@dataset
15class Selected:
16    city: str
17    timestamp: datetime
18
19    @pipeline
20    @inputs(User)
21    def bad_pipeline(cls, user: Dataset):
22        return user.select("city")
Did not select key uid

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10    uid: int = field(key=True)
11    city: str
12    timestamp: datetime
13
14@dataset
15class Selected:
16    uid: int = field(key=True)
17    city: str
18    timestamp: datetime
19
20    @pipeline
21    @inputs(User)
22    def bad_pipeline(cls, user: Dataset):
23        return user.select("uid", "random")
Selecting non-existent column

python

Transform

Catch all operator to add/remove/update columns.

Parameters

func:Callable[pd.Dataframe, pd.Dataframe]

The transform function that takes a pandas dataframe containing a batch of rows from the input dataset and returns an output dataframe of the same length, though potentially with different set of columns.

schema:Optional[Dict[str, Type]]

The expected schema of the output dataset. If not specified, the schema of the input dataset is used.

Returns

Dataset

Returns a dataset with the schema as specified in schema and rows as transformed by the transform function.

Errors

Output dataframe doesn't match the schema:

Runtime error if the dataframe returned by the transform function doesn't match the provided schema.

Modifying key/timestamp columns:

Commit error if transform tries to modify key/timestamp columns.

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10    uid: int = field(key=True)
11    amount: int
12    timestamp: datetime
13
14@dataset(index=True)
15class WithSquare:
16    uid: int = field(key=True)
17    amount: int
18    amount_sq: int
19    timestamp: datetime
20
21    @pipeline
22    @inputs(Transaction)
23    def transform_pipeline(cls, ds: Dataset):
24        schema = ds.schema()
25        schema["amount_sq"] = int
26        return ds.transform(
27            lambda df: df.assign(amount_sq=df["amount"] ** 2), schema
28        )
Adding column amount_sq

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(
8    webhook.endpoint("Transaction"), disorder="14d", cdc="upsert"
9)
10@dataset
11class Transaction:
12    uid: int = field(key=True)
13    amount: int
14    timestamp: datetime
15
16def transform(df: pd.DataFrame) -> pd.DataFrame:
17    df["user"] = df["uid"]
18    df.drop(columns=["uid"], inplace=True)
19    return df
20
21@dataset
22class Derived:
23    user: int = field(key=True)
24    amount: int
25    timestamp: datetime
26
27    @pipeline
28    @inputs(Transaction)
29    def bad_pipeline(cls, ds: Dataset):
30        schema = {"user": int, "amount": int, "timestamp": datetime}
31        return ds.transform(transform, schema)
Modifying key or timestamp columns

python

1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10    uid: int = field(key=True)
11    amount: int
12    timestamp: datetime
13
14@dataset
15class WithHalf:
16    uid: int = field(key=True)
17    amount: int
18    amount_sq: int
19    timestamp: datetime
20
21    @pipeline
22    @inputs(Transaction)
23    def invalid_pipeline(cls, ds: Dataset):
24        schema = ds.schema()
25        schema["amount_sq"] = int
26        return ds.transform(
27            lambda df: df.assign(amount_sq=str(df["amount"])), schema
28        )  # noqa
Runtime error: amount_sq is of type int, not str

python

Union

Operator to union rows from two datasets of the identical schema. Only applicable to keyless datasets. Written as simple + operator on two datasets.

Returns

Dataset

Returns a dataset with the same schema as both the input datasets but containing rows from both of them. If both contain the identical row, two copies of that row are present in the output datasets.

Errors

Taking union of datasets with different schemas:

Union operator is defined only when both the input datasets have the same schema. Commit error to apply union on input datasets with different schemas.

Taking union of keyed datasets:

Commit error to apply union on input datasets with key columns.

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, S3, Kafka
4
5cutoff = datetime(2024, 1, 1, 0, 0, 0)
6s3 = S3(name="mys3")
7bucket = s3.bucket("data", path="orders")
8kafka = Kafka(
9    name="my_kafka",
10    bootstrap_servers="localhost:9092",
11    security_protocol="SASL_PLAINTEXT",
12    sasl_mechanism="PLAIN",
13    sasl_plain_username=os.environ["KAFKA_USERNAME"],
14    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
15)
16
17@source(bucket, cdc="append", disorder="2d", until=cutoff)
18@dataset
19class OrdersBackfill:
20    uid: int
21    skuid: int
22    timestamp: datetime
23
24@source(kafka.topic("order"), cdc="append", disorder="1d", since=cutoff)
25@dataset
26class OrdersLive:
27    uid: int
28    skuid: int
29    timestamp: datetime
30
31@dataset
32class Union:
33    uid: int
34    skuid: int
35    timestamp: datetime
36
37    @pipeline
38    @inputs(OrdersBackfill, OrdersLive)
39    def explode_pipeline(cls, backfill: Dataset, live: Dataset):
40        return backfill + live
Union an s3 and kafka dataset

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10    uid: int
11    price: float
12    timestamp: datetime
13
14@dataset
15class Derived:
16    uid: int
17    price: float
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Orders)
22    def bad_pipeline(cls, ds: Dataset):
23        return ds.explode("price")
Exploding a non-list column

python

1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10    uid: int
11    price: List[float]
12    timestamp: datetime
13
14@dataset
15class Derived:
16    uid: int
17    price: float
18    timestamp: datetime
19
20    @pipeline
21    @inputs(Orders)
22    def bad_pipeline(cls, ds: Dataset):
23        return ds.explode("price", "random")
Exploding a non-existent column

python