risingwave_stream/executor/backfill/snapshot_backfill/state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

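//! Progress state for snapshot backfill. Tracks, per vnode, how far backfill has
//! consumed each upstream epoch (epoch, row count and latest consumed pk), and persists
//! that progress in a state table so that it survives recovery and vnode bitmap changes.
//!
//! A rough sketch of how an executor is expected to drive this state; the surrounding
//! variables are illustrative and not defined in this module:
//!
//! ```ignore
//! // while consuming rows of `epoch` for `vnode`
//! state.update_epoch_progress(vnode, epoch, row_count, latest_pk);
//! // once `epoch` is fully consumed for `vnode`
//! state.finish_epoch(vnode, epoch, row_count);
//! // on each barrier: persist progress, yield the barrier, then finish the post-commit
//! let post_commit = state.commit(barrier.epoch).await?;
//! // ... yield the barrier downstream ...
//! let _new_vnode_bitmap = post_commit.post_yield_barrier(new_vnode_bitmap).await?;
//! ```
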
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::replace;
use std::sync::Arc;

use anyhow::anyhow;
use futures::TryFutureExt;
use futures::future::try_join_all;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::must_match;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::StreamExecutorResult;
use crate::executor::prelude::StateTable;

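/// Backfill progress of a vnode within a single upstream epoch: either still consuming
/// rows up to `latest_pk`, or fully consumed.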
#[derive(Clone, Debug, Eq, PartialEq)]
pub(super) enum EpochBackfillProgress {
    Consuming { latest_pk: OwnedRow },
    Consumed,
}

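/// Latest backfill progress of a single vnode: the upstream epoch being consumed, the
/// number of rows consumed so far, and the progress within that epoch.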
#[derive(Debug, Eq, PartialEq)]
pub(super) struct VnodeBackfillProgress {
    pub(super) epoch: u64,
    pub(super) row_count: usize,
    pub(super) progress: EpochBackfillProgress,
}

/// `vnode`, `epoch`, `row_count`, `is_finished`
const EXTRA_COLUMN_TYPES: [DataType; 4] = [
    DataType::Int16,
    DataType::Int64,
    DataType::Int64,
    DataType::Boolean,
];

impl VnodeBackfillProgress {
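    /// Validate that the progress table schema consists of the extra columns
    /// (`vnode`, `epoch`, `row_count`, `is_finished`) followed by the upstream table's pk columns.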
    fn validate_progress_table_schema(
        progress_table_column_types: &[DataType],
        upstream_pk_column_types: &[DataType],
    ) -> StreamExecutorResult<()> {
        if progress_table_column_types.len()
            != EXTRA_COLUMN_TYPES.len() + upstream_pk_column_types.len()
        {
            return Err(anyhow!(
                "progress table column count does not match the count derived from the upstream table pk. progress table: {:?}, pk: {:?}",
                progress_table_column_types,
                upstream_pk_column_types)
                .into()
            );
        }
        for (expected_type, progress_table_type) in EXTRA_COLUMN_TYPES
            .iter()
            .chain(upstream_pk_column_types.iter())
            .zip_eq_debug(progress_table_column_types.iter())
        {
            if expected_type != progress_table_type {
                return Err(anyhow!(
                    "progress table column type does not match the upstream table schema: progress table: {:?}, pk: {:?}",
                    progress_table_column_types,
                    upstream_pk_column_types)
                    .into()
                );
            }
        }
        Ok(())
    }

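    /// Deserialize a progress row loaded from the state table. The row excludes the
    /// `vnode` key column, so it starts with `epoch`, `row_count` and `is_finished`,
    /// followed by the upstream pk columns (expected to be all `NULL` once finished).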
    pub(super) fn from_row(row: &OwnedRow, pk_serde: &OrderedRowSerde) -> Self {
        assert_eq!(
            row.len(),
            pk_serde.get_data_types().len() + EXTRA_COLUMN_TYPES.len() - 1, /* Pk of the progress state table (i.e. vnode column) not included */
        );
        let epoch = must_match!(&row[0], Some(ScalarImpl::Int64(epoch)) => {
           *epoch as u64
        });
        let row_count = must_match!(&row[1], Some(ScalarImpl::Int64(row_count)) => {
           *row_count as usize
        });
        let is_finished = must_match!(&row[2], Some(ScalarImpl::Bool(is_finished)) => {
           *is_finished
        });
        Self {
            epoch,
            row_count,
            progress: if !is_finished {
                EpochBackfillProgress::Consuming {
                    latest_pk: row.slice(EXTRA_COLUMN_TYPES.len() - 1..).to_owned_row(),
                }
            } else {
                row.slice(EXTRA_COLUMN_TYPES.len() - 1..)
                    .iter()
                    .enumerate()
                    .for_each(|(i, datum)| {
                        if datum.is_some() {
                            if cfg!(debug_assertions) {
                                panic!("get non-empty pk row: {:?}", row);
                            } else {
                                warn!(
                                    i,
                                    row = ?row,
                                    "get non-empty pk row. will be ignored"
                                );
                            }
                        }
                    });
                EpochBackfillProgress::Consumed
            },
        }
    }

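    /// Build the full progress row to write to the state table, with `vnode` as the
    /// leading key column. When the epoch is fully consumed, the pk columns are filled
    /// from the all-`NULL` `consumed_pk_rows` placeholder.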
    fn build_row<'a>(
        &'a self,
        vnode: VirtualNode,
        consumed_pk_rows: &'a OwnedRow,
    ) -> impl Row + 'a {
        let (is_finished, pk) = match &self.progress {
            EpochBackfillProgress::Consuming { latest_pk } => {
                assert_eq!(latest_pk.len(), consumed_pk_rows.len());
                (false, latest_pk)
            }
            EpochBackfillProgress::Consumed => (true, consumed_pk_rows),
        };
        [
            Some(ScalarImpl::Int16(vnode.to_scalar())),
            Some(ScalarImpl::Int64(self.epoch as _)),
            Some(ScalarImpl::Int64(self.row_count as _)),
            Some(ScalarImpl::Bool(is_finished)),
        ]
        .chain(pk)
    }
}

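/// In-memory write state of a vnode's progress row relative to the state table:
/// `New` has never been persisted, `Update` differs from the committed row, and
/// `Committed` matches what is already persisted.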
#[derive(Debug, Eq, PartialEq)]
enum VnodeBackfillState {
    New(VnodeBackfillProgress),
    Update {
        latest: VnodeBackfillProgress,
        committed: VnodeBackfillProgress,
    },
    Committed(VnodeBackfillProgress),
}

impl VnodeBackfillState {
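    /// Replace the latest in-memory progress while preserving any committed progress, so
    /// that `commit` can later emit an insert (for `New`) or an update (for `Update`).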
    fn update_inner(&mut self, latest_progress: VnodeBackfillProgress) {
        let temp_place_holder = Self::temp_placeholder();
        let prev_state = replace(self, temp_place_holder);
        *self = match prev_state {
            VnodeBackfillState::New(_) => VnodeBackfillState::New(latest_progress),
            VnodeBackfillState::Update { committed, .. } => VnodeBackfillState::Update {
                latest: latest_progress,
                committed,
            },
            VnodeBackfillState::Committed(committed) => VnodeBackfillState::Update {
                latest: latest_progress,
                committed,
            },
        };
    }

    fn mark_committed(&mut self) {
        *self = VnodeBackfillState::Committed(match replace(self, Self::temp_placeholder()) {
            VnodeBackfillState::New(progress) => progress,
            VnodeBackfillState::Update { latest, .. } => latest,
            VnodeBackfillState::Committed(progress) => progress,
        });
    }

    fn latest_progress(&self) -> &VnodeBackfillProgress {
        match self {
            VnodeBackfillState::New(progress) => progress,
            VnodeBackfillState::Update { latest, .. } => latest,
            VnodeBackfillState::Committed(progress) => progress,
        }
    }

    fn temp_placeholder() -> Self {
        Self::New(VnodeBackfillProgress {
            epoch: 0,
            row_count: 0,
            progress: EpochBackfillProgress::Consumed,
        })
    }
}

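/// Snapshot backfill progress for all vnodes owned by this executor, persisted in a
/// progress state table keyed by `vnode`.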
pub(super) struct BackfillState<S: StateStore> {
    vnode_state: HashMap<VirtualNode, VnodeBackfillState>,
    pk_serde: OrderedRowSerde,
    consumed_pk_rows: OwnedRow,
    state_table: StateTable<S>,
}

impl<S: StateStore> BackfillState<S> {
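    /// Recover the backfill state: validate the progress table schema against the
    /// upstream pk, initialize the state table epoch, and load the committed progress
    /// row of every owned vnode.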
    pub(super) async fn new(
        mut state_table: StateTable<S>,
        init_epoch: EpochPair,
        pk_serde: OrderedRowSerde,
    ) -> StreamExecutorResult<Self> {
        VnodeBackfillProgress::validate_progress_table_schema(
            state_table.get_data_types(),
            pk_serde.get_data_types(),
        )?;
        state_table.init_epoch(init_epoch).await?;
        let mut vnode_state = HashMap::new();
        let committed_progress_row = Self::load_vnode_progress_row(&state_table).await?;
        for (vnode, progress_row) in committed_progress_row {
            let Some(progress_row) = progress_row else {
                continue;
            };
            let progress = VnodeBackfillProgress::from_row(&progress_row, &pk_serde);
            assert!(
                vnode_state
                    .insert(vnode, VnodeBackfillState::Committed(progress))
                    .is_none()
            );
        }
        let consumed_pk_rows = OwnedRow::new(vec![None; pk_serde.get_data_types().len()]);
        Ok(Self {
            vnode_state,
            pk_serde,
            consumed_pk_rows,
            state_table,
        })
    }

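    /// Point-get the committed progress row of every vnode in the state table's current
    /// vnode bitmap.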
    async fn load_vnode_progress_row(
        state_table: &StateTable<S>,
    ) -> StreamExecutorResult<Vec<(VirtualNode, Option<OwnedRow>)>> {
        let rows = try_join_all(state_table.vnodes().iter_vnodes().map(|vnode| {
            state_table
                .get_row([vnode.to_datum()])
                .map_ok(move |progress_row| (vnode, progress_row))
        }))
        .await?;
        Ok(rows)
    }

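    /// Record new progress for a vnode. No-op updates are ignored, and progress must
    /// never regress: the epoch is non-decreasing, within an epoch the row count is
    /// non-decreasing and (in debug builds) the serialized pk strictly advances, and a
    /// vnode whose epoch is already `Consumed` only accepts progress for a later epoch.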
    fn update_progress(&mut self, vnode: VirtualNode, progress: VnodeBackfillProgress) {
        match self.vnode_state.entry(vnode) {
            Entry::Occupied(entry) => {
                let state = entry.into_mut();
                let prev_progress = state.latest_progress();
                if prev_progress == &progress {
                    // ignore if no update
                    return;
                }
                // sanity check
                {
                    assert!(
                        prev_progress.epoch <= progress.epoch,
                        "progress epoch regressed from {} to {}",
                        prev_progress.epoch,
                        progress.epoch
                    );
                    match &prev_progress.progress {
                        EpochBackfillProgress::Consuming { latest_pk: prev_pk } => {
                            if prev_progress.epoch == progress.epoch
                                && let EpochBackfillProgress::Consuming { latest_pk: pk } =
                                    &progress.progress
                            {
                                assert_eq!(pk.len(), self.pk_serde.get_data_types().len());
                                assert!(prev_progress.row_count <= progress.row_count);
                                if cfg!(debug_assertions) {
                                    let mut prev_buf = vec![];
                                    self.pk_serde.serialize(prev_pk, &mut prev_buf);
                                    let mut buf = vec![];
                                    self.pk_serde.serialize(pk, &mut buf);
                                    assert!(
                                        buf > prev_buf,
                                        "new pk progress: {:?} does not exceed prev pk progress: {:?}",
                                        pk,
                                        prev_pk
                                    );
                                }
                            }
                        }
                        EpochBackfillProgress::Consumed => {
                            assert!(
                                prev_progress.epoch < progress.epoch,
                                "{:?} {:?}",
                                prev_progress,
                                progress
                            );
                        }
                    }
                }
                state.update_inner(progress);
            }
            Entry::Vacant(entry) => {
                entry.insert(VnodeBackfillState::New(progress));
            }
        }
    }

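    /// Record that `row_count` rows of `epoch` have been consumed for `vnode`, with `pk`
    /// as the latest consumed pk.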
    pub(super) fn update_epoch_progress(
        &mut self,
        vnode: VirtualNode,
        epoch: u64,
        row_count: usize,
        pk: OwnedRow,
    ) {
        self.update_progress(
            vnode,
            VnodeBackfillProgress {
                epoch,
                row_count,
                progress: EpochBackfillProgress::Consuming { latest_pk: pk },
            },
        )
    }

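    /// Mark `epoch` as fully consumed for `vnode`, with `row_count` as the final row count.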
    pub(super) fn finish_epoch(&mut self, vnode: VirtualNode, epoch: u64, row_count: usize) {
        self.update_progress(
            vnode,
            VnodeBackfillProgress {
                epoch,
                row_count,
                progress: EpochBackfillProgress::Consumed,
            },
        );
    }

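    /// Iterate over all owned vnodes together with their latest (possibly uncommitted)
    /// progress, or `None` for vnodes that have not reported any progress yet.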
    pub(super) fn latest_progress(
        &self,
    ) -> impl Iterator<Item = (VirtualNode, Option<&VnodeBackfillProgress>)> {
        self.state_table.vnodes().iter_vnodes().map(|vnode| {
            (
                vnode,
                self.vnode_state
                    .get(&vnode)
                    .map(VnodeBackfillState::latest_progress),
            )
        })
    }

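    /// Flush uncommitted progress to the state table (inserts for `New`, updates for
    /// `Update`), commit the state table at `barrier_epoch`, and mark all in-memory
    /// states as committed.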
    pub(super) async fn commit(
        &mut self,
        barrier_epoch: EpochPair,
    ) -> StreamExecutorResult<BackfillStatePostCommit<'_, S>> {
        for (vnode, state) in &self.vnode_state {
            match state {
                VnodeBackfillState::New(progress) => {
                    self.state_table
                        .insert(progress.build_row(*vnode, &self.consumed_pk_rows));
                }
                VnodeBackfillState::Update { latest, committed } => {
                    self.state_table.update(
                        committed.build_row(*vnode, &self.consumed_pk_rows),
                        latest.build_row(*vnode, &self.consumed_pk_rows),
                    );
                }
                VnodeBackfillState::Committed(_) => {}
            }
        }
        let post_commit = self.state_table.commit(barrier_epoch).await?;
        self.vnode_state
            .values_mut()
            .for_each(VnodeBackfillState::mark_committed);
        Ok(BackfillStatePostCommit {
            inner: post_commit,
            vnode_state: &mut self.vnode_state,
            pk_serde: &self.pk_serde,
        })
    }
}

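/// Post-commit handle returned by [`BackfillState::commit`]. `post_yield_barrier` should
/// be called after the barrier has been yielded, so that a vnode bitmap change can be
/// applied and the progress of the newly owned vnodes reloaded from the state table.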
#[must_use]
pub(super) struct BackfillStatePostCommit<'a, S: StateStore> {
    inner: StateTablePostCommit<'a, S>,
    vnode_state: &'a mut HashMap<VirtualNode, VnodeBackfillState>,
    pk_serde: &'a OrderedRowSerde,
}

impl<S: StateStore> BackfillStatePostCommit<'_, S> {
    pub(super) async fn post_yield_barrier(
        self,
        new_vnode_bitmap: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<Option<Arc<Bitmap>>> {
        let new_vnode_bitmap = if let Some(((new_vnode_bitmap, prev_vnode_bitmap, state), _)) =
            self.inner.post_yield_barrier(new_vnode_bitmap).await?
        {
            Self::update_vnode_bitmap(&*state, self.vnode_state, self.pk_serde, prev_vnode_bitmap)
                .await?;
            Some(new_vnode_bitmap)
        } else {
            None
        };
        Ok(new_vnode_bitmap)
    }

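    /// Rebuild the in-memory vnode state from the committed progress rows under the new
    /// vnode bitmap, asserting that the state of vnodes kept from the previous bitmap is
    /// unchanged.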
    async fn update_vnode_bitmap(
        state_table: &StateTable<S>,
        vnode_state: &mut HashMap<VirtualNode, VnodeBackfillState>,
        pk_serde: &OrderedRowSerde,
        prev_vnode_bitmap: Arc<Bitmap>,
    ) -> StreamExecutorResult<()> {
        let committed_progress_rows = BackfillState::load_vnode_progress_row(state_table).await?;
        let mut new_state = HashMap::new();
        for (vnode, progress_row) in committed_progress_rows {
            if let Some(progress_row) = progress_row {
                let progress = VnodeBackfillProgress::from_row(&progress_row, pk_serde);
                assert!(
                    new_state
                        .insert(vnode, VnodeBackfillState::Committed(progress))
                        .is_none()
                );
            }

            if prev_vnode_bitmap.is_set(vnode.to_index()) {
                // if the vnode existed previously, the new state should be the same as the previous one
                assert_eq!(vnode_state.get(&vnode), new_state.get(&vnode));
            }
        }
        *vnode_state = new_state;
        Ok(())
    }
}