mod upstream_table_ext {
    //! Helpers that adapt an [`UpstreamTable`] into the `VnodeStream`s and
    //! boxed futures consumed by the `ConsumeUpstreamStream` state machine
    //! defined in the enclosing file.

    use std::collections::HashMap;

    use futures::future::{BoxFuture, try_join_all};
    use futures::{TryFutureExt, TryStreamExt};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common_rate_limit::RateLimit;
    use risingwave_storage::table::ChangeLogRow;

    use crate::executor::StreamExecutorResult;
    use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
    use crate::executor::backfill::snapshot_backfill::vnode_stream::{
        ChangeLogRowStream, VnodeStream,
    };
    use crate::executor::backfill::utils::create_builder;

    // `impl Trait` in a type alias (unstable TAIT): the concrete stream type is
    // inferred from the function bodies below, so callers only see a
    // `VnodeStream` of some `ChangeLogRowStream`.
    pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;
    /// Concurrently open one snapshot stream per vnode at `snapshot_epoch`
    /// (resuming after `start_pk` when present), wrap every snapshot row as
    /// `ChangeLogRow::Insert`, and merge them into a single `VnodeStream`.
    ///
    /// The `usize` in `vnode_progresses` is forwarded as the vnode's initial
    /// row count — presumably the rows already consumed before a recovery;
    /// see `VnodeStream::new` for the exact semantics.
    pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
        upstream_table: &T,
        snapshot_epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
        Box::pin(async move {
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .snapshot_stream(vnode, snapshot_epoch, start_pk)
                        .map_ok(move |stream| {
                            // Snapshot rows carry no op; present them as inserts.
                            (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
                        })
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }

    pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;

    /// Same as [`create_upstream_table_snapshot_stream`], but reads the change
    /// log of `epoch` instead of a snapshot; change-log rows already carry
    /// their op, so no wrapping is needed.
    pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
        upstream_table: &T,
        epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
        Box::pin(async move {
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .change_log_stream(vnode, epoch, start_pk)
                        .map_ok(move |stream| (vnode, stream, row_count))
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }

    pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
    /// Box the upstream table's `next_epoch(epoch)` call so it can be stored
    /// in the `ResolvingNextEpoch` state.
    pub(super) fn next_epoch_future<T: UpstreamTable>(
        upstream_table: &T,
        epoch: u64,
    ) -> NextEpochFuture<'_> {
        Box::pin(async move { upstream_table.next_epoch(epoch).await })
    }
}
99
100use std::collections::{BTreeMap, HashMap};
101use std::mem::take;
102use std::pin::Pin;
103use std::task::{Context, Poll, ready};
104
105use risingwave_common::array::StreamChunk;
106use risingwave_common::hash::VirtualNode;
107use risingwave_common::row::OwnedRow;
108use risingwave_common_rate_limit::RateLimit;
109use upstream_table_ext::*;
110
111use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
112use crate::executor::backfill::snapshot_backfill::state::{
113 EpochBackfillProgress, VnodeBackfillProgress,
114};
115use crate::executor::prelude::{Stream, StreamExt};
116use crate::executor::{StreamExecutorError, StreamExecutorResult};
117
/// State machine of [`ConsumeUpstreamStream`].
///
/// Progression (see `poll_next`): `CreatingSnapshotStream` →
/// `ConsumingSnapshotStream` → `ResolvingNextEpoch` →
/// `CreatingChangeLogStream` → `ConsumingChangeLogStream` → back to
/// `ResolvingNextEpoch`, and so on. `new` may start in any of the three
/// non-consuming states depending on recovered progress.
///
/// In every variant, `pre_finished_vnodes` maps vnodes that already fully
/// consumed the current epoch (before the stream was created) to their row
/// counts.
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    /// Waiting for the snapshot `VnodeStream` at `snapshot_epoch` to be built.
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Yielding chunks from the snapshot stream.
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Waiting for the change-log `VnodeStream` at `epoch` to be built.
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        /// Finished-vnode row counts of the previous epoch, kept so progress
        /// can still be reported while the new stream is being created.
        /// `None` when there is no previous epoch to report (initial state).
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Yielding chunks from the change-log stream of `epoch`.
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Waiting for the upstream table to tell us the epoch after the one just
    /// finished.
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    /// Poisoned: set after an error escaped `poll_next`; any further access
    /// panics via `unreachable!`.
    Err,
}
146
/// A `Stream` of [`StreamChunk`]s that consumes an upstream table: first the
/// snapshot at the snapshot epoch, then the change log of each subsequent
/// epoch, driven by the state machine in [`ConsumeUpstreamStreamState`].
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    upstream_table: &'a T,
    /// Recovered progress for epochs *later* than the one currently being
    /// consumed, keyed by epoch; the first entry is popped whenever the
    /// stream advances to that epoch (see the `ResolvingNextEpoch` handling
    /// in `poll_next`).
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    state: ConsumeUpstreamStreamState<'a, T>,

