Concepts
Introduction
Fennel has two core concepts - datasets and featuresets. Let's look at both one by one.
Dataset
Dataset refers to a "table" of data with typed columns. Here is how a dataset is defined.
```python
@dataset
class UserDataset:
    uid: int = field(key=True)
    dob: datetime
    country: str
    update_time: datetime = field(timestamp=True)
```
This dataset has four columns -- `uid` (of type int), `dob` (of type datetime), `country` (of type string), and `update_time` (of type datetime). For now, ignore the `field(...)` descriptors - they will be explained soon.
Source
Fennel datasets can be hydrated from external data sources. To do that, first we define the external sources by providing the required credentials:
```python
from fennel.connectors import source, Postgres, Kafka

postgres = Postgres(host=...<credentials>...)
kafka = Kafka(...<credentials>...)
```
Then we define the datasets that will hydrate themselves from these sources:
```python
user_table = postgres.table("user", cursor="signup_at")

@source(user_table, every="1m", disorder="7d", cdc="upsert")
@dataset(index=True)
class User:
    uid: int = field(key=True)
    dob: datetime
    country: str
    signup_at: datetime = field(timestamp=True)

@source(kafka.topic("txn"), disorder="1d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: float
    payment_country: str
    merchant_id: int
    timestamp: datetime = field(timestamp=True)
```
The first dataset will poll the Postgres table for new updates every minute and hydrate itself with the new data. The second dataset hydrates itself from a Kafka topic. There are a few more kwargs set here, like `disorder` and `cdc` - ignore them for now, though if you are interested, you can read about them and sources in general here.
Besides Postgres and Kafka, Fennel supports connectors with many other sources. See the full list.
Pipeline
Once you have one or more "sourced" datasets, you can derive new datasets from existing datasets by writing simple declarative Python code - it's unimaginatively called a pipeline. Let's look at one such pipeline:
```python
@dataset(index=True)
class UserTransactionsAbroad:
    uid: int = field(key=True)
    count: int
    amount_1d: float
    amount_1w: float
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(User, Transaction)
    def first_pipeline(cls, user: Dataset, transaction: Dataset):
        joined = transaction.join(user, how="left", on=["uid"])
        abroad = joined.filter(
            lambda df: df["country"] != df["payment_country"]
        )
        return abroad.groupby("uid").aggregate(
            count=Count(window=Continuous("forever")),
            amount_1d=Sum(of="amount", window=Continuous("1d")),
            amount_1w=Sum(of="amount", window=Continuous("1w")),
        )
```
Index
It's also possible to do low-latency lookups on Fennel datasets using dataset keys. Earlier you were asked to ignore the field descriptors - it's time to revisit those. If you look carefully, the line with `field(key=True)` above defines `uid` to be a key - similar to primary keys in databases. Note that Fennel datasets can have multi-column keys too.
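For instance, here is a sketch of what a multi-column key could look like - `UserMerchantStats` is a made-up dataset used only for illustration, keyed on the combination of `uid` and `merchant_id`:

```python
@dataset
class UserMerchantStats:
    # Hypothetical dataset, shown only to illustrate multi-column keys:
    # a row is identified by the (uid, merchant_id) pair.
    uid: int = field(key=True)
    merchant_id: int = field(key=True)
    total_spend: float
    updated_at: datetime = field(timestamp=True)
```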
Keyed datasets can be "indexed" by setting `index=True` in the `dataset` decorator - this tells Fennel to build some auxiliary data structures to look up the value of a row with a given `uid` (or, more generally, with given values of all the key fields).
```python
uids = pd.Series([1, 2, 3, 4])
ts = pd.Series([now, now, now, now])
data, found = UserTransactionsAbroad.lookup(ts, uid=uids)
```
Here "found" is a boolean series denoting whether there was any row in the
dataset with that key. If data isn't found, a row of Nones is returned. What's
even cooler is that this method can be used to lookup the value as of any
arbitrary time (via the ts
argument). Fennel datasets track time evolution of
data as the data evolves and can go back in time to do a lookup. This movement
of data is tagged with whatever field is tagged with field(timestamp=True)
. In
fact, this ability to track time evolution enables Fennel to use the same code
to generate both online and offline features.
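For instance, here is a minimal sketch of an as-of lookup against the same dataset - the uids, the timestamps, and the `days_ago` helper are made up for illustration; each row is looked up as of its own timestamp:

```python
import pandas as pd
from datetime import datetime, timedelta, timezone

def days_ago(n: int) -> datetime:
    # Small helper used only for this illustration.
    return datetime.now(timezone.utc) - timedelta(days=n)

uids = pd.Series([1, 2, 3])
# One timestamp per row: uid=1 is looked up as of now, uid=2 as of a week
# ago, and uid=3 as of thirty days ago.
ts = pd.Series([days_ago(0), days_ago(7), days_ago(30)])
data, found = UserTransactionsAbroad.lookup(ts, uid=uids)
```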
Featureset
So we can define datasets, source them from external data sources, derive them via pipelines, and do temporal primary key lookups on them via indices. What does all this have to do with features? How do you write a feature in Fennel? Well, this is where we have to talk about the second main concept - featureset.
A featureset, as the name implies, is just a collection of features. Features in Fennel, however, are different from features in many other systems - they don't represent stored data but instead are backed by a stateless Python function that knows how to compute their values - called an extractor. Let's define a feature that computes a user's age using the datasets we defined above.
Here is how a simple featureset looks:
```python
@featureset
class UserFeature:
    uid: int
    country: str
    age: float

    @extractor(deps=[User])
    @inputs("uid")
    @outputs("age")
    def get_age(cls, ts: pd.Series, uids: pd.Series):
        df, _ = User.lookup(ts=ts, uid=uids, fields=["dob"])
        df.fillna(datetime(1970, 1, 1), inplace=True)
        age = (ts - df["dob"]).dt.days / 365  # age is calculated as of `ts`
        return pd.DataFrame(age, columns=["age"])

    @extractor(deps=[User])
    @inputs("uid")
    @outputs("country")
    def get_country(cls, ts: pd.Series, uids: pd.Series):
        countries, _ = User.lookup(ts=ts, uid=uids, fields=["country"])
        countries = countries.fillna("unknown")
        return countries
```
This is a featureset with 3 features --- `uid`, `country`, and `age`. The `get_age` extractor describes how, given the value of the feature `uid`, to compute the feature `age`. Inside an extractor function, you are welcome to do arbitrary Python computation. This featureset has another extractor function, `get_country`, which knows how to compute `country` given the input `uid`.
More crucially, these extractors are able to do lookups on the `User` dataset that we defined earlier to read the data computed by datasets.
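Since the derived `UserTransactionsAbroad` dataset is also indexed, an extractor can look it up in exactly the same way. Here is a sketch of that idea - the `AbroadSpendFeature` featureset and its `amount_abroad_1w` feature are hypothetical and aren't defined elsewhere on this page:

```python
@featureset
class AbroadSpendFeature:
    # Hypothetical featureset, shown only to illustrate looking up a
    # pipeline-derived dataset from inside an extractor.
    uid: int
    amount_abroad_1w: float

    @extractor(deps=[UserTransactionsAbroad])
    @inputs("uid")
    @outputs("amount_abroad_1w")
    def get_amount_abroad(cls, ts: pd.Series, uids: pd.Series):
        # Read the value computed by the pipeline, as of `ts`.
        df, _ = UserTransactionsAbroad.lookup(
            ts=ts, uid=uids, fields=["amount_1w"]
        )
        df = df.fillna(0.0)
        return pd.DataFrame({"amount_abroad_1w": df["amount_1w"]})
```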
Datasets & featuresets have a complimentary relationship. Datasets are updated on the write path -- as the new data arrives, it goes to datasets from which it goes to other datasets via pipelines. All of this is happening asynchronously and results in data being stored in datasets.
Features, however, are a purely 'read side' concept - a feature is extracted while the request is waiting (this could be an online feature-serving request or an offline training data generation request). The bridge between the two is the lookup functionality of datasets.
Committing Datasets and Features
When you work with Fennel, your dataset and featureset definitions will live in a Python file in your codebase or a notebook - but the Fennel servers won't know about them until you explicitly tell them.
To communicate with the Fennel server, you'd typically create a `Client` object:
```python
import os

from fennel.client import Client

client = Client(
    os.environ["FENNEL_SERVER_URL"], token=os.environ["FENNEL_TOKEN"]
)
```
And once you have a client object, you'd issue a `commit` request to commit your dataset/featureset definitions with the server:
```python
client.commit(
    message="user: add transaction datasets; first few features",
    datasets=[User, Transaction, UserTransactionsAbroad],
    featuresets=[UserFeature],
)
```
This makes a POST request to Fennel and commits the datasets and featuresets on the server. Fennel may reject this commit request if there is any error with any dataset or featureset, e.g. if a dataset with this name already exists or the dataset is somehow malformed.
Over time, you'd have many more datasets and featuresets - you'd send all of them in a commit call. And with that, the validation can become a lot more complex, e.g. schema compatibility validation across the whole graph of datasets/featuresets.
Assuming the call succeeds, any datasets/featuresets that don't yet exist will be created, any datasets/featuresets that exist on the server but are not provided in the commit are deleted, and the rest are left unchanged.
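To make those deletion semantics concrete, here is a sketch of a follow-up commit: it adds a hypothetical `MerchantDataset` (not defined on this page) and, because a commit describes the full desired state, it re-lists everything from the earlier commit that should keep existing:

```python
client.commit(
    message="merchant: add merchant dataset",
    datasets=[
        # Everything from the previous commit that should stay...
        User,
        Transaction,
        UserTransactionsAbroad,
        # ...plus the new (hypothetical) dataset being added.
        MerchantDataset,
    ],
    featuresets=[UserFeature],
)
```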
Feature Extraction Requests
Once a few datasets/featuresets have been defined, you'd want to read the values of these features for particular inputs (say, uids). That can be accomplished via the `query` API, which reads the 'latest' value of features, or the `query_offline` API, which reads historical values of features for training data generation purposes. Here is how they look:
```python
feature_df = client.query(
    outputs=[
        UserFeature.age,
        UserFeature.country,
    ],
    inputs=[
        UserFeature.uid,
    ],
    input_dataframe=pd.DataFrame({"UserFeature.uid": [1, 3]}),
)
```
Here, we are trying to read two features of the `UserFeature` featureset - `age` and `country` (as defined in `outputs`). We are providing the value of one feature, `uid` (as defined in `inputs`), and the actual values of `uid` are provided in the `input_dataframe` object. Note that even though this snippet shows Python, this is a REST endpoint and can be queried via any other language as well.
There is an analogous function to get historical values of features, called `query_offline`:
```python
feature_df = client.query_offline(
    outputs=[
        UserFeature.age,
        UserFeature.country,
    ],
    inputs=[
        UserFeature.uid,
    ],
    input_dataframe=pd.DataFrame(
        {
            "UserFeature.uid": [1, 3],
            "timestamp": [
                datetime.now(timezone.utc),
                datetime.now(timezone.utc) - timedelta(days=1),
            ],
        }
    ),
    timestamp_column="timestamp",
)
```
This is almost identical to the previous call, except we are also passing row-level timestamps as of which we want the features to be extracted. `query_offline` is the mechanism to generate point-in-time correct training datasets or do large-scale batch inference.
Branches
Production machine learning is all about experimentation. You'd soon find yourself wanting to change something about these datasets/features and/or add new ones in an experimental way.
To support that, Fennel supports a git-inspired branch model of development. You can create new empty branches or clone existing branches - and after that, you can make changes to branches independently without affecting other branches.
1client.init_branch("dev")
2client.commit(
3 message="some module: some git like commit message",
4 datasets=[SomeDataset],
5 featuresets=[SomeFeatureset],
6)
7
8client.query(
9 outputs=[SomeFeatureset.country],
10 inputs=[SomeFeatureset.uid],
11 input_dataframe=pd.DataFrame({"SomeFeatureset.uid": [1, 2]}),
12)
python
In the above example, we created an empty branch called `dev`, made a commit on this newly created `dev` branch, and finally issued a query against this branch.
In fact, with Fennel, you're always working with branches even if you aren't explicitly creating or checking out branches. By default, Fennel creates a branch called "main" and all the client methods talk to this default branch if some other branch hasn't been checked out.
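As a minimal sketch of switching between branches, assuming the client exposes a `checkout` method as the description above suggests (verify the exact method name in the branch documentation):

```python
# Assumption: `checkout` switches which branch subsequent client calls target.
client.checkout("dev")   # commits/queries now go to the "dev" branch
client.checkout("main")  # switch back to the default "main" branch
```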
See this for more details about branches.