Skip to main content

risingwave_stream/executor/backfill/snapshot_backfill/consume_upstream/
stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod upstream_table_ext {
16    use std::collections::HashMap;
17    use std::time::Duration;
18
19    use futures::future::{BoxFuture, try_join_all};
20    use futures::{TryFutureExt, TryStreamExt};
21    use risingwave_common::hash::VirtualNode;
22    use risingwave_common::row::OwnedRow;
23    use risingwave_common_rate_limit::RateLimit;
24    use risingwave_storage::table::ChangeLogRow;
25
26    use crate::executor::StreamExecutorResult;
27    use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
28    use crate::executor::backfill::snapshot_backfill::vnode_stream::{
29        ChangeLogRowStream, VnodeStream,
30    };
31    use crate::executor::backfill::utils::create_builder;
32
33    pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
34        VnodeStream<impl ChangeLogRowStream>;
35    pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
36        BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;
37
38    #[define_opaque(UpstreamTableSnapshotStream)]
39    pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
40        upstream_table: &T,
41        snapshot_epoch: u64,
42        rate_limit: RateLimit,
43        chunk_size: usize,
44        rebuild_interval: Duration,
45        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
46    ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
47        Box::pin(async move {
48            let streams = try_join_all(vnode_progresses.into_iter().map(
49                |(vnode, (start_pk, row_count))| {
50                    upstream_table
51                        .snapshot_stream(vnode, snapshot_epoch, start_pk, rebuild_interval)
52                        .map_ok(move |stream| {
53                            (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
54                        })
55                },
56            ))
57            .await?;
58            Ok(VnodeStream::new(
59                streams,
60                upstream_table.pk_in_output_indices(),
61                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
62            ))
63        })
64    }
65
66    pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
67        VnodeStream<impl ChangeLogRowStream>;
68    pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
69        BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;
70
71    #[define_opaque(UpstreamTableChangeLogStreamFuture)]
72    pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
73        upstream_table: &T,
74        epoch: u64,
75        rate_limit: RateLimit,
76        chunk_size: usize,
77        vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
78    ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
79        Box::pin(async move {
80            let streams = try_join_all(vnode_progresses.into_iter().map(
81                |(vnode, (start_pk, row_count))| {
82                    upstream_table
83                        .change_log_stream(vnode, epoch, start_pk)
84                        .map_ok(move |stream| (vnode, stream, row_count))
85                },
86            ))
87            .await?;
88            Ok(VnodeStream::new(
89                streams,
90                upstream_table.pk_in_output_indices(),
91                create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
92            ))
93        })
94    }
95
96    pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
97    pub(super) fn next_epoch_future<T: UpstreamTable>(
98        upstream_table: &T,
99        epoch: u64,
100    ) -> NextEpochFuture<'_> {
101        Box::pin(async move { upstream_table.next_epoch(epoch).await })
102    }
103}
104
105use std::collections::{BTreeMap, HashMap};
106use std::mem::take;
107use std::pin::Pin;
108use std::task::{Context, Poll, ready};
109use std::time::Duration;
110
111use risingwave_common::array::StreamChunk;
112use risingwave_common::hash::VirtualNode;
113use risingwave_common::row::OwnedRow;
114use risingwave_common_rate_limit::RateLimit;
115use risingwave_storage::error::ErrorKind as StorageErrorKind;
116use risingwave_storage::hummock::HummockErrorInner;
117use thiserror_ext::AsReport;
118use upstream_table_ext::*;
119
120use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
121use crate::executor::backfill::snapshot_backfill::state::{
122    EpochBackfillProgress, VnodeBackfillProgress,
123};
124use crate::executor::error::ErrorKind as StreamExecutorErrorKind;
125use crate::executor::prelude::{Stream, StreamExt};
126use crate::executor::{StreamExecutorError, StreamExecutorResult};
127
128enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
129    CreatingSnapshotStream {
130        future: UpstreamTableSnapshotStreamFuture<'a, T>,
131        snapshot_epoch: u64,
132        pre_finished_vnodes: HashMap<VirtualNode, usize>,
133    },
134    ConsumingSnapshotStream {
135        stream: UpstreamTableSnapshotStream<T>,
136        snapshot_epoch: u64,
137        pre_finished_vnodes: HashMap<VirtualNode, usize>,
138    },
139    CreatingChangeLogStream {
140        future: UpstreamTableChangeLogStreamFuture<'a, T>,
141        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
142        epoch: u64,
143        pre_finished_vnodes: HashMap<VirtualNode, usize>,
144    },
145    ConsumingChangeLogStream {
146        stream: UpstreamTableChangeLogStream<T>,
147        epoch: u64,
148        pre_finished_vnodes: HashMap<VirtualNode, usize>,
149    },
150    ResolvingNextEpoch {
151        future: NextEpochFuture<'a>,
152        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
153    },
154    StoppedOnRetentionMiss,
155    Err,
156}
157
158fn is_retention_or_snapshot_expired_error(error: &StreamExecutorError) -> bool {
159    let StreamExecutorErrorKind::Storage(storage_error) = error.inner() else {
160        return false;
161    };
162    let StorageErrorKind::Hummock(hummock_error) = storage_error.inner() else {
163        return false;
164    };
165
166    matches!(
167        hummock_error.inner(),
168        HummockErrorInner::ChangeLogRetentionMiss { .. }
169            | HummockErrorInner::TimeTravelVersionExpired { .. }
170            | HummockErrorInner::CommittedEpochMismatch { .. }
171    )
172}
173
174fn log_retention_or_snapshot_expired(error: &StreamExecutorError) {
175    tracing::warn!(
176        error = %error.as_report(),
177        "stop consuming cross-database upstream because upstream retention or snapshot availability was exceeded"
178    );
179}
180
181pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
182    upstream_table: &'a T,
183    pending_epoch_vnode_progress:
184        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
185    state: ConsumeUpstreamStreamState<'a, T>,
186
187    chunk_size: usize,
188    rate_limit: RateLimit,
189}
190
191impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
192    pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
193        match &mut self.state {
194            ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
195                stream.consume_builder()
196            }
197            ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
198                stream.consume_builder()
199            }
200            ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
201            | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
202            | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
203            ConsumeUpstreamStreamState::StoppedOnRetentionMiss => None,
204            ConsumeUpstreamStreamState::Err => {
205                unreachable!("should not be accessed on Err")
206            }
207        }
208    }
209
210    pub(super) async fn for_vnode_pk_progress(
211        &mut self,
212        mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
213    ) -> StreamExecutorResult<()> {
214        match &mut self.state {
215            ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
216                // no update
217            }
218            ConsumeUpstreamStreamState::ConsumingSnapshotStream {
219                stream,
220                snapshot_epoch,
221                ..
222            } => {
223                stream
224                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
225                        on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
226                    })
227                    .await?;
228            }
229            &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
230                ref mut stream,
231                ref epoch,
232                ..
233            } => {
234                stream
235                    .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
236                        on_vnode_progress(vnode, *epoch, row_count, pk_progress)
237                    })
238                    .await?;
239            }
240            &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
241                ref prev_epoch_finished_vnodes,
242                ..
243            }
244            | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
245                ref prev_epoch_finished_vnodes,
246                ..
247            } => {
248                if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
249                    for (vnode, row_count) in prev_epoch_finished_vnodes {
250                        on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
251                    }
252                }
253            }
254            ConsumeUpstreamStreamState::StoppedOnRetentionMiss => {}
255            ConsumeUpstreamStreamState::Err => {
256                unreachable!("should not be accessed on Err")
257            }
258        }
259        Ok(())
260    }
261}
262
263impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
264    type Item = StreamExecutorResult<StreamChunk>;
265
266    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
267        let result: Result<!, StreamExecutorError> = try {
268            loop {
269                match &mut self.state {
270                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
271                        future,
272                        snapshot_epoch,
273                        pre_finished_vnodes,
274                    } => {
275                        let stream = ready!(future.as_mut().poll(cx))?;
276                        let snapshot_epoch = *snapshot_epoch;
277                        let pre_finished_vnodes = take(pre_finished_vnodes);
278                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
279                            stream,
280                            snapshot_epoch,
281                            pre_finished_vnodes,
282                        };
283                        continue;
284                    }
285                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
286                        stream,
287                        snapshot_epoch,
288                        pre_finished_vnodes,
289                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
290                        None => {
291                            let prev_epoch = *snapshot_epoch;
292                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
293                            for (vnode, row_count) in stream.take_finished_vnodes() {
294                                prev_epoch_finished_vnodes
295                                    .try_insert(vnode, row_count)
296                                    .expect("non-duplicate");
297                            }
298                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
299                                future: next_epoch_future(self.upstream_table, prev_epoch),
300                                prev_epoch_finished_vnodes: Some((
301                                    prev_epoch,
302                                    prev_epoch_finished_vnodes,
303                                )),
304                            };
305                            continue;
306                        }
307                        Some(chunk) => {
308                            return Poll::Ready(Some(Ok(chunk)));
309                        }
310                    },
311                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
312                        future,
313                        epoch,
314                        pre_finished_vnodes,
315                        ..
316                    } => {
317                        let stream = ready!(future.as_mut().poll(cx))?;
318                        let epoch = *epoch;
319                        let pre_finished_vnodes = take(pre_finished_vnodes);
320                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
321                            stream,
322                            epoch,
323                            pre_finished_vnodes,
324                        };
325                        continue;
326                    }
327                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
328                        stream,
329                        epoch,
330                        pre_finished_vnodes,
331                    } => {
332                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
333                            None => {
334                                let prev_epoch = *epoch;
335                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
336                                for (vnode, row_count) in stream.take_finished_vnodes() {
337                                    prev_epoch_finished_vnodes
338                                        .try_insert(vnode, row_count)
339                                        .expect("non-duplicate");
340                                }
341                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
342                                    future: next_epoch_future(self.upstream_table, prev_epoch),
343                                    prev_epoch_finished_vnodes: Some((
344                                        prev_epoch,
345                                        prev_epoch_finished_vnodes,
346                                    )),
347                                };
348                                continue;
349                            }
350                            Some(chunk) => {
351                                return Poll::Ready(Some(Ok(chunk)));
352                            }
353                        };
354                    }
355                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
356                        future,
357                        prev_epoch_finished_vnodes,
358                    } => {
359                        let epoch = ready!(future.as_mut().poll(cx))?;
360                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
361                        let mut pre_finished_vnodes = HashMap::new();
362                        let mut vnode_progresses = HashMap::new();
363                        for prev_epoch_vnode in prev_epoch_finished_vnodes
364                            .as_ref()
365                            .map(|(_, vnodes)| vnodes.keys())
366                            .into_iter()
367                            .flatten()
368                        {
369                            vnode_progresses
370                                .try_insert(*prev_epoch_vnode, (None, 0))
371                                .expect("non-duplicate");
372                        }
373                        if let Some((pending_epoch, _)) =
374                            self.pending_epoch_vnode_progress.first_key_value()
375                        {
376                            // TODO: may return error instead to avoid panic
377                            assert!(
378                                epoch <= *pending_epoch,
379                                "pending_epoch {} earlier than next epoch {}",
380                                pending_epoch,
381                                epoch
382                            );
383                            if epoch == *pending_epoch {
384                                let (_, progress) = self
385                                    .pending_epoch_vnode_progress
386                                    .pop_first()
387                                    .expect("checked Some");
388                                for (vnode, (progress, row_count)) in progress {
389                                    match progress {
390                                        EpochBackfillProgress::Consuming { latest_pk } => {
391                                            vnode_progresses
392                                                .try_insert(vnode, (Some(latest_pk), row_count))
393                                                .expect("non-duplicate");
394                                        }
395                                        EpochBackfillProgress::Consumed => {
396                                            pre_finished_vnodes
397                                                .try_insert(vnode, row_count)
398                                                .expect("non-duplicate");
399                                        }
400                                    }
401                                }
402                            }
403                        }
404                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
405                            future: create_upstream_table_change_log_stream(
406                                self.upstream_table,
407                                epoch,
408                                self.rate_limit,
409                                self.chunk_size,
410                                vnode_progresses,
411                            ),
412                            prev_epoch_finished_vnodes,
413                            epoch,
414                            pre_finished_vnodes,
415                        };
416                        continue;
417                    }
418                    ConsumeUpstreamStreamState::StoppedOnRetentionMiss => {
419                        return Poll::Pending;
420                    }
421                    ConsumeUpstreamStreamState::Err => {
422                        unreachable!("should not be accessed on Err")
423                    }
424                }
425            }
426        };
427        match result {
428            Err(error) if is_retention_or_snapshot_expired_error(&error) => {
429                log_retention_or_snapshot_expired(&error);
430                self.state = ConsumeUpstreamStreamState::StoppedOnRetentionMiss;
431                Poll::Pending
432            }
433            Err(error) => {
434                self.state = ConsumeUpstreamStreamState::Err;
435                Poll::Ready(Some(Err(error)))
436            }
437            Ok(unreachable) => match unreachable {},
438        }
439    }
440}
441
442impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
443    pub(super) fn new<'p>(
444        initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
445        upstream_table: &'a T,
446        snapshot_epoch: u64,
447        chunk_size: usize,
448        rate_limit: RateLimit,
449        snapshot_rebuild_interval: Duration,
450    ) -> Self {
451        let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
452        let mut finished_snapshot_epoch_vnodes = HashMap::new();
453        let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
454        for (vnode, progress) in initial_progress {
455            match progress {
456                None => {
457                    ongoing_snapshot_epoch_vnodes
458                        .try_insert(vnode, (None, 0))
459                        .expect("non-duplicate");
460                }
461                Some(progress) => {
462                    let epoch = progress.epoch;
463                    let row_count = progress.row_count;
464                    if epoch == snapshot_epoch {
465                        match &progress.progress {
466                            EpochBackfillProgress::Consumed => {
467                                finished_snapshot_epoch_vnodes
468                                    .try_insert(vnode, row_count)
469                                    .expect("non-duplicate");
470                            }
471                            EpochBackfillProgress::Consuming { latest_pk } => {
472                                ongoing_snapshot_epoch_vnodes
473                                    .try_insert(vnode, (Some(latest_pk.clone()), row_count))
474                                    .expect("non-duplicate");
475                            }
476                        }
477                    } else {
478                        assert!(
479                            epoch > snapshot_epoch,
480                            "epoch {} earlier than snapshot_epoch {} on vnode {}",
481                            epoch,
482                            snapshot_epoch,
483                            vnode
484                        );
485                        pending_epoch_vnode_progress
486                            .entry(epoch)
487                            .or_default()
488                            .try_insert(vnode, (progress.progress.clone(), progress.row_count))
489                            .expect("non-duplicate");
490                    }
491                }
492            };
493        }
494        let (pending_epoch_vnode_progress, state) = {
495            if !ongoing_snapshot_epoch_vnodes.is_empty() {
496                // some vnode has not finished the snapshot epoch
497                (
498                    pending_epoch_vnode_progress,
499                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
500                        future: create_upstream_table_snapshot_stream(
501                            upstream_table,
502                            snapshot_epoch,
503                            rate_limit,
504                            chunk_size,
505                            snapshot_rebuild_interval,
506                            ongoing_snapshot_epoch_vnodes,
507                        ),
508                        snapshot_epoch,
509                        pre_finished_vnodes: finished_snapshot_epoch_vnodes,
510                    },
511                )
512            } else if !finished_snapshot_epoch_vnodes.is_empty() {
513                // all vnodes have finished the snapshot epoch, but some vnodes has not yet start the first log epoch
514                (
515                    pending_epoch_vnode_progress,
516                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
517                        future: next_epoch_future(upstream_table, snapshot_epoch),
518                        prev_epoch_finished_vnodes: Some((
519                            snapshot_epoch,
520                            finished_snapshot_epoch_vnodes,
521                        )),
522                    },
523                )
524            } else {
525                // all vnodes are in log epoch
526                let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
527                    .pop_first()
528                    .expect("non-empty vnodes");
529                let mut ongoing_vnodes = HashMap::new();
530                let mut finished_vnodes = HashMap::new();
531                for (vnode, (progress, row_count)) in first_vnodes {
532                    match progress {
533                        EpochBackfillProgress::Consuming { latest_pk } => {
534                            ongoing_vnodes
535                                .try_insert(vnode, (Some(latest_pk), row_count))
536                                .expect("non-duplicate");
537                        }
538                        EpochBackfillProgress::Consumed => {
539                            finished_vnodes
540                                .try_insert(vnode, row_count)
541                                .expect("non-duplicate");
542                        }
543                    }
544                }
545                let state = if ongoing_vnodes.is_empty() {
546                    // all vnodes have finished the current epoch
547                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
548                        future: next_epoch_future(upstream_table, first_epoch),
549                        prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
550                    }
551                } else {
552                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
553                        future: create_upstream_table_change_log_stream(
554                            upstream_table,
555                            first_epoch,
556                            rate_limit,
557                            chunk_size,
558                            ongoing_vnodes,
559                        ),
560                        prev_epoch_finished_vnodes: None,
561                        epoch: first_epoch,
562                        pre_finished_vnodes: finished_vnodes,
563                    }
564                };
565                (pending_epoch_vnode_progress, state)
566            }
567        };
568        Self {
569            upstream_table,
570            pending_epoch_vnode_progress,
571            state,
572            chunk_size,
573            rate_limit,
574        }
575    }
576}
577
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_storage::error::StorageError;
    use risingwave_storage::hummock::HummockError;

    use super::*;

    /// Wraps a Hummock error into a storage error and then into a stream executor error.
    fn stream_error_from_hummock(error: HummockError) -> StreamExecutorError {
        let storage_error: StorageError = error.into();
        storage_error.into()
    }

    /// Checks which errors are classified as "retention or snapshot expired".
    #[test]
    fn test_change_log_retention_miss_error() {
        let is_expired = |error: HummockError| {
            is_retention_or_snapshot_expired_error(&stream_error_from_hummock(error))
        };

        // The three Hummock error kinds that must be classified as expired.
        assert!(is_expired(HummockError::change_log_retention_miss(
            TableId::new(233),
            10678350547714048,
        )));
        assert!(is_expired(HummockError::time_travel_version_expired(
            TableId::new(233),
            10678350547714048,
        )));
        assert!(is_expired(HummockError::committed_epoch_mismatch(
            TableId::new(233),
            10682740646084608,
            10678350547714048,
        )));

        // Another Hummock error must not be classified as expired.
        assert!(!is_expired(HummockError::next_epoch(
            "table 233 has been dropped"
        )));

        // Neither must an error that is not a storage error at all.
        let other_error = StreamExecutorError::from(anyhow::anyhow!("other error"));
        assert!(!is_retention_or_snapshot_expired_error(&other_error));
    }
}