Quickstart

The following example shows how several concepts in Fennel come together to solve a problem.

0. Installation

We only need to install Fennel's Python client to run this example, so let's install that first:

```bash
pip install fennel-ai
```

And while we are at it, let's add all the imports that we will need in the rest of the tutorial:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

import pandas as pd

from fennel.connectors import source, Postgres, Kafka, Webhook
from fennel.datasets import dataset, pipeline, field, Dataset, Count
from fennel.dtypes import Continuous
from fennel.featuresets import featureset, extractor
from fennel.lib import expectations, expect_column_values_to_be_between
from fennel.lib import inputs, outputs
from fennel.testing import MockClient, log

__owner__ = "[email protected]"
```

1. Data Connectors

Fennel ships with data connectors that know how to talk to all common data sources. The connectors can be defined in code or in the Fennel console (not shown here).

```python
postgres = Postgres.get(name="my_rdbms")
kafka = Kafka.get(name="my_kafka")
webhook = Webhook(name="fennel_webhook")
```

2. Datasets

Datasets are the tables that you want to use in your feature pipelines. These are constantly kept fresh as new data arrives from connectors.

```python
table = postgres.table("product", cursor="last_modified")


@source(table, disorder="1d", cdc="upsert", every="1m")
@dataset(index=True)
class Product:
    product_id: int = field(key=True)
    seller_id: int
    price: float
    desc: Optional[str]
    last_modified: datetime = field(timestamp=True)

    # Powerful primitives like data expectations for data hygiene
    @expectations
    def get_expectations(cls):
        return [
            expect_column_values_to_be_between(
                column="price", min_value=1, max_value=1e4, mostly=0.95
            )
        ]


# ingesting realtime data from Kafka works exactly the same way
@source(kafka.topic("orders"), disorder="1h", cdc="append")
@dataset
class Order:
    uid: int
    product_id: int
    timestamp: datetime
```

Fennel also lets you derive more datasets by defining pipelines that transform data across different sources (e.g. S3, Kafka, Postgres, etc.) in the same plane of abstraction. These pipelines are highly declarative, completely Python native, realtime, versioned, automatically backfilled on declaration, and can be unit tested.

```python
@dataset(index=True, version=1)
class UserSellerOrders:
    uid: int = field(key=True)
    seller_id: int = field(key=True)
    num_orders_1d: int
    num_orders_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Order, Product)
    def my_pipeline(cls, orders: Dataset, products: Dataset):
        orders = orders.join(products, how="left", on=["product_id"])
        orders = orders.transform(lambda df: df.fillna(0))
        orders = orders.drop("product_id", "desc", "price")
        orders = orders.dropnull()
        return orders.groupby("uid", "seller_id").aggregate(
            num_orders_1d=Count(window=Continuous("1d")),
            num_orders_1w=Count(window=Continuous("1w")),
        )
```

3. Featuresets

Featuresets are containers for the features that you want to extract from your datasets. Features, unlike datasets, have no state and are computed on the "read path" (i.e. when you query for them) via arbitrary Python code. Features are immutable to improve reliability.

```python
@featureset
class UserSellerFeatures:
    uid: int
    seller_id: int
    num_orders_1d: int
    num_orders_1w: int

    @extractor(deps=[UserSellerOrders])
    @inputs("uid", "seller_id")
    @outputs("num_orders_1d", "num_orders_1w")
    def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
        df, found = UserSellerOrders.lookup(ts, seller_id=sellers, uid=uids)
        df = df.fillna(0)
        df["num_orders_1d"] = df["num_orders_1d"].astype(int)
        df["num_orders_1w"] = df["num_orders_1w"].astype(int)
        return df[["num_orders_1d", "num_orders_1w"]]
```

4. Commit

Once datasets/featuresets have been written (or updated), you can commit those definitions by instantiating a client and using it to talk to the server. Since we are not working with a real server, here we use the MockClient to run this example locally.

```python
# client = Client('<FENNEL SERVER URL>')  # uncomment this to use a real Fennel server
client = MockClient()  # comment this line to use a real Fennel server
client.commit(
    message="initial commit",
    datasets=[Order, Product, UserSellerOrders],
    featuresets=[UserSellerFeatures],
)
```

The MockClient doesn't support data connectors, so we will manually log some data to simulate the data flows.

```python
# create some product data
now = datetime.now(timezone.utc)
columns = ["product_id", "seller_id", "price", "desc", "last_modified"]
data = [
    [1, 1, 10.0, "product 1", now],
    [2, 2, 20.0, "product 2", now],
    [3, 1, 30.0, "product 3", now],
]
df = pd.DataFrame(data, columns=columns)
log(Product, df)

columns = ["uid", "product_id", "timestamp"]
data = [[1, 1, now], [1, 2, now], [1, 3, now]]
df = pd.DataFrame(data, columns=columns)
log(Order, df)
```

5. Query

This is the read path of Fennel. You can query for live features (i.e. features using the latest value of all datasets) like this:

```python
feature_df = client.query(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    input_dataframe=pd.DataFrame(
        [[1, 1], [1, 2]],
        columns=["UserSellerFeatures.uid", "UserSellerFeatures.seller_id"],
    ),
)

assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1]
```

You can also query for historical values of features at arbitrary timestamps (often used while creating training datasets or for offline batch inference) like this:

```python
day = timedelta(days=1)

feature_df = client.query_offline(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    timestamp_column="timestamps",
    input_dataframe=pd.DataFrame(
        [[1, 1, now], [1, 2, now], [1, 1, now - day], [1, 2, now - day]],
        columns=[
            "UserSellerFeatures.uid",
            "UserSellerFeatures.seller_id",
            "timestamps",
        ],
    ),
)

assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
    "timestamps",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1, 0, 0]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1, 0, 0]
```

Query requests can also be made over a REST API from any language or tool, which makes it easy to ship features to production servers.
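
As a rough sketch of what such a request could look like from plain Python (the endpoint path, payload shape, and auth header below are illustrative assumptions, not the exact contract; consult the REST API reference for the real details), the live query above might be issued with `requests`:

```python
# Illustrative only: endpoint path, payload shape, and auth header are
# assumptions -- check the Fennel REST API docs for the exact contract.
import requests

SERVER = "https://<FENNEL SERVER URL>"  # hypothetical deployment URL
TOKEN = "<YOUR TOKEN>"                  # hypothetical auth token

response = requests.post(
    f"{SERVER}/api/v1/query",  # assumed endpoint path
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "outputs": [
            "UserSellerFeatures.num_orders_1d",
            "UserSellerFeatures.num_orders_1w",
        ],
        "inputs": [
            "UserSellerFeatures.uid",
            "UserSellerFeatures.seller_id",
        ],
        "data": [
            {"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 1},
            {"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 2},
        ],
    },
)
response.raise_for_status()
print(response.json())
```

The same request can equally be made with curl or any other HTTP client, which is what makes it straightforward to serve features from production systems written in any language.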
