Module utils

Structs

BackfillState
BackfillStatePerVnode
UpstreamStreamKeyUpdateNormalizer 🔒
Rewrites upstream updates when the input stream key is not the same column set as the current executor stream key.

Enums

BackfillProgressPerVnode
Used for tracking backfill state per vnode. The OwnedRow only contains the pk of upstream, to track current_pos.

Constants

METADATA_STATE_LEN
vnode, is_finished, and row_count each occupy one column.
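
To make the column accounting concrete, here is a minimal sketch of how the metadata width combines with the pk width, using the | vnode | pk | backfill_finished | row_count | schema documented for persist_state. The value 3 is inferred from the description above (three fields, one column each). The helpers state_row_len and state_row_layout are hypothetical names for illustration and are not part of this module.

```rust
use std::ops::Range;

/// Inferred from the docs: `vnode`, `is_finished`, and `row_count`
/// occupy one column each, so three metadata columns in total.
const METADATA_STATE_LEN: usize = 3;

/// Hypothetical helper: total width of a state row laid out as
/// | vnode | pk ... | backfill_finished | row_count |.
fn state_row_len(pk_len: usize) -> usize {
    pk_len + METADATA_STATE_LEN
}

/// Hypothetical helper: column positions of each segment of the state row,
/// returned as (vnode, pk range, is_finished, row_count).
fn state_row_layout(pk_len: usize) -> (usize, Range<usize>, usize, usize) {
    let vnode = 0;
    let pk = 1..1 + pk_len;
    let is_finished = 1 + pk_len;
    let row_count = 2 + pk_len;
    (vnode, pk, is_finished, row_count)
}
```

For an upstream pk of two columns, this sketch gives a state row of five columns, with the pk in columns 1 and 2.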

Functions

build_temporary_state 🔒
We want to avoid allocating a row for every vnode. Instead, we modify a single row and dispatch it to the state table to write. This builds the following segments of the row:
compute_bounds 🔒
construct_initial_finished_state 🔒
create_builder
Creates a data chunk builder for snapshot read. If rate_limit is smaller than chunk_size, rate_limit takes precedence, so that snapshot read can be partitioned into chunks smaller than chunk_size.
flush_data 🔒
Flush the data
get_cdc_chunk_last_offset 🔒
get_new_pos 🔒
Get new backfill pos from the chunk. Since chunk should have ordered rows, we can just take the last row.
get_progress_per_vnode 🔒
Recovers progress per vnode, so we know which to backfill. See how it decodes the state with the inline comments.
iter_chunks 🔒
mapping_chunk 🔒
Builds a new stream chunk with output_indices.
mapping_message 🔒
mapping_watermark 🔒
mark_cdc_chunk 🔒
mark_cdc_chunk_inner 🔒
mark_chunk 🔒
mark_chunk_inner 🔒
Mark chunk: For each row of the chunk, forward it to downstream if its pk <= current_pos, otherwise ignore it. We implement it by changing the visibility bitmap.
mark_chunk_ref_by_vnode 🔒
Mark chunk: For each row of the chunk, forward it to downstream if its pk <= current_pos for the corresponding vnode, otherwise ignore it. We implement it by changing the visibility bitmap.
normalize_unmatched_updates 🔒
Rewrites unmatched U-/U+ ops into -/+ ops. A U-/U+ pair always shares the same stream key, but the storage pk of its two rows may differ. Because rows are filtered by storage pk (current_pos), a U+ may be filtered out while its corresponding U- is kept, and vice versa.
normalize_update_chunk_by_key 🔒
persist_state 🔒
Schema | vnode | pk | backfill_finished | row_count |
persist_state_per_vnode 🔒
Schema | vnode | pk | backfill_finished | row_count |
Persists the state per vnode based on BackfillState. We track the current committed state via committed_progress so we know whether we need to persist the state or not.
same_key_columns 🔒
update_pos_by_vnode 🔒
Update backfill pos by vnode.
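
The marking and normalization steps described for mark_chunk_inner and normalize_unmatched_updates can be illustrated with a simplified, self-contained sketch. It is not the module's implementation. It assumes a single integer pk per row, a plain Vec<bool> in place of the visibility bitmap, and a local Op enum; the signatures below are hypothetical and exist only to show the idea.

```rust
/// Simplified stand-in for stream chunk ops (assumption: the real type lives elsewhere).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    Insert,
    Delete,
    UpdateDelete, // U-
    UpdateInsert, // U+
}

/// Mark chunk: a row is forwarded downstream only if its pk <= current_pos.
/// The result plays the role of the visibility bitmap.
fn mark_visibility(pks: &[i64], current_pos: i64) -> Vec<bool> {
    pks.iter().map(|pk| *pk <= current_pos).collect()
}

/// For each adjacent U-/U+ pair where exactly one half survived the filter,
/// rewrite the surviving half into a plain Delete or Insert.
fn normalize_unmatched_updates(ops: &[Op], vis: &[bool]) -> Vec<Op> {
    assert_eq!(ops.len(), vis.len());
    let mut out = ops.to_vec();
    let mut i = 0;
    while i + 1 < ops.len() {
        if ops[i] == Op::UpdateDelete && ops[i + 1] == Op::UpdateInsert {
            match (vis[i], vis[i + 1]) {
                // U+ was filtered out, so the kept U- becomes a Delete.
                (true, false) => out[i] = Op::Delete,
                // U- was filtered out, so the kept U+ becomes an Insert.
                (false, true) => out[i + 1] = Op::Insert,
                // Both kept or both dropped: the pair needs no rewrite.
                _ => {}
            }
            i += 2;
        } else {
            i += 1;
        }
    }
    out
}
```

With pks [1, 5, 3, 9] and current_pos = 4, only the rows with pk 1 and pk 3 stay visible. If the last two rows form a U-/U+ pair, the U- (pk 3) is kept while the U+ (pk 9) is hidden, so the sketch rewrites the U- into a Delete. The per-vnode variant described for mark_chunk_ref_by_vnode would apply the same comparison against the position of each row's vnode.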