
How is the state of `SplitEnumerator`s managed in RisingWave, and how does `SplitMetaData::update_with_offset` work?

I'm implementing a custom source/sink for a format similar to Debezium streams stored in Parquet. I have a basic sink working, but I'm struggling to understand how the state of `SplitEnumerator`s is managed and how `SplitMetaData::update_with_offset` is used. Specifically, splits are re-executed after a restart, and `start_offset` is always 0 when I emit records in my `SplitReader`. My questions are:

  1. How is the state of `SplitEnumerator`s managed? After a restart, it seems to re-execute splits that were already processed.
  2. When is `SplitMetaData::update_with_offset` called, and is it the mechanism for marking a split as fully or partially processed?

I also need to read all data in order, and the `end_offset` has to be advanced periodically by an external system.


Sergii Mikhtoniuk

Asked on Apr 02, 2024

Eric and Bohan Zhang provided insights into my questions:

  • `SplitEnumerator`s do not keep state internally; they list all splits, and changes to the split set are tracked in meta.
  • `SplitMetaData::update_with_offset` is called when a barrier arrives, and the source executor then updates the offset in the state table. RisingWave is a streaming system, so splits are never marked as done; it keeps reading each split.
  • A `start_offset` that is always 0 may be expected: the source side gets the split assigned by the enumerator and reads from the offset given in that split.
  • For splits with closed-ended intervals, I need to consider making the interval part of the split's identity, but I must also handle the requirement that the enumerator cannot forget previous splits. I may instead need to use a single split for my stream and find a way to let the `end_offset` be advanced periodically by an external system.
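
The flow described in the points above can be modeled with a small, self-contained Rust sketch. It is not RisingWave code: `DemoSplit`, `StateTable`, `list_splits`, and `read_batch` are hypothetical names, and the method named `update_with_offset` here is a simplified stand-in rather than the real trait signature. The sketch also assumes that, after a restart, the reader resumes from the split persisted in the state table instead of the fresh split listed by the enumerator, and that the stored value is the next offset to read; a real connector may use a different convention.

```rust
use std::collections::HashMap;
use std::num::ParseIntError;

/// Hypothetical split describing one ordered stream (not a RisingWave type).
#[derive(Clone, Debug, PartialEq)]
pub struct DemoSplit {
    pub id: String,
    pub start_offset: u64,
}

impl DemoSplit {
    /// Simplified stand-in for `SplitMetaData::update_with_offset`:
    /// given the last offset read, remember where to resume.
    pub fn update_with_offset(&mut self, last_read: &str) -> Result<(), ParseIntError> {
        let last: u64 = last_read.parse()?;
        self.start_offset = last + 1; // assumption: store the next offset to read
        Ok(())
    }
}

/// Stateless enumerator: it always lists the same split starting at offset 0.
pub fn list_splits() -> Vec<DemoSplit> {
    vec![DemoSplit { id: "stream-0".to_string(), start_offset: 0 }]
}

/// Stand-in for the source executor's state table, keyed by split id.
#[derive(Default)]
pub struct StateTable {
    committed: HashMap<String, DemoSplit>,
}

impl StateTable {
    /// On a barrier: fold the last read offset into the split, then persist it.
    pub fn on_barrier(
        &mut self,
        split: &mut DemoSplit,
        last_read: Option<u64>,
    ) -> Result<(), ParseIntError> {
        if let Some(offset) = last_read {
            split.update_with_offset(&offset.to_string())?;
        }
        self.committed.insert(split.id.clone(), split.clone());
        Ok(())
    }

    /// On (re)start: prefer the persisted split over the freshly listed one.
    pub fn recover(&self, assigned: DemoSplit) -> DemoSplit {
        self.committed.get(&assigned.id).cloned().unwrap_or(assigned)
    }
}

/// Reader stand-in: "reads" `n` records starting at `start_offset` and
/// reports the last offset it emitted, or `None` if nothing was read.
pub fn read_batch(split: &DemoSplit, n: u64) -> Option<u64> {
    if n == 0 {
        None
    } else {
        Some(split.start_offset + n - 1)
    }
}
```

In this model the enumerator keeps listing the split with offset 0, which matches the observation that `start_offset` can legitimately be 0 on first assignment. Progress survives a restart only because the reader reports the offsets it has read and the executor persists the updated split at each barrier.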
Apr 03, 2024