risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::ready;

use anyhow::anyhow;
use futures::future::{Either, select};
use futures::{FutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common_rate_limit::{
    MonitoredRateLimiter, RateLimit, RateLimiter, RateLimiterTrait,
};
use risingwave_storage::StateStore;
use rw_futures_util::drop_either_future;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::consume_upstream::stream::ConsumeUpstreamStream;
use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
use crate::executor::backfill::snapshot_backfill::state::{BackfillState, EpochBackfillProgress};
use crate::executor::backfill::utils::mapping_message;
use crate::executor::prelude::{StateTable, *};
use crate::executor::{Barrier, Message, StreamExecutorError};
use crate::task::CreateMviewProgressReporter;

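/// Executor that backfills by consuming an upstream table directly (rather than an upstream
/// executor stream), starting from the table snapshot at `snapshot_epoch`. Per-vnode progress
/// (current epoch and position) is persisted in `progress_state_table` so that consumption can
/// resume where it left off, e.g. after a vnode bitmap update.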
pub struct UpstreamTableExecutor<T: UpstreamTable, S: StateStore> {
    upstream_table: T,
    progress_state_table: StateTable<S>,
    snapshot_epoch: u64,
    output_indices: Vec<usize>,

    chunk_size: usize,
    rate_limiter: MonitoredRateLimiter,
    actor_ctx: ActorContextRef,
    barrier_rx: UnboundedReceiver<Barrier>,
    progress: CreateMviewProgressReporter,
    crossdb_last_consumed_min_epoch: LabelGuardedIntGauge,
}

impl<T: UpstreamTable, S: StateStore> UpstreamTableExecutor<T, S> {
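    /// Creates the executor and obtains the `crossdb_last_consumed_min_epoch` gauge, labeled
    /// with the upstream table id, actor id and fragment id.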
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        upstream_table_id: TableId,
        upstream_table: T,
        progress_state_table: StateTable<S>,
        snapshot_epoch: u64,
        output_indices: Vec<usize>,

        chunk_size: usize,
        rate_limit: RateLimit,
        actor_ctx: ActorContextRef,
        barrier_rx: UnboundedReceiver<Barrier>,
        progress: CreateMviewProgressReporter,
    ) -> Self {
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table_id);
        let table_id_label = upstream_table_id.to_string();
        let actor_id_label = actor_ctx.id.to_string();
        let fragment_id_label = actor_ctx.fragment_id.to_string();
        let crossdb_last_consumed_min_epoch = actor_ctx
            .streaming_metrics
            .crossdb_last_consumed_min_epoch
            .with_guarded_label_values(&[
                table_id_label.as_str(),
                actor_id_label.as_str(),
                fragment_id_label.as_str(),
            ]);
        Self {
            upstream_table,
            progress_state_table,
            snapshot_epoch,
            output_indices,
            chunk_size,
            rate_limiter,
            actor_ctx,
            barrier_rx,
            progress,
            crossdb_last_consumed_min_epoch,
        }
    }

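    /// Returns the minimum epoch consumed across all vnodes, or `0` when any vnode has no
    /// progress yet (or there is no vnode at all), so that the incomplete state stays visible in
    /// the metric instead of being hidden.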
    fn extract_last_consumed_min_epoch(progress_state: &BackfillState<S>) -> u64 {
        let mut min_epoch = u64::MAX;
        for (_, progress) in progress_state.latest_progress() {
            let Some(progress) = progress else {
                // If any vnode has no progress yet, report `0` explicitly to indicate the
                // progress is not fully ready, instead of hiding this state.
                return 0;
            };
            min_epoch = min_epoch.min(progress.epoch);
        }
        if min_epoch == u64::MAX { 0 } else { min_epoch }
    }

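    /// Drives the backfill: waits for the first barrier, initializes the per-vnode progress
    /// state, then keeps interleaving upstream chunks with barriers. On each barrier it records
    /// the progress reached so far, reports materialized-view creation progress until finished,
    /// and commits the progress state table.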
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn into_stream(mut self) {
        self.upstream_table
            .check_initial_vnode_bitmap(self.progress_state_table.vnodes())?;
        let first_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        let first_barrier_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier);
        let mut progress_state = BackfillState::new(
            self.progress_state_table,
            first_barrier_epoch,
            self.upstream_table.pk_serde(),
        )
        .await?;
        let mut finish_reported = false;
        let mut prev_reported_row_count = 0;
        let mut upstream_table = self.upstream_table;
        let snapshot_rebuild_interval = self
            .actor_ctx
            .config
            .developer
            .snapshot_iter_rebuild_interval();
        let mut stream = ConsumeUpstreamStream::new(
            progress_state.latest_progress(),
            &upstream_table,
            self.snapshot_epoch,
            self.chunk_size,
            self.rate_limiter.rate_limit(),
            snapshot_rebuild_interval,
        );

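        // The outer loop is re-entered whenever the consume stream has to be rebuilt (i.e. after
        // a vnode bitmap update); the inner loop handles one barrier at a time.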
        'on_new_stream: loop {
            loop {
                let barrier = {
                    let rate_limited_stream = rate_limit_stream(&mut stream, &self.rate_limiter);
                    pin_mut!(rate_limited_stream);

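                    // Race the next barrier against the next rate-limited upstream chunk;
                    // chunks are forwarded until a barrier wins the select.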
                    loop {
                        let future1 = receive_next_barrier(&mut self.barrier_rx);
                        let future2 = rate_limited_stream.try_next().map(|result| {
                            result
                                .and_then(|opt| opt.ok_or_else(|| anyhow!("end of stream").into()))
                        });
                        pin_mut!(future1);
                        pin_mut!(future2);
                        match drop_either_future(select(future1, future2).await) {
                            Either::Left(Ok(barrier)) => {
                                break barrier;
                            }
                            Either::Right(Ok(chunk)) => {
                                yield Message::Chunk(chunk);
                            }
                            Either::Left(Err(e)) | Either::Right(Err(e)) => {
                                return Err(e);
                            }
                        }
                    }
                };

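                // A barrier has arrived: flush any rows still buffered in the chunk builder and
                // record the per-vnode progress the stream has reached so far.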
                if let Some(chunk) = stream.consume_builder() {
                    yield Message::Chunk(chunk);
                }
                stream
                    .for_vnode_pk_progress(|vnode, epoch, row_count, progress| {
                        if let Some(progress) = progress {
                            progress_state.update_epoch_progress(vnode, epoch, row_count, progress);
                        } else {
                            progress_state.finish_epoch(vnode, epoch, row_count);
                        }
                    })
                    .await?;

                let last_consumed_min_epoch =
                    Self::extract_last_consumed_min_epoch(&progress_state);
                self.crossdb_last_consumed_min_epoch
                    .set(last_consumed_min_epoch as i64);

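                // Until finish has been reported once, aggregate the row counts of vnodes that
                // reached the snapshot epoch and report whether the snapshot has been fully
                // consumed on every vnode.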
                if !finish_reported {
                    let mut row_count = 0;
                    let mut is_finished = true;
                    for (_, progress) in progress_state.latest_progress() {
                        if let Some(progress) = progress {
                            if progress.epoch == self.snapshot_epoch {
                                if let EpochBackfillProgress::Consuming { .. } = &progress.progress
                                {
                                    is_finished = false;
                                }
                                row_count += progress.row_count;
                            }
                        } else {
                            is_finished = false;
                        }
                    }
                    // Ensure that the reported row count is non-decreasing.
                    let row_count_to_report = std::cmp::max(prev_reported_row_count, row_count);
                    prev_reported_row_count = row_count_to_report;

                    if is_finished {
                        self.progress
                            .finish(barrier.epoch, row_count_to_report as _);
                        finish_reported = true;
                    } else {
                        self.progress.update(
                            barrier.epoch,
                            self.snapshot_epoch,
                            row_count_to_report as _,
                        );
                    }
                }

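                // Commit the progress state for this barrier, forward the barrier, and rebuild
                // the consume stream if the vnode bitmap has changed.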
                let post_commit = progress_state.commit(barrier.epoch).await?;
                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                yield Message::Barrier(barrier);
                if let Some(new_vnode_bitmap) =
                    post_commit.post_yield_barrier(update_vnode_bitmap).await?
                {
                    drop(stream);
                    upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                    // Recreate the stream on vnode bitmap update.
                    stream = ConsumeUpstreamStream::new(
                        progress_state.latest_progress(),
                        &upstream_table,
                        self.snapshot_epoch,
                        self.chunk_size,
                        self.rate_limiter.rate_limit(),
                        snapshot_rebuild_interval,
                    );
                    continue 'on_new_stream;
                }
            }
        }
    }
}

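// Wraps the inner stream: after each chunk is yielded, waits on the rate limiter for the chunk's
// cardinality before pulling the next chunk.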
// The future returned by `.next()` is cancellation safe even if the whole stream is dropped.
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn rate_limit_stream<'a>(
    stream: &'a mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
    rate_limiter: &'a RateLimiter,
) {
    while let Some(chunk) = stream.try_next().await? {
        let quota = chunk.cardinality();
        // The chunk is yielded immediately, without any intervening await point, so the stream is cancellation safe.
        yield chunk;
        rate_limiter.wait(quota as _).await;
    }
}

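// The `Execute` impl maps every output message through `output_indices` before emitting it.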
impl<T: UpstreamTable, S: StateStore> Execute for UpstreamTableExecutor<T, S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let output_indices = self.output_indices.clone();
        self.into_stream()
            .filter_map(move |result| {
                ready({
                    match result {
                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
                        Err(e) => Some(Err(e)),
                    }
                })
            })
            .boxed()
    }
}