Avro Registry
Several Fennel sources work with the Avro format. When using Avro, it's common to keep schemas in a centralized schema registry instead of including the schema with each message.
Fennel supports integration with Avro schema registries.
Parameters
registry: String denoting the provider of the registry. As of right now, Fennel only supports the "confluent" Avro registry, though more schema registries may be added over time.
url: The URL where the schema registry is hosted.
username: The username to access the schema registry (assuming the registry requires authentication). If a username is provided, the corresponding password must also be provided. Assuming authentication is needed, either username/password or a token must be provided, but not both.
password: The password associated with the username.
token: Token to be used for authentication with the schema registry. Only one of username/password or token must be provided.
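If your registry uses token-based authentication instead of username/password, the setup is analogous. A minimal sketch, assuming the parameter is named token (as described above) and the token is supplied via an environment variable:
import os

from fennel.connectors import Avro

# Sketch: token-based authentication instead of username/password.
avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    token=os.environ["SCHEMA_REGISTRY_TOKEN"],  # env var name is an assumption
)
python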
import os
from datetime import datetime

from fennel.connectors import source, Kafka, Avro
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=os.environ["SCHEMA_REGISTRY_USERNAME"],
    password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
)

@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
Protobuf Registry
Several Fennel sources work with the Protobuf format. When using Protobuf, it's common to keep schemas in a centralized schema registry instead of including the schema with each message.
Fennel supports integration with Protobuf schema registries.
Parameters
registry: String denoting the provider of the registry. As of right now, Fennel only supports the "confluent" Protobuf registry, though more schema registries may be added over time.
url: The URL where the schema registry is hosted.
username: The username to access the schema registry (assuming the registry requires authentication). If a username is provided, the corresponding password must also be provided. Assuming authentication is needed, either username/password or a token must be provided, but not both.
password: The password associated with the username.
token: Token to be used for authentication with the schema registry. Only one of username/password or token must be provided.
import os
from datetime import datetime

from fennel.connectors import source, Kafka, Protobuf
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

protobuf = Protobuf(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=os.environ["SCHEMA_REGISTRY_USERNAME"],
    password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
)

@source(kafka.topic("user", format=protobuf), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
BigQuery
Data connector to Google BigQuery databases.
Database Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
project_id: The project ID of the Google Cloud project containing the BigQuery dataset.
dataset_id: The ID of the BigQuery dataset containing the table(s) to replicate.
service_account_key: A dictionary containing the credentials for the Service Account to use to access BigQuery. See below for instructions on how to obtain this.
Table Parameters
The name of the table within the database that should be ingested.
cursor: The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time. Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (vs created_at, unless the field doesn't change) are good contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
Errors
Fennel tries to test the connection with your BigQuery during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
BigQuery Credentials
Interfacing with BigQuery requires credentials for a Service Account with the "BigQuery User" role at Project level and "BigQuery Data Editor" role at Dataset level. "BigQuery User" role grants permissions to run BigQuery jobs and "BigQuery Data Editor" role grants permissions to read and update table data and its metadata. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.
The easiest way to create a Service Account is to follow GCP's guide
for Creating a Service Account. Once you've
created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles.
Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com
Then, add the service account as a Member of your Google Cloud Project with the "BigQuery User" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.
At this point, you should have a service account with the "BigQuery User" project-level permission.
Similarly, provide the "BigQuery Data Editor" permission to the service account by following Granting Access to Dataset in the Google documentation.
To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.
from datetime import datetime

from fennel.connectors import source, BigQuery
from fennel.datasets import dataset

bq = BigQuery(
    name="my_bigquery",
    project_id="my_project",
    dataset_id="my_dataset",
    service_account_key={
        "type": "service_account",
        "project_id": "fake-project-356105",
        "client_email": "[email protected]",
        "client_id": "103688493243243272951",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    },
)

table = bq.table("user", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
python
Deltalake
Data connector to read data from Deltalake tables living in S3.
The Deltalake connector is implemented via the S3 connector - just set the format parameter to 'delta' and pass the source cdc as 'native'.
To enable Change Data Feed (CDF) on your table, follow the instructions in this documentation.
Fennel doesn't support reading delta tables from HDFS or any other non-S3 storage.
Fennel supports only native CDC with data in Deltalake format. If your data is of the append type, enable CDF on the table and use native CDC mode.
import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("data", prefix="user", format="delta"),
    every="1h",
    disorder="14d",
    cdc="native",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
Hudi
Data connector to read data from Apache Hudi tables in S3.
The Hudi connector is implemented via the S3 connector - just set the format parameter to 'hudi'.
Fennel doesn't support reading hudi tables from HDFS or any other non-S3 storage.
import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("data", prefix="user", format="hudi"),
    disorder="14d",
    cdc="upsert",
    every="1h",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
Kafka
Data connector to any data store that speaks the Kafka protocol (e.g. Native Kafka, MSK, Redpanda etc.)
Cluster Parameters
name: A name to identify the source. This name should be unique across ALL connectors.
bootstrap_servers: A list of addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster. Addresses are written as host & port pairs and can be specified either as a single server (e.g. localhost:9092) or as a comma-separated list of several servers (e.g. localhost:9092,another.host:9092).
security_protocol: Protocol used to communicate with the brokers.
sasl_mechanism: SASL mechanism to use for authentication.
sasl_plain_username: SASL username.
sasl_plain_password: SASL password.
Topic Parameters
The name of the Kafka topic that needs to be sourced into the dataset.
import os
from datetime import datetime

from fennel.connectors import source, Kafka
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

@source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
Fennel supports only Append and Upsert mode CDC with data in Protobuf format. If you require support for CDC data format, please reach out to Fennel support.
Errors
Fennel server tries to connect with the Kafka broker during the commit operation itself to validate connectivity - as a result, an incorrect URL/username/password etc. will be caught at commit time itself as an error.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Kafka can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Kinesis
Data connector to ingest data from AWS Kinesis.
Parameters for Defining Source
name: A name to identify the source. The name should be unique across all Fennel connectors.
role_arn: The ARN of the role that Fennel should use to access the Kinesis stream. The role must already exist and Fennel's principal must have been given the permission to assume this role (see below for details or talk to Fennel support if you need help).
Stream Parameters
stream_arn: The ARN of the Kinesis stream. The corresponding role_arn must have appropriate permissions for this stream. Providing a stream that either doesn't exist or can not be read using the given role_arn will result in an error during the commit operation.
init_position: The initial position in the stream from which Fennel should start ingestion. See Kinesis ShardIteratorType for more context. Allowed values are:
- "latest" - start from the latest data (starting a few minutes after commit)
- "trim_horizon" - start from the oldest data that hasn't been trimmed/expired yet.
- datetime - start from the position denoted by this timestamp (i.e. equivalent to AT_TIMESTAMP in Kinesis vocabulary).
If choosing the datetime option, the timestamp can be specified as a datetime object, as an int representing seconds since the epoch, as a float representing {seconds}.{microseconds} since the epoch, or as an ISO-8601 formatted str.
Note that this timestamp is the time attached with the Kinesis message itself at the time of production, not any timestamp field inside the message.
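A short sketch of equivalent ways to express the same starting position (the epoch values below correspond to 2023-01-05 00:00:00 UTC; any of them could be passed as the init_position):
from datetime import datetime, timezone

# Equivalent representations of the same init_position, per the
# accepted formats described above.
as_datetime = datetime(2023, 1, 5, tzinfo=timezone.utc)
as_epoch_seconds = 1672876800           # int: seconds since the epoch
as_epoch_float = 1672876800.0           # float: {seconds}.{microseconds}
as_iso_string = "2023-01-05T00:00:00Z"  # ISO-8601 formatted string
python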
format: The format of the data in the Kinesis stream. The most common value is "json", though Fennel also supports Avro.
Errors
Fennel server tries to connect with Kinesis during the commit
operation
itself to validate connectivity - as a result, incorrect stream/role ARNs or
insufficient permissions will be caught at commit time itself as an error.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
import os
from datetime import datetime

from fennel.connectors import source, Kinesis
from fennel.datasets import dataset, field

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position=datetime(2023, 1, 5),  # Start ingesting from Jan 5, 2023
    format="json",
)

@source(stream, disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    order_id: str
    amount: float
    timestamp: datetime
python
import os
from datetime import datetime

from fennel.connectors import source, Kinesis
from fennel.datasets import dataset, field

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position="latest",
    format="json",
)

@source(stream, disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    order_id: str
    amount: float
    timestamp: datetime
python
Managing Kinesis Access
Fennel creates a special role with name prefixed by FennelDataAccessRole- in your AWS account for role-based access. The role corresponding to the role_arn passed to the Kinesis source should have the following trust policy allowing this special Fennel role to assume the Kinesis role.
See Trust Policy
Specify the exact role_arn
in the form
arn:aws:iam::<fennel-data-plane-account-id>:role/<FennelDataAccessRole-...>
without any wildcards.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "<role_arn>"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
JSON
Also attach the following permission policy. Add more streams to the Resource field if more than one stream needs to be consumed via this role. Here the account-id is the account where the stream lives.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowKinesisAccess",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStreamConsumer",
                "kinesis:RegisterStreamConsumer",
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:SubscribeToShard",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>",
                "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>/*"
            ]
        }
    ]
}
JSON
MongoDB
Data connector to MongoDB databases.
Database Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
host: The hostname of the database.
db_name: The name of the Mongo database to establish a connection with.
username: The username which should be used to access the database. This username should have access to the database db_name.
password: The password associated with the username.
Fennel uses SRV connection format for authentication which is supported in Mongo versions 3.6 and later. If you have a self-hosted DB with version earlier than 3.6, please reach out to Fennel support.
Table Parameters
The name of the table within the database that should be ingested.
cursor: The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time. Fennel issues queries of the form db.collection.find({"cursor": { "$gte": last_cursor - disorder } }) to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (vs created_at, unless the field doesn't change) are good contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your MongoDB database.
from datetime import datetime

from fennel.connectors import source, Mongo
from fennel.datasets import dataset

mongo = Mongo(
    name="mongo_src",
    host="atlascluster.ushabcd.mongodb.net",
    db_name="mongo",
    username="username",
    password="password",
)

collection = mongo.collection("user", cursor="timestamp")

@source(collection, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Errors
Fennel tries to test the connection with your MongoDB during commit
itself so any
connectivity issue (e.g. wrong host name, username, password etc) is flagged
as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
MySQL
Data connector to MySQL databases.
Database Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
host: The hostname of the database.
port: The port to connect to. Default: 3306.
db_name: The name of the MySQL database to establish a connection with.
username: The username which should be used to access the database. This username should have access to the database db_name.
password: The password associated with the username.
jdbc_params: Additional properties to pass to the JDBC URL string when connecting to the database, formatted as key=value pairs separated by the symbol & (for instance: key1=value1&key2=value2). Default: None.
If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2
Table Parameters
The name of the table within the database that should be ingested.
cursor: The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time. Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (vs created_at, unless the field doesn't change) are good contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your MySQL database.
import os
from datetime import datetime

from fennel.connectors import source, MySQL
from fennel.datasets import dataset, field

mysql = MySQL(
    name="my_mysql",
    host="my-favourite-mysql.us-west-2.rds.amazonaws.com",
    port=3306,  # could be omitted, defaults to 3306
    db_name=os.environ["DB_NAME"],
    username=os.environ["MYSQL_USERNAME"],
    password=os.environ["MYSQL_PASSWORD"],
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)

table = mysql.table("user", cursor="updated_at")

@source(table, disorder="14d", cdc="upsert", every="1m")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    created_at: datetime
    updated_at: datetime = field(timestamp=True)
python
Errors
Fennel tries to test the connection with your MySQL during commit itself so any connectivity issue (e.g. wrong host name, username, password etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in MySQL is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Postgres
Data connector to Postgres databases.
Database Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
host: The hostname of the database.
port: The port to connect to. Default: 5432.
db_name: The name of the Postgres database to establish a connection with.
username: The username which should be used to access the database. This username should have access to the database db_name.
password: The password associated with the username.
jdbc_params: Additional properties to pass to the JDBC URL string when connecting to the database, formatted as key=value pairs separated by the symbol & (for instance: key1=value1&key2=value2). Default: None.
If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2
Table Parameters
The name of the table within the database that should be ingested.
cursor: The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time. Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (vs created_at, unless the field doesn't change) are good contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your Postgres database.
import os
from datetime import datetime

from fennel.connectors import source, Postgres
from fennel.datasets import dataset, field

postgres = Postgres(
    name="my_postgres",
    host="my-favourite-pg.us-west-2.rds.amazonaws.com",
    port=5432,  # could be omitted, defaults to 5432
    db_name=os.environ["DB_NAME"],
    username=os.environ["POSTGRES_USERNAME"],
    password=os.environ["POSTGRES_PASSWORD"],
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)

table = postgres.table("user", cursor="updated_at")

@source(table, disorder="14d", cdc="upsert", every="1m")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    created_at: datetime
    updated_at: datetime = field(timestamp=True)
python
Errors
Fennel tries to test the connection with your Postgres during commit itself so any connectivity issue (e.g. wrong host name, username, password etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Postgres is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Pub/Sub
Data connector to the Google Pub/Sub messaging service. Pub/Sub only supports an at-least-once delivery guarantee. If exactly-once delivery is required, please use the Dedup operator to make ingestion effectively exactly-once.
Project Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
project_id: The project ID of the Google Cloud project containing the Pub/Sub topic.
service_account_key: A dictionary containing the credentials for the Service Account to use to access Pub/Sub. See below for instructions on how to obtain this.
Topic Parameters
The name of the topic from which the data should be ingested.
format: The format of the data in the Pub/Sub topic. Only "json" is supported.
Fennel supports only Append and Upsert mode CDC with data in JSON format. If you require support for schema or CDC data format, please reach out to Fennel support.
Errors
Fennel tries to test the connection with your Pub/Sub topic during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Pub/Sub Credentials
Interfacing with Pub/Sub requires credentials for a Service Account with the "Pub/Sub Subscriber" role, which grants permissions to create subscription and read messages from the subscribed topic. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.
The easiest way to create a Service Account is to follow GCP's guide
for Creating a Service Account. Once you've
created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles.
Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com
Then, add the service account as a Member of your Google Cloud Project with the "Pub/Sub Subscriber" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.
At this point, you should have a service account with the "Pub/Sub Subscriber" project-level permission.
To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.
from datetime import datetime

from fennel.connectors import source, PubSub
from fennel.datasets import dataset, field

pubsub = PubSub(
    name="pubsub_src",
    project_id="test_project",
    service_account_key={
        "type": "service_account",
        "project_id": "fake-project-356105",
        "client_email": "[email protected]",
        "client_id": "103688493243243272951",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    },
)

@source(
    pubsub.topic("test_topic", format="json"), disorder="2d", cdc="upsert"
)
@dataset
class UserClick:
    uid: int = field(key=True)
    ad_id: int
    timestamp: datetime
python
Redshift
Data connector to Redshift databases.
Database Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
s3_access_role_arn: To handle potentially large volumes of data, Fennel asks Redshift to dump query results in a temporary S3 bucket (since it's faster to go via S3). But this requires Redshift to be able to access that S3 bucket. s3_access_role_arn is the IAM role ARN that Redshift should use to access S3. This IAM role should be given full access to S3 and should also be assumable by your Redshift.
Steps to set up the IAM role:
- Create an IAM role by following this documentation.
- Provide full access to S3 to this role.
- Associate the IAM role with the Redshift cluster by following this documentation.
You can refer to a sample policy in the code snippets below. Do not set this parameter when using username/password for authentication.
db_name: The name of the database where the relevant data resides.
username: The username which should be used to access the database. This username should have access to the database db_name. Do not set this parameter when using IAM authentication.
password: The password associated with the username. Do not set this parameter when using IAM authentication.
host: The hostname of the database.
port: The port to connect to. Default: 5439.
schema: The name of the schema where the required data table(s) resides.
Table Parameters
The name of the table within the database that should be ingested.
cursor: The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time. Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (vs created_at, unless the field doesn't change) are good contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
For large datasets, it is recommended to use IAM-based authentication, as username/password-based authentication does not store temporary data in S3
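For completeness, a minimal sketch of username/password-based authentication (in which case s3_access_role_arn must not be set); the host value and credential environment variable names below are placeholders:
import os

from fennel.connectors import Redshift

# Sketch: username/password authentication instead of IAM
# (s3_access_role_arn is omitted in this mode).
redshift = Redshift(
    name="my_redshift_password_auth",
    db_name=os.environ["DB_NAME"],
    username=os.environ["REDSHIFT_USERNAME"],  # placeholder env var names
    password=os.environ["REDSHIFT_PASSWORD"],
    host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
    port=5439,
    schema="public",
)
python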
Errors
Fennel tries to test the connection with your Redshift during commit itself so any connectivity issue (e.g. wrong database name, host name, port, etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Redshift is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
import os
from datetime import datetime

from fennel.connectors import source, Redshift
from fennel.datasets import dataset

redshift = Redshift(
    name="my_redshift",
    s3_access_role_arn="arn:aws:iam::123:role/Redshift",
    db_name=os.environ["DB_NAME"],
    host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
    port=5439,  # could be omitted, defaults to 5439
    schema="public",
)

table = redshift.table("user", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
python
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "redshift:DescribeClusters",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "redshift:ModifyClusterIamRoles",
                "redshift:CreateCluster"
            ],
            "Resource": [
                # Redshift workgroup ARN
                "arn:aws:redshift-serverless:us-west-2:82448945123:workgroup/0541e0ae-2ad1-4fe0-b2f3-4d6c1d3453e"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                # ARN of role created above
                "arn:aws:iam::82448945123:role/RedshiftS3AccessRole"
            ]
        }
    ]
}
JSON
S3
Data connector to source data from S3.
Account Parameters
A name to identify the source. The name should be unique across all Fennel connectors.
Default: None
AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.
Default: None
AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.
Default: None
Role ARN to assume to get access to S3. This field is not required if role-based access is used or if AWS access and secret keys are used or if the bucket is public.
Bucket Parameters
The name of the S3 bucket where the data files exist.
prefix: The prefix of the bucket (as a relative path within the bucket) where the data files exist. For instance, some-folder/ or A/B/C are all valid prefixes. The prefix can not have any wildcard characters. Exactly one of prefix or path must be provided. Default: None.
path: A / delimited path (relative to the bucket) describing the objects to be ingested. The valid path parts are:
- a static string of alphanumeric characters, underscores, hyphens or dots.
- a * wild card - this must be the entire path part: */* is valid but foo*/ is not.
- a string with a strftime format specifier (e.g. yyyymmdd=%Y%m%d)
If you have a large volume of data or objects and your bucket is time partitioned, it's highly recommended to include details of the time partitioning in your path instead of providing * - Fennel can use this information to optimize the ingestion. For example, if your bucket has the structure orders/{country}/date={date}/store={store}/{file}.json, provide the path orders/*/date=%Y%m%d/*/*
Exactly one of prefix or path must be provided. Default: None.
The character delimiting individual cells in the CSV data - only relevant when the format is CSV, otherwise it's ignored. The default value is "," and can be overridden by any other 1-character string. For example, to use tab-delimited data enter "\t".
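A minimal sketch of sourcing tab-delimited CSV data; the exact keyword name for the delimiter (and the "csv" format string) is not shown in the snippets here, so both are assumptions:
import os

from fennel.connectors import S3

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

# Sketch: tab-delimited CSV data; `delimiter` and "csv" are assumed names.
bucket = s3.bucket("data", prefix="user", format="csv", delimiter="\t")
python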
spread: Relevant only when using path with strftime specifiers. To do incremental ingestion of data from a time partitioned S3 bucket, Fennel needs to know what time ranges may be present in any given partition. While not common, sometimes the timestamp field used to time-partition data in your S3 may be different from the field that you want to be the "official" timestamp field of the Fennel dataset. In such cases, it's possible that a bucket corresponding to, say, month=01 contains rows where the value of the timestamp field is outside of month=01. spread is a measure of how wide this gap can be. More formally, spread indicates the maximum difference between the partition interval and the value of the timestamp field for data in that partition. A None value indicates no spread, which is the case when the partitioning scheme uses the same timestamp values as the dataset's timestamp column. spread is specified using Fennel's Duration type. Default: None.
Examples:
- Given a path txns/date=20240207/hh=06/ and spread=None, Fennel expects all data under this path to have a timestamp between 2024-02-07 06:00:00 and 2024-02-07 07:00:00.
- Given a path txns/date=20240207/hh=06/ and spread="3d", Fennel expects all data under this path to have a timestamp between 2024-02-04 06:00:00 and 2024-02-10 07:00:00.
- Given a path txns/date=20240207/ and spread="6h", Fennel expects all data under this path to have a timestamp between 2024-02-06 18:00:00 and 2024-02-08 06:00:00.
import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("datalake", prefix="user"),
    every="1h",
    disorder="14d",
    cdc="upsert",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="my_s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

bucket = s3.bucket(
    "data", path="user/*/date-%Y-%m-%d/*", format="parquet", spread="2d"
)

@source(bucket, disorder="14d", cdc="upsert", every="1h")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
Errors
Fennel server tries to do some lightweight operations on the bucket during the commit operation - all connectivity or authentication related errors should be caught during the commit itself.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in S3 can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Enabling IAM Access
Fennel creates a role with name prefixed by FennelDataAccessRole- in your AWS account for role-based access. In order to use IAM access for S3, please ensure that this role has access to read and list files on the buckets of interest. With that ready, simply don't specify aws_access_key_id and aws_secret_access_key and Fennel will automatically fall back to IAM based access.
In case you do not want to provide S3 access to FennelDataAccessRole-, pass the role_arn parameter inside the connector params and make sure FennelDataAccessRole- can assume that IAM role (see the sketch below).
- Fennel uses the file_last_modified property exported by S3 to track what data has been seen so far and hence a cursor field doesn't need to be specified.
- Fennel supports the role_arn parameter only for CSV, JSON, Parquet and Avro data formats. In case you require support for the Hudi or Deltalake formats, please reach out to Fennel support.
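A minimal sketch of using role_arn instead of access keys (the ARN below is a placeholder):
from fennel.connectors import S3

# Sketch: role-based access via a customer-managed IAM role that
# FennelDataAccessRole- is allowed to assume; no access keys are passed.
s3 = S3(
    name="mys3_role_based",
    role_arn="arn:aws:iam::123456789012:role/my-fennel-s3-access-role",  # placeholder ARN
)
python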
Secret
A Secret can be used to pass sensitive information like usernames/passwords to Fennel using a Secrets Manager secret reference.
In order to use a Secret, one of the below should be followed:
- The Fennel data access role should be given access to the secret.
- Or a new role can be created with access to the needed secrets, and the Fennel data access role can be added as a trusted entity for that new role, so that the new role can be assumed to access the secrets.
Parameters
arn: The ARN of the secret.
role_arn: The optional ARN of the role to be assumed to access the secret. This should be provided if a new role was created for the Fennel data access role to assume.
import os
from datetime import datetime

from fennel.connectors import source, Kafka, Avro
from fennel.datasets import dataset, field
from fennel.integrations.aws import Secret

aws_secret = Secret(
    arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
    role_arn="arn:aws:iam::123456789012:role/secret-access-role",
)

# secret with above arn has content like below
# {
#     "kafka": {
#         "username": "actual-kafka-username",
#         "password": "actual-kafka-password"
#     },
#     "schema_registry": {
#         "username": "actual-schema-registry-username",
#         "password": "actual-schema-registry-password"
#     }
# }

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=aws_secret["kafka"]["username"],
    sasl_plain_password=aws_secret["kafka"]["password"],
)
avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=aws_secret["schema_registry"]["username"],
    password=aws_secret["schema_registry"]["password"],
)

@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
python
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-secret-name-I4hSKr"
        }
    ]
}
JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::123456789012:role/FennelDataAccessRole"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
JSON
Snowflake
Data connector to Snowflake databases.
Database Parameters
name: A name to identify the source. The name should be unique across all Fennel connectors.
account: Snowflake account identifier. This is the first part of the URL used to access Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com, then the account is <account>. This is usually of the form <ORG_ID>-<ACCOUNT_ID>. Refer to the Snowflake documentation to find the account identifier.
role: The role that should be used by Fennel to access Snowflake.
warehouse: The warehouse that should be used to access Snowflake.
db_name: The name of the database where the relevant data resides.
schema: The schema where the required data table(s) resides.
username: The username which should be used to access Snowflake. This username should have the required permissions to assume the provided role.
password: The password associated with the username.
Table Parameters
The name of the table within the database that should be ingested.
cursor: The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time. Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (vs created_at, unless the field doesn't change) are good contenders.
Note that this field doesn't even need to be a part of the Fennel dataset.
import os
from datetime import datetime

from fennel.connectors import source, Snowflake
from fennel.datasets import dataset

snowflake = Snowflake(
    name="my_snowflake",
    account="VPECCVJ-MUB03765",
    warehouse="TEST",
    db_name=os.environ["DB_NAME"],
    schema="PUBLIC",
    role="ACCOUNTADMIN",
    username=os.environ["SNOWFLAKE_USERNAME"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
)

table = snowflake.table("User", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
python
Errors
Fennel tries to test the connection with your Snowflake during commit itself so any connectivity issue (e.g. wrong host name, username, password etc.) is flagged as an error during commit with the real Fennel servers.
Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.
Schema validity of data in Snowflake is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Webhook
A push-based data connector, making it convenient for sending arbitrary JSON data to Fennel. Data can be pushed to a webhook endpoint either via the REST API or via the Python SDK.
Source Parameters
name: A name to identify the source. This name should be unique across all Fennel sources.
retention: Data sent to the webhook is buffered for the duration retention. That is, if data has been logged to a webhook, datasets defined later that source from this webhook will still see that data until this duration has elapsed. Default: 14d.
Connector Parameters
The endpoint for the given webhook to which the data will be sent.
A single webhook could be visualized as a single Kafka cluster with each endpoint being somewhat analogous to a topic. A single webhook source can have as many endpoints as required.
Multiple datasets could be reading from the same webhook endpoint - in which case, they all get the exact same data.
from datetime import datetime

from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field

webhook = Webhook(name="prod_webhook", retention="14d")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: float
    timestamp: datetime
python
from datetime import datetime, timezone

import pandas as pd

# `client` is a Fennel client constructed elsewhere
df = pd.DataFrame(
    {
        "uid": [1, 2, 3],
        "email": ["[email protected]", "[email protected]", "[email protected]"],
        "timestamp": [
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
        ],
    }
)
client.log("prod_webhook", "User", df)
python
import json
import os

import requests

url = "{}/api/v1/log".format(os.environ["FENNEL_SERVER_URL"])
headers = {"Content-Type": "application/json"}
data = [
    {
        "uid": 1,
        "email": "[email protected]",
        "timestamp": 1614556800,
    },
    {
        "uid": 2,
        "email": "[email protected]",
        "timestamp": 1614556800,
    },
]
req = {
    "webhook": "prod_webhook",
    "endpoint": "User",
    "data": data,
}
# serialize the payload as JSON to match the Content-Type header
requests.post(url, headers=headers, data=json.dumps(req))
python
Errors
Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.
Unlike all other sources, Webhook does work with the mock client. As a result, it's very effective for quick prototyping and unit testing.
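For instance, a rough sketch of exercising the webhook-backed User dataset from the example above in a unit test; the testing helpers (assumed here to be a mock decorator in fennel.testing) and the commit arguments are assumptions and may differ in your SDK version:
from datetime import datetime, timezone

import pandas as pd
from fennel.testing import mock  # assumption: mock-client decorator lives here

@mock
def test_user_webhook(client):
    # Register the webhook-backed User dataset (defined above) with the
    # mock client; the commit arguments here are an assumption.
    client.commit(message="add User dataset", datasets=[User])

    df = pd.DataFrame(
        {
            "uid": [1],
            "email": ["[email protected]"],
            "timestamp": [datetime.now(timezone.utc)],
        }
    )
    # Log data to the webhook endpoint - works against the mock client too.
    client.log("prod_webhook", "User", df)
python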