risingwave_stream/executor/backfill/
utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::cmp::{max, min};
17use std::collections::HashMap;
18use std::ops::Bound;
19
20use await_tree::InstrumentAwait;
21use futures::Stream;
22use futures::future::try_join_all;
23use futures_async_stream::try_stream;
24use risingwave_common::array::stream_record::Record;
25use risingwave_common::array::{DataChunk, Op, StreamChunk};
26use risingwave_common::bail;
27use risingwave_common::bitmap::BitmapBuilder;
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, Row, RowExt};
30use risingwave_common::types::{DataType, Datum};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::epoch::EpochPair;
33use risingwave_common::util::iter_util::ZipEqDebug;
34use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
35use risingwave_common::util::value_encoding::BasicSerde;
36use risingwave_common_rate_limit::RateLimit;
37use risingwave_connector::error::ConnectorError;
38use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
39use risingwave_storage::StateStore;
40use risingwave_storage::row_serde::value_serde::ValueRowSerde;
41use risingwave_storage::table::collect_data_chunk_with_builder;
42
43use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
44use crate::executor::{Message, StreamExecutorError, StreamExecutorResult, Watermark};
45
/// Number of metadata columns in a persisted backfill state row, in addition to the pk
/// position: `vnode`, `is_finished` and `row_count`, one column each.
pub const METADATA_STATE_LEN: usize = 3;
48
/// Per-vnode backfill progress, keyed by vnode.
///
/// Each entry tracks both the committed and the current (not yet committed) progress of
/// that vnode; see `BackfillStatePerVnode`.
#[derive(Clone, Debug)]
pub struct BackfillState {
    /// Used to track backfill progress.
    // TODO: Instead of using hashmap, perhaps we can just use static array.
    inner: HashMap<VirtualNode, BackfillStatePerVnode>,
}
55
56impl BackfillState {
57    pub(crate) fn has_progress(&self) -> bool {
58        self.inner.values().any(|p| {
59            matches!(
60                p.current_state(),
61                &BackfillProgressPerVnode::InProgress { .. }
62            )
63        })
64    }
65
66    pub(crate) fn get_current_state(
67        &mut self,
68        vnode: &VirtualNode,
69    ) -> &mut BackfillProgressPerVnode {
70        &mut self.inner.get_mut(vnode).unwrap().current_state
71    }
72
73    // Expects the vnode to always have progress, otherwise it will return an error.
74    pub(crate) fn get_progress(
75        &self,
76        vnode: &VirtualNode,
77    ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
78        match self.inner.get(vnode) {
79            Some(p) => Ok(p.current_state()),
80            None => bail!(
81                "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
82                vnode,
83            ),
84        }
85    }
86
87    pub(crate) fn update_progress(
88        &mut self,
89        vnode: VirtualNode,
90        new_pos: OwnedRow,
91        snapshot_row_count_delta: u64,
92    ) -> StreamExecutorResult<()> {
93        let state = self.get_current_state(&vnode);
94        match state {
95            BackfillProgressPerVnode::NotStarted => {
96                *state = BackfillProgressPerVnode::InProgress {
97                    current_pos: new_pos,
98                    snapshot_row_count: snapshot_row_count_delta,
99                };
100            }
101            BackfillProgressPerVnode::InProgress {
102                snapshot_row_count, ..
103            } => {
104                *state = BackfillProgressPerVnode::InProgress {
105                    current_pos: new_pos,
106                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
107                };
108            }
109            BackfillProgressPerVnode::Completed { .. } => unreachable!(),
110        }
111        Ok(())
112    }
113
114    pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
115        let finished_placeholder_position = construct_initial_finished_state(pos_len);
116        let current_state = self.get_current_state(&vnode);
117        let (new_pos, snapshot_row_count) = match current_state {
118            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
119            BackfillProgressPerVnode::InProgress {
120                current_pos,
121                snapshot_row_count,
122            } => (current_pos.clone(), *snapshot_row_count),
123            BackfillProgressPerVnode::Completed { .. } => {
124                return;
125            }
126        };
127        *current_state = BackfillProgressPerVnode::Completed {
128            current_pos: new_pos,
129            snapshot_row_count,
130        };
131    }
132
133    /// Return state to be committed.
134    fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
135        let new_state = self.inner.get(vnode).unwrap().current_state().clone();
136        let new_encoded_state = match new_state {
137            BackfillProgressPerVnode::NotStarted => unreachable!(),
138            BackfillProgressPerVnode::InProgress {
139                current_pos,
140                snapshot_row_count,
141            } => {
142                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
143                encoded_state[0] = Some(vnode.to_scalar().into());
144                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
145                encoded_state[current_pos.len() + 1] = Some(false.into());
146                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
147                encoded_state
148            }
149            BackfillProgressPerVnode::Completed {
150                current_pos,
151                snapshot_row_count,
152            } => {
153                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
154                encoded_state[0] = Some(vnode.to_scalar().into());
155                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
156                encoded_state[current_pos.len() + 1] = Some(true.into());
157                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
158                encoded_state
159            }
160        };
161        let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
162        let old_encoded_state = match old_state {
163            BackfillProgressPerVnode::NotStarted => None,
164            BackfillProgressPerVnode::InProgress {
165                current_pos,
166                snapshot_row_count,
167            } => {
168                let committed_pos = current_pos;
169                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
170                encoded_state[0] = Some(vnode.to_scalar().into());
171                encoded_state[1..committed_pos.len() + 1]
172                    .clone_from_slice(committed_pos.as_inner());
173                encoded_state[committed_pos.len() + 1] = Some(false.into());
174                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
175                Some(encoded_state)
176            }
177            BackfillProgressPerVnode::Completed {
178                current_pos,
179                snapshot_row_count,
180            } => {
181                let committed_pos = current_pos;
182                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
183                encoded_state[0] = Some(vnode.to_scalar().into());
184                encoded_state[1..committed_pos.len() + 1]
185                    .clone_from_slice(committed_pos.as_inner());
186                encoded_state[committed_pos.len() + 1] = Some(true.into());
187                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
188                Some(encoded_state)
189            }
190        };
191        Some((old_encoded_state, new_encoded_state))
192    }
193
194    // TODO: We can add a committed flag to speed up this check.
195    /// Checks if the state needs to be committed.
196    fn need_commit(&self, vnode: &VirtualNode) -> bool {
197        let state = self.inner.get(vnode).unwrap();
198        match state.current_state() {
199            // If current state and committed state are the same, we don't need to commit.
200            s @ BackfillProgressPerVnode::InProgress { .. }
201            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
202            BackfillProgressPerVnode::NotStarted => false,
203        }
204    }
205
206    fn mark_committed(&mut self, vnode: VirtualNode) {
207        let BackfillStatePerVnode {
208            committed_state,
209            current_state,
210        } = self.inner.get_mut(&vnode).unwrap();
211
212        assert!(matches!(
213            current_state,
214            BackfillProgressPerVnode::InProgress { .. }
215                | BackfillProgressPerVnode::Completed { .. }
216        ));
217        *committed_state = current_state.clone();
218    }
219
220    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
221        self.inner
222            .values()
223            .map(|p| p.get_snapshot_row_count())
224            .sum()
225    }
226}
227
228#[derive(Clone, Debug, PartialEq, Eq)]
229pub struct BackfillStatePerVnode {
230    committed_state: BackfillProgressPerVnode,
231    current_state: BackfillProgressPerVnode,
232}
233
234impl BackfillStatePerVnode {
235    pub(crate) fn new(
236        committed_state: BackfillProgressPerVnode,
237        current_state: BackfillProgressPerVnode,
238    ) -> Self {
239        Self {
240            committed_state,
241            current_state,
242        }
243    }
244
245    pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
246        &self.committed_state
247    }
248
249    pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
250        &self.current_state
251    }
252
253    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
254        self.current_state().get_snapshot_row_count()
255    }
256}
257
258impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
259    fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
260        Self {
261            inner: v.into_iter().collect(),
262        }
263    }
264}
265
266/// Used for tracking backfill state per vnode
267/// The `OwnedRow` only contains the pk of upstream, to track `current_pos`.
268#[derive(Clone, Eq, PartialEq, Debug)]
269pub enum BackfillProgressPerVnode {
270    /// no entry exists for a vnode, or on initialization of the executor.
271    NotStarted,
272    InProgress {
273        /// The current snapshot offset
274        current_pos: OwnedRow,
275        /// Number of snapshot records read for this vnode.
276        snapshot_row_count: u64,
277    },
278    Completed {
279        /// The current snapshot offset
280        current_pos: OwnedRow,
281        /// Number of snapshot records read for this vnode.
282        snapshot_row_count: u64,
283    },
284}
285
286impl BackfillProgressPerVnode {
287    fn get_snapshot_row_count(&self) -> u64 {
288        match self {
289            BackfillProgressPerVnode::NotStarted => 0,
290            BackfillProgressPerVnode::InProgress {
291                snapshot_row_count, ..
292            }
293            | BackfillProgressPerVnode::Completed {
294                snapshot_row_count, ..
295            } => *snapshot_row_count,
296        }
297    }
298}
299
300pub(crate) fn mark_chunk(
301    chunk: StreamChunk,
302    current_pos: &OwnedRow,
303    pk_in_output_indices: &[usize],
304    pk_order: &[OrderType],
305) -> StreamChunk {
306    let chunk = chunk.compact_vis();
307    mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
308}
309
310pub(crate) fn mark_cdc_chunk(
311    offset_parse_func: &CdcOffsetParseFunc,
312    chunk: StreamChunk,
313    current_pos: &OwnedRow,
314    pk_in_output_indices: &[usize],
315    pk_order: &[OrderType],
316    last_cdc_offset: Option<CdcOffset>,
317) -> StreamExecutorResult<StreamChunk> {
318    let chunk = chunk.compact_vis();
319    mark_cdc_chunk_inner(
320        offset_parse_func,
321        chunk,
322        current_pos,
323        last_cdc_offset,
324        pk_in_output_indices,
325        pk_order,
326    )
327}
328
329/// Mark chunk:
330/// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
331/// corresponding `vnode`, otherwise ignore it.
332/// We implement it by changing the visibility bitmap.
333pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
334    chunk: &StreamChunk,
335    backfill_state: &BackfillState,
336    pk_in_output_indices: &[usize],
337    upstream_table: &ReplicatedStateTable<S, SD>,
338    pk_order: &[OrderType],
339) -> StreamExecutorResult<StreamChunk> {
340    let chunk = chunk.clone();
341    let (data, ops) = chunk.into_parts();
342    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
343
344    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
345    let mut unmatched_update_delete = false;
346    let mut visible_update_delete = false;
347    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
348        let pk = row.project(pk_in_output_indices);
349        let vnode = upstream_table.compute_vnode_by_pk(pk);
350        let visible = match backfill_state.get_progress(&vnode)? {
351            // We want to just forward the row, if the vnode has finished backfill.
352            BackfillProgressPerVnode::Completed { .. } => true,
353            // If not started, no need to forward.
354            BackfillProgressPerVnode::NotStarted => false,
355            // If in progress, we need to check row <= current_pos.
356            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
357                cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
358            }
359        };
360        if !visible {
361            tracing::trace!(
362                source = "upstream",
363                state = "process_barrier",
364                action = "mark_chunk",
365                ?vnode,
366                ?op,
367                ?pk,
368                ?row,
369                "update_filtered",
370            );
371        }
372        new_visibility.append(visible);
373
374        normalize_unmatched_updates(
375            &mut new_ops,
376            &mut unmatched_update_delete,
377            &mut visible_update_delete,
378            visible,
379            i,
380            op,
381        );
382    }
383    let (columns, _) = data.into_parts();
384    let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
385    Ok(chunk)
386}
387
388/// Mark chunk:
389/// For each row of the chunk, forward it to downstream if its pk <= `current_pos`, otherwise
390/// ignore it. We implement it by changing the visibility bitmap.
391fn mark_chunk_inner(
392    chunk: StreamChunk,
393    current_pos: &OwnedRow,
394    pk_in_output_indices: &[usize],
395    pk_order: &[OrderType],
396) -> StreamChunk {
397    let (data, ops) = chunk.into_parts();
398    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
399    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
400    let mut unmatched_update_delete = false;
401    let mut visible_update_delete = false;
402    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
403        let lhs = row.project(pk_in_output_indices);
404        let rhs = current_pos;
405        let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
406        new_visibility.append(visible);
407
408        normalize_unmatched_updates(
409            &mut new_ops,
410            &mut unmatched_update_delete,
411            &mut visible_update_delete,
412            visible,
413            i,
414            op,
415        );
416    }
417    let (columns, _) = data.into_parts();
418    StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
419}
420
421/// We will rewrite unmatched U-/U+ into +/- ops.
422/// They can be unmatched because while they will always have the same stream key,
423/// their storage pk might be different. Here we use storage pk (`current_pos`) to filter them,
424/// as such, a U+ might be filtered out, but their corresponding U- could be kept, and vice versa.
425///
426/// This hanging U-/U+ can lead to issues downstream, since we work with an assumption in the
427/// system that there's never hanging U-/U+.
428fn normalize_unmatched_updates(
429    normalized_ops: &mut Cow<'_, [Op]>,
430    unmatched_update_delete: &mut bool,
431    visible_update_delete: &mut bool,
432    current_visibility: bool,
433    current_op_index: usize,
434    current_op: &Op,
435) {
436    if *unmatched_update_delete {
437        assert_eq!(*current_op, Op::UpdateInsert);
438        let visible_update_insert = current_visibility;
439        match (visible_update_delete, visible_update_insert) {
440            (true, false) => {
441                // Lazily clone the ops here.
442                let ops = normalized_ops.to_mut();
443                ops[current_op_index - 1] = Op::Delete;
444            }
445            (false, true) => {
446                // Lazily clone the ops here.
447                let ops = normalized_ops.to_mut();
448                ops[current_op_index] = Op::Insert;
449            }
450            (true, true) | (false, false) => {}
451        }
452        *unmatched_update_delete = false;
453    } else {
454        match current_op {
455            Op::UpdateDelete => {
456                *unmatched_update_delete = true;
457                *visible_update_delete = current_visibility;
458            }
459            Op::UpdateInsert => {
460                unreachable!("UpdateInsert should not be present without UpdateDelete")
461            }
462            _ => {}
463        }
464    }
465}
466
467fn mark_cdc_chunk_inner(
468    offset_parse_func: &CdcOffsetParseFunc,
469    chunk: StreamChunk,
470    current_pos: &OwnedRow,
471    last_cdc_offset: Option<CdcOffset>,
472    pk_in_output_indices: &[usize],
473    pk_order: &[OrderType],
474) -> StreamExecutorResult<StreamChunk> {
475    let (data, ops) = chunk.into_parts();
476    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
477
478    // `_rw_offset` must be placed at the last column right now
479    let offset_col_idx = data.dimension() - 1;
480    for v in data.rows().map(|row| {
481        let offset_datum = row.datum_at(offset_col_idx).unwrap();
482        let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
483        let visible = {
484            // filter changelog events with binlog range
485            let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
486                binlog_low <= &event_offset
487            } else {
488                true
489            };
490
491            if in_binlog_range {
492                let lhs = row.project(pk_in_output_indices);
493                let rhs = current_pos;
494                cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
495            } else {
496                false
497            }
498        };
499        Ok::<_, ConnectorError>(visible)
500    }) {
501        new_visibility.append(v?);
502    }
503
504    let (columns, _) = data.into_parts();
505    Ok(StreamChunk::with_visibility(
506        ops,
507        columns,
508        new_visibility.finish(),
509    ))
510}
511
512/// Builds a new stream chunk with `output_indices`.
513pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
514    let (ops, columns, visibility) = chunk.into_inner();
515    let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
516    StreamChunk::with_visibility(ops, mapped_columns, visibility)
517}
518
519fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
520    watermark.transform_with_indices(upstream_indices)
521}
522
523pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
524    match msg {
525        Message::Barrier(_) => Some(msg),
526        Message::Watermark(watermark) => {
527            mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
528        }
529        Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
530    }
531}
532
/// Returns `true` if `lhs` and `rhs` name the same set of columns, ignoring order.
///
/// The comparison is a true set comparison: every index of each side must appear in the
/// other. A length check plus one-directional containment is not enough, because it
/// would treat `[0, 0]` and `[0, 1]` as equal. That would wrongly disable
/// `UpstreamStreamKeyUpdateNormalizer` for keys that actually differ.
fn same_key_columns(lhs: &[usize], rhs: &[usize]) -> bool {
    lhs.iter().all(|idx| rhs.contains(idx)) && rhs.iter().all(|idx| lhs.contains(idx))
}
536
537/// Rewrites upstream updates when the input stream key is not the same column
538/// set as the current executor stream key.
539///
540/// If an upstream update keeps the input stream key unchanged but changes the
541/// current executor stream key, downstream state keyed by the current stream key
542/// must see it as a delete followed by an insert.
543pub(super) struct UpstreamStreamKeyUpdateNormalizer {
544    current_stream_key_indices: Option<Vec<usize>>,
545}
546
547impl UpstreamStreamKeyUpdateNormalizer {
548    /// Creates a normalizer for chunks with the given schema.
549    ///
550    /// `input_stream_key_indices` are the stream-key column indices of the input
551    /// executor in the incoming chunk schema.
552    ///
553    /// `current_stream_key_indices` are the stream-key column indices of the
554    /// current executor in the same incoming chunk schema.
555    ///
556    /// For example, if an upstream MV has input stream key `[k]` but the current
557    /// executor stream key is `[k, ts]`, then an update from
558    /// `(k = 1, ts = 10)` to `(k = 1, ts = 20)` is rewritten as
559    /// `Delete(k = 1, ts = 10)` plus `Insert(k = 1, ts = 20)`. If the two stream
560    /// keys are the same column set, or if an update does not change current
561    /// stream-key values, it is left unchanged.
562    pub(super) fn new(
563        input_stream_key_indices: &[usize],
564        current_stream_key_indices: Vec<usize>,
565    ) -> Self {
566        let current_stream_key_indices =
567            (!same_key_columns(input_stream_key_indices, &current_stream_key_indices))
568                .then_some(current_stream_key_indices);
569        Self {
570            current_stream_key_indices,
571        }
572    }
573
574    pub(super) fn normalize_chunk(&self, chunk: StreamChunk) -> Option<StreamChunk> {
575        if let Some(current_stream_key_indices) = &self.current_stream_key_indices {
576            normalize_update_chunk_by_key(chunk, current_stream_key_indices)
577        } else {
578            Some(chunk)
579        }
580    }
581
582    pub(super) fn normalize_message(&self, msg: Message) -> Option<Message> {
583        match msg {
584            Message::Chunk(chunk) => self.normalize_chunk(chunk).map(Message::Chunk),
585            msg => Some(msg),
586        }
587    }
588}
589
590fn normalize_update_chunk_by_key(chunk: StreamChunk, key_indices: &[usize]) -> Option<StreamChunk> {
591    let (data_chunk, ops) = chunk.into_parts();
592    let mut update_indices = vec![];
593    let mut row_idx = data_chunk.next_visible_row_idx(0);
594    while let Some(idx) = row_idx {
595        let row = data_chunk.row_at_unchecked_vis(idx);
596        match ops[idx] {
597            Op::UpdateDelete => {
598                let next_idx = data_chunk
599                    .next_visible_row_idx(idx + 1)
600                    .unwrap_or_else(|| panic!("expect a U+ after U-\nU- row: {}", row.display()));
601                let next_row = data_chunk.row_at_unchecked_vis(next_idx);
602                debug_assert_eq!(
603                    ops[next_idx],
604                    Op::UpdateInsert,
605                    "expect a U+ after U-\nU- row: {}\nrow after U-: {}",
606                    row.display(),
607                    next_row.display()
608                );
609                if row.project(key_indices) != next_row.project(key_indices) {
610                    update_indices.push((idx, next_idx));
611                }
612                row_idx = data_chunk.next_visible_row_idx(next_idx + 1);
613            }
614            Op::UpdateInsert => panic!("expect a U- before U+\nU+ row: {}", row.display()),
615            Op::Insert | Op::Delete => {
616                row_idx = data_chunk.next_visible_row_idx(idx + 1);
617            }
618        }
619    }
620
621    if update_indices.is_empty() {
622        return Some(StreamChunk::from_parts(ops, data_chunk));
623    }
624
625    let (columns, visibility) = data_chunk.into_parts();
626    let mut ops = ops.to_vec();
627    for (delete_idx, insert_idx) in update_indices {
628        ops[delete_idx] = Op::Delete;
629        ops[insert_idx] = Op::Insert;
630    }
631    Some(StreamChunk::from_parts(
632        ops,
633        DataChunk::new(columns, visibility),
634    ))
635}
636
637/// Recovers progress per vnode, so we know which to backfill.
638/// See how it decodes the state with the inline comments.
639pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
640    state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
641) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
642    debug_assert!(!state_table.vnodes().is_empty());
643    let vnodes = state_table.vnodes().iter_vnodes();
644    let mut result = Vec::with_capacity(state_table.vnodes().len());
645    // 1. Get the vnode keys, so we can get the state per vnode.
646    let vnode_keys = vnodes.map(|vnode| {
647        let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
648        datum
649    });
650    let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
651    // 2. Fetch the state for each vnode.
652    //    It should have the following schema, it should not contain vnode:
653    //    | pk | `backfill_finished` | `row_count` |
654    let state_for_vnodes = try_join_all(tasks).await?;
655    for (vnode, state_for_vnode) in state_table
656        .vnodes()
657        .iter_vnodes()
658        .zip_eq_debug(state_for_vnodes)
659    {
660        let backfill_progress = match state_for_vnode {
661            // There's some state, means there was progress made. It's either finished / in progress.
662            Some(row) => {
663                // 3. Decode the `snapshot_row_count`. Decode from the back, since
664                //    pk is variable length.
665                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
666                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;
667
668                // 4. Decode the `is_finished` flag (whether backfill has finished).
669                //    Decode from the back, since pk is variable length.
670                let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
671                let vnode_is_finished = vnode_is_finished.as_ref().unwrap();
672
673                // 5. Decode the `current_pos`.
674                let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
675                let current_pos = current_pos.into_owned_row();
676
677                // 6. Construct the in-memory state per vnode, based on the decoded state.
678                if *vnode_is_finished.as_bool() {
679                    BackfillStatePerVnode::new(
680                        BackfillProgressPerVnode::Completed {
681                            current_pos: current_pos.clone(),
682                            snapshot_row_count,
683                        },
684                        BackfillProgressPerVnode::Completed {
685                            current_pos,
686                            snapshot_row_count,
687                        },
688                    )
689                } else {
690                    BackfillStatePerVnode::new(
691                        BackfillProgressPerVnode::InProgress {
692                            current_pos: current_pos.clone(),
693                            snapshot_row_count,
694                        },
695                        BackfillProgressPerVnode::InProgress {
696                            current_pos,
697                            snapshot_row_count,
698                        },
699                    )
700                }
701            }
702            // No state, means no progress made.
703            None => BackfillStatePerVnode::new(
704                BackfillProgressPerVnode::NotStarted,
705                BackfillProgressPerVnode::NotStarted,
706            ),
707        };
708        result.push((vnode, backfill_progress));
709    }
710    assert_eq!(result.len(), state_table.vnodes().count_ones());
711    Ok(result)
712}
713
714/// Flush the data
715pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
716    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
717    epoch: EpochPair,
718    old_state: &mut Option<Vec<Datum>>,
719    current_partial_state: &mut [Datum],
720) -> StreamExecutorResult<()> {
721    let vnodes = table.vnodes().clone();
722    if let Some(old_state) = old_state {
723        if old_state[1..] != current_partial_state[1..] {
724            vnodes.iter_vnodes_scalar().for_each(|vnode| {
725                let datum = Some(vnode.into());
726                current_partial_state[0].clone_from(&datum);
727                old_state[0] = datum;
728                table.write_record(Record::Update {
729                    old_row: &old_state[..],
730                    new_row: &(*current_partial_state),
731                })
732            });
733        }
734    } else {
735        // No existing state, create a new entry.
736        vnodes.iter_vnodes_scalar().for_each(|vnode| {
737            let datum = Some(vnode.into());
738            // fill the state
739            current_partial_state[0] = datum;
740            table.write_record(Record::Insert {
741                new_row: &(*current_partial_state),
742            })
743        });
744    }
745    table.commit_assert_no_update_vnode_bitmap(epoch).await
746}
747
748/// We want to avoid allocating a row for every vnode.
749/// Instead we can just modify a single row, and dispatch it to state table to write.
750/// This builds the following segments of the row:
751/// 1. `current_pos`
752/// 2. `backfill_finished`
753/// 3. `row_count`
754pub(crate) fn build_temporary_state(
755    row_state: &mut [Datum],
756    is_finished: bool,
757    current_pos: &OwnedRow,
758    row_count: u64,
759) {
760    row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
761    row_state[current_pos.len() + 1] = Some(is_finished.into());
762    row_state[current_pos.len() + 2] = Some((row_count as i64).into());
763}
764
765/// Update backfill pos by vnode.
766pub(crate) fn update_pos_by_vnode(
767    vnode: VirtualNode,
768    chunk: &StreamChunk,
769    pk_in_output_indices: &[usize],
770    backfill_state: &mut BackfillState,
771    snapshot_row_count_delta: u64,
772) -> StreamExecutorResult<()> {
773    let new_pos = get_new_pos(chunk, pk_in_output_indices);
774    assert_eq!(new_pos.len(), pk_in_output_indices.len());
775    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
776    Ok(())
777}
778
779/// Get new backfill pos from the chunk. Since chunk should have ordered rows, we can just take the
780/// last row.
781pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
782    chunk
783        .rows()
784        .last()
785        .unwrap()
786        .1
787        .project(pk_in_output_indices)
788        .into_owned_row()
789}
790
791pub(crate) fn get_cdc_chunk_last_offset(
792    offset_parse_func: &CdcOffsetParseFunc,
793    chunk: &StreamChunk,
794) -> StreamExecutorResult<Option<CdcOffset>> {
795    let row = chunk.rows().last().unwrap().1;
796    let offset_col = row.iter().last().unwrap();
797    let output =
798        offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
799    output.transpose().map_err(|e| e.into())
800}
801
802// NOTE(kwannoel): ["None" ..] encoding should be appropriate to mark
803// the case where upstream snapshot is empty.
804// This is so we can persist backfill state as "finished".
805// It won't be confused with another case where pk position comprised of nulls,
806// because they both record that backfill is finished.
807pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
808    OwnedRow::new(vec![None; pos_len])
809}
810
811pub(crate) fn compute_bounds(
812    pk_indices: &[usize],
813    current_pos: Option<OwnedRow>,
814) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
815    // `current_pos` is None means it needs to scan from the beginning, so we use Unbounded to
816    // scan. Otherwise, use Excluded.
817    if let Some(current_pos) = current_pos {
818        // If `current_pos` is an empty row which means upstream mv contains only one row and it
819        // has been consumed. The iter interface doesn't support
820        // `Excluded(empty_row)` range bound, so we can simply return `None`.
821        if current_pos.is_empty() {
822            assert!(pk_indices.is_empty());
823            return None;
824        }
825
826        Some((Bound::Excluded(current_pos), Bound::Unbounded))
827    } else {
828        Some((Bound::Unbounded, Bound::Unbounded))
829    }
830}
831
832#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
833pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
834where
835    StreamExecutorError: From<E>,
836    R: Row,
837    S: Stream<Item = Result<R, E>> + Unpin + 'a,
838{
839    while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
840        .instrument_await("backfill_snapshot_read")
841        .await?
842    {
843        debug_assert!(data_chunk.cardinality() > 0);
844        let ops = vec![Op::Insert; data_chunk.capacity()];
845        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
846        yield stream_chunk;
847    }
848}
849
850/// Schema
851/// | vnode | pk | `backfill_finished` | `row_count` |
852/// Persists the state per vnode based on `BackfillState`.
853/// We track the current committed state via `committed_progress`
854/// so we know whether we need to persist the state or not.
855///
856/// The state is encoded as follows:
857/// `NotStarted`:
858/// - Not persist to store at all.
859///
860/// `InProgress`:
861/// - Format: | vnode | pk | false | `row_count` |
862/// - If change in current pos: Persist.
863/// - No change in current pos: Do not persist.
864///
865/// Completed
866/// - Format: | vnode | pk | true | `row_count` |
867/// - If previous state is `InProgress` / `NotStarted`: Persist.
868/// - If previous state is Completed: Do not persist.
869///
870/// TODO(kwannoel): we should check committed state to be all `finished` in the tests.
871/// TODO(kwannoel): Instead of persisting state per vnode each time,
872/// we can optimize by persisting state for a subset of vnodes which were updated.
873pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
874    epoch: EpochPair,
875    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
876    backfill_state: &mut BackfillState,
877    #[cfg(debug_assertions)] state_len: usize,
878    vnodes: impl Iterator<Item = VirtualNode>,
879) -> StreamExecutorResult<()> {
880    for vnode in vnodes {
881        if !backfill_state.need_commit(&vnode) {
882            continue;
883        }
884        let (encoded_prev_state, encoded_current_state) =
885            match backfill_state.get_commit_state(&vnode) {
886                Some((old_state, new_state)) => (old_state, new_state),
887                None => continue,
888            };
889        if let Some(encoded_prev_state) = encoded_prev_state {
890            // There's some progress, update the state.
891            #[cfg(debug_assertions)]
892            {
893                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
894                // old_row only contains the value segment.
895                let old_row = table.get_row(pk).await?;
896                match old_row {
897                    Some(old_row) => {
898                        let inner = old_row.as_inner();
899                        // value segment (without vnode) should be used for comparison
900                        assert_eq!(inner, &encoded_prev_state[1..]);
901                        assert_ne!(inner, &encoded_current_state[1..]);
902                        assert_eq!(old_row.len(), state_len - 1);
903                        assert_eq!(encoded_current_state.len(), state_len);
904                    }
905                    None => {
906                        bail!("row {:#?} not found", pk);
907                    }
908                }
909            }
910            table.write_record(Record::Update {
911                old_row: &encoded_prev_state[..],
912                new_row: &encoded_current_state[..],
913            });
914        } else {
915            // No existing state, create a new entry.
916            #[cfg(debug_assertions)]
917            {
918                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
919                let row = table.get_row(pk).await?;
920                assert!(row.is_none(), "row {:#?}", row);
921                assert_eq!(encoded_current_state.len(), state_len);
922            }
923            table.write_record(Record::Insert {
924                new_row: &encoded_current_state[..],
925            });
926        }
927        backfill_state.mark_committed(vnode);
928    }
929
930    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
931    Ok(())
932}
933
934/// Schema
935/// | vnode | pk | `backfill_finished` | `row_count` |
936///
937/// For `current_pos` and `old_pos` are just pk of upstream.
938/// They should be strictly increasing.
939pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
940    epoch: EpochPair,
941    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
942    is_finished: bool,
943    current_pos: &Option<OwnedRow>,
944    row_count: u64,
945    old_state: &mut Option<Vec<Datum>>,
946    current_state: &mut [Datum],
947) -> StreamExecutorResult<()> {
948    if let Some(current_pos_inner) = current_pos {
949        // state w/o vnodes.
950        build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
951        flush_data(table, epoch, old_state, current_state).await?;
952        *old_state = Some(current_state.into());
953    } else {
954        table.commit_assert_no_update_vnode_bitmap(epoch).await?;
955    }
956    Ok(())
957}
958
959/// Creates a data chunk builder for snapshot read.
960/// If the `rate_limit` is smaller than `chunk_size`, it will take precedence.
961/// This is so we can partition snapshot read into smaller chunks than chunk size.
962pub fn create_builder(
963    rate_limit: RateLimit,
964    chunk_size: usize,
965    data_types: Vec<DataType>,
966) -> DataChunkBuilder {
967    let batch_size = match rate_limit {
968        RateLimit::Disabled | RateLimit::Pause => chunk_size,
969        RateLimit::Fixed(limit) => min(limit.get() as usize, chunk_size),
970    };
971    // Ensure that the batch size is at least 2, to have enough space for two rows in a single update.
972    let batch_size = max(2, batch_size);
973    DataChunkBuilder::new(data_types, batch_size)
974}
975
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Runs `normalize_unmatched_updates` on `ops` with the given flags and the fixed trailing
    /// arguments (`1`, `&Op::UpdateInsert`), then asserts that the resulting op sequence
    /// equals `expected`.
    fn check_normalized(
        ops: &[Op],
        unmatched_update_delete: bool,
        visible_update_delete: bool,
        current_visibility: bool,
        expected: &[Op],
    ) {
        let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops);
        let mut unmatched_update_delete = unmatched_update_delete;
        let mut visible_update_delete = visible_update_delete;
        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            current_visibility,
            1,
            &Op::UpdateInsert,
        );
        assert_eq!(&new_ops[..], expected);
    }

    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops: Arc<[Op]> = vec![
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ]
        .into();

        // (unmatched_update_delete, visible_update_delete, current_visibility, expected ops)
        let cases = [
            (
                true,
                true,
                true,
                [
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
            (
                true,
                false,
                false,
                [
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
            (
                true,
                true,
                false,
                [
                    Op::Delete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
            (
                true,
                false,
                true,
                [
                    Op::UpdateDelete,
                    Op::Insert,
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                ],
            ),
        ];

        for (unmatched, visible, current_visibility, expected) in cases {
            check_normalized(&ops, unmatched, visible, current_visibility, &expected);
        }
    }
}