I'm currently using Postgres for analytics on Kafka streams and need to migrate to RisingWave (rw). I have data in Postgres that is no longer in Kafka, and I need to make this data available in rw. I'm looking for a way to backfill a large amount of data into rw using Python. I've tried a few methods, but they are either too slow or fail with errors. I'm interested in a faster approach that can handle around 1 million records.
Here are the methods I've tried:
df.to_sql()
- This results in a TypeError related to RisingWaveDialect.has_table().

psycopg2's cursor.copy_from()
- This results in a psycopg2.errors.InternalError_ related to a SQL parser error.

I've also considered using direct CDC, but I'm only interested in the old data, and setting up CDC seems like a lot of effort for a one-time migration.
Georg Boegerl
Asked on Jan 10, 2024
Tao Wu suggested a solution that involves creating a Postgres CDC table to ingest the snapshot data and then using an INSERT ... SELECT statement to move the data into another table. This approach doesn't require continuously consuming CDC changes from the upstream Postgres. Here's an example of how to set up the tables and perform the data migration:
-- Postgres CDC table: backfills a snapshot of the existing rows from the upstream table
create table person (
"id" int,
"name" varchar,
"email_address" varchar,
"credit_card" varchar,
"city" varchar,
PRIMARY KEY ("id")
) with (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'myuser',
password = '123456',
database.name = 'mydb',
schema.name = 'public',
table.name = 'person',
slot.name = 'person'
);
-- Table backed by the Kafka topic that already feeds the ongoing stream
create table kafka_person (
"id" int,
"name" varchar,
"email_address" varchar,
"credit_card" varchar,
"city" varchar,
PRIMARY KEY ("id")
) WITH (
connector = 'kafka',
topic = 'person',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
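Before running the final INSERT, it may be worth confirming that the CDC table has finished backfilling the snapshot. This check is my own addition rather than part of the suggested answer; it simply compares row counts between RisingWave and the upstream Postgres table, and once they match the copy below can be run:

-- run in RisingWave: should converge to the upstream row count once the snapshot is complete
select count(*) from person;

-- run in the upstream Postgres for comparison
select count(*) from public.person;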
-- One-time copy of the backfilled Postgres rows into the Kafka-backed table
insert into kafka_person select * from person;
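Since this is a one-time migration, the CDC table can presumably be dropped once the copy has completed, so that RisingWave stops consuming from the Postgres replication slot. This cleanup step is an assumption on my part and wasn't mentioned in the answer:

-- drop the CDC table after verifying kafka_person contains the historical rows
drop table person;

-- if the replication slot lingers on the Postgres side, it can be removed there as well
-- select pg_drop_replication_slot('person');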
This method seems promising and I plan to give it a try.