Concepts
Sink
Analogous to Sources, Fennel also supports sinks to export data out of Fennel into your data stores.
```python
import os
from datetime import datetime

from fennel.connectors import sink, Kafka
from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib.params import inputs

# Connector object describing how to reach the Kafka cluster.
kafka = Kafka(
    name="kafka_src",
    bootstrap_servers=os.environ["KAFKA_HOST"],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

# Publish every change to this dataset to the "user_location" topic,
# encoded as Debezium-format CDC records.
@sink(kafka.topic("user_location"), cdc="debezium")
@dataset
class UserLocationFiltered:
    uid: int
    city: str
    country: str
    update_time: datetime

    @pipeline
    @inputs(UserLocation)
    def user_location_count(cls, dataset: Dataset):
        # Keep only rows outside the United States.
        return dataset.filter(lambda row: row["country"] != "United States")
```
In this example, a regular Fennel dataset is created using a pipeline. The goal is to also write it out to a Kafka topic as new updates arrive in the dataset.
Like Sources, first an object is created that knows how to connect to your Kafka cluster. Then the sink decorator is applied to the dataset that needs to be written out - this decorator knows that the destination is a Kafka topic and that the CDC data needs to be written out in the Debezium format.
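For completeness, the upstream UserLocation dataset referenced in the pipeline might look something like the sketch below. The webhook source, its endpoint name, and the disorder/cdc arguments are illustrative assumptions; any Fennel source would work here.
```python
from datetime import datetime

from fennel.connectors import source, Webhook
from fennel.datasets import dataset

# Hypothetical webhook source, used here only for illustration.
webhook = Webhook(name="fennel_webhook")

@source(webhook.endpoint("UserLocation"), disorder="14d", cdc="append")
@dataset
class UserLocation:
    uid: int
    city: str
    country: str
    update_time: datetime
```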
That's it - once this is written, the UserLocationFiltered dataset will start publishing changes to your Kafka topic.
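To see the change stream end to end, you can consume the topic with any standard Kafka client. Below is a minimal sketch using the kafka-python library; the connection parameters simply mirror the environment variables used above.
```python
import json
import os

from kafka import KafkaConsumer

# Plain Kafka consumer reading the Debezium-formatted change events
# published by the sink; settings mirror the Kafka connector above.
consumer = KafkaConsumer(
    "user_location",
    bootstrap_servers=os.environ["KAFKA_HOST"],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    auto_offset_reset="earliest",
)

for message in consumer:
    # Each record is a Debezium-style change event for a row of
    # the UserLocationFiltered dataset.
    event = json.loads(message.value)
    print(event)
```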
Fennel ships with sinks to a couple of common data stores so that you can 'sink' data from your Fennel datasets into your external data stores. Sinks to many other common data stores will be added soon.