Documentation

Concepts

Source

A source is the only way to get data into Fennel, and there are two ways to source data: you can either log data into Fennel using a Webhook source, or you can source data from your external datastores.

Fennel ships with data connectors to all common datastores so that you can 'source' your Fennel datasets from your external datasets. Let's see an example:

Example

1  from fennel.sources import source, Postgres
2
3  postgres = Postgres(host=...<credentials>..)
4
5  @source(postgres.table('user'), cursor='update_time', every='1m', lateness='1d')
6  @meta(owner='[email protected]')
7  @dataset
8  class UserLocation:
9      uid: int
10     city: str
11     country: str
12     update_time: datetime

In this example, line 3 creates an object that knows how to connect to your Postgres database. Lines 6-12 describe a dataset that needs to be sourced from Postgres, and line 5 declares that this dataset should be sourced from a table named user within the Postgres database. That's it - once this is written, the UserLocation dataset will start mirroring your Postgres table user and will update as the underlying Postgres table updates.

Most sources take a few additional parameters as described below:

Every

The frequency with which Fennel checks the external data source for new data. Required for all sources except Kafka and Webhooks, which are ingested continuously.

Cursor

Fennel uses a cursor to do incremental ingestion of data. It does so by remembering the last value of the cursor column (in this case update_time) and issuing a query of the form SELECT * FROM user WHERE update_time > {last_update_time}. Clearly, this works only when the cursor field is monotonically increasing with row updates - which Fennel expects you to ensure. It is also advisable to have an index on the cursor column so that this query is efficient. All data sources except Kafka, Webhooks & S3 require a cursor.
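The cursor mechanism can be sketched in a few lines of Python. This is purely illustrative - not Fennel's actual implementation - with the table modeled as a list of dicts:

```python
from datetime import datetime

# Sketch of cursor-based incremental ingestion: remember the largest cursor
# value seen so far and, on each poll, pick up only rows strictly newer than it.

def poll_once(table, last_cursor):
    """Return rows with update_time > last_cursor and the advanced cursor."""
    new_rows = [row for row in table if row["update_time"] > last_cursor]
    new_cursor = max((row["update_time"] for row in new_rows), default=last_cursor)
    return new_rows, new_cursor
```

Each call to poll_once is the equivalent of one SELECT * FROM user WHERE update_time > {last_update_time} query; because the cursor only advances, already-ingested rows are never fetched twice.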

Lateness

Fennel, like many other streaming systems, is designed to robustly handle out of order data. If there are no bounds on how out of order data can get, the state can blow up. Unlike some other systems, Fennel keeps this state on disk which eliminates OOM issues. But even then, it's desirable to garbage collect this state when all data before a timestamp has been seen.

This is usually handled by a technique called watermarking, where the max out-of-order delay is specified. This max out-of-order delay of a source is called lateness in Fennel and, once specified at the source level, is respected automatically by each downstream pipeline. In this example, by setting lateness to 1d, we are telling Fennel that once it sees data with timestamp t, it will never see data with a timestamp older than t-1 day, and if it does see such older data, it is free to discard it.
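The watermarking idea above can be sketched as follows. This is an illustrative toy, not Fennel's internals: the watermark trails the maximum event timestamp seen so far by the configured lateness, and anything behind the watermark may be dropped:

```python
from datetime import datetime, timedelta

# Illustrative sketch of watermark-based late-data handling with lateness = 1 day.
LATENESS = timedelta(days=1)

def admit(event_times):
    """Keep events at or ahead of the watermark; drop those behind it."""
    max_seen = datetime.min
    admitted = []
    for ts in event_times:
        max_seen = max(max_seen, ts)
        watermark = max_seen - LATENESS  # trails the max timestamp by the lateness
        if ts >= watermark:
            admitted.append(ts)
    return admitted
```

Because the watermark only moves forward, state for all timestamps older than it can be garbage collected safely.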

Schema Matching

Once Fennel obtains data from a source (usually as a JSON string), the data needs to be parsed to extract and validate all the schema fields. Fennel expects the names of the fields in the dataset to match the schema of the ingested JSON string. In this example, it is expected that the user table in Postgres will have at least four columns - uid, city, country, and update_time - with appropriate types. Note that the Postgres table could have many more columns too - they are simply ignored. If ingested data doesn't match the schema of the Fennel dataset, the data is discarded and not admitted to the dataset. Fennel maintains logs of how often this happens, and it's possible to set alerts on that.
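The match-extract-discard behavior can be sketched like this. It is a simplified illustration (Fennel's real validation is richer; update_time is omitted here for brevity):

```python
import json

# Expected fields of the UserLocation dataset and their Python types.
EXPECTED = {"uid": int, "city": str, "country": str}

def parse_row(raw: str):
    """Extract expected fields from an ingested JSON string, or discard the row."""
    data = json.loads(raw)
    row = {}
    for name, typ in EXPECTED.items():
        if name not in data or not isinstance(data[name], typ):
            return None  # discarded; such failures would show up in ingestion logs
        row[name] = data[name]
    return row  # extra columns in `data` are simply ignored
```

A row with extra columns parses fine (the extras are dropped), while a row missing a field, or carrying the wrong type, is rejected wholesale rather than partially admitted.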

Here is how various types are matched from the sourced data:

  • int, float, str, bool respectively match with any integer types, float types, string types and boolean types. For instance, Fennel's int type matches with INT8 or UINT32 from Postgres.
  • List[T] matches a list of data of type T.
  • Dict[T] matches any dictionary from strings to values of type T.
  • Option[T] matches if either the value is null or if its non-null value matches type T. Note that null is an invalid value for any non-Option types.
  • datetime matching is a bit more flexible to support multiple common data formats to avoid bugs due to incompatible formats. Fennel is able to safely parse datetime from all the following formats.

Datetime Formats

  • Integers that describe the timestamp as an interval from the Unix epoch, e.g. 1682099757. Fennel is smart enough to automatically deduce whether an integer describes the timestamp in seconds, milliseconds, microseconds, or nanoseconds.
  • Strings that are the decimal representation of an interval from the Unix epoch, e.g. "1682099757"
  • Strings describing the timestamp in RFC 3339 format, e.g. '2002-10-02T10:00:00-05:00' or '2002-10-02T15:00:00Z' or '2002-10-02T15:00:00.05Z'
  • Strings describing the timestamp in RFC 2822 format, e.g. 'Sun, 23 Jan 2000 01:23:45 JST'
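The unit deduction for epoch integers works because the units differ by orders of magnitude: present-day epoch seconds have about 10 digits, milliseconds about 13, microseconds about 16, and nanoseconds about 19. A sketch of such a heuristic (illustrative; Fennel's actual heuristic may differ):

```python
from datetime import datetime, timezone

def parse_epoch(value: int) -> datetime:
    """Deduce the unit of an epoch integer by magnitude and convert it."""
    for divisor in (1, 10**3, 10**6, 10**9):  # seconds, ms, us, ns
        seconds = value / divisor
        if seconds < 10**11:  # ~year 5138; anything larger implies a finer unit
            return datetime.fromtimestamp(seconds, tz=timezone.utc)
    raise ValueError(f"timestamp out of plausible range: {value}")
```

With this heuristic, 1682099757, 1682099757000, and 1682099757000000 all resolve to the same instant in 2023.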

Safety of Credentials

In the above example, the credentials are defined in the code itself, which is usually not a good practice from a security point of view. Instead, Fennel recommends two ways of providing credentials to sources securely:

  1. Using environment variables on your local machines
  2. Defining credentials in Fennel's web console and referring to sources by their names in the Python definitions.

In either approach, once the credentials reach the Fennel servers, they are securely stored in a Secret Manager and/or encrypted disks.
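The environment-variable approach looks like this. The variable name below is hypothetical; the point is that the code names the credential while the secret itself lives outside the repository:

```python
import os

def load_postgres_host() -> str:
    # A KeyError at definition time is preferable to silently connecting
    # with an empty or missing credential.
    return os.environ["POSTGRES_HOST"]

# The source definition then reads the credential instead of embedding it:
# postgres = Postgres(host=load_postgres_host())
```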

Load Impact of Sources

Fennel sources have negligible load impact on the external data sources. For instance, in the above example, as long as an index exists on the cursor field, Fennel will make a single SELECT query on Postgres every minute. And once data reaches Fennel datasets, all subsequent operations are done using the copy of the data stored on Fennel servers, not the underlying data sources. This ensures that external data sources never need to be over-provisioned (or changed in any way) just for Fennel to be able to read the data.

Change Data Capture (CDC)

Fennel can also do CDC ingestion for Postgres and MySQL. However, that requires setting some permissions on your Postgres/MySQL instances. Please talk to the Fennel team if you want this enabled.