Concepts
Pipeline
A pipeline is a function defined on a dataset that describes how it can be derived from one or more existing datasets. Let's look at an example.
Imagine we have the following datasets defined in the system:
```python
from datetime import datetime

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

webhook = Webhook(name="fennel_webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset(index=True)
class User:
    uid: int = field(key=True)
    dob: datetime
    country: str
    signup_time: datetime = field(timestamp=True)

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: float
    payment_country: str
    merchant_id: int
    timestamp: datetime
```
And we want to create a dataset that represents some stats about the transactions made by a user in a country different from their home country. We will write that dataset as follows:
```python
from datetime import datetime

from fennel.datasets import pipeline, Dataset, dataset, field
from fennel.datasets import Count, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs

@dataset(index=True)
class UserTransactionsAbroad:
    uid: int = field(key=True)
    count: int
    amount_1d: float
    amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(User, Transaction)
    def first_pipeline(cls, user: Dataset, transaction: Dataset):
        joined = transaction.join(user, how="left", on=["uid"])
        abroad = joined.filter(
            lambda df: df["country"] != df["payment_country"]
        )
        return abroad.groupby("uid").aggregate(
            count=Count(window=Continuous("forever")),
            amount_1d=Sum(of="amount", window=Continuous("1d")),
            amount_1w=Sum(of="amount", window=Continuous("1w")),
        )
```
There is a lot happening here, so let's break it down piece by piece:

- First, a regular dataset schema is defined - this dataset has the schema that we desire to create via the pipeline.
- The `@pipeline` decorator signifies that the decorated method is a pipeline. The `@inputs` decorator declares the input datasets that are needed for this pipeline to derive the output dataset. In this case, this pipeline is declaring that it starts from the `User` and `Transaction` datasets.
- Notice the signature of the pipeline function - it takes 2 arguments besides `cls` (every pipeline is a classmethod). They are essentially symbols for the `User` dataset and the `Transaction` dataset respectively.
- The pipeline's function body is able to manipulate these symbols and create other dataset objects. For instance, these two datasets are joined via `join` and the resulting dataset is stored in the variable `joined`, followed by a `filter` operation and a `groupby`/`aggregate` operation to create the dataset matching the intended schema.
That's it - your first Fennel pipeline! Now let's look at a few more related ideas.
Operators
Fennel pipelines are built out of a few general-purpose operators like `filter`, `transform`, `join` etc., which can be composed together to write any pipeline. You can read about all the operators here.
Native Python Interop
Some of the operators (e.g. `transform`, `filter`) accept free-form Python lambdas and can be used to do arbitrary computation (including making calls into external services if needed). You can also import/use your favorite Python libraries inside these lambdas - thereby extending interop to the full Python ecosystem. For such Python-based operators, the input/output variables are Pandas DataFrames or Pandas Series. Here is an example with the `filter` and `assign` operators:
```python
@dataset
class FraudActivity:
    uid: int
    merchant_id: int
    amount_cents: float
    timestamp: datetime

    @pipeline
    @inputs(Activity)  # Activity is an upstream dataset defined elsewhere
    def create_fraud_dataset(cls, activity: Dataset):
        return (
            # keep only the rows reporting fraud
            activity.filter(lambda df: df["action_type"] == "report")
            # derive a new column with the amount in cents
            .assign(
                "amount_cents",
                float,
                lambda df: df["amount"].astype(float) / 100,
            )
            # drop the columns that are no longer needed
            .drop("action_type", "amount")
        )
```
The only constraint on the pipeline topology is that `aggregate` has to be the terminal node, i.e. it's not allowed to compose any other operator on the output of the `aggregate` operator. This constraint allows Fennel to significantly reduce the cost and improve the performance of doing continuous sliding aggregations. And it's possible that even this constraint will be removed in the future.
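For instance, a hypothetical pipeline like the sketch below would be rejected, since it tries to compose a `filter` on the output of `aggregate` (the dataset and method names here are made up; the operators are the ones shown above):

```python
# Hypothetical sketch - NOT allowed, since `aggregate` must be terminal.
@pipeline
@inputs(Transaction)
def invalid_pipeline(cls, txn: Dataset):
    aggregated = txn.groupby("uid").aggregate(
        total=Sum(of="amount", window=Continuous("1d")),
    )
    # Composing any operator on aggregate's output violates the constraint:
    return aggregated.filter(lambda df: df["total"] > 100)
```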
Expressions
While Fennel lets you write free-form Python using lambdas, it also offers a native expressions API (similar to engines like Spark or Polars) for more structured operations. The above example can be rewritten using expressions as follows:
```python
from fennel.expr import col

@dataset
class FraudActivityDataset:
    uid: int
    merchant_id: int
    amount_cents: float
    timestamp: datetime

    @pipeline
    @inputs(Activity)
    def create_fraud_dataset(cls, activity: Dataset):
        return (
            activity
            .filter(col("action_type") == "report")
            .assign(
                amount_cents=(
                    col("amount").str.parse(float) / 100.0
                ).astype(float)
            )
            .drop("action_type", "amount")
        )
```
These expressions are structured and hence can be fully executed in Fennel's Rust engine with zero dependency on Python at runtime, making them 10-100x faster. In addition, almost all errors in them can be caught at compile time itself, which improves the reliability of your pipelines.
The recommendation is to use expressions whenever you can and fall back to free-form Python lambdas when you want to do something that cannot be done using expressions.
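As an illustration of such a fallback, here is a hedged sketch - the dataset names and the regex check are hypothetical, but the pattern of importing a Python module and using it inside a lambda is the general mechanism described above:

```python
import re

@dataset
class ValidCardActivity:  # hypothetical output dataset
    uid: int
    card_number: str
    timestamp: datetime

    @pipeline
    @inputs(CardActivity)  # hypothetical input dataset with a `card_number` column
    def keep_valid_cards(cls, activity: Dataset):
        # Arbitrary Python (here, the `re` module applied row by row) -
        # logic that may fall outside what the expression API covers.
        return activity.filter(
            lambda df: df["card_number"].apply(
                lambda c: bool(re.fullmatch(r"\d{16}", str(c)))
            )
        )
```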
Windowing
Typically, streaming engines support three types of windows:
- Tumbling - fixed-length, non-overlapping windows. With 10-minute windows, a query at 10:03:10 looks at the period between 9:50 and 10:00am.
- Hopping (aka sliding) - fixed-length but overlapping windows. With 10-minute windows and a 2-minute stride, a query at 10:03:10 looks at the period between 9:52 and 10:02am (see the boundary sketch after this list).
- Session - dynamic length based on user events. Often used for very specific purposes different from other kinds of windows.
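To make the boundary arithmetic concrete, here is a small illustrative sketch (plain Python, not Fennel API) that computes the most recent fully-closed tumbling and hopping windows for a given query time:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumbling_window(t: datetime, length: timedelta):
    # Most recent fully-closed fixed window: snap t down to a window
    # boundary; the closed window is the one just before that boundary.
    end = EPOCH + ((t - EPOCH) // length) * length
    return end - length, end

def hopping_window(t: datetime, length: timedelta, stride: timedelta):
    # Same idea, but boundaries advance every `stride` instead of `length`.
    end = EPOCH + ((t - EPOCH) // stride) * stride
    return end - length, end

t = datetime(2024, 1, 1, 10, 3, 10)
print(tumbling_window(t, timedelta(minutes=10)))                       # 9:50 -> 10:00
print(hopping_window(t, timedelta(minutes=10), timedelta(minutes=2)))  # 9:52 -> 10:02
```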
Fennel supports all of these. However, machine learning use cases often require another kind of window - continuous.
Continuous Windows
As the name implies, with a 10-minute continuous window, a query at 10:03:10 should cover the period from 9:53:10 to 10:03:10. Clearly, continuous windows are a lot harder to support efficiently and require some read-side aggregation.
A common way of supporting continuous windows is to store all the raw events in the window and aggregate them (say, sum them up) during the request. This is an O(N) operation, and for high-volume and/or long windows, it can get very slow.
To avoid this, Fennel does partial aggregation in tiles during the write path itself and does the final assembly of all tiles during the read path - this makes final assembly O(1) while incurring at most 1% error.
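To illustrate the idea, here is a minimal sketch of tiled partial aggregation for a continuous sum - purely conceptual and not Fennel's actual implementation (the tile width and function names are made up):

```python
TILE_WIDTH = 60  # seconds per tile; an illustrative choice

tiles: dict[int, float] = {}  # tile index -> partial sum

def on_write(ts: int, amount: float) -> None:
    # Write path: fold each event into its tile's running sum.
    idx = ts // TILE_WIDTH
    tiles[idx] = tiles.get(idx, 0.0) + amount

def read_sum(now: int, window: int) -> float:
    # Read path: assemble the window from ~window/TILE_WIDTH tile totals,
    # independent of how many raw events landed in the window. The two
    # partially-covered tiles at the edges are the source of the small,
    # bounded approximation error.
    start, end = (now - window) // TILE_WIDTH, now // TILE_WIDTH
    return sum(tiles.get(i, 0.0) for i in range(start, end + 1))
```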
Because of this trick, calculating continuous windows becomes sufficiently efficient. And given their ability to capture more current data, they are the most common (and the default) kind of window in Fennel, supported via the `aggregate` operator.
Power of Fennel Pipelines
Fennel pipelines have a bunch of really desirable properties:
- Extremely declarative - you don't need to specify where/how pipelines should run, how much RAM they should take, how they should be partitioned if datasets are too big, etc.
- Python native - as mentioned above, it's possible to run arbitrary Python computation using any Python libraries.
- Realtime - as soon as new data arrives in any input dataset, the pipeline propagates the derived data downstream. The same mechanism works whether the input dataset is continuously getting data in realtime or receives it in batches - the pipeline code is the same in both cases.
- Immutable/versioned - it's not possible to modify pipeline code without explicitly declaring the intent to make changes. As a result, situations where half the data was derived using an older, undocumented version of the code never happen.
- Auto-backfilled - pipelines are backfilled automatically on declaration. There is no separate backfill operation.