
How to get fine-grained control over the JSON fields of a Kafka source in RW for a Postgres sink?

I am trying to use RW as a Kafka connector to Postgres, but I am running into issues with JSON field mapping and conversion. The JSON objects from the Kafka source have the fields 'id', 'desc', and 'created_at', while the Postgres table columns are 'content_id', 'description', and 'created_at'. How can I get fine-grained control over the mapping and conversion in this scenario?


Joe Lin

Asked on Mar 28, 2024

  1. Create the source in RW with the fields 'id', 'desc', and 'created_at', exactly as they appear in the JSON.

  2. Define the sink with CREATE SINK ... AS SELECT, renaming and converting the fields in the SELECT list.

  3. Example:

-- Create the source with the field names as they appear in the JSON.
-- Note: desc is a reserved keyword, so it must be double-quoted.
CREATE SOURCE content_feature_source (
    id VARCHAR,
    "desc" VARCHAR,
    created_at TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'feature_event',
    properties.bootstrap.server = '10.92.20.22:9092',
    properties.client.id = 'rw_content_feature',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
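
-- Optional sanity check (not part of the original answer; the view name is
-- hypothetical). Depending on the RW version, a bare source may not be
-- batch-queryable, but a materialized view on top of it can be inspected:
CREATE MATERIALIZED VIEW content_feature_preview AS
SELECT id, "desc", created_at FROM content_feature_source;

SELECT * FROM content_feature_preview LIMIT 10;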

-- Create the sink with an AS SELECT query that does the mapping; there is
-- no separate 'select' parameter, the query itself renames the fields.
CREATE SINK content_feature_sink AS
SELECT
    id AS content_id,
    "desc" AS description,
    created_at
FROM content_feature_source
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/ai?user=postgres&password=password',
    table.name = 'content_feature',
    type = 'upsert',
    primary_key = 'content_id'
);
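
For completeness, the Postgres side needs a table whose columns match the sink's output, with a primary key since the sink type is 'upsert'. A minimal sketch of the target DDL, assuming the types used above (it is not part of the original question, so adjust to your actual schema):

-- Hypothetical Postgres target table matching the sink's column names
CREATE TABLE content_feature (
    content_id VARCHAR PRIMARY KEY,
    description VARCHAR,
    created_at TIMESTAMPTZ
);

Type conversion works the same way as renaming: if, say, created_at arrives as a string in the JSON, declare it as VARCHAR in the source and cast it in the sink's query with created_at::TIMESTAMPTZ.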
Answered on Mar 29, 2024 (edited)