use std::collections::BTreeMap;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field};
use risingwave_common::row::RowDeserializer;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{OrderType, cmp_datum};
use risingwave_connector::parser::{
    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use risingwave_pb::common::ThrottleType;
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::cdc::cdc_backfill::{
    build_reader_and_poll_upstream, transform_upstream,
};
use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::cdc_progress::CdcProgressReporter;
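
/// Backfills a CDC table snapshot in parallel. The snapshot is partitioned
/// into key-range splits on a single primary-key column; each actor backfills
/// its assigned splits in order, buffering upstream changelog events in the
/// meantime, and forwards only the events whose split key falls inside the
/// key range it has already backfilled.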
pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled.
    external_table: ExternalStorageTable,

    /// Upstream changelog stream read from the CDC source.
    upstream: Executor,

    /// Indices of the columns to forward to the downstream.
    output_indices: Vec<usize>,

    /// Descriptors of the output columns, including additional columns if any.
    output_columns: Vec<ColumnDesc>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,

    state_table: StateTable<S>,

    properties: BTreeMap<String, String>,

    progress: Option<CdcProgressReporter>,
}

impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        _metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
        properties: BTreeMap<String, String>,
        progress: Option<CdcProgressReporter>,
    ) -> Self {
        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            rate_limit_rps,
            options,
            state_table,
            properties,
            progress,
        }
    }

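    /// The main loop of the executor: consume the first barrier, initialize
    /// the backfill state, then for each split assignment backfill the
    /// assigned splits one by one; once all of them are finished, forward
    /// upstream messages directly to the downstream.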
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        assert!(!self.options.disable_backfill);
        let pk_indices = self.external_table.pk_indices().to_vec();
        let table_id = self.external_table.table_id();
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();
        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();
        assert!(
            (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
            "split pk column index {} out of bound",
            self.options.backfill_split_pk_column_index
        );
        let snapshot_split_column_index =
            pk_indices[self.options.backfill_split_pk_column_index as usize];
        let cdc_table_snapshot_split_column =
            vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];

        let mut upstream = self.upstream.execute();
        let first_barrier = expect_first_barrier(&mut upstream).await?;

        // Debezium's `time.precision.mode = connect` emits temporal values in
        // millisecond precision; the three temporal handling modes share the
        // same property lookup.
        let is_connect_time_precision = self
            .properties
            .get("debezium.time.precision.mode")
            .is_some_and(|v| v == "connect");
        let timestamp_handling: Option<TimestampHandling> =
            is_connect_time_precision.then_some(TimestampHandling::Milli);
        let timestamptz_handling: Option<TimestamptzHandling> =
            is_connect_time_precision.then_some(TimestamptzHandling::Milli);
        let time_handling: Option<TimeHandling> =
            is_connect_time_precision.then_some(TimeHandling::Milli);
        let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
            .properties
            .get("debezium.bigint.unsigned.handling.mode")
            .is_some_and(|v| v == "precise")
            .then_some(BigintUnsignedHandlingMode::Precise);
        let handle_toast_columns: bool =
            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
        let mut upstream = transform_upstream(
            upstream,
            self.output_columns.clone(),
            timestamp_handling,
            timestamptz_handling,
            time_handling,
            bigint_unsigned_handling,
            handle_toast_columns,
        )
        .boxed();
        let mut next_reset_barrier = Some(first_barrier);
        let mut is_reset = false;
        let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
        let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

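        // Each iteration handles one assignment of snapshot splits to this
        // actor. A barrier whose mutation carries a new assignment for this
        // actor resets the executor and restarts the loop.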
        'with_cdc_table_snapshot_splits: loop {
            assert!(upstream_chunk_buffer.is_empty());
            let reset_barrier = next_reset_barrier.take().unwrap();
            let all_snapshot_splits = match reset_barrier.mutation.as_deref() {
                Some(Mutation::Add(add)) => &add.actor_cdc_table_snapshot_splits.splits,
                Some(Mutation::Update(update)) => &update.actor_cdc_table_snapshot_splits.splits,
                _ => {
                    return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
                }
            };
            // Deserialize the raw split bounds assigned to this actor, if any.
            let mut actor_snapshot_splits = vec![];
            let mut generation = None;
            if let Some((splits, snapshot_generation)) = all_snapshot_splits.get(&self.actor_ctx.id)
            {
                actor_snapshot_splits = splits
                    .iter()
                    .map(|s: &CdcTableSnapshotSplitRaw| {
                        let de = RowDeserializer::new(
                            cdc_table_snapshot_split_column
                                .iter()
                                .map(Field::data_type)
                                .collect_vec(),
                        );
                        let left_bound_inclusive =
                            de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
                        let right_bound_exclusive =
                            de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
                        CdcTableSnapshotSplit {
                            split_id: s.split_id,
                            left_bound_inclusive,
                            right_bound_exclusive,
                        }
                    })
                    .collect();
                generation = Some(*snapshot_generation);
            }
            tracing::debug!(?actor_snapshot_splits, ?generation, "actor splits");
            assert_consecutive_splits(&actor_snapshot_splits);

            let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
            let barrier_epoch = reset_barrier.epoch;
            yield Message::Barrier(reset_barrier);
            if !is_reset {
                state_impl.init_epoch(barrier_epoch).await?;
                is_reset = true;
                tracing::info!(%table_id, "Initialized executor.");
            } else {
                tracing::info!(%table_id, "Reset executor.");
            }

            // Restore per-split state: find the first unfinished split and
            // recover the CDC offset range covered by the finished ones.
            let mut current_actor_bounds = None;
            let mut actor_cdc_offset_high: Option<CdcOffset> = None;
            let mut actor_cdc_offset_low: Option<CdcOffset> = None;
            let mut next_split_idx = actor_snapshot_splits.len();
            for (idx, split) in actor_snapshot_splits.iter().enumerate() {
                let state = state_impl.restore_state(split.split_id).await?;
                if !state.is_finished {
                    next_split_idx = idx;
                    break;
                }
                extends_current_actor_bound(&mut current_actor_bounds, split);
                if let Some(ref cdc_offset) = state.cdc_offset_low {
                    if let Some(ref cur) = actor_cdc_offset_low {
                        if *cur > *cdc_offset {
                            actor_cdc_offset_low = state.cdc_offset_low.clone();
                        }
                    } else {
                        actor_cdc_offset_low = state.cdc_offset_low.clone();
                    }
                }
                if let Some(ref cdc_offset) = state.cdc_offset_high {
                    if let Some(ref cur) = actor_cdc_offset_high {
                        if *cur < *cdc_offset {
                            actor_cdc_offset_high = state.cdc_offset_high.clone();
                        }
                    } else {
                        actor_cdc_offset_high = state.cdc_offset_high.clone();
                    }
                }
            }
            // Persist an initial (unfinished) state for the splits that still
            // need to be backfilled.
            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
                state_impl
                    .mutate_state(split.split_id, false, 0, None, None)
                    .await?;
            }
            let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
                Some((
                    actor_snapshot_splits[0].split_id,
                    actor_snapshot_splits[next_split_idx - 1].split_id,
                ))
            } else {
                None
            };

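            // Create the external table reader with infinite retry. Upstream
            // messages are polled while the reader is being created, so that
            // barriers keep flowing.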
            let mut table_reader: Option<ExternalTableReaderImpl> = None;
            let external_table = self.external_table.clone();
            let mut future = Box::pin(async move {
                let backoff = get_infinite_backoff_strategy();
                tokio_retry::Retry::spawn(backoff, || async {
                    match external_table.create_table_reader().await {
                        Ok(reader) => Ok(reader),
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                            Err(e)
                        }
                    }
                })
                .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
                .await
                .expect("retry creating cdc table reader until success")
            });
            loop {
                if let Some(msg) =
                    build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                        .await?
                {
                    if let Some(msg) = mapping_message(msg, &self.output_indices) {
                        match msg {
                            Message::Barrier(barrier) => {
                                state_impl.commit_state(barrier.epoch).await?;
                                if is_reset_barrier(&barrier, self.actor_ctx.id) {
                                    next_reset_barrier = Some(barrier);
                                    continue 'with_cdc_table_snapshot_splits;
                                }
                                yield Message::Barrier(barrier);
                            }
                            Message::Chunk(chunk) => {
                                if chunk.cardinality() == 0 {
                                    continue;
                                }
                                if let Some(filtered_chunk) = filter_stream_chunk(
                                    chunk,
                                    &current_actor_bounds,
                                    snapshot_split_column_index,
                                ) && filtered_chunk.cardinality() > 0
                                {
                                    yield Message::Chunk(filtered_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Watermarks are ignored during backfill.
                            }
                        }
                    }
                } else {
                    assert!(table_reader.is_some(), "table reader must be created");
                    tracing::info!(
                        %table_id,
                        upstream_table_name,
                        "table reader created successfully"
                    );
                    break;
                }
            }
            let upstream_table_reader = UpstreamTableReader::new(
                self.external_table.clone(),
                table_reader.expect("table reader must be created"),
            );
            let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();

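            // Backfill the remaining unfinished splits one by one. For each
            // split, merge the upstream changelog with the bounded snapshot
            // read, polling the upstream side first.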
            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
                tracing::info!(
                    %table_id,
                    upstream_table_name,
                    ?split,
                    is_snapshot_paused,
                    "start cdc backfill split"
                );
                extends_current_actor_bound(&mut current_actor_bounds, split);

                let split_cdc_offset_low = {
                    // Limit the number of concurrent upstream connections used
                    // to fetch the current CDC offset.
                    static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                        tokio::sync::Semaphore::const_new(10);

                    let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
                    upstream_table_reader.current_cdc_offset().await?
                };
                if let Some(ref cdc_offset) = split_cdc_offset_low {
                    if let Some(ref cur) = actor_cdc_offset_low {
                        if *cur > *cdc_offset {
                            actor_cdc_offset_low = split_cdc_offset_low.clone();
                        }
                    } else {
                        actor_cdc_offset_low = split_cdc_offset_low.clone();
                    }
                }
                let mut split_cdc_offset_high = None;

                let left_upstream = upstream.by_ref().map(Either::Left);
                let read_args = SplitSnapshotReadArgs::new(
                    split.left_bound_inclusive.clone(),
                    split.right_bound_exclusive.clone(),
                    cdc_table_snapshot_split_column.clone(),
                    self.rate_limit_rps,
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_table_split(read_args)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });
                let mut row_count: u64 = 0;
                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Messages from the upstream changelog.
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    state_impl.commit_state(barrier.epoch).await?;
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                if let Some(entry) =
                                                    some.get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type()
                                                        == ThrottleType::Backfill
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    // The new rate limit takes effect from the
                                                    // next split; the current snapshot stream
                                                    // is not rebuilt.
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    tracing::info!(
                                                        %table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    for chunk in upstream_chunk_buffer.drain(..) {
                                                        yield Message::Chunk(chunk);
                                                    }
                                                    yield Message::Barrier(barrier);
                                                    // The actor has been dropped; park forever so
                                                    // no more messages are produced.
                                                    let () = futures::future::pending().await;
                                                    unreachable!();
                                                }
                                            }
                                            _ => (),
                                        }
                                    }
                                    if is_reset_barrier(&barrier, self.actor_ctx.id) {
                                        next_reset_barrier = Some(barrier);
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            yield Message::Chunk(chunk);
                                        }
                                        continue 'with_cdc_table_snapshot_splits;
                                    }
                                    if let Some(split_range) =
                                        should_report_actor_backfill_progress.take()
                                        && let Some(ref progress) = self.progress
                                    {
                                        progress.update(
                                            self.actor_ctx.fragment_id,
                                            self.actor_ctx.id,
                                            barrier.epoch,
                                            generation.expect("should have set generation when having progress to report"),
                                            split_range,
                                        );
                                    }
                                    yield Message::Barrier(barrier);
                                }
                                Message::Chunk(chunk) => {
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    let chunk = mapping_chunk(chunk, &self.output_indices);
                                    // Buffer upstream changes that fall inside the
                                    // already-backfilled bounds; they are flushed when the
                                    // current snapshot split finishes.
                                    if let Some(filtered_chunk) = filter_stream_chunk(
                                        chunk,
                                        &current_actor_bounds,
                                        snapshot_split_column_index,
                                    ) && filtered_chunk.cardinality() > 0
                                    {
                                        upstream_chunk_buffer.push(filtered_chunk.compact_vis());
                                    }
                                }
                                Message::Watermark(_) => {
                                    // Watermarks are ignored during backfill.
                                }
                            }
                        }
                        // Chunks (or the end) of the snapshot read stream.
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        %table_id,
                                        split_id = split.split_id,
                                        "snapshot read stream ends"
                                    );
                                    // The snapshot read of this split is done: flush the
                                    // buffered upstream changes before moving on.
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(chunk);
                                    }

                                    split_cdc_offset_high = {
                                        // Limit the number of concurrent upstream connections
                                        // used to fetch the current CDC offset.
                                        static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                                            tokio::sync::Semaphore::const_new(10);

                                        let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
                                        upstream_table_reader.current_cdc_offset().await?
                                    };
                                    if let Some(ref cdc_offset) = split_cdc_offset_high {
                                        if let Some(ref cur) = actor_cdc_offset_high {
                                            if *cur < *cdc_offset {
                                                actor_cdc_offset_high =
                                                    split_cdc_offset_high.clone();
                                            }
                                        } else {
                                            actor_cdc_offset_high = split_cdc_offset_high.clone();
                                        }
                                    }
                                    break;
                                }
                                Some(chunk) => {
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    row_count = row_count.saturating_add(chunk_cardinality);
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }
                // Mark the split as finished and record its row count and CDC
                // offset range.
                state_impl
                    .mutate_state(
                        split.split_id,
                        true,
                        row_count,
                        split_cdc_offset_low,
                        split_cdc_offset_high,
                    )
                    .await?;
                if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
                    assert!(
                        *right_split < split.split_id,
                        "{} {}",
                        *right_split,
                        split.split_id
                    );
                    *right_split = split.split_id;
                } else {
                    should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
                }
            }

            upstream_table_reader.disconnect().await?;
            tracing::info!(
                %table_id,
                upstream_table_name,
                "CdcBackfill has already finished and will forward messages directly to the downstream"
            );

            let mut should_report_actor_backfill_done = false;
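            // Forward phase: all assigned splits are backfilled. Keep
            // consuming the upstream, and report the backfill as done once a
            // chunk's CDC offset reaches the recorded high watermark.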
            #[for_await]
            for msg in &mut upstream {
                let msg = msg?;
                match msg {
                    Message::Barrier(barrier) => {
                        state_impl.commit_state(barrier.epoch).await?;
                        if is_reset_barrier(&barrier, self.actor_ctx.id) {
                            next_reset_barrier = Some(barrier);
                            continue 'with_cdc_table_snapshot_splits;
                        }
                        if let Some(split_range) = should_report_actor_backfill_progress.take()
                            && let Some(ref progress) = self.progress
                        {
                            progress.update(
                                self.actor_ctx.fragment_id,
                                self.actor_ctx.id,
                                barrier.epoch,
                                generation.expect(
                                    "should have set generation when having progress to report",
                                ),
                                split_range,
                            );
                        }
                        if should_report_actor_backfill_done {
                            should_report_actor_backfill_done = false;
                            assert!(!actor_snapshot_splits.is_empty());
                            if let Some(ref progress) = self.progress {
                                progress.finish(
                                    self.actor_ctx.fragment_id,
                                    self.actor_ctx.id,
                                    barrier.epoch,
                                    generation.expect(
                                        "should have set generation when having progress to report",
                                    ),
                                    (
                                        actor_snapshot_splits[0].split_id,
                                        actor_snapshot_splits[actor_snapshot_splits.len() - 1]
                                            .split_id,
                                    ),
                                );
                            }
                        }
                        yield Message::Barrier(barrier);
                    }
                    Message::Chunk(chunk) => {
                        if actor_snapshot_splits.is_empty() {
                            continue;
                        }
                        if chunk.cardinality() == 0 {
                            continue;
                        }

                        let chunk_cdc_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
                        // The backfill is done once the upstream has caught up with the
                        // highest CDC offset observed during snapshot reads. Legacy-format
                        // state is reported as done on the first chunk.
                        if let Some(high) = actor_cdc_offset_high.as_ref() {
                            if state_impl.is_legacy_state() {
                                actor_cdc_offset_high = None;
                                should_report_actor_backfill_done = true;
                            } else if let Some(ref chunk_offset) = chunk_cdc_offset
                                && *chunk_offset >= *high
                            {
                                actor_cdc_offset_high = None;
                                should_report_actor_backfill_done = true;
                            }
                        }
                        let chunk = mapping_chunk(chunk, &self.output_indices);
                        if let Some(filtered_chunk) = filter_stream_chunk(
                            chunk,
                            &current_actor_bounds,
                            snapshot_split_column_index,
                        ) && filtered_chunk.cardinality() > 0
                        {
                            yield Message::Chunk(filtered_chunk);
                        }
                    }
                    msg @ Message::Watermark(_) => {
                        if let Some(msg) = mapping_message(msg, &self.output_indices) {
                            yield msg;
                        }
                    }
                }
            }
        }
    }
}

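/// Keeps only the rows of `chunk` whose split-key column lies inside `bound`,
/// a `[left, right)` range where an all-NULL bound row denotes an unbounded
/// side. Returns `None` when no bound has been established yet, i.e. nothing
/// has been backfilled, so all rows must be dropped.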
fn filter_stream_chunk(
    chunk: StreamChunk,
    bound: &Option<(OwnedRow, OwnedRow)>,
    snapshot_split_column_index: usize,
) -> Option<StreamChunk> {
    let Some((left, right)) = bound else {
        return None;
    };
    assert_eq!(left.len(), 1, "multiple split columns are not supported yet");
    assert_eq!(
        right.len(),
        1,
        "multiple split columns are not supported yet"
    );
    let left_split_key = left.datum_at(0);
    let right_split_key = right.datum_at(0);
    let is_leftmost_bound = is_leftmost_bound(left);
    let is_rightmost_bound = is_rightmost_bound(right);
    if is_leftmost_bound && is_rightmost_bound {
        return Some(chunk);
    }
    let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
    let (ops, columns, visibility) = chunk.into_inner();
    for (row_split_key, v) in columns[snapshot_split_column_index]
        .iter()
        .zip_eq_fast(visibility.iter())
    {
        if !v {
            new_bitmap.append(false);
            continue;
        }
        let mut is_in_range = true;
        if !is_leftmost_bound {
            is_in_range = cmp_datum(
                row_split_key,
                left_split_key,
                OrderType::ascending_nulls_first(),
            )
            .is_ge();
        }
        if is_in_range && !is_rightmost_bound {
            is_in_range = cmp_datum(
                row_split_key,
                right_split_key,
                OrderType::ascending_nulls_first(),
            )
            .is_lt();
        }
        if !is_in_range {
            tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
        }
        new_bitmap.append(is_in_range);
    }
    Some(StreamChunk::with_visibility(
        ops,
        columns,
        new_bitmap.finish(),
    ))
}

/// Returns `true` if the bound row is the leftmost extreme, encoded as all
/// NULLs, i.e. the range is unbounded on the left.
fn is_leftmost_bound(row: &OwnedRow) -> bool {
    row.iter().all(|d| d.is_none())
}

/// Returns `true` if the bound row is the rightmost extreme, encoded as all
/// NULLs, i.e. the range is unbounded on the right.
fn is_rightmost_bound(row: &OwnedRow) -> bool {
    row.iter().all(|d| d.is_none())
}

impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

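/// Extends the key range covered by this actor to include `split`. Splits are
/// backfilled in ascending order, so only the right bound needs to advance.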
fn extends_current_actor_bound(
    current: &mut Option<(OwnedRow, OwnedRow)>,
    split: &CdcTableSnapshotSplit,
) {
    if current.is_none() {
        *current = Some((
            split.left_bound_inclusive.clone(),
            split.right_bound_exclusive.clone(),
        ));
    } else {
        current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
    }
}

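/// Returns whether the barrier carries a new snapshot split assignment for
/// this actor, which requires the executor to reset.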
fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
    match barrier.mutation.as_deref() {
        Some(Mutation::Update(update)) => update
            .actor_cdc_table_snapshot_splits
            .splits
            .contains_key(&actor_id),
        _ => false,
    }
}

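/// Asserts that the actor's splits have consecutive split ids and strictly
/// increasing right bounds.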
fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
    for i in 1..actor_snapshot_splits.len() {
        assert_eq!(
            actor_snapshot_splits[i].split_id,
            actor_snapshot_splits[i - 1].split_id + 1,
            "{:?}",
            actor_snapshot_splits
        );
        assert!(
            cmp_datum(
                actor_snapshot_splits[i - 1]
                    .right_bound_exclusive
                    .datum_at(0),
                actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
                OrderType::ascending_nulls_last(),
            )
            .is_lt()
        );
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::filter_stream_chunk;

    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );
        // No bound established yet: every row is dropped.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert!(c.is_none());

        // Unbounded on both sides: the chunk passes through unchanged.
        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        // Right bound only: keep rows whose first column is < 3.
        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I
                + 1 6
                - 2 .",
            )
        );

        // Left bound only: keep rows whose first column is >= 3.
        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I
                U- 3 7
                U+ 4 .",
            )
        );

        // Both bounds: keep rows with 2 <= first column < 4.
        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I
                - 2 .
                U- 3 7",
            )
        );

        // The same checks on the second column; NULLs sort first, so they
        // stay below any non-NULL left bound.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I
                + 1 6
                - 2 .
                U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk, &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I
                U- 3 7",
            )
        );
    }
}
918}