risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/
stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod upstream_table_ext {
16    use std::collections::HashMap;
17
18    use futures::future::{BoxFuture, try_join_all};
19    use futures::{TryFutureExt, TryStreamExt};
20    use risingwave_common::hash::VirtualNode;
21    use risingwave_common::row::OwnedRow;
22    use risingwave_common_rate_limit::RateLimit;
23    use risingwave_storage::table::ChangeLogRow;
24
25    use crate::executor::StreamExecutorResult;
26    use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
27    use crate::executor::backfill::snapshot_backfill::vnode_stream::{
28        ChangeLogRowStream, VnodeStream,
29    };
30    use crate::executor::backfill::utils::create_builder;
31
32    pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
33        VnodeStream<impl ChangeLogRowStream>;
34    pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
35        BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;
36
37    #[define_opaque(UpstreamTableSnapshotStream)]
38    pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
39        upstream_table: &T,
40        snapshot_epoch: u64,
41        rate_limit: RateLimit,
42        chunk_size: usize,
43        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
44    ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
45        Box::pin(async move {
46            let streams = try_join_all(vnode_progresses.into_iter().map(
47                |(vnode, (start_pk, row_count))| {
48                    upstream_table
49                        .snapshot_stream(vnode, snapshot_epoch, start_pk)
50                        .map_ok(move |stream| {
51                            (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
52                        })
53                },
54            ))
55            .await?;
56            Ok(VnodeStream::new(
57                streams,
58                upstream_table.pk_in_output_indices(),
59                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
60            ))
61        })
62    }
63
64    pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
65        VnodeStream<impl ChangeLogRowStream>;
66    pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
67        BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;
68
69    #[define_opaque(UpstreamTableChangeLogStreamFuture)]
70    pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
71        upstream_table: &T,
72        epoch: u64,
73        rate_limit: RateLimit,
74        chunk_size: usize,
75        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
76    ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
77        Box::pin(async move {
78            let streams = try_join_all(vnode_progresses.into_iter().map(
79                |(vnode, (start_pk, row_count))| {
80                    upstream_table
81                        .change_log_stream(vnode, epoch, start_pk)
82                        .map_ok(move |stream| (vnode, stream, row_count))
83                },
84            ))
85            .await?;
86            Ok(VnodeStream::new(
87                streams,
88                upstream_table.pk_in_output_indices(),
89                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
90            ))
91        })
92    }
93
94    pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
95    pub(super) fn next_epoch_future<T: UpstreamTable>(
96        upstream_table: &T,
97        epoch: u64,
98    ) -> NextEpochFuture<'_> {
99        Box::pin(async move { upstream_table.next_epoch(epoch).await })
100    }
101}
102
103use std::collections::{BTreeMap, HashMap};
104use std::mem::take;
105use std::pin::Pin;
106use std::task::{Context, Poll, ready};
107
108use risingwave_common::array::StreamChunk;
109use risingwave_common::hash::VirtualNode;
110use risingwave_common::row::OwnedRow;
111use risingwave_common_rate_limit::RateLimit;
112use upstream_table_ext::*;
113
114use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
115use crate::executor::backfill::snapshot_backfill::state::{
116    EpochBackfillProgress, VnodeBackfillProgress,
117};
118use crate::executor::prelude::{Stream, StreamExt};
119use crate::executor::{StreamExecutorError, StreamExecutorResult};
120
/// State machine behind [`ConsumeUpstreamStream`].
///
/// `poll_next` moves through the states as follows: a `Creating*` state waits
/// for its future and becomes the matching `Consuming*` state; a `Consuming*`
/// state yields chunks until its stream ends and then becomes
/// `ResolvingNextEpoch`; `ResolvingNextEpoch` waits for the next epoch and
/// becomes `CreatingChangeLogStream` for that epoch.
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    /// Waiting for the snapshot stream of `snapshot_epoch` to be created.
    CreatingSnapshotStream {
        /// Future resolving to the snapshot stream.
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        /// Epoch whose snapshot is being read.
        snapshot_epoch: u64,
        /// Vnodes (with their row counts) that are not part of the stream
        /// because their recorded progress for `snapshot_epoch` was already
        /// `Consumed`.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Yielding chunks from the snapshot stream of `snapshot_epoch`.
    ConsumingSnapshotStream {
        /// The snapshot stream being polled.
        stream: UpstreamTableSnapshotStream<T>,
        /// Epoch whose snapshot is being read.
        snapshot_epoch: u64,
        /// Carried over from `CreatingSnapshotStream`; merged with the vnodes
        /// finished by `stream` once the stream ends.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Waiting for the change-log stream of `epoch` to be created.
    CreatingChangeLogStream {
        /// Future resolving to the change-log stream.
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        /// The previous epoch and the vnodes (with row counts) that finished it,
        /// if any. Kept so that `for_vnode_pk_progress` can report them while
        /// the new stream is not yet available.
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        /// Epoch whose change log is going to be read.
        epoch: u64,
        /// Vnodes (with their row counts) that are not part of the stream
        /// because their recorded progress for `epoch` was already `Consumed`.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Yielding chunks from the change-log stream of `epoch`.
    ConsumingChangeLogStream {
        /// The change-log stream being polled.
        stream: UpstreamTableChangeLogStream<T>,
        /// Epoch whose change log is being read.
        epoch: u64,
        /// Carried over from `CreatingChangeLogStream`; merged with the vnodes
        /// finished by `stream` once the stream ends.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Waiting for the upstream table to report the epoch that follows the one
    /// just finished.
    ResolvingNextEpoch {
        /// Future resolving to the next epoch.
        future: NextEpochFuture<'a>,
        /// The epoch just finished and the vnodes (with row counts) that
        /// finished it.
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    /// Set by `poll_next` after it has returned an error. Every method that
    /// inspects the state panics with `unreachable!` when it finds this variant.
    Err,
}
149
/// Stream of [`StreamChunk`]s read from an upstream table: first the snapshot
/// at the snapshot epoch (if any vnode still needs it), then the change log of
/// each following epoch, one epoch at a time.
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    /// Table the snapshot, change-log streams and next epochs are obtained from.
    upstream_table: &'a T,
    /// Recorded progress of vnodes that were already ahead of the epoch the
    /// stream started with, keyed by epoch. An entry is taken out when
    /// `poll_next` resolves that epoch as the next one to consume.
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    /// Current position in the state machine.
    state: ConsumeUpstreamStreamState<'a, T>,

    /// Forwarded to `create_upstream_table_change_log_stream` for every new
    /// change-log stream.
    chunk_size: usize,
    /// Forwarded to `create_upstream_table_change_log_stream` for every new
    /// change-log stream.
    rate_limit: RateLimit,
}
159
160impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
161    pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
162        match &mut self.state {
163            ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
164                stream.consume_builder()
165            }
166            ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
167                stream.consume_builder()
168            }
169            ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
170            | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
171            | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
172            ConsumeUpstreamStreamState::Err => {
173                unreachable!("should not be accessed on Err")
174            }
175        }
176    }
177
178    pub(super) async fn for_vnode_pk_progress(
179        &mut self,
180        mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
181    ) -> StreamExecutorResult<()> {
182        match &mut self.state {
183            ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
184                // no update
185            }
186            ConsumeUpstreamStreamState::ConsumingSnapshotStream {
187                stream,
188                snapshot_epoch,
189                ..
190            } => {
191                stream
192                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
193                        on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
194                    })
195                    .await?;
196            }
197            &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
198                ref mut stream,
199                ref epoch,
200                ..
201            } => {
202                stream
203                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
204                        on_vnode_progress(vnode, *epoch, row_count, pk_progress)
205                    })
206                    .await?;
207            }
208            &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
209                ref prev_epoch_finished_vnodes,
210                ..
211            }
212            | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
213                ref prev_epoch_finished_vnodes,
214                ..
215            } => {
216                if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
217                    for (vnode, row_count) in prev_epoch_finished_vnodes {
218                        on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
219                    }
220                }
221            }
222            ConsumeUpstreamStreamState::Err => {
223                unreachable!("should not be accessed on Err")
224            }
225        }
226        Ok(())
227    }
228}
229
/// Drives the [`ConsumeUpstreamStreamState`] state machine.
///
/// Every path through `poll_next` either returns a chunk, returns an error,
/// returns `Poll::Pending`, or loops into the next state, so the stream never
/// yields `None`.
impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    /// Polls the current state, advancing through as many state transitions as
    /// can be made without waiting.
    ///
    /// After an error has been returned the state is `Err`, and polling again
    /// panics.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The loop never breaks: it leaves only through `return` (a chunk, or
        // `Poll::Pending` via `ready!`) or through `?`, which ends the `try`
        // block with the error. Hence the `!` success type.
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        // Move the bookkeeping over to the consuming state.
                        let snapshot_epoch = *snapshot_epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        None => {
                            // Snapshot exhausted: the vnodes finished by the stream plus
                            // those that were finished beforehand have all completed the
                            // snapshot epoch. A vnode present in both is a bug.
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        let stream = ready!(future.as_mut().poll(cx))?;
                        // `prev_epoch_finished_vnodes` is dropped here: from now on
                        // progress is reported by the new stream.
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            None => {
                                // Change log of `epoch` exhausted: same merge as for the
                                // snapshot stream, then look up the following epoch.
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        // Vnodes already done with `epoch` (excluded from the new stream).
                        let mut pre_finished_vnodes = HashMap::new();
                        // Vnodes to read in `epoch`, with their start pk and row count.
                        let mut vnode_progresses = HashMap::new();
                        // Every vnode that finished the previous epoch starts `epoch`
                        // with no start pk and a row count of 0.
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            // The resolved epoch must not be later than the earliest epoch
                            // with pending progress.
                            // TODO: may return error instead to avoid panic
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                // Vnodes whose recorded progress is in `epoch` join now:
                                // partially consumed ones resume after `latest_pk`, fully
                                // consumed ones are only tracked as finished.
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        // Only reachable with an error: poison the state and yield the error.
        self.state = ConsumeUpstreamStreamState::Err;
        Poll::Ready(Some(result.map(|unreachable| unreachable)))
    }
}
395
396impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
397    pub(super) fn new<'p>(
398        initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
399        upstream_table: &'a T,
400        snapshot_epoch: u64,
401        chunk_size: usize,
402        rate_limit: RateLimit,
403    ) -> Self {
404        let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
405        let mut finished_snapshot_epoch_vnodes = HashMap::new();
406        let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
407        for (vnode, progress) in initial_progress {
408            match progress {
409                None => {
410                    ongoing_snapshot_epoch_vnodes
411                        .try_insert(vnode, (None, 0))
412                        .expect("non-duplicate");
413                }
414                Some(progress) => {
415                    let epoch = progress.epoch;
416                    let row_count = progress.row_count;
417                    if epoch == snapshot_epoch {
418                        match &progress.progress {
419                            EpochBackfillProgress::Consumed => {
420                                finished_snapshot_epoch_vnodes
421                                    .try_insert(vnode, row_count)
422                                    .expect("non-duplicate");
423                            }
424                            EpochBackfillProgress::Consuming { latest_pk } => {
425                                ongoing_snapshot_epoch_vnodes
426                                    .try_insert(vnode, (Some(latest_pk.clone()), row_count))
427                                    .expect("non-duplicate");
428                            }
429                        }
430                    } else {
431                        assert!(
432                            epoch > snapshot_epoch,
433                            "epoch {} earlier than snapshot_epoch {} on vnode {}",
434                            epoch,
435                            snapshot_epoch,
436                            vnode
437                        );
438                        pending_epoch_vnode_progress
439                            .entry(epoch)
440                            .or_default()
441                            .try_insert(vnode, (progress.progress.clone(), progress.row_count))
442                            .expect("non-duplicate");
443                    }
444                }
445            };
446        }
447        let (pending_epoch_vnode_progress, state) = {
448            if !ongoing_snapshot_epoch_vnodes.is_empty() {
449                // some vnode has not finished the snapshot epoch
450                (
451                    pending_epoch_vnode_progress,
452                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
453                        future: create_upstream_table_snapshot_stream(
454                            upstream_table,
455                            snapshot_epoch,
456                            rate_limit,
457                            chunk_size,
458                            ongoing_snapshot_epoch_vnodes,
459                        ),
460                        snapshot_epoch,
461                        pre_finished_vnodes: finished_snapshot_epoch_vnodes,
462                    },
463                )
464            } else if !finished_snapshot_epoch_vnodes.is_empty() {
465                // all vnodes have finished the snapshot epoch, but some vnodes has not yet start the first log epoch
466                (
467                    pending_epoch_vnode_progress,
468                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
469                        future: next_epoch_future(upstream_table, snapshot_epoch),
470                        prev_epoch_finished_vnodes: Some((
471                            snapshot_epoch,
472                            finished_snapshot_epoch_vnodes,
473                        )),
474                    },
475                )
476            } else {
477                // all vnodes are in log epoch
478                let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
479                    .pop_first()
480                    .expect("non-empty vnodes");
481                let mut ongoing_vnodes = HashMap::new();
482                let mut finished_vnodes = HashMap::new();
483                for (vnode, (progress, row_count)) in first_vnodes {
484                    match progress {
485                        EpochBackfillProgress::Consuming { latest_pk } => {
486                            ongoing_vnodes
487                                .try_insert(vnode, (Some(latest_pk), row_count))
488                                .expect("non-duplicate");
489                        }
490                        EpochBackfillProgress::Consumed => {
491                            finished_vnodes
492                                .try_insert(vnode, row_count)
493                                .expect("non-duplicate");
494                        }
495                    }
496                }
497                let state = if ongoing_vnodes.is_empty() {
498                    // all vnodes have finished the current epoch
499                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
500                        future: next_epoch_future(upstream_table, first_epoch),
501                        prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
502                    }
503                } else {
504                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
505                        future: create_upstream_table_change_log_stream(
506                            upstream_table,
507                            first_epoch,
508                            rate_limit,
509                            chunk_size,
510                            ongoing_vnodes,
511                        ),
512                        prev_epoch_finished_vnodes: None,
513                        epoch: first_epoch,
514                        pre_finished_vnodes: finished_vnodes,
515                    }
516                };
517                (pending_epoch_vnode_progress, state)
518            }
519        };
520        Self {
521            upstream_table,
522            pending_epoch_vnode_progress,
523            state,
524            chunk_size,
525            rate_limit,
526        }
527    }
528}