Getting Started
Quickstart
The following example shows how several concepts in Fennel come together to solve a problem.
0. Installation
We only need to install Fennel's Python client to run this example, so let's install that first:
```bash
pip install fennel-ai
```
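If you want to sanity-check the install, pip can report the package it just installed and the version it picked up (optional):

```bash
# optional sanity check: confirm fennel-ai is installed and see which version was resolved
pip show fennel-ai
```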
And while we are at it, let's add all the imports that we will need in the rest of the tutorial:
```python
from datetime import datetime, timedelta, timezone
from typing import Optional

import pandas as pd

from fennel.connectors import source, Postgres, Kafka, Webhook
from fennel.datasets import dataset, pipeline, field, Dataset, Count
from fennel.dtypes import Continuous
from fennel.featuresets import featureset, extractor
from fennel.lib import expectations, expect_column_values_to_be_between
from fennel.lib import inputs, outputs
from fennel.testing import MockClient, log

__owner__ = "[email protected]"
```
1. Data Connectors
Fennel ships with data connectors that know how to talk to all common data sources. Connectors can be defined in code or in the Fennel console (not shown here).
```python
postgres = Postgres.get(name="my_rdbms")
kafka = Kafka.get(name="my_kafka")
webhook = Webhook(name="fennel_webhook")
```
2. Datasets
Datasets are the tables that you want to use in your feature pipelines. These are constantly kept fresh as new data arrives from connectors.
```python
table = postgres.table("product", cursor="last_modified")


@source(table, disorder="1d", cdc="upsert", every="1m")
@dataset(index=True)
class Product:
    product_id: int = field(key=True)
    seller_id: int
    price: float
    desc: Optional[str]
    last_modified: datetime = field(timestamp=True)

    # Powerful primitives like data expectations for data hygiene
    @expectations
    def get_expectations(cls):
        return [
            expect_column_values_to_be_between(
                column="price", min_value=1, max_value=1e4, mostly=0.95
            )
        ]


# ingesting realtime data from Kafka works exactly the same way
@source(kafka.topic("orders"), disorder="1h", cdc="append")
@dataset
class Order:
    uid: int
    product_id: int
    timestamp: datetime
```
Fennel also lets you derive more datasets by defining pipelines that transform data across different sources (e.g. S3, Kafka, Postgres) in the same plane of abstraction. These pipelines are highly declarative, completely Python native, realtime, versioned, auto-backfilled on declaration, and can be unit tested.
```python
@dataset(index=True, version=1)
class UserSellerOrders:
    uid: int = field(key=True)
    seller_id: int = field(key=True)
    num_orders_1d: int
    num_orders_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Order, Product)
    def my_pipeline(cls, orders: Dataset, products: Dataset):
        orders = orders.join(products, how="left", on=["product_id"])
        orders = orders.transform(lambda df: df.fillna(0))
        orders = orders.drop("product_id", "desc", "price")
        orders = orders.dropnull()
        return orders.groupby("uid", "seller_id").aggregate(
            num_orders_1d=Count(window=Continuous("1d")),
            num_orders_1w=Count(window=Continuous("1w")),
        )
```
3. Featuresets
Featuresets are containers for the features that you want to extract from your datasets. Features, unlike datasets, have no state and are computed on the "read path" (i.e. when you query for them) via arbitrary Python code. Features are immutable to improve reliability.
```python
@featureset
class UserSellerFeatures:
    uid: int
    seller_id: int
    num_orders_1d: int
    num_orders_1w: int

    @extractor(deps=[UserSellerOrders])
    @inputs("uid", "seller_id")
    @outputs("num_orders_1d", "num_orders_1w")
    def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
        df, found = UserSellerOrders.lookup(ts, seller_id=sellers, uid=uids)
        df = df.fillna(0)
        df["num_orders_1d"] = df["num_orders_1d"].astype(int)
        df["num_orders_1w"] = df["num_orders_1w"].astype(int)
        return df[["num_orders_1d", "num_orders_1w"]]
```
4. Commit
Once datasets/featuresets have been written (or updated), you can commit those definitions by instantiating a client and using it to talk to the server. Since we are not working with a real server, here we use the MockClient to run this example locally.
```python
# client = Client('<FENNEL SERVER URL>')  # uncomment this to use a real Fennel server
client = MockClient()  # comment this line to use a real Fennel server
client.commit(
    message="initial commit",
    datasets=[Order, Product, UserSellerOrders],
    featuresets=[UserSellerFeatures],
)
```
The MockClient doesn't support data connectors, so we will manually log some data to simulate the data flows.
```python
# create some product data
now = datetime.now(timezone.utc)
columns = ["product_id", "seller_id", "price", "desc", "last_modified"]
data = [
    [1, 1, 10.0, "product 1", now],
    [2, 2, 20.0, "product 2", now],
    [3, 1, 30.0, "product 3", now],
]
df = pd.DataFrame(data, columns=columns)
log(Product, df)

columns = ["uid", "product_id", "timestamp"]
data = [[1, 1, now], [1, 2, now], [1, 3, now]]
df = pd.DataFrame(data, columns=columns)
log(Order, df)
```
5. Query
This is the read path of Fennel. You can query for live features (i.e. features using the latest value of all datasets) like this:
```python
feature_df = client.query(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    input_dataframe=pd.DataFrame(
        [[1, 1], [1, 2]],
        columns=["UserSellerFeatures.uid", "UserSellerFeatures.seller_id"],
    ),
)

assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1]
```
You can also query for historical values of features at arbitrary timestamps (often used while creating training datasets or for offline batch inference) like this:
```python
day = timedelta(days=1)

feature_df = client.query_offline(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    timestamp_column="timestamps",
    input_dataframe=pd.DataFrame(
        [[1, 1, now], [1, 2, now], [1, 1, now - day], [1, 2, now - day]],
        columns=[
            "UserSellerFeatures.uid",
            "UserSellerFeatures.seller_id",
            "timestamps",
        ],
    ),
)

assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
    "timestamps",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1, 0, 0]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1, 0, 0]
```
Query requests can also be made over the REST API from any language or tool, which makes it easy to ship features to production servers.
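As a rough sketch of what such a request could look like, here is a curl call against a hypothetical deployment. The endpoint path, auth header, and JSON payload shape below are placeholders we are assuming for illustration, not the documented wire format; consult your server's API reference for the exact contract.

```bash
# Illustrative sketch only: the URL path, auth header, and payload shape are
# assumed placeholders, not the documented Fennel API contract.
curl -X POST "https://<YOUR-FENNEL-SERVER>/api/v1/query" \
  -H "Authorization: Bearer $FENNEL_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
        "inputs": ["UserSellerFeatures.uid", "UserSellerFeatures.seller_id"],
        "outputs": ["UserSellerFeatures.num_orders_1d", "UserSellerFeatures.num_orders_1w"],
        "data": [{"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 1}]
      }'
```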