I am streaming data from a Kafka topic into RisingWave (RW), and the message structure is fixed. When I create a table in RW to match that structure, only the vehicle_number column is populated; the other columns receive no data. Here are the message structure and the CREATE TABLE statement I am using:
//message from topic
{
"vehicle_number": "GD2497",
"can_info": {
"ttc_hour_info": 0,
// other fields omitted for brevity
}
}
-- Table
CREATE TABLE IF NOT EXISTS "can-param"
(
    "vehicle_number" text,
    "can_info.ttc_hour_info" bigint,
    -- other fields omitted for brevity
) WITH (
    connector = 'kafka',
    -- other properties omitted for brevity
) FORMAT PLAIN ENCODE JSON;
I cannot change the message structure. Could you please suggest where I might be going wrong, or point me to the relevant documentation?
Anurag Ambuj
Asked on Apr 12, 2024
The issue appears to be how the can_info structure is declared in the CREATE TABLE statement. A column named "can_info.ttc_hour_info" is looked up as a top-level field with that literal name, not as a path into the nested can_info object, so nothing in the message matches it. Instead of declaring each field within can_info as a separate column, declare can_info as a single column of the struct data type. Here is an example:
CREATE TABLE IF NOT EXISTS "can-param"
(
    "vehicle_number" text,
    "can_info" struct<
        "ttc_hour_info" bigint,
        -- other fields omitted for brevity
    >
    -- other columns omitted for brevity
) ...
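Once can_info is declared as a struct, its nested fields can be read with the (column).field syntax. The following is a minimal sketch, assuming the table above has been created and is receiving data; the view name can_param_flat is hypothetical and only illustrates one way to expose the nested value as a flat column.

```sql
-- Read one nested field. The parentheses around the struct column are required.
SELECT
    vehicle_number,
    (can_info).ttc_hour_info AS ttc_hour_info
FROM "can-param";

-- Hypothetical view name: flattens the nested field into an ordinary column.
CREATE MATERIALIZED VIEW IF NOT EXISTS can_param_flat AS
SELECT
    vehicle_number,
    (can_info).ttc_hour_info AS ttc_hour_info
FROM "can-param";
```

A view like this keeps the incoming message structure untouched while downstream queries work with flat column names.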
Additionally, keep in mind that data ingestion in RisingWave is asynchronous: ingestion errors appear in the compute node logs rather than being returned to the SQL client. To diagnose ingestion problems, check the compute node logs for messages like:
risingwave_connector::parser: failed to parse non-pk column, padding with 'NULL' error=Undefined field 'can_info.ttc_hour_info' at ''