1use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{
27 BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
28};
29use risingwave_connector::source::cdc::CdcScanOptions;
30use risingwave_connector::source::cdc::external::{
31 CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
32};
33use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
34use rw_futures_util::pausable;
35use thiserror_ext::AsReport;
36use tracing::Instrument;
37
38use crate::executor::UpdateMutation;
39use crate::executor::backfill::cdc::cdc_backfill::{
40 build_reader_and_poll_upstream, transform_upstream,
41};
42use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
43use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
44use crate::executor::backfill::cdc::upstream_table::snapshot::{
45 SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
46};
47use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
48use crate::executor::prelude::*;
49use crate::executor::source::get_infinite_backoff_strategy;
50use crate::task::cdc_progress::CdcProgressReporter;
/// Executor that backfills an external CDC table split by split while
/// consuming the upstream changelog stream.
///
/// The snapshot splits assigned to this actor are delivered through barrier
/// mutations and processed by `execute_inner`.
pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
    /// Context of the owning actor; its `id` and `fragment_id` are used to
    /// look up assigned splits and to report progress.
    actor_ctx: ActorContextRef,

    /// The external table being backfilled; also used to create the table
    /// reader.
    external_table: ExternalStorageTable,

    /// Upstream executor whose messages are transformed and merged with the
    /// snapshot read.
    upstream: Executor,

    /// Column indices applied to both upstream and snapshot chunks (via
    /// `mapping_chunk` / `mapping_message`) before they are emitted.
    output_indices: Vec<usize>,

    /// Column descriptors handed to `transform_upstream`; the ones carrying an
    /// additional-column type are also forwarded to the snapshot read.
    output_columns: Vec<ColumnDesc>,

    /// Rate limit passed to each split's snapshot read; may be replaced by a
    /// `Throttle` mutation.
    rate_limit_rps: Option<u32>,

    /// Scan options; `disable_backfill` must be false and
    /// `backfill_split_pk_column_index` selects the split column.
    options: CdcScanOptions,

    /// State table wrapped by `ParallelizedCdcBackfillState` to persist
    /// per-split progress.
    state_table: StateTable<S>,

    /// Connector properties; the `debezium.*` handling-mode keys are read from
    /// here.
    properties: BTreeMap<String, String>,

    /// Optional reporter used to publish per-actor backfill progress.
    progress: Option<CdcProgressReporter>,
}
77
78impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
79 #[allow(clippy::too_many_arguments)]
80 pub fn new(
81 actor_ctx: ActorContextRef,
82 external_table: ExternalStorageTable,
83 upstream: Executor,
84 output_indices: Vec<usize>,
85 output_columns: Vec<ColumnDesc>,
86 _metrics: Arc<StreamingMetrics>,
87 state_table: StateTable<S>,
88 rate_limit_rps: Option<u32>,
89 options: CdcScanOptions,
90 properties: BTreeMap<String, String>,
91 progress: Option<CdcProgressReporter>,
92 ) -> Self {
93 Self {
94 actor_ctx,
95 external_table,
96 upstream,
97 output_indices,
98 output_columns,
99 rate_limit_rps,
100 options,
101 state_table,
102 properties,
103 progress,
104 }
105 }
106
107 #[try_stream(ok = Message, error = StreamExecutorError)]
108 async fn execute_inner(mut self) {
109 assert!(!self.options.disable_backfill);
110 let pk_indices = self.external_table.pk_indices().to_vec();
112 let table_id = self.external_table.table_id().table_id;
113 let upstream_table_name = self.external_table.qualified_table_name();
114 let schema_table_name = self.external_table.schema_table_name().clone();
115 let external_database_name = self.external_table.database_name().to_owned();
116 let additional_columns = self
117 .output_columns
118 .iter()
119 .filter(|col| col.additional_column.column_type.is_some())
120 .cloned()
121 .collect_vec();
122 assert!(
123 (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
124 "split pk column index {} out of bound",
125 self.options.backfill_split_pk_column_index
126 );
127 let snapshot_split_column_index =
128 pk_indices[self.options.backfill_split_pk_column_index as usize];
129 let cdc_table_snapshot_split_column =
130 vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
131
132 let mut upstream = self.upstream.execute();
133 let first_barrier = expect_first_barrier(&mut upstream).await?;
135 let timestamp_handling: Option<TimestampHandling> = self
141 .properties
142 .get("debezium.time.precision.mode")
143 .map(|v| v == "connect")
144 .unwrap_or(false)
145 .then_some(TimestampHandling::Milli);
146 let timestamptz_handling: Option<TimestamptzHandling> = self
147 .properties
148 .get("debezium.time.precision.mode")
149 .map(|v| v == "connect")
150 .unwrap_or(false)
151 .then_some(TimestamptzHandling::Milli);
152 let time_handling: Option<TimeHandling> = self
153 .properties
154 .get("debezium.time.precision.mode")
155 .map(|v| v == "connect")
156 .unwrap_or(false)
157 .then_some(TimeHandling::Milli);
158 let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
159 .properties
160 .get("debezium.bigint.unsigned.handling.mode")
161 .map(|v| v == "precise")
162 .unwrap_or(false)
163 .then_some(BigintUnsignedHandlingMode::Precise);
164 let handle_toast_columns: bool =
166 self.external_table.table_type() == &ExternalCdcTableType::Postgres;
167 let mut upstream = transform_upstream(
168 upstream,
169 self.output_columns.clone(),
170 timestamp_handling,
171 timestamptz_handling,
172 time_handling,
173 bigint_unsigned_handling,
174 handle_toast_columns,
175 )
176 .boxed();
177 let mut next_reset_barrier = Some(first_barrier);
178 let mut is_reset = false;
179 let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
180 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
182
183 'with_cdc_table_snapshot_splits: loop {
185 assert!(upstream_chunk_buffer.is_empty());
186 let reset_barrier = next_reset_barrier.take().unwrap();
187 let (all_snapshot_splits, generation) = match reset_barrier.mutation.as_deref() {
188 Some(Mutation::Add(add)) => (
189 &add.actor_cdc_table_snapshot_splits.splits,
190 add.actor_cdc_table_snapshot_splits.generation,
191 ),
192 Some(Mutation::Update(update)) => (
193 &update.actor_cdc_table_snapshot_splits.splits,
194 update.actor_cdc_table_snapshot_splits.generation,
195 ),
196 _ => {
197 return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
198 }
199 };
200 let mut actor_snapshot_splits = vec![];
201 if let Some(splits) = all_snapshot_splits.get(&self.actor_ctx.id) {
203 actor_snapshot_splits = splits
204 .iter()
205 .map(|s: &CdcTableSnapshotSplitRaw| {
206 let de = RowDeserializer::new(
207 cdc_table_snapshot_split_column
208 .iter()
209 .map(Field::data_type)
210 .collect_vec(),
211 );
212 let left_bound_inclusive =
213 de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
214 let right_bound_exclusive =
215 de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
216 CdcTableSnapshotSplit {
217 split_id: s.split_id,
218 left_bound_inclusive,
219 right_bound_exclusive,
220 }
221 })
222 .collect();
223 }
224 tracing::debug!(?actor_snapshot_splits, "actor splits");
225 assert_consecutive_splits(&actor_snapshot_splits);
226
227 let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
228 let barrier_epoch = reset_barrier.epoch;
229 yield Message::Barrier(reset_barrier);
230 if !is_reset {
231 state_impl.init_epoch(barrier_epoch).await?;
232 is_reset = true;
233 tracing::info!(table_id, "Initialize executor.");
234 } else {
235 tracing::info!(table_id, "Reset executor.");
236 }
237
238 let mut current_actor_bounds = None;
239 let mut actor_cdc_offset_high: Option<CdcOffset> = None;
240 let mut actor_cdc_offset_low: Option<CdcOffset> = None;
241 let mut next_split_idx = actor_snapshot_splits.len();
243 for (idx, split) in actor_snapshot_splits.iter().enumerate() {
244 let state = state_impl.restore_state(split.split_id).await?;
245 if !state.is_finished {
246 next_split_idx = idx;
247 break;
248 }
249 extends_current_actor_bound(&mut current_actor_bounds, split);
250 if let Some(ref cdc_offset) = state.cdc_offset_low {
251 if let Some(ref cur) = actor_cdc_offset_low {
252 if *cur > *cdc_offset {
253 actor_cdc_offset_low = state.cdc_offset_low.clone();
254 }
255 } else {
256 actor_cdc_offset_low = state.cdc_offset_low.clone();
257 }
258 }
259 if let Some(ref cdc_offset) = state.cdc_offset_high {
260 if let Some(ref cur) = actor_cdc_offset_high {
261 if *cur < *cdc_offset {
262 actor_cdc_offset_high = state.cdc_offset_high.clone();
263 }
264 } else {
265 actor_cdc_offset_high = state.cdc_offset_high.clone();
266 }
267 }
268 }
269 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
270 state_impl
272 .mutate_state(split.split_id, false, 0, None, None)
273 .await?;
274 }
275 let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
276 Some((
277 actor_snapshot_splits[0].split_id,
278 actor_snapshot_splits[next_split_idx - 1].split_id,
279 ))
280 } else {
281 None
282 };
283
284 let mut table_reader: Option<ExternalTableReaderImpl> = None;
287 let external_table = self.external_table.clone();
288 let mut future = Box::pin(async move {
289 let backoff = get_infinite_backoff_strategy();
290 tokio_retry::Retry::spawn(backoff, || async {
291 match external_table.create_table_reader().await {
292 Ok(reader) => Ok(reader),
293 Err(e) => {
294 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
295 Err(e)
296 }
297 }
298 })
299 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
300 .await
301 .expect("Retry create cdc table reader until success.")
302 });
303 loop {
304 if let Some(msg) =
305 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
306 .await?
307 {
308 if let Some(msg) = mapping_message(msg, &self.output_indices) {
309 match msg {
310 Message::Barrier(barrier) => {
311 state_impl.commit_state(barrier.epoch).await?;
312 if is_reset_barrier(&barrier, self.actor_ctx.id) {
313 next_reset_barrier = Some(barrier);
314 continue 'with_cdc_table_snapshot_splits;
315 }
316 yield Message::Barrier(barrier);
317 }
318 Message::Chunk(chunk) => {
319 if chunk.cardinality() == 0 {
320 continue;
321 }
322 if let Some(filtered_chunk) = filter_stream_chunk(
323 chunk,
324 ¤t_actor_bounds,
325 snapshot_split_column_index,
326 ) && filtered_chunk.cardinality() > 0
327 {
328 yield Message::Chunk(filtered_chunk);
329 }
330 }
331 Message::Watermark(_) => {
332 }
334 }
335 }
336 } else {
337 assert!(table_reader.is_some(), "table reader must created");
338 tracing::info!(
339 table_id,
340 upstream_table_name,
341 "table reader created successfully"
342 );
343 break;
344 }
345 }
346 let upstream_table_reader = UpstreamTableReader::new(
347 self.external_table.clone(),
348 table_reader.expect("table reader must created"),
349 );
350 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
352
353 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
355 tracing::info!(
356 table_id,
357 upstream_table_name,
358 ?split,
359 is_snapshot_paused,
360 "start cdc backfill split"
361 );
362 extends_current_actor_bound(&mut current_actor_bounds, split);
363
364 let split_cdc_offset_low = {
365 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
367 tokio::sync::Semaphore::const_new(10);
368
369 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
370 upstream_table_reader.current_cdc_offset().await?
371 };
372 if let Some(ref cdc_offset) = split_cdc_offset_low {
373 if let Some(ref cur) = actor_cdc_offset_low {
374 if *cur > *cdc_offset {
375 actor_cdc_offset_low = split_cdc_offset_low.clone();
376 }
377 } else {
378 actor_cdc_offset_low = split_cdc_offset_low.clone();
379 }
380 }
381 let mut split_cdc_offset_high = None;
382
383 let left_upstream = upstream.by_ref().map(Either::Left);
384 let read_args = SplitSnapshotReadArgs::new(
385 split.left_bound_inclusive.clone(),
386 split.right_bound_exclusive.clone(),
387 cdc_table_snapshot_split_column.clone(),
388 self.rate_limit_rps,
389 additional_columns.clone(),
390 schema_table_name.clone(),
391 external_database_name.clone(),
392 );
393 let right_snapshot = pin!(
394 upstream_table_reader
395 .snapshot_read_table_split(read_args)
396 .map(Either::Right)
397 );
398 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
399 if is_snapshot_paused {
400 snapshot_valve.pause();
401 }
402 let mut backfill_stream =
403 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
404 stream::PollNext::Left
405 });
406 let mut row_count: u64 = 0;
407 #[for_await]
408 for either in &mut backfill_stream {
409 match either {
410 Either::Left(msg) => {
412 match msg? {
413 Message::Barrier(barrier) => {
414 state_impl.commit_state(barrier.epoch).await?;
415 if let Some(mutation) = barrier.mutation.as_deref() {
416 use crate::executor::Mutation;
417 match mutation {
418 Mutation::Pause => {
419 is_snapshot_paused = true;
420 snapshot_valve.pause();
421 }
422 Mutation::Resume => {
423 is_snapshot_paused = false;
424 snapshot_valve.resume();
425 }
426 Mutation::Throttle(some) => {
427 if let Some(new_rate_limit) =
431 some.get(&self.actor_ctx.id)
432 && *new_rate_limit != self.rate_limit_rps
433 {
434 self.rate_limit_rps = *new_rate_limit;
436 }
437 }
438 Mutation::Update(UpdateMutation {
439 dropped_actors,
440 ..
441 }) => {
442 if dropped_actors.contains(&self.actor_ctx.id) {
443 tracing::info!(
444 table_id,
445 upstream_table_name,
446 "CdcBackfill has been dropped due to config change"
447 );
448 for chunk in upstream_chunk_buffer.drain(..) {
449 yield Message::Chunk(chunk);
450 }
451 yield Message::Barrier(barrier);
452 let () = futures::future::pending().await;
453 unreachable!();
454 }
455 }
456 _ => (),
457 }
458 }
459 if is_reset_barrier(&barrier, self.actor_ctx.id) {
460 next_reset_barrier = Some(barrier);
461 for chunk in upstream_chunk_buffer.drain(..) {
462 yield Message::Chunk(chunk);
463 }
464 continue 'with_cdc_table_snapshot_splits;
465 }
466 if let Some(split_range) =
467 should_report_actor_backfill_progress.take()
468 && let Some(ref progress) = self.progress
469 {
470 progress.update(
471 self.actor_ctx.fragment_id,
472 self.actor_ctx.id,
473 barrier.epoch,
474 generation,
475 split_range,
476 );
477 }
478 yield Message::Barrier(barrier);
480 }
481 Message::Chunk(chunk) => {
482 if chunk.cardinality() == 0 {
484 continue;
485 }
486
487 let chunk = mapping_chunk(chunk, &self.output_indices);
500 if let Some(filtered_chunk) = filter_stream_chunk(
501 chunk,
502 ¤t_actor_bounds,
503 snapshot_split_column_index,
504 ) && filtered_chunk.cardinality() > 0
505 {
506 upstream_chunk_buffer.push(filtered_chunk.compact_vis());
508 }
509 }
510 Message::Watermark(_) => {
511 }
513 }
514 }
515 Either::Right(msg) => {
517 match msg? {
518 None => {
519 tracing::info!(
520 table_id,
521 split_id = split.split_id,
522 "snapshot read stream ends"
523 );
524 for chunk in upstream_chunk_buffer.drain(..) {
525 yield Message::Chunk(chunk);
526 }
527
528 split_cdc_offset_high = {
529 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
531 tokio::sync::Semaphore::const_new(10);
532
533 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
534 upstream_table_reader.current_cdc_offset().await?
535 };
536 if let Some(ref cdc_offset) = split_cdc_offset_high {
537 if let Some(ref cur) = actor_cdc_offset_high {
538 if *cur < *cdc_offset {
539 actor_cdc_offset_high =
540 split_cdc_offset_high.clone();
541 }
542 } else {
543 actor_cdc_offset_high = split_cdc_offset_high.clone();
544 }
545 }
546 break;
548 }
549 Some(chunk) => {
550 let chunk_cardinality = chunk.cardinality() as u64;
551 row_count = row_count.saturating_add(chunk_cardinality);
552 yield Message::Chunk(mapping_chunk(
553 chunk,
554 &self.output_indices,
555 ));
556 }
557 }
558 }
559 }
560 }
561 state_impl
563 .mutate_state(
564 split.split_id,
565 true,
566 row_count,
567 split_cdc_offset_low,
568 split_cdc_offset_high,
569 )
570 .await?;
571 if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
572 assert!(
573 *right_split < split.split_id,
574 "{} {}",
575 *right_split,
576 split.split_id
577 );
578 *right_split = split.split_id;
579 } else {
580 should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
581 }
582 }
583
584 upstream_table_reader.disconnect().await?;
585 tracing::info!(
586 table_id,
587 upstream_table_name,
588 "CdcBackfill has already finished and will forward messages directly to the downstream"
589 );
590
591 let mut should_report_actor_backfill_done = false;
592 #[for_await]
596 for msg in &mut upstream {
597 let msg = msg?;
598 match msg {
599 Message::Barrier(barrier) => {
600 state_impl.commit_state(barrier.epoch).await?;
601 if is_reset_barrier(&barrier, self.actor_ctx.id) {
602 next_reset_barrier = Some(barrier);
603 continue 'with_cdc_table_snapshot_splits;
604 }
605 if let Some(split_range) = should_report_actor_backfill_progress.take()
606 && let Some(ref progress) = self.progress
607 {
608 progress.update(
609 self.actor_ctx.fragment_id,
610 self.actor_ctx.id,
611 barrier.epoch,
612 generation,
613 split_range,
614 );
615 }
616 if should_report_actor_backfill_done {
617 should_report_actor_backfill_done = false;
618 assert!(!actor_snapshot_splits.is_empty());
619 if let Some(ref progress) = self.progress {
620 progress.finish(
621 self.actor_ctx.fragment_id,
622 self.actor_ctx.id,
623 barrier.epoch,
624 generation,
625 (
626 actor_snapshot_splits[0].split_id,
627 actor_snapshot_splits[actor_snapshot_splits.len() - 1]
628 .split_id,
629 ),
630 );
631 }
632 }
633 yield Message::Barrier(barrier);
634 }
635 Message::Chunk(chunk) => {
636 if actor_snapshot_splits.is_empty() {
637 continue;
638 }
639 if chunk.cardinality() == 0 {
640 continue;
641 }
642
643 let chunk_cdc_offset =
644 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
645 if let Some(high) = actor_cdc_offset_high.as_ref() {
656 if state_impl.is_legacy_state() {
657 actor_cdc_offset_high = None;
659 should_report_actor_backfill_done = true;
660 } else if let Some(ref chunk_offset) = chunk_cdc_offset
661 && *chunk_offset >= *high
662 {
663 actor_cdc_offset_high = None;
665 should_report_actor_backfill_done = true;
666 }
667 }
668 let chunk = mapping_chunk(chunk, &self.output_indices);
669 if let Some(filtered_chunk) = filter_stream_chunk(
670 chunk,
671 ¤t_actor_bounds,
672 snapshot_split_column_index,
673 ) && filtered_chunk.cardinality() > 0
674 {
675 yield Message::Chunk(filtered_chunk);
676 }
677 }
678 msg @ Message::Watermark(_) => {
679 if let Some(msg) = mapping_message(msg, &self.output_indices) {
680 yield msg;
681 }
682 }
683 }
684 }
685 }
686 }
687}
688
689fn filter_stream_chunk(
690 chunk: StreamChunk,
691 bound: &Option<(OwnedRow, OwnedRow)>,
692 snapshot_split_column_index: usize,
693) -> Option<StreamChunk> {
694 let Some((left, right)) = bound else {
695 return None;
696 };
697 assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
698 assert_eq!(
699 right.len(),
700 1,
701 "multiple split columns is not supported yet"
702 );
703 let left_split_key = left.datum_at(0);
704 let right_split_key = right.datum_at(0);
705 let is_leftmost_bound = is_leftmost_bound(left);
706 let is_rightmost_bound = is_rightmost_bound(right);
707 if is_leftmost_bound && is_rightmost_bound {
708 return Some(chunk);
709 }
710 let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
711 let (ops, columns, visibility) = chunk.into_inner();
712 for (row_split_key, v) in columns[snapshot_split_column_index]
713 .iter()
714 .zip_eq_fast(visibility.iter())
715 {
716 if !v {
717 new_bitmap.append(false);
718 continue;
719 }
720 let mut is_in_range = true;
721 if !is_leftmost_bound {
722 is_in_range = cmp_datum(
723 row_split_key,
724 left_split_key,
725 OrderType::ascending_nulls_first(),
726 )
727 .is_ge();
728 }
729 if is_in_range && !is_rightmost_bound {
730 is_in_range = cmp_datum(
731 row_split_key,
732 right_split_key,
733 OrderType::ascending_nulls_first(),
734 )
735 .is_lt();
736 }
737 if !is_in_range {
738 tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
739 }
740 new_bitmap.append(is_in_range);
741 }
742 Some(StreamChunk::with_visibility(
743 ops,
744 columns,
745 new_bitmap.finish(),
746 ))
747}
748
749fn is_leftmost_bound(row: &OwnedRow) -> bool {
750 row.iter().all(|d| d.is_none())
751}
752
753fn is_rightmost_bound(row: &OwnedRow) -> bool {
754 row.iter().all(|d| d.is_none())
755}
756
/// Exposes the executor through the `Execute` trait.
impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
    /// Consumes the boxed executor and returns the boxed stream produced by
    /// `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
