risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/stream.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

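/// Helpers that build per-vnode streams over the upstream table: a snapshot stream for the
/// initial snapshot epoch, a change log stream for the epochs after it, and a future that
/// resolves the next change log epoch.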
mod upstream_table_ext {
    use std::collections::HashMap;
    use std::time::Duration;

    use futures::future::{BoxFuture, try_join_all};
    use futures::{TryFutureExt, TryStreamExt};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common_rate_limit::RateLimit;
    use risingwave_storage::table::ChangeLogRow;

    use crate::executor::StreamExecutorResult;
    use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
    use crate::executor::backfill::snapshot_backfill::vnode_stream::{
        ChangeLogRowStream, VnodeStream,
    };
    use crate::executor::backfill::utils::create_builder;

    pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;

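    /// Create a [`VnodeStream`] that replays the snapshot of `snapshot_epoch`, resuming every
    /// vnode from its persisted pk progress and row count. Snapshot rows are wrapped as
    /// [`ChangeLogRow::Insert`] so that downstream handling is uniform with the change log path.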
    #[define_opaque(UpstreamTableSnapshotStream)]
    pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
        upstream_table: &T,
        snapshot_epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        rebuild_interval: Duration,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
        Box::pin(async move {
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .snapshot_stream(vnode, snapshot_epoch, start_pk, rebuild_interval)
                        .map_ok(move |stream| {
                            (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
                        })
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }

    pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
        VnodeStream<impl ChangeLogRowStream>;
    pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
        BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;

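    /// Create a [`VnodeStream`] over the change log of `epoch`, resuming every vnode from its
    /// persisted pk progress and row count.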
    #[define_opaque(UpstreamTableChangeLogStream)]
    pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
        upstream_table: &T,
        epoch: u64,
        rate_limit: RateLimit,
        chunk_size: usize,
        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
    ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
        Box::pin(async move {
            let streams = try_join_all(vnode_progresses.into_iter().map(
                |(vnode, (start_pk, row_count))| {
                    upstream_table
                        .change_log_stream(vnode, epoch, start_pk)
                        .map_ok(move |stream| (vnode, stream, row_count))
                },
            ))
            .await?;
            Ok(VnodeStream::new(
                streams,
                upstream_table.pk_in_output_indices(),
                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
            ))
        })
    }

    pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
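    /// Resolve the epoch that follows `epoch` in the upstream table's change log.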
    pub(super) fn next_epoch_future<T: UpstreamTable>(
        upstream_table: &T,
        epoch: u64,
    ) -> NextEpochFuture<'_> {
        Box::pin(async move { upstream_table.next_epoch(epoch).await })
    }
}

use std::collections::{BTreeMap, HashMap};
use std::mem::take;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use std::time::Duration;

use risingwave_common::array::StreamChunk;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common_rate_limit::RateLimit;
use upstream_table_ext::*;

use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
use crate::executor::backfill::snapshot_backfill::state::{
    EpochBackfillProgress, VnodeBackfillProgress,
};
use crate::executor::prelude::{Stream, StreamExt};
use crate::executor::{StreamExecutorError, StreamExecutorResult};

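/// State machine of [`ConsumeUpstreamStream`]:
/// `CreatingSnapshotStream` -> `ConsumingSnapshotStream` -> `ResolvingNextEpoch`
/// -> `CreatingChangeLogStream` -> `ConsumingChangeLogStream` -> `ResolvingNextEpoch` -> ...
///
/// `Err` marks a stream that has already yielded an error and must not be polled again.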
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
        snapshot_epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
        epoch: u64,
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    Err,
}

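/// Stream of backfill chunks read from the upstream table: first the snapshot of the snapshot
/// epoch, then the change log of every subsequent epoch. Persisted progress that belongs to
/// epochs not yet reached is parked in `pending_epoch_vnode_progress` until the corresponding
/// epoch is resolved.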
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    upstream_table: &'a T,
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    state: ConsumeUpstreamStreamState<'a, T>,

    chunk_size: usize,
    rate_limit: RateLimit,
}

impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
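    /// Flush any partially built chunk from the underlying [`VnodeStream`]. Returns `None` when
    /// no stream is currently being consumed or there is nothing buffered.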
    pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
        match &mut self.state {
            ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
                stream.consume_builder()
            }
            ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
                stream.consume_builder()
            }
            ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
            | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
            | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
            ConsumeUpstreamStreamState::Err => {
                unreachable!("should not be accessed on Err")
            }
        }
    }

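    /// Report the current per-vnode progress to `on_vnode_progress` as
    /// `(vnode, epoch, row_count, latest_pk)`, typically so that the caller can persist the
    /// backfill state.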
    pub(super) async fn for_vnode_pk_progress(
        &mut self,
        mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
    ) -> StreamExecutorResult<()> {
        match &mut self.state {
            ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
                // no update
            }
            ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                stream,
                snapshot_epoch,
                ..
            } => {
                stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
                    })
                    .await?;
            }
            &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                ref mut stream,
                ref epoch,
                ..
            } => {
                stream
                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
                        on_vnode_progress(vnode, *epoch, row_count, pk_progress)
                    })
                    .await?;
            }
            &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
                ref prev_epoch_finished_vnodes,
                ..
            }
            | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
                ref prev_epoch_finished_vnodes,
                ..
            } => {
                if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
                    for (vnode, row_count) in prev_epoch_finished_vnodes {
                        on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
                    }
                }
            }
            ConsumeUpstreamStreamState::Err => {
                unreachable!("should not be accessed on Err")
            }
        }
        Ok(())
    }
}

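// Drive the state machine above. On error the stream switches to the `Err` state before the
// error is returned, so it must not be polled again afterwards.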
impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let snapshot_epoch = *snapshot_epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        None => {
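                            // The snapshot stream is exhausted: merge the final row counts of
                            // the vnodes it finished with those that were already finished, and
                            // resolve the first change log epoch after the snapshot epoch.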
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            None => {
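                                // The change log of this epoch is exhausted: merge the final
                                // row counts and resolve the next change log epoch.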
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        let mut pre_finished_vnodes = HashMap::new();
                        let mut vnode_progresses = HashMap::new();
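                        // Vnodes that finished the previous epoch restart the newly resolved
                        // epoch from the beginning, with no pk progress and a zero row count.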
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            // TODO: may return an error instead to avoid panicking
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        self.state = ConsumeUpstreamStreamState::Err;
        Poll::Ready(Some(result.map(|unreachable| unreachable)))
    }
}

impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
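    /// Create the stream from each vnode's persisted backfill progress. Vnodes are split into
    /// those still consuming the snapshot epoch, those that have finished it, and those whose
    /// progress already points at a later change log epoch; the initial state is picked
    /// accordingly.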
    pub(super) fn new<'p>(
        initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
        upstream_table: &'a T,
        snapshot_epoch: u64,
        chunk_size: usize,
        rate_limit: RateLimit,
        snapshot_rebuild_interval: Duration,
    ) -> Self {
        let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
        let mut finished_snapshot_epoch_vnodes = HashMap::new();
        let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
        for (vnode, progress) in initial_progress {
            match progress {
                None => {
                    ongoing_snapshot_epoch_vnodes
                        .try_insert(vnode, (None, 0))
                        .expect("non-duplicate");
                }
                Some(progress) => {
                    let epoch = progress.epoch;
                    let row_count = progress.row_count;
                    if epoch == snapshot_epoch {
                        match &progress.progress {
                            EpochBackfillProgress::Consumed => {
                                finished_snapshot_epoch_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            EpochBackfillProgress::Consuming { latest_pk } => {
                                ongoing_snapshot_epoch_vnodes
                                    .try_insert(vnode, (Some(latest_pk.clone()), row_count))
                                    .expect("non-duplicate");
                            }
                        }
                    } else {
                        assert!(
                            epoch > snapshot_epoch,
                            "epoch {} earlier than snapshot_epoch {} on vnode {}",
                            epoch,
                            snapshot_epoch,
                            vnode
                        );
                        pending_epoch_vnode_progress
                            .entry(epoch)
                            .or_default()
                            .try_insert(vnode, (progress.progress.clone(), progress.row_count))
                            .expect("non-duplicate");
                    }
                }
            };
        }
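        // Choose the initial state from how far the vnodes have progressed.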
        let (pending_epoch_vnode_progress, state) = {
            if !ongoing_snapshot_epoch_vnodes.is_empty() {
                // some vnode has not finished the snapshot epoch
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future: create_upstream_table_snapshot_stream(
                            upstream_table,
                            snapshot_epoch,
                            rate_limit,
                            chunk_size,
                            snapshot_rebuild_interval,
                            ongoing_snapshot_epoch_vnodes,
                        ),
                        snapshot_epoch,
                        pre_finished_vnodes: finished_snapshot_epoch_vnodes,
                    },
                )
            } else if !finished_snapshot_epoch_vnodes.is_empty() {
                // all vnodes have finished the snapshot epoch, but some have not yet started the first log epoch
                (
                    pending_epoch_vnode_progress,
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, snapshot_epoch),
                        prev_epoch_finished_vnodes: Some((
                            snapshot_epoch,
                            finished_snapshot_epoch_vnodes,
                        )),
                    },
                )
            } else {
                // all vnodes are in log epoch
                let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
                    .pop_first()
                    .expect("non-empty vnodes");
                let mut ongoing_vnodes = HashMap::new();
                let mut finished_vnodes = HashMap::new();
                for (vnode, (progress, row_count)) in first_vnodes {
                    match progress {
                        EpochBackfillProgress::Consuming { latest_pk } => {
                            ongoing_vnodes
                                .try_insert(vnode, (Some(latest_pk), row_count))
                                .expect("non-duplicate");
                        }
                        EpochBackfillProgress::Consumed => {
                            finished_vnodes
                                .try_insert(vnode, row_count)
                                .expect("non-duplicate");
                        }
                    }
                }
                let state = if ongoing_vnodes.is_empty() {
                    // all vnodes have finished the current epoch
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future: next_epoch_future(upstream_table, first_epoch),
                        prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
                    }
                } else {
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future: create_upstream_table_change_log_stream(
                            upstream_table,
                            first_epoch,
                            rate_limit,
                            chunk_size,
                            ongoing_vnodes,
                        ),
                        prev_epoch_finished_vnodes: None,
                        epoch: first_epoch,
                        pre_finished_vnodes: finished_vnodes,
                    }
                };
                (pending_epoch_vnode_progress, state)
            }
        };
        Self {
            upstream_table,
            pending_epoch_vnode_progress,
            state,
            chunk_size,
            rate_limit,
        }
    }
}