1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::mem::replace;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use futures::TryFutureExt;
22use futures::future::try_join_all;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
25use risingwave_common::must_match;
26use risingwave_common::row::{OwnedRow, Row, RowExt};
27use risingwave_common::types::{DataType, ScalarImpl};
28use risingwave_common::util::row_serde::OrderedRowSerde;
29
/// Progress of backfilling a single vnode within one upstream epoch.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(super) enum EpochBackfillProgress {
    /// Still consuming rows of this epoch; `latest_pk` is the upstream pk of
    /// the most recently processed row.
    Consuming { latest_pk: OwnedRow },
    /// All rows of this epoch have been consumed.
    Consumed,
}
35
/// Per-vnode backfill progress, as persisted in (and restored from) the
/// progress state table.
#[derive(Debug, Eq, PartialEq)]
pub(super) struct VnodeBackfillProgress {
    /// Upstream epoch this progress refers to.
    pub(super) epoch: u64,
    /// Number of rows consumed for this vnode so far.
    pub(super) row_count: usize,
    /// Whether the epoch is still being consumed or already finished.
    pub(super) progress: EpochBackfillProgress,
}
42
/// Types of the non-pk-suffix columns of the progress table, in column order:
/// vnode (Int16), epoch (Int64), row_count (Int64), is_finished (Boolean).
/// The upstream table's pk columns follow after these (see `build_row`).
const EXTRA_COLUMN_TYPES: [DataType; 4] = [
    DataType::Int16,
    DataType::Int64,
    DataType::Int64,
    DataType::Boolean,
];
50
51impl VnodeBackfillProgress {
52 fn validate_progress_table_schema(
53 progress_table_column_types: &[DataType],
54 upstream_pk_column_types: &[DataType],
55 ) -> StreamExecutorResult<()> {
56 if progress_table_column_types.len()
57 != EXTRA_COLUMN_TYPES.len() + upstream_pk_column_types.len()
58 {
59 return Err(anyhow!(
60 "progress table columns len not matched with the len derived from upstream table pk. progress table: {:?}, pk: {:?}",
61 progress_table_column_types,
62 upstream_pk_column_types)
63 .into()
64 );
65 }
66 for (expected_type, progress_table_type) in EXTRA_COLUMN_TYPES
67 .iter()
68 .chain(upstream_pk_column_types.iter())
69 .zip_eq_debug(progress_table_column_types.iter())
70 {
71 if expected_type != progress_table_type {
72 return Err(anyhow!(
73 "progress table column not matched with upstream table schema: progress table: {:?}, pk: {:?}",
74 progress_table_column_types,
75 upstream_pk_column_types)
76 .into()
77 );
78 }
79 }
80 Ok(())
81 }
82
83 pub(super) fn from_row(row: &OwnedRow, pk_serde: &OrderedRowSerde) -> Self {
84 assert_eq!(
85 row.len(),
86 pk_serde.get_data_types().len() + EXTRA_COLUMN_TYPES.len() - 1, );
88 let epoch = must_match!(&row[0], Some(ScalarImpl::Int64(epoch)) => {
89 *epoch as u64
90 });
91 let row_count = must_match!(&row[1], Some(ScalarImpl::Int64(row_count)) => {
92 *row_count as usize
93 });
94 let is_finished = must_match!(&row[2], Some(ScalarImpl::Bool(is_finished)) => {
95 *is_finished
96 });
97 Self {
98 epoch,
99 row_count,
100 progress: if !is_finished {
101 EpochBackfillProgress::Consuming {
102 latest_pk: row.slice(EXTRA_COLUMN_TYPES.len() - 1..).to_owned_row(),
103 }
104 } else {
105 row.slice(EXTRA_COLUMN_TYPES.len() - 1..)
106 .iter()
107 .enumerate()
108 .for_each(|(i, datum)| {
109 if datum.is_some() {
110 if cfg!(debug_assertions) {
111 panic!("get non-empty pk row: {:?}", row);
112 } else {
113 warn!(
114 i,
115 row = ?row,
116 "get non-empty pk row. will be ignore"
117 );
118 }
119 }
120 });
121 EpochBackfillProgress::Consumed
122 },
123 }
124 }
125
126 fn build_row<'a>(
127 &'a self,
128 vnode: VirtualNode,
129 consumed_pk_rows: &'a OwnedRow,
130 ) -> impl Row + 'a {
131 let (is_finished, pk) = match &self.progress {
132 EpochBackfillProgress::Consuming { latest_pk } => {
133 assert_eq!(latest_pk.len(), consumed_pk_rows.len());
134 (false, latest_pk)
135 }
136 EpochBackfillProgress::Consumed => (true, consumed_pk_rows),
137 };
138 [
139 Some(ScalarImpl::Int16(vnode.to_scalar())),
140 Some(ScalarImpl::Int64(self.epoch as _)),
141 Some(ScalarImpl::Int64(self.row_count as _)),
142 Some(ScalarImpl::Bool(is_finished)),
143 ]
144 .chain(pk)
145 }
146}
147
/// Commit status of a vnode's in-memory progress relative to the progress
/// state table.
#[derive(Debug, Eq, PartialEq)]
enum VnodeBackfillState {
    /// Progress not yet persisted; will be written with an `insert` on commit.
    New(VnodeBackfillProgress),
    /// `committed` is persisted, `latest` is newer in-memory progress; will be
    /// written with an `update` on commit.
    Update {
        latest: VnodeBackfillProgress,
        committed: VnodeBackfillProgress,
    },
    /// In-memory progress equals what is persisted; nothing to write.
    Committed(VnodeBackfillProgress),
}
157
158impl VnodeBackfillState {
159 fn update_inner(&mut self, latest_progress: VnodeBackfillProgress) {
160 let temp_place_holder = Self::temp_placeholder();
161 let prev_state = replace(self, temp_place_holder);
162 *self = match prev_state {
163 VnodeBackfillState::New(_) => VnodeBackfillState::New(latest_progress),
164 VnodeBackfillState::Update { committed, .. } => VnodeBackfillState::Update {
165 latest: latest_progress,
166 committed,
167 },
168 VnodeBackfillState::Committed(committed) => VnodeBackfillState::Update {
169 latest: latest_progress,
170 committed,
171 },
172 };
173 }
174
175 fn mark_committed(&mut self) {
176 *self = VnodeBackfillState::Committed(match replace(self, Self::temp_placeholder()) {
177 VnodeBackfillState::New(progress) => progress,
178 VnodeBackfillState::Update { latest, .. } => latest,
179 VnodeBackfillState::Committed(progress) => progress,
180 });
181 }
182
183 fn latest_progress(&self) -> &VnodeBackfillProgress {
184 match self {
185 VnodeBackfillState::New(progress) => progress,
186 VnodeBackfillState::Update { latest, .. } => latest,
187 VnodeBackfillState::Committed(progress) => progress,
188 }
189 }
190
191 fn temp_placeholder() -> Self {
192 Self::New(VnodeBackfillProgress {
193 epoch: 0,
194 row_count: 0,
195 progress: EpochBackfillProgress::Consumed,
196 })
197 }
198}
199
200use risingwave_common::util::epoch::EpochPair;
201use risingwave_common::util::iter_util::ZipEqDebug;
202use risingwave_storage::StateStore;
203
204use crate::common::table::state_table::StateTablePostCommit;
205use crate::executor::StreamExecutorResult;
206use crate::executor::prelude::StateTable;
207
/// Per-vnode backfill progress backed by a progress state table.
pub(super) struct BackfillState<S: StateStore> {
    /// In-memory progress per vnode, tracking commit status against `state_table`.
    vnode_state: HashMap<VirtualNode, VnodeBackfillState>,
    /// Serde for the upstream table's pk columns (the pk suffix of progress rows).
    pk_serde: OrderedRowSerde,
    /// All-NULL row of the upstream pk arity, reused as the pk suffix for rows
    /// whose epoch has been fully consumed.
    consumed_pk_rows: OwnedRow,
    /// Underlying progress table, keyed by vnode.
    state_table: StateTable<S>,
}
214
impl<S: StateStore> BackfillState<S> {
    /// Create the state: validates the progress table schema against the
    /// upstream pk, initializes the state table at `init_epoch`, and loads any
    /// previously committed per-vnode progress rows.
    pub(super) async fn new(
        mut state_table: StateTable<S>,
        init_epoch: EpochPair,
        pk_serde: OrderedRowSerde,
    ) -> StreamExecutorResult<Self> {
        VnodeBackfillProgress::validate_progress_table_schema(
            state_table.get_data_types(),
            pk_serde.get_data_types(),
        )?;
        state_table.init_epoch(init_epoch).await?;
        let mut vnode_state = HashMap::new();
        let committed_progress_row = Self::load_vnode_progress_row(&state_table).await?;
        for (vnode, progress_row) in committed_progress_row {
            let Some(progress_row) = progress_row else {
                continue;
            };
            let progress = VnodeBackfillProgress::from_row(&progress_row, &pk_serde);
            // Each vnode appears at most once in the loaded rows.
            assert!(
                vnode_state
                    .insert(vnode, VnodeBackfillState::Committed(progress))
                    .is_none()
            );
        }
        // All-NULL row of the upstream pk arity, used as the pk suffix for
        // fully-consumed epochs (see `VnodeBackfillProgress::build_row`).
        let consumed_pk_rows = OwnedRow::new(vec![None; pk_serde.get_data_types().len()]);
        Ok(Self {
            vnode_state,
            pk_serde,
            consumed_pk_rows,
            state_table,
        })
    }

