
What is the recommended approach to migrate data from Postgres to RisingWave using Python?

I currently use Postgres for analytics on Kafka streams and need to migrate to RisingWave (rw). Some of the data in Postgres is no longer available in Kafka, and I need to make it available in rw. I'm looking for a way to backfill a large amount of data into rw using Python. The methods I've tried are either too slow or fail with errors. I'd like a faster approach that can handle around 1 million records.

Here are the methods I've tried:

  1. Read from Postgres in chunks and write to rw record by record (one "INSERT" per row). This is very slow.
  2. Read from Postgres into a pandas DataFrame and use df.to_sql(). This raises a TypeError related to RisingWaveDialect.has_table().
  3. Read from Postgres into a pandas DataFrame and use psycopg2's cursor.copy_from(). This raises a psycopg2.errors.InternalError_ related to a SQL parser error.
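
For comparison, method 1 can usually be sped up by sending many rows per INSERT statement instead of one. The following is only a rough sketch under assumptions: copy_table and build_insert are hypothetical helper names, both connections are ordinary DB-API connections (for example from psycopg2), the target table already exists in rw with the same column names, and the batch size of 1000 is an arbitrary starting point rather than a tuned value.

```python
def build_insert(table, columns, n_rows):
    """Build one INSERT statement with %s placeholders for n_rows rows."""
    cols = ", ".join(f'"{c}"' for c in columns)
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    return f"INSERT INTO {table} ({cols}) VALUES " + ", ".join([row] * n_rows)


def copy_table(src_conn, dst_conn, table, columns, batch_size=1000):
    """Read rows from the source and write them to the target in batches.

    Returns the number of rows written. Table and column names are
    interpolated directly, so they must come from trusted input.
    """
    cols = ", ".join(f'"{c}"' for c in columns)
    src = src_conn.cursor()
    dst = dst_conn.cursor()
    src.execute(f"SELECT {cols} FROM {table}")
    total = 0
    while True:
        batch = src.fetchmany(batch_size)
        if not batch:
            break
        # Flatten [(id, name), ...] into [id, name, id, name, ...]
        params = [value for row in batch for value in row]
        dst.execute(build_insert(table, columns, len(batch)), params)
        total += len(batch)
    # FLUSH asks RisingWave to make the pending writes visible to queries.
    dst.execute("FLUSH")
    return total


# Hypothetical usage (hosts and credentials are placeholders):
#   import psycopg2
#   pg = psycopg2.connect(host="postgres", port=5432, user="myuser",
#                         password="123456", dbname="mydb")
#   rw = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
#   rw.autocommit = True
#   copy_table(pg, rw, "person", ["id", "name", "email_address"])
```

With a default psycopg2 cursor the whole SELECT result is held in client memory, which is probably fine for about 1 million narrow rows. A named (server-side) cursor would be the variation to try for larger tables.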

I've also considered direct CDC, but I only need the historical data, and setting up CDC seems like a lot of effort for a one-time migration.


Georg Boegerl

Asked on Jan 10, 2024

Tao Wu suggested creating a Postgres CDC table to ingest the snapshot data and then using an INSERT ... SELECT statement to copy the data into another table. This approach doesn't require continuously consuming CDC changes from the upstream Postgres. Here's an example of how to set up the tables and perform the data migration:

create table person (
    "id" int,
    "name" varchar,
    "email_address" varchar,
    "credit_card" varchar,
    "city" varchar,
    PRIMARY KEY ("id")
) with (
    connector = 'postgres-cdc',
    hostname = 'postgres',
    port = '5432',
    username = 'myuser',
    password = '123456',
    database.name = 'mydb',
    schema.name = 'public',
    table.name = 'person',
    slot.name = 'person'
);

create table kafka_person (
    "id" int,
    "name" varchar,
    "email_address" varchar,
    "credit_card" varchar,
    "city" varchar,
    PRIMARY KEY ("id")
) WITH (
    connector = 'kafka',
    topic = 'person',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

insert into kafka_person select * from person;
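
Since the question asked for Python, the last step could be driven from a script roughly like the one below. This is a sketch under assumptions, not a tested migration tool: it assumes both tables above already exist, that the CDC snapshot may still be loading when the script starts (so it polls the row count first), and that the expected row count was obtained separately from the upstream Postgres. The names wait_for_rows and migrate are hypothetical helpers.

```python
import time


def wait_for_rows(conn, table, expected, timeout_s=600, poll_s=5, sleep=time.sleep):
    """Poll count(*) on `table` until it reaches `expected` rows.

    Raises TimeoutError if the count is still too low after timeout_s seconds.
    """
    cur = conn.cursor()
    waited = 0
    while True:
        cur.execute(f"SELECT count(*) FROM {table}")
        count = cur.fetchone()[0]
        if count >= expected:
            return count
        if waited >= timeout_s:
            raise TimeoutError(f"{table} has {count} rows, expected {expected}")
        sleep(poll_s)
        waited += poll_s


def migrate(conn, source_table, target_table, expected_rows, **wait_kwargs):
    """Wait for the snapshot, copy it with INSERT ... SELECT, return the target count."""
    wait_for_rows(conn, source_table, expected_rows, **wait_kwargs)
    cur = conn.cursor()
    cur.execute(f"INSERT INTO {target_table} SELECT * FROM {source_table}")
    # FLUSH asks RisingWave to make the inserted rows visible before counting.
    cur.execute("FLUSH")
    cur.execute(f"SELECT count(*) FROM {target_table}")
    return cur.fetchone()[0]


# Hypothetical usage (connection settings are placeholders):
#   import psycopg2
#   rw = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
#   rw.autocommit = True
#   migrate(rw, "person", "kafka_person", expected_rows=1_000_000)
```

Comparing the returned count with the upstream count is a cheap sanity check before dropping the CDC table.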

This method seems promising and I plan to give it a try.

Jan 10, 2024