Docs
API

Concepts

Sink

Analogous to Sources, Fennel also supports sinks to export data out of Fennel into your data stores.

1from fennel.connectors import sink, Kafka
2from fennel.datasets import dataset, pipeline, Dataset
3from fennel.lib.params import inputs
4
5kafka = Kafka(
6    name="kafka_src",
7    bootstrap_servers=os.environ["KAFKA_HOST"],
8    security_protocol="PLAINTEXT",
9    sasl_mechanism="PLAIN",
10    sasl_plain_username=os.environ["KAFKA_USERNAME"],
11    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
12)
13
14@sink(kafka.topic("user_location"), cdc="debezium")
15@dataset
16class UserLocationFiltered:
17    uid: int
18    city: str
19    country: str
20    update_time: datetime
21
22    @pipeline
23    @inputs(UserLocation)
24    def user_location_count(cls, dataset: Dataset):
25        return dataset.filter(lambda row: row["country"] != "United States")
Writing a Fennel dataset to a Kafka topic

python

In this example, a regular Fennel dataset is being created using a pipeline. But it's desired to write it out to a Kafka topic as new updates arrive in the dataset.

Like Sources, first an object is created that knows how to connect with your Kafka cluster. And sink decorator is applied on the dataset that needs to be written out - this decorator knows that destination if a Kafka topic and that the CDC data needs to be written out in the debezium format.

That's it - once this is written, UserLocationFiltered dataset will start publishing changes to your Kafka.

As of right now, Fennel only supports Kafka sinks and writes data in the debezium format. Given the ubiquity of debezium connectors, you should be able to further pipe this debezium data from Kafka to your data store of choice.

More data stores and cdc strategies will be supported in the future updates.

Edit this Page on Github