    /// Fetch the progress row (if any) of every vnode owned by `state_table`,
    /// issuing the point-gets concurrently.
    async fn load_vnode_progress_row(
        state_table: &StateTable<S>,
    ) -> StreamExecutorResult<Vec<(VirtualNode, Option<OwnedRow>)>> {
        let rows = try_join_all(state_table.vnodes().iter_vnodes().map(|vnode| {
            state_table
                .get_row([vnode.to_datum()])
                .map_ok(move |progress_row| (vnode, progress_row))
        }))
        .await?;
        Ok(rows)
    }

    /// Record new in-memory progress for `vnode`, asserting that it advances
    /// monotonically relative to the previously recorded progress.
    fn update_progress(&mut self, vnode: VirtualNode, progress: VnodeBackfillProgress) {
        match self.vnode_state.entry(vnode) {
            Entry::Occupied(entry) => {
                let state = entry.into_mut();
                let prev_progress = state.latest_progress();
                // No-op if the progress is unchanged.
                if prev_progress == &progress {
                    return;
                }
                // Sanity-check that the new progress does not regress.
                {
                    assert!(
                        prev_progress.epoch <= progress.epoch,
                        "progress epoch regress from {} to {}",
                        prev_progress.epoch,
                        progress.epoch
                    );
                    match &prev_progress.progress {
                        EpochBackfillProgress::Consuming { latest_pk: prev_pk } => {
                            // Within the same epoch, the row count must not
                            // decrease, and (debug builds only) the serialized
                            // pk must strictly advance.
                            if prev_progress.epoch == progress.epoch
                                && let EpochBackfillProgress::Consuming { latest_pk: pk } =
                                    &progress.progress
                            {
                                assert_eq!(pk.len(), self.pk_serde.get_data_types().len());
                                assert!(prev_progress.row_count <= progress.row_count);
                                if cfg!(debug_assertions) {
                                    let mut prev_buf = vec![];
                                    self.pk_serde.serialize(prev_pk, &mut prev_buf);
                                    let mut buf = vec![];
                                    self.pk_serde.serialize(pk, &mut buf);
                                    assert!(
                                        buf > prev_buf,
                                        "new pk progress: {:?} not exceed prev pk progress: {:?}",
                                        pk,
                                        prev_pk
                                    );
                                }
                            }
                        }
                        EpochBackfillProgress::Consumed => {
                            // A finished epoch can only be followed by a
                            // strictly newer epoch.
                            assert!(
                                prev_progress.epoch < progress.epoch,
                                "{:?} {:?}",
                                prev_progress,
                                progress
                            );
                        }
                    }
                }
                state.update_inner(progress);
            }
            Entry::Vacant(entry) => {
                entry.insert(VnodeBackfillState::New(progress));
            }
        }
    }

