risingwave_stream/executor/backfill/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::collections::HashMap;
17use std::ops::Bound;
18
19use await_tree::InstrumentAwait;
20use futures::Stream;
21use futures::future::try_join_all;
22use futures_async_stream::try_stream;
23use risingwave_common::array::stream_record::Record;
24use risingwave_common::array::{Op, StreamChunk};
25use risingwave_common::bail;
26use risingwave_common::bitmap::BitmapBuilder;
27use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
28use risingwave_common::row::{OwnedRow, Row, RowExt};
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_common::util::iter_util::ZipEqDebug;
33use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
34use risingwave_common::util::value_encoding::BasicSerde;
35use risingwave_common_rate_limit::RateLimit;
36use risingwave_connector::error::ConnectorError;
37use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
38use risingwave_storage::StateStore;
39use risingwave_storage::row_serde::value_serde::ValueRowSerde;
40use risingwave_storage::table::collect_data_chunk_with_builder;
41
42use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
43use crate::executor::{
44    Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
45};
46
/// Number of metadata columns in an encoded backfill state row:
/// `vnode`, `is_finished`, `row_count`, all occupy 1 column each.
/// The remaining columns of the row hold the pk position (`current_pos`).
pub const METADATA_STATE_LEN: usize = 3;
49
/// In-memory backfill progress, keyed by vnode.
///
/// Each entry holds both the last committed progress and the current progress
/// of that vnode (see `BackfillStatePerVnode`).
#[derive(Clone, Debug)]
pub struct BackfillState {
    /// Used to track backfill progress.
    // TODO: Instead of using hashmap, perhaps we can just use static array.
    inner: HashMap<VirtualNode, BackfillStatePerVnode>,
}
56
57impl BackfillState {
58    pub(crate) fn has_progress(&self) -> bool {
59        self.inner.values().any(|p| {
60            matches!(
61                p.current_state(),
62                &BackfillProgressPerVnode::InProgress { .. }
63            )
64        })
65    }
66
67    pub(crate) fn get_current_state(
68        &mut self,
69        vnode: &VirtualNode,
70    ) -> &mut BackfillProgressPerVnode {
71        &mut self.inner.get_mut(vnode).unwrap().current_state
72    }
73
74    // Expects the vnode to always have progress, otherwise it will return an error.
75    pub(crate) fn get_progress(
76        &self,
77        vnode: &VirtualNode,
78    ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
79        match self.inner.get(vnode) {
80            Some(p) => Ok(p.current_state()),
81            None => bail!(
82                "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
83                vnode,
84            ),
85        }
86    }
87
88    pub(crate) fn update_progress(
89        &mut self,
90        vnode: VirtualNode,
91        new_pos: OwnedRow,
92        snapshot_row_count_delta: u64,
93    ) -> StreamExecutorResult<()> {
94        let state = self.get_current_state(&vnode);
95        match state {
96            BackfillProgressPerVnode::NotStarted => {
97                *state = BackfillProgressPerVnode::InProgress {
98                    current_pos: new_pos,
99                    snapshot_row_count: snapshot_row_count_delta,
100                };
101            }
102            BackfillProgressPerVnode::InProgress {
103                snapshot_row_count, ..
104            } => {
105                *state = BackfillProgressPerVnode::InProgress {
106                    current_pos: new_pos,
107                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
108                };
109            }
110            BackfillProgressPerVnode::Completed { .. } => unreachable!(),
111        }
112        Ok(())
113    }
114
115    pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
116        let finished_placeholder_position = construct_initial_finished_state(pos_len);
117        let current_state = self.get_current_state(&vnode);
118        let (new_pos, snapshot_row_count) = match current_state {
119            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
120            BackfillProgressPerVnode::InProgress {
121                current_pos,
122                snapshot_row_count,
123            } => (current_pos.clone(), *snapshot_row_count),
124            BackfillProgressPerVnode::Completed { .. } => {
125                return;
126            }
127        };
128        *current_state = BackfillProgressPerVnode::Completed {
129            current_pos: new_pos,
130            snapshot_row_count,
131        };
132    }
133
134    /// Return state to be committed.
135    fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
136        let new_state = self.inner.get(vnode).unwrap().current_state().clone();
137        let new_encoded_state = match new_state {
138            BackfillProgressPerVnode::NotStarted => unreachable!(),
139            BackfillProgressPerVnode::InProgress {
140                current_pos,
141                snapshot_row_count,
142            } => {
143                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
144                encoded_state[0] = Some(vnode.to_scalar().into());
145                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
146                encoded_state[current_pos.len() + 1] = Some(false.into());
147                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
148                encoded_state
149            }
150            BackfillProgressPerVnode::Completed {
151                current_pos,
152                snapshot_row_count,
153            } => {
154                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
155                encoded_state[0] = Some(vnode.to_scalar().into());
156                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
157                encoded_state[current_pos.len() + 1] = Some(true.into());
158                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
159                encoded_state
160            }
161        };
162        let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
163        let old_encoded_state = match old_state {
164            BackfillProgressPerVnode::NotStarted => None,
165            BackfillProgressPerVnode::InProgress {
166                current_pos,
167                snapshot_row_count,
168            } => {
169                let committed_pos = current_pos;
170                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
171                encoded_state[0] = Some(vnode.to_scalar().into());
172                encoded_state[1..committed_pos.len() + 1]
173                    .clone_from_slice(committed_pos.as_inner());
174                encoded_state[committed_pos.len() + 1] = Some(false.into());
175                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
176                Some(encoded_state)
177            }
178            BackfillProgressPerVnode::Completed {
179                current_pos,
180                snapshot_row_count,
181            } => {
182                let committed_pos = current_pos;
183                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
184                encoded_state[0] = Some(vnode.to_scalar().into());
185                encoded_state[1..committed_pos.len() + 1]
186                    .clone_from_slice(committed_pos.as_inner());
187                encoded_state[committed_pos.len() + 1] = Some(true.into());
188                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
189                Some(encoded_state)
190            }
191        };
192        Some((old_encoded_state, new_encoded_state))
193    }
194
195    // TODO: We can add a committed flag to speed up this check.
196    /// Checks if the state needs to be committed.
197    fn need_commit(&self, vnode: &VirtualNode) -> bool {
198        let state = self.inner.get(vnode).unwrap();
199        match state.current_state() {
200            // If current state and committed state are the same, we don't need to commit.
201            s @ BackfillProgressPerVnode::InProgress { .. }
202            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
203            BackfillProgressPerVnode::NotStarted => false,
204        }
205    }
206
207    fn mark_committed(&mut self, vnode: VirtualNode) {
208        let BackfillStatePerVnode {
209            committed_state,
210            current_state,
211        } = self.inner.get_mut(&vnode).unwrap();
212
213        assert!(matches!(
214            current_state,
215            BackfillProgressPerVnode::InProgress { .. }
216                | BackfillProgressPerVnode::Completed { .. }
217        ));
218        *committed_state = current_state.clone();
219    }
220
221    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
222        self.inner
223            .values()
224            .map(|p| p.get_snapshot_row_count())
225            .sum()
226    }
227}
228
/// Backfill progress of a single vnode, tracked twice: once as last committed
/// and once as currently held in memory.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BackfillStatePerVnode {
    /// Progress as of the last commit.
    committed_state: BackfillProgressPerVnode,
    /// Latest in-memory progress, possibly not yet committed.
    current_state: BackfillProgressPerVnode,
}
234
235impl BackfillStatePerVnode {
236    pub(crate) fn new(
237        committed_state: BackfillProgressPerVnode,
238        current_state: BackfillProgressPerVnode,
239    ) -> Self {
240        Self {
241            committed_state,
242            current_state,
243        }
244    }
245
246    pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
247        &self.committed_state
248    }
249
250    pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
251        &self.current_state
252    }
253
254    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
255        self.current_state().get_snapshot_row_count()
256    }
257}
258
259impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
260    fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
261        Self {
262            inner: v.into_iter().collect(),
263        }
264    }
265}
266
/// Used for tracking backfill state per vnode
/// The `OwnedRow` only contains the pk of upstream, to track `current_pos`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum BackfillProgressPerVnode {
    /// no entry exists for a vnode, or on initialization of the executor.
    NotStarted,
    /// Backfill of the vnode has started but has not been marked as finished.
    InProgress {
        /// The current snapshot offset
        current_pos: OwnedRow,
        /// Number of snapshot records read for this vnode.
        snapshot_row_count: u64,
    },
    /// Backfill of the vnode has been marked as finished.
    Completed {
        /// The current snapshot offset
        current_pos: OwnedRow,
        /// Number of snapshot records read for this vnode.
        snapshot_row_count: u64,
    },
}
286
287impl BackfillProgressPerVnode {
288    fn get_snapshot_row_count(&self) -> u64 {
289        match self {
290            BackfillProgressPerVnode::NotStarted => 0,
291            BackfillProgressPerVnode::InProgress {
292                snapshot_row_count, ..
293            }
294            | BackfillProgressPerVnode::Completed {
295                snapshot_row_count, ..
296            } => *snapshot_row_count,
297        }
298    }
299}
300
301pub(crate) fn mark_chunk(
302    chunk: StreamChunk,
303    current_pos: &OwnedRow,
304    pk_in_output_indices: PkIndicesRef<'_>,
305    pk_order: &[OrderType],
306) -> StreamChunk {
307    let chunk = chunk.compact();
308    mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
309}
310
311pub(crate) fn mark_cdc_chunk(
312    offset_parse_func: &CdcOffsetParseFunc,
313    chunk: StreamChunk,
314    current_pos: &OwnedRow,
315    pk_in_output_indices: PkIndicesRef<'_>,
316    pk_order: &[OrderType],
317    last_cdc_offset: Option<CdcOffset>,
318) -> StreamExecutorResult<StreamChunk> {
319    let chunk = chunk.compact();
320    mark_cdc_chunk_inner(
321        offset_parse_func,
322        chunk,
323        current_pos,
324        last_cdc_offset,
325        pk_in_output_indices,
326        pk_order,
327    )
328}
329
330/// Mark chunk:
331/// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
332/// corresponding `vnode`, otherwise ignore it.
333/// We implement it by changing the visibility bitmap.
334pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
335    chunk: &StreamChunk,
336    backfill_state: &BackfillState,
337    pk_in_output_indices: PkIndicesRef<'_>,
338    upstream_table: &ReplicatedStateTable<S, SD>,
339    pk_order: &[OrderType],
340) -> StreamExecutorResult<StreamChunk> {
341    let chunk = chunk.clone();
342    let (data, ops) = chunk.into_parts();
343    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
344
345    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
346    let mut unmatched_update_delete = false;
347    let mut visible_update_delete = false;
348    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
349        let pk = row.project(pk_in_output_indices);
350        let vnode = upstream_table.compute_vnode_by_pk(pk);
351        let visible = match backfill_state.get_progress(&vnode)? {
352            // We want to just forward the row, if the vnode has finished backfill.
353            BackfillProgressPerVnode::Completed { .. } => true,
354            // If not started, no need to forward.
355            BackfillProgressPerVnode::NotStarted => false,
356            // If in progress, we need to check row <= current_pos.
357            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
358                cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
359            }
360        };
361        if !visible {
362            tracing::trace!(
363                source = "upstream",
364                state = "process_barrier",
365                action = "mark_chunk",
366                ?vnode,
367                ?op,
368                ?pk,
369                ?row,
370                "update_filtered",
371            );
372        }
373        new_visibility.append(visible);
374
375        normalize_unmatched_updates(
376            &mut new_ops,
377            &mut unmatched_update_delete,
378            &mut visible_update_delete,
379            visible,
380            i,
381            op,
382        );
383    }
384    let (columns, _) = data.into_parts();
385    let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
386    Ok(chunk)
387}
388
389/// Mark chunk:
390/// For each row of the chunk, forward it to downstream if its pk <= `current_pos`, otherwise
391/// ignore it. We implement it by changing the visibility bitmap.
392fn mark_chunk_inner(
393    chunk: StreamChunk,
394    current_pos: &OwnedRow,
395    pk_in_output_indices: PkIndicesRef<'_>,
396    pk_order: &[OrderType],
397) -> StreamChunk {
398    let (data, ops) = chunk.into_parts();
399    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
400    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
401    let mut unmatched_update_delete = false;
402    let mut visible_update_delete = false;
403    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
404        let lhs = row.project(pk_in_output_indices);
405        let rhs = current_pos;
406        let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
407        new_visibility.append(visible);
408
409        normalize_unmatched_updates(
410            &mut new_ops,
411            &mut unmatched_update_delete,
412            &mut visible_update_delete,
413            visible,
414            i,
415            op,
416        );
417    }
418    let (columns, _) = data.into_parts();
419    StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
420}
421
422/// We will rewrite unmatched U-/U+ into +/- ops.
423/// They can be unmatched because while they will always have the same stream key,
424/// their storage pk might be different. Here we use storage pk (`current_pos`) to filter them,
425/// as such, a U+ might be filtered out, but their corresponding U- could be kept, and vice versa.
426///
427/// This hanging U-/U+ can lead to issues downstream, since we work with an assumption in the
428/// system that there's never hanging U-/U+.
429fn normalize_unmatched_updates(
430    normalized_ops: &mut Cow<'_, [Op]>,
431    unmatched_update_delete: &mut bool,
432    visible_update_delete: &mut bool,
433    current_visibility: bool,
434    current_op_index: usize,
435    current_op: &Op,
436) {
437    if *unmatched_update_delete {
438        assert_eq!(*current_op, Op::UpdateInsert);
439        let visible_update_insert = current_visibility;
440        match (visible_update_delete, visible_update_insert) {
441            (true, false) => {
442                // Lazily clone the ops here.
443                let ops = normalized_ops.to_mut();
444                ops[current_op_index - 1] = Op::Delete;
445            }
446            (false, true) => {
447                // Lazily clone the ops here.
448                let ops = normalized_ops.to_mut();
449                ops[current_op_index] = Op::Insert;
450            }
451            (true, true) | (false, false) => {}
452        }
453        *unmatched_update_delete = false;
454    } else {
455        match current_op {
456            Op::UpdateDelete => {
457                *unmatched_update_delete = true;
458                *visible_update_delete = current_visibility;
459            }
460            Op::UpdateInsert => {
461                unreachable!("UpdateInsert should not be present without UpdateDelete")
462            }
463            _ => {}
464        }
465    }
466}
467
468fn mark_cdc_chunk_inner(
469    offset_parse_func: &CdcOffsetParseFunc,
470    chunk: StreamChunk,
471    current_pos: &OwnedRow,
472    last_cdc_offset: Option<CdcOffset>,
473    pk_in_output_indices: PkIndicesRef<'_>,
474    pk_order: &[OrderType],
475) -> StreamExecutorResult<StreamChunk> {
476    let (data, ops) = chunk.into_parts();
477    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
478
479    // `_rw_offset` must be placed at the last column right now
480    let offset_col_idx = data.dimension() - 1;
481    for v in data.rows().map(|row| {
482        let offset_datum = row.datum_at(offset_col_idx).unwrap();
483        let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
484        let visible = {
485            // filter changelog events with binlog range
486            let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
487                binlog_low <= &event_offset
488            } else {
489                true
490            };
491
492            if in_binlog_range {
493                let lhs = row.project(pk_in_output_indices);
494                let rhs = current_pos;
495                cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
496            } else {
497                false
498            }
499        };
500        Ok::<_, ConnectorError>(visible)
501    }) {
502        new_visibility.append(v?);
503    }
504
505    let (columns, _) = data.into_parts();
506    Ok(StreamChunk::with_visibility(
507        ops,
508        columns,
509        new_visibility.finish(),
510    ))
511}
512
513/// Builds a new stream chunk with `output_indices`.
514pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
515    let (ops, columns, visibility) = chunk.into_inner();
516    let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
517    StreamChunk::with_visibility(ops, mapped_columns, visibility)
518}
519
/// Maps `watermark` onto `upstream_indices` by delegating to
/// `Watermark::transform_with_indices`; the result is `None` when that call yields none.
fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    watermark.transform_with_indices(upstream_indices)
}
523
524pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
525    match msg {
526        Message::Barrier(_) => Some(msg),
527        Message::Watermark(watermark) => {
528            mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
529        }
530        Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
531    }
532}
533
534/// Recovers progress per vnode, so we know which to backfill.
535/// See how it decodes the state with the inline comments.
536pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
537    state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
538) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
539    debug_assert!(!state_table.vnodes().is_empty());
540    let vnodes = state_table.vnodes().iter_vnodes();
541    let mut result = Vec::with_capacity(state_table.vnodes().len());
542    // 1. Get the vnode keys, so we can get the state per vnode.
543    let vnode_keys = vnodes.map(|vnode| {
544        let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
545        datum
546    });
547    let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
548    // 2. Fetch the state for each vnode.
549    //    It should have the following schema, it should not contain vnode:
550    //    | pk | `backfill_finished` | `row_count` |
551    let state_for_vnodes = try_join_all(tasks).await?;
552    for (vnode, state_for_vnode) in state_table
553        .vnodes()
554        .iter_vnodes()
555        .zip_eq_debug(state_for_vnodes)
556    {
557        let backfill_progress = match state_for_vnode {
558            // There's some state, means there was progress made. It's either finished / in progress.
559            Some(row) => {
560                // 3. Decode the `snapshot_row_count`. Decode from the back, since
561                //    pk is variable length.
562                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
563                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;
564
565                // 4. Decode the `is_finished` flag (whether backfill has finished).
566                //    Decode from the back, since pk is variable length.
567                let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
568                let vnode_is_finished = vnode_is_finished.as_ref().unwrap();
569
570                // 5. Decode the `current_pos`.
571                let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
572                let current_pos = current_pos.into_owned_row();
573
574                // 6. Construct the in-memory state per vnode, based on the decoded state.
575                if *vnode_is_finished.as_bool() {
576                    BackfillStatePerVnode::new(
577                        BackfillProgressPerVnode::Completed {
578                            current_pos: current_pos.clone(),
579                            snapshot_row_count,
580                        },
581                        BackfillProgressPerVnode::Completed {
582                            current_pos,
583                            snapshot_row_count,
584                        },
585                    )
586                } else {
587                    BackfillStatePerVnode::new(
588                        BackfillProgressPerVnode::InProgress {
589                            current_pos: current_pos.clone(),
590                            snapshot_row_count,
591                        },
592                        BackfillProgressPerVnode::InProgress {
593                            current_pos,
594                            snapshot_row_count,
595                        },
596                    )
597                }
598            }
599            // No state, means no progress made.
600            None => BackfillStatePerVnode::new(
601                BackfillProgressPerVnode::NotStarted,
602                BackfillProgressPerVnode::NotStarted,
603            ),
604        };
605        result.push((vnode, backfill_progress));
606    }
607    assert_eq!(result.len(), state_table.vnodes().count_ones());
608    Ok(result)
609}
610
611/// Flush the data
612pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
613    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
614    epoch: EpochPair,
615    old_state: &mut Option<Vec<Datum>>,
616    current_partial_state: &mut [Datum],
617) -> StreamExecutorResult<()> {
618    let vnodes = table.vnodes().clone();
619    if let Some(old_state) = old_state {
620        if old_state[1..] != current_partial_state[1..] {
621            vnodes.iter_vnodes_scalar().for_each(|vnode| {
622                let datum = Some(vnode.into());
623                current_partial_state[0].clone_from(&datum);
624                old_state[0] = datum;
625                table.write_record(Record::Update {
626                    old_row: &old_state[..],
627                    new_row: &(*current_partial_state),
628                })
629            });
630        }
631    } else {
632        // No existing state, create a new entry.
633        vnodes.iter_vnodes_scalar().for_each(|vnode| {
634            let datum = Some(vnode.into());
635            // fill the state
636            current_partial_state[0] = datum;
637            table.write_record(Record::Insert {
638                new_row: &(*current_partial_state),
639            })
640        });
641    }
642    table.commit_assert_no_update_vnode_bitmap(epoch).await
643}
644
645/// We want to avoid allocating a row for every vnode.
646/// Instead we can just modify a single row, and dispatch it to state table to write.
647/// This builds the following segments of the row:
648/// 1. `current_pos`
649/// 2. `backfill_finished`
650/// 3. `row_count`
651pub(crate) fn build_temporary_state(
652    row_state: &mut [Datum],
653    is_finished: bool,
654    current_pos: &OwnedRow,
655    row_count: u64,
656) {
657    row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
658    row_state[current_pos.len() + 1] = Some(is_finished.into());
659    row_state[current_pos.len() + 2] = Some((row_count as i64).into());
660}
661
662/// Update backfill pos by vnode.
663pub(crate) fn update_pos_by_vnode(
664    vnode: VirtualNode,
665    chunk: &StreamChunk,
666    pk_in_output_indices: &[usize],
667    backfill_state: &mut BackfillState,
668    snapshot_row_count_delta: u64,
669) -> StreamExecutorResult<()> {
670    let new_pos = get_new_pos(chunk, pk_in_output_indices);
671    assert_eq!(new_pos.len(), pk_in_output_indices.len());
672    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
673    Ok(())
674}
675
676/// Get new backfill pos from the chunk. Since chunk should have ordered rows, we can just take the
677/// last row.
678pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
679    chunk
680        .rows()
681        .last()
682        .unwrap()
683        .1
684        .project(pk_in_output_indices)
685        .into_owned_row()
686}
687
688pub(crate) fn get_cdc_chunk_last_offset(
689    offset_parse_func: &CdcOffsetParseFunc,
690    chunk: &StreamChunk,
691) -> StreamExecutorResult<Option<CdcOffset>> {
692    let row = chunk.rows().last().unwrap().1;
693    let offset_col = row.iter().last().unwrap();
694    let output =
695        offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
696    output.transpose().map_err(|e| e.into())
697}
698
699// NOTE(kwannoel): ["None" ..] encoding should be appropriate to mark
700// the case where upstream snapshot is empty.
701// This is so we can persist backfill state as "finished".
702// It won't be confused with another case where pk position comprised of nulls,
703// because they both record that backfill is finished.
704pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
705    OwnedRow::new(vec![None; pos_len])
706}
707
708pub(crate) fn compute_bounds(
709    pk_indices: &[usize],
710    current_pos: Option<OwnedRow>,
711) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
712    // `current_pos` is None means it needs to scan from the beginning, so we use Unbounded to
713    // scan. Otherwise, use Excluded.
714    if let Some(current_pos) = current_pos {
715        // If `current_pos` is an empty row which means upstream mv contains only one row and it
716        // has been consumed. The iter interface doesn't support
717        // `Excluded(empty_row)` range bound, so we can simply return `None`.
718        if current_pos.is_empty() {
719            assert!(pk_indices.is_empty());
720            return None;
721        }
722
723        Some((Bound::Excluded(current_pos), Bound::Unbounded))
724    } else {
725        Some((Bound::Unbounded, Bound::Unbounded))
726    }
727}
728
/// Turns a stream of rows into a stream of insert-only `StreamChunk`s.
///
/// Rows are pulled from `iter` and batched into data chunks through `builder`
/// (via `collect_data_chunk_with_builder`). Each data chunk is yielded as a stream chunk
/// in which every row carries `Op::Insert`. Errors from the row stream are converted
/// into `StreamExecutorError` and end the stream.
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
where
    StreamExecutorError: From<E>,
    R: Row,
    S: Stream<Item = Result<R, E>> + Unpin + 'a,
{
    // The loop ends once the collector returns `None`.
    while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
        .instrument_await("backfill_snapshot_read")
        .await?
    {
        debug_assert!(data_chunk.cardinality() > 0);
        // One `Insert` op per slot of the data chunk.
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
        yield stream_chunk;
    }
}
746
747/// Schema
748/// | vnode | pk | `backfill_finished` | `row_count` |
749/// Persists the state per vnode based on `BackfillState`.
750/// We track the current committed state via `committed_progress`
751/// so we know whether we need to persist the state or not.
752///
753/// The state is encoded as follows:
754/// `NotStarted`:
755/// - Not persist to store at all.
756///
757/// `InProgress`:
758/// - Format: | vnode | pk | false | `row_count` |
759/// - If change in current pos: Persist.
760/// - No change in current pos: Do not persist.
761///
762/// Completed
763/// - Format: | vnode | pk | true | `row_count` |
764/// - If previous state is `InProgress` / `NotStarted`: Persist.
765/// - If previous state is Completed: Do not persist.
766///
767/// TODO(kwannoel): we should check committed state to be all `finished` in the tests.
768/// TODO(kwannoel): Instead of persisting state per vnode each time,
769/// we can optimize by persisting state for a subset of vnodes which were updated.
770pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
771    epoch: EpochPair,
772    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
773    backfill_state: &mut BackfillState,
774    #[cfg(debug_assertions)] state_len: usize,
775    vnodes: impl Iterator<Item = VirtualNode>,
776) -> StreamExecutorResult<()> {
777    for vnode in vnodes {
778        if !backfill_state.need_commit(&vnode) {
779            continue;
780        }
781        let (encoded_prev_state, encoded_current_state) =
782            match backfill_state.get_commit_state(&vnode) {
783                Some((old_state, new_state)) => (old_state, new_state),
784                None => continue,
785            };
786        if let Some(encoded_prev_state) = encoded_prev_state {
787            // There's some progress, update the state.
788            #[cfg(debug_assertions)]
789            {
790                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
791                // old_row only contains the value segment.
792                let old_row = table.get_row(pk).await?;
793                match old_row {
794                    Some(old_row) => {
795                        let inner = old_row.as_inner();
796                        // value segment (without vnode) should be used for comparison
797                        assert_eq!(inner, &encoded_prev_state[1..]);
798                        assert_ne!(inner, &encoded_current_state[1..]);
799                        assert_eq!(old_row.len(), state_len - 1);
800                        assert_eq!(encoded_current_state.len(), state_len);
801                    }
802                    None => {
803                        bail!("row {:#?} not found", pk);
804                    }
805                }
806            }
807            table.write_record(Record::Update {
808                old_row: &encoded_prev_state[..],
809                new_row: &encoded_current_state[..],
810            });
811        } else {
812            // No existing state, create a new entry.
813            #[cfg(debug_assertions)]
814            {
815                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
816                let row = table.get_row(pk).await?;
817                assert!(row.is_none(), "row {:#?}", row);
818                assert_eq!(encoded_current_state.len(), state_len);
819            }
820            table.write_record(Record::Insert {
821                new_row: &encoded_current_state[..],
822            });
823        }
824        backfill_state.mark_committed(vnode);
825    }
826
827    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
828    Ok(())
829}
830
831/// Schema
832/// | vnode | pk | `backfill_finished` | `row_count` |
833///
834/// For `current_pos` and `old_pos` are just pk of upstream.
835/// They should be strictly increasing.
836pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
837    epoch: EpochPair,
838    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
839    is_finished: bool,
840    current_pos: &Option<OwnedRow>,
841    row_count: u64,
842    old_state: &mut Option<Vec<Datum>>,
843    current_state: &mut [Datum],
844) -> StreamExecutorResult<()> {
845    if let Some(current_pos_inner) = current_pos {
846        // state w/o vnodes.
847        build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
848        flush_data(table, epoch, old_state, current_state).await?;
849        *old_state = Some(current_state.into());
850    } else {
851        table.commit_assert_no_update_vnode_bitmap(epoch).await?;
852    }
853    Ok(())
854}
855
856/// Creates a data chunk builder for snapshot read.
857/// If the `rate_limit` is smaller than `chunk_size`, it will take precedence.
858/// This is so we can partition snapshot read into smaller chunks than chunk size.
859pub fn create_builder(
860    rate_limit: RateLimit,
861    chunk_size: usize,
862    data_types: Vec<DataType>,
863) -> DataChunkBuilder {
864    let batch_size = match rate_limit {
865        RateLimit::Disabled | RateLimit::Pause => chunk_size,
866        RateLimit::Fixed(limit) if limit.get() as usize >= chunk_size => chunk_size,
867        RateLimit::Fixed(limit) => limit.get() as usize,
868    };
869    DataChunkBuilder::new(data_types, batch_size)
870}
871
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `normalize_unmatched_updates` on a borrowed copy of `ops` for index `1`
    /// with current op `Op::UpdateInsert`, starting from an unmatched update-delete,
    /// and returns the resulting op sequence.
    fn normalize_at_index_one(
        ops: &[Op],
        mut visible_update_delete: bool,
        current_visibility: bool,
    ) -> Vec<Op> {
        let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops);
        let mut unmatched_update_delete = true;
        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            current_visibility,
            1,
            &Op::UpdateInsert,
        );
        new_ops.into_owned()
    }

    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops = [
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ];

        // Update-delete visible, current row visible: ops are left untouched.
        assert_eq!(
            normalize_at_index_one(&ops, true, true),
            vec![
                Op::UpdateDelete,
                Op::UpdateInsert,
                Op::UpdateDelete,
                Op::UpdateInsert
            ]
        );

        // Update-delete invisible, current row invisible: ops are left untouched.
        assert_eq!(
            normalize_at_index_one(&ops, false, false),
            vec![
                Op::UpdateDelete,
                Op::UpdateInsert,
                Op::UpdateDelete,
                Op::UpdateInsert
            ]
        );

        // Update-delete visible, current row invisible: the op at index 0 becomes `Delete`.
        assert_eq!(
            normalize_at_index_one(&ops, true, false),
            vec![
                Op::Delete,
                Op::UpdateInsert,
                Op::UpdateDelete,
                Op::UpdateInsert
            ]
        );

        // Update-delete invisible, current row visible: the op at index 1 becomes `Insert`.
        assert_eq!(
            normalize_at_index_one(&ops, false, true),
            vec![
                Op::UpdateDelete,
                Op::Insert,
                Op::UpdateDelete,
                Op::UpdateInsert
            ]
        );
    }
}