I'm working with Kafka topics where messages from one topic (topic1) have a non-standard JSON encoding with a prefix. I've created a source with BYTES encoding, parsed it, and created a sink to another topic (topic2). However, when querying the timestamp (ts) from the new source created from topic2, the field is empty. I need a TIMESTAMP field in the source to set a WATERMARK for interval joins. I've tried various methods, including creating materialized views and adjusting the parsing of the timestamp, but I still encounter issues with empty timestamp fields. Here's an example of the SQL I've used:
CREATE SOURCE IF NOT EXISTS source1 (id bytea)
WITH (
    connector='kafka',
    topic='topic1',
    properties.bootstrap.server='...',
    scan.startup.mode='latest'
) FORMAT PLAIN ENCODE BYTES;

CREATE MATERIALIZED VIEW view1 AS
WITH extracted_json AS (
    SELECT
        (substring(convert_from(id, 'utf8'), 8 + position('@event' IN encode(id, 'escape')))::jsonb) AS json_data
    FROM
        source1
)
SELECT
    to_timestamp((json_data->'ts')::double)::timestamp AS ts,
    json_data->>'datetime' AS datetime,
    json_data->>'d' AS d
FROM
    extracted_json;

CREATE SINK events_newhope_jsonb_sink FROM view1
WITH (
    connector='kafka',
    topic='topic2',
    properties.bootstrap.server='...'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE IF NOT EXISTS source2 (
    ts TIMESTAMP,
    datetime VARCHAR,
    d VARCHAR,
    WATERMARK FOR ts AS ts - INTERVAL '60' SECOND
)
WITH (
    connector='kafka',
    topic='topic2',
    properties.bootstrap.server='...',
    scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW view2 AS SELECT * FROM source2;

SELECT * FROM view2 LIMIT 1;
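One way to narrow down where the timestamp goes missing is to look at the bytes the sink actually wrote to topic2, before any JSON decoding, and to compare them with what view1 holds. The following is only a sketch under assumptions: topic2_raw, payload, and raw_json are hypothetical names, scan.startup.mode='earliest' is chosen here so that messages already in the topic show up, and the bootstrap server placeholder is left elided as in the statements above.

```sql
-- Hypothetical diagnostic source: read topic2 as raw bytes,
-- the same way source1 reads topic1.
CREATE SOURCE IF NOT EXISTS topic2_raw (payload bytea)
WITH (
    connector='kafka',
    topic='topic2',
    properties.bootstrap.server='...',
    scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE BYTES;

-- Show the JSON text exactly as the sink produced it,
-- including how the ts field is represented.
SELECT convert_from(payload, 'utf8') AS raw_json FROM topic2_raw LIMIT 5;

-- Check whether ts is already NULL in view1, i.e. before it reaches the sink.
SELECT ts, datetime, d FROM view1 LIMIT 5;
```

If ts is populated in view1 and present in raw_json but still empty in source2, the problem would lie in the sink-to-source round trip rather than in the parsing done in view1.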
I'm looking for suggestions on how to make the timestamps work and whether there's a way to improve the overall approach.
Krzysztof Sota
Asked on Mar 21, 2024
I managed to resolve the issue by using intermediate materialized views instead of sinking directly from the source. The key difference was in changing the SQL from CREATE SINK sink1 AS select_statement_xyz to CREATE MATERIALIZED VIEW view1 AS select_statement_xyz; CREATE SINK sink1 FROM view1 WITH (...). I'm not sure why they behave differently, but the materialized view approach worked. Additionally, I found that queries directly on the source returned old messages, whereas the materialized view only returned new messages with correct timestamps. Here's the final set of SQL statements that worked for me:
SET timezone = "Europe/Warsaw";
CREATE SOURCE IF NOT EXISTS source1 (id bytea)
WITH (
    connector='kafka',
    topic='topic1',
    properties.bootstrap.server='...',
    scan.startup.mode='latest'
) FORMAT PLAIN ENCODE BYTES;

CREATE MATERIALIZED VIEW view1 AS
WITH extracted_json AS (
    SELECT
        (substring(convert_from(id, 'utf8'), 8 + position('@event' IN encode(id, 'escape')))::jsonb) AS json_data
    FROM
        source1
)
SELECT
    to_timestamp((json_data->'ts')::double)::timestamp AS ts,
    json_data->>'datetime' AS datetime,
    json_data->>'d' AS d
FROM
    extracted_json;

CREATE SINK events_newhope_jsonb_sink FROM view1
WITH (
    connector='kafka',
    topic='topic2',
    properties.bootstrap.server='...'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE IF NOT EXISTS source2 (
    ts TIMESTAMP,
    datetime VARCHAR,
    d VARCHAR,
    WATERMARK FOR ts AS ts - INTERVAL '60' SECOND
)
WITH (
    connector='kafka',
    topic='topic2',
    properties.bootstrap.server='...',
    scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW view2 AS SELECT * FROM source2;

SELECT * FROM view2 LIMIT 1;
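For comparison, the direct form that was replaced (CREATE SINK ... AS with an inline query) might have looked roughly like the sketch below. This is an illustration, not the exact statement that was run: sink1_direct is a hypothetical name, and the query simply reuses the parsing from view1 as a subquery in place of select_statement_xyz.

```sql
-- Sketch of the direct form: the sink carries its own query
-- instead of reading from a materialized view.
CREATE SINK sink1_direct AS
SELECT
    to_timestamp((json_data->'ts')::double)::timestamp AS ts,
    json_data->>'datetime' AS datetime,
    json_data->>'d' AS d
FROM (
    -- Same prefix-stripping expression as in view1.
    SELECT
        (substring(convert_from(id, 'utf8'), 8 + position('@event' IN encode(id, 'escape')))::jsonb) AS json_data
    FROM
        source1
) AS extracted_json
WITH (
    connector='kafka',
    topic='topic2',
    properties.bootstrap.server='...'
) FORMAT PLAIN ENCODE JSON;
```

The form that worked is the view1 and events_newhope_jsonb_sink pair above, where the same query lives in a materialized view and the sink reads from it with CREATE SINK ... FROM.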
I hope this solution will be useful for future users facing similar issues.