1use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{TimeHandling, TimestampHandling, TimestamptzHandling};
27use risingwave_connector::source::cdc::CdcScanOptions;
28use risingwave_connector::source::cdc::external::{
29 CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
30};
31use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
32use rw_futures_util::pausable;
33use thiserror_ext::AsReport;
34use tracing::Instrument;
35
36use crate::executor::UpdateMutation;
37use crate::executor::backfill::cdc::cdc_backfill::{
38 build_reader_and_poll_upstream, transform_upstream,
39};
40use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
41use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
42use crate::executor::backfill::cdc::upstream_table::snapshot::{
43 SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
44};
45use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
46use crate::executor::prelude::*;
47use crate::executor::source::get_infinite_backoff_strategy;
48use crate::task::cdc_progress::CdcProgressReporter;
49pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
50 actor_ctx: ActorContextRef,
51
52 external_table: ExternalStorageTable,
54
55 upstream: Executor,
57
58 output_indices: Vec<usize>,
60
61 output_columns: Vec<ColumnDesc>,
63
64 rate_limit_rps: Option<u32>,
66
67 options: CdcScanOptions,
68
69 state_table: StateTable<S>,
70
71 properties: BTreeMap<String, String>,
72
73 progress: Option<CdcProgressReporter>,
74}
75
76impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
77 #[allow(clippy::too_many_arguments)]
78 pub fn new(
79 actor_ctx: ActorContextRef,
80 external_table: ExternalStorageTable,
81 upstream: Executor,
82 output_indices: Vec<usize>,
83 output_columns: Vec<ColumnDesc>,
84 _metrics: Arc<StreamingMetrics>,
85 state_table: StateTable<S>,
86 rate_limit_rps: Option<u32>,
87 options: CdcScanOptions,
88 properties: BTreeMap<String, String>,
89 progress: Option<CdcProgressReporter>,
90 ) -> Self {
91 Self {
92 actor_ctx,
93 external_table,
94 upstream,
95 output_indices,
96 output_columns,
97 rate_limit_rps,
98 options,
99 state_table,
100 properties,
101 progress,
102 }
103 }
104
105 #[try_stream(ok = Message, error = StreamExecutorError)]
106 async fn execute_inner(mut self) {
107 assert!(!self.options.disable_backfill);
108 let pk_indices = self.external_table.pk_indices().to_vec();
110 let table_id = self.external_table.table_id().table_id;
111 let upstream_table_name = self.external_table.qualified_table_name();
112 let schema_table_name = self.external_table.schema_table_name().clone();
113 let external_database_name = self.external_table.database_name().to_owned();
114 let additional_columns = self
115 .output_columns
116 .iter()
117 .filter(|col| col.additional_column.column_type.is_some())
118 .cloned()
119 .collect_vec();
120 assert!(
121 (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
122 "split pk column index {} out of bound",
123 self.options.backfill_split_pk_column_index
124 );
125 let snapshot_split_column_index =
126 pk_indices[self.options.backfill_split_pk_column_index as usize];
127 let cdc_table_snapshot_split_column =
128 vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
129
130 let mut upstream = self.upstream.execute();
131 let first_barrier = expect_first_barrier(&mut upstream).await?;
133 let timestamp_handling: Option<TimestampHandling> = self
139 .properties
140 .get("debezium.time.precision.mode")
141 .map(|v| v == "connect")
142 .unwrap_or(false)
143 .then_some(TimestampHandling::Milli);
144 let timestamptz_handling: Option<TimestamptzHandling> = self
145 .properties
146 .get("debezium.time.precision.mode")
147 .map(|v| v == "connect")
148 .unwrap_or(false)
149 .then_some(TimestamptzHandling::Milli);
150 let time_handling: Option<TimeHandling> = self
151 .properties
152 .get("debezium.time.precision.mode")
153 .map(|v| v == "connect")
154 .unwrap_or(false)
155 .then_some(TimeHandling::Milli);
156 let handle_toast_columns: bool =
158 self.external_table.table_type() == &ExternalCdcTableType::Postgres;
159 let mut upstream = transform_upstream(
160 upstream,
161 self.output_columns.clone(),
162 timestamp_handling,
163 timestamptz_handling,
164 time_handling,
165 handle_toast_columns,
166 )
167 .boxed();
168 let mut next_reset_barrier = Some(first_barrier);
169 let mut is_reset = false;
170 let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
171 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
173
174 'with_cdc_table_snapshot_splits: loop {
176 assert!(upstream_chunk_buffer.is_empty());
177 let reset_barrier = next_reset_barrier.take().unwrap();
178 let (all_snapshot_splits, generation) = match reset_barrier.mutation.as_deref() {
179 Some(Mutation::Add(add)) => (
180 &add.actor_cdc_table_snapshot_splits.splits,
181 add.actor_cdc_table_snapshot_splits.generation,
182 ),
183 Some(Mutation::Update(update)) => (
184 &update.actor_cdc_table_snapshot_splits.splits,
185 update.actor_cdc_table_snapshot_splits.generation,
186 ),
187 _ => {
188 return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
189 }
190 };
191 let mut actor_snapshot_splits = vec![];
192 if let Some(splits) = all_snapshot_splits.get(&self.actor_ctx.id) {
194 actor_snapshot_splits = splits
195 .iter()
196 .map(|s: &CdcTableSnapshotSplitRaw| {
197 let de = RowDeserializer::new(
198 cdc_table_snapshot_split_column
199 .iter()
200 .map(Field::data_type)
201 .collect_vec(),
202 );
203 let left_bound_inclusive =
204 de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
205 let right_bound_exclusive =
206 de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
207 CdcTableSnapshotSplit {
208 split_id: s.split_id,
209 left_bound_inclusive,
210 right_bound_exclusive,
211 }
212 })
213 .collect();
214 }
215 tracing::debug!(?actor_snapshot_splits, "actor splits");
216 assert_consecutive_splits(&actor_snapshot_splits);
217
218 let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
219 let barrier_epoch = reset_barrier.epoch;
220 yield Message::Barrier(reset_barrier);
221 if !is_reset {
222 state_impl.init_epoch(barrier_epoch).await?;
223 is_reset = true;
224 tracing::info!(table_id, "Initialize executor.");
225 } else {
226 tracing::info!(table_id, "Reset executor.");
227 }
228
229 let mut current_actor_bounds = None;
230 let mut actor_cdc_offset_high: Option<CdcOffset> = None;
231 let mut actor_cdc_offset_low: Option<CdcOffset> = None;
232 let mut next_split_idx = actor_snapshot_splits.len();
234 for (idx, split) in actor_snapshot_splits.iter().enumerate() {
235 let state = state_impl.restore_state(split.split_id).await?;
236 if !state.is_finished {
237 next_split_idx = idx;
238 break;
239 }
240 extends_current_actor_bound(&mut current_actor_bounds, split);
241 if let Some(ref cdc_offset) = state.cdc_offset_low {
242 if let Some(ref cur) = actor_cdc_offset_low {
243 if *cur > *cdc_offset {
244 actor_cdc_offset_low = state.cdc_offset_low.clone();
245 }
246 } else {
247 actor_cdc_offset_low = state.cdc_offset_low.clone();
248 }
249 }
250 if let Some(ref cdc_offset) = state.cdc_offset_high {
251 if let Some(ref cur) = actor_cdc_offset_high {
252 if *cur < *cdc_offset {
253 actor_cdc_offset_high = state.cdc_offset_high.clone();
254 }
255 } else {
256 actor_cdc_offset_high = state.cdc_offset_high.clone();
257 }
258 }
259 }
260 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
261 state_impl
263 .mutate_state(split.split_id, false, 0, None, None)
264 .await?;
265 }
266 let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
267 Some((
268 actor_snapshot_splits[0].split_id,
269 actor_snapshot_splits[next_split_idx - 1].split_id,
270 ))
271 } else {
272 None
273 };
274
275 let mut table_reader: Option<ExternalTableReaderImpl> = None;
278 let external_table = self.external_table.clone();
279 let mut future = Box::pin(async move {
280 let backoff = get_infinite_backoff_strategy();
281 tokio_retry::Retry::spawn(backoff, || async {
282 match external_table.create_table_reader().await {
283 Ok(reader) => Ok(reader),
284 Err(e) => {
285 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
286 Err(e)
287 }
288 }
289 })
290 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
291 .await
292 .expect("Retry create cdc table reader until success.")
293 });
294 loop {
295 if let Some(msg) =
296 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
297 .await?
298 {
299 if let Some(msg) = mapping_message(msg, &self.output_indices) {
300 match msg {
301 Message::Barrier(barrier) => {
302 state_impl.commit_state(barrier.epoch).await?;
303 if is_reset_barrier(&barrier, self.actor_ctx.id) {
304 next_reset_barrier = Some(barrier);
305 continue 'with_cdc_table_snapshot_splits;
306 }
307 yield Message::Barrier(barrier);
308 }
309 Message::Chunk(chunk) => {
310 if chunk.cardinality() == 0 {
311 continue;
312 }
313 if let Some(filtered_chunk) = filter_stream_chunk(
314 chunk,
315 ¤t_actor_bounds,
316 snapshot_split_column_index,
317 ) && filtered_chunk.cardinality() > 0
318 {
319 yield Message::Chunk(filtered_chunk);
320 }
321 }
322 Message::Watermark(_) => {
323 }
325 }
326 }
327 } else {
328 assert!(table_reader.is_some(), "table reader must created");
329 tracing::info!(
330 table_id,
331 upstream_table_name,
332 "table reader created successfully"
333 );
334 break;
335 }
336 }
337 let upstream_table_reader = UpstreamTableReader::new(
338 self.external_table.clone(),
339 table_reader.expect("table reader must created"),
340 );
341 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
343
344 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
346 tracing::info!(
347 table_id,
348 upstream_table_name,
349 ?split,
350 is_snapshot_paused,
351 "start cdc backfill split"
352 );
353 extends_current_actor_bound(&mut current_actor_bounds, split);
354
355 let split_cdc_offset_low = {
356 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
358 tokio::sync::Semaphore::const_new(10);
359
360 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
361 upstream_table_reader.current_cdc_offset().await?
362 };
363 if let Some(ref cdc_offset) = split_cdc_offset_low {
364 if let Some(ref cur) = actor_cdc_offset_low {
365 if *cur > *cdc_offset {
366 actor_cdc_offset_low = split_cdc_offset_low.clone();
367 }
368 } else {
369 actor_cdc_offset_low = split_cdc_offset_low.clone();
370 }
371 }
372 let mut split_cdc_offset_high = None;
373
374 let left_upstream = upstream.by_ref().map(Either::Left);
375 let read_args = SplitSnapshotReadArgs::new(
376 split.left_bound_inclusive.clone(),
377 split.right_bound_exclusive.clone(),
378 cdc_table_snapshot_split_column.clone(),
379 self.rate_limit_rps,
380 additional_columns.clone(),
381 schema_table_name.clone(),
382 external_database_name.clone(),
383 );
384 let right_snapshot = pin!(
385 upstream_table_reader
386 .snapshot_read_table_split(read_args)
387 .map(Either::Right)
388 );
389 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
390 if is_snapshot_paused {
391 snapshot_valve.pause();
392 }
393 let mut backfill_stream =
394 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
395 stream::PollNext::Left
396 });
397 let mut row_count: u64 = 0;
398 #[for_await]
399 for either in &mut backfill_stream {
400 match either {
401 Either::Left(msg) => {
403 match msg? {
404 Message::Barrier(barrier) => {
405 state_impl.commit_state(barrier.epoch).await?;
406 if let Some(mutation) = barrier.mutation.as_deref() {
407 use crate::executor::Mutation;
408 match mutation {
409 Mutation::Pause => {
410 is_snapshot_paused = true;
411 snapshot_valve.pause();
412 }
413 Mutation::Resume => {
414 is_snapshot_paused = false;
415 snapshot_valve.resume();
416 }
417 Mutation::Throttle(some) => {
418 if let Some(new_rate_limit) =
422 some.get(&self.actor_ctx.id)
423 && *new_rate_limit != self.rate_limit_rps
424 {
425 self.rate_limit_rps = *new_rate_limit;
427 }
428 }
429 Mutation::Update(UpdateMutation {
430 dropped_actors,
431 ..
432 }) => {
433 if dropped_actors.contains(&self.actor_ctx.id) {
434 tracing::info!(
435 table_id,
436 upstream_table_name,
437 "CdcBackfill has been dropped due to config change"
438 );
439 for chunk in upstream_chunk_buffer.drain(..) {
440 yield Message::Chunk(chunk);
441 }
442 yield Message::Barrier(barrier);
443 let () = futures::future::pending().await;
444 unreachable!();
445 }
446 }
447 _ => (),
448 }
449 }
450 if is_reset_barrier(&barrier, self.actor_ctx.id) {
451 next_reset_barrier = Some(barrier);
452 for chunk in upstream_chunk_buffer.drain(..) {
453 yield Message::Chunk(chunk);
454 }
455 continue 'with_cdc_table_snapshot_splits;
456 }
457 if let Some(split_range) =
458 should_report_actor_backfill_progress.take()
459 && let Some(ref progress) = self.progress
460 {
461 progress.update(
462 self.actor_ctx.fragment_id,
463 self.actor_ctx.id,
464 barrier.epoch,
465 generation,
466 split_range,
467 );
468 }
469 yield Message::Barrier(barrier);
471 }
472 Message::Chunk(chunk) => {
473 if chunk.cardinality() == 0 {
475 continue;
476 }
477
478 let chunk = mapping_chunk(chunk, &self.output_indices);
491 if let Some(filtered_chunk) = filter_stream_chunk(
492 chunk,
493 ¤t_actor_bounds,
494 snapshot_split_column_index,
495 ) && filtered_chunk.cardinality() > 0
496 {
497 upstream_chunk_buffer.push(filtered_chunk.compact());
499 }
500 }
501 Message::Watermark(_) => {
502 }
504 }
505 }
506 Either::Right(msg) => {
508 match msg? {
509 None => {
510 tracing::info!(
511 table_id,
512 split_id = split.split_id,
513 "snapshot read stream ends"
514 );
515 for chunk in upstream_chunk_buffer.drain(..) {
516 yield Message::Chunk(chunk);
517 }
518
519 split_cdc_offset_high = {
520 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
522 tokio::sync::Semaphore::const_new(10);
523
524 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
525 upstream_table_reader.current_cdc_offset().await?
526 };
527 if let Some(ref cdc_offset) = split_cdc_offset_high {
528 if let Some(ref cur) = actor_cdc_offset_high {
529 if *cur < *cdc_offset {
530 actor_cdc_offset_high =
531 split_cdc_offset_high.clone();
532 }
533 } else {
534 actor_cdc_offset_high = split_cdc_offset_high.clone();
535 }
536 }
537 break;
539 }
540 Some(chunk) => {
541 let chunk_cardinality = chunk.cardinality() as u64;
542 row_count = row_count.saturating_add(chunk_cardinality);
543 yield Message::Chunk(mapping_chunk(
544 chunk,
545 &self.output_indices,
546 ));
547 }
548 }
549 }
550 }
551 }
552 state_impl
554 .mutate_state(
555 split.split_id,
556 true,
557 row_count,
558 split_cdc_offset_low,
559 split_cdc_offset_high,
560 )
561 .await?;
562 if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
563 assert!(
564 *right_split < split.split_id,
565 "{} {}",
566 *right_split,
567 split.split_id
568 );
569 *right_split = split.split_id;
570 } else {
571 should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
572 }
573 }
574
575 upstream_table_reader.disconnect().await?;
576 tracing::info!(
577 table_id,
578 upstream_table_name,
579 "CdcBackfill has already finished and will forward messages directly to the downstream"
580 );
581
582 let mut should_report_actor_backfill_done = false;
583 #[for_await]
587 for msg in &mut upstream {
588 let msg = msg?;
589 match msg {
590 Message::Barrier(barrier) => {
591 state_impl.commit_state(barrier.epoch).await?;
592 if is_reset_barrier(&barrier, self.actor_ctx.id) {
593 next_reset_barrier = Some(barrier);
594 continue 'with_cdc_table_snapshot_splits;
595 }
596 if let Some(split_range) = should_report_actor_backfill_progress.take()
597 && let Some(ref progress) = self.progress
598 {
599 progress.update(
600 self.actor_ctx.fragment_id,
601 self.actor_ctx.id,
602 barrier.epoch,
603 generation,
604 split_range,
605 );
606 }
607 if should_report_actor_backfill_done {
608 should_report_actor_backfill_done = false;
609 assert!(!actor_snapshot_splits.is_empty());
610 if let Some(ref progress) = self.progress {
611 progress.finish(
612 self.actor_ctx.fragment_id,
613 self.actor_ctx.id,
614 barrier.epoch,
615 generation,
616 (
617 actor_snapshot_splits[0].split_id,
618 actor_snapshot_splits[actor_snapshot_splits.len() - 1]
619 .split_id,
620 ),
621 );
622 }
623 }
624 yield Message::Barrier(barrier);
625 }
626 Message::Chunk(chunk) => {
627 if actor_snapshot_splits.is_empty() {
628 continue;
629 }
630 if chunk.cardinality() == 0 {
631 continue;
632 }
633
634 let chunk_cdc_offset =
635 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
636 if let Some(high) = actor_cdc_offset_high.as_ref() {
647 if state_impl.is_legacy_state() {
648 actor_cdc_offset_high = None;
650 should_report_actor_backfill_done = true;
651 } else if let Some(ref chunk_offset) = chunk_cdc_offset
652 && *chunk_offset >= *high
653 {
654 actor_cdc_offset_high = None;
656 should_report_actor_backfill_done = true;
657 }
658 }
659 let chunk = mapping_chunk(chunk, &self.output_indices);
660 if let Some(filtered_chunk) = filter_stream_chunk(
661 chunk,
662 ¤t_actor_bounds,
663 snapshot_split_column_index,
664 ) && filtered_chunk.cardinality() > 0
665 {
666 yield Message::Chunk(filtered_chunk);
667 }
668 }
669 msg @ Message::Watermark(_) => {
670 if let Some(msg) = mapping_message(msg, &self.output_indices) {
671 yield msg;
672 }
673 }
674 }
675 }
676 }
677 }
678}
679
680fn filter_stream_chunk(
681 chunk: StreamChunk,
682 bound: &Option<(OwnedRow, OwnedRow)>,
683 snapshot_split_column_index: usize,
684) -> Option<StreamChunk> {
685 let Some((left, right)) = bound else {
686 return None;
687 };
688 assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
689 assert_eq!(
690 right.len(),
691 1,
692 "multiple split columns is not supported yet"
693 );
694 let left_split_key = left.datum_at(0);
695 let right_split_key = right.datum_at(0);
696 let is_leftmost_bound = is_leftmost_bound(left);
697 let is_rightmost_bound = is_rightmost_bound(right);
698 if is_leftmost_bound && is_rightmost_bound {
699 return Some(chunk);
700 }
701 let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
702 let (ops, columns, visibility) = chunk.into_inner();
703 for (row_split_key, v) in columns[snapshot_split_column_index]
704 .iter()
705 .zip_eq_fast(visibility.iter())
706 {
707 if !v {
708 new_bitmap.append(false);
709 continue;
710 }
711 let mut is_in_range = true;
712 if !is_leftmost_bound {
713 is_in_range = cmp_datum(
714 row_split_key,
715 left_split_key,
716 OrderType::ascending_nulls_first(),
717 )
718 .is_ge();
719 }
720 if is_in_range && !is_rightmost_bound {
721 is_in_range = cmp_datum(
722 row_split_key,
723 right_split_key,
724 OrderType::ascending_nulls_first(),
725 )
726 .is_lt();
727 }
728 if !is_in_range {
729 tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
730 }
731 new_bitmap.append(is_in_range);
732 }
733 Some(StreamChunk::with_visibility(
734 ops,
735 columns,
736 new_bitmap.finish(),
737 ))
738}
739
740fn is_leftmost_bound(row: &OwnedRow) -> bool {
741 row.iter().all(|d| d.is_none())
742}
743
744fn is_rightmost_bound(row: &OwnedRow) -> bool {
745 row.iter().all(|d| d.is_none())
746}
747
748impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
749 fn execute(self: Box<Self>) -> BoxedMessageStream {
750 self.execute_inner().boxed()
751 }
752}
753
754fn extends_current_actor_bound(
755 current: &mut Option<(OwnedRow, OwnedRow)>,
756 split: &CdcTableSnapshotSplit,
757) {
758 if current.is_none() {
759 *current = Some((
760 split.left_bound_inclusive.clone(),
761 split.right_bound_exclusive.clone(),
762 ));
763 } else {
764 current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
765 }
766}
767
768fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
769 match barrier.mutation.as_deref() {
770 Some(Mutation::Update(update)) => update
771 .actor_cdc_table_snapshot_splits
772 .splits
773 .contains_key(&actor_id),
774 _ => false,
775 }
776}
777
778fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
779 for i in 1..actor_snapshot_splits.len() {
780 assert_eq!(
781 actor_snapshot_splits[i].split_id,
782 actor_snapshot_splits[i - 1].split_id + 1,
783 "{:?}",
784 actor_snapshot_splits
785 );
786 assert!(
787 cmp_datum(
788 actor_snapshot_splits[i - 1]
789 .right_bound_exclusive
790 .datum_at(0),
791 actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
792 OrderType::ascending_nulls_last(),
793 )
794 .is_lt()
795 );
796 }
797}
798
#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    // Import through `super` so the test does not depend on this module's path or file name.
    use super::filter_stream_chunk;

    /// Exercises `filter_stream_chunk` on both columns of a small chunk:
    /// - no bound;
    /// - fully unbounded;
    /// - right-bounded;
    /// - left-bounded;
    /// - two-sided bounds.
    ///
    /// Column 1 contains NULLs, which sort before every non-NULL key.
    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );

        // ---- split on column 0 ----
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(c.unwrap().compact(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact(),
            StreamChunk::from_pretty(
                " I I
                + 1 6
                - 2 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact(),
            StreamChunk::from_pretty(
                " I I
                U- 3 7
                U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact(),
            StreamChunk::from_pretty(
                " I I
                - 2 .
                U- 3 7",
            )
        );

        // ---- split on column 1 (nullable; NULL keys sort first) ----
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(c.unwrap().compact(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact(),
            StreamChunk::from_pretty(
                " I I
                + 1 6
                - 2 .
                U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact(),
            StreamChunk::from_pretty(
                " I I
                U- 3 7",
            )
        );
    }
}