762
763fn extends_current_actor_bound(
764 current: &mut Option<(OwnedRow, OwnedRow)>,
765 split: &CdcTableSnapshotSplit,
766) {
767 if current.is_none() {
768 *current = Some((
769 split.left_bound_inclusive.clone(),
770 split.right_bound_exclusive.clone(),
771 ));
772 } else {
773 current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
774 }
775}
776
777fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
778 match barrier.mutation.as_deref() {
779 Some(Mutation::Update(update)) => update
780 .actor_cdc_table_snapshot_splits
781 .splits
782 .contains_key(&actor_id),
783 _ => false,
784 }
785}
786
787fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
788 for i in 1..actor_snapshot_splits.len() {
789 assert_eq!(
790 actor_snapshot_splits[i].split_id,
791 actor_snapshot_splits[i - 1].split_id + 1,
792 "{:?}",
793 actor_snapshot_splits
794 );
795 assert!(
796 cmp_datum(
797 actor_snapshot_splits[i - 1]
798 .right_bound_exclusive
799 .datum_at(0),
800 actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
801 OrderType::ascending_nulls_last(),
802 )
803 .is_lt()
804 );
805 }
806}
807
#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use crate::executor::backfill::cdc::cdc_backill_v2::filter_stream_chunk;

    /// Exercises `filter_stream_chunk` with open, half-open and closed bounds,
    /// first on column 0 (no NULLs) and then on column 1 (contains NULLs).
    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        // Split column 0: no bound at all yields `None`.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert!(c.is_none());

        // Both sides unbounded: the chunk is returned unchanged.
        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        // Only a right bound: keeps keys < 3.
        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                 + 1 6
                 - 2 .",
            )
        );

        // Only a left bound: keeps keys >= 3.
        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                U- 3 7
                U+ 4 .",
            )
        );

        // Both bounds: keeps 2 <= key < 4.
        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                 - 2 .
                U- 3 7",
            )
        );

        // Split column 1, which contains NULLs: no bound still yields `None`.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert!(c.is_none());

        // Both sides unbounded: NULL keys are kept as well.
        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        // Only a right bound of 7: NULL keys order first and therefore pass `< 7`.
        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                 + 1 6
                 - 2 .
                U+ 4 .",
            )
        );

        // Only a left bound of 7: NULL keys order first and therefore fail `>= 7`.
        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk, &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                U- 3 7",
            )
        );
    }
}