Certificate
Certificate to be used for HTTP-based authentication
Parameters
- CA certificate required for the client to authenticate the server.
from fennel.connectors import sink, HTTP, Certificate
from fennel.integrations.aws import Secret

aws_secret = Secret(
    arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
    role_arn="arn:aws:iam::123456789012:role/secret-access-role",
)

http = HTTP(
    name="http",
    host="http://http-echo-server.harsha.svc.cluster.local:8081/",
    healthz="/health",
    ca_cert=Certificate(aws_secret["ca_cert"]),
)

@dataset
@sink(
    http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
    cdc="debezium",
    how="incremental",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
HTTP
Data sink to HTTP endpoints.
Connector Parameters
- name: A name to identify the sink. The name should be unique across all Fennel connectors.
- host: The HTTP host URL. Example: https://127.0.0.1:8081
- healthz: The health check endpoint used to verify the server's availability.
- ca_cert: Certificate to be used for certificate-based authentication.
HTTP Path Parameters
- endpoint: The specific endpoint where data will be sent.
- limit: The number of records to include in each request to the endpoint. Default: 100
- headers: A map of headers to include with each request.
from fennel.connectors import sink, HTTP, Certificate
from fennel.integrations.aws import Secret

aws_secret = Secret(
    arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
    role_arn="arn:aws:iam::123456789012:role/secret-access-role",
)

http = HTTP(
    name="http",
    host="http://http-echo-server.harsha.svc.cluster.local:8081/",
    healthz="/health",
    ca_cert=Certificate(aws_secret["ca_cert"]),
)

@dataset
@sink(
    http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
    cdc="debezium",
    how="incremental",
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Errors
Fennel tries to test the connection with your HTTP sink during commit itself using the health check endpoint.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
- HTTP sink ensures at least once delivery. To handle duplicates, use the ["payload"]["source"]["fennel"]["partition"] and ["payload"]["source"]["fennel"]["offset"] fields in the output, as sketched below.
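Since delivery is at least once, the receiving service has to drop repeats itself. A minimal receiver-side sketch, assuming an in-memory set and a hypothetical handle_record hook in your HTTP server; only the payload field paths come from the note above:

# Illustrative dedup sketch: handle_record, process and the in-memory set are
# hypothetical stand-ins for your own server code.
seen_keys = set()

def process(payload: dict) -> None:
    ...  # write the record to your downstream store

def handle_record(record: dict) -> None:
    src = record["payload"]["source"]["fennel"]
    key = (src["partition"], src["offset"])
    if key in seen_keys:  # duplicate delivery from the at-least-once sink
        return
    seen_keys.add(key)
    process(record["payload"])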
Kafka
Data sink to any data store that speaks the Kafka protocol (e.g. Native Kafka, MSK, Redpanda etc.)
Cluster Parameters
- A name to identify the sink. This name should be unique across ALL connectors.
- A list of the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster. Addresses are written as host & port pairs and can be specified either as a single server (e.g. localhost:9092) or a comma-separated list of several servers (e.g. localhost:9092,another.host:9092).
- Protocol used to communicate with the brokers.
- SASL mechanism to use for authentication.
- SASL username.
- SASL password.
Topic Parameters
- topic: The name of the Kafka topic that needs to be sinked.
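The example below references a kafka connector object defined elsewhere. A minimal sketch of how such an object might be constructed from the cluster parameters above; the keyword names here are assumptions inferred from those descriptions, not confirmed by this section:

from fennel.connectors import Kafka

# Sketch only: keyword names are assumed from the cluster parameter
# descriptions above; check the connector reference for the exact names.
kafka = Kafka(
    name="my_kafka",                      # unique across all Fennel connectors
    bootstrap_servers="localhost:9092",   # single server or comma-separated list
    security_protocol="SASL_PLAINTEXT",   # protocol used to talk to the brokers
    sasl_mechanism="PLAIN",               # SASL mechanism for authentication
    sasl_plain_username="<username>",
    sasl_plain_password="<password>",
)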
from fennel.connectors import sink

@dataset
@sink(kafka.topic("gmail_filtered"), cdc="debezium")
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Errors
Fennel server tries to connect with the Kafka broker during the commit operation itself to validate connectivity - as a result, an incorrect URL/username/password etc. will be caught at commit time itself as an error.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
- Fennel supports Kafka sink with only the JSON debezium format. Given the ubiquity of debezium connectors, you should be able to further pipe this debezium data from Kafka to your data store of choice, as sketched below. In case you require support for other formats, please reach out to Fennel support.
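As one illustration of that piping step, here is a sketch that consumes the sink topic with the kafka-python client (not part of Fennel) and pulls the post-change row out of a standard Debezium JSON envelope; the broker address and the exact envelope layout are assumptions:

import json
from kafka import KafkaConsumer

# Hypothetical downstream consumer: reads the Debezium-formatted JSON that
# was written to the sink topic and extracts the post-change image.
consumer = KafkaConsumer(
    "gmail_filtered",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    envelope = message.value
    row = envelope["payload"]["after"]  # "after" holds the new row in Debezium JSON
    # ... write `row` to your data store of choice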
S3
Data connector to sink data to S3.
Account Parameters
- A name to identify the sink. The name should be unique across all Fennel connectors.
- aws_access_key_id (default: None): AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.
- aws_secret_access_key (default: None): AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.
- role_arn (default: None): Role ARN to assume to get access to S3. This field is not required if role-based access is used, if AWS access and secret keys are used, or if the bucket is public.
Bucket Parameters
- The name of the S3 bucket where the data files have to be sinked.
- prefix (default: None): The prefix of the bucket (as a relative path within the bucket) where the data files should be sinked. For instance, some-folder/ or A/B/C are all valid prefixes. The prefix can not have any wildcard characters.
- CSV delimiter (default: ","): The character delimiting individual cells in the CSV data - only relevant when the format is CSV, otherwise it's ignored. The default value of "," can be overridden by any other 1-character string. For example, to use tab-delimited data enter "\t".
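The example below references an s3 connector object defined elsewhere. A minimal sketch of how such an object might be constructed from the account parameters above; the keyword names follow those descriptions but are not shown in this section, so treat them as assumptions:

from fennel.connectors import S3

# Sketch only: keyword names follow the account parameter descriptions above.
s3 = S3(
    name="my_s3",
    aws_access_key_id="<AWS_ACCESS_KEY_ID>",          # omit for role-based access
    aws_secret_access_key="<AWS_SECRET_ACCESS_KEY>",  # omit for role-based access
)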
from fennel.connectors import sink

@dataset
@sink(
    s3.bucket("datalake", prefix="user", format="delta"),
    every="1d",
    how="incremental",
    renames={"uid": "new_uid"},
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Errors
Fennel server tries to do some lightweight operations on the bucket during the commit operation - all connectivity or authentication related errors should be caught during the commit itself.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
Enabling IAM Access
Fennel creates a role with a name prefixed by FennelDataAccessRole- in your AWS account for role-based access. In order to use IAM access for S3, please ensure that this role has access to read and list files on the buckets of interest.
With that ready, simply don't specify aws_access_key_id and aws_secret_access_key and Fennel will automatically fall back to IAM based access.
In case you do not want to provide S3 access to FennelDataAccessRole-, pass the role_arn parameter inside the connector params and make sure FennelDataAccessRole- can assume that IAM role, as sketched below.
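A sketch of that variant, with the role_arn keyword taken from the account parameters above and a placeholder ARN:

from fennel.connectors import S3

# Sketch: Fennel (via FennelDataAccessRole-) assumes this role to reach the bucket.
# The ARN below is a placeholder.
s3 = S3(
    name="my_s3",
    role_arn="arn:aws:iam::123456789012:role/my-s3-access-role",
)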
- Fennel appends a generated suffix to the above provided prefix to avoid ambiguities when the sink dataset version is updated or when multiple branches have the same sink defined. This suffix can be found in the 'Sink' tab of the console after initiating a data sink.
- A keyless dataset sink ensures at least once delivery, while a keyed dataset sink guarantees exactly once delivery. For keyless datasets, use the __fennel_hash__ column to identify and filter out duplicate deliveries, as sketched below.
- Fennel supports S3 sink with only delta format and access to S3 through FennelDataAccessRole-. In case you require support for other formats or access mechanisms, please reach out to Fennel support.
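A minimal dedup sketch for a keyless dataset, assuming the delta output is read back with the deltalake package; the table path (including the generated suffix) is a placeholder, and only the __fennel_hash__ column comes from the note above:

from deltalake import DeltaTable

# Read the sinked delta table (placeholder path, S3 credentials omitted)
# and drop duplicate deliveries by their Fennel-generated hash.
df = DeltaTable("s3://datalake/user/<generated-suffix>").to_pandas()
deduped = df.drop_duplicates(subset=["__fennel_hash__"])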
Snowflake
Data sink to Snowflake databases.
Database Parameters
- A name to identify the sink. The name should be unique across all Fennel connectors.
- Snowflake account identifier. This is the first part of the URL used to access Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com, then the account is <account>. This is usually of the form <ORG_ID>-<ACCOUNT_ID>. Refer to the Snowflake documentation to find the account identifier.
- The role that should be used by Fennel to access Snowflake.
- The warehouse that should be used to access Snowflake.
- The name of the database where the data has to be sinked.
- The schema where the required data has to be sinked.
- The username which should be used to access Snowflake. This username should have the required permissions to assume the provided role.
- The password associated with the username.
Table Parameters
- The prefix of the table within the database to which the data should be sinked.
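The example below references a snowflake connector object defined elsewhere. A rough sketch of how such an object might be constructed from the database parameters above; every keyword name here is an assumption inferred from those descriptions, so check the connector reference for the exact names:

from fennel.connectors import Snowflake

# Sketch only: all keyword names are assumptions based on the parameter
# descriptions above.
snowflake = Snowflake(
    name="my_snowflake",
    account="<ORG_ID>-<ACCOUNT_ID>",
    role="<role>",
    warehouse="<warehouse>",
    db_name="<database>",
    schema="<schema>",
    username="<username>",
    password="<password>",
)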
from fennel.connectors import sink

@dataset
@sink(
    snowflake.table("test_table"),
    every="1d",
    how="incremental",
    renames={"uid": "new_uid"},
)
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        return dataset.filter(
            lambda row: row["email"].contains("gmail.com")
        )
Errors
Fennel tries to test the connection with your Snowflake database during commit itself, so any connectivity issue (e.g. wrong host name, username, password, etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data sink and hence is unable to do this validation at commit time.
- Fennel appends a generated suffix to the provided table name to prevent ambiguities when the sink dataset version is updated or when multiple branches have the same sink defined. This suffix can be viewed in the 'Sink' tab of the console after initiating a data sink.
- A keyless dataset sink ensures at least once delivery, while a keyed dataset sink guarantees exactly once delivery. For keyless datasets, use the __fennel_hash__ column to identify and filter out duplicate deliveries, as sketched below.
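For a keyless dataset sinked to Snowflake, the dedup can be done in SQL. A sketch, where the table name (including the generated suffix) is a placeholder and only the __fennel_hash__ column comes from the note above:

# Placeholder table name; keep only one row per __fennel_hash__ value.
DEDUP_SQL = """
SELECT *
FROM test_table_<generated_suffix>
QUALIFY ROW_NUMBER() OVER (PARTITION BY __fennel_hash__ ORDER BY timestamp) = 1
"""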