risingwave_stream/executor/backfill/snapshot_backfill/
executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::VecDeque;
17use std::future::{Future, pending, ready};
18use std::mem::take;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use futures::future::{Either, try_join_all};
24use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, pin_mut};
25use risingwave_common::array::StreamChunk;
26use risingwave_common::hash::VnodeBitmapExt;
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::OwnedRow;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_pb::common::PbThrottleType;
33use risingwave_storage::StateStore;
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::ChangeLogRow;
36use risingwave_storage::table::batch_table::BatchTable;
37use tokio::select;
38use tokio::sync::mpsc::UnboundedReceiver;
39use tokio::time::sleep;
40
41use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
42use crate::executor::backfill::snapshot_backfill::state::{
43    BackfillState, EpochBackfillProgress, VnodeBackfillProgress,
44};
45use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;
46use crate::executor::backfill::utils::{create_builder, mapping_message};
47use crate::executor::monitor::StreamingMetrics;
48use crate::executor::prelude::{StateTable, StreamExt, try_stream};
49use crate::executor::{
50    ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, DispatcherMessage, Execute,
51    MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult,
52    expect_first_barrier,
53};
54use crate::task::CreateMviewProgressReporter;
55
/// Executor that backfills a newly-created materialized view in three phases
/// (driven by `execute_inner`): consume a snapshot of the upstream table,
/// catch up by replaying the upstream table's change log, then forward the
/// upstream stream directly.
pub struct SnapshotBackfillExecutor<S: StateStore> {
    /// Upstream table
    upstream_table: BatchTable<S>,

    /// Backfill progress table
    progress_state_table: StateTable<S>,

    /// Upstream with the same schema with the upstream table.
    upstream: MergeExecutorInput,

    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// Reporter for create-mview progress; updated while consuming the log store
    /// and reported finished once caught up with the upstream.
    progress: CreateMviewProgressReporter,

    /// Chunk size used when building the snapshot and change-log streams.
    chunk_size: usize,
    /// Rate limit applied to the snapshot-consuming stream.
    rate_limit: RateLimit,

    /// Receiver of barriers injected by the local barrier worker
    /// (consumed via `receive_next_barrier`).
    barrier_rx: UnboundedReceiver<Barrier>,

    actor_ctx: ActorContextRef,
    metrics: Arc<StreamingMetrics>,

