risingwave_stream/executor/locality_provider.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use either::Either;
use futures::stream::select_with_strategy;
use futures::{TryStreamExt, pin_mut, stream};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{Datum, ScalarImpl, ToOwnedDatum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::sort_util::cmp_datum_iter;
use risingwave_common_rate_limit::RateLimit;
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;

use crate::common::table::state_table::StateTable;
use crate::executor::backfill::utils::create_builder;
use crate::executor::prelude::*;
use crate::task::{CreateMviewProgressReporter, FragmentId};

/// Per-vnode chunk builders used to batch snapshot rows into output chunks.
type Builders = HashMap<VirtualNode, DataChunkBuilder>;

/// Progress state for tracking backfill per vnode
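///
/// Transitions, as driven by the methods below: `NotStarted -> InProgress` via
/// [`LocalityBackfillState::update_progress`], and `InProgress -> Completed` via
/// [`LocalityBackfillState::finish_vnode`]; a vnode whose snapshot turned out to
/// be empty goes straight from `NotStarted` to `Completed` with an all-NULL
/// `final_pos`.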
#[derive(Clone, Debug, PartialEq, Eq)]
enum LocalityBackfillProgress {
    /// Backfill not started for this vnode
    NotStarted,
    /// Backfill in progress, tracking current position
    InProgress {
        /// Current position in the locality-ordered scan
        current_pos: OwnedRow,
        /// Number of rows processed for this vnode
        processed_rows: u64,
    },
    /// Backfill completed for this vnode
    Completed {
        /// Final position reached
        final_pos: OwnedRow,
        /// Total rows processed for this vnode
        total_rows: u64,
    },
}

/// State management for locality provider backfill process
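///
/// A minimal usage sketch (illustrative, not a doctest):
///
/// ```ignore
/// let mut state = LocalityBackfillState::new(state_table.vnodes().iter_vnodes());
/// state.update_progress(vnode, last_pk, rows_emitted); // NotStarted -> InProgress
/// state.finish_vnode(vnode, pk_len);                   // InProgress -> Completed
/// // Once every vnode has been finished:
/// assert!(state.is_completed());
/// ```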
#[derive(Clone, Debug)]
struct LocalityBackfillState {
    /// Progress per vnode
    per_vnode: HashMap<VirtualNode, LocalityBackfillProgress>,
    /// Total snapshot rows read across all vnodes
    total_snapshot_rows: u64,
}

impl LocalityBackfillState {
    fn new(vnodes: impl Iterator<Item = VirtualNode>) -> Self {
        let per_vnode = vnodes
            .map(|vnode| (vnode, LocalityBackfillProgress::NotStarted))
            .collect();
        Self {
            per_vnode,
            total_snapshot_rows: 0,
        }
    }

    fn is_completed(&self) -> bool {
        self.per_vnode
            .values()
            .all(|progress| matches!(progress, LocalityBackfillProgress::Completed { .. }))
    }

    fn vnodes(&self) -> impl Iterator<Item = (VirtualNode, &LocalityBackfillProgress)> {
        self.per_vnode
            .iter()
            .map(|(&vnode, progress)| (vnode, progress))
    }

    fn has_progress(&self) -> bool {
        self.per_vnode
            .values()
            .any(|progress| matches!(progress, LocalityBackfillProgress::InProgress { .. }))
    }

    fn update_progress(&mut self, vnode: VirtualNode, new_pos: OwnedRow, row_count_delta: u64) {
        let progress = self.per_vnode.get_mut(&vnode).unwrap();
        match progress {
            LocalityBackfillProgress::NotStarted => {
                *progress = LocalityBackfillProgress::InProgress {
                    current_pos: new_pos,
                    processed_rows: row_count_delta,
                };
            }
            LocalityBackfillProgress::InProgress { processed_rows, .. } => {
                *progress = LocalityBackfillProgress::InProgress {
                    current_pos: new_pos,
                    processed_rows: *processed_rows + row_count_delta,
                };
            }
            LocalityBackfillProgress::Completed { .. } => {
                // Already completed, shouldn't update
            }
        }
        self.total_snapshot_rows += row_count_delta;
    }

    fn finish_vnode(&mut self, vnode: VirtualNode, pk_len: usize) {
        let progress = self.per_vnode.get_mut(&vnode).unwrap();
        match progress {
            LocalityBackfillProgress::NotStarted => {
                // Create a final position with pk_len NULL values to indicate completion
                let final_pos = OwnedRow::new(vec![None; pk_len]);
                *progress = LocalityBackfillProgress::Completed {
                    final_pos,
                    total_rows: 0,
                };
            }
            LocalityBackfillProgress::InProgress {
                current_pos,
                processed_rows,
            } => {
                *progress = LocalityBackfillProgress::Completed {
                    final_pos: current_pos.clone(),
                    total_rows: *processed_rows,
                };
            }
            LocalityBackfillProgress::Completed { .. } => {
                // Already completed
            }
        }
    }

    fn get_progress(&self, vnode: &VirtualNode) -> &LocalityBackfillProgress {
        self.per_vnode.get(vnode).unwrap()
    }
}

/// The `LocalityProviderExecutor` provides locality-ordered input for operators
/// during backfilling. It buffers incoming data in a state table whose primary
/// key is prefixed by the locality columns.
///
/// The executor runs in three phases:
/// 1. Buffering phase: buffer incoming data until a `StartFragmentBackfill`
///    mutation targets this fragment
/// 2. Backfill phase: replay the buffered data as locality-ordered snapshot
///    reads while tracking upstream chunks
/// 3. Forward phase: once backfill is complete, forward upstream messages
///    directly
///
/// Key improvements over the original implementation:
/// - Removes the arbitrary barrier buffer limit
/// - Implements proper upstream chunk tracking during backfill
/// - Uses per-vnode progress tracking for better state management
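///
/// Phase flow, schematically:
///
/// ```text
/// buffering --(StartFragmentBackfill)--> backfill --(snapshot drained)--> forward
/// ```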
pub struct LocalityProviderExecutor<S: StateStore> {
    /// Upstream input
    upstream: Executor,

    /// Locality columns (indices in input schema)
    #[allow(dead_code)]
    locality_columns: Vec<usize>,

    /// State table for buffering input data
    state_table: StateTable<S>,

    /// Progress table for tracking backfill progress per vnode
    progress_table: StateTable<S>,

    input_schema: Schema,

    /// Progress reporter for materialized view creation
    progress: CreateMviewProgressReporter,

    fragment_id: FragmentId,

    actor_id: ActorId,

    /// Metrics
    metrics: Arc<StreamingMetrics>,

    /// Chunk size for output
    chunk_size: usize,
}

impl<S: StateStore> LocalityProviderExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        upstream: Executor,
        locality_columns: Vec<usize>,
        state_table: StateTable<S>,
        progress_table: StateTable<S>,
        input_schema: Schema,
        progress: CreateMviewProgressReporter,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        fragment_id: FragmentId,
    ) -> Self {
        Self {
            upstream,
            locality_columns,
            state_table,
            progress_table,
            input_schema,
            actor_id: progress.actor_id(),
            progress,
            metrics,
            chunk_size,
            fragment_id,
        }
    }

    /// Creates a snapshot stream that reads from the state table in locality order
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_snapshot_stream<'a>(
        state_table: &'a StateTable<S>,
        backfill_state: LocalityBackfillState,
    ) {
        // Read from state table per vnode in locality order
        for vnode in state_table.vnodes().iter_vnodes() {
            let progress = backfill_state.get_progress(&vnode);

            let current_pos = match progress {
                LocalityBackfillProgress::NotStarted => None,
                LocalityBackfillProgress::Completed { .. } => {
                    // Skip completed vnodes
                    continue;
                }
                LocalityBackfillProgress::InProgress { current_pos, .. } => {
                    Some(current_pos.clone())
                }
            };

            // Compute range bounds for iteration based on current position
            let range_bounds = if let Some(ref pos) = current_pos {
                let start_bound = std::ops::Bound::Excluded(pos.as_inner());
                (start_bound, std::ops::Bound::<&[Datum]>::Unbounded)
            } else {
                (
                    std::ops::Bound::<&[Datum]>::Unbounded,
                    std::ops::Bound::<&[Datum]>::Unbounded,
                )
            };
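            // E.g. (values illustrative): resuming a vnode at current_pos = ("abc", 7)
            // scans Excluded(("abc", 7))..Unbounded, so the first row yielded is the
            // first pk strictly greater than the saved position; no row is re-read.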

            // Iterate over rows for this vnode
            let iter = state_table
                .iter_with_vnode(
                    vnode,
                    &range_bounds,
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(iter);

            while let Some(row) = iter.try_next().await? {
                yield Some((vnode, row));
            }
        }

        // Signal end of stream
        yield None;
    }

    /// Persist backfill state to progress table
    async fn persist_backfill_state(
        progress_table: &mut StateTable<S>,
        backfill_state: &LocalityBackfillState,
    ) -> StreamExecutorResult<()> {
        for (vnode, progress) in &backfill_state.per_vnode {
            let (is_finished, current_pos, row_count) = match progress {
                LocalityBackfillProgress::NotStarted => continue, // Don't persist NotStarted
                LocalityBackfillProgress::InProgress {
                    current_pos,
                    processed_rows,
                } => (false, current_pos.clone(), *processed_rows),
                LocalityBackfillProgress::Completed {
                    final_pos,
                    total_rows,
                } => (true, final_pos.clone(), *total_rows),
            };

            // Build progress row: vnode + current_pos + is_finished + row_count
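            // Illustrative layout for a two-column locality pk (values made up):
            //   | vnode | pos_0 | pos_1 | is_finished | row_count |
            //   |  42   | 'abc' |   7   |    false    |   1024    |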
            let mut row_data = vec![Some(vnode.to_scalar().into())];
            row_data.extend(current_pos);
            row_data.push(Some(ScalarImpl::Bool(is_finished)));
            row_data.push(Some(ScalarImpl::Int64(row_count as i64)));

            let new_row = OwnedRow::new(row_data);

            // Check if there's an existing row for this vnode to determine insert vs update.
            // This keeps state operations consistent: update existing rows, insert new ones.
            let key_data = vec![Some(vnode.to_scalar().into())];
            let key = OwnedRow::new(key_data);

            if let Some(existing_row) = progress_table.get_row(&key).await? {
                // Update existing state - ensures proper state transition for recovery
                progress_table.update(existing_row, new_row);
            } else {
                // Insert new state - first time persisting for this vnode
                progress_table.insert(new_row);
            }
        }
        Ok(())
    }

    /// Load backfill state from progress table
    async fn load_backfill_state(
        progress_table: &StateTable<S>,
    ) -> StreamExecutorResult<LocalityBackfillState> {
        let mut backfill_state = LocalityBackfillState::new(progress_table.vnodes().iter_vnodes());
        let mut total_snapshot_rows = 0;

        // For each vnode, try to get its progress state
        for vnode in progress_table.vnodes().iter_vnodes() {
            // The progress table key is just the vnode (see the row layout sketch in
            // `persist_backfill_state`).
            let key_data = vec![Some(vnode.to_scalar().into())];
            let key = OwnedRow::new(key_data);

            if let Some(row) = progress_table.get_row(&key).await? {
                // Parse is_finished flag (second to last column)
                let finished_col_idx = row.len() - 2;
                let is_finished = row
                    .datum_at(finished_col_idx)
                    .map(|d| d.into_bool())
                    .unwrap_or(false);

                // Parse row count (last column)
                let row_count = row
                    .datum_at(row.len() - 1)
                    .map(|d| d.into_int64() as u64)
                    .unwrap_or(0);

                // Columns between the vnode and the flags are the saved position
                let current_pos_data: Vec<Datum> = (1..finished_col_idx)
                    .map(|i| row.datum_at(i).to_owned_datum())
                    .collect();
                let current_pos = OwnedRow::new(current_pos_data);

                // Set progress based on is_finished flag
                let progress = if is_finished {
                    LocalityBackfillProgress::Completed {
                        final_pos: current_pos,
                        total_rows: row_count,
                    }
                } else {
                    LocalityBackfillProgress::InProgress {
                        current_pos,
                        processed_rows: row_count,
                    }
                };

                backfill_state.per_vnode.insert(vnode, progress);
                total_snapshot_rows += row_count;
            }
            // If no row found, keep the default NotStarted state
        }

        backfill_state.total_snapshot_rows = total_snapshot_rows;
        Ok(backfill_state)
    }

    /// Mark chunk for forwarding based on backfill progress
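    ///
    /// For example (illustrative), if a row's vnode is `InProgress` at
    /// `current_pos = (5,)`: a row with pk `(3,)` stays visible, since the
    /// snapshot side has already passed it, while a row with pk `(7,)` is
    /// hidden and will be served by a later snapshot read instead.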
    fn mark_chunk(
        chunk: StreamChunk,
        backfill_state: &LocalityBackfillState,
        state_table: &StateTable<S>,
    ) -> StreamExecutorResult<StreamChunk> {
        let chunk = chunk.compact();
        let (data, ops) = chunk.into_parts();
        let mut new_visibility = risingwave_common::bitmap::BitmapBuilder::with_capacity(ops.len());

        let pk_indices = state_table.pk_indices();
        let pk_order = state_table.pk_serde().get_order_types();

        for row in data.rows() {
            // Project to primary key columns for comparison
            let pk = row.project(pk_indices);
            let vnode = state_table.compute_vnode_by_pk(pk);

            let visible = match backfill_state.get_progress(&vnode) {
                LocalityBackfillProgress::Completed { .. } => true,
                LocalityBackfillProgress::NotStarted => false,
                LocalityBackfillProgress::InProgress { current_pos, .. } => {
                    // Compare primary key with current position
                    cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
                }
            };

            new_visibility.append(visible);
        }

        let (columns, _) = data.into_parts();
        let chunk = StreamChunk::with_visibility(ops, columns, new_visibility.finish());
        Ok(chunk)
    }

    fn handle_snapshot_chunk(
        data_chunk: DataChunk,
        vnode: VirtualNode,
        pk_indices: &[usize],
        backfill_state: &mut LocalityBackfillState,
        cur_barrier_snapshot_processed_rows: &mut u64,
    ) -> StreamExecutorResult<StreamChunk> {
        let chunk = StreamChunk::from_parts(vec![Op::Insert; data_chunk.cardinality()], data_chunk);
        let chunk_cardinality = chunk.cardinality() as u64;

        // Extract the primary key from the last row to update progress.
        // As snapshot read streams are ordered by pk, we can use the last row to update current_pos.
        if let Some(last_row) = chunk.rows().last() {
            let pk = last_row.1.project(pk_indices);
            let pk_owned = pk.into_owned_row();
            backfill_state.update_progress(vnode, pk_owned, chunk_cardinality);
        }

        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
        Ok(chunk)
    }
}

impl<S: StateStore> Execute for LocalityProviderExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore> LocalityProviderExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut upstream = self.upstream.execute();

        // Wait for first barrier to initialize
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = first_barrier.epoch;

        // Propagate the first barrier
        yield Message::Barrier(first_barrier);

        let mut state_table = self.state_table;
        let mut progress_table = self.progress_table;

        // Initialize state tables
        state_table.init_epoch(first_epoch).await?;
        progress_table.init_epoch(first_epoch).await?;

        // Load backfill state from progress table
        let mut backfill_state = Self::load_backfill_state(&progress_table).await?;

        // Get pk info from state table
        let pk_indices = state_table.pk_indices().iter().cloned().collect_vec();

        let need_backfill = !backfill_state.is_completed();

        let need_buffering = backfill_state
            .per_vnode
            .values()
            .all(|progress| matches!(progress, LocalityBackfillProgress::NotStarted));

        // Initial buffering phase before backfill - wait for StartFragmentBackfill mutation (if needed)
        if need_buffering {
            // Enter buffering phase - buffer data until StartFragmentBackfill is received
            let mut start_backfill = false;

            #[for_await]
            for msg in upstream.by_ref() {
                let msg = msg?;

                match msg {
                    Message::Watermark(_) => {
                        // Ignore watermarks during initial buffering
                    }
                    Message::Chunk(chunk) => {
                        state_table.write_chunk(chunk);
                        state_table.try_flush().await?;
                    }
                    Message::Barrier(barrier) => {
                        let epoch = barrier.epoch;

                        // Check for StartFragmentBackfill mutation
                        if let Some(mutation) = barrier.mutation.as_deref() {
                            use crate::executor::Mutation;
                            if let Mutation::StartFragmentBackfill { fragment_ids } = mutation
                                && fragment_ids.contains(&self.fragment_id)
                            {
                                tracing::info!(
                                    "Start backfill of locality provider with fragment id: {:?}",
                                    &self.fragment_id
                                );
                                start_backfill = true;
                            }
                        }

                        // Commit state tables
                        let post_commit1 = state_table.commit(epoch).await?;
                        let post_commit2 = progress_table.commit(epoch).await?;

                        yield Message::Barrier(barrier);
                        post_commit1.post_yield_barrier(None).await?;
                        post_commit2.post_yield_barrier(None).await?;

                        // Start backfill when StartFragmentBackfill mutation is received
                        if start_backfill {
                            break;
                        }
                    }
                }
            }
        }

        // Locality Provider Backfill Algorithm (adapted from Arrangement Backfill):
        //
        //   backfill_stream
        //  /               \
        // upstream       snapshot (from state_table)
        //
        // We construct a backfill stream with upstream as its left input and the locality-ordered
        // snapshot read stream as its right input. When a chunk comes from upstream, we buffer it.
        //
        // When a barrier comes from upstream:
        //  - For each row of the upstream chunk buffer, compute its vnode.
        //  - Get the `current_pos` corresponding to the vnode. Forward the row to downstream
        //    if its locality key <= `current_pos`; otherwise drop it from the output, since a
        //    later snapshot read will serve it.
        //  - Flush all buffered upstream chunks to the state table.
        //  - Persist backfill progress to the progress table.
        //  - Reconstruct the whole backfill stream with upstream and a new snapshot read stream.
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        //
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
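        //
        // One barrier epoch, schematically (illustrative):
        //
        //   upstream:   [u1]      [u2]        ...           [barrier]
        //   snapshot:        [s1 @ vnode0]       [s2 @ vnode1]  ...
        //
        // s1 and s2 are forwarded as they arrive (raising `current_pos`); u1 and u2
        // are buffered. At the barrier we drain the chunk builders, mark and forward
        // u1/u2, persist progress, yield the barrier, and rebuild the snapshot stream
        // from the new positions.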

        if need_backfill {
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
            let mut pending_barrier: Option<Barrier> = None;

            let metrics = self
                .metrics
                .new_backfill_metrics(state_table.table_id(), self.actor_id);

            // Create builders for snapshot data chunks
            let snapshot_data_types = self.input_schema.data_types();
            let mut builders: Builders = state_table
                .vnodes()
                .iter_vnodes()
                .map(|vnode| {
                    let builder = create_builder(
                        RateLimit::Disabled,
                        self.chunk_size,
                        snapshot_data_types.clone(),
                    );
                    (vnode, builder)
                })
                .collect();
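
            // One builder per vnode matters for progress tracking: snapshot rows are
            // pk-ordered only within a single vnode, so the last row of each emitted
            // chunk is a valid new `current_pos` for exactly that vnode.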

            'backfill_loop: loop {
                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;

                // Create the backfill stream with upstream and snapshot
                {
                    let left_upstream = upstream.by_ref().map(Either::Left);
                    let right_snapshot = pin!(
                        Self::make_snapshot_stream(&state_table, backfill_state.clone())
                            .map(Either::Right)
                    );

                    // Prefer to select upstream, so we can stop the snapshot stream as soon as
                    // the barrier comes.
                    let mut backfill_stream =
                        select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                            stream::PollNext::Left
                        });

                    #[for_await]
                    for either in &mut backfill_stream {
                        match either {
                            // Upstream
                            Either::Left(msg) => {
                                match msg? {
                                    Message::Barrier(barrier) => {
                                        // We have to process the barrier outside of the loop.
                                        pending_barrier = Some(barrier);
                                        break;
                                    }
                                    Message::Chunk(chunk) => {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(chunk.compact());
                                    }
                                    Message::Watermark(_) => {
                                        // Ignore watermark during backfill.
                                    }
                                }
                            }
                            // Snapshot read
                            Either::Right(msg) => {
                                match msg? {
                                    None => {
                                        // End of the snapshot read stream.
                                        // Consume remaining rows in the builders.
                                        for (vnode, builder) in &mut builders {
                                            if let Some(data_chunk) = builder.consume_all() {
                                                let chunk = Self::handle_snapshot_chunk(
                                                    data_chunk,
                                                    *vnode,
                                                    &pk_indices,
                                                    &mut backfill_state,
                                                    &mut cur_barrier_snapshot_processed_rows,
                                                )?;
                                                yield Message::Chunk(chunk);
                                            }
                                        }

                                        // Consume remaining rows in the upstream buffer.
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            let chunk_cardinality = chunk.cardinality() as u64;
                                            cur_barrier_upstream_processed_rows +=
                                                chunk_cardinality;
                                            yield Message::Chunk(chunk);
                                        }
                                        metrics
                                            .backfill_snapshot_read_row_count
                                            .inc_by(cur_barrier_snapshot_processed_rows);
                                        metrics
                                            .backfill_upstream_output_row_count
                                            .inc_by(cur_barrier_upstream_processed_rows);
                                        break 'backfill_loop;
                                    }
                                    Some((vnode, row)) => {
                                        // Use the builder to batch rows efficiently
                                        let builder = builders.get_mut(&vnode).unwrap();
                                        if let Some(data_chunk) = builder.append_one_row(row) {
                                            // Builder is full, handle the chunk
                                            let chunk = Self::handle_snapshot_chunk(
                                                data_chunk,
                                                vnode,
                                                &pk_indices,
                                                &mut backfill_state,
                                                &mut cur_barrier_snapshot_processed_rows,
                                            )?;
                                            yield Message::Chunk(chunk);
                                        }
                                        // If append_one_row returns None, the row is buffered and no
                                        // chunk is produced yet; progress is updated when the builder
                                        // is consumed later.
                                    }
                                }
                            }
                        }
                    }
                }

                // Process barrier
                let barrier = match pending_barrier.take() {
                    Some(barrier) => barrier,
                    None => break 'backfill_loop, // Reached end of backfill
                };

                // Consume remaining rows from builders at barrier
                for (vnode, builder) in &mut builders {
                    if let Some(data_chunk) = builder.consume_all() {
                        let chunk = Self::handle_snapshot_chunk(
                            data_chunk,
                            *vnode,
                            &pk_indices,
                            &mut backfill_state,
                            &mut cur_barrier_snapshot_processed_rows,
                        )?;
                        yield Message::Chunk(chunk);
                    }
                }
                // Flush buffered upstream chunks to the state table so that rows beyond
                // `current_pos` are served by later snapshot reads, then forward the part
                // the snapshot side has already covered.
                for chunk in upstream_chunk_buffer.drain(..) {
                    cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
                    state_table.write_chunk(chunk.clone());

                    // Mark chunk visibility based on backfill progress. If no vnode has made
                    // progress yet, every row would be marked invisible, so skip the chunk.
                    if backfill_state.has_progress() {
                        let marked_chunk = Self::mark_chunk(chunk, &backfill_state, &state_table)?;
                        yield Message::Chunk(marked_chunk);
                    }
                }

                // Commit the state table, flushing the buffered upstream writes
                state_table
                    .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                    .await?;

                // Update progress with current epoch and snapshot read count
                let total_snapshot_processed_rows: u64 = backfill_state
                    .vnodes()
                    .map(|(_, progress)| match *progress {
                        LocalityBackfillProgress::InProgress { processed_rows, .. } => {
                            processed_rows
                        }
                        LocalityBackfillProgress::Completed { total_rows, .. } => total_rows,
                        LocalityBackfillProgress::NotStarted => 0,
                    })
                    .sum();

                self.progress.update(
                    barrier.epoch,
                    barrier.epoch.curr, // Use barrier epoch as snapshot read epoch
                    total_snapshot_processed_rows,
                );

                // Persist backfill progress
                Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
                let post_commit = progress_table.commit(barrier.epoch).await?;

                metrics
                    .backfill_snapshot_read_row_count
                    .inc_by(cur_barrier_snapshot_processed_rows);
                metrics
                    .backfill_upstream_output_row_count
                    .inc_by(cur_barrier_upstream_processed_rows);

                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;
            }
        }

        tracing::debug!("Locality provider backfill finished, forwarding upstream directly");

        // Wait for the first barrier after backfill completion to mark progress as finished
        if need_backfill && !backfill_state.is_completed() {
            while let Some(msg) = upstream.next().await {
                // Propagate upstream errors instead of silently swallowing them.
                let msg = msg?;
                match msg {
                    Message::Barrier(barrier) => {
                        // no-op commit state table
                        state_table
                            .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                            .await?;

                        // Mark all vnodes as completed
                        for vnode in state_table.vnodes().iter_vnodes() {
                            backfill_state.finish_vnode(vnode, pk_indices.len());
                        }

                        // Calculate final total processed rows
                        let total_snapshot_processed_rows: u64 = backfill_state
                            .vnodes()
                            .map(|(_, progress)| match *progress {
                                LocalityBackfillProgress::Completed { total_rows, .. } => {
                                    total_rows
                                }
                                LocalityBackfillProgress::InProgress { processed_rows, .. } => {
                                    processed_rows
                                }
                                LocalityBackfillProgress::NotStarted => 0,
                            })
                            .sum();

                        // Finish progress reporting
                        self.progress
                            .finish(barrier.epoch, total_snapshot_processed_rows);

                        // Persist final state
                        Self::persist_backfill_state(&mut progress_table, &backfill_state).await?;
                        let post_commit = progress_table.commit(barrier.epoch).await?;

                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        break; // Exit the loop after processing the barrier
                    }
                    Message::Chunk(chunk) => {
                        // Forward chunks directly during completion phase
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        // Forward watermarks directly during completion phase
                        yield Message::Watermark(watermark);
                    }
                }
            }
        }

        // After backfill completion, forward messages directly
        #[for_await]
        for msg in upstream {
            let msg = msg?;

            match msg {
                Message::Barrier(barrier) => {
                    // Commit state tables but don't modify them
                    state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                    progress_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                    yield Message::Barrier(barrier);
                }
                _ => {
                    // Forward all other messages directly
                    yield msg;
                }
            }
        }
    }
}