We want to avoid allocating a row for every vnode.
Instead we can just modify a single row, and dispatch it to state table to write.
This builds the following segments of the row:
Creates a data chunk builder for snapshot read.
If the rate_limit is smaller than chunk_size, it will take precedence.
This is so we can partition snapshot read into smaller chunks than chunk size.
Mark chunk:
For each row of the chunk, forward it to downstream if its pk <= current_pos, otherwise
ignore it. We implement it by changing the visibility bitmap.
Mark chunk:
For each row of the chunk, forward it to downstream if its pk <= current_pos for the
corresponding vnode, otherwise ignore it.
We implement it by changing the visibility bitmap.
We will rewrite unmatched U-/U+ into +/- ops.
They can be unmatched because while they will always have the same stream key,
their storage pk might be different. Here we use storage pk (current_pos) to filter them,
as such, a U+ might be filtered out, but their corresponding U- could be kept, and vice versa.
Schema
| vnode | pk | backfill_finished | row_count |
Persists the state per vnode based on BackfillState.
We track the current committed state via committed_progress
so we know whether we need to persist the state or not.