I am trying to ingest data from Kafka, deduplicate it, and sink it back to Kafka using RisingWave. However, I am encountering an error related to the configuration parameters. Here is the configuration snippet:

    CREATE TABLE IF NOT EXISTS test_source (
        id varchar,
        timestamp timestamp,
        field integer
    ) WITH (
        connector='kafka',
        topic='test_risingwave_source',
        properties.bootstrap.server='localhost:9092',
        scan.startup.mode='earliest',
        scan.startup.timestamp_millis='140000000'
    ) ROW FORMAT JSON;

    CREATE SINK test_sink FROM test_source
    WITH (
        connector='kafka',
        kafka.brokers='localhost:9092',
        kafka.topic='test_risingwave_sink',
        format='append_only',
        force_append_only='true'
    );
The error message is:

    ERROR: QueryError: internal error: Rpc error: gRPC error (Internal error): Executor error: Sink error: config error: missing field `properties.bootstrap.server`

What could be the issue?
Ralph Matthias Debusmann
Asked on Mar 27, 2023
The sink's WITH clause should use `properties.bootstrap.server` instead of `kafka.brokers`, and `topic` instead of `kafka.topic`. Replace `kafka.brokers` with `properties.bootstrap.server` and `kafka.topic` with `topic` in the configuration to resolve the error.
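
Applied to the sink from the question, the renaming might look like the sketch below. Only the two keys named in the error discussion are changed; `format` and `force_append_only` are carried over unchanged from the original statement, on the assumption that they are valid for the RisingWave version in use.

```sql
-- Sketch of the corrected sink: same statement as in the question,
-- with only the broker and topic keys renamed.
CREATE SINK test_sink FROM test_source
WITH (
    connector = 'kafka',
    -- was: kafka.brokers
    properties.bootstrap.server = 'localhost:9092',
    -- was: kafka.topic
    topic = 'test_risingwave_sink',
    -- carried over unchanged from the question (assumed valid for this version)
    format = 'append_only',
    force_append_only = 'true'
);
```

With this change the sink uses the same `properties.bootstrap.server` and `topic` keys as the `test_source` definition, which is the field the error message reports as missing.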