    /// Epoch at which the snapshot should be read. `None` means the stream node was
    /// created previously and the backfill has already finished.
    snapshot_epoch: Option<u64>,
}
81
82impl<S: StateStore> SnapshotBackfillExecutor<S> {
83    #[expect(clippy::too_many_arguments)]
84    pub(crate) fn new(
85        upstream_table: BatchTable<S>,
86        progress_state_table: StateTable<S>,
87        upstream: MergeExecutorInput,
88        output_indices: Vec<usize>,
89        actor_ctx: ActorContextRef,
90        progress: CreateMviewProgressReporter,
91        chunk_size: usize,
92        rate_limit: RateLimit,
93        barrier_rx: UnboundedReceiver<Barrier>,
94        metrics: Arc<StreamingMetrics>,
95        snapshot_epoch: Option<u64>,
96    ) -> Self {
97        assert_eq!(&upstream.info.schema, upstream_table.schema());
98        if upstream_table.pk_in_output_indices().is_none() {
99            panic!(
100                "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}",
101                upstream_table.pk_indices(),
102                upstream_table.output_indices(),
103                upstream_table.schema()
104            )
105        };
106        if !matches!(rate_limit, RateLimit::Disabled) {
107            trace!(
108                ?rate_limit,
109                "create snapshot backfill executor with rate limit"
110            );
111        }
112        Self {
113            upstream_table,
114            progress_state_table,
115            upstream,
116            output_indices,
117            progress,
118            chunk_size,
119            rate_limit,
120            barrier_rx,
121            actor_ctx,
122            metrics,
123            snapshot_epoch,
124        }
125    }
126
    /// Drive the backfill through its three phases:
    /// 1. consume the upstream table's snapshot at `snapshot_epoch` (skipped when the
    ///    persisted progress shows it is already done),
    /// 2. catch up by replaying the upstream table's change log while buffering the
    ///    live upstream,
    /// 3. forward the upstream stream directly.
    ///
    /// Backfill progress is persisted via `backfill_state` and committed on every barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        trace!("snapshot backfill executor start");
        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
        trace!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
        let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        trace!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
        // Backfill is needed only when the first upstream barrier and the first locally
        // injected barrier carry different epochs (i.e. the upstream is ahead of us).
        let should_snapshot_backfill: Option<u64> = if let Some(snapshot_epoch) =
            self.snapshot_epoch
        {
            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
                assert!(snapshot_epoch <= first_upstream_barrier.epoch.prev);
                Some(snapshot_epoch)
            } else {
                None
            }
        } else {
            // when snapshot epoch is not set, the StreamNode must be created previously and has finished the backfill
            if cfg!(debug_assertions) {
                panic!(
                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
                    first_upstream_barrier.epoch, first_recv_barrier.epoch
                );
            } else {
                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                None
            }
        };
        let first_recv_barrier_epoch = first_recv_barrier.epoch;
        let initial_backfill_paused =
            first_recv_barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id);
        yield Message::Barrier(first_recv_barrier);
        let mut backfill_state = BackfillState::new(
            self.progress_state_table,
            first_recv_barrier_epoch,
            self.upstream_table.pk_serializer().clone(),
        )
        .await?;

        // `barrier_epoch`: the latest epoch yielded so far.
        // `need_report_finish`: whether `finish_consuming_log_store` still has to be
        // reported on the first barrier of phase 3.
        let (mut barrier_epoch, mut need_report_finish) = {
            if let Some(snapshot_epoch) = should_snapshot_backfill {
                let table_id_str = format!("{}", self.upstream_table.table_id());
                let actor_id_str = format!("{}", self.actor_ctx.id);

                let consume_upstream_row_count = self
                    .metrics
                    .snapshot_backfill_consume_row_count
                    .with_guarded_label_values(&[
                        table_id_str.as_str(),
                        actor_id_str.as_str(),
                        "consume_upstream",
                    ]);

                let mut upstream_buffer = UpstreamBuffer::new(
                    &mut self.upstream,
                    first_upstream_barrier,
                    consume_upstream_row_count,
                );

                // Phase 1: consume upstream snapshot
                let (mut barrier_epoch, upstream_buffer) = if first_recv_barrier_epoch.prev
                    < snapshot_epoch
                {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "start consuming snapshot"
                    );
                    {
                        let consuming_snapshot_row_count = self
                            .metrics
                            .snapshot_backfill_consume_row_count
                            .with_guarded_label_values(&[
                                table_id_str.as_str(),
                                actor_id_str.as_str(),
                                "consuming_snapshot",
                            ]);
                        let snapshot_stream = make_consume_snapshot_stream(
                            &self.upstream_table,
                            snapshot_epoch,
                            self.chunk_size,
                            &mut self.rate_limit,
                            &mut self.barrier_rx,
                            &mut self.progress,
                            &mut backfill_state,
                            first_recv_barrier_epoch,
                            initial_backfill_paused,
                            &self.actor_ctx,
                        );

                        pin_mut!(snapshot_stream);

                        // Poll the snapshot stream while `run_future` concurrently keeps
                        // buffering the live upstream (see the rationale in phase 2).
                        while let Some(message) = upstream_buffer
                            .run_future(snapshot_stream.try_next())
                            .await?
                        {
                            if let Message::Chunk(chunk) = &message {
                                consuming_snapshot_row_count.inc_by(chunk.cardinality() as _);
                            }
                            yield message;
                        }
                    }

                    // The first injected barrier right after the snapshot epoch.
                    let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                    let recv_barrier_epoch = recv_barrier.epoch;
                    assert_eq!(snapshot_epoch, recv_barrier_epoch.prev);
                    let post_commit = backfill_state.commit(recv_barrier.epoch).await?;
                    yield Message::Barrier(recv_barrier);
                    post_commit.post_yield_barrier(None).await?;
                    (
                        recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(snapshot_epoch),
                    )
                } else {
                    trace!(
                        table_id = %self.upstream_table.table_id(),
                        snapshot_epoch,
                        barrier_epoch = ?first_recv_barrier_epoch,
                        "skip consuming snapshot"
                    );
                    (
                        first_recv_barrier_epoch,
                        upstream_buffer.start_consuming_log_store(first_recv_barrier_epoch.prev),
                    )
                };

                // Phase 2: consume upstream log store
                if let Some(mut upstream_buffer) = upstream_buffer {
                    let initial_pending_lag = Duration::from_millis(
                        Epoch(upstream_buffer.pending_epoch_lag()).physical_time(),
                    );
                    trace!(
                        ?barrier_epoch,
                        table_id = %self.upstream_table.table_id(),
                        ?initial_pending_lag,
                        "start consuming log store"
                    );

                    let consuming_log_store_row_count = self
                        .metrics
                        .snapshot_backfill_consume_row_count
                        .with_guarded_label_values(&[
                            table_id_str.as_str(),
                            actor_id_str.as_str(),
                            "consuming_log_store",
                        ]);
                    // Non-checkpoint epochs accumulated since the last checkpoint barrier;
                    // their change logs are replayed in one batch at the next checkpoint.
                    let mut pending_non_checkpoint_barrier: Vec<EpochPair> = vec![];
                    loop {
                        let barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                        assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                        let is_finished = upstream_buffer.consumed_epoch(barrier.epoch).await?;
                        // Disable calling next_epoch, because, if barrier_epoch.prev is a checkpoint epoch,
                        // next_epoch(barrier_epoch.prev) is actually waiting for the committed epoch.
                        // However, upstream_buffer's is_polling_epoch_data can be false, since just received
                        // the checkpoint barrier_epoch.prev. And then the upstream_buffer may stop polling upstream
                        // when the max_pending_epoch_lag is small. When upstream is not polled, the barrier of the next
                        // committed epoch cannot be collected.
                        // {
                        //     // we must call `next_epoch` after `consumed_epoch`, and otherwise in `next_epoch`
                        //     // we may block the upstream, and the upstream never get a chance to finish the `next_epoch`
                        //     let next_prev_epoch = upstream_buffer
                        //         .run_future(self.upstream_table.next_epoch(barrier_epoch.prev))
                        //         .await?;
                        //     assert_eq!(next_prev_epoch, barrier.epoch.prev);
                        // }
                        barrier_epoch = barrier.epoch;
                        if barrier.kind.is_checkpoint() {
                            let pending_non_checkpoint_barrier =
                                take(&mut pending_non_checkpoint_barrier);
                            let end_epoch = barrier_epoch.prev;
                            let start_epoch = pending_non_checkpoint_barrier
                                .first()
                                .map(|epoch| epoch.prev)
                                .unwrap_or(end_epoch);
                            trace!(?barrier_epoch, kind = ?barrier.kind, ?pending_non_checkpoint_barrier, "start consume epoch change log");
                            // use `upstream_buffer.run_future` to poll upstream concurrently so that we won't have back-pressure
                            // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed,
                            // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock.
                            let mut stream = upstream_buffer
                                .run_future(make_log_stream(
                                    &self.upstream_table,
                                    start_epoch,
                                    end_epoch,
                                    None,
                                    self.chunk_size,
                                ))
                                .await?;
                            while let Some(chunk) =
                                upstream_buffer.run_future(stream.try_next()).await?
                            {
                                trace!(
                                    ?barrier_epoch,
                                    size = chunk.cardinality(),
                                    "consume change log yield chunk",
                                );
                                consuming_log_store_row_count.inc_by(chunk.cardinality() as _);
                                yield Message::Chunk(chunk);
                            }

                            trace!(?barrier_epoch, "after consume change log");

                            // Record per-vnode completion of this epoch range in the progress state.
                            stream
                                .for_vnode_pk_progress(|vnode, row_count, progress| {
                                    assert_eq!(progress, None);
                                    backfill_state.finish_epoch(
                                        vnode,
                                        barrier.epoch.prev,
                                        row_count,
                                    );
                                })
                                .await?;
                        } else {
                            pending_non_checkpoint_barrier.push(barrier.epoch);
                        }

                        if is_finished {
                            assert_eq!(upstream_buffer.pending_epoch_lag(), 0);
                            self.progress.finish_consuming_log_store(barrier.epoch);
                        } else {
                            self.progress.update_create_mview_log_store_progress(
                                barrier.epoch,
                                upstream_buffer.pending_epoch_lag(),
                            );
                        }

                        let post_commit = backfill_state.commit(barrier.epoch).await?;
                        let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                        yield Message::Barrier(barrier);
                        post_commit.post_yield_barrier(None).await?;
                        if update_vnode_bitmap.is_some() {
                            return Err(anyhow!(
                                "should not update vnode bitmap during consuming log store"
                            )
                            .into());
                        }

                        if is_finished {
                            assert!(
                                pending_non_checkpoint_barrier.is_empty(),
                                "{pending_non_checkpoint_barrier:?}"
                            );
                            break;
                        }
                    }
                    trace!(
                        ?barrier_epoch,
                        table_id = %self.upstream_table.table_id(),
                        "finish consuming log store"
                    );

                    // Finish was already reported inside the loop above.
                    (barrier_epoch, false)
                } else {
                    trace!(
                        ?barrier_epoch,
                        table_id = %self.upstream_table.table_id(),
                        "skip consuming log store and start consuming upstream directly"
                    );

                    (barrier_epoch, true)
                }
            } else {
                // No backfill needed: sanity-check that the persisted progress shows every
                // vnode fully consumed at the epoch of the first upstream barrier.
                backfill_state
                    .latest_progress()
                    .for_each(|(vnode, progress)| {
                        let progress = progress.expect("should not be empty");
                        assert_eq!(
                            progress.epoch, first_upstream_barrier.epoch.prev,
                            "vnode: {:?}",
                            vnode
                        );
                        assert_eq!(
                            progress.progress,
                            EpochBackfillProgress::Consumed,
                            "vnode: {:?}",
                            vnode
                        );
                    });
                trace!(
                    table_id = %self.upstream_table.table_id(),
                    "skip backfill"
                );
                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
                (first_upstream_barrier.epoch, true)
            }
        };
        let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
        let mut epoch_row_count = 0;
        // Phase 3: consume upstream
        while let Some(msg) = upstream.try_next().await? {
            match msg {
                Message::Barrier(barrier) => {
                    assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                    self.upstream_table
                        .vnodes()
                        .iter_vnodes()
                        .for_each(|vnode| {
                            // Note: the `epoch_row_count` is the accumulated row count of all vnodes of the current
                            // executor.
                            backfill_state.finish_epoch(vnode, barrier.epoch.prev, epoch_row_count);
                        });
                    epoch_row_count = 0;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    barrier_epoch = barrier.epoch;
                    if need_report_finish {
                        need_report_finish = false;
                        self.progress.finish_consuming_log_store(barrier_epoch);
                    }
                    let post_commit = backfill_state.commit(barrier.epoch).await?;
                    yield Message::Barrier(barrier);
                    // On scaling, adopt the new vnode bitmap and re-check that every owned
                    // vnode's persisted progress is consistent with the committed epoch.
                    if let Some(new_vnode_bitmap) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        let _prev_vnode_bitmap =
                            self.upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                        backfill_state
                            .latest_progress()
                            .for_each(|(vnode, progress)| {
                                let progress = progress.expect("should not be empty");
                                assert_eq!(
                                    progress.epoch, barrier_epoch.prev,
                                    "vnode {:?} has unexpected progress epoch",
                                    vnode
                                );
                                assert_eq!(
                                    progress.progress,
                                    EpochBackfillProgress::Consumed,
                                    "vnode {:?} has unexpected progress",
                                    vnode
                                );
                            });
                    }
                }
                msg => {
                    if let Message::Chunk(chunk) = &msg {
                        epoch_row_count += chunk.cardinality();
                    }
                    yield msg;
                }
            }
        }
    }
