risingwave_stream/executor/backfill/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::cmp::{max, min};
17use std::collections::HashMap;
18use std::ops::Bound;
19
20use await_tree::InstrumentAwait;
21use futures::Stream;
22use futures::future::try_join_all;
23use futures_async_stream::try_stream;
24use risingwave_common::array::stream_record::Record;
25use risingwave_common::array::{Op, StreamChunk};
26use risingwave_common::bail;
27use risingwave_common::bitmap::BitmapBuilder;
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, Row, RowExt};
30use risingwave_common::types::{DataType, Datum};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::epoch::EpochPair;
33use risingwave_common::util::iter_util::ZipEqDebug;
34use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
35use risingwave_common::util::value_encoding::BasicSerde;
36use risingwave_common_rate_limit::RateLimit;
37use risingwave_connector::error::ConnectorError;
38use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
39use risingwave_storage::StateStore;
40use risingwave_storage::row_serde::value_serde::ValueRowSerde;
41use risingwave_storage::table::collect_data_chunk_with_builder;
42
43use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
44use crate::executor::{
45    Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
46};
47
/// Number of metadata columns stored next to the backfill position in every state row:
/// `vnode`, `is_finished` and `row_count`, one column each.
pub const METADATA_STATE_LEN: usize = 1 + 1 + 1;
50
51#[derive(Clone, Debug)]
52pub struct BackfillState {
53    /// Used to track backfill progress.
54    // TODO: Instead of using hashmap, perhaps we can just use static array.
55    inner: HashMap<VirtualNode, BackfillStatePerVnode>,
56}
57
58impl BackfillState {
59    pub(crate) fn has_progress(&self) -> bool {
60        self.inner.values().any(|p| {
61            matches!(
62                p.current_state(),
63                &BackfillProgressPerVnode::InProgress { .. }
64            )
65        })
66    }
67
68    pub(crate) fn get_current_state(
69        &mut self,
70        vnode: &VirtualNode,
71    ) -> &mut BackfillProgressPerVnode {
72        &mut self.inner.get_mut(vnode).unwrap().current_state
73    }
74
75    // Expects the vnode to always have progress, otherwise it will return an error.
76    pub(crate) fn get_progress(
77        &self,
78        vnode: &VirtualNode,
79    ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
80        match self.inner.get(vnode) {
81            Some(p) => Ok(p.current_state()),
82            None => bail!(
83                "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
84                vnode,
85            ),
86        }
87    }
88
89    pub(crate) fn update_progress(
90        &mut self,
91        vnode: VirtualNode,
92        new_pos: OwnedRow,
93        snapshot_row_count_delta: u64,
94    ) -> StreamExecutorResult<()> {
95        let state = self.get_current_state(&vnode);
96        match state {
97            BackfillProgressPerVnode::NotStarted => {
98                *state = BackfillProgressPerVnode::InProgress {
99                    current_pos: new_pos,
100                    snapshot_row_count: snapshot_row_count_delta,
101                };
102            }
103            BackfillProgressPerVnode::InProgress {
104                snapshot_row_count, ..
105            } => {
106                *state = BackfillProgressPerVnode::InProgress {
107                    current_pos: new_pos,
108                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
109                };
110            }
111            BackfillProgressPerVnode::Completed { .. } => unreachable!(),
112        }
113        Ok(())
114    }
115
116    pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
117        let finished_placeholder_position = construct_initial_finished_state(pos_len);
118        let current_state = self.get_current_state(&vnode);
119        let (new_pos, snapshot_row_count) = match current_state {
120            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
121            BackfillProgressPerVnode::InProgress {
122                current_pos,
123                snapshot_row_count,
124            } => (current_pos.clone(), *snapshot_row_count),
125            BackfillProgressPerVnode::Completed { .. } => {
126                return;
127            }
128        };
129        *current_state = BackfillProgressPerVnode::Completed {
130            current_pos: new_pos,
131            snapshot_row_count,
132        };
133    }
134
135    /// Return state to be committed.
136    fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
137        let new_state = self.inner.get(vnode).unwrap().current_state().clone();
138        let new_encoded_state = match new_state {
139            BackfillProgressPerVnode::NotStarted => unreachable!(),
140            BackfillProgressPerVnode::InProgress {
141                current_pos,
142                snapshot_row_count,
143            } => {
144                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
145                encoded_state[0] = Some(vnode.to_scalar().into());
146                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
147                encoded_state[current_pos.len() + 1] = Some(false.into());
148                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
149                encoded_state
150            }
151            BackfillProgressPerVnode::Completed {
152                current_pos,
153                snapshot_row_count,
154            } => {
155                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
156                encoded_state[0] = Some(vnode.to_scalar().into());
157                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
158                encoded_state[current_pos.len() + 1] = Some(true.into());
159                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
160                encoded_state
161            }
162        };
163        let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
164        let old_encoded_state = match old_state {
165            BackfillProgressPerVnode::NotStarted => None,
166            BackfillProgressPerVnode::InProgress {
167                current_pos,
168                snapshot_row_count,
169            } => {
170                let committed_pos = current_pos;
171                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
172                encoded_state[0] = Some(vnode.to_scalar().into());
173                encoded_state[1..committed_pos.len() + 1]
174                    .clone_from_slice(committed_pos.as_inner());
175                encoded_state[committed_pos.len() + 1] = Some(false.into());
176                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
177                Some(encoded_state)
178            }
179            BackfillProgressPerVnode::Completed {
180                current_pos,
181                snapshot_row_count,
182            } => {
183                let committed_pos = current_pos;
184                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
185                encoded_state[0] = Some(vnode.to_scalar().into());
186                encoded_state[1..committed_pos.len() + 1]
187                    .clone_from_slice(committed_pos.as_inner());
188                encoded_state[committed_pos.len() + 1] = Some(true.into());
189                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
190                Some(encoded_state)
191            }
192        };
193        Some((old_encoded_state, new_encoded_state))
194    }
195
196    // TODO: We can add a committed flag to speed up this check.
197    /// Checks if the state needs to be committed.
198    fn need_commit(&self, vnode: &VirtualNode) -> bool {
199        let state = self.inner.get(vnode).unwrap();
200        match state.current_state() {
201            // If current state and committed state are the same, we don't need to commit.
202            s @ BackfillProgressPerVnode::InProgress { .. }
203            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
204            BackfillProgressPerVnode::NotStarted => false,
205        }
206    }
207
208    fn mark_committed(&mut self, vnode: VirtualNode) {
209        let BackfillStatePerVnode {
210            committed_state,
211            current_state,
212        } = self.inner.get_mut(&vnode).unwrap();
213
214        assert!(matches!(
215            current_state,
216            BackfillProgressPerVnode::InProgress { .. }
217                | BackfillProgressPerVnode::Completed { .. }
218        ));
219        *committed_state = current_state.clone();
220    }
221
222    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
223        self.inner
224            .values()
225            .map(|p| p.get_snapshot_row_count())
226            .sum()
227    }
228}
229
230#[derive(Clone, Debug, PartialEq, Eq)]
231pub struct BackfillStatePerVnode {
232    committed_state: BackfillProgressPerVnode,
233    current_state: BackfillProgressPerVnode,
234}
235
236impl BackfillStatePerVnode {
237    pub(crate) fn new(
238        committed_state: BackfillProgressPerVnode,
239        current_state: BackfillProgressPerVnode,
240    ) -> Self {
241        Self {
242            committed_state,
243            current_state,
244        }
245    }
246
247    pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
248        &self.committed_state
249    }
250
251    pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
252        &self.current_state
253    }
254
255    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
256        self.current_state().get_snapshot_row_count()
257    }
258}
259
260impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
261    fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
262        Self {
263            inner: v.into_iter().collect(),
264        }
265    }
266}
267
268/// Used for tracking backfill state per vnode
269/// The `OwnedRow` only contains the pk of upstream, to track `current_pos`.
270#[derive(Clone, Eq, PartialEq, Debug)]
271pub enum BackfillProgressPerVnode {
272    /// no entry exists for a vnode, or on initialization of the executor.
273    NotStarted,
274    InProgress {
275        /// The current snapshot offset
276        current_pos: OwnedRow,
277        /// Number of snapshot records read for this vnode.
278        snapshot_row_count: u64,
279    },
280    Completed {
281        /// The current snapshot offset
282        current_pos: OwnedRow,
283        /// Number of snapshot records read for this vnode.
284        snapshot_row_count: u64,
285    },
286}
287
288impl BackfillProgressPerVnode {
289    fn get_snapshot_row_count(&self) -> u64 {
290        match self {
291            BackfillProgressPerVnode::NotStarted => 0,
292            BackfillProgressPerVnode::InProgress {
293                snapshot_row_count, ..
294            }
295            | BackfillProgressPerVnode::Completed {
296                snapshot_row_count, ..
297            } => *snapshot_row_count,
298        }
299    }
300}
301
302pub(crate) fn mark_chunk(
303    chunk: StreamChunk,
304    current_pos: &OwnedRow,
305    pk_in_output_indices: PkIndicesRef<'_>,
306    pk_order: &[OrderType],
307) -> StreamChunk {
308    let chunk = chunk.compact();
309    mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
310}
311
312pub(crate) fn mark_cdc_chunk(
313    offset_parse_func: &CdcOffsetParseFunc,
314    chunk: StreamChunk,
315    current_pos: &OwnedRow,
316    pk_in_output_indices: PkIndicesRef<'_>,
317    pk_order: &[OrderType],
318    last_cdc_offset: Option<CdcOffset>,
319) -> StreamExecutorResult<StreamChunk> {
320    let chunk = chunk.compact();
321    mark_cdc_chunk_inner(
322        offset_parse_func,
323        chunk,
324        current_pos,
325        last_cdc_offset,
326        pk_in_output_indices,
327        pk_order,
328    )
329}
330
331/// Mark chunk:
332/// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
333/// corresponding `vnode`, otherwise ignore it.
334/// We implement it by changing the visibility bitmap.
335pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
336    chunk: &StreamChunk,
337    backfill_state: &BackfillState,
338    pk_in_output_indices: PkIndicesRef<'_>,
339    upstream_table: &ReplicatedStateTable<S, SD>,
340    pk_order: &[OrderType],
341) -> StreamExecutorResult<StreamChunk> {
342    let chunk = chunk.clone();
343    let (data, ops) = chunk.into_parts();
344    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
345
346    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
347    let mut unmatched_update_delete = false;
348    let mut visible_update_delete = false;
349    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
350        let pk = row.project(pk_in_output_indices);
351        let vnode = upstream_table.compute_vnode_by_pk(pk);
352        let visible = match backfill_state.get_progress(&vnode)? {
353            // We want to just forward the row, if the vnode has finished backfill.
354            BackfillProgressPerVnode::Completed { .. } => true,
355            // If not started, no need to forward.
356            BackfillProgressPerVnode::NotStarted => false,
357            // If in progress, we need to check row <= current_pos.
358            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
359                cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
360            }
361        };
362        if !visible {
363            tracing::trace!(
364                source = "upstream",
365                state = "process_barrier",
366                action = "mark_chunk",
367                ?vnode,
368                ?op,
369                ?pk,
370                ?row,
371                "update_filtered",
372            );
373        }
374        new_visibility.append(visible);
375
376        normalize_unmatched_updates(
377            &mut new_ops,
378            &mut unmatched_update_delete,
379            &mut visible_update_delete,
380            visible,
381            i,
382            op,
383        );
384    }
385    let (columns, _) = data.into_parts();
386    let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
387    Ok(chunk)
388}
389
390/// Mark chunk:
391/// For each row of the chunk, forward it to downstream if its pk <= `current_pos`, otherwise
392/// ignore it. We implement it by changing the visibility bitmap.
393fn mark_chunk_inner(
394    chunk: StreamChunk,
395    current_pos: &OwnedRow,
396    pk_in_output_indices: PkIndicesRef<'_>,
397    pk_order: &[OrderType],
398) -> StreamChunk {
399    let (data, ops) = chunk.into_parts();
400    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
401    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
402    let mut unmatched_update_delete = false;
403    let mut visible_update_delete = false;
404    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
405        let lhs = row.project(pk_in_output_indices);
406        let rhs = current_pos;
407        let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
408        new_visibility.append(visible);
409
410        normalize_unmatched_updates(
411            &mut new_ops,
412            &mut unmatched_update_delete,
413            &mut visible_update_delete,
414            visible,
415            i,
416            op,
417        );
418    }
419    let (columns, _) = data.into_parts();
420    StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
421}
422
423/// We will rewrite unmatched U-/U+ into +/- ops.
424/// They can be unmatched because while they will always have the same stream key,
425/// their storage pk might be different. Here we use storage pk (`current_pos`) to filter them,
426/// as such, a U+ might be filtered out, but their corresponding U- could be kept, and vice versa.
427///
428/// This hanging U-/U+ can lead to issues downstream, since we work with an assumption in the
429/// system that there's never hanging U-/U+.
430fn normalize_unmatched_updates(
431    normalized_ops: &mut Cow<'_, [Op]>,
432    unmatched_update_delete: &mut bool,
433    visible_update_delete: &mut bool,
434    current_visibility: bool,
435    current_op_index: usize,
436    current_op: &Op,
437) {
438    if *unmatched_update_delete {
439        assert_eq!(*current_op, Op::UpdateInsert);
440        let visible_update_insert = current_visibility;
441        match (visible_update_delete, visible_update_insert) {
442            (true, false) => {
443                // Lazily clone the ops here.
444                let ops = normalized_ops.to_mut();
445                ops[current_op_index - 1] = Op::Delete;
446            }
447            (false, true) => {
448                // Lazily clone the ops here.
449                let ops = normalized_ops.to_mut();
450                ops[current_op_index] = Op::Insert;
451            }
452            (true, true) | (false, false) => {}
453        }
454        *unmatched_update_delete = false;
455    } else {
456        match current_op {
457            Op::UpdateDelete => {
458                *unmatched_update_delete = true;
459                *visible_update_delete = current_visibility;
460            }
461            Op::UpdateInsert => {
462                unreachable!("UpdateInsert should not be present without UpdateDelete")
463            }
464            _ => {}
465        }
466    }
467}
468
469fn mark_cdc_chunk_inner(
470    offset_parse_func: &CdcOffsetParseFunc,
471    chunk: StreamChunk,
472    current_pos: &OwnedRow,
473    last_cdc_offset: Option<CdcOffset>,
474    pk_in_output_indices: PkIndicesRef<'_>,
475    pk_order: &[OrderType],
476) -> StreamExecutorResult<StreamChunk> {
477    let (data, ops) = chunk.into_parts();
478    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
479
480    // `_rw_offset` must be placed at the last column right now
481    let offset_col_idx = data.dimension() - 1;
482    for v in data.rows().map(|row| {
483        let offset_datum = row.datum_at(offset_col_idx).unwrap();
484        let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
485        let visible = {
486            // filter changelog events with binlog range
487            let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
488                binlog_low <= &event_offset
489            } else {
490                true
491            };
492
493            if in_binlog_range {
494                let lhs = row.project(pk_in_output_indices);
495                let rhs = current_pos;
496                cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
497            } else {
498                false
499            }
500        };
501        Ok::<_, ConnectorError>(visible)
502    }) {
503        new_visibility.append(v?);
504    }
505
506    let (columns, _) = data.into_parts();
507    Ok(StreamChunk::with_visibility(
508        ops,
509        columns,
510        new_visibility.finish(),
511    ))
512}
513
514/// Builds a new stream chunk with `output_indices`.
515pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
516    let (ops, columns, visibility) = chunk.into_inner();
517    let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
518    StreamChunk::with_visibility(ops, mapped_columns, visibility)
519}
520
521fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
522    watermark.transform_with_indices(upstream_indices)
523}
524
525pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
526    match msg {
527        Message::Barrier(_) => Some(msg),
528        Message::Watermark(watermark) => {
529            mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
530        }
531        Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
532    }
533}
534
535/// Recovers progress per vnode, so we know which to backfill.
536/// See how it decodes the state with the inline comments.
537pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
538    state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
539) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
540    debug_assert!(!state_table.vnodes().is_empty());
541    let vnodes = state_table.vnodes().iter_vnodes();
542    let mut result = Vec::with_capacity(state_table.vnodes().len());
543    // 1. Get the vnode keys, so we can get the state per vnode.
544    let vnode_keys = vnodes.map(|vnode| {
545        let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
546        datum
547    });
548    let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
549    // 2. Fetch the state for each vnode.
550    //    It should have the following schema, it should not contain vnode:
551    //    | pk | `backfill_finished` | `row_count` |
552    let state_for_vnodes = try_join_all(tasks).await?;
553    for (vnode, state_for_vnode) in state_table
554        .vnodes()
555        .iter_vnodes()
556        .zip_eq_debug(state_for_vnodes)
557    {
558        let backfill_progress = match state_for_vnode {
559            // There's some state, means there was progress made. It's either finished / in progress.
560            Some(row) => {
561                // 3. Decode the `snapshot_row_count`. Decode from the back, since
562                //    pk is variable length.
563                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
564                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;
565
566                // 4. Decode the `is_finished` flag (whether backfill has finished).
567                //    Decode from the back, since pk is variable length.
568                let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
569                let vnode_is_finished = vnode_is_finished.as_ref().unwrap();
570
571                // 5. Decode the `current_pos`.
572                let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
573                let current_pos = current_pos.into_owned_row();
574
575                // 6. Construct the in-memory state per vnode, based on the decoded state.
576                if *vnode_is_finished.as_bool() {
577                    BackfillStatePerVnode::new(
578                        BackfillProgressPerVnode::Completed {
579                            current_pos: current_pos.clone(),
580                            snapshot_row_count,
581                        },
582                        BackfillProgressPerVnode::Completed {
583                            current_pos,
584                            snapshot_row_count,
585                        },
586                    )
587                } else {
588                    BackfillStatePerVnode::new(
589                        BackfillProgressPerVnode::InProgress {
590                            current_pos: current_pos.clone(),
591                            snapshot_row_count,
592                        },
593                        BackfillProgressPerVnode::InProgress {
594                            current_pos,
595                            snapshot_row_count,
596                        },
597                    )
598                }
599            }
600            // No state, means no progress made.
601            None => BackfillStatePerVnode::new(
602                BackfillProgressPerVnode::NotStarted,
603                BackfillProgressPerVnode::NotStarted,
604            ),
605        };
606        result.push((vnode, backfill_progress));
607    }
608    assert_eq!(result.len(), state_table.vnodes().count_ones());
609    Ok(result)
610}
611
612/// Flush the data
613pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
614    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
615    epoch: EpochPair,
616    old_state: &mut Option<Vec<Datum>>,
617    current_partial_state: &mut [Datum],
618) -> StreamExecutorResult<()> {
619    let vnodes = table.vnodes().clone();
620    if let Some(old_state) = old_state {
621        if old_state[1..] != current_partial_state[1..] {
622            vnodes.iter_vnodes_scalar().for_each(|vnode| {
623                let datum = Some(vnode.into());
624                current_partial_state[0].clone_from(&datum);
625                old_state[0] = datum;
626                table.write_record(Record::Update {
627                    old_row: &old_state[..],
628                    new_row: &(*current_partial_state),
629                })
630            });
631        }
632    } else {
633        // No existing state, create a new entry.
634        vnodes.iter_vnodes_scalar().for_each(|vnode| {
635            let datum = Some(vnode.into());
636            // fill the state
637            current_partial_state[0] = datum;
638            table.write_record(Record::Insert {
639                new_row: &(*current_partial_state),
640            })
641        });
642    }
643    table.commit_assert_no_update_vnode_bitmap(epoch).await
644}
645
646/// We want to avoid allocating a row for every vnode.
647/// Instead we can just modify a single row, and dispatch it to state table to write.
648/// This builds the following segments of the row:
649/// 1. `current_pos`
650/// 2. `backfill_finished`
651/// 3. `row_count`
652pub(crate) fn build_temporary_state(
653    row_state: &mut [Datum],
654    is_finished: bool,
655    current_pos: &OwnedRow,
656    row_count: u64,
657) {
658    row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
659    row_state[current_pos.len() + 1] = Some(is_finished.into());
660    row_state[current_pos.len() + 2] = Some((row_count as i64).into());
661}
662
663/// Update backfill pos by vnode.
664pub(crate) fn update_pos_by_vnode(
665    vnode: VirtualNode,
666    chunk: &StreamChunk,
667    pk_in_output_indices: &[usize],
668    backfill_state: &mut BackfillState,
669    snapshot_row_count_delta: u64,
670) -> StreamExecutorResult<()> {
671    let new_pos = get_new_pos(chunk, pk_in_output_indices);
672    assert_eq!(new_pos.len(), pk_in_output_indices.len());
673    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
674    Ok(())
675}
676
677/// Get new backfill pos from the chunk. Since chunk should have ordered rows, we can just take the
678/// last row.
679pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
680    chunk
681        .rows()
682        .last()
683        .unwrap()
684        .1
685        .project(pk_in_output_indices)
686        .into_owned_row()
687}
688
689pub(crate) fn get_cdc_chunk_last_offset(
690    offset_parse_func: &CdcOffsetParseFunc,
691    chunk: &StreamChunk,
692) -> StreamExecutorResult<Option<CdcOffset>> {
693    let row = chunk.rows().last().unwrap().1;
694    let offset_col = row.iter().last().unwrap();
695    let output =
696        offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
697    output.transpose().map_err(|e| e.into())
698}
699
700// NOTE(kwannoel): ["None" ..] encoding should be appropriate to mark
701// the case where upstream snapshot is empty.
702// This is so we can persist backfill state as "finished".
703// It won't be confused with another case where pk position comprised of nulls,
704// because they both record that backfill is finished.
705pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
706    OwnedRow::new(vec![None; pos_len])
707}
708
709pub(crate) fn compute_bounds(
710    pk_indices: &[usize],
711    current_pos: Option<OwnedRow>,
712) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
713    // `current_pos` is None means it needs to scan from the beginning, so we use Unbounded to
714    // scan. Otherwise, use Excluded.
715    if let Some(current_pos) = current_pos {
716        // If `current_pos` is an empty row which means upstream mv contains only one row and it
717        // has been consumed. The iter interface doesn't support
718        // `Excluded(empty_row)` range bound, so we can simply return `None`.
719        if current_pos.is_empty() {
720            assert!(pk_indices.is_empty());
721            return None;
722        }
723
724        Some((Bound::Excluded(current_pos), Bound::Unbounded))
725    } else {
726        Some((Bound::Unbounded, Bound::Unbounded))
727    }
728}
729
730#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
731pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
732where
733    StreamExecutorError: From<E>,
734    R: Row,
735    S: Stream<Item = Result<R, E>> + Unpin + 'a,
736{
737    while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
738        .instrument_await("backfill_snapshot_read")
739        .await?
740    {
741        debug_assert!(data_chunk.cardinality() > 0);
742        let ops = vec![Op::Insert; data_chunk.capacity()];
743        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
744        yield stream_chunk;
745    }
746}
747
748/// Schema
749/// | vnode | pk | `backfill_finished` | `row_count` |
750/// Persists the state per vnode based on `BackfillState`.
751/// We track the current committed state via `committed_progress`
752/// so we know whether we need to persist the state or not.
753///
754/// The state is encoded as follows:
755/// `NotStarted`:
756/// - Not persist to store at all.
757///
758/// `InProgress`:
759/// - Format: | vnode | pk | false | `row_count` |
760/// - If change in current pos: Persist.
761/// - No change in current pos: Do not persist.
762///
763/// Completed
764/// - Format: | vnode | pk | true | `row_count` |
765/// - If previous state is `InProgress` / `NotStarted`: Persist.
766/// - If previous state is Completed: Do not persist.
767///
768/// TODO(kwannoel): we should check committed state to be all `finished` in the tests.
769/// TODO(kwannoel): Instead of persisting state per vnode each time,
770/// we can optimize by persisting state for a subset of vnodes which were updated.
771pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
772    epoch: EpochPair,
773    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
774    backfill_state: &mut BackfillState,
775    #[cfg(debug_assertions)] state_len: usize,
776    vnodes: impl Iterator<Item = VirtualNode>,
777) -> StreamExecutorResult<()> {
778    for vnode in vnodes {
779        if !backfill_state.need_commit(&vnode) {
780            continue;
781        }
782        let (encoded_prev_state, encoded_current_state) =
783            match backfill_state.get_commit_state(&vnode) {
784                Some((old_state, new_state)) => (old_state, new_state),
785                None => continue,
786            };
787        if let Some(encoded_prev_state) = encoded_prev_state {
788            // There's some progress, update the state.
789            #[cfg(debug_assertions)]
790            {
791                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
792                // old_row only contains the value segment.
793                let old_row = table.get_row(pk).await?;
794                match old_row {
795                    Some(old_row) => {
796                        let inner = old_row.as_inner();
797                        // value segment (without vnode) should be used for comparison
798                        assert_eq!(inner, &encoded_prev_state[1..]);
799                        assert_ne!(inner, &encoded_current_state[1..]);
800                        assert_eq!(old_row.len(), state_len - 1);
801                        assert_eq!(encoded_current_state.len(), state_len);
802                    }
803                    None => {
804                        bail!("row {:#?} not found", pk);
805                    }
806                }
807            }
808            table.write_record(Record::Update {
809                old_row: &encoded_prev_state[..],
810                new_row: &encoded_current_state[..],
811            });
812        } else {
813            // No existing state, create a new entry.
814            #[cfg(debug_assertions)]
815            {
816                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
817                let row = table.get_row(pk).await?;
818                assert!(row.is_none(), "row {:#?}", row);
819                assert_eq!(encoded_current_state.len(), state_len);
820            }
821            table.write_record(Record::Insert {
822                new_row: &encoded_current_state[..],
823            });
824        }
825        backfill_state.mark_committed(vnode);
826    }
827
828    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
829    Ok(())
830}
831
832/// Schema
833/// | vnode | pk | `backfill_finished` | `row_count` |
834///
835/// For `current_pos` and `old_pos` are just pk of upstream.
836/// They should be strictly increasing.
837pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
838    epoch: EpochPair,
839    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
840    is_finished: bool,
841    current_pos: &Option<OwnedRow>,
842    row_count: u64,
843    old_state: &mut Option<Vec<Datum>>,
844    current_state: &mut [Datum],
845) -> StreamExecutorResult<()> {
846    if let Some(current_pos_inner) = current_pos {
847        // state w/o vnodes.
848        build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
849        flush_data(table, epoch, old_state, current_state).await?;
850        *old_state = Some(current_state.into());
851    } else {
852        table.commit_assert_no_update_vnode_bitmap(epoch).await?;
853    }
854    Ok(())
855}
856
857/// Creates a data chunk builder for snapshot read.
858/// If the `rate_limit` is smaller than `chunk_size`, it will take precedence.
859/// This is so we can partition snapshot read into smaller chunks than chunk size.
860pub fn create_builder(
861    rate_limit: RateLimit,
862    chunk_size: usize,
863    data_types: Vec<DataType>,
864) -> DataChunkBuilder {
865    let batch_size = match rate_limit {
866        RateLimit::Disabled | RateLimit::Pause => chunk_size,
867        RateLimit::Fixed(limit) => min(limit.get() as usize, chunk_size),
868    };
869    // Ensure that the batch size is at least 2, to have enough space for two rows in a single update.
870    let batch_size = max(2, batch_size);
871    DataChunkBuilder::new(data_types, batch_size)
872}
873
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops: Arc<[Op]> = vec![
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ]
        .into();

        // Each case: (initial `visible_update_delete`, `current_visibility`, expected ops).
        // Every case starts with `unmatched_update_delete = true` and calls the normalizer with
        // index `1` and `Op::UpdateInsert`.
        let cases = [
            (
                true,
                true,
                [
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
            (
                false,
                false,
                [
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
            (
                true,
                false,
                [
                    Op::Delete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
            (
                false,
                true,
                [
                    Op::UpdateDelete,
                    Op::Insert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
        ];

        for (case_idx, (initial_visible_update_delete, current_visibility, expected)) in
            cases.into_iter().enumerate()
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = initial_visible_update_delete;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(&new_ops[..], &expected[..], "case {case_idx}");
        }
    }
}