Fennel has two main concepts: datasets and featuresets. Let's look at both, one by one.
A dataset refers to a "table" of data with typed columns. Duh! Here is how a dataset is defined:
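    from datetime import datetime

    # Import paths assumed from the Fennel Python client
    from fennel.datasets import dataset, field
    from fennel.lib.metadata import meta


    @meta(owner="[email protected]")
    @dataset
    class UserDataset:
        uid: int = field(key=True)
        dob: datetime
        country: str
        update_time: datetime = field(timestamp=True)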
This dataset has four columns: uid (of type int), dob (of type datetime), country (of type str), and update_time (of type datetime). For now, ignore the field(...) descriptors; they are explained below.
How do you get data into a dataset? Datasets can be hydrated from external data sources.
First, we define the external sources by providing the required credentials:
    from fennel.sources import source, Postgres, Kafka

    postgres = Postgres(host=...<credentials>..)
    kafka = Kafka(...<credentials>..)
Then we define the datasets that will hydrate themselves from these sources:
    @meta(owner="[email protected]")
    @source(
        postgres.table("user", cursor="update_timestamp"), every="1m", tier="prod"
    )
    @source(webhook.endpoint("User"), tier="dev")
    @dataset
    class User:
        uid: int = field(key=True)
        dob: datetime
        country: str
        signup_time: datetime = field(timestamp=True)


    @meta(owner="[email protected]")
    @source(kafka.topic("transactions"), tier="prod")
    @source(webhook.endpoint("Transaction"), tier="dev")
    @dataset
    class Transaction:
        uid: int
        amount: float
        payment_country: str
        merchant_id: int
        timestamp: datetime = field(timestamp=True)
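The first dataset polls the Postgres table for new updates every minute, using the update_timestamp column as its cursor, and hydrates itself with the new data. The second dataset hydrates itself from a Kafka topic. In the dev tier, both datasets instead receive data through webhook endpoints (the webhook source object is assumed to be defined alongside postgres and kafka). Fennel supports connectors with all main sources - check here for details.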
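To make the idea of cursor-based polling concrete, here is a minimal sketch in plain Python. It is not Fennel's implementation; it only assumes that a poll fetches rows whose cursor column is newer than the last value seen. The in-memory user_table and the poll_new_rows helper are hypothetical stand-ins for the Postgres table and the connector.

```python
from datetime import datetime


def poll_new_rows(table, cursor_field, last_cursor):
    """Return rows newer than last_cursor (oldest first) and the updated cursor."""
    fresh = [
        row for row in table
        if last_cursor is None or row[cursor_field] > last_cursor
    ]
    fresh.sort(key=lambda row: row[cursor_field])
    new_cursor = fresh[-1][cursor_field] if fresh else last_cursor
    return fresh, new_cursor


# Hypothetical in-memory stand-in for the Postgres "user" table.
user_table = [
    {"uid": 1, "country": "US", "update_timestamp": datetime(2023, 1, 1, 10, 0)},
    {"uid": 2, "country": "IN", "update_timestamp": datetime(2023, 1, 1, 10, 5)},
]

# The first poll has no cursor yet, so it picks up every row.
first_batch, cursor = poll_new_rows(user_table, "update_timestamp", None)

# A row changes between two polls; only that change is picked up next time.
user_table.append(
    {"uid": 1, "country": "CA", "update_timestamp": datetime(2023, 1, 1, 10, 7)}
)
second_batch, cursor = poll_new_rows(user_table, "update_timestamp", cursor)
```

Each poll only has to remember one value (the cursor), which is why the source table needs a column that increases whenever a row is written.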
Hydrating datasets this way from external sources already looks somewhat cool because it allows you to bring data from multiple places in the same abstraction layer. But what to do with these datasets?
Fennel lets you derive new datasets from existing datasets by writing simple declarative Python code - it's unimaginatively called a pipeline. Let's look at one such pipeline:
    from fennel.datasets import pipeline, Dataset
    from fennel.lib.aggregate import Count, Sum
    from fennel.lib.window import Window
    from fennel.lib.schema import inputs


    @meta(owner="[email protected]")
    @dataset
    class UserTransactionsAbroad:
        uid: int = field(key=True)
        count: int
        amount_1d: float
        amount_1w: float
        timestamp: datetime = field(timestamp=True)

        @pipeline(version=1)
        @inputs(User, Transaction)
        def first_pipeline(cls, user: Dataset, transaction: Dataset):
            # Attach each user's home country to their transactions
            joined = transaction.join(user, how="left", on=["uid"])
            # Keep only transactions made outside the home country
            abroad = joined.filter(
                lambda df: df["country"] != df["payment_country"]
            )
            # Rolling per-user stats over different time windows
            return abroad.groupby("uid").aggregate(
                [
                    Count(window=Window("forever"), into_field="count"),
                    Sum(of="amount", window=Window("1d"), into_field="amount_1d"),
                    Sum(of="amount", window=Window("1w"), into_field="amount_1w"),
                ]
            )
This dataset keeps rolling stats about the transactions a user makes abroad, and it is derived from the User dataset and the Transaction dataset. The first_pipeline method, marked with the @pipeline decorator, defines this pipeline. Note that the pipeline is written using native Python and Pandas, so you can unleash the full power of Python. But more importantly, this pipeline operates on two datasets, one of which is streaming (i.e. Transaction) and comes from Kafka, while the other is a static-ish dataset (i.e. User) coming from Postgres. And you can do joins and aggregations across them both. Wow! Now this is beginning to look powerful. What else can you do with the datasets?
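To see what the pipeline above computes, here is a rough plain-Python equivalent evaluated at one point in time. It is only an illustration of the join, filter, and windowed aggregation semantics, not how Fennel executes pipelines; the sample data and the abroad_stats helper are made up for this sketch.

```python
from datetime import datetime, timedelta

# Made-up sample data standing in for the User and Transaction datasets.
users = {1: {"country": "US"}, 2: {"country": "IN"}}
transactions = [
    {"uid": 1, "amount": 10.0, "payment_country": "FR", "timestamp": datetime(2023, 1, 10, 9)},
    {"uid": 1, "amount": 5.0, "payment_country": "US", "timestamp": datetime(2023, 1, 10, 10)},
    {"uid": 1, "amount": 20.0, "payment_country": "DE", "timestamp": datetime(2023, 1, 6, 12)},
    {"uid": 1, "amount": 7.0, "payment_country": "FR", "timestamp": datetime(2022, 12, 1, 12)},
    {"uid": 2, "amount": 3.0, "payment_country": "IN", "timestamp": datetime(2023, 1, 10, 8)},
]


def abroad_stats(uid, as_of):
    """Count and sum a user's transactions abroad, as seen at time as_of."""
    abroad = []
    for txn in transactions:
        if txn["uid"] != uid or txn["timestamp"] > as_of:
            continue
        user = users.get(txn["uid"])  # the join: find the user's home country
        if user is not None and user["country"] != txn["payment_country"]:
            abroad.append(txn)  # the filter: keep only foreign payments

    def total(window):
        # Sum amounts of transactions that fall inside the trailing window.
        return sum(t["amount"] for t in abroad if t["timestamp"] > as_of - window)

    return {
        "count": len(abroad),  # "forever" window
        "amount_1d": total(timedelta(days=1)),
        "amount_1w": total(timedelta(weeks=1)),
    }


stats = abroad_stats(1, as_of=datetime(2023, 1, 10, 12))
```

Here user 1 has three transactions abroad in total, one of them within the last day and two within the last week, so the three aggregates differ.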
It's also possible to do low latency lookups on these datasets using dataset keys. Earlier you were asked to ignore the field descriptors; it's time to revisit those. If you look carefully, the uid field of UserTransactionsAbroad is declared with field(key=True), which makes uid a key (a dataset can have multi-column keys too). If we know the uid of a user, we can ask this dataset for the values of the rest of the columns. Something (but not exactly) like this:
    from datetime import datetime
    import pandas as pd

    now = datetime.now()
    uids = pd.Series([1, 2, 3, 4])
    ts = pd.Series([now, now, now, now])
    data, found = UserTransactionsAbroad.lookup(ts, uid=uids)
Here "found" is a boolean series denoting whether there was any row in the dataset with that key. If data isn't found, a row of Nones is returned. What's even cooler is that this method can be used to look up the value as of any arbitrary time: Fennel datasets track how the data evolves over time and can go back in time to do a lookup. Each change to the data is timestamped using whichever field is marked with field(timestamp=True). In fact, this ability to track time evolution enables Fennel to use the same code to generate both online and offline features.
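One way to picture an as-of lookup is a store that keeps every version of a row, sorted by timestamp, and binary-searches for the latest version at or before the requested time. The sketch below is an assumption about the general technique, not Fennel's storage engine; the VersionedDataset class is hypothetical and works on plain lists rather than pandas series.

```python
from bisect import bisect_right
from datetime import datetime


class VersionedDataset:
    """Toy keyed store that remembers every version of a row."""

    def __init__(self):
        # key -> (sorted list of timestamps, rows in the same order)
        self._history = {}

    def upsert(self, key, timestamp, row):
        times, rows = self._history.setdefault(key, ([], []))
        idx = bisect_right(times, timestamp)
        times.insert(idx, timestamp)
        rows.insert(idx, row)

    def lookup(self, timestamps, keys):
        """Return (data, found): the latest row at or before each timestamp."""
        data, found = [], []
        for ts, key in zip(timestamps, keys):
            times, rows = self._history.get(key, ([], []))
            idx = bisect_right(times, ts)
            if idx == 0:
                # No version existed yet at that time (or the key is unknown).
                data.append(None)
                found.append(False)
            else:
                data.append(rows[idx - 1])
                found.append(True)
        return data, found


store = VersionedDataset()
store.upsert(1, datetime(2023, 1, 1), {"country": "US"})
store.upsert(1, datetime(2023, 3, 1), {"country": "CA"})

data, found = store.lookup(
    [datetime(2023, 2, 1), datetime(2023, 4, 1), datetime(2022, 12, 1), datetime(2023, 4, 1)],
    [1, 1, 1, 99],
)
```

The same key returns different rows depending on the timestamp, which is what lets one piece of lookup code serve both live requests (timestamp = now) and historical training data (timestamp = some past event time).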
Okay, so we can define datasets, source them from external data sources, derive them via pipelines, and do complex temporal lookups on them. What has all this to do with features? How do you write a feature in Fennel? Well, this is where we have to talk about the second main concept: the featureset.
A featureset, as the name implies, is just a collection of features, each with some code that knows how to extract it, called an extractor. That may sound like a mouthful but isn't that complicated. Let's define a feature that computes a user's age using the datasets we defined above.
Here is how a really simple featureset looks:
    from datetime import datetime

    import pandas as pd
    from fennel.featuresets import feature, featureset, extractor
    from fennel.lib.schema import inputs, outputs


    @featureset
    class UserFeature:
        uid: int = feature(id=1)
        country: str = feature(id=2)
        age: float = feature(id=3)
        dob: datetime = feature(id=4)

        @extractor
        @inputs(uid)
        @outputs(age)
        def get_age(cls, ts: pd.Series, uids: pd.Series):
            # lookup returns (data, found), as in the lookup example earlier
            dobs, _ = User.lookup(ts=ts, uid=uids, fields=["dob"])
            # Age in years = now - date of birth
            # (assumes iterating over dobs yields datetime values)
            ages = [(datetime.now() - dob).days / 365.25 for dob in dobs]
            return pd.Series(ages)

        @extractor
        @inputs(uid)
        @outputs(country)
        def get_country(cls, ts: pd.Series, uids: pd.Series):
            countries, _ = User.lookup(ts=ts, uid=uids, fields=["country"])
            return countries
This is a featureset with four features: uid, country, age, and dob. The first extractor, get_age, given the value of the feature uid, knows how to compute the feature age (this input/output information is encoded in the @inputs and @outputs decorators, not in function names). Inside the extractor function, you are welcome to do arbitrary Python computation. Similarly, get_country is another extractor function, which knows how to compute country given the input uid.
More crucially, these extractors are able to do lookups on the User dataset that we defined earlier to read the data computed by datasets. That's it: that's the basic anatomy of a featureset, one or more typed features with some extractors that know how to extract those features. These feature extractors can recursively depend on other features (whether in the same featureset or in another one) and know how to compute the output features.
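The recursive dependency between extractors can be sketched as a small resolver: to compute a feature, first compute the inputs its extractor declares, then call the extractor. This is only an illustration of the idea and not Fennel's resolution logic; the EXTRACTORS registry, the extractor decorator defined here, the extract function, and the is_adult feature are all hypothetical.

```python
# Hypothetical registry: output feature name -> (input feature names, function)
EXTRACTORS = {}


def extractor(inputs, output):
    """Register a function as the extractor for one output feature."""
    def register(fn):
        EXTRACTORS[output] = (inputs, fn)
        return fn
    return register


@extractor(inputs=["uid"], output="age")
def get_age(uid):
    # Stand-in for a dataset lookup.
    return {1: 30.0, 2: 17.0}[uid]


@extractor(inputs=["age"], output="is_adult")
def get_is_adult(age):
    # Depends on another feature rather than on raw input.
    return age >= 18


def extract(feature, provided, cache=None):
    """Compute a feature, recursively computing any features it depends on."""
    cache = dict(provided) if cache is None else cache
    if feature not in cache:
        inputs, fn = EXTRACTORS[feature]
        args = [extract(name, provided, cache) for name in inputs]
        cache[feature] = fn(*args)
    return cache[feature]


adult_1 = extract("is_adult", {"uid": 1})
adult_2 = extract("is_adult", {"uid": 2})
```

The caller only supplies uid; the resolver works out that is_adult needs age, and that age needs uid, and runs the extractors in that order.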
At this point, you may have questions about the relationship between featuresets and datasets, and more specifically between pipelines and extractors.
The main difference is that datasets are updated on the write path: as new data arrives, it goes to datasets, from which it flows to other datasets via pipelines. All of this happens asynchronously and results in data being stored in datasets. Features, however, are a purely 'read side' concept: a feature is extracted while the request is waiting (this could be an online feature serving request or an offline training data generation request). Features can recursively depend on other features. The bridge between the two is the lookup functionality of the dataset.
Here is a diagram of how the concepts fit together:
This provides a relatively simplified bird's eye view of the main concepts. But there is more to both datasets and featuresets and how they come together. You can read in more detail about datasets here and about featuresets here.
Syncing Datasets and Features with Fennel
When you work with Fennel, your datasets and featuresets will live in a Python file in your codebase, and Fennel servers will not know about them until you inform the servers by issuing a sync call. Here is how it will look:
    from fennel.client import Client

    client = Client(<FENNEL SERVER URL>)
    client.sync(
        datasets=[User, Transaction, UserTransactionsAbroad],
        featuresets=[UserFeature],
    )
The client.sync call here makes a POST request to Fennel and syncs the datasets and featuresets on the server. Fennel may reject this sync request if there is an error with any dataset or featureset, e.g. if a dataset already exists with this name or the dataset is somehow malformed.
Over time, you'd have many more datasets and featuresets, and you'd send all of them in a sync call. With that, the validation can become a lot more complex, e.g. schema compatibility validation across the whole graph of datasets/featuresets.
Assuming the call succeeds, any datasets/featuresets that don't yet exist will be created, any datasets/featuresets that exist but are not provided in the sync call are deleted, and the rest are left unchanged. See the section on CI/CD to learn how the end-to-end deployment could work in a production environment.
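The create/delete/unchanged rule described above amounts to a set difference between what the server already has and what the sync call provides. Here is a minimal sketch of that rule on names only; plan_sync is a hypothetical helper, OldDataset is a made-up name, and real validation (schemas, graph compatibility) is left out.

```python
def plan_sync(existing, provided):
    """Split names into what a sync would create, delete, or leave unchanged."""
    existing, provided = set(existing), set(provided)
    return {
        "create": sorted(provided - existing),
        "delete": sorted(existing - provided),
        "unchanged": sorted(existing & provided),
    }


plan = plan_sync(
    existing=["User", "Transaction", "OldDataset"],
    provided=["User", "Transaction", "UserTransactionsAbroad"],
)
```

Because anything missing from the call is deleted, a sync call has to list every dataset and featureset you want to keep, not just the new ones.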
Datasets and featuresets, despite being very simple abstractions, together pack a punch in terms of power. Here are a few topics to read next to learn more: