all-things-risingwave
How can I join and sink multiple topics in Kafka per product to a final Kafka topic with a common structure for product updates?
I have approximately 34 topics in Kafka, mainly containing product information, that need to be joined per product and sunk to a final Kafka topic. The goal is a common structure that includes only information about product updates. ksqlDB is not suitable because it creates too many intermediate topics, one for each transformation applied.
Patrice Duhoux
Asked on May 04, 2023
To join and sink multiple topics in Kafka per product to a final Kafka topic with a common structure for product updates, you can use Apache Kafka Streams. Here's a high-level overview of the steps:
- Create a Kafka Streams application that consumes messages from the 34 topics.
- Implement the necessary logic to join the messages based on the product key.
- Transform the joined data into the desired structure for product updates.
- Sink the transformed data to the final Kafka topic.
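The join-and-transform steps above can be sketched without Kafka at all: each source record contributes some fields for a product, and records sharing a product key are folded into one common structure. This is a minimal plain-Java sketch, assuming every topic carries a subset of flat string fields and that the latest value for a field wins. `SourceRecord`, `mergeByProduct`, and the topic names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProductMergeSketch {

    // Hypothetical input: one message read from one of the source topics.
    record SourceRecord(String topic, String productId, Map<String, String> fields) {}

    // Folds every record into a single common structure per product key.
    static Map<String, Map<String, String>> mergeByProduct(List<SourceRecord> records) {
        Map<String, Map<String, String>> merged = new LinkedHashMap<>();
        for (SourceRecord r : records) {
            // Later values for the same field overwrite earlier ones (last write wins).
            merged.computeIfAbsent(r.productId(), id -> new LinkedHashMap<>())
                  .putAll(r.fields());
        }
        return merged;
    }

    public static void main(String[] args) {
        List<SourceRecord> records = List.of(
            new SourceRecord("product-names", "p1", Map.of("name", "Desk")),
            new SourceRecord("product-prices", "p1", Map.of("price", "120")),
            new SourceRecord("product-prices", "p2", Map.of("price", "15")),
            new SourceRecord("product-prices", "p1", Map.of("price", "110")));
        // prints {p1={name=Desk, price=110}, p2={price=15}}
        System.out.println(mergeByProduct(records));
    }
}
```

In Kafka Streams, this last-write-wins merge would roughly correspond to the aggregator passed to `groupByKey().aggregate(...)`, with the per-key map held in a state store instead of a local `LinkedHashMap`.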
Here's a simplified example using Kafka Streams in Java:
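```java
import java.util.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

// Define the Kafka Streams configuration
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-update-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Create a Kafka Streams builder
StreamsBuilder builder = new StreamsBuilder();
// Consume all input topics; each record must be keyed by product ID.
// productInfoSerde and productUpdateSerde are custom Serdes you must provide.
List<String> topics = Arrays.asList("topic1", "topic2", /* ..., */ "topic34");
KStream<String, ProductInfo> productStream =
    builder.stream(topics, Consumed.with(Serdes.String(), productInfoSerde));
// Merge all records for the same product key into one ProductUpdate
// (aggregate takes an Initializer, an Aggregator, and a Materialized with productUpdateSerde)
KTable<String, ProductUpdate> productUpdates = productStream.groupByKey().aggregate(...);
// Sink to the final topic; Serdes has no ProductUpdate() factory, so pass the custom Serde
productUpdates.toStream().to("final-topic", Produced.with(Serdes.String(), productUpdateSerde));
// Build and start the Kafka Streams application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```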
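The question also asks that the final topic carry only real product updates. A KTable produced by `aggregate` may forward a new record for every input message, even when the merged value did not change, so one option is to compare each result with the last value sent for that key. This is a plain-Java sketch of that check under stated assumptions: `ChangeFilterSketch` and `shouldEmit` are hypothetical names, values are compared with `equals`, and the in-memory `HashMap` stands in for what would need to be a state store in a real Kafka Streams application.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ChangeFilterSketch {

    // Last value forwarded per product key (a state store in a real Streams app).
    private final Map<String, String> lastEmitted = new HashMap<>();

    // Returns true only when the value differs from the last one seen for this key.
    boolean shouldEmit(String productId, String update) {
        String previous = lastEmitted.put(productId, update);
        return !Objects.equals(previous, update);
    }

    public static void main(String[] args) {
        ChangeFilterSketch filter = new ChangeFilterSketch();
        System.out.println(filter.shouldEmit("p1", "{price=120}")); // true: first value
        System.out.println(filter.shouldEmit("p1", "{price=120}")); // false: unchanged
        System.out.println(filter.shouldEmit("p1", "{price=110}")); // true: price changed
    }
}
```

Keeping this check next to the sink means downstream consumers of the final topic see one record per actual change rather than one per input message.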
May 04, 2023