Source
All Fennel sources are wrapped in the @source decorator applied on top of the
dataset. This decorator specifies a set of options that configure the ingestion
mechanism and apply to most data sources.
Parameters
Default: "1h"
The frequency with which the ingestion should be carried out. Streaming sources like Kafka, Kinesis, Webhook ignore it since they do continuous polling.
Note that some Fennel sources make multiple round-trips of limited size in a single
iteration so as to not overload the system - every
only applies across full
iterations of ingestion.
since
Default: None
When since is set, the source only admits those rows where the value in the
timestamp column of the dataset is >= since.
Fennel reads as little data as possible given this constraint - for instance, when
reading parquet files, the filter is pushed all the way down. However, in
several cases, it's still necessary to read all the data before rejecting rows
that are older than since.
until
Default: None
When until is set, the source only admits those rows where the value in the
timestamp column of the dataset is < until.
Fennel reads as little data as possible given this constraint - for instance, when
reading parquet files, the filter is pushed all the way down. However, in
several cases, it's still necessary to read all the data before rejecting rows
that are newer than until.
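For example, the following sketch (connector name, bucket path, and dataset are illustrative) only ingests rows whose timestamps fall within calendar year 2021:

```python
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # illustrative connector, using IAM role based access
bucket = s3.bucket("data", path="txns/*", format="parquet")


# Only rows with 2021-01-01 <= timestamp < 2022-01-01 are admitted.
@source(
    bucket,
    every="1h",
    disorder="2d",
    cdc="append",
    since=datetime(2021, 1, 1),
    until=datetime(2022, 1, 1),
)
@dataset
class Txn:
    uid: int
    amount: float
    timestamp: datetime
```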
disorder
Specifies how out of order data from this source can arrive.
Analogous to MaxOutOfOrderness in Flink, this gives Fennel the guarantee that
once some row with timestamp t has arrived, no other row with timestamp < t - disorder
can ever arrive. If such rows do arrive anyway, Fennel has the liberty of discarding
them and not including them in the computation.
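As a concrete illustration with hypothetical timestamps, disorder="2d" means that once a row stamped 2024-01-10 has arrived, any row stamped before 2024-01-08 may be dropped:

```python
from datetime import datetime, timedelta

# With disorder="2d": once a row with timestamp t has been ingested,
# any row with timestamp < t - 2 days may be discarded.
t = datetime(2024, 1, 10)
disorder = timedelta(days=2)
late_row_ts = datetime(2024, 1, 7, 23, 0)
may_be_discarded = late_row_ts < t - disorder  # True
```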
sample
When specifying sampling for a dataset, it can be provided in two ways:
- Simply specify the sampling rate when you want to sample the dataset without specifying the columns used for sampling.
  - Sampling Rate: a float between 0 and 1 that determines the proportion of the dataset to include.
- Use the Sample object when you want to specify both the sampling rate and the specific columns used for sampling.
  - Sampling Rate: a float between 0 and 1 that determines the proportion of the dataset to include.
  - Using: a list of columns used to hash for sampling the data. Preproc columns and the timestamp field cannot be included in this list.
Default Behavior When No Columns Are Specified
- For Keyed Datasets: all key columns are used for sampling, excluding any preproc columns.
- For Non-Keyed Datasets: all columns are used for sampling except the timestamp and preproc columns.
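A minimal sketch of both forms (connector, bucket path, and dataset names are illustrative): pass a bare float to sample by rate alone, or a Sample object to also control the hashing columns.

```python
from datetime import datetime

from fennel.connectors import source, S3
from fennel.connectors.connectors import Sample
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # illustrative connector
bucket = s3.bucket("data", path="events/*", format="parquet")


# Rate only: keep roughly 10% of rows, hashing columns follow the defaults.
@source(bucket, every="1h", disorder="1d", cdc="append", sample=0.1)
@dataset
class Clicks:
    uid: int
    url: str
    timestamp: datetime


# Sample object: keep roughly 20% of rows, hashing on the 'uid' column.
@source(
    bucket,
    every="1h",
    disorder="1d",
    cdc="append",
    sample=Sample(0.2, using=["uid"]),
)
@dataset
class Views:
    uid: int
    url: str
    timestamp: datetime
```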
cdc
Specifies how valid change data should be constructed from the ingested data.
- "append" means that data should be interpreted as a sequence of append operations
  with no deletes and no updates. Append can only be applied to keyless datasets
  (to prevent situations where multiple inserts arrive with the same key fields). As
  of right now, all SQL sources, Kafka, Kinesis, S3, and webhook support append mode.
- "upsert" means that incoming data will only have inserts but should be
  interpreted as a sequence of upsert operations. It can only be used for keyed
  datasets and works for every source where append works. Note that in order to
  support "upsert", Fennel needs to maintain the last seen row for each key, which
  has some overhead. As a result, pre-prepared "debezium" data should be preferred
  over "upsert".
- "native" means that the underlying system exposes CDC natively and that Fennel
  should tap into that. As of right now, native CDC is only available for
  Deltalake & Hudi and will soon be available for more sources including MySQL and Postgres.
- "debezium" means that the raw data itself is laid out in the debezium format, out
  of which valid CDC data can be constructed. This is only possible for sources
  that expose raw schemaless data, namely s3, kinesis, kafka, and webhook.
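A minimal sketch contrasting "append" and "upsert" (connector and dataset names are illustrative): the append source feeds a keyless dataset, while the upsert source feeds a keyed one.

```python
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(name="my_s3")  # illustrative connector
bucket = s3.bucket("data", path="users/*", format="parquet")


# "append": keyless dataset, every incoming row is treated as a new insert.
@source(bucket, every="1h", disorder="1d", cdc="append")
@dataset
class LoginEvents:
    uid: int
    device: str
    timestamp: datetime


# "upsert": keyed dataset, the latest row seen for each key wins.
@source(bucket, every="1h", disorder="1d", cdc="upsert")
@dataset
class UserProfile:
    uid: int = field(key=True)
    city: str
    timestamp: datetime
```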
env
Default: None
When present, marks this source to be selected during commit only when the commit
operation itself is made for an env that matches this env. The primary use case is to
decorate a single dataset with many @source decorators and choose only one of
them to commit depending on the environment.
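A minimal sketch of stacking multiple @source decorators with different envs (connector and dataset names are illustrative):

```python
from datetime import datetime

from fennel.connectors import source, S3, Webhook
from fennel.datasets import dataset, field

s3 = S3(name="my_s3")  # illustrative connectors
webhook = Webhook(name="fennel_webhook")


# Only the source whose env matches the env passed to the commit operation
# is selected; the other decorator is ignored.
@source(
    s3.bucket("data", path="users/*", format="parquet"),
    every="1h",
    disorder="1d",
    cdc="upsert",
    env="prod",
)
@source(webhook.endpoint("UserInfo"), disorder="1d", cdc="upsert", env="dev")
@dataset
class UserInfo:
    uid: int = field(key=True)
    city: str
    timestamp: datetime
```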
preproc
Default: None
When present, specifies the preproc behavior for the columns referred to by the keys of the dictionary.
As of right now, there are three kinds of values of preproc:
- ref: Ref: written as ref(str), it means that the column denoted by the key of this value is aliased to another column in the sourced data. This is useful, for instance, when you want to rename columns while bringing them to Fennel. With this, you can also perform indirections of the form A[B][C] and rename them while bringing them to Fennel.
- eval: Eval: written as eval(Callable | Expr, schema: Dict[str, Type]), it means that the column denoted by the key of this value is computed either through a Python callable or a Rust expression. This is useful, for instance, when you want to change the dtype of a column, add a new column derived from another column, or fill nulls in a column with some value. The schema parameter specifies the schema of columns that are needed for evaluation but not present in the dataset.
- Any: means that the column denoted by the key of this value should be given a constant value.

Fennel supports preproc ref(str) values of the form A[B][C] only for the JSON, Avro and Protobuf formats, and A, B should be struct types. If you have data in another format or require indirection for parent types other than structs, please reach out to Fennel support.
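A minimal sketch combining the three kinds of preproc values (the nested field path, column names, and connector are illustrative assumptions):

```python
from datetime import datetime

import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.datasets import dataset, field

s3 = S3(name="my_s3")  # illustrative connector
bucket = s3.bucket("data", path="users/*", format="json")


@source(
    bucket,
    every="1h",
    disorder="1d",
    cdc="upsert",
    preproc={
        # ref with indirection: read 'uid' from a nested field of the form
        # A[B][C], here written as properties[user][id] (JSON/Avro/Protobuf only).
        "uid": ref("properties[user][id]"),
        # constant: every row gets the value 'USA' for 'country'.
        "country": "USA",
        # eval: compute 'age' from the raw string column via a Python callable.
        "age": eval(
            lambda df: pd.to_numeric(df["age_str"]).astype(int),
            schema={"age_str": str},
        ),
    },
)
@dataset
class User:
    uid: int = field(key=True)
    country: str
    age: int
    timestamp: datetime
```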
where
Default: None
When present, filters source dataset rows with the input value.
As of now, there are two kinds of values of where:
- Callable: the input is a lambda which is used to filter rows.
- Eval: similar to the eval value in preproc, the input here is an expression which is used to filter rows.
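A minimal sketch of both forms (names are illustrative; the callable is assumed to receive a pandas DataFrame and return a boolean mask, analogous to filter in pipelines):

```python
from datetime import datetime

from fennel.connectors import source, S3, eval
from fennel.datasets import dataset
from fennel.expr import col

s3 = S3(name="my_s3")  # illustrative connector
bucket = s3.bucket("data", path="users/*", format="parquet")


# Callable form: assumed to get a pandas DataFrame and return a boolean mask.
@source(
    bucket,
    every="1h",
    disorder="1d",
    cdc="append",
    where=lambda df: df["age"] >= 18,
)
@dataset
class Adults:
    uid: int
    age: int
    timestamp: datetime


# Expression form: the same filter written with the expression API.
@source(
    bucket,
    every="1h",
    disorder="1d",
    cdc="append",
    where=eval(col("age") >= 18, schema={"age": int}),
)
@dataset
class AdultsExpr:
    uid: int
    age: int
    timestamp: datetime
```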
bounded
Default: False
When not set or set to False, it indicates that the source possesses an infinite
amount of data that is continuously increasing.
When set to True, it indicates that the source possesses a finite amount of data
that will be exhausted at some point. In such cases, idleness must also be set.
idleness
Default: None
Only relevant when bounded is set to True - in such cases, the bounded source
is assumed to have been exhausted once Fennel is unable to obtain any new data despite
continuously asking for at least the idleness period.
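A minimal sketch of a bounded backfill source (connector, path, and dataset names are illustrative):

```python
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset

s3 = S3(name="my_s3")  # illustrative connector
backfill = s3.bucket("data", path="backfill/2023/*", format="parquet")


# A one-off backfill: the data is finite, so mark the source as bounded and
# treat it as exhausted after 1h without any new data.
@source(
    backfill,
    every="1h",
    disorder="1d",
    cdc="append",
    bounded=True,
    idleness="1h",
)
@dataset
class HistoricalOrders:
    uid: int
    amount: float
    timestamp: datetime
```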
```python
import sys
from datetime import datetime

import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.connectors.connectors import Sample
from fennel.datasets import dataset, field
from fennel.expr import col

s3 = S3(name="my_s3")  # using IAM role based access

bucket = s3.bucket("data", path="user/*/date-%Y-%m-%d/*", format="parquet")

if sys.version_info >= (3, 10):

    @source(
        bucket,
        every="1h",
        cdc="upsert",
        disorder="2d",
        since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
        until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
        preproc={
            "uid": ref("user_id"),  # 'uid' comes from column 'user_id'
            "country": "USA",  # country for every row should become 'USA'
            "age": eval(
                lambda x: pd.to_numeric(x["age"]).astype(int),
                schema={"age": str},
            ),  # converting age dtype to int
        },
        where=eval(col("age") >= 18, schema={"age": int}),
        env="prod",
        sample=Sample(0.2, using=["email"]),
        bounded=True,
        idleness="1h",
    )
    @dataset
    class User:
        uid: int = field(key=True)
        email: str
        country: str
        age: int
        timestamp: datetime
```
Sink
All Fennel sinks are wrapped in the @sink decorator applied on top of the
dataset. This decorator specifies a set of options that configure the sink
mechanism and apply to most data sinks.
Parameters
Default: "1h"
The frequency with which the sink should be carried out. Streaming sinks like Kafka, Kinesis, Webhook ignore it since they do continuous polling.
Note that some Fennel sinks make multiple round-trips of limited size in a single
iteration so as to not overload the system - every
only applies across full
iterations of ingestion.
since
Default: None
When since is set, the sink only admits those rows where the value in the
timestamp column of the dataset is >= since.
until
Default: None
When until is set, the sink only admits those rows where the value in the
timestamp column of the dataset is < until.
cdc
Specifies how valid change data should be constructed for the sink data.
"debezium" means that the raw data is laid out in the debezium format, out of which
valid CDC data can be constructed. This is only possible for Kafka sinks as of now.
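A minimal sketch of a debezium Kafka sink (the Kafka connector settings, topic name, and upstream dataset are illustrative assumptions):

```python
import os
from datetime import datetime

from fennel.connectors import sink, Kafka
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs

# Illustrative Kafka connector; servers and credentials are placeholders.
kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ.get("KAFKA_USERNAME", ""),
    sasl_plain_password=os.environ.get("KAFKA_PASSWORD", ""),
)


@dataset
@sink(kafka.topic("user_changes"), cdc="debezium")
class UserChanges:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)  # hypothetical upstream dataset with a matching schema
    def changes(cls, ds: Dataset):
        return ds.filter(lambda df: df["email"].str.contains("gmail.com"))
```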
env
Default: None
When present, marks this sink to be selected during commit only when the commit
operation itself is made for an env that matches this env. The primary use case is to
decorate a single dataset with many @sink decorators and choose only one of
them to commit depending on the environment.
how
Default: None
This denotes the style of the sink:
- incremental: Fennel incrementally sinks the new data changes.
- recreate: Fennel recreates the entire sink every time.
- SnapshotData: Fennel sinks only the current snapshot of the dataset.

Fennel currently supports only the incremental style of sink. If you want the style to be either recreate or SnapshotData, please reach out to Fennel support.
renames
Default: None
This means that the column denoted by the key is aliased to another column in the sink data. This is useful, for instance, when you want to rename columns while sinking them.
```python
from datetime import datetime

from fennel.connectors import sink
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs

# 's3' is the S3 connector defined in the source example above;
# 'SomeDataset' is the upstream dataset being filtered.


@dataset
@sink(
    s3.bucket("datalake", prefix="user", format="delta"),
    every="1d",
    how="incremental",
    renames={
        "uid": "new_uid",  # 'uid' column should be renamed to 'new_uid'
        "email": "new_email",  # 'email' column should be renamed to 'new_email'
    },
    since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
    until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
    env="prod",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].str.contains("gmail.com")
        )
```