Commit
Sends the local dataset and featureset definitions to the server for verification, storage and processing.
Parameters
Human readable description of the changes in the commit - akin to the commit message in a git commit.
Default: []
List of dataset objects to be committed.
Default: []
List of featureset objects to be committed.
Default: False
If set to True, server only provides a preview of what will happen if commit were to be done but doesn't change the state at all.
Since preview's main goal is to check the validity of old & new definitions, it only works with the real client/server. Mock client/server simply ignores it.
Default: False
If set to True, Fennel assumes that only those datasets/featuresets are
provided to commit
operation that are potentially changing in any form. Any
previously existing datasets/featuresets that are not included in the commit
operation are left unchanged.
Default: None
Selector to optionally commit only a subset of sources, pipelines and extractors - those with matching values. Rules of selection:
- If
env
is None, all objects are selected - If
env
is not None, an object is selected if its own selector is either None or same asenv
or is~x
for some other x
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3from fennel.featuresets import feature as F, featureset, extractor
4
5webhook = Webhook(name="some_webhook")
6
7@source(
8 webhook.endpoint("endpoint1"),
9 disorder="14d",
10 cdc="upsert",
11 env="bronze",
12)
13@source(
14 webhook.endpoint("endpoint2"),
15 disorder="14d",
16 cdc="upsert",
17 env="silver",
18)
19@dataset(index=True)
20class Transaction:
21 txid: int = field(key=True)
22 amount: int
23 timestamp: datetime
24
25@featureset
26class TransactionFeatures:
27 txid: int
28 amount: int = F(Transaction.amount, default=0)
29 amount_is_high: bool
30
31 @extractor(env="bronze")
32 def some_fn(cls, ts, amount: pd.Series):
33 return amount.apply(lambda x: x > 100)
34
35client.commit(
36 message="transaction: add transaction dataset and featureset",
37 datasets=[Transaction],
38 featuresets=[TransactionFeatures],
39 preview=False, # default is False, so didn't need to include this
40 env="silver",
41)
python
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3from fennel.featuresets import featureset, feature as F, extractor
4from fennel.lib import inputs, outputs
5
6webhook = Webhook(name="some_webhook")
7
8@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
9@dataset(index=True)
10class Transaction:
11 txid: int = field(key=True)
12 amount: int
13 timestamp: datetime
14
15client.commit(
16 message="transaction: add transaction dataset",
17 datasets=[Transaction],
18 incremental=False, # default is False, so didn't need to include this
19)
20
21@featureset
22class TransactionFeatures:
23 txid: int
24 amount: int = F(Transaction.amount, default=0)
25 amount_is_high: bool
26
27 @extractor(env="bronze")
28 @inputs("amount")
29 @outputs("amount_is_high")
30 def some_fn(cls, ts, amount: pd.Series):
31 return amount.apply(lambda x: x > 100)
32
33client.commit(
34 message="transaction: add transaction featureset",
35 datasets=[], # note: transaction dataset is not included here
36 featuresets=[TransactionFeatures],
37 incremental=True, # now we set incremental to True
38)
python
Log
Method to push data into Fennel datasets via webhook endpoints.
Parameters
The name of the webhook source containing the endpoint to which the data should be logged.
The name of the webhook endpoint to which the data should be logged.
The dataframe containing all the data that must be logged. The column of the dataframe must have the right names & types to be compatible with schemas of datasets attached to the webhook endpoint.
Default: 1000
To prevent sending too much data in one go, Fennel client divides the dataframe
in chunks of batch_size
rows each and sends each chunk one by one.
Note that Fennel servers provides atomicity guarantee for any call of log
- either
the whole data is accepted or none of it is. However, breaking down a dataframe
in chunks can lead to situation where some chunks have been ingested but others
weren't.
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4# first define & sync a dataset that sources from a webhook
5webhook = Webhook(name="some_webhook")
6
7@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
8@dataset(index=True)
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14client.commit(message="some commit msg", datasets=[Transaction])
15
16# log some rows to the webhook
17client.log(
18 "some_webhook",
19 "some_endpoint",
20 df=pd.DataFrame(
21 columns=["uid", "amount", "timestamp"],
22 data=[
23 [1, 10, "2021-01-01T00:00:00"],
24 [2, 20, "2021-02-01T00:00:00"],
25 ],
26 ),
27)
python
Errors
Fennel will throw an error (equivalent to 404) if no endpoint with the given specification exists.
There is no explicit schema tied to a webhook endpoint - the schema comes from
the datasets attached to it. As a result, the log
call itself doesn't check for
schema mismatch but later runtime errors may be generated async if the logged
data doesn't match the schema of the attached datasets.
You may want to keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Query
Method to query the latest value of features (typically for online inference).
Parameters
List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.
List of features that need to be queries. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queries) or strings representing fully qualified feature names.
A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.
Default: False
Boolean which indicates if the queried features should also be logged (for log-and-wait approach to training data generation).
Default: 'default'
The name of the workflow associated with the feature query. Only relevant
when log
is set to True, in which case, features associated with the same workflow
are collected together. Useful if you want to separate logged features between, say,
login fraud and transaction fraud.
Default: 1.0
The rate at which feature data should be sampled before logging. Only relevant
when log
is set to True.
1from fennel.featuresets import featureset, extractor
2from fennel.lib import inputs, outputs
3
4@featureset
5class Numbers:
6 num: int
7 is_even: bool
8 is_odd: bool
9
10 @extractor
11 @inputs("num")
12 @outputs("is_even", "is_odd")
13 def my_extractor(cls, ts, nums: pd.Series):
14 is_even = nums.apply(lambda x: x % 2 == 0)
15 is_odd = is_even.apply(lambda x: not x)
16 return pd.DataFrame({"is_even": is_even, "is_odd": is_odd})
17
18client.commit(message="some commit msg", featuresets=[Numbers])
19
20# now we can query the features
21feature_df = client.query(
22 inputs=[Numbers.num],
23 outputs=[Numbers.is_even, Numbers.is_odd],
24 input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3, 4]}),
25)
26
27pd.testing.assert_frame_equal(
28 feature_df,
29 pd.DataFrame(
30 {
31 "Numbers.is_even": [False, True, False, True],
32 "Numbers.is_odd": [True, False, True, False],
33 }
34 ),
35)
python
Returns
Returns the queried features as dataframe with one column for each feature
in outputs
. If a single output feature is requested, features are returned
as a single pd.Series. Note that input features aren't returned back unless
they are also present in the outputs
Errors
Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.
An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.
Fennel raises a run-time error if any extractor returns a value of the feature that doesn't match its stated type.
Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.
Query Offline
Method to query the historical values of features. Typically used for training data generation or batch inference.
Parameters
List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.
List of features that need to be queried. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queried) or strings representing fully qualified feature names.
Default: pandas
The format of the input data
A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.
Only relevant when format
is "pandas".
Sending large volumes of the input data over the wire is often infeasible.
In such cases, input data can be written to S3 and the location of the file is
sent as input_s3
via S3.bucket()
function of S3
connector.
This parameter makes sense only when format
isn't "pandas".
When using this option, please ensure that Fennel's data connector IAM role has the ability to execute read & list operations on this bucket - talk to Fennel support if you need help.
The name of the column containing the timestamps as of which the feature values must be computed.
Specifies the location & other details about the s3 path where the values of
all the output features should be written. Similar to input_s3
, this is
provided via S3.bucket()
function of S3 connector.
If this isn't provided, Fennel writes the results of all requests to a fixed
default bucket - you can see its details from the return value of query_offline
or via Fennel Console.
When using this option, please ensure that Fennel's data connector IAM role has write permissions on this bucket - talk to Fennel support if you need help.
Default: None
When reading input data from s3, sometimes the column names in s3 don't match one-to-one with the names of the input features. In such cases, a dictionary mapping features to column names can be provided.
This should be setup only when input_s3
is provided.
Returns
Immediately returns a dictionary containing the following information:
- request_id - a random uuid assigned to this request. Fennel can be polled
about the status of this request using the
request_id
- output s3 bucket - the s3 bucket where results will be written
- output s3 path prefix - the prefix of the output s3 bucket
- completion rate - progress of the request as a fraction between 0 and 1
- failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
- status - the overall status of this request
A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.
Errors
Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.
An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.
Fennel raises a run-time error and may register failure on a subset of rows if any extractor returns a value of the feature that doesn't match its stated type.
Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.
Request
Response
1response = client.query_offline(
2 inputs=[Numbers.num],
3 outputs=[Numbers.is_even, Numbers.is_odd],
4 format="pandas",
5 input_dataframe=pd.DataFrame(
6 {"Numbers.num": [1, 2, 3, 4]},
7 {
8 "timestamp": [
9 datetime.now(timezone.utc) - HOUR * i for i in range(4)
10 ]
11 },
12 ),
13 timestamp_column="timestamp",
14)
15print(response)
python
1from fennel.connectors import S3
2
3s3 = S3(
4 name="extract_hist_input",
5 aws_access_key_id="<ACCESS KEY HERE>",
6 aws_secret_access_key="<SECRET KEY HERE>",
7)
8s3_input_connection = s3.bucket("bucket", prefix="data/user_features")
9s3_output_connection = s3.bucket("bucket", prefix="output")
10
11response = client.query_offline(
12 inputs=[Numbers.num],
13 outputs=[Numbers.is_even, Numbers.is_odd],
14 format="csv",
15 timestamp_column="timestamp",
16 input_s3=s3_input_connection,
17 output_s3=s3_output_connection,
18)
python
track_offline_query
Track Offline Query
Method to monitor the progress of a run of offline query.
Parameters
The unique request ID returned by the query_offline
operation that needs
to be tracked.
Returns
Immediately returns a dictionary containing the following information:
- request_id - a random uuid assigned to this request. Fennel can be polled
about the status of this request using the
request_id
- output s3 bucket - the s3 bucket where results will be written
- output s3 path prefix - the prefix of the output s3 bucket
- completion rate - progress of the request as a fraction between 0 and 1
- failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
- status - the overall status of this request
A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.
Request
Response
1request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
2response = client.track_offline_query(request_id)
3print(response)
python
cancel_offline_query
Cancel Offline Query
Method to cancel a previously issued query_offline
request.
Parameters
The unique request ID returned by the query_offline
operation that needs
to be canceled.
Request
Response
1request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
2response = client.cancel_offline_query(request_id)
3print(response)
python
Returns
Marks the request for cancellation and immediately returns a dictionary containing the following information:
- request_id - a random uuid assigned to this request. Fennel can be polled
about the status of this request using the
request_id
- output s3 bucket - the s3 bucket where results will be written
- output s3 path prefix - the prefix of the output s3 bucket
- completion rate - progress of the request as a fraction between 0 and 1
- failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
- status - the overall status of this request
A completion rate of 1.0 indicates that all processing had been completed. A completion rate of 1.0 and failure rate of 0 means that all processing had been completed successfully.
Lookup
Method to lookup rows of keyed datasets.
Parameters
The name of the dataset or Dataset object to be looked up.
List of dict where each dict contains the value of the key fields for one row for which data needs to be looked up.
The list of field names in the dataset to be looked up.
Default: None
Timestamps (one per row) as of which the lookup should be done. If not set, the lookups are done as of now (i.e the latest value for each key).
If set, the length of this list should be identical to that of the number of elements
in the keys
.
Timestamp itself can either be passed as datetime
or str
(e.g. by using
pd.to_datetime
or int
denoting seconds/milliseconds/microseconds since epoch).
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4# first define & sync a dataset that sources from a webhook
5webhook = Webhook(name="some_webhook")
6
7@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
8@dataset(index=True)
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14client.commit(message="some commit msg", datasets=[Transaction])
15
16# log some rows to the webhook
17client.log(
18 "some_webhook",
19 "some_endpoint",
20 pd.DataFrame(
21 data=[
22 [1, 10, "2021-01-01T00:00:00"],
23 [2, 20, "2021-02-01T00:00:00"],
24 ],
25 columns=["uid", "amount", "timestamp"],
26 ),
27)
28
29# now do a lookup to verify that the rows were logged
30keys = pd.DataFrame({"uid": [1, 2, 3]})
31ts = [
32 datetime(2021, 1, 1, 0, 0, 0),
33 datetime(2021, 2, 1, 0, 0, 0),
34 datetime(2021, 3, 1, 0, 0, 0),
35]
36response, found = client.lookup(
37 "Transaction",
38 keys=keys,
39 timestamps=pd.Series(ts),
40)
python
init_branch
Init Branch
Creates a new empty branch and checks out the client to point towards it.
Parameters
The name of the branch that should be created. The name can consist of any alpha
numeric character [a-z, A-Z, 0-9]
as well as slashes "/"
, hyphens "-"
,
underscores "_"
, and periods "."
Errors
Raises an error if the name of the branch contains invalid characters.
Raises an error if a branch of the same name already exists.
Raises an error if the auth token isn't valid. Not applicable to the mock client.
Raises an error if the account corresponding to the auth token doesn't carry the permission to create a new branch. Not applicable to the mock client.
1client.init_branch("mybranch")
2
3# init checks out client to the new branch
4# so this commit (or any other operations) will be on `mybranch`
5client.commit(...)
python
clone_branch
Clone Branch
Clones an existing branch into a new branch and checks out the client to point towards it.
Parameters
The name of the new branch that should be created as a result of the clone. The name can consist of any ASCII characters.
The name of the existing branch that should be cloned into the new branch.
Errors
Raises an error if a branch of the same name
already exists.
Raises an error if there is no existing branch of the name from_branch
.
Raises an error if the auth token isn't valid. Not applicable to the mock client.
Raises an error if the account corresponding to the auth token doesn't carry permissions to create a new branch. Also raises an error if the token doesn't have access to entities defined in the source branch. Auth/permissions checks are not applicable to the mock client.
1client.init_branch("base")
2# do some operations on `base` branch
3client.commit(...)
4
5# clone `base` branch to `mybranch`
6client.clone_branch("mybranch", "base")
7
8# clone checks out client to the new branch
9# so this commit (or any other operations) will be on `mybranch`
10client.commit(...)
python
delete_branch
Delete Branch
Deletes an existing branch and checks out the client to point to the main
branch.
Parameters
The name of the existing branch that should be deleted.
Errors
Raises an error if a branch of the given name
doesn't exist.
Raises an error if the auth token isn't valid. Not applicable to the mock client.
Raises an error if the account corresponding to the auth token doesn't carry the permission to delete branches. Also raises an error if the token doesn't have edit access to the entities in the branch being deleted. Not applicable to the mock client.
1client.delete_branch("mybranch")
2
3# do some operations on the branch
4client.commit(...)
5
6# delete the branch
7client.init_branch("mybranch")
8
9# client now points to the main branch
10client.commit(...)
python
checkout
Checkout
Sets the client to point to the given branch.
Parameters
The name of the branch that the client should start pointing to. All subsequent
operations (e.g. commit
, query
) will be directed to this branch.
Default: False
If true, creates a new empty branch if the name
is not found within the branches synced with Fennel
1# change active branch from `main` to `mybranch`
2client.checkout("mybranch")
3assert client.branch() == "mybranch"
4
5# all subsequent operations will be on `mybranch`
6client.commit(...)
7
8# create and change active branch from `mybranch` to `mybranch2`
9client.checkout("mybranch2", init=True)
10assert client.branch() == "mybranch2"
11
12# all subsequent operations will be on `mybranch2`
python
Errors
checkout
does not raise any error.
If not specified via explicit checkout
, by default, clients point to the 'main' branch.
Note that checkout
doesn't validate that the name
points to a real branch by default. Instead, it just changes the local state of the client. If the branch doesn't
exist, subsequent branch operations will fail, not the checkout
itself. However, when init
is set to True
, checkout
will first create the branch if a real branch is not found and subsequently point to it.
branch
Branch
Get the name of the current branch.
Parameters
Doesn't take any parameters.
Returns
Returns the name of the branch that the client is pointing to.
1# change active branch from `main` to `mybranch`
2client.checkout("mybranch")
3assert client.branch() == "mybranch"
4
5# all subsequent operations will be on `mybranch`
6client.commit(...)
7
8# create and change active branch from `mybranch` to `mybranch2`
9client.checkout("mybranch2", init=True)
10assert client.branch() == "mybranch2"
11
12# all subsequent operations will be on `mybranch2`
python
erase
Erase
Method to hard-erase data from a dataset.
Data related to the provided erase keys is removed and will not be reflected to downstream dataset or any subsequent queries.
This method should be used as a way to comply with GDPR and other similar regulations that require "right to be forgotten". For operational deletion/correction of data, regular CDC mechanism must be used instead.
Erase only removes the data from the dataset in the request. If the data has already propagated to downstream datasets via pipelines, you may want to issue separate erase requests for all such datasets too.
Parameters
The dataset from which data needs to be erased. Can be provided either as a Dataset object or string representing the dataset name.
The dataframe containing the erase keys - all data matching these erase keys is removed. The columns of the dataframe must have the right names & types to be compatible with the erase keys defined in the schema of dataset.
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4# first define & sync a dataset that sources from a webhook
5webhook = Webhook(name="some_webhook")
6
7@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
8@dataset(index=True)
9class Transaction:
10 uid: int = field(key=True, erase_key=True)
11 amount: int
12 timestamp: datetime
13
14client.commit(message="some commit msg", datasets=[Transaction])
15
16client.erase(
17 Transaction,
18 erase_keys=pd.DataFrame({"uid": [1, 2, 3]}),
19)
python
POST /api/v1/query
Query
API to extract a set of output features given known values of some input features.
Headers
All Fennel REST APIs expect a content-type of application/json
.
Fennel uses bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.
Fennel uses header for passing branch name to the server against which we want to query.
Body Parameters:
List of fully qualified names of input features. Example name: Featureset.feature
List of fully qualified names of output features. Example name: Featureset.feature
.
Can also contain name of a featureset in which case all features in the featureset
are returned.
JSON representing the dataframe of input feature values. The json can either be an array of json objects, each representing a row; or it can be a single json object where each key maps to a list of values representing a column.
Strings of json are also accepted.
If true, the extracted features are also logged (often to serve as future training data).
Default: default
The name of the workflow with which features should be logged (only relevant
when log
is set to true).
Float between 0-1 describing the sample rate to be used for logging features
(only relevant when log
is set to true
).
Returns
The response dataframe is returned as column oriented json.
1url = "{}/api/v1/query".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5 "X-FENNEL-BRANCH": BRANCH_NAME,
6}
7data = {"UserFeatures.userid": [1, 2, 3]}
8req = {
9 "outputs": ["UserFeatures"],
10 "inputs": ["UserFeatures.userid"],
11 "data": data,
12 "log": True,
13 "workflow": "test",
14}
15
16response = requests.post(url, headers=headers, data=req)
17assert response.status_code == requests.codes.OK, response.json()
python
1url = "{}/api/v1/query".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5 "X-FENNEL-BRANCH": BRANCH_NAME,
6}
7data = [
8 {"UserFeatures.userid": 1},
9 {"UserFeatures.userid": 2},
10 {"UserFeatures.userid": 3},
11]
12req = {
13 "outputs": ["UserFeatures"],
14 "inputs": ["UserFeatures.userid"],
15 "data": data,
16 "log": True,
17 "workflow": "test",
18}
19
20response = requests.post(url, headers=headers, data=req)
21assert response.status_code == requests.codes.OK, response.json()
python
POST /api/v1/log
Log
Method to push data into Fennel datasets via webhook endpoints via REST API.
Headers
All Fennel REST APIs expect a content-type of application/json
.
Fennel uses bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.
Body Parameters
The name of the webhook source containing the endpoint to which the data should be logged.
The name of the webhook endpoint to which the data should be logged.
The data to be logged to the webhook. This json string could either be:
-
Row major where it's a json array of rows with each row written as a json object.
-
Column major where it's a dictionary from column name to values of that column as a json array.
1url = "{}/api/v1/log".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5}
6data = [
7 {
8 "user_id": 1,
9 "name": "John",
10 "age": 20,
11 "country": "Russia",
12 "timestamp": "2020-01-01",
13 },
14 {
15 "user_id": 2,
16 "name": "Monica",
17 "age": 24,
18 "country": "Chile",
19 "timestamp": "2021-03-01",
20 },
21 {
22 "user_id": 3,
23 "name": "Bob",
24 "age": 32,
25 "country": "USA",
26 "timestamp": "2020-01-01",
27 },
28]
29req = {
30 "webhook": "fennel_webhook",
31 "endpoint": "UserInfo",
32 "data": data,
33}
34response = requests.post(url, headers=headers, data=req)
35assert response.status_code == requests.codes.OK, response.json()
python
GET /api/v1/lineage
Lineage
Method to get lineage via REST API.
Headers
All Fennel REST APIs expect a content-type of application/json
.
Fennel uses bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.
Fennel uses header for passing branch name to the server against which we want to query.
1url = "{}/api/v1/lineage".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5 "X-FENNEL-BRANCH": BRANCH_NAME,
6}
7
8response = requests.get(url, headers=headers)
9assert response.status_code == requests.codes.OK, response.json()
python
Core Types
Fennel supports the following data types, expressed as native Python type hints.
Implemented as signed 8 byte integer (int64
)
Implemented as signed 8 byte float with double
precision
Implemented as signed 16 byte integer (int128
) with int val as precision.
Implemented as standard 1 byte boolean
Arbitrary sequence of utf-8 characters. Like most programming languages, str
doesn't support arbitrary binary bytes though.
Arbitrary sequence of binary bytes. This is useful for storing binary data.
List of elements of any other valid type T
. Unlike Python lists, all elements
must have the same type.
Map from str
to data of any valid type T
.
Fennel does not support dictionaries with arbitrary types for keys - please reach out to Fennel support if you have use cases requiring that.
Same as Python Optional
- permits either None
or values of type T
.
Denotes a list of floats of the given fixed length i.e. Embedding[32]
describes a list of 32 floats. This is same as list[float]
but enforces the
list length which is important for dot product and other similar operations on
embeddings.
Describes a timestamp, implemented as microseconds since Unix epoch (so minimum granularity is microseconds). Can be natively parsed from multiple formats though internally is stored as 8-byte signed integer describing timestamp as microseconds from epoch in UTC.
Describes a date, implemented as days since Unix epoch. Can be natively parsed from multiple formats though internally is stored as 8-byte signed integer describing date as days epoch in UTC.
Describes the equivalent of a struct or dataclass - a container containing a fixed set of fields of fixed types.
Fennel uses a strong type system and post data-ingestion, data doesn't auto-coerce
across types. For instance, it will be a compile or runtime error if something
was expected to be of type float
but received an int
instead.
1# imports for data types
2from typing import List, Optional, Dict
3from datetime import datetime
4from fennel.dtypes import struct
5
6# imports for datasets
7from fennel.datasets import dataset, field
8from fennel.lib import meta
9
10@struct # like dataclass but verifies that fields have valid Fennel types
11class Address:
12 street: str
13 city: str
14 state: str
15 zip_code: Optional[str]
16
17@meta(owner="[email protected]")
18@dataset
19class Student:
20 id: int = field(key=True)
21 name: str
22 grades: Dict[str, float]
23 honors: bool
24 classes: List[str]
25 address: Address # Address is now a valid Fennel type
26 signup_time: datetime
python
Type Restrictions
Fennel type restrictions allow you to put additional constraints on base types and restrict the set of valid values in some form.
Restriction on the base type of str
. Permits only the strings matching the given
regex pattern.
Restriction on the base type of int
or float
. Permits only the numbers
between low
and high
(both inclusive by default). Left or right can be made
exclusive by setting min_strict
or max_strict
to be False respectively.
Restricts a type T
to only accept one of the given values
as valid values.
oneof
can be thought of as a more general version of enum
.
For the restriction to be valid, all the values
must themselves be of type T
.
1# imports for data types
2from datetime import datetime, timezone
3from fennel.dtypes import oneof, between, regex
4
5# imports for datasets
6from fennel.datasets import dataset, field
7from fennel.lib import meta
8from fennel.connectors import source, Webhook
9
10webhook = Webhook(name="fennel_webhook")
11
12@meta(owner="[email protected]")
13@source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="upsert")
14@dataset
15class UserInfoDataset:
16 user_id: int = field(key=True)
17 name: str
18 age: between(int, 0, 100, strict_min=True)
19 gender: oneof(str, ["male", "female", "non-binary"])
20 email: regex(r"[^@]+@[^@]+\.[^@]+")
21 timestamp: datetime
python
Type Restriction Composition
These restricted types act as regular types -- they can be mixed/matched to form complex composite types. For instance, the following are all valid Fennel types:
list[regex('$[0-9]{5}$')]
- list of regexes matching US zip codesoneof(Optional[int], [None, 0, 1])
- a nullable type that only takes 0 or 1 as valid values
Data belonging to the restricted types is still stored & transmitted (e.g. in json encoding) as a regular base type. It's just that Fennel will reject data of base type that doesn't meet the restriction.
Duration
Fennel lets you express durations in an easy to read natural language as described below:
Symbol | Unit |
---|---|
y | Year |
w | Week |
d | Day |
h | Hour |
m | Minute |
s | Second |
There is no shortcut for month because there is a very high degree of
variance in month's duration- some months are 28 days, some are 30 days and
some are 31 days. A common convention in ML is to use 4 weeks
to describe a month.
A year is hardcoded to be exactly 365 days and doesn't take into account variance like leap years.
1"7h" -> 7 hours
2"12d" -> 12 days
3"2y" -> 2 years
4"3h 20m 4s" -> 3 hours 20 minutes and 4 seconds
5"2y 4w" -> 2 years and 4 weeks
text
Aggregate
Operator to do continuous window aggregations. Aggregate operator must always be preceded by a groupby operator.
Parameters
Positional argument specifying the list of aggregations to apply on the grouped dataset. This list can be passed either as an unpacked *args or as an explicit list as the first position argument.
See aggregations for the full list of aggregate functions.
Keyword argument indicating the time axis to aggregate along. If along
is None
, Fennel will aggregate along the timestamp of the input dataset.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Count,
7 Sum,
8)
9from fennel.dtypes import Continuous
10from fennel.lib import inputs
11from fennel.connectors import source, Webhook
12
13webhook = Webhook(name="webhook")
14
15@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
16@dataset
17class Transaction:
18 uid: int
19 amount: int
20 timestamp: datetime = field(timestamp=True)
21 transaction_time: datetime
22
23@dataset(index=True)
24class Aggregated:
25 # groupby field becomes the key field
26 uid: int = field(key=True)
27 # new fields are added to the dataset by the aggregate operation
28 total: int
29 count_1d: int
30 timestamp: datetime = field(timestamp=True)
31
32 @pipeline
33 @inputs(Transaction)
34 def aggregate_pipeline(cls, ds: Dataset):
35 return ds.groupby("uid").aggregate(
36 count_1d=Count(window=Continuous("forever")),
37 total=Sum(of="amount", window=Continuous("forever")),
38 along="transaction_time",
39 )
python
Returns
Returns a dataset where all columns passed to groupby
become the key columns,
the timestamp column stays as it is and one column is created for each aggregation.
The type of each aggregated column depends on the aggregate and the type of the corresponding column in the input dataset.
Aggregate is the terminal operator - no other operator can follow it and no other datasets can be derived from the dataset containing this pipeline.
Assign
Operator to add a new column to a dataset - the added column is neither a key column nor a timestamp column.
Parameters
The name of the new column to be added - must not conflict with any existing name on the dataset.
The data type of the new column to be added - must be a valid Fennel supported data type.
The function, which when given a subset of the dataset as a dataframe, returns the value of the new column for each row in the dataframe.
Fennel verifies at runtime that the returned series matches the declared dtype
.
Assign can also be given one or more expressions instead of Python lambdas - it can either have expressions or lambdas but not both. Expected types must also be present along with each expression (see example).
Unlike lambda based assign, all type validation and many other errors can be verified at the commit time itself (vs incurring runtime errors).
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class WithSquare:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def my_pipeline(cls, ds: Dataset):
24 return ds.assign("amount_sq", int, lambda df: df["amount"] ** 2)
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5from fennel.expr import col
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
10@dataset
11class Transaction:
12 uid: int = field(key=True)
13 amount: int
14 timestamp: datetime
15
16@dataset(index=True)
17class WithSquare:
18 uid: int = field(key=True)
19 amount: int
20 amount_sq: int
21 amount_half: float
22 timestamp: datetime
23
24 @pipeline
25 @inputs(Transaction)
26 def my_pipeline(cls, ds: Dataset):
27 return ds.assign(
28 amount_sq=(col("amount") * col("amount")).astype(int),
29 amount_half=(col("amount") / 2).astype(float),
30 )
python
Returns
Returns a dataset with one additional column of the given name
and type same
as dtype
. This additional column is neither a key-column or the timestamp
column.
Errors
Runtime error if the value returned from the lambda isn't a pandas Series of the declared type and the same length as the input dataframe.
When using expressions, errors may be raised during the import or commit if types don't match and/or there are other validation errors related to the expressions.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset
15class WithHalf:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def my_pipeline(cls, ds: Dataset):
24 return ds.assign(
25 "amount_sq", int, lambda df: df["amount"] * 0.5
26 )
python
1with pytest.raises(Exception):
2
3 from fennel.datasets import dataset, field, pipeline, Dataset
4 from fennel.lib import inputs
5 from fennel.connectors import source, Webhook
6 from fennel.expr import col
7
8 webhook = Webhook(name="webhook")
9
10 @source(webhook.endpoint("txn"), disorder="14d", cdc="upsert")
11 @dataset
12 class Transaction:
13 uid: int = field(key=True)
14 amount: int
15 timestamp: datetime
16
17 @dataset
18 class WithHalf:
19 uid: int = field(key=True)
20 amount: int
21 amount_sq: int
22 amount_half: int
23 timestamp: datetime
24
25 @pipeline
26 @inputs(Transaction)
27 def my_pipeline(cls, ds: Dataset):
28 return ds.assign(
29 amount_sq=(col("amount") * col("amount")).astype(int),
30 amount_half=(col("amount") / 2).astype(int),
31 )
python
Dedup
Operator to dedup keyless datasets (e.g. event streams).
Parameters
Default: None
The list of columns to use for identifying duplicates. If not specified, all the columns are used for identifying duplicates.
Two rows of the input dataset are considered duplicates if and only if they have
the same values for the timestamp column and all the by
columns.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 txid: int
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset
16class Deduped:
17 txid: int
18 uid: int
19 amount: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def dedup_pipeline(cls, ds: Dataset):
25 return ds.dedup(by="txid")
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 txid: int
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset
16class Deduped:
17 txid: int
18 uid: int
19 amount: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def dedup_by_all_pipeline(cls, ds: Dataset):
25 return ds.dedup()
python
Returns
Returns a keyless dataset having the same schema as the input dataset but with some duplicated rows filtered out.
Errors
Commit error to apply dedup on a keyed dataset.
Drop
Operator to drop one or more non-key non-timestamp columns from a dataset.
Parameters
List of columns in the incoming dataset that should be dropped. This can be passed either as unpacked *args or as a Python list.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 country: str
13 weight: float
14 height: float
15 gender: str
16 timestamp: datetime
17
18@dataset(index=True)
19class Dropped:
20 uid: int = field(key=True)
21 gender: str
22 timestamp: datetime
23
24 @pipeline
25 @inputs(User)
26 def drop_pipeline(cls, user: Dataset):
27 return user.drop("height", "weight").drop(
28 columns=["city", "country"]
29 )
python
Returns
Returns a dataset with the same schema as the input dataset but with some columns
(as specified by columns
) removed.
Errors
Commit error on removing any key columns or the timestamp column.
Commit error on removing any column that doesn't exist in the input dataset.
1@source(webhook.endpoint("User"))
2@dataset
3class User:
4 uid: int = field(key=True)
5 city: str
6 timestamp: datetime
7
8@dataset
9class Dropped:
10 city: str
11 timestamp: datetime
12
13 @pipeline
14 @inputs(User)
15 def pipeline(cls, user: Dataset):
16 return user.drop("uid")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 timestamp: datetime
13
14@dataset
15class Dropped:
16 uid: int = field(key=True)
17 city: str
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.drop("random")
python
Dropnull
Operator to drop rows containing null values (aka None in Python speak) in the given columns.
Parameters
List of columns in the incoming dataset that should be checked for presence of None values - if any such column has None for a row, the row will be filtered out from the output dataset. This can be passed either as unpacked *args or as a Python list.
If no arguments are given, columns
will be all columns with the type Optional[T]
in the dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 dob: str
12 city: Optional[str]
13 country: Optional[str]
14 gender: Optional[str]
15 timestamp: datetime
16
17@dataset(index=True)
18class Derived:
19 uid: int = field(key=True)
20 dob: str
21 # dropnull changes the type of the columns to non-optional
22 city: str
23 country: str
24 gender: Optional[str]
25 timestamp: datetime
26
27 @pipeline
28 @inputs(User)
29 def dropnull_pipeline(cls, user: Dataset):
30 return user.dropnull("city", "country")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 dob: str
12 city: Optional[str]
13 country: Optional[str]
14 gender: Optional[str]
15 timestamp: datetime
16
17@dataset(index=True)
18class Derived:
19 uid: int = field(key=True)
20 dob: str
21 # dropnull changes the type of all optional columns to non-optional
22 city: str
23 country: str
24 gender: str
25 timestamp: datetime
26
27 @pipeline
28 @inputs(User)
29 def dropnull_pipeline(cls, user: Dataset):
30 return user.dropnull()
python
Returns
Returns a dataset with the same name & number of columns as the input dataset but
with the type of some columns modified from Optional[T]
-> T
.
Errors
Commit error to pass a column without an optional type.
Commit error to pass a column that doesn't exist in the input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: Optional[str]
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int = field(key=True)
17 city: str
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.select("random")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 # dropnull can only be used on optional columns
12 city: str
13 timestamp: datetime
14
15@dataset
16class Derived:
17 uid: int = field(key=True)
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.select("city")
python
Explode
Operator to explode lists in a single row to form multiple rows, analogous to
to the explode
function in Pandas.
Only applicable to keyless datasets.
Parameters
The list of columns to explode. This list can be passed either as
unpacked *args or kwarg columns
mapping to an explicit list.
All the columns should be of type List[T]
for some T
in the input dataset and
after explosion, they get converted to a column of type Optional[T]
.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 skus: List[int]
12 prices: List[float]
13 timestamp: datetime
14
15@dataset
16class Derived:
17 uid: int
18 sku: Optional[int]
19 price: Optional[float]
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Orders)
24 def explode_pipeline(cls, ds: Dataset):
25 return (
26 ds
27 .explode("skus", "prices").rename(
28 {"skus": "sku", "prices": "price"}
29 )
30 )
python
Returns
Returns a dataset with the same number & name of columns as the input dataset but
with the type of exploded columns modified from List[T]
to Optional[T]
.
Empty lists are converted to None
values (hence the output types need to be
Optional[T]
).
Errors
Commit error to apply explode on an input dataset with key columns.
Commit error to explode using a column that is not of the type List[T]
.
Commit error to explode using a column that is not present in the input dataset.
For a given row, all the columns getting exploded must have lists of the same length, otherwise a runtime error is raised. Note that the lists can be of different type across rows.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 price: float
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int
17 price: float
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Orders)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.explode("price")
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 price: List[float]
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int
17 price: float
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Orders)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.explode("price", "random")
python
Filter
Operator to selectively filter
out rows from a dataset.
Parameters
The actual filter function - takes a pandas dataframe containing a batch of rows
from the input dataset and is expected to return a series of booleans of the
same length. Only rows corresponding to True
are retained in the output dataset.
Alternatively, can also be a Fennel expression.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 signup_time: datetime
13
14@dataset(index=True)
15class Filtered:
16 uid: int = field(key=True)
17 city: str
18 signup_time: datetime
19
20 @pipeline
21 @inputs(User)
22 def my_pipeline(cls, user: Dataset):
23 return user.filter(lambda df: df["city"] != "London")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5from fennel.expr import col
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
10@dataset
11class User:
12 uid: int = field(key=True)
13 city: str
14 signup_time: datetime
15
16@dataset(index=True)
17class Filtered:
18 uid: int = field(key=True)
19 city: str
20 signup_time: datetime
21
22 @pipeline
23 @inputs(User)
24 def my_pipeline(cls, user: Dataset):
25 return user.filter(col("city") != "London")
python
Returns
Returns a dataset with the same schema as the input dataset, just with some rows potentially filtered out.
Errors
Runtime error if the value returned from the lambda isn't a pandas Series of the bool and of the same length as the input dataframe. When using expressions, any type and many other kinds of errors are caught at import or commit time statically.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 signup_time: datetime
13
14@dataset
15class Filtered:
16 uid: int = field(key=True)
17 city: str
18 signup_time: datetime
19
20 @pipeline
21 @inputs(User)
22 def my_pipeline(cls, user: Dataset):
23 return user.filter(lambda df: df["city"] + "London")
python
First
Operator to find the first element of a group by the row timestamp. First operator must always be preceded by a groupby operator.
Parameters
The first
operator does not take any parameters.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class FirstOnly:
16 uid: int = field(key=True)
17 amount: int
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def first_pipeline(cls, ds: Dataset):
23 return ds.groupby("uid").first()
python
Returns
The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.
For each group formed by grouping, one row is chosen having the lowest value in the timestamp field. In case of ties, the first seen row wins.
Groupby
Operator to group rows of incoming datasets to be processed by the next operator.
Technically, groupby isn't a standalone operator by itself since its output isn't a valid dataset. Instead, it becomes a valid operator when followed by first, latest, or aggregate.
Parameters
List of keys in the incoming dataset along which the rows should be grouped. This can be passed as unpacked *args or a Python list.
Optional field to specify the default window for all the aggregations in the following aggregate operator. If window parameter is used then the operator can only be followed by an aggregate operator and window will become a key field in the output schema.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 category: str
12 timestamp: datetime
13
14@dataset
15class FirstInCategory:
16 category: str = field(key=True)
17 uid: int
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def groupby_pipeline(cls, transactions: Dataset):
23 return transactions.groupby("category").first()
python
Errors
Commit error if trying to group by columns that don't exist in the input dataset.
Commit error if trying to do a groupby via the timestamp column of the input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(
8 webhook.endpoint("Transaction"), disorder="14d", cdc="append"
9)
10@dataset
11class Transaction:
12 uid: int
13 category: str
14 timestamp: datetime
15
16@dataset
17class FirstInCategory:
18 category: str = field(key=True)
19 uid: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def bad_pipeline(cls, transactions: Dataset):
25 return transactions.groupby("non_existent_column").first()
python
Join
Operator to join two datasets. The right hand side dataset must have one or more key columns and the join operation is performed on these columns.
Parameters
The right hand side dataset to join this dataset with. RHS dataset must be a keyed dataset and must also be an input to the pipeline (vs being an intermediary dataset derived within a pipeline itself).
Required kwarg indicating whether the join should be an inner join (how="inner"
)
or a left-outer join (how="left"
). With "left"
, the output dataset may have
a row even if there is no matching row on the right side.
Default: None
Kwarg that specifies the list of fields along which join should happen. If present, both left and right side datasets must have fields with these names and matching data types. This list must be identical to the names of all key columns of the right hand side.
If this isn't set, left_on
and right_on
must be set instead.
Default: None
Kwarg that specifies the list of fields from the left side dataset that should be
used for joining. If this kwarg is set, right_on
must also be set. Note that
right_on
must be identical to the names of all the key columns of the right side.
Default: None
Kwarg that specifies the list of fields from the right side dataset that should be
used for joining. If this kwarg is setup, left_on
must also be set. The length
of left_on
and right_on
must be the same and corresponding fields on both
sides must have the same data types.
Default: ("forever", "0s")
Optional kwarg specifying the time window relative to the left side timestamp
within which the join should be performed. This can be seen as adding another
condition to join like WHERE left_time - d1 < right_time AND right_time < left_time + d2
where (d1, d2) = within.
- The first value in the tuple represents how far back in time should a join happen. The term "forever" means that we can go infinitely back in time when searching for an event to join from the left-hand side data.
- The second value in the tuple represents how far ahead in time we can go to
perform a join. This is useful in cases when the corresponding RHS data of
the join can come later. The default value for this parameter is
("forever", "0s")
which means that we can go infinitely back in time and the RHS data should be available for the event time of the LHS data.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 merchant: int
12 amount: int
13 timestamp: datetime
14
15@source(
16 webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
17)
18@dataset(index=True)
19class MerchantCategory:
20 # right side of the join can only be on key fields
21 merchant: int = field(key=True)
22 category: str
23 updated_at: datetime # won't show up in joined dataset
24
25@dataset
26class WithCategory:
27 uid: int
28 merchant: int
29 amount: int
30 timestamp: datetime
31 category: str
32
33 @pipeline
34 @inputs(Transaction, MerchantCategory)
35 def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
36 return tx.join(merchant_category, on=["merchant"], how="inner")
python
Returns
Returns a dataset representing the joined dataset having the same keys & timestamp columns as the LHS dataset.
The output dataset has all the columns from the left dataset and all non-key non-timestamp columns from the right dataset.
If the join was of type inner
, the type of a joined
RHS column of type T
stays T
but if the join was of type left
, the type in
the output dataset becomes Optional[T]
if it was T
on the RHS side.
Errors
Commit error to do a join with a dataset that doesn't have key columns.
Commit error to do a join with a dataset that is not an input to the pipeline but instead is an intermediate dataset derived during the pipeline itself.
Commit error if join will result in a dataset having two columns of the same name. A common way to work-around this is to rename columns via the rename operator before the join.
Commit error if the number/type of the join columns on the left and right side don't match.
Latest
Operator to find the latest element of a group by the row timestamp. Latest operator must always be preceded by a groupby operator.
Latest operator is a good way to effectively convert a stream of only append to a time-aware upsert stream.
Parameters
The latest
operator does not take any parameters.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class LatestOnly:
16 uid: int = field(key=True)
17 amount: int
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def latest_pipeline(cls, ds: Dataset):
23 return ds.groupby("uid").latest()
python
Returns
The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.
The row with the maximum timestamp is chosen for each group. In case of ties, the last seen row wins.
Rename
Operator to rename columns of a dataset.
Parameters
Dictionary mapping from old column names to their new names.
All columns should still have distinct and valid names post renaming.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 weight: float
12 height: float
13 timestamp: datetime
14
15@dataset(index=True)
16class Derived:
17 uid: int = field(key=True)
18 # rename changes the name of the columns
19 weight_lb: float
20 height_in: float
21 timestamp: datetime
22
23 @pipeline
24 @inputs(User)
25 def rename_pipeline(cls, user: Dataset):
26 return user.rename(
27 {"weight": "weight_lb", "height": "height_in"}
28 )
python
Returns
Returns a dataset with the same schema as the input dataset, just with the columns renamed.
Errors
Commit error if there is no existing column with name matching each of the keys in the rename dictionary.
Commit error if after renaming, there will be two columns in the dataset having the same name.
Select
Operator to select some columns from a dataset.
Parameters
List of columns in the incoming dataset that should be selected into the output dataset. This can be passed either as unpacked *args or as kwarg set to a Python list.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 weight: float
12 height: float
13 city: str
14 country: str
15 gender: str
16 timestamp: datetime
17
18@dataset(index=True)
19class Selected:
20 uid: int = field(key=True)
21 weight: float
22 height: float
23 timestamp: datetime
24
25 @pipeline
26 @inputs(User)
27 def select_pipeline(cls, user: Dataset):
28 return user.select("uid", "height", "weight")
python
Returns
Returns a dataset containing only the selected columns. Timestamp field is automatically included whether explicitly provided in the select or not.
Errors
Select, like most other operators, can not change the key or timestamp columns.
As a result, not selecting all the key columns is a commit
error.
Commit error to select a column that is not present in the input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 timestamp: datetime
13
14@dataset
15class Selected:
16 city: str
17 timestamp: datetime
18
19 @pipeline
20 @inputs(User)
21 def bad_pipeline(cls, user: Dataset):
22 return user.select("city")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 timestamp: datetime
13
14@dataset
15class Selected:
16 uid: int = field(key=True)
17 city: str
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.select("uid", "random")
python
Transform
Catch all operator to add/remove/update columns.
Parameters
The transform function that takes a pandas dataframe containing a batch of rows from the input dataset and returns an output dataframe of the same length, though potentially with different set of columns.
The expected schema of the output dataset. If not specified, the schema of the input dataset is used.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class WithSquare:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def transform_pipeline(cls, ds: Dataset):
24 schema = ds.schema()
25 schema["amount_sq"] = int
26 return ds.transform(
27 lambda df: df.assign(amount_sq=df["amount"] ** 2), schema
28 )
python
Returns
Returns a dataset with the schema as specified in schema
and rows as transformed
by the transform function.
Errors
Runtime error if the dataframe returned by the transform function doesn't match
the provided schema
.
Commit error if transform tries to modify key/timestamp columns.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(
8 webhook.endpoint("Transaction"), disorder="14d", cdc="upsert"
9)
10@dataset
11class Transaction:
12 uid: int = field(key=True)
13 amount: int
14 timestamp: datetime
15
16def transform(df: pd.DataFrame) -> pd.DataFrame:
17 df["user"] = df["uid"]
18 df.drop(columns=["uid"], inplace=True)
19 return df
20
21@dataset
22class Derived:
23 user: int = field(key=True)
24 amount: int
25 timestamp: datetime
26
27 @pipeline
28 @inputs(Transaction)
29 def bad_pipeline(cls, ds: Dataset):
30 schema = {"user": int, "amount": int, "timestamp": datetime}
31 return ds.transform(transform, schema)
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset
15class WithHalf:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def invalid_pipeline(cls, ds: Dataset):
24 schema = ds.schema()
25 schema["amount_sq"] = int
26 return ds.transform(
27 lambda df: df.assign(amount_sq=str(df["amount"])), schema
28 ) # noqa
python
Union
Operator to union rows from two datasets of the identical schema. Only
applicable to keyless datasets. Written as simple +
operator on two datasets.
Returns
Returns a dataset with the same schema as both the input datasets but containing rows from both of them. If both contain the identical row, two copies of that row are present in the output datasets.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, S3, Kafka
4
5cutoff = datetime(2024, 1, 1, 0, 0, 0)
6s3 = S3(name="mys3")
7bucket = s3.bucket("data", path="orders")
8kafka = Kafka(
9 name="my_kafka",
10 bootstrap_servers="localhost:9092",
11 security_protocol="SASL_PLAINTEXT",
12 sasl_mechanism="PLAIN",
13 sasl_plain_username=os.environ["KAFKA_USERNAME"],
14 sasl_plain_password=os.environ["KAFKA_PASSWORD"],
15)
16
17@source(bucket, cdc="append", disorder="2d", until=cutoff)
18@dataset
19class OrdersBackfill:
20 uid: int
21 skuid: int
22 timestamp: datetime
23
24@source(kafka.topic("order"), cdc="append", disorder="1d", since=cutoff)
25@dataset
26class OrdersLive:
27 uid: int
28 skuid: int
29 timestamp: datetime
30
31@dataset
32class Union:
33 uid: int
34 skuid: int
35 timestamp: datetime
36
37 @pipeline
38 @inputs(OrdersBackfill, OrdersLive)
39 def explode_pipeline(cls, backfill: Dataset, live: Dataset):
40 return backfill + live
python
Errors
Union operator is defined only when both the input datasets have the same schema. Commit error to apply union on input datasets with different schemas.
Commit error to apply union on input datasets with key columns.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 price: float
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int
17 price: float
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Orders)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.explode("price")
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 price: List[float]
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int
17 price: float
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Orders)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.explode("price", "random")
python
Avro Registry
Several Fennel sources work with Avro format. When using Avro, it's common to keep the schemas in a centralized schema registry instead of including schema with each message.
Fennel supports integration with avro schema registries.
Parameters
String denoting the provider of the registry. As of right now, Fennel only supports "confluent" avro registry though more such schema registries may be added over time.
The URL where the schema registry is hosted.
User name to access the schema registry (assuming the registry requires authentication). If user name is provided, corresponding password must also be provided.
Assuming authentication is needed, either username/password must be provided or a token, but not both.
The password associated with the username.
Token to be used for authentication with the schema registry. Only one of username/password or token must be provided.
1from fennel.connectors import source, Kafka, Avro
2from fennel.datasets import dataset, field
3
4kafka = Kafka(
5 name="my_kafka",
6 bootstrap_servers="localhost:9092", # could come via os env var too
7 security_protocol="SASL_PLAINTEXT",
8 sasl_mechanism="PLAIN",
9 sasl_plain_username=os.environ["KAFKA_USERNAME"],
10 sasl_plain_password=os.environ["KAFKA_PASSWORD"],
11)
12
13avro = Avro(
14 registry="confluent",
15 url=os.environ["SCHEMA_REGISTRY_URL"],
16 username=os.environ["SCHEMA_REGISTRY_USERNAME"],
17 password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
18)
19
20@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
21@dataset
22class SomeDataset:
23 uid: int = field(key=True)
24 email: str
25 timestamp: datetime
python