1use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{TimeHandling, TimestampHandling, TimestamptzHandling};
27use risingwave_connector::source::cdc::CdcScanOptions;
28use risingwave_connector::source::cdc::external::ExternalTableReaderImpl;
29use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
30use rw_futures_util::pausable;
31use thiserror_ext::AsReport;
32use tracing::Instrument;
33
34use crate::executor::UpdateMutation;
35use crate::executor::backfill::cdc::cdc_backfill::{
36 build_reader_and_poll_upstream, transform_upstream,
37};
38use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
39use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
40use crate::executor::backfill::cdc::upstream_table::snapshot::{
41 SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
42};
43use crate::executor::backfill::utils::{mapping_chunk, mapping_message};
44use crate::executor::prelude::*;
45use crate::executor::source::get_infinite_backoff_strategy;
46
/// Length of the per-split metadata kept by [`ParallelizedCdcBackfillState`].
///
/// NOTE(review): presumably one slot per value written through `mutate_state`
/// (split id, finished flag, row count) — confirm against `ParallelizedCdcBackfillState`.
const METADATA_STATE_LEN: usize = 3;
50pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
51 actor_ctx: ActorContextRef,
52
53 external_table: ExternalStorageTable,
55
56 upstream: Executor,
58
59 output_indices: Vec<usize>,
61
62 output_columns: Vec<ColumnDesc>,
64
65 rate_limit_rps: Option<u32>,
67
68 options: CdcScanOptions,
69
70 state_table: StateTable<S>,
71
72 properties: BTreeMap<String, String>,
73}
74
75impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
76 #[allow(clippy::too_many_arguments)]
77 pub fn new(
78 actor_ctx: ActorContextRef,
79 external_table: ExternalStorageTable,
80 upstream: Executor,
81 output_indices: Vec<usize>,
82 output_columns: Vec<ColumnDesc>,
83 _metrics: Arc<StreamingMetrics>,
84 state_table: StateTable<S>,
85 rate_limit_rps: Option<u32>,
86 options: CdcScanOptions,
87 properties: BTreeMap<String, String>,
88 ) -> Self {
89 Self {
90 actor_ctx,
91 external_table,
92 upstream,
93 output_indices,
94 output_columns,
95 rate_limit_rps,
96 options,
97 state_table,
98 properties,
99 }
100 }
101
102 #[try_stream(ok = Message, error = StreamExecutorError)]
103 async fn execute_inner(mut self) {
104 assert!(!self.options.disable_backfill);
105 let pk_indices = self.external_table.pk_indices().to_vec();
107 let table_id = self.external_table.table_id().table_id;
108 let upstream_table_name = self.external_table.qualified_table_name();
109 let schema_table_name = self.external_table.schema_table_name().clone();
110 let external_database_name = self.external_table.database_name().to_owned();
111 let additional_columns = self
112 .output_columns
113 .iter()
114 .filter(|col| col.additional_column.column_type.is_some())
115 .cloned()
116 .collect_vec();
117 assert!(
118 (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
119 "split pk column index {} out of bound",
120 self.options.backfill_split_pk_column_index
121 );
122 let snapshot_split_column_index =
123 pk_indices[self.options.backfill_split_pk_column_index as usize];
124 let cdc_table_snapshot_split_column =
125 vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
126
127 let mut upstream = self.upstream.execute();
128 let first_barrier = expect_first_barrier(&mut upstream).await?;
130 let timestamp_handling: Option<TimestampHandling> = self
136 .properties
137 .get("debezium.time.precision.mode")
138 .map(|v| v == "connect")
139 .unwrap_or(false)
140 .then_some(TimestampHandling::Milli);
141 let timestamptz_handling: Option<TimestamptzHandling> = self
142 .properties
143 .get("debezium.time.precision.mode")
144 .map(|v| v == "connect")
145 .unwrap_or(false)
146 .then_some(TimestamptzHandling::Milli);
147 let time_handling: Option<TimeHandling> = self
148 .properties
149 .get("debezium.time.precision.mode")
150 .map(|v| v == "connect")
151 .unwrap_or(false)
152 .then_some(TimeHandling::Milli);
153 let mut upstream = transform_upstream(
154 upstream,
155 self.output_columns.clone(),
156 timestamp_handling,
157 timestamptz_handling,
158 time_handling,
159 )
160 .boxed();
161 let mut next_reset_barrier = Some(first_barrier);
162 let mut is_reset = false;
163 let mut state_impl =
164 ParallelizedCdcBackfillState::new(self.state_table, METADATA_STATE_LEN);
165 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
166 'with_cdc_table_snapshot_splits: loop {
168 assert!(upstream_chunk_buffer.is_empty());
169 let reset_barrier = next_reset_barrier.take().unwrap();
170 let all_snapshot_splits = match reset_barrier.mutation.as_deref() {
171 Some(Mutation::Add(add)) => &add.actor_cdc_table_snapshot_splits,
172 Some(Mutation::Update(update)) => &update.actor_cdc_table_snapshot_splits,
173 _ => {
174 return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
175 }
176 };
177 let mut actor_snapshot_splits = vec![];
178 if let Some(splits) = all_snapshot_splits.get(&self.actor_ctx.id) {
180 actor_snapshot_splits = splits
181 .iter()
182 .map(|s: &CdcTableSnapshotSplitRaw| {
183 let de = RowDeserializer::new(
184 cdc_table_snapshot_split_column
185 .iter()
186 .map(Field::data_type)
187 .collect_vec(),
188 );
189 let left_bound_inclusive =
190 de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
191 let right_bound_exclusive =
192 de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
193 CdcTableSnapshotSplit {
194 split_id: s.split_id,
195 left_bound_inclusive,
196 right_bound_exclusive,
197 }
198 })
199 .collect();
200 }
201 tracing::debug!(?actor_snapshot_splits, "actor splits");
202 assert_consecutive_splits(&actor_snapshot_splits);
203
204 let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
205 let barrier_epoch = reset_barrier.epoch;
206 yield Message::Barrier(reset_barrier);
207 if !is_reset {
208 state_impl.init_epoch(barrier_epoch).await?;
209 is_reset = true;
210 tracing::info!(table_id, "Initialize executor.");
211 } else {
212 tracing::info!(table_id, "Reset executor.");
213 }
214
215 let mut current_actor_bounds = None;
216 let mut next_split_idx = actor_snapshot_splits.len();
218 for (idx, split) in actor_snapshot_splits.iter().enumerate() {
219 let state = state_impl.restore_state(split.split_id).await?;
220 if !state.is_finished {
221 next_split_idx = idx;
222 break;
223 }
224 extends_current_actor_bound(&mut current_actor_bounds, split);
225 }
226 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
227 state_impl.mutate_state(split.split_id, false, 0).await?;
229 }
230
231 let mut table_reader: Option<ExternalTableReaderImpl> = None;
234 let external_table = self.external_table.clone();
235 let mut future = Box::pin(async move {
236 let backoff = get_infinite_backoff_strategy();
237 tokio_retry::Retry::spawn(backoff, || async {
238 match external_table.create_table_reader().await {
239 Ok(reader) => Ok(reader),
240 Err(e) => {
241 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
242 Err(e)
243 }
244 }
245 })
246 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
247 .await
248 .expect("Retry create cdc table reader until success.")
249 });
250 loop {
251 if let Some(msg) =
252 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
253 .await?
254 {
255 if let Some(msg) = mapping_message(msg, &self.output_indices) {
256 match msg {
257 Message::Barrier(barrier) => {
258 state_impl.commit_state(barrier.epoch).await?;
259 if is_reset_barrier(&barrier, self.actor_ctx.id) {
260 next_reset_barrier = Some(barrier);
261 continue 'with_cdc_table_snapshot_splits;
262 }
263 yield Message::Barrier(barrier);
264 }
265 Message::Chunk(chunk) => {
266 if chunk.cardinality() == 0 {
267 continue;
268 }
269 if let Some(filtered_chunk) = filter_stream_chunk(
270 chunk,
271 ¤t_actor_bounds,
272 snapshot_split_column_index,
273 ) && filtered_chunk.cardinality() > 0
274 {
275 yield Message::Chunk(filtered_chunk);
276 }
277 }
278 Message::Watermark(_) => {
279 }
281 }
282 }
283 } else {
284 assert!(table_reader.is_some(), "table reader must created");
285 tracing::info!(
286 table_id,
287 upstream_table_name,
288 "table reader created successfully"
289 );
290 break;
291 }
292 }
293 let upstream_table_reader = UpstreamTableReader::new(
294 self.external_table.clone(),
295 table_reader.expect("table reader must created"),
296 );
297 for split in actor_snapshot_splits.iter().skip(next_split_idx) {
299 tracing::info!(
300 table_id,
301 upstream_table_name,
302 ?split,
303 is_snapshot_paused,
304 "start cdc backfill split"
305 );
306 extends_current_actor_bound(&mut current_actor_bounds, split);
307 let left_upstream = upstream.by_ref().map(Either::Left);
308 let read_args = SplitSnapshotReadArgs::new(
309 split.left_bound_inclusive.clone(),
310 split.right_bound_exclusive.clone(),
311 cdc_table_snapshot_split_column.clone(),
312 self.rate_limit_rps,
313 additional_columns.clone(),
314 schema_table_name.clone(),
315 external_database_name.clone(),
316 );
317 let right_snapshot = pin!(
318 upstream_table_reader
319 .snapshot_read_table_split(read_args)
320 .map(Either::Right)
321 );
322 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
323 if is_snapshot_paused {
324 snapshot_valve.pause();
325 }
326 let mut backfill_stream =
327 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
328 stream::PollNext::Left
329 });
330 let mut row_count: u64 = 0;
331 #[for_await]
332 for either in &mut backfill_stream {
333 match either {
334 Either::Left(msg) => {
336 match msg? {
337 Message::Barrier(barrier) => {
338 state_impl.commit_state(barrier.epoch).await?;
339 if let Some(mutation) = barrier.mutation.as_deref() {
340 use crate::executor::Mutation;
341 match mutation {
342 Mutation::Pause => {
343 is_snapshot_paused = true;
344 snapshot_valve.pause();
345 }
346 Mutation::Resume => {
347 is_snapshot_paused = false;
348 snapshot_valve.resume();
349 }
350 Mutation::Throttle(some) => {
351 if let Some(new_rate_limit) =
355 some.get(&self.actor_ctx.id)
356 && *new_rate_limit != self.rate_limit_rps
357 {
358 self.rate_limit_rps = *new_rate_limit;
360 }
361 }
362 Mutation::Update(UpdateMutation {
363 dropped_actors,
364 ..
365 }) => {
366 if dropped_actors.contains(&self.actor_ctx.id) {
367 tracing::info!(
368 table_id,
369 upstream_table_name,
370 "CdcBackfill has been dropped due to config change"
371 );
372 for chunk in upstream_chunk_buffer.drain(..) {
373 yield Message::Chunk(mapping_chunk(
374 chunk,
375 &self.output_indices,
376 ));
377 }
378 yield Message::Barrier(barrier);
379 let () = futures::future::pending().await;
380 unreachable!();
381 }
382 }
383 _ => (),
384 }
385 }
386 if is_reset_barrier(&barrier, self.actor_ctx.id) {
387 next_reset_barrier = Some(barrier);
388 for chunk in upstream_chunk_buffer.drain(..) {
389 yield Message::Chunk(mapping_chunk(
390 chunk,
391 &self.output_indices,
392 ));
393 }
394 continue 'with_cdc_table_snapshot_splits;
395 }
396 yield Message::Barrier(barrier);
398 }
399 Message::Chunk(chunk) => {
400 if chunk.cardinality() == 0 {
402 continue;
403 }
404 if let Some(filtered_chunk) = filter_stream_chunk(
405 chunk,
406 ¤t_actor_bounds,
407 snapshot_split_column_index,
408 ) && filtered_chunk.cardinality() > 0
409 {
410 upstream_chunk_buffer.push(filtered_chunk.compact());
412 }
413 }
414 Message::Watermark(_) => {
415 }
417 }
418 }
419 Either::Right(msg) => {
421 match msg? {
422 None => {
423 tracing::info!(
424 table_id,
425 split_id = split.split_id,
426 "snapshot read stream ends"
427 );
428 for chunk in upstream_chunk_buffer.drain(..) {
429 yield Message::Chunk(mapping_chunk(
430 chunk,
431 &self.output_indices,
432 ));
433 }
434 break;
436 }
437 Some(chunk) => {
438 let chunk_cardinality = chunk.cardinality() as u64;
439 row_count = row_count.saturating_add(chunk_cardinality);
440 yield Message::Chunk(mapping_chunk(
441 chunk,
442 &self.output_indices,
443 ));
444 }
445 }
446 }
447 }
448 }
449 state_impl
451 .mutate_state(split.split_id, true, row_count)
452 .await?;
453 }
454
455 upstream_table_reader.disconnect().await?;
456 tracing::info!(
457 table_id,
458 upstream_table_name,
459 "CdcBackfill has already finished and will forward messages directly to the downstream"
460 );
461
462 #[for_await]
466 for msg in &mut upstream {
467 let Some(msg) = mapping_message(msg?, &self.output_indices) else {
468 continue;
469 };
470 match msg {
471 Message::Barrier(barrier) => {
472 state_impl.commit_state(barrier.epoch).await?;
473 if is_reset_barrier(&barrier, self.actor_ctx.id) {
474 next_reset_barrier = Some(barrier);
475 continue 'with_cdc_table_snapshot_splits;
476 }
477 yield Message::Barrier(barrier);
478 }
479 Message::Chunk(chunk) => {
480 if chunk.cardinality() == 0 {
481 continue;
482 }
483 if let Some(filtered_chunk) = filter_stream_chunk(
484 chunk,
485 ¤t_actor_bounds,
486 snapshot_split_column_index,
487 ) && filtered_chunk.cardinality() > 0
488 {
489 yield Message::Chunk(filtered_chunk);
490 }
491 }
492 msg @ Message::Watermark(_) => {
493 yield msg;
494 }
495 }
496 }
497 }
498 }
499}
500
501fn filter_stream_chunk(
502 chunk: StreamChunk,
503 bound: &Option<(OwnedRow, OwnedRow)>,
504 snapshot_split_column_index: usize,
505) -> Option<StreamChunk> {
506 let Some((left, right)) = bound else {
507 return None;
508 };
509 assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
510 assert_eq!(
511 right.len(),
512 1,
513 "multiple split columns is not supported yet"
514 );
515 let left_split_key = left.datum_at(0);
516 let right_split_key = right.datum_at(0);
517 let is_leftmost_bound = is_leftmost_bound(left);
518 let is_rightmost_bound = is_rightmost_bound(right);
519 if is_leftmost_bound && is_rightmost_bound {
520 return Some(chunk);
521 }
522 let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
523 let (ops, columns, visibility) = chunk.into_inner();
524 for (row_split_key, v) in columns[snapshot_split_column_index]
525 .iter()
526 .zip_eq_fast(visibility.iter())
527 {
528 if !v {
529 new_bitmap.append(false);
530 continue;
531 }
532 let mut is_in_range = true;
533 if !is_leftmost_bound {
534 is_in_range = cmp_datum(
535 row_split_key,
536 left_split_key,
537 OrderType::ascending_nulls_first(),
538 )
539 .is_ge();
540 }
541 if is_in_range && !is_rightmost_bound {
542 is_in_range = cmp_datum(
543 row_split_key,
544 right_split_key,
545 OrderType::ascending_nulls_first(),
546 )
547 .is_lt();
548 }
549 if !is_in_range {
550 tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
551 }
552 new_bitmap.append(is_in_range);
553 }
554 Some(StreamChunk::with_visibility(
555 ops,
556 columns,
557 new_bitmap.finish(),
558 ))
559}
560
561fn is_leftmost_bound(row: &OwnedRow) -> bool {
562 row.iter().all(|d| d.is_none())
563}
564
565fn is_rightmost_bound(row: &OwnedRow) -> bool {
566 row.iter().all(|d| d.is_none())
567}
568
569impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
570 fn execute(self: Box<Self>) -> BoxedMessageStream {
571 self.execute_inner().boxed()
572 }
573}
574
575fn extends_current_actor_bound(
576 current: &mut Option<(OwnedRow, OwnedRow)>,
577 split: &CdcTableSnapshotSplit,
578) {
579 if current.is_none() {
580 *current = Some((
581 split.left_bound_inclusive.clone(),
582 split.right_bound_exclusive.clone(),
583 ));
584 } else {
585 current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
586 }
587}
588
589fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
590 match barrier.mutation.as_deref() {
591 Some(Mutation::Update(update)) => update
592 .actor_cdc_table_snapshot_splits
593 .contains_key(&actor_id),
594 _ => false,
595 }
596}
597
598fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
599 for i in 1..actor_snapshot_splits.len() {
600 assert_eq!(
601 actor_snapshot_splits[i].split_id,
602 actor_snapshot_splits[i - 1].split_id + 1,
603 "{:?}",
604 actor_snapshot_splits
605 );
606 assert!(
607 cmp_datum(
608 actor_snapshot_splits[i - 1]
609 .right_bound_exclusive
610 .datum_at(0),
611 actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
612 OrderType::ascending_nulls_last(),
613 )
614 .is_lt()
615 );
616 }
617}
618
#[cfg(test)]
mod tests {
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use crate::executor::backfill::cdc::cdc_backill_v2::filter_stream_chunk;

