Concepts
Sink
Analogous to Sources, Fennel supports sinks to export data out of Fennel into your data stores.
```python
import os
from datetime import datetime

from fennel.connectors import sink, Kafka
from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib.params import inputs

kafka = Kafka(
    name="kafka_src",
    bootstrap_servers=os.environ["KAFKA_HOST"],
    security_protocol="PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

@sink(kafka.topic("user_location"), cdc="debezium")
@dataset
class UserLocationFiltered:
    uid: int
    city: str
    country: str
    update_time: datetime

    @pipeline
    @inputs(UserLocation)  # UserLocation is an upstream dataset defined elsewhere
    def user_location_count(cls, dataset: Dataset):
        return dataset.filter(lambda row: row["country"] != "United States")
```
In this example, a regular Fennel dataset is created using a pipeline, and we want to write its contents out to a Kafka topic as new updates arrive in the dataset.
Like Sources, first an object is created that knows how to connect to your Kafka cluster. Then the sink decorator is applied to the dataset that needs to be written out - this decorator knows that the destination is a Kafka topic and that the CDC data needs to be written out in the Debezium format.
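To make the output concrete, here is a rough sketch of a single change record as a Debezium-style envelope - the before/after/op shape that Debezium-compatible consumers expect. The exact field set Fennel emits may differ; the values below are illustrative only.
```python
# Illustrative sketch of a Debezium-style change envelope for one row of
# UserLocationFiltered; the exact payload Fennel emits may differ.
change_event = {
    "before": None,  # no prior image for a newly inserted row
    "after": {
        "uid": 123,
        "city": "London",
        "country": "United Kingdom",
        "update_time": "2023-01-01T00:00:00Z",
    },
    "op": "c",  # "c" = create, "u" = update, "d" = delete
    "ts_ms": 1672531200000,  # when the change was captured
}
```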
That's it - once this is written, the UserLocationFiltered dataset will start publishing changes to your Kafka topic.
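To verify that data is flowing, you can read the topic back with any Kafka client. Below is a minimal sketch using the confluent-kafka library; the consumer settings and JSON decoding are assumptions for illustration, not part of Fennel's API.
```python
import json
import os

from confluent_kafka import Consumer  # assumed third-party client, not part of Fennel

consumer = Consumer({
    "bootstrap.servers": os.environ["KAFKA_HOST"],
    "group.id": "user-location-debug",  # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["user_location"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # Each message carries one Debezium-style change event.
        event = json.loads(msg.value())
        print(event.get("op"), event.get("after"))
finally:
    consumer.close()
```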
As of right now, Fennel only supports Kafka sinks and writes data in the Debezium format. Given the ubiquity of Debezium connectors, you should be able to further pipe this Debezium data from Kafka to your data store of choice.
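For instance, one common pattern is to route the topic into a downstream store with an off-the-shelf Kafka Connect sink. The sketch below registers a JDBC sink connector through the Kafka Connect REST API; the worker URL, connection details, and connector name are placeholders for your own environment.
```python
import requests  # assumed HTTP client for the Kafka Connect REST API

# Hypothetical Connect worker URL and connector config; adjust for your
# own cluster and target data store.
resp = requests.post(
    "http://localhost:8083/connectors",
    json={
        "name": "user-location-jdbc-sink",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": "user_location",
            "connection.url": "jdbc:postgresql://db:5432/analytics",
            "connection.user": "analytics",
            "connection.password": "********",
            # Debezium envelopes nest the row under "after"; unwrap it so
            # the sink writes plain rows.
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        },
    },
)
resp.raise_for_status()
```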
More data stores and CDC strategies will be supported in future updates.