risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/
executor.rs1use std::future::ready;
16
17use anyhow::anyhow;
18use futures::future::{Either, select};
19use futures::{FutureExt, TryStreamExt};
20use futures_async_stream::try_stream;
21use risingwave_common::catalog::TableId;
22use risingwave_common_rate_limit::{
23 MonitoredRateLimiter, RateLimit, RateLimiter, RateLimiterTrait,
24};
25use risingwave_storage::StateStore;
26use rw_futures_util::drop_either_future;
27use tokio::sync::mpsc::UnboundedReceiver;
28
29use crate::executor::backfill::snapshot_backfill::consume_upstream::stream::ConsumeUpstreamStream;
30use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
31use crate::executor::backfill::snapshot_backfill::receive_next_barrier;
32use crate::executor::backfill::snapshot_backfill::state::{BackfillState, EpochBackfillProgress};
33use crate::executor::backfill::utils::mapping_message;
34use crate::executor::prelude::{StateTable, *};
35use crate::executor::{Barrier, Message, StreamExecutorError};
36use crate::task::CreateMviewProgressReporter;
37
38pub struct UpstreamTableExecutor<T: UpstreamTable, S: StateStore> {
39 upstream_table: T,
40 progress_state_table: StateTable<S>,
41 snapshot_epoch: u64,
42 output_indices: Vec<usize>,
43
44 chunk_size: usize,
45 rate_limiter: MonitoredRateLimiter,
46 actor_ctx: ActorContextRef,
47 barrier_rx: UnboundedReceiver<Barrier>,
48 progress: CreateMviewProgressReporter,
49}
50
51impl<T: UpstreamTable, S: StateStore> UpstreamTableExecutor<T, S> {
52 #[expect(clippy::too_many_arguments)]
53 pub fn new(
54 upstream_table_id: TableId,
55 upstream_table: T,
56 progress_state_table: StateTable<S>,
57 snapshot_epoch: u64,
58 output_indices: Vec<usize>,
59
60 chunk_size: usize,
61 rate_limit: RateLimit,
62 actor_ctx: ActorContextRef,
63 barrier_rx: UnboundedReceiver<Barrier>,
64 progress: CreateMviewProgressReporter,
65 ) -> Self {
66 let rate_limiter = RateLimiter::new(rate_limit).monitored(upstream_table_id);
67 Self {
68 upstream_table,
69 progress_state_table,
70 snapshot_epoch,
71 output_indices,
72 chunk_size,
73 rate_limiter,
74 actor_ctx,
75 barrier_rx,
76 progress,
77 }
78 }
79
    /// Consumes the executor and turns it into a stream of [`Message`]s: chunks read from the
    /// upstream table, interleaved with the barriers received on `barrier_rx`.
    ///
    /// The first barrier is forwarded before any data is read. After that, each round races
    /// "next barrier" against "next rate-limited chunk". Chunks are yielded as they arrive.
    /// When a barrier wins, the round:
    ///
    /// 1. yields the chunk returned by `consume_builder`, if any,
    /// 2. records the per-vnode progress of the stream into the progress state,
    /// 3. reports row count / completion through `self.progress` (until completion has been
    ///    reported once),
    /// 4. commits the progress state and forwards the barrier,
    /// 5. rebuilds the upstream stream if the barrier carried a new vnode bitmap.
    ///
    /// # Errors
    ///
    /// The stream ends with an error when the initial vnode bitmap check fails, when receiving
    /// a barrier fails, when the upstream stream fails or ends (reported as `"end of stream"`),
    /// or when loading, updating or committing the progress state fails.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn into_stream(mut self) {
        // Check the upstream table against the vnodes owned by the progress state table before
        // doing anything else.
        self.upstream_table
            .check_initial_vnode_bitmap(self.progress_state_table.vnodes())?;
        // The first barrier is forwarded as-is; its epoch is used to initialise the progress state.
        let first_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
        let first_barrier_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier);
        let mut progress_state = BackfillState::new(
            self.progress_state_table,
            first_barrier_epoch,
            self.upstream_table.pk_serde(),
        )
        .await?;
        // Set once `self.progress.finish` has been called; progress reporting is skipped afterwards.
        let mut finish_reported = false;
        // Largest row count reported so far, so that the reported value never decreases.
        let mut prev_reported_row_count = 0;
        // Moved out of `self` so it can be mutated on vnode bitmap updates while `stream`
        // (which borrows it) is rebuilt.
        let mut upstream_table = self.upstream_table;
        let mut stream = ConsumeUpstreamStream::new(
            progress_state.latest_progress(),
            &upstream_table,
            self.snapshot_epoch,
            self.chunk_size,
            self.rate_limiter.rate_limit(),
        );

        // The outer loop is re-entered via `continue 'on_new_stream` after `stream` is rebuilt;
        // the inner loop handles one barrier per iteration.
        'on_new_stream: loop {
            loop {
                // Forward chunks until the next barrier arrives.
                let barrier = {
                    // The rate-limited wrapper borrows `stream` only inside this block, so
                    // `stream` can be used directly again once the barrier has been received.
                    let rate_limited_stream = rate_limit_stream(&mut stream, &self.rate_limiter);
                    pin_mut!(rate_limited_stream);

                    loop {
                        let future1 = receive_next_barrier(&mut self.barrier_rx);
                        // The upstream stream is not expected to end: `None` is turned into an error.
                        let future2 = rate_limited_stream.try_next().map(|result| {
                            result
                                .and_then(|opt| opt.ok_or_else(|| anyhow!("end of stream").into()))
                        });
                        pin_mut!(future1);
                        pin_mut!(future2);
                        // Race barrier against chunk. NOTE(review): `drop_either_future` presumably
                        // discards the future that did not complete — confirm in `rw_futures_util`.
                        match drop_either_future(select(future1, future2).await) {
                            Either::Left(Ok(barrier)) => {
                                break barrier;
                            }
                            Either::Right(Ok(chunk)) => {
                                yield Message::Chunk(chunk);
                            }
                            Either::Left(Err(e)) | Either::Right(Err(e)) => {
                                return Err(e);
                            }
                        }
                    }
                };

                // Emit whatever chunk `consume_builder` hands back before the barrier is processed.
                // NOTE(review): presumably rows still buffered in the chunk builder — confirm.
                if let Some(chunk) = stream.consume_builder() {
                    yield Message::Chunk(chunk);
                }
                // Copy the stream's per-vnode progress into the progress state: `Some(progress)`
                // updates the epoch's progress, `None` marks the epoch as finished for that vnode.
                stream
                    .for_vnode_pk_progress(|vnode, epoch, row_count, progress| {
                        if let Some(progress) = progress {
                            progress_state.update_epoch_progress(vnode, epoch, row_count, progress);
                        } else {
                            progress_state.finish_epoch(vnode, epoch, row_count);
                        }
                    })
                    .await?;

                if !finish_reported {
                    let mut row_count = 0;
                    let mut is_finished = true;
                    // Not finished if any vnode has no progress yet, or is still `Consuming` at the
                    // snapshot epoch. Only entries at the snapshot epoch contribute to the row count.
                    for (_, progress) in progress_state.latest_progress() {
                        if let Some(progress) = progress {
                            if progress.epoch == self.snapshot_epoch {
                                if let EpochBackfillProgress::Consuming { .. } = &progress.progress
                                {
                                    is_finished = false;
                                }
                                row_count += progress.row_count;
                            }
                        } else {
                            is_finished = false;
                        }
                    }
                    // Never report a smaller row count than the one reported previously.
                    let row_count_to_report = std::cmp::max(prev_reported_row_count, row_count);
                    prev_reported_row_count = row_count_to_report;

                    if is_finished {
                        self.progress
                            .finish(barrier.epoch, row_count_to_report as _);
                        finish_reported = true;
                    } else {
                        self.progress.update(
                            barrier.epoch,
                            self.snapshot_epoch,
                            row_count_to_report as _,
                        );
                    }
                }

                // Commit the progress first, then forward the barrier, then run the post-yield step.
                let post_commit = progress_state.commit(barrier.epoch).await?;
                // Extracted before the barrier is moved into the yielded message.
                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                yield Message::Barrier(barrier);
                if let Some(new_vnode_bitmap) =
                    post_commit.post_yield_barrier(update_vnode_bitmap).await?
                {
                    // A new vnode bitmap was applied: drop the old stream (it borrows
                    // `upstream_table`), update the table, and restart from the latest progress.
                    drop(stream);
                    upstream_table.update_vnode_bitmap(new_vnode_bitmap.clone());
                    stream = ConsumeUpstreamStream::new(
                        progress_state.latest_progress(),
                        &upstream_table,
                        self.snapshot_epoch,
                        self.chunk_size,
                        self.rate_limiter.rate_limit(),
                    );
                    continue 'on_new_stream;
                }
            }
        }
    }
199}
200
201#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
203async fn rate_limit_stream<'a>(
204 stream: &'a mut (impl Stream<Item = StreamExecutorResult<StreamChunk>> + Unpin),
205 rate_limiter: &'a RateLimiter,
206) {
207 while let Some(chunk) = stream.try_next().await? {
208 let quota = chunk.cardinality();
209 yield chunk;
211 rate_limiter.wait(quota as _).await;
212 }
213}
214
215impl<T: UpstreamTable, S: StateStore> Execute for UpstreamTableExecutor<T, S> {
216 fn execute(self: Box<Self>) -> BoxedMessageStream {
217 let output_indices = self.output_indices.clone();
218 self.into_stream()
219 .filter_map(move |result| {
220 ready({
221 match result {
222 Ok(message) => mapping_message(message, &output_indices).map(Ok),
223 Err(e) => Some(Err(e)),
224 }
225 })
226 })
227 .boxed()
228 }
229}