I've created a source in RisingWave from a Kafka topic, but the table remains empty and shows 0 rows/s throughput, even though Confluent Platform indicates the topic is being consumed successfully. Looking at a raw message from the topic, there may be a data type mismatch: article_id is a string in the JSON payload when my schema expects an integer. I'm also hitting a timeout when running a query in RisingWave. Here's the query I'm trying to run:
CREATE TABLE articles (
article_id INT,
...
) WITH (
connector = 'kafka',
...
) FORMAT PLAIN ENCODE JSON;
INSERT INTO articles (
...
)
SELECT
CAST(article_id AS INT),
...
FROM articles;
I need guidance on how to adjust my approach to successfully parse and ingest data into RisingWave from Kafka.
Fahad Ullah Shah
Asked on Aug 15, 2023
To resolve the data parsing and ingestion issues, I need to ensure that the data types in the Kafka messages match the schema defined in RisingWave. Since the JSON messages carry article_id as a string, I should define article_id as VARCHAR in the RisingWave table and then cast it to INT in a separate query or materialized view. Additionally, the query I'm trying to run is not supported in RisingWave because it attempts to read from and insert into the same table, which is semantically incorrect and could lead to an infinite loop. Instead, I should create a materialized view that casts the necessary columns to the correct data types. Here's an example of a materialized view that casts article_id to INT:
CREATE MATERIALIZED VIEW articles_transformed AS
SELECT
CAST(article_id AS INT) AS article_id,
...
FROM articles;
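Putting the two pieces together, here is a minimal sketch of the corrected pipeline. The broker address, topic name, startup mode, and the title column are placeholder assumptions for illustration, not values from the original post:

CREATE TABLE articles (
    -- Ingest article_id as VARCHAR to match the string in the JSON payload.
    article_id VARCHAR,
    title VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'articles',
    properties.bootstrap.servers = 'localhost:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Downstream view: cast article_id to INT for querying and joins.
CREATE MATERIALIZED VIEW articles_transformed AS
SELECT
    CAST(article_id AS INT) AS article_id,
    title
FROM articles;

With this layout the table's schema matches the raw messages, so ingestion no longer silently drops rows, and the type conversion happens incrementally in the materialized view rather than via an INSERT ... SELECT on the same table.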
After making these changes, I should be able to successfully ingest and transform the data from Kafka into RisingWave for further processing in my machine learning pipeline.