1mod upstream_table_ext {
16 use std::collections::HashMap;
17
18 use futures::future::{BoxFuture, try_join_all};
19 use futures::{TryFutureExt, TryStreamExt};
20 use risingwave_common::hash::VirtualNode;
21 use risingwave_common::row::OwnedRow;
22 use risingwave_common_rate_limit::RateLimit;
23 use risingwave_storage::table::ChangeLogRow;
24
25 use crate::executor::StreamExecutorResult;
26 use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
27 use crate::executor::backfill::snapshot_backfill::vnode_stream::{
28 ChangeLogRowStream, VnodeStream,
29 };
30 use crate::executor::backfill::utils::create_builder;
31
32 pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
33 VnodeStream<impl ChangeLogRowStream>;
34 pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
35 BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;
36
37 #[define_opaque(UpstreamTableSnapshotStream)]
38 pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
39 upstream_table: &T,
40 snapshot_epoch: u64,
41 rate_limit: RateLimit,
42 chunk_size: usize,
43 vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
44 ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
45 Box::pin(async move {
46 let streams = try_join_all(vnode_progresses.into_iter().map(
47 |(vnode, (start_pk, row_count))| {
48 upstream_table
49 .snapshot_stream(vnode, snapshot_epoch, start_pk)
50 .map_ok(move |stream| {
51 (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
52 })
53 },
54 ))
55 .await?;
56 Ok(VnodeStream::new(
57 streams,
58 upstream_table.pk_in_output_indices(),
59 create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
60 ))
61 })
62 }
63
64 pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
65 VnodeStream<impl ChangeLogRowStream>;
66 pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
67 BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;
68
69 #[define_opaque(UpstreamTableChangeLogStreamFuture)]
70 pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
71 upstream_table: &T,
72 epoch: u64,
73 rate_limit: RateLimit,
74 chunk_size: usize,
75 vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
76 ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
77 Box::pin(async move {
78 let streams = try_join_all(vnode_progresses.into_iter().map(
79 |(vnode, (start_pk, row_count))| {
80 upstream_table
81 .change_log_stream(vnode, epoch, start_pk)
82 .map_ok(move |stream| (vnode, stream, row_count))
83 },
84 ))
85 .await?;
86 Ok(VnodeStream::new(
87 streams,
88 upstream_table.pk_in_output_indices(),
89 create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
90 ))
91 })
92 }
93
94 pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
95 pub(super) fn next_epoch_future<T: UpstreamTable>(
96 upstream_table: &T,
97 epoch: u64,
98 ) -> NextEpochFuture<'_> {
99 Box::pin(async move { upstream_table.next_epoch(epoch).await })
100 }
101}
102
103use std::collections::{BTreeMap, HashMap};
104use std::mem::take;
105use std::pin::Pin;
106use std::task::{Context, Poll, ready};
107
108use risingwave_common::array::StreamChunk;
109use risingwave_common::hash::VirtualNode;
110use risingwave_common::row::OwnedRow;
111use risingwave_common_rate_limit::RateLimit;
112use upstream_table_ext::*;
113
114use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
115use crate::executor::backfill::snapshot_backfill::state::{
116 EpochBackfillProgress, VnodeBackfillProgress,
117};
118use crate::executor::prelude::{Stream, StreamExt};
119use crate::executor::{StreamExecutorError, StreamExecutorResult};
120
/// Internal state machine of `ConsumeUpstreamStream`.
///
/// The stream alternates between "creating" states (awaiting a boxed future
/// that builds a per-vnode stream), "consuming" states (polling the built
/// `VnodeStream`), and resolving the next change-log epoch in between.
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    /// Awaiting creation of the snapshot stream for `snapshot_epoch`.
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        snapshot_epoch: u64,
        // Vnodes that had already fully consumed this epoch before the stream
        // was created, with their final row counts.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Polling the snapshot stream for chunks.
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Awaiting creation of the change-log stream for `epoch`.
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        // Finished vnodes of the previous epoch, kept so their progress can
        // still be reported while the new stream is being created.
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Polling the change-log stream for chunks.
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Awaiting resolution of the epoch that follows the one just consumed.
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    /// Poisoned state entered after an error has been yielded; any further
    /// access is a bug.
    Err,
}
149
/// A stream of [`StreamChunk`]s backfilled from an upstream table: first the
/// snapshot at the snapshot epoch, then the change log of each subsequent
/// epoch, driven by the [`ConsumeUpstreamStreamState`] machine.
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    upstream_table: &'a T,
    // Recovered progress for epochs later than the one currently being
    // consumed, keyed by epoch; the BTreeMap ordering lets the earliest
    // pending epoch be popped first when it becomes current.
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    state: ConsumeUpstreamStreamState<'a, T>,

    // Chunk-builder configuration forwarded whenever a vnode stream is
    // (re)created.
    chunk_size: usize,
    rate_limit: RateLimit,
}
159
160impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
161 pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
162 match &mut self.state {
163 ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
164 stream.consume_builder()
165 }
166 ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
167 stream.consume_builder()
168 }
169 ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
170 | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
171 | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
172 ConsumeUpstreamStreamState::Err => {
173 unreachable!("should not be accessed on Err")
174 }
175 }
176 }
177
178 pub(super) async fn for_vnode_pk_progress(
179 &mut self,
180 mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
181 ) -> StreamExecutorResult<()> {
182 match &mut self.state {
183 ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
184 }
186 ConsumeUpstreamStreamState::ConsumingSnapshotStream {
187 stream,
188 snapshot_epoch,
189 ..
190 } => {
191 stream
192 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
193 on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
194 })
195 .await?;
196 }
197 &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
198 ref mut stream,
199 ref epoch,
200 ..
201 } => {
202 stream
203 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
204 on_vnode_progress(vnode, *epoch, row_count, pk_progress)
205 })
206 .await?;
207 }
208 &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
209 ref prev_epoch_finished_vnodes,
210 ..
211 }
212 | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
213 ref prev_epoch_finished_vnodes,
214 ..
215 } => {
216 if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
217 for (vnode, row_count) in prev_epoch_finished_vnodes {
218 on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
219 }
220 }
221 }
222 ConsumeUpstreamStreamState::Err => {
223 unreachable!("should not be accessed on Err")
224 }
225 }
226 Ok(())
227 }
228}
229
impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    /// Drive the state machine until a chunk is ready.
    ///
    /// Transitions: `CreatingSnapshotStream -> ConsumingSnapshotStream ->
    /// ResolvingNextEpoch -> CreatingChangeLogStream ->
    /// ConsumingChangeLogStream -> ResolvingNextEpoch -> ...`. The stream
    /// never yields `None`: after one epoch is exhausted it resolves the next
    /// epoch and continues. On error the state is poisoned to `Err` and the
    /// error is yielded once.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The `try` block has type `Result<!, _>`: the loop is only left by
        // returning a ready chunk or by an error propagated via `?`, so the
        // code after the block runs only on the error path.
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        // Snapshot stream ready: switch to consuming it.
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let snapshot_epoch = *snapshot_epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        None => {
                            // Snapshot exhausted: merge the newly finished
                            // vnodes with the pre-finished ones and resolve the
                            // next epoch.
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        // Change-log stream ready: switch to consuming it.
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            None => {
                                // Epoch change log exhausted: same merge-and-
                                // resolve dance as for the snapshot.
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        let mut pre_finished_vnodes = HashMap::new();
                        let mut vnode_progresses = HashMap::new();
                        // Vnodes that finished the previous epoch start the new
                        // epoch from scratch (no pk, zero rows).
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        // If recovered progress exists exactly at this epoch,
                        // fold it in: consuming vnodes resume from their pk,
                        // consumed vnodes are pre-finished.
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        // Only reachable on error: poison the state so later polls panic
        // loudly instead of resuming half-transitioned work.
        self.state = ConsumeUpstreamStreamState::Err;
        // `result` is `Err(..)`; the `Ok` arm is `!` and thus vacuous.
        Poll::Ready(Some(result.map(|unreachable| unreachable)))
    }
}
395
impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
    /// Build the stream from recovered per-vnode progress.
    ///
    /// `initial_progress` yields, for every vnode, either `None` (backfill not
    /// started) or the persisted [`VnodeBackfillProgress`]. Vnodes are bucketed
    /// into: snapshot-epoch vnodes still being consumed, snapshot-epoch vnodes
    /// already finished, and vnodes whose progress is at a later epoch
    /// (queued in `pending_epoch_vnode_progress`). The initial state then
    /// resumes from the earliest unfinished work.
    pub(super) fn new<'p>(
        initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
        upstream_table: &'a T,
        snapshot_epoch: u64,
        chunk_size: usize,
        rate_limit: RateLimit,
    ) -> Self {
        let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
        let mut finished_snapshot_epoch_vnodes = HashMap::new();
        let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
        for (vnode, progress) in initial_progress {
            match progress {
                None => {
                    // No progress yet: consume the snapshot from the beginning.
                    ongoing_snapshot_epoch_vnodes
                        .try_insert(vnode, (None, 0))
                        .expect("non-duplicate");
                }
                Some(progress) => {
                    let epoch = progress.epoch;
                    let row_count = progress.row_count;
                    if epoch == snapshot_epoch {
                        match &progress.progress {
                            EpochBackfillProgress::Consumed => {
                                finished_snapshot_epoch_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            EpochBackfillProgress::Consuming { latest_pk } => {
                                // Resume the snapshot scan after the persisted pk.
                                ongoing_snapshot_epoch_vnodes
                                    .try_insert(vnode, (Some(latest_pk.clone()), row_count))
                                    .expect("non-duplicate");
                            }
                        }
                    } else {
                        // Recovered progress must never precede the snapshot epoch.
                        assert!(
                            epoch > snapshot_epoch,
                            "epoch {} earlier than snapshot_epoch {} on vnode {}",
                            epoch,
                            snapshot_epoch,
                            vnode
                        );
                        pending_epoch_vnode_progress
                            .entry(epoch)
                            .or_default()
                            .try_insert(vnode, (progress.progress.clone(), progress.row_count))
                            .expect("non-duplicate");
                    }
                }
            };
        }
        let (pending_epoch_vnode_progress, state) = {
            if !ongoing_snapshot_epoch_vnodes.is_empty() {
                // Some vnodes still need the snapshot: start (or resume) it.
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future: create_upstream_table_snapshot_stream(
                            upstream_table,
                            snapshot_epoch,
                            rate_limit,
                            chunk_size,
                            ongoing_snapshot_epoch_vnodes,
                        ),
                        snapshot_epoch,
                        pre_finished_vnodes: finished_snapshot_epoch_vnodes,
                    },
                )
            } else if !finished_snapshot_epoch_vnodes.is_empty() {
                // Snapshot fully consumed on every started vnode: move on to
                // resolving the next epoch.
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, snapshot_epoch),
                        prev_epoch_finished_vnodes: Some((
                            snapshot_epoch,
                            finished_snapshot_epoch_vnodes,
                        )),
                    },
                )
            } else {
                // All progress is at later epochs: resume from the earliest.
                let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
                    .pop_first()
                    .expect("non-empty vnodes");
                let mut ongoing_vnodes = HashMap::new();
                let mut finished_vnodes = HashMap::new();
                for (vnode, (progress, row_count)) in first_vnodes {
                    match progress {
                        EpochBackfillProgress::Consuming { latest_pk } => {
                            ongoing_vnodes
                                .try_insert(vnode, (Some(latest_pk), row_count))
                                .expect("non-duplicate");
                        }
                        EpochBackfillProgress::Consumed => {
                            finished_vnodes
                                .try_insert(vnode, row_count)
                                .expect("non-duplicate");
                        }
                    }
                }
                let state = if ongoing_vnodes.is_empty() {
                    // Nothing left to consume at this epoch either: resolve the
                    // epoch after it.
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, first_epoch),
                        prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
                    }
                } else {
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future: create_upstream_table_change_log_stream(
                            upstream_table,
                            first_epoch,
                            rate_limit,
                            chunk_size,
                            ongoing_vnodes,
                        ),
                        prev_epoch_finished_vnodes: None,
                        epoch: first_epoch,
                        pre_finished_vnodes: finished_vnodes,
                    }
                };
                (pending_epoch_vnode_progress, state)
            }
        };
        Self {
            upstream_table,
            pending_epoch_vnode_progress,
            state,
            chunk_size,
            rate_limit,
        }
    }
}