risingwave_stream/executor/backfill/snapshot_backfill/
state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use futures::TryFutureExt;
22use futures::future::try_join_all;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
25use risingwave_common::must_match;
26use risingwave_common::row::{OwnedRow, Row, RowExt};
27use risingwave_common::types::{DataType, ScalarImpl};
28use risingwave_common::util::row_serde::OrderedRowSerde;
29
30#[derive(Clone, Debug, Eq, PartialEq)]
31pub(super) enum EpochBackfillProgress {
32    Consuming { latest_pk: OwnedRow },
33    Consumed,
34}
35
36#[derive(Debug, Eq, PartialEq)]
37pub(super) struct VnodeBackfillProgress {
38    pub(super) epoch: u64,
39    pub(super) row_count: usize,
40    pub(super) progress: EpochBackfillProgress,
41}
42
43/// `vnode`, `epoch`, `row_count`, `is_finished`
44const EXTRA_COLUMN_TYPES: [DataType; 4] = [
45    DataType::Int16,
46    DataType::Int64,
47    DataType::Int64,
48    DataType::Boolean,
49];
50
51impl VnodeBackfillProgress {
52    fn validate_progress_table_schema(
53        progress_table_column_types: &[DataType],
54        upstream_pk_column_types: &[DataType],
55    ) -> StreamExecutorResult<()> {
56        if progress_table_column_types.len()
57            != EXTRA_COLUMN_TYPES.len() + upstream_pk_column_types.len()
58        {
59            return Err(anyhow!(
60                "progress table columns len not matched with the len derived from upstream table pk. progress table: {:?}, pk: {:?}",
61                progress_table_column_types,
62                upstream_pk_column_types)
63                .into()
64            );
65        }
66        for (expected_type, progress_table_type) in EXTRA_COLUMN_TYPES
67            .iter()
68            .chain(upstream_pk_column_types.iter())
69            .zip_eq_debug(progress_table_column_types.iter())
70        {
71            if expected_type != progress_table_type {
72                return Err(anyhow!(
73                    "progress table column not matched with upstream table schema: progress table: {:?}, pk: {:?}",
74                    progress_table_column_types,
75                    upstream_pk_column_types)
76                    .into()
77                );
78            }
79        }
80        Ok(())
81    }
82
83    pub(super) fn from_row(row: &OwnedRow, pk_serde: &OrderedRowSerde) -> Self {
84        assert_eq!(
85            row.len(),
86            pk_serde.get_data_types().len() + EXTRA_COLUMN_TYPES.len() - 1, /* Pk of the progress state table (i.e. vnode column) not included */
87        );
88        let epoch = must_match!(&row[0], Some(ScalarImpl::Int64(epoch)) => {
89           *epoch as u64
90        });
91        let row_count = must_match!(&row[1], Some(ScalarImpl::Int64(row_count)) => {
92           *row_count as usize
93        });
94        let is_finished = must_match!(&row[2], Some(ScalarImpl::Bool(is_finished)) => {
95           *is_finished
96        });
97        Self {
98            epoch,
99            row_count,
100            progress: if !is_finished {
101                EpochBackfillProgress::Consuming {
102                    latest_pk: row.slice(EXTRA_COLUMN_TYPES.len() - 1..).to_owned_row(),
103                }
104            } else {
105                row.slice(EXTRA_COLUMN_TYPES.len() - 1..)
106                    .iter()
107                    .enumerate()
108                    .for_each(|(i, datum)| {
109                        if datum.is_some() {
110                            if cfg!(debug_assertions) {
111                                panic!("get non-empty pk row: {:?}", row);
112                            } else {
113                                warn!(
114                                    i,
115                                    row = ?row,
116                                    "get non-empty pk row. will be ignore"
117                                );
118                            }
119                        }
120                    });
121                EpochBackfillProgress::Consumed
122            },
123        }
124    }
125
126    fn build_row<'a>(
127        &'a self,
128        vnode: VirtualNode,
129        consumed_pk_rows: &'a OwnedRow,
130    ) -> impl Row + 'a {
131        let (is_finished, pk) = match &self.progress {
132            EpochBackfillProgress::Consuming { latest_pk } => {
133                assert_eq!(latest_pk.len(), consumed_pk_rows.len());
134                (false, latest_pk)
135            }
136            EpochBackfillProgress::Consumed => (true, consumed_pk_rows),
137        };
138        [
139            Some(ScalarImpl::Int16(vnode.to_scalar())),
140            Some(ScalarImpl::Int64(self.epoch as _)),
141            Some(ScalarImpl::Int64(self.row_count as _)),
142            Some(ScalarImpl::Bool(is_finished)),
143        ]
144        .chain(pk)
145    }
146}
147
148#[derive(Debug, Eq, PartialEq)]
149enum VnodeBackfillState {
150    New(VnodeBackfillProgress),
151    Update {
152        latest: VnodeBackfillProgress,
153        committed: VnodeBackfillProgress,
154    },
155    Committed(VnodeBackfillProgress),
156}
157
158impl VnodeBackfillState {
159    fn update_inner(&mut self, latest_progress: VnodeBackfillProgress) {
160        let temp_place_holder = Self::temp_placeholder();
161        let prev_state = replace(self, temp_place_holder);
162        *self = match prev_state {
163            VnodeBackfillState::New(_) => VnodeBackfillState::New(latest_progress),
164            VnodeBackfillState::Update { committed, .. } => VnodeBackfillState::Update {
165                latest: latest_progress,
166                committed,
167            },
168            VnodeBackfillState::Committed(committed) => VnodeBackfillState::Update {
169                latest: latest_progress,
170                committed,
171            },
172        };
173    }
174
175    fn mark_committed(&mut self) {
176        *self = VnodeBackfillState::Committed(match replace(self, Self::temp_placeholder()) {
177            VnodeBackfillState::New(progress) => progress,
178            VnodeBackfillState::Update { latest, .. } => latest,
179            VnodeBackfillState::Committed(progress) => progress,
180        });
181    }
182
183    fn latest_progress(&self) -> &VnodeBackfillProgress {
184        match self {
185            VnodeBackfillState::New(progress) => progress,
186            VnodeBackfillState::Update { latest, .. } => latest,
187            VnodeBackfillState::Committed(progress) => progress,
188        }
189    }
190
191    fn temp_placeholder() -> Self {
192        Self::New(VnodeBackfillProgress {
193            epoch: 0,
194            row_count: 0,
195            progress: EpochBackfillProgress::Consumed,
196        })
197    }
198}
199
200use risingwave_common::util::epoch::EpochPair;
201use risingwave_common::util::iter_util::ZipEqDebug;
202use risingwave_storage::StateStore;
203
204use crate::common::table::state_table::StateTablePostCommit;
205use crate::executor::StreamExecutorResult;
206use crate::executor::prelude::StateTable;
207
208pub(super) struct BackfillState<S: StateStore> {
209    vnode_state: HashMap<VirtualNode, VnodeBackfillState>,
210    pk_serde: OrderedRowSerde,
211    consumed_pk_rows: OwnedRow,
212    state_table: StateTable<S>,
213}
214
215impl<S: StateStore> BackfillState<S> {
216    pub(super) async fn new(
217        mut state_table: StateTable<S>,
218        init_epoch: EpochPair,
219        pk_serde: OrderedRowSerde,
220    ) -> StreamExecutorResult<Self> {
221        VnodeBackfillProgress::validate_progress_table_schema(
222            state_table.get_data_types(),
223            pk_serde.get_data_types(),
224        )?;
225        state_table.init_epoch(init_epoch).await?;
226        let mut vnode_state = HashMap::new();
227        let committed_progress_row = Self::load_vnode_progress_row(&state_table).await?;
228        for (vnode, progress_row) in committed_progress_row {
229            let Some(progress_row) = progress_row else {
230                continue;
231            };
232            let progress = VnodeBackfillProgress::from_row(&progress_row, &pk_serde);
233            debug!(?vnode, ?progress, "load initial progress");
234            assert!(
235                vnode_state
236                    .insert(vnode, VnodeBackfillState::Committed(progress))
237                    .is_none()
238            );
239        }
240        let consumed_pk_rows = OwnedRow::new(vec![None; pk_serde.get_data_types().len()]);
241        Ok(Self {
242            vnode_state,
243            pk_serde,
244            consumed_pk_rows,
245            state_table,
246        })
247    }
248
249    async fn load_vnode_progress_row(
250        state_table: &StateTable<S>,
251    ) -> StreamExecutorResult<Vec<(VirtualNode, Option<OwnedRow>)>> {
252        let rows = try_join_all(state_table.vnodes().iter_vnodes().map(|vnode| {
253            state_table
254                .get_row([vnode.to_datum()])
255                .map_ok(move |progress_row| (vnode, progress_row))
256        }))
257        .await?;
258        Ok(rows)
259    }
260
261    fn update_progress(&mut self, vnode: VirtualNode, progress: VnodeBackfillProgress) {
262        match self.vnode_state.entry(vnode) {
263            Entry::Occupied(entry) => {
264                let state = entry.into_mut();
265                let prev_progress = state.latest_progress();
266                if prev_progress == &progress {
267                    // ignore if no update
268                    return;
269                }
270                // sanity check
271                {
272                    assert!(
273                        prev_progress.epoch <= progress.epoch,
274                        "progress epoch regress from {} to {}",
275                        prev_progress.epoch,
276                        progress.epoch
277                    );
278                    match &prev_progress.progress {
279                        EpochBackfillProgress::Consuming { latest_pk: prev_pk } => {
280                            if prev_progress.epoch == progress.epoch
281                                && let EpochBackfillProgress::Consuming { latest_pk: pk } =
282                                    &progress.progress
283                            {
284                                assert_eq!(pk.len(), self.pk_serde.get_data_types().len());
285                                assert!(
286                                    prev_progress.row_count <= progress.row_count,
287                                    "{} <= {}, vnode: {:?}",
288                                    prev_progress.row_count,
289                                    progress.row_count,
290                                    vnode,
291                                );
292                                if cfg!(debug_assertions) {
293                                    let mut prev_buf = vec![];
294                                    self.pk_serde.serialize(prev_pk, &mut prev_buf);
295                                    let mut buf = vec![];
296                                    self.pk_serde.serialize(pk, &mut buf);
297                                    assert!(
298                                        buf > prev_buf,
299                                        "new pk progress: {:?} not exceed prev pk progress: {:?}",
300                                        pk,
301                                        prev_pk
302                                    );
303                                }
304                            }
305                        }
306                        EpochBackfillProgress::Consumed => {
307                            assert!(
308                                prev_progress.epoch < progress.epoch,
309                                "{:?} {:?}",
310                                prev_progress,
311                                progress
312                            );
313                        }
314                    }
315                }
316                state.update_inner(progress);
317            }
318            Entry::Vacant(entry) => {
319                entry.insert(VnodeBackfillState::New(progress));
320            }
321        }
322    }
323
324    pub(super) fn update_epoch_progress(
325        &mut self,
326        vnode: VirtualNode,
327        epoch: u64,
328        row_count: usize,
329        pk: OwnedRow,
330    ) {
331        self.update_progress(
332            vnode,
333            VnodeBackfillProgress {
334                epoch,
335                row_count,
336                progress: EpochBackfillProgress::Consuming { latest_pk: pk },
337            },
338        )
339    }
340
341    pub(super) fn finish_epoch(&mut self, vnode: VirtualNode, epoch: u64, row_count: usize) {
342        self.update_progress(
343            vnode,
344            VnodeBackfillProgress {
345                epoch,
346                row_count,
347                progress: EpochBackfillProgress::Consumed,
348            },
349        );
350    }
351
352    pub(super) fn latest_progress(
353        &self,
354    ) -> impl Iterator<Item = (VirtualNode, Option<&VnodeBackfillProgress>)> {
355        self.state_table.vnodes().iter_vnodes().map(|vnode| {
356            (
357                vnode,
358                self.vnode_state
359                    .get(&vnode)
360                    .map(VnodeBackfillState::latest_progress),
361            )
362        })
363    }
364
365    pub(super) async fn commit(
366        &mut self,
367        barrier_epoch: EpochPair,
368    ) -> StreamExecutorResult<BackfillStatePostCommit<'_, S>> {
369        for (vnode, state) in &self.vnode_state {
370            match state {
371                VnodeBackfillState::New(progress) => {
372                    self.state_table
373                        .insert(progress.build_row(*vnode, &self.consumed_pk_rows));
374                }
375                VnodeBackfillState::Update { latest, committed } => {
376                    self.state_table.update(
377                        committed.build_row(*vnode, &self.consumed_pk_rows),
378                        latest.build_row(*vnode, &self.consumed_pk_rows),
379                    );
380                }
381                VnodeBackfillState::Committed(_) => {}
382            }
383        }
384        let post_commit = self.state_table.commit(barrier_epoch).await?;
385        self.vnode_state
386            .values_mut()
387            .for_each(VnodeBackfillState::mark_committed);
388        Ok(BackfillStatePostCommit {
389            inner: post_commit,
390            vnode_state: &mut self.vnode_state,
391            pk_serde: &self.pk_serde,
392        })
393    }
394}
395
396#[must_use]
397pub(super) struct BackfillStatePostCommit<'a, S: StateStore> {
398    inner: StateTablePostCommit<'a, S>,
399    vnode_state: &'a mut HashMap<VirtualNode, VnodeBackfillState>,
400    pk_serde: &'a OrderedRowSerde,
401}
402
403impl<S: StateStore> BackfillStatePostCommit<'_, S> {
404    pub(super) async fn post_yield_barrier(
405        self,
406        new_vnode_bitmap: Option<Arc<Bitmap>>,
407    ) -> StreamExecutorResult<Option<Arc<Bitmap>>> {
408        let new_vnode_bitmap = if let Some(((new_vnode_bitmap, prev_vnode_bitmap, state), _)) =
409            self.inner.post_yield_barrier(new_vnode_bitmap).await?
410        {
411            Self::update_vnode_bitmap(&*state, self.vnode_state, self.pk_serde, prev_vnode_bitmap)
412                .await?;
413            Some(new_vnode_bitmap)
414        } else {
415            None
416        };
417        Ok(new_vnode_bitmap)
418    }
419
420    async fn update_vnode_bitmap(
421        state_table: &StateTable<S>,
422        vnode_state: &mut HashMap<VirtualNode, VnodeBackfillState>,
423        pk_serde: &OrderedRowSerde,
424        prev_vnode_bitmap: Arc<Bitmap>,
425    ) -> StreamExecutorResult<()> {
426        let committed_progress_rows = BackfillState::load_vnode_progress_row(state_table).await?;
427        let mut new_state = HashMap::new();
428        for (vnode, progress_row) in committed_progress_rows {
429            if let Some(progress_row) = progress_row {
430                let progress = VnodeBackfillProgress::from_row(&progress_row, pk_serde);
431                assert!(
432                    new_state
433                        .insert(vnode, VnodeBackfillState::Committed(progress))
434                        .is_none()
435                );
436            }
437
438            if prev_vnode_bitmap.is_set(vnode.to_index()) {
439                // if the vnode exist previously, the new state should be the same as the previous one
440                assert_eq!(vnode_state.get(&vnode), new_state.get(&vnode));
441            }
442        }
443        *vnode_state = new_state;
444        Ok(())
445    }
446}