Getting Started
Quickstart
The following example shows how several concepts in Fennel come together to solve a problem.
0. Installation
We only need to install Fennel's Python client to run this example, so let's install that first:
pip install fennel-ai
And while we are at it, let's add all the imports that we will need in the rest of the tutorial:
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import requests

from fennel.datasets import dataset, pipeline, field, Dataset
from fennel.featuresets import feature, featureset, extractor
from fennel.lib.aggregate import Count
from fennel.lib.expectations import (
    expectations,
    expect_column_values_to_be_between,
)
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
from fennel.lib.window import Window
from fennel.sources import source, Postgres, Snowflake, Kafka, Webhook
1. Data Connectors
Fennel ships with data connectors that know how to talk to all common data sources. The connectors can be defined in code or in the Fennel console (not shown here).
postgres = Postgres.get(name="my_rdbms")
warehouse = Snowflake.get(name="my_warehouse")
kafka = Kafka.get(name="my_kafka")
2. Datasets
Datasets are the tables that you want to use in your feature pipelines. These are constantly kept fresh as new data arrives from connectors.
@dataset
@source(postgres.table("product_info", cursor="last_modified"), every="1m")
@meta(owner="[email protected]", tags=["PII"])
class Product:
    product_id: int = field(key=True)
    seller_id: int
    price: float
    desc: Optional[str]
    last_modified: datetime = field(timestamp=True)

    # Powerful primitives like data expectations for data hygiene
    @expectations
    def get_expectations(cls):
        return [
            expect_column_values_to_be_between(
                column="price", min_value=1, max_value=1e4, mostly=0.95
            )
        ]


# ingesting realtime data from Kafka works exactly the same way
@meta(owner="[email protected]")
@source(kafka.topic("orders"), lateness="1h")
@dataset
class Order:
    uid: int
    product_id: int
    timestamp: datetime
Fennel also lets you derive new datasets by defining pipelines that transform data across different sources (e.g. S3, Kafka, Postgres) within the same abstraction. These pipelines are highly declarative, completely Python-native, realtime, versioned, automatically backfilled on declaration, and can be unit tested.
@meta(owner="[email protected]")
@dataset
class UserSellerOrders:
    uid: int = field(key=True)
    seller_id: int = field(key=True)
    num_orders_1d: int
    num_orders_1w: int
    timestamp: datetime

    @pipeline(version=1)
    @inputs(Order, Product)
    def my_pipeline(cls, orders: Dataset, products: Dataset):
        # enrich each order with product info to obtain the seller_id
        orders = orders.join(products, how="left", on=["product_id"])
        # keep only the columns we need and fill missing values
        orders = orders.transform(
            lambda df: df[["uid", "seller_id", "timestamp"]].fillna(0),
            schema={
                "uid": int,
                "seller_id": int,
                "timestamp": datetime,
            },
        )

        # count orders per (uid, seller_id) over rolling 1-day and 1-week windows
        return orders.groupby("uid", "seller_id").aggregate(
            [
                Count(window=Window("1d"), into_field="num_orders_1d"),
                Count(window=Window("1w"), into_field="num_orders_1w"),
            ]
        )
3. Featuresets
Featuresets are containers for the features that you want to extract from your datasets. Features, unlike datasets, have no state and are computed on the "read path" (i.e. when you query for them) via arbitrary Python code. Features are immutable to improve reliability.
@meta(owner="[email protected]")
@featureset
class UserSellerFeatures:
    uid: int = feature(id=1)
    seller_id: int = feature(id=2)
    num_orders_1d: int = feature(id=3)
    num_orders_1w: int = feature(id=4)

    @extractor(depends_on=[UserSellerOrders])
    @inputs(uid, seller_id)
    @outputs(num_orders_1d, num_orders_1w)
    def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
        # look up the derived dataset as of each query timestamp
        df, found = UserSellerOrders.lookup(ts, seller_id=sellers, uid=uids)
        # rows with no matching orders come back as NaN; treat them as zero counts
        df = df.fillna(0)
        df["num_orders_1d"] = df["num_orders_1d"].astype(int)
        df["num_orders_1w"] = df["num_orders_1w"].astype(int)
        return df[["num_orders_1d", "num_orders_1w"]]
4. Sync
Once datasets and featuresets have been written (or updated), you can sync those definitions with the server by instantiating a client and using it to talk to the server. Since we are not working with a real server here, we use MockClient to run this example locally. MockClient doesn't support data connectors, so we manually log some data to simulate the data flows.
from fennel.test_lib import MockClient

webhook = Webhook(name="fennel_webhook")

# client = Client('<FENNEL SERVER URL>')  # uncomment this line to use a real Fennel server
client = MockClient()  # comment this line to use a real Fennel server
fake_Product = Product.with_source(webhook.endpoint("Product"))
fake_Order = Order.with_source(webhook.endpoint("Order"))
client.sync(
    datasets=[fake_Order, fake_Product, UserSellerOrders],
    featuresets=[UserSellerFeatures],
)

now = datetime.utcnow()
# create some product data
columns = ["product_id", "seller_id", "price", "desc", "last_modified"]
data = [
    [1, 1, 10.0, "product 1", now],
    [2, 2, 20.0, "product 2", now],
    [3, 1, 30.0, "product 3", now],
]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "Product", df)
assert response.status_code == requests.codes.OK, response.json()

# create some order data
columns = ["uid", "product_id", "timestamp"]
data = [[1, 1, now], [1, 2, now], [1, 3, now]]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "Order", df)
assert response.status_code == requests.codes.OK, response.json()
5. Query
This is the read path of Fennel. You can query for live features (i.e. features computed using the latest values of all datasets) like this:
feature_df = client.extract_features(
    output_feature_list=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    input_feature_list=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    input_dataframe=pd.DataFrame(
        {
            "UserSellerFeatures.uid": [1, 1],
            "UserSellerFeatures.seller_id": [1, 2],
        }
    ),
)
assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1]
You can also query for historical values of features at arbitrary timestamps (useful for creating training datasets) like this:
feature_df = client.extract_historical_features(
    output_feature_list=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    input_feature_list=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    timestamp_column="timestamps",
    format="pandas",
    input_dataframe=pd.DataFrame(
        {
            "UserSellerFeatures.uid": [1, 1, 1, 1],
            "UserSellerFeatures.seller_id": [1, 2, 1, 2],
            "timestamps": [
                now,
                now,
                now - timedelta(days=1),
                now - timedelta(days=1),
            ],
        }
    ),
)
assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
    "timestamps",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1, 0, 0]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1, 0, 0]
Query requests can also be made over a REST API from any language or tool, which makes it easy to ship features to production servers.
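For instance, a service that cannot import the Python client could request the same features over HTTP with a plain JSON payload. The sketch below uses Python's requests library for brevity; the endpoint path, auth header, and JSON field names are illustrative placeholders rather than the exact Fennel REST contract, so check the API reference for the real schema.

import requests

# Hypothetical sketch of a REST feature query: the URL path, auth header, and
# JSON field names below are placeholders, not the documented Fennel REST schema.
url = "https://<FENNEL SERVER URL>/api/v1/extract_features"  # placeholder path
payload = {
    "output_features": [
        "UserSellerFeatures.num_orders_1d",
        "UserSellerFeatures.num_orders_1w",
    ],
    "input_features": [
        "UserSellerFeatures.uid",
        "UserSellerFeatures.seller_id",
    ],
    "data": [{"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 1}],
}
response = requests.post(
    url,
    json=payload,
    headers={"Authorization": "Bearer <TOKEN>"},  # if your deployment requires auth
)
print(response.json())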