
Certificate

Certificate to be used for HTTP-based authentication

Parameters

cert:str | Secret

CA certificate required by the client to authenticate the server.

from fennel.connectors import sink, HTTP, Certificate
from fennel.integrations.aws import Secret

aws_secret = Secret(
    arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
    role_arn="arn:aws:iam::123456789012:role/secret-access-role",
)

http = HTTP(
    name="http",
    host="http://http-echo-server.harsha.svc.cluster.local:8081/",
    healthz="/health",
    ca_cert=Certificate(aws_secret["ca_cert"]),
)

@dataset
@sink(
    http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
    cdc="debezium",
    how="incremental",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Using certificate with HTTP sink


HTTP

Data sink to HTTP endpoints.

Connector Parameters

name:str

A name to identify the sink. The name should be unique across all Fennel connectors.

host:str|Secret

The HTTP host URL. Example: https://127.0.0.1:8081

healthz:str

The health check endpoint to verify the server's availability.

ca_cert:Certificate

Parameter for certificate-based authentication.

HTTP Path Parameters

endpoint:str

The specific endpoint where the data will be sent.

limit:Optional[int]

The number of records to include in each request to the endpoint. Default: 100

headers:Dict[str,str]

A map of headers to include with each request.

from fennel.connectors import sink, HTTP, Certificate
from fennel.integrations.aws import Secret

aws_secret = Secret(
    arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
    role_arn="arn:aws:iam::123456789012:role/secret-access-role",
)

http = HTTP(
    name="http",
    host="http://http-echo-server.harsha.svc.cluster.local:8081/",
    healthz="/health",
    ca_cert=Certificate(aws_secret["ca_cert"]),
)

@dataset
@sink(
    http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
    cdc="debezium",
    how="incremental",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
HTTP sink


Errors

Connectivity Issues:

Fennel tries to test the connection with your HTTP sink during commit itself, using the health check endpoint, so connectivity issues are flagged as errors at commit time.

Note: The mock client cannot talk to any external data sink and hence is unable to do this validation at commit time.

Note
  • HTTP sink ensures at least once delivery. To handle duplicates, use ["payload"]["source"]["fennel"]["partition"] and ["payload"]["source"]["fennel"]["offset"] fields in the output.
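
For illustration, a minimal receiver-side deduplication sketch is shown below. Only the ["payload"]["source"]["fennel"]["partition"] and ["payload"]["source"]["fennel"]["offset"] fields come from the sink output described above; the in-memory set and the surrounding handler are illustrative assumptions, not part of the Fennel API.

seen: set[tuple[int, int]] = set()

def handle_record(record: dict) -> None:
    """Process one sinked record, skipping duplicate deliveries."""
    source = record["payload"]["source"]["fennel"]
    key = (source["partition"], source["offset"])
    if key in seen:
        return  # duplicate delivery from the at-least-once sink; ignore it
    seen.add(key)
    # ... application-specific handling of the record goes here ...
Deduplicating HTTP sink deliveries (sketch)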

Kafka

Data sink to any data store that speaks the Kafka protocol (e.g. native Kafka, MSK, Redpanda, etc.).

Cluster Parameters

name:str

A name to identify the sink. The name should be unique across all Fennel connectors.

bootstrap_servers:str

This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster.

Addresses are written as host & port pairs and can be specified either as a single server (e.g. localhost:9092) or a comma separated list of several servers (e.g. localhost:9092,another.host:9092).

security_protocol:"PLAINTEXT" | "SASL_PLAINTEXT" | "SASL_SSL"

Protocol used to communicate with the brokers.

sasl_mechanism:"PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512" | "GSSAPI"

SASL mechanism to use for authentication.

sasl_plain_username:Optional[str] | Optional[Secret]

SASL username.

sasl_plain_password:Optional[str] | Optional[Secret]

SASL password.

Topic Parameters

topic:str

The name of the Kafka topic that needs to be sinked.
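
The example below references a kafka connector object. A minimal sketch of how such an object can be constructed from the cluster parameters above, assuming a Kafka connector class exposed by fennel.connectors and using placeholder brokers and credentials:

from fennel.connectors import Kafka

# Placeholder broker address and credentials - substitute your own.
kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # or a comma separated list of brokers
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="<username>",
    sasl_plain_password="<password>",
)
Defining a Kafka cluster connector (sketch)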

from fennel.connectors import sink

@dataset
@sink(kafka.topic("gmail_filtered"), cdc="debezium")
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Capturing change from a dataset to a Kafka sink


Errors

Connectivity problems:

The Fennel server tries to connect with the Kafka broker during the commit operation itself to validate connectivity - as a result, an incorrect URL, username, or password will be caught as an error at commit time itself.

Note: The mock client cannot talk to any external data sink and hence is unable to do this validation at commit time.

Note
  • Fennel supports the Kafka sink only with the JSON debezium format. Given the ubiquity of debezium connectors, you should be able to further pipe this debezium data from Kafka to your data store of choice. In case you require support for other formats, please reach out to Fennel support.

S3

Data connector to sink data to S3.

Account Parameters

name:str

A name to identify the sink. The name should be unique across all Fennel connectors.

aws_access_key_id:Optional[str]

Default: None

AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.

aws_secret_access_key:Optional[str]

Default: None

AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.

role_arn:Optional[str]

Default: None

Role ARN to assume to get access to S3. This field is not required if role-based access is used or if AWS access and secret keys are used or if the bucket is public.

Bucket Parameters

bucket:str

The name of the S3 bucket where the data files have to be sinked.

prefix:Optional[str]

Default: None

The prefix of the bucket (as relative path within bucket) where the data files should be sinked. For instance, some-folder/ or A/B/C are all valid prefixes. Prefix can not have any wildcard characters.

format:str

Default: csv

The format of the files you'd like to sink. Valid values are "csv", "parquet", "json", "delta" or "hudi".

delimiter:Optional[str]

Default: ,

The character delimiting individual cells in the CSV data - only relevant when format is CSV, otherwise it's ignored.

The default value of "," can be overridden by any other 1-character string. For example, to use tab-delimited data, enter "\t".
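
The example below references an s3 connector object. A minimal sketch of how it might be constructed from the account parameters above, assuming an S3 connector class in fennel.connectors and using placeholder credentials (omit both keys, or pass role_arn, to use role-based access as described under Enabling IAM Access below):

from fennel.connectors import S3

# Placeholder credentials - omit both keys to fall back to role-based (IAM) access.
s3 = S3(
    name="my_s3",
    aws_access_key_id="<aws access key id>",
    aws_secret_access_key="<aws secret access key>",
)
Defining an S3 account connector (sketch)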

from fennel.connectors import sink

@dataset
@sink(
    s3.bucket("datalake", prefix="user", format="delta"),
    every="1d",
    how="incremental",
    renames={"uid": "new_uid"},
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
S3 sink


Errors

Connectivity or authentication errors:

The Fennel server tries to do some lightweight operations on the bucket during the commit operation, so all connectivity or authentication related errors should be caught during the commit itself.

Note: The mock client cannot talk to any external data sink and hence is unable to do this validation at commit time.

Enabling IAM Access

Fennel creates a role with a name prefixed by FennelDataAccessRole- in your AWS account for role-based access. In order to use IAM access for S3, please ensure that this role has permission to read and list files on the buckets of interest.

With that ready, simply don't specify aws_access_key_id and aws_secret_access_key and Fennel will automatically fall back to IAM-based access.

In case you do not want to provide S3 access to FennelDataAccessRole-, pass the role_arn parameter inside the connector params and make sure FennelDataAccessRole- can assume that IAM role.

Note
  • Fennel appends a generated suffix to the above provided prefix to avoid ambiguities when the sink dataset version is updated or when multiple branches have the same sink defined. This suffix can be found in the 'Sink' tab of the console after initiating a data sink.
  • A keyless dataset sink ensures at least once delivery, while a keyed dataset sink guarantees exactly once delivery. For keyless datasets, use the __fennel_hash__ column to identify and filter out duplicate deliveries (see the sketch after this list).
  • Fennel supports the S3 sink only with the delta format and access to S3 through FennelDataAccessRole-. In case you require support for other formats or access mechanisms, please reach out to Fennel support.
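
A minimal deduplication sketch for keyless datasets, assuming the sinked files have already been loaded into a pandas DataFrame; only the __fennel_hash__ column name comes from the note above.

import pandas as pd

def drop_duplicate_deliveries(df: pd.DataFrame) -> pd.DataFrame:
    """Keep a single copy of each delivered record, identified by __fennel_hash__."""
    return df.drop_duplicates(subset="__fennel_hash__", keep="last")
Filtering duplicate deliveries with __fennel_hash__ (sketch)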

Snowflake

Data sink to Snowflake databases.

Database Parameters

name:str

A name to identify the sink. The name should be unique across all Fennel connectors.

account:str

Snowflake account identifier. This is the first part of the URL used to access Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com, then the account is <account>.

This is usually of the form <ORG_ID>-<ACCOUNT_ID>. Refer to the Snowflake documentation to find the account identifier.

role:str

The role that should be used by Fennel to access Snowflake.

warehouse:str

The warehouse that should be used to access Snowflake.

db_name:str

The name of the database where the data has to be sinked.

schema:str

The schema where the required data has to be sinked.

username:str | Secret

The username which should be used to access Snowflake. This username should have the required permissions to assume the provided role.

password:str | Secret

The password associated with the username.

Table Parameters

table:str

The prefix of the table within the database to which the data should be sinked.
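
The example below references a snowflake connector object. A minimal sketch of how it might be constructed from the database parameters above, assuming a Snowflake connector class in fennel.connectors and using placeholder values:

from fennel.connectors import Snowflake

# Placeholder account, role, warehouse and credentials - substitute your own.
snowflake = Snowflake(
    name="my_snowflake",
    account="<ORG_ID>-<ACCOUNT_ID>",
    role="<role>",
    warehouse="<warehouse>",
    db_name="<database>",
    schema="<schema>",
    username="<username>",
    password="<password>",
)
Defining a Snowflake database connector (sketch)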

from fennel.connectors import sink

@dataset
@sink(
    snowflake.table("test_table"),
    every="1d",
    how="incremental",
    renames={"uid": "new_uid"},
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Snowflake sink


Errors

Connectivity Issues:

Fennel tries to test the connection with your Snowflake account during commit itself, so any connectivity issue (e.g. wrong host name, username, password, etc.) is flagged as an error during commit with the real Fennel servers.

Note: The mock client cannot talk to any external data sink and hence is unable to do this validation at commit time.

Note
  • Fennel appends a generated suffix to the provided table name to prevent ambiguities when the sink dataset version is updated or when multiple branches have the same sink defined. This suffix can be viewed in the 'Sink' tab of the console after initiating a data sink.
  • A keyless dataset sink ensures at least once delivery, while a keyed dataset sink guarantees exactly once delivery. For keyless datasets, use the __fennel_hash__ column to identify and filter out duplicate deliveries.