How can I achieve low tail latencies in stream processing with RisingWave?

I'm working on a fraud detection system and need to join two streams by ID and time with very low tail latencies (p99.99 < 30ms). I'm using RisingWave and RedPanda for stream processing. I've noticed that when there is no JOIN, the end-to-end latency is acceptable (~5-15ms), but with a JOIN, the latency jumps randomly up to barrier_interval_ms. I've set barrier_interval_ms to 100, which resulted in a p99 latency of ~120ms. I suspect the latency issue is related to the streaming mechanism of RisingWave rather than RedPanda/SOURCE latency.

Here's an example of the SQL I'm using:

```sql
CREATE SOURCE IF NOT EXISTS evt_login_step (
    time_sent_ms BIGINT,
    assumed_user_id VARCHAR,
    user_id VARCHAR,
    visit_id VARCHAR,
    action_id VARCHAR,
    proc_time timestamptz as proctime(),
    WATERMARK FOR ts AS ts - interval '00:00:00.005'
)
INCLUDE key as row_key
INCLUDE timestamp as ts
WITH (
    connector='kafka',
    topic='EVT_LOGIN_STEP',
    properties.bootstrap.server='redpanda:29092',
    scan.startup.mode='latest',
    properties.fetch.wait.max.ms='5'
) FORMAT PLAIN ENCODE JSON;

CREATE SINK IF NOT EXISTS sink_login_step AS
SELECT
    ls.assumed_user_id,
    ls.action_time_ms,
    ls.time_sent_ms as action_time_sent_ms,
    ls.proc_time as action_proc_time,
    tbp.proc_time as tbp_proc_time
FROM (
    SELECT * FROM evt_login_step
) ls
LEFT OUTER JOIN evt_identity_prediction tbp ON tbp.action_id = ls.action_id AND tbp.ts BETWEEN ls.ts - INTERVAL '1' SECOND AND ls.ts + INTERVAL '1' SECOND
WITH (
    connector='kafka',
    topic='SINK_LOGIN_STEP',
    properties.bootstrap.server='redpanda:29092',
    properties.batch.num.messages='1',
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true'
);
```

My questions are:

- Is there a way to bypass the barrier mechanism to reduce latency?
- Can `barrier_interval_ms` be reduced to 30ms to meet our latency requirements, and what would be the side-effects?
- Is there any way to delay one stream by a few milliseconds (e.g., 5ms) to achieve precision despite `barrier_interval_ms` or other internal timers?

Krystof Hilar

Asked on Mar 23, 2024

Regarding your concerns about latency in RisingWave when joining streams:

  • The barrier mechanism in RisingWave is indeed responsible for the observed latency. Barriers ensure consistency across the distributed system, but they add delay as they travel through the streaming graph.
  • Reducing `barrier_interval_ms` to 30ms might be technically possible by altering the code or hacking the value in memory, but it could have side effects such as increased load on state storage due to more frequent checkpoints.
  • Delaying one stream by a few milliseconds is theoretically possible using the SQL syntax you mentioned (`interval '00:00:00.005'`), but achieving that precision might be challenging because of the barrier mechanism and other internal timers.
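
To make the relationship between `barrier_interval_ms` and tail latency concrete, here is a rough Python simulation. It is only a sketch under simplifying assumptions, not a description of RisingWave's actual implementation: it assumes a joined row is held until the next barrier, that rows arrive uniformly at random within a barrier interval, and that the remaining latency is the 5-15ms you measured without the JOIN. The function names `simulate_latencies` and `percentile` are made up for this illustration.

```python
import random


def simulate_latencies(barrier_interval_ms, base_latency_ms=(5.0, 15.0),
                       n=100_000, seed=0):
    """Simulate end-to-end latencies under an assumed 'flush at next barrier' model."""
    rng = random.Random(seed)
    latencies = []
    for _ in range(n):
        # Assumption: the row arrives at a uniformly random offset within the interval.
        arrival_offset = rng.uniform(0.0, barrier_interval_ms)
        # Assumption: the joined row is emitted only when the next barrier passes.
        wait_for_barrier = barrier_interval_ms - arrival_offset
        # Base latency taken from the no-JOIN measurement in the question (5-15ms).
        base = rng.uniform(*base_latency_ms)
        latencies.append(base + wait_for_barrier)
    return latencies


def percentile(values, p):
    """Return the p-th percentile (0-100) using a simple nearest-rank rule."""
    ordered = sorted(values)
    index = min(len(ordered) - 1, int(p / 100.0 * len(ordered)))
    return ordered[index]


if __name__ == "__main__":
    for interval in (100, 30):
        lat = simulate_latencies(interval)
        print(
            f"barrier_interval_ms={interval}: "
            f"p50={percentile(lat, 50):.1f}ms "
            f"p99={percentile(lat, 99):.1f}ms "
            f"p99.99={percentile(lat, 99.99):.1f}ms"
        )
```

Under these assumptions the tail is approximately the barrier interval plus the base latency. With a 100ms interval, that lands in the same range as the ~120ms p99 you reported, and a 30ms interval would still leave p99.99 above 30ms.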

For the JOIN operation, consider using windowing with `EMIT ON WINDOW CLOSE` semantics, or make sure your SINK and SOURCE have consistent primary keys to avoid internal buffering. Switching to an INNER JOIN could make the stream append-only, which would eliminate the need for buffering.

Lastly, achieving sub-30ms tail latencies is ambitious and may not be feasible with current mainstream streaming systems. It's important to balance the need for low latency with the system's consistency and fault tolerance capabilities.

Mar 24, 2024