risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/executor.rs

use std::future::ready;

use anyhow::anyhow;
use futures::future::{Either, select};
use futures::{FutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common_rate_limit::{
    MonitoredRateLimiter, RateLimit, RateLimiter, RateLimiterTrait,
};
use risingwave_storage::StateStore;
use rw_futures_util::drop_either_future;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::snapshot_backfill::consume_upstream::stream::ConsumeUpstreamStream;
use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
use crate::executor::backfill::snapshot_backfill::state::{BackfillState, EpochBackfillProgress};
use crate::executor::backfill::utils::mapping_message;
use crate::executor::prelude::{StateTable, *};
use crate::executor::{Barrier, Message, StreamExecutorError};
use crate::task::CreateMviewProgressReporter;

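/// Executor that backfills by reading snapshot data directly from an upstream table
/// ([`UpstreamTable`]) rather than from an upstream executor. Per-vnode consumption
/// progress is persisted in `progress_state_table`, emission is rate-limited, and the
/// minimum consumed epoch is exported via the `crossdb_last_consumed_min_epoch` metric
/// (the metric name suggests this path serves cross-database snapshot backfill).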
pub struct UpstreamTableExecutor<T: UpstreamTable, S: StateStore> {
    upstream_table: T,
    progress_state_table: StateTable<S>,
    snapshot_epoch: u64,
    output_indices: Vec<usize>,

    chunk_size: usize,
    rate_limiter: MonitoredRateLimiter,
    actor_ctx: ActorContextRef,
    barrier_rx: UnboundedReceiver<Barrier>,
    progress: CreateMviewProgressReporter,
    crossdb_last_consumed_min_epoch: LabelGuardedIntGauge,
}

impl<T: UpstreamTable, S: StateStore> UpstreamTableExecutor<T, S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        upstream_table_id: TableId,
        upstream_table: T,
        progress_state_table: StateTable<S>,
        snapshot_epoch: u64,
        output_indices: Vec<usize>,

        chunk_size: usize,
        rate_limit: RateLimit,
        actor_ctx: ActorContextRef,
        barrier_rx: UnboundedReceiver<Barrier>,
        progress: CreateMviewProgressReporter,
    ) -> Self {
        let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table_id);
        let table_id_label = upstream_table_id.to_string();
        let actor_id_label = actor_ctx.id.to_string();
        let fragment_id_label = actor_ctx.fragment_id.to_string();
        let crossdb_last_consumed_min_epoch = actor_ctx
            .streaming_metrics
            .crossdb_last_consumed_min_epoch
            .with_guarded_label_values(&[
                table_id_label.as_str(),
                actor_id_label.as_str(),
                fragment_id_label.as_str(),
            ]);
        Self {
            upstream_table,
            progress_state_table,
            snapshot_epoch,
            output_indices,
            chunk_size,
            rate_limiter,
            actor_ctx,
            barrier_rx,
            progress,
            crossdb_last_consumed_min_epoch,
        }
    }

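    /// Returns the minimum epoch consumed so far across all vnodes, or 0 if any vnode has
    /// not reported progress yet (or if there is no vnode at all).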
    fn extract_last_consumed_min_epoch(progress_state: &BackfillState<S>) -> u64 {
        let mut min_epoch = u64::MAX;
        for (_, progress) in progress_state.latest_progress() {
            let Some(progress) = progress else {
                return 0;
            };
            min_epoch = min_epoch.min(progress.epoch);
        }
        if min_epoch == u64::MAX { 0 } else { min_epoch }
    }

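    /// Turns the executor into a message stream: after forwarding the first barrier and
    /// loading the persisted [`BackfillState`], it keeps consuming snapshot chunks from the
    /// upstream table, and on every barrier it persists per-vnode progress, updates the
    /// create-MV progress report and metrics, and rebuilds the upstream stream whenever the
    /// vnode bitmap changes.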
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn into_stream(mut self) {
        self.upstream_table
            .check_initial_vnode_bitmap(self.progress_state_table.vnodes())?;
        let first_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        let first_barrier_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier);
        let mut progress_state = BackfillState::new(
            self.progress_state_table,
            first_barrier_epoch,
            self.upstream_table.pk_serde(),
        )
        .await?;
        let mut finish_reported = false;
        let mut prev_reported_row_count = 0;
        let mut upstream_table = self.upstream_table;
        let snapshot_rebuild_interval = self
            .actor_ctx
            .config
            .developer
            .snapshot_iter_rebuild_interval();
        let mut stream = ConsumeUpstreamStream::new(
            progress_state.latest_progress(),
            &upstream_table,
            self.snapshot_epoch,
            self.chunk_size,
            self.rate_limiter.rate_limit(),
            snapshot_rebuild_interval,
        );

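        // Outer loop: one iteration per upstream stream; re-entered whenever a vnode bitmap
        // update forces the stream to be rebuilt. Inner loop: one iteration per barrier.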
        'on_new_stream: loop {
            loop {
                let barrier = {
                    let rate_limited_stream = rate_limit_stream(&mut stream, &self.rate_limiter);
                    pin_mut!(rate_limited_stream);

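                    // Race the next barrier against the next rate-limited chunk, forwarding
                    // chunks downstream until a barrier arrives.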
                    loop {
                        let future1 = receive_next_barrier(&mut self.barrier_rx);
                        let future2 = rate_limited_stream.try_next().map(|result| {
                            result
                                .and_then(|opt| opt.ok_or_else(|| anyhow!("end of stream").into()))
                        });
                        pin_mut!(future1);
                        pin_mut!(future2);
                        match drop_either_future(select(future1, future2).await) {
                            Either::Left(Ok(barrier)) => {
                                break barrier;
                            }
                            Either::Right(Ok(chunk)) => {
                                yield Message::Chunk(chunk);
                            }
                            Either::Left(Err(e)) | Either::Right(Err(e)) => {
                                return Err(e);
                            }
                        }
                    }
                };

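                // A barrier has arrived: flush any partially built chunk, then fold the
                // stream's per-vnode progress into the progress state.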
                if let Some(chunk) = stream.consume_builder() {
                    yield Message::Chunk(chunk);
                }
                stream
                    .for_vnode_pk_progress(|vnode, epoch, row_count, progress| {
                        if let Some(progress) = progress {
                            progress_state.update_epoch_progress(vnode, epoch, row_count, progress);
                        } else {
                            progress_state.finish_epoch(vnode, epoch, row_count);
                        }
                    })
                    .await?;

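                // Export the minimum epoch consumed across all vnodes as a metric.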
                let last_consumed_min_epoch =
                    Self::extract_last_consumed_min_epoch(&progress_state);
                self.crossdb_last_consumed_min_epoch
                    .set(last_consumed_min_epoch as i64);

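                // Until completion is reported, sum the rows consumed for the snapshot epoch
                // and report progress (or finish) via the `CreateMviewProgressReporter`.
                // The reported row count is kept monotonically non-decreasing.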
                if !finish_reported {
                    let mut row_count = 0;
                    let mut is_finished = true;
                    for (_, progress) in progress_state.latest_progress() {
                        if let Some(progress) = progress {
                            if progress.epoch == self.snapshot_epoch {
                                if let EpochBackfillProgress::Consuming { .. } = &progress.progress
                                {
                                    is_finished = false;
                                }
                                row_count += progress.row_count;
                            }
                        } else {
                            is_finished = false;
                        }
                    }
                    let row_count_to_report = std::cmp::max(prev_reported_row_count, row_count);
                    prev_reported_row_count = row_count_to_report;

                    if is_finished {
                        self.progress
                            .finish(barrier.epoch, row_count_to_report as _);
                        finish_reported = true;
                    } else {
                        self.progress.update(
                            barrier.epoch,
                            self.snapshot_epoch,
                            row_count_to_report as _,
                        );
                    }
                }

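                // Commit the progress state for this epoch and yield the barrier. If the
                // post-commit step returns a new vnode bitmap, rebuild the upstream stream
                // against the new vnode assignment.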
                let post_commit = progress_state.commit(barrier.epoch).await?;
                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                yield Message::Barrier(barrier);
                if let Some(new_vnode_bitmap) =
                    post_commit.post_yield_barrier(update_vnode_bitmap).await?
                {
                    drop(stream);
                    upstream_table.update_vnode_bitmap(new_vnode_bitmap);
                    stream = ConsumeUpstreamStream::new(
                        progress_state.latest_progress(),
                        &upstream_table,
                        self.snapshot_epoch,
                        self.chunk_size,
                        self.rate_limiter.rate_limit(),
                        snapshot_rebuild_interval,
                    );
                    continue 'on_new_stream;
                }
            }
        }
    }
}

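/// Forwards chunks from `stream`, and after yielding each chunk waits on the rate limiter
/// for a quota equal to the chunk's cardinality, throttling snapshot consumption.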
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn rate_limit_stream<'a>(
    stream: &'a mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
    rate_limiter: &'a RateLimiter,
) {
    while let Some(chunk) = stream.try_next().await? {
        let quota = chunk.cardinality();
        yield chunk;
        rate_limiter.wait(quota as _).await;
    }
}

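// `execute` maps each output message through `output_indices` (via `mapping_message`),
// dropping messages that the mapping filters out, and boxes the resulting stream.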
impl<T: UpstreamTable, S: StateStore> Execute for UpstreamTableExecutor<T, S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let output_indices = self.output_indices.clone();
        self.into_stream()
            .filter_map(move |result| {
                ready({
                    match result {
                        Ok(message) => mapping_message(message, &output_indices).map(Ok),
                        Err(e) => Some(Err(e)),
                    }
                })
            })
            .boxed()
    }
}