
How can I create a materialized view in RisingWave with an index on JSON values for efficient querying?

I have data coming from a Kafka stream that includes JSON string arrays in the rep_codes field. I want to create a materialized view (MV) in RisingWave that allows me to quickly retrieve all account numbers associated with a given rep_code. However, I'm facing issues with the JSON functions not being recognized, and I'm also concerned about handling duplicate values and implementing an upsert pattern. Here's an example of how I'm trying to create the source and MVs:

CREATE SOURCE IF NOT EXISTS accounts_source
WITH (
    connector='kafka',
    topic='accounts',
    properties.bootstrap.server='redpanda-0:9092',
    scan.startup.mode='earliest'
)
FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://redpanda-0:8081',
    message = 'KafkaSeed.Models.account'
);

create materialized view if not exists rep_code_account_number
as
select unnest(rep_codes) as rep_code, account_number from accounts_source;

create materialized view if not exists rep_code_accounts
as
select
rcan.rep_code,
rcan.account_number,
ac.value,
ac.account_type,
ac.open_date,
ac.billing_start_date,
ac.managed
from rep_code_account_number rcan 
join accounts_source ac 
on rcan.account_number = ac.account_number;

I need guidance on how to properly index the JSON values and ensure that the MV updates correctly with the Kafka stream, avoiding duplicates and potentially using an upsert pattern.


Tim Gitchel

Asked on Mar 28, 2024

To query by rep_code efficiently in RisingWave, flatten the rep_codes array with the unnest function in a materialized view, then create an index on that MV with rep_code as the leading column. To avoid duplicates and get upsert behavior, ingest the topic with CREATE TABLE and FORMAT UPSERT instead of CREATE SOURCE, using the Kafka message key as the primary key, so that a newer message for the same key replaces the earlier row. The account details then live only in the table: join it with the flattened MV at query time rather than materializing the join. This approach minimizes data duplication and still allows for efficient querying. Here's an example of how you can set this up:

CREATE TABLE IF NOT EXISTS accounts (
    primary key (account_key)
)
INCLUDE KEY AS account_key
WITH (
    connector='kafka',
    topic='accounts',
    properties.bootstrap.server='redpanda-0:9092',
    scan.startup.mode='earliest'
)
FORMAT UPSERT ENCODE AVRO (
    schema.registry = 'http://redpanda-0:8081',
    message = 'KafkaSeed.Models.account'
);

CREATE INDEX accounts_account_number_idx ON public.accounts (account_number);

CREATE MATERIALIZED VIEW IF NOT EXISTS rep_code_account_number AS
SELECT unnest(rep_codes) AS rep_code, account_number
FROM accounts;

CREATE INDEX rep_code_account_number_rep_code_idx ON public.rep_code_account_number (rep_code,account_number);

SELECT
    rcan.rep_code,
    rcan.account_number,
    ac.value,
    ac.account_type,
    ac.open_date,
    ac.billing_start_date,
    ac.managed
FROM rep_code_account_number rcan
JOIN accounts ac
    ON rcan.account_number = ac.account_number
WHERE rcan.rep_code = 'P1ADV79F'
LIMIT 20;
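
If only the account numbers for a rep_code are needed, the join can be skipped and the flattened MV queried directly. The following is a minimal sketch that reuses the MV, index, and sample rep_code from the example above. The EXPLAIN statement is an optional check on whether the planner picks the index, and its output will depend on your RisingWave version.

```sql
-- Look up all account numbers for one rep_code without joining the table.
SELECT account_number
FROM rep_code_account_number
WHERE rep_code = 'P1ADV79F';

-- Optional check: inspect the plan to see whether the
-- (rep_code, account_number) index is used for the lookup.
EXPLAIN
SELECT account_number
FROM rep_code_account_number
WHERE rep_code = 'P1ADV79F';
```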

With the table, MV, and indexes above, the MV is kept up to date with the latest data from the Kafka stream, and the index on rep_code allows for quick retrieval of account numbers for a given rep_code. If the table stops updating, check that the stream is not being dropped and recreated after the table has been created.
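
The example above relies on rep_codes being decoded as a native array, which is what unnest expects. If the field instead arrives as a string containing JSON array text, a variant along the following lines may work. This is a sketch under that assumption, not something confirmed against the schema in the question: the MV and index names ending in _json are hypothetical, and the cast assumes every rep_codes value is valid JSON.

```sql
-- Hypothetical variant: assumes rep_codes is a VARCHAR holding JSON text
-- such as '["P1ADV79F", "..."]' rather than a native array.
CREATE MATERIALIZED VIEW IF NOT EXISTS rep_code_account_number_json AS
SELECT
    -- Cast the text to jsonb, then emit one row per array element as text.
    jsonb_array_elements_text(rep_codes::jsonb) AS rep_code,
    account_number
FROM accounts;

-- Same indexing idea as above: rep_code first so lookups by rep_code are cheap.
CREATE INDEX rep_code_account_number_json_idx
    ON rep_code_account_number_json (rep_code, account_number);
```

Only one of the two flattened MVs is needed. Which one applies depends on how the Avro schema declares rep_codes.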

Mar 29, 2024