Source

All Fennel sources are wrapped in the @source decorator applied on top of a dataset. The decorator exposes a number of options that configure the ingestion mechanism and apply to most data sources.

Parameters

every:Duration

Default: "1h"

The frequency with which ingestion should be carried out. Streaming sources like Kafka, Kinesis, and Webhook ignore it since they consume data continuously.

Note that some Fennel sources make multiple round-trips of limited size in a single iteration so as to not overload the system - every only applies across full iterations of ingestion.

since:Optional[datetime]

Default: None

When since is set, the source only admits rows whose value in the dataset's timestamp column is >= since.

Fennel reads as little data as possible given this constraint - for instance, when reading parquet files, the filter is pushed all the way down. However, in several cases, it's still necessary to read all the data before rejecting rows that are older than since.

until:Optional[datetime]

Default: None

When until is set, the source only admits rows whose value in the dataset's timestamp column is < until.

Fennel reads as little data as possible given this constraint - for instance, when reading parquet files, the filter is pushed all the way down. However, in several cases, it's still necessary to read all the data before rejecting rows that are newer than until.
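
For instance, a minimal sketch (the connector name, bucket, and schema here are illustrative, not part of the API) that only admits rows stamped within calendar year 2023:

from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # hypothetical connector
orders = s3.bucket("data", path="orders/*", format="parquet")

@source(
    orders,
    disorder="1d",
    cdc="append",
    since=datetime(2023, 1, 1),  # admit rows with timestamp >= 2023-01-01 ...
    until=datetime(2024, 1, 1),  # ... and < 2024-01-01
)
@dataset
class Order:
    uid: int
    amount: float
    timestamp: datetime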

disorder:Duration

Specifies how out of order data from this source can arrive.

Analogous to MaxOutOfOrderness in Flink, this provides Fennel a guarantee that if some row with timestamp t has arrived, no other row with timestamp < t - disorder can ever arrive. If such rows do arrive anyway, Fennel is free to discard them and exclude them from the computation. For example, with disorder set to "2d", once a row stamped Jan 10 has been seen, a late row stamped Jan 7 may be dropped.

sample:Optional[float] | Optional[Sample]

When specifying sampling for a dataset, it can be provided in two ways:

  1. Simply specify the sampling rate when you want to sample the dataset without specifying the columns used for sampling.
    • Sampling Rate: A float between 0 and 1 that determines the proportion of the dataset to include.
  2. Use the Sample object when you want to specify both the sampling rate and the specific columns used for sampling.
    • Sampling Rate: A float between 0 and 1 that determines the proportion of the dataset to include.
    • Using: A list of columns whose values are hashed to sample the data. Preproc columns and the timestamp field cannot be included in this list.

Default Behavior When No Columns Are Specified

  1. For Keyed Datasets: All key columns are used for sampling, excluding any preproc columns.
  2. For Non-Keyed Datasets: All columns are used for sampling except for the timestamp and preproc columns.
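
The example at the end of this section shows the Sample object form; a minimal sketch of the plain-rate form (connector and schema are illustrative) could look like this:

from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # hypothetical connector
clicks = s3.bucket("data", path="clicks/*", format="parquet")

# keep roughly 10% of rows; since no columns are given and the dataset is
# keyless, all columns except the timestamp are hashed for sampling
@source(clicks, disorder="1d", cdc="append", sample=0.1)
@dataset
class Click:
    uid: int
    url: str
    timestamp: datetime
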
cdc:"append" | "native" | "debezium"

Specifies how valid change data should be constructed from the ingested data.

"append" means that data should be interpreted as sequence of append operations with no deletes and no updates. Append can only be applied to keyless datasets ( to prevent situations where multiple inserts arrive with the same key fields). As of right now, all SQL sources, Kafka, Kinesis, S3, and webhook support append mode.

"upsert" means that incoming data will only have inserts but should be interpreted as sequence of upsert operations. It can only be used for keyed datasets and works for every source where append works. Note that in order to support "upsert", Fennel needs to maintain the last seen row for each key which has some overhead. As a result, pre-prepared "debezium" data should be preferred over "upsert".

"native" means that the underlying system exposes CDC natively and that Fennel should tap into that. As of right now, native CDC is only available for Deltalake & Hudi and will soon be available for more sources including MySQL and Postgres.

"debezium" means that the raw data itself is laid out in debezium layout out of which valid CDC data can be constructed. This is only possible for sources that expose raw schemaless data, namely, s3, kinesis, kafka, and webhook.

env:None | str | List[str]

Default: None

When present, marks this source to be selected during commit only when the commit operation itself is made for an env that matches this env. The primary use case is to decorate a single dataset with many @source decorators and choose only one of them at commit time depending on the environment.
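
For instance, a hedged sketch of one dataset with two sources, only one of which is selected based on the env passed at commit time (bucket names are illustrative):

from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # hypothetical connector
staging = s3.bucket("staging-data", path="txns/*", format="parquet")
prod = s3.bucket("prod-data", path="txns/*", format="parquet")

@source(staging, disorder="1d", cdc="append", env="dev")
@source(prod, disorder="1d", cdc="append", env="prod")
@dataset
class Transaction:
    uid: int
    amount: float
    timestamp: datetime

# committing with env="prod" (e.g. client.commit(..., env="prod")) selects
# only the prod source above; env="dev" selects only the staging one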

preproc:Optional[Dict[str, Union[Ref, Any]]]

Default: None

When present, specifies the preproc behavior for the columns referred to by the keys of the dictionary.

As of right now, there are three kinds of values of preproc:

  • ref: Ref: written as ref(str) and means that the column denoted by the key of this value is aliased to another column in the sourced data. This is useful, for instance, when you want to rename columns while bringing them into Fennel. With this, you can also perform indirections of the form A[B][C] and rename nested fields while bringing them into Fennel.

  • eval: Eval: written as eval(Callable | Expr, schema: Dict[str, Type]) and means that the column denoted by the key of this value is computed either through a Python callable or a Rust-based expression. This is useful, for instance, when you want to change the dtype of a column, derive a new column from other columns, or fill nulls in a column with some value. The schema parameter specifies the schema of any columns that are needed for the evaluation but are not present in the dataset.

  • Any: means that the column denoted by the key of this value should be given a constant value.

Note

Fennel supports preproc ref(str) values of type A[B][C] only for the JSON, Avro and Protobuf formats, and A, B should be struct types. If you have data in another format or require indirection for parent types other than struct, please reach out to Fennel support.
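
A hedged sketch combining all three kinds of preproc values (the connector, the field names, and the nested path passed to ref are illustrative; the nested path follows the A[B][C] form described above):

from datetime import datetime

import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.datasets import dataset, field

s3 = S3(name="my_s3")  # hypothetical connector
users = s3.bucket("data", path="users/*", format="json")

@source(
    users,
    disorder="1d",
    cdc="upsert",
    preproc={
        "uid": ref("id"),  # rename: 'uid' comes from column 'id'
        "city": ref("address[geo][city]"),  # nested struct indirection
        "country": "USA",  # constant: same value for every row
        "age": eval(
            lambda df: pd.to_numeric(df["age"]).astype(int),
            schema={"age": str},  # dtype of the raw column being evaluated
        ),
    },
)
@dataset
class User:
    uid: int = field(key=True)
    city: str
    country: str
    age: int
    timestamp: datetime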

where:Optional[Callable | Eval]

Default: None

When present, filters the rows ingested from the source using the given value.

As of now, there are two kinds of values of where:

  • Callable: In this case, the input is a lambda that is used to filter rows.
  • Eval: Similar to the eval value in preproc, the input here is an expression that is used to filter rows.
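
The example at the end of this section shows the expression form; a minimal sketch of the callable form is below (connector and schema are illustrative, and the callable is assumed to receive a pandas DataFrame and return a boolean series, mirroring how eval callables in preproc operate):

from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # hypothetical connector
signups = s3.bucket("data", path="signups/*", format="parquet")

# keep only rows where age >= 18
@source(
    signups,
    disorder="1d",
    cdc="append",
    where=lambda df: df["age"] >= 18,
)
@dataset
class Signup:
    uid: int
    age: int
    timestamp: datetime
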
bounded:bool

Default: False

When not set or set to False, it indicates that the source has an infinite amount of data that keeps growing.

When set to True, it indicates that the source has a finite amount of data that will be exhausted at some point. In such cases, idleness must also be set.

idleness:Optional[Duration]

Default: None

Only relevant when bounded is set to True - in such cases, the bounded source is assumed to be exhausted once Fennel is unable to obtain any new data despite continuously asking for at least the idleness period.

import sys
from datetime import datetime

import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.connectors.connectors import Sample
from fennel.datasets import dataset, field
from fennel.expr import col  # needed for the expression used in `where` below

s3 = S3(name="my_s3")  # using IAM role based access

bucket = s3.bucket("data", path="user/*/date-%Y-%m-%d/*", format="parquet")

if sys.version_info >= (3, 10):
    @source(
        bucket,
        every="1h",
        cdc="upsert",
        disorder="2d",
        since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
        until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
        preproc={
            "uid": ref("user_id"),  # 'uid' comes from column 'user_id'
            "country": "USA",  # country for every row should become 'USA'
            "age": eval(
                lambda x: pd.to_numeric(x["age"]).astype(int),
                schema={"age": str},
            ),  # converting age dtype to int
        },
        where=eval(col("age") >= 18, schema={"age": int}),
        env="prod",
        sample=Sample(0.2, using=["email"]),
        bounded=True,
        idleness="1h",
    )
    @dataset
    class User:
        uid: int = field(key=True)
        email: str
        country: str
        age: int
        timestamp: datetime
Specifying options in source decorator

Sink

All Fennel sinks are wrapped in the @sink decorator applied on top of a dataset. The decorator exposes a number of options that configure the sink mechanism and apply to most data sinks.

Parameters

every:Duration

Default: "1h"

The frequency with which the sink should be carried out. Streaming sinks like Kafka, Kinesis, and Webhook ignore it since they write data continuously.

Note that some Fennel sinks make multiple round-trips of limited size in a single iteration so as not to overload the system - every only applies across full iterations of the sink.

since:Optional[datetime]

Default: None

When since is set, the sink only admits rows whose value in the dataset's timestamp column is >= since.

until:Optional[datetime]

Default: None

When until is set, the sink only admits rows whose value in the dataset's timestamp column is < until.

cdc:Optional[Literal["debezium"]]

Specifies how valid change data should be encoded in the sinked data.

"debezium" means that the raw data itself is laid out in debezium layout out of which valid CDC data can be constructed. This is only possible for kafka sink as of now

env:None | str | List[str]

Default: None

When present, marks this sink to be selected during commit only when the commit operation itself is made for an env that matches this env. The primary use case is to decorate a single dataset with many @sink decorators and choose only one of them at commit time depending on the environment.

how:Optional[Literal["incremental", "recreate"] | SnapshotData]

Default: None

This denotes the style of the sink:

  • incremental: Fennel incrementally sinks the new data changes
  • recreate: Fennel recreates the entire sink every time
  • SnapshotData: Fennel sinks only the current snapshot of dataset
Note

Fennel currently supports only the incremental style of sink. If you want the style to be either recreate or SnapshotData, please reach out to Fennel support.

renames:Optional[Dict[str, str]]

Default: None

This means that the column denoted by the key is aliased to another column in the sink data. This is useful, for instance, when you want to rename columns while sinking them.

from datetime import datetime

from fennel.connectors import sink
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs

# `s3` is the S3 connector defined in the source example above;
# `SomeDataset` is an upstream dataset assumed to be defined elsewhere

@dataset
@sink(
    s3.bucket("datalake", prefix="user", format="delta"),
    every="1d",
    how="incremental",
    renames={
        "uid": "new_uid",  # 'uid' column should be renamed to 'new_uid'
        "email": "new_email",  # 'email' column should be renamed to 'new_email'
    },
    since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
    until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
    env="prod",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        # keep only rows whose email contains 'gmail.com'
        return dataset.filter(
            lambda df: df["email"].str.contains("gmail.com")
        )
Specifying options in sink decorator
