Average
Aggregation to compute a rolling average for each group within a window.
Parameters
of: Name of the field in the input dataset over which the average should be computed. This field must either be of type int or float.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float.

default: Average over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Average,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    avg_1d: float
    avg_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def avg_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(of="amt", window=Continuous("1d"), default=-1.0),
            avg_1w=Average(of="amt", window=Continuous("1w"), default=-1.0),
        )
```
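If default is not set, the output field must be declared as Optional[float], per the note above. A minimal sketch of that variant, reusing the Transaction dataset from the example; the dataset and pipeline names here are illustrative:

```python
from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.dtypes import Continuous
from fennel.lib import inputs
from typing import Optional
from datetime import datetime

@dataset(index=True)
class AggregatedNoDefault:
    uid: int = field(key=True)
    # no default, so the value is None when the window has no rows
    avg_1d: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def avg_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(of="amt", window=Continuous("1d")),
        )
```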
Returns
Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
Errors
The input column denoted by of must either be of int, float, or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

The type of the field denoted by into_field in the output dataset and that of default should both be float.
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    avg_1d: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # commit error: "zip" is a str column; Average requires int/float
            avg_1d=Average(
                of="zip", window=Continuous("1d"), default="avg"
            ),
        )
```
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    # output of avg is always float
    ret: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            ret=Average(of="amt", window=Continuous("1d"), default=1.0),
        )
```
Count
Aggregation to compute a rolling count for each group within a window.
Parameters
window: The continuous window within which something needs to be counted. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int.

unique: Default: False. If set to True, the aggregation counts the number of unique values of the field given by of (aka COUNT DISTINCT in SQL).

approx: Default: False. If set to True, the count isn't exact but only an approximation. This field must be set to True if and only if unique is set to True.

Fennel uses the hyperloglog data structure to compute approximate unique counts; in practice, the count is exact for small counts.

of: Name of the field in the input dataset which should be used for unique. Only relevant when unique is set to True.
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Count,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    vendor: str
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    num_transactions: int
    unique_vendors_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def count_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            num_transactions=Count(window=Continuous("forever")),
            unique_vendors_1w=Count(
                of="vendor",
                unique=True,
                approx=True,
                window=Continuous("1w"),
            ),
        )
```
Returns
Accumulates the count in the appropriate field of the output dataset. If there are no rows to count, by default, it returns 0.
Errors
The input column denoted by of must have a hashable type in order to build a hyperloglog. For instance, float or types built on float aren't allowed.

As of right now, it's a commit error to try to compute a unique count without setting approx to True.
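For instance, the following hypothetical pipeline would be rejected at commit time because unique is True while approx is left at its default of False (it reuses the Transaction dataset from the example above):

```python
from fennel.datasets import dataset, field, pipeline, Dataset, Count
from fennel.dtypes import Continuous
from fennel.lib import inputs
from datetime import datetime

@dataset
class UniqueVendors:
    uid: int = field(key=True)
    unique_vendors_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # commit error: unique=True requires approx=True
            unique_vendors_1w=Count(
                of="vendor",
                unique=True,
                window=Continuous("1w"),
            ),
        )
```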
Maintaining unique counts is substantially more costly than maintaining non-unique counts so use it only when truly needed.
Distinct
Aggregation to compute the set of distinct values for each group within a window.
Parameters
of: Name of the field in the input dataset over which the distinct set should be computed. This field must be of a hashable type (e.g., floats aren't allowed).

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

unordered: If set to False, the list is sorted by natural comparison order. However, as of right now, this must be set to True since ordered mode isn't supported yet.
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Distinct,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import List
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def distinct_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=Distinct(
                of="amount",
                window=Continuous("1d"),
                unordered=True,
            ),
        )
```
Returns
Stores the result of the aggregation in the appropriate column of the output dataset, which must be of type List[T] where T is the type of the input column.
Errors
The Distinct operator is a lot like building a hashmap - for it to be valid, the underlying data must be hashable. Types like float (or any other complex type built using float) aren't hashable - so a commit error is raised.

Storing the full set of distinct values can get costly, so it's recommended to use Distinct only for sets of small cardinality (say < 100).
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Distinct
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=Distinct(
                of="amount",
                limit=10,
                unordered=True,
            ),
        )
```
LastK
Aggregation to compute a rolling list of the latest values for each group within a window.
Parameters
of: Name of the field in the input dataset over which the aggregation should be computed.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

limit: Since storing all the values for a group can get costly, LastK expects a limit to be specified, which denotes the maximum size of the list that should be maintained at any point.

dedup: If set to True, only distinct values are stored; else the stored list can contain duplicates too.

dropnull: If set to True, None values are dropped from the result. It expects the of field to be of type Optional[T] and into_field gets the type List[T].
```python
from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import List
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def lastk_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                dropnull=False,
                window=Continuous("1d"),
            ),
        )
```
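As noted above, setting dropnull=True requires the of column to be Optional[T], while into_field remains List[T]. A minimal sketch of that variant; the dataset and field names here are illustrative:

```python
from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import List, Optional
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("TxnOptional"), disorder="14d", cdc="append")
@dataset
class TxnOptional:
    uid: int
    amount: Optional[int]  # not every event carries an amount
    timestamp: datetime

@dataset(index=True)
class LastAmounts:
    uid: int = field(key=True)
    amounts: List[int]  # List[int], not List[Optional[int]]
    timestamp: datetime

    @pipeline
    @inputs(TxnOptional)
    def lastk_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                dropnull=True,  # None values are dropped from the list
                window=Continuous("1d"),
            ),
        )
```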
Returns
Stores the result of the aggregation in the appropriate field of the output dataset.
Errors
The column denoted by into_field in the output dataset must be of type List[T], where T is the type of the column denoted by of in the input dataset. A commit error is raised if this is not the case.
Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.
```python
from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                dropnull=False,
                window=Continuous("1d"),
            ),
        )
```
FirstK
Aggregation to compute a rolling list of the earliest values for each group within a window.
Parameters
of: Name of the field in the input dataset over which the aggregation should be computed.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

limit: Since storing all the values for a group can get costly, FirstK expects a limit to be specified, which denotes the maximum size of the list that should be maintained at any point.

dedup: If set to True, only distinct values are stored; else the stored list can contain duplicates too.

dropnull: If set to True, None values are dropped from the result. It expects the of field to be of type Optional[T] and into_field gets the type List[T].
```python
from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import List
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def firstk_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=FirstK(
                of="amount",
                limit=10,
                dedup=False,
                window=Continuous("1d"),
                dropnull=False,
            ),
        )
```
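Setting dedup=True keeps only distinct values among the earliest ones. A minimal sketch, varying the example above only in the dedup flag (the output dataset name is illustrative):

```python
from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from typing import List
from datetime import datetime

@dataset(index=True)
class FirstDistinctAmounts:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)  # the Transaction dataset from the example above
    def firstk_dedup_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=FirstK(
                of="amount",
                limit=10,
                dedup=True,  # duplicates among the earliest values are dropped
                window=Continuous("1d"),
                dropnull=False,
            ),
        )
```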
Returns
Stores the result of the aggregation in the appropriate field of the output dataset.
Errors
The column denoted by into_field in the output dataset must be of type List[T], where T is the type of the column denoted by of in the input dataset. A commit error is raised if this is not the case.
Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.
```python
from fennel.datasets import dataset, field, pipeline, Dataset, FirstK
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=FirstK(
                of="amount",
                limit=10,
                dedup=False,
                window=Continuous("1d"),
                dropnull=False,
            ),
        )
```
Max
Aggregation to compute a rolling max for each group within a window.
Parameters
of: Name of the field in the input dataset over which the max should be computed. This field must either be of type int, float, date or datetime.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int, float, date or datetime - same as the type of the field in the input dataset corresponding to of.

default: Max over an empty set of rows isn't well defined - Fennel returns default in such cases. The type of default must be the same as that of of in the input dataset. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[T].
```python
from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    max_1d: float
    max_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window=Continuous("1d"), default=-1.0),
            max_1w=Max(of="amt", window=Continuous("1w"), default=-1.0),
        )
```
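When default is omitted, the output field must be Optional[T] with T matching the input column, per the note above. A minimal sketch reusing the Transaction dataset from the example; the names here are illustrative:

```python
from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.dtypes import Continuous
from fennel.lib import inputs
from typing import Optional
from datetime import datetime

@dataset(index=True)
class MaxNoDefault:
    uid: int = field(key=True)
    # amt is float, so without a default this must be Optional[float]
    max_1d: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def max_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window=Continuous("1d")),
        )
```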
Returns
Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
Errors
The input column denoted by of must be of int, float, decimal, date or datetime types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

The type of the field denoted by into_field in the output dataset and that of default should be the same as that of the field denoted by of in the input dataset.
```python
from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    max: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # commit error: "zip" is a str column, not allowed for Max
            max=Max(of="zip", window=Continuous("1d"), default="max"),
        )
```
```python
from fennel.datasets import dataset, field, Dataset, Max, pipeline
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    max_1d: int  # should be float to match "amt"
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window=Continuous("1d"), default=1),
        )
```
Min
Aggregation to compute a rolling min for each group within a window.
Parameters
of: Name of the field in the input dataset over which the min should be computed. This field must either be of type int, float, date or datetime.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int, float, date or datetime - same as the type of the field in the input dataset corresponding to of.

default: Min over an empty set of rows isn't well defined - Fennel returns default in such cases. The type of default must be the same as that of of in the input dataset. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[T].
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    min_1d: float
    min_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def min_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min_1d=Min(of="amt", window=Continuous("1d"), default=-1.0),
            min_1w=Min(of="amt", window=Continuous("1w"), default=-1.0),
        )
```
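Since date and datetime columns are also supported, Min can, for instance, track the earliest event time seen per key. A minimal sketch under that reading, with no default set (so the output is Optional[datetime]); the Event dataset and field names are illustrative:

```python
from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import Optional
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Event"), disorder="14d", cdc="append")
@dataset
class Event:
    uid: int
    event_time: datetime  # a data field, distinct from the row timestamp
    timestamp: datetime

@dataset(index=True)
class FirstSeen:
    uid: int = field(key=True)
    first_seen: Optional[datetime]  # None until the key has any event
    timestamp: datetime

    @pipeline
    @inputs(Event)
    def first_seen_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            first_seen=Min(of="event_time", window=Continuous("forever")),
        )
```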
Returns
Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
Errors
The input column denoted by of must be of int, float, decimal, date or datetime types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

The type of the field denoted by into_field in the output dataset and that of default should be the same as that of the field denoted by of in the input dataset.
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    min: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # commit error: "zip" is a str column, not allowed for Min
            min=Min(of="zip", window=Continuous("1d"), default="min"),
        )
```
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    min_1d: int  # should be float to match "amt"
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min_1d=Min(of="amt", window=Continuous("1d"), default=1),
        )
```
Stddev
Aggregation to compute a rolling standard deviation for each group within a window.
Parameters
of: Name of the field in the input dataset over which the aggregation should be computed. This field must either be of type int or float.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float.

default: Standard deviation over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Average,
    Stddev,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    mean: float
    stddev: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def stddev_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            mean=Average(of="amt", window=Continuous("1d"), default=-1.0),
            stddev=Stddev(of="amt", window=Continuous("1d"), default=-1.0),
        )
```
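As with Average, omitting default means the output field must be Optional[float]. A minimal sketch reusing the Transaction dataset from the example above (the output dataset name is illustrative):

```python
from fennel.datasets import dataset, field, pipeline, Dataset, Stddev
from fennel.dtypes import Continuous
from fennel.lib import inputs
from typing import Optional
from datetime import datetime

@dataset(index=True)
class StddevNoDefault:
    uid: int = field(key=True)
    stddev_1d: Optional[float]  # None when the window has no rows
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def stddev_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            stddev_1d=Stddev(of="amt", window=Continuous("1d")),
        )
```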
Returns
Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
Errors
The input column denoted by of must either be of int, float, or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

The type of the field denoted by into_field in the output dataset and that of default should both be float.
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Stddev,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    var: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # commit error: "zip" is a str column, not allowed for Stddev
            var=Stddev(of="zip", window=Continuous("1d"), default="x"),
        )
```
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Stddev,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    ret: int  # should be float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            ret=Stddev(of="amt", window=Continuous("1d"), default=1.0),
        )
```
Sum
Aggregation to compute a rolling sum for each group within a window.
Parameters
of: Name of the field in the input dataset over which the sum should be computed. This field must either be of type int or float.

window: The continuous window within which something needs to be summed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int or float - same as the type of the field in the input dataset corresponding to of.
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Sum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new int fields added to the dataset by the sum aggregation
    amount_1w: int
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def sum_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amount_1w=Sum(of="amount", window=Continuous("1w")),
            total=Sum(of="amount", window=Continuous("forever")),
        )
```
Returns
Accumulates the sum in the appropriate field of the output dataset. If there are no rows to sum, by default, it returns 0 (or 0.0 if of is float).
Errors
The input column denoted by of must either be of int, float, or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.
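Since optional columns can be aggregated, a field of type Optional[int] can be summed directly; following the SQL analogy above, None values presumably just don't contribute to the sum. A minimal sketch under that assumption (the dataset names are illustrative):

```python
from fennel.datasets import dataset, field, pipeline, Dataset, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import Optional
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Txn"), disorder="14d", cdc="append")
@dataset
class Txn:
    uid: int
    amount: Optional[int]  # not every event carries an amount
    timestamp: datetime

@dataset(index=True)
class Totals:
    uid: int = field(key=True)
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Txn)
    def sum_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            total=Sum(of="amount", window=Continuous("forever")),
        )
```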
```python
from fennel.datasets import dataset, field, pipeline, Dataset, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    amount: str
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # commit error: "vendor" is a str column; Sum requires int/float
            total=Sum(of="vendor", window=Continuous("forever")),
        )
```
Quantile
Aggregation to compute rolling quantiles (aka percentiles) for each group within a window.
Parameters
of: Name of the field in the input dataset over which the quantile should be computed. This field must either be of type int or float.

window: The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type Optional[float] unless default is provided, in which case it is expected to be of type float.

default: Quantile over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].

p: The percentile (between 0 and 1) to be calculated.

approx: Default: False. If set to True, the calculated value isn't exact but only an approximation. Fennel only supports approximate quantiles for now, so this kwarg must always be set to True.

Fennel uses the uDDsketch data structure to compute approximate quantiles, with an error bound set to be within 1% of the true value.
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Quantile,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new float fields added to the dataset by the quantile aggregation
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def quantile_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.5,
                approx=True,
                default=0.0,
            ),
        )
```
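Without a default, the output field must be Optional[float], as described above. A minimal sketch computing a p90 that is None for users with no transactions in the window (reusing the Transaction dataset from the example; names are illustrative):

```python
from fennel.datasets import dataset, field, pipeline, Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from typing import Optional
from datetime import datetime

@dataset(index=True)
class AggregatedNoDefault:
    uid: int = field(key=True)
    p90_amount_1w: Optional[float]  # None when the window has no rows
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def p90_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            p90_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.9,
                approx=True,
            ),
        )
```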
Returns
Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.
Errors
The input column denoted by of must either be of int, float, or decimal types.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.

The type of the field denoted by into_field in the output dataset should match the default. If default is set and not None, the field should be float, else it should be Optional[float].

Commit error if the value of p is not between 0 and 1.

Commit error if approx is not set to True. Fennel only supports approximate quantiles for now but requires this kwarg to be set explicitly to both set the right expectations and be compatible with future addition of exact quantiles.
```python
from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: str  # commit error: Quantile requires an int/float column
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.5,
                approx=True,
                default=0.0,
            ),
        )
```
```python
from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    # should be Optional[float] since no default is set
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=0.5,
                approx=True,
            ),
        )
```
```python
from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from typing import Optional
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window=Continuous("1w"),
                p=10.0,  # commit error: p must be between 0 and 1
                approx=True,
            ),
        )
```
Exponential Decay Sum
Aggregation to compute a rolling exponentially decayed sum for each group within a window.
Parameters
of: Name of the field in the input dataset over which the decayed sum should be computed. This field must either be of type int, float or decimal.

window: The continuous window within which something needs to be summed. Possible values are "forever" or any time duration.

half_life: Half-life of the exponential decay, i.e., the time it takes for a value's contribution to decay to half of its original weight. The value of half_life must be greater than 0 and is of type duration.

into_field: The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float, regardless of the type of the field in the input dataset corresponding to of (see Errors below).
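Under standard half-life semantics, an event of age Δt contributes its value multiplied by 0.5 ** (Δt / half_life) to the sum. A small plain-Python illustration of that weighting (this sketches the interpretation of the parameter, not the Fennel API):

```python
# Weight of a single event under exponential decay with a given half-life.
def decay_weight(age_seconds: float, half_life_seconds: float) -> float:
    return 0.5 ** (age_seconds / half_life_seconds)

DAY = 86400.0
# An amount of 100 observed exactly one half-life (here, 1d) ago contributes 50.
assert abs(100 * decay_weight(DAY, DAY) - 50.0) < 1e-9
# Two half-lives ago, it contributes 25.
assert abs(100 * decay_weight(2 * DAY, DAY) - 25.0) < 1e-9
```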
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    ExpDecaySum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new float fields added to the dataset by the aggregation
    amount_1w: float
    total: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def exp_decay_sum(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amount_1w=ExpDecaySum(
                of="amount", window=Continuous("1w"), half_life="1d"
            ),
            total=ExpDecaySum(
                of="amount",
                window=Continuous("forever"),
                half_life="1w",
            ),
        )
```
Returns
Accumulates the result in the appropriate field of the output dataset. If there are no rows to sum, by default, it returns 0.0.
Errors
The input column denoted by of must either be of int or float types.

The output field denoted by into_field must always be of type float.

Note that like SQL, aggregations over Optional[int] or Optional[float] are allowed.
```python
from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    ExpDecaySum,
)
from fennel.dtypes import Continuous
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from datetime import datetime

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    amount: str
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    total: int  # commit error: the output of ExpDecaySum must be float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # "amount" is a str column, which is also invalid for ExpDecaySum
            total=ExpDecaySum(
                of="amount",
                window=Continuous("forever"),
                half_life="1w",
            ),
        )
```
The value of the decayed sum depends on the time you query the dataset. Hence it varies with request time for the same key in a dataset. Therefore pipelines containing this aggregation are terminal - no other operator can follow it.