risingwave_stream::executor::backfill::utils

Function persist_state_per_vnode

pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    backfill_state: &mut BackfillState,
    state_len: usize,
    vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()>

Schema: | vnode | pk | backfill_finished | row_count |

Persists the state per vnode based on BackfillState. We track the current committed state via committed_progress so we know whether we need to persist the state or not.

The state is encoded as follows:

NotStarted:

  • Not persisted to the store at all.

InProgress:

  • Format: | vnode | pk | false | row_count |
  • If change in current pos: Persist.
  • No change in current pos: Do not persist.

Completed:

  • Format: | vnode | pk | true | row_count |
  • If previous state is InProgress / NotStarted: Persist.
  • If previous state is Completed: Do not persist.
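
The rules above can be sketched as a small decision function. This is a minimal illustration and not the actual RisingWave implementation: the `Progress` enum, `should_persist`, and `encode_row` are hypothetical names, the pk is simplified to a `Vec<i64>`, and the vnode to a `u16`. The regression from Completed back to InProgress is not covered by the description above, so the sketch simply treats it as a position change.

```rust
/// Hypothetical, simplified stand-in for the per-vnode backfill progress.
/// The real type lives in RisingWave and carries more information.
#[derive(Clone, Debug, PartialEq)]
enum Progress {
    NotStarted,
    InProgress { current_pos: Vec<i64>, row_count: u64 },
    Completed { current_pos: Vec<i64>, row_count: u64 },
}

/// Decides whether the state for one vnode should be written,
/// given the last committed progress and the current progress.
fn should_persist(committed: &Progress, current: &Progress) -> bool {
    match (committed, current) {
        // NotStarted is never persisted.
        (_, Progress::NotStarted) => false,
        // InProgress: persist only if the current position changed.
        (
            Progress::InProgress { current_pos: old, .. },
            Progress::InProgress { current_pos: new, .. },
        ) => old != new,
        // NotStarted -> InProgress counts as a position change.
        // Completed -> InProgress is an assumption of this sketch.
        (_, Progress::InProgress { .. }) => true,
        // Completed: skip if the committed state is already Completed.
        (Progress::Completed { .. }, Progress::Completed { .. }) => false,
        // InProgress / NotStarted -> Completed: persist.
        (_, Progress::Completed { .. }) => true,
    }
}

/// Builds the row | vnode | pk | backfill_finished | row_count |,
/// or None when nothing is stored for this vnode.
fn encode_row(vnode: u16, progress: &Progress) -> Option<(u16, Vec<i64>, bool, u64)> {
    match progress {
        Progress::NotStarted => None,
        Progress::InProgress { current_pos, row_count } => {
            Some((vnode, current_pos.clone(), false, *row_count))
        }
        Progress::Completed { current_pos, row_count } => {
            Some((vnode, current_pos.clone(), true, *row_count))
        }
    }
}
```

A caller would evaluate `should_persist` once per vnode and write the result of `encode_row` only when it returns true, then record the current progress as committed.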

TODO(kwannoel): We should check committed state to be all finished in the tests.

TODO(kwannoel): Instead of persisting state per vnode each time, we can optimize by persisting state for a subset of vnodes which were updated.