risingwave_stream/executor/backfill/no_shuffle_backfill.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::{bail, row};
use risingwave_common_rate_limit::{MonitoredRateLimiter, RateLimit, RateLimiter};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::BatchTable;

use crate::executor::backfill::utils;
use crate::executor::backfill::utils::{
    METADATA_STATE_LEN, compute_bounds, construct_initial_finished_state, create_builder,
    get_new_pos, mapping_chunk, mapping_message, mark_chunk,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgressReporter;

/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
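///
/// A minimal sketch of recovery for a single-column pk (values and encodings are
/// illustrative, not the actual datum representation):
///
/// ```text
/// persisted row:  | vnode: 0 | pk_0: 42 | backfill_finished: false | row_count: 100 |
/// recovers into:  BackfillState {
///                     current_pos: Some([42]),
///                     is_finished: false,
///                     row_count: 100,
///                     old_state: Some(previously persisted datums),
///                 }
/// ```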
#[derive(Debug, Eq, PartialEq)]
pub struct BackfillState {
    current_pos: Option<OwnedRow>,
    old_state: Option<Vec<Datum>>,
    is_finished: bool,
    row_count: u64,
}

/// An implementation of the [RFC: Use Backfill To Let Mv On Mv Stream Again](https://github.com/risingwavelabs/rfcs/pull/13).
/// `BackfillExecutor` is used to create a materialized view on another materialized view.
///
/// It only buffers chunks between two barriers, instead of the unbounded memory usage of
/// `RearrangedChainExecutor`.
///
/// It uses the latest epoch to read the snapshot of the upstream mv between two barriers, and all
/// the `StreamChunk`s of the snapshot read will be forwarded to the downstream.
///
/// It uses `current_pos` to record the progress of the backfill (the pk of the upstream mv), and
/// `current_pos` is initiated as an empty `Row`.
///
/// All upstream messages during the two-barrier interval will be buffered, and at the later
/// barrier each buffered row is either forwarded or ignored based on `current_pos`. Once
/// `current_pos` reaches the end of the upstream mv pk, the backfill is finished.
///
/// Notice:
/// The pk we are talking about here refers to the storage primary key.
/// We rely on the scheduler to schedule the `BackfillExecutor` together with the upstream mv/table
/// in the same worker, so that we can read uncommitted data from the upstream table without
/// waiting.
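///
/// A sketch of the forwarding decision at a barrier (values illustrative):
///
/// ```text
/// current_pos = 5 (upstream pk)
/// buffered upstream rows:   pk=1, pk=5, pk=9
/// forwarded to downstream:  pk=1, pk=5   (pk <= current_pos: already backfilled)
/// ignored:                  pk=9         (a later snapshot read will emit it)
/// ```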
pub struct BackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,
    /// Upstream executor with the same schema as the upstream table.
    upstream: Executor,

    /// Internal state table for persisting the backfill state.
    state_table: Option<StateTable<S>>,

    /// The column indices to be forwarded to the downstream, from the upstream and the table scan.
    output_indices: Vec<usize>,

    /// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
    progress: CreateMviewProgressReporter,

    actor_id: ActorId,

    metrics: Arc<StreamingMetrics>,

    chunk_size: usize,

    rate_limiter: MonitoredRateLimiter,
}

impl<S> BackfillExecutor<S>
where
    S: StateStore,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        upstream_table: BatchTable<S>,
        upstream: Executor,
        state_table: Option<StateTable<S>>,
        output_indices: Vec<usize>,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        rate_limit: RateLimit,
    ) -> Self {
        let actor_id = progress.actor_id();
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table.table_id());
        Self {
            upstream_table,
            upstream,
            state_table,
            output_indices,
            progress,
            actor_id,
            metrics,
            chunk_size,
            rate_limiter,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The primary key columns.
        // We receive a pruned chunk from the upstream table,
        // which will only contain output columns of the scan on the upstream table.
        // The pk indices specify the pk columns of the pruned chunk.
        let pk_indices = self.upstream_table.pk_in_output_indices().unwrap();

        let state_len = pk_indices.len() + METADATA_STATE_LEN;

        let pk_order = self.upstream_table.pk_serializer().get_order_types();

        let upstream_table_id = self.upstream_table.table_id().table_id;

        let mut upstream = self.upstream.execute();

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let mut paused = first_barrier.is_pause_on_startup();
        let first_epoch = first_barrier.epoch;
        let init_epoch = first_barrier.epoch.prev;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);

        if let Some(state_table) = self.state_table.as_mut() {
            state_table.init_epoch(first_epoch).await?;
        }

        let BackfillState {
            mut current_pos,
            is_finished,
            row_count,
            mut old_state,
        } = Self::recover_backfill_state(self.state_table.as_ref(), pk_indices.len()).await?;
        tracing::trace!(is_finished, row_count, "backfill state recovered");

        let data_types = self.upstream_table.schema().data_types();

        // Chunk builder will be instantiated with min(rate_limit, self.chunk_size) as the
        // chunk's max size.
        let mut builder = create_builder(
            self.rate_limiter.rate_limit(),
            self.chunk_size,
            data_types.clone(),
        );

        // Use this buffer to construct state,
        // which will then be persisted.
        let mut current_state: Vec<Datum> = vec![None; state_len];

        // If backfill is not needed, but the state is still "unfinished", we need to finish it.
        // So we just update the state + progress to meta at the next barrier to finish progress,
        // and forward other messages.
        //
        // Reason for persisting on the second barrier rather than the first:
        // We can't update meta with progress as finished until state_table
        // has been updated.
        // We also can't update state_table in the first epoch, since state_table
        // expects to have been initialized in the previous epoch.

        // The epoch used to snapshot read the upstream mv.
        let mut snapshot_read_epoch = init_epoch;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_processed_rows: u64 = row_count;

        // Backfill Algorithm:
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot
        //
        // We construct a backfill stream with upstream as its left input and mv snapshot read
        // stream as its right input. When a chunk comes from upstream, we will buffer it.
        //
        // When a barrier comes from upstream:
        //  - Update the `snapshot_read_epoch`.
        //  - For each row of the upstream chunk buffer, forward it to downstream if its pk <=
        //    `current_pos`, otherwise ignore it.
        //  - reconstruct the whole backfill stream with upstream and new mv snapshot read stream
        //    with the `snapshot_read_epoch`.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
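        //
        // A sketch of one barrier interval (values illustrative, single-column pk):
        //   snapshot read emits pk=1..=4, so current_pos = 4
        //   upstream chunks buffered: +(pk=2), +(pk=7)
        //   at the barrier:
        //     - flush the builder, forward +(pk=2) since 2 <= 4,
        //       ignore +(pk=7) since a later snapshot read will emit pk=7
        //     - persist current_pos = 4, then resume the snapshot read
        //       after pk=4 at the new `snapshot_read_epoch`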
        if !is_finished {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(upstream_table_id, self.actor_id);

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut snapshot_read_complete = false;
                let mut has_snapshot_read = false;

                // We should not buffer rows from previous epoch, else we can have duplicates.
                assert!(upstream_chunk_buffer.is_empty());

                {
                    let left_upstream = upstream.by_ref().map(Either::Left);
                    let paused =
                        paused || matches!(self.rate_limiter.rate_limit(), RateLimit::Pause);
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(
                            &self.upstream_table,
                            snapshot_read_epoch,
                            current_pos.clone(),
                            paused,
                            &self.rate_limiter,
                        )
                        .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop snapshot stream as soon as the
                    // barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process barrier outside of the loop.
                                        // This is because the backfill stream holds a mutable
                                        // reference to our chunk builder.
                                        // We want to create another mutable reference
                                        // to flush remaining chunks from the chunk builder
                                        // on barrier.
                                        // Hence we break here and process it after this block.
                                        pending_barrier = Some(barrier);
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                has_snapshot_read = true;
                                match msg? {
                                    None => {
                                        // Consume remaining rows in the builder.
                                        if let Some(data_chunk) = builder.consume_all() {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                data_chunk,
                                                &mut current_pos,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &pk_indices,
                                                &self.output_indices,
                                            ));
                                        }

                                        // End of the snapshot read stream.
                                        // We should not mark the chunks anymore;
                                        // otherwise, we would ignore some rows
                                        // in the buffer. Here we choose to never mark the chunk.
                                        // Consume the remaining upstream buffer chunks without
                                        // marking them.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            yield Message::Chunk(mapping_chunk(
                                                chunk,
                                                &self.output_indices,
                                            ));
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some(record) => {
                                        // Buffer the snapshot read row.
                                        if let Some(data_chunk) = builder.append_one_row(record) {
                                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                                data_chunk,
                                                &mut current_pos,
                                                &mut cur_barrier_snapshot_processed_rows,
                                                &mut total_snapshot_processed_rows,
                                                &pk_indices,
                                                &self.output_indices,
                                            ));
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Before processing the barrier, if we did not do a snapshot read,
                    // do one first.
                    // This is so we don't lose the tombstone iteration progress.
                    // If paused, we also can't read any snapshot records.
                    if !has_snapshot_read && !paused {
                        assert!(
                            builder.is_empty(),
                            "Builder should be empty if no snapshot read"
                        );
                        let (_, snapshot) = backfill_stream.into_inner();
                        #[for_await]
                        for msg in snapshot {
                            let Either::Right(msg) = msg else {
                                bail!("BUG: snapshot_read contains upstream messages");
                            };
                            match msg? {
                                None => {
                                    // End of the snapshot read stream.
                                    // We let the barrier handling logic take care of upstream updates,
                                    // but we still want to exit the backfill loop, so we mark the snapshot read as complete.
                                    snapshot_read_complete = true;
                                    break;
                                }
                                Some(row) => {
                                    let chunk = DataChunk::from_rows(&[row], &data_types);
                                    yield Message::Chunk(Self::handle_snapshot_chunk(
                                        chunk,
                                        &mut current_pos,
                                        &mut cur_barrier_snapshot_processed_rows,
                                        &mut total_snapshot_processed_rows,
                                        &pk_indices,
                                        &self.output_indices,
                                    ));
                                    break;
                                }
                            }
                        }
                    }
                }
                // When we break out of the inner backfill_stream loop, it means we have a barrier.
                // If there are no updates and there are no snapshots left,
                // we have already finished backfill and should have exited the outer backfill loop.
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => bail!("BUG: current_backfill loop exited without a barrier"),
                };

                // Process barrier:
                // - consume snapshot rows left in builder
                // - consume upstream buffer chunk
                // - switch snapshot

                // Consume snapshot rows left in builder
                let chunk = builder.consume_all();
                if let Some(chunk) = chunk {
                    yield Message::Chunk(Self::handle_snapshot_chunk(
                        chunk,
                        &mut current_pos,
                        &mut cur_barrier_snapshot_processed_rows,
                        &mut total_snapshot_processed_rows,
                        &pk_indices,
                        &self.output_indices,
                    ));
                }

                // Consume upstream buffer chunk.
                // If there is no current_pos, it means we did not process any snapshot rows yet.
                // In that case we can just ignore the upstream buffer chunks, but we still need
                // to clear them.
                if let Some(current_pos) = &current_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                        yield Message::Chunk(mapping_chunk(
                            mark_chunk(chunk, current_pos, &pk_indices, pk_order),
                            &self.output_indices,
                        ));
                    }
                } else {
                    upstream_chunk_buffer.clear()
                }

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                // Update snapshot read epoch.
                snapshot_read_epoch = barrier.epoch.prev;

                self.progress.update(
                    barrier.epoch,
                    snapshot_read_epoch,
                    total_snapshot_processed_rows,
                );

                // Persist state on barrier
                Self::persist_state(
                    barrier.epoch,
                    &mut self.state_table,
                    false,
                    &current_pos,
                    total_snapshot_processed_rows,
                    &mut old_state,
                    &mut current_state,
                )
                .await?;

                tracing::trace!(
                    epoch = ?barrier.epoch,
                    ?current_pos,
                    total_snapshot_processed_rows,
                    "Backfill state persisted"
                );

                // Handle barrier mutations: pause/resume the snapshot read, and
                // recreate the snapshot read chunk builder on rate-limit changes.
                if let Some(mutation) = barrier.mutation.as_deref() {
                    match mutation {
                        Mutation::Pause => {
                            paused = true;
                        }
                        Mutation::Resume => {
                            paused = false;
                        }
                        Mutation::Throttle(actor_to_apply) => {
                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
                            if let Some(new_rate_limit) = new_rate_limit_entry {
                                let new_rate_limit = (*new_rate_limit).into();
                                let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                                if old_rate_limit != new_rate_limit {
                                    tracing::info!(
                                        old_rate_limit = ?old_rate_limit,
                                        new_rate_limit = ?new_rate_limit,
                                        upstream_table_id = upstream_table_id,
                                        actor_id = self.actor_id,
                                        "backfill rate limit changed",
                                    );
                                    // The builder is emptied above via `DataChunkBuilder::consume_all`.
                                    assert!(
                                        builder.is_empty(),
                                        "builder should already be emptied"
                                    );
                                    builder = create_builder(
                                        new_rate_limit,
                                        self.chunk_size,
                                        self.upstream_table.schema().data_types(),
                                    );
                                }
                            }
                        }
                        _ => (),
                    }
                }

                yield Message::Barrier(barrier);

                if snapshot_read_complete {
                    break 'backfill_loop;
                }

                // We will switch the snapshot at the start of the next iteration of the backfill loop.
            }
        }

        tracing::trace!("Backfill has finished, waiting for barrier");

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress + persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished then we need to update state, otherwise no need.
                if let Message::Barrier(barrier) = &msg {
                    if is_finished {
                        // If already finished, there is no need to persist any state,
                        // but we still need to advance the epoch of the state table.
                        if let Some(table) = &mut self.state_table {
                            table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                        }
                    } else {
                        // If the snapshot was empty, we do not need to backfill,
                        // but we still need to persist the finished state.
                        // We currently persist it on the second barrier here rather than the first.
                        // This is because we can't update the state table in the first epoch,
                        // since it expects to have been initialized in the previous epoch
                        // (there's no epoch before the first epoch).
                        if current_pos.is_none() {
                            current_pos = Some(construct_initial_finished_state(pk_indices.len()))
                        }

                        // We will have updated current_pos at least once,
                        // since either the snapshot read was non-empty,
                        // or the snapshot was empty and we constructed a placeholder state.
                        debug_assert_ne!(current_pos, None);

                        Self::persist_state(
                            barrier.epoch,
                            &mut self.state_table,
                            true,
                            &current_pos,
                            total_snapshot_processed_rows,
                            &mut old_state,
                            &mut current_state,
                        )
                        .await?;
                        tracing::trace!(
                            epoch = ?barrier.epoch,
                            ?current_pos,
                            total_snapshot_processed_rows,
                            "Backfill position persisted after completion"
                        );
                    }

                    // For both a backfill that finished before recovery and one that just
                    // finished, we need to update the mview tracker, since it does not
                    // persist this information.
                    self.progress
                        .finish(barrier.epoch, total_snapshot_processed_rows);
                    tracing::trace!(
                        epoch = ?barrier.epoch,
                        "Updated CreateMaterializedTracker"
                    );
                    yield msg;
                    break;
                }
                // Allow other messages to pass through.
                // We won't yield twice here, since if there's a barrier,
                // we will always break out of the loop.
                yield msg;
            }
        }

        tracing::trace!(
            "Backfill has already finished and will forward messages directly to the downstream"
        );

        // After progress is finished + state persisted,
        // we can forward messages directly to the downstream,
        // as backfill is finished.
        // We don't need to report backfill progress any longer, as it has finished.
        // It will always be at 100%.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // If already finished, there is no need to persist any state,
                    // but we still need to advance the epoch of the state table.
                    if let Some(table) = &mut self.state_table {
                        table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;
                    }
                }

                yield msg;
            }
        }
    }

    async fn recover_backfill_state(
        state_table: Option<&StateTable<S>>,
        pk_len: usize,
    ) -> StreamExecutorResult<BackfillState> {
        let Some(state_table) = state_table else {
            // If there is no state table, but backfill is present, it must be from an old cluster.
            // In that case backfill must be finished, otherwise it wouldn't have been persisted.
            return Ok(BackfillState {
                current_pos: None,
                is_finished: true,
                row_count: 0,
                old_state: None,
            });
        };
        let mut vnodes = state_table.vnodes().iter_vnodes_scalar();
        let first_vnode = vnodes.next().unwrap();
        let key: &[Datum] = &[Some(first_vnode.into())];
        let row = state_table.get_row(key).await?;
        let expected_state = Self::deserialize_backfill_state(row, pk_len);

        // All vnode partitions should have the same state (scale-in is not supported).
        for vnode in vnodes {
            let key: &[Datum] = &[Some(vnode.into())];
            let row = state_table.get_row(key).await?;
            let state = Self::deserialize_backfill_state(row, pk_len);
            assert_eq!(state.is_finished, expected_state.is_finished);
        }
        Ok(expected_state)
    }

    fn deserialize_backfill_state(row: Option<OwnedRow>, pk_len: usize) -> BackfillState {
        let Some(row) = row else {
            return BackfillState {
                current_pos: None,
                is_finished: false,
                row_count: 0,
                old_state: None,
            };
        };
        let row = row.into_inner();
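        // Layout sketch for pk_len = 1 (illustrative; the row returned by `get_row`
        // excludes the vnode pk column):
        //   row         = [pk_0, backfill_finished, row_count]
        //   old_state   = [None /* vnode */, pk_0, backfill_finished, row_count]
        //   current_pos = row[0..pk_len]
        //   is_finished = row[pk_len]
        //   row_count   = row[pk_len + 1]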
        let mut old_state = vec![None; pk_len + METADATA_STATE_LEN];
        old_state[1..row.len() + 1].clone_from_slice(&row);
        let current_pos = Some((&row[0..pk_len]).into_owned_row());
        let is_finished = row[pk_len].clone().is_some_and(|d| d.into_bool());
        let row_count = row
            .get(pk_len + 1)
            .cloned()
            .unwrap_or(None)
            .map_or(0, |d| d.into_int64() as u64);
        BackfillState {
            current_pos,
            is_finished,
            row_count,
            old_state: Some(old_state),
        }
    }

    #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        upstream_table: &'a BatchTable<S>,
        epoch: u64,
        current_pos: Option<OwnedRow>,
        paused: bool,
        rate_limiter: &'a MonitoredRateLimiter,
    ) {
        if paused {
            #[for_await]
            for _ in tokio_stream::pending() {
                bail!("BUG: paused stream should not yield");
            }
        } else {
            // The rate limit is known to be non-zero here, since `RateLimit::Pause`
            // is mapped to `paused` by the caller.
            #[for_await]
            for r in
                Self::snapshot_read(upstream_table, HummockReadEpoch::NoWait(epoch), current_pos)
            {
                rate_limiter.wait(1).await;
                yield Some(r?);
            }
        }
        yield None;
    }

    /// Snapshot read the upstream mv.
    /// The rows from the upstream snapshot read will be buffered inside the `builder`.
    /// If the snapshot is dropped before its rows are consumed,
    /// the remaining data in the `builder` must be flushed manually.
    /// Otherwise, when we scan a new snapshot, the old rows may still be present in the
    /// `builder`, and flushing it would then emit duplicate rows.
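    ///
    /// E.g. (illustrative): if rows with pk 1..=3 sit unflushed in the `builder` when the
    /// snapshot is dropped, `current_pos` has not advanced past them, so the next snapshot
    /// read starts before pk 1 and appends pk 1..=3 again; the eventual flush would then
    /// emit each of those rows twice.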
    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
    pub async fn snapshot_read(
        upstream_table: &BatchTable<S>,
        epoch: HummockReadEpoch,
        current_pos: Option<OwnedRow>,
    ) {
        let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos);
        let range_bounds = match range_bounds {
            None => {
                return Ok(());
            }
            Some(range_bounds) => range_bounds,
        };
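
        // Bounds sketch (illustrative): with `current_pos = None` we scan the full pk range;
        // with `current_pos = Some(pos)` we resume strictly after `pos`, i.e. roughly
        // `(Excluded(pos), Unbounded)`. A `None` result from `compute_bounds` means there is
        // nothing left to scan, hence the early return above.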

        // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
        // together with the upstream mv.
        let row_iter = upstream_table
            .batch_iter_with_pk_bounds(
                epoch,
                row::empty(),
                range_bounds,
                true,
                // Here we only use a small range prefetch, because on every barrier change the
                // executor recreates a new iterator, so we do not need to prefetch too much data.
                PrefetchOptions::prefetch_for_small_range_scan(),
            )
            .await?;

        #[for_await]
        for row in row_iter {
            yield row?;
        }
    }

    async fn persist_state(
        epoch: EpochPair,
        table: &mut Option<StateTable<S>>,
        is_finished: bool,
        current_pos: &Option<OwnedRow>,
        row_count: u64,
        old_state: &mut Option<Vec<Datum>>,
        current_state: &mut [Datum],
    ) -> StreamExecutorResult<()> {
        // Backwards compatibility with no state table in backfill.
        let Some(table) = table else { return Ok(()) };
        utils::persist_state(
            epoch,
            table,
            is_finished,
            current_pos,
            row_count,
            old_state,
            current_state,
        )
        .await
    }

    /// 1. Converts the data chunk to a stream chunk.
    /// 2. Updates the current position.
    /// 3. Updates metrics.
    /// 4. Maps the chunk according to the output indices and returns
    ///    the stream chunk; the caller is responsible for wrapping it into a message.
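    ///
    /// E.g. (illustrative, single-column pk): a snapshot data chunk with pk values
    /// `[6, 7, 8]` becomes a stream chunk of `Insert` ops, sets
    /// `current_pos = Some([8])`, and adds 3 to both row counters before the
    /// column mapping is applied.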
    fn handle_snapshot_chunk(
        data_chunk: DataChunk,
        current_pos: &mut Option<OwnedRow>,
        cur_barrier_snapshot_processed_rows: &mut u64,
        total_snapshot_processed_rows: &mut u64,
        pk_indices: &[usize],
        output_indices: &[usize],
    ) -> StreamChunk {
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let chunk = StreamChunk::from_parts(ops, data_chunk);
        // Raise the current position.
        // As snapshot read streams are ordered by pk, we can
        // just use the last row to update `current_pos`.
        *current_pos = Some(get_new_pos(&chunk, pk_indices));

        let chunk_cardinality = chunk.cardinality() as u64;
        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        *total_snapshot_processed_rows += chunk_cardinality;

        mapping_chunk(chunk, output_indices)
    }
}

impl<S> Execute for BackfillExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}