Avro Registry

Several Fennel sources work with the Avro format. When using Avro, it's common to keep the schemas in a centralized schema registry instead of including the schema with each message.

Fennel supports integration with Avro schema registries.

Parameters

registry:Literal["confluent"]

String denoting the provider of the registry. As of right now, Fennel only supports the "confluent" Avro registry, though more schema registries may be added over time.

url:str

The URL where the schema registry is hosted.

username:Optional[str] | Optional[Secret]

Username to access the schema registry (assuming the registry requires authentication). If a username is provided, the corresponding password must also be provided.

If authentication is needed, provide either username/password or a token, but not both.

password:Optional[str] | Optional[Secret]

The password associated with the username.

token:Optional[str] | Optional[Secret]

Token to be used for authentication with the schema registry. Provide only one of username/password or token.

import os
from datetime import datetime

from fennel.connectors import source, Kafka, Avro
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=os.environ["SCHEMA_REGISTRY_USERNAME"],
    password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
)

@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Using avro registry with kafka

python
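
If the registry requires token-based authentication instead, a minimal sketch is below; the environment variable holding the token is illustrative:

import os

from fennel.connectors import Avro

avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    # pass a token instead of username/password - never both
    token=os.environ["SCHEMA_REGISTRY_TOKEN"],
)
Using a token to authenticate with the schema registry

python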

Protobuf Registry

Several Fennel sources work with the Protobuf format. When using Protobuf, it's common to keep the schemas in a centralized schema registry instead of including the schema with each message.

Fennel supports integration with Protobuf schema registries.

Parameters

registry:Literal["confluent"]

String denoting the provider of the registry. As of right now, Fennel only supports the "confluent" Protobuf registry, though more schema registries may be added over time.

url:str

The URL where the schema registry is hosted.

username:Optional[str] | Optional[Secret]

Username to access the schema registry (assuming the registry requires authentication). If a username is provided, the corresponding password must also be provided.

If authentication is needed, provide either username/password or a token, but not both.

password:Optional[str] | Optional[Secret]

The password associated with the username.

token:Optional[str] | Optional[Secret]

Token to be used for authentication with the schema registry. Provide only one of username/password or token.

import os
from datetime import datetime

from fennel.connectors import source, Kafka, Protobuf
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

protobuf = Protobuf(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=os.environ["SCHEMA_REGISTRY_USERNAME"],
    password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
)

@source(kafka.topic("user", format=protobuf), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Using protobuf registry with kafka

python

BigQuery

Data connector to Google BigQuery databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

project_id:str

The project ID of the Google Cloud project containing the BigQuery dataset.

dataset_id:str

The ID of the BigQuery dataset containing the table(s) to replicate.

service_account_key:Dict[str, str] | Secret

A dictionary containing the credentials for the Service Account to use to access BigQuery. See below for instructions on how to obtain this.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows are never updated) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Errors

Connectivity Issues:

Fennel tries to test the connection with your BigQuery during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

BigQuery Credentials

How to obtain credentials?:

Interfacing with BigQuery requires credentials for a Service Account with the "BigQuery User" role at Project level and "BigQuery Data Editor" role at Dataset level. "BigQuery User" role grants permissions to run BigQuery jobs and "BigQuery Data Editor" role grants permissions to read and update table data and its metadata. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.

The easiest way to create a Service Account is to follow GCP's guide for Creating a Service Account. Once you've created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles. Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com

Then, add the service account as a Member of your Google Cloud Project with the "BigQuery User" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.

At this point, you should have a service account with the "BigQuery User" project-level permission.

Similarly, provide the "BigQuery Data Editor" permission to the service account by following Granting Access to Dataset in the Google documentation.

To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.
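
If you save the downloaded key as a JSON file, you can load it into a dictionary before passing it to the connector - a small sketch, assuming a hypothetical file path:

import json

# hypothetical path to the downloaded service account key file
with open("/path/to/service_account_key.json") as f:
    service_account_key = json.load(f)

# the resulting dict can be passed as service_account_key to BigQuery(...)
Loading a service account key from a file

python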

from datetime import datetime

from fennel.connectors import source, BigQuery
from fennel.datasets import dataset

bq = BigQuery(
    name="my_bigquery",
    project_id="my_project",
    dataset_id="my_dataset",
    service_account_key={
        "type": "service_account",
        "project_id": "fake-project-356105",
        "client_email": "[email protected]",
        "client_id": "103688493243243272951",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    },
)

table = bq.table("user", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime

python

Deltalake

Data connector to read data from Delta Lake tables stored in S3.

The Deltalake connector is implemented via the S3 connector - just set the format parameter to 'delta' and the source cdc to 'native'.

To enable Change Data Feed (CDF) on your table, follow the instructions in this documentation

Warning

Fennel doesn't support reading delta tables from HDFS or any other non-S3 storage.

Note

Fennel supports only native CDC with data in Deltalake format. If your data is of the append type, enable CDF on the table and use native CDC mode.

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("data", prefix="user", format="delta"),
    every="1h",
    disorder="14d",
    cdc="native",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Sourcing delta tables into Fennel datasets

python

Hudi

Data connector to read data from Apache Hudi tables in S3.

The Hudi connector is implemented via the S3 connector - just set the format parameter to 'hudi'.

Warning

Fennel doesn't support reading hudi tables from HDFS or any other non-S3 storage.

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("data", prefix="user", format="hudi"),
    disorder="14d",
    cdc="upsert",
    every="1h",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Sourcing hudi tables into Fennel datasets

python

Kafka

Data connector to any data store that speaks the Kafka protocol (e.g. native Kafka, MSK, Redpanda etc.).

Cluster Parameters

name:str

A name to identify the source. This name should be unique across ALL connectors.

bootstrap_servers:str

This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster.

Addresses are written as host & port pairs and can be specified either as a single server (e.g. localhost:9092) or a comma separated list of several servers (e.g. localhost:9092,another.host:9092).

security_protocol:"PLAINTEXT" | "SASL_PLAINTEXT" | "SASL_SSL"

Protocol used to communicate with the brokers.

sasl_mechanism:"PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512" | "GSSAPI"

SASL mechanism to use for authentication.

sasl_plain_username:Optional[str] | Optional[Secret]

SASL username.

sasl_plain_password:Optional[str] | Optional[Secret]

SASL password.

Topic Parameters

topic:str

The name of the kafka topic that needs to be sourced into the dataset.

format:"json" | Avro | Protobuf

Default: json

The format of the data in the Kafka topic. "json", Avro and Protobuf are supported.

import os
from datetime import datetime

from fennel.connectors import source, Kafka
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

@source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Sourcing json data from kafka to a dataset

python

Note

Fennel supports only Append and Upsert mode CDC with data in Protobuf format. If you require support for other CDC data formats, please reach out to Fennel support.

Errors

Connectivity problems:

Fennel server tries to connect with the Kafka broker during the commit operation itself to validate connectivity - as a result, incorrect URL/Username/Password etc will be caught at commit time itself as an error.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Kafka can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Kinesis

Data connector to ingest data from AWS Kinesis.

Parameters for Defining Source

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

role_arn:str

The arn of the role that Fennel should use to access the Kinesis stream. The role must already exist and Fennel's principal must have been given the permission to assume this role (see below for details or talk to Fennel support if you need help).

Stream Parameters

stream_arn:str

The arn of the Kinesis stream. The corresponding role_arn must have appropriate permissions for this stream. Providing a stream that either doesn't exist or can not be read using the given role_arn will result in an error during the commit operation.

init_position:str | datetime | float | int

The initial position in the stream from which Fennel should start ingestion. See Kinesis ShardIteratorType for more context. Allowed values are:

  • "latest" - start from the latest data (starting a few minutes after commit)
  • "trim_horizon"- start from the oldest data that hasn't been trimmed/expired yet.
  • datetime - start from the position denoted by this timestamp (i.e. equivalent to AT_TIMESTAMP in Kinesis vocabulary).

If choosing the datetime option, the timestamp can be specified as a datetime object, or as an int representing seconds since the epoch, or as a float representing {seconds}.{microseconds} since the epoch or as an ISO-8601 formatted str.

Note that this timestamp is the time attached with the Kinesis message itself at the time of production, not any timestamp field inside the message.
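
For reference, a sketch of the different ways the same starting position could be expressed; the stream and role ARN environment variables mirror the examples below and the epoch values assume UTC:

import os
from datetime import datetime

from fennel.connectors import Kinesis

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

# all of the values below denote the same starting position (Jan 5, 2023 UTC)
stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position=datetime(2023, 1, 5),  # as a datetime object
    # init_position=1672876800,              # as seconds since the epoch (int)
    # init_position=1672876800.0,            # as {seconds}.{microseconds} (float)
    # init_position="2023-01-05T00:00:00Z",  # as an ISO-8601 string
    format="json",
)
Equivalent ways of specifying init_position

python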

format:"json" | Avro

The format of the data in the Kinesis stream. Most common value is "json" though Fennel also supports Avro.

Errors

Connectivity problems:

Fennel server tries to connect with Kinesis during the commit operation itself to validate connectivity - as a result, incorrect stream/role ARNs or insufficient permissions will be caught at commit time itself as an error.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

import os
from datetime import datetime

from fennel.connectors import source, Kinesis
from fennel.datasets import dataset, field

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position=datetime(2023, 1, 5),  # Start ingesting from Jan 5, 2023
    format="json",
)

@source(stream, disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    order_id: str
    amount: float
    timestamp: datetime
Using explicit timestamp as init position

python

import os
from datetime import datetime

from fennel.connectors import source, Kinesis
from fennel.datasets import dataset, field

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position="latest",
    format="json",
)

@source(stream, disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    order_id: str
    amount: float
    timestamp: datetime
Using latest as init position

python

Managing Kinesis Access

Fennel creates a special role with name prefixed by FennelDataAccessRole- in your AWS account for role-based access. The role corresponding to the role_arn passed to the Kinesis source should have the following trust policy allowing this special Fennel role to assume the Kinesis role.

See Trust Policy

Specify the exact role_arn in the form arn:aws:iam::<fennel-data-plane-account-id>:role/<FennelDataAccessRole-...> without any wildcards.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "<role_arn>"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

JSON

Also attach the following permission policy. Add more streams to the Resource field if more than one stream needs to be consumed via this role. Here the account-id is your account where the stream lives.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowKinesisAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:DescribeStreamConsumer",
        "kinesis:RegisterStreamConsumer",
        "kinesis:ListShards",
        "kinesis:GetShardIterator",
        "kinesis:SubscribeToShard",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>",
        "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>/*"
      ]
    }
  ]
}

JSON

MongoDB

Data connector to MongoDB databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

host:str

The hostname of the database.

db_name:str

The name of the Mongo database to establish a connection with.

username:str | Secret

The username which should be used to access the database. This username should have access to the database db_name.

password:str | Secret

The password associated with the username.

Note

Fennel uses SRV connection format for authentication which is supported in Mongo versions 3.6 and later. If you have a self-hosted DB with version earlier than 3.6, please reach out to Fennel support.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form db.collection.find({"cursor": { "$gte": last_cursor - disorder } }) to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows are never updated) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Warning

It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your MongoDB database.

from datetime import datetime

from fennel.connectors import source, Mongo
from fennel.datasets import dataset

mongo = Mongo(
    name="mongo_src",
    host="atlascluster.ushabcd.mongodb.net",
    db_name="mongo",
    username="username",
    password="password",
)

collection = mongo.collection("user", cursor="timestamp")

@source(collection, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Sourcing dataset from a mongo collection

python

Errors

Connectivity Issues:

Fennel tries to test the connection with your MongoDB during commit itself so any connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

MySQL

Data connector to MySQL databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

host:str

The hostname of the database.

port:Optional[int]

Default: 3306

The port to connect to.

db_name:str

The name of the MySQL database to establish a connection with.

username:str | Secret

The username which should be used to access the database. This username should have access to the database db_name.

password:str | Secret

The password associated with the username.

jdbc_params:Optional[str]

Default: None

Additional properties to pass to the JDBC URL string when connecting to the database formatted as key=value pairs separated by the symbol &. For instance: key1=value1&key2=value2.

Error

If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows are never updated) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Warning

It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your MySQL database.

import os
from datetime import datetime

from fennel.connectors import source, MySQL
from fennel.datasets import dataset, field

mysql = MySQL(
    name="my_mysql",
    host="my-favourite-mysql.us-west-2.rds.amazonaws.com",
    port=3306,  # could be omitted, defaults to 3306
    db_name=os.environ["DB_NAME"],
    username=os.environ["MYSQL_USERNAME"],
    password=os.environ["MYSQL_PASSWORD"],
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)

table = mysql.table("user", cursor="updated_at")

@source(table, disorder="14d", cdc="upsert", every="1m")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    created_at: datetime
    updated_at: datetime = field(timestamp=True)
Sourcing dataset from a mysql table

python

Errors

Connectivity Issues:

Fennel tries to test the connection with your MySQL during commit itself so any connectivity issue (e.g. wrong host name, username, password etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in MySQL is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Postgres

Data connector to Postgres databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

host:str

The hostname of the database.

port:Optional[int]

Default: 5432

The port to connect to.

db_name:str

The name of the Postgres database to establish a connection with.

username:str | Secret

The username which should be used to access the database. This username should have access to the database db_name.

password:str | Secret

The password associated with the username.

jdbc_params:Optional[str]

Default: None

Additional properties to pass to the JDBC URL string when connecting to the database formatted as key=value pairs separated by the symbol &. For instance: key1=value1&key2=value2.

Error

If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows are never updated) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Warning

It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your Postgres database.

import os
from datetime import datetime

from fennel.connectors import source, Postgres
from fennel.datasets import dataset, field

postgres = Postgres(
    name="my_postgres",
    host="my-favourite-pg.us-west-2.rds.amazonaws.com",
    port=5432,  # could be omitted, defaults to 5432
    db_name=os.environ["DB_NAME"],
    username=os.environ["POSTGRES_USERNAME"],
    password=os.environ["POSTGRES_PASSWORD"],
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)

table = postgres.table("user", cursor="updated_at")

@source(table, disorder="14d", cdc="upsert", every="1m")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    created_at: datetime
    updated_at: datetime = field(timestamp=True)
Sourcing dataset from a postgres table

python

Errors

Connectivity Issues:

Fennel tries to test the connection with your Postgres during commit itself so any connectivity issue (e.g. wrong host name, username, password etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Postgres is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Pub/Sub

Data connector to Google Pub/Sub messaging service. Pub/Sub only supports an at-least-once delivery guarantee. If exactly-once delivery is required, please use the Dedup operator to make it exactly-once.

Project Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

project_id:str

The project ID of the Google Cloud project containing the Pub/Sub topic

service_account_key:Dict[str, str] | Secret

A dictionary containing the credentials for the Service Account to use to access Pub/Sub. See below for instructions on how to obtain this.

Topic Parameters

topic_id:str

The name of the topic from which the data should be ingested.

format:"json"

The format of the data in the Pub/Sub topic. Only "json" is supported.

Note

Fennel supports only Append and Upsert mode CDC with data in JSON format. If you require support for other schemas or CDC data formats, please reach out to Fennel support.

Errors

Connectivity Issues:

Fennel tries to test the connection with your Pub/Sub topic during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Pub/Sub Credentials

How to obtain credentials?:

Interfacing with Pub/Sub requires credentials for a Service Account with the "Pub/Sub Subscriber" role, which grants permissions to create subscription and read messages from the subscribed topic. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.

The easiest way to create a Service Account is to follow GCP's guide for Creating a Service Account. Once you've created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles. Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com

Then, add the service account as a Member of your Google Cloud Project with the "Pub/Sub Subscriber" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.

At this point, you should have a service account with the "Pub/Sub Subscriber" project-level permission.

To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.

from datetime import datetime

from fennel.connectors import source, PubSub
from fennel.datasets import dataset, field

pubsub = PubSub(
    name="pubsub_src",
    project_id="test_project",
    service_account_key={
        "type": "service_account",
        "project_id": "fake-project-356105",
        "client_email": "[email protected]",
        "client_id": "103688493243243272951",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    },
)

@source(
    pubsub.topic("test_topic", format="json"), disorder="2d", cdc="upsert"
)
@dataset
class UserClick:
    uid: int = field(key=True)
    ad_id: int
    timestamp: datetime

python

Redshift

Data connector to Redshift databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

s3_access_role_arn:Optional[str]

To handle potentially large volumes of data, Fennel asks Redshift to dump query results in a temporary S3 bucket (since it's faster to go via S3). But this requires Redshift to be able to access that S3 bucket. s3_access_role_arn is the IAM role ARN that Redshift should use to access S3.

This IAM role should be given full access to S3 and should also be assumable by your Redshift.

Steps to set up IAM role:

  • Create an IAM role by following this documentation.
  • Grant this role full access to S3.
  • Associate IAM role with Redshift cluster by following this documentation.

You can refer to the sample policy in the code snippet below. Do not set this parameter when using username/password for authentication.

db_name:str

The name of the database where the relevant data resides.

username:Optional[str] | Optional[Secret]

The username which should be used to access the database. This username should have access to the database db_name. Do not set this parameter when using IAM authentication

password:Optional[str] | Optional[Secret]

The password associated with the username. Do not set this parameter when using IAM authentication

host:str

The hostname of the database.

port:Optional[int]

Default: 5439

The port to connect to.

schema:str

The name of the schema where the required data table(s) resides.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows are never updated) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Note

For large datasets, it is recommended to use IAM-based authentication, as username/password-based authentication does not store temporary data in S3.
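
For completeness, a hedged sketch of username/password based authentication is below; s3_access_role_arn is omitted in this mode, and the credential environment variable names are illustrative:

import os

from fennel.connectors import Redshift

redshift = Redshift(
    name="my_redshift",
    db_name=os.environ["DB_NAME"],
    username=os.environ["REDSHIFT_USERNAME"],  # illustrative env var name
    password=os.environ["REDSHIFT_PASSWORD"],  # illustrative env var name
    host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
    port=5439,  # could be omitted, defaults to 5439
    schema="public",
)
Redshift source with username/password authentication

python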

Errors

Connectivity Issues:

Fennel tries to test the connection with your Redshift during commit itself so any connectivity issue (e.g. wrong database name, host name, port, etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Redshift is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

import os
from datetime import datetime

from fennel.connectors import source, Redshift
from fennel.datasets import dataset

redshift = Redshift(
    name="my_redshift",
    s3_access_role_arn="arn:aws:iam::123:role/Redshift",
    db_name=os.environ["DB_NAME"],
    host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
    port=5439,  # could be omitted, defaults to 5439
    schema="public",
)

table = redshift.table("user", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Bringing Redshift data into Fennel

python

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "redshift:DescribeClusters",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "redshift:ModifyClusterIamRoles",
                "redshift:CreateCluster"
            ],
            "Resource": [
                # Redshift workgroup ARN
                "arn:aws:redshift-serverless:us-west-2:82448945123:workgroup/0541e0ae-2ad1-4fe0-b2f3-4d6c1d3453e"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                # ARN of role created above
                "arn:aws:iam::82448945123:role/RedshiftS3AccessRole"
            ]
        }
    ]
}
Sample IAM policy for integrating with Redshift

JSON

S3

Data connector to source data from S3.

Account Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

aws_access_key_id:Optional[str] | Optional[Secret]

Default: None

AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.

aws_secret_access_key:Optional[str] | Optional[Secret]

Default: None

AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.

role_arn:Optional[str]

Default: None

Role ARN to assume to get access to S3. This field is not required if role-based access is used or if AWS access and secret keys are used or if the bucket is public.

Bucket Parameters

bucket:str

The name of the S3 bucket where the data files exist.

prefix:Optional[str]

Default: None

The prefix of the bucket (as relative path within bucket) where the data files exist. For instance, some-folder/ or A/B/C are all valid prefixes. Prefix can not have any wildcard characters.

Exactly one of prefix or path must be provided.

path:Optional[str]

Default: None

A / delimited path (relative to the bucket) describing the objects to be ingested. The valid path parts are:

  • static string of alphanumeric characters, underscores, hyphens or dots.
  • * wildcard - this must be the entire path part: */* is valid but foo*/ is not.
  • string with a strftime format specifier (e.g. yyyymmdd=%Y%m%d)

If you have a large volume of data or objects and your bucket is time partitioned, it's highly recommended to include details of time partitioning in your path instead of providing * - Fennel can use this information to optimize the ingestion.

For example, if your bucket has the structure orders/{country}/date={date}/store={store}/{file}.json, provide the path orders/*/date=%Y%m%d/*/*

Exactly one of prefix or path must be provided.

format:str

Default: csv

The format of the files you'd like to ingest. Valid values are "csv", "parquet", "json", "delta" or "hudi".

delimiter:Optional[str]

Default: ,

The character delimiting individual cells in the CSV data - only relevant when format is CSV, otherwise it's ignored.

The default value is "," and it can be overridden by any other 1-character string. For example, to use tab-delimited data enter "\t".
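
For example, a minimal sketch of sourcing tab-delimited files; the bucket and prefix names are illustrative, and role-based access is assumed so no keys are passed:

from fennel.connectors import S3

s3 = S3(name="mys3")  # relying on role-based access, so no keys passed
events = s3.bucket("data", prefix="events/", format="csv", delimiter="\t")
Overriding the CSV delimiter

python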

spread:Optional[Duration]

Default: None

Relevant only when using path with strftime specifiers.

To do incremental ingestion of data from a time partitioned S3 bucket, Fennel needs to know what time ranges may be present in any given partition. While not common, sometimes the timestamp field used to time-partition data in your S3 bucket may be different from the field that you want to be the "official" timestamp field of the Fennel dataset.

In such cases, it's possible that a bucket corresponding to say month=01 contains rows where value of the timestamp field is outside of month=01.

spread is a measure of how wide this gap can be. More formally, spread indicates the maximum difference between the partition interval and the value of the timestamp field for data in that partition. A None value indicates no spread, which is the case when the partitioning scheme uses the same timestamp values as the dataset's timestamp column. spread is specified using Fennel's Duration type.

Examples:

  • Given a path txns/date=20240207/hh=06/ and spread=None, fennel expects all data under this path to have timestamp between 2024-02-07 06:00:00 and 2024-02-07 07:00:00
  • Given a path txns/date=20240207/hh=06/ and spread="3d", fennel expects all data under this path to have a timestamp between 2024-02-04 06:00:00 and 2024-02-10 07:00:00
  • Given a path txns/date=20240207/ and spread="6h", fennel expects all data under this path to have a timestamp between 2024-02-06 18:00:00 and 2024-02-08 06:00:00

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("datalake", prefix="user"),
    every="1h",
    disorder="14d",
    cdc="upsert",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
S3 ingestion via prefix

python

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="my_s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

bucket = s3.bucket(
    "data", path="user/*/date-%Y-%m-%d/*", format="parquet", spread="2d"
)

@source(bucket, disorder="14d", cdc="upsert", every="1h")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
S3 ingestion via path

python

Errors

Connectivity or authentication errors:

The Fennel server tries to do some lightweight operations on the bucket during the commit operation - all connectivity or authentication related errors should be caught during the commit itself.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in S3 can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Enabling IAM Access

Fennel creates a role with name prefixed by FennelDataAccessRole- in your AWS account for role-based access. In order to use IAM access for S3, please ensure that this role has permission to read and list files on the buckets of interest.

With that ready, simply don't specify aws_access_key_id and aws_secret_access_key and Fennel will automatically fall back to IAM based access.

In case you do not want to provide S3 access to FennelDataAccessRole-, pass the role_arn parameter inside the connector params and make sure FennelDataAccessRole- can assume that IAM role.
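
A minimal sketch of passing role_arn is below; the ARN is a placeholder for a role in your account that the Fennel role is allowed to assume:

from fennel.connectors import S3

s3 = S3(
    name="mys3",
    role_arn="arn:aws:iam::123456789012:role/my-s3-read-role",  # placeholder
)
S3 connector using an assumed IAM role

python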

Note
  • Fennel uses the file_last_modified property exported by S3 to track what data has been seen so far and hence a cursor field doesn't need to be specified.
  • Fennel supports the role_arn parameter only for CSV, JSON, Parquet and Avro data formats. In case you require support for Hudi or Deltalake formats, please reach out to Fennel support.

Secret

Secret can be used to pass sensitive information like username/password to Fennel using Secrets Manager secret reference.

In order to use Secret, one of the following should be done:

  1. The Fennel Data access role should be given access to the secret.
  2. Or a new role can be created with access to the needed secrets, and the Fennel Data access role can be added as a trusted entity for that new role, so that the new role can be assumed to access the secrets.

Parameters

arn:str

The ARN of the secret.

role_arn:Optional[str]

The optional ARN of the role to be assumed to access the secret. This should be provided if a new role is created for the Fennel Data access role to assume.

import os
from datetime import datetime

from fennel.connectors import source, Kafka, Avro
from fennel.datasets import dataset, field
from fennel.integrations.aws import Secret

aws_secret = Secret(
    arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:my-secret-name-I4hSKr",
    role_arn="arn:aws:iam::123456789012:role/secret-access-role",
)

# secret with above arn has content like below
# {
#     "kafka": {
#         "username": "actual-kafka-username",
#         "password": "actual-kafka-password"
#     },
#     "schema_registry": {
#         "username": "actual-schema-registry-username",
#         "password": "actual-schema-registry-password"
#     }
# }

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=aws_secret["kafka"]["username"],
    sasl_plain_password=aws_secret["kafka"]["password"],
)
avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=aws_secret["schema_registry"]["username"],
    password=aws_secret["schema_registry"]["password"],
)

@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Using secrets with kafka

python

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-secret-name-I4hSKr"
        }
    ]
}
Example Permission policy for new role

JSON

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::123456789012:role/FennelDataAccessRole"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
Example Trusted relationship for the new role

JSON

Snowflake

Data connector to Snowflake databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel connectors.

account:str

Snowflake account identifier. This is the first part of the URL used to access Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com, then the account is <account>.

This is usually of the form <ORG_ID>-<ACCOUNT_ID>. Refer to the Snowflake documentation to find the account identifier.

role:str

The role that should be used by Fennel to access Snowflake.

warehouse:str

The warehouse that should be used to access Snowflake.

db_name:str

The name of the database where the relevant data resides.

schema:str

The schema where the required data table(s) resides.

username:str | Secret

The username which should be used to access Snowflake. This username should have required permissions to assume the provided role.

password:str | Secret

The password associated with the username.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as the cursor for ingestion, i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows are never updated) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

import os
from datetime import datetime

from fennel.connectors import source, Snowflake
from fennel.datasets import dataset

snowflake = Snowflake(
    name="my_snowflake",
    account="VPECCVJ-MUB03765",
    warehouse="TEST",
    db_name=os.environ["DB_NAME"],
    schema="PUBLIC",
    role="ACCOUNTADMIN",
    username=os.environ["SNOWFLAKE_USERNAME"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
)

table = snowflake.table("User", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Defining and using a snowflake source

python

Errors

Connectivity Issues:

Fennel tries to test the connection with your Snowflake during commit itself so any connectivity issue (e.g. wrong host name, username, password etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Snowflake is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Webhook

A push-based data connector, making it convenient for sending arbitrary JSON data to Fennel. Data can be pushed to a webhook endpoint either via the REST API or via the Python SDK.

Source Parameters

name:str

A name to identify the source. This name should be unique across all Fennel sources.

retention:Duration

Default: 14d

Data sent to the webhook is buffered for the duration retention. That is, if data has been logged to a webhook, datasets defined later that source from this webhook will still see that data for up to this duration.

Connector Parameters

endpoint:str

The endpoint for the given webhook to which the data will be sent.

A single webhook could be visualized as a single Kafka cluster with each endpoint being somewhat analogous to a topic. A single webhook source can have as many endpoints as required.

Multiple datasets could be reading from the same webhook endpoint - in which case, they all get the exact same data.

from datetime import datetime

from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field

webhook = Webhook(name="prod_webhook", retention="14d")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: float
    timestamp: datetime
Two datasets sourcing from endpoints of the same webhook

python

import pandas as pd
from datetime import datetime, timezone

# client is an instance of the Fennel client connected to your cluster
df = pd.DataFrame(
    {
        "uid": [1, 2, 3],
        "email": ["[email protected]", "[email protected]", "[email protected]"],
        "timestamp": [
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
        ],
    }
)
client.log("prod_webhook", "User", df)
Pushing data into webhook via Python SDK

python

import os

import requests

url = "{}/api/v1/log".format(os.environ["FENNEL_SERVER_URL"])
headers = {"Content-Type": "application/json"}
data = [
    {
        "uid": 1,
        "email": "[email protected]",
        "timestamp": 1614556800,
    },
    {
        "uid": 2,
        "email": "[email protected]",
        "timestamp": 1614556800,
    },
]
req = {
    "webhook": "prod_webhook",
    "endpoint": "User",
    "data": data,
}
requests.post(url, headers=headers, json=req)  # send the payload as JSON
Pushing data into webhook via REST API

python

Errors

Schema mismatch errors:

Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Note

Unlike all other sources, Webhook does work with the mock client. As a result, it's very effective for quick prototyping and unit testing.