Documentation

Development Workflow

Unit Tests

Fennel's Python client ships with an (inefficient) mock server inside it - this makes it possible to do local development and run unit tests against the mock server to verify correctness. This works even if you don't have a remote Fennel server - heck, it works even if you don't have internet.

This mock server has near parity with the actual server, with one notable exception - it doesn't support data connectors to external data systems (after all, it is completely local with zero remote dependencies!).

Example

Let's first see how it works; later we will look at a fully functional unit test example.

import unittest

import pandas as pd

from fennel.test_lib import mock


class TestDataset(unittest.TestCase):
    @mock
    def test_dataset(self, client):
        # client talks to the mock server
        # ... do any setup
        # Sync the dataset
        client.sync(datasets=[User])
        # ... some other stuff
        client.log("fennel_webhook", "User", pd.DataFrame(...))
        # ... some other stuff
        found = client.extract_features(...)
        self.assertEqual(found, expected)

Here we imported mock from fennel.test_lib. This is a decorator that can be used to decorate test functions - the decorator supplies an extra argument called client to the test. Once the client object reaches the body of the test, you can do all the operations that are typically done on a real client - sync datasets/featuresets, log data, extract features, etc.

Since external data integration doesn't work in the mock server, the only way to bring data into a dataset is by explicitly logging it.
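For instance, here is a minimal sketch of what the User dataset from the example above might look like - its fields are assumptions made for illustration; any dataset sourced from a webhook endpoint can be seeded the same way:

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.lib.metadata import meta
from fennel.sources import source, Webhook

webhook = Webhook(name="fennel_webhook")


# Sourcing the dataset from a webhook endpoint is what makes it
# possible to log data to it directly in tests
@meta(owner="[email protected]")
@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    city: str  # hypothetical field, for illustration only
    t: datetime


# Inside a @mock-decorated test:
#   client.sync(datasets=[User])
#   df = pd.DataFrame(
#       {"uid": [1, 2], "city": ["NYC", "SF"], "t": [datetime.now()] * 2}
#   )
#   client.log("fennel_webhook", "User", df)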

Testing Datasets

For testing datasets, you can use client.log to add some local data to a dataset and then query this dataset, or others downstream of it, using the .lookup API. Here is an end-to-end example. Suppose our regular (non-test) code looks like this:

unit_tests.py
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib.aggregate import Count, Sum, Average
from fennel.lib.includes import includes
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs
from fennel.lib.window import Window
from fennel.sources import source, Webhook

webhook = Webhook(name="fennel_webhook")


@meta(owner="[email protected]")
@source(webhook.endpoint("RatingActivity"))
@dataset
class RatingActivity:
    userid: int
    rating: float
    movie: str
    t: datetime


@meta(owner="[email protected]")
@dataset
class MovieRating:
    movie: str = field(key=True)
    rating: float
    num_ratings: int
    sum_ratings: float
    t: datetime

    # Aggregate raw rating activity into per-movie stats
    @pipeline(version=1)
    @inputs(RatingActivity)
    def pipeline_aggregate(cls, activity: Dataset):
        return activity.groupby("movie").aggregate(
            [
                Count(window=Window("7d"), into_field="num_ratings"),
                Sum(
                    window=Window("28d"), of="rating", into_field="sum_ratings"
                ),
                Average(window=Window("12h"), of="rating", into_field="rating"),
            ]
        )

Say you want to test that data reaching the RatingActivity dataset correctly propagates to the MovieRating dataset via the pipeline. You could write the following unit test to do so:

import unittest
from datetime import datetime, timedelta

import pandas as pd
import requests

from fennel.test_lib import mock


class TestDataset(unittest.TestCase):
    @mock
    def test_dataset(self, client):
        # Sync the datasets
        client.sync(
            datasets=[MovieRating, RatingActivity],
        )
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)
        two_hours_ago = now - timedelta(hours=2)
        three_hours_ago = now - timedelta(hours=3)
        four_hours_ago = now - timedelta(hours=4)
        five_hours_ago = now - timedelta(hours=5)

        data = [
            [18231, 2, "Jumanji", five_hours_ago],
            [18231, 3, "Jumanji", four_hours_ago],
            [18231, 2, "Jumanji", three_hours_ago],
            [18231, 5, "Jumanji", five_hours_ago],
            [18231, 4, "Titanic", three_hours_ago],
            [18231, 3, "Titanic", two_hours_ago],
            [18231, 5, "Titanic", one_hour_ago],
            [18231, 5, "Titanic", now - timedelta(minutes=1)],
            [18231, 3, "Titanic", two_hours_ago],
        ]
        columns = ["userid", "rating", "movie", "t"]
        df = pd.DataFrame(data, columns=columns)
        response = client.log("fennel_webhook", "RatingActivity", df)
        assert response.status_code == requests.codes.OK

        # Do some lookups to verify pipeline_aggregate
        # is working as expected
        ts = pd.Series([now, now])
        names = pd.Series(["Jumanji", "Titanic"])
        df, _ = MovieRating.lookup(
            ts,
            movie=names,
        )
        assert df.shape == (2, 5)
        assert df["movie"].tolist() == ["Jumanji", "Titanic"]
        assert df["rating"].tolist() == [3, 4]
        assert df["num_ratings"].tolist() == [4, 5]
        assert df["sum_ratings"].tolist() == [12, 20]
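One more check worth adding: lookups for keys that never received any data come back with the found flag set to False. A minimal sketch, continuing inside the same test body (the movie name here is made up for illustration):

# Keys that never received data are reported as not found
df, found = MovieRating.lookup(
    pd.Series([now]),
    movie=pd.Series(["Inception"]),
)
assert found.tolist() == [False]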

Testing Featuresets

Extractors are simple Python functions and hence can be unit tested directly.

import unittest
from datetime import datetime

import pandas as pd

from fennel.featuresets import feature, featureset, extractor
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs


@meta(owner="[email protected]")
@featureset
class UserInfoFeatures:
    userid: int = feature(id=1)
    name: str = feature(id=2)
    age: int = feature(id=4).meta(owner="[email protected]")
    age_squared: int = feature(id=5)
    age_cubed: int = feature(id=6)
    is_name_common: bool = feature(id=7)

    @extractor
    @inputs(age, name)
    @outputs(age_squared, age_cubed, is_name_common)
    def get_age_and_name_features(
        cls, ts: pd.Series, user_age: pd.Series, name: pd.Series
    ):
        is_name_common = name.isin(["John", "Mary", "Bob"])
        df = pd.concat([user_age**2, user_age**3, is_name_common], axis=1)
        df.columns = [
            str(cls.age_squared),
            str(cls.age_cubed),
            str(cls.is_name_common),
        ]
        return df


# somewhere in the test file, you can write this
class TestSimpleExtractor(unittest.TestCase):
    def test_get_age_and_name_features(self):
        age = pd.Series([32, 24])
        name = pd.Series(["John", "Rahul"])
        ts = pd.Series([datetime(2020, 1, 1), datetime(2020, 1, 1)])
        # The extractor is invoked directly (not via the client), so the
        # class itself is passed explicitly in place of cls
        df = UserInfoFeatures.get_age_and_name_features(
            UserInfoFeatures, ts, age, name
        )
        self.assertEqual(df.shape, (2, 3))
        self.assertEqual(
            df["UserInfoFeatures.age_squared"].tolist(), [1024, 576]
        )
        self.assertEqual(
            df["UserInfoFeatures.age_cubed"].tolist(), [32768, 13824]
        )
        self.assertEqual(
            df["UserInfoFeatures.is_name_common"].tolist(),
            [True, False],
        )

For extractors that depend on dataset lookups, the setup looks similar to testing datasets as shown above - create a mock client, sync some datasets/featuresets, log data to a dataset, and finally use the client to extract features. Here is an example:

import unittest
from datetime import datetime
from typing import Optional

import pandas as pd
import requests

from fennel.test_lib import mock

# (reuses webhook and the fennel imports from the snippets above)


@meta(owner="[email protected]")
@source(webhook.endpoint("UserInfoDataset"))
@dataset
class UserInfoDataset:
    user_id: int = field(key=True)
    name: str
    age: Optional[int]
    timestamp: datetime = field(timestamp=True)
    country: str


@meta(owner="[email protected]")
@featureset
class UserInfoMultipleExtractor:
    userid: int = feature(id=1)
    name: str = feature(id=2)
    country_geoid: int = feature(id=3)
    age: int = feature(id=4).meta(owner="[email protected]")
    age_squared: int = feature(id=5)
    age_cubed: int = feature(id=6)
    is_name_common: bool = feature(id=7)

    @extractor(depends_on=[UserInfoDataset])
    @inputs(userid)
    @outputs(age, name)
    def get_user_age_and_name(cls, ts: pd.Series, user_id: pd.Series):
        df, _found = UserInfoDataset.lookup(ts, user_id=user_id)
        return df[["age", "name"]]

    @extractor
    @inputs(age, name)
    @outputs(age_squared, age_cubed, is_name_common)
    def get_age_and_name_features(
        cls, ts: pd.Series, user_age: pd.Series, name: pd.Series
    ):
        is_name_common = name.isin(["John", "Mary", "Bob"])
        df = pd.concat([user_age**2, user_age**3, is_name_common], axis=1)
        df.columns = [
            str(cls.age_squared),
            str(cls.age_cubed),
            str(cls.is_name_common),
        ]
        return df

    # get_country_geoid is a plain Python helper (defined elsewhere) that
    # maps a country name to a geoid; @includes ships it with the extractor
    @extractor(depends_on=[UserInfoDataset])
    @includes(get_country_geoid)
    @inputs(userid)
    @outputs(country_geoid)
    def get_country_geoid_extractor(cls, ts: pd.Series, user_id: pd.Series):
        df, _found = UserInfoDataset.lookup(ts, user_id=user_id)  # type: ignore
        df["country_geoid"] = df["country"].apply(get_country_geoid)
        return df["country_geoid"]


# this is your test code in some test module
class TestExtractorDAGResolution(unittest.TestCase):
    @mock
    def test_dag_resolution(self, client):
        client.sync(
            datasets=[UserInfoDataset],
            featuresets=[UserInfoMultipleExtractor],
        )
        now = datetime.now()
        data = [
            [18232, "John", 32, "USA", now],
            [18234, "Monica", 24, "Chile", now],
        ]
        columns = ["user_id", "name", "age", "country", "timestamp"]
        df = pd.DataFrame(data, columns=columns)
        response = client.log("fennel_webhook", "UserInfoDataset", df)
        assert response.status_code == requests.codes.OK, response.json()

        feature_df = client.extract_features(
            output_feature_list=[
                UserInfoMultipleExtractor,
            ],
            input_feature_list=[UserInfoMultipleExtractor.userid],
            input_dataframe=pd.DataFrame(
                {"UserInfoMultipleExtractor.userid": [18232, 18234]}
            ),
        )
        self.assertEqual(feature_df.shape, (2, 7))
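Since extract_features resolves the full extractor DAG, you can also request just a subset of the features and let the mock client chain the necessary extractors. A minimal sketch, continuing inside the same test body (the expected values follow from the ages logged above, 32 and 24):

# Requesting only age_squared forces the client to resolve the
# extractor chain: userid -> (age, name) -> age_squared
feature_df = client.extract_features(
    output_feature_list=[UserInfoMultipleExtractor.age_squared],
    input_feature_list=[UserInfoMultipleExtractor.userid],
    input_dataframe=pd.DataFrame(
        {"UserInfoMultipleExtractor.userid": [18232, 18234]}
    ),
)
self.assertEqual(
    feature_df["UserInfoMultipleExtractor.age_squared"].tolist(),
    [1024, 576],
)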