1mod upstream_table_ext {
16 use std::collections::HashMap;
17 use std::time::Duration;
18
19 use futures::future::{BoxFuture, try_join_all};
20 use futures::{TryFutureExt, TryStreamExt};
21 use risingwave_common::hash::VirtualNode;
22 use risingwave_common::row::OwnedRow;
23 use risingwave_common_rate_limit::RateLimit;
24 use risingwave_storage::table::ChangeLogRow;
25
26 use crate::executor::StreamExecutorResult;
27 use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
28 use crate::executor::backfill::snapshot_backfill::vnode_stream::{
29 ChangeLogRowStream, VnodeStream,
30 };
31 use crate::executor::backfill::utils::create_builder;
32
    /// Merged per-vnode stream of snapshot rows, each wrapped as `ChangeLogRow::Insert`.
    /// The `impl ChangeLogRowStream` opaque type is defined by
    /// `create_upstream_table_snapshot_stream` below.
    pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    /// Boxed future resolving to an [`UpstreamTableSnapshotStream`].
    pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;

    /// Create a stream over the upstream table's snapshot at `snapshot_epoch`.
    ///
    /// `vnode_progresses` maps each vnode to `(start_pk, row_count)`: the persisted pk
    /// progress to resume from (`None` = from the beginning; presumably exclusive of the
    /// stored pk — confirm against `UpstreamTable::snapshot_stream`) and the number of
    /// rows already consumed for that vnode.
    #[define_opaque(UpstreamTableSnapshotStream)]
    pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
        upstream_table: &T,
        snapshot_epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        rebuild_interval: Duration,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
        Box::pin(async move {
            // Open one snapshot stream per vnode concurrently; snapshot rows are
            // uniformly tagged as inserts so they share the change-log row shape.
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .snapshot_stream(vnode, snapshot_epoch, start_pk, rebuild_interval)
                        .map_ok(move |stream| {
                            (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
                        })
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }
65
    /// Merged per-vnode stream of change-log rows for a single epoch.
    /// The `impl ChangeLogRowStream` opaque type is defined by
    /// `create_upstream_table_change_log_stream` below.
    pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    /// Boxed future resolving to an [`UpstreamTableChangeLogStream`].
    pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;
70
71 #[define_opaque(UpstreamTableChangeLogStreamFuture)]
72 pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
73 upstream_table: &T,
74 epoch: u64,
75 rate_limit: RateLimit,
76 chunk_size: usize,
77 vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
78 ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
79 Box::pin(async move {
80 let streams = try_join_all(vnode_progresses.into_iter().map(
81 |(vnode, (start_pk, row_count))| {
82 upstream_table
83 .change_log_stream(vnode, epoch, start_pk)
84 .map_ok(move |stream| (vnode, stream, row_count))
85 },
86 ))
87 .await?;
88 Ok(VnodeStream::new(
89 streams,
90 upstream_table.pk_in_output_indices(),
91 create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
92 ))
93 })
94 }
95
96 pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
97 pub(super) fn next_epoch_future<T: UpstreamTable>(
98 upstream_table: &T,
99 epoch: u64,
100 ) -> NextEpochFuture<'_> {
101 Box::pin(async move { upstream_table.next_epoch(epoch).await })
102 }
103}
104
105use std::collections::{BTreeMap, HashMap};
106use std::mem::take;
107use std::pin::Pin;
108use std::task::{Context, Poll, ready};
109use std::time::Duration;
110
111use risingwave_common::array::StreamChunk;
112use risingwave_common::hash::VirtualNode;
113use risingwave_common::row::OwnedRow;
114use risingwave_common_rate_limit::RateLimit;
115use upstream_table_ext::*;
116
117use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
118use crate::executor::backfill::snapshot_backfill::state::{
119 EpochBackfillProgress, VnodeBackfillProgress,
120};
121use crate::executor::prelude::{Stream, StreamExt};
122use crate::executor::{StreamExecutorError, StreamExecutorResult};
123
/// Internal state machine of [`ConsumeUpstreamStream`].
///
/// Progression: create snapshot stream -> consume snapshot stream -> resolve next
/// epoch -> create change-log stream -> consume change-log stream -> resolve next
/// epoch -> ... (the snapshot phase is skipped entirely when no vnode still needs
/// the snapshot; see `ConsumeUpstreamStream::new`).
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    /// Waiting for the per-vnode snapshot streams to be created.
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        snapshot_epoch: u64,
        /// Vnodes that had already finished the snapshot epoch (from recovered
        /// progress), with their row counts.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Yielding chunks from the snapshot stream.
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
        snapshot_epoch: u64,
        /// Vnodes already finished before this stream started; merged with the
        /// stream's own finished vnodes when it ends.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Waiting for the per-vnode change-log streams of `epoch` to be created.
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        /// Finished vnodes of the previous epoch, still reportable as progress
        /// while the new stream is being created. `None` when there is no
        /// previous epoch to report (initial recovery into a change-log epoch).
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        epoch: u64,
        /// Vnodes that had already finished `epoch` (from recovered progress).
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Yielding chunks from the change-log stream of `epoch`.
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
        epoch: u64,
        /// Vnodes already finished before this stream started; merged with the
        /// stream's own finished vnodes when it ends.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Waiting for the upstream table to report the epoch after the one just consumed.
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
        /// Finished vnodes of the epoch just consumed, reportable as progress
        /// while the next epoch is being resolved.
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    /// Poisoned state entered after an error was emitted; any further access is a bug.
    Err,
}
152
/// A stream of [`StreamChunk`]s that consumes an upstream table for snapshot
/// backfill: first the table snapshot at the snapshot epoch, then the change log
/// of each subsequent epoch in order.
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    upstream_table: &'a T,
    // Recovered progress for epochs later than the one currently being consumed,
    // keyed by epoch; the earliest entry is popped when `ResolvingNextEpoch`
    // reaches that epoch (see `poll_next`).
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    state: ConsumeUpstreamStreamState<'a, T>,

    // Parameters reused every time a new change-log stream is created.
    chunk_size: usize,
    rate_limit: RateLimit,
}
162
163impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
164 pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
165 match &mut self.state {
166 ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
167 stream.consume_builder()
168 }
169 ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
170 stream.consume_builder()
171 }
172 ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
173 | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
174 | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
175 ConsumeUpstreamStreamState::Err => {
176 unreachable!("should not be accessed on Err")
177 }
178 }
179 }
180
181 pub(super) async fn for_vnode_pk_progress(
182 &mut self,
183 mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
184 ) -> StreamExecutorResult<()> {
185 match &mut self.state {
186 ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
187 }
189 ConsumeUpstreamStreamState::ConsumingSnapshotStream {
190 stream,
191 snapshot_epoch,
192 ..
193 } => {
194 stream
195 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
196 on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
197 })
198 .await?;
199 }
200 &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
201 ref mut stream,
202 ref epoch,
203 ..
204 } => {
205 stream
206 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
207 on_vnode_progress(vnode, *epoch, row_count, pk_progress)
208 })
209 .await?;
210 }
211 &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
212 ref prev_epoch_finished_vnodes,
213 ..
214 }
215 | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
216 ref prev_epoch_finished_vnodes,
217 ..
218 } => {
219 if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
220 for (vnode, row_count) in prev_epoch_finished_vnodes {
221 on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
222 }
223 }
224 }
225 ConsumeUpstreamStreamState::Err => {
226 unreachable!("should not be accessed on Err")
227 }
228 }
229 Ok(())
230 }
231}
232
impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    // State-machine driver. Transitions:
    //   CreatingSnapshotStream -> ConsumingSnapshotStream -> ResolvingNextEpoch
    //   -> CreatingChangeLogStream -> ConsumingChangeLogStream -> ResolvingNextEpoch -> ...
    // Chunks are yielded via early `return`; the `try` block can only be exited by a
    // `?` error (the loop never breaks), in which case the state is poisoned to `Err`
    // below and the error is emitted. The stream itself never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        // Wait until the per-vnode snapshot streams exist, then consume.
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let snapshot_epoch = *snapshot_epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        None => {
                            // Snapshot exhausted: merge the vnodes the stream just
                            // finished with those already finished on recovery, then
                            // resolve the epoch that follows the snapshot epoch.
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        // Wait until the per-vnode change-log streams exist, then consume.
                        // Note: `prev_epoch_finished_vnodes` is dropped here — its
                        // progress was only reportable while creation was pending.
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            None => {
                                // This epoch's change log is exhausted: collect finished
                                // vnodes and resolve the next epoch.
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        let mut pre_finished_vnodes = HashMap::new();
                        let mut vnode_progresses = HashMap::new();
                        // Vnodes that finished the previous epoch start the new epoch
                        // from scratch: no pk progress, zero row count.
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        // If recovered progress exists for exactly this epoch, seed the
                        // new streams with it. Pending progress for an earlier epoch
                        // would mean we skipped an epoch — that is a bug.
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        // Reachable only via `?` above: poison the state so later polls panic
        // instead of resuming, then surface the error. `result` is `Result<!, _>`;
        // the `map` closure never runs — it merely coerces the `!` success type
        // into `StreamChunk`.
        self.state = ConsumeUpstreamStreamState::Err;
        Poll::Ready(Some(result.map(|unreachable| unreachable)))
    }
}
398
impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
    /// Build the stream from recovered per-vnode progress.
    ///
    /// `initial_progress` yields every owned vnode with its recovered progress:
    /// `None` means the vnode has not started the snapshot yet. The initial state
    /// is chosen from where the vnodes left off:
    /// - any vnode still in the snapshot epoch -> start (or resume) the snapshot stream;
    /// - all vnodes finished the snapshot -> resolve the epoch after the snapshot;
    /// - otherwise every vnode is past the snapshot -> resume at the earliest
    ///   pending change-log epoch.
    pub(super) fn new<'p>(
        initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
        upstream_table: &'a T,
        snapshot_epoch: u64,
        chunk_size: usize,
        rate_limit: RateLimit,
        snapshot_rebuild_interval: Duration,
    ) -> Self {
        // Partition the recovered progress into: vnodes still consuming the
        // snapshot, vnodes done with the snapshot, and vnodes already at a later
        // epoch (grouped by that epoch).
        let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
        let mut finished_snapshot_epoch_vnodes = HashMap::new();
        let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
        for (vnode, progress) in initial_progress {
            match progress {
                None => {
                    // No progress recorded: snapshot starts from the beginning.
                    ongoing_snapshot_epoch_vnodes
                        .try_insert(vnode, (None, 0))
                        .expect("non-duplicate");
                }
                Some(progress) => {
                    let epoch = progress.epoch;
                    let row_count = progress.row_count;
                    if epoch == snapshot_epoch {
                        match &progress.progress {
                            EpochBackfillProgress::Consumed => {
                                finished_snapshot_epoch_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            EpochBackfillProgress::Consuming { latest_pk } => {
                                ongoing_snapshot_epoch_vnodes
                                    .try_insert(vnode, (Some(latest_pk.clone()), row_count))
                                    .expect("non-duplicate");
                            }
                        }
                    } else {
                        // Progress earlier than the snapshot epoch is impossible.
                        assert!(
                            epoch > snapshot_epoch,
                            "epoch {} earlier than snapshot_epoch {} on vnode {}",
                            epoch,
                            snapshot_epoch,
                            vnode
                        );
                        pending_epoch_vnode_progress
                            .entry(epoch)
                            .or_default()
                            .try_insert(vnode, (progress.progress.clone(), progress.row_count))
                            .expect("non-duplicate");
                    }
                }
            };
        }
        let (pending_epoch_vnode_progress, state) = {
            if !ongoing_snapshot_epoch_vnodes.is_empty() {
                // At least one vnode still needs the snapshot: resume it; vnodes
                // already done carry over as `pre_finished_vnodes`.
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future: create_upstream_table_snapshot_stream(
                            upstream_table,
                            snapshot_epoch,
                            rate_limit,
                            chunk_size,
                            snapshot_rebuild_interval,
                            ongoing_snapshot_epoch_vnodes,
                        ),
                        snapshot_epoch,
                        pre_finished_vnodes: finished_snapshot_epoch_vnodes,
                    },
                )
            } else if !finished_snapshot_epoch_vnodes.is_empty() {
                // Snapshot fully consumed on every vnode that was in it: move on
                // to the epoch after the snapshot epoch.
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, snapshot_epoch),
                        prev_epoch_finished_vnodes: Some((
                            snapshot_epoch,
                            finished_snapshot_epoch_vnodes,
                        )),
                    },
                )
            } else {
                // Every vnode is past the snapshot: resume at the earliest pending
                // change-log epoch.
                let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
                    .pop_first()
                    .expect("non-empty vnodes");
                let mut ongoing_vnodes = HashMap::new();
                let mut finished_vnodes = HashMap::new();
                for (vnode, (progress, row_count)) in first_vnodes {
                    match progress {
                        EpochBackfillProgress::Consuming { latest_pk } => {
                            ongoing_vnodes
                                .try_insert(vnode, (Some(latest_pk), row_count))
                                .expect("non-duplicate");
                        }
                        EpochBackfillProgress::Consumed => {
                            finished_vnodes
                                .try_insert(vnode, row_count)
                                .expect("non-duplicate");
                        }
                    }
                }
                let state = if ongoing_vnodes.is_empty() {
                    // The earliest epoch is already fully consumed too: resolve the
                    // epoch after it.
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, first_epoch),
                        prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
                    }
                } else {
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future: create_upstream_table_change_log_stream(
                            upstream_table,
                            first_epoch,
                            rate_limit,
                            chunk_size,
                            ongoing_vnodes,
                        ),
                        prev_epoch_finished_vnodes: None,
                        epoch: first_epoch,
                        pre_finished_vnodes: finished_vnodes,
                    }
                };
                (pending_epoch_vnode_progress, state)
            }
        };
        Self {
            upstream_table,
            pending_epoch_vnode_progress,
            state,
            chunk_size,
            rate_limit,
        }
    }
}