A pipeline is a function defined on a dataset that describes how that dataset can be derived from one or more existing datasets. Let's look at an example:


Imagine we have the following datasets defined in the system:

@meta(owner="[email protected]")
class User:
    uid: int = field(key=True)
    dob: datetime
    country: str
    signup_time: datetime = field(timestamp=True)

@meta(owner="[email protected]")
class Transaction:
    uid: int
    amount: float
    payment_country: str
    merchant_id: int
    timestamp: datetime

Now suppose we want a dataset that holds some stats about the transactions each user makes in a country other than their home country. We'd write that dataset as follows:

 1  @meta(owner="[email protected]")
 3  class UserTransactionsAbroad:
 4      uid: int = field(key=True)
 5      count: int
 6      amount_1d: float
 7      amount_1w: float
 8      recent_merchant_ids: List[int]
 9      timestamp: datetime
11      @classmethod
12      @pipeline(version=1)
13      @inputs(User, Transaction)
14      def first_pipeline(cls, user: Dataset, transaction: Dataset):
15          joined = transaction.join(user, how="left", on=["uid"])
16          abroad = joined.filter(
17              lambda df: df["country"] != df["payment_country"]
18          )
19          return abroad.groupby("uid").aggregate(
20              [
21                  Count(window=Window("forever"), into_field="count"),
22                  Sum(of="amount", window=Window("1d"), into_field="amount_1d"),
23                  Sum(of="amount", window=Window("1w"), into_field="amount_1w"),
24                  LastK(
25                      of="merchant_id",
26                      window=Window("1d"),
27                      into_field="recent_merchant_ids",
28                      limit=5,
29                      dedup=True,
30                  ),
31              ]
32          )

There is a lot happening here so let's break it down line by line:

  • Lines 1-9 define a regular dataset. Note that this dataset has the schema we want the pipeline to produce.
  • Lines 14-32 contain the actual pipeline code - we'll come to that in a second.
  • Line 11 declares that this is a classmethod - all pipelines are classmethods. The pipeline decorator itself wraps the classmethod decorator, so you can omit classmethod in practice; it is shown here only to illustrate the concept.
  • Line 12 declares that the decorated function is a pipeline.
  • Line 13 declares all the input datasets this pipeline needs to derive the output dataset. In this case, the pipeline starts from the User and Transaction datasets.
  • Notice the signature of the pipeline function on line 14 - it takes two arguments besides cls, which are essentially symbols for the User and Transaction datasets respectively.
  • The pipeline's function body can manipulate these symbols and create other dataset objects. For instance, line 15 joins the two datasets and stores the result in the variable joined. Lines 16-18 filter joined and store the result in another dataset called abroad. Finally, lines 19-32 aggregate the abroad dataset and produce a dataset matching the schema defined in lines 1-9.
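
To make the join, filter and aggregate steps concrete, here is a minimal plain-Python sketch of what the pipeline computes for a handful of rows. It is not Fennel code and says nothing about how Fennel evaluates pipelines: the sample rows, the stats_abroad helper and the choice to evaluate every window as of a fixed "now" are assumptions made for illustration, as is listing recent merchant ids most recent first.

```python
from datetime import datetime, timedelta

# Hypothetical sample rows standing in for the User and Transaction datasets.
users = {1: {"country": "US"}, 2: {"country": "IN"}}
transactions = [
    {"uid": 1, "amount": 7.0, "payment_country": "DE", "merchant_id": 102,
     "timestamp": datetime(2024, 1, 5, 12)},
    {"uid": 1, "amount": 10.0, "payment_country": "FR", "merchant_id": 100,
     "timestamp": datetime(2024, 1, 10, 9)},
    {"uid": 1, "amount": 5.0, "payment_country": "US", "merchant_id": 101,
     "timestamp": datetime(2024, 1, 10, 10)},
    {"uid": 1, "amount": 20.0, "payment_country": "FR", "merchant_id": 104,
     "timestamp": datetime(2024, 1, 10, 11)},
    {"uid": 1, "amount": 1.0, "payment_country": "FR", "merchant_id": 100,
     "timestamp": datetime(2024, 1, 10, 11, 30)},
    {"uid": 2, "amount": 3.0, "payment_country": "IN", "merchant_id": 103,
     "timestamp": datetime(2024, 1, 10, 8)},
]


def stats_abroad(users, transactions, now, limit=5):
    """Illustrative stand-in for join -> filter -> groupby/aggregate."""
    # Left join on uid: every transaction is kept and gains the user's
    # home country (None when the user is unknown).
    joined = [
        {**t, "country": users.get(t["uid"], {}).get("country")}
        for t in transactions
    ]
    # Filter: keep only transactions paid outside the home country.
    abroad = [r for r in joined if r["country"] != r["payment_country"]]

    stats = {}
    # Process oldest first so the most recent merchant ends up at the front.
    for r in sorted(abroad, key=lambda r: r["timestamp"]):
        s = stats.setdefault(r["uid"], {
            "count": 0, "amount_1d": 0.0, "amount_1w": 0.0,
            "recent_merchant_ids": [],
        })
        age = now - r["timestamp"]
        s["count"] += 1  # "forever" window
        if age <= timedelta(weeks=1):
            s["amount_1w"] += r["amount"]
        if age <= timedelta(days=1):
            s["amount_1d"] += r["amount"]
            ids = s["recent_merchant_ids"]
            if r["merchant_id"] in ids:  # dedup: keep the latest occurrence
                ids.remove(r["merchant_id"])
            ids.insert(0, r["merchant_id"])
            del ids[limit:]  # keep at most `limit` ids
    return stats


result = stats_abroad(users, transactions, now=datetime(2024, 1, 10, 12))
print(result)
```

User 2 only transacts at home, so the filter drops all of their rows and they do not appear in the result at all.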

That's it - your first Fennel pipeline! Now let's look at a few more related ideas.


Fennel pipelines are built out of a few general-purpose operators like filter, transform, join etc. which can be composed together to write any pipeline. You can read about all the operators here. Further, a few operators (e.g. transform, filter) accept free-form Python, so arbitrary computation can be done inside them (including calls to external services if needed). For all such operators, the input and output variables are Pandas DataFrames or Pandas Series. Here is an example with the transform operator demonstrating this:

@meta(owner="[email protected]")
class FraudActivityDataset:
    timestamp: datetime
    user_id: int
    merchant_id: int
    transaction_amount: float

    @pipeline(version=1)
    @inputs(Activity)
    def create_fraud_dataset(cls, activity: Dataset):
        def extract_info(df: pd.DataFrame) -> pd.DataFrame:
            df_json = df["metadata"].apply(json.loads).apply(pd.Series)
            df = pd.concat([df_json, df[["user_id", "timestamp"]]], axis=1)
            df["transaction_amount"] = df["transaction_amount"] / 100
            return df[
                ["merchant_id", "transaction_amount", "user_id", "timestamp"]
            ]

        filtered_ds = activity.filter(lambda df: df["action_type"] == "report")
        return filtered_ds.transform(
            extract_info,
            schema={
                "transaction_amount": float,
                "merchant_id": int,
                "user_id": int,
                "timestamp": datetime,
            },
        )
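
Because a transform function is an ordinary function from DataFrame to DataFrame, it can be tried on a plain Pandas DataFrame outside any pipeline. The sketch below does that for extract_info. The sample rows are hypothetical, and it assumes that the metadata column holds JSON strings with merchant_id and transaction_amount keys, with the amount stored in cents (which the division by 100 suggests but the example does not state).

```python
import json
from datetime import datetime

import pandas as pd


def extract_info(df: pd.DataFrame) -> pd.DataFrame:
    # Expand the JSON string in each "metadata" cell into its own columns.
    df_json = df["metadata"].apply(json.loads).apply(pd.Series)
    # Put the parsed columns next to the columns we want to keep.
    df = pd.concat([df_json, df[["user_id", "timestamp"]]], axis=1)
    # Assumption: amounts arrive in cents, so convert to whole currency units.
    df["transaction_amount"] = df["transaction_amount"] / 100
    return df[["merchant_id", "transaction_amount", "user_id", "timestamp"]]


# Hypothetical rows standing in for the Activity dataset.
activity = pd.DataFrame({
    "user_id": [1, 2, 3],
    "action_type": ["report", "login", "report"],
    "metadata": [
        '{"merchant_id": 7, "transaction_amount": 1250}',
        "{}",
        '{"merchant_id": 9, "transaction_amount": 300}',
    ],
    "timestamp": [
        datetime(2024, 1, 1),
        datetime(2024, 1, 2),
        datetime(2024, 1, 3),
    ],
})

# Same predicate as the filter step, applied with plain boolean indexing.
reports = activity[activity["action_type"] == "report"]
result = extract_info(reports)
print(result)
```

The columns returned by the function are the same ones named in the schema argument passed to transform in the example above.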

The ONLY constraint on the pipeline topology is that aggregate has to be the terminal node, i.e. no other operator may be composed on the output of the aggregate operator. This constraint allows Fennel to significantly reduce the cost and improve the performance of pipelines. It's possible that even this constraint will be removed in the future.

Power of Fennel Pipelines

Fennel pipelines have a bunch of really desirable properties:

  1. Extremely declarative - you don't need to specify where or how pipelines should run, how much RAM they should take, how they should be partitioned if datasets are too big etc.

  2. Python Native - as mentioned above, it's possible to run arbitrary Python computation in pipelines. You can even import your favorite packages to use and/or make external API calls.

  3. Realtime - as soon as new data arrives in any input dataset, the pipeline propagates the derived data downstream. The same mechanism works whether the input dataset receives data continuously in realtime or in batches. The pipeline code is the same in both the realtime and batch cases.

  4. Immutable/versioned - pipeline code cannot be modified without explicitly declaring the intent to change it. As a result, you never end up in a situation where half the data was derived by an older, undocumented version of the code.

  5. Auto-backfilled - pipelines are backfilled automatically on declaration. There is no separate backfill operation.
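
Property 3 can be pictured with a toy model. The sketch below is not Fennel code: RunningTotals is a hypothetical incremental aggregate, used only to show that the same update logic gives the same result whether rows arrive in one batch or one at a time.

```python
class RunningTotals:
    """Toy incremental aggregate: per-key (count, sum), updated row by row."""

    def __init__(self):
        self.state = {}

    def ingest(self, rows):
        # The same code path handles a large batch and a single new row.
        for key, amount in rows:
            count, total = self.state.get(key, (0, 0.0))
            self.state[key] = (count + 1, total + amount)
        return self


rows = [("u1", 10.0), ("u2", 2.5), ("u1", 5.0)]

# Batch: all rows arrive at once.
batch = RunningTotals().ingest(rows)

# Realtime: rows arrive one at a time.
stream = RunningTotals()
for row in rows:
    stream.ingest([row])

print(batch.state == stream.state)
```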