Commit
Sends the local dataset and featureset definitions to the server for verification, storage and processing.
Parameters
Human readable description of the changes in the commit - akin to the commit message in a git commit.
Default: []
List of dataset objects to be committed.
Default: []
List of featureset objects to be committed.
Default: False
If set to True, the server only provides a preview of what would happen if the commit were applied, without changing any state.
Since preview's main goal is to check the validity of old & new definitions, it only works with the real client/server; the mock client/server simply ignores it.
Default: False
If set to True, Fennel assumes that only the datasets/featuresets that are potentially changing in any form are provided to the commit operation. Any previously existing datasets/featuresets that are not included in the commit operation are left unchanged.
Default: None
Selector to optionally commit only a subset of sources, pipelines and extractors - those with matching values. Rules of selection:
- If env is None, all objects are selected.
- If env is not None, an object is selected if its own selector is either None, the same as env, or is ~x for some other x.
Default: True
If set to False, the commit returns an error if the changes would result in a backfill of any dataset/pipeline. A backfill occurs when there is no existing dataset that is isomorphic to the new dataset. Setting backfill to False helps prevent accidental backfills by ensuring that only datasets matching the existing structure are committed.
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3from fennel.featuresets import feature as F, featureset, extractor
4
5webhook = Webhook(name="some_webhook")
6
7@source(
8 webhook.endpoint("endpoint1"),
9 disorder="14d",
10 cdc="upsert",
11 env="bronze",
12)
13@source(
14 webhook.endpoint("endpoint2"),
15 disorder="14d",
16 cdc="upsert",
17 env="silver",
18)
19@dataset(index=True)
20class Transaction:
21 txid: int = field(key=True)
22 amount: int
23 timestamp: datetime
24
25@featureset
26class TransactionFeatures:
27 txid: int
28 amount: int = F(Transaction.amount, default=0)
29 amount_is_high: bool
30
31 @extractor(env="bronze")
32 def some_fn(cls, ts, amount: pd.Series):
33 return amount.apply(lambda x: x > 100)
34
35client.commit(
36 message="transaction: add transaction dataset and featureset",
37 datasets=[Transaction],
38 featuresets=[TransactionFeatures],
39 preview=False, # default is False, so didn't need to include this
40 env="silver",
41)
python
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3from fennel.featuresets import featureset, feature as F, extractor
4from fennel.lib import inputs, outputs
5
6webhook = Webhook(name="some_webhook")
7
8@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
9@dataset(index=True)
10class Transaction:
11 txid: int = field(key=True)
12 amount: int
13 timestamp: datetime
14
15client.commit(
16 message="transaction: add transaction dataset",
17 datasets=[Transaction],
18 incremental=False, # default is False, so didn't need to include this
19)
20
21@featureset
22class TransactionFeatures:
23 txid: int
24 amount: int = F(Transaction.amount, default=0)
25 amount_is_high: bool
26
27 @extractor(env="bronze")
28 @inputs("amount")
29 @outputs("amount_is_high")
30 def some_fn(cls, ts, amount: pd.Series):
31 return amount.apply(lambda x: x > 100)
32
33client.commit(
34 message="transaction: add transaction featureset",
35 datasets=[], # note: transaction dataset is not included here
36 featuresets=[TransactionFeatures],
37 incremental=True, # now we set incremental to True
38)
python
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4webhook = Webhook(name="some_webhook")
5
6@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
7@dataset(index=True)
8class Transaction:
9 txid: int = field(key=True)
10 amount: int
11 timestamp: datetime
12
13client.checkout("test_backfill", init=True)
14client.commit(
15 message="transaction: add transaction dataset",
16 datasets=[Transaction],
17 backfill=True, # default is True, so didn't need to include this
18)
19
20@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
21@dataset(index=True)
22class Transaction:
23 txid: int = field(key=True)
24 amount: int
25 timestamp: datetime
26
27client.checkout("main", init=True)
28client.commit(
29 message="adding transaction dataset to main",
30 datasets=[Transaction],
31 backfill=False, # set backfill as False to prevent accidental backfill for Transaction dataset
32)
python
Log
Method to push data into Fennel datasets via webhook endpoints.
Parameters
The name of the webhook source containing the endpoint to which the data should be logged.
The name of the webhook endpoint to which the data should be logged.
The dataframe containing all the data that must be logged. The columns of the dataframe must have the right names & types to be compatible with the schemas of the datasets attached to the webhook endpoint.
Default: 1000
To prevent sending too much data in one go, the Fennel client divides the dataframe into chunks of batch_size rows each and sends the chunks one by one.
Note that Fennel servers provide an atomicity guarantee for any single call of log - either the whole data is accepted or none of it is. However, breaking a dataframe into chunks can lead to a situation where some chunks have been ingested but others weren't.
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4# first define & sync a dataset that sources from a webhook
5webhook = Webhook(name="some_webhook")
6
7@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
8@dataset(index=True)
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14client.commit(message="some commit msg", datasets=[Transaction])
15
16# log some rows to the webhook
17client.log(
18 "some_webhook",
19 "some_endpoint",
20 df=pd.DataFrame(
21 columns=["uid", "amount", "timestamp"],
22 data=[
23 [1, 10, "2021-01-01T00:00:00"],
24 [2, 20, "2021-02-01T00:00:00"],
25 ],
26 ),
27)
python
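If a large dataframe needs to be logged, the chunk size can be tuned explicitly. A minimal sketch reusing the webhook and endpoint above (big_df here is a hypothetical dataframe matching the Transaction schema):
1# log a large dataframe in smaller chunks of 500 rows each
2client.log(
3    "some_webhook",
4    "some_endpoint",
5    df=big_df,
6    batch_size=500,
7)
python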
Errors
Fennel will throw an error (equivalent to 404) if no endpoint with the given specification exists.
There is no explicit schema tied to a webhook endpoint - the schema comes from the datasets attached to it. As a result, the log call itself doesn't check for schema mismatches, but runtime errors may be generated asynchronously later if the logged data doesn't match the schema of the attached datasets.
You may want to keep an eye on the 'Errors' tab of the Fennel console after initiating any data sync.
Query
Method to query the latest value of features (typically for online inference).
Parameters
List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.
List of features that need to be queried. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queried), or strings representing fully qualified feature names.
A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.
Default: False
Boolean which indicates if the queried features should also be logged (for log-and-wait approach to training data generation).
Default: 'default'
The name of the workflow associated with the feature query. Only relevant when log is set to True, in which case features associated with the same workflow are collected together. Useful if you want to separate logged features between, say, login fraud and transaction fraud.
Default: 1.0
The rate at which feature data should be sampled before logging. Only relevant when log is set to True.
1from fennel.featuresets import featureset, extractor
2from fennel.lib import inputs, outputs
3
4@featureset
5class Numbers:
6 num: int
7 is_even: bool
8 is_odd: bool
9
10 @extractor
11 @inputs("num")
12 @outputs("is_even", "is_odd")
13 def my_extractor(cls, ts, nums: pd.Series):
14 is_even = nums.apply(lambda x: x % 2 == 0)
15 is_odd = is_even.apply(lambda x: not x)
16 return pd.DataFrame({"is_even": is_even, "is_odd": is_odd})
17
18client.commit(message="some commit msg", featuresets=[Numbers])
19
20# now we can query the features
21feature_df = client.query(
22 inputs=[Numbers.num],
23 outputs=[Numbers.is_even, Numbers.is_odd],
24 input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3, 4]}),
25)
26
27pd.testing.assert_frame_equal(
28 feature_df,
29 pd.DataFrame(
30 {
31 "Numbers.is_even": [False, True, False, True],
32 "Numbers.is_odd": [True, False, True, False],
33 }
34 ),
35)
python
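The same query can also log the computed features for the log-and-wait approach to training data generation. A minimal sketch reusing the Numbers featureset above (the sampling parameter is assumed here to be named sampling_rate, matching the description of the sampling rate above):
1feature_df = client.query(
2    inputs=[Numbers.num],
3    outputs=[Numbers.is_even, Numbers.is_odd],
4    input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3, 4]}),
5    log=True,  # also log the computed features
6    workflow="fraud",  # tag the logged features with a workflow name
7    sampling_rate=0.1,  # log only ~10% of the queried rows
8)
python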
Returns
Returns the queried features as a dataframe with one column for each feature in outputs. If a single output feature is requested, features are returned as a single pd.Series. Note that input features aren't returned back unless they are also present in outputs.
Errors
Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.
An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.
Fennel raises a run-time error if any extractor returns a value of the feature that doesn't match its stated type.
Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.
Query Offline
Method to query the historical values of features. Typically used for training data generation or batch inference.
Parameters
List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.
List of features that need to be queried. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queried) or strings representing fully qualified feature names.
A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.
This parameter is mutually exclusive with input_s3.
Sending large volumes of the input data over the wire is often infeasible. In such cases, input data can be written to S3 and the location of the file is sent as input_s3 via the S3.bucket() function of the S3 connector.
When using this option, please ensure that Fennel's data connector IAM role has the ability to execute read & list operations on this bucket - talk to Fennel support if you need help.
The name of the column containing the timestamps as of which the feature values must be computed.
Specifies the location & other details about the s3 path where the values of all the output features should be written. Similar to input_s3, this is provided via the S3.bucket() function of the S3 connector.
If this isn't provided, Fennel writes the results of all requests to a fixed default bucket - you can see its details from the return value of query_offline or via the Fennel Console.
When using this option, please ensure that Fennel's data connector IAM role has write permissions on this bucket - talk to Fennel support if you need help.
Default: 'default'
The name of the workflow associated with the feature query. It functions like a tag (for example, "fraud" or "finance") to categorize the query. If this parameter is not provided, it defaults to "default".
Default: None
When reading input data from s3, sometimes the column names in s3 don't match one-to-one with the names of the input features. In such cases, a dictionary mapping features to column names can be provided.
This should be set up only when input_s3 is provided.
Returns
Immediately returns a dictionary containing the following information:
- request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id.
- output s3 bucket - the s3 bucket where results will be written.
- output s3 path prefix - the prefix of the output s3 bucket.
- completion rate - progress of the request as a fraction between 0 and 1.
- failure rate - fraction of the input rows (between 0 and 1) where an error was encountered and output features couldn't be computed.
- status - the overall status of this request.
A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.
Errors
Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.
An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.
Fennel raises a run-time error and may register failure on a subset of rows if any extractor returns a value of the feature that doesn't match its stated type.
Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.
Request
Response
1response = client.query_offline(
2 inputs=[Numbers.num],
3 outputs=[Numbers.is_even, Numbers.is_odd],
4 format="pandas",
5 input_dataframe=pd.DataFrame(
6 {"Numbers.num": [1, 2, 3, 4]},
7 {
8 "timestamp": [
9 datetime.now(timezone.utc) - HOUR * i for i in range(4)
10 ]
11 },
12 ),
13 timestamp_column="timestamp",
14 workflow="fraud",
15)
16print(response)
python
1from fennel.connectors import S3
2
3s3 = S3(
4 name="extract_hist_input",
5 aws_access_key_id="<ACCESS KEY HERE>",
6 aws_secret_access_key="<SECRET KEY HERE>",
7)
8s3_input_connection = s3.bucket("bucket", prefix="data/user_features")
9s3_output_connection = s3.bucket("bucket", prefix="output")
10
11response = client.query_offline(
12 inputs=[Numbers.num],
13 outputs=[Numbers.is_even, Numbers.is_odd],
14 format="csv",
15 timestamp_column="timestamp",
16 input_s3=s3_input_connection,
17 output_s3=s3_output_connection,
18 workflow="fraud",
19)
python
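If the column names in the S3 data don't match the feature names one-to-one, a mapping can be supplied along with input_s3. A minimal sketch extending the example above (the mapping parameter is assumed to be named feature_to_column_map, and the S3 column name "num_col" is illustrative):
1response = client.query_offline(
2    inputs=[Numbers.num],
3    outputs=[Numbers.is_even, Numbers.is_odd],
4    format="csv",
5    timestamp_column="timestamp",
6    input_s3=s3_input_connection,
7    output_s3=s3_output_connection,
8    feature_to_column_map={Numbers.num: "num_col"},
9    workflow="fraud",
10)
python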
track_offline_query
Track Offline Query
Method to monitor the progress of a run of offline query.
Parameters
The unique request ID returned by the query_offline operation that needs to be tracked.
Returns
Immediately returns a dictionary containing the following information:
- request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id.
- output s3 bucket - the s3 bucket where results will be written.
- output s3 path prefix - the prefix of the output s3 bucket.
- completion rate - progress of the request as a fraction between 0 and 1.
- failure rate - fraction of the input rows (between 0 and 1) where an error was encountered and output features couldn't be computed.
- status - the overall status of this request.
A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.
Request
Response
1request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
2response = client.track_offline_query(request_id)
3print(response)
python
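To wait for completion, the request can be polled in a loop. A minimal sketch, assuming the returned dictionary uses keys named after the fields listed above (e.g. request_id and completion_rate):
1import time
2
3request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
4while True:
5    response = client.track_offline_query(request_id)
6    if response["completion_rate"] >= 1.0:  # all rows processed
7        break
8    time.sleep(30)  # poll again after 30 seconds
python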
cancel_offline_query
Cancel Offline Query
Method to cancel a previously issued query_offline request.
Parameters
The unique request ID returned by the query_offline operation that needs to be canceled.
Request
Response
1request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
2response = client.cancel_offline_query(request_id)
3print(response)
python
Returns
Marks the request for cancellation and immediately returns a dictionary containing the following information:
- request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id.
- output s3 bucket - the s3 bucket where results will be written.
- output s3 path prefix - the prefix of the output s3 bucket.
- completion rate - progress of the request as a fraction between 0 and 1.
- failure rate - fraction of the input rows (between 0 and 1) where an error was encountered and output features couldn't be computed.
- status - the overall status of this request.
A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and a failure rate of 0 means that all processing has been completed successfully.
Lookup
Method to lookup rows of keyed datasets.
Parameters
The name of the dataset or Dataset object to be looked up.
List of dict where each dict contains the value of the key fields for one row for which data needs to be looked up.
The list of field names in the dataset to be looked up.
Default: None
Timestamps (one per row) as of which the lookup should be done. If not set, the lookups are done as of now (i.e. the latest value for each key).
If set, the length of this list should be identical to the number of elements in keys.
Timestamps can be passed as datetime, as str (e.g. by using pd.to_datetime), or as int denoting seconds/milliseconds/microseconds since epoch.
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4# first define & sync a dataset that sources from a webhook
5webhook = Webhook(name="some_webhook")
6
7@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
8@dataset(index=True)
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14client.commit(message="some commit msg", datasets=[Transaction])
15
16# log some rows to the webhook
17client.log(
18 "some_webhook",
19 "some_endpoint",
20 pd.DataFrame(
21 data=[
22 [1, 10, "2021-01-01T00:00:00"],
23 [2, 20, "2021-02-01T00:00:00"],
24 ],
25 columns=["uid", "amount", "timestamp"],
26 ),
27)
28
29# now do a lookup to verify that the rows were logged
30keys = pd.DataFrame({"uid": [1, 2, 3]})
31ts = [
32 datetime(2021, 1, 1, 0, 0, 0),
33 datetime(2021, 2, 1, 0, 0, 0),
34 datetime(2021, 3, 1, 0, 0, 0),
35]
36response, found = client.lookup(
37 "Transaction",
38 keys=keys,
39 timestamps=pd.Series(ts),
40)
python
init_branch
Init Branch
Creates a new empty branch and checks out the client to point towards it.
Parameters
The name of the branch that should be created. The name can consist of any alphanumeric characters [a-z, A-Z, 0-9] as well as slashes "/", hyphens "-", underscores "_", and periods ".".
Errors
Raises an error if the name of the branch contains invalid characters.
Raises an error if a branch of the same name already exists.
Raises an error if the auth token isn't valid. Not applicable to the mock client.
Raises an error if the account corresponding to the auth token doesn't carry the permission to create a new branch. Not applicable to the mock client.
1client.init_branch("mybranch")
2
3# init checks out client to the new branch
4# so this commit (or any other operations) will be on `mybranch`
5client.commit(...)
python
clone_branch
Clone Branch
Clones an existing branch into a new branch and checks out the client to point towards it.
Parameters
The name of the new branch that should be created as a result of the clone. The name can consist of any ASCII characters.
The name of the existing branch that should be cloned into the new branch.
Errors
Raises an error if a branch of the same name already exists.
Raises an error if there is no existing branch with the name from_branch.
Raises an error if the auth token isn't valid. Not applicable to the mock client.
Raises an error if the account corresponding to the auth token doesn't carry permissions to create a new branch. Also raises an error if the token doesn't have access to entities defined in the source branch. Auth/permissions checks are not applicable to the mock client.
1client.init_branch("base")
2# do some operations on `base` branch
3client.commit(...)
4
5# clone `base` branch to `mybranch`
6client.clone_branch("mybranch", "base")
7
8# clone checks out client to the new branch
9# so this commit (or any other operations) will be on `mybranch`
10client.commit(...)
python
delete_branch
Delete Branch
Deletes an existing branch and checks out the client to point to the main
branch.
Parameters
The name of the existing branch that should be deleted.
Errors
Raises an error if a branch of the given name doesn't exist.
Raises an error if the auth token isn't valid. Not applicable to the mock client.
Raises an error if the account corresponding to the auth token doesn't carry the permission to delete branches. Also raises an error if the token doesn't have edit access to the entities in the branch being deleted. Not applicable to the mock client.
1client.init_branch("mybranch")
2
3# do some operations on the branch
4client.commit(...)
5
6# delete the branch
7client.delete_branch("mybranch")
8
9# client now points to the main branch
10client.commit(...)
python
checkout
Checkout
Sets the client to point to the given branch.
Parameters
The name of the branch that the client should start pointing to. All subsequent operations (e.g. commit, query) will be directed to this branch.
Default: False
If true, creates a new empty branch if the name is not found among the branches synced with Fennel.
1# change active branch from `main` to `mybranch`
2client.checkout("mybranch")
3assert client.branch() == "mybranch"
4
5# all subsequent operations will be on `mybranch`
6client.commit(...)
7
8# create and change active branch from `mybranch` to `mybranch2`
9client.checkout("mybranch2", init=True)
10assert client.branch() == "mybranch2"
11
12# all subsequent operations will be on `mybranch2`
python
Errors
checkout does not raise any error.
If not specified via an explicit checkout, clients point to the 'main' branch by default.
Note that checkout doesn't validate that the name points to a real branch by default. Instead, it just changes the local state of the client. If the branch doesn't exist, subsequent branch operations will fail, not the checkout itself. However, when init is set to True, checkout will first create the branch if it doesn't already exist and subsequently point to it.
branch
Branch
Get the name of the current branch.
Parameters
Doesn't take any parameters.
Returns
Returns the name of the branch that the client is pointing to.
1# change active branch from `main` to `mybranch`
2client.checkout("mybranch")
3assert client.branch() == "mybranch"
4
5# all subsequent operations will be on `mybranch`
6client.commit(...)
7
8# create and change active branch from `mybranch` to `mybranch2`
9client.checkout("mybranch2", init=True)
10assert client.branch() == "mybranch2"
11
12# all subsequent operations will be on `mybranch2`
python
erase
Erase
Method to hard-erase data from a dataset.
Data related to the provided erase keys is removed and will not be reflected in downstream datasets or any subsequent queries.
This method should be used as a way to comply with GDPR and other similar regulations that require a "right to be forgotten". For operational deletion/correction of data, the regular CDC mechanism must be used instead.
Erase only removes the data from the dataset in the request. If the data has already propagated to downstream datasets via pipelines, you may want to issue separate erase requests for all such datasets too.
Parameters
The dataset from which data needs to be erased. Can be provided either as a Dataset object or string representing the dataset name.
The dataframe containing the erase keys - all data matching these erase keys is removed. The columns of the dataframe must have the right names & types to be compatible with the erase keys defined in the schema of the dataset.
1from fennel.datasets import dataset, field
2from fennel.connectors import source, Webhook
3
4# first define & sync a dataset that sources from a webhook
5webhook = Webhook(name="some_webhook")
6
7@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
8@dataset(index=True)
9class Transaction:
10 uid: int = field(key=True, erase_key=True)
11 amount: int
12 timestamp: datetime
13
14client.commit(message="some commit msg", datasets=[Transaction])
15
16client.erase(
17 Transaction,
18 erase_keys=pd.DataFrame({"uid": [1, 2, 3]}),
19)
python
POST /api/v1/query
Query
API to extract a set of output features given known values of some input features.
Headers
All Fennel REST APIs expect a content-type of application/json.
Fennel uses bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.
Fennel uses the X-FENNEL-BRANCH header to pass the name of the branch against which the query should be run.
Body Parameters:
List of fully qualified names of input features. Example name: Featureset.feature
List of fully qualified names of output features. Example name: Featureset.feature.
Can also contain the name of a featureset, in which case all features in the featureset are returned.
JSON representing the dataframe of input feature values. The json can either be an array of json objects, each representing a row; or it can be a single json object where each key maps to a list of values representing a column.
Strings of json are also accepted.
If true, the extracted features are also logged (often to serve as future training data).
Default: default
The name of the workflow with which features should be logged (only relevant when log is set to true).
Float between 0 and 1 describing the sample rate to be used for logging features (only relevant when log is set to true).
Returns
The response dataframe is returned as column oriented json.
1url = "{}/api/v1/query".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5 "X-FENNEL-BRANCH": BRANCH_NAME,
6}
7data = {"UserFeatures.userid": [1, 2, 3]}
8req = {
9 "outputs": ["UserFeatures"],
10 "inputs": ["UserFeatures.userid"],
11 "data": data,
12 "log": True,
13 "workflow": "test",
14}
15
16response = requests.post(url, headers=headers, json=req)
17assert response.status_code == requests.codes.OK, response.json()
python
1url = "{}/api/v1/query".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5 "X-FENNEL-BRANCH": BRANCH_NAME,
6}
7data = [
8 {"UserFeatures.userid": 1},
9 {"UserFeatures.userid": 2},
10 {"UserFeatures.userid": 3},
11]
12req = {
13 "outputs": ["UserFeatures"],
14 "inputs": ["UserFeatures.userid"],
15 "data": data,
16 "log": True,
17 "workflow": "test",
18}
19
20response = requests.post(url, headers=headers, json=req)
21assert response.status_code == requests.codes.OK, response.json()
python
POST /api/v1/log
Log
Method to push data into Fennel datasets via webhook endpoints using the REST API.
Headers
All Fennel REST APIs expect a content-type of application/json.
Fennel uses bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.
Body Parameters
The name of the webhook source containing the endpoint to which the data should be logged.
The name of the webhook endpoint to which the data should be logged.
The data to be logged to the webhook. This json string could either be:
- Row major, where it's a json array of rows with each row written as a json object.
- Column major, where it's a dictionary from column name to the values of that column as a json array.
1url = "{}/api/v1/log".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5}
6data = [
7 {
8 "user_id": 1,
9 "name": "John",
10 "age": 20,
11 "country": "Russia",
12 "timestamp": "2020-01-01",
13 },
14 {
15 "user_id": 2,
16 "name": "Monica",
17 "age": 24,
18 "country": "Chile",
19 "timestamp": "2021-03-01",
20 },
21 {
22 "user_id": 3,
23 "name": "Bob",
24 "age": 32,
25 "country": "USA",
26 "timestamp": "2020-01-01",
27 },
28]
29req = {
30 "webhook": "fennel_webhook",
31 "endpoint": "UserInfo",
32 "data": data,
33}
34response = requests.post(url, headers=headers, json=req)
35assert response.status_code == requests.codes.OK, response.json()
python
GET /api/v1/lineage
Lineage
Method to get lineage via REST API.
Headers
All Fennel REST APIs expect a content-type of application/json.
Fennel uses bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.
Fennel uses the X-FENNEL-BRANCH header to pass the name of the branch against which the request should be run.
1url = "{}/api/v1/lineage".format(SERVER)
2headers = {
3 "Content-Type": "application/json",
4 "Authorization": "Bearer <API-TOKEN>",
5 "X-FENNEL-BRANCH": BRANCH_NAME,
6}
7
8response = requests.get(url, headers=headers)
9assert response.status_code == requests.codes.OK, response.json()
python
Core Types
Fennel supports the following data types, expressed as native Python type hints.
Implemented as a signed 8 byte integer (int64).
Implemented as a signed 8 byte float with double precision.
Implemented as a signed 16 byte integer (int128), with an integer value specifying the precision.
Implemented as standard 1 byte boolean
Arbitrary sequence of utf-8 characters. Like most programming languages, str doesn't support arbitrary binary bytes though.
Arbitrary sequence of binary bytes. This is useful for storing binary data.
List of elements of any other valid type T. Unlike Python lists, all elements must have the same type.
Map from str to data of any valid type T.
Fennel does not support dictionaries with arbitrary types for keys - please reach out to Fennel support if you have use cases requiring that.
Same as Python Optional - permits either None or values of type T.
Denotes a list of floats of the given fixed length, i.e. Embedding[32] describes a list of 32 floats. This is the same as list[float] but enforces the list length, which is important for dot products and other similar operations on embeddings.
Describes a timestamp, implemented as microseconds since Unix epoch (so minimum granularity is microseconds). Can be natively parsed from multiple formats though internally is stored as 8-byte signed integer describing timestamp as microseconds from epoch in UTC.
Describes a date, implemented as days since Unix epoch. Can be natively parsed from multiple formats though internally is stored as an 8-byte signed integer describing the date as days since epoch in UTC.
Describes the equivalent of a struct or dataclass - a container containing a fixed set of fields of fixed types.
Fennel uses a strong type system and, post data-ingestion, data doesn't auto-coerce across types. For instance, it will be a compile or runtime error if something was expected to be of type float but received an int instead.
1# imports for data types
2from typing import List, Optional, Dict
3from datetime import datetime
4from fennel.dtypes import struct
5
6# imports for datasets
7from fennel.datasets import dataset, field
8from fennel.lib import meta
9
10@struct # like dataclass but verifies that fields have valid Fennel types
11class Address:
12 street: str
13 city: str
14 state: str
15 zip_code: Optional[str]
16
17@meta(owner="[email protected]")
18@dataset
19class Student:
20 id: int = field(key=True)
21 name: str
22 grades: Dict[str, float]
23 honors: bool
24 classes: List[str]
25 address: Address # Address is now a valid Fennel type
26 signup_time: datetime
python
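Embedding types can be used in dataset fields in the same way. A minimal sketch (the dataset and field names are illustrative, and Embedding is assumed to be importable from fennel.dtypes):
1from datetime import datetime
2
3from fennel.datasets import dataset, field
4from fennel.dtypes import Embedding
5
6@dataset
7class DocumentEmbeddings:
8    doc_id: int = field(key=True)
9    # exactly 32 floats per row - the length is enforced by the type
10    embedding: Embedding[32]
11    updated_at: datetime
python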
Type Restrictions
Fennel type restrictions allow you to put additional constraints on base types and restrict the set of valid values in some form.
Restriction on the base type of str. Permits only the strings matching the given regex pattern.
Restriction on the base type of int or float. Permits only the numbers between low and high (both inclusive by default). The left or right bound can be made exclusive by setting strict_min or strict_max to True respectively.
Restricts a type T to only accept one of the given values as valid values. oneof can be thought of as a more general version of enum.
For the restriction to be valid, all the values must themselves be of type T.
1# imports for data types
2from datetime import datetime, timezone
3from fennel.dtypes import oneof, between, regex
4
5# imports for datasets
6from fennel.datasets import dataset, field
7from fennel.lib import meta
8from fennel.connectors import source, Webhook
9
10webhook = Webhook(name="fennel_webhook")
11
12@meta(owner="[email protected]")
13@source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="upsert")
14@dataset
15class UserInfoDataset:
16 user_id: int = field(key=True)
17 name: str
18 age: between(int, 0, 100, strict_min=True)
19 gender: oneof(str, ["male", "female", "non-binary"])
20 email: regex(r"[^@]+@[^@]+\.[^@]+")
21 timestamp: datetime
python
Type Restriction Composition
These restricted types act as regular types -- they can be mixed/matched to form complex composite types. For instance, the following are all valid Fennel types:
- list[regex('$[0-9]{5}$')] - a list of strings, each matching the US zip code regex
- oneof(Optional[int], [None, 0, 1]) - a nullable type that only takes 0 or 1 as valid values
Data belonging to the restricted types is still stored & transmitted (e.g. in json encoding) as a regular base type. It's just that Fennel will reject data of base type that doesn't meet the restriction.
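For instance, such composite restricted types can be used directly as dataset fields. A minimal sketch (the dataset and field names are illustrative):
1from datetime import datetime
2from typing import List, Optional
3
4from fennel.datasets import dataset, field
5from fennel.dtypes import oneof, regex
6
7@dataset
8class UserAddresses:
9    uid: int = field(key=True)
10    # each element must match a 5-digit US zip code pattern
11    zip_codes: List[regex(r"[0-9]{5}")]
12    # nullable field that only accepts None, 0 or 1
13    is_primary: oneof(Optional[int], [None, 0, 1])
14    timestamp: datetime
python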
Duration
Fennel lets you express durations in an easy to read natural language as described below:
Symbol | Unit
---|---
y | Year
w | Week
d | Day
h | Hour
m | Minute
s | Second
There is no shortcut for month because there is a very high degree of variance in a month's duration - some months are 28 days, some are 30 days and some are 31 days. A common convention in ML is to use 4 weeks to describe a month.
A year is hardcoded to be exactly 365 days and doesn't take into account variance like leap years.
1"7h" -> 7 hours
2"12d" -> 12 days
3"2y" -> 2 years
4"3h 20m 4s" -> 3 hours 20 minutes and 4 seconds
5"2y 4w" -> 2 years and 4 weeks
text
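The same duration strings are used across the API, for example for the disorder of a source or the width of an aggregation window. A minimal sketch reusing the Continuous window shown in the Aggregate section below:
1from fennel.dtypes import Continuous
2
3# ~1 month of history, following the 4-week convention described above
4month_window = Continuous("4w")
python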
Aggregate
Operator to do continuous window aggregations. Aggregate operator must always be preceded by a groupby operator.
Parameters
Positional argument specifying the list of aggregations to apply on the grouped dataset. This list can be passed either as unpacked *args or as an explicit list as the first positional argument.
See aggregations for the full list of aggregate functions.
Keyword argument indicating the time axis to aggregate along. If along is None, Fennel will aggregate along the timestamp of the input dataset.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Count,
7 Sum,
8)
9from fennel.dtypes import Continuous
10from fennel.lib import inputs
11from fennel.connectors import source, Webhook
12
13webhook = Webhook(name="webhook")
14
15@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
16@dataset
17class Transaction:
18 uid: int
19 amount: int
20 timestamp: datetime = field(timestamp=True)
21 transaction_time: datetime
22
23@dataset(index=True)
24class Aggregated:
25 # groupby field becomes the key field
26 uid: int = field(key=True)
27 # new fields are added to the dataset by the aggregate operation
28 total: int
29 count_1d: int
30 timestamp: datetime = field(timestamp=True)
31
32 @pipeline
33 @inputs(Transaction)
34 def aggregate_pipeline(cls, ds: Dataset):
35 return ds.groupby("uid").aggregate(
36 count_1d=Count(window=Continuous("forever")),
37 total=Sum(of="amount", window=Continuous("forever")),
38 along="transaction_time",
39 )
python
Returns
Returns a dataset where all columns passed to groupby become the key columns, the timestamp column stays as it is, and one column is created for each aggregation. The type of each aggregated column depends on the aggregate and the type of the corresponding column in the input dataset.
Aggregate is the terminal operator - no other operator can follow it and no other datasets can be derived from the dataset containing this pipeline.
Assign
Operator to add a new column to a dataset - the added column is neither a key column nor a timestamp column.
Parameters
The name of the new column to be added - must not conflict with any existing name on the dataset.
The data type of the new column to be added - must be a valid Fennel supported data type.
The function, which when given a subset of the dataset as a dataframe, returns the value of the new column for each row in the dataframe.
Fennel verifies at runtime that the returned series matches the declared dtype.
Assign can also be given one or more expressions instead of Python lambdas - it can have either expressions or lambdas but not both. An expected type must also be provided along with each expression (see example).
Unlike lambda-based assign, with expressions all type validation and many other errors can be caught at commit time itself (vs incurring runtime errors).
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class WithSquare:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def my_pipeline(cls, ds: Dataset):
24 return ds.assign("amount_sq", int, lambda df: df["amount"] ** 2)
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5from fennel.expr import col
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
10@dataset
11class Transaction:
12 uid: int = field(key=True)
13 amount: int
14 timestamp: datetime
15
16@dataset(index=True)
17class WithSquare:
18 uid: int = field(key=True)
19 amount: int
20 amount_sq: int
21 amount_half: float
22 timestamp: datetime
23
24 @pipeline
25 @inputs(Transaction)
26 def my_pipeline(cls, ds: Dataset):
27 return ds.assign(
28 amount_sq=(col("amount") * col("amount")).astype(int),
29 amount_half=(col("amount") / 2).astype(float),
30 )
python
Returns
Returns a dataset with one additional column of the given name and the same type as dtype. This additional column is neither a key column nor the timestamp column.
Errors
Runtime error if the value returned from the lambda isn't a pandas Series of the declared type and the same length as the input dataframe.
When using expressions, errors may be raised during the import or commit if types don't match and/or there are other validation errors related to the expressions.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset
15class WithHalf:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def my_pipeline(cls, ds: Dataset):
24 return ds.assign(
25 "amount_sq", int, lambda df: df["amount"] * 0.5
26 )
python
1with pytest.raises(Exception):
2
3 from fennel.datasets import dataset, field, pipeline, Dataset
4 from fennel.lib import inputs
5 from fennel.connectors import source, Webhook
6 from fennel.expr import col
7
8 webhook = Webhook(name="webhook")
9
10 @source(webhook.endpoint("txn"), disorder="14d", cdc="upsert")
11 @dataset
12 class Transaction:
13 uid: int = field(key=True)
14 amount: int
15 timestamp: datetime
16
17 @dataset
18 class WithHalf:
19 uid: int = field(key=True)
20 amount: int
21 amount_sq: int
22 amount_half: int
23 timestamp: datetime
24
25 @pipeline
26 @inputs(Transaction)
27 def my_pipeline(cls, ds: Dataset):
28 return ds.assign(
29 amount_sq=(col("amount") * col("amount")).astype(int),
30 amount_half=(col("amount") / 2).astype(int),
31 )
python
Changelog
Operator to convert a keyed dataset into a CDC changelog stream. All key fields are converted into normal fields, and an additional column is added, indicating the type of change (insert or delete) for the delta.
Parameters
Kwarg that specifies the name of a boolean column which stores whether a delta was a delete kind in the original dataset. Exactly one of this or the insert kwarg should be set.
Kwarg that specifies the name of a boolean column which stores whether a delta was an insert kind in the original dataset. Exactly one of this or the delete kwarg should be set.
Returns
Returns the input keyed dataset converted into an append-only CDC changelog stream. All key fields are converted into normal fields, and an additional column is added which contains the type of change (insert or delete) for each delta.
Errors
Error if neither the insert nor the delete kwarg is set.
Error if both the insert and delete kwargs are set.
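A minimal sketch of using the operator in a pipeline, assuming it is exposed as a .changelog(...) method on datasets (mirroring the other operators on this page); dataset and column names are illustrative:
1from datetime import datetime
2
3from fennel.datasets import dataset, field, pipeline, Dataset
4from fennel.lib import inputs
5from fennel.connectors import source, Webhook
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
10@dataset
11class User:
12    uid: int = field(key=True)
13    city: str
14    timestamp: datetime
15
16@dataset
17class UserChangelog:
18    uid: int  # key fields become normal fields in the changelog
19    city: str
20    deleted: bool  # True when the delta was a delete in User
21    timestamp: datetime
22
23    @pipeline
24    @inputs(User)
25    def changelog_pipeline(cls, ds: Dataset):
26        return ds.changelog(delete="deleted")
python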
Dedup
Operator to dedup keyless datasets (e.g. event streams).
Parameters
Default: None
The list of columns to use for identifying duplicates. If not specified, all the columns are used for identifying duplicates.
If window is specified, two rows of the input dataset are considered duplicates when they are in the same window and have the same value for the by columns.
If window is not specified, two rows are considered duplicates when they have the exact same values for the timestamp column and all the by columns.
Default: None
The window to group rows for deduping. If not specified, the rows will be deduped only by the by columns and the timestamp.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 txid: int
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset
16class Deduped:
17 txid: int
18 uid: int
19 amount: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def dedup_pipeline(cls, ds: Dataset):
25 return ds.dedup(by="txid")
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 txid: int
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset
16class Deduped:
17 txid: int
18 uid: int
19 amount: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def dedup_by_all_pipeline(cls, ds: Dataset):
25 return ds.dedup()
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4from fennel.dtypes import Session
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 txid: int
12 uid: int
13 amount: int
14 timestamp: datetime
15
16@dataset
17class Deduped:
18 txid: int
19 uid: int
20 amount: int
21 timestamp: datetime
22
23 @pipeline
24 @inputs(Transaction)
25 def dedup_by_all_pipeline(cls, ds: Dataset):
26 return ds.dedup(by="txid", window=Session(gap="10s"))
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4from fennel.dtypes import Tumbling
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 txid: int
12 uid: int
13 amount: int
14 timestamp: datetime
15
16@dataset
17class Deduped:
18 txid: int
19 uid: int
20 amount: int
21 timestamp: datetime
22
23 @pipeline
24 @inputs(Transaction)
25 def dedup_by_all_pipeline(cls, ds: Dataset):
26 return ds.dedup(by="txid", window=Tumbling(duration="10s"))
python
Returns
Returns a keyless dataset having the same schema as the input dataset but with some duplicated rows filtered out.
Errors
Commit error to apply dedup on a keyed dataset.
Dedup on hopping window or tumbling window with lookback is not supported.
Drop
Operator to drop one or more non-key non-timestamp columns from a dataset.
Parameters
List of columns in the incoming dataset that should be dropped. This can be passed either as unpacked *args or as a Python list.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 country: str
13 weight: float
14 height: float
15 gender: str
16 timestamp: datetime
17
18@dataset(index=True)
19class Dropped:
20 uid: int = field(key=True)
21 gender: str
22 timestamp: datetime
23
24 @pipeline
25 @inputs(User)
26 def drop_pipeline(cls, user: Dataset):
27 return user.drop("height", "weight").drop(
28 columns=["city", "country"]
29 )
python
Returns
Returns a dataset with the same schema as the input dataset but with some columns (as specified by columns) removed.
Errors
Commit error on removing any key columns or the timestamp column.
Commit error on removing any column that doesn't exist in the input dataset.
1@source(webhook.endpoint("User"))
2@dataset
3class User:
4 uid: int = field(key=True)
5 city: str
6 timestamp: datetime
7
8@dataset
9class Dropped:
10 city: str
11 timestamp: datetime
12
13 @pipeline
14 @inputs(User)
15 def pipeline(cls, user: Dataset):
16 return user.drop("uid")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 timestamp: datetime
13
14@dataset
15class Dropped:
16 uid: int = field(key=True)
17 city: str
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.drop("random")
python
Dropnull
Operator to drop rows containing null values (aka None in Python speak) in the given columns.
Parameters
List of columns in the incoming dataset that should be checked for presence of None values - if any such column has None for a row, the row will be filtered out from the output dataset. This can be passed either as unpacked *args or as a Python list.
If no arguments are given, columns will be all the columns with the type Optional[T] in the dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 dob: str
12 city: Optional[str]
13 country: Optional[str]
14 gender: Optional[str]
15 timestamp: datetime
16
17@dataset(index=True)
18class Derived:
19 uid: int = field(key=True)
20 dob: str
21 # dropnull changes the type of the columns to non-optional
22 city: str
23 country: str
24 gender: Optional[str]
25 timestamp: datetime
26
27 @pipeline
28 @inputs(User)
29 def dropnull_pipeline(cls, user: Dataset):
30 return user.dropnull("city", "country")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 dob: str
12 city: Optional[str]
13 country: Optional[str]
14 gender: Optional[str]
15 timestamp: datetime
16
17@dataset(index=True)
18class Derived:
19 uid: int = field(key=True)
20 dob: str
21 # dropnull changes the type of all optional columns to non-optional
22 city: str
23 country: str
24 gender: str
25 timestamp: datetime
26
27 @pipeline
28 @inputs(User)
29 def dropnull_pipeline(cls, user: Dataset):
30 return user.dropnull()
python
Returns
Returns a dataset with the same name & number of columns as the input dataset but with the type of some columns modified from Optional[T] to T.
Errors
Commit error to pass a column without an optional type.
Commit error to pass a column that doesn't exist in the input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: Optional[str]
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int = field(key=True)
17 city: str
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.dropnull("random")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 # dropnull can only be used on optional columns
12 city: str
13 timestamp: datetime
14
15@dataset
16class Derived:
17 uid: int = field(key=True)
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.dropnull("city")
python
Explode
Operator to explode lists in a single row to form multiple rows, analogous to the explode function in Pandas.
Only applicable to keyless datasets.
Parameters
The list of columns to explode. This list can be passed either as unpacked *args or as the kwarg columns mapping to an explicit list.
All the columns should be of type List[T] for some T in the input dataset and, after explosion, they get converted to a column of type Optional[T].
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 skus: List[int]
12 prices: List[float]
13 timestamp: datetime
14
15@dataset
16class Derived:
17 uid: int
18 sku: Optional[int]
19 price: Optional[float]
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Orders)
24 def explode_pipeline(cls, ds: Dataset):
25 return (
26 ds
27 .explode("skus", "prices").rename(
28 {"skus": "sku", "prices": "price"}
29 )
30 )
python
Returns
Returns a dataset with the same number & names of columns as the input dataset but with the type of exploded columns modified from List[T] to Optional[T].
Empty lists are converted to None values (hence the output types need to be Optional[T]).
Errors
Commit error to apply explode on an input dataset with key columns.
Commit error to explode using a column that is not of the type List[T].
Commit error to explode using a column that is not present in the input dataset.
For a given row, all the columns getting exploded must have lists of the same length, otherwise a runtime error is raised. Note that the lists can be of different length across rows.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 price: float
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int
17 price: float
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Orders)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.explode("price")
python
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10 uid: int
11 price: List[float]
12 timestamp: datetime
13
14@dataset
15class Derived:
16 uid: int
17 price: float
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Orders)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.explode("price", "random")
python
Filter
Operator to selectively filter out rows from a dataset.
Parameters
The actual filter function - takes a pandas dataframe containing a batch of rows from the input dataset and is expected to return a series of booleans of the same length. Only rows corresponding to True are retained in the output dataset.
Alternatively, this can also be a Fennel expression.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 signup_time: datetime
13
14@dataset(index=True)
15class Filtered:
16 uid: int = field(key=True)
17 city: str
18 signup_time: datetime
19
20 @pipeline
21 @inputs(User)
22 def my_pipeline(cls, user: Dataset):
23 return user.filter(lambda df: df["city"] != "London")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5from fennel.expr import col
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
10@dataset
11class User:
12 uid: int = field(key=True)
13 city: str
14 signup_time: datetime
15
16@dataset(index=True)
17class Filtered:
18 uid: int = field(key=True)
19 city: str
20 signup_time: datetime
21
22 @pipeline
23 @inputs(User)
24 def my_pipeline(cls, user: Dataset):
25 return user.filter(col("city") != "London")
python
Returns
Returns a dataset with the same schema as the input dataset, just with some rows potentially filtered out.
Errors
Runtime error if the value returned from the lambda isn't a pandas Series of bools of the same length as the input dataframe. When using expressions, type errors and many other kinds of errors are caught statically at import or commit time.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 signup_time: datetime
13
14@dataset
15class Filtered:
16 uid: int = field(key=True)
17 city: str
18 signup_time: datetime
19
20 @pipeline
21 @inputs(User)
22 def my_pipeline(cls, user: Dataset):
23 return user.filter(lambda df: df["city"] + "London")
python
First
Operator to find the first element of a group by the row timestamp. First operator must always be preceded by a groupby operator.
Parameters
The first operator does not take any parameters.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class FirstOnly:
16 uid: int = field(key=True)
17 amount: int
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def first_pipeline(cls, ds: Dataset):
23 return ds.groupby("uid").first()
python
Returns
The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.
For each group formed by grouping, one row is chosen having the lowest value in the timestamp field. In case of ties, the first seen row wins.
Groupby
Operator to group rows of incoming datasets to be processed by the next operator.
Technically, groupby isn't a standalone operator by itself since its output isn't a valid dataset. Instead, it becomes a valid operator when followed by first, latest, or aggregate.
Parameters
List of keys in the incoming dataset along which the rows should be grouped. This can be passed as unpacked *args or a Python list.
Optional field to specify the default window for all the aggregations in the following aggregate operator. If the window parameter is used, the operator can only be followed by an aggregate operator, and the window will become a key field in the output schema.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 category: str
12 timestamp: datetime
13
14@dataset
15class FirstInCategory:
16 category: str = field(key=True)
17 uid: int
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def groupby_pipeline(cls, transactions: Dataset):
23 return transactions.groupby("category").first()
python
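As noted in the parameters above, the grouping columns can equivalently be passed as a single Python list instead of unpacked *args. A minimal sketch of the same pipeline body (not part of the original example):
@pipeline
@inputs(Transaction)
def groupby_pipeline(cls, transactions: Dataset):
    # identical to .groupby("category") - columns passed as a list
    return transactions.groupby(["category"]).first()
python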
Errors
Commit error if trying to group by columns that don't exist in the input dataset.
Commit error if trying to do a groupby via the timestamp column of the input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(
8 webhook.endpoint("Transaction"), disorder="14d", cdc="append"
9)
10@dataset
11class Transaction:
12 uid: int
13 category: str
14 timestamp: datetime
15
16@dataset
17class FirstInCategory:
18 category: str = field(key=True)
19 uid: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def bad_pipeline(cls, transactions: Dataset):
25 return transactions.groupby("non_existent_column").first()
python
Join
Operator to join two datasets. The right hand side dataset must have one or more key columns and the join operation is performed on these columns.
Parameters
The right hand side dataset to join this dataset with. RHS dataset must be a keyed dataset and must also be an input to the pipeline (vs being an intermediary dataset derived within a pipeline itself).
Required kwarg indicating whether the join should be an inner join (how="inner"
)
or a left-outer join (how="left"
). With "left"
, the output dataset may have
a row even if there is no matching row on the right side.
Default: None
Kwarg that specifies the list of fields along which join should happen. If present, both left and right side datasets must have fields with these names and matching data types (data types on left hand side can be optional). This list must be identical to the names of all key columns of the right hand side.
If this isn't set, left_on
and right_on
must be set instead.
Default: None
Kwarg that specifies the list of fields from the left side dataset that should be
used for joining. If this kwarg is set, right_on
must also be set. Note that
right_on
must be identical to the names of all the key columns of the right side.
Default: None
Kwarg that specifies the list of fields from the right side dataset that should be
used for joining. If this kwarg is set, left_on
must also be set. The length
of left_on
and right_on
must be the same and corresponding fields on both
sides must have the same data types.
Default: ("forever", "0s")
Optional kwarg specifying the time window relative to the left side timestamp
within which the join should be performed. This can be seen as adding another
condition to join like WHERE left_time - d1 < right_time AND right_time < left_time + d2
where (d1, d2) = within.
- The first value in the tuple represents how far back in time a join should happen. The term "forever" means that we can go infinitely back in time when searching for an event to join from the left-hand side data.
- The second value in the tuple represents how far ahead in time we can go to
perform a join. This is useful in cases when the corresponding RHS data of
the join can come later. The default value for this parameter is
("forever", "0s")
which means that we can go infinitely back in time and the RHS data should be available for the event time of the LHS data.
Default: None
Optional kwarg that specifies the list of (non-key) fields of the right
dataset that should be included in the output dataset. If this kwarg is
not set, all such fields are included in the output dataset. If right dataset's
timestamp field is included in fields
, then it is included as a normal field
in the output dataset, with left dataset's timestamp field as the output
dataset's timestamp field.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 merchant: int
12 amount: int
13 timestamp: datetime
14
15@source(
16 webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
17)
18@dataset(index=True)
19class MerchantCategory:
20 # right side of the join can only be on key fields
21 merchant: int = field(key=True)
22 category: str
23 updated_at: datetime # won't show up in joined dataset
24
25@dataset
26class WithCategory:
27 uid: int
28 merchant: int
29 amount: int
30 timestamp: datetime
31 category: str
32
33 @pipeline
34 @inputs(Transaction, MerchantCategory)
35 def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
36 return tx.join(merchant_category, on=["merchant"], how="inner")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4from typing import Optional
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 merchant: Optional[int]
13 amount: int
14 timestamp: datetime
15
16@source(
17 webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
18)
19@dataset(index=True)
20class MerchantCategory:
21 # right side of the join can only be on key fields
22 merchant: int = field(key=True)
23 category: str
24 updated_at: datetime # won't show up in joined dataset
25
26@dataset
27class WithCategory:
28 uid: int
29 merchant: Optional[int]
30 amount: int
31 timestamp: datetime
32 category: Optional[str]
33
34 @pipeline
35 @inputs(Transaction, MerchantCategory)
36 def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
37 return tx.join(merchant_category, on=["merchant"], how="left")
python
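For illustration, here is a hedged sketch that combines the within and fields kwargs documented above, reusing the Transaction and MerchantCategory datasets from the examples; the dataset/pipeline names, the 60s window and the field list are arbitrary choices, not part of the original examples:
@dataset
class WithRecentCategory:
    uid: int
    merchant: Optional[int]
    amount: int
    timestamp: datetime
    category: Optional[str]

    @pipeline
    @inputs(Transaction, MerchantCategory)
    def join_with_window(cls, tx: Dataset, merchant_category: Dataset):
        return tx.join(
            merchant_category,
            on=["merchant"],
            how="left",
            within=("forever", "60s"),  # RHS rows may arrive up to 60s after the LHS event
            fields=["category"],        # only bring over the category column from the RHS
        )
python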
Returns
Returns a dataset representing the joined dataset having the same keys & timestamp columns as the LHS dataset.
The output dataset has all the columns from the left dataset and all non-key non-timestamp columns from the right dataset.
If the join was of type inner, a joined RHS column of type T keeps the type T; but if the join was of type left, its type in the output dataset becomes Optional[T] even if it was T on the RHS side.
For LHS columns, the type is the same as the type in the LHS dataset if the join type is left. If the join type is inner and a join column on the LHS is Optional[T], then its type in the output dataset is T (i.e., the Optional is dropped).
Errors
Commit error to do a join with a dataset that doesn't have key columns.
Commit error to do a join with a dataset that is not an input to the pipeline but instead is an intermediate dataset derived during the pipeline itself.
Commit error if join will result in a dataset having two columns of the same name. A common way to work around this is to rename columns via the rename operator before the join.
Commit error if the number/type of the join columns on the left and right side don't match.
Latest
Operator to find the latest element of a group by the row timestamp. Latest operator must always be preceded by a groupby operator.
The latest operator is a good way to effectively convert an append-only stream into a time-aware upsert stream.
Parameters
The latest
operator does not take any parameters.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class LatestOnly:
16 uid: int = field(key=True)
17 amount: int
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def latest_pipeline(cls, ds: Dataset):
23 return ds.groupby("uid").latest()
python
Returns
The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.
The row with the maximum timestamp is chosen for each group. In case of ties, the last seen row wins.
Rename
Operator to rename columns of a dataset.
Parameters
Dictionary mapping from old column names to their new names.
All columns should still have distinct and valid names post renaming.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 weight: float
12 height: float
13 timestamp: datetime
14
15@dataset(index=True)
16class Derived:
17 uid: int = field(key=True)
18 # rename changes the name of the columns
19 weight_lb: float
20 height_in: float
21 timestamp: datetime
22
23 @pipeline
24 @inputs(User)
25 def rename_pipeline(cls, user: Dataset):
26 return user.rename(
27 {"weight": "weight_lb", "height": "height_in"}
28 )
python
Returns
Returns a dataset with the same schema as the input dataset, just with the columns renamed.
Errors
Commit error if there is no existing column with name matching each of the keys in the rename dictionary.
Commit error if after renaming, there will be two columns in the dataset having the same name.
Select
Operator to select some columns from a dataset.
Parameters
List of columns in the incoming dataset that should be selected into the output dataset. This can be passed either as unpacked *args or as a kwarg set to a Python list.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
8@dataset
9class User:
10 uid: int = field(key=True)
11 weight: float
12 height: float
13 city: str
14 country: str
15 gender: str
16 timestamp: datetime
17
18@dataset(index=True)
19class Selected:
20 uid: int = field(key=True)
21 weight: float
22 height: float
23 timestamp: datetime
24
25 @pipeline
26 @inputs(User)
27 def select_pipeline(cls, user: Dataset):
28 return user.select("uid", "height", "weight")
python
Returns
Returns a dataset containing only the selected columns. Timestamp field is automatically included whether explicitly provided in the select or not.
Errors
Select, like most other operators, can not change the key or timestamp columns.
As a result, not selecting all the key columns is a commit
error.
Commit error to select a column that is not present in the input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 timestamp: datetime
13
14@dataset
15class Selected:
16 city: str
17 timestamp: datetime
18
19 @pipeline
20 @inputs(User)
21 def bad_pipeline(cls, user: Dataset):
22 return user.select("city")
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("User"))
8@dataset
9class User:
10 uid: int = field(key=True)
11 city: str
12 timestamp: datetime
13
14@dataset
15class Selected:
16 uid: int = field(key=True)
17 city: str
18 timestamp: datetime
19
20 @pipeline
21 @inputs(User)
22 def bad_pipeline(cls, user: Dataset):
23 return user.select("uid", "random")
python
Transform
Catch-all operator to add/remove/update columns.
Parameters
The transform function that takes a pandas dataframe containing a batch of rows from the input dataset and returns an output dataframe of the same length, though potentially with a different set of columns.
The expected schema of the output dataset. If not specified, the schema of the input dataset is used.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset(index=True)
15class WithSquare:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def transform_pipeline(cls, ds: Dataset):
24 schema = ds.schema()
25 schema["amount_sq"] = int
26 return ds.transform(
27 lambda df: df.assign(amount_sq=df["amount"] ** 2), schema
28 )
python
Returns
Returns a dataset with the schema as specified in schema
and rows as transformed
by the transform function.
Errors
Runtime error if the dataframe returned by the transform function doesn't match
the provided schema
.
Commit error if transform tries to modify key/timestamp columns.
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(
8 webhook.endpoint("Transaction"), disorder="14d", cdc="upsert"
9)
10@dataset
11class Transaction:
12 uid: int = field(key=True)
13 amount: int
14 timestamp: datetime
15
16def transform(df: pd.DataFrame) -> pd.DataFrame:
17 df["user"] = df["uid"]
18 df.drop(columns=["uid"], inplace=True)
19 return df
20
21@dataset
22class Derived:
23 user: int = field(key=True)
24 amount: int
25 timestamp: datetime
26
27 @pipeline
28 @inputs(Transaction)
29 def bad_pipeline(cls, ds: Dataset):
30 schema = {"user": int, "amount": int, "timestamp": datetime}
31 return ds.transform(transform, schema)
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
8@dataset
9class Transaction:
10 uid: int = field(key=True)
11 amount: int
12 timestamp: datetime
13
14@dataset
15class WithHalf:
16 uid: int = field(key=True)
17 amount: int
18 amount_sq: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def invalid_pipeline(cls, ds: Dataset):
24 schema = ds.schema()
25 schema["amount_sq"] = int
26 return ds.transform(
27 lambda df: df.assign(amount_sq=str(df["amount"])), schema
28 ) # noqa
python
Union
Operator to union rows from two datasets of identical schema. Only applicable to keyless datasets. Written as the simple + operator on two datasets.
Returns
Returns a dataset with the same schema as both the input datasets but containing rows from both of them. If both contain an identical row, two copies of that row are present in the output dataset.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, S3, Kafka
4
5cutoff = datetime(2024, 1, 1, 0, 0, 0)
6s3 = S3(name="mys3")
7bucket = s3.bucket("data", path="orders")
8kafka = Kafka(
9 name="my_kafka",
10 bootstrap_servers="localhost:9092",
11 security_protocol="SASL_PLAINTEXT",
12 sasl_mechanism="PLAIN",
13 sasl_plain_username=os.environ["KAFKA_USERNAME"],
14 sasl_plain_password=os.environ["KAFKA_PASSWORD"],
15)
16
17@source(bucket, cdc="append", disorder="2d", until=cutoff)
18@dataset
19class OrdersBackfill:
20 uid: int
21 skuid: int
22 timestamp: datetime
23
24@source(kafka.topic("order"), cdc="append", disorder="1d", since=cutoff)
25@dataset
26class OrdersLive:
27 uid: int
28 skuid: int
29 timestamp: datetime
30
31@dataset
32class Union:
33 uid: int
34 skuid: int
35 timestamp: datetime
36
37 @pipeline
38 @inputs(OrdersBackfill, OrdersLive)
39    def union_pipeline(cls, backfill: Dataset, live: Dataset):
40 return backfill + live
python
Errors
Union operator is defined only when both the input datasets have the same schema. Commit error to apply union on input datasets with different schemas.
Commit error to apply union on input datasets with key columns.
1from fennel.datasets import dataset, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
8@dataset
9class Orders:
10    uid: int
11    price: float
12    timestamp: datetime
13
14@source(webhook.endpoint("Refunds"), disorder="14d", cdc="append")
15@dataset
16class Refunds:
17    uid: int
18    amount: float  # column name differs from Orders -> different schema
19    timestamp: datetime
20
21@dataset
22class Derived:
23    uid: int
24    price: float
25    timestamp: datetime
26
27    @pipeline
28    @inputs(Orders, Refunds)
29    def bad_pipeline(cls, orders: Dataset, refunds: Dataset):
30        # commit error: schemas of the two sides of the union don't match
31        return orders + refunds
python
1from fennel.datasets import dataset, field, pipeline, Dataset
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Orders"), disorder="14d", cdc="upsert")
8@dataset
9class Orders:
10    uid: int = field(key=True)
11    price: float
12    timestamp: datetime
13
14@source(webhook.endpoint("PromoOrders"), disorder="14d", cdc="upsert")
15@dataset
16class PromoOrders:
17    uid: int = field(key=True)
18    price: float
19    timestamp: datetime
20
21@dataset
22class Derived:
23    uid: int = field(key=True)
24    price: float
25    timestamp: datetime
26
27    @pipeline
28    @inputs(Orders, PromoOrders)
29    def bad_pipeline(cls, orders: Dataset, promos: Dataset):
30        # commit error: union isn't defined for datasets with key columns
31        return orders + promos
python
Avro Registry
Several Fennel sources work with Avro format. When using Avro, it's common to keep the schemas in a centralized schema registry instead of including schema with each message.
Fennel supports integration with avro schema registries.
Parameters
String denoting the provider of the registry. As of right now, Fennel only supports "confluent" avro registry though more such schema registries may be added over time.
The URL where the schema registry is hosted.
User name to access the schema registry (assuming the registry requires authentication). If user name is provided, corresponding password must also be provided.
Assuming authentication is needed, either username/password must be provided or a token, but not both.
The password associated with the username.
Token to be used for authentication with the schema registry. Only one of username/password or token must be provided.
1from fennel.connectors import source, Kafka, Avro
2from fennel.datasets import dataset, field
3
4kafka = Kafka(
5 name="my_kafka",
6 bootstrap_servers="localhost:9092", # could come via os env var too
7 security_protocol="SASL_PLAINTEXT",
8 sasl_mechanism="PLAIN",
9 sasl_plain_username=os.environ["KAFKA_USERNAME"],
10 sasl_plain_password=os.environ["KAFKA_PASSWORD"],
11)
12
13avro = Avro(
14 registry="confluent",
15 url=os.environ["SCHEMA_REGISTRY_URL"],
16 username=os.environ["SCHEMA_REGISTRY_USERNAME"],
17 password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
18)
19
20@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
21@dataset
22class SomeDataset:
23 uid: int = field(key=True)
24 email: str
25 timestamp: datetime
python
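If the registry is instead accessed with a token, the connector can be configured along these lines - a sketch only, with the kwarg name token assumed from the parameter list above rather than confirmed by the example:
avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    # assumed kwarg name; use a token instead of username/password, not both
    token=os.environ["SCHEMA_REGISTRY_TOKEN"],
)
python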
Protobuf Registry
Several Fennel sources work with Protobuf format. When using Protobuf, it's common to keep the schemas in a centralized schema registry instead of including schema with each message.
Fennel supports integration with protobuf schema registries.
Parameters
String denoting the provider of the registry. As of right now, Fennel only supports "confluent" protobuf registry though more such schema registries may be added over time.
The URL where the schema registry is hosted.
User name to access the schema registry (assuming the registry requires authentication). If user name is provided, corresponding password must also be provided.
Assuming authentication is needed, either username/password must be provided or a token, but not both.
The password associated with the username.
Token to be used for authentication with the schema registry. Only one of username/password or token must be provided.
1from fennel.connectors import source, Kafka, Protobuf
2from fennel.datasets import dataset, field
3
4kafka = Kafka(
5 name="my_kafka",
6 bootstrap_servers="localhost:9092", # could come via os env var too
7 security_protocol="SASL_PLAINTEXT",
8 sasl_mechanism="PLAIN",
9 sasl_plain_username=os.environ["KAFKA_USERNAME"],
10 sasl_plain_password=os.environ["KAFKA_PASSWORD"],
11)
12
13protobuf = Protobuf(
14 registry="confluent",
15 url=os.environ["SCHEMA_REGISTRY_URL"],
16 username=os.environ["SCHEMA_REGISTRY_USERNAME"],
17 password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
18)
19
20@source(kafka.topic("user", format=protobuf), disorder="14d", cdc="upsert")
21@dataset
22class SomeDataset:
23 uid: int = field(key=True)
24 email: str
25 timestamp: datetime
python
BigQuery
Data connector to Google BigQuery databases.
Database Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
The project ID of the Google Cloud project containing the BigQuery dataset.
The ID of the BigQuery dataset containing the table(s) to replicate.
A dictionary containing the credentials for the Service Account to use to access BigQuery. See below for instructions on how to obtain this.
Table Parameters
The name of the table within the database that should be ingested.
The name of the field in the table that acts as cursor
for ingestion i.e.
a field that is approximately monotonic and only goes up with time.
Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder}
to get data it hasn't seen before. Auto increment IDs or timestamps corresponding
to modified_at
(vs created_at
unless the field doesn't change) are good
contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
Errors
Fennel tries to test the connection with your BigQuery during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
BigQuery Credentials
Interfacing with BigQuery requires credentials for a Service Account with the "BigQuery User" role at Project level and "BigQuery Data Editor" role at Dataset level. "BigQuery User" role grants permissions to run BigQuery jobs and "BigQuery Data Editor" role grants permissions to read and update table data and its metadata. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.
The easiest way to create a Service Account is to follow GCP's guide
for Creating a Service Account. Once you've
created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles.
Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com
Then, add the service account as a Member of your Google Cloud Project with the "BigQuery User" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.
At this point, you should have a service account with the "BigQuery User" project-level permission.
Similarly, provide the "BigQuery Data Editor" permission to the service account by following Granting Access to Dataset in the Google documentation.
To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.
1from fennel.connectors import source, BigQuery
2from fennel.datasets import dataset
3
4bq = BigQuery(
5 name="my_bigquery",
6 project_id="my_project",
7 dataset_id="my_dataset",
8 service_account_key={
9 "type": "service_account",
10 "project_id": "fake-project-356105",
11 "client_email": "[email protected]",
12 "client_id": "103688493243243272951",
13 "auth_uri": "https://accounts.google.com/o/oauth2/auth",
14 "token_uri": "https://oauth2.googleapis.com/token",
15 "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
16 },
17)
18
19table = bq.table("user", cursor="timestamp")
20
21@source(table, disorder="14d", cdc="append")
22@dataset
23class UserClick:
24 uid: int
25 ad_id: int
26 timestamp: datetime
python
Deltalake
Data connector to read data from tables in deltalake living in S3.
The Deltalake connector is implemented via the S3 connector - just the format parameter needs to be set to 'delta' and the source cdc must be passed as 'native'.
To enable Change Data Feed (CDF) on your table, follow the instructions in this documentation
Fennel doesn't support reading delta tables from HDFS or any other non-S3 storage.
Fennel supports only native CDC with data in Deltalake format. If your data is of the append type, enable CDF on the table and use native CDC mode.
1from fennel.connectors import source, S3
2from fennel.datasets import dataset, field
3
4s3 = S3(
5 name="mys3",
6 aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
7 aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
8)
9
10@source(
11 s3.bucket("data", prefix="user", format="delta"),
12 every="1h",
13 disorder="14d",
14 cdc="native",
15)
16@dataset
17class User:
18 uid: int = field(key=True)
19 email: str
20 timestamp: datetime
python
Hudi
Data connector to read data from Apache Hudi tables in S3.
The Hudi connector is implemented via the S3 connector - just the format parameter needs to be set to 'hudi'.
Fennel doesn't support reading hudi tables from HDFS or any other non-S3 storage.
1from fennel.connectors import source, S3
2from fennel.datasets import dataset, field
3
4s3 = S3(
5 name="mys3",
6 aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
7 aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
8)
9
10@source(
11 s3.bucket("data", prefix="user", format="hudi"),
12 disorder="14d",
13 cdc="upsert",
14 every="1h",
15)
16@dataset
17class User:
18 uid: int = field(key=True)
19 email: str
20 timestamp: datetime
python
Kafka
Data connector to any data store that speaks the Kafka protocol (e.g. Native Kafka, MSK, Redpanda etc.)
Cluster Parameters
A name to identify the source. This name should be unique across ALL connectors.
This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster.
Addresses are written as host & port pairs and can be specified either as a
single server (e.g. localhost:9092
) or a comma separated list of several
servers (e.g. localhost:9092,another.host:9092
).
Protocol used to communicate with the brokers.
SASL mechanism to use for authentication.
SASL username.
SASL password.
Topic Parameters
The name of the kafka topic that needs to be sourced into the dataset.
1from fennel.connectors import source, Kafka
2from fennel.datasets import dataset, field
3
4kafka = Kafka(
5 name="my_kafka",
6 bootstrap_servers="localhost:9092", # could come via os env var too
7 security_protocol="SASL_PLAINTEXT",
8 sasl_mechanism="PLAIN",
9 sasl_plain_username=os.environ["KAFKA_USERNAME"],
10 sasl_plain_password=os.environ["KAFKA_PASSWORD"],
11)
12
13@source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
14@dataset
15class SomeDataset:
16 uid: int = field(key=True)
17 email: str
18 timestamp: datetime
python
Fennel supports only Append and Upsert mode CDC with data in Protobuf format. If you require support for other CDC data formats, please reach out to Fennel support.
Errors
Fennel server tries to connect with the Kafka broker during the commit
operation
itself to validate connectivity - as a result, incorrect URL/Username/Password
etc will be caught at commit time itself as an error.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Kafka can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Kinesis
Data connector to ingest data from AWS Kinesis.
Parameters for Defining Source
A name to identify the source. The name should be unique across all Fennel connectors.
The arn of the role that Fennel should use to access the Kinesis stream. The role must already exist and Fennel's principal must have been given the permission to assume this role (see below for details or talk to Fennel support if you need help).
Stream Parameters
The arn of the Kinesis stream. The corresponding role_arn
must have
appropriate permissions for this stream. Providing a stream that either doesn't
exist or can not be read using the given role_arn
will result in an error
during the commit operation.
The initial position in the stream from which Fennel should start ingestion. See Kinesis ShardIteratorType for more context. Allowed values are:
"latest"
- start from the latest data (starting a few minutes aftercommit
)"trim_horizon"
- start from the oldest data that hasn't been trimmed/expired yet.- datetime - start from the position denoted by this timestamp (i.e. equivalent
to
AT_TIMESTAMP
in Kinesis vocabulary).
If choosing the datetime option, the timestamp can be specified as a datetime
object, or as an int
representing seconds since the epoch, or as a float
representing {seconds}.{microseconds}
since the epoch or as an ISO-8601 formatted str
.
Note that this timestamp is the time attached with the Kinesis message itself at the time of production, not any timestamp field inside the message.
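For illustration, the same starting position can be written in any of the accepted encodings (a sketch only; the epoch values below assume UTC):
from datetime import datetime

as_datetime = datetime(2023, 1, 5)    # datetime object
as_int = 1672876800                   # int: seconds since the epoch
as_float = 1672876800.0               # float: {seconds}.{microseconds} since the epoch
as_iso_str = "2023-01-05T00:00:00Z"   # ISO-8601 formatted string
python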
The format of the data in the Kinesis stream. Most common value is "json"
though Fennel also supports Avro.
Errors
Fennel server tries to connect with Kinesis during the commit
operation
itself to validate connectivity - as a result, incorrect stream/role ARNs or
insufficient permissions will be caught at commit time itself as an error.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
1from fennel.connectors import source, Kinesis
2from fennel.datasets import dataset, field
3
4kinesis = Kinesis(
5 name="my_kinesis",
6 role_arn=os.environ["KINESIS_ROLE_ARN"],
7)
8
9stream = kinesis.stream(
10 stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
11 init_position=datetime(2023, 1, 5), # Start ingesting from Jan 5, 2023
12 format="json",
13)
14
15@source(stream, disorder="14d", cdc="append")
16@dataset
17class Orders:
18 uid: int
19 order_id: str
20 amount: float
21 timestamp: datetime
python
1from fennel.connectors import source, Kinesis
2from fennel.datasets import dataset, field
3
4kinesis = Kinesis(
5 name="my_kinesis",
6 role_arn=os.environ["KINESIS_ROLE_ARN"],
7)
8
9stream = kinesis.stream(
10 stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
11 init_position="latest",
12 format="json",
13)
14
15@source(stream, disorder="14d", cdc="append")
16@dataset
17class Orders:
18 uid: int
19 order_id: str
20 amount: float
21 timestamp: datetime
python
Managing Kinesis Access
Fennel creates a special role with name prefixed by FennelDataAccessRole-
in
your AWS account for role-based access. The role corresponding to the role_arn
passed to Kinesis source should have the following trust policy allowing this
special Fennel role to assume the kinesis role.
See Trust Policy
Specify the exact role_arn
in the form
arn:aws:iam::<fennel-data-plane-account-id>:role/<FennelDataAccessRole-...>
without any wildcards.
1{
2 "Version": "2012-10-17",
3 "Statement": [
4 {
5 "Sid": "",
6 "Effect": "Allow",
7 "Principal": {
8 "AWS": [
9 "<role_arn>"
10 ]
11 },
12 "Action": "sts:AssumeRole"
13 }
14 ]
15}
JSON
Also attach the following permission policy. Add more streams to the Resource field if more than one stream needs to be consumed via this role. Here the account-id is your account where the stream lives.
1{
2 "Version": "2012-10-17",
3 "Statement": [
4 {
5 "Sid": "AllowKinesisAccess",
6 "Effect": "Allow",
7 "Action": [
8 "kinesis:DescribeStream",
9 "kinesis:DescribeStreamSummary",
10 "kinesis:DescribeStreamConsumer",
11 "kinesis:RegisterStreamConsumer",
12 "kinesis:ListShards",
13 "kinesis:GetShardIterator",
14 "kinesis:SubscribeToShard",
15 "kinesis:GetRecords"
16 ],
17 "Resource": [
18 "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>",
19 "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>/*"
20 ]
21 }
22 ]
23}
JSON
MongoDB
Data connector to MongoDB databases.
Database Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
The hostname of the database.
The name of the Mongo database to establish a connection with.
The username which should be used to access the database. This username should
have access to the database db_name
.
The password associated with the username.
Fennel uses SRV connection format for authentication which is supported in Mongo versions 3.6 and later. If you have a self-hosted DB with version earlier than 3.6, please reach out to Fennel support.
Table Parameters
The name of the table within the database that should be ingested.
The name of the field in the table that acts as cursor
for ingestion i.e.
a field that is approximately monotonic and only goes up with time.
Fennel issues queries of the form db.collection.find({"cursor": { "$gte": last_cursor - disorder } })
to get data it hasn't seen before. Auto increment IDs or timestamps corresponding
to modified_at
(vs created_at
unless the field doesn't change) are good
contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
It is recommended to put an index on the cursor
field so that Fennel ingestion
queries don't create too much load on your MongoDB database.
1from fennel.connectors import source, Mongo
2from fennel.datasets import dataset
3
4mongo = Mongo(
5 name="mongo_src",
6 host="atlascluster.ushabcd.mongodb.net",
7 db_name="mongo",
8 username="username",
9 password="password",
10)
11
12collection = mongo.collection("user", cursor="timestamp")
13
14@source(collection, disorder="14d", cdc="append")
15@dataset
16class UserClick:
17 uid: int
18 ad_id: int
19 timestamp: datetime
python
Errors
Fennel tries to test the connection with your MongoDB during commit
itself so any
connectivity issue (e.g. wrong host name, username, password etc) is flagged
as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
MySQL
Data connector to MySQL databases.
Database Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
The hostname of the database.
Default: 3306
The port to connect to.
The name of the MySQL database to establish a connection with.
The username which should be used to access the database. This username should
have access to the database db_name
.
The password associated with the username.
Default: None
Additional properties to pass to the JDBC URL string when connecting to the
database formatted as key=value
pairs separated by the symbol &
. For
instance: key1=value1&key2=value2
.
If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params
to enabledTLSProtocols=TLSv1.2
Table Parameters
The name of the table within the database that should be ingested.
The name of the field in the table that acts as cursor
for ingestion i.e.
a field that is approximately monotonic and only goes up with time.
Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder}
to get data it hasn't seen before. Auto increment IDs or timestamps corresponding
to modified_at
(vs created_at
unless the field doesn't change) are good
contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
It is recommended to put an index on the cursor
field so that Fennel ingestion
queries don't create too much load on your MySQL database.
1from fennel.connectors import source, MySQL
2from fennel.datasets import dataset, field
3
4mysql = MySQL(
5 name="my_mysql",
6 host="my-favourite-mysql.us-west-2.rds.amazonaws.com",
7 port=3306, # could be omitted, defaults to 3306
8 db_name=os.environ["DB_NAME"],
9 username=os.environ["MYSQL_USERNAME"],
10 password=os.environ["MYSQL_PASSWORD"],
11 jdbc_params="enabledTLSProtocols=TLSv1.2",
12)
13
14table = mysql.table("user", cursor="updated_at")
15
16@source(table, disorder="14d", cdc="upsert", every="1m")
17@dataset
18class User:
19 uid: int = field(key=True)
20 email: str
21 created_at: datetime
22 updated_at: datetime = field(timestamp=True)
python
Errors
Fennel tries to test the connection with your MySQL during commit itself so any connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in MySQL is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Postgres
Data connector to Postgres databases.
Database Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
The hostname of the database.
Default: 5432
The port to connect to.
The name of the Postgres database to establish a connection with.
The username which should be used to access the database. This username should
have access to the database db_name
.
The password associated with the username.
Default: None
Additional properties to pass to the JDBC URL string when connecting to the
database formatted as key=value
pairs separated by the symbol &
. For
instance: key1=value1&key2=value2
.
If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params
to enabledTLSProtocols=TLSv1.2
Table Parameters
The name of the table within the database that should be ingested.
The name of the field in the table that acts as cursor
for ingestion i.e.
a field that is approximately monotonic and only goes up with time.
Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder}
to get data it hasn't seen before. Auto increment IDs or timestamps corresponding
to modified_at
(vs created_at
unless the field doesn't change) are good
contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
It is recommended to put an index on the cursor
field so that Fennel ingestion
queries don't create too much load on your Postgres database.
1from fennel.connectors import source, Postgres
2from fennel.datasets import dataset, field
3
4postgres = Postgres(
5 name="my_postgres",
6 host="my-favourite-pg.us-west-2.rds.amazonaws.com",
7 port=5432, # could be omitted, defaults to 5432
8 db_name=os.environ["DB_NAME"],
9 username=os.environ["POSTGRES_USERNAME"],
10 password=os.environ["POSTGRES_PASSWORD"],
11 jdbc_params="enabledTLSProtocols=TLSv1.2",
12)
13
14table = postgres.table("user", cursor="updated_at")
15
16@source(table, disorder="14d", cdc="upsert", every="1m")
17@dataset
18class User:
19 uid: int = field(key=True)
20 email: str
21 created_at: datetime
22 updated_at: datetime = field(timestamp=True)
python
Errors
Fennel tries to test the connection with your Postgres during commit
itself so any
connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Postgres is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Pub/Sub
Data connector to Google Pub/Sub messaging service. Pub/Sub only supports an at-least-once delivery guarantee. If exactly-once delivery is required, please use the Dedup operator to make it exactly-once.
Project Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
The project ID of the Google Cloud project containing the Pub/Sub topic
A dictionary containing the credentials for the Service Account to use to access Pub/Sub. See below for instructions on how to obtain this.
Topic Parameters
The name of the topic from which the data should be ingested.
The format of the data in Pub/Sub topic. Only "json"
is supported
Fennel supports only Append and Upsert mode CDC with data in JSON format. If you require support for other schemas or CDC data formats, please reach out to Fennel support.
Errors
Fennel tries to test the connection with your Pub/Sub topic during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Pub/Sub Credentials
Interfacing with Pub/Sub requires credentials for a Service Account with the "Pub/Sub Subscriber" role, which grants permissions to create subscription and read messages from the subscribed topic. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.
The easiest way to create a Service Account is to follow GCP's guide
for Creating a Service Account. Once you've
created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles.
Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com
Then, add the service account as a Member of your Google Cloud Project with the "Pub/Sub Subscriber" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.
At this point, you should have a service account with the "Pub/Sub Subscriber" project-level permission.
To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.
1from fennel.connectors import source, PubSub
2from fennel.datasets import dataset, field
3
4pubsub = PubSub(
5 name="pubsub_src",
6 project_id="test_project",
7 service_account_key={
8 "type": "service_account",
9 "project_id": "fake-project-356105",
10 "client_email": "[email protected]",
11 "client_id": "103688493243243272951",
12 "auth_uri": "https://accounts.google.com/o/oauth2/auth",
13 "token_uri": "https://oauth2.googleapis.com/token",
14 "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
15 },
16)
17
18@source(
19 pubsub.topic("test_topic", format="json"), disorder="2d", cdc="upsert"
20)
21@dataset
22class UserClick:
23 uid: int = field(key=True)
24 ad_id: int
25 timestamp: datetime
python
Redshift
Data connector to Redshift databases.
Database Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
To handle potentially large volumes of data, Fennel asks Redshift to dump query results in a temporary S3 bucket (since it's faster to go via S3). But this requires Redshift to be able to access that S3 bucket. s3_access_role_arn is the IAM role ARN that Redshift should use to access S3.
This IAM role should be given full access to S3 and should also be assumable by your Redshift cluster.
Steps to set up the IAM role:
- Create an IAM role by following this documentation.
- Provide full access to S3 to this role.
- Associate the IAM role with the Redshift cluster by following this documentation.
You can refer to a sample policy in the code snippet below. Do not set this parameter when using username/password for authentication.
The name of the database where the relevant data resides.
The username which should be used to access the database. This username should have access to the
database db_name
. Do not set this parameter when using IAM authentication
The password associated with the username. Do not set this parameter when using IAM authentication
The hostname of the database.
Default: 5439
The port to connect to.
The name of the schema where the required data table(s) resides.
Table Parameters
The name of the table within the database that should be ingested.
The name of the field in the table that acts as cursor
for ingestion i.e.
a field that is approximately monotonic and only goes up with time.
Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder}
to get data it hasn't seen before. Auto increment IDs or timestamps corresponding
to modified_at
(vs created_at
unless the field doesn't change) are good
contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
For large datasets, it is recommended to use IAM-based authentication, as username/password-based authentication does not store temporary data in S3
Errors
Fennel tries to test the connection with your Redshift during commit
itself so any
connectivity issue (e.g. wrong database name, host name, port, etc) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Redshift is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
1from fennel.connectors import source, Redshift
2from fennel.datasets import dataset
3
4redshift = Redshift(
5 name="my_redshift",
6 s3_access_role_arn="arn:aws:iam::123:role/Redshift",
7 db_name=os.environ["DB_NAME"],
8 host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
9 port=5439, # could be omitted, defaults to 5439
10 schema="public",
11)
12
13table = redshift.table("user", cursor="timestamp")
14
15@source(table, disorder="14d", cdc="append")
16@dataset
17class UserClick:
18 uid: int
19 ad_id: int
20 timestamp: datetime
python
1{
2 "Version": "2012-10-17",
3 "Statement": [
4 {
5 "Effect": "Allow",
6 "Action": "redshift:DescribeClusters",
7 "Resource": "*"
8 },
9 {
10 "Effect": "Allow",
11 "Action": [
12 "redshift:ModifyClusterIamRoles",
13 "redshift:CreateCluster"
14 ],
15 "Resource": [
16 # Redshift workgroup ARN
17 "arn:aws:redshift-serverless:us-west-2:82448945123:workgroup/0541e0ae-2ad1-4fe0-b2f3-4d6c1d3453e"
18 ]
19 },
20 {
21 "Effect": "Allow",
22 "Action": "iam:PassRole",
23 "Resource": [
24 # ARN of role created above
25 "arn:aws:iam::82448945123:role/RedshiftS3AccessRole",
26 ]
27 }
28 ]
29}
JSON
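For completeness, a hedged sketch of the username/password flavor of the same connector, using the parameter names listed above (omit s3_access_role_arn in this mode; the connector name and environment variable names are placeholders):
from fennel.connectors import Redshift

redshift = Redshift(
    name="my_redshift_pwd",
    db_name=os.environ["DB_NAME"],
    username=os.environ["REDSHIFT_USERNAME"],
    password=os.environ["REDSHIFT_PASSWORD"],
    host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
    port=5439,  # could be omitted, defaults to 5439
    schema="public",
)
python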
S3
Data connector to source data from S3.
Account Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
Default: None
AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.
Default: None
AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.
Default: None
Role ARN to assume to get access to S3. This field is not required if role-based access is used or if AWS access and secret keys are used or if the bucket is public.
Bucket Parameters
The name of the S3 bucket where the data files exist.
Default: None
The prefix of the bucket (as relative path within bucket) where the data files
exist. For instance, some-folder/
or A/B/C
are all valid prefixes. Prefix
can not have any wildcard characters.
Exactly one of prefix
or path
must be provided.
Default: None
A /
delimited path (relative to the bucket) describing the objects to be
ingested. The valid path parts are:
- static string of alphanumeric characters, underscores, hyphens or dots.
- * wild card - this must be the entire path part: */* is valid but foo*/ is not.
- string with a strftime format specifier (e.g yyyymmdd=%Y%m%d)
If you have a large volume of data or objects and your bucket is time partitioned,
it's highly recommended to include details of time partitioning in your path instead
of providing *
- Fennel can use this information to optimize the ingestion.
For example, if your bucket has the structure orders/{country}/date={date}/store={store}/{file}.json
, provide the path orders/*/date=%Y%m%d/*/*
Exactly one of prefix
or path
must be provided.
Default: ,
The character delimiting individual cells in the CSV data - only relevant when
format is CSV
, otherwise it's ignored.
The default value is "," and can be overridden by any other 1-character string. For example, to use tab-delimited data enter "\t".
Default: None
Relevant only when using path
with strftime specifiers.
To do incremental ingestion of data from a time partitioned S3 bucket, Fennel needs to know what time ranges may be present in any given partition. While not common, sometimes the timestamp field used to time-partition data in your S3 may be different from the field that you want to be the "official" timestamp field of the Fennel dataset.
In such cases, it's possible that a bucket corresponding to say month=01 contains rows where the value of the timestamp field is outside of month=01.
spread
is a measure of how wide this gap can be. More formally, spread
indicates the maximum difference between the partition interval and
the value of the timestamp field for data in that partition. A None
value indicates no spread,
which is the case when the partitioning scheme uses the same timestamp values as the dataset's
timestamp
column. spread
is specified using Fennel's Duration type.
Examples:
- Given a path txns/date=20240207/hh=06/ and spread=None, fennel expects all data under this path to have timestamp between 2024-02-07 06:00:00 and 2024-02-07 07:00:00
- Given a path txns/date=20240207/hh=06/ and spread="3d", fennel expects all data under this path to have a timestamp between 2024-02-04 06:00:00 and 2024-02-10 07:00:00
- Given a path txns/date=20240207/ and spread="6h", fennel expects all data under this path to have a timestamp between 2024-02-06 18:00:00 and 2024-02-08 06:00:00
1from fennel.connectors import source, S3
2from fennel.datasets import dataset, field
3
4s3 = S3(
5 name="mys3",
6 aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
7 aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
8)
9
10@source(
11 s3.bucket("datalake", prefix="user"),
12 every="1h",
13 disorder="14d",
14 cdc="upsert",
15)
16@dataset
17class User:
18 uid: int = field(key=True)
19 email: str
20 timestamp: datetime
python
1from fennel.connectors import source, S3
2from fennel.datasets import dataset, field
3
4s3 = S3(
5 name="my_s3",
6 aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
7 aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
8)
9
10bucket = s3.bucket(
11 "data", path="user/*/date-%Y-%m-%d/*", format="parquet", spread="2d"
12)
13
14@source(bucket, disorder="14d", cdc="upsert", every="1h")
15@dataset
16class User:
17 uid: int = field(key=True)
18 email: str
19 timestamp: datetime
python
Errors
The Fennel server tries to do some lightweight operations on the bucket during the commit operation - all connectivity or authentication related errors should be caught during the commit itself.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in S3 can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Enabling IAM Access
Fennel creates a role with name prefixed by FennelDataAccessRole-
in
your AWS account for role-based access. In order to use IAM access for S3, please ensure that this role has access to read and list files on the buckets of interest.
With that ready, simply don't specify aws_access_key_id
and
aws_secret_access_key
and Fennel will automatically fall back to IAM based
access.
In case you do not want to provide S3 access to FennelDataAccessRole-, pass the role_arn parameter inside the connector params and make sure FennelDataAccessRole- can assume that IAM role.
- Fennel uses the file_last_modified property exported by S3 to track what data has been seen so far and hence a cursor field doesn't need to be specified.
- Fennel supports the role_arn parameter only for CSV, JSON, Parquet and Avro data formats. In case you require support for Hudi or Deltalake format, please reach out to Fennel support.
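To make the two role-based options above concrete, here is a minimal sketch (the connector names and role ARN are placeholders, not from the original examples):
from fennel.connectors import S3

# IAM-based access: omit the AWS keys and Fennel falls back to the
# FennelDataAccessRole- role.
s3_iam = S3(name="my_s3_iam")

# Alternatively, pass an explicit role_arn that FennelDataAccessRole- can assume.
s3_assumed = S3(
    name="my_s3_assumed",
    role_arn="arn:aws:iam::123456789012:role/MyS3ReadRole",
)
python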
Secret
Secret can be used to pass sensitive information like username/password to Fennel using a Secrets Manager secret reference.
In order to use Secret, one of the below should be followed:
- The Fennel Data access role should be given access to the secret.
- Or a new role can be created with access to the needed secrets, and the Fennel Data access role can be added as a trusted entity for that new role, so that the new role can be assumed to access the secrets.
Parameters
The ARN of the secret.
The Optional ARN of the role to be assumed to access the secret. This should be provided if a new role is created for Fennel Data access role to assume.
1from fennel.connectors import source, Kafka, Avro
2from fennel.datasets import dataset, field
3from fennel.integrations.aws import Secret
4
5aws_secret = Secret(
6 arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
7 role_arn="arn:aws:iam::123456789012:role/secret-access-role",
8)
9
10# secret with above arn has content like below
11# {
12# "kafka": {
13# "username": "actual-kafka-username",
14# "password": "actual-kafka-password"
15# },
16# "schema_registry": {
17# "username": "actual-schema-registry-username",
18# "password": "actual-schema-registry-password"
19# }
20# }
21
22kafka = Kafka(
23 name="my_kafka",
24 bootstrap_servers="localhost:9092", # could come via os env var too
25 security_protocol="SASL_PLAINTEXT",
26 sasl_mechanism="PLAIN",
27 sasl_plain_username=aws_secret["kafka"]["username"],
28 sasl_plain_password=aws_secret["kafka"]["password"],
29)
30avro = Avro(
31 registry="confluent",
32 url=os.environ["SCHEMA_REGISTRY_URL"],
33 username=aws_secret["schema_registry"]["username"],
34 password=aws_secret["schema_registry"]["password"],
35)
36
37@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
38@dataset
39class SomeDataset:
40 uid: int = field(key=True)
41 email: str
42 timestamp: datetime
python
1{
2 "Version": "2012-10-17",
3 "Statement": [
4 {
5 "Sid": "VisualEditor0",
6 "Effect": "Allow",
7 "Action": [
8 "secretsmanager:GetResourcePolicy",
9 "secretsmanager:GetSecretValue",
10 "secretsmanager:DescribeSecret",
11 "secretsmanager:ListSecretVersionIds"
12 ],
13 "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-secret-name-I4hSKr"
14 }
15 ]
16}
JSON
1{
2 "Version": "2012-10-17",
3 "Statement": [
4 {
5 "Effect": "Allow",
6 "Principal": {
7 "AWS": [
8 "arn:aws:iam::123456789012:role/FennelDataAccessRole"
9 ]
10 },
11 "Action": "sts:AssumeRole"
12 }
13 ]
14}
JSON
Snowflake
Data connector to Snowflake databases.
Database Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
Snowflake account identifier. This is the first part of the URL used to access
Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com
,
then the account is <account>
.
This is usually of the form <ORG_ID>-<ACCOUNT_ID>
. Refer to the
Snowflake documentation
to find the account identifier.
The role that should be used by Fennel to access Snowflake.
The warehouse that should be used to access Snowflake.
The name of the database where the relevant data resides.
The schema where the required data table(s) resides.
The username which should be used to access Snowflake. This username should
have required permissions to assume the provided role
.
The password associated with the username.
Table Parameters
The name of the table within the database that should be ingested.
The name of the field in the table that acts as cursor
for ingestion i.e.
a field that is approximately monotonic and only goes up with time.
Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder}
to get data it hasn't seen before. Auto increment IDs or timestamps corresponding
to modified_at
(vs created_at
unless the field doesn't change) are good
contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
1from fennel.connectors import source, Snowflake
2from fennel.datasets import dataset
3
4snowflake = Snowflake(
5 name="my_snowflake",
6 account="VPECCVJ-MUB03765",
7 warehouse="TEST",
8 db_name=os.environ["DB_NAME"],
9 schema="PUBLIC",
10 role="ACCOUNTADMIN",
11 username=os.environ["SNOWFLAKE_USERNAME"],
12 password=os.environ["SNOWFLAKE_PASSWORD"],
13)
14
15table = snowflake.table("User", cursor="timestamp")
16
17@source(table, disorder="14d", cdc="append")
18@dataset
19class UserClick:
20 uid: int
21 ad_id: int
22 timestamp: datetime
python
Errors
Fennel tries to test the connection with your Snowflake during commit
itself so any
connectivity issue (e.g. wrong host name, username, password etc) is flagged
as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Snowflake is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Webhook
A push-based data connector, making it convenient for sending arbitrary JSON data to Fennel. Data can be pushed to a webhook endpoint either via the REST API or via the Python SDK.
Source Parameters
A name to identify the source. This name should be unique across all Fennel sources.
Default: 14d
Data sent to webhook is buffered for the duration retention
. That is, if the
data has been logged to a webhook, datasets defined later that source from this
webhook will still see that data until this duration.
Connector Parameters
The endpoint for the given webhook to which the data will be sent.
A single webhook could be visualized as a single Kafka cluster with each endpoint being somewhat analogous to a topic. A single webhook source can have as many endpoints as required.
Multiple datasets could be reading from the same webhook endpoint - in which case, they all get the exact same data.
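For instance, a minimal sketch of two datasets (names are illustrative) sourcing from the same endpoint and therefore receiving identical data:
from datetime import datetime

from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field

webhook = Webhook(name="prod_webhook")

# Both datasets source from the same "User" endpoint and therefore receive
# the exact same rows; the dataset names are illustrative.
@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class UserEmails:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class UserEmailsCopy:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python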
1from fennel.connectors import source, Webhook
2from fennel.datasets import dataset, field
3
4webhook = Webhook(name="prod_webhook", retention="14d")
5
6@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
7@dataset
8class User:
9 uid: int = field(key=True)
10 email: str
11 timestamp: datetime
12
13@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
14@dataset
15class Transaction:
16 txid: int
17 uid: int
18 amount: float
19 timestamp: datetime
python
1df = pd.DataFrame(
2 {
3 "uid": [1, 2, 3],
4 "email": ["[email protected]", "[email protected]", "[email protected]"],
5 "timestamp": [
6 datetime.now(timezone.utc),
7 datetime.now(timezone.utc),
8 datetime.now(timezone.utc),
9 ],
10 }
11)
12client.log("prod_webhook", "User", df)
python
1import requests
2
3url = "{}/api/v1/log".format(os.environ["FENNEL_SERVER_URL"])
4headers = {"Content-Type": "application/json"}
5data = [
6 {
7 "uid": 1,
8 "email": "[email protected]",
9 "timestamp": 1614556800,
10 },
11 {
12 "uid": 2,
13 "email": "[email protected]",
14 "timestamp": 1614556800,
15 },
16]
17req = {
18 "webhook": "prod_webhook",
19 "endpoint": "User",
20 "data": data,
21}
22requests.post(url, headers=headers, json=req)
python
Errors
Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Unlike all other sources, Webhook does work with the mock client. As a result, it's very effective for quick prototyping and unit testing.
Certificate
Certificate to be used for HTTP-based authentication
Parameters
CA Certificate required for client to authenticate the server
1from fennel.connectors import sink, HTTP, Certificate
2from fennel.integrations.aws import Secret
3
4aws_secret = Secret(
5 arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
6 role_arn="arn:aws:iam::123456789012:role/secret-access-role",
7)
8
9http = HTTP(
10 name="http",
11 host="http://http-echo-server.harsha.svc.cluster.local:8081/",
12 healthz="/health",
13 ca_cert=Certificate(aws_secret["ca_cert"]),
14)
15
16@dataset
17@sink(
18 http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
19 cdc="debezium",
20 how="incremental",
21)
22class SomeDatasetFiltered:
23 uid: int = field(key=True)
24 email: str
25 timestamp: datetime
26
27 @pipeline
28 @inputs(SomeDataset)
29 def gmail_filtered(cls, dataset: Dataset):
30 return dataset.filter(
31 lambda row: row["email"].contains("gmail.com")
32 )
python
HTTP
Data sink to HTTP endpoints.
Connector Parameters
A name to identify the sink. The name should be unique across all Fennel connectors.
The HTTP host URL. Example: https://127.0.0.1:8081
The health check endpoint to verify the server's availability.
Parameter for certificate-based authentication
HTTP Path Parameters
The specific endpoint where data will be sent
The number of records to include in each request to the endpoint. Default: 100
A map of headers to include with each request
1from fennel.connectors import sink, HTTP, Certificate
2from fennel.integrations.aws import Secret
3
4aws_secret = Secret(
5 arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
6 role_arn="arn:aws:iam::123456789012:role/secret-access-role",
7)
8
9http = HTTP(
10 name="http",
11 host="http://http-echo-server.harsha.svc.cluster.local:8081/",
12 healthz="/health",
13 ca_cert=Certificate(aws_secret["ca_cert"]),
14)
15
16@dataset
17@sink(
18 http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
19 cdc="debezium",
20 how="incremental",
21)
22class SomeDatasetFiltered:
23 uid: int = field(key=True)
24 email: str
25 timestamp: datetime
26
27 @pipeline
28 @inputs(SomeDataset)
29 def gmail_filtered(cls, dataset: Dataset):
30 return dataset.filter(
31 lambda row: row["email"].contains("gmail.com")
32 )
python
Errors
Fennel tries to test the connection with your HTTP sink during commit
itself using the health check endpoint.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
- HTTP sink ensures at least once delivery. To handle duplicates, use the
["payload"]["source"]["fennel"]["partition"]
and ["payload"]["source"]["fennel"]["offset"]
fields in the output (see the sketch below).
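A minimal receiver-side sketch of such deduplication, assuming each delivered record is a JSON object carrying the fields above (the handler and the process function are hypothetical):
# Hypothetical receiver-side dedup for the at-least-once HTTP sink; assumes
# each delivered record carries the partition/offset fields shown above.
seen = set()

def handle_record(record: dict) -> None:
    src = record["payload"]["source"]["fennel"]
    key = (src["partition"], src["offset"])
    if key in seen:
        return  # duplicate delivery - already processed
    seen.add(key)
    process(record)  # `process` is a placeholder for your own handling logic

def process(record: dict) -> None:
    ...
python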
Kafka
Data sink to any data store that speaks the Kafka protocol (e.g. Native Kafka, MSK, Redpanda etc.)
Cluster Parameters
A name to identify the sink. This name should be unique across ALL connectors.
This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster.
Addresses are written as host & port pairs and can be specified either as a
single server (e.g. localhost:9092
) or a comma separated list of several
servers (e.g. localhost:9092,another.host:9092
).
Protocol used to communicate with the brokers.
SASL mechanism to use for authentication.
SASL username.
SASL password.
Topic Parameters
The name of the kafka topic that needs to be sinked.
1from fennel.connectors import sink
2
3@dataset
4@sink(kafka.topic("gmail_filtered"), cdc="debezium")
5class SomeDatasetFiltered:
6 uid: int = field(key=True)
7 email: str
8 timestamp: datetime
9
10 @pipeline
11 @inputs(SomeDataset)
12 def gmail_filtered(cls, dataset: Dataset):
13 return dataset.filter(
14 lambda row: row["email"].contains("gmail.com")
15 )
python
Errors
Fennel server tries to connect with the Kafka broker during the commit
operation
itself to validate connectivity - as a result, incorrect URL/Username/Password
etc will be caught at commit time itself as an error.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
- Fennel supports kafka sink with only the JSON debezium format. Given the ubiquity of debezium connectors, you should be able to further pipe this debezium data from Kafka to your data store of choice. In case you require support for other formats, please reach out to Fennel support.
S3
Data connector to sink data to S3.
Account Parameters
A name to identify the sink. The name should be unique across all Fennel connectors.
Default: None
AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.
Default: None
AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.
Default: None
Role ARN to assume to get access to S3. This field is not required if role-based access is used or if AWS access and secret keys are used or if the bucket is public.
Bucket Parameters
The name of the S3 bucket where the data files have to be sinked.
Default: None
The prefix of the bucket (as relative path within bucket) where the data files
should be sinked. For instance, some-folder/
or A/B/C
are all valid prefixes. Prefix
can not have any wildcard characters.
Default: ,
The character delimiting individual cells in the CSV data - only relevant when
format is CSV
, otherwise it's ignored.
The default value is ","
and can be overridden by any other 1-character string. For
example, to use tab-delimited data enter "\t"
.
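For example, a tab-delimited CSV sink bucket might look roughly like the snippet below (the delimiter keyword name is assumed from the parameter description above):
# Hypothetical: CSV sink bucket with a tab delimiter; the `delimiter` kwarg
# name is assumed from the parameter description above. `s3` refers to an S3
# connector object defined elsewhere, as in the example below.
bucket = s3.bucket("datalake", prefix="user", format="csv", delimiter="\t")
python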
1from fennel.connectors import sink
2
3@dataset
4@sink(
5 s3.bucket("datalake", prefix="user", format="delta"),
6 every="1d",
7 how="incremental",
8 renames={"uid": "new_uid"},
9)
10class SomeDatasetFiltered:
11 uid: int = field(key=True)
12 email: str
13 timestamp: datetime
14
15 @pipeline
16 @inputs(SomeDataset)
17 def gmail_filtered(cls, dataset: Dataset):
18 return dataset.filter(
19 lambda row: row["email"].contains("gmail.com")
20 )
python
Errors
Fennel server tries to do some lightweight operations on the bucket during the commit operation - all connectivity or authentication related errors should be caught during the commit itself.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
Enabling IAM Access
Fennel creates a role with name prefixed by FennelDataAccessRole-
in
your AWS account for role-based access. In order to use IAM access for S3, please
ensure that this role has access to read and do list files on the buckets of
interest.
With that ready, simply don't specify aws_access_key_id
and
aws_secret_access_key
and Fennel will automatically fall back to IAM based
access.
In case you do not want to provide S3 access to FennelDataAccessRole-
, pass role_arn
parameter inside connector params and make sure FennelDataAccessRole-
can assume that IAM role.
- Fennel appends a generated suffix to the above provided prefix to avoid ambiguities when the sink dataset version is updated or when multiple branches have the same sink defined. This suffix can be found in the 'Sink' tab of the console after initiating a data sink.
- A keyless dataset sink ensures at least once delivery, while a keyed dataset sink guarantees
exactly once delivery. For keyless datasets, use the
__fennel_hash__
column to identify and filter out duplicate deliveries (see the sketch below).
- Fennel supports S3 sink with only delta format and access to S3 through
FennelDataAccessRole-
. In case you require support for other formats or access mechanisms, please reach out to Fennel support.
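As a rough sketch of consumer-side deduplication for a keyless dataset sink (assuming the sinked rows have already been loaded into a pandas DataFrame):
import pandas as pd

# Illustrative: `df` holds rows read back from the S3 sink of a keyless
# dataset. Dropping duplicate __fennel_hash__ values filters out repeated
# at-least-once deliveries.
def dedup_deliveries(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop_duplicates(subset=["__fennel_hash__"])
python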
Snowflake
Data sink to Snowflake databases.
Database Parameters
A name to identify the sink. The name should be unique across all Fennel connectors.
Snowflake account identifier. This is the first part of the URL used to access
Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com
,
then the account is <account>
.
This is usually of the form <ORG_ID>-<ACCOUNT_ID>
. Refer to the
Snowflake documentation
to find the account identifier.
The role that should be used by Fennel to access Snowflake.
The warehouse that should be used to access Snowflake.
The name of the database where the data has to be sinked.
The schema where the required data has to be sinked.
The username which should be used to access Snowflake. This username should
have required permissions to assume the provided role
.
The password associated with the username.
Table Parameters
The prefix of the table within the database to which the data should be sinked.
1from fennel.connectors import sink
2
3@dataset
4@sink(
5 snowflake.table("test_table"),
6 every="1d",
7 how="incremental",
8 renames={"uid": "new_uid"},
9)
10class SomeDatasetFiltered:
11 uid: int = field(key=True)
12 email: str
13 timestamp: datetime
14
15 @pipeline
16 @inputs(SomeDataset)
17 def gmail_filtered(cls, dataset: Dataset):
18 return dataset.filter(
19 lambda row: row["email"].contains("gmail.com")
20 )
python
Errors
Fennel tries to test the connection with your Snowflake during commit
itself so any
connectivity issue (e.g. wrong host name, username, password etc) is flagged
as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
- Fennel appends a generated suffix to the provided table name to prevent ambiguities when the sink dataset version is updated or when multiple branches have the same sink defined. This suffix can be viewed in the 'Sink' tab of the console after initiating a data sink.
- A keyless dataset sink ensures at least once delivery, while a keyed dataset sink guarantees
exactly once delivery. For keyless datasets, use the
__fennel_hash__
column to identify and filter out duplicate deliveries.
Average
Aggregation to compute a rolling average for each group within a window.
Parameters
Name of the field in the input dataset over which the average should be computed.
This field must either be of type int
or float
.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type float
.
Average over an empty set of rows isn't well defined - Fennel returns default
in such cases.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Average,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 amt: int
19 timestamp: datetime
20
21@dataset(index=True)
22class Aggregated:
23 uid: int = field(key=True)
24 avg_1d: float
25 avg_1w: float
26 timestamp: datetime
27
28 @pipeline
29 @inputs(Transaction)
30 def avg_pipeline(cls, ds: Dataset):
31 return ds.groupby("uid").aggregate(
32 avg_1d=Average(of="amt", window=Continuous("1d"), default=-1.0),
33 avg_1w=Average(of="amt", window=Continuous("1w"), default=-1.0),
34 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output
dataset. If there are no rows in the aggregation window, default
is used.
Errors
The input column denoted by of
must either be of int
or float
types.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
The type of the field denoted by into_field
in the output dataset and that of
default
should both be float
.
1from fennel.datasets import dataset, field, pipeline, Dataset, Average
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 zip: str
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 avg_1d: str
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def invalid_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 avg_1d=Average(
26 of="zip", window=Continuous("1d"), default="avg"
27 ),
28 )
python
1from fennel.datasets import dataset, field, pipeline, Dataset, Average
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amt: float
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 # output of avg is always float
19 ret: int
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def invalid_pipeline(cls, ds: Dataset):
25 return ds.groupby("uid").aggregate(
26 ret=Average(of="amt", window=Continuous("1d"), default=1.0),
27 )
python
Count
Aggregation to compute a rolling count for each group within a window.
Parameters
The continuous window within which something needs to be counted. Possible values
are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type int
.
Default: False
If set to True, the aggregation counts the number of unique values of the field
given by of
(aka COUNT DISTINCT
in SQL).
Default: False
If set to True, the count isn't exact but only an approximation. This field must
be set to True if and only if unique
is set to True.
Fennel uses hyperloglog data structure to compute unique approximate counts and in practice, the count is exact for small counts.
Name of the field in the input dataset which should be used for unique
. Only
relevant when unique
is set to True.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Count,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 vendor: str
19 amount: int
20 timestamp: datetime
21
22@dataset(index=True)
23class Aggregated:
24 uid: int = field(key=True)
25 num_transactions: int
26 unique_vendors_1w: int
27 timestamp: datetime
28
29 @pipeline
30 @inputs(Transaction)
31 def count_pipeline(cls, ds: Dataset):
32 return ds.groupby("uid").aggregate(
33 num_transactions=Count(window=Continuous("forever")),
34 unique_vendors_1w=Count(
35 of="vendor",
36 unique=True,
37 approx=True,
38 window=Continuous("1w"),
39 ),
40 )
python
Returns
Accumulates the count in the appropriate field of the output dataset. If there are no rows to count, by default, it returns 0.
Errors
The input column denoted by of
must have a hashable type in order to build a
hyperloglog. For instance, float
or types built on float
aren't allowed.
As of right now, it's a commit error to try to compute unique count without setting
approx
to True.
Maintaining unique counts is substantially more costly than maintaining non-unique counts so use it only when truly needed.
Distinct
Aggregation to compute a set of distinct values for each group within a window.
Parameters
Name of the field in the input dataset over which the distinct set should be computed. This field must be of any hashable type (e.g. floats aren't allowed).
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type List[T]
where T
is the type
of the field denoted by of
.
If set to True, the values are stored without any sorting by natural comparison
order. As of right now, this must be set to True since ordered
mode isn't supported yet.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Distinct,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 amount: int
19 timestamp: datetime
20
21@dataset(index=True)
22class Aggregated:
23 uid: int = field(key=True)
24 amounts: List[int]
25 timestamp: datetime
26
27 @pipeline
28 @inputs(Transaction)
29 def distinct_pipeline(cls, ds: Dataset):
30 return ds.groupby("uid").aggregate(
31 amounts=Distinct(
32 of="amount",
33 window=Continuous("1d"),
34 unordered=True,
35 ),
36 )
python
Returns
Stores the result of the aggregation in the appropriate column of the output
dataset which must be of type List[T]
where T
is the type of the input column.
Errors
Distinct operator is a lot like building a hashmap - for it to be valid, the
underlying data must be hashable. Types like float
(or any other complex type
built using float
) aren't hashable - so a commit error is raised.
Storing the full set of distinct values can get costly so it's recommended to use
Distinct
only for sets of small cardinality (say < 100).
1from fennel.datasets import dataset, field, pipeline, Dataset, Distinct
2from fennel.lib import inputs
3from fennel.connectors import source, Webhook
4
5webhook = Webhook(name="webhook")
6
7@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
8@dataset
9class Transaction:
10 uid: int
11 amount: int
12 timestamp: datetime
13
14@dataset
15class Aggregated:
16 uid: int = field(key=True)
17 amounts: int # should be List[int]
18 timestamp: datetime
19
20 @pipeline
21 @inputs(Transaction)
22 def bad_pipeline(cls, ds: Dataset):
23 return ds.groupby("uid").aggregate(
24 amounts=Distinct(
25 of="amount",
26 limit=10,
27 unordered=True,
28 ),
29 )
python
LastK
Aggregation to compute a rolling list of the latest values for each group within a window.
Parameters
Name of the field in the input dataset over which the aggregation should be computed.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type List[T]
where T
is the type
of the field denoted by of
.
Since storing all the values for a group can get costly, LastK expects a
limit
to be specified which denotes the maximum size of the list that should
be maintained at any point.
If set to True, only distinct values are stored; otherwise, the stored list can contain duplicates too.
1from fennel.datasets import dataset, field, pipeline, Dataset, LastK
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset(index=True)
16class Aggregated:
17 uid: int = field(key=True)
18 amounts: List[int]
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def lastk_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 amounts=LastK(
26 of="amount",
27 limit=10,
28 dedup=False,
29 window=Continuous("1d"),
30 ),
31 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output dataset.
Errors
The column denoted by into_field
in the output dataset must be of type List[T]
where T is the type of the column denoted by of
in the input dataset. Commit error
is raised if this is not the case.
Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.
1from fennel.datasets import dataset, field, pipeline, Dataset, LastK
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 amounts: int # should be List[int]
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def bad_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 amounts=LastK(
26 of="amount",
27 limit=10,
28 dedup=False,
29 window=Continuous("1d"),
30 ),
31 )
python
FirstK
Aggregation to compute a rolling list of the earliest values for each group within a window.
Parameters
Name of the field in the input dataset over which the aggregation should be computed.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type List[T]
where T
is the type
of the field denoted by of
.
Since storing all the values for a group can get costly, FirstK expects a
limit
to be specified which denotes the maximum size of the list that should
be maintained at any point.
If set to True, only distinct values are stored; otherwise, the stored list can contain duplicates too.
If set to True, None values are dropped from the result. It expects of
field
to be of type Optional[T]
and into_field
gets the type List[T]
.
1from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset(index=True)
16class Aggregated:
17 uid: int = field(key=True)
18 amounts: List[int]
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def firstk_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 amounts=FirstK(
26 of="amount",
27 limit=10,
28 dedup=False,
29 window=Continuous("1d"),
30 dropnull=False,
31 ),
32 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output dataset.
Errors
The column denoted by into_field
in the output dataset must be of type List[T]
where T is the type of the column denoted by of
in the input dataset. Commit error
is raised if this is not the case.
Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.
1from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amount: int
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 amounts: int # should be List[int]
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def bad_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 amounts=FirstK(
26 of="amount",
27 limit=10,
28 dedup=False,
29 window=Continuous("1d"),
30 dropnull=False,
31 ),
32 )
python
Max
Aggregation to compute a rolling max for each group within a window.
Parameters
Name of the field in the input dataset over which the max should be computed.
This field must either be of type int
, float
, date
or datetime
.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type int
, float
, date
or
datetime
- same as the type of the field in the input dataset corresponding to
of
.
Max over an empty set of rows isn't well defined - Fennel returns default
in such cases. The type of default
must be same as that of of
in the input
dataset.
1from fennel.datasets import dataset, field, Dataset, pipeline, Max
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amt: float
13 timestamp: datetime
14
15@dataset(index=True)
16class Aggregated:
17 uid: int = field(key=True)
18 max_1d: float
19 max_1w: float
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def def_pipeline(cls, ds: Dataset):
25 return ds.groupby("uid").aggregate(
26 max_1d=Max(of="amt", window=Continuous("1d"), default=-1.0),
27 max_1w=Max(of="amt", window=Continuous("1w"), default=-1.0),
28 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output
dataset. If there are no rows in the aggregation window, default
is used.
Errors
The input column denoted by of
must be of int
, float
, date
or datetime
types.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
The type of the field denoted by into_field
in the output dataset and that of
default
should be same as that of the field denoted by of
in the
input dataset.
1from fennel.datasets import dataset, field, Dataset, pipeline, Max
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 zip: str
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 max: str
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def def_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 max=Max(of="zip", window=Continuous("1d"), default="max"),
26 )
python
1from fennel.datasets import dataset, field, Dataset, Max, pipeline
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amt: float
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 max_1d: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def def_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 max_1d=Max(of="amt", window=Continuous("1d"), default=1),
26 )
python
Min
Aggregation to compute a rolling min for each group within a window.
Parameters
Name of the field in the input dataset over which the min should be computed.
This field must either be of type int
, float
, date
or datetime
.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type int
, float
, date
or
datetime
- same as the type of the field in the input dataset corresponding to
of
.
Min over an empty set of rows isn't well defined - Fennel returns default
in such cases. The type of default
must be same as that of of
in the input
dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset, Min
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amt: float
13 timestamp: datetime
14
15@dataset(index=True)
16class Aggregated:
17 uid: int = field(key=True)
18 min_1d: float
19 min_1w: float
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def min_pipeline(cls, ds: Dataset):
25 return ds.groupby("uid").aggregate(
26 min_1d=Min(of="amt", window=Continuous("1d"), default=-1.0),
27 min_1w=Min(of="amt", window=Continuous("1w"), default=-1.0),
28 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output
dataset. If there are no rows in the aggregation window, default
is used.
Errors
The input column denoted by of
must be of int
, float
, date
or datetime
types.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
The type of the field denoted by into_field
in the output dataset and that of
default
should be same as that of the field denoted by of
in the
input dataset.
1from fennel.datasets import dataset, field, pipeline, Dataset, Min
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 zip: str
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 min: str
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def invalid_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 min=Min(of="zip", window=Continuous("1d"), default="min"),
26 )
python
1from fennel.datasets import dataset, field, pipeline, Dataset, Min
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
9@dataset
10class Transaction:
11 uid: int
12 amt: float
13 timestamp: datetime
14
15@dataset
16class Aggregated:
17 uid: int = field(key=True)
18 min_1d: int
19 timestamp: datetime
20
21 @pipeline
22 @inputs(Transaction)
23 def invalid_pipeline(cls, ds: Dataset):
24 return ds.groupby("uid").aggregate(
25 min_1d=Min(of="amt", window=Continuous("1d"), default=1),
26 )
python
Stddev
Aggregation to compute a rolling standard deviation for each group within a window.
Parameters
Name of the field in the input dataset over which the aggregation should be
computed. This field must either be of type int
or float
.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type float
.
Standard deviation over an empty set of rows isn't well defined - Fennel
returns default
in such cases.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Average,
7 Stddev,
8)
9from fennel.dtypes import Continuous
10from fennel.lib import inputs
11from fennel.connectors import source, Webhook
12
13webhook = Webhook(name="webhook")
14
15@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
16@dataset
17class Transaction:
18 uid: int
19 amt: int
20 timestamp: datetime
21
22@dataset(index=True)
23class Aggregated:
24 uid: int = field(key=True)
25 mean: float
26 stddev: float
27 timestamp: datetime
28
29 @pipeline
30 @inputs(Transaction)
31 def stddev_pipeline(cls, ds: Dataset):
32 return ds.groupby("uid").aggregate(
33 mean=Average(of="amt", window=Continuous("1d"), default=-1.0),
34 stddev=Stddev(of="amt", window=Continuous("1d"), default=-1.0),
35 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output
dataset. If there are no rows in the aggregation window, default
is used.
Errors
The input column denoted by of
must either be of int
or float
types.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
The type of the field denoted by into_field
in the output dataset and that of
default
should both be float
.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Stddev,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 zip: str
19 timestamp: datetime
20
21@dataset
22class Aggregated:
23 uid: int = field(key=True)
24 var: str
25 timestamp: datetime
26
27 @pipeline
28 @inputs(Transaction)
29 def invalid_pipeline(cls, ds: Dataset):
30 return ds.groupby("uid").aggregate(
31 var=Stddev(of="zip", window=Continuous("1d"), default="x"),
32 )
python
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Stddev,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 amt: float
19 timestamp: datetime
20
21@dataset
22class Aggregated:
23 uid: int = field(key=True)
24 ret: int
25 timestamp: datetime
26
27 @pipeline
28 @inputs(Transaction)
29 def invalid_pipeline(cls, ds: Dataset):
30 return ds.groupby("uid").aggregate(
31 ret=Stddev(of="amt", window=Continuous("1d"), default=1.0),
32 )
python
Sum
Aggregation to compute a rolling sum for each group within a window.
Parameters
Name of the field in the input dataset over which the sum should be computed.
This field must either be of type int
or float
.
The continuous window within which aggregation needs to be computed. Possible values
are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type int
or float
- same as the
type of the field in the input dataset corresponding to of
.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Sum,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 amount: int
19 timestamp: datetime
20
21@dataset(index=True)
22class Aggregated:
23 uid: int = field(key=True)
24 # new int fields added to the dataset by the sum aggregation
25 amount_1w: int
26 total: int
27 timestamp: datetime
28
29 @pipeline
30 @inputs(Transaction)
31 def sum_pipeline(cls, ds: Dataset):
32 return ds.groupby("uid").aggregate(
33 amount_1w=Sum(of="amount", window=Continuous("1w")),
34 total=Sum(of="amount", window=Continuous("forever")),
35 )
python
Returns
Accumulates the sum in the appropriate field of the output dataset. If there
are no rows to sum, by default, it returns 0 (or 0.0 if of
is float).
Errors
The input column denoted by of
must either be of int
or float
types.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
1from fennel.datasets import dataset, field, pipeline, Dataset, Sum
2from fennel.dtypes import Continuous
3from fennel.lib import inputs
4from fennel.connectors import source, Webhook
5
6webhook = Webhook(name="webhook")
7
8@source(
9 webhook.endpoint("Transaction"), disorder="14d", cdc="append"
10)
11@dataset
12class Transaction:
13 uid: int
14 amount: str
15 vendor: str
16 timestamp: datetime
17
18@dataset
19class Aggregated:
20 uid: int = field(key=True)
21 total: int
22 timestamp: datetime
23
24 @pipeline
25 @inputs(Transaction)
26 def bad_pipeline(cls, ds: Dataset):
27 return ds.groupby("uid").aggregate(
28 total=Sum(of="vendor", window=Continuous("forever")),
29 )
python
Quantile
Aggregation to compute rolling quantiles (aka percentiles) for each group within a window.
Parameters
Name of the field in the input dataset over which the quantile should be computed.
This field must either be of type int
or float
.
The continuous window within which aggregation needs to be computed. Possible
values are "forever"
or any time duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type Optional[float]
unless default
is provided, in which case, it is expected to be of type float
.
Quantile over an empty set of rows isn't well defined - Fennel returns default
in such cases. If the default is not set or is None, Fennel returns None and in
that case, the expected type of into_field
must be Optional[float]
.
The percentile (between 0 and 1) to be calculated.
Default: False
If set to True, the calculated value isn't exact but only an approximation. Fennel only supports approximate quantiles for now so this kwarg must always be set to True.
Fennel uses uDDsketch data structure to compute approximate quantiles with an error bound set to be within 1% of the true value.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 Quantile,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 amount: int
19 timestamp: datetime
20
21@dataset(index=True)
22class Aggregated:
23 uid: int = field(key=True)
24 # new float fields added to the dataset by the quantile aggregation
25 median_amount_1w: float
26 timestamp: datetime
27
28 @pipeline
29 @inputs(Transaction)
30 def quantile_pipeline(cls, ds: Dataset):
31 return ds.groupby("uid").aggregate(
32 median_amount_1w=Quantile(
33 of="amount",
34 window=Continuous("1w"),
35 p=0.5,
36 approx=True,
37 default=0.0,
38 ),
39 )
python
Returns
Stores the result of the aggregation in the appropriate field of the output
dataset. If there are no rows in the aggregation window, default
is used.
Errors
The input column denoted by of
must either be of int
or float
types.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
The type of the field denoted by into_field
in the output dataset should match
the default
. If default
is set and not None, the field should be float
else
it should be Optional[float]
.
Commit error if the value of p
is not between 0 and 1.
Commit error if approx
is not set to True. Fennel only supports approximate
quantiles for now but requires this kwarg to be set explicitly to both set the
right expectations and be compatible with future addition of exact quantiles.
1from fennel.datasets import dataset, field, pipeline
2from fennel.datasets import Dataset, Quantile
3from fennel.dtypes import Continuous
4from fennel.lib import inputs
5from fennel.connectors import source, Webhook
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
10@dataset
11class Transaction:
12 uid: int
13 amount: str
14 timestamp: datetime
15
16@dataset
17class Aggregated:
18 uid: int = field(key=True)
19 median_amount_1w: float
20 timestamp: datetime
21
22 @pipeline
23 @inputs(Transaction)
24 def bad_pipeline(cls, ds: Dataset):
25 return ds.groupby("uid").aggregate(
26 median_amount_1w=Quantile(
27 of="amount",
28 window=Continuous("1w"),
29 p=0.5,
30 approx=True,
31 default=0.0,
32 ),
33 )
python
1from fennel.datasets import dataset, field, pipeline
2from fennel.datasets import Dataset, Quantile
3from fennel.dtypes import Continuous
4from fennel.lib import inputs
5from fennel.connectors import source, Webhook
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
10@dataset
11class Transaction:
12 uid: int
13 amount: int
14 vendor: str
15 timestamp: datetime
16
17@dataset
18class Aggregated:
19 uid: int = field(key=True)
20 median_amount_1w: float
21 timestamp: datetime
22
23 @pipeline
24 @inputs(Transaction)
25 def bad_pipeline(cls, ds: Dataset):
26 return ds.groupby("uid").aggregate(
27 median_amount_1w=Quantile(
28 of="amount",
29 window=Continuous("1w"),
30 p=0.5,
31 approx=True,
32 ),
33 )
python
1from fennel.datasets import dataset, field, pipeline
2from fennel.datasets import Dataset, Quantile
3from fennel.dtypes import Continuous
4from fennel.lib import inputs
5from fennel.connectors import source, Webhook
6
7webhook = Webhook(name="webhook")
8
9@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
10@dataset
11class Transaction:
12 uid: int
13 amount: int
14 vendor: str
15 timestamp: datetime
16
17@dataset
18class Aggregated:
19 uid: int = field(key=True)
20 median_amount_1w: Optional[float]
21 timestamp: datetime
22
23 @pipeline
24 @inputs(Transaction)
25 def bad_pipeline(cls, ds: Dataset):
26 return ds.groupby("uid").aggregate(
27 median_amount_1w=Quantile(
28 of="amount",
29 window=Continuous("1w"),
30 p=10.0,
31 approx=True,
32 ),
33 )
python
Exponential Decay Sum
Aggregation to compute a rolling exponentially decayed sum for each group within a window.
Parameters
Name of the field in the input dataset over which the decayed sum should be computed.
This field must either be of type int
or float
.
The continuous window within which aggregation needs to be computed. Possible values
are "forever"
or any time duration.
Half-life of the exponential decay. This is the time it takes for the value to
decay to half of its original value. The value of half_life
must be greater than
0. The value is of type duration.
The name of the field in the output dataset that should store the result of this
aggregation. This field is expected to be of type float
.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 ExpDecaySum,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
15@dataset
16class Transaction:
17 uid: int
18 amount: int
19 timestamp: datetime
20
21@dataset(index=True)
22class Aggregated:
23 uid: int = field(key=True)
24 # new float fields added to the dataset by the aggregation
25 amount_1w: float
26 total: float
27 timestamp: datetime
28
29 @pipeline
30 @inputs(Transaction)
31 def exp_decay_sum(cls, ds: Dataset):
32 return ds.groupby("uid").aggregate(
33 amount_1w=ExpDecaySum(
34 of="amount", window=Continuous("1w"), half_life="1d"
35 ),
36 total=ExpDecaySum(
37 of="amount",
38 window=Continuous("forever"),
39 half_life="1w",
40 ),
41 )
python
Returns
Accumulates the result in the appropriate field of the output dataset. If there are no rows to aggregate, by default, it returns 0.0.
Errors
The input column denoted by of
must either be of int
or float
types.
The output field denoted by into_field
must always be of type float
.
Note that unlike SQL, even aggregations over Optional[int]
or Optional[float]
aren't allowed.
1from fennel.datasets import (
2 dataset,
3 field,
4 pipeline,
5 Dataset,
6 ExpDecaySum,
7)
8from fennel.dtypes import Continuous
9from fennel.lib import inputs
10from fennel.connectors import source, Webhook
11
12webhook = Webhook(name="webhook")
13
14@source(
15 webhook.endpoint("Transaction"), disorder="14d", cdc="append"
16)
17@dataset
18class Transaction:
19 uid: int
20 amount: str
21 vendor: str
22 timestamp: datetime
23
24@dataset
25class Aggregated:
26 uid: int = field(key=True)
27 total: int
28 timestamp: datetime
29
30 @pipeline
31 @inputs(Transaction)
32 def bad_pipeline(cls, ds: Dataset):
33 return ds.groupby("uid").aggregate(
34 total=ExpDecaySum(
35 of="amount",
36 window=Continuous("forever"),
37 half_life="1w",
38 ),
39 )
python
The value of the decayed sum depends on the time you query the dataset. Hence it varies with request time for the same key in a dataset. Therefore pipelines containing this aggregation are terminal - no other operator can follow it.
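As a rough sketch of these semantics (an assumption based on the half-life definition above, not the exact server implementation), each value is weighted by 0.5 raised to the elapsed time divided by the half-life at query time:
from datetime import datetime, timedelta, timezone

# Illustrative sketch (not the exact server implementation): each value is
# weighted by 0.5 ** (elapsed_time / half_life) at query time, then summed.
def exp_decay_sum(values, timestamps, now, half_life: timedelta) -> float:
    return sum(
        v * 0.5 ** ((now - t) / half_life)
        for v, t in zip(values, timestamps)
    )

now = datetime(2024, 1, 8, tzinfo=timezone.utc)
ts = [now - timedelta(days=1), now - timedelta(days=2)]
# With half_life="1d", the weights are 0.5 and 0.25 respectively.
assert exp_decay_sum([100, 100], ts, now, timedelta(days=1)) == 75.0
python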
Binary Operations
Standard binary operations - arithmetic operations (+
, -
, *
, /
, %
),
relational operations (==
, !=
, >
, >=
, <
, <=
) and logical operations
(&
, |
). Note that logical and
is represented as &
and logical or
is
represented as |
. Bitwise operations aren't yet supported.
Typing Rules
- All arithmetic operations and all comparison operations (i.e.
>
, >=
, <
, <=
) are only permitted on numerical data - ints/floats or options of ints/floats. In such cases, if even one of the operands is of type float, the whole expression is promoted to be of type float.
- Logical operations
&
, |
are only permitted on boolean data.
- If even one of the operands is optional, the whole expression is promoted to optional type.
- None, like many SQL dialects, is interpreted as some unknown value. As a
result,
x
+ None is None for all x
. None values are 'viral' in the sense that they usually make the value of the whole expression None. Some notable exceptions: False & None
is still False and True | None
is still True irrespective of the unknown value that None represents.
1import pandas as pd
2from fennel.expr import lit, col
3
4expr = col("x") + col("y")
5assert expr.typeof(schema={"x": int, "y": int}) == int
6assert expr.typeof(schema={"x": int, "y": float}) == float
7assert expr.typeof(schema={"x": float, "y": float}) == float
8assert (
9 expr.typeof(schema={"x": Optional[float], "y": int}) == Optional[float]
10)
11
12df = pd.DataFrame({"x": [1, 2, None]})
13expr = lit(1) + col("x")
14assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [2, 3, pd.NA]
15
16expr = lit(1) - col("x")
17assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [0, -1, pd.NA]
18
19expr = lit(1) * col("x")
20assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [1, 2, pd.NA]
21
22expr = lit(1) / col("x")
23assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [
24 1,
25 0.5,
26 pd.NA,
27]
python
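A small sketch of the boolean rules above, assuming Optional[bool] columns evaluate analogously to the arithmetic example:
import pandas as pd
from typing import Optional
from fennel.expr import col, lit

# Sketch of the logical-operator rules above; assumes Optional[bool] columns
# evaluate analogously to the arithmetic example.
expr_and = col("x") & lit(False)
expr_or = col("x") | lit(True)

# an optional operand promotes the whole expression to an optional type
assert expr_and.typeof(schema={"x": Optional[bool]}) == Optional[bool]

df = pd.DataFrame({"x": pd.Series([True, False, None], dtype=pd.BooleanDtype())})
# Per the rules above: False & None is still False, True | None is still True.
assert expr_and.eval(df, schema={"x": Optional[bool]}).tolist() == [
    False,
    False,
    False,
]
assert expr_or.eval(df, schema={"x": Optional[bool]}).tolist() == [
    True,
    True,
    True,
]
python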
Col
Function to reference existing columns in the dataframe.
Parameters
The name of the column being referenced. In the case of pipelines, this will typically be the name of the field and in the case of extractors, this will be the name of the feature.
1from fennel.expr import col
2
3expr = col("x") + col("y")
4
5# type of col("x") + col("y") changes based on the type of 'x' and 'y'
6assert expr.typeof(schema={"x": int, "y": float}) == float
7
8# okay if additional columns are provided
9assert expr.typeof(schema={"x": int, "y": float, "z": str}) == float
10
11# raises an error if the schema is not provided
12with pytest.raises(ValueError):
13 expr.typeof(schema={})
14with pytest.raises(ValueError):
15 expr.typeof(schema={"x": int})
16with pytest.raises(ValueError):
17 expr.typeof(schema={"z": int, "y": float})
18
19# can be evaluated with a dataframe
20import pandas as pd
21
22df = pd.DataFrame({"x": [1, 2, 3], "y": [1.0, 2.0, 3.0]})
23assert expr.eval(df, schema={"x": int, "y": float}).tolist() == [
24 2.0,
25 4.0,
26 6.0,
27]
python
Returns
Returns an expression object denoting a reference to the column. The type of the resulting expression is same as that of the referenced column. When evaluated in the context of a dataframe, the value of the expression is same as the value of the dataframe column of that name.
Errors
Error during typeof
or eval
if the referenced column isn't present.
Datetime
Function to get a constant datetime object from its constituent parts.
Parameters
The year of the datetime. Note that this must be an integer, not an expression denoting an integer.
The month of the datetime. Note that this must be an integer, not an expression denoting an integer.
The day of the datetime. Note that this must be an integer, not an expression denoting an integer.
Default: 0
The hour of the datetime. Note that this must be an integer, not an expression denoting an integer.
Default: 0
The minute of the datetime. Note that this must be an integer, not an expression denoting an integer.
Default: 0
The second of the datetime. Note that this must be an integer, not an expression denoting an integer.
Default: 0
The millisecond of the datetime. Note that this must be an integer, not an expression denoting an integer.
Default: 0
The microsecond of the datetime. Note that this must be an integer, not an expression denoting an integer.
Default: UTC
The timezone of the datetime. Note that this must be a string denoting a valid timezone, not an expression denoting a string.
Returns
Returns an expression object denoting the datetime object.
1from fennel.expr import datetime as dt
2
3expr = dt(year=2024, month=1, day=1)
4
5# datetime works for any datetime type or optional datetime type
6assert expr.typeof() == datetime
7
8# can be evaluated with a dataframe
9df = pd.DataFrame({"dummy": [1, 2, 3]})
10assert expr.eval(df, schema={"dummy": int}).tolist() == [
11 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
12 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14]
15# can provide timezone
16expr = dt(year=2024, month=1, day=1, timezone="US/Eastern")
17assert expr.eval(df, schema={"dummy": int}).tolist() == [
18 pd.Timestamp("2024-01-01 00:00:00", tz="US/Eastern"),
19 pd.Timestamp("2024-01-01 00:00:00", tz="US/Eastern"),
20 pd.Timestamp("2024-01-01 00:00:00", tz="US/Eastern"),
21]
python
Errors
The month must be between 1 and 12, the day must be between 1 and 31, the hour must be between 0 and 23, the minute must be between 0 and 59, the second must be between 0 and 59, the millisecond must be between 0 and 999, and the microsecond must be between 0 and 999.
Timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
Eval
Helper function to evaluate the value of an expression in the context of a schema and a dataframe.
Parameters
The dataframe for which the expression is evaluated - one value is produced for each row in the dataframe.
The schema of the context under which the expression is to be evaluated. In the case of pipelines, this will be the schema of the input dataset and in the case of extractors, this will be the schema of the featureset.
1import pandas as pd
2from fennel.expr import lit, col
3
4expr = lit(1) + col("amount")
5# value of 1 + col('amount') changes based on the type of 'amount'
6df = pd.DataFrame({"amount": [1, 2, 3]})
7assert expr.eval(df, schema={"amount": int}).tolist() == [2, 3, 4]
8
9df = pd.DataFrame({"amount": [1.0, 2.0, 3.0]})
10assert expr.eval(df, schema={"amount": float}).tolist() == [2.0, 3.0, 4.0]
11
12# raises an error if the schema is not provided
13with pytest.raises(TypeError):
14 expr.eval(df)
15
16# dataframe doesn't have the required column even though schema is provided
17df = pd.DataFrame({"other": [1, 2, 3]})
18with pytest.raises(Exception):
19 expr.eval(df, schema={"amount": int})
python
Returns
Returns a series object of the same length as the number of rows in the input dataframe.
Errors
All columns referenced by col
expression must be present in both the
dataframe and the schema.
The expression should be valid in the context of the given schema.
Some expressions may produce a runtime error e.g. trying to parse an integer out of a string may throw an error if the string doesn't represent an integer.
From Epoch
Function to get a datetime object from a unix timestamp.
Parameters
The duration (in units as specified by unit
) since epoch to convert to a datetime
in the form of an expression denoting an integer.
Default: second
The unit of the duration
parameter. Can be one of second
, millisecond
,
or microsecond
. Defaults to second
.
Returns
Returns an expression object denoting the datetime object.
1from fennel.expr import col, from_epoch
2
3expr = from_epoch(col("x"), unit="second")
4
5# from_epoch works for any int or optional int type
6assert expr.typeof(schema={"x": int}) == datetime
7assert expr.typeof(schema={"x": Optional[int]}) == Optional[datetime]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame({"x": [1714857600, 1714857601, 1714857602]})
11schema = {"x": int}
12expected = [
13 pd.Timestamp("2024-05-04 21:20:00", tz="UTC"),
14 pd.Timestamp("2024-05-04 21:20:01", tz="UTC"),
15 pd.Timestamp("2024-05-04 21:20:02", tz="UTC"),
16]
17assert expr.eval(df, schema=schema).tolist() == expected
python
Is Null
Expression equivalent to: x is null
Parameters
The expression that will be checked for nullness.
1from fennel.expr import col
2
3expr = col("x").isnull()
4
5# type of isnull is always boolean
6assert expr.typeof(schema={"x": Optional[int]}) == bool
7
8# also works for non-optional types, where it's always False
9assert expr.typeof(schema={"x": float}) == bool
10
11# raises an error if the schema is not provided
12with pytest.raises(ValueError):
13 expr.typeof(schema={})
14
15# can be evaluated with a dataframe
16import pandas as pd
17
18df = pd.DataFrame({"x": pd.Series([1, 2, None], dtype=pd.Int64Dtype())})
19assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [
20 False,
21 False,
22 True,
23]
python
Returns
Returns an expression object denoting the output of isnull
expression. Always
evaluates to boolean.
Fill Null
The expression that is analogous to fillna
in Pandas.
Parameters
The expression that will be checked for nullness.
The expression that will be substituted in case expr
turns out to be null.
1from fennel.expr import col, lit
2
3expr = col("x").fillnull(lit(10))
4
5# type of fillnull depends both on type of 'x' and the literal 1
6assert expr.typeof(schema={"x": Optional[int]}) == int
7assert expr.typeof(schema={"x": float}) == float
8
9# raises an error if the schema is not provided
10with pytest.raises(ValueError):
11 expr.typeof(schema={})
12
13# can be evaluated with a dataframe
14import pandas as pd
15
16expr = col("x").fillnull(lit(10))
17df = pd.DataFrame({"x": pd.Series([1, 2, None], dtype=pd.Int64Dtype())})
18assert expr.eval(df, schema={"x": Optional[float]}).tolist() == [
19 1.0,
20 2.0,
21 10.0,
22]
python
Returns
Returns an expression object denoting the output of fillnull
expression.
If the expr
is of type Optional[T1]
and the fill
is of type T2
, the
type of the output expression is the smallest type that both T1
and T2
can
be promoted into.
If the expr
is not optional but is of type T, the output is trivially same as
expr
and hence is also of type T
.
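As a quick illustration of these typing rules (a minimal sketch; the assertions simply restate the promotion rule described above):

from typing import Optional
from fennel.expr import col, lit

# filling an Optional[int] with a float literal: int and float promote to float
expr = col("x").fillnull(lit(0.5))
assert expr.typeof(schema={"x": Optional[int]}) == float

# a non-optional input is returned unchanged, so the type stays int
assert expr.typeof(schema={"x": int}) == int
python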
Lit
Fennel's way of describing constants, similar to lit
in Polars or Spark.
Parameters
The literal/constant Python object that is to be used as an expression in Fennel.
This can be used to construct literals of ints, floats, strings, boolean, lists,
structs etc. Notably though, it's not possible to use lit
to build datetime
literals.
1from fennel.expr import lit, col
2
3expr = lit(1)
4
5# lits don't need a schema to be evaluated
6assert expr.typeof() == int
7
8# can be evaluated with a dataframe
9expr = col("x") + lit(1)
10df = pd.DataFrame({"x": pd.Series([1, 2, None], dtype=pd.Int64Dtype())})
11assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [2, 3, pd.NA]
python
Returns
The expression that denotes the literal value.
Not
Logical not unary operator, invoked by ~
symbol.
1from fennel.expr import lit
2
3expr = ~lit(True)
4assert expr.typeof() == bool
5
6# can be evaluated with a dataframe
7df = pd.DataFrame({"x": [1, 2, 3]})
8assert expr.eval(df, schema={"x": int}).tolist() == [False, False, False]
python
Now
Function to get current timestamp, similar to what datetime.now
does in Python.
1from fennel.expr import now, col
2
3expr = now().dt.since(col("birthdate"), "year")
4
5assert (
6 expr.typeof(schema={"birthdate": Optional[datetime]}) == Optional[int]
7)
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {"birthdate": [datetime(1997, 12, 24), datetime(2001, 1, 21), None]}
12)
13assert expr.eval(df, schema={"birthdate": Optional[datetime]}).tolist() == [
14 26,
15 23,
16 pd.NA,
17]
python
Returns
Returns an expression object denoting the current timestamp. The type of the resulting expression is datetime.
Typeof
Helper function to figure out the inferred type of any expression.
Parameters
Default: None
The schema of the context under which the expression is to be analyzed. In the case of pipelines, this will be the schema of the input dataset and in the case of extractors, this will be the schema of the featureset.
Default value is set to None
which represents an empty dictionary.
1from fennel.expr import lit, col
2
3expr = lit(1) + col("amount")
4# type of 1 + col('amount') changes based on the type of 'amount'
5assert expr.typeof(schema={"amount": int}) == int
6assert expr.typeof(schema={"amount": float}) == float
7assert expr.typeof(schema={"amount": Optional[int]}) == Optional[int]
8assert expr.typeof(schema={"amount": Optional[float]}) == Optional[float]
9
10# typeof raises an error if type of 'amount' isn't provided
11with pytest.raises(ValueError):
12 expr.typeof()
13
14# or when the expression won't be valid with the schema
15with pytest.raises(ValueError):
16 expr.typeof(schema={"amount": str})
17
18# no need to provide schema if the expression is constant
19const = lit(1)
20assert const.typeof() == int
python
Returns
Returns the inferred type of the expression, if any.
Errors
All columns referenced by col
expression must be present in the provided
schema.
The expression should be valid in the context of the given schema.
When
Ternary expressions like 'if/else' or 'case' in SQL.
Parameters
The predicate expression for the ternary operator. Must evaluate to a boolean.
The expression that the whole when expression evaluates to if the predicate
evaluates to True. then
must always be called on the result of a when
expression.
Default: lit(None)
The equivalent of else
branch in the ternary expression - the whole expression
evaluates to this branch when the predicate evaluates to be False.
Defaults to lit(None)
when not provided.
1from fennel.expr import when, col, InvalidExprException
2
3expr = when(col("x")).then(1).otherwise(0)
4
5# type depends on the type of the then and otherwise values
6assert expr.typeof(schema={"x": bool}) == int
7
8# raises an error if the schema is not provided
9with pytest.raises(ValueError):
10 expr.typeof(schema={})
11# also when the predicate is not boolean
12with pytest.raises(ValueError):
13 expr.typeof(schema={"x": int})
14
15# can be evaluated with a dataframe
16import pandas as pd
17
18df = pd.DataFrame({"x": [True, False, True]})
19assert expr.eval(df, schema={"x": bool}).tolist() == [1, 0, 1]
20
21# not valid if only when is provided
22with pytest.raises(InvalidExprException):
23 expr = when(col("x"))
24 expr.typeof(schema={"x": bool})
25
26# if otherwise is not provided, it defaults to None
27expr = when(col("x")).then(1)
28assert expr.typeof(schema={"x": bool}) == Optional[int]
python
Returns
Returns an expression object denoting the result of the when/then/otherwise expression.
Errors
Error during typeof
or eval
if the referenced column isn't present.
Valid when
expressions must have accompanying then
and otherwise
clauses.
Day
Function to get the day component of a datetime object.
Parameters
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the integer value of the day of the datetime object.
1from fennel.expr import col
2
3expr = col("x").dt.day
4
5# day works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [1, 1, 1]
21
22# also works with timezone aware datetimes
23expr = col("x").dt.with_tz(timezone="US/Eastern").day
24assert expr.eval(df, schema=schema).tolist() == [31, 1, 1]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
Hour
Function to get the hour component of a datetime object.
Parameters
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the integer value of the hour of the datetime object.
1from fennel.expr import col
2
3expr = col("x").dt.hour
4
5# hour works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [0, 10, 20]
21
22# also works with timezone aware datetimes
23expr = col("x").dt.with_tz(timezone="US/Eastern").hour
24assert expr.eval(df, schema=schema).tolist() == [19, 5, 15]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
Minute
Function to get the minute component of a datetime object.
Parameters
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the integer value of the minute of the datetime object.
1from fennel.expr import col
2
3expr = col("x").dt.minute
4
5# minute works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [0, 0, 20]
21
22# also works with timezone aware datetimes
23expr = col("x").dt.with_tz(timezone="US/Eastern").minute
24assert expr.eval(df, schema=schema).tolist() == [0, 0, 20]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
Month
Function to get the month component of a datetime object.
Parameters
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the integer value of the month of the datetime object.
1from fennel.expr import col
2
3expr = col("x").dt.month
4
5# month works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [1, 1, 1]
21
22# also works with timezone aware datetimes
23expr = col("x").dt.with_tz(timezone="US/Eastern").month
24assert expr.eval(df, schema=schema).tolist() == [12, 1, 1]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
Second
Function to get the second component of a datetime object.
Parameters
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the integer value of the second of the datetime object.
1from fennel.expr import col
2
3expr = col("x").dt.second
4
5# second works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:01", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:02", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:03", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [1, 2, 3]
21
22# also works with timezone aware datetimes
23expr = col("x").dt.with_tz(timezone="Asia/Kathmandu").second
24assert expr.eval(df, schema=schema).tolist() == [1, 2, 3]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
Since
Function to get the time elapsed between two datetime objects.
Parameters
The datetime object to calculate the elapsed time since.
Default: second
The unit of time to return the elapsed time in. Defaults to seconds. Valid units
are: week
, day
,hour
, minute
, second
, millisecond
, and microsecond
.
Returns
Returns an expression object denoting the integer value of the elapsed time since the specified datetime object in the specified unit.
1from fennel.expr import col
2
3expr = col("x").dt.since(col("y"))
4
5# since works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime, "y": datetime}) == int
7assert (
8 expr.typeof(schema={"x": Optional[datetime], "y": datetime})
9 == Optional[int]
10)
11
12# can be evaluated with a dataframe
13df = pd.DataFrame(
14 {
15 "x": [
16 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
17 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
18 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
19 ],
20 "y": [
21 pd.Timestamp("2023-01-01 00:00:00", tz="UTC"),
22 pd.Timestamp("2023-01-02 10:00:00", tz="UTC"),
23 pd.Timestamp("2023-01-03 20:20:00", tz="UTC"),
24 ],
25 }
26)
27schema = {"x": datetime, "y": datetime}
28expected = [31536000, 31449600, 31363200]
29assert expr.eval(df, schema=schema).tolist() == expected
30
31# can also change the unit of time
32expr = col("x").dt.since(col("y"), unit="minute")
33assert expr.eval(df, schema=schema).tolist() == [
34 525600,
35 524160,
36 522720,
37]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
Since Epoch
Function to get the time elapsed since epoch for a datetime object.
Parameters
Default: second
The unit of time to return the elapsed time in. Defaults to seconds. Valid units
are: week
, day
,hour
, minute
, second
, millisecond
, and microsecond
.
Returns
Returns an expression object denoting the integer value of the elapsed time since epoch for the datetime object in the specified unit.
1from fennel.expr import col
2
3expr = col("x").dt.since_epoch()
4
5# since_epoch works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20expected = [1704067200, 1704103200, 1704140400]
21assert expr.eval(df, schema=schema).tolist() == expected
22
23# can also change the unit of time
24expr = col("x").dt.since_epoch(unit="minute")
25assert expr.eval(df, schema=schema).tolist() == [
26 28401120,
27 28401720,
28 28402340,
29]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
Strftime
Function to format a datetime object as a string.
Parameters
The format string to use for the datetime.
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the formatted datetime string.
1from fennel.expr import col
2
3expr = col("x").dt.strftime("%Y-%m-%d")
4
5# strftime works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == str
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[str]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-02 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-03 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [
21 "2024-01-01",
22 "2024-01-02",
23 "2024-01-03",
24]
25
26# also works with timezone aware datetimes
27expr = col("x").dt.with_tz(timezone="US/Eastern").strftime("%Y-%m-%d")
28assert expr.eval(df, schema=schema).tolist() == [
29 "2023-12-31",
30 "2024-01-02",
31 "2024-01-03",
32]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The format string must be a valid format string.
The timezone must be a valid timezone. Note that Fennel only supports timezones
with area/location names (e.g. America/New_York
) and not timezones with offsets
(e.g. +05:00
).
Year
Function to get the year component of a datetime object.
Parameters
Default: UTC
The timezone in which to interpret the datetime. If not specified, UTC is used.
Returns
Returns an expression object denoting the integer value of the year of the datetime object.
1from fennel.expr import col
2
3expr = col("x").dt.year
4
5# year works for any datetime type or optional datetime type
6assert expr.typeof(schema={"x": datetime}) == int
7assert expr.typeof(schema={"x": Optional[datetime]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {
12 "x": [
13 pd.Timestamp("2024-01-01 00:00:00", tz="UTC"),
14 pd.Timestamp("2024-01-01 10:00:00", tz="UTC"),
15 pd.Timestamp("2024-01-01 20:20:00", tz="UTC"),
16 ]
17 }
18)
19schema = {"x": datetime}
20assert expr.eval(df, schema=schema).tolist() == [2024, 2024, 2024]
21
22# also works with timezone aware datetimes
23expr = col("x").dt.with_tz(timezone="US/Eastern").year
24assert expr.eval(df, schema=schema).tolist() == [2023, 2024, 2024]
python
Errors
The dt
namespace must be invoked on an expression that evaluates to datetime
or optional of datetime.
The timezone, if provided, must be a valid timezone string. Note that Fennel only supports area/location based timezones (e.g. "America/New_York"), not fixed offsets (e.g. "+05:30" or "UTC+05:30").
All
Function to check if all the elements in a boolean list are True
.
Returns
Returns an expression object denoting the result of the all
operation.
Only works when the list is of type bool or Optional[bool]. For an empty list,
returns an expression denoting True
. If the list has one or more None
elements, the result becomes None
.
1from fennel.expr import col
2
3expr = col("x").list.all()
4
5# works for lists of bool or their optional versions
6assert expr.typeof(schema={"x": List[bool]}) == bool
7assert expr.typeof(schema={"x": List[Optional[bool]]}) == Optional[bool]
8assert (
9 expr.typeof(schema={"x": Optional[List[Optional[bool]]]})
10 == Optional[bool]
11)
12
13with pytest.raises(Exception):
14 expr.typeof(schema={"x": List[str]})
15
16# can be evaluated as well
17df = pd.DataFrame(
18 {"x": [[True, True], [True, False], [], None, [True, None]]}
19)
20schema = {"x": Optional[List[Optional[bool]]]}
21assert expr.eval(df, schema=schema).tolist() == [
22 True,
23 False,
24 True,
25 pd.NA,
26 pd.NA,
27]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. All
can only be invoked on lists of bools (or
optionals of bool).
Any
Function to check if a boolean list contains any True
value.
Returns
Returns an expression object denoting the result of any
operation.
Only works when the list is of type bool (or optional bool). For an empty list, returns an expression denoting False. If the list has one or more None elements, the result becomes None, unless it also has True, in which case the result is still True.
1from fennel.expr import col
2
3expr = col("x").list.any()
4
5# works for lists of bool or their optional versions
6assert expr.typeof(schema={"x": List[bool]}) == bool
7assert expr.typeof(schema={"x": List[Optional[bool]]}) == Optional[bool]
8assert (
9 expr.typeof(schema={"x": Optional[List[Optional[bool]]]})
10 == Optional[bool]
11)
12
13with pytest.raises(Exception):
14 expr.typeof(schema={"x": List[str]})
15
16# can be evaluated as well
17df = pd.DataFrame(
18 {"x": [[True, True], [True, False], [], None, [True, None]]}
19)
20schema = {"x": Optional[List[Optional[bool]]]}
21assert expr.eval(df, schema=schema).tolist() == [
22 True,
23 True,
24 False,
25 pd.NA,
26 True,
27]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Any
can only be invoked on lists of bool (or
optionals of bool).
At
Function to get the value of the element at a given index of the list.
Parameters
The index at which the list's value needs to be evaluated. This expression is expected to evaluate to an int. Fennel supports indexing by negative integers as well.
1from fennel.expr import col
2
3expr = col("x").list.at(col("y"))
4
5# at works only for list types; index can be int/optional[int]
6assert expr.typeof(schema={"x": List[int], "y": int}) == Optional[int]
7assert expr.typeof(schema={"x": List[str], "y": int}) == Optional[str]
8
9schema = {"x": Optional[List[float]], "y": float}
10with pytest.raises(Exception):
11 expr.typeof(schema=schema)
12
13# can be evaluated with a dataframe
14df = pd.DataFrame(
15 {
16 "x": [[1, 2, 3], [4, 5, None], [4, 5, None], None],
17 "y": [1, 5, 0, 4],
18 }
19)
20schema = {"x": Optional[List[Optional[int]]], "y": int}
21assert expr.eval(df, schema=schema).tolist() == [2, pd.NA, 4, pd.NA]
22
23# schema of column must be list of something
24with pytest.raises(ValueError):
25 expr.typeof(schema={"x": int})
python
1from fennel.expr import col
2
3expr = col("x").list.at(col("y"))
4
5# negative indices until -len(list) are allowed and do reverse indexing
6# beyond that, start returning None like other out-of-bounds indices
7df = pd.DataFrame(
8 {
9 "x": [[1, 2, 3], [4, 5, None], [4, 5, None], None],
10 "y": [-1, -5, -2, -4],
11 }
12)
13schema = {"x": Optional[List[Optional[int]]], "y": int}
14assert expr.eval(df, schema=schema).tolist() == [3, pd.NA, 5, pd.NA]
python
Returns
Returns an expression object denoting the value of the list at the given index.
If the index is out of bounds of list's length, None
is returned. Consequently,
for a list of elements of type T
, at
always returns Optional[T]
.
Fennel also supports negative indices: -1 maps to the last element of the list,
-2 to the second last element of the list and so on. Negative indices smaller
than -len start returning None
like other out-of-bound indices.
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Similarly, index
must evaluate to an element of type int
or Optional[int]
.
Contains
Function to check if the given list contains a given element.
Parameters
The item expression: contains checks whether the base list contains this item or not.
1from fennel.expr import col
2
3expr = col("x").list.contains(col("y"))
4
5# contains works for only list types
6assert expr.typeof(schema={"x": List[int], "y": int}) == bool
7assert (
8 expr.typeof(schema={"x": Optional[List[float]], "y": float})
9 == Optional[bool]
10)
11
12# however doesn't work if item is not of the same type as the list elements
13with pytest.raises(ValueError):
14 expr.typeof(schema={"x": List[int], "y": str})
15
16# can be evaluated with a dataframe
17df = pd.DataFrame(
18 {
19 "x": [[1, 2, 3], [4, 5, None], [4, 5, None], None, []],
20 "y": [1, 5, 3, 4, None],
21 }
22)
23schema = {"x": Optional[List[Optional[int]]], "y": Optional[int]}
24assert expr.eval(df, schema=schema).tolist() == [
25 True,
26 True,
27 pd.NA,
28 pd.NA,
29 False,
30]
31
32# schema of column must be list of something
33with pytest.raises(ValueError):
34 expr.typeof(schema={"x": int})
python
Returns
Returns an expression object denoting the result of the contains
expression.
The resulting expression is of type bool
or Optional[bool]
depending on
either of input/item being nullable.
Note that Fennel expressions borrow semantics from SQL and treat None as an unknown value. As a result, the following rules apply to contains in the presence of nulls:
- If the base list itself is None, the result is None regardless of the item.
- If the item is None, the result is None regardless of the list, unless the list is empty, in which case the answer is False (after all, if the list is empty, no matter the value of the item, it's not present in the list).
- If the item is not None and is present in the list, the answer is obviously True.
- However, if the item is not None and is not present in the list but the list has some None element, the result is still None (because the None values in the list may have been that element - we just can't say).
This is somewhat (but not exactly) similar to Spark's array_contains
function.
If you are interested in checking if a list has any None
elements, a better
way of doing that is to use hasnull.
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Similarly, item
must evaluate to an element of type T
or Optional[T]
if the list itself was of type List[T]
(or Optional[List[T]]
)
Filter
Function to filter a list down to elements satisfying a predicate.
Parameters
The variable name to which each element of the list should be bound, one by one.
The predicate expression to be used to filter the list down. This must
evaluate to bool for each element of the list. Note that this expression can
refer to the element under consideration via var(name)
where name is the
first argument given to the filter
operation (see example for details).
Returns
Returns an expression object denoting the filtered list.
1from fennel.expr import col, var
2
3expr = col("x").list.filter("x", var("x") % 2 == 0)
4
5# works as long as predicate is valid and evaluates to bool
6assert expr.typeof(schema={"x": List[int]}) == List[int]
7assert expr.typeof(schema={"x": List[float]}) == List[float]
8
9with pytest.raises(Exception):
10 expr.typeof(schema={"x": List[str]})
11
12# can be evaluated as well
13df = pd.DataFrame({"x": [[1, 2, 3], [], [1, 2, -2], None, [1, 3]]})
14schema = {"x": Optional[List[int]]}
15assert expr.eval(df, schema=schema).tolist() == [
16 [2],
17 [],
18 [2, -2],
19 pd.NA,
20 [],
21]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list.
Has Null
Function to check if the given list has any None
values.
1from fennel.expr import col
2
3expr = col("x").list.hasnull()
4
5# hasnull works for any list type or optional list type
6assert expr.typeof(schema={"x": List[int]}) == bool
7assert expr.typeof(schema={"x": Optional[List[float]]}) == Optional[bool]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame({"x": [[1, 2, 3], [4, 5, None], [], None]})
11schema = {"x": Optional[List[Optional[int]]]}
12assert expr.eval(df, schema=schema).tolist() == [False, True, False, pd.NA]
13
14# schema of column must be list of something
15with pytest.raises(ValueError):
16 expr.typeof(schema={"x": int})
python
Returns
Returns an expression object denoting the result of the hasnull
function.
The resulting expression is of type bool
or Optional[bool]
depending on
the input being nullable.
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list.
Len
Function to get the length of a list.
1from fennel.expr import col
2
3expr = col("x").list.len()
4
5# len works for any list type or optional list type
6assert expr.typeof(schema={"x": List[int]}) == int
7assert expr.typeof(schema={"x": Optional[List[float]]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame({"x": [[1, 2, 3], [4, 5], [], None]})
11schema = {"x": Optional[List[int]]}
12assert expr.eval(df, schema=schema).tolist() == [3, 2, 0, pd.NA]
13
14# schema of column must be list of something
15with pytest.raises(ValueError):
16 expr.typeof(schema={"x": int})
python
Returns
Returns an expression object denoting the result of the len
function.
The resulting expression is of type int
or Optional[int]
depending on
the input being nullable.
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list.
Map
Function to map each element of a list to get another list of the same size.
Parameters
The variable name to which each element of the list should be bound, one by one.
The expression to be used to transform each element of the list. Note that
this expression can refer to the element under consideration via var(name)
where name is the first argument given to the map
operation (see example for
details).
Returns
Returns an expression object denoting the transformed list.
1from fennel.expr import col, var
2
3expr = col("x").list.map("x", var("x") % 2)
4
5# works as long as the mapping expression is valid
6assert expr.typeof(schema={"x": List[int]}) == List[int]
7assert expr.typeof(schema={"x": List[Optional[int]]}) == List[Optional[int]]
8
9# can be evaluated as well
10df = pd.DataFrame({"x": [[1, 2, 3], [], [1, 2, None], None, [1, 3]]})
11schema = {"x": Optional[List[Optional[int]]]}
12expected = [[1, 0, 1], [], [1, 0, pd.NA], pd.NA, [1, 1]]
13assert expr.eval(df, schema=schema).tolist() == expected
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list.
Max
Function to get the maximum value of a list.
Returns
Returns an expression object denoting the max value of a list.
Only works when the list is of type int/float (or their optional versions). For
an empty list, returns an expression denoting 'None'. If the list has one or more
None
elements, the result becomes None
.
1from fennel.expr import col
2
3expr = col("x").list.max()
4
5# works for lists of int/float or their optional versions
6assert expr.typeof(schema={"x": List[int]}) == Optional[int]
7assert expr.typeof(schema={"x": Optional[List[float]]}) == Optional[float]
8
9with pytest.raises(Exception):
10 expr.typeof(schema={"x": List[str]})
11
12# can be evaluated as well
13df = pd.DataFrame({"x": [[1, 2, 3], [4, 5, None], [], None]})
14schema = {"x": Optional[List[Optional[int]]]}
15assert expr.eval(df, schema=schema).tolist() == [3, pd.NA, pd.NA, pd.NA]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Max
can only be invoked on lists of ints/floats (or
optionals of ints/floats).
Mean
Function to get the mean of the values of a list.
Returns
Returns an expression object denoting the mean value of a list.
Only works when the list is of type int/float (or their optional versions). For
an empty list, returns an expression denoting 'None'. If the list has one or more
None
elements, the result becomes None
.
The output type of this expression is either float
or Optional[float]
depending
on the inputs.
1from fennel.expr import col
2
3expr = col("x").list.mean()
4
5# works for lists of int/float or their optional versions
6assert expr.typeof(schema={"x": List[int]}) == Optional[float]
7assert expr.typeof(schema={"x": Optional[List[float]]}) == Optional[float]
8
9with pytest.raises(Exception):
10 expr.typeof(schema={"x": List[str]})
11
12# can be evaluated as well
13df = pd.DataFrame({"x": [[1, 2, 3], [4, 5, None], [], None]})
14schema = {"x": Optional[List[Optional[int]]]}
15assert expr.eval(df, schema=schema).tolist() == [2.0, pd.NA, pd.NA, pd.NA]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Mean
can only be invoked on lists of ints/floats (or
optionals of ints/floats).
Min
Function to get the min value of a list.
Returns
Returns an expression object denoting the min value of a list.
Only works when the list is of type int/float (or their optional versions). For
an empty list, returns an expression denoting 'None'. If the list has one or more
None
elements, the result becomes None
.
1from fennel.expr import col
2
3expr = col("x").list.min()
4
5# works for lists of int/float or their optional versions
6assert expr.typeof(schema={"x": List[int]}) == Optional[int]
7assert expr.typeof(schema={"x": Optional[List[float]]}) == Optional[float]
8
9with pytest.raises(Exception):
10 expr.typeof(schema={"x": List[str]})
11
12# can be evaluated as well
13df = pd.DataFrame({"x": [[1, 2, 3], [4, 5, None], [], None]})
14schema = {"x": Optional[List[Optional[int]]]}
15assert expr.eval(df, schema=schema).tolist() == [1, pd.NA, pd.NA, pd.NA]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Min
can only be invoked on lists of ints/floats (or
optionals of ints/floats).
Sum
Function to get the sum of values of a list.
Returns
Returns an expression object denoting the sum of the values of the list.
Only works when the list is of type int/float (or their optional versions). For
an empty list, returns an expression denoting '0'. If the list has one or more
None
elements, the whole sum becomes None
.
1from fennel.expr import col
2
3expr = col("x").list.sum()
4
5# works for lists of int/float or their optional versions
6assert expr.typeof(schema={"x": List[int]}) == int
7assert expr.typeof(schema={"x": Optional[List[float]]}) == Optional[float]
8
9with pytest.raises(Exception):
10 expr.typeof(schema={"x": List[str]})
11
12# can be evaluated as well
13df = pd.DataFrame({"x": [[1, 2, 3], [4, 5, None], [], None]})
14schema = {"x": Optional[List[Optional[int]]]}
15assert expr.eval(df, schema=schema).tolist() == [6, pd.NA, 0, pd.NA]
python
Errors
The list
namespace must be invoked on an expression that evaluates to list
or optional of list. Sum
can only be invoked on lists of ints/floats (or
optionals of ints/floats).
Abs
Function to get the absolute value of a number.
Returns
Returns an expression object denoting the absolute value of the input data. The data type of the resulting expression is the same as that of the input.
1from fennel.expr import col
2
3expr = col("x").abs() # equivalent to col("x").num.abs()
4
5assert expr.typeof(schema={"x": int}) == int
6assert expr.typeof(schema={"x": Optional[int]}) == Optional[int]
7assert expr.typeof(schema={"x": float}) == float
8assert expr.typeof(schema={"x": Optional[float]}) == Optional[float]
9
10# can be evaluated with a dataframe
11df = pd.DataFrame({"x": pd.Series([1, -2, pd.NA], dtype=pd.Int64Dtype())})
12assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [1, 2, pd.NA]
13
14with pytest.raises(ValueError):
15 expr.typeof(schema={"x": str})
python
Errors
Error during typeof
or eval
if the input expression is not of type int,
float, optional int or optional float.
Ceil
Function in num
namespace to get the ceil of a number.
Returns
Returns an expression object denoting the ceil of the input data. The
data type of the resulting expression is int
if the input was int
or float
or Optional[int]
when the input is Optional[int]
or Optional[float]
.
1from fennel.expr import col
2
3expr = col("x").ceil() # equivalent to col("x").num.ceil()
4assert expr.typeof(schema={"x": int}) == int
5assert expr.typeof(schema={"x": Optional[int]}) == Optional[int]
6assert expr.typeof(schema={"x": float}) == int
7assert expr.typeof(schema={"x": Optional[float]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame({"x": pd.Series([1.1, -2.3, None])})
11assert expr.eval(df, schema={"x": Optional[float]}).tolist() == [
12 2,
13 -2,
14 pd.NA,
15]
16
17with pytest.raises(ValueError):
18 expr.typeof(schema={"x": str})
python
Errors
Error during typeof
or eval
if the input expression is not of type int,
float, optional int or optional float.
Floor
Function in num
namespace to get the floor of a number.
Returns
Returns an expression object denoting the floor of the input data. The
data type of the resulting expression is int
if the input was int
or float
or Optional[int]
when the input is Optional[int]
or Optional[float]
.
1from fennel.expr import col
2
3expr = col("x").floor() # equivalent to col("x").num.floor()
4assert expr.typeof(schema={"x": int}) == int
5assert expr.typeof(schema={"x": Optional[int]}) == Optional[int]
6assert expr.typeof(schema={"x": float}) == int
7assert expr.typeof(schema={"x": Optional[float]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame({"x": pd.Series([1.1, -2.3, None])})
11assert expr.eval(df, schema={"x": Optional[float]}).tolist() == [
12 1,
13 -3,
14 pd.NA,
15]
16
17with pytest.raises(ValueError):
18 expr.typeof(schema={"x": str})
python
Errors
Error during typeof
or eval
if the input expression is not of type int,
float, optional int or optional float.
Round
Function in num
namespace to round a number.
Parameters
Default: 0
The number of the decimal places to round the input to.
Returns
Returns an expression object denoting the rounded value of the input data. The data type of the resulting expression is int / Optional[int] if precision is set to 0, or float / Optional[float] for precisions > 0.
1from fennel.expr import col
2
3expr = col("x").num.round()  # rounds to 0 decimal places by default
4assert expr.typeof(schema={"x": int}) == int
5assert expr.typeof(schema={"x": Optional[int]}) == Optional[int]
6assert expr.typeof(schema={"x": float}) == int
7assert expr.typeof(schema={"x": Optional[float]}) == Optional[int]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame({"x": pd.Series([1.1, -2.3, None])})
11assert expr.eval(df, schema={"x": Optional[float]}).tolist() == [
12 1,
13 -2,
14 pd.NA,
15]
16
17with pytest.raises(ValueError):
18 expr.typeof(schema={"x": str})
python
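For precisions greater than zero, the output stays floating point per the rule above. A minimal sketch (passing the precision positionally is an assumption, since the parameter name isn't spelled out here):

from typing import Optional
from fennel.expr import col

# rounding to 2 decimal places keeps the result as a float
expr = col("x").num.round(2)
assert expr.typeof(schema={"x": float}) == float
assert expr.typeof(schema={"x": Optional[float]}) == Optional[float]
python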
Errors
Error during typeof
or eval
if the input expression is not of type int,
float, optional int or optional float.
Precision must be a non-negative integer.
To String
Function in num
namespace to convert a number to a string.
Returns
Returns an expression object denoting the string value of the input data. The
data type of the resulting expression is str
(or Optional[str]
if the input
is an optional number).
1from fennel.expr import col
2
3expr = col("x").num.to_string()
4
5# type is str or optional str
6assert expr.typeof(schema={"x": int}) == str
7assert expr.typeof(schema={"x": Optional[int]}) == Optional[str]
8assert expr.typeof(schema={"x": float}) == str
9assert expr.typeof(schema={"x": Optional[float]}) == Optional[str]
10
11# can be evaluated with a dataframe
12df = pd.DataFrame({"x": pd.Series([1.1, -2.3, None])})
13assert expr.eval(df, schema={"x": Optional[float]}).tolist() == [
14 "1.1",
15 "-2.3",
16 pd.NA,
17]
python
Errors
Error during typeof
or eval
if the input expression is not of type int,
float, optional int or optional float.
Concat
Function to concatenate two strings.
Parameters
The string to be concatenated with the base string.
1from fennel.expr import col
2
3expr = col("x").str.concat(col("y"))
4
5assert expr.typeof(schema={"x": str, "y": str}) == str
6assert expr.typeof(schema={"x": str, "y": Optional[str]}) == Optional[str]
7assert expr.typeof(schema={"x": Optional[str], "y": str}) == Optional[str]
8assert (
9 expr.typeof(schema={"x": Optional[str], "y": Optional[str]})
10 == Optional[str]
11)
12
13# can be evaluated with a dataframe
14df = pd.DataFrame(
15 {
16 "x": ["hello", "world", "some", None],
17 "y": [" world", " hello", None, None],
18 }
19)
20schema = {"x": Optional[str], "y": Optional[str]}
21assert expr.eval(df, schema=schema).tolist() == [
22 "hello world",
23 "world hello",
24 pd.NA,
25 pd.NA,
26]
27
28# schema of both columns must be str
29with pytest.raises(ValueError):
30 expr.typeof(schema={"x": str})
31
32with pytest.raises(Exception):
33 expr.typeof(schema={"x": str, "y": int})
python
Returns
Returns an expression object denoting the result of the concat
expression.
The resulting expression is of type str
or Optional[str]
depending on
either of input/item being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string. Similarly, item
must evaluate to either a string or an
optional of string.
Contains
Function to check if the given string contains another string.
Parameters
The item expression: contains checks whether the base string contains this item or not.
1from fennel.expr import col
2
3expr = col("x").str.contains(col("y"))
4
5assert expr.typeof(schema={"x": str, "y": str}) == bool
6assert expr.typeof(schema={"x": str, "y": Optional[str]}) == Optional[bool]
7assert expr.typeof(schema={"x": Optional[str], "y": str}) == Optional[bool]
8assert (
9 expr.typeof(schema={"x": Optional[str], "y": Optional[str]})
10 == Optional[bool]
11)
12
13# can be evaluated with a dataframe
14df = pd.DataFrame(
15 {
16 "x": ["hello", "world", "some", None],
17 "y": ["ell", "random", None, None],
18 }
19)
20schema = {"x": Optional[str], "y": Optional[str]}
21assert expr.eval(df, schema=schema).tolist() == [True, False, pd.NA, pd.NA]
22
23# schema of both columns must be str
24with pytest.raises(ValueError):
25 expr.typeof(schema={"x": str})
26
27with pytest.raises(Exception):
28 expr.typeof(schema={"x": str, "y": int})
python
Returns
Returns an expression object denoting the result of the contains
expression.
The resulting expression is of type bool
or Optional[bool]
depending on
either of input/item being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string. Similarly, item
must evaluate to either a string or an
optional of string.
Ends With
Function to check if the given string ends with another given string.
Parameters
endswith
checks if the input string ends with the expression item
.
1from fennel.expr import col
2
3expr = col("x").str.endswith(col("y"))
4
5assert expr.typeof(schema={"x": str, "y": str}) == bool
6assert expr.typeof(schema={"x": str, "y": Optional[str]}) == Optional[bool]
7assert expr.typeof(schema={"x": Optional[str], "y": str}) == Optional[bool]
8assert (
9 expr.typeof(schema={"x": Optional[str], "y": Optional[str]})
10 == Optional[bool]
11)
12
13# can be evaluated with a dataframe
14df = pd.DataFrame(
15 {
16 "x": ["hello", "world", "some", None],
17 "y": ["lo", "wor", None, None],
18 }
19)
20schema = {"x": Optional[str], "y": Optional[str]}
21assert expr.eval(df, schema=schema).tolist() == [True, False, pd.NA, pd.NA]
22
23# schema of both columns must be str
24with pytest.raises(ValueError):
25 expr.typeof(schema={"x": str})
26
27with pytest.raises(Exception):
28 expr.typeof(schema={"x": str, "y": int})
python
Returns
Returns an expression object denoting the result of the endswith
expression.
The resulting expression is of type bool
or Optional[bool]
depending on
either of input/item being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string. Similarly, item
must evaluate to either a string or an
optional of string.
Json Extract
Function to extract a value from a json encoded string using a json path.
Parameters
The json path to use when extracting the value from the json encoded string. See this page for more details on json path syntax. The extracted value is always returned as a string or None if the path is not valid/found.
1from fennel.expr import col
2
3expr = col("s").str.json_extract("$.x.y")
4
5# return type is always Optional[str]
6assert expr.typeof(schema={"s": str}) == Optional[str]
7assert expr.typeof(schema={"s": Optional[str]}) == Optional[str]
8
9# can be evaluated with a dataframe
10df = pd.DataFrame(
11 {"s": ['{"x": {"y": "hello"}}', '{"x": {"y": 1}}', "{}", None]}
12)
13schema = {"s": Optional[str]}
14# NOTE that the integer value 1 is returned as a string and not an int
15# also invalid paths (e.g. "$.x.y" in case 3 of "{}") return null
16assert expr.eval(df, schema).tolist() == ["hello", "1", pd.NA, pd.NA]
python
Returns
Returns an expression object denoting the result of the json_extract
expression.
The resulting expression is of type Optional[str]
and more specifically is None
when the base string is None or the path is not found in the json encoded string.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
Len
Function to get the length of a string.
1from fennel.expr import col
2
3expr = col("x").str.len()
4
5assert expr.typeof(schema={"x": str}) == int
6assert expr.typeof(schema={"x": Optional[str]}) == Optional[int]
7
8# can be evaluated with a dataframe
9df = pd.DataFrame({"x": ["hello", "world", "some", None]})
10schema = {"x": Optional[str]}
11assert expr.eval(df, schema=schema).tolist() == [5, 5, 4, pd.NA]
12
13# schema of column must be str
14with pytest.raises(ValueError):
15 expr.typeof(schema={"x": int})
python
Returns
Returns an expression object denoting the result of the len
function.
The resulting expression is of type int
or Optional[int]
depending on
input being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
Lower
Function to convert a string to all lowercase letters.
1from fennel.expr import col
2
3expr = col("x").str.lower()
4
5assert expr.typeof(schema={"x": str}) == str
6assert expr.typeof(schema={"x": Optional[str]}) == Optional[str]
7
8# can be evaluated with a dataframe
9df = pd.DataFrame({"x": ["HeLLo", "World", "some", None]})
10schema = {"x": Optional[str]}
11assert expr.eval(df, schema=schema).tolist() == [
12 "hello",
13 "world",
14 "some",
15 pd.NA,
16]
17
18# schema of column must be str
19with pytest.raises(ValueError):
20 expr.typeof(schema={"x": int})
python
Returns
Returns an expression object denoting the result of the lower
function.
The resulting expression is of type str
or Optional[str]
depending on
input being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
Parse
Function to parse an object of the given type out of a string that represents json encoded data.
Parameters
The type of the data to be parsed from the json encoded string.
1from fennel.expr import col, lit
2
3expr = col("x").str.parse(list[int])
4
5assert expr.typeof(schema={"x": str}) == List[int]
6assert expr.typeof(schema={"x": Optional[str]}) == Optional[List[int]]
7
8# can be evaluated with a dataframe
9df = pd.DataFrame({"x": ["[1, 2, 3]", "[4, 5]", None]})
10schema = {"x": Optional[str]}
11assert expr.eval(df, schema=schema).tolist() == [[1, 2, 3], [4, 5], pd.NA]
12
13# schema of column must be str
14with pytest.raises(ValueError):
15 expr.typeof(schema={"x": int})
16
17# can use this to parse several common types
18df = pd.DataFrame({"x": ["1"]})
19schema = {"x": str}
20cases = [
21 ("1", int, 1),
22 ("1.1", float, 1.1),
23 ("true", bool, True),
24 ("false", bool, False),
25 ('"hi"', str, "hi"),
26]
27for case in cases:
28 expr = lit(case[0]).str.parse(case[1])
29 assert expr.eval(df, schema).tolist() == [case[2]]
python
1from fennel.expr import col, lit
2
3invalids = [
4 ("False", bool), # "False" is not valid json, "false" is
5 ("hi", str), # "hi" is not valid json, "\"hi\"" is
6 ("[1, 2, 3", List[int]),
7 ("1.1.1", float),
8]
9for invalid in invalids:
10 expr = lit(invalid[0]).str.parse(invalid[1])
11 df = pd.DataFrame({"x": ["1"]})
12 schema = {"x": str}
13 with pytest.raises(Exception):
14 expr.eval(df, schema)
python
1from fennel.expr import col, lit
2from fennel.dtypes import struct
3
4@struct
5class MyStruct:
6 x: int
7 y: Optional[bool]
8
9cases = [
10 ('{"x": 1, "y": true}', MyStruct(1, True)),
11 ('{"x": 2, "y": null}', MyStruct(2, None)),
12 ('{"x": 3}', MyStruct(3, None)),
13]
14for case in cases:
15 expr = lit(case[0]).str.parse(MyStruct)
16 df = pd.DataFrame({"x": ["1"]})
17 schema = {"x": str}
18 found = expr.eval(df, schema).tolist()
19 assert len(found) == 1
20 assert found[0].x == case[1].x
python
Returns
Returns an expression object denoting the result of the parse
expression.
The resulting expression is of type dtype
or Optional[dtype]
depending on
the base string being nullable.
A type can only be parsed out of valid json representation of that type. For
instance, a str
can not be parsed out of "hi"
because the correct json
representation of the string is "\"hi\""
.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
If the given string can not be parsed into an object of the given type, a runtime error is raised.
Split
Function to split a string into a list of strings using a separator.
Parameters
The separator string to use when splitting the string.
1from fennel.expr import col
2
3expr = col("s").str.split(",")
4
5assert expr.typeof(schema={"s": str}) == List[str]
6assert expr.typeof(schema={"s": Optional[str]}) == Optional[List[str]]
7
8# can be evaluated with a dataframe
9df = pd.DataFrame({"s": ["a,b,c", "d,e", "f", None]})
10schema = {"s": Optional[str]}
11assert expr.eval(df, schema).tolist() == [
12 ["a", "b", "c"],
13 ["d", "e"],
14 ["f"],
15 pd.NA,
16]
python
Returns
Returns an expression object denoting the result of the split
function.
The resulting expression is of type List[str]
or Optional[List[str]]
depending on
input being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
Starts With
Function to check if the given string starts with another string.
Parameters
startswith
checks if the input string starts with the expression item
.
1from fennel.expr import col
2
3expr = col("x").str.startswith(col("y"))
4
5assert expr.typeof(schema={"x": str, "y": str}) == bool
6assert expr.typeof(schema={"x": str, "y": Optional[str]}) == Optional[bool]
7assert expr.typeof(schema={"x": Optional[str], "y": str}) == Optional[bool]
8assert (
9 expr.typeof(schema={"x": Optional[str], "y": Optional[str]})
10 == Optional[bool]
11)
12
13# can be evaluated with a dataframe
14df = pd.DataFrame(
15 {
16 "x": ["hello", "world", "some", None],
17 "y": ["he", "rld", None, None],
18 }
19)
20schema = {"x": Optional[str], "y": Optional[str]}
21assert expr.eval(df, schema=schema).tolist() == [True, False, pd.NA, pd.NA]
22
23# schema of both columns must be str
24with pytest.raises(ValueError):
25 expr.typeof(schema={"x": str})
26
27with pytest.raises(Exception):
28 expr.typeof(schema={"x": str, "y": int})
python
Returns
Returns an expression object denoting the result of the startswith
expression.
The resulting expression is of type bool
or Optional[bool]
depending on
either of input/item being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string. Similarly, item
must evaluate to either a string or an
optional of string.
Strptime
Function to parse a datetime of the given format out of the string.
Parameters
A valid datetime format string. See here for a full list of all format qualifiers supported by Fennel.
Default: UTC
Sometimes format strings don't precisely specify the timezone. In such cases, a timezone can be provided. In absence of an explicit timezone, all ambiguous strings are assumed to be in UTC.
Note that timezone
is merely a hint to resolve ambiguity - the timezone
info from the format string is preferentially used when available.
1from fennel.expr import col
2from datetime import datetime
3
4expr = col("x").str.strptime("%Y-%m-%d")
5
6assert expr.typeof(schema={"x": str}) == datetime
7assert expr.typeof(schema={"x": Optional[str]}) == Optional[datetime]
8
9df = pd.DataFrame({"x": ["2021-01-01", "2021-02-01", None]})
10schema = {"x": Optional[str]}
11assert expr.eval(df, schema).tolist() == [
12 pd.Timestamp(2021, 1, 1, tz="UTC"),
13 pd.Timestamp(2021, 2, 1, tz="UTC"),
14 pd.NaT,
15]
16
17# can also provide a timezone
18expr = col("x").str.strptime("%Y-%m-%d", timezone="Asia/Tokyo")
19
20assert expr.eval(df, schema).tolist() == [
21 pd.Timestamp(2021, 1, 1, tz="Asia/Tokyo"),
22 pd.Timestamp(2021, 2, 1, tz="Asia/Tokyo"),
23 pd.NaT,
24]
25
26# error on invalid format - %L is not a valid format
27expr = col("x").str.strptime("%Y-%m-%d %L")
28with pytest.raises(Exception):
29 expr.eval(df, schema)
30
31# error on invalid timezone
32expr = col("x").str.strptime("%Y-%m-%d", timezone="invalid")
33with pytest.raises(Exception):
34 expr.eval(df, schema)
python
Returns
Returns an expression object denoting the result of the strptime
expression.
The resulting expression is of type datetime
or Optional[datetime]
depending on
either of input/item being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
A compile time error is raised if either the format string or the timezone is invalid.
Upper
Function to convert a string to all upper case letters.
1from fennel.expr import col
2
3expr = col("x").str.upper()
4
5assert expr.typeof(schema={"x": str}) == str
6assert expr.typeof(schema={"x": Optional[str]}) == Optional[str]
7
8# can be evaluated with a dataframe
9df = pd.DataFrame({"x": ["HeLLo", "World", "some", None]})
10schema = {"x": Optional[str]}
11assert expr.eval(df, schema=schema).tolist() == [
12 "HELLO",
13 "WORLD",
14 "SOME",
15 pd.NA,
16]
17
18# schema of column must be str
19with pytest.raises(ValueError):
20 expr.typeof(schema={"x": int})
python
Returns
Returns an expression object denoting the result of the upper
function.
The resulting expression is of type str
or Optional[str]
depending on
input being nullable.
Errors
The str
namespace must be invoked on an expression that evaluates to string
or optional of string.
Get
Function to get the value of a given field from a struct.
Parameters
The name of the field that needs to be obtained from the struct. Note that this must be a literal string, not an expression.
1from fennel.expr import col
2from fennel.dtypes import struct
3
4@struct
5class MyStruct:
6 f1: int
7 f2: bool
8
9expr = col("x").struct.get("f1")
10assert expr.typeof(schema={"x": MyStruct}) == int
11
12# error to get a field that does not exist
13expr = col("x").struct.get("z")
14with pytest.raises(ValueError):
15 expr.typeof(schema={"x": MyStruct})
16
17# can be evaluated with a dataframe
18df = pd.DataFrame(
19 {
20 "x": [MyStruct(1, True), MyStruct(2, False), None],
21 }
22)
23schema = {"x": Optional[MyStruct]}
24expr = col("x").struct.get("f1")
25assert expr.eval(df, schema=schema).tolist() == [1, 2, pd.NA]
python
Returns
Returns an expression object denoting the result of the get
operation.
If the corresponding field in the struct is of type T
, the resulting expression
is of type T
or Optional[T]
depending on the struct itself being nullable.
Errors
The struct
namespace must be invoked on an expression that evaluates to struct.
Compile error is raised when trying to get a field that doesn't exist on the struct.
Source
All Fennel sources are wrapped in the @source
decorator applied on top of the
datasets. This decorator specifies a bunch of options to configure the ingestion
mechanism that apply to most data sources.
Parameters
Default: "1h"
The frequency with which the ingestion should be carried out. Streaming sources like Kafka, Kinesis, Webhook ignore it since they do continuous polling.
Note that some Fennel sources make multiple round-trips of limited size in a single
iteration so as to not overload the system - every
only applies across full
iterations of ingestion.
Default: None
When since is set, the source only admits those rows where the value of the timestamp column of the dataset is >= since.
Fennel reads as little data as possible given this constraint - for instance, when
reading parquet files, the filter is pushed all the way down. However, in
several cases, it's still necessary to read all the data before rejecting rows
that are older than since
.
Default: None
When until is set, the source only admits those rows where the value of the timestamp column of the dataset is < until.
Fennel reads as little data as possible given this constraint - for instance, when
reading parquet files, the filter is pushed all the way down. However, in
several cases, it's still necessary to read all the data before rejecting rows
that are newer than until
.
Specifies how out of order data from this source can arrive.
Analogous to MaxOutOfOrderness
in Flink, this provides Fennel a guarantee that
if some row with timestamp t
has arrived, no other row with timestamp < t-disorder
can ever arrive. And if such rows do arrive, Fennel has the liberty of discarding
them and not including them in the computation.
When specifying sampling for a dataset, it can be provided in two ways:
- Simply specify the sampling rate when you want to sample the dataset without specifying the columns used for sampling.
  - Sampling Rate: A float between 0 and 1 that determines the proportion of the dataset to include.
- Use the Sample object when you want to specify both the sampling rate and the specific columns used for sampling.
  - Sampling Rate: A float between 0 and 1 that determines the proportion of the dataset to include.
  - Using: A list of columns used to hash for sampling the data. Preproc columns and the timestamp field cannot be included in this list.
Default Behavior When No Columns Are Specified:
- For Keyed Datasets: All key columns are used for sampling, excluding any preproc columns.
- For Non-Keyed Datasets: All columns are used for sampling except for the timestamp and preproc columns.
Specifies how valid change data should be constructed from the ingested data.
"append"
means that data should be interpreted as sequence of append operations
with no deletes and no updates. Append can only be applied to keyless datasets (
to prevent situations where multiple inserts arrive with the same key fields). As
of right now, all SQL sources, Kafka, Kinesis, S3, and webhook support append
mode.
"upsert"
means that incoming data will only have inserts but should be
interpreted as sequence of upsert operations. It can only be used for keyed
datasets and works for every source where append works. Note that in order to
support "upsert"
, Fennel needs to maintain the last seen row for each key which
has some overhead. As a result, pre-prepared "debezium"
data should be preferred
over "upsert"
.
"native"
means that the underlying system exposes CDC natively and that Fennel
should tap into that. As of right now, native CDC is only available for
Deltalake & Hudi
and will soon be available for more sources including MySQL and Postgres.
"debezium"
means that the raw data itself is laid out in debezium layout out
of which valid CDC data can be constructed. This is only possible for sources
that expose raw schemaless data, namely, s3,
kinesis, kafka,
and webhook.
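To illustrate the first two modes, here is a minimal sketch - the endpoints and datasets are hypothetical. A keyless dataset can use "append", while a keyed dataset needs "upsert":
from datetime import datetime
from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field

webhook = Webhook(name="some_webhook")

# keyless dataset: every row is a plain insert, so "append" is valid
@source(webhook.endpoint("clicks"), disorder="14d", cdc="append")
@dataset
class Click:
    uid: int
    url: str
    timestamp: datetime

# keyed dataset: later rows with the same key overwrite earlier ones, so "upsert"
@source(webhook.endpoint("profiles"), disorder="14d", cdc="upsert")
@dataset
class UserProfile:
    uid: int = field(key=True)
    city: str
    timestamp: datetime
python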
Default: None
When present, marks this source to be selected during commit only when the commit operation itself is made for an env that matches this env. The primary use case is to decorate a single dataset with many @source decorators and choose only one of them to commit depending on the environment.
Default: None
When present, specifies the preproc behavior for the columns referred to by the keys of the dictionary. As of right now, there are three kinds of values of preproc:
- ref: Ref: written as ref(str) and means that the column denoted by the key of this value is aliased to another column in the sourced data. This is useful, for instance, when you want to rename columns while bringing them to Fennel. With this, you can also perform indirections of the kind A[B][C] and rename them while bringing them to Fennel.
- eval: Eval: written as eval(Callable | Expr, schema: Dict[str, Type]) and means that the column denoted by the key of this value is computed either through a python callable or a rust expression. This is useful, for instance, when you want to change the dtype of a column, add a new column derived from another column, or fill nulls in a column with some value. The schema parameter is used to specify the schema of columns which are needed for evaluation but not present in the dataset.
- Any: means that the column denoted by the key of this value should be given a constant value.
Fennel supports preproc ref(str) values of the form A[B][C] only for the JSON and Protobuf formats, and A, B should be struct types. If you have data in another format or require indirection for parent types other than struct, please reach out to Fennel support.
Default: None
When present, filters source dataset rows with the input value. As of now, there are two kinds of values of where:
- Callable: In this case the input is a lambda which is used to filter rows.
- Eval: Similar to the eval value in preproc, the input here is an expression which is used to filter rows.
Default: False
When not set or set to False, it indicates that the source possesses an infinite amount of data that is continuously increasing.
When set to True, it indicates that the source possesses a finite amount of data and that it will exhaust at some point. In such cases, idleness must also be set.
Default: None
Only relevant when bounded is set to True - in such cases, the bounded source is assumed to have exhausted after Fennel is unable to obtain any new data despite continuously asking for at least the idleness period.
import sys
from datetime import datetime

import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.connectors.connectors import Sample
from fennel.datasets import dataset, field
from fennel.expr import col

s3 = S3(name="my_s3")  # using IAM role based access

bucket = s3.bucket("data", path="user/*/date-%Y-%m-%d/*", format="parquet")

if sys.version_info >= (3, 10):
    @source(
        bucket,
        every="1h",
        cdc="upsert",
        disorder="2d",
        since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
        until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
        preproc={
            "uid": ref("user_id"),  # 'uid' comes from column 'user_id'
            "country": "USA",  # country for every row should become 'USA'
            "age": eval(
                lambda x: pd.to_numeric(x["age"]).astype(int),
                schema={"age": str},
            ),  # converting age dtype to int
        },
        where=eval(col("age") >= 18, schema={"age": int}),
        env="prod",
        sample=Sample(0.2, using=["email"]),
        bounded=True,
        idleness="1h",
    )
    @dataset
    class User:
        uid: int = field(key=True)
        email: str
        country: str
        age: int
        timestamp: datetime
python
Sink
All Fennel sinks are wrapped in the @sink
decorator applied on top of the
datasets. This decorator specifies a bunch of options to configure the sink
mechanism that apply to most data sinks.
Parameters
Default: "1h"
The frequency with which the sink should be carried out. Streaming sinks like Kafka, Kinesis, Webhook ignore it since they do continuous polling.
Note that some Fennel sinks make multiple round-trips of limited size in a single iteration so as to not overload the system - every only applies across full iterations of the sink.
Default: None
When since is set, the sink only admits those rows where the value in the timestamp column of the dataset is >= since.
Default: None
When until is set, the sink only admits those rows where the value in the timestamp column of the dataset is < until.
Specifies how valid change data should be used for the sink data.
"debezium" means that the raw data itself is laid out in debezium layout, out of which valid CDC data can be constructed. This is only possible for the kafka sink as of now.
Default: None
When present, marks this sink to be selected during commit only when the commit operation itself is made for an env that matches this env. The primary use case is to decorate a single dataset with many @sink decorators and choose only one of them to commit depending on the environment.
Default: None
This denotes the style of sink:
- incremental: Fennel incrementally sinks the new data changes
- recreate: Fennel recreates the entire sink every time
- SnapshotData: Fennel sinks only the current snapshot of the dataset
Fennel supports only the incremental style of sink. If you want the style to be either recreate or SnapshotData, please reach out to Fennel support.
Default: None
This means that the column denoted by the key is aliased to another column in the sink data. This is useful, for instance, when you want to rename columns while sinking them.
from datetime import datetime

from fennel.connectors import sink, S3
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs

s3 = S3(name="my_s3")  # using IAM role based access

# SomeDataset is assumed to be an upstream dataset defined elsewhere with
# (at least) uid, email and timestamp columns.

@dataset
@sink(
    s3.bucket("datalake", prefix="user", format="delta"),
    every="1d",
    how="incremental",
    renames={
        "uid": "new_uid",  # 'uid' column should be renamed to 'new_uid'
        "email": "new_email",  # 'email' column should be renamed to 'new_email'
    },
    since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
    until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
    env="prod",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        # keep only the rows whose email is on gmail
        return dataset.filter(
            lambda df: df["email"].str.contains("gmail.com")
        )
python
Expectations
Fennel's type system lets one maintain data integrity by rejecting data that does not conform to its types. However, there are cases where one may want to accept non-conforming data but still monitor how often that happens. For this, Fennel provides the ability to specify expectations on the data.
Fennel internally relies on Great Expectations to help users easily specify data expectations. Fennel's expectations are a subset of Great Expectations expectations and are documented below, but the API to specify them is the same.
Expectation Types
Single Column Expectations
The following expectations operate on a single column at a time.
expect_column_values_to_not_be_null
Expect the column values to not be null. To be counted as an exception, values must be explicitly null or missing, such as np.nan. Empty strings don't count as null unless they have been coerced to a null type.
Parameters:
column (str) – The column name.
expect_column_values_to_be_null
Expect the column values to be null. It is the inverse of expect_column_values_to_not_be_null.
Parameters:
column (str) – The column name.
expect_column_values_to_be_of_type
Expect a column to contain values of a specified data type.
Parameters:
column (str) – The column name.
type_ (str) – The expected data type of the column values.
expect_column_values_to_be_in_type_list
Expect a column to contain values of one of several specified data types.
Parameters:
column (str) – The column name.
type_list (list) – A list of expected data types of the column values.
expect_column_values_to_be_in_set
Expect each column value to be in a given set.
Parameters:
column (str) – The column name.
value_set (list) – A set of objects used for comparison.
expect_column_values_to_not_be_in_set
Expect each column value to not be in a given set.
Parameters:
column (str) – The column name.
value_set (list) – A set of objects used for comparison.
expect_column_values_to_be_between
Expect column values to be between a minimum value and a maximum value.
Parameters:
column (str) – The column name.
min_value (int) – The minimum value for a column entry.
max_value (int) – The maximum value for a column entry.
strict_min (bool) – If True, the column values must be strictly larger than min_value.
strict_max (bool) – If True, the column values must be strictly smaller than max_value.
expect_column_value_lengths_to_be_between
Expect the lengths of column values to be between a minimum value and a maximum value.
Parameters:
column (str) – The column name.
min_value (int) – The minimum value for a column entry length.
max_value (int) – The maximum value for a column entry length.
expect_column_value_lengths_to_equal
Expect the lengths of column values to equal a given value.
Parameters:
column (str) – The column name.
value (int) – The expected length of column values.
expect_column_values_to_match_regex
Expect column entries to be strings that match a given regular expression.
Parameters:
column (str) – The column name.
regex (str) – The regular expression that each column entry should match.
expect_column_values_to_not_match_regex
Expect column entries to be strings that do not match a given regular expression.
Parameters:
column (str) – The column name.
regex (str) – The regular expression that column entries should not match.
expect_column_values_to_match_regex_list
Expect column entries to be strings that match at least one of a list of regular expressions.
Parameters:
column (str) – The column name.
regex_list (list) – The list of regular expressions that each column entry should match at least one of.
expect_column_values_to_not_match_regex_list
Expect column entries to be strings that do not match any of a list of regular expressions.
Parameters:
column (str) – The column name.
regex_list (list) – The list of regular expressions that each column entry should not match any of.
expect_column_values_to_match_strftime_format
Expect column entries to be strings representing a date or time with a given format.
Parameters:
column (str) – The column name.
strftime_format (str) – The strftime format that each column entry should match.
expect_column_values_to_be_dateutil_parseable
Expect column entries to be parseable using dateutil.
Parameters:
column (str) – The column name.
expect_column_values_to_be_json_parseable
Expect column entries to be parseable as JSON.
Parameters:
column (str) – The column name.
expect_column_values_to_match_json_schema
Expect column entries to match a given JSON schema.
Parameters:
column (str) – The column name.
json_schema (dict) – The JSON schema that each column entry should match.
Multi Column Expectations
The following expectations require two or more columns.
expect_column_pair_values_to_be_equal
Expect the values in a column to be the exact same as the values in another column.
Parameters:
column_A (str) – The first column name.
column_B (str) – The second column name.
ignore_row_if (str) – Control how null values are handled. See ignore_row_if for details.
expect_column_pair_values_A_to_be_greater_than_B
Expect the values in column A to be greater than the values in column B.
Parameters:
column_A (str) – The first column name.
column_B (str) – The second column name.
or_equal (bool) – If True, then values can be equal, not strictly greater than.
expect_column_pair_values_to_be_in_set
Expect the paired values from columns A and B to belong to a given set of valid pairs.
Parameters:
column_A (str) – The first column name.
column_B (str) – The second column name.
value_pairs_set (set) – A set of tuples describing acceptable pairs of values. Each tuple should have two elements, the first from column A and the second from column B.
expect_multicolumn_sum_to_equal
Expect the sum of multiple columns to equal a specified value.
Parameters:
column_list (list) – The list of column names to be summed.
sum_total (int) – The expected sum of the columns.
Example
1from fennel.datasets import dataset
2from fennel.lib import (
3 expectations,
4 expect_column_values_to_be_between,
5 expect_column_values_to_be_in_set,
6 expect_column_pair_values_A_to_be_greater_than_B,
7)
8from fennel.dtypes import between
9
10
11@dataset
12class Sample:
13 passenger_count: between(int, 0, 100)
14 gender: str
15 age: between(int, 0, 100, strict_min=True)
16 mothers_age: between(int, 0, 100, strict_min=True)
17 timestamp: datetime
18
19 @expectations
20 def my_function(cls):
21 return [
22 expect_column_values_to_be_between(
23 column=str(cls.passenger_count),
24 min_value=1,
25 max_value=6,
26 mostly=0.95,
27 ),
28 expect_column_values_to_be_in_set(
29 str(cls.gender), ["male", "female"], mostly=0.99
30 ),
31 # Pairwise expectation
32 expect_column_pair_values_A_to_be_greater_than_B(
33 column_A=str(cls.age), column_B=str(cls.mothers_age)
34 ),
35 ]
python
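For the string-oriented expectations documented above, here is a similar hedged sketch - assuming, like the functions used in the example, that they are importable from fennel.lib; the dataset itself is hypothetical:
from datetime import datetime

from fennel.datasets import dataset
from fennel.lib import (
    expectations,
    expect_column_values_to_match_regex,
    expect_column_value_lengths_to_be_between,
)


@dataset
class Contact:
    email: str
    country_code: str
    timestamp: datetime

    @expectations
    def email_checks(cls):
        return [
            # email should look roughly like an email address
            expect_column_values_to_match_regex(
                column=str(cls.email),
                regex=r"^[^@]+@[^@]+\.[^@]+$",
                mostly=0.99,
            ),
            # ISO-style country codes are 2 or 3 characters long
            expect_column_value_lengths_to_be_between(
                column=str(cls.country_code), min_value=2, max_value=3
            ),
        ]
python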