How can I get fine-grained control over the JSON fields of a Kafka source in RW for a Postgres sink?
I am trying to use RW as a Kafka connector to Postgres, but I am running into issues with JSON field mapping and conversion. The JSON object from the Kafka source has the fields 'id', 'desc', and 'created_at', while the Postgres table columns are 'content_id', 'description', and 'created_at'. How can I get fine-grained control over mapping and conversion in this scenario?
Joe Lin
Asked on Mar 28, 2024
- Create the source in RW with the fields 'id', 'desc', and 'created_at'.
- Use a SELECT statement in the sink to map and convert the fields accordingly.
Example:
-- Create the source with the Kafka JSON fields id, desc, and created_at.
-- "desc" is double-quoted because DESC is an SQL keyword.
CREATE SOURCE content_feature_source (
    id VARCHAR,
    "desc" VARCHAR,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'feature_event',
    properties.bootstrap.server = '10.92.20.22:9092',
    properties.client.id = 'rw_content_feature',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Create the sink from a SELECT query that renames the fields
-- to the Postgres column names (CREATE SINK ... AS SELECT).
CREATE SINK content_feature_sink AS
SELECT id AS content_id, "desc" AS description, created_at
FROM content_feature_source
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/ai?user=postgres&password=password',
    table.name = 'content_feature',
    type = 'upsert',
    primary_key = 'content_id'
);
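If the mapping also needs type conversion or cleanup, one possible variant is to put the SELECT in a materialized view and sink from that view. The sketch below is only an illustration: the names content_feature_mapped and content_feature_mapped_sink are made up, and it assumes the Postgres column content_id is BIGINT and that trimming whitespace from the description is wanted. Neither assumption comes from the question. The connection settings are copied from the example above, and "desc" is quoted as a precaution because DESC is an SQL keyword.

```sql
-- Hypothetical view name; holds the mapped and converted rows.
CREATE MATERIALIZED VIEW content_feature_mapped AS
SELECT
    CAST(id AS BIGINT) AS content_id,   -- assumption: content_id is BIGINT in Postgres
    trim("desc")       AS description,  -- assumption: surrounding whitespace is unwanted
    created_at
FROM content_feature_source;

-- Hypothetical sink name; reads from the view instead of the raw source.
CREATE SINK content_feature_mapped_sink FROM content_feature_mapped WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/ai?user=postgres&password=password',
    table.name = 'content_feature',
    type = 'upsert',
    primary_key = 'content_id'
);
```

Keeping the mapping in a view makes it possible to query content_feature_mapped directly and check the converted values before they reach Postgres. An upsert sink keyed on content_id also generally expects the Postgres table to have a matching primary key or unique constraint on that column.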
Mar 29, 2024