Aggregate
Operator to do continuous window aggregations. Aggregate operator must always be preceded by a groupby operator.
Parameters
Positional argument specifying the list of aggregations to apply on the grouped dataset. This list can be passed either as unpacked *args or as an explicit list as the first positional argument.
See aggregations for the full list of aggregate functions.
Keyword argument indicating the time axis to aggregate along. If along is None, Fennel will aggregate along the timestamp of the input dataset.
Returns
Returns a dataset where all columns passed to groupby become the key columns, the timestamp column stays as it is and one column is created for each aggregation.
The type of each aggregated column depends on the aggregate and the type of the corresponding column in the input dataset.
Aggregate is the terminal operator - no other operator can follow it and no other datasets can be derived from the dataset containing this pipeline.
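To make the effect of a "forever" continuous window and the along parameter concrete, here is a minimal pure-Python simulation. It is not Fennel code: the helper aggregate_forever is hypothetical, and integer timestamps stand in for datetimes. It only shows that rows are processed in order of the chosen time axis and that each key carries a running count and sum.

```python
from collections import defaultdict

def aggregate_forever(rows, key, of, along):
    """Emit a running count and sum per key, ordered by the `along` column."""
    counts = defaultdict(int)
    totals = defaultdict(int)
    out = []
    # Process rows in order of the chosen time axis, not arrival order.
    for row in sorted(rows, key=lambda r: r[along]):
        k = row[key]
        counts[k] += 1
        totals[k] += row[of]
        out.append(
            {key: k, "count": counts[k], "total": totals[k], "timestamp": row[along]}
        )
    return out

rows = [
    {"uid": 1, "amount": 10, "transaction_time": 3},
    {"uid": 1, "amount": 5, "transaction_time": 1},
    {"uid": 2, "amount": 7, "transaction_time": 2},
]
result = aggregate_forever(rows, key="uid", of="amount", along="transaction_time")
```

After the last row, uid 1 has a count of 2 and a total of 15, regardless of the order in which the rows arrived.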
```python
from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Count,
    Sum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)
    transaction_time: datetime

@dataset(index=True)
class Aggregated:
    # groupby field becomes the key field
    uid: int = field(key=True)
    # new fields are added to the dataset by the aggregate operation
    total: int
    count_1d: int
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def aggregate_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            count_1d=Count(window=Continuous("forever")),
            total=Sum(of="amount", window=Continuous("forever")),
            # aggregate along transaction_time instead of the timestamp field
            along="transaction_time",
        )
```
Assign
Operator to add a new column to a dataset - the added column is neither a key column nor a timestamp column.
Parameters
The name of the new column to be added - must not conflict with any existing name on the dataset.
The data type of the new column to be added - must be a valid Fennel supported data type.
The function, which when given a subset of the dataset as a dataframe, returns the value of the new column for each row in the dataframe.
Fennel verifies at runtime that the returned series matches the declared dtype.
Assign can also be given one or more expressions instead of Python lambdas - it can either have expressions or lambdas but not both. Expected types must also be present along with each expression (see example).
Unlike lambda-based assign, expression-based assign lets all type validation and many other errors be caught at commit time (instead of surfacing as runtime errors).
Returns
Returns a dataset with one additional column of the given name and the same type as dtype. This additional column is neither a key column nor the timestamp column.
Errors
Runtime error if the value returned from the lambda isn't a pandas Series of the declared type and the same length as the input dataframe.
When using expressions, errors may be raised during the import or commit if types don't match and/or there are other validation errors related to the expressions.
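The runtime check described for lambda-based assign can be pictured with a small pure-Python sketch. The helper check_assigned is hypothetical and works on plain lists rather than pandas Series; it is only meant to illustrate the two conditions (one value per input row, and values of the declared type), not how Fennel implements the check.

```python
def check_assigned(values, dtype, n_rows):
    """Raise unless `values` holds exactly one `dtype` value per input row."""
    if len(values) != n_rows:
        raise ValueError(f"expected {n_rows} values, got {len(values)}")
    for v in values:
        # Exact type match, so a float is rejected for a column declared as int.
        if type(v) is not dtype:
            raise TypeError(f"expected {dtype.__name__}, got {type(v).__name__}")
    return values

amounts = [2, 3, 4]

# Squaring ints yields ints, so this passes for a column declared as int.
squares = check_assigned([a ** 2 for a in amounts], int, len(amounts))

# Halving yields floats, which does not match a column declared as int.
try:
    check_assigned([a * 0.5 for a in amounts], int, len(amounts))
    halves_ok = True
except TypeError:
    halves_ok = False
```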
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset(index=True)
class WithSquare:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def my_pipeline(cls, ds: Dataset):
        # lambda-based assign: column name, dtype, then the function
        return ds.assign("amount_sq", int, lambda df: df["amount"] ** 2)
```
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

from fennel.expr import col

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset(index=True)
class WithSquare:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    amount_half: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def my_pipeline(cls, ds: Dataset):
        # expression-based assign: each expression declares its expected type
        return ds.assign(
            amount_sq=(col("amount") * col("amount")).astype(int),
            amount_half=(col("amount") / 2).astype(float),
        )
```
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset
class WithHalf:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def my_pipeline(cls, ds: Dataset):
        # runtime error: the lambda returns floats but the declared dtype is int
        return ds.assign(
            "amount_sq", int, lambda df: df["amount"] * 0.5
        )
```
```python
from datetime import datetime

import pytest

with pytest.raises(Exception):

    from fennel.datasets import dataset, field, pipeline, Dataset
    from fennel.lib import inputs
    from fennel.connectors import source, Webhook
    from fennel.expr import col

    webhook = Webhook(name="webhook")

    @source(webhook.endpoint("txn"), disorder="14d", cdc="upsert")
    @dataset
    class Transaction:
        uid: int = field(key=True)
        amount: int
        timestamp: datetime

    @dataset
    class WithHalf:
        uid: int = field(key=True)
        amount: int
        amount_sq: int
        amount_half: int
        timestamp: datetime

        @pipeline
        @inputs(Transaction)
        def my_pipeline(cls, ds: Dataset):
            return ds.assign(
                amount_sq=(col("amount") * col("amount")).astype(int),
                # error: amount / 2 is a float but the declared type is int
                amount_half=(col("amount") / 2).astype(int),
            )
```
Changelog
Operator to convert a keyed dataset into a CDC changelog stream. All key fields are converted into normal fields, and an additional column is added, indicating the type of change (insert or delete) for the delta.
Parameters
Kwarg that specifies the name of a boolean column which stores whether a delta was a delete kind in the original dataset. Exactly one of this or insert kwarg should be set.
Kwarg that specifies the name of a boolean column which stores whether a delta was an insert kind in the original dataset. Exactly one of this or delete kwarg should be set.
Returns
Returns a dataset that represents the input keyed dataset as an append-only CDC changelog stream. All key fields are converted into normal fields, and an additional column is added, which contains the type of change (insert or delete) for the delta.
Errors
Error if neither the insert nor the delete kwarg is set.
Error if both the insert and delete kwargs are set.
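The shape of a changelog can be illustrated with a pure-Python simulation. This is a sketch under stated assumptions, not Fennel's implementation: the helper to_changelog and the column name is_delete are hypothetical, and the sketch assumes that an upsert on an existing key shows up as a delete of the old row followed by an insert of the new one.

```python
def to_changelog(upserts, key, delete="is_delete"):
    """Turn upserts on a keyed table into append-only delta rows."""
    current = {}  # latest row seen for each key
    deltas = []
    for row in upserts:
        k = row[key]
        if k in current:
            # Retract the previous value for this key.
            deltas.append({**current[k], delete: True})
        # Record the new value as an insert (delete flag is False).
        deltas.append({**row, delete: False})
        current[k] = row
    return deltas

upserts = [
    {"uid": 1, "city": "Paris"},
    {"uid": 2, "city": "Rome"},
    {"uid": 1, "city": "Oslo"},
]
log = to_changelog(upserts, key="uid")
```

Here uid is an ordinary field of every delta row, and the boolean column distinguishes the retraction of ("Paris") from the insert of ("Oslo").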
Dedup
Operator to dedup keyless datasets (e.g. event streams).
Parameters
Default: None
The list of columns to use for identifying duplicates. If not specified, all the columns are used for identifying duplicates.
If window is specified, two rows of the input dataset are considered duplicates when they are in the same window and have the same value for the by columns.
If window is not specified, two rows are considered duplicates when they have the exact same values for the timestamp column and all the by columns.
Default: None
The window to group rows for deduping. If not specified, the rows will be deduped only by the by columns and the timestamp.
Returns
Returns a keyless dataset having the same schema as the input dataset but with some duplicated rows filtered out.
Errors
Commit error to apply dedup on a keyed dataset.
Dedup on hopping window or tumbling window with lookback is not supported.
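The two duplicate rules above (with and without a window) can be compared in a small pure-Python simulation. The helper dedup below is hypothetical, uses integer seconds as timestamps, models only a tumbling window, and assumes the first row of each duplicate group is the one kept.

```python
def dedup(rows, by=None, tumbling=None):
    """Keep the first row of each duplicate group."""
    seen = set()
    out = []
    for row in rows:
        # With no `by`, every non-timestamp column takes part in the comparison.
        cols = by if by is not None else [c for c in row if c != "timestamp"]
        values = tuple(row[c] for c in cols)
        if tumbling is None:
            # No window: the exact timestamp is part of the row's identity.
            bucket = row["timestamp"]
        else:
            # Tumbling window: rows in the same fixed-size bucket can collide.
            bucket = row["timestamp"] // tumbling
        ident = (values, bucket)
        if ident not in seen:
            seen.add(ident)
            out.append(row)
    return out

rows = [
    {"txid": 1, "amount": 10, "timestamp": 0},
    {"txid": 1, "amount": 10, "timestamp": 0},   # exact duplicate
    {"txid": 1, "amount": 10, "timestamp": 4},   # same txid, same 10s bucket
    {"txid": 1, "amount": 10, "timestamp": 12},  # same txid, next 10s bucket
]
no_window = dedup(rows, by=["txid"])
windowed = dedup(rows, by=["txid"], tumbling=10)
```

Without a window only the exact duplicate is removed (3 rows remain); with a 10 second tumbling window the row at timestamp 4 is also removed (2 rows remain).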
```python
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Deduped:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def dedup_pipeline(cls, ds: Dataset):
        # rows with the same txid and timestamp are duplicates
        return ds.dedup(by="txid")
```
```python
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Deduped:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def dedup_by_all_pipeline(cls, ds: Dataset):
        # no `by`: all columns are used to identify duplicates
        return ds.dedup()
```
```python
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Session

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Deduped:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def dedup_by_all_pipeline(cls, ds: Dataset):
        # rows with the same txid in the same session window are duplicates
        return ds.dedup(by="txid", window=Session(gap="10s"))
```
```python
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Tumbling

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Deduped:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def dedup_by_all_pipeline(cls, ds: Dataset):
        # rows with the same txid in the same tumbling window are duplicates
        return ds.dedup(by="txid", window=Tumbling(duration="10s"))
```
Drop
Operator to drop one or more non-key non-timestamp columns from a dataset.
Parameters
List of columns in the incoming dataset that should be dropped. This can be passed either as unpacked *args or as a Python list.
Returns
Returns a dataset with the same schema as the input dataset but with some columns (as specified by columns) removed.
Errors
Commit error on removing any key columns or the timestamp column.
Commit error on removing any column that doesn't exist in the input dataset.
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    country: str
    weight: float
    height: float
    gender: str
    timestamp: datetime

@dataset(index=True)
class Dropped:
    uid: int = field(key=True)
    gender: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def drop_pipeline(cls, user: Dataset):
        # columns can be passed as unpacked args or as a list
        return user.drop("height", "weight").drop(
            columns=["city", "country"]
        )
```
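```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Dropped:
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # commit error: key columns cannot be dropped
        return user.drop("uid")
```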
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Dropped:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # commit error: "random" is not a column of the input dataset
        return user.drop("random")
```
Dropnull
Operator to drop rows containing null values (aka None in Python speak) in the given columns.
Parameters
List of columns in the incoming dataset that should be checked for presence of None values - if any such column has None for a row, the row will be filtered out from the output dataset. This can be passed either as unpacked *args or as a Python list.
If no arguments are given, columns will be all columns with the type Optional[T] in the dataset.
Returns
Returns a dataset with the same name & number of columns as the input dataset but with the type of some columns modified from Optional[T] -> T.
Errors
Commit error to pass a column without an optional type.
Commit error to pass a column that doesn't exist in the input dataset.
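The row-level behaviour can be sketched in pure Python. The helper dropnull below is hypothetical and operates on lists of dicts; it folds both error cases above into a single ValueError, and treats "no columns given" as "all optional columns", as described in the parameters.

```python
def dropnull(rows, optional_columns, columns=None):
    """Drop rows that have None in any of the checked columns."""
    if columns is None:
        # Default: check every optional column of the dataset.
        columns = list(optional_columns)
    for c in columns:
        if c not in optional_columns:
            raise ValueError(f"{c!r} is not an optional column of the dataset")
    return [r for r in rows if all(r[c] is not None for c in columns)]

users = [
    {"uid": 1, "city": "Paris", "gender": None},
    {"uid": 2, "city": None, "gender": "F"},
    {"uid": 3, "city": "Oslo", "gender": "M"},
]
optional = {"city", "gender"}

with_city = dropnull(users, optional, ["city"])  # only city is checked
complete = dropnull(users, optional)             # city and gender are checked
```

In the first call the row for uid 1 survives even though gender is None, because only city was checked; in the second call only uid 3 remains.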
```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    dob: str
    city: Optional[str]
    country: Optional[str]
    gender: Optional[str]
    timestamp: datetime

@dataset(index=True)
class Derived:
    uid: int = field(key=True)
    dob: str
    # dropnull changes the type of the columns to non-optional
    city: str
    country: str
    gender: Optional[str]
    timestamp: datetime

    @pipeline
    @inputs(User)
    def dropnull_pipeline(cls, user: Dataset):
        return user.dropnull("city", "country")
```
```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    dob: str
    city: Optional[str]
    country: Optional[str]
    gender: Optional[str]
    timestamp: datetime

@dataset(index=True)
class Derived:
    uid: int = field(key=True)
    dob: str
    # dropnull changes the type of all optional columns to non-optional
    city: str
    country: str
    gender: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def dropnull_pipeline(cls, user: Dataset):
        return user.dropnull()
```
```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: Optional[str]
    timestamp: datetime

@dataset
class Derived:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # commit error: "random" is not a column of the input dataset
        return user.dropnull("random")
```
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    # dropnull can only be used on optional columns
    city: str
    timestamp: datetime

@dataset
class Derived:
    uid: int = field(key=True)
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # commit error: city is not an optional column
        return user.dropnull("city")
```
Explode
Operator to explode lists in a single row to form multiple rows, analogous to the explode function in Pandas.
Only applicable to keyless datasets.
Parameters
The list of columns to explode. This list can be passed either as unpacked *args or kwarg columns mapping to an explicit list.
All the columns should be of type List[T] for some T in the input dataset and after explosion, they get converted to a column of type Optional[T].
Returns
Returns a dataset with the same number & name of columns as the input dataset but with the type of exploded columns modified from List[T] to Optional[T].
Empty lists are converted to None values (hence the output types need to be Optional[T]).
Errors
Commit error to apply explode on an input dataset with key columns.
Commit error to explode using a column that is not of the type List[T].
Commit error to explode using a column that is not present in the input dataset.
For a given row, all the columns getting exploded must have lists of the same length, otherwise a runtime error is raised. Note that the lists can be of different lengths across rows.
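The per-row rules above (equal list lengths, empty lists becoming None) can be sketched in pure Python. The helper explode below is hypothetical and works on a single dict per row; it is an illustration of the semantics, not Fennel's implementation.

```python
def explode(row, columns):
    """Expand one row into one output row per list element."""
    lengths = {len(row[c]) for c in columns}
    if len(lengths) != 1:
        raise ValueError("exploded columns must have lists of the same length")
    n = lengths.pop()
    if n == 0:
        # Empty lists produce a single row with None in the exploded columns.
        return [{**row, **{c: None for c in columns}}]
    # Element i of every exploded column lands in output row i.
    return [{**row, **{c: row[c][i] for c in columns}} for i in range(n)]

order = {"uid": 1, "skus": [10, 20], "prices": [1.5, 2.5]}
exploded = explode(order, ["skus", "prices"])

empty = explode({"uid": 2, "skus": [], "prices": []}, ["skus", "prices"])
```

The non-exploded column uid is copied into every output row, and the row with empty lists still yields one output row, which is why the output types must be Optional[T].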
```python
from datetime import datetime
from typing import List, Optional

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    skus: List[int]
    prices: List[float]
    timestamp: datetime

@dataset
class Derived:
    uid: int
    sku: Optional[int]
    price: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def explode_pipeline(cls, ds: Dataset):
        # explode both list columns together, then rename to singular names
        return ds.explode("skus", "prices").rename(
            {"skus": "sku", "prices": "price"}
        )
```
```python
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: float
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        # commit error: price is a float, not a List[T]
        return ds.explode("price")
```
```python
from datetime import datetime
from typing import List

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: List[float]
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        # commit error: "random" is not a column of the input dataset
        return ds.explode("price", "random")
```
Filter
Operator to selectively filter out rows from a dataset.
Parameters
The actual filter function - takes a pandas dataframe containing a batch of rows from the input dataset and is expected to return a series of booleans of the same length. Only rows corresponding to True are retained in the output dataset.
Alternatively, can also be a Fennel expression.
Returns
Returns a dataset with the same schema as the input dataset, just with some rows potentially filtered out.
Errors
Runtime error if the value returned from the lambda isn't a pandas Series of bools with the same length as the input dataframe. When using expressions, type errors and many other kinds of errors are caught statically at import or commit time.
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

@dataset(index=True)
class Filtered:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

    @pipeline
    @inputs(User)
    def my_pipeline(cls, user: Dataset):
        # lambda-based filter: returns a boolean series, True rows are kept
        return user.filter(lambda df: df["city"] != "London")
```
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

from fennel.expr import col

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

@dataset(index=True)
class Filtered:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

    @pipeline
    @inputs(User)
    def my_pipeline(cls, user: Dataset):
        # expression-based filter: validated at import or commit time
        return user.filter(col("city") != "London")
```
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

@dataset
class Filtered:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

    @pipeline
    @inputs(User)
    def my_pipeline(cls, user: Dataset):
        # runtime error: the lambda returns strings, not booleans
        return user.filter(lambda df: df["city"] + "London")
```
First
Operator to find the first element of a group by the row timestamp. First operator must always be preceded by a groupby operator.
Parameters
The first operator does not take any parameters.
Returns
The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.
For each group formed by grouping, one row is chosen having the lowest value in the timestamp field. In case of ties, the first seen row wins.
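The selection and tie-breaking rule can be sketched in a few lines of pure Python. The helper first_per_group is hypothetical, and integer timestamps stand in for datetimes; list order plays the role of the order in which rows are seen.

```python
def first_per_group(rows, key):
    """Pick the row with the lowest timestamp for each key."""
    chosen = {}
    for row in rows:
        k = row[key]
        # Strict "<" keeps the earlier-seen row when timestamps tie.
        if k not in chosen or row["timestamp"] < chosen[k]["timestamp"]:
            chosen[k] = row
    return chosen

rows = [
    {"uid": 1, "amount": 10, "timestamp": 5},
    {"uid": 1, "amount": 20, "timestamp": 3},
    {"uid": 1, "amount": 30, "timestamp": 3},  # ties with the previous row
    {"uid": 2, "amount": 40, "timestamp": 7},
]
firsts = first_per_group(rows, key="uid")
```

For uid 1 the row with amount 20 is chosen: it shares the lowest timestamp with the row of amount 30 but was seen first.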
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class FirstOnly:
    # the groupby field becomes the key field
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def first_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").first()
```
Groupby
Operator to group rows of incoming datasets to be processed by the next operator.
Technically, groupby isn't a standalone operator by itself since its output isn't a valid dataset. Instead, it becomes a valid operator when followed by first, latest, or aggregate.
Parameters
List of keys in the incoming dataset along which the rows should be grouped. This can be passed as unpacked *args or a Python list.
Optional field to specify the default window for all the aggregations in the following aggregate operator. If window parameter is used then the operator can only be followed by an aggregate operator and window will become a key field in the output schema.
Errors
Commit error if trying to group by columns that don't exist in the input dataset.
Commit error if trying to do a groupby via the timestamp column of the input dataset.
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    category: str
    timestamp: datetime

@dataset
class FirstInCategory:
    category: str = field(key=True)
    uid: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def groupby_pipeline(cls, transactions: Dataset):
        # groupby must be followed by first, latest, or aggregate
        return transactions.groupby("category").first()
```
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    category: str
    timestamp: datetime

@dataset
class FirstInCategory:
    category: str = field(key=True)
    uid: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, transactions: Dataset):
        # commit error: the groupby column does not exist in the input dataset
        return transactions.groupby("non_existent_column").first()
```
Join
Operator to join two datasets. The right hand side dataset must have one or more key columns and the join operation is performed on these columns.
Parameters
The right hand side dataset to join this dataset with. RHS dataset must be a keyed dataset and must also be an input to the pipeline (vs being an intermediary dataset derived within a pipeline itself).
Required kwarg indicating whether the join should be an inner join (how="inner") or a left-outer join (how="left"). With "left", the output dataset may have a row even if there is no matching row on the right side.
Default: None
Kwarg that specifies the list of fields along which join should happen. If present, both left and right side datasets must have fields with these names and matching data types (data types on left hand side can be optional). This list must be identical to the names of all key columns of the right hand side.
If this isn't set, left_on and right_on must be set instead.
Default: None
Kwarg that specifies the list of fields from the left side dataset that should be used for joining. If this kwarg is set, right_on must also be set. Note that right_on must be identical to the names of all the key columns of the right side.
Default: None
Kwarg that specifies the list of fields from the right side dataset that should be used for joining. If this kwarg is set, left_on must also be set. The length of left_on and right_on must be the same and corresponding fields on both sides must have the same data types.
Default: ("forever", "0s")
Optional kwarg specifying the time window relative to the left side timestamp within which the join should be performed. This can be seen as adding another condition to join like WHERE left_time - d1 < right_time AND right_time < left_time + d2 where (d1, d2) = within.
- The first value in the tuple represents how far back in time a join can reach. The term "forever" means that we can go infinitely back in time when searching for an event to join from the left-hand side data.
- The second value in the tuple represents how far ahead in time we can go to perform a join. This is useful in cases when the corresponding RHS data of the join can come later. The default value for this parameter is ("forever", "0s"), which means that we can go infinitely back in time and the RHS data should be available for the event time of the LHS data.
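The within condition can be tried out with a small pure-Python simulation. The helper join_within is hypothetical, durations and timestamps are plain numbers, and math.inf stands in for "forever". Two assumptions are made: the bounds are strict only because the WHERE condition above is written that way (the sketch does not settle whether the boundaries are inclusive in Fennel itself), and when several right rows fall inside the window the most recent one is picked.

```python
import math

def join_within(left_time, right_rows, within=(math.inf, 0)):
    """Return the most recent right row inside the window, or None."""
    d1, d2 = within
    candidates = [
        r for r in right_rows
        # Mirrors: left_time - d1 < right_time AND right_time < left_time + d2
        if left_time - d1 < r["timestamp"] < left_time + d2
    ]
    # Assumption: among matching rows, the latest one is used for the join.
    return max(candidates, key=lambda r: r["timestamp"], default=None)

right = [
    {"timestamp": 5, "category": "a"},
    {"timestamp": 9, "category": "b"},
    {"timestamp": 12, "category": "c"},
]

default_match = join_within(10, right)                  # looks back forever
lookahead_match = join_within(10, right, within=(3, 5))  # window (7, 15)
no_match = join_within(4, right)                         # nothing before t=4
```

With the default window the row at timestamp 12 is ignored because it lies in the future of the left row; allowing 5 units of lookahead makes it the match instead.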
Default: None
Optional kwarg that specifies the list of (non-key) fields of the right dataset that should be included in the output dataset. If this kwarg is not set, all such fields are included in the output dataset. If right dataset's timestamp field is included in fields, then it is included as a normal field in the output dataset, with left dataset's timestamp field as the output dataset's timestamp field.
Returns
Returns a dataset representing the joined dataset having the same keys & timestamp columns as the LHS dataset.
The output dataset has all the columns from the left dataset and all non-key non-timestamp columns from the right dataset.
If the join was of type inner, the type of a joined RHS column of type T stays T but if the join was of type left, the type in the output dataset becomes Optional[T] if it was T on the RHS side.
For LHS columns, the type is the same as the type in the LHS dataset if join type is left.
If the join type is inner, if a join column on the LHS is Optional[T], then the type in the output dataset is T (i.e., the Optional is dropped).
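The typing rules above can be written down as two small functions using Python's typing module. Both helpers (rhs_output_type and lhs_join_column_type) are hypothetical names introduced for illustration; they only restate the rules and are not part of Fennel.

```python
from typing import Optional, Union, get_args, get_origin

def rhs_output_type(t, how):
    """Type of a joined RHS value column in the output dataset."""
    # A left join may find no match, so RHS columns become Optional.
    return Optional[t] if how == "left" else t

def lhs_join_column_type(t, how):
    """Type of an LHS join column in the output dataset."""
    is_optional = get_origin(t) is Union and type(None) in get_args(t)
    if how == "inner" and is_optional:
        # Inner joins only emit matched rows, so the Optional is dropped.
        non_null = [a for a in get_args(t) if a is not type(None)]
        return non_null[0]
    return t

left_category = rhs_output_type(str, "left")
inner_category = rhs_output_type(str, "inner")
inner_merchant = lhs_join_column_type(Optional[int], "inner")
left_merchant = lhs_join_column_type(Optional[int], "left")
```

This matches the two join examples: category is str under an inner join and Optional[str] under a left join, while an Optional[int] merchant column stays optional only for the left join.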
Errors
Commit error to do a join with a dataset that doesn't have key columns.
Commit error to do a join with a dataset that is not an input to the pipeline but instead is an intermediate dataset derived during the pipeline itself.
Commit error if join will result in a dataset having two columns of the same name. A common way to work-around this is to rename columns via the rename operator before the join.
Commit error if the number/type of the join columns on the left and right side don't match.
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    merchant: int
    amount: int
    timestamp: datetime

@source(
    webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
)
@dataset(index=True)
class MerchantCategory:
    # right side of the join can only be on key fields
    merchant: int = field(key=True)
    category: str
    updated_at: datetime  # won't show up in joined dataset

@dataset
class WithCategory:
    uid: int
    merchant: int
    amount: int
    timestamp: datetime
    category: str

    @pipeline
    @inputs(Transaction, MerchantCategory)
    def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
        return tx.join(merchant_category, on=["merchant"], how="inner")
```
```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    merchant: Optional[int]
    amount: int
    timestamp: datetime

@source(
    webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
)
@dataset(index=True)
class MerchantCategory:
    # right side of the join can only be on key fields
    merchant: int = field(key=True)
    category: str
    updated_at: datetime  # won't show up in joined dataset

@dataset
class WithCategory:
    uid: int
    merchant: Optional[int]
    amount: int
    timestamp: datetime
    # left join: RHS columns become optional
    category: Optional[str]

    @pipeline
    @inputs(Transaction, MerchantCategory)
    def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
        return tx.join(merchant_category, on=["merchant"], how="left")
```
Latest
Operator to find the latest element of a group by the row timestamp. Latest operator must always be preceded by a groupby operator.
The latest operator is a good way to convert an append-only stream into a time-aware upsert stream.
Parameters
The latest operator does not take any parameters.
Returns
The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.
The row with the maximum timestamp is chosen for each group. In case of ties, the last seen row wins.
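The selection and tie-breaking rule can be sketched in a few lines of pure Python. The helper latest_per_group is hypothetical, and integer timestamps stand in for datetimes; list order plays the role of the order in which rows are seen.

```python
def latest_per_group(rows, key):
    """Pick the row with the highest timestamp for each key."""
    chosen = {}
    for row in rows:
        k = row[key]
        # ">=" lets a later-seen row replace an earlier one on a timestamp tie.
        if k not in chosen or row["timestamp"] >= chosen[k]["timestamp"]:
            chosen[k] = row
    return chosen

rows = [
    {"uid": 1, "amount": 10, "timestamp": 5},
    {"uid": 1, "amount": 20, "timestamp": 8},
    {"uid": 1, "amount": 30, "timestamp": 8},  # ties with the previous row
    {"uid": 2, "amount": 40, "timestamp": 7},
]
latest = latest_per_group(rows, key="uid")
```

For uid 1 the row with amount 30 is chosen: it shares the maximum timestamp with the row of amount 20 but was seen last.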
```python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class LatestOnly:
    # the groupby field becomes the key field
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def latest_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").latest()
```
Rename
Operator to rename columns of a dataset.
Parameters
Dictionary mapping from old column names to their new names.
All columns should still have distinct and valid names post renaming.
Returns
Returns a dataset with the same schema as the input dataset, just with the columns renamed.
Errors
Commit error if there is no existing column with name matching each of the keys in the rename dictionary.
Commit error if after renaming, there will be two columns in the dataset having the same name.
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    weight: float
    height: float
    timestamp: datetime

@dataset(index=True)
class Derived:
    uid: int = field(key=True)
    # rename changes the name of the columns
    weight_lb: float
    height_in: float
    timestamp: datetime

    @pipeline
    @inputs(User)
    def rename_pipeline(cls, user: Dataset):
        return user.rename(
            {"weight": "weight_lb", "height": "height_in"}
        )
python
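The two commit errors listed above amount to simple checks on column names. The sketch below reproduces those checks in plain Python, assuming a hypothetical helper `renamed_columns` that works on a list of column names; it is an illustration of the rules, not how Fennel validates a pipeline.

```python
from typing import Dict, List


def renamed_columns(columns: List[str], mapping: Dict[str, str]) -> List[str]:
    """Return the column names after renaming, or raise if the rename is invalid."""
    # Every key of the mapping must name an existing column.
    missing = [old for old in mapping if old not in columns]
    if missing:
        raise ValueError(f"cannot rename missing columns: {missing}")

    # Columns not mentioned in the mapping keep their names.
    result = [mapping.get(col, col) for col in columns]

    # All names must still be distinct after renaming.
    duplicates = sorted({col for col in result if result.count(col) > 1})
    if duplicates:
        raise ValueError(f"duplicate column names after rename: {duplicates}")
    return result


user_columns = ["uid", "weight", "height", "timestamp"]
new_columns = renamed_columns(
    user_columns, {"weight": "weight_lb", "height": "height_in"}
)
```

Calling `renamed_columns(user_columns, {"weight": "height"})` raises in this sketch, because two columns would end up named `height`.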
Select
Operator to select some columns from a dataset.
Parameters
List of columns in the incoming dataset that should be selected into the output dataset. This can be passed either as unpacked *args or as a kwarg set to a Python list.
Returns
Returns a dataset containing only the selected columns. The timestamp field is automatically included, whether or not it is explicitly listed in the select.
Errors
Select, like most other operators, cannot change the key or timestamp columns. As a result, not selecting all the key columns is a commit error.
Commit error to select a column that is not present in the input dataset.
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    weight: float
    height: float
    city: str
    country: str
    gender: str
    timestamp: datetime

@dataset(index=True)
class Selected:
    uid: int = field(key=True)
    weight: float
    height: float
    timestamp: datetime

    @pipeline
    @inputs(User)
    def select_pipeline(cls, user: Dataset):
        # timestamp is included automatically
        return user.select("uid", "height", "weight")
python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Selected:
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # commit error: the key column `uid` is not selected
        return user.select("city")
python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Selected:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # commit error: `random` is not a column of the input dataset
        return user.select("uid", "random")
python
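The select rules above (all key columns must be kept, the timestamp is always carried along, unknown columns are rejected) can be summarised in a few lines of plain Python. This is a minimal sketch with a hypothetical helper `selected_columns`; keeping the input column order in the output is an assumption of the sketch, not something the text above specifies.

```python
from typing import List


def selected_columns(
    columns: List[str],
    keys: List[str],
    timestamp: str,
    selected: List[str],
) -> List[str]:
    """Return the columns of the output dataset, or raise if the select is invalid."""
    # Selecting a column that does not exist is an error.
    unknown = [col for col in selected if col not in columns]
    if unknown:
        raise ValueError(f"columns not present in the input dataset: {unknown}")

    # Select cannot drop key columns.
    dropped_keys = [key for key in keys if key not in selected]
    if dropped_keys:
        raise ValueError(f"key columns must be selected: {dropped_keys}")

    # The timestamp column is included whether or not it was listed.
    wanted = set(selected) | {timestamp}
    return [col for col in columns if col in wanted]


user_columns = ["uid", "weight", "height", "city", "country", "gender", "timestamp"]
output_columns = selected_columns(
    user_columns,
    keys=["uid"],
    timestamp="timestamp",
    selected=["uid", "height", "weight"],
)
```

In this sketch, `selected=["city"]` fails the key check and `selected=["uid", "random"]` fails the unknown-column check, matching the two failing pipelines shown above.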
Transform
Catch-all operator to add, remove, or update columns.
Parameters
The transform function, which takes a pandas dataframe containing a batch of rows from the input dataset and returns an output dataframe of the same length, though potentially with a different set of columns.
The expected schema of the output dataset. If not specified, the schema of the input dataset is used.
Returns
Returns a dataset with the schema as specified in schema and rows as transformed by the transform function.
Errors
Runtime error if the dataframe returned by the transform function doesn't match the provided schema.
Commit error if transform tries to modify key/timestamp columns.
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset(index=True)
class WithSquare:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def transform_pipeline(cls, ds: Dataset):
        # start from the input schema and declare the new column
        schema = ds.schema()
        schema["amount_sq"] = int
        return ds.transform(
            lambda df: df.assign(amount_sq=df["amount"] ** 2), schema
        )
python
from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="upsert"
)
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # replaces the key column `uid` with a new column `user`
    df["user"] = df["uid"]
    df.drop(columns=["uid"], inplace=True)
    return df

@dataset
class Derived:
    user: int = field(key=True)
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        # commit error: transform may not modify key columns
        schema = {"user": int, "amount": int, "timestamp": datetime}
        return ds.transform(transform, schema)
python
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset
class WithHalf:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        schema = ds.schema()
        schema["amount_sq"] = int
        # runtime error: amount_sq is declared as int but holds strings
        return ds.transform(
            lambda df: df.assign(amount_sq=str(df["amount"])), schema
        )  # noqa
python
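The contract described above (same number of rows out as in, output matching the declared schema, key and timestamp columns left untouched) can be sketched in plain Python. The helper `apply_transform` below is hypothetical and uses lists of dicts in place of pandas dataframes so that it runs with the standard library alone. It also checks everything at call time, whereas the text above distinguishes commit errors from runtime errors.

```python
from typing import Any, Callable, Dict, List, Sequence

Row = Dict[str, Any]


def apply_transform(
    rows: List[Row],
    func: Callable[[List[Row]], List[Row]],
    schema: Dict[str, type],
    fixed_columns: Sequence[str],
) -> List[Row]:
    """Run `func` on a batch and verify the result against `schema`."""
    # Hand the function a copy so the input batch cannot be mutated in place.
    out = func([dict(row) for row in rows])
    if len(out) != len(rows):
        raise ValueError("transform must return as many rows as it received")
    for before, after in zip(rows, out):
        # The output columns must be exactly the ones declared in the schema.
        if set(after) != set(schema):
            raise ValueError(
                f"columns {sorted(after)} do not match schema {sorted(schema)}"
            )
        # Each value must have the declared type.
        for name, expected in schema.items():
            if not isinstance(after[name], expected):
                raise TypeError(f"column {name!r} should be {expected.__name__}")
        # Key and timestamp columns must come through unchanged.
        for name in fixed_columns:
            if after.get(name) != before.get(name):
                raise ValueError(f"transform may not modify column {name!r}")
    return out


batch = [{"uid": 1, "amount": 3}, {"uid": 2, "amount": 4}]
schema = {"uid": int, "amount": int, "amount_sq": int}

squared = apply_transform(
    batch,
    lambda rows: [{**row, "amount_sq": row["amount"] ** 2} for row in rows],
    schema,
    fixed_columns=["uid"],
)
```

Passing a function that writes `str(row["amount"])` into `amount_sq` fails the type check in this sketch, which is the same mismatch the last pipeline above runs into.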
Union
Operator to union rows from two datasets with identical schemas. Only applicable to keyless datasets. Written as a simple + operator on two datasets.
Returns
Returns a dataset with the same schema as both the input datasets but containing rows from both of them. If both contain an identical row, two copies of that row are present in the output dataset.
Errors
Union operator is defined only when both the input datasets have the same schema. Commit error to apply union on input datasets with different schemas.
Commit error to apply union on input datasets with key columns.
import os
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, S3, Kafka

cutoff = datetime(2024, 1, 1, 0, 0, 0)
s3 = S3(name="mys3")
bucket = s3.bucket("data", path="orders")
kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

# historical rows come from S3 up to the cutoff
@source(bucket, cdc="append", disorder="2d", until=cutoff)
@dataset
class OrdersBackfill:
    uid: int
    skuid: int
    timestamp: datetime

# live rows come from Kafka from the cutoff onwards
@source(kafka.topic("order"), cdc="append", disorder="1d", since=cutoff)
@dataset
class OrdersLive:
    uid: int
    skuid: int
    timestamp: datetime

@dataset
class Union:
    uid: int
    skuid: int
    timestamp: datetime

    @pipeline
    @inputs(OrdersBackfill, OrdersLive)
    def union_pipeline(cls, backfill: Dataset, live: Dataset):
        return backfill + live
python
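The union semantics described above (identical schemas, keyless inputs, duplicates preserved) behave like concatenation of two row collections rather than a set union. The following plain-Python sketch, built around a hypothetical helper `union_rows`, is meant only to make that behaviour concrete.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def union_rows(
    left: List[Row],
    right: List[Row],
    left_schema: Dict[str, type],
    right_schema: Dict[str, type],
    key_columns: Sequence[str] = (),
) -> List[Row]:
    """Combine rows from two keyless datasets that share a schema."""
    if key_columns:
        raise ValueError("union is only defined for keyless datasets")
    if left_schema != right_schema:
        raise ValueError("union requires both datasets to have the same schema")
    # Plain concatenation: a row present in both inputs appears twice.
    return list(left) + list(right)


order_schema = {"uid": int, "skuid": int}
backfill_rows = [{"uid": 1, "skuid": 7}, {"uid": 2, "skuid": 9}]
live_rows = [{"uid": 2, "skuid": 9}, {"uid": 3, "skuid": 4}]

combined = union_rows(backfill_rows, live_rows, order_schema, order_schema)
```

The row for `uid=2` appears in both inputs, so `combined` contains it twice.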
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: float
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        # this example calls explode, not union: `price` is not a list column
        return ds.explode("price")
python
from datetime import datetime
from typing import List

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: List[float]
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        # this example calls explode, not union: `random` is not an input column
        return ds.explode("price", "random")
python