470}
471
472impl<S: StateStore> Execute for SnapshotBackfillExecutor<S> {
473    fn execute(self: Box<Self>) -> BoxedMessageStream {
474        let output_indices = self.output_indices.clone();
475        self.execute_inner()
476            .filter_map(move |result| {
477                ready({
478                    match result {
479                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
480                        Err(e) => Some(Err(e)),
481                    }
482                })
483            })
484            .boxed()
485    }
486}
487
/// Zero-sized phase marker for `UpstreamBuffer`: consuming the upstream snapshot.
struct ConsumingSnapshot;
/// Zero-sized phase marker for `UpstreamBuffer`: consuming the upstream log store.
struct ConsumingLogStore;
490
/// Upstream barriers buffered by `UpstreamBuffer`, grouped by checkpoint, waiting to be
/// matched against the epochs consumed downstream (see `consume_epoch`).
#[derive(Debug)]
struct PendingBarriers {
    /// Epoch of the very first barrier received from the upstream.
    first_upstream_barrier_epoch: EpochPair,

    /// Pending non-checkpoint barriers before receiving the next checkpoint barrier
    /// Newer barrier at the front
    pending_non_checkpoint_barriers: VecDeque<DispatcherBarrier>,

    /// In the outer `VecDeque`, newer barriers at the front.
    /// In the inner `VecDeque`, newer barrier at the front, with the first barrier as checkpoint barrier,
    /// and others as non-checkpoint barrier
    checkpoint_barrier_groups: VecDeque<VecDeque<DispatcherBarrier>>,
}
504
505impl PendingBarriers {
506    fn new(first_upstream_barrier: DispatcherBarrier) -> Self {
507        Self {
508            first_upstream_barrier_epoch: first_upstream_barrier.epoch,
509            pending_non_checkpoint_barriers: Default::default(),
510            checkpoint_barrier_groups: VecDeque::from_iter([VecDeque::from_iter([
511                first_upstream_barrier,
512            ])]),
513        }
514    }
515
516    fn add(&mut self, barrier: DispatcherBarrier) {
517        let is_checkpoint = barrier.kind.is_checkpoint();
518        self.pending_non_checkpoint_barriers.push_front(barrier);
519        if is_checkpoint {
520            self.checkpoint_barrier_groups
521                .push_front(take(&mut self.pending_non_checkpoint_barriers));
522        }
523    }
524
525    fn pop(&mut self) -> Option<VecDeque<DispatcherBarrier>> {
526        self.checkpoint_barrier_groups.pop_back()
527    }
528
529    fn consume_epoch(&mut self, epoch: EpochPair) {
530        let barriers = self
531            .checkpoint_barrier_groups
532            .back_mut()
533            .expect("non-empty");
534        let oldest_upstream_barrier = barriers.back().expect("non-empty");
535        assert!(
536            oldest_upstream_barrier.epoch.prev >= epoch.prev,
537            "oldest upstream barrier has epoch {:?} earlier than epoch to consume {:?}",
538            oldest_upstream_barrier.epoch,
539            epoch
540        );
541        if oldest_upstream_barrier.epoch.prev == epoch.prev {
542            assert_eq!(oldest_upstream_barrier.epoch, epoch);
543            barriers.pop_back();
544            if barriers.is_empty() {
545                self.checkpoint_barrier_groups.pop_back();
546            }
547        }
548    }
549
550    fn latest_epoch(&self) -> Option<EpochPair> {
551        self.pending_non_checkpoint_barriers
552            .front()
553            .or_else(|| {
554                self.checkpoint_barrier_groups
555                    .front()
556                    .and_then(|barriers| barriers.front())
557            })
558            .map(|barrier| barrier.epoch)
559    }
560
561    fn checkpoint_epoch_count(&self) -> usize {
562        self.checkpoint_barrier_groups.len()
563    }
564
565    fn has_checkpoint_epoch(&self) -> bool {
566        !self.checkpoint_barrier_groups.is_empty()
567    }
568}
569
struct UpstreamBuffer<'a, S> {
    upstream: &'a mut MergeExecutorInput,
    /// Cap on `pending_epoch_lag()` above which upstream polling is paused (unless
    /// `is_polling_epoch_data` forces it to continue). `u64::MAX` (unlimited) while
    /// consuming the snapshot; tightened monotonically afterwards in `consumed_epoch`.
    max_pending_epoch_lag: u64,
    /// `prev` of the latest epoch that has been consumed downstream.
    consumed_epoch: u64,
    /// Barriers received from upstream but not yet received the barrier from local barrier worker.
    upstream_pending_barriers: PendingBarriers,
    /// Whether we have started polling any upstream data before the next checkpoint barrier.
    /// When `true`, we should continue polling until the next checkpoint barrier, because
    /// some data in this epoch have been discarded and data in this epoch
    /// must be read from log store
    is_polling_epoch_data: bool,
    /// Counter of rows received (and discarded) from the upstream while buffering.
    consume_upstream_row_count: LabelGuardedIntCounter,
    /// Zero-sized phase marker: [`ConsumingSnapshot`] or [`ConsumingLogStore`].
    _phase: S,
}
584
585impl<'a> UpstreamBuffer<'a, ConsumingSnapshot> {
586    fn new(
587        upstream: &'a mut MergeExecutorInput,
588        first_upstream_barrier: DispatcherBarrier,
589        consume_upstream_row_count: LabelGuardedIntCounter,
590    ) -> Self {
591        Self {
592            upstream,
593            is_polling_epoch_data: false,
594            consume_upstream_row_count,
595            upstream_pending_barriers: PendingBarriers::new(first_upstream_barrier),
596            // no limit on the number of pending barrier in the beginning
597            max_pending_epoch_lag: u64::MAX,
598            consumed_epoch: 0,
599            _phase: ConsumingSnapshot {},
600        }
601    }
602
603    fn start_consuming_log_store(
604        mut self,
605        consumed_epoch: u64,
606    ) -> Option<UpstreamBuffer<'a, ConsumingLogStore>> {
607        if self
608            .upstream_pending_barriers
609            .first_upstream_barrier_epoch
610            .prev
611            == consumed_epoch
612        {
613            assert_eq!(
614                1,
615                self.upstream_pending_barriers
616                    .pop()
617                    .expect("non-empty")
618                    .len()
619            );
620        }
621        let max_pending_epoch_lag = self.pending_epoch_lag();
622        let buffer = UpstreamBuffer {
623            upstream: self.upstream,
624            upstream_pending_barriers: self.upstream_pending_barriers,
625            max_pending_epoch_lag,
626            is_polling_epoch_data: self.is_polling_epoch_data,
627            consume_upstream_row_count: self.consume_upstream_row_count,
628            consumed_epoch,
629            _phase: ConsumingLogStore {},
630        };
631        if buffer.is_finished() {
632            None
633        } else {
634            Some(buffer)
635        }
636    }
637}
638
impl<S> UpstreamBuffer<'_, S> {
    /// Whether polling the upstream is currently allowed: always while data of a
    /// partially-polled epoch is in flight, otherwise only while the pending epoch
    /// lag stays below `max_pending_epoch_lag`.
    fn can_consume_upstream(&self) -> bool {
        self.is_polling_epoch_data || self.pending_epoch_lag() < self.max_pending_epoch_lag
    }

    /// Keep consuming (and buffering barriers / discarding data of) the upstream in a loop.
    /// Note the return type: this future only ever resolves with an error. When upstream
    /// consumption is throttled, it sleeps 30s, logs a warning, and then pends forever,
    /// so callers race it against foreground work rather than awaiting it directly.
    async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
        {
            loop {
                // `try` block: any error breaks out of the loop and becomes the return value.
                if let Err(e) = try {
                    if !self.can_consume_upstream() {
                        // pause the future to block consuming upstream
                        sleep(Duration::from_secs(30)).await;
                        warn!(pending_barrier = ?self.upstream_pending_barriers, "not polling upstream but timeout");
                        return pending().await;
                    }
                    self.consume_until_next_checkpoint_barrier().await?;
                } {
                    break e;
                }
            }
        }
    }

    /// Consume the upstream until seeing the next checkpoint barrier. Chunks and
    /// watermarks are dropped (chunks are only counted), while barriers are buffered
    /// into `upstream_pending_barriers`.
    async fn consume_until_next_checkpoint_barrier(&mut self) -> StreamExecutorResult<()> {
        loop {
            let msg: DispatcherMessage = self
                .upstream
                .try_next()
                .await?
                .ok_or_else(|| anyhow!("end of upstream"))?;
            match msg {
                DispatcherMessage::Chunk(chunk) => {
                    self.is_polling_epoch_data = true;
                    self.consume_upstream_row_count
                        .inc_by(chunk.cardinality() as _);
                }
                DispatcherMessage::Barrier(barrier) => {
                    let is_checkpoint = barrier.kind.is_checkpoint();
                    self.upstream_pending_barriers.add(barrier);
                    if is_checkpoint {
                        // A sealed checkpoint group means no mid-epoch data is in flight.
                        self.is_polling_epoch_data = false;
                        break;
                    } else {
                        self.is_polling_epoch_data = true;
                    }
                }
                DispatcherMessage::Watermark(_) => {
                    self.is_polling_epoch_data = true;
                }
            }
        }
        Ok(())
    }
}
694
695impl UpstreamBuffer<'_, ConsumingLogStore> {
    /// Mark `epoch` as consumed by the downstream, tighten the pending-lag cap, and
    /// return whether log-store consumption is now finished (no pending checkpoint
    /// epoch remains and no partially-polled epoch data is in flight).
    #[await_tree::instrument("consumed_epoch: {:?}", epoch)]
    async fn consumed_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<bool> {
        assert!(!self.is_finished());
        if !self.upstream_pending_barriers.has_checkpoint_epoch() {
            // when upstream_pending_barriers is empty and not polling any intermediate epoch data,
            // we must have returned true to indicate finish, and should not be called again.
            assert!(self.is_polling_epoch_data);
            self.consume_until_next_checkpoint_barrier().await?;
            assert_eq!(self.upstream_pending_barriers.checkpoint_epoch_count(), 1);
        }
        self.upstream_pending_barriers.consume_epoch(epoch);

        {
            {
                let prev_epoch = epoch.prev;
                // Consumed epochs must advance strictly monotonically.
                assert!(self.consumed_epoch < prev_epoch);
                let elapsed_epoch = prev_epoch - self.consumed_epoch;
                self.consumed_epoch = prev_epoch;
                if self.upstream_pending_barriers.has_checkpoint_epoch() {
                    // try consuming ready upstreams when we haven't yielded all pending barriers yet.
                    while self.can_consume_upstream()
                        && let Some(result) =
                            self.consume_until_next_checkpoint_barrier().now_or_never()
                    {
                        result?;
                    }
                }
                // sub to ensure that the lag is monotonically decreasing.
                // here we subtract half the elapsed epoch, so that approximately when downstream progresses two epochs,
                // the upstream can at least progress for one epoch.
                self.max_pending_epoch_lag = min(
                    self.pending_epoch_lag(),
                    self.max_pending_epoch_lag.saturating_sub(elapsed_epoch / 2),
                );
            }
        }
        Ok(self.is_finished())
    }
