Concepts

Introduction

Fennel has two core concepts - datasets and featuresets. Let's look at both one by one.

Dataset

Dataset refers to a "table" of data with typed columns. Here is how a dataset is defined.

from datetime import datetime

from fennel.datasets import dataset, field

@dataset
class UserDataset:
    uid: int = field(key=True)
    dob: datetime
    country: str
    update_time: datetime = field(timestamp=True)

python

This dataset has four columns: uid (of type int), dob (of type datetime), country (of type string), and update_time (of type datetime). For now, ignore the field(...) descriptors; they are explained shortly.

Source

Fennel datasets can be hydrated from external data sources. To do that, first we define the external sources by providing the required credentials:

from fennel.connectors import source, Postgres, Kafka

postgres = Postgres(host=... < credentials >..)
kafka = Kafka(... < credentials >..)

python

Then we define the datasets that will hydrate themselves from these sources:

user_table = postgres.table("user", cursor="signup_at")

@source(user_table, every="1m", disorder="7d", cdc="upsert")
@dataset(index=True)
class User:
    uid: int = field(key=True)
    dob: datetime
    country: str
    signup_at: datetime = field(timestamp=True)

@source(kafka.topic("txn"), disorder="1d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: float
    payment_country: str
    merchant_id: int
    timestamp: datetime = field(timestamp=True)

python

The first dataset polls the Postgres table for new updates every minute and hydrates itself with the new data. The second dataset hydrates itself from a Kafka topic. A few more kwargs are set here, such as disorder and cdc. Ignore them for now; if you are interested, you can read about them, and about sources in general, in the sources documentation.

Besides Postgres and Kafka, Fennel supports connectors with many other sources. See the full list.

Pipeline

Once you have one or more "sourced" datasets, you can derive new datasets from existing datasets by writing simple declarative Python code - it's unimaginatively called a pipeline. Let's look at one such pipeline:

from fennel.datasets import dataset, field, pipeline, Dataset, Count, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs

@dataset(index=True)
class UserTransactionsAbroad:
    uid: int = field(key=True)
    count: int
    amount_1d: float
    amount_1w: float
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(User, Transaction)
    def first_pipeline(cls, user: Dataset, transaction: Dataset):
        # Attach each transaction to its user's row (if any).
        joined = transaction.join(user, how="left", on=["uid"])
        # Keep only transactions paid in a country other than the user's.
        abroad = joined.filter(
            lambda df: df["country"] != df["payment_country"]
        )
        return abroad.groupby("uid").aggregate(
            count=Count(window=Continuous("forever")),
            amount_1d=Sum(of="amount", window=Continuous("1d")),
            amount_1w=Sum(of="amount", window=Continuous("1w")),
        )

python
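To make the pipeline's semantics concrete, here is a rough plain-Python sketch of what it computes for a small batch of rows: a left join on uid, a filter on mismatched countries, and per-user aggregates over time windows. This is only an illustration, not how Fennel executes pipelines. The helper name abroad_stats, the as_of argument, and the sample rows are all made up for this example.

```python
from datetime import datetime, timedelta, timezone

def abroad_stats(users, transactions, as_of):
    """Batch approximation of UserTransactionsAbroad as of time `as_of`."""
    home = {u["uid"]: u["country"] for u in users}
    stats = {}
    for txn in transactions:
        if txn["timestamp"] > as_of:
            continue  # not yet visible at `as_of`
        # Left join: a uid with no user row gets country None.
        country = home.get(txn["uid"])
        if country == txn["payment_country"]:
            continue  # keep only transactions made abroad
        row = stats.setdefault(
            txn["uid"], {"count": 0, "amount_1d": 0.0, "amount_1w": 0.0}
        )
        age = as_of - txn["timestamp"]
        row["count"] += 1  # "forever" window
        if age <= timedelta(days=1):
            row["amount_1d"] += txn["amount"]
        if age <= timedelta(weeks=1):
            row["amount_1w"] += txn["amount"]
    return stats

now = datetime(2024, 1, 10, tzinfo=timezone.utc)
users = [{"uid": 1, "country": "US"}, {"uid": 2, "country": "IN"}]
transactions = [
    {"uid": 1, "amount": 10.0, "payment_country": "US", "timestamp": now - timedelta(hours=1)},
    {"uid": 1, "amount": 20.0, "payment_country": "FR", "timestamp": now - timedelta(hours=2)},
    {"uid": 1, "amount": 5.0, "payment_country": "FR", "timestamp": now - timedelta(days=3)},
    {"uid": 2, "amount": 7.0, "payment_country": "IN", "timestamp": now - timedelta(hours=1)},
]
result = abroad_stats(users, transactions, as_of=now)
```

In this sample, only user 1 has transactions abroad: two of them in total, one within the last day and both within the last week.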

Index

It's also possible to do low-latency lookups on Fennel datasets using dataset keys. Earlier you were asked to ignore the field descriptors; it's time to revisit them. If you look carefully, the line with field(key=True) above defines uid to be a key, similar to a primary key in a database. Note that Fennel datasets can have multi-column keys too.

Keyed datasets can be "indexed" by setting index=True in the dataset decorator. This tells Fennel to build an auxiliary data structure for looking up the row with a given uid (or, more generally, with given values of all the key fields).

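import pandas as pd
from datetime import datetime, timezone
now = datetime.now(timezone.utc)  # look up values as of the current time
uids = pd.Series([1, 2, 3, 4])
ts = pd.Series([now, now, now, now])
data, found = UserTransactionsAbroad.lookup(ts, uid=uids)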

python

Here, found is a boolean series denoting whether the dataset had a row with that key. If no row is found, a row of Nones is returned. What's even cooler is that this method can look up the value as of any arbitrary time (via the ts argument). Fennel datasets track how data evolves over time and can go back in time to do a lookup. The time of each change is taken from whichever field is marked with field(timestamp=True). In fact, this ability to track time evolution is what enables Fennel to use the same code to generate both online and offline features.
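The idea of an "as of" lookup can be sketched in a few lines of plain Python. The toy class below keeps every version of a row per key and returns the latest version at or before the requested time, together with a found flag. KeyedHistory and its methods are hypothetical names for this illustration; they say nothing about how Fennel actually builds its index.

```python
from bisect import bisect_right
from datetime import datetime, timezone

class KeyedHistory:
    """Toy index: for each key, a time-sorted list of (timestamp, row) versions."""

    def __init__(self):
        self._versions = {}

    def upsert(self, key, ts, row):
        versions = self._versions.setdefault(key, [])
        versions.append((ts, row))
        versions.sort(key=lambda v: v[0])  # keep versions ordered by time

    def lookup(self, key, ts):
        """Return (row, found) for the latest version at or before `ts`."""
        versions = self._versions.get(key, [])
        pos = bisect_right([v[0] for v in versions], ts)
        if pos == 0:
            return None, False  # key unknown, or no version that early
        return versions[pos - 1][1], True

def utc(year, month, day):
    return datetime(year, month, day, tzinfo=timezone.utc)

history = KeyedHistory()
history.upsert(1, utc(2024, 1, 1), {"country": "US"})
history.upsert(1, utc(2024, 2, 1), {"country": "FR"})

too_early = history.lookup(1, utc(2023, 12, 1))
mid_january = history.lookup(1, utc(2024, 1, 15))
latest = history.lookup(1, utc(2024, 3, 1))
unknown_key = history.lookup(2, utc(2024, 3, 1))
```

The same key yields different rows depending on the timestamp passed in, which is the property that lets one lookup serve both "now" and "back then".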

Featureset

So we can define datasets, source them from external datasets, derive them via pipelines, and do temporal primary key lookups on them via indices. What does all this have to do with features? How do you write a feature in Fennel? Well, this is where we have to talk about the second main concept: the featureset.

A featureset, as the name implies, is just a collection of features. Features in Fennel, however, are different from features in many other systems: they don't represent stored data but are instead backed by a stateless Python function, called an extractor, that knows how to compute them. Let's define a feature that computes a user's age using the datasets we defined above.

Here is how a simple featureset looks:

import pandas as pd
from fennel.featuresets import featureset, extractor
from fennel.lib import inputs, outputs

@featureset
class UserFeature:
    uid: int
    country: str
    age: float

    @extractor(deps=[User])
    @inputs("uid")
    @outputs("age")
    def get_age(cls, ts: pd.Series, uids: pd.Series):
        df, _ = User.lookup(ts=ts, uid=uids, fields=["dob"])
        # Users without a known dob fall back to 1970-01-01.
        df.fillna(datetime(1970, 1, 1), inplace=True)
        age = (ts - df["dob"]).dt.days / 365  # age is calculated as of `ts`
        return pd.DataFrame(age, columns=["age"])

    @extractor(deps=[User])
    @inputs("uid")
    @outputs("country")
    def get_country(cls, ts: pd.Series, uids: pd.Series):
        countries, _ = User.lookup(ts=ts, uid=uids, fields=["country"])
        countries = countries.fillna("unknown")
        return countries

python

This is a featureset with three features: uid, country, and age. The first extractor, get_age, knows how to compute the feature age given the value of the feature uid. Inside the extractor function, you are welcome to do arbitrary Python computation. The featureset has a second extractor, get_country, which knows how to compute country given the input uid.

More crucially, these extractors can do lookups on the User dataset that we defined earlier to read the data computed by datasets.

Datasets and featuresets have a complementary relationship. Datasets are updated on the write path: as new data arrives, it goes to datasets, from which it flows to other datasets via pipelines. All of this happens asynchronously and results in data being stored in datasets.

Features, however, are a purely 'read side' concept: a feature is extracted while the request is waiting (this could be an online feature serving request or an offline training data generation request). The bridge between the two is the lookup functionality of datasets.
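Because an extractor is stateless, the same function can answer an online request (as of now) and an offline request (as of some past time); only the timestamp passed in changes. The sketch below mirrors the age calculation in plain Python for a single user. age_in_years is a hypothetical helper written for this illustration, not part of Fennel.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def age_in_years(dob, as_of):
    """Age as of `as_of`; a missing dob falls back to 1970-01-01."""
    if dob is None:
        dob = EPOCH
    return (as_of - dob).days / 365

dob = datetime(2000, 1, 1, tzinfo=timezone.utc)

# Same function, two different points in time.
age_2020 = age_in_years(dob, datetime(2020, 1, 1, tzinfo=timezone.utc))
age_2010 = age_in_years(dob, datetime(2010, 1, 1, tzinfo=timezone.utc))

# A user with no stored dob gets the fallback value.
age_unknown = age_in_years(None, datetime(1971, 1, 1, tzinfo=timezone.utc))
```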

Committing Datasets and Features

When you work with Fennel, your dataset and featureset definitions will live in a Python file in your codebase or a notebook - but the Fennel servers won't know about them until you explicitly tell them.

To communicate with Fennel server, you'd typically create a Client object:

import os

from fennel.client import Client

client = Client(
    os.environ["FENNEL_SERVER_URL"], token=os.environ["FENNEL_TOKEN"]
)

python

And once you have a client object, you'd issue a commit request to commit your dataset/featureset definitions with the server:

client.commit(
    message="user: add transaction datasets; first few features",
    datasets=[User, Transaction, UserTransactionsAbroad],
    featuresets=[UserFeature],
)

python

This makes a POST request to Fennel and commits the dataset and featureset definitions on the server. Fennel may reject the commit request if there is an error in any dataset or featureset, e.g. if a dataset with the same name already exists or a dataset is malformed.

Over time, you'd have many more datasets and featuresets, and you'd send all of them in a commit call. With that, validation can become a lot more complex, e.g. schema compatibility validation across the whole graph of datasets and featuresets.

Assuming the call succeeds, any datasets/featuresets that don't yet exist are created, any that exist but are not provided in the commit are deleted, and the rest are left unchanged.
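The create/delete/keep rule for a successful commit amounts to a set difference over names. Here is a minimal sketch, assuming definitions are identified purely by name. plan_commit and OldDataset are hypothetical, and the sketch ignores validation and the case where an existing definition has changed.

```python
def plan_commit(existing, provided):
    """Split definition names into those created, deleted, and left unchanged."""
    existing, provided = set(existing), set(provided)
    return {
        "create": sorted(provided - existing),     # new in this commit
        "delete": sorted(existing - provided),     # on the server, but omitted
        "unchanged": sorted(existing & provided),  # present on both sides
    }

plan = plan_commit(
    existing=["User", "Transaction", "OldDataset"],
    provided=["User", "Transaction", "UserTransactionsAbroad"],
)
```

The practical consequence is that a commit describes the full desired state: leaving a definition out of the call is a request to delete it.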

Feature Extraction Requests

Once a few datasets/featuresets have been defined, you'd want to read the values of these features for particular inputs (say uids). That can be done via the query API, which reads the 'latest' values of features, or the query_offline API, which reads historical values of features for training data generation. Here is how they look:

feature_df = client.query(
    outputs=[
        UserFeature.age,
        UserFeature.country,
    ],
    inputs=[
        UserFeature.uid,
    ],
    input_dataframe=pd.DataFrame({"UserFeature.uid": [1, 3]}),
)

python

Here, we are trying to read two features of the UserFeature featureset - age and country (as defined in outputs). We are providing the value of one feature uid (as defined in inputs) and the actual values of uid are provided in input_dataframe object. Note that even though this snippet shows Python, this is a REST endpoint and can be queried via any other language as well.

There is an analogous function to get historical values of features, called query_offline:

from datetime import datetime, timedelta, timezone

feature_df = client.query_offline(
    outputs=[
        UserFeature.age,
        UserFeature.country,
    ],
    inputs=[
        UserFeature.uid,
    ],
    input_dataframe=pd.DataFrame(
        {
            "UserFeature.uid": [1, 3],
            "timestamp": [
                datetime.now(timezone.utc),
                datetime.now(timezone.utc) - timedelta(days=1),
            ],
        }
    ),
    timestamp_column="timestamp",
)

python

This is almost identical to the previous call, except that we also pass row-level timestamps as of which we want features to be extracted. query_offline is the mechanism to generate point-in-time correct training datasets or do large-scale batch inference.

Branches

Production machine learning is all about experimentation. You'd soon find yourself wanting to change something about these datasets/features and/or add new ones in an experimental way.

To support that, Fennel supports a git-inspired branch model of development. You can create new empty branches or clone existing branches - and after that, you can make changes to branches independently without affecting other branches.

client.init_branch("dev")
client.commit(
    message="some module: some git like commit message",
    datasets=[SomeDataset],
    featuresets=[SomeFeatureset],
)

client.query(
    outputs=[SomeFeatureset.country],
    inputs=[SomeFeatureset.uid],
    input_dataframe=pd.DataFrame({"SomeFeatureset.uid": [1, 2]}),
)

python

In the above example, we created an empty branch called dev, made a commit on this newly created dev branch, and finally, issued a query against this branch.

In fact, with Fennel, you're always working with branches even if you aren't explicitly creating or checking out branches. By default, Fennel creates a branch called "main" and all the client methods talk to this default branch if some other branch hasn't been checked out.
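As a mental model only, the branch behaviour described here can be mimicked with a dictionary that holds one set of definitions per branch. ToyBranches is a hypothetical in-memory stand-in. Its method names echo the client calls above, but the clone_branch signature and the assumption that creating or cloning a branch also checks it out are guesses made for illustration.

```python
class ToyBranches:
    """In-memory stand-in for a server that keeps definitions per branch."""

    def __init__(self):
        self.definitions = {"main": set()}  # "main" exists by default
        self.current = "main"               # branch that calls are routed to

    def init_branch(self, name):
        self.definitions[name] = set()      # new, empty branch
        self.current = name

    def clone_branch(self, name, from_branch):
        # Copy the source branch so later changes stay independent.
        self.definitions[name] = set(self.definitions[from_branch])
        self.current = name

    def commit(self, names):
        self.definitions[self.current] = set(names)

server = ToyBranches()
server.commit({"User"})             # lands on "main"
server.clone_branch("dev", "main")  # "dev" starts as a copy of "main"
server.commit({"User", "Experimental"})
```

After the second commit, "main" still holds only User while "dev" holds both definitions, which is the isolation that makes branches safe for experiments.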

See the documentation on branches for more details.