    // Forwarded to `create_builder` whenever a new per-epoch stream is built.
    chunk_size: usize,
    rate_limit: RateLimit,
}
156
157impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
158 pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
159 match &mut self.state {
160 ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
161 stream.consume_builder()
162 }
163 ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
164 stream.consume_builder()
165 }
166 ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
167 | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
168 | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
169 ConsumeUpstreamStreamState::Err => {
170 unreachable!("should not be accessed on Err")
171 }
172 }
173 }
174
175 pub(super) async fn for_vnode_pk_progress(
176 &mut self,
177 mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
178 ) -> StreamExecutorResult<()> {
179 match &mut self.state {
180 ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
181 }
183 ConsumeUpstreamStreamState::ConsumingSnapshotStream {
184 stream,
185 snapshot_epoch,
186 ..
187 } => {
188 stream
189 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
190 on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
191 })
192 .await?;
193 }
194 &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
195 ref mut stream,
196 ref epoch,
197 ..
198 } => {
199 stream
200 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
201 on_vnode_progress(vnode, *epoch, row_count, pk_progress)
202 })
203 .await?;
204 }
205 &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
206 ref prev_epoch_finished_vnodes,
207 ..
208 }
209 | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
210 ref prev_epoch_finished_vnodes,
211 ..
212 } => {
213 if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
214 for (vnode, row_count) in prev_epoch_finished_vnodes {
215 on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
216 }
217 }
218 }
219 ConsumeUpstreamStreamState::Err => {
220 unreachable!("should not be accessed on Err")
221 }
222 }
223 Ok(())
224 }
225}
226
impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    // State-machine driver. Every loop iteration either makes progress inside
    // the current state or rebuilds `self.state` and `continue`s. The only
    // exits are `Poll::Pending` (via `ready!`), a yielded chunk, or an error.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Result<!, _>`: the loop never breaks with a value, so control only
        // falls past the `try` block when a `?` inside produced an `Err`.
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    // Snapshot stream creation finished -> start consuming it.
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let snapshot_epoch = *snapshot_epoch;
                        // `take` moves the map out so `self.state` can be replaced.
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        // Snapshot exhausted: merge the stream's finished
                        // vnodes with the pre-finished ones and move on to
                        // resolving the epoch after the snapshot epoch.
                        None => {
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    // Change-log stream creation finished -> start consuming it.
                    // The `..` drops `prev_epoch_finished_vnodes`: once the new
                    // stream exists, the previous epoch's counts are no longer
                    // needed.
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            // Epoch fully consumed: same merge-and-advance
                            // dance as for the snapshot stream above.
                            None => {
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        let mut pre_finished_vnodes = HashMap::new();
                        let mut vnode_progresses = HashMap::new();
                        // Every vnode that finished the previous epoch starts
                        // the new epoch from scratch: no pk, zero rows.
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        // If recovered progress exists for exactly this epoch,
                        // fold it in: `Consuming` vnodes resume from their
                        // latest pk, `Consumed` vnodes skip straight to
                        // pre-finished. Pending epochs must never be skipped.
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            // Keep the previous epoch's counts so progress can
                            // still be reported while the stream is created.
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        // Only reachable on error: poison the state so any later access panics
        // in the `Err` arms instead of resuming from inconsistent state.
        self.state = ConsumeUpstreamStreamState::Err;
        // `result` is `Result<!, _>`; the identity map coerces `!` into
        // `StreamChunk` so the item type matches.
        Poll::Ready(Some(result.map(|unreachable| unreachable)))
    }
}
392
393impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
394 pub(super) fn new<'p>(
395 initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
396 upstream_table: &'a T,
397 snapshot_epoch: u64,
398 chunk_size: usize,
399 rate_limit: RateLimit,
400 ) -> Self {
401 let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
402 let mut finished_snapshot_epoch_vnodes = HashMap::new();
403 let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
404 for (vnode, progress) in initial_progress {
405 match progress {
406 None => {
407 ongoing_snapshot_epoch_vnodes
408 .try_insert(vnode, (None, 0))
409 .expect("non-duplicate");
410 }
411 Some(progress) => {
412 let epoch = progress.epoch;
413 let row_count = progress.row_count;
414 if epoch == snapshot_epoch {
415 match &progress.progress {
416 EpochBackfillProgress::Consumed => {
417 finished_snapshot_epoch_vnodes
418 .try_insert(vnode, row_count)
419 .expect("non-duplicate");
420 }
421 EpochBackfillProgress::Consuming { latest_pk } => {
422 ongoing_snapshot_epoch_vnodes
423 .try_insert(vnode, (Some(latest_pk.clone()), row_count))
424 .expect("non-duplicate");
425 }
426 }
427 } else {
428 assert!(
429 epoch > snapshot_epoch,
430 "epoch {} earlier than snapshot_epoch {} on vnode {}",
431 epoch,
432 snapshot_epoch,
433 vnode
434 );
435 pending_epoch_vnode_progress
436 .entry(epoch)
437 .or_default()
438 .try_insert(vnode, (progress.progress.clone(), progress.row_count))
439 .expect("non-duplicate");
440 }
441 }
442 };
443 }
444 let (pending_epoch_vnode_progress, state) = {
445 if !ongoing_snapshot_epoch_vnodes.is_empty() {
446 (
448 pending_epoch_vnode_progress,
449 ConsumeUpstreamStreamState::CreatingSnapshotStream {
450 future: create_upstream_table_snapshot_stream(
451 upstream_table,
452 snapshot_epoch,
453 rate_limit,
454 chunk_size,
455 ongoing_snapshot_epoch_vnodes,
456 ),
457 snapshot_epoch,
458 pre_finished_vnodes: finished_snapshot_epoch_vnodes,
459 },
460 )
461 } else if !finished_snapshot_epoch_vnodes.is_empty() {
462 (
464 pending_epoch_vnode_progress,
465 ConsumeUpstreamStreamState::ResolvingNextEpoch {
466 future: next_epoch_future(upstream_table, snapshot_epoch),
467 prev_epoch_finished_vnodes: Some((
468 snapshot_epoch,
469 finished_snapshot_epoch_vnodes,
470 )),
471 },
472 )
473 } else {
474 let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
476 .pop_first()
477 .expect("non-empty vnodes");
478 let mut ongoing_vnodes = HashMap::new();
479 let mut finished_vnodes = HashMap::new();
480 for (vnode, (progress, row_count)) in first_vnodes {
481 match progress {
482 EpochBackfillProgress::Consuming { latest_pk } => {
483 ongoing_vnodes
484 .try_insert(vnode, (Some(latest_pk), row_count))
485 .expect("non-duplicate");
486 }
487 EpochBackfillProgress::Consumed => {
488 finished_vnodes
489 .try_insert(vnode, row_count)
490 .expect("non-duplicate");
491 }
492 }
493 }
494 let state = if ongoing_vnodes.is_empty() {
495 ConsumeUpstreamStreamState::ResolvingNextEpoch {
497 future: next_epoch_future(upstream_table, first_epoch),
498 prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
499 }
500 } else {
501 ConsumeUpstreamStreamState::CreatingChangeLogStream {
502 future: create_upstream_table_change_log_stream(
503 upstream_table,
504 first_epoch,
505 rate_limit,
506 chunk_size,
507 ongoing_vnodes,
508 ),
509 prev_epoch_finished_vnodes: None,
510 epoch: first_epoch,
511 pre_finished_vnodes: finished_vnodes,
512 }
513 };
514 (pending_epoch_vnode_progress, state)
515 }
516 };
517 Self {
518 upstream_table,
519 pending_epoch_vnode_progress,
520 state,
521 chunk_size,
522 rate_limit,
523 }
524 }
525}