pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    backfill_state: &mut BackfillState,
    state_len: usize,
    vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()>
Schema: | vnode | pk | backfill_finished | row_count |
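To make the schema concrete, here is a minimal sketch of one state-table row as a plain Rust struct. The struct name `BackfillStateRow`, the `to_columns` helper, and the column types (a `u16` vnode and `i64` pk columns) are illustrative assumptions, not the actual row representation used by this crate.

```rust
/// Hypothetical, simplified model of one row of the backfill state table,
/// following the schema | vnode | pk | backfill_finished | row_count |.
/// Assumption: vnode is a u16 and the upstream pk is a list of i64 columns.
#[derive(Debug, Clone, PartialEq)]
pub struct BackfillStateRow {
    pub vnode: u16,              // virtual node this row describes
    pub pk: Vec<i64>,            // current backfill position (upstream pk)
    pub backfill_finished: bool, // true once this vnode is Completed
    pub row_count: u64,          // rows backfilled so far for this vnode
}

impl BackfillStateRow {
    /// Flattens the row into column order:
    /// vnode, pk columns..., backfill_finished, row_count.
    /// Columns are rendered as strings purely for illustration.
    pub fn to_columns(&self) -> Vec<String> {
        let mut cols = Vec::with_capacity(self.pk.len() + 3);
        cols.push(self.vnode.to_string());
        cols.extend(self.pk.iter().map(|c| c.to_string()));
        cols.push(self.backfill_finished.to_string());
        cols.push(self.row_count.to_string());
        cols
    }
}
```

Because the pk can span several columns, the total row width depends on the upstream table, which is presumably why the function receives the state length as a parameter.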
Persists the state per vnode based on BackfillState.
We track the currently committed state via committed_progress,
so we know whether the state needs to be persisted.
The state is encoded as follows:
- NotStarted: not persisted to the store at all.
- InProgress: format | vnode | pk | false | row_count |
  - If the current position has changed: persist.
  - If the current position has not changed: do not persist.
- Completed: format | vnode | pk | true | row_count |
  - If the previous state is InProgress or NotStarted: persist.
  - If the previous state is Completed: do not persist.
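The persistence rules above amount to a small per-vnode decision against the committed progress. The sketch below is a simplified model under stated assumptions: `Progress`, `row_to_persist`, and `persist_all` are hypothetical names, the pk is modelled as `Vec<i64>`, and a `BTreeMap` stands in for the committed progress. It is not the actual `BackfillState` API.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified per-vnode progress mirroring the three documented
/// states. `pos` stands in for the upstream pk (assumed to be i64 columns).
#[derive(Debug, Clone, PartialEq)]
pub enum Progress {
    NotStarted,
    InProgress { pos: Vec<i64>, row_count: u64 },
    Completed { pos: Vec<i64>, row_count: u64 },
}

/// Returns the row to write as (vnode, pk, backfill_finished, row_count),
/// or None if nothing needs to be persisted for this vnode.
pub fn row_to_persist(
    vnode: u16,
    committed: &Progress,
    current: &Progress,
) -> Option<(u16, Vec<i64>, bool, u64)> {
    match current {
        // NotStarted: never written to the store.
        Progress::NotStarted => None,
        // InProgress: persist only if the position moved since the last commit.
        Progress::InProgress { pos, row_count } => {
            let unchanged =
                matches!(committed, Progress::InProgress { pos: old, .. } if old == pos);
            if unchanged { None } else { Some((vnode, pos.clone(), false, *row_count)) }
        }
        // Completed: persist once, on the transition from InProgress/NotStarted.
        Progress::Completed { pos, row_count } => match committed {
            Progress::Completed { .. } => None,
            _ => Some((vnode, pos.clone(), true, *row_count)),
        },
    }
}

/// Hypothetical driver: computes the writes for every vnode, then records the
/// current progress as committed (the role played by committed_progress).
pub fn persist_all(
    committed: &mut BTreeMap<u16, Progress>,
    current: &BTreeMap<u16, Progress>,
) -> Vec<(u16, Vec<i64>, bool, u64)> {
    let mut writes = Vec::new();
    for (&vnode, cur) in current {
        // A vnode with no committed entry is treated as NotStarted.
        let prev = committed.get(&vnode).cloned().unwrap_or(Progress::NotStarted);
        if let Some(row) = row_to_persist(vnode, &prev, cur) {
            writes.push(row);
        }
        committed.insert(vnode, cur.clone());
    }
    writes
}
```

Comparing against the committed entry, rather than against the previous in-memory value, is what lets an unchanged vnode be skipped on repeated calls, and it guarantees a Completed row is written exactly once.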
TODO(kwannoel): in the tests, we should check that the committed state
is all finished.
TODO(kwannoel): Instead of persisting state for every vnode each time,
we can optimize by persisting state only for the subset of vnodes that were updated.
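One possible shape for the optimization in the second TODO is to record which vnodes advanced during an epoch and visit only those. This is a minimal sketch under stated assumptions: `UpdatedVnodes`, `mark`, and `take` are hypothetical names, and vnodes are modelled as `u16`.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker of vnodes whose progress changed since the last
/// persist, so only those vnodes need to be visited.
#[derive(Debug, Default)]
pub struct UpdatedVnodes {
    dirty: BTreeSet<u16>,
}

impl UpdatedVnodes {
    /// Record that `vnode` advanced during the current epoch.
    /// Marking the same vnode twice has no extra effect.
    pub fn mark(&mut self, vnode: u16) {
        self.dirty.insert(vnode);
    }

    /// Return the updated vnodes in ascending order and reset the set
    /// for the next epoch.
    pub fn take(&mut self) -> Vec<u16> {
        std::mem::take(&mut self.dirty).into_iter().collect()
    }
}
```

Since the function already accepts its vnodes as an `impl Iterator<Item = VirtualNode>`, a caller could in principle pass only the updated subset there instead of every vnode owned by the actor.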