risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

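/// Constructors for the per-vnode snapshot and change log streams consumed by
/// `ConsumeUpstreamStream`, together with boxed future type aliases for creating them.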
mod upstream_table_ext {
    use std::collections::HashMap;

    use futures::future::{BoxFuture, try_join_all};
    use futures::{TryFutureExt, TryStreamExt};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common_rate_limit::RateLimit;
    use risingwave_storage::table::ChangeLogRow;

    use crate::executor::StreamExecutorResult;
    use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
    use crate::executor::backfill::snapshot_backfill::vnode_stream::{
        ChangeLogRowStream, VnodeStream,
    };
    use crate::executor::backfill::utils::create_builder;

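    /// Merged per-vnode stream over the snapshot of an upstream table, with every row
    /// wrapped as `ChangeLogRow::Insert`.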
    pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;
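    /// Builds a `VnodeStream` that replays the snapshot of `upstream_table` at
    /// `snapshot_epoch`, resuming each vnode from its recorded `start_pk` and row count.
    /// Snapshot rows are wrapped as `ChangeLogRow::Insert` so they flow through the same
    /// `ChangeLogRowStream` interface as change log rows.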
    pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
        upstream_table: &T,
        snapshot_epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
        Box::pin(async move {
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .snapshot_stream(vnode, snapshot_epoch, start_pk)
                        .map_ok(move |stream| {
                            (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
                        })
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }

    pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;

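    /// Like `create_upstream_table_snapshot_stream`, but reads the change log of `epoch`
    /// instead of the snapshot.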
    pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
        upstream_table: &T,
        epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
        Box::pin(async move {
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .change_log_stream(vnode, epoch, start_pk)
                        .map_ok(move |stream| (vnode, stream, row_count))
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }

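    /// Boxed future resolving `upstream_table.next_epoch(epoch)`.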
    pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
    pub(super) fn next_epoch_future<T: UpstreamTable>(
        upstream_table: &T,
        epoch: u64,
    ) -> NextEpochFuture<'_> {
        Box::pin(async move { upstream_table.next_epoch(epoch).await })
    }
}

use std::collections::{BTreeMap, HashMap};
use std::mem::take;
use std::pin::Pin;
use std::task::{Context, Poll, ready};

use risingwave_common::array::StreamChunk;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common_rate_limit::RateLimit;
use upstream_table_ext::*;

use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
use crate::executor::backfill::snapshot_backfill::state::{
    EpochBackfillProgress, VnodeBackfillProgress,
};
use crate::executor::prelude::{Stream, StreamExt};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

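/// State machine driven by the `Stream` impl of `ConsumeUpstreamStream`. The overall flow is
/// to create and consume the snapshot stream of `snapshot_epoch`, and then repeatedly resolve
/// the next epoch and create and consume its change log stream.
/// `pre_finished_vnodes` carries vnodes (with their row counts) that had already finished the
/// current epoch before its stream was created, so they can be merged with the stream's
/// finished vnodes once the epoch completes; `prev_epoch_finished_vnodes` keeps the finished
/// vnodes of the previous epoch so their progress can still be reported while the next epoch
/// is being resolved and its stream created. `Err` marks a stream that has yielded an error
/// and must not be used again.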
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    Err,
}

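/// Stream of `StreamChunk`s read from an upstream table during snapshot backfill: it first
/// replays the upstream snapshot at the snapshot epoch and then consumes the change log of
/// each subsequent epoch. `pending_epoch_vnode_progress` holds per-vnode progress recovered
/// for change log epochs that have not been reached yet.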
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    upstream_table: &'a T,
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    state: ConsumeUpstreamStreamState<'a, T>,

    chunk_size: usize,
    rate_limit: RateLimit,
}

impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
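    /// Takes any partially built chunk buffered by the stream currently being consumed.
    /// Returns `None` if there is no buffered data or no stream is currently being consumed.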
    pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
        match &mut self.state {
            ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
                stream.consume_builder()
            }
            ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
                stream.consume_builder()
            }
            ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
            | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
            | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
            ConsumeUpstreamStreamState::Err => {
                unreachable!("should not be accessed on Err")
            }
        }
    }

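    /// Reports the per-vnode backfill progress as `(vnode, epoch, row_count, current_pk)` to
    /// `on_vnode_progress`: progress of the stream currently being consumed is taken from the
    /// underlying stream, while vnodes that already finished the previous epoch are reported
    /// with a `None` pk.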
    pub(super) async fn for_vnode_pk_progress(
        &mut self,
        mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
    ) -> StreamExecutorResult<()> {
        match &mut self.state {
            ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
                // no update
            }
            ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                stream,
                snapshot_epoch,
                ..
            } => {
                stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
                    })
                    .await?;
            }
            &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                ref mut stream,
                ref epoch,
                ..
            } => {
                stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        on_vnode_progress(vnode, *epoch, row_count, pk_progress)
                    })
                    .await?;
            }
            &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
                ref prev_epoch_finished_vnodes,
                ..
            }
            | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
                ref prev_epoch_finished_vnodes,
                ..
            } => {
                if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
                    for (vnode, row_count) in prev_epoch_finished_vnodes {
                        on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
                    }
                }
            }
            ConsumeUpstreamStreamState::Err => {
                unreachable!("should not be accessed on Err")
            }
        }
        Ok(())
    }
}

impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
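        // The `try` block below can only complete with an error (its success type is `!`);
        // when it does, the state is switched to `Err` before the error is returned.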
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let snapshot_epoch = *snapshot_epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        None => {
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            None => {
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
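                    // The next epoch has been resolved: vnodes that finished the previous
                    // epoch restart from scratch (no pk, row count 0), and any recovered
                    // progress pending for exactly this epoch is merged in before the change
                    // log stream is created.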
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        let mut pre_finished_vnodes = HashMap::new();
                        let mut vnode_progresses = HashMap::new();
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            // TODO: may return an error instead to avoid panicking
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        self.state = ConsumeUpstreamStreamState::Err;
        Poll::Ready(Some(result.map(|unreachable| unreachable)))
    }
}

impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
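    /// Builds the stream from per-vnode progress recovered on startup: vnodes still consuming
    /// the snapshot epoch start (or resume) the snapshot stream; once the snapshot epoch is
    /// fully consumed, consumption continues from the change log epochs. Progress recovered
    /// for later change log epochs is kept in `pending_epoch_vnode_progress` until those
    /// epochs are reached.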
    pub(super) fn new<'p>(
        initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
        upstream_table: &'a T,
        snapshot_epoch: u64,
        chunk_size: usize,
        rate_limit: RateLimit,
    ) -> Self {
        let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
        let mut finished_snapshot_epoch_vnodes = HashMap::new();
        let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
        for (vnode, progress) in initial_progress {
            match progress {
                None => {
                    ongoing_snapshot_epoch_vnodes
                        .try_insert(vnode, (None, 0))
                        .expect("non-duplicate");
                }
                Some(progress) => {
                    let epoch = progress.epoch;
                    let row_count = progress.row_count;
                    if epoch == snapshot_epoch {
                        match &progress.progress {
                            EpochBackfillProgress::Consumed => {
                                finished_snapshot_epoch_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            EpochBackfillProgress::Consuming { latest_pk } => {
                                ongoing_snapshot_epoch_vnodes
                                    .try_insert(vnode, (Some(latest_pk.clone()), row_count))
                                    .expect("non-duplicate");
                            }
                        }
                    } else {
                        assert!(
                            epoch > snapshot_epoch,
                            "epoch {} earlier than snapshot_epoch {} on vnode {}",
                            epoch,
                            snapshot_epoch,
                            vnode
                        );
                        pending_epoch_vnode_progress
                            .entry(epoch)
                            .or_default()
                            .try_insert(vnode, (progress.progress.clone(), progress.row_count))
                            .expect("non-duplicate");
                    }
                }
            };
        }
        let (pending_epoch_vnode_progress, state) = {
            if !ongoing_snapshot_epoch_vnodes.is_empty() {
                // some vnode has not finished the snapshot epoch
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future: create_upstream_table_snapshot_stream(
                            upstream_table,
                            snapshot_epoch,
                            rate_limit,
                            chunk_size,
                            ongoing_snapshot_epoch_vnodes,
                        ),
                        snapshot_epoch,
                        pre_finished_vnodes: finished_snapshot_epoch_vnodes,
                    },
                )
            } else if !finished_snapshot_epoch_vnodes.is_empty() {
                // all vnodes have finished the snapshot epoch, but some vnodes have not yet started the first log epoch
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, snapshot_epoch),
                        prev_epoch_finished_vnodes: Some((
                            snapshot_epoch,
                            finished_snapshot_epoch_vnodes,
                        )),
                    },
                )
            } else {
                // all vnodes are in log epoch
                let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
                    .pop_first()
                    .expect("non-empty vnodes");
                let mut ongoing_vnodes = HashMap::new();
                let mut finished_vnodes = HashMap::new();
                for (vnode, (progress, row_count)) in first_vnodes {
                    match progress {
                        EpochBackfillProgress::Consuming { latest_pk } => {
                            ongoing_vnodes
                                .try_insert(vnode, (Some(latest_pk), row_count))
                                .expect("non-duplicate");
                        }
                        EpochBackfillProgress::Consumed => {
                            finished_vnodes
                                .try_insert(vnode, row_count)
                                .expect("non-duplicate");
                        }
                    }
                }
                let state = if ongoing_vnodes.is_empty() {
                    // all vnodes have finished the current epoch
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, first_epoch),
                        prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
                    }
                } else {
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future: create_upstream_table_change_log_stream(
                            upstream_table,
                            first_epoch,
                            rate_limit,
                            chunk_size,
                            ongoing_vnodes,
                        ),
                        prev_epoch_finished_vnodes: None,
                        epoch: first_epoch,
                        pre_finished_vnodes: finished_vnodes,
                    }
                };
                (pending_epoch_vnode_progress, state)
            }
        };
        Self {
            upstream_table,
            pending_epoch_vnode_progress,
            state,
            chunk_size,
            rate_limit,
        }
    }
}