Average

Aggregation to compute a rolling average for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the average should be computed. This field must be of type int, float or decimal.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float.

default:Optional[float]

Average over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Average,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    avg_1d: float
    avg_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def avg_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(of="amt", window=Continuous("1d"), default=-1.0),
            avg_1w=Average(of="amt", window=Continuous("1w"), default=-1.0),
        )
Average in rolling window of 1 day & 1 week

Returns

Union[float, Optional[float]]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
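
If the default is omitted (or None), the docs above require into_field to be Optional[float]. A minimal sketch of that variant - reusing Transaction and the imports from the snippet above, illustrative rather than official:

from typing import Optional

@dataset(index=True)
class AggregatedNullable:
    uid: int = field(key=True)
    # no default given, so the result is None for empty windows
    avg_1d: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def avg_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(of="amt", window=Continuous("1d")),
        )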

Errors

Average on non int/float/decimal types:

The input column denoted by of must either be of int or float or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

Output and/or default aren't float:

The type of the field denoted by into_field in the output dataset and that of default should both be float.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    avg_1d: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(
                of="zip", window=Continuous("1d"), default="avg"
            ),
        )
Cannot take average over string; only int, float or decimal

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    # output of avg is always float
    ret: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            ret=Average(of="amt", window=Continuous("1d"), default=1.0),
        )
Invalid type: ret is int but should be float

Count

Aggregation to compute a rolling count for each group within a window.

Parameters

window:Window

The continuous window within which events need to be counted. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int.

unique:bool

Default: False

If set to True, the aggregation counts the number of unique values of the field given by of (aka COUNT DISTINCT in SQL).

approx:bool

Default: False

If set to True, the count isn't exact but only an approximation. This field must be set to True if and only if unique is set to True.

Fennel uses the HyperLogLog data structure to compute approximate unique counts; in practice, the count is exact for small counts.

of:Optional[str]

Name of the field in the input dataset whose distinct values should be counted. Only relevant when unique is set to True.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Count,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    vendor: str
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    num_transactions: int
    unique_vendors_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def count_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            num_transactions=Count(window=Continuous("forever")),
            unique_vendors_1w=Count(
                of="vendor",
                unique=True,
                approx=True,
                window=Continuous("1w"),
            ),
        )
Count # of transactions & distinct vendors per user

Returns

int

Accumulates the count in the appropriate field of the output dataset. If there are no rows to count, by default, it returns 0.

Errors

Count unique on unhashable type:

The input column denoted by of must have a hashable type in order to build a HyperLogLog. For instance, float or types built on float aren't allowed.

Unique counts without approx:

As of right now, it's a commit error to try to compute a unique count without setting approx to True.

Warning

Maintaining unique counts is substantially more costly than maintaining non-unique counts so use it only when truly needed.

Distinct

Aggregation to compute a set of distinct values for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the distinct set should be computed. This field must be of a hashable type (e.g. floats aren't allowed).

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

unordered:bool

If set to False, the list is sorted by natural comparison order. However, as of right now, this must be set to True since ordered mode isn't supported yet.

from datetime import datetime
from typing import List

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Distinct,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def distinct_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=Distinct(
                of="amount",
                window=Continuous("1d"),
                unordered=True,
            ),
        )
Distinct in window of 1 day

Returns

List[T]

Stores the result of the aggregation in the appropriate column of the output dataset which must be of type List[T] where T is the type of the input column.

Errors

Computing distinct for non-hashable types:

Distinct operator is a lot like building a hashmap - for it to be valid, the underlying data must be hashable. Types like float (or any other complex type built using float) aren't hashable - so a commit error is raised.

Warning

Storing the full set of distinct values can get costly, so it's recommended to use Distinct only for sets of small cardinality (say < 100).

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Distinct
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=Distinct(
                of="amount",
                window=Continuous("1d"),
                unordered=True,
            ),
        )
amounts should be of type List[int], not int

LastK

Aggregation to compute a rolling list of the latest values for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the aggregation should be computed.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

limit:int

Since storing all the values for a group can get costly, LastK expects a limit to be specified which denotes the maximum size of the list that should be maintained at any point.

dedup:bool

If set to True, only distinct values are stored; else the stored values can have duplicates too.

dropnull:bool

If set to True, None values are dropped from the result. It expects the of field to be of type Optional[T], and into_field gets the type List[T].

from datetime import datetime
from typing import List

from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def lastk_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                dropnull=False,
                window=Continuous("1d"),
            ),
        )
LastK in window of 1 day

Returns

List[T]

Stores the result of the aggregation in the appropriate field of the output dataset.
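
To make the dropnull behavior described above concrete: when of is an Optional[T] column and dropnull=True, None values are dropped and into_field is typed List[T]. A sketch with a hypothetical Rating dataset, reusing webhook and the imports from the snippet above:

from typing import List, Optional

@source(webhook.endpoint("Rating"), disorder="14d", cdc="append")
@dataset
class Rating:
    uid: int
    score: Optional[int]  # some events carry no score
    timestamp: datetime

@dataset(index=True)
class LastScores:
    uid: int = field(key=True)
    # dropnull=True strips None values, so List[int], not List[Optional[int]]
    scores: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Rating)
    def lastk_dropnull(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            scores=LastK(
                of="score",
                limit=10,
                dedup=False,
                dropnull=True,
                window=Continuous("1d"),
            ),
        )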

Errors

Incorrect output type:

The column denoted by into_field in the output dataset must be of type List[T] where T is the type of the column denoted by of in the input dataset. Commit error is raised if this is not the case.

Warning

Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                dropnull=False,
                window=Continuous("1d"),
            ),
        )
amounts should be of type List[int], not int

FirstK

Aggregation to compute a rolling list of the earliest values for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the aggregation should be computed.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

limit:int

Since storing all the values for a group can get costly, FirstK expects a limit to be specified which denotes the maximum size of the list that should be maintained at any point.

dedup:bool

If set to True, only distinct values are stored; else the stored values can have duplicates too.

dropnull:bool

If set to True, None values are dropped from the result. It expects the of field to be of type Optional[T], and into_field gets the type List[T].

from datetime import datetime
from typing import List

from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def firstk_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=FirstK(
                of="amount",
                limit=10,
                dedup=False,
                window=Continuous("1d"),
                dropnull=False,
            ),
        )
FirstK in window of 1 day

Returns

List[T]

Stores the result of the aggregation in the appropriate field of the output dataset.

Errors

Incorrect output type:

The column denoted by into_field in the output dataset must be of type List[T] where T is the type of the column denoted by of in the input dataset. Commit error is raised if this is not the case.

Warning

Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=FirstK(
                of="amount",
                limit=10,
                dedup=False,
                window=Continuous("1d"),
                dropnull=False,
            ),
        )
amounts should be of type List[int], not int

Max

Aggregation to compute a rolling max for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the max should be computed. This field must be of type int, float, decimal, date or datetime.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int, float, date or datetime - same as the type of the field in the input dataset corresponding to of.

default:Optional[Union[int, float, Decimal, datetime, date]]

Max over an empty set of rows isn't well defined - Fennel returns default in such cases. The type of default must be the same as that of of in the input dataset. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[T].

from datetime import datetime

from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    max_1d: float
    max_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window=Continuous("1d"), default=-1.0),
            max_1w=Max(of="amt", window=Continuous("1w"), default=-1.0),
        )
Maximum in rolling window of 1 day & 1 week

Returns

Union[int, float, date, datetime]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
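
Since date/datetime columns are also allowed, here is a sketch of Max over a datetime field with a matching datetime default - a hypothetical Login dataset, reusing webhook and the imports from the snippet above:

@source(webhook.endpoint("Login"), disorder="14d", cdc="append")
@dataset
class Login:
    uid: int
    login_time: datetime
    timestamp: datetime

@dataset(index=True)
class LastLogin:
    uid: int = field(key=True)
    # output type matches the input column: datetime
    latest_1w: datetime
    timestamp: datetime

    @pipeline
    @inputs(Login)
    def max_login(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            latest_1w=Max(
                of="login_time",
                window=Continuous("1w"),
                default=datetime(1970, 1, 1),
            ),
        )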

Errors

Max on other types:

The input column denoted by of must be of int, float, decimal, date or datetime types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

Types of input, output & default don't match:

The type of the field denoted by into_field in the output dataset and that of default should be the same as that of the field denoted by of in the input dataset.

from datetime import datetime

from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    max: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max=Max(of="zip", window=Continuous("1d"), default="max"),
        )
Cannot take max over string; only int, float, date or datetime

from datetime import datetime

from fennel.datasets import dataset, field, Dataset, Max, pipeline
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    max_1d: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window=Continuous("1d"), default=1),
        )
amt is float but max_1d is int

Min

Aggregation to compute a rolling min for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the min should be computed. This field must be of type int, float, decimal, date or datetime.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int, float, date or datetime - same as the type of the field in the input dataset corresponding to of.

default:Optional[Union[int, float, Decimal, datetime, date]]

Min over an empty set of rows isn't well defined - Fennel returns default in such cases. The type of default must be the same as that of of in the input dataset. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[T].

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    min_1d: float
    min_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def min_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min_1d=Min(of="amt", window=Continuous("1d"), default=-1.0),
            min_1w=Min(of="amt", window=Continuous("1w"), default=-1.0),
        )
Minimum in rolling window of 1 day & 1 week

Returns

Union[int, float, date, datetime]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Min on other types:

The input column denoted by of must be of int, float, decimal, date or datetime types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

Types of input, output & default don't match:

The type of the field denoted by into_field in the output dataset and that of default should be the same as that of the field denoted by of in the input dataset.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    min: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min=Min(of="zip", window=Continuous("1d"), default="min"),
        )
Cannot take min over string; only int, float, date or datetime

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    min_1d: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min_1d=Min(of="amt", window=Continuous("1d"), default=1),
        )
amt is float but min_1d is int

Stddev

Aggregation to compute a rolling standard deviation for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the aggregation should be computed. This field must be of type int, float or decimal.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float.

default:Optional[float]

Standard deviation over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Average,
    Stddev,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    mean: float
    stddev: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def stddev_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            mean=Average(of="amt", window=Continuous("1d"), default=-1.0),
            stddev=Stddev(of="amt", window=Continuous("1d"), default=-1.0),
        )
Mean & standard deviation in rolling window of 1 day

Returns

Union[float, Optional[float]]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Stddev on non int/float/decimal types:

The input column denoted by of must either be of int or float or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

Output and/or default aren't float:

The type of the field denoted by into_field in the output dataset and that of default should both be float.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Stddev,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    var: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            var=Stddev(of="zip", window=Continuous("1d"), default="x"),
        )
Cannot take stddev over string; only int, float or decimal

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Stddev,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    ret: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            ret=Stddev(of="amt", window=Continuous("1d"), default=1.0),
        )
Invalid type: ret is int but should be float

Sum

Aggregation to compute a rolling sum for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the sum should be computed. This field must be of type int, float or decimal.

window:Window

The continuous window within which the values need to be summed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int or float - same as the type of the field in the input dataset corresponding to of.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Sum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new int fields added to the dataset by the sum aggregation
    amount_1w: int
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def sum_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amount_1w=Sum(of="amount", window=Continuous("1w")),
            total=Sum(of="amount", window=Continuous("forever")),
        )
Sum up amount in 1 week and forever windows

Returns

Union[int, float, Decimal]

Accumulates the sum in the appropriate field of the output dataset. If there are no rows to sum, by default, it returns 0 (or 0.0 if of is float).

Errors

Sum on non int/float/decimal types:

The input column denoted by of must either be of int or float or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    amount: str
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            total=Sum(of="vendor", window=Continuous("forever")),
        )
Can only sum up int or float types

Quantile

Aggregation to compute rolling quantiles (aka percentiles) for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the quantile should be computed. This field must be of type int, float or decimal.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type Optional[float] unless default is provided, in which case, it is expected to be of type float.

default:Optional[float]

Quantile over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].

p:float

The percentile (between 0 and 1) to be calculated.

approx:bool

Default: False

If set to True, the calculated value isn't exact but only an approximation. Fennel only supports approximate quantiles for now so this kwarg must always be set to True.

Fennel uses the uDDsketch data structure to compute approximate quantiles, with an error bound set to be within 1% of the true value. For instance, if the true quantile is 200, the reported value lies roughly within [198, 202].

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Quantile,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new float fields added to the dataset by the quantile aggregation
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def quantile_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.5,
                approx=True,
                default=0.0,
            ),
        )
Median in rolling window of 1 week

Returns

Union[float, Optional[float]]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Quantile on non int/float/decimal types:

The input column denoted by of must either be of int or float or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

Types of output & default don't match:

The type of the field denoted by into_field in the output dataset should match the default. If default is set and not None, the field should be float; else it should be Optional[float].

Invalid p value:

Commit error if the value of p is not between 0 and 1.

Approximate is not set to true:

Commit error if approx is not set to True. Fennel only supports approximate quantiles for now but requires this kwarg to be set explicitly to both set the right expectations and be compatible with future addition of exact quantiles.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.5,
                approx=True,
                default=0.0,
            ),
        )
Cannot take quantile over string; only int, float or decimal

from datetime import datetime

from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.5,
                approx=True,
            ),
        )
Default is not specified, so the output field should be Optional[float]

from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=10.0,
                approx=True,
            ),
        )
p is invalid; it can only be in [0, 1]

Exponential Decay Sum

Aggregation to compute a rolling exponentially decayed sum for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the decayed sum should be computed. This field must be of type int, float or decimal.

window:Window

The continuous window within which the values need to be summed. Possible values are "forever" or any time duration.

half_life:Duration

Half-life of the exponential decay, i.e. the time it takes for a value's contribution to decay to half of its original weight. half_life must be a duration greater than 0.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float, regardless of the type of the field in the input dataset corresponding to of.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    ExpDecaySum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new float fields added to the dataset by the aggregation
    amount_1w: float
    total: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def exp_decay_sum(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amount_1w=ExpDecaySum(
                of="amount", window=Continuous("1w"), half_life="1d"
            ),
            total=ExpDecaySum(
                of="amount",
                window=Continuous("forever"),
                half_life="1w",
            ),
        )
Exponentially decayed sum of amount in 1 week and forever windows with different half-lives

Returns

float

Accumulates the result in the appropriate field of the output dataset. If there are no rows to sum, by default, it returns 0.0.
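
As a rough mental model of the decay math (an illustration, not Fennel's internal implementation): each event's contribution is weighted by 0.5 raised to the elapsed time divided by half_life.

from datetime import datetime, timedelta

def exp_decay_sum(events, now, half_life):
    # decayed sum of (value, event_time) pairs, evaluated at `now`
    total = 0.0
    for value, ts in events:
        elapsed = (now - ts).total_seconds()
        # every half_life that elapses halves the event's weight
        total += value * 0.5 ** (elapsed / half_life.total_seconds())
    return total

now = datetime(2024, 1, 2)
events = [(100.0, datetime(2024, 1, 1)), (100.0, now)]
print(exp_decay_sum(events, now, timedelta(days=1)))  # 150.0 = 50 + 100

Note how the result depends on when it's evaluated - this is also why, as noted at the end of this section, pipelines with this aggregation are terminal.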

Errors

Sum on non int/float types:

The input column denoted by of must either be of int or float types. The output field denoted by into_field must always be of type float.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    ExpDecaySum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    amount: str
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            total=ExpDecaySum(
                of="amount",
                window=Continuous("forever"),
                half_life="1w",
            ),
        )
Output type should always be float

Note

The value of the decayed sum depends on the time you query the dataset; it varies with the request time for the same key. Therefore, pipelines containing this aggregation are terminal - no other operator can follow it.