Concepts
Pipeline
A pipeline is a function defined on a dataset that describes how that dataset can be derived from one or more existing datasets. Let's look at an example:
Example
Imagine we have the following datasets defined in the system:
@meta(owner="[email protected]")
@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    dob: datetime
    country: str
    signup_time: datetime = field(timestamp=True)


@meta(owner="[email protected]")
@source(webhook.endpoint("Transaction"))
@dataset
class Transaction:
    uid: int
    amount: float
    payment_country: str
    merchant_id: int
    timestamp: datetime
And we want to create a dataset which represents some stats about the transactions made by a user in a country different from their home country. We'd write that dataset as follows:
1   @meta(owner="[email protected]")
2   @dataset
3   class UserTransactionsAbroad:
4       uid: int = field(key=True)
5       count: int
6       amount_1d: float
7       amount_1w: float
8       recent_merchant_ids: List[int]
9       timestamp: datetime
10
11      @classmethod
12      @pipeline(version=1)
13      @inputs(User, Transaction)
14      def first_pipeline(cls, user: Dataset, transaction: Dataset):
15          joined = transaction.join(user, how="left", on=["uid"])
16          abroad = joined.filter(
17              lambda df: df["country"] != df["payment_country"]
18          )
19          return abroad.groupby("uid").aggregate(
20              [
21                  Count(window=Window("forever"), into_field="count"),
22                  Sum(of="amount", window=Window("1d"), into_field="amount_1d"),
23                  Sum(of="amount", window=Window("1w"), into_field="amount_1w"),
24                  LastK(
25                      of="merchant_id",
26                      window=Window("1d"),
27                      into_field="recent_merchant_ids",
28                      limit=5,
29                      dedup=True,
30                  ),
31              ]
32          )
There is a lot happening here so let's break it down line by line:
- Lines 1-9 define a regular dataset. Just note that this dataset has the schema that we want the pipeline to produce.
- Lines 14-32 contain the actual pipeline code - we'll come to that in a second.
- Line 11 declares that this is a classmethod - all pipelines are classmethods. The pipeline decorator itself wraps the classmethod decorator, so in practice you can omit classmethod - it is shown here only to explain the concept.
- Line 12 declares that the decorated function represents a pipeline.
- Line 13 declares all the input datasets that are needed for this pipeline to derive the output dataset. In this case, the pipeline declares that it starts from the User and Transaction datasets.
- Notice the signature of the pipeline function in line 14 - besides cls, it takes 2 arguments, which are essentially symbols for the User dataset and the Transaction dataset respectively.
- The pipeline's function body is able to manipulate these symbols and create other dataset objects. For instance, line 15 joins these two datasets and the resulting dataset is stored in the variable joined. Line 16 does a filter operation on joined and stores the result in another dataset called abroad. Finally, lines 19-32 aggregate the abroad dataset and create a dataset matching the schema defined in lines 1-9.
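To actually bring such a pipeline to life, the datasets (and the pipelines defined inside them) are registered with Fennel via the Python client. The snippet below is a minimal, hedged sketch: it assumes the client exposes a sync call that accepts the dataset classes, and the server URL is a placeholder - the exact constructor arguments may differ in your setup.

from fennel.client import Client

# Placeholder URL - point this at your own Fennel deployment.
client = Client(url="https://your-fennel-instance.example.com")

# Registering the datasets also registers the pipelines defined inside them;
# from this point on, Fennel keeps deriving UserTransactionsAbroad.
client.sync(datasets=[User, Transaction, UserTransactionsAbroad])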
That's it - your first Fennel pipeline! Now let's look at a few more related ideas.
Operators
Fennel pipelines are built out of a few general purpose operators like filter, transform, join etc. which can be composed together to write any pipeline. You can read about all the operators here. Further, a few operators (e.g. transform, filter) take free-form Python using which arbitrary computation can be done (including making calls into external services if needed). For all such operators, the input/output variables are Pandas DataFrames or Pandas Series. Here is an example with the transform operator demonstrating this:
@meta(owner="[email protected]")
@dataset
class FraudActivityDataset:
    timestamp: datetime
    user_id: int
    merchant_id: int
    transaction_amount: float

    @pipeline(version=1)
    @inputs(Activity)
    def create_fraud_dataset(cls, activity: Dataset):
        def extract_info(df: pd.DataFrame) -> pd.DataFrame:
            df_json = df["metadata"].apply(json.loads).apply(pd.Series)
            df = pd.concat([df_json, df[["user_id", "timestamp"]]], axis=1)
            df["transaction_amount"] = df["transaction_amount"] / 100
            return df[
                ["merchant_id", "transaction_amount", "user_id", "timestamp"]
            ]

        filtered_ds = activity.filter(lambda df: df["action_type"] == "report")
        return filtered_ds.transform(
            extract_info,
            schema={
                "transaction_amount": float,
                "merchant_id": int,
                "user_id": int,
                "timestamp": datetime,
            },
        )
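As a side note, here is a small standalone sketch in plain Pandas (outside of Fennel, with made-up rows) of the shapes these free-form callables work with: a filter callable maps a DataFrame to a boolean Series, while a transform callable maps a DataFrame to a DataFrame matching the declared output schema.

import json

import pandas as pd

# Made-up rows loosely mimicking the Activity dataset used above.
df = pd.DataFrame(
    {
        "user_id": [1, 2],
        "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02"]),
        "action_type": ["report", "view"],
        "metadata": [
            '{"merchant_id": 9, "transaction_amount": 1250}',
            '{"merchant_id": 3, "transaction_amount": 400}',
        ],
    }
)

# What a filter callable returns: a boolean Series, one entry per input row.
mask = df["action_type"] == "report"

# What a transform callable returns: a DataFrame with the output columns.
parsed = df["metadata"].apply(json.loads).apply(pd.Series)
out = pd.concat([parsed, df[["user_id", "timestamp"]]], axis=1)

print(mask.tolist())         # [True, False]
print(out.columns.tolist())  # ['merchant_id', 'transaction_amount', 'user_id', 'timestamp']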
The ONLY constraint on the pipeline topology is that aggregate has to be the terminal node, i.e. it's not allowed to compose any other operator on the output of the aggregate operator. This constraint allows Fennel to significantly reduce the cost and improve the performance of pipelines. And it's possible that even this constraint will be removed in the future.
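To make the constraint concrete, here is a hedged sketch of a hypothetical pipeline (the dataset name and fields are made up for illustration) in which aggregate is the terminal operator; chaining any further operator after the aggregate call would be rejected.

@meta(owner="[email protected]")
@dataset
class CountryTxnStats:
    payment_country: str = field(key=True)
    txn_count: int
    timestamp: datetime

    @pipeline(version=1)
    @inputs(Transaction)
    def country_stats(cls, transaction: Dataset):
        # Allowed: aggregate is the last operator in the pipeline.
        return transaction.groupby("payment_country").aggregate(
            [Count(window=Window("forever"), into_field="txn_count")]
        )
        # Not allowed: composing further operators on the aggregated output,
        # e.g. .aggregate([...]).filter(...), would violate the constraint above.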
Power of Fennel Pipelines
Fennel pipelines have a bunch of really desirable properties:
- Extremely declarative - you don't need to specify where/how pipelines should run, how much RAM they should take, how they should be partitioned if datasets are too big, etc.
- Python native - as mentioned above, it's possible to run arbitrary Python computation in pipelines. You can even import your favorite packages and/or make external API calls.
- Realtime - as soon as new data arrives in any input dataset, the pipeline propagates the derived data downstream. The same mechanism works whether the input dataset gets data continuously in realtime or in batches. The pipeline code is the same in both the realtime and batch cases.
- Immutable/versioned - it's not possible to modify pipeline code without explicitly declaring the intent to make changes. As a result, situations where half the data was derived using an older, undocumented version of the code never arise. The sketch after this list shows what declaring such a change looks like.
- Auto-backfilled - pipelines are backfilled automatically on declaration. There is no separate backfill operation.
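For instance, here is an abbreviated, hypothetical fragment of what explicitly declaring a change to the first_pipeline example above could look like: the version argument of the pipeline decorator is bumped, so the new code is registered as a new version rather than as a silent edit (the schema fields and the pipeline body are elided here).

@meta(owner="[email protected]")
@dataset
class UserTransactionsAbroad:
    # ... same schema fields as in the original definition above ...

    @classmethod
    @pipeline(version=2)  # bumping version=1 -> version=2 declares the change
    @inputs(User, Transaction)
    def first_pipeline(cls, user: Dataset, transaction: Dataset):
        # The modified pipeline body goes here; it replaces version 1 only
        # because the new version number was declared explicitly.
        ...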