734
735    fn is_finished(&self) -> bool {
736        if cfg!(debug_assertions) && !self.is_polling_epoch_data {
737            assert!(
738                self.upstream_pending_barriers
739                    .pending_non_checkpoint_barriers
740                    .is_empty()
741            )
742        }
743        !self.upstream_pending_barriers.has_checkpoint_epoch() && !self.is_polling_epoch_data
744    }
745}
746
747impl<S> UpstreamBuffer<'_, S> {
748    /// Run a future while concurrently polling the upstream so that the upstream
749    /// won't be back-pressured.
750    async fn run_future<T, E: Into<StreamExecutorError>>(
751        &mut self,
752        future: impl Future<Output = Result<T, E>>,
753    ) -> StreamExecutorResult<T> {
754        select! {
755            biased;
756            e = self.concurrently_consume_upstream() => {
757                Err(e)
758            }
759            // this arm won't be starved, because the first arm is always pending unless returning with error
760            result = future => {
761                result.map_err(Into::into)
762            }
763        }
764    }
765
766    fn pending_epoch_lag(&self) -> u64 {
767        self.upstream_pending_barriers
768            .latest_epoch()
769            .map(|epoch| {
770                epoch
771                    .prev
772                    .checked_sub(self.consumed_epoch)
773                    .expect("pending epoch must be later than consumed_epoch")
774            })
775            .unwrap_or(0)
776    }
777}
778
779#[await_tree::instrument("make_log_stream: {start_epoch}-{end_epoch} table {}", upstream_table.table_id())]
780async fn make_log_stream(
781    upstream_table: &BatchTable<impl StateStore>,
782    start_epoch: u64,
783    end_epoch: u64,
784    start_pk: Option<OwnedRow>,
785    chunk_size: usize,
786) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
787    let data_types = upstream_table.schema().data_types();
788    let start_pk = start_pk.as_ref();
789    // TODO: may avoid polling all vnodes concurrently at the same time but instead with a limit on concurrency.
790    let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| {
791        upstream_table
792            .batch_iter_vnode_log(
793                start_epoch,
794                HummockReadEpoch::Committed(end_epoch),
795                start_pk,
796                vnode,
797            )
798            .map_ok(move |stream| {
799                let stream = stream.map_err(Into::into);
800                (vnode, stream, 0)
801            })
802    }))
803    .await?;
804    let builder = create_builder(RateLimit::Disabled, chunk_size, data_types.clone());
805    Ok(VnodeStream::new(
806        vnode_streams,
807        upstream_table.pk_in_output_indices().expect("should exist"),
808        builder,
809    ))
810}
811
812async fn make_snapshot_stream(
813    upstream_table: &BatchTable<impl StateStore>,
814    snapshot_epoch: u64,
815    backfill_state: &BackfillState<impl StateStore>,
816    rate_limit: RateLimit,
817    chunk_size: usize,
818    snapshot_rebuild_interval: Duration,
819) -> StreamExecutorResult<VnodeStream<impl super::vnode_stream::ChangeLogRowStream>> {
820    let data_types = upstream_table.schema().data_types();
821    let vnode_streams = try_join_all(backfill_state.latest_progress().filter_map(
822        move |(vnode, progress)| {
823            let start_pk = match progress {
824                None => Some((None, 0)),
825                Some(VnodeBackfillProgress {
826                    row_count,
827                    progress: EpochBackfillProgress::Consuming { latest_pk },
828                    ..
829                }) => Some((Some(latest_pk), *row_count)),
830                Some(VnodeBackfillProgress {
831                    progress: EpochBackfillProgress::Consumed,
832                    ..
833                }) => None,
834            };
835            start_pk.map(|(start_pk, row_count)| {
836                upstream_table
837                    .batch_iter_vnode(
838                        HummockReadEpoch::Committed(snapshot_epoch),
839                        start_pk,
840                        vnode,
841                        PrefetchOptions::prefetch_for_large_range_scan(),
842                        snapshot_rebuild_interval,
843                    )
844                    .map_ok(move |stream| {
845                        let stream = stream.map_ok(ChangeLogRow::Insert).map_err(Into::into);
846                        (vnode, stream, row_count)
847                    })
848            })
849        },
850    ))
851    .await?;
852    let builder = create_builder(rate_limit, chunk_size, data_types.clone());
853    Ok(VnodeStream::new(
854        vnode_streams,
855        upstream_table.pk_in_output_indices().expect("should exist"),
856        builder,
857    ))
858}
859
/// Consume the upstream snapshot at `snapshot_epoch` and yield it as chunks interleaved
/// with the barriers received on `barrier_rx`, persisting per-vnode progress into
/// `backfill_state` at every barrier.
///
/// Phases, in order:
/// 1. Stream snapshot chunks, committing progress and applying backfill rate-limit
///    mutations on each barrier, until the snapshot is exhausted.
/// 2. On the next barrier, report finish via `progress`.
/// 3. Keep committing state for remaining barriers until the barrier whose `curr`
///    epoch equals `snapshot_epoch`.
#[expect(clippy::too_many_arguments)]
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn make_consume_snapshot_stream<'a, S: StateStore>(
    upstream_table: &'a BatchTable<S>,
    snapshot_epoch: u64,
    chunk_size: usize,
    rate_limit: &'a mut RateLimit,
    barrier_rx: &'a mut UnboundedReceiver<Barrier>,
    progress: &'a mut CreateMviewProgressReporter,
    backfill_state: &'a mut BackfillState<S>,
    first_recv_barrier_epoch: EpochPair,
    initial_backfill_paused: bool,
    actor_ctx: &'a ActorContextRef,
) {
    let mut barrier_epoch = first_recv_barrier_epoch;

    // start consume upstream snapshot
    let mut snapshot_stream = make_snapshot_stream(
        upstream_table,
        snapshot_epoch,
        &*backfill_state,
        *rate_limit,
        chunk_size,
        actor_ctx.config.developer.snapshot_iter_rebuild_interval(),
    )
    .await?;

    // Wait for either the next barrier or the next snapshot chunk. The snapshot arm is
    // disabled while throttled (this epoch's rate-limit budget is spent) or while the
    // backfill is paused, so only barriers are received in those states.
    async fn select_barrier_and_snapshot_stream(
        barrier_rx: &mut UnboundedReceiver<Barrier>,
        snapshot_stream: &mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
        throttle_snapshot_stream: bool,
        backfill_paused: bool,
    ) -> StreamExecutorResult<Either<Barrier, Option<StreamChunk>>> {
        select!(
            result = receive_next_barrier(barrier_rx) => {
                Ok(Either::Left(result?))
            },
            result = snapshot_stream.try_next(), if !throttle_snapshot_stream && !backfill_paused => {
                Ok(Either::Right(result?))
            }
        )
    }

    // `count`: total rows yielded so far; `epoch_row_count`: rows yielded since the
    // last barrier, compared against the rate limit to throttle within an epoch.
    let mut count = 0;
    let mut epoch_row_count = 0;
    let mut backfill_paused = initial_backfill_paused;
    loop {
        // Throttle once this epoch's row budget is used up; it resets on each barrier.
        let throttle_snapshot_stream = epoch_row_count as u64 >= rate_limit.to_u64();
        match select_barrier_and_snapshot_stream(
            barrier_rx,
            &mut snapshot_stream,
            throttle_snapshot_stream,
            backfill_paused,
        )
        .await?
        {
            Either::Left(barrier) => {
                assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
                barrier_epoch = barrier.epoch;

                // Snapshot consumption must complete strictly before `snapshot_epoch`.
                if barrier_epoch.curr >= snapshot_epoch {
                    return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into());
                }
                if barrier.should_start_fragment_backfill(actor_ctx.fragment_id) {
                    backfill_paused = false;
                }
                // Flush any partially-built chunk so the progress recorded below
                // reflects every row that has actually been yielded.
                if let Some(chunk) = snapshot_stream.consume_builder() {
                    count += chunk.cardinality();
                    epoch_row_count += chunk.cardinality();
                    yield Message::Chunk(chunk);
                }
                // Persist per-vnode progress: a pk means the vnode is mid-scan,
                // no pk means its scan is complete.
                snapshot_stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        if let Some(pk) = pk_progress {
                            backfill_state.update_epoch_progress(
                                vnode,
                                snapshot_epoch,
                                row_count,
                                pk,
                            );
                        } else {
                            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
                        }
                    })
                    .await?;
                let post_commit = backfill_state.commit(barrier.epoch).await?;
                trace!(?barrier_epoch, count, epoch_row_count, "update progress");
                progress.update(barrier_epoch, barrier_epoch.prev, count as _);
                // New epoch: reset the per-epoch rate-limit budget.
                epoch_row_count = 0;

                // Pick up a backfill rate-limit change targeted at this fragment, if any.
                let new_rate_limit = barrier.mutation.as_ref().and_then(|m| {
                    if let Mutation::Throttle(config) = &**m
                        && let Some(config) = config.get(&actor_ctx.fragment_id)
                        && config.throttle_type() == PbThrottleType::Backfill
                    {
                        Some(config.rate_limit)
                    } else {
                        None
                    }
                });
                yield Message::Barrier(barrier);
                post_commit.post_yield_barrier(None).await?;

                // Apply the new rate limit only after the barrier has been yielded.
                if let Some(new_rate_limit) = new_rate_limit {
                    let new_rate_limit = new_rate_limit.into();
                    *rate_limit = new_rate_limit;
                    snapshot_stream.update_rate_limiter(new_rate_limit, chunk_size);
                }
            }
            Either::Right(Some(chunk)) => {
                // The snapshot arm is disabled while paused, so a chunk here is a bug.
                if backfill_paused {
                    return Err(
                        anyhow!("snapshot backfill paused, but received snapshot chunk").into(),
                    );
                }
                count += chunk.cardinality();
                epoch_row_count += chunk.cardinality();
                yield Message::Chunk(chunk);
            }
            Either::Right(None) => {
                // Snapshot stream exhausted: move on to reporting finish.
                break;
            }
        }
    }

    // finish consuming upstream snapshot, report finish
    let barrier_to_report_finish = receive_next_barrier(barrier_rx).await?;
    assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr);
    barrier_epoch = barrier_to_report_finish.epoch;
    trace!(?barrier_epoch, count, "report finish");
    // Every vnode must have completed its scan by now (pk progress `None` for each).
    snapshot_stream
        .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
            assert_eq!(pk_progress, None);
            backfill_state.finish_epoch(vnode, snapshot_epoch, row_count);
        })
        .await?;
    let post_commit = backfill_state.commit(barrier_epoch).await?;
    progress.finish(barrier_epoch, count as _);
    yield Message::Barrier(barrier_to_report_finish);
    post_commit.post_yield_barrier(None).await?;

    // keep receiving remaining barriers until receiving a barrier with epoch as snapshot_epoch
    loop {
        let barrier = receive_next_barrier(barrier_rx).await?;
        assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
        barrier_epoch = barrier.epoch;
        let post_commit = backfill_state.commit(barrier.epoch).await?;
        yield Message::Barrier(barrier);
        post_commit.post_yield_barrier(None).await?;
        if barrier_epoch.curr == snapshot_epoch {
            break;
        }
    }
    trace!(?barrier_epoch, "finish consuming snapshot");
}