1use std::collections::HashMap;
16use std::sync::Arc;
17
18use either::Either;
19use futures::stream::select_with_strategy;
20use futures::{TryStreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::{DataChunk, Op, StreamChunk};
24use risingwave_common::catalog::Schema;
25use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
26use risingwave_common::row::{OwnedRow, Row, RowExt};
27use risingwave_common::types::{Datum, ToOwnedDatum};
28use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
29use risingwave_common::util::sort_util::cmp_datum_iter;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_storage::StateStore;
32use risingwave_storage::store::PrefetchOptions;
33
34use crate::common::table::state_table::StateTable;
35use crate::executor::backfill::utils::create_builder;
36use crate::executor::prelude::*;
37use crate::task::{CreateMviewProgressReporter, FragmentId};
38
/// One `DataChunkBuilder` per vnode, keyed by that vnode.
type Builders = HashMap<VirtualNode, DataChunkBuilder>;
40
/// Backfill progress of a single vnode.
#[derive(Clone, Debug, PartialEq, Eq)]
enum LocalityBackfillProgress {
    /// No progress has been recorded for this vnode yet.
    NotStarted,
    /// Some snapshot rows have been recorded for this vnode.
    InProgress {
        /// Position stored by the most recent progress update.
        current_pos: OwnedRow,
        /// Number of rows recorded so far for this vnode.
        processed_rows: u64,
    },
    /// The vnode has been marked as finished.
    Completed {
        /// Position held at the moment the vnode was finished.
        final_pos: OwnedRow,
        /// Number of rows recorded for this vnode when it was finished.
        total_rows: u64,
    },
}
61
/// Backfill progress for a set of vnodes, together with a running row total.
#[derive(Clone, Debug)]
struct LocalityBackfillState {
    /// Progress of each tracked vnode.
    per_vnode: HashMap<VirtualNode, LocalityBackfillProgress>,
    /// Running count of snapshot rows recorded across all vnodes.
    total_snapshot_rows: u64,
}
70
71impl LocalityBackfillState {
72 fn new(vnodes: impl Iterator<Item = VirtualNode>) -> Self {
73 let per_vnode = vnodes
74 .map(|vnode| (vnode, LocalityBackfillProgress::NotStarted))
75 .collect();
76 Self {
77 per_vnode,
78 total_snapshot_rows: 0,
79 }
80 }
81
82 fn is_completed(&self) -> bool {
83 self.per_vnode
84 .values()
85 .all(|progress| matches!(progress, LocalityBackfillProgress::Completed { .. }))
86 }
87
88 fn vnodes(&self) -> impl Iterator<Item = (VirtualNode, &LocalityBackfillProgress)> {
89 self.per_vnode
90 .iter()
91 .map(|(&vnode, progress)| (vnode, progress))
92 }
93
94 fn has_progress(&self) -> bool {
95 self.per_vnode
96 .values()
97 .any(|progress| matches!(progress, LocalityBackfillProgress::InProgress { .. }))
98 }
99
100 fn update_progress(&mut self, vnode: VirtualNode, new_pos: OwnedRow, row_count_delta: u64) {
101 let progress = self.per_vnode.get_mut(&vnode).unwrap();
102 match progress {
103 LocalityBackfillProgress::NotStarted => {
104 *progress = LocalityBackfillProgress::InProgress {
105 current_pos: new_pos,
106 processed_rows: row_count_delta,
107 };
108 }
109 LocalityBackfillProgress::InProgress { processed_rows, .. } => {
110 *progress = LocalityBackfillProgress::InProgress {
111 current_pos: new_pos,
112 processed_rows: *processed_rows + row_count_delta,
113 };
114 }
115 LocalityBackfillProgress::Completed { .. } => {
116 }
118 }
119 self.total_snapshot_rows += row_count_delta;
120 }
121
122 fn finish_vnode(&mut self, vnode: VirtualNode, pk_len: usize) {
123 let progress = self.per_vnode.get_mut(&vnode).unwrap();
124 match progress {
125 LocalityBackfillProgress::NotStarted => {
126 let final_pos = OwnedRow::new(vec![None; pk_len]);
128 *progress = LocalityBackfillProgress::Completed {
129 final_pos,
130 total_rows: 0,
131 };
132 }
133 LocalityBackfillProgress::InProgress {
134 current_pos,
135 processed_rows,
136 } => {
137 *progress = LocalityBackfillProgress::Completed {
138 final_pos: current_pos.clone(),
139 total_rows: *processed_rows,
140 };
141 }
142 LocalityBackfillProgress::Completed { .. } => {
143 }
145 }
146 }
147
148 fn get_progress(&self, vnode: &VirtualNode) -> &LocalityBackfillProgress {
149 self.per_vnode.get(vnode).unwrap()
150 }
151}
152
/// Executor that writes upstream chunks into a state table while buffering, replays that
/// table as a per-vnode snapshot during backfill, and afterwards forwards upstream messages.
pub struct LocalityProviderExecutor<S: StateStore> {
    /// Input executor whose message stream is consumed.
    upstream: Executor,

    /// Locality column indices supplied at construction; not read by the code in this file.
    #[allow(dead_code)]
    locality_columns: Vec<usize>,

    /// Table that upstream chunks are written to while buffering and that the snapshot
    /// stream scans per vnode.
    state_table: StateTable<S>,

    /// Table in which per-vnode backfill progress rows are persisted and reloaded.
    progress_table: StateTable<S>,

    /// Schema whose data types are used to create the snapshot chunk builders.
    input_schema: Schema,

    /// Reporter used to publish backfill progress updates and completion.
    progress: CreateMviewProgressReporter,

    /// Fragment id matched against `StartFragmentBackfill` mutations.
    fragment_id: FragmentId,

    /// Actor id obtained from the progress reporter; passed when creating backfill metrics.
    actor_id: ActorId,

    /// Streaming metrics from which the backfill metrics are created.
    metrics: Arc<StreamingMetrics>,

    /// Chunk size passed to the snapshot chunk builders.
    chunk_size: usize,
}
193
194impl<S: StateStore> LocalityProviderExecutor<S> {
195 #[allow(clippy::too_many_arguments)]
196 pub fn new(
197 upstream: Executor,
198 locality_columns: Vec<usize>,
199 state_table: StateTable<S>,
200 progress_table: StateTable<S>,
201 input_schema: Schema,
202 progress: CreateMviewProgressReporter,
203 metrics: Arc<StreamingMetrics>,
204 chunk_size: usize,
205 fragment_id: FragmentId,
206 ) -> Self {
207 Self {
208 upstream,
209 locality_columns,
210 state_table,
211 progress_table,
212 input_schema,
213 actor_id: progress.actor_id(),
214 progress,
215 metrics,
216 chunk_size,
217 fragment_id,
218 }
219 }
220
221 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
223 async fn make_snapshot_stream<'a>(
224 state_table: &'a StateTable<S>,
225 backfill_state: LocalityBackfillState,
226 ) {
227 for vnode in state_table.vnodes().iter_vnodes() {
229 let progress = backfill_state.get_progress(&vnode);
230
231 let current_pos = match progress {
232 LocalityBackfillProgress::NotStarted => None,
233 LocalityBackfillProgress::Completed { .. } => {
234 continue;
236 }
237 LocalityBackfillProgress::InProgress { current_pos, .. } => {
238 Some(current_pos.clone())
239 }
240 };
241
242 let range_bounds = if let Some(ref pos) = current_pos {
244 let start_bound = std::ops::Bound::Excluded(pos.as_inner());
245 (start_bound, std::ops::Bound::<&[Datum]>::Unbounded)
246 } else {
247 (
248 std::ops::Bound::<&[Datum]>::Unbounded,
249 std::ops::Bound::<&[Datum]>::Unbounded,
250 )
251 };
252
253 let iter = state_table
255 .iter_with_vnode(
256 vnode,
257 &range_bounds,
258 PrefetchOptions::prefetch_for_small_range_scan(),
259 )
260 .await?;
261 pin_mut!(iter);
262
263 while let Some(row) = iter.try_next().await? {
264 yield Some((vnode, row));
265 }
266 }
267
268 yield None;
270 }
271
272 async fn persist_backfill_state(
274 progress_table: &mut StateTable<S>,
275 backfill_state: &LocalityBackfillState,
276 ) -> StreamExecutorResult<()> {
277 for (vnode, progress) in &backfill_state.per_vnode {
278 let (is_finished, current_pos, row_count) = match progress {
279 LocalityBackfillProgress::NotStarted => continue, LocalityBackfillProgress::InProgress {
281 current_pos,
282 processed_rows,
283 } => (false, current_pos.clone(), *processed_rows),
284 LocalityBackfillProgress::Completed {
285 final_pos,
286 total_rows,
287 } => (true, final_pos.clone(), *total_rows),
288 };
289
290 let mut row_data = vec![Some(vnode.to_scalar().into())];
292 row_data.extend(current_pos);
293 row_data.push(Some(risingwave_common::types::ScalarImpl::Bool(
294 is_finished,
295 )));
296 row_data.push(Some(risingwave_common::types::ScalarImpl::Int64(
297 row_count as i64,
298 )));
299
300 let new_row = OwnedRow::new(row_data);
301
302 let key_data = vec![Some(vnode.to_scalar().into())];
305 let key = OwnedRow::new(key_data);
306
307 if let Some(existing_row) = progress_table.get_row(&key).await? {
308 progress_table.update(existing_row, new_row);
310 } else {
311 progress_table.insert(new_row);
313 }
314 }
315 Ok(())
316 }
317
318 async fn load_backfill_state(
320 progress_table: &StateTable<S>,
321 ) -> StreamExecutorResult<LocalityBackfillState> {
322 let mut backfill_state = LocalityBackfillState::new(progress_table.vnodes().iter_vnodes());
323 let mut total_snapshot_rows = 0;
324
325 for vnode in progress_table.vnodes().iter_vnodes() {
327 let key_data = vec![Some(vnode.to_scalar().into())];
329
330 let key = OwnedRow::new(key_data);
331
332 if let Some(row) = progress_table.get_row(&key).await? {
333 let finished_col_idx = row.len() - 2;
335 let is_finished = row
336 .datum_at(finished_col_idx)
337 .map(|d| d.into_bool())
338 .unwrap_or(false);
339
340 let row_count = row
342 .datum_at(row.len() - 1)
343 .map(|d| d.into_int64() as u64)
344 .unwrap_or(0);
345
346 let current_pos_data: Vec<Datum> = (1..finished_col_idx)
347 .map(|i| row.datum_at(i).to_owned_datum())
348 .collect();
349 let current_pos = OwnedRow::new(current_pos_data);
350
351 let progress = if is_finished {
353 LocalityBackfillProgress::Completed {
354 final_pos: current_pos,
355 total_rows: row_count,
356 }
357 } else {
358 LocalityBackfillProgress::InProgress {
359 current_pos,
360 processed_rows: row_count,
361 }
362 };
363
364 backfill_state.per_vnode.insert(vnode, progress);
365 total_snapshot_rows += row_count;
366 }
367 }
369
370 backfill_state.total_snapshot_rows = total_snapshot_rows;
371 Ok(backfill_state)
372 }
373
374 fn mark_chunk(
376 chunk: StreamChunk,
377 backfill_state: &LocalityBackfillState,
378 state_table: &StateTable<S>,
379 ) -> StreamExecutorResult<StreamChunk> {
380 let chunk = chunk.compact();
381 let (data, ops) = chunk.into_parts();
382 let mut new_visibility = risingwave_common::bitmap::BitmapBuilder::with_capacity(ops.len());
383
384 let pk_indices = state_table.pk_indices();
385 let pk_order = state_table.pk_serde().get_order_types();
386
387 for row in data.rows() {
388 let pk = row.project(pk_indices);
390 let vnode = state_table.compute_vnode_by_pk(pk);
391
392 let visible = match backfill_state.get_progress(&vnode) {
393 LocalityBackfillProgress::Completed { .. } => true,
394 LocalityBackfillProgress::NotStarted => false,
395 LocalityBackfillProgress::InProgress { current_pos, .. } => {
396 cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
398 }
399 };
400
401 new_visibility.append(visible);
402 }
403
404 let (columns, _) = data.into_parts();
405 let chunk = StreamChunk::with_visibility(ops, columns, new_visibility.finish());
406 Ok(chunk)
407 }
408
409 fn handle_snapshot_chunk(
410 data_chunk: DataChunk,
411 vnode: VirtualNode,
412 pk_indices: &[usize],
413 backfill_state: &mut LocalityBackfillState,
414 cur_barrier_snapshot_processed_rows: &mut u64,
415 ) -> StreamExecutorResult<StreamChunk> {
416 let chunk = StreamChunk::from_parts(vec![Op::Insert; data_chunk.cardinality()], data_chunk);
417 let chunk_cardinality = chunk.cardinality() as u64;
418
419 if let Some(last_row) = chunk.rows().last() {
422 let pk = last_row.1.project(pk_indices);
423 let pk_owned = pk.into_owned_row();
424 backfill_state.update_progress(vnode, pk_owned, chunk_cardinality);
425 }
426
427 *cur_barrier_snapshot_processed_rows += chunk_cardinality;
428 Ok(chunk)
429 }
430}
431
impl<S: StateStore> Execute for LocalityProviderExecutor<S> {
    /// Runs the executor by boxing the message stream produced by `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
437
438impl<S: StateStore> LocalityProviderExecutor<S> {
439 #[try_stream(ok = Message, error = StreamExecutorError)]
440 async fn execute_inner(mut self) {
441 let mut upstream = self.upstream.execute();
442
443 let first_barrier = expect_first_barrier(&mut upstream).await?;
445 let first_epoch = first_barrier.epoch;
446
447 yield Message::Barrier(first_barrier);
449
450 let mut state_table = self.state_table;
451 let mut progress_table = self.progress_table;
452
453 state_table.init_epoch(first_epoch).await?;
455 progress_table.init_epoch(first_epoch).await?;
456
457 let mut backfill_state = Self::load_backfill_state(&progress_table).await?;
459
460 let pk_indices = state_table.pk_indices().iter().cloned().collect_vec();
462
463 let need_backfill = !backfill_state.is_completed();
464
465 let need_buffering = backfill_state
466 .per_vnode
467 .values()
468 .all(|progress| matches!(progress, LocalityBackfillProgress::NotStarted));
469
470 if need_buffering {
472 let mut start_backfill = false;
474
475 #[for_await]
476 for msg in upstream.by_ref() {
477 let msg = msg?;
478
479 match msg {
480 Message::Watermark(_) => {
481 }
483 Message::Chunk(chunk) => {
484 state_table.write_chunk(chunk);
485 state_table.try_flush().await?;
486 }
487 Message::Barrier(barrier) => {
488 let epoch = barrier.epoch;
489
490 if let Some(mutation) = barrier.mutation.as_deref() {
492 use crate::executor::Mutation;
493 if let Mutation::StartFragmentBackfill { fragment_ids } = mutation
494 && fragment_ids.contains(&self.fragment_id)
495 {
496 tracing::info!(
497 "Start backfill of locality provider with fragment id: {:?}",
498 &self.fragment_id
499 );
500 start_backfill = true;
501 }
502 }
503
504 let post_commit1 = state_table.commit(epoch).await?;
506 let post_commit2 = progress_table.commit(epoch).await?;
507
508 yield Message::Barrier(barrier);
509 post_commit1.post_yield_barrier(None).await?;
510 post_commit2.post_yield_barrier(None).await?;
511
512 if start_backfill {
514 break;
515 }
516 }
517 }
518 }
519 }
520
521 if need_backfill {
547 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
548 let mut pending_barrier: Option<Barrier> = None;
549
550 let metrics = self
551 .metrics
552 .new_backfill_metrics(state_table.table_id(), self.actor_id);
553
554 let snapshot_data_types = self.input_schema.data_types();
556 let mut builders: Builders = state_table
557 .vnodes()
558 .iter_vnodes()
559 .map(|vnode| {
560 let builder = create_builder(
561 RateLimit::Disabled,
562 self.chunk_size,
563 snapshot_data_types.clone(),
564 );
565 (vnode, builder)
566 })
567 .collect();
568
569 'backfill_loop: loop {
570 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
571 let mut cur_barrier_upstream_processed_rows: u64 = 0;
572
573 {
575 let left_upstream = upstream.by_ref().map(Either::Left);
576 let right_snapshot = pin!(
577 Self::make_snapshot_stream(&state_table, backfill_state.clone(),)
578 .map(Either::Right)
579 );
580
581 let mut backfill_stream =
584 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
585 stream::PollNext::Left
586 });
587
588 #[for_await]
589 for either in &mut backfill_stream {
590 match either {
591 Either::Left(msg) => {
593 match msg? {
594 Message::Barrier(barrier) => {
595 pending_barrier = Some(barrier);
597 break;
598 }
599 Message::Chunk(chunk) => {
600 upstream_chunk_buffer.push(chunk.compact());
602 }
603 Message::Watermark(_) => {
604 }
606 }
607 }
608 Either::Right(msg) => {
610 match msg? {
611 None => {
612 for (vnode, builder) in &mut builders {
615 if let Some(data_chunk) = builder.consume_all() {
616 let chunk = Self::handle_snapshot_chunk(
617 data_chunk,
618 *vnode,
619 &pk_indices,
620 &mut backfill_state,
621 &mut cur_barrier_snapshot_processed_rows,
622 )?;
623 yield Message::Chunk(chunk);
624 }
625 }
626
627 for chunk in upstream_chunk_buffer.drain(..) {
629 let chunk_cardinality = chunk.cardinality() as u64;
630 cur_barrier_upstream_processed_rows +=
631 chunk_cardinality;
632 yield Message::Chunk(chunk);
633 }
634 metrics
635 .backfill_snapshot_read_row_count
636 .inc_by(cur_barrier_snapshot_processed_rows);
637 metrics
638 .backfill_upstream_output_row_count
639 .inc_by(cur_barrier_upstream_processed_rows);
640 break 'backfill_loop;
641 }
642 Some((vnode, row)) => {
643 let builder = builders.get_mut(&vnode).unwrap();
645 if let Some(data_chunk) = builder.append_one_row(row) {
646 let chunk = Self::handle_snapshot_chunk(
648 data_chunk,
649 vnode,
650 &pk_indices,
651 &mut backfill_state,
652 &mut cur_barrier_snapshot_processed_rows,
653 )?;
654 yield Message::Chunk(chunk);
655 }
656 }
659 }
660 }
661 }
662 }
663 }
664
665 let barrier = match pending_barrier.take() {
667 Some(barrier) => barrier,
668 None => break 'backfill_loop, };
670
671 for (vnode, builder) in &mut builders {
673 if let Some(data_chunk) = builder.consume_all() {
674 let chunk = Self::handle_snapshot_chunk(
675 data_chunk,
676 *vnode,
677 &pk_indices,
678 &mut backfill_state,
679 &mut cur_barrier_snapshot_processed_rows,
680 )?;
681 yield Message::Chunk(chunk);
682 }
683 }
684
685 for chunk in upstream_chunk_buffer.drain(..) {
687 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
688
689 if backfill_state.has_progress() {
691 let marked_chunk =
692 Self::mark_chunk(chunk.clone(), &backfill_state, &state_table)?;
693 yield Message::Chunk(marked_chunk);
694 }
695 }
696
697 state_table
699 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
700 .await?;
701
702 let total_snapshot_processed_rows: u64 = backfill_state
704 .vnodes()
705 .map(|(_, progress)| match *progress {
706 LocalityBackfillProgress::InProgress { processed_rows, .. } => {
707 processed_rows
708 }
709 LocalityBackfillProgress::Completed { total_rows, .. } => total_rows,
710 LocalityBackfillProgress::NotStarted => 0,
711 })
712 .sum();
713
714 self.progress.update(
715 barrier.epoch,
716 barrier.epoch.curr, total_snapshot_processed_rows,
718 );
719
720 Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
722 let barrier_epoch = barrier.epoch;
723 let post_commit = progress_table.commit(barrier_epoch).await?;
724
725 metrics
726 .backfill_snapshot_read_row_count
727 .inc_by(cur_barrier_snapshot_processed_rows);
728 metrics
729 .backfill_upstream_output_row_count
730 .inc_by(cur_barrier_upstream_processed_rows);
731
732 yield Message::Barrier(barrier);
733 post_commit.post_yield_barrier(None).await?;
734 }
735 }
736
737 tracing::debug!("Locality provider backfill finished, forwarding upstream directly");
738
739 if need_backfill && !backfill_state.is_completed() {
741 while let Some(Ok(msg)) = upstream.next().await {
742 match msg {
743 Message::Barrier(barrier) => {
744 state_table
746 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
747 .await?;
748
749 for vnode in state_table.vnodes().iter_vnodes() {
751 backfill_state.finish_vnode(vnode, pk_indices.len());
752 }
753
754 let total_snapshot_processed_rows: u64 = backfill_state
756 .vnodes()
757 .map(|(_, progress)| match *progress {
758 LocalityBackfillProgress::Completed { total_rows, .. } => {
759 total_rows
760 }
761 LocalityBackfillProgress::InProgress { processed_rows, .. } => {
762 processed_rows
763 }
764 LocalityBackfillProgress::NotStarted => 0,
765 })
766 .sum();
767
768 self.progress
770 .finish(barrier.epoch, total_snapshot_processed_rows);
771
772 Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
774 let post_commit = progress_table.commit(barrier.epoch).await?;
775
776 yield Message::Barrier(barrier);
777 post_commit.post_yield_barrier(None).await?;
778 break; }
780 Message::Chunk(chunk) => {
781 yield Message::Chunk(chunk);
783 }
784 Message::Watermark(watermark) => {
785 yield Message::Watermark(watermark);
787 }
788 }
789 }
790 }
791
792 #[for_await]
794 for msg in upstream {
795 let msg = msg?;
796
797 match msg {
798 Message::Barrier(barrier) => {
799 state_table
801 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
802 .await?;
803 progress_table
804 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
805 .await?;
806 yield Message::Barrier(barrier);
807 }
808 _ => {
809 yield msg;
811 }
812 }
813 }
814 }
815}