Sink

Just as sources bring data into Fennel, sinks export data out of Fennel into your data stores.

import os
from datetime import datetime

from fennel.connectors import sink, Kafka
from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib.params import inputs

# Connection details for the Kafka cluster, read from the environment.
kafka = Kafka(
    name="kafka_src",
    bootstrap_servers=os.environ["KAFKA_HOST"],
    security_protocol="PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

# Write changes to this dataset to the "user_location" topic in Debezium format.
@sink(kafka.topic("user_location"), cdc="debezium")
@dataset
class UserLocationFiltered:
    uid: int
    city: str
    country: str
    update_time: datetime

    @pipeline
    @inputs(UserLocation)  # UserLocation is assumed to be a dataset defined elsewhere
    def user_location_count(cls, dataset: Dataset):
        # Keep only rows whose country is not the United States.
        return dataset.filter(lambda row: row["country"] != "United States")
Writing a Fennel dataset to a Kafka topic

In this example, a regular Fennel dataset is created using a pipeline. The goal is to also write it out to a Kafka topic as new updates arrive in the dataset.

As with sources, the first step is to create an object that knows how to connect to your Kafka cluster. The sink decorator is then applied to the dataset that needs to be written out. This decorator specifies that the destination is a Kafka topic and that the CDC data should be written in the Debezium format.

That's it. Once this is in place, the UserLocationFiltered dataset starts publishing its changes to your Kafka topic.
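To make the Debezium format more concrete, here is a minimal sketch of how a downstream consumer might fold such change events into a local copy of the data. It assumes the messages follow the standard Debezium envelope (an `op` code plus `before` and `after` row images, optionally wrapped in a `payload` field); the exact messages Fennel emits may differ, so check a real message from your topic first. The function name `apply_change_event` and the sample messages are hypothetical, and the `update_time` field is left out of the samples for brevity.

```python
import json


def apply_change_event(state, raw_message):
    """Apply one Debezium-style change event to a dict keyed by uid."""
    event = json.loads(raw_message)
    # With schemas enabled, Debezium nests the envelope under "payload".
    envelope = event.get("payload", event)
    op = envelope["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read, or update: "after" holds the new row image
        row = envelope["after"]
        state[row["uid"]] = row
    elif op == "d":
        # delete: "before" holds the row image that was removed
        row = envelope["before"]
        state.pop(row["uid"], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")
    return state


# Hypothetical messages, shaped like the UserLocationFiltered rows.
paris = {"uid": 1, "city": "Paris", "country": "France"}
lyon = {"uid": 1, "city": "Lyon", "country": "France"}
toronto = {"uid": 2, "city": "Toronto", "country": "Canada"}

messages = [
    json.dumps({"op": "c", "before": None, "after": paris}),
    json.dumps({"op": "u", "before": paris, "after": lyon}),
    json.dumps({"op": "c", "before": None, "after": toronto}),
    json.dumps({"op": "d", "before": toronto, "after": None}),
]

state = {}
for message in messages:
    apply_change_event(state, message)

print(state)
```

In a real deployment, the `messages` list would be replaced by records read from the `user_location` topic with the Kafka client of your choice.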

Fennel ships with data sinks for a couple of common data stores, so that you can 'sink' data from your Fennel datasets into your external data stores. Sinks for many other common data stores will be added soon.
