risingwave_stream/executor/backfill/no_shuffle_backfill.rs
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::{bail, row};
use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::BatchTable;

use crate::executor::backfill::utils;
use crate::executor::backfill::utils::{
    METADATA_STATE_LEN, compute_bounds, construct_initial_finished_state, create_builder,
    get_new_pos, mapping_chunk, mapping_message, mark_chunk,
};
use crate::executor::prelude::*;
use crate::task::{CreateMviewProgressReporter, FragmentId};
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
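///
/// For illustration only (hypothetical values, single-column pk): a persisted row
/// `(vnode: 0, pk_0: 42, backfill_finished: false, row_count: 100)` decodes to a
/// `BackfillState` with `current_pos = Some([42])`, `is_finished = false`, and
/// `row_count = 100`.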
#[derive(Debug, Eq, PartialEq)]
pub struct BackfillState {
    current_pos: Option<OwnedRow>,
    old_state: Option<Vec<Datum>>,
    is_finished: bool,
    row_count: u64,
}

/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
/// `BackfillExecutor` is used to create a materialized view on another materialized view.
///
/// It only buffers chunks between two barriers, instead of the unbounded memory usage of
/// `RearrangedChainExecutor`.
///
/// It uses the latest epoch to read the snapshot of the upstream mv between two barriers, and
/// every `StreamChunk` from the snapshot read is forwarded to the downstream.
///
/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv), and
/// `current_pos` is initiated as an empty `Row`.
///
/// All upstream messages within the interval between two barriers are buffered; at the later
/// barrier, each buffered row is either forwarded or ignored based on `current_pos`. Once
/// `current_pos` reaches the end of the upstream mv pk, the backfill is finished.
///
/// Notice:
/// The pk we are talking about here refers to the storage primary key.
/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
/// on the same worker, so that we can read uncommitted data from the upstream table without
/// waiting.
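///
/// For example, if `current_pos` is at pk = 10 when a barrier arrives, a buffered upstream
/// update for pk = 5 is forwarded downstream (that row has already been backfilled), while an
/// update for pk = 20 is ignored, since a later snapshot read will observe it anyway.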
pub struct BackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,
    /// Upstream with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill state.
    state_table: Option<StateTable<S>>,

    /// The column indices that need to be forwarded to the downstream from the upstream and the
    /// table scan.
    output_indices: Vec<usize>,

    /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
    progress: CreateMviewProgressReporter,

    actor_id: ActorId,

    metrics: Arc<StreamingMetrics>,

    chunk_size: usize,

    rate_limiter: MonitoredRateLimiter,

    /// Fragment id of the fragment this backfill node belongs to.
    fragment_id: FragmentId,
}

impl<S> BackfillExecutor<S>
where
    S: StateStore,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        upstream_table: BatchTable<S>,
        upstream: Executor,
        state_table: Option<StateTable<S>>,
        output_indices: Vec<usize>,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        rate_limit: RateLimit,
        fragment_id: FragmentId,
    ) -> Self {
        let actor_id = progress.actor_id();
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Self {
            upstream_table,
            upstream,
            state_table,
            output_indices,
            progress,
            actor_id,
            metrics,
            chunk_size,
            rate_limiter,
            fragment_id,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The primary key columns.
        // We receive a pruned chunk from the upstream table,
        // which will only contain output columns of the scan on the upstream table.
        // The pk indices specify the pk columns of the pruned chunk.
        let pk_indices = self.upstream_table.pk_in_output_indices().unwrap();

        let state_len = pk_indices.len() + METADATA_STATE_LEN;

        let pk_order = self.upstream_table.pk_serializer().get_order_types();

        let upstream_table_id = self.upstream_table.table_id();

        let mut upstream = self.upstream.execute();

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let mut global_pause = first_barrier.is_pause_on_startup();
        let mut backfill_paused = first_barrier.is_backfill_pause_on_startup(self.fragment_id);
        let first_epoch = first_barrier.epoch;
        let init_epoch = first_barrier.epoch.prev;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);

        if let Some(state_table) = self.state_table.as_mut() {
            state_table.init_epoch(first_epoch).await?;
        }

        let BackfillState {
            mut current_pos,
            is_finished,
            row_count,
            mut old_state,
        } = Self::recover_backfill_state(self.state_table.as_ref(), pk_indices.len()).await?;
        tracing::trace!(is_finished, row_count, "backfill state recovered");

        let data_types = self.upstream_table.schema().data_types();

        // The chunk builder is instantiated with `min(rate_limit, self.chunk_size)` as the
        // chunk's max size.
        let mut builder = create_builder(
            self.rate_limiter.rate_limit(),
            self.chunk_size,
            data_types.clone(),
        );

        // Use this buffer to construct state,
        // which will then be persisted.
        let mut current_state: Vec<Datum> = vec![None; state_len];

        // If backfill is not needed but the state is still "unfinished", we need to finish it.
        // So we just update the state + progress to meta at the next barrier to finish progress,
        // and forward other messages.
        //
        // Reason for persisting on the second barrier rather than the first:
        // We can't update meta with progress as finished until state_table
        // has been updated.
        // We also can't update state_table in the first epoch, since state_table
        // expects to have been initialized in the previous epoch.

        // The epoch used to snapshot read the upstream mv.
        let mut snapshot_read_epoch = init_epoch;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_processed_rows: u64 = row_count;

        // Backfill Algorithm:
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot
        //
        // We construct a backfill stream with upstream as its left input and mv snapshot read
        // stream as its right input. When a chunk comes from upstream, we will buffer it.
        //
        // When a barrier comes from upstream:
        //  - Update the `snapshot_read_epoch`.
        //  - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
        //    `current_pos`, otherwise ignore it.
        //  - reconstruct the whole backfill stream with upstream and new mv snapshot read stream
        //    with the `snapshot_read_epoch`.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
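        //
        // A sketch of one iteration with hypothetical values: during the barrier interval we
        // stream snapshot rows read at epoch `e1` and advance `current_pos`, say to pk = 10.
        // When the next barrier arrives, buffered upstream rows with pk <= 10 are forwarded
        // (marked) and the rest are dropped; then the snapshot stream is rebuilt at the new
        // `snapshot_read_epoch`, resuming from pk > 10.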
        if !is_finished {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(upstream_table_id, self.actor_id);

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut snapshot_read_complete = false;
                let mut has_snapshot_read = false;

                // We should not buffer rows from previous epoch, else we can have duplicates.
                assert!(upstream_chunk_buffer.is_empty());

                {
                    let left_upstream = upstream.by_ref().map(Either::Left);
                    let paused = global_pause
                        || backfill_paused
                        || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(
                            &self.upstream_table,
                            snapshot_read_epoch,
                            current_pos.clone(),
                            paused,
                            &self.rate_limiter,
                        )
                        .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
                    // barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process the barrier outside of the loop.
                                        // This is because the backfill stream holds a mutable
                                        // reference to our chunk builder.
                                        // We want to create another mutable reference
                                        // to flush remaining chunks from the chunk builder
                                        // on barrier.
                                        // Hence we break here and process it after this block.
                                        pending_barrier = Some(barrier);
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact_vis());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                has_snapshot_read = true;
                                match msg? {
                                    None => {
                                        // Consume remaining rows in the builder.
                                        if let Some(data_chunk) = builder.consume_all() {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                data_chunk,
                                                &mut current_pos,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &pk_indices,
                                                &self.output_indices,
                                            ));
                                        }

                                        // End of the snapshot read stream.
                                        // We should not mark the chunks anymore;
                                        // otherwise, we would ignore some rows in the buffer.
                                        // Here we choose to never mark the chunk, and consume
                                        // the remaining upstream buffer chunks without marking.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            yield Message::Chunk(mapping_chunk(
                                                chunk,
                                                &self.output_indices,
                                            ));
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some(record) => {
                                        // Buffer the snapshot read row.
                                        if let Some(data_chunk) = builder.append_one_row(record) {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                data_chunk,
                                                &mut current_pos,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &pk_indices,
                                                &self.output_indices,
                                            ));
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Before processing the barrier, if we did not do a snapshot read,
                    // do one first.
                    // This is so we don't lose the tombstone iteration progress.
                    // If paused, we also can't read any snapshot records.
                    if !has_snapshot_read && !paused {
                        assert!(
                            builder.is_empty(),
                            "Builder should be empty if no snapshot read"
                        );
                        let (_, snapshot) = backfill_stream.into_inner();
                        #[for_await]
                        for msg in snapshot {
                            let Either::Right(msg) = msg else {
                                bail!("BUG: snapshot_read contains upstream messages");
                            };
                            match msg? {
                                None => {
                                    // End of the snapshot read stream.
                                    // We let the barrier handling logic take care of upstream
                                    // updates, but we still want to exit the backfill loop, so
                                    // we mark the snapshot read as complete.
                                    snapshot_read_complete = true;
                                    break;
                                }
                                Some(row) => {
                                    let chunk = DataChunk::from_rows(&[row], &data_types);
                                    yield Message::Chunk(Self::handle_snapshot_chunk(
                                        chunk,
                                        &mut current_pos,
                                        &mut cur_barrier_snapshot_processed_rows,
                                        &mut total_snapshot_processed_rows,
                                        &pk_indices,
                                        &self.output_indices,
                                    ));
                                    break;
                                }
                            }
                        }
                    }
                }
                // When we break out of the inner backfill_stream loop, it means we have a
                // barrier. If there are no updates and no snapshots left, we have already
                // finished backfill and should have exited the outer backfill loop.
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => bail!("BUG: current_backfill loop exited without a barrier"),
                };

                // Process barrier:
                // - consume snapshot rows left in builder
                // - consume upstream buffer chunk
                // - switch snapshot
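                //
                // The order matters here: draining the builder first brings `current_pos`
                // up to date, so that marking the buffered upstream chunks below uses the
                // final position for this barrier interval.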

                // Consume snapshot rows left in builder
                let chunk = builder.consume_all();
                if let Some(chunk) = chunk {
                    yield Message::Chunk(Self::handle_snapshot_chunk(
                        chunk,
                        &mut current_pos,
                        &mut cur_barrier_snapshot_processed_rows,
                        &mut total_snapshot_processed_rows,
                        &pk_indices,
                        &self.output_indices,
                    ));
                }

                // Consume upstream buffer chunk.
                // If there is no current_pos, it means we did not process any snapshot yet.
                // In that case we can just ignore the upstream buffer chunks, but we still
                // need to clear the buffer.
                if let Some(current_pos) = &current_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                        yield Message::Chunk(mapping_chunk(
                            mark_chunk(chunk, current_pos, &pk_indices, pk_order),
                            &self.output_indices,
                        ));
                    }
                } else {
                    upstream_chunk_buffer.clear()
                }

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                // Update snapshot read epoch.
                snapshot_read_epoch = barrier.epoch.prev;

                self.progress.update(
                    barrier.epoch,
                    snapshot_read_epoch,
                    total_snapshot_processed_rows,
                );

                // Persist state on barrier
                Self::persist_state(
                    barrier.epoch,
                    &mut self.state_table,
                    false,
                    &current_pos,
                    total_snapshot_processed_rows,
                    &mut old_state,
                    &mut current_state,
                )
                .await?;

                tracing::trace!(
                    epoch = ?barrier.epoch,
                    ?current_pos,
                    total_snapshot_processed_rows,
                    "Backfill state persisted"
                );

                // Update snapshot read chunk builder.
                if let Some(mutation) = barrier.mutation.as_deref() {
                    match mutation {
                        Mutation::Pause => {
                            global_pause = true;
                        }
                        Mutation::Resume => {
                            global_pause = false;
                        }
                        Mutation::StartFragmentBackfill { fragment_ids } if backfill_paused => {
                            if fragment_ids.contains(&self.fragment_id) {
                                backfill_paused = false;
                            }
                        }
                        Mutation::Throttle(fragment_to_apply) => {
                            if let Some(entry) = fragment_to_apply.get(&self.fragment_id)
                                && entry.throttle_type() == ThrottleType::Backfill
                            {
                                let new_rate_limit = entry.rate_limit.into();
                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                                if old_rate_limit != new_rate_limit {
                                    tracing::info!(
                                        old_rate_limit = ?old_rate_limit,
                                        new_rate_limit = ?new_rate_limit,
                                        %upstream_table_id,
                                        actor_id = %self.actor_id,
                                        "backfill rate limit changed",
                                    );
                                    // The builder is emptied above via
                                    // `DataChunkBuilder::consume_all`.
                                    assert!(
                                        builder.is_empty(),
                                        "builder should already be emptied"
                                    );
                                    builder = create_builder(
                                        new_rate_limit,
                                        self.chunk_size,
                                        self.upstream_table.schema().data_types(),
                                    );
                                }
                            }
                        }
                        _ => (),
                    }
                }

                yield Message::Barrier(barrier);

                if snapshot_read_complete {
                    break 'backfill_loop;
                }

                // We will switch snapshot at the start of the next iteration of the backfill loop.
            }
        }

        tracing::trace!("Backfill has finished, waiting for barrier");

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress + persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished, we need to update the state; otherwise there is no need.
                if let Message::Barrier(barrier) = &msg {
                    if is_finished {
                        // If already finished, there is no need to persist any state, but we
                        // still need to advance the epoch of the state table.
                        if let Some(table) = &mut self.state_table {
                            table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                        }
                    } else {
                        // If the snapshot was empty, we do not need to backfill,
                        // but we still need to persist the finished state.
                        // We currently persist it on the second barrier here rather than the
                        // first. This is because we can't update the state table in the first
                        // epoch, since it expects to have been initialized in the previous
                        // epoch (there's no epoch before the first epoch).
                        if current_pos.is_none() {
                            current_pos = Some(construct_initial_finished_state(pk_indices.len()))
                        }

                        // We will update current_pos at least once,
                        // since the snapshot read has to be non-empty,
                        // or the snapshot was empty and we constructed a placeholder state.
                        debug_assert_ne!(current_pos, None);

                        Self::persist_state(
                            barrier.epoch,
                            &mut self.state_table,
                            true,
                            &current_pos,
                            total_snapshot_processed_rows,
                            &mut old_state,
                            &mut current_state,
                        )
                        .await?;
                        tracing::trace!(
                            epoch = ?barrier.epoch,
                            ?current_pos,
                            total_snapshot_processed_rows,
                            "Backfill position persisted after completion"
                        );
                    }

                    // Whether the backfill finished before recovery or just now, we need to
                    // update the mview tracker, since it does not persist this information.
                    self.progress
                        .finish(barrier.epoch, total_snapshot_processed_rows);
                    tracing::trace!(
                        epoch = ?barrier.epoch,
                        "Updated CreateMaterializedTracker"
                    );
                    yield msg;
                    break;
                }
                // Allow other messages to pass through.
                // We won't yield twice here, since if there's a barrier,
                // we will always break out of the loop.
                yield msg;
            }
        }

        tracing::trace!(
            "Backfill has already finished, forwarding messages directly to the downstream"
        );

        // After the progress is finished and the state persisted,
        // we can forward messages directly to the downstream,
        // as backfill is finished.
        // We don't need to report backfill progress any longer, as it has finished.
        // It will always be at 100%.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // If already finished, there is no need to persist any state, but we still
                    // need to advance the epoch of the state table.
                    if let Some(table) = &mut self.state_table {
                        table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;
                    }
                }

                yield msg;
            }
        }
    }

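    /// Recover the backfill state from the state table on startup.
    ///
    /// All vnodes are expected to carry the same `is_finished` flag (asserted below), so the
    /// state decoded from the first vnode is returned.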
    async fn recover_backfill_state(
        state_table: Option<&StateTable<S>>,
        pk_len: usize,
    ) -> StreamExecutorResult<BackfillState> {
        let Some(state_table) = state_table else {
            // If there is no state table but backfill is present, it must be from an old cluster.
            // In that case the backfill must be finished, otherwise it won't have been persisted.
            return Ok(BackfillState {
                current_pos: None,
                is_finished: true,
                row_count: 0,
                old_state: None,
            });
        };
        let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
        let first_vnode = vnodes.next().unwrap();
        let key: &[Datum] = &[Some(first_vnode.into())];
        let row = state_table.get_row(key).await?;
        let expected_state = Self::deserialize_backfill_state(row, pk_len);

        // All vnode partitions should have the same state (no scale-in supported).
        for vnode in vnodes {
            let key: &[Datum] = &[Some(vnode.into())];
            let row = state_table.get_row(key).await?;
            let state = Self::deserialize_backfill_state(row, pk_len);
            assert_eq!(state.is_finished, expected_state.is_finished);
        }
        Ok(expected_state)
    }

    fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
        let Some(row) = row else {
            return BackfillState {
                current_pos: None,
                is_finished: false,
                row_count: 0,
                old_state: None,
            };
        };
        let row = row.into_inner();
        let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
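        // Note (inferred from the indexing below): the row returned by `get_row` excludes the
        // vnode key column, i.e. it holds `pk ... | backfill_finished | row_count`, so slot 0
        // of `old_state` (the vnode) is left as `None` here.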
        old_state[1..row.len() + 1].clone_from_slice(&row);
        let current_pos = Some((&row[0..pk_len]).into_owned_row());
        let is_finished = row[pk_len].clone().is_some_and(|d| d.into_bool());
        let row_count = row
            .get(pk_len + 1)
            .cloned()
            .unwrap_or(None)
            .map_or(0, |d| d.into_int64() as u64);
        BackfillState {
            current_pos,
            is_finished,
            row_count,
            old_state: Some(old_state),
        }
    }

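    /// Yields `Some(row)` for each snapshot row, subject to rate limiting, and a trailing `None`
    /// to mark the end of the snapshot. When `paused`, the stream stays pending and yields
    /// nothing until it is dropped and rebuilt on the next barrier.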
    #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a BatchTable<S>,
        epoch: u64,
        current_pos: Option<OwnedRow>,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // The rate limit has already been checked to be non-zero.
            #[for_await]
            for r in
                Self::snapshot_read(upstream_table, HummockReadEpoch::NoWait(epoch), current_pos)
            {
                rate_limiter.wait(1).await;
                yield Some(r?);
            }
        }
        yield None;
    }

    /// Snapshot read the upstream mv.
    /// The rows from the upstream snapshot read are buffered inside the `builder`.
    /// If the snapshot is dropped before its rows are consumed,
    /// the remaining data in the `builder` must be flushed manually.
    /// Otherwise, when we scan a new snapshot, rows from the previous snapshot may still be
    /// present in the `builder`, and flushing would then emit duplicate rows.
    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
    pub async fn snapshot_read(
        upstream_table: &BatchTable<S>,
        epoch: HummockReadEpoch,
        current_pos: Option<OwnedRow>,
    ) {
        let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
        let range_bounds = match range_bounds {
            None => {
                return Ok(());
            }
            Some(range_bounds) => range_bounds,
        };

        // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
        // together with the upstream mv.
        let row_iter = upstream_table
            .batch_iter_with_pk_bounds(
                epoch,
                row::empty(),
                range_bounds,
                true,
                // We only use small-range prefetch here, because on every barrier the executor
                // recreates the iterator, so there is no need to prefetch too much data.
                PrefetchOptions::prefetch_for_small_range_scan(),
            )
            .await?;

        #[for_await]
        for row in row_iter {
            yield row?;
        }
    }

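    /// Persist the backfill state on barrier, delegating to `utils::persist_state`. A `None`
    /// table (backwards compatibility with clusters without a backfill state table) is a no-op.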
    async fn persist_state(
        epoch: EpochPair,
        table: &mut Option<StateTable<S>>,
        is_finished: bool,
        current_pos: &Option<OwnedRow>,
        row_count: u64,
        old_state: &mut Option<Vec<Datum>>,
        current_state: &mut [Datum],
    ) -> StreamExecutorResult<()> {
        // Backwards compatibility with no state table in backfill.
        let Some(table) = table else { return Ok(()) };
        utils::persist_state(
            epoch,
            table,
            is_finished,
            current_pos,
            row_count,
            old_state,
            current_state,
        )
        .await
    }

    /// 1. Convert the data chunk to a stream chunk.
    /// 2. Update the current position.
    /// 3. Update metrics.
    /// 4. Map the chunk according to the output indices and return
    ///    the stream chunk; wrapping it into a `Message` is done by the caller.
    fn handle_snapshot_chunk(
        data_chunk: DataChunk,
        current_pos: &mut Option<OwnedRow>,
        cur_barrier_snapshot_processed_rows: &mut u64,
        total_snapshot_processed_rows: &mut u64,
        pk_indices: &[usize],
        output_indices: &[usize],
    ) -> StreamChunk {
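        // Snapshot rows are always inserts, so build one `Insert` op per slot
        // (i.e. `capacity`) of the data chunk.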
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let chunk = StreamChunk::from_parts(ops, data_chunk);
        // Raise the current position.
        // As snapshot read streams are ordered by pk, we can
        // just use the last row to update `current_pos`.
        *current_pos = Some(get_new_pos(&chunk, pk_indices));

        let chunk_cardinality = chunk.cardinality() as u64;
        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        *total_snapshot_processed_rows += chunk_cardinality;

        mapping_chunk(chunk, output_indices)
    }
}

impl<S> Execute for BackfillExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}