Pipeline

A pipeline is a function defined on a dataset that describes how it can be derived from one or more existing datasets. Let's look at an example.

Imagine we have the following datasets defined in the system:

from datetime import datetime

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

webhook = Webhook(name="fennel_webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset(index=True)
class User:
    uid: int = field(key=True)
    dob: datetime
    country: str
    signup_time: datetime = field(timestamp=True)

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: float
    payment_country: str
    merchant_id: int
    timestamp: datetime

Now we want to create a dataset that represents some stats about the transactions made by a user in a country different from their home country. We can write that dataset as follows:

from datetime import datetime

from fennel.datasets import pipeline, Dataset, dataset, field
from fennel.datasets import Count, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs

@dataset(index=True)
class UserTransactionsAbroad:
    uid: int = field(key=True)
    count: int
    amount_1d: float
    amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(User, Transaction)
    def first_pipeline(cls, user: Dataset, transaction: Dataset):
        joined = transaction.join(user, how="left", on=["uid"])
        abroad = joined.filter(
            lambda df: df["country"] != df["payment_country"]
        )
        return abroad.groupby("uid").aggregate(
            count=Count(window=Continuous("forever")),
            amount_1d=Sum(of="amount", window=Continuous("1d")),
            amount_1w=Sum(of="amount", window=Continuous("1w")),
        )

There is a lot happening here so let's break it down line by line:

  • First, a regular dataset is defined - it has the schema that we want the pipeline to produce.
  • The @pipeline decorator signifies that the decorated method is a pipeline.
  • The @inputs decorator declares the input datasets needed by this pipeline to derive the output dataset. In this case, the pipeline declares that it starts from the User and Transaction datasets.
  • Notice the signature of the pipeline function - it takes 2 arguments besides cls (every pipeline is a classmethod). They are essentially symbols for the User and Transaction datasets respectively.
  • The pipeline's function body manipulates these symbols to create other dataset objects. For instance, the two datasets are joined via join and the result is stored in the variable joined, followed by a filter operation and a groupby / aggregate operation that produce a dataset matching the intended schema (the sketch below walks through the same logic on a few sample rows).
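
To make the data flow concrete, here is a small pandas sketch with made-up rows. It is purely illustrative - Fennel runs the real pipeline incrementally on its own engine - but it mimics what the pipeline computes for a single user:

from datetime import datetime, timedelta
import pandas as pd

# Hypothetical rows for the User and Transaction datasets.
users = pd.DataFrame([{"uid": 1, "country": "US"}])
txns = pd.DataFrame([
    {"uid": 1, "amount": 10.0, "payment_country": "US", "timestamp": datetime(2024, 1, 10, 9)},
    {"uid": 1, "amount": 25.0, "payment_country": "FR", "timestamp": datetime(2024, 1, 10, 9)},
    {"uid": 1, "amount": 40.0, "payment_country": "DE", "timestamp": datetime(2024, 1, 6, 9)},
])

# Same logic as the pipeline: left join, then keep only transactions made abroad.
joined = txns.merge(users, on="uid", how="left")
abroad = joined[joined["country"] != joined["payment_country"]]

# Aggregate per uid as of "now" (continuous windows look back from the query time).
now = datetime(2024, 1, 10, 12)
for uid, df in abroad.groupby("uid"):
    count = len(df)  # Continuous("forever")
    amount_1d = df.loc[df["timestamp"] > now - timedelta(days=1), "amount"].sum()
    amount_1w = df.loc[df["timestamp"] > now - timedelta(weeks=1), "amount"].sum()
    print(uid, count, amount_1d, amount_1w)  # 1 2 25.0 65.0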

That's it - your first Fennel pipeline! Now let's look at a few more related ideas.

Operators

Fennel pipelines are built out of a few general-purpose operators like filter, transform, join, etc. that can be composed together to write any pipeline. You can read about all the operators here.

Native Python Interop

Some of the operators (e.g. transform, filter) accept free-form Python lambdas and can be used to do arbitrary computation (including making calls into external services if needed). You can also import/use your favorite Python libraries inside these lambdas - thereby extending interop to the full Python ecosystem. For such Python-based operators, input/output variables are Pandas DataFrames or Pandas Series. Here is an example with the filter and assign operators:

@dataset
class FraudActivity:
    uid: int
    merchant_id: int
    amount_cents: float
    timestamp: datetime

    @pipeline
    @inputs(Activity)
    def create_fraud_dataset(cls, activity: Dataset):
        return (
            activity.filter(lambda df: df["action_type"] == "report")
            .assign(
                "amount_cents",
                float,
                lambda df: df["amount"].astype(float) / 100,
            )
            .drop("action_type", "amount")
        )
Using free-form Python in pipelines

The only constraint on the pipeline topology is that aggregate has to be the terminal node, i.e. it's not allowed to compose any other operator on the output of the aggregate operator. This constraint allows Fennel to significantly reduce the cost and improve the performance of continuous sliding aggregations. It's possible that even this constraint will be removed in the future.
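
As a concrete (hypothetical) illustration of the constraint, the following sketch reuses the Transaction dataset and the imports from above; returning the output of aggregate directly is fine, while chaining another operator onto it would be rejected:

@dataset(index=True)
class MerchantStats:
    merchant_id: int = field(key=True)
    txn_count: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def merchant_counts(cls, transaction: Dataset):
        aggregated = transaction.groupby("merchant_id").aggregate(
            txn_count=Count(window=Continuous("forever")),
        )
        # Allowed: aggregate is the terminal node of the pipeline.
        return aggregated
        # Not allowed: composing further operators on the output of
        # aggregate, e.g. `return aggregated.filter(...)`.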

Expressions

While Fennel lets you write free-form Python using lambdas, it also offers a native expressions API (similar to engines like Spark or Polars) for more structured operations. The above example can be rewritten using expressions as follows:

from fennel.expr import col

@dataset
class FraudActivityDataset:
    uid: int
    merchant_id: int
    amount_cents: float
    timestamp: datetime

    @pipeline
    @inputs(Activity)
    def create_fraud_dataset(cls, activity: Dataset):
        return (
            activity
            .filter(col("action_type") == "report").assign(
                amount_cents=(
                    col("amount").str.parse(float) / 100.0
                ).astype(float)
            )
            .drop("action_type", "amount")
        )
Using expressions in pipelines

These expressions are structured and hence can be fully executed in Fennel's Rust engine with zero dependency on Python at runtime, making them 10-100x faster. In addition, they catch almost all errors at compile time itself, thereby improving the reliability of your pipelines.

The recommendation is to use expressions whenever you can and fall back to free-form Python lambdas only when you need to do something that cannot be done using expressions.

Windowing

Typically, streaming engines support three types of windows:

  1. Tumbling - fixed-length, non-overlapping windows. With 10 min windows, a query at 10:03:10 looks at the period between 9:50 and 10:00 am.
  2. Hopping (aka sliding) - fixed-length but overlapping windows. With 10 min windows and a 2 min stride, a query at 10:03:10 looks at the period between 9:52 and 10:02 am (both examples are worked out in the sketch below).
  3. Session - dynamic length based on user events. Often used for very specific purposes different from the other kinds of windows.
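
For intuition, here is a small sketch in plain Python (not Fennel's API) that computes the period covered by the tumbling and hopping examples above for a query at 10:03:10, assuming windows are aligned to the start of the day:

from datetime import datetime, timedelta

def tumbling_window(t: datetime, size: timedelta):
    # Fixed, non-overlapping buckets; the query sees the most recent completed bucket.
    start_of_day = t.replace(hour=0, minute=0, second=0, microsecond=0)
    end = start_of_day + ((t - start_of_day) // size) * size
    return end - size, end

def hopping_window(t: datetime, size: timedelta, stride: timedelta):
    # Overlapping buckets that start every `stride`; again the query sees
    # the most recent completed bucket.
    start_of_day = t.replace(hour=0, minute=0, second=0, microsecond=0)
    end = start_of_day + ((t - start_of_day) // stride) * stride
    return end - size, end

q = datetime(2024, 1, 1, 10, 3, 10)
print(tumbling_window(q, timedelta(minutes=10)))                       # 09:50 -> 10:00
print(hopping_window(q, timedelta(minutes=10), timedelta(minutes=2)))  # 09:52 -> 10:02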

Fennel supports all of these. However, machine learning use cases often require another kind of window - Continuous.

Continuous Windows

As the name implies, with a 10 min continuous window, a query at 10:03:10 should cover the period from 9:53:10 to 10:03:10. Clearly, continuous windows are a lot harder to support efficiently and require some read-side aggregation.

A common way of supporting continuous windows is to store all the raw events in the window and aggregate them (say, sum them up) at request time. This is an O(N) operation, and for high-volume and/or long windows, it can get very slow.

To avoid this, Fennel does partial aggregation in tiles on the write path itself and does the final assembly of the tiles on the read path - this makes final assembly O(1) while incurring at most 1% error.
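
Here is a rough sketch of the idea - an illustration only, not Fennel's actual implementation (which achieves O(1) assembly and bounds the error): partial sums are folded into fixed-size tiles on the write path, so a read only combines the tiles overlapping the window instead of touching every raw event:

from collections import defaultdict

TILE = 60  # tile width in seconds (illustrative)

# Write path: fold each event into the partial aggregate of its tile.
tiles = defaultdict(float)
def ingest(ts: float, amount: float):
    tiles[int(ts // TILE)] += amount

# Read path: assemble the continuous window by summing whole tiles.
# Boundary tiles are included wholly, which is where the small error comes from.
def window_sum(now: float, window: float) -> float:
    first, last = int((now - window) // TILE), int(now // TILE)
    return sum(tiles[t] for t in range(first, last + 1) if t in tiles)

ingest(100, 5.0)
ingest(200, 7.5)
print(window_sum(now=250, window=120))  # sums the tiles overlapping [130, 250] -> 7.5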

This trick makes calculating continuous windows sufficiently efficient. Given their ability to capture the most current data, they are the most common (and the default) windows in Fennel and are supported via the aggregate operator.

Power of Fennel Pipelines

Fennel pipelines have a bunch of really desirable properties:

  1. Extremely declarative - you don't need to specify where/how pipelines should run, how much RAM they should take, or how they should be partitioned if datasets get too big, etc.

  2. Python Native - as mentioned above, it's possible to run arbitrary Python computation using any Python libraries.

  3. Realtime - as soon as new data arrives in any input dataset, the pipeline propagates the derived data downstream. The same mechanism works whether the input dataset gets data continuously in realtime or in batches - the pipeline code is the same in both cases.

  4. Immutable/versioned - it's not possible to modify pipeline code without explicitly declaring the intent to make changes. As a result, situations where half the data was derived using an older, undocumented version of the code never happen.

  5. Auto-backfilled - pipelines are backfilled automatically on declaration. There is no separate backfill operation.
