Development Workflow
Unit Tests
Fennel's Python client ships with an (inefficient) mock server inside it - this makes it possible to do local development and run unit tests against the mock server to verify correctness. This works even if you don't have any remote Fennel server - heck, it works even if you don't have internet access.
This mock server has near parity with the actual server, with one notable exception - it doesn't support data connectors to external data systems (after all, it is completely local with zero remote dependencies!).
Let's first see how it works; later we will look at a fully functional unit test example.
```python
import unittest

import pandas as pd

from fennel.testing import mock, log


class TestDataset(unittest.TestCase):
    @mock
    def test_dataset(self, client):
        # client talks to the mock server
        # ... do any setup
        # commit the dataset
        client.commit(datasets=[User])
        # ... some other stuff

        # Log data to the dataset directly (ONLY for testing)
        log(User, pd.DataFrame(...))
        # OR
        # Log data to the dataset via a webhook
        client.log("fennel_webhook", "User", pd.DataFrame(...))
        # ... some other stuff
        found = client.query(...)
        self.assertEqual(found, expected)
```
Here we imported `mock` from `fennel.testing`. This is a decorator that can be used to decorate test functions - the decorator supplies an extra argument called `client` to the test. Once the `client` object reaches the body of the test, you can do all the operations that are typically done on a real client - commit datasets/featuresets, log data, extract features, etc.
You can bring data to a dataset in the mock server by using the `log` function from our testing library or by explicitly logging data to a webhook.
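For instance, assuming a `User` dataset sourced from the `fennel_webhook` endpoint named `User` (hypothetical names and columns, for illustration only), the two paths look like this inside a test body:

```python
from datetime import datetime, timezone

import pandas as pd

from fennel.testing import log

# hypothetical rows; the columns must match User's schema
now = datetime.now(timezone.utc)
df = pd.DataFrame({"uid": [1, 2], "signup_time": [now, now]})

# Option 1: log directly to the dataset object (works ONLY in tests)
log(User, df)

# Option 2: log via the webhook endpoint that the dataset sources from;
# this path works against the mock server and a real server alike
client.log("fennel_webhook", "User", df)
```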
Testing Datasets
For testing Datasets, you can use the `log` method of the client to add some local data to a dataset, or the `log` function from our testing library, and then query this or other downstream datasets using the `lookup` method. Here is an end-to-end example. Suppose our regular non-test code looks like this:
```python
from datetime import datetime

from fennel.connectors import source, Webhook
from fennel.datasets import Count, Sum, Average
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.dtypes import Continuous
from fennel.lib import includes, meta, inputs, outputs

__owner__ = "[email protected]"
webhook = Webhook(name="fennel_webhook")


@source(webhook.endpoint("RatingActivity"), disorder="14d", cdc="append")
@dataset
class RatingActivity:
    userid: int
    rating: float
    movie: str
    t: datetime


@dataset(index=True)
class MovieRating:
    movie: str = field(key=True)
    rating: float
    num_ratings: int
    sum_ratings: float
    t: datetime

    @pipeline
    @inputs(RatingActivity)
    def pipeline_aggregate(cls, activity: Dataset):
        # note: each aggregation uses its own continuous window
        return activity.groupby("movie").aggregate(
            num_ratings=Count(window=Continuous("7d")),
            sum_ratings=Sum(window=Continuous("28d"), of="rating"),
            rating=Average(window=Continuous("12h"), of="rating"),
        )
```
And you want to test that data reaching the `RatingActivity` dataset correctly propagates to the `MovieRating` dataset via the pipeline. You could write the following unit test to do so:
```python
import unittest
from datetime import datetime, timedelta, timezone

import pandas as pd

from fennel.testing import mock, log


class TestDataset(unittest.TestCase):
    @mock
    def test_dataset(self, client):
        # Sync the dataset
        client.commit(
            message="datasets: add RatingActivity and MovieRating",
            datasets=[MovieRating, RatingActivity],
        )
        now = datetime.now(timezone.utc)
        one_hour_ago = now - timedelta(hours=1)
        two_hours_ago = now - timedelta(hours=2)
        three_hours_ago = now - timedelta(hours=3)
        four_hours_ago = now - timedelta(hours=4)
        five_hours_ago = now - timedelta(hours=5)

        data = [
            [18231, 2, "Jumanji", five_hours_ago],
            [18231, 3, "Jumanji", four_hours_ago],
            [18231, 2, "Jumanji", three_hours_ago],
            [18231, 5, "Jumanji", five_hours_ago],
            [18231, 4, "Titanic", three_hours_ago],
            [18231, 3, "Titanic", two_hours_ago],
            [18231, 5, "Titanic", one_hour_ago],
            [18231, 5, "Titanic", now - timedelta(minutes=1)],
            [18231, 3, "Titanic", two_hours_ago],
        ]
        columns = ["userid", "rating", "movie", "t"]
        df = pd.DataFrame(data, columns=columns)
        log(RatingActivity, df)

        # Do some lookups to verify pipeline_aggregate
        # is working as expected
        ts = pd.Series([now, now])
        names = pd.Series(["Jumanji", "Titanic"])
        df, _ = MovieRating.lookup(ts, movie=names)
        assert df.shape == (2, 5)
        assert df["movie"].tolist() == ["Jumanji", "Titanic"]
        assert df["rating"].tolist() == [3, 4]
        assert df["num_ratings"].tolist() == [4, 5]
        assert df["sum_ratings"].tolist() == [12, 20]
```
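Note that `lookup` is an as-of operation: each entry in the `ts` series pins the moment at which the corresponding key is resolved, so looking up at an earlier timestamp reflects only the data logged up to that point. A small sketch continuing the test above:

```python
# As-of lookup: only rows with t <= the lookup timestamp are visible.
# By four_hours_ago, Jumanji already had ratings but Titanic had none,
# so the Titanic key resolves as not-found.
ts_past = pd.Series([four_hours_ago, four_hours_ago])
df_past, found = MovieRating.lookup(ts_past, movie=names)
assert found.tolist() == [True, False]
```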
Testing Featuresets
Extractors are simple Python functions and, hence, can be unit tested directly.
```python
import unittest
from datetime import datetime

import pandas as pd

from fennel.featuresets import feature as F, featureset, extractor
from fennel.lib import meta, inputs, outputs


@meta(owner="[email protected]")
@featureset
class UserInfoFeatures:
    userid: int
    name: str
    age: int = F().meta(owner="[email protected]")
    age_squared: int
    age_cubed: int
    is_name_common: bool

    @extractor
    @inputs(age, "name")
    @outputs("age_squared", "age_cubed", "is_name_common")
    def get_age_and_name_features(
        cls, ts: pd.Series, user_age: pd.Series, name: pd.Series
    ):
        is_name_common = name.isin(["John", "Mary", "Bob"])
        df = pd.concat([user_age**2, user_age**3, is_name_common], axis=1)
        df.columns = [
            str(cls.age_squared),
            str(cls.age_cubed),
            str(cls.is_name_common),
        ]
        return df


# somewhere in the test file, you can write this
class TestSimpleExtractor(unittest.TestCase):
    def test_get_age_and_name_features(self):
        age = pd.Series([32, 24])
        name = pd.Series(["John", "Rahul"])
        ts = pd.Series([datetime(2020, 1, 1), datetime(2020, 1, 1)])
        df = UserInfoFeatures.get_age_and_name_features(
            UserInfoFeatures, ts, age, name
        )
        self.assertEqual(df.shape, (2, 3))
        self.assertEqual(
            df["UserInfoFeatures.age_squared"].tolist(), [1024, 576]
        )
        self.assertEqual(
            df["UserInfoFeatures.age_cubed"].tolist(), [32768, 13824]
        )
        self.assertEqual(
            df["UserInfoFeatures.is_name_common"].tolist(),
            [True, False],
        )
```
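One detail worth calling out in the extractor above: converting a feature to a string via `str(cls.age_squared)` yields its fully qualified `<featureset>.<feature>` name, which is exactly the column name the assertions look up:

```python
# feature names stringify to "<featureset>.<feature>", matching the
# DataFrame columns asserted in the test above
assert str(UserInfoFeatures.age_squared) == "UserInfoFeatures.age_squared"
```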
For extractors that depend on dataset lookups, the setup looks similar to that of testing datasets as shown above - create a mock client, commit some datasets/featuresets, log data to a dataset, and finally use the client to extract features. Here is an example:
```python
import unittest
from datetime import datetime, timezone
from typing import Optional

import pandas as pd

from fennel.datasets import dataset, field
from fennel.featuresets import feature as F, featureset, extractor
from fennel.lib import includes, meta, inputs, outputs
from fennel.testing import mock, log


def get_country_geoid(country: str) -> int:
    # stub mapping for this example; the real implementation is not
    # shown in this snippet
    geoids = {"USA": 1, "Chile": 2}
    return geoids.get(country, 0)


@meta(owner="[email protected]")
@dataset(index=True)
class UserInfoDataset:
    user_id: int = field(key=True)
    name: str
    age: Optional[int]
    timestamp: datetime = field(timestamp=True)
    country: str


@meta(owner="[email protected]")
@featureset
class UserInfoMultipleExtractor:
    userid: int
    name: str
    country_geoid: int
    age: int = F().meta(owner="[email protected]")
    age_squared: int
    age_cubed: int
    is_name_common: bool

    @extractor(deps=[UserInfoDataset])
    @inputs("userid")
    @outputs("age", "name")
    def get_user_age_and_name(cls, ts: pd.Series, user_id: pd.Series):
        df, _found = UserInfoDataset.lookup(ts, user_id=user_id)
        return df[["age", "name"]]

    @extractor
    @inputs("age", "name")
    @outputs("age_squared", "age_cubed", "is_name_common")
    def get_age_and_name_features(
        cls, ts: pd.Series, user_age: pd.Series, name: pd.Series
    ):
        is_name_common = name.isin(["John", "Mary", "Bob"])
        df = pd.concat([user_age**2, user_age**3, is_name_common], axis=1)
        df.columns = [
            "age_squared",
            "age_cubed",
            "is_name_common",
        ]
        return df

    @extractor(deps=[UserInfoDataset])
    @includes(get_country_geoid)
    @inputs("userid")
    @outputs("country_geoid")
    def get_country_geoid_extractor(cls, ts: pd.Series, user_id: pd.Series):
        df, _found = UserInfoDataset.lookup(ts, user_id=user_id)  # type: ignore
        df["country_geoid"] = df["country"].apply(get_country_geoid)
        return df["country_geoid"]


# this is your test code in some test module
class TestExtractorDAGResolution(unittest.TestCase):
    @mock
    def test_dag_resolution(self, client):
        client.commit(
            message="user: add info datasets, featuresets",
            datasets=[UserInfoDataset],
            featuresets=[UserInfoMultipleExtractor],
        )
        now = datetime.now(timezone.utc)
        data = [
            [18232, "John", 32, "USA", now],
            [18234, "Monica", 24, "Chile", now],
        ]
        columns = ["user_id", "name", "age", "country", "timestamp"]
        df = pd.DataFrame(data, columns=columns)
        # For testing only.
        log(UserInfoDataset, df)

        feature_df = client.query(
            outputs=[UserInfoMultipleExtractor],
            inputs=[UserInfoMultipleExtractor.userid],
            input_dataframe=pd.DataFrame(
                {"UserInfoMultipleExtractor.userid": [18232, 18234]}
            ),
        )
        self.assertEqual(feature_df.shape, (2, 7))
```
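You don't have to extract the whole featureset: `outputs` also accepts individual features, just as `inputs` does above. A minimal sketch continuing the same test, assuming you only care about two of the features:

```python
# querying a subset of features instead of the whole featureset
feature_df = client.query(
    outputs=[
        UserInfoMultipleExtractor.age_squared,
        UserInfoMultipleExtractor.is_name_common,
    ],
    inputs=[UserInfoMultipleExtractor.userid],
    input_dataframe=pd.DataFrame(
        {"UserInfoMultipleExtractor.userid": [18232, 18234]}
    ),
)
assert feature_df.shape == (2, 2)
```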