1mod upstream_table_ext {
16 use std::collections::HashMap;
17 use std::time::Duration;
18
19 use futures::future::{BoxFuture, try_join_all};
20 use futures::{TryFutureExt, TryStreamExt};
21 use risingwave_common::hash::VirtualNode;
22 use risingwave_common::row::OwnedRow;
23 use risingwave_common_rate_limit::RateLimit;
24 use risingwave_storage::table::ChangeLogRow;
25
26 use crate::executor::StreamExecutorResult;
27 use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
28 use crate::executor::backfill::snapshot_backfill::vnode_stream::{
29 ChangeLogRowStream, VnodeStream,
30 };
31 use crate::executor::backfill::utils::create_builder;
32
33 pub(super) type UpstreamTableSnapshotStream<T: UpstreamTable> =
34 VnodeStream<impl ChangeLogRowStream>;
35 pub(super) type UpstreamTableSnapshotStreamFuture<'a, T> =
36 BoxFuture<'a, StreamExecutorResult<UpstreamTableSnapshotStream<T>>>;
37
38 #[define_opaque(UpstreamTableSnapshotStream)]
39 pub(super) fn create_upstream_table_snapshot_stream<T: UpstreamTable>(
40 upstream_table: &T,
41 snapshot_epoch: u64,
42 rate_limit: RateLimit,
43 chunk_size: usize,
44 rebuild_interval: Duration,
45 vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
46 ) -> UpstreamTableSnapshotStreamFuture<'_, T> {
47 Box::pin(async move {
48 let streams = try_join_all(vnode_progresses.into_iter().map(
49 |(vnode, (start_pk, row_count))| {
50 upstream_table
51 .snapshot_stream(vnode, snapshot_epoch, start_pk, rebuild_interval)
52 .map_ok(move |stream| {
53 (vnode, stream.map_ok(ChangeLogRow::Insert), row_count)
54 })
55 },
56 ))
57 .await?;
58 Ok(VnodeStream::new(
59 streams,
60 upstream_table.pk_in_output_indices(),
61 create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
62 ))
63 })
64 }
65
66 pub(super) type UpstreamTableChangeLogStream<T: UpstreamTable> =
67 VnodeStream<impl ChangeLogRowStream>;
68 pub(super) type UpstreamTableChangeLogStreamFuture<'a, T> =
69 BoxFuture<'a, StreamExecutorResult<UpstreamTableChangeLogStream<T>>>;
70
71 #[define_opaque(UpstreamTableChangeLogStreamFuture)]
72 pub(super) fn create_upstream_table_change_log_stream<T: UpstreamTable>(
73 upstream_table: &T,
74 epoch: u64,
75 rate_limit: RateLimit,
76 chunk_size: usize,
77 vnode_progresses: HashMap<VirtualNode, (Option<OwnedRow>, usize)>,
78 ) -> UpstreamTableChangeLogStreamFuture<'_, T> {
79 Box::pin(async move {
80 let streams = try_join_all(vnode_progresses.into_iter().map(
81 |(vnode, (start_pk, row_count))| {
82 upstream_table
83 .change_log_stream(vnode, epoch, start_pk)
84 .map_ok(move |stream| (vnode, stream, row_count))
85 },
86 ))
87 .await?;
88 Ok(VnodeStream::new(
89 streams,
90 upstream_table.pk_in_output_indices(),
91 create_builder(rate_limit, chunk_size, upstream_table.output_data_types()),
92 ))
93 })
94 }
95
96 pub(super) type NextEpochFuture<'a> = BoxFuture<'a, StreamExecutorResult<u64>>;
97 pub(super) fn next_epoch_future<T: UpstreamTable>(
98 upstream_table: &T,
99 epoch: u64,
100 ) -> NextEpochFuture<'_> {
101 Box::pin(async move { upstream_table.next_epoch(epoch).await })
102 }
103}
104
105use std::collections::{BTreeMap, HashMap};
106use std::mem::take;
107use std::pin::Pin;
108use std::task::{Context, Poll, ready};
109use std::time::Duration;
110
111use risingwave_common::array::StreamChunk;
112use risingwave_common::hash::VirtualNode;
113use risingwave_common::row::OwnedRow;
114use risingwave_common_rate_limit::RateLimit;
115use risingwave_storage::error::ErrorKind as StorageErrorKind;
116use risingwave_storage::hummock::HummockErrorInner;
117use thiserror_ext::AsReport;
118use upstream_table_ext::*;
119
120use crate::executor::backfill::snapshot_backfill::consume_upstream::upstream_table_trait::UpstreamTable;
121use crate::executor::backfill::snapshot_backfill::state::{
122 EpochBackfillProgress, VnodeBackfillProgress,
123};
124use crate::executor::error::ErrorKind as StreamExecutorErrorKind;
125use crate::executor::prelude::{Stream, StreamExt};
126use crate::executor::{StreamExecutorError, StreamExecutorResult};
127
/// States of the [`ConsumeUpstreamStream`] state machine.
///
/// Each `Creating*`/`Resolving*` state owns a boxed future that is polled until it
/// resolves; each `Consuming*` state owns the stream that chunks are pulled from.
enum ConsumeUpstreamStreamState<'a, T: UpstreamTable> {
    /// The per-vnode snapshot streams at `snapshot_epoch` are being opened.
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
        snapshot_epoch: u64,
        /// Vnode -> row count for vnodes that were already recorded as having consumed
        /// the snapshot epoch, so they are not part of the stream being created.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Chunks are being read from the snapshot stream at `snapshot_epoch`.
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
        snapshot_epoch: u64,
        /// Carried over from `CreatingSnapshotStream`; merged with the vnodes the
        /// stream finishes once the stream is exhausted.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// The per-vnode change log streams for `epoch` are being opened.
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
        /// `(epoch, vnode -> row count)` of the epoch finished right before `epoch`,
        /// if any; reported as finished progress by `for_vnode_pk_progress`.
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
        epoch: u64,
        /// Vnode -> row count for vnodes already recorded as having consumed `epoch`.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// Chunks are being read from the change log stream of `epoch`.
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
        epoch: u64,
        /// Carried over from `CreatingChangeLogStream`; merged with the vnodes the
        /// stream finishes once the stream is exhausted.
        pre_finished_vnodes: HashMap<VirtualNode, usize>,
    },
    /// The epoch following the one just finished is being resolved.
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
        /// `(epoch, vnode -> row count)` of the epoch that has just been finished.
        prev_epoch_finished_vnodes: Option<(u64, HashMap<VirtualNode, usize>)>,
    },
    /// Entered by `poll_next` after an error accepted by
    /// `is_retention_or_snapshot_expired_error`; from then on the stream only ever
    /// returns `Poll::Pending`.
    StoppedOnRetentionMiss,
    /// Entered by `poll_next` after any other error has been yielded; every accessor
    /// treats reaching this state as `unreachable!`.
    Err,
}
157
158fn is_retention_or_snapshot_expired_error(error: &StreamExecutorError) -> bool {
159 let StreamExecutorErrorKind::Storage(storage_error) = error.inner() else {
160 return false;
161 };
162 let StorageErrorKind::Hummock(hummock_error) = storage_error.inner() else {
163 return false;
164 };
165
166 matches!(
167 hummock_error.inner(),
168 HummockErrorInner::ChangeLogRetentionMiss { .. }
169 | HummockErrorInner::TimeTravelVersionExpired { .. }
170 | HummockErrorInner::CommittedEpochMismatch { .. }
171 )
172}
173
174fn log_retention_or_snapshot_expired(error: &StreamExecutorError) {
175 tracing::warn!(
176 error = %error.as_report(),
177 "stop consuming cross-database upstream because upstream retention or snapshot availability was exceeded"
178 );
179}
180
/// Stream of [`StreamChunk`]s read from an upstream table: first the snapshot at a
/// given epoch, then the change log of each subsequent epoch as returned by
/// `UpstreamTable::next_epoch`.
pub(super) struct ConsumeUpstreamStream<'a, T: UpstreamTable> {
    /// Table the snapshot and change log streams are opened on.
    upstream_table: &'a T,
    /// Restored per-vnode progress for epochs that have not been reached yet, keyed by
    /// epoch. The smallest entry is popped when the resolved next epoch equals its key.
    pending_epoch_vnode_progress:
        BTreeMap<u64, HashMap<VirtualNode, (EpochBackfillProgress, usize)>>,
    /// Current position in the state machine.
    state: ConsumeUpstreamStreamState<'a, T>,

    /// Forwarded to the chunk builder of every change log stream created later on.
    chunk_size: usize,
    /// Forwarded to the chunk builder of every change log stream created later on.
    rate_limit: RateLimit,
}
190
191impl<T: UpstreamTable> ConsumeUpstreamStream<'_, T> {
192 pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
193 match &mut self.state {
194 ConsumeUpstreamStreamState::ConsumingSnapshotStream { stream, .. } => {
195 stream.consume_builder()
196 }
197 ConsumeUpstreamStreamState::ConsumingChangeLogStream { stream, .. } => {
198 stream.consume_builder()
199 }
200 ConsumeUpstreamStreamState::ResolvingNextEpoch { .. }
201 | ConsumeUpstreamStreamState::CreatingChangeLogStream { .. }
202 | ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => None,
203 ConsumeUpstreamStreamState::StoppedOnRetentionMiss => None,
204 ConsumeUpstreamStreamState::Err => {
205 unreachable!("should not be accessed on Err")
206 }
207 }
208 }
209
210 pub(super) async fn for_vnode_pk_progress(
211 &mut self,
212 mut on_vnode_progress: impl FnMut(VirtualNode, u64, usize, Option<OwnedRow>),
213 ) -> StreamExecutorResult<()> {
214 match &mut self.state {
215 ConsumeUpstreamStreamState::CreatingSnapshotStream { .. } => {
216 }
218 ConsumeUpstreamStreamState::ConsumingSnapshotStream {
219 stream,
220 snapshot_epoch,
221 ..
222 } => {
223 stream
224 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
225 on_vnode_progress(vnode, *snapshot_epoch, row_count, pk_progress)
226 })
227 .await?;
228 }
229 &mut ConsumeUpstreamStreamState::ConsumingChangeLogStream {
230 ref mut stream,
231 ref epoch,
232 ..
233 } => {
234 stream
235 .for_vnode_pk_progress(|vnode, row_count, pk_progress| {
236 on_vnode_progress(vnode, *epoch, row_count, pk_progress)
237 })
238 .await?;
239 }
240 &mut ConsumeUpstreamStreamState::CreatingChangeLogStream {
241 ref prev_epoch_finished_vnodes,
242 ..
243 }
244 | &mut ConsumeUpstreamStreamState::ResolvingNextEpoch {
245 ref prev_epoch_finished_vnodes,
246 ..
247 } => {
248 if let Some((prev_epoch, prev_epoch_finished_vnodes)) = prev_epoch_finished_vnodes {
249 for (vnode, row_count) in prev_epoch_finished_vnodes {
250 on_vnode_progress(*vnode, *prev_epoch, *row_count, None);
251 }
252 }
253 }
254 ConsumeUpstreamStreamState::StoppedOnRetentionMiss => {}
255 ConsumeUpstreamStreamState::Err => {
256 unreachable!("should not be accessed on Err")
257 }
258 }
259 Ok(())
260 }
261}
262
/// Drives the state machine and yields the chunks produced by the snapshot stream and
/// then by the change log stream of each following epoch.
///
/// The stream never yields `None`: every exit from `poll_next` is either a chunk, an
/// error, or `Poll::Pending`.
impl<T: UpstreamTable> Stream for ConsumeUpstreamStream<'_, T> {
    type Item = StreamExecutorResult<StreamChunk>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The loop only leaves through `return` (chunk / pending) or through `?`, so the
        // `try` block can only ever evaluate to an `Err`; hence the `!` success type.
        let result: Result<!, StreamExecutorError> = try {
            loop {
                match &mut self.state {
                    ConsumeUpstreamStreamState::CreatingSnapshotStream {
                        future,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => {
                        // Wait for the snapshot stream, then start consuming it.
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let snapshot_epoch = *snapshot_epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                            stream,
                            snapshot_epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingSnapshotStream {
                        stream,
                        snapshot_epoch,
                        pre_finished_vnodes,
                    } => match ready!(stream.poll_next_unpin(cx)).transpose()? {
                        None => {
                            // Snapshot exhausted: every vnode of the stream plus the
                            // pre-finished ones has finished `snapshot_epoch`; move on to
                            // resolving the epoch that follows it.
                            let prev_epoch = *snapshot_epoch;
                            let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                            for (vnode, row_count) in stream.take_finished_vnodes() {
                                prev_epoch_finished_vnodes
                                    .try_insert(vnode, row_count)
                                    .expect("non-duplicate");
                            }
                            self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                future: next_epoch_future(self.upstream_table, prev_epoch),
                                prev_epoch_finished_vnodes: Some((
                                    prev_epoch,
                                    prev_epoch_finished_vnodes,
                                )),
                            };
                            continue;
                        }
                        Some(chunk) => {
                            return Poll::Ready(Some(Ok(chunk)));
                        }
                    },
                    ConsumeUpstreamStreamState::CreatingChangeLogStream {
                        future,
                        epoch,
                        pre_finished_vnodes,
                        ..
                    } => {
                        // Wait for the change log stream, then start consuming it.
                        // `prev_epoch_finished_vnodes` is dropped with the old state.
                        let stream = ready!(future.as_mut().poll(cx))?;
                        let epoch = *epoch;
                        let pre_finished_vnodes = take(pre_finished_vnodes);
                        self.state = ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                            stream,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::ConsumingChangeLogStream {
                        stream,
                        epoch,
                        pre_finished_vnodes,
                    } => {
                        match ready!(stream.poll_next_unpin(cx)).transpose()? {
                            None => {
                                // Change log of `epoch` exhausted: same hand-over as for
                                // the snapshot stream above.
                                let prev_epoch = *epoch;
                                let mut prev_epoch_finished_vnodes = take(pre_finished_vnodes);
                                for (vnode, row_count) in stream.take_finished_vnodes() {
                                    prev_epoch_finished_vnodes
                                        .try_insert(vnode, row_count)
                                        .expect("non-duplicate");
                                }
                                self.state = ConsumeUpstreamStreamState::ResolvingNextEpoch {
                                    future: next_epoch_future(self.upstream_table, prev_epoch),
                                    prev_epoch_finished_vnodes: Some((
                                        prev_epoch,
                                        prev_epoch_finished_vnodes,
                                    )),
                                };
                                continue;
                            }
                            Some(chunk) => {
                                return Poll::Ready(Some(Ok(chunk)));
                            }
                        };
                    }
                    ConsumeUpstreamStreamState::ResolvingNextEpoch {
                        future,
                        prev_epoch_finished_vnodes,
                    } => {
                        let epoch = ready!(future.as_mut().poll(cx))?;
                        let prev_epoch_finished_vnodes = take(prev_epoch_finished_vnodes);
                        let mut pre_finished_vnodes = HashMap::new();
                        let mut vnode_progresses = HashMap::new();
                        // Vnodes that finished the previous epoch start the new epoch
                        // from scratch: no start pk and a row count of 0.
                        for prev_epoch_vnode in prev_epoch_finished_vnodes
                            .as_ref()
                            .map(|(_, vnodes)| vnodes.keys())
                            .into_iter()
                            .flatten()
                        {
                            vnode_progresses
                                .try_insert(*prev_epoch_vnode, (None, 0))
                                .expect("non-duplicate");
                        }
                        if let Some((pending_epoch, _)) =
                            self.pending_epoch_vnode_progress.first_key_value()
                        {
                            // Restored progress must never refer to an epoch that the
                            // upstream has already skipped past.
                            assert!(
                                epoch <= *pending_epoch,
                                "pending_epoch {} earlier than next epoch {}",
                                pending_epoch,
                                epoch
                            );
                            if epoch == *pending_epoch {
                                // Some vnodes were restored inside this very epoch: resume
                                // the consuming ones from their pk and remember the ones
                                // that had already consumed it.
                                let (_, progress) = self
                                    .pending_epoch_vnode_progress
                                    .pop_first()
                                    .expect("checked Some");
                                for (vnode, (progress, row_count)) in progress {
                                    match progress {
                                        EpochBackfillProgress::Consuming { latest_pk } => {
                                            vnode_progresses
                                                .try_insert(vnode, (Some(latest_pk), row_count))
                                                .expect("non-duplicate");
                                        }
                                        EpochBackfillProgress::Consumed => {
                                            pre_finished_vnodes
                                                .try_insert(vnode, row_count)
                                                .expect("non-duplicate");
                                        }
                                    }
                                }
                            }
                        }
                        self.state = ConsumeUpstreamStreamState::CreatingChangeLogStream {
                            future: create_upstream_table_change_log_stream(
                                self.upstream_table,
                                epoch,
                                self.rate_limit,
                                self.chunk_size,
                                vnode_progresses,
                            ),
                            prev_epoch_finished_vnodes,
                            epoch,
                            pre_finished_vnodes,
                        };
                        continue;
                    }
                    ConsumeUpstreamStreamState::StoppedOnRetentionMiss => {
                        // NOTE(review): no waker is registered on this path, so this
                        // stream will not wake the task again; it stays pending for good.
                        return Poll::Pending;
                    }
                    ConsumeUpstreamStreamState::Err => {
                        unreachable!("should not be accessed on Err")
                    }
                }
            }
        };
        match result {
            // Retention / snapshot-expiry errors are logged and swallowed: the stream
            // parks itself instead of surfacing the error to the caller.
            Err(error) if is_retention_or_snapshot_expired_error(&error) => {
                log_retention_or_snapshot_expired(&error);
                self.state = ConsumeUpstreamStreamState::StoppedOnRetentionMiss;
                Poll::Pending
            }
            // Any other error is yielded once; the stream must not be polled afterwards.
            Err(error) => {
                self.state = ConsumeUpstreamStreamState::Err;
                Poll::Ready(Some(Err(error)))
            }
            Ok(unreachable) => match unreachable {},
        }
    }
}
441
442impl<'a, T: UpstreamTable> ConsumeUpstreamStream<'a, T> {
443 pub(super) fn new<'p>(
444 initial_progress: impl Iterator<Item = (VirtualNode, Option<&'p VnodeBackfillProgress>)>,
445 upstream_table: &'a T,
446 snapshot_epoch: u64,
447 chunk_size: usize,
448 rate_limit: RateLimit,
449 snapshot_rebuild_interval: Duration,
450 ) -> Self {
451 let mut ongoing_snapshot_epoch_vnodes = HashMap::new();
452 let mut finished_snapshot_epoch_vnodes = HashMap::new();
453 let mut pending_epoch_vnode_progress: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
454 for (vnode, progress) in initial_progress {
455 match progress {
456 None => {
457 ongoing_snapshot_epoch_vnodes
458 .try_insert(vnode, (None, 0))
459 .expect("non-duplicate");
460 }
461 Some(progress) => {
462 let epoch = progress.epoch;
463 let row_count = progress.row_count;
464 if epoch == snapshot_epoch {
465 match &progress.progress {
466 EpochBackfillProgress::Consumed => {
467 finished_snapshot_epoch_vnodes
468 .try_insert(vnode, row_count)
469 .expect("non-duplicate");
470 }
471 EpochBackfillProgress::Consuming { latest_pk } => {
472 ongoing_snapshot_epoch_vnodes
473 .try_insert(vnode, (Some(latest_pk.clone()), row_count))
474 .expect("non-duplicate");
475 }
476 }
477 } else {
478 assert!(
479 epoch > snapshot_epoch,
480 "epoch {} earlier than snapshot_epoch {} on vnode {}",
481 epoch,
482 snapshot_epoch,
483 vnode
484 );
485 pending_epoch_vnode_progress
486 .entry(epoch)
487 .or_default()
488 .try_insert(vnode, (progress.progress.clone(), progress.row_count))
489 .expect("non-duplicate");
490 }
491 }
492 };
493 }
494 let (pending_epoch_vnode_progress, state) = {
495 if !ongoing_snapshot_epoch_vnodes.is_empty() {
496 (
498 pending_epoch_vnode_progress,
499 ConsumeUpstreamStreamState::CreatingSnapshotStream {
500 future: create_upstream_table_snapshot_stream(
501 upstream_table,
502 snapshot_epoch,
503 rate_limit,
504 chunk_size,
505 snapshot_rebuild_interval,
506 ongoing_snapshot_epoch_vnodes,
507 ),
508 snapshot_epoch,
509 pre_finished_vnodes: finished_snapshot_epoch_vnodes,
510 },
511 )
512 } else if !finished_snapshot_epoch_vnodes.is_empty() {
513 (
515 pending_epoch_vnode_progress,
516 ConsumeUpstreamStreamState::ResolvingNextEpoch {
517 future: next_epoch_future(upstream_table, snapshot_epoch),
518 prev_epoch_finished_vnodes: Some((
519 snapshot_epoch,
520 finished_snapshot_epoch_vnodes,
521 )),
522 },
523 )
524 } else {
525 let (first_epoch, first_vnodes) = pending_epoch_vnode_progress
527 .pop_first()
528 .expect("non-empty vnodes");
529 let mut ongoing_vnodes = HashMap::new();
530 let mut finished_vnodes = HashMap::new();
531 for (vnode, (progress, row_count)) in first_vnodes {
532 match progress {
533 EpochBackfillProgress::Consuming { latest_pk } => {
534 ongoing_vnodes
535 .try_insert(vnode, (Some(latest_pk), row_count))
536 .expect("non-duplicate");
537 }
538 EpochBackfillProgress::Consumed => {
539 finished_vnodes
540 .try_insert(vnode, row_count)
541 .expect("non-duplicate");
542 }
543 }
544 }
545 let state = if ongoing_vnodes.is_empty() {
546 ConsumeUpstreamStreamState::ResolvingNextEpoch {
548 future: next_epoch_future(upstream_table, first_epoch),
549 prev_epoch_finished_vnodes: Some((first_epoch, finished_vnodes)),
550 }
551 } else {
552 ConsumeUpstreamStreamState::CreatingChangeLogStream {
553 future: create_upstream_table_change_log_stream(
554 upstream_table,
555 first_epoch,
556 rate_limit,
557 chunk_size,
558 ongoing_vnodes,
559 ),
560 prev_epoch_finished_vnodes: None,
561 epoch: first_epoch,
562 pre_finished_vnodes: finished_vnodes,
563 }
564 };
565 (pending_epoch_vnode_progress, state)
566 }
567 };
568 Self {
569 upstream_table,
570 pending_epoch_vnode_progress,
571 state,
572 chunk_size,
573 rate_limit,
574 }
575 }
576}
577
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_storage::hummock::HummockError;

    use super::*;

    /// Lifts a Hummock error through the storage error into a stream executor error.
    fn stream_error_from_hummock(error: HummockError) -> StreamExecutorError {
        let as_storage_error: risingwave_storage::error::StorageError = error.into();
        as_storage_error.into()
    }

    /// Checks which errors are classified as retention / snapshot-expiry errors.
    #[test]
    fn test_change_log_retention_miss_error() {
        let classify = |error: HummockError| {
            is_retention_or_snapshot_expired_error(&stream_error_from_hummock(error))
        };

        // The three Hummock error kinds that stop consumption.
        assert!(classify(HummockError::change_log_retention_miss(
            TableId::new(233),
            10678350547714048,
        )));
        assert!(classify(HummockError::time_travel_version_expired(
            TableId::new(233),
            10678350547714048,
        )));
        assert!(classify(HummockError::committed_epoch_mismatch(
            TableId::new(233),
            10682740646084608,
            10678350547714048,
        )));

        // Other Hummock errors and non-storage errors must not be classified as such.
        assert!(!classify(HummockError::next_epoch(
            "table 233 has been dropped"
        )));
        let other_error = StreamExecutorError::from(anyhow::anyhow!("other error"));
        assert!(!is_retention_or_snapshot_expired_error(&other_error));
    }
}