API Docs

Commit

Sends the local dataset and featureset definitions to the server for verification, storage and processing.

Parameters

message:str

Human readable description of the changes in the commit - akin to the commit message in a git commit.

datasets:List[Dataset]

Default: []

List of dataset objects to be committed.

featuresets:List[Featureset]

Default: []

List of featureset objects to be committed.

preview:bool

Default: False

If set to True, the server only provides a preview of what would happen if the commit were made, but doesn't change any state.

Note

Since preview's main goal is to check the validity of old & new definitions, it only works with the real client/server. The mock client/server simply ignores it.

incremental:bool

Default: False

If set to True, Fennel assumes that the commit includes only those datasets/featuresets that may have changed in some form. Any previously existing datasets/featuresets not included in the commit are left unchanged.

env:Optional[str]

Default: None

Selector to optionally commit only a subset of sources, pipelines and extractors - those with matching values. Rules of selection:

  • If env is None, all objects are selected
  • If env is not None, an object is selected if its own selector is either None, the same as env, or ~x for some x other than env (see the sketch below)
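
A minimal sketch of the negation rule, assuming (as the rule above implies) that an object's own env selector may be a ~-prefixed value; the dataset, endpoint and env names here are illustrative, not from the examples below:

@source(
    webhook.endpoint("endpoint3"),
    disorder="14d",
    cdc="upsert",
    env="~bronze",  # assumption: selected whenever the commit env is anything other than "bronze"
)
@dataset(index=True)
class Order:
    oid: int = field(key=True)
    amount: int
    timestamp: datetime

# committing with env="silver" selects this source, since its selector is
# ~bronze and "silver" != "bronze"; committing with env="bronze" would skip it
client.commit(message="order: add order dataset", datasets=[Order], env="silver")
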
backfill:bool

Default: True

If you set the backfill parameter to False, the system will return an error if committing changes would result in a backfill of any dataset/pipeline. A backfill occurs when there is no existing dataset that is isomorphic to the new dataset. Setting backfill to False helps prevent accidental backfill by ensuring that only datasets matching the existing structure are committed.

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook
from fennel.featuresets import feature as F, featureset, extractor
from fennel.lib import inputs, outputs

webhook = Webhook(name="some_webhook")

@source(
    webhook.endpoint("endpoint1"),
    disorder="14d",
    cdc="upsert",
    env="bronze",
)
@source(
    webhook.endpoint("endpoint2"),
    disorder="14d",
    cdc="upsert",
    env="silver",
)
@dataset(index=True)
class Transaction:
    txid: int = field(key=True)
    amount: int
    timestamp: datetime

@featureset
class TransactionFeatures:
    txid: int
    amount: int = F(Transaction.amount, default=0)
    amount_is_high: bool

    @extractor(env="bronze")
    @inputs("amount")
    @outputs("amount_is_high")
    def some_fn(cls, ts, amount: pd.Series):
        return amount.apply(lambda x: x > 100)

client.commit(
    message="transaction: add transaction dataset and featureset",
    datasets=[Transaction],
    featuresets=[TransactionFeatures],
    preview=False,  # default is False, so didn't need to include this
    env="silver",
)
Silver source and no extractor are committed

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook
from fennel.featuresets import featureset, feature as F, extractor
from fennel.lib import inputs, outputs

webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    txid: int = field(key=True)
    amount: int
    timestamp: datetime

client.commit(
    message="transaction: add transaction dataset",
    datasets=[Transaction],
    incremental=False,  # default is False, so didn't need to include this
)

@featureset
class TransactionFeatures:
    txid: int
    amount: int = F(Transaction.amount, default=0)
    amount_is_high: bool

    @extractor(env="bronze")
    @inputs("amount")
    @outputs("amount_is_high")
    def some_fn(cls, ts, amount: pd.Series):
        return amount.apply(lambda x: x > 100)

client.commit(
    message="transaction: add transaction featureset",
    datasets=[],  # note: transaction dataset is not included here
    featuresets=[TransactionFeatures],
    incremental=True,  # now we set incremental to True
)
Second commit adds a featureset & leaves dataset unchanged

from datetime import datetime

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    txid: int = field(key=True)
    amount: int
    timestamp: datetime

client.checkout("test_backfill", init=True)
client.commit(
    message="transaction: add transaction dataset",
    datasets=[Transaction],
    backfill=True,  # default is True, so didn't need to include this
)

@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    txid: int = field(key=True)
    amount: int
    timestamp: datetime

client.checkout("main", init=True)
client.commit(
    message="adding transaction dataset to main",
    datasets=[Transaction],
    backfill=False,  # set backfill to False to prevent an accidental backfill of the Transaction dataset
)
Backfill param will prevent backfill of Transaction dataset when committing to main branch

Log

Method to push data into Fennel datasets via webhook endpoints.

Parameters

webhook:str

The name of the webhook source containing the endpoint to which the data should be logged.

endpoint:str

The name of the webhook endpoint to which the data should be logged.

df:pd.Dataframe

The dataframe containing all the data that must be logged. The columns of the dataframe must have the right names & types to be compatible with the schemas of the datasets attached to the webhook endpoint.

batch_size:int

Default: 1000

To prevent sending too much data in one go, the Fennel client divides the dataframe into chunks of batch_size rows each and sends them one by one.

Note that the Fennel server provides an atomicity guarantee for each call of log - either all of the data in that call is accepted or none of it is. However, breaking a dataframe into chunks can lead to a situation where some chunks have been ingested while others haven't.
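
For instance, a minimal sketch (reusing the some_webhook / some_endpoint setup from the example below; the row counts are illustrative) of logging a large dataframe in smaller chunks:

import pandas as pd

# log 10,000 rows in chunks of 500 rows each instead of the default 1000
big_df = pd.DataFrame(
    {
        "uid": list(range(10_000)),
        "amount": [10] * 10_000,
        "timestamp": ["2021-01-01T00:00:00"] * 10_000,
    }
)
client.log("some_webhook", "some_endpoint", df=big_df, batch_size=500)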

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

# first define & sync a dataset that sources from a webhook
webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

client.commit(message="some commit msg", datasets=[Transaction])

# log some rows to the webhook
client.log(
    "some_webhook",
    "some_endpoint",
    df=pd.DataFrame(
        columns=["uid", "amount", "timestamp"],
        data=[
            [1, 10, "2021-01-01T00:00:00"],
            [2, 20, "2021-02-01T00:00:00"],
        ],
    ),
)
Logging data to webhook via client

Errors

Invalid webhook endpoint:

Fennel will throw an error (equivalent to 404) if no endpoint with the given specification exists.

Schema mismatch errors:

There is no explicit schema tied to a webhook endpoint - the schema comes from the datasets attached to it. As a result, the log call itself doesn't check for schema mismatches, but runtime errors may be generated asynchronously later if the logged data doesn't match the schema of the attached datasets.

You may want to keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Query

Method to query the latest value of features (typically for online inference).

Parameters

inputs:List[Union[Feature, str]]

List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.

outputs:List[Union[Featureset, Feature, str]]

List of features that need to be queried. Features should be provided either as Feature objects, Featureset objects (in which case all features under that featureset are queried), or strings representing fully qualified feature names.

input_dataframe:pd.Dataframe

A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.

log:bool

Default: False

Boolean which indicates if the queried features should also be logged (for log-and-wait approach to training data generation).

workflow:str

Default: 'default'

The name of the workflow associated with the feature query. Only relevant when log is set to True, in which case, features associated with the same workflow are collected together. Useful if you want to separate logged features between, say, login fraud and transaction fraud.

sampling_rate:float

Default: 1.0

The rate at which feature data should be sampled before logging. Only relevant when log is set to True.
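
As a hedged sketch (using the Numbers featureset defined in the example below), a query that logs 10% of the queried rows under a custom workflow might look like this:

import pandas as pd

# log a 10% sample of the queried features under the "fraud" workflow
feature_df = client.query(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even],
    input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3, 4]}),
    log=True,
    workflow="fraud",
    sampling_rate=0.1,
)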

import pandas as pd

from fennel.featuresets import featureset, extractor
from fennel.lib import inputs, outputs

@featureset
class Numbers:
    num: int
    is_even: bool
    is_odd: bool

    @extractor
    @inputs("num")
    @outputs("is_even", "is_odd")
    def my_extractor(cls, ts, nums: pd.Series):
        is_even = nums.apply(lambda x: x % 2 == 0)
        is_odd = is_even.apply(lambda x: not x)
        return pd.DataFrame({"is_even": is_even, "is_odd": is_odd})

client.commit(message="some commit msg", featuresets=[Numbers])

# now we can query the features
feature_df = client.query(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3, 4]}),
)

pd.testing.assert_frame_equal(
    feature_df,
    pd.DataFrame(
        {
            "Numbers.is_even": [False, True, False, True],
            "Numbers.is_odd": [True, False, True, False],
        }
    ),
)
Querying two features

Returns

type:Union[pd.Dataframe, pd.Series]

Returns the queried features as a dataframe with one column for each feature in outputs. If a single output feature is requested, it is returned as a single pd.Series. Note that input features aren't returned back unless they are also present in the outputs.
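
For example, a small sketch (built on the Numbers featureset above) where a single output feature is requested and a pd.Series comes back:

import pandas as pd

# single output feature -> the result is a pd.Series rather than a dataframe
is_even = client.query(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even],
    input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3]}),
)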

Errors

Unknown features:

Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.

Resolution error:

An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.

Schema mismatch errors:

Fennel raises a run-time error if any extractor returns a value of the feature that doesn't match its stated type.

Authorization error:

Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.

Query Offline

Method to query the historical values of features. Typically used for training data generation or batch inference.

Parameters

inputs:List[Union[Feature, str]]

List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.

outputs:List[Union[Featureset, Feature, str]]

List of features that need to be queried. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queried) or strings representing fully qualified feature names.

input_dataframe:Optional[pd.Dataframe]

A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.

This parameter is mutually exclusive with input_s3.

input_s3:Optional[connectors.S3]

Sending large volumes of input data over the wire is often infeasible. In such cases, the input data can be written to S3 and its location sent as input_s3, created via the S3.bucket() function of the S3 connector.

When using this option, please ensure that Fennel's data connector IAM role has the ability to execute read & list operations on this bucket - talk to Fennel support if you need help.

timestamp_column:str

The name of the column containing the timestamps as of which the feature values must be computed.

output_s3:Optional[connectors.S3]

Specifies the location & other details about the s3 path where the values of all the output features should be written. Similar to input_s3, this is provided via the S3.bucket() function of the S3 connector.

If this isn't provided, Fennel writes the results of all requests to a fixed default bucket - you can see its details from the return value of query_offline or via Fennel Console.

When using this option, please ensure that Fennel's data connector IAM role has write permissions on this bucket - talk to Fennel support if you need help.

workflow:str

Default: 'default'

The name of the workflow associated with the feature query. It functions like a tag, for example "fraud" or "finance", to categorize the query. If this parameter is not provided, it defaults to "default".

feature_to_column_map:Optional[Dict[Feature, str]]

Default: None

When reading input data from s3, sometimes the column names in s3 don't match one-to-one with the names of the input features. In such cases, a dictionary mapping features to column names can be provided.

This should be set only when input_s3 is provided.
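
A hedged sketch (reusing the s3_input_connection from the example below; the "num_value" column name is hypothetical) of mapping an s3 column to an input feature:

# the s3 files store the input feature under the column "num_value"
response = client.query_offline(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    format="csv",
    timestamp_column="timestamp",
    input_s3=s3_input_connection,
    feature_to_column_map={Numbers.num: "num_value"},
)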

Returns

type:Dict[str, Any]

Immediately returns a dictionary containing the following information:

  • request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id
  • output s3 bucket - the s3 bucket where results will be written
  • output s3 path prefix - the prefix of the output s3 bucket
  • completion rate - progress of the request as a fraction between 0 and 1
  • failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
  • status - the overall status of this request

A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.

Errors

Unknown features:

Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.

Resolution error:

An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.

Schema mismatch errors:

Fennel raises a run-time error and may register failure on a subset of rows if any extractor returns a value of the feature that doesn't match its stated type.

Authorization error:

Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.

from datetime import datetime, timedelta, timezone

import pandas as pd

HOUR = timedelta(hours=1)  # assumed definition; the original snippet uses HOUR without defining it

response = client.query_offline(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    format="pandas",
    input_dataframe=pd.DataFrame(
        {
            "Numbers.num": [1, 2, 3, 4],
            "timestamp": [
                datetime.now(timezone.utc) - HOUR * i for i in range(4)
            ],
        }
    ),
    timestamp_column="timestamp",
    workflow="fraud",
)
print(response)
Example with pandas input & default s3 output

from fennel.connectors import S3

s3 = S3(
    name="extract_hist_input",
    aws_access_key_id="<ACCESS KEY HERE>",
    aws_secret_access_key="<SECRET KEY HERE>",
)
s3_input_connection = s3.bucket("bucket", prefix="data/user_features")
s3_output_connection = s3.bucket("bucket", prefix="output")

response = client.query_offline(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    format="csv",
    timestamp_column="timestamp",
    input_s3=s3_input_connection,
    output_s3=s3_output_connection,
    workflow="fraud",
)
Example specifying input and output s3 buckets

Track Offline Query

Method to monitor the progress of a run of offline query.

Parameters

request_id:str

The unique request ID returned by the query_offline operation that needs to be tracked.

Returns

type:Dict[str, Any]

Immediately returns a dictionary containing the following information:

  • request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id
  • output s3 bucket - the s3 bucket where results will be written
  • output s3 path prefix - the prefix of the output s3 bucket
  • completion rate - progress of the request as a fraction between 0 and 1
  • failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
  • status - the overall status of this request

A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.
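
A minimal polling sketch built on this return value; the "completion_rate" key name and the sleep interval below are assumptions, not part of the documented API:

import time

request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
while True:
    status = client.track_offline_query(request_id)
    # assumption: the completion rate is exposed under a "completion_rate" key
    if status.get("completion_rate", 0.0) >= 1.0:
        break
    time.sleep(30)
print(status)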

request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
response = client.track_offline_query(request_id)
print(response)
Checking progress of a prior extract historical request

Cancel Offline Query

Method to cancel a previously issued query_offline request.

Parameters

request_id:str

The unique request ID returned by the query_offline operation that needs to be canceled.

request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
response = client.cancel_offline_query(request_id)
print(response)
Canceling offline query with given ID

Returns

type:Dict[str, Any]

Marks the request for cancellation and immediately returns a dictionary containing the following information:

  • request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id
  • output s3 bucket - the s3 bucket where results will be written
  • output s3 path prefix - the prefix of the output s3 bucket
  • completion rate - progress of the request as a fraction between 0 and 1
  • failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
  • status - the overall status of this request

A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.

Lookup

Method to lookup rows of keyed datasets.

Parameters

dataset:Union[str, Dataset]

The name of the dataset or Dataset object to be looked up.

keys:List[Dict[str, Any]]

List of dict where each dict contains the value of the key fields for one row for which data needs to be looked up.

fields:List[str]

The list of field names in the dataset to be looked up.

timestamps:List[Union[int, str, datetime]]

Default: None

Timestamps (one per row) as of which the lookup should be done. If not set, the lookups are done as of now (i.e. the latest value for each key).

If set, the length of this list should match the number of rows in keys.

Each timestamp can be passed as a datetime, a str (e.g. obtained via pd.to_datetime), or an int denoting seconds/milliseconds/microseconds since epoch.
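
For instance, a small sketch (on the Transaction dataset defined in the example below) passing timestamps as strings instead of datetime objects:

import pandas as pd

# assumption: ISO-formatted string timestamps are parsed server-side
response, found = client.lookup(
    "Transaction",
    keys=pd.DataFrame({"uid": [1, 2]}),
    fields=["amount"],
    timestamps=["2021-01-01T00:00:00", "2021-02-01T00:00:00"],
)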

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

# first define & sync a dataset that sources from a webhook
webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

client.commit(message="some commit msg", datasets=[Transaction])

# log some rows to the webhook
client.log(
    "some_webhook",
    "some_endpoint",
    pd.DataFrame(
        data=[
            [1, 10, "2021-01-01T00:00:00"],
            [2, 20, "2021-02-01T00:00:00"],
        ],
        columns=["uid", "amount", "timestamp"],
    ),
)

# now do a lookup to verify that the rows were logged
keys = pd.DataFrame({"uid": [1, 2, 3]})
ts = [
    datetime(2021, 1, 1, 0, 0, 0),
    datetime(2021, 2, 1, 0, 0, 0),
    datetime(2021, 3, 1, 0, 0, 0),
]
response, found = client.lookup(
    "Transaction",
    keys=keys,
    timestamps=pd.Series(ts),
)
Example of doing lookup on dataset

Init Branch

Creates a new empty branch and checks out the client to point towards it.

Parameters

name:str

The name of the branch that should be created. The name can consist of alphanumeric characters [a-z, A-Z, 0-9] as well as slashes "/", hyphens "-", underscores "_", and periods ".".

Errors

Invalid name:

Raises an error if the name of the branch contains invalid characters.

Branch already exists:

Raises an error if a branch of the same name already exists.

Invalid auth token:

Raises an error if the auth token isn't valid. Not applicable to the mock client.

Insufficient permissions:

Raises an error if the account corresponding to the auth token doesn't carry the permission to create a new branch. Not applicable to the mock client.

client.init_branch("mybranch")

# init checks out client to the new branch
# so this commit (or any other operations) will be on `mybranch`
client.commit(...)
Create a new empty branch 'mybranch'

Clone Branch

Clones an existing branch into a new branch and checks out the client to point towards it.

Parameters

name:str

The name of the new branch that should be created as a result of the clone. The name can consist of any ASCII characters.

from_name:str

The name of the existing branch that should be cloned into the new branch.

Errors

Destination branch already exists:

Raises an error if a branch of the same name already exists.

Source branch does not exist:

Raises an error if there is no existing branch with the name given by from_name.

Invalid auth token:

Raises an error if the auth token isn't valid. Not applicable to the mock client.

Insufficient permissions:

Raises an error if the account corresponding to the auth token doesn't carry permissions to create a new branch. Also raises an error if the token doesn't have access to entities defined in the source branch. Auth/permissions checks are not applicable to the mock client.

client.init_branch("base")
# do some operations on `base` branch
client.commit(...)

# clone `base` branch to `mybranch`
client.clone_branch("mybranch", "base")

# clone checks out client to the new branch
# so this commit (or any other operations) will be on `mybranch`
client.commit(...)
Clone 'base' branch into 'mybranch'

Delete Branch

Deletes an existing branch and checks out the client to point to the main branch.

Parameters

name:str

The name of the existing branch that should be deleted.

Errors

Branch does not exist:

Raises an error if a branch of the given name doesn't exist.

Invalid auth token:

Raises an error if the auth token isn't valid. Not applicable to the mock client.

Insufficient permissions:

Raises an error if the account corresponding to the auth token doesn't carry the permission to delete branches. Also raises an error if the token doesn't have edit access to the entities in the branch being deleted. Not applicable to the mock client.

client.init_branch("mybranch")

# do some operations on the branch
client.commit(...)

# delete the branch
client.delete_branch("mybranch")

# client now points to the main branch
client.commit(...)
Delete an existing branch

Checkout

Sets the client to point to the given branch.

Parameters

name:str

The name of the branch that the client should start pointing to. All subsequent operations (e.g. commit, query) will be directed to this branch.

init:bool

Default: False

If true, creates a new empty branch if the name is not found among the branches synced with Fennel.

# change active branch from `main` to `mybranch`
client.checkout("mybranch")
assert client.branch() == "mybranch"

# all subsequent operations will be on `mybranch`
client.commit(...)

# create and change active branch from `mybranch` to `mybranch2`
client.checkout("mybranch2", init=True)
assert client.branch() == "mybranch2"

# all subsequent operations will be on `mybranch2`
Changing client to point to 'mybranch'

Errors

checkout does not raise any error.

Note

If not specified via explicit checkout, by default, clients point to the 'main' branch.

Note

Note that checkout doesn't validate by default that the name points to a real branch; it just changes the local state of the client. If the branch doesn't exist, subsequent branch operations will fail, not the checkout itself. However, when init is set to True, checkout first creates the branch if it doesn't already exist and then points to it.

Branch

Get the name of the current branch.

Parameters

Doesn't take any parameters.

Returns

name:str

Returns the name of the branch that the client is pointing to.

# change active branch from `main` to `mybranch`
client.checkout("mybranch")
assert client.branch() == "mybranch"

# all subsequent operations will be on `mybranch`
client.commit(...)

# create and change active branch from `mybranch` to `mybranch2`
client.checkout("mybranch2", init=True)
assert client.branch() == "mybranch2"

# all subsequent operations will be on `mybranch2`
Get the name of the current branch

Erase

Method to hard-erase data from a dataset.

Data related to the provided erase keys is removed and will not be reflected in downstream datasets or any subsequent queries.

This method should be used as a way to comply with GDPR and other similar regulations that require a "right to be forgotten". For operational deletion/correction of data, the regular CDC mechanism should be used instead.

Warning

Erase only removes the data from the dataset in the request. If the data has already propagated to downstream datasets via pipelines, you may want to issue separate erase requests for all such datasets too.

Parameters

dataset:Union[Dataset, str]

The dataset from which data needs to be erased. Can be provided either as a Dataset object or string representing the dataset name.

erase_keys:pd.Dataframe

The dataframe containing the erase keys - all data matching these erase keys is removed. The columns of the dataframe must have the right names & types to be compatible with the erase keys defined in the schema of the dataset.

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

# first define & sync a dataset that sources from a webhook
webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    uid: int = field(key=True, erase_key=True)
    amount: int
    timestamp: datetime

client.commit(message="some commit msg", datasets=[Transaction])

client.erase(
    Transaction,
    erase_keys=pd.DataFrame({"uid": [1, 2, 3]}),
)
Erasing data corresponding to given uids
