Getting Started

Quickstart

The following example shows how several concepts in Fennel come together to solve a problem.

0. Installation

We only need to install Fennel's Python client to run this example, so let's install that first:

```bash
pip install fennel-ai
```

And while we are at it, let's add all the imports that we will need in the rest of the tutorial:

```python
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import requests

from fennel.datasets import dataset, pipeline, field, Dataset
from fennel.featuresets import feature, featureset, extractor
from fennel.lib.aggregate import Count
from fennel.lib.expectations import (
    expectations,
    expect_column_values_to_be_between,
)
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
from fennel.sources import source, Postgres, Snowflake, Kafka, Webhook
```

1. Data Connectors

Fennel ships with data connectors that know how to talk to all common data sources. The connectors can be defined in code or in the Fennel console (not shown here).

```python
postgres = Postgres.get(name="my_rdbms")
warehouse = Snowflake.get(name="my_warehouse")
kafka = Kafka.get(name="my_kafka")
webhook = Webhook(name="fennel_webhook")
```

2. Datasets

Datasets are the tables that you want to use in your feature pipelines. These are constantly kept fresh as new data arrives from connectors.

```python
@dataset
@source(postgres.table("product", cursor="updated"), every="1m", tier="prod")
@source(webhook.endpoint("Product"), tier="dev")
@meta(owner="[email protected]", tags=["PII"])
class Product:
    product_id: int = field(key=True)
    seller_id: int
    price: float
    desc: Optional[str]
    last_modified: datetime = field(timestamp=True)

    # Powerful primitives like data expectations for data hygiene
    @expectations
    def get_expectations(cls):
        return [
            expect_column_values_to_be_between(
                column="price", min_value=1, max_value=1e4, mostly=0.95
            )
        ]


# ingesting realtime data from Kafka works exactly the same way
@meta(owner="[email protected]")
@source(kafka.topic("orders"), disorder="1h", tier="prod")
@source(webhook.endpoint("Order"), tier="dev")
@dataset
class Order:
    uid: int
    product_id: int
    timestamp: datetime
```

Fennel also lets you derive more datasets by defining pipelines that transform data across different sources (e.g. S3, Kafka, Postgres etc.) in the same plane of abstraction. These pipelines are highly declarative, completely Python native, realtime, versioned, auto-backfilled on declaration, and can be unit tested.

```python
@meta(owner="[email protected]")
@dataset
class UserSellerOrders:
    uid: int = field(key=True)
    seller_id: int = field(key=True)
    num_orders_1d: int
    num_orders_1w: int
    timestamp: datetime

    @pipeline(version=1)
    @inputs(Order, Product)
    def my_pipeline(cls, orders: Dataset, products: Dataset):
        orders = orders.join(products, how="left", on=["product_id"])
        orders = orders.transform(lambda df: df.fillna(0))
        orders = orders.drop("product_id", "desc", "price")
        orders = orders.dropnull()
        return orders.groupby("uid", "seller_id").aggregate(
            Count(window="1d", into_field="num_orders_1d"),
            Count(window="1w", into_field="num_orders_1w"),
        )
```
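As a rough mental model for what this pipeline computes (ignoring time windows and incremental streaming updates, which Fennel handles for you), the join and windowed counts resemble a familiar batch operation in plain pandas. The toy data below is made up for illustration:

```python
import pandas as pd

# toy rows mirroring the Order and Product datasets (illustrative data only)
orders = pd.DataFrame({"uid": [1, 1, 1], "product_id": [1, 2, 3]})
products = pd.DataFrame({"product_id": [1, 2, 3], "seller_id": [1, 2, 1]})

# left join orders with products, then count orders per (uid, seller_id) pair
merged = orders.merge(products, on="product_id", how="left")
counts = (
    merged.groupby(["uid", "seller_id"]).size().reset_index(name="num_orders")
)
print(counts)
```

Unlike this one-shot batch version, the Fennel pipeline keeps the counts continuously up to date as new orders arrive.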

3. Featuresets

Featuresets are containers for the features that you want to extract from your datasets. Features, unlike datasets, have no state and are computed on the "read path" (i.e. when you query for them) via arbitrary Python code. Features are immutable to improve reliability.

```python
@meta(owner="[email protected]")
@featureset
class UserSellerFeatures:
    uid: int = feature(id=1)
    seller_id: int = feature(id=2)
    num_orders_1d: int = feature(id=3)
    num_orders_1w: int = feature(id=4)

    @extractor(depends_on=[UserSellerOrders])
    @inputs(uid, seller_id)
    @outputs(num_orders_1d, num_orders_1w)
    def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
        df, found = UserSellerOrders.lookup(ts, seller_id=sellers, uid=uids)
        df = df.fillna(0)
        df["num_orders_1d"] = df["num_orders_1d"].astype(int)
        df["num_orders_1w"] = df["num_orders_1w"].astype(int)
        return df[["num_orders_1d", "num_orders_1w"]]
```

4. Sync

Once datasets/featuresets have been written (or updated), you can sync those definitions with the server by instantiating a client and using it to talk to the server. Since we are not working with a real server, here we use the MockClient to run this example locally instead of a real client. MockClient doesn't support data connectors, so we will manually log some data to simulate data flows.

```python
from fennel.test_lib import MockClient


# client = Client('<FENNEL SERVER URL>') # uncomment this line to use a real Fennel server
client = MockClient()  # comment this line to use a real Fennel server
client.sync(
    datasets=[Order, Product, UserSellerOrders],
    featuresets=[UserSellerFeatures],
    tier="dev",
)

now = datetime.utcnow()
# create some product data
columns = ["product_id", "seller_id", "price", "desc", "last_modified"]
data = [
    [1, 1, 10.0, "product 1", now],
    [2, 2, 20.0, "product 2", now],
    [3, 1, 30.0, "product 3", now],
]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "Product", df)
assert response.status_code == requests.codes.OK, response.json()

columns = ["uid", "product_id", "timestamp"]
data = [[1, 1, now], [1, 2, now], [1, 3, now]]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "Order", df)
assert response.status_code == requests.codes.OK, response.json()
```

5. Query

This is the read path of Fennel. You can query for live features (i.e. features using the latest value of all datasets) like this:

```python
feature_df = client.extract(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    input_dataframe=pd.DataFrame(
        {
            "UserSellerFeatures.uid": [1, 1],
            "UserSellerFeatures.seller_id": [1, 2],
        }
    ),
)
assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1]
```

You can also query for historical values of features at arbitrary timestamps (useful for creating training datasets) like this:

```python
feature_df = client.extract_historical(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    timestamp_column="timestamps",
    format="pandas",
    input_dataframe=pd.DataFrame(
        {
            "UserSellerFeatures.uid": [1, 1, 1, 1],
            "UserSellerFeatures.seller_id": [1, 2, 1, 2],
            "timestamps": [
                now,
                now,
                now - timedelta(days=1),
                now - timedelta(days=1),
            ],
        }
    ),
)
assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
    "timestamps",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1, 0, 0]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1, 0, 0]
```

Query requests can be made over a REST API from any language or tool, which makes it easy to ship features to production servers.
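For instance, the live query above could be issued as a plain JSON POST from any HTTP client. The payload below mirrors the client.extract() call; the route "/api/v1/extract" is an assumption for illustration, so check your server's API reference for the exact endpoint:

```python
import json

# illustrative request body naming input and output features;
# the endpoint path is an assumption, not a documented route
payload = {
    "outputs": [
        "UserSellerFeatures.num_orders_1d",
        "UserSellerFeatures.num_orders_1w",
    ],
    "inputs": ["UserSellerFeatures.uid", "UserSellerFeatures.seller_id"],
    "data": [
        {"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 1},
        {"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 2},
    ],
}
body = json.dumps(payload)
# e.g. requests.post("https://<your-server>/api/v1/extract", data=body,
#                    headers={"Content-Type": "application/json"})
```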
