1use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{
27 BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
28};
29use risingwave_connector::source::cdc::CdcScanOptions;
30use risingwave_connector::source::cdc::external::{
31 CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
32};
33use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
34use rw_futures_util::pausable;
35use thiserror_ext::AsReport;
36use tracing::Instrument;
37
38use crate::executor::UpdateMutation;
39use crate::executor::backfill::cdc::cdc_backfill::{
40 build_reader_and_poll_upstream, transform_upstream,
41};
42use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
43use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
44use crate::executor::backfill::cdc::upstream_table::snapshot::{
45 SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
46};
47use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
48use crate::executor::prelude::*;
49use crate::executor::source::get_infinite_backoff_strategy;
50use crate::task::cdc_progress::CdcProgressReporter;
51pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
52 actor_ctx: ActorContextRef,
53
54 external_table: ExternalStorageTable,
56
57 upstream: Executor,
59
60 output_indices: Vec<usize>,
62
63 output_columns: Vec<ColumnDesc>,
65
66 rate_limit_rps: Option<u32>,
68
69 options: CdcScanOptions,
70
71 state_table: StateTable<S>,
72
73 properties: BTreeMap<String, String>,
74
75 progress: Option<CdcProgressReporter>,
76}
77
78impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
79 #[allow(clippy::too_many_arguments)]
80 pub fn new(
81 actor_ctx: ActorContextRef,
82 external_table: ExternalStorageTable,
83 upstream: Executor,
84 output_indices: Vec<usize>,
85 output_columns: Vec<ColumnDesc>,
86 _metrics: Arc<StreamingMetrics>,
87 state_table: StateTable<S>,
88 rate_limit_rps: Option<u32>,
89 options: CdcScanOptions,
90 properties: BTreeMap<String, String>,
91 progress: Option<CdcProgressReporter>,
92 ) -> Self {
93 Self {
94 actor_ctx,
95 external_table,
96 upstream,
97 output_indices,
98 output_columns,
99 rate_limit_rps,
100 options,
101 state_table,
102 properties,
103 progress,
104 }
105 }
106
107 #[try_stream(ok = Message, error = StreamExecutorError)]
108 async fn execute_inner(mut self) {
109 assert!(!self.options.disable_backfill);
110 let pk_indices = self.external_table.pk_indices().to_vec();
112 let table_id = self.external_table.table_id();
113 let upstream_table_name = self.external_table.qualified_table_name();
114 let schema_table_name = self.external_table.schema_table_name().clone();
115 let external_database_name = self.external_table.database_name().to_owned();
116 let additional_columns = self
117 .output_columns
118 .iter()
119 .filter(|col| col.additional_column.column_type.is_some())
120 .cloned()
121 .collect_vec();
122 assert!(
123 (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
124 "split pk column index {} out of bound",
125 self.options.backfill_split_pk_column_index
126 );
127 let snapshot_split_column_index =
128 pk_indices[self.options.backfill_split_pk_column_index as usize];
129 let cdc_table_snapshot_split_column =
130 vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
131
132 let mut upstream = self.upstream.execute();
133 let first_barrier = expect_first_barrier(&mut upstream).await?;
135 let timestamp_handling: Option<TimestampHandling> = self
141 .properties
142 .get("debezium.time.precision.mode")
143 .map(|v| v == "connect")
144 .unwrap_or(false)
145 .then_some(TimestampHandling::Milli);
146 let timestamptz_handling: Option<TimestamptzHandling> = self
147 .properties
148 .get("debezium.time.precision.mode")
149 .map(|v| v == "connect")
150 .unwrap_or(false)
151 .then_some(TimestamptzHandling::Milli);
152 let time_handling: Option<TimeHandling> = self
153 .properties
154 .get("debezium.time.precision.mode")
155 .map(|v| v == "connect")
156 .unwrap_or(false)
157 .then_some(TimeHandling::Milli);
158 let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
159 .properties
160 .get("debezium.bigint.unsigned.handling.mode")
161 .map(|v| v == "precise")
162 .unwrap_or(false)
163 .then_some(BigintUnsignedHandlingMode::Precise);
164 let handle_toast_columns: bool =
166 self.external_table.table_type() == &ExternalCdcTableType::Postgres;
167 let mut upstream = transform_upstream(
168 upstream,
169 self.output_columns.clone(),
170 timestamp_handling,
171 timestamptz_handling,
172 time_handling,
173 bigint_unsigned_handling,
174 handle_toast_columns,
175 )
176 .boxed();
177 let mut next_reset_barrier = Some(first_barrier);
178 let mut is_reset = false;
179 let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
180 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
182
183 'with_cdc_table_snapshot_splits: loop {
185 assert!(upstream_chunk_buffer.is_empty());
186 let reset_barrier = next_reset_barrier.take().unwrap();
187 let all_snapshot_splits = match reset_barrier.mutation.as_deref() {
188 Some(Mutation::Add(add)) => &add.actor_cdc_table_snapshot_splits.splits,
189
190 Some(Mutation::Update(update)) => &update.actor_cdc_table_snapshot_splits.splits,
191 _ => {
192 return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
193 }
194 };
195 let mut actor_snapshot_splits = vec![];
196 let mut generation = None;
197 if let Some((splits, snapshot_generation)) = all_snapshot_splits.get(&self.actor_ctx.id)
199 {
200 actor_snapshot_splits = splits
201 .iter()
202 .map(|s: &CdcTableSnapshotSplitRaw| {
203 let de = RowDeserializer::new(
204 cdc_table_snapshot_split_column
205 .iter()
206 .map(Field::data_type)
207 .collect_vec(),
208 );
209 let left_bound_inclusive =
210 de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
211 let right_bound_exclusive =
212 de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
213 CdcTableSnapshotSplit {
214 split_id: s.split_id,
215 left_bound_inclusive,
216 right_bound_exclusive,
217 }
218 })
219 .collect();
220 generation = Some(*snapshot_generation);
221 }
222 tracing::debug!(?actor_snapshot_splits, ?generation, "actor splits");
223 assert_consecutive_splits(&actor_snapshot_splits);
224
225 let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
226 let barrier_epoch = reset_barrier.epoch;
227 yield Message::Barrier(reset_barrier);
228 if !is_reset {
229 state_impl.init_epoch(barrier_epoch).await?;
230 is_reset = true;
231 tracing::info!(%table_id, "Initialize executor.");
232 } else {
233 tracing::info!(%table_id, "Reset executor.");
234 }
235
236 let mut current_actor_bounds = None;
237 let mut actor_cdc_offset_high: Option<CdcOffset> = None;
238 let mut actor_cdc_offset_low: Option<CdcOffset> = None;
239 let mut next_split_idx = actor_snapshot_splits.len();
241 for (idx, split) in actor_snapshot_splits.iter().enumerate() {
242 let state = state_impl.restore_state(split.split_id).await?;
243 if !state.is_finished {
244 next_split_idx = idx;
245 break;
246 }
247 extends_current_actor_bound(&mut current_actor_bounds, split);
248 if let Some(ref cdc_offset) = state.cdc_offset_low {
249 if let Some(ref cur) = actor_cdc_offset_low {
250 if *cur > *cdc_offset {
251 actor_cdc_offset_low = state.cdc_offset_low.clone();
252 }
253 } else {
254 actor_cdc_offset_low = state.cdc_offset_low.clone();
255 }
256 }
257 if let Some(ref cdc_offset) = state.cdc_offset_high {
258 if let Some(ref cur) = actor_cdc_offset_high {
259 if *cur < *cdc_offset {
260 actor_cdc_offset_high = state.cdc_offset_high.clone();
261 }
262 } else {
263 actor_cdc_offset_high = state.cdc_offset_high.clone();
264 }
265 }
266 }
267 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
268 state_impl
270 .mutate_state(split.split_id, false, 0, None, None)
271 .await?;
272 }
273 let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
274 Some((
275 actor_snapshot_splits[0].split_id,
276 actor_snapshot_splits[next_split_idx - 1].split_id,
277 ))
278 } else {
279 None
280 };
281
282 let mut table_reader: Option<ExternalTableReaderImpl> = None;
285 let external_table = self.external_table.clone();
286 let mut future = Box::pin(async move {
287 let backoff = get_infinite_backoff_strategy();
288 tokio_retry::Retry::spawn(backoff, || async {
289 match external_table.create_table_reader().await {
290 Ok(reader) => Ok(reader),
291 Err(e) => {
292 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
293 Err(e)
294 }
295 }
296 })
297 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
298 .await
299 .expect("Retry create cdc table reader until success.")
300 });
301 loop {
302 if let Some(msg) =
303 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
304 .await?
305 {
306 if let Some(msg) = mapping_message(msg, &self.output_indices) {
307 match msg {
308 Message::Barrier(barrier) => {
309 state_impl.commit_state(barrier.epoch).await?;
310 if is_reset_barrier(&barrier, self.actor_ctx.id) {
311 next_reset_barrier = Some(barrier);
312 continue 'with_cdc_table_snapshot_splits;
313 }
314 yield Message::Barrier(barrier);
315 }
316 Message::Chunk(chunk) => {
317 if chunk.cardinality() == 0 {
318 continue;
319 }
320 if let Some(filtered_chunk) = filter_stream_chunk(
321 chunk,
322 ¤t_actor_bounds,
323 snapshot_split_column_index,
324 ) && filtered_chunk.cardinality() > 0
325 {
326 yield Message::Chunk(filtered_chunk);
327 }
328 }
329 Message::Watermark(_) => {
330 }
332 }
333 }
334 } else {
335 assert!(table_reader.is_some(), "table reader must created");
336 tracing::info!(
337 %table_id,
338 upstream_table_name,
339 "table reader created successfully"
340 );
341 break;
342 }
343 }
344 let upstream_table_reader = UpstreamTableReader::new(
345 self.external_table.clone(),
346 table_reader.expect("table reader must created"),
347 );
348 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
350
351 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
353 tracing::info!(
354 %table_id,
355 upstream_table_name,
356 ?split,
357 is_snapshot_paused,
358 "start cdc backfill split"
359 );
360 extends_current_actor_bound(&mut current_actor_bounds, split);
361
362 let split_cdc_offset_low = {
363 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
365 tokio::sync::Semaphore::const_new(10);
366
367 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
368 upstream_table_reader.current_cdc_offset().await?
369 };
370 if let Some(ref cdc_offset) = split_cdc_offset_low {
371 if let Some(ref cur) = actor_cdc_offset_low {
372 if *cur > *cdc_offset {
373 actor_cdc_offset_low = split_cdc_offset_low.clone();
374 }
375 } else {
376 actor_cdc_offset_low = split_cdc_offset_low.clone();
377 }
378 }
379 let mut split_cdc_offset_high = None;
380
381 let left_upstream = upstream.by_ref().map(Either::Left);
382 let read_args = SplitSnapshotReadArgs::new(
383 split.left_bound_inclusive.clone(),
384 split.right_bound_exclusive.clone(),
385 cdc_table_snapshot_split_column.clone(),
386 self.rate_limit_rps,
387 additional_columns.clone(),
388 schema_table_name.clone(),
389 external_database_name.clone(),
390 );
391 let right_snapshot = pin!(
392 upstream_table_reader
393 .snapshot_read_table_split(read_args)
394 .map(Either::Right)
395 );
396 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
397 if is_snapshot_paused {
398 snapshot_valve.pause();
399 }
400 let mut backfill_stream =
401 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
402 stream::PollNext::Left
403 });
404 let mut row_count: u64 = 0;
405 #[for_await]
406 for either in &mut backfill_stream {
407 match either {
408 Either::Left(msg) => {
410 match msg? {
411 Message::Barrier(barrier) => {
412 state_impl.commit_state(barrier.epoch).await?;
413 if let Some(mutation) = barrier.mutation.as_deref() {
414 use crate::executor::Mutation;
415 match mutation {
416 Mutation::Pause => {
417 is_snapshot_paused = true;
418 snapshot_valve.pause();
419 }
420 Mutation::Resume => {
421 is_snapshot_paused = false;
422 snapshot_valve.resume();
423 }
424 Mutation::Throttle(some) => {
425 if let Some(new_rate_limit) =
429 some.get(&self.actor_ctx.fragment_id)
430 && *new_rate_limit != self.rate_limit_rps
431 {
432 self.rate_limit_rps = *new_rate_limit;
434 }
435 }
436 Mutation::Update(UpdateMutation {
437 dropped_actors,
438 ..
439 }) => {
440 if dropped_actors.contains(&self.actor_ctx.id) {
441 tracing::info!(
442 %table_id,
443 upstream_table_name,
444 "CdcBackfill has been dropped due to config change"
445 );
446 for chunk in upstream_chunk_buffer.drain(..) {
447 yield Message::Chunk(chunk);
448 }
449 yield Message::Barrier(barrier);
450 let () = futures::future::pending().await;
451 unreachable!();
452 }
453 }
454 _ => (),
455 }
456 }
457 if is_reset_barrier(&barrier, self.actor_ctx.id) {
458 next_reset_barrier = Some(barrier);
459 for chunk in upstream_chunk_buffer.drain(..) {
460 yield Message::Chunk(chunk);
461 }
462 continue 'with_cdc_table_snapshot_splits;
463 }
464 if let Some(split_range) =
465 should_report_actor_backfill_progress.take()
466 && let Some(ref progress) = self.progress
467 {
468 progress.update(
469 self.actor_ctx.fragment_id,
470 self.actor_ctx.id,
471 barrier.epoch,
472 generation.expect("should have set generation when having progress to report"),
473 split_range,
474 );
475 }
476 yield Message::Barrier(barrier);
478 }
479 Message::Chunk(chunk) => {
480 if chunk.cardinality() == 0 {
482 continue;
483 }
484
485 let chunk = mapping_chunk(chunk, &self.output_indices);
498 if let Some(filtered_chunk) = filter_stream_chunk(
499 chunk,
500 ¤t_actor_bounds,
501 snapshot_split_column_index,
502 ) && filtered_chunk.cardinality() > 0
503 {
504 upstream_chunk_buffer.push(filtered_chunk.compact_vis());
506 }
507 }
508 Message::Watermark(_) => {
509 }
511 }
512 }
513 Either::Right(msg) => {
515 match msg? {
516 None => {
517 tracing::info!(
518 %table_id,
519 split_id = split.split_id,
520 "snapshot read stream ends"
521 );
522 for chunk in upstream_chunk_buffer.drain(..) {
523 yield Message::Chunk(chunk);
524 }
525
526 split_cdc_offset_high = {
527 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
529 tokio::sync::Semaphore::const_new(10);
530
531 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
532 upstream_table_reader.current_cdc_offset().await?
533 };
534 if let Some(ref cdc_offset) = split_cdc_offset_high {
535 if let Some(ref cur) = actor_cdc_offset_high {
536 if *cur < *cdc_offset {
537 actor_cdc_offset_high =
538 split_cdc_offset_high.clone();
539 }
540 } else {
541 actor_cdc_offset_high = split_cdc_offset_high.clone();
542 }
543 }
544 break;
546 }
547 Some(chunk) => {
548 let chunk_cardinality = chunk.cardinality() as u64;
549 row_count = row_count.saturating_add(chunk_cardinality);
550 yield Message::Chunk(mapping_chunk(
551 chunk,
552 &self.output_indices,
553 ));
554 }
555 }
556 }
557 }
558 }
559 state_impl
561 .mutate_state(
562 split.split_id,
563 true,
564 row_count,
565 split_cdc_offset_low,
566 split_cdc_offset_high,
567 )
568 .await?;
569 if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
570 assert!(
571 *right_split < split.split_id,
572 "{} {}",
573 *right_split,
574 split.split_id
575 );
576 *right_split = split.split_id;
577 } else {
578 should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
579 }
580 }
581
582 upstream_table_reader.disconnect().await?;
583 tracing::info!(
584 %table_id,
585 upstream_table_name,
586 "CdcBackfill has already finished and will forward messages directly to the downstream"
587 );
588
589 let mut should_report_actor_backfill_done = false;
590 #[for_await]
594 for msg in &mut upstream {
595 let msg = msg?;
596 match msg {
597 Message::Barrier(barrier) => {
598 state_impl.commit_state(barrier.epoch).await?;
599 if is_reset_barrier(&barrier, self.actor_ctx.id) {
600 next_reset_barrier = Some(barrier);
601 continue 'with_cdc_table_snapshot_splits;
602 }
603 if let Some(split_range) = should_report_actor_backfill_progress.take()
604 && let Some(ref progress) = self.progress
605 {
606 progress.update(
607 self.actor_ctx.fragment_id,
608 self.actor_ctx.id,
609 barrier.epoch,
610 generation.expect(
611 "should have set generation when having progress to report",
612 ),
613 split_range,
614 );
615 }
616 if should_report_actor_backfill_done {
617 should_report_actor_backfill_done = false;
618 assert!(!actor_snapshot_splits.is_empty());
619 if let Some(ref progress) = self.progress {
620 progress.finish(
621 self.actor_ctx.fragment_id,
622 self.actor_ctx.id,
623 barrier.epoch,
624 generation.expect(
625 "should have set generation when having progress to report",
626 ),
627 (
628 actor_snapshot_splits[0].split_id,
629 actor_snapshot_splits[actor_snapshot_splits.len() - 1]
630 .split_id,
631 ),
632 );
633 }
634 }
635 yield Message::Barrier(barrier);
636 }
637 Message::Chunk(chunk) => {
638 if actor_snapshot_splits.is_empty() {
639 continue;
640 }
641 if chunk.cardinality() == 0 {
642 continue;
643 }
644
645 let chunk_cdc_offset =
646 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
647 if let Some(high) = actor_cdc_offset_high.as_ref() {
658 if state_impl.is_legacy_state() {
659 actor_cdc_offset_high = None;
661 should_report_actor_backfill_done = true;
662 } else if let Some(ref chunk_offset) = chunk_cdc_offset
663 && *chunk_offset >= *high
664 {
665 actor_cdc_offset_high = None;
667 should_report_actor_backfill_done = true;
668 }
669 }
670 let chunk = mapping_chunk(chunk, &self.output_indices);
671 if let Some(filtered_chunk) = filter_stream_chunk(
672 chunk,
673 ¤t_actor_bounds,
674 snapshot_split_column_index,
675 ) && filtered_chunk.cardinality() > 0
676 {
677 yield Message::Chunk(filtered_chunk);
678 }
679 }
680 msg @ Message::Watermark(_) => {
681 if let Some(msg) = mapping_message(msg, &self.output_indices) {
682 yield msg;
683 }
684 }
685 }
686 }
687 }
688 }
689}
690
691fn filter_stream_chunk(
692 chunk: StreamChunk,
693 bound: &Option<(OwnedRow, OwnedRow)>,
694 snapshot_split_column_index: usize,
695) -> Option<StreamChunk> {
696 let Some((left, right)) = bound else {
697 return None;
698 };
699 assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
700 assert_eq!(
701 right.len(),
702 1,
703 "multiple split columns is not supported yet"
704 );
705 let left_split_key = left.datum_at(0);
706 let right_split_key = right.datum_at(0);
707 let is_leftmost_bound = is_leftmost_bound(left);
708 let is_rightmost_bound = is_rightmost_bound(right);
709 if is_leftmost_bound && is_rightmost_bound {
710 return Some(chunk);
711 }
712 let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
713 let (ops, columns, visibility) = chunk.into_inner();
714 for (row_split_key, v) in columns[snapshot_split_column_index]
715 .iter()
716 .zip_eq_fast(visibility.iter())
717 {
718 if !v {
719 new_bitmap.append(false);
720 continue;
721 }
722 let mut is_in_range = true;
723 if !is_leftmost_bound {
724 is_in_range = cmp_datum(
725 row_split_key,
726 left_split_key,
727 OrderType::ascending_nulls_first(),
728 )
729 .is_ge();
730 }
731 if is_in_range && !is_rightmost_bound {
732 is_in_range = cmp_datum(
733 row_split_key,
734 right_split_key,
735 OrderType::ascending_nulls_first(),
736 )
737 .is_lt();
738 }
739 if !is_in_range {
740 tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
741 }
742 new_bitmap.append(is_in_range);
743 }
744 Some(StreamChunk::with_visibility(
745 ops,
746 columns,
747 new_bitmap.finish(),
748 ))
749}
750
751fn is_leftmost_bound(row: &OwnedRow) -> bool {
752 row.iter().all(|d| d.is_none())
753}
754
755fn is_rightmost_bound(row: &OwnedRow) -> bool {
756 row.iter().all(|d| d.is_none())
757}
758
759impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
760 fn execute(self: Box<Self>) -> BoxedMessageStream {
761 self.execute_inner().boxed()
762 }
763}
764
765fn extends_current_actor_bound(
766 current: &mut Option<(OwnedRow, OwnedRow)>,
767 split: &CdcTableSnapshotSplit,
768) {
769 if current.is_none() {
770 *current = Some((
771 split.left_bound_inclusive.clone(),
772 split.right_bound_exclusive.clone(),
773 ));
774 } else {
775 current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
776 }
777}
778
779fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
780 match barrier.mutation.as_deref() {
781 Some(Mutation::Update(update)) => update
782 .actor_cdc_table_snapshot_splits
783 .splits
784 .contains_key(&actor_id),
785 _ => false,
786 }
787}
788
789fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
790 for i in 1..actor_snapshot_splits.len() {
791 assert_eq!(
792 actor_snapshot_splits[i].split_id,
793 actor_snapshot_splits[i - 1].split_id + 1,
794 "{:?}",
795 actor_snapshot_splits
796 );
797 assert!(
798 cmp_datum(
799 actor_snapshot_splits[i - 1]
800 .right_bound_exclusive
801 .datum_at(0),
802 actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
803 OrderType::ascending_nulls_last(),
804 )
805 .is_lt()
806 );
807 }
808}
809
#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    // Import through `super` so the test does not hard-code this module's file name.
    use super::filter_stream_chunk;

    /// Exercises `filter_stream_chunk` on both a non-null key column (0) and a column with
    /// NULLs (1), covering missing, fully unbounded, half-open and closed bounds.
    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            "  I I
            +  1 6
            -  2 .
            U- 3 7
            U+ 4 .",
        );
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I
                + 1 6
                - 2 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                U- 3 7
                U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                -  2 .
                U- 3 7",
            )
        );

        // Column 1 contains NULLs, which sort first under `ascending_nulls_first`.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                +  1 6
                -  2 .
                U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk, &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
                U- 3 7",
            )
        );
    }
}