    /// Builds a single-column `(left_inclusive, right_exclusive)` bound; `None` means unbounded.
    fn split_bound(left: Option<i64>, right: Option<i64>) -> Option<(OwnedRow, OwnedRow)> {
        let to_row = |key: Option<i64>| OwnedRow::new(vec![key.map(ScalarImpl::Int64)]);
        Some((to_row(left), to_row(right)))
    }

    /// Asserts that filtering `chunk` on column `split_column_index` with `range` keeps exactly
    /// the rows of `expected` (given in `StreamChunk::from_pretty` format).
    fn assert_filtered(
        chunk: &StreamChunk,
        range: Option<(OwnedRow, OwnedRow)>,
        split_column_index: usize,
        expected: &str,
    ) {
        let filtered = filter_stream_chunk(chunk.clone(), &range, split_column_index)
            .expect("a bounded filter always returns a chunk");
        assert_eq!(filtered.compact(), StreamChunk::from_pretty(expected));
    }

    #[test]
    fn test_filter_stream_chunk() {
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 6
            - 2 .
            U- 3 7
            U+ 4 .",
        );

        for split_column_index in [0, 1] {
            // No finished split yet: nothing may pass.
            assert!(filter_stream_chunk(chunk.clone(), &None, split_column_index).is_none());
            // Unbounded on both sides: the chunk passes unchanged.
            let unfiltered =
                filter_stream_chunk(chunk.clone(), &split_bound(None, None), split_column_index);
            assert_eq!(unfiltered.unwrap().compact(), chunk);
        }

        // Split on the first column (keys 1, 2, 3, 4).
        assert_filtered(
            &chunk,
            split_bound(None, Some(3)),
            0,
            " I I
            + 1 6
            - 2 .",
        );
        assert_filtered(
            &chunk,
            split_bound(Some(3), None),
            0,
            " I I
            U- 3 7
            U+ 4 .",
        );
        assert_filtered(
            &chunk,
            split_bound(Some(2), Some(4)),
            0,
            " I I
            - 2 .
            U- 3 7",
        );

        // Split on the second column (keys 6, NULL, 7, NULL); NULL sorts first.
        assert_filtered(
            &chunk,
            split_bound(None, Some(7)),
            1,
            " I I
            + 1 6
            - 2 .
            U+ 4 .",
        );
        assert_filtered(
            &chunk,
            split_bound(Some(7), None),
            1,
            " I I
            U- 3 7",
        );
    }
}