risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/
executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::ready;
16
17use anyhow::anyhow;
18use futures::future::{Either, select};
19use futures::{FutureExt, TryStreamExt};
20use futures_async_stream::try_stream;
21use risingwave_common::catalog::TableId;
22use risingwave_common_rate_limit::{
23    MonitoredRateLimiter, RateLimit, RateLimiter, RateLimiterTrait,
24};
25use risingwave_storage::StateStore;
26use rw_futures_util::drop_either_future;
27use tokio::sync::mpsc::UnboundedReceiver;
28
29use crate::executor::backfill::snapshot_backfill::consume_upstream::stream::ConsumeUpstreamStream;
30use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
31use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
32use crate::executor::backfill::snapshot_backfill::state::{BackfillState, EpochBackfillProgress};
33use crate::executor::backfill::utils::mapping_message;
34use crate::executor::prelude::{StateTable, *};
35use crate::executor::{Barrier, Message, StreamExecutorError};
36use crate::task::CreateMviewProgressReporter;
37
38pub struct UpstreamTableExecutor<T: UpstreamTable, S: StateStore> {
39    upstream_table: T,
40    progress_state_table: StateTable<S>,
41    snapshot_epoch: u64,
42    output_indices: Vec<usize>,
43
44    chunk_size: usize,
45    rate_limiter: MonitoredRateLimiter,
46    actor_ctx: ActorContextRef,
47    barrier_rx: UnboundedReceiver<Barrier>,
48    progress: CreateMviewProgressReporter,
49}
50
51impl<T: UpstreamTable, S: StateStore> UpstreamTableExecutor<T, S> {
52    #[expect(clippy::too_many_arguments)]
53    pub fn new(
54        upstream_table_id: TableId,
55        upstream_table: T,
56        progress_state_table: StateTable<S>,
57        snapshot_epoch: u64,
58        output_indices: Vec<usize>,
59
60        chunk_size: usize,
61        rate_limit: RateLimit,
62        actor_ctx: ActorContextRef,
63        barrier_rx: UnboundedReceiver<Barrier>,
64        progress: CreateMviewProgressReporter,
65    ) -> Self {
66        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table_id);
67        Self {
68            upstream_table,
69            progress_state_table,
70            snapshot_epoch,
71            output_indices,
72            chunk_size,
73            rate_limiter,
74            actor_ctx,
75            barrier_rx,
76            progress,
77        }
78    }
79
80    #[try_stream(ok = Message, error = StreamExecutorError)]
81    pub async fn into_stream(mut self) {
82        self.upstream_table
83            .check_initial_vnode_bitmap(self.progress_state_table.vnodes())?;
84        let first_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
85        let first_barrier_epoch = first_barrier.epoch;
86        yield Message::Barrier(first_barrier);
87        let mut progress_state = BackfillState::new(
88            self.progress_state_table,
89            first_barrier_epoch,
90            self.upstream_table.pk_serde(),
91        )
92        .await?;
93        let mut finish_reported = false;
94        let mut prev_reported_row_count = 0;
95        let mut upstream_table = self.upstream_table;
96        let mut stream = ConsumeUpstreamStream::new(
97            progress_state.latest_progress(),
98            &upstream_table,
99            self.snapshot_epoch,
100            self.chunk_size,
101            self.rate_limiter.rate_limit(),
102        );
103
104        'on_new_stream: loop {
105            loop {
106                let barrier = {
107                    let rate_limited_stream = rate_limit_stream(&mut stream, &self.rate_limiter);
108                    pin_mut!(rate_limited_stream);
109
110                    loop {
111                        let future1 = receive_next_barrier(&mut self.barrier_rx);
112                        let future2 = rate_limited_stream.try_next().map(|result| {
113                            result
114                                .and_then(|opt| opt.ok_or_else(|| anyhow!("end of stream").into()))
115                        });
116                        pin_mut!(future1);
117                        pin_mut!(future2);
118                        match drop_either_future(select(future1, future2).await) {
119                            Either::Left(Ok(barrier)) => {
120                                break barrier;
121                            }
122                            Either::Right(Ok(chunk)) => {
123                                yield Message::Chunk(chunk);
124                            }
125                            Either::Left(Err(e)) | Either::Right(Err(e)) => {
126                                return Err(e);
127                            }
128                        }
129                    }
130                };
131
132                if let Some(chunk) = stream.consume_builder() {
133                    yield Message::Chunk(chunk);
134                }
135                stream
136                    .for_vnode_pk_progress(|vnode, epoch, row_count, progress| {
137                        if let Some(progress) = progress {
138                            progress_state.update_epoch_progress(vnode, epoch, row_count, progress);
139                        } else {
140                            progress_state.finish_epoch(vnode, epoch, row_count);
141                        }
142                    })
143                    .await?;
144
145                if !finish_reported {
146                    let mut row_count = 0;
147                    let mut is_finished = true;
148                    for (_, progress) in progress_state.latest_progress() {
149                        if let Some(progress) = progress {
150                            if progress.epoch == self.snapshot_epoch {
151                                if let EpochBackfillProgress::Consuming { .. } = &progress.progress
152                                {
153                                    is_finished = false;
154                                }
155                                row_count += progress.row_count;
156                            }
157                        } else {
158                            is_finished = false;
159                        }
160                    }
161                    // ensure that the reported row count is non-decreasing.
162                    let row_count_to_report = std::cmp::max(prev_reported_row_count, row_count);
163                    prev_reported_row_count = row_count_to_report;
164
165                    if is_finished {
166                        self.progress
167                            .finish(barrier.epoch, row_count_to_report as _);
168                        finish_reported = true;
169                    } else {
170                        self.progress.update(
171                            barrier.epoch,
172                            self.snapshot_epoch,
173                            row_count_to_report as _,
174                        );
175                    }
176                }
177
178                let post_commit = progress_state.commit(barrier.epoch).await?;
179                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
180                yield Message::Barrier(barrier);
181                if let Some(new_vnode_bitmap) =
182                    post_commit.post_yield_barrier(update_vnode_bitmap).await?
183                {
184                    drop(stream);
185                    upstream_table.update_vnode_bitmap(new_vnode_bitmap.clone());
186                    // recreate the stream on update vnode bitmap
187                    stream = ConsumeUpstreamStream::new(
188                        progress_state.latest_progress(),
189                        &upstream_table,
190                        self.snapshot_epoch,
191                        self.chunk_size,
192                        self.rate_limiter.rate_limit(),
193                    );
194                    continue 'on_new_stream;
195                }
196            }
197        }
198    }
199}
200
201// The future returned by `.next()` is cancellation safe even if the whole stream is dropped.
202#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
203async fn rate_limit_stream<'a>(
204    stream: &'a mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
205    rate_limiter: &'a RateLimiter,
206) {
207    while let Some(chunk) = stream.try_next().await? {
208        let quota = chunk.cardinality();
209        // the chunk is yielded immediately without any await point breaking in, so the stream is cancellation safe.
210        yield chunk;
211        rate_limiter.wait(quota as _).await;
212    }
213}
214
215impl<T: UpstreamTable, S: StateStore> Execute for UpstreamTableExecutor<T, S> {
216    fn execute(self: Box<Self>) -> BoxedMessageStream {
217        let output_indices = self.output_indices.clone();
218        self.into_stream()
219            .filter_map(move |result| {
220                ready({
221                    match result {
222                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
223                        Err(e) => Some(Err(e)),
224                    }
225                })
226            })
227            .boxed()
228    }
229}