Documentation

API Reference

Sources

Here is a description of all the external sources supported by Fennel and how to use them:

Webhook

The Webhook source operates on a push-based mechanism, making it convenient for sending data to Fennel. There are two ways to push data into Fennel: using the Fennel Python SDK or the REST API.

The following fields need to be specified:

  1. name - A name to identify the source. The name should be unique across all sources.

And the following fields need to be defined on the webhook:

  1. endpoint - The endpoint for the given webhook to which the data will be sent.
source.py
webhook = sources.Webhook(name="fennel_webhook")


@source(webhook.endpoint("UserDataset"))
@meta(owner="[email protected]")
@dataset
class UserDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...


client.log("fennel_webhook", "UserDataset", df)

To use the REST API, check the REST API documentation.
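For reference, the sketch below shows what such a call could look like using Python's requests library. The endpoint path, header, and payload keys here are assumptions made for illustration only; confirm the exact contract in the REST API documentation.

import requests

# Hypothetical sketch: the URL, header, and payload keys are assumptions,
# not the authoritative REST contract -- see the REST API documentation.
url = "https://<your-fennel-instance>/api/v1/log"
payload = {
    "webhook": "fennel_webhook",   # name of the webhook source
    "dataset": "UserDataset",      # dataset (endpoint) to log into
    "rows": [
        {"uid": 1, "email": "<EMAIL>", "timestamp": "2023-01-01T00:00:00Z"},
    ],
}
resp = requests.post(url, json=payload, headers={"Authorization": "Bearer <TOKEN>"})
resp.raise_for_status()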

MySQL

The following fields need to be specified:

  1. name - A name to identify the source. The name should be unique across all sources.
  2. host - The host name of the database.
  3. port - The port to connect to. By default it is 3306 for MySQL and 5432 for Postgres.
  4. db_name - The database name.
  5. username - The username which is used to access the database.
  6. password - The password associated with the username.
  7. jdbc_params - Additional properties to pass to the JDBC URL string when connecting to the database, formatted as key=value pairs separated by the symbol & (example: key1=value1&key2=value2&key3=value3).
mysql = sources.MySQL(
    name="py_mysql_src",
    host="my-favourite-mysql.us-west-2.rds.amazonaws.com",
    port=3306,
    db_name="some_database_name",
    username="admin",
    password="password",
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)


@source(mysql.table("user", cursor="update_time"), every="1m")
@dataset
class UserMysqlSourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...

Warning

If you see a Cannot create a PoolableConnectionFactory error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2

Postgres

postgres = sources.Postgres(
    name="py_psql_src",
    host="my-favourite-postgres.us-west-2.rds.amazonaws.com",
    db_name="some_database_name",
    username="admin",
    password="password",
)


@source(postgres.table("user", cursor="update_time"), every="1m")
@dataset
class UserPostgresSourcedDataset:
    uid: int
    timestamp: datetime
    ...

Warning

If you see a Cannot create a PoolableConnectionFactory error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2

S3

The following fields need to be defined on the source:

  1. name - A name to identify the source. The name should be unique across all sources.
  2. aws_access_key_id (optional) - AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.
  3. aws_secret_access_key (optional) - AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.

Info

Fennel creates a special role with a name prefixed by FennelDataAccessRole- in your AWS account for role-based access.

The following fields need to be defined on the bucket:

  1. bucket - Name of the S3 bucket where the file(s) exist.
  2. prefix (optional) - By providing a path-like prefix (e.g., myFolder/thisTable/) under which all the relevant files sit, we can optimize finding these in S3. This is optional but recommended if your bucket contains many folders/files.
  3. format (optional) - The format of the files you'd like to replicate. You can choose between CSV (default), Avro, Hudi and Parquet.
  4. delimiter (optional) - The character delimiting individual cells in the CSV data. The default value is "," and, if overridden, this can only be a one-character string. For example, to use tab-delimited data enter "\t" (see the sketch after the example below).
s3 = sources.S3(
    name="ratings_source",
    aws_access_key_id="<SOME_ACCESS_KEY>",
    aws_secret_access_key="<SOME_SECRET_ACCESS_KEY>",
)


@source(s3.bucket("engagement", prefix="notion"), every="30m")
@meta(owner="[email protected]")
@dataset
class UserS3SourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...

Fennel uses the file_last_modified property exported by S3 to track what data has been seen so far, so a cursor field doesn't need to be specified.
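For instance, a bucket containing tab-delimited CSV files accessed via role-based access could be configured roughly as follows. This is a sketch that simply reuses the format and delimiter bucket fields described above; the bucket name, prefix, and dataset name are placeholders.

s3 = sources.S3(name="ratings_source_tsv")  # role-based access, so no keys passed


# Sketch: format/delimiter are the bucket fields documented above
@source(s3.bucket("engagement", prefix="notion", format="csv", delimiter="\t"), every="30m")
@dataset
class UserTsvSourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...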

BigQuery

The following fields need to be specified:

  1. name - A name to identify the source. The name should be unique across all sources.
  2. project_id - The project ID of the Google Cloud project containing the BigQuery dataset.
  3. dataset_id - The ID of the BigQuery dataset containing the table(s) to replicate.
  4. credentials_json - The JSON string containing the credentials for the Service Account to use to access BigQuery. See below for instructions on how to obtain this.

How to obtain credentials?

Interfacing with BigQuery requires credentials for a Service Account with the "BigQuery User" and "BigQuery Data Editor" roles, which grant permissions to run BigQuery jobs, write to BigQuery Datasets, and read table metadata. It is highly recommended that this Service Account be exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.

The easiest way to create a Service Account is to follow GCP's guide for Creating a Service Account. Once you've created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles. Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com.

Then, add the service account as a Member of your Google Cloud Project with the "BigQuery User" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.

At this point, you should have a service account with the "BigQuery User" project-level permission.

For Service Account Key JSON, enter the Google Cloud Service Account Key in JSON format.
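
Putting the fields together, a BigQuery source could be declared roughly as in the sketch below. The table/cursor call is assumed to mirror the MySQL example above and all values are placeholders, so treat this as illustrative rather than an exact recipe.

bq = sources.BigQuery(
    name="bq_src",
    project_id="<gcp_project_id>",
    dataset_id="<bigquery_dataset_id>",
    credentials_json="<SERVICE_ACCOUNT_KEY_JSON>",
)


# Sketch: the table/cursor call is assumed to work like the MySQL example
@source(bq.table("user", cursor="update_time"), every="1m")
@dataset
class UserBigQuerySourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...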

Snowflake

The following fields need to be defined:

  1. name - A name to identify the source. The name should be unique across all sources.
  2. host - The host domain of the Snowflake instance (must include the account, region and cloud environment, and end with snowflakecomputing.com). Example: accountname.us-east-2.aws.snowflakecomputing.com.
  3. role - The role that Fennel should use to access Snowflake.
  4. warehouse - The warehouse that Fennel should use to access Snowflake.
  5. db_name - The database where the required data resides.
  6. schema - The default schema used as the target schema for all statements issued from the connection that do not explicitly specify a schema name.
  7. username - The username that should be used to access Snowflake. Please note that the username should have the required permissions to assume the role provided.
  8. password - The password associated with the username.
sf_src = sources.Snowflake(
    name="snowflake_src",
    host="nhb38793.us-west-2.snowflakecomputing.com",
    role="ACCOUNTADMIN",
    warehouse="TEST",
    db_name="TEST_DB",
    schema="PUBLIC",
    username="<username>",
    password="<password>",
)

Info

Currently, Fennel only supports OAuth 1 (username and password) authentication. We are happy to prioritize support for OAuth 2.0 if needed - if so, please talk to us!

Hudi

Fennel integrates with Apache Hudi via its S3 connector. To use Hudi, simply set the format field to "hudi" when configuring the S3 bucket.

s3 = sources.S3(
    name="ratings_source",
    aws_access_key_id="<SOME_ACCESS_KEY>",
    aws_secret_access_key="<SOME_SECRET_ACCESS_KEY>",
)


@source(s3.bucket("engagement", prefix="notion", format="hudi"), every="30m")
@meta(owner="[email protected]")
@dataset
class UserHudiSourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...

Kafka

The following fields need to be defined for the source:

  1. name - A name to identify the source. The name should be unique across all sources.
  2. bootstrap_servers - A list of broker hosts or host:port pairs.
  3. security_protocol - Protocol used to communicate with brokers. Supported values are PLAINTEXT, SASL_PLAINTEXT, and SASL_SSL.
  4. sasl_mechanism - SASL mechanism to use for authentication. For example, SCRAM-SHA-256, PLAIN.
  5. sasl_plain_username - SASL username.
  6. sasl_plain_password - SASL password.
  7. verify_cert - Enable OpenSSL's built-in broker (server) certificate verification. Default is true.

The following fields need to be defined on the topic:

  1. topic - The Kafka topic.
kafka = sources.Kafka(
    name="kafka_src",
    bootstrap_servers="localhost:9092",
    security_protocol="PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="test",
    sasl_plain_password="test",
    verify_cert=False,
)


@source(kafka.topic("user"))
@meta(owner="[email protected]")
@dataset
class UserKafkaSourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...

Delta Lake

Similar to Hudi, Fennel integrates with Delta Lake via its S3 connector. To use Delta Lake, simply set the format field to "delta" when configuring the S3 bucket.

s3 = sources.S3(
    name="ratings_source",
    aws_access_key_id="<SOME_ACCESS_KEY>",
    aws_secret_access_key="<SOME_SECRET_ACCESS_KEY>",
)


@source(s3.bucket("engagement", prefix="notion", format="delta"), every="30m")
@meta(owner="[email protected]")
@dataset
class UserDeltaLakeSourcedDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
    ...