    /// Record that `vnode` has consumed up to `pk` within `epoch`, with
    /// `row_count` rows consumed in total.
    pub(super) fn update_epoch_progress(
        &mut self,
        vnode: VirtualNode,
        epoch: u64,
        row_count: usize,
        pk: OwnedRow,
    ) {
        self.update_progress(
            vnode,
            VnodeBackfillProgress {
                epoch,
                row_count,
                progress: EpochBackfillProgress::Consuming { latest_pk: pk },
            },
        )
    }

    /// Record that `vnode` has fully consumed `epoch`.
    pub(super) fn finish_epoch(&mut self, vnode: VirtualNode, epoch: u64, row_count: usize) {
        self.update_progress(
            vnode,
            VnodeBackfillProgress {
                epoch,
                row_count,
                progress: EpochBackfillProgress::Consumed,
            },
        );
    }

    /// Iterate over all owned vnodes with their latest progress; `None` for
    /// vnodes that have no recorded progress yet.
    pub(super) fn latest_progress(
        &self,
    ) -> impl Iterator<Item = (VirtualNode, Option<&VnodeBackfillProgress>)> {
        self.state_table.vnodes().iter_vnodes().map(|vnode| {
            (
                vnode,
                self.vnode_state
                    .get(&vnode)
                    .map(VnodeBackfillState::latest_progress),
            )
        })
    }

