1use std::collections::HashMap;
16use std::sync::Arc;
17
18use either::Either;
19use futures::stream::select_with_strategy;
20use futures::{TryStreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::{DataChunk, Op, StreamChunk};
24use risingwave_common::catalog::Schema;
25use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
26use risingwave_common::row::{OwnedRow, Row, RowExt};
27use risingwave_common::types::{Datum, ToOwnedDatum};
28use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
29use risingwave_common::util::sort_util::cmp_datum_iter;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_storage::StateStore;
32use risingwave_storage::store::PrefetchOptions;
33
34use crate::common::table::state_table::StateTable;
35use crate::executor::backfill::utils::create_builder;
36use crate::executor::prelude::*;
37use crate::task::{CreateMviewProgressReporter, FragmentId};
38
/// One [`DataChunkBuilder`] per [`VirtualNode`], keyed by the vnode.
type Builders = HashMap<VirtualNode, DataChunkBuilder>;
40
/// Backfill progress tracked for a single vnode.
#[derive(Clone, Debug, PartialEq, Eq)]
enum LocalityBackfillProgress {
    /// No progress has been recorded for this vnode yet.
    NotStarted,
    /// Progress has been recorded, but the vnode has not been marked finished.
    InProgress {
        /// Position stored by the latest progress update; the snapshot scan
        /// resumes strictly after it.
        current_pos: OwnedRow,
        /// Number of rows recorded for this vnode so far.
        processed_rows: u64,
    },
    /// The vnode has been marked finished.
    Completed {
        /// Last recorded position, or a row of `None` datums when the vnode was
        /// finished without ever recording progress.
        final_pos: OwnedRow,
        /// Total number of rows recorded for this vnode.
        total_rows: u64,
    },
}
61
/// Backfill progress of every vnode handled by this executor.
#[derive(Clone, Debug)]
struct LocalityBackfillState {
    /// Progress of each tracked vnode.
    per_vnode: HashMap<VirtualNode, LocalityBackfillProgress>,
    /// Running sum of the row-count deltas passed to `update_progress`
    /// (or, after loading, the sum of the persisted per-vnode row counts).
    total_snapshot_rows: u64,
}
70
71impl LocalityBackfillState {
72 fn new(vnodes: impl Iterator<Item = VirtualNode>) -> Self {
73 let per_vnode = vnodes
74 .map(|vnode| (vnode, LocalityBackfillProgress::NotStarted))
75 .collect();
76 Self {
77 per_vnode,
78 total_snapshot_rows: 0,
79 }
80 }
81
82 fn is_completed(&self) -> bool {
83 self.per_vnode
84 .values()
85 .all(|progress| matches!(progress, LocalityBackfillProgress::Completed { .. }))
86 }
87
88 fn vnodes(&self) -> impl Iterator<Item = (VirtualNode, &LocalityBackfillProgress)> {
89 self.per_vnode
90 .iter()
91 .map(|(&vnode, progress)| (vnode, progress))
92 }
93
94 fn has_progress(&self) -> bool {
95 self.per_vnode
96 .values()
97 .any(|progress| matches!(progress, LocalityBackfillProgress::InProgress { .. }))
98 }
99
100 fn update_progress(&mut self, vnode: VirtualNode, new_pos: OwnedRow, row_count_delta: u64) {
101 let progress = self.per_vnode.get_mut(&vnode).unwrap();
102 match progress {
103 LocalityBackfillProgress::NotStarted => {
104 *progress = LocalityBackfillProgress::InProgress {
105 current_pos: new_pos,
106 processed_rows: row_count_delta,
107 };
108 }
109 LocalityBackfillProgress::InProgress { processed_rows, .. } => {
110 *progress = LocalityBackfillProgress::InProgress {
111 current_pos: new_pos,
112 processed_rows: *processed_rows + row_count_delta,
113 };
114 }
115 LocalityBackfillProgress::Completed { .. } => {
116 }
118 }
119 self.total_snapshot_rows += row_count_delta;
120 }
121
122 fn finish_vnode(&mut self, vnode: VirtualNode, pk_len: usize) {
123 let progress = self.per_vnode.get_mut(&vnode).unwrap();
124 match progress {
125 LocalityBackfillProgress::NotStarted => {
126 let final_pos = OwnedRow::new(vec![None; pk_len]);
128 *progress = LocalityBackfillProgress::Completed {
129 final_pos,
130 total_rows: 0,
131 };
132 }
133 LocalityBackfillProgress::InProgress {
134 current_pos,
135 processed_rows,
136 } => {
137 *progress = LocalityBackfillProgress::Completed {
138 final_pos: current_pos.clone(),
139 total_rows: *processed_rows,
140 };
141 }
142 LocalityBackfillProgress::Completed { .. } => {
143 }
145 }
146 }
147
148 fn get_progress(&self, vnode: &VirtualNode) -> &LocalityBackfillProgress {
149 self.per_vnode.get(vnode).unwrap()
150 }
151}
152
/// Executor that buffers upstream data into a state table and then backfills
/// it downstream vnode by vnode while tracking per-vnode progress.
pub struct LocalityProviderExecutor<S: StateStore> {
    /// Upstream executor whose messages this executor consumes.
    upstream: Executor,

    /// Stored at construction; not read anywhere in this file, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    locality_columns: Vec<usize>,

    /// Table that buffered upstream chunks are written to and that the
    /// snapshot stream scans.
    state_table: StateTable<S>,

    /// Table holding the persisted per-vnode backfill progress.
    progress_table: StateTable<S>,

    /// Schema whose data types are used to create the snapshot chunk builders.
    input_schema: Schema,

    /// Reporter used to publish backfill progress and completion.
    progress: CreateMviewProgressReporter,

    /// Fragment id matched against the `StartFragmentBackfill` mutation.
    fragment_id: FragmentId,

    /// Actor id obtained from `progress` at construction; used for metrics.
    actor_id: ActorId,

    /// Streaming metrics from which the backfill metrics are derived.
    metrics: Arc<StreamingMetrics>,

    /// Chunk size passed to the snapshot chunk builders.
    chunk_size: usize,
}
193
194impl<S: StateStore> LocalityProviderExecutor<S> {
195 #[allow(clippy::too_many_arguments)]
196 pub fn new(
197 upstream: Executor,
198 locality_columns: Vec<usize>,
199 state_table: StateTable<S>,
200 progress_table: StateTable<S>,
201 input_schema: Schema,
202 progress: CreateMviewProgressReporter,
203 metrics: Arc<StreamingMetrics>,
204 chunk_size: usize,
205 fragment_id: FragmentId,
206 ) -> Self {
207 Self {
208 upstream,
209 locality_columns,
210 state_table,
211 progress_table,
212 input_schema,
213 actor_id: progress.actor_id(),
214 progress,
215 metrics,
216 chunk_size,
217 fragment_id,
218 }
219 }
220
221 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
223 async fn make_snapshot_stream<'a>(
224 state_table: &'a StateTable<S>,
225 backfill_state: LocalityBackfillState,
226 ) {
227 for vnode in state_table.vnodes().iter_vnodes() {
229 let progress = backfill_state.get_progress(&vnode);
230
231 let current_pos = match progress {
232 LocalityBackfillProgress::NotStarted => None,
233 LocalityBackfillProgress::Completed { .. } => {
234 continue;
236 }
237 LocalityBackfillProgress::InProgress { current_pos, .. } => {
238 Some(current_pos.clone())
239 }
240 };
241
242 let range_bounds = if let Some(ref pos) = current_pos {
244 let start_bound = std::ops::Bound::Excluded(pos.as_inner());
245 (start_bound, std::ops::Bound::<&[Datum]>::Unbounded)
246 } else {
247 (
248 std::ops::Bound::<&[Datum]>::Unbounded,
249 std::ops::Bound::<&[Datum]>::Unbounded,
250 )
251 };
252
253 let iter = state_table
255 .iter_with_vnode(
256 vnode,
257 &range_bounds,
258 PrefetchOptions::prefetch_for_small_range_scan(),
259 )
260 .await?;
261 pin_mut!(iter);
262
263 while let Some(row) = iter.try_next().await? {
264 yield Some((vnode, row));
265 }
266 }
267
268 yield None;
270 }
271
272 async fn persist_backfill_state(
274 progress_table: &mut StateTable<S>,
275 backfill_state: &LocalityBackfillState,
276 ) -> StreamExecutorResult<()> {
277 for (vnode, progress) in &backfill_state.per_vnode {
278 let (is_finished, current_pos, row_count) = match progress {
279 LocalityBackfillProgress::NotStarted => continue, LocalityBackfillProgress::InProgress {
281 current_pos,
282 processed_rows,
283 } => (false, current_pos.clone(), *processed_rows),
284 LocalityBackfillProgress::Completed {
285 final_pos,
286 total_rows,
287 } => (true, final_pos.clone(), *total_rows),
288 };
289
290 let mut row_data = vec![Some(vnode.to_scalar().into())];
292 row_data.extend(current_pos);
293 row_data.push(Some(risingwave_common::types::ScalarImpl::Bool(
294 is_finished,
295 )));
296 row_data.push(Some(risingwave_common::types::ScalarImpl::Int64(
297 row_count as i64,
298 )));
299
300 let new_row = OwnedRow::new(row_data);
301
302 let key_data = vec![Some(vnode.to_scalar().into())];
305 let key = OwnedRow::new(key_data);
306
307 if let Some(existing_row) = progress_table.get_row(&key).await? {
308 progress_table.update(existing_row, new_row);
310 } else {
311 progress_table.insert(new_row);
313 }
314 }
315 Ok(())
316 }
317
318 async fn load_backfill_state(
320 progress_table: &StateTable<S>,
321 ) -> StreamExecutorResult<LocalityBackfillState> {
322 let mut backfill_state = LocalityBackfillState::new(progress_table.vnodes().iter_vnodes());
323 let mut total_snapshot_rows = 0;
324
325 for vnode in progress_table.vnodes().iter_vnodes() {
327 let key_data = vec![Some(vnode.to_scalar().into())];
329
330 let key = OwnedRow::new(key_data);
331
332 if let Some(row) = progress_table.get_row(&key).await? {
333 let finished_col_idx = row.len() - 2;
335 let is_finished = row
336 .datum_at(finished_col_idx)
337 .map(|d| d.into_bool())
338 .unwrap_or(false);
339
340 let row_count = row
342 .datum_at(row.len() - 1)
343 .map(|d| d.into_int64() as u64)
344 .unwrap_or(0);
345
346 let current_pos_data: Vec<Datum> = (1..finished_col_idx)
347 .map(|i| row.datum_at(i).to_owned_datum())
348 .collect();
349 let current_pos = OwnedRow::new(current_pos_data);
350
351 let progress = if is_finished {
353 LocalityBackfillProgress::Completed {
354 final_pos: current_pos,
355 total_rows: row_count,
356 }
357 } else {
358 LocalityBackfillProgress::InProgress {
359 current_pos,
360 processed_rows: row_count,
361 }
362 };
363
364 backfill_state.per_vnode.insert(vnode, progress);
365 total_snapshot_rows += row_count;
366 }
367 }
369
370 backfill_state.total_snapshot_rows = total_snapshot_rows;
371 Ok(backfill_state)
372 }
373
374 fn mark_chunk(
376 chunk: StreamChunk,
377 backfill_state: &LocalityBackfillState,
378 state_table: &StateTable<S>,
379 ) -> StreamExecutorResult<StreamChunk> {
380 let chunk = chunk.compact_vis();
381 let (data, ops) = chunk.into_parts();
382 let mut new_visibility = risingwave_common::bitmap::BitmapBuilder::with_capacity(ops.len());
383
384 let pk_indices = state_table.pk_indices();
385 let pk_order = state_table.pk_serde().get_order_types();
386
387 for row in data.rows() {
388 let pk = row.project(pk_indices);
390 let vnode = state_table.compute_vnode_by_pk(pk);
391
392 let visible = match backfill_state.get_progress(&vnode) {
393 LocalityBackfillProgress::Completed { .. } => true,
394 LocalityBackfillProgress::NotStarted => false,
395 LocalityBackfillProgress::InProgress { current_pos, .. } => {
396 cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
398 }
399 };
400
401 new_visibility.append(visible);
402 }
403
404 let (columns, _) = data.into_parts();
405 let chunk = StreamChunk::with_visibility(ops, columns, new_visibility.finish());
406 Ok(chunk)
407 }
408
409 fn handle_snapshot_chunk(
410 data_chunk: DataChunk,
411 vnode: VirtualNode,
412 pk_indices: &[usize],
413 backfill_state: &mut LocalityBackfillState,
414 cur_barrier_snapshot_processed_rows: &mut u64,
415 ) -> StreamExecutorResult<StreamChunk> {
416 let chunk = StreamChunk::from_parts(vec![Op::Insert; data_chunk.cardinality()], data_chunk);
417 let chunk_cardinality = chunk.cardinality() as u64;
418
419 if let Some(last_row) = chunk.rows().last() {
422 let pk = last_row.1.project(pk_indices);
423 let pk_owned = pk.into_owned_row();
424 backfill_state.update_progress(vnode, pk_owned, chunk_cardinality);
425 }
426
427 *cur_barrier_snapshot_processed_rows += chunk_cardinality;
428 Ok(chunk)
429 }
430}
431
432impl<S: StateStore> Execute for LocalityProviderExecutor<S> {
433 fn execute(self: Box<Self>) -> BoxedMessageStream {
434 self.execute_inner().boxed()
435 }
436}
437
438impl<S: StateStore> LocalityProviderExecutor<S> {
439 #[try_stream(ok = Message, error = StreamExecutorError)]
440 async fn execute_inner(mut self) {
441 let mut upstream = self.upstream.execute();
442
443 let first_barrier = expect_first_barrier(&mut upstream).await?;
445 let first_epoch = first_barrier.epoch;
446
447 yield Message::Barrier(first_barrier);
449
450 let mut state_table = self.state_table;
451 let mut progress_table = self.progress_table;
452
453 state_table.init_epoch(first_epoch).await?;
455 progress_table.init_epoch(first_epoch).await?;
456
457 let mut backfill_state = Self::load_backfill_state(&progress_table).await?;
459
460 let pk_indices = state_table.pk_indices().iter().cloned().collect_vec();
462
463 let need_backfill = !backfill_state.is_completed();
464
465 let need_buffering = backfill_state
466 .per_vnode
467 .values()
468 .all(|progress| matches!(progress, LocalityBackfillProgress::NotStarted));
469 let mut buffered_rows: u64 = 0;
470 if need_buffering {
472 let mut start_backfill = false;
474
475 #[for_await]
476 for msg in upstream.by_ref() {
477 let msg = msg?;
478
479 match msg {
480 Message::Watermark(_) => {
481 }
483 Message::Chunk(chunk) => {
484 buffered_rows += chunk.cardinality() as u64;
485 state_table.write_chunk(chunk);
486 state_table.try_flush().await?;
487 }
488 Message::Barrier(barrier) => {
489 let epoch = barrier.epoch;
490
491 if let Some(mutation) = barrier.mutation.as_deref() {
493 use crate::executor::Mutation;
494 if let Mutation::StartFragmentBackfill { fragment_ids } = mutation
495 && fragment_ids.contains(&self.fragment_id)
496 {
497 tracing::info!(
498 "Start backfill of locality provider with fragment id: {:?}",
499 &self.fragment_id
500 );
501 start_backfill = true;
502 }
503 }
504
505 self.progress.update_with_buffered_rows(
506 barrier.epoch,
507 barrier.epoch.curr,
508 0,
509 buffered_rows,
510 );
511
512 let post_commit1 = state_table.commit(epoch).await?;
514 let post_commit2 = progress_table.commit(epoch).await?;
515
516 yield Message::Barrier(barrier);
517 post_commit1.post_yield_barrier(None).await?;
518 post_commit2.post_yield_barrier(None).await?;
519
520 if start_backfill {
522 break;
523 }
524 }
525 }
526 }
527 }
528
529 if need_backfill {
555 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
556 let mut pending_barrier: Option<Barrier> = None;
557
558 let metrics = self
559 .metrics
560 .new_backfill_metrics(state_table.table_id(), self.actor_id);
561
562 let snapshot_data_types = self.input_schema.data_types();
564 let mut builders: Builders = state_table
565 .vnodes()
566 .iter_vnodes()
567 .map(|vnode| {
568 let builder = create_builder(
569 RateLimit::Disabled,
570 self.chunk_size,
571 snapshot_data_types.clone(),
572 );
573 (vnode, builder)
574 })
575 .collect();
576
577 'backfill_loop: loop {
578 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
579 let mut cur_barrier_upstream_processed_rows: u64 = 0;
580
581 {
583 let left_upstream = upstream.by_ref().map(Either::Left);
584 let right_snapshot = pin!(
585 Self::make_snapshot_stream(&state_table, backfill_state.clone(),)
586 .map(Either::Right)
587 );
588
589 let mut backfill_stream =
592 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
593 stream::PollNext::Left
594 });
595
596 #[for_await]
597 for either in &mut backfill_stream {
598 match either {
599 Either::Left(msg) => {
601 match msg? {
602 Message::Barrier(barrier) => {
603 pending_barrier = Some(barrier);
605 break;
606 }
607 Message::Chunk(chunk) => {
608 upstream_chunk_buffer.push(chunk.compact_vis());
610 }
611 Message::Watermark(_) => {
612 }
614 }
615 }
616 Either::Right(msg) => {
618 match msg? {
619 None => {
620 for (vnode, builder) in &mut builders {
623 if let Some(data_chunk) = builder.consume_all() {
624 let chunk = Self::handle_snapshot_chunk(
625 data_chunk,
626 *vnode,
627 &pk_indices,
628 &mut backfill_state,
629 &mut cur_barrier_snapshot_processed_rows,
630 )?;
631 yield Message::Chunk(chunk);
632 }
633 }
634
635 for chunk in upstream_chunk_buffer.drain(..) {
637 let chunk_cardinality = chunk.cardinality() as u64;
638 cur_barrier_upstream_processed_rows +=
639 chunk_cardinality;
640 yield Message::Chunk(chunk);
641 }
642 metrics
643 .backfill_snapshot_read_row_count
644 .inc_by(cur_barrier_snapshot_processed_rows);
645 metrics
646 .backfill_upstream_output_row_count
647 .inc_by(cur_barrier_upstream_processed_rows);
648 break 'backfill_loop;
649 }
650 Some((vnode, row)) => {
651 let builder = builders.get_mut(&vnode).unwrap();
653 if let Some(data_chunk) = builder.append_one_row(row) {
654 let chunk = Self::handle_snapshot_chunk(
656 data_chunk,
657 vnode,
658 &pk_indices,
659 &mut backfill_state,
660 &mut cur_barrier_snapshot_processed_rows,
661 )?;
662 yield Message::Chunk(chunk);
663 }
664 }
667 }
668 }
669 }
670 }
671 }
672
673 let barrier = match pending_barrier.take() {
675 Some(barrier) => barrier,
676 None => break 'backfill_loop, };
678
679 for (vnode, builder) in &mut builders {
681 if let Some(data_chunk) = builder.consume_all() {
682 let chunk = Self::handle_snapshot_chunk(
683 data_chunk,
684 *vnode,
685 &pk_indices,
686 &mut backfill_state,
687 &mut cur_barrier_snapshot_processed_rows,
688 )?;
689 yield Message::Chunk(chunk);
690 }
691 }
692
693 for chunk in upstream_chunk_buffer.drain(..) {
695 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
696
697 if backfill_state.has_progress() {
699 let marked_chunk =
700 Self::mark_chunk(chunk.clone(), &backfill_state, &state_table)?;
701 yield Message::Chunk(marked_chunk);
702 }
703 }
704
705 state_table
707 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
708 .await?;
709
710 let total_snapshot_processed_rows: u64 = backfill_state
713 .vnodes()
714 .map(|(_, progress)| match *progress {
715 LocalityBackfillProgress::InProgress { processed_rows, .. } => {
716 processed_rows
717 }
718 LocalityBackfillProgress::Completed { total_rows, .. } => total_rows,
719 LocalityBackfillProgress::NotStarted => 0,
720 })
721 .sum();
722
723 self.progress.update_with_buffered_rows(
724 barrier.epoch,
725 barrier.epoch.curr, total_snapshot_processed_rows,
727 buffered_rows,
728 );
729
730 Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
732 let barrier_epoch = barrier.epoch;
733 let post_commit = progress_table.commit(barrier_epoch).await?;
734
735 metrics
736 .backfill_snapshot_read_row_count
737 .inc_by(cur_barrier_snapshot_processed_rows);
738 metrics
739 .backfill_upstream_output_row_count
740 .inc_by(cur_barrier_upstream_processed_rows);
741
742 yield Message::Barrier(barrier);
743 post_commit.post_yield_barrier(None).await?;
744 }
745 }
746
747 tracing::debug!("Locality provider backfill finished, forwarding upstream directly");
748
749 if need_backfill && !backfill_state.is_completed() {
751 while let Some(Ok(msg)) = upstream.next().await {
752 match msg {
753 Message::Barrier(barrier) => {
754 state_table
756 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
757 .await?;
758
759 for vnode in state_table.vnodes().iter_vnodes() {
761 backfill_state.finish_vnode(vnode, pk_indices.len());
762 }
763
764 let total_snapshot_processed_rows: u64 = backfill_state
766 .vnodes()
767 .map(|(_, progress)| match *progress {
768 LocalityBackfillProgress::Completed { total_rows, .. } => {
769 total_rows
770 }
771 LocalityBackfillProgress::InProgress { processed_rows, .. } => {
772 processed_rows
773 }
774 LocalityBackfillProgress::NotStarted => 0,
775 })
776 .sum();
777
778 self.progress.finish_with_buffered_rows(
781 barrier.epoch,
782 total_snapshot_processed_rows,
783 buffered_rows,
784 );
785
786 Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
788 let post_commit = progress_table.commit(barrier.epoch).await?;
789
790 yield Message::Barrier(barrier);
791 post_commit.post_yield_barrier(None).await?;
792 break; }
794 Message::Chunk(chunk) => {
795 yield Message::Chunk(chunk);
797 }
798 Message::Watermark(watermark) => {
799 yield Message::Watermark(watermark);
801 }
802 }
803 }
804 }
805
806 #[for_await]
808 for msg in upstream {
809 let msg = msg?;
810
811 match msg {
812 Message::Barrier(barrier) => {
813 state_table
815 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
816 .await?;
817 progress_table
818 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
819 .await?;
820 yield Message::Barrier(barrier);
821 }
822 _ => {
823 yield msg;
825 }
826 }
827 }
828 }
829}