risingwave_stream/executor/backfill/no_shuffle_backfill.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use futures::stream;
17use futures::stream::select_with_strategy;
18use risingwave_common::array::{DataChunk, Op};
19use risingwave_common::hash::VnodeBitmapExt;
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_common::{bail, row};
22use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
23use risingwave_hummock_sdk::HummockReadEpoch;
24use risingwave_storage::store::PrefetchOptions;
25use risingwave_storage::table::batch_table::BatchTable;
26
27use crate::executor::backfill::utils;
28use crate::executor::backfill::utils::{
29 METADATA_STATE_LEN, compute_bounds, construct_initial_finished_state, create_builder,
30 get_new_pos, mapping_chunk, mapping_message, mark_chunk,
31};
32use crate::executor::prelude::*;
33use crate::task::{CreateMviewProgressReporter, FragmentId};
34
35/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
36/// We can decode that into `BackfillState` on recovery.
37#[derive(Debug, Eq, PartialEq)]
38pub struct BackfillState {
39 current_pos: Option<OwnedRow>,
40 old_state: Option<Vec<Datum>>,
41 is_finished: bool,
42 row_count: u64,
43}
44
45/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
46/// `BackfillExecutor` is used to create a materialized view on another materialized view.
47///
48/// It can only buffer chunks between two barriers instead of unbundled memory usage of
49/// `RearrangedChainExecutor`.
50///
51/// It uses the latest epoch to read the snapshot of the upstream mv during two barriers and all the
52/// `StreamChunk` of the snapshot read will forward to the downstream.
53///
54/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv) and
55/// `current_pos` is initiated as an empty `Row`.
56///
57/// All upstream messages during the two barriers interval will be buffered and decide to forward or
58/// ignore based on the `current_pos` at the end of the later barrier. Once `current_pos` reaches
59/// the end of the upstream mv pk, the backfill would finish.
60///
61/// Notice:
62/// The pk we are talking about here refers to the storage primary key.
63/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
64/// in the same worker, so that we can read uncommitted data from the upstream table without
65/// waiting.
66pub struct BackfillExecutor<S: StateStore> {
67 /// Upstream table
68 upstream_table: BatchTable<S>,
69 /// Upstream with the same schema with the upstream table.
70 upstream: Executor,
71
72 /// Internal state table for persisting state of backfill state.
73 state_table: Option<StateTable<S>>,
74
75 /// The column indices need to be forwarded to the downstream from the upstream and table scan.
76 output_indices: Vec<usize>,
77
78 /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
79 progress: CreateMviewProgressReporter,
80
81 actor_id: ActorId,
82
83 metrics: Arc<StreamingMetrics>,
84
85 chunk_size: usize,
86
87 rate_limiter: MonitoredRateLimiter,
88
89 /// Fragment id of the fragment this backfill node belongs to.
90 fragment_id: FragmentId,
91}
92
93impl<S> BackfillExecutor<S>
94where
95 S: StateStore,
96{
97 #[allow(clippy::too_many_arguments)]
98 pub fn new(
99 upstream_table: BatchTable<S>,
100 upstream: Executor,
101 state_table: Option<StateTable<S>>,
102 output_indices: Vec<usize>,
103 progress: CreateMviewProgressReporter,
104 metrics: Arc<StreamingMetrics>,
105 chunk_size: usize,
106 rate_limit: RateLimit,
107 fragment_id: FragmentId,
108 ) -> Self {
109 let actor_id = progress.actor_id();
110 let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
111 Self {
112 upstream_table,
113 upstream,
114 state_table,
115 output_indices,
116 progress,
117 actor_id,
118 metrics,
119 chunk_size,
120 rate_limiter,
121 fragment_id,
122 }
123 }
124
125 #[try_stream(ok = Message, error = StreamExecutorError)]
126 async fn execute_inner(mut self) {
127 // The primary key columns.
128 // We receive a pruned chunk from the upstream table,
129 // which will only contain output columns of the scan on the upstream table.
130 // The pk indices specify the pk columns of the pruned chunk.
131 let pk_indices = self.upstream_table.pk_in_output_indices().unwrap();
132
133 let state_len = pk_indices.len() + METADATA_STATE_LEN;
134
135 let pk_order = self.upstream_table.pk_serializer().get_order_types();
136
137 let upstream_table_id = self.upstream_table.table_id().table_id;
138
139 let mut upstream = self.upstream.execute();
140
141 // Poll the upstream to get the first barrier.
142 let first_barrier = expect_first_barrier(&mut upstream).await?;
143 let mut global_pause = first_barrier.is_pause_on_startup();
144 let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
145 let first_epoch = first_barrier.epoch;
146 let init_epoch = first_barrier.epoch.prev;
147 // The first barrier message should be propagated.
148 yield Message::Barrier(first_barrier);
149
150 if let Some(state_table) = self.state_table.as_mut() {
151 state_table.init_epoch(first_epoch).await?;
152 }
153
154 let BackfillState {
155 mut current_pos,
156 is_finished,
157 row_count,
158 mut old_state,
159 } = Self::recover_backfill_state(self.state_table.as_ref(), pk_indices.len()).await?;
160 tracing::trace!(is_finished, row_count, "backfill state recovered");
161
162 let data_types = self.upstream_table.schema().data_types();
163
164 // Chunk builder will be instantiated with min(rate_limit, self.chunk_size) as the chunk's max size.
165 let mut builder = create_builder(
166 self.rate_limiter.rate_limit(),
167 self.chunk_size,
168 data_types.clone(),
169 );
170
171 // Use this buffer to construct state,
172 // which will then be persisted.
173 let mut current_state: Vec<Datum> = vec![None; state_len];
174
175 // If no need backfill, but state was still "unfinished" we need to finish it.
176 // So we just update the state + progress to meta at the next barrier to finish progress,
177 // and forward other messages.
178 //
179 // Reason for persisting on second barrier rather than first:
180 // We can't update meta with progress as finished until state_table
181 // has been updated.
182 // We also can't update state_table in first epoch, since state_table
183 // expects to have been initialized in previous epoch.
184
185 // The epoch used to snapshot read upstream mv.
186 let mut snapshot_read_epoch = init_epoch;
187
188 // Keep track of rows from the snapshot.
189 let mut total_snapshot_processed_rows: u64 = row_count;
190
191 // Backfill Algorithm:
192 //
193 // backfill_stream
194 // / \
195 // upstream snapshot
196 //
197 // We construct a backfill stream with upstream as its left input and mv snapshot read
198 // stream as its right input. When a chunk comes from upstream, we will buffer it.
199 //
200 // When a barrier comes from upstream:
201 // - Update the `snapshot_read_epoch`.
202 // - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
203 // `current_pos`, otherwise ignore it.
204 // - reconstruct the whole backfill stream with upstream and new mv snapshot read stream
205 // with the `snapshot_read_epoch`.
206 //
207 // When a chunk comes from snapshot, we forward it to the downstream and raise
208 // `current_pos`.
209 //
210 // When we reach the end of the snapshot read stream, it means backfill has been
211 // finished.
212 //
213 // Once the backfill loop ends, we forward the upstream directly to the downstream.
214 if !is_finished {
215 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
216 let mut pending_barrier: Option<Barrier> = None;
217
218 let metrics = self
219 .metrics
220 .new_backfill_metrics(upstream_table_id, self.actor_id);
221
222 'backfill_loop: loop {
223 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
224 let mut cur_barrier_upstream_processed_rows: u64 = 0;
225 let mut snapshot_read_complete = false;
226 let mut has_snapshot_read = false;
227
228 // We should not buffer rows from previous epoch, else we can have duplicates.
229 assert!(upstream_chunk_buffer.is_empty());
230
231 {
232 let left_upstream = upstream.by_ref().map(Either::Left);
233 let paused = global_pause
234 || backfill_paused
235 || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
236 let right_snapshot = pin!(
237 Self::make_snapshot_stream(
238 &self.upstream_table,
239 snapshot_read_epoch,
240 current_pos.clone(),
241 paused,
242 &self.rate_limiter,
243 )
244 .map(Either::Right)
245 );
246
247 // Prefer to select upstream, so we can stop snapshot stream as soon as the
248 // barrier comes.
249 let mut backfill_stream =
250 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
251 stream::PollNext::Left
252 });
253
254 #[for_await]
255 for either in &mut backfill_stream {
256 match either {
257 // Upstream
258 Either::Left(msg) => {
259 match msg? {
260 Message::Barrier(barrier) => {
261 // We have to process barrier outside of the loop.
262 // This is because the backfill stream holds a mutable
263 // reference to our chunk builder.
264 // We want to create another mutable reference
265 // to flush remaining chunks from the chunk builder
266 // on barrier.
267 // Hence we break here and process it after this block.
268 pending_barrier = Some(barrier);
269 break;
270 }
271 Message::Chunk(chunk) => {
272 // Buffer the upstream chunk.
273 upstream_chunk_buffer.push(chunk.compact());
274 }
275 Message::Watermark(_) => {
276 // Ignore watermark during backfill.
277 }
278 }
279 }
280 // Snapshot read
281 Either::Right(msg) => {
282 has_snapshot_read = true;
283 match msg? {
284 None => {
285 // Consume remaining rows in the builder.
286 if let Some(data_chunk) = builder.consume_all() {
287 yield Message::Chunk(Self::handle_snapshot_chunk(
288 data_chunk,
289 &mut current_pos,
290 &mut cur_barrier_snapshot_processed_rows,
291 &mut total_snapshot_processed_rows,
292 &pk_indices,
293 &self.output_indices,
294 ));
295 }
296
297 // End of the snapshot read stream.
298 // We should not mark the chunk anymore,
299 // otherwise, we will ignore some rows
300 // in the buffer. Here we choose to never mark the chunk.
301 // Consume with the renaming stream buffer chunk without
302 // mark.
303 for chunk in upstream_chunk_buffer.drain(..) {
304 let chunk_cardinality = chunk.cardinality() as u64;
305 cur_barrier_upstream_processed_rows +=
306 chunk_cardinality;
307 yield Message::Chunk(mapping_chunk(
308 chunk,
309 &self.output_indices,
310 ));
311 }
312 metrics
313 .backfill_snapshot_read_row_count
314 .inc_by(cur_barrier_snapshot_processed_rows);
315 metrics
316 .backfill_upstream_output_row_count
317 .inc_by(cur_barrier_upstream_processed_rows);
318 break 'backfill_loop;
319 }
320 Some(record) => {
321 // Buffer the snapshot read row.
322 if let Some(data_chunk) = builder.append_one_row(record) {
323 yield Message::Chunk(Self::handle_snapshot_chunk(
324 data_chunk,
325 &mut current_pos,
326 &mut cur_barrier_snapshot_processed_rows,
327 &mut total_snapshot_processed_rows,
328 &pk_indices,
329 &self.output_indices,
330 ));
331 }
332 }
333 }
334 }
335 }
336 }
337
338 // Before processing barrier, if did not snapshot read,
339 // do a snapshot read first.
340 // This is so we don't lose the tombstone iteration progress.
341 // If paused, we also can't read any snapshot records.
342 if !has_snapshot_read && !paused {
343 assert!(
344 builder.is_empty(),
345 "Builder should be empty if no snapshot read"
346 );
347 let (_, snapshot) = backfill_stream.into_inner();
348 #[for_await]
349 for msg in snapshot {
350 let Either::Right(msg) = msg else {
351 bail!("BUG: snapshot_read contains upstream messages");
352 };
353 match msg? {
354 None => {
355 // End of the snapshot read stream.
356 // We let the barrier handling logic take care of upstream updates.
357 // But we still want to exit backfill loop, so we mark snapshot read complete.
358 snapshot_read_complete = true;
359 break;
360 }
361 Some(row) => {
362 let chunk = DataChunk::from_rows(&[row], &data_types);
363 yield Message::Chunk(Self::handle_snapshot_chunk(
364 chunk,
365 &mut current_pos,
366 &mut cur_barrier_snapshot_processed_rows,
367 &mut total_snapshot_processed_rows,
368 &pk_indices,
369 &self.output_indices,
370 ));
371 break;
372 }
373 }
374 }
375 }
376 }
377 // When we break out of inner backfill_stream loop, it means we have a barrier.
378 // If there are no updates and there are no snapshots left,
379 // we already finished backfill and should have exited the outer backfill loop.
380 let barrier = match pending_barrier.take() {
381 Some(barrier) => barrier,
382 None => bail!("BUG: current_backfill loop exited without a barrier"),
383 };
384
385 // Process barrier:
386 // - consume snapshot rows left in builder
387 // - consume upstream buffer chunk
388 // - switch snapshot
389
390 // Consume snapshot rows left in builder
391 let chunk = builder.consume_all();
392 if let Some(chunk) = chunk {
393 yield Message::Chunk(Self::handle_snapshot_chunk(
394 chunk,
395 &mut current_pos,
396 &mut cur_barrier_snapshot_processed_rows,
397 &mut total_snapshot_processed_rows,
398 &pk_indices,
399 &self.output_indices,
400 ));
401 }
402
403 // Consume upstream buffer chunk
404 // If no current_pos, means we did not process any snapshot
405 // yet. In that case
406 // we can just ignore the upstream buffer chunk, but still need to clean it.
407 if let Some(current_pos) = ¤t_pos {
408 for chunk in upstream_chunk_buffer.drain(..) {
409 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
410 yield Message::Chunk(mapping_chunk(
411 mark_chunk(chunk, current_pos, &pk_indices, pk_order),
412 &self.output_indices,
413 ));
414 }
415 } else {
416 upstream_chunk_buffer.clear()
417 }
418
419 metrics
420 .backfill_snapshot_read_row_count
421 .inc_by(cur_barrier_snapshot_processed_rows);
422 metrics
423 .backfill_upstream_output_row_count
424 .inc_by(cur_barrier_upstream_processed_rows);
425
426 // Update snapshot read epoch.
427 snapshot_read_epoch = barrier.epoch.prev;
428
429 self.progress.update(
430 barrier.epoch,
431 snapshot_read_epoch,
432 total_snapshot_processed_rows,
433 );
434
435 // Persist state on barrier
436 Self::persist_state(
437 barrier.epoch,
438 &mut self.state_table,
439 false,
440 ¤t_pos,
441 total_snapshot_processed_rows,
442 &mut old_state,
443 &mut current_state,
444 )
445 .await?;
446
447 tracing::trace!(
448 epoch = ?barrier.epoch,
449 ?current_pos,
450 total_snapshot_processed_rows,
451 "Backfill state persisted"
452 );
453
454 // Update snapshot read chunk builder.
455 if let Some(mutation) = barrier.mutation.as_deref() {
456 match mutation {
457 Mutation::Pause => {
458 global_pause = true;
459 }
460 Mutation::Resume => {
461 global_pause = false;
462 }
463 Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
464 if fragment_ids.contains(&self.fragment_id) {
465 backfill_paused = false;
466 }
467 }
468 Mutation::Throttle(actor_to_apply) => {
469 let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
470 if let Some(new_rate_limit) = new_rate_limit_entry {
471 let new_rate_limit = (*new_rate_limit).into();
472 let old_rate_limit = self.rate_limiter.update(new_rate_limit);
473 if old_rate_limit != new_rate_limit {
474 tracing::info!(
475 old_rate_limit = ?old_rate_limit,
476 new_rate_limit = ?new_rate_limit,
477 upstream_table_id = upstream_table_id,
478 actor_id = self.actor_id,
479 "backfill rate limit changed",
480 );
481 // The builder is emptied above via `DataChunkBuilder::consume_all`.
482 assert!(
483 builder.is_empty(),
484 "builder should already be emptied"
485 );
486 builder = create_builder(
487 new_rate_limit,
488 self.chunk_size,
489 self.upstream_table.schema().data_types(),
490 );
491 }
492 }
493 }
494 _ => (),
495 }
496 }
497
498 yield Message::Barrier(barrier);
499
500 if snapshot_read_complete {
501 break 'backfill_loop;
502 }
503
504 // We will switch snapshot at the start of the next iteration of the backfill loop.
505 }
506 }
507
508 tracing::trace!("Backfill has finished, waiting for barrier");
509
510 // Wait for first barrier to come after backfill is finished.
511 // So we can update our progress + persist the status.
512 while let Some(Ok(msg)) = upstream.next().await {
513 if let Some(msg) = mapping_message(msg, &self.output_indices) {
514 // If not finished then we need to update state, otherwise no need.
515 if let Message::Barrier(barrier) = &msg {
516 if is_finished {
517 // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
518 if let Some(table) = &mut self.state_table {
519 table
520 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
521 .await?;
522 }
523 } else {
524 // If snapshot was empty, we do not need to backfill,
525 // but we still need to persist the finished state.
526 // We currently persist it on the second barrier here rather than first.
527 // This is because we can't update state table in first epoch,
528 // since it expects to have been initialized in previous epoch
529 // (there's no epoch before the first epoch).
530 if current_pos.is_none() {
531 current_pos = Some(construct_initial_finished_state(pk_indices.len()))
532 }
533
534 // We will update current_pos at least once,
535 // since snapshot read has to be non-empty,
536 // Or snapshot was empty and we construct a placeholder state.
537 debug_assert_ne!(current_pos, None);
538
539 Self::persist_state(
540 barrier.epoch,
541 &mut self.state_table,
542 true,
543 ¤t_pos,
544 total_snapshot_processed_rows,
545 &mut old_state,
546 &mut current_state,
547 )
548 .await?;
549 tracing::trace!(
550 epoch = ?barrier.epoch,
551 ?current_pos,
552 total_snapshot_processed_rows,
553 "Backfill position persisted after completion"
554 );
555 }
556
557 // For both backfill finished before recovery,
558 // and backfill which just finished, we need to update mview tracker,
559 // it does not persist this information.
560 self.progress
561 .finish(barrier.epoch, total_snapshot_processed_rows);
562 tracing::trace!(
563 epoch = ?barrier.epoch,
564 "Updated CreateMaterializedTracker"
565 );
566 yield msg;
567 break;
568 }
569 // Allow other messages to pass through.
570 // We won't yield twice here, since if there's a barrier,
571 // we will always break out of the loop.
572 yield msg;
573 }
574 }
575
576 tracing::trace!(
577 "Backfill has already finished and forward messages directly to the downstream"
578 );
579
580 // After progress finished + state persisted,
581 // we can forward messages directly to the downstream,
582 // as backfill is finished.
583 // We don't need to report backfill progress any longer, as it has finished.
584 // It will always be at 100%.
585 #[for_await]
586 for msg in upstream {
587 if let Some(msg) = mapping_message(msg?, &self.output_indices) {
588 if let Message::Barrier(barrier) = &msg {
589 // If already finished, no need persist any state, but we need to advance the epoch of the state table anyway.
590 if let Some(table) = &mut self.state_table {
591 table
592 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
593 .await?;
594 }
595 }
596
597 yield msg;
598 }
599 }
600 }
601
602 async fn recover_backfill_state(
603 state_table: Option<&StateTable<S>>,
604 pk_len: usize,
605 ) -> StreamExecutorResult<BackfillState> {
606 let Some(state_table) = state_table else {
607 // If no state table, but backfill is present, it must be from an old cluster.
608 // In that case backfill must be finished, otherwise it won't have been persisted.
609 return Ok(BackfillState {
610 current_pos: None,
611 is_finished: true,
612 row_count: 0,
613 old_state: None,
614 });
615 };
616 let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
617 let first_vnode = vnodes.next().unwrap();
618 let key: &[Datum] = &[Some(first_vnode.into())];
619 let row = state_table.get_row(key).await?;
620 let expected_state = Self::deserialize_backfill_state(row, pk_len);
621
622 // All vnode partitions should have same state (no scale-in supported).
623 for vnode in vnodes {
624 let key: &[Datum] = &[Some(vnode.into())];
625 let row = state_table.get_row(key).await?;
626 let state = Self::deserialize_backfill_state(row, pk_len);
627 assert_eq!(state.is_finished, expected_state.is_finished);
628 }
629 Ok(expected_state)
630 }
631
632 fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
633 let Some(row) = row else {
634 return BackfillState {
635 current_pos: None,
636 is_finished: false,
637 row_count: 0,
638 old_state: None,
639 };
640 };
641 let row = row.into_inner();
642 let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
643 old_state[1..row.len() + 1].clone_from_slice(&row);
644 let current_pos = Some((&row[0..pk_len]).into_owned_row());
645 let is_finished = row[pk_len].clone().is_some_and(|d| d.into_bool());
646 let row_count = row
647 .get(pk_len + 1)
648 .cloned()
649 .unwrap_or(None)
650 .map_or(0, |d| d.into_int64() as u64);
651 BackfillState {
652 current_pos,
653 is_finished,
654 row_count,
655 old_state: Some(old_state),
656 }
657 }
658
659 #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
660 async fn make_snapshot_stream<'a>(
661 upstream_table: &'a BatchTable<S>,
662 epoch: u64,
663 current_pos: Option<OwnedRow>,
664 paused: bool,
665 rate_limiter: &'a MonitoredRateLimiter,
666 ) {
667 if paused {
668 #[for_await]
669 for _ in tokio_stream::pending() {
670 bail!("BUG: paused stream should not yield");
671 }
672 } else {
673 // Checked the rate limit is not zero.
674 #[for_await]
675 for r in
676 Self::snapshot_read(upstream_table, HummockReadEpoch::NoWait(epoch), current_pos)
677 {
678 rate_limiter.wait(1).await;
679 yield Some(r?);
680 }
681 }
682 yield None;
683 }
684
685 /// Snapshot read the upstream mv.
686 /// The rows from upstream snapshot read will be buffered inside the `builder`.
687 /// If snapshot is dropped before its rows are consumed,
688 /// remaining data in `builder` must be flushed manually.
689 /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
690 /// present, Then when we flush we contain duplicate rows.
691 #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
692 pub async fn snapshot_read(
693 upstream_table: &BatchTable<S>,
694 epoch: HummockReadEpoch,
695 current_pos: Option<OwnedRow>,
696 ) {
697 let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
698 let range_bounds = match range_bounds {
699 None => {
700 return Ok(());
701 }
702 Some(range_bounds) => range_bounds,
703 };
704
705 // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
706 // together with the upstream mv.
707 let row_iter = upstream_table
708 .batch_iter_with_pk_bounds(
709 epoch,
710 row::empty(),
711 range_bounds,
712 true,
713 // Here we only use small range prefetch because every barrier change, the executor will recreate a new iterator. So we do not need prefetch too much data.
714 PrefetchOptions::prefetch_for_small_range_scan(),
715 )
716 .await?;
717
718 #[for_await]
719 for row in row_iter {
720 yield row?;
721 }
722 }
723
724 async fn persist_state(
725 epoch: EpochPair,
726 table: &mut Option<StateTable<S>>,
727 is_finished: bool,
728 current_pos: &Option<OwnedRow>,
729 row_count: u64,
730 old_state: &mut Option<Vec<Datum>>,
731 current_state: &mut [Datum],
732 ) -> StreamExecutorResult<()> {
733 // Backwards compatibility with no state table in backfill.
734 let Some(table) = table else { return Ok(()) };
735 utils::persist_state(
736 epoch,
737 table,
738 is_finished,
739 current_pos,
740 row_count,
741 old_state,
742 current_state,
743 )
744 .await
745 }
746
747 /// 1. Converts from data chunk to stream chunk.
748 /// 2. Update the current position.
749 /// 3. Update Metrics
750 /// 4. Map the chunk according to output indices, return
751 /// the stream chunk and do wrapping outside.
752 fn handle_snapshot_chunk(
753 data_chunk: DataChunk,
754 current_pos: &mut Option<OwnedRow>,
755 cur_barrier_snapshot_processed_rows: &mut u64,
756 total_snapshot_processed_rows: &mut u64,
757 pk_indices: &[usize],
758 output_indices: &[usize],
759 ) -> StreamChunk {
760 let ops = vec![Op::Insert; data_chunk.capacity()];
761 let chunk = StreamChunk::from_parts(ops, data_chunk);
762 // Raise the current position.
763 // As snapshot read streams are ordered by pk, so we can
764 // just use the last row to update `current_pos`.
765 *current_pos = Some(get_new_pos(&chunk, pk_indices));
766
767 let chunk_cardinality = chunk.cardinality() as u64;
768 *cur_barrier_snapshot_processed_rows += chunk_cardinality;
769 *total_snapshot_processed_rows += chunk_cardinality;
770
771 mapping_chunk(chunk, output_indices)
772 }
773}
774
775impl<S> Execute for BackfillExecutor<S>
776where
777 S: StateStore,
778{
779 fn execute(self: Box<Self>) -> BoxedMessageStream {
780 self.execute_inner().boxed()
781 }
782}