    /// Flush pending progress to the state table (`insert` for `New`,
    /// `update` for `Update`), commit at `barrier_epoch`, then mark all
    /// in-memory states as committed.
    pub(super) async fn commit(
        &mut self,
        barrier_epoch: EpochPair,
    ) -> StreamExecutorResult<BackfillStatePostCommit<'_, S>> {
        for (vnode, state) in &self.vnode_state {
            match state {
                VnodeBackfillState::New(progress) => {
                    self.state_table
                        .insert(progress.build_row(*vnode, &self.consumed_pk_rows));
                }
                VnodeBackfillState::Update { latest, committed } => {
                    // old row = previously committed, new row = latest.
                    self.state_table.update(
                        committed.build_row(*vnode, &self.consumed_pk_rows),
                        latest.build_row(*vnode, &self.consumed_pk_rows),
                    );
                }
                VnodeBackfillState::Committed(_) => {}
            }
        }
        let post_commit = self.state_table.commit(barrier_epoch).await?;
        self.vnode_state
            .values_mut()
            .for_each(VnodeBackfillState::mark_committed);
        Ok(BackfillStatePostCommit {
            inner: post_commit,
            vnode_state: &mut self.vnode_state,
            pk_serde: &self.pk_serde,
        })
    }
}
388
/// Handle returned by [`BackfillState::commit`] that must be driven after the
/// barrier is yielded, to finalize the commit and handle vnode-bitmap changes.
#[must_use]
pub(super) struct BackfillStatePostCommit<'a, S: StateStore> {
    /// Post-commit handle of the underlying state table.
    inner: StateTablePostCommit<'a, S>,
    /// In-memory per-vnode state to be rebuilt on a vnode-bitmap change.
    vnode_state: &'a mut HashMap<VirtualNode, VnodeBackfillState>,
    /// Serde for the upstream pk columns, needed to re-deserialize progress rows.
    pk_serde: &'a OrderedRowSerde,
}
395
impl<S: StateStore> BackfillStatePostCommit<'_, S> {
    /// Finish the commit after the barrier has been yielded downstream.
    ///
    /// If the inner post-commit reports a vnode-bitmap change (e.g. scaling),
    /// reload the per-vnode state from the state table and return the new
    /// bitmap; otherwise return `None`.
    pub(super) async fn post_yield_barrier(
        self,
        new_vnode_bitmap: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<Option<Arc<Bitmap>>> {
        let new_vnode_bitmap = if let Some(((new_vnode_bitmap, prev_vnode_bitmap, state), _)) =
            self.inner.post_yield_barrier(new_vnode_bitmap).await?
        {
            Self::update_vnode_bitmap(&*state, self.vnode_state, self.pk_serde, prev_vnode_bitmap)
                .await?;
            Some(new_vnode_bitmap)
        } else {
            None
        };
        Ok(new_vnode_bitmap)
    }

    /// Rebuild `vnode_state` from the committed progress rows of `state_table`
    /// after a vnode-bitmap change.
    ///
    /// For vnodes that were already owned before the change, assert the
    /// reloaded state matches the in-memory one.
    async fn update_vnode_bitmap(
        state_table: &StateTable<S>,
        vnode_state: &mut HashMap<VirtualNode, VnodeBackfillState>,
        pk_serde: &OrderedRowSerde,
        prev_vnode_bitmap: Arc<Bitmap>,
    ) -> StreamExecutorResult<()> {
        let committed_progress_rows = BackfillState::load_vnode_progress_row(state_table).await?;
        let mut new_state = HashMap::new();
        for (vnode, progress_row) in committed_progress_rows {
            if let Some(progress_row) = progress_row {
                let progress = VnodeBackfillProgress::from_row(&progress_row, pk_serde);
                // Each vnode appears at most once in the loaded rows.
                assert!(
                    new_state
                        .insert(vnode, VnodeBackfillState::Committed(progress))
                        .is_none()
                );
            }

            // Previously owned vnodes must round-trip to the same state.
            if prev_vnode_bitmap.is_set(vnode.to_index()) {
                assert_eq!(vnode_state.get(&vnode), new_state.get(&vnode));
            }
        }
        *vnode_state = new_state;
        Ok(())
    }
}
439}