I'm trying to create a source connection from Kafka to RisingWave (RW) and I'm unsure which configuration to use. The documentation mentions two different configurations for JSON and JSON upsert. Here are the examples provided:
For JSON:
```
    column1 varchar,
    column2 integer,
)
WITH (
    connector='kafka',
    topic='demo_topic',
    properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
    scan.startup.mode='latest',
    scan.startup.timestamp_millis='140000000'
) FORMAT PLAIN ENCODE JSON;
```
For JSON upsert:
```
    column1 varchar,
    column2 integer,
)
WITH (
    connector='kafka',
    properties.bootstrap.server='localhost:9092',
    topic='t1'
) FORMAT UPSERT ENCODE JSON;
```
Which configuration should I follow to establish the connection? Additionally, I'm using Debezium for change data capture (CDC) from a PostgreSQL database to Kafka, and I'm encountering an error when creating the source connection in RisingWave. The error is:
`QueryError: internal error: error trying to connect: dns error: failed to lookup address information: Temporary failure in name resolution`
I've confirmed that I can connect to Kafka using `telnet kafka_ip 9092` from the RW server node. I'm also using `docker-compose` to run RW. Do I need to use the `createKey` transform to define a primary key in the Kafka topic payload?
Vijay Shankar
Asked on Oct 05, 2023
If your data has a primary key and you want later events to overwrite earlier events with the same primary key, use the upsert format (`FORMAT UPSERT`). Otherwise, use the plain format (`FORMAT PLAIN`). Since you are using Debezium for CDC from PostgreSQL to Kafka and then to RisingWave, you should use `FORMAT DEBEZIUM ENCODE JSON`. You do not need to use the `createKey` transform to define a primary key in the Kafka topic payload.
Regarding the error you're encountering, it indicates that RisingWave failed to connect to the Kafka brokers. Make sure that `properties.bootstrap.server` is set to the correct IP address and port of your Kafka broker, not `localhost`, if Kafka is running on a different server. You can also check the compute node's log for more clues; it is written to stderr. If you're deploying with Kubernetes or Docker, use `kubectl logs` or `docker logs` to view the logs.
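For reference, a Debezium JSON ingestion statement could look roughly like the sketch below. This is only an illustration under some assumptions: to my understanding, RisingWave expects Debezium data to be ingested with `CREATE TABLE` and a declared primary key rather than `CREATE SOURCE`, so please check the current docs. The table name `pg_orders`, the column names, the topic `dbserver1.public.orders`, and the broker address `kafka:9092` are all placeholders, not values taken from your setup.

```sql
-- Hypothetical names: pg_orders, its columns, the topic, and the broker
-- address are placeholders; replace them with your own values.
CREATE TABLE IF NOT EXISTS pg_orders (
    order_id integer,
    customer_name varchar,
    PRIMARY KEY (order_id)  -- should match the primary key of the upstream PostgreSQL table
)
WITH (
    connector='kafka',
    topic='dbserver1.public.orders',           -- assumed Debezium topic for the captured table
    properties.bootstrap.server='kafka:9092',  -- must be resolvable and reachable from the RisingWave containers
    scan.startup.mode='earliest'
) FORMAT DEBEZIUM ENCODE JSON;
```

Because the primary key is declared in the table definition, the Kafka message payload does not need a key added through the `createKey` transform.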