1use std::collections::BTreeMap;
16use std::future::Future;
17use std::pin::Pin;
18
19use either::Either;
20use futures::stream;
21use futures::stream::select_with_strategy;
22use itertools::Itertools;
23use risingwave_common::array::DataChunk;
24use risingwave_common::bail;
25use risingwave_common::catalog::ColumnDesc;
26use risingwave_connector::parser::{
27 ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
28 ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, TimeHandling,
29 TimestampHandling, TimestamptzHandling,
30};
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_connector::source::cdc::external::{
33 CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
34};
35use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
36use rw_futures_util::pausable;
37use thiserror_ext::AsReport;
38use tracing::Instrument;
39
40use crate::executor::UpdateMutation;
41use crate::executor::backfill::cdc::state::CdcBackfillState;
42use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
43use crate::executor::backfill::cdc::upstream_table::snapshot::{
44 SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
45};
46use crate::executor::backfill::utils::{
47 get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
48};
49use crate::executor::monitor::CdcBackfillMetrics;
50use crate::executor::prelude::*;
51use crate::executor::source::get_infinite_backoff_strategy;
52use crate::task::CreateMviewProgressReporter;
53
/// Number of extra state columns stored next to the primary-key columns.
///
/// It is added to the upstream table's primary-key length to obtain the state
/// length handed to `CdcBackfillState::new`.
// NOTE(review): presumably one column per persisted metadata field of the
// backfill state — confirm against `CdcBackfillState`.
const METADATA_STATE_LEN: usize = 4;
56
57pub struct CdcBackfillExecutor<S: StateStore> {
58 actor_ctx: ActorContextRef,
59
60 external_table: ExternalStorageTable,
62
63 upstream: Executor,
65
66 output_indices: Vec<usize>,
68
69 output_columns: Vec<ColumnDesc>,
71
72 state_impl: CdcBackfillState<S>,
74
75 progress: Option<CreateMviewProgressReporter>,
78
79 metrics: CdcBackfillMetrics,
80
81 rate_limit_rps: Option<u32>,
83
84 options: CdcScanOptions,
85
86 properties: BTreeMap<String, String>,
87}
88
89impl<S: StateStore> CdcBackfillExecutor<S> {
90 #[allow(clippy::too_many_arguments)]
91 pub fn new(
92 actor_ctx: ActorContextRef,
93 external_table: ExternalStorageTable,
94 upstream: Executor,
95 output_indices: Vec<usize>,
96 output_columns: Vec<ColumnDesc>,
97 progress: Option<CreateMviewProgressReporter>,
98 metrics: Arc<StreamingMetrics>,
99 state_table: StateTable<S>,
100 rate_limit_rps: Option<u32>,
101 options: CdcScanOptions,
102 properties: BTreeMap<String, String>,
103 ) -> Self {
104 let pk_indices = external_table.pk_indices();
105 let upstream_table_id = external_table.table_id().table_id;
106 let state_impl = CdcBackfillState::new(
107 upstream_table_id,
108 state_table,
109 pk_indices.len() + METADATA_STATE_LEN,
110 );
111
112 let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
113 Self {
114 actor_ctx,
115 external_table,
116 upstream,
117 output_indices,
118 output_columns,
119 state_impl,
120 progress,
121 metrics,
122 rate_limit_rps,
123 options,
124 properties,
125 }
126 }
127
128 fn report_metrics(
129 metrics: &CdcBackfillMetrics,
130 snapshot_processed_row_count: u64,
131 upstream_processed_row_count: u64,
132 ) {
133 metrics
134 .cdc_backfill_snapshot_read_row_count
135 .inc_by(snapshot_processed_row_count);
136
137 metrics
138 .cdc_backfill_upstream_output_row_count
139 .inc_by(upstream_processed_row_count);
140 }
141
142 #[try_stream(ok = Message, error = StreamExecutorError)]
143 async fn execute_inner(mut self) {
144 let pk_indices = self.external_table.pk_indices().to_vec();
146 let pk_order = self.external_table.pk_order_types().to_vec();
147
148 let table_id = self.external_table.table_id().table_id;
149 let upstream_table_name = self.external_table.qualified_table_name();
150 let schema_table_name = self.external_table.schema_table_name().clone();
151 let external_database_name = self.external_table.database_name().to_owned();
152
153 let additional_columns = self
154 .output_columns
155 .iter()
156 .filter(|col| col.additional_column.column_type.is_some())
157 .cloned()
158 .collect_vec();
159
160 let mut upstream = self.upstream.execute();
161
162 let mut current_pk_pos: Option<OwnedRow>;
165
166 let first_barrier = expect_first_barrier(&mut upstream).await?;
168
169 let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
170 let first_barrier_epoch = first_barrier.epoch;
171 yield Message::Barrier(first_barrier);
173 let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
174
175 let mut state_impl = self.state_impl;
178
179 state_impl.init_epoch(first_barrier_epoch).await?;
180
181 let state = state_impl.restore_state().await?;
183 current_pk_pos = state.current_pk_pos.clone();
184
185 let need_backfill = !self.options.disable_backfill && !state.is_finished;
186
187 let mut total_snapshot_row_count = state.row_count as u64;
189
190 let mut table_reader: Option<ExternalTableReaderImpl> = None;
195 let external_table = self.external_table.clone();
196 let mut future = Box::pin(async move {
197 let backoff = get_infinite_backoff_strategy();
198 tokio_retry::Retry::spawn(backoff, || async {
199 match external_table.create_table_reader().await {
200 Ok(reader) => Ok(reader),
201 Err(e) => {
202 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
203 Err(e)
204 }
205 }
206 })
207 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
208 .await
209 .expect("Retry create cdc table reader until success.")
210 });
211 let timestamp_handling: Option<TimestampHandling> = self
212 .properties
213 .get("debezium.time.precision.mode")
214 .map(|v| v == "connect")
215 .unwrap_or(false)
216 .then_some(TimestampHandling::Milli);
217 let timestamptz_handling: Option<TimestamptzHandling> = self
218 .properties
219 .get("debezium.time.precision.mode")
220 .map(|v| v == "connect")
221 .unwrap_or(false)
222 .then_some(TimestamptzHandling::Milli);
223 let time_handling: Option<TimeHandling> = self
224 .properties
225 .get("debezium.time.precision.mode")
226 .map(|v| v == "connect")
227 .unwrap_or(false)
228 .then_some(TimeHandling::Milli);
229 let handle_toast_columns: bool =
231 self.external_table.table_type() == &ExternalCdcTableType::Postgres;
232 let mut upstream = transform_upstream(
234 upstream,
235 self.output_columns.clone(),
236 timestamp_handling,
237 timestamptz_handling,
238 time_handling,
239 handle_toast_columns,
240 )
241 .boxed();
242 loop {
243 if let Some(msg) =
244 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
245 .await?
246 {
247 if let Some(msg) = mapping_message(msg, &self.output_indices) {
248 match msg {
249 Message::Barrier(barrier) => {
250 state_impl.commit_state(barrier.epoch).await?;
252 yield Message::Barrier(barrier);
253 }
254 Message::Chunk(chunk) => {
255 if need_backfill {
256 } else {
258 yield Message::Chunk(chunk);
260 }
261 }
262 Message::Watermark(_) => {
263 }
265 }
266 }
267 } else {
268 assert!(table_reader.is_some(), "table reader must created");
269 tracing::info!(
270 table_id,
271 upstream_table_name,
272 "table reader created successfully"
273 );
274 break;
275 }
276 }
277
278 let upstream_table_reader = UpstreamTableReader::new(
279 self.external_table.clone(),
280 table_reader.expect("table reader must created"),
281 );
282
283 let mut upstream = upstream.peekable();
284
285 let mut last_binlog_offset: Option<CdcOffset> = {
286 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
288 tokio::sync::Semaphore::const_new(10);
289
290 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
291 state
292 .last_cdc_offset
293 .map_or(upstream_table_reader.current_cdc_offset().await?, Some)
294 };
295
296 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
297 let mut consumed_binlog_offset: Option<CdcOffset> = None;
298
299 tracing::info!(
300 table_id,
301 upstream_table_name,
302 initial_binlog_offset = ?last_binlog_offset,
303 ?current_pk_pos,
304 is_finished = state.is_finished,
305 is_snapshot_paused,
306 snapshot_row_count = total_snapshot_row_count,
307 rate_limit = self.rate_limit_rps,
308 disable_backfill = self.options.disable_backfill,
309 snapshot_barrier_interval = self.options.snapshot_barrier_interval,
310 snapshot_batch_size = self.options.snapshot_batch_size,
311 "start cdc backfill",
312 );
313
314 if need_backfill {
334 let _ = Pin::new(&mut upstream).peek().await;
337
338 #[for_await]
340 for msg in upstream.by_ref() {
341 match msg? {
342 Message::Barrier(barrier) => {
343 match barrier.mutation.as_deref() {
344 Some(crate::executor::Mutation::Pause) => {
345 is_snapshot_paused = true;
346 tracing::info!(
347 table_id,
348 upstream_table_name,
349 "snapshot is paused by barrier"
350 );
351 }
352 Some(crate::executor::Mutation::Resume) => {
353 is_snapshot_paused = false;
354 tracing::info!(
355 table_id,
356 upstream_table_name,
357 "snapshot is resumed by barrier"
358 );
359 }
360 _ => {
361 }
363 }
364 state_impl.commit_state(barrier.epoch).await?;
366 yield Message::Barrier(barrier);
367 break;
368 }
369 Message::Chunk(ref chunk) => {
370 last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
371 }
372 Message::Watermark(_) => {
373 }
375 }
376 }
377
378 tracing::info!(table_id,
379 upstream_table_name,
380 initial_binlog_offset = ?last_binlog_offset,
381 ?current_pk_pos,
382 is_snapshot_paused,
383 "start cdc backfill loop");
384
385 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
387
388 'backfill_loop: loop {
389 let left_upstream = upstream.by_ref().map(Either::Left);
390
391 let mut snapshot_read_row_cnt: usize = 0;
392 let read_args = SnapshotReadArgs::new(
393 current_pk_pos.clone(),
394 self.rate_limit_rps,
395 pk_indices.clone(),
396 additional_columns.clone(),
397 schema_table_name.clone(),
398 external_database_name.clone(),
399 );
400 let right_snapshot = pin!(
401 upstream_table_reader
402 .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
403 .map(Either::Right)
404 );
405 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
406 if is_snapshot_paused {
407 snapshot_valve.pause();
408 }
409
410 let mut backfill_stream =
412 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
413 stream::PollNext::Left
414 });
415
416 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
417 let mut cur_barrier_upstream_processed_rows: u64 = 0;
418 let mut barrier_count: u32 = 0;
419 let mut pending_barrier = None;
420
421 #[for_await]
422 for either in &mut backfill_stream {
423 match either {
424 Either::Left(msg) => {
426 match msg? {
427 Message::Barrier(barrier) => {
428 barrier_count += 1;
430 let can_start_new_snapshot =
431 barrier_count == self.options.snapshot_barrier_interval;
432
433 if let Some(mutation) = barrier.mutation.as_deref() {
434 use crate::executor::Mutation;
435 match mutation {
436 Mutation::Pause => {
437 is_snapshot_paused = true;
438 snapshot_valve.pause();
439 }
440 Mutation::Resume => {
441 is_snapshot_paused = false;
442 snapshot_valve.resume();
443 }
444 Mutation::Throttle(some) => {
445 if let Some(new_rate_limit) =
446 some.get(&self.actor_ctx.id)
447 && *new_rate_limit != self.rate_limit_rps
448 {
449 self.rate_limit_rps = *new_rate_limit;
450 rate_limit_to_zero = self
451 .rate_limit_rps
452 .is_some_and(|val| val == 0);
453 state_impl
455 .mutate_state(
456 current_pk_pos.clone(),
457 last_binlog_offset.clone(),
458 total_snapshot_row_count,
459 false,
460 )
461 .await?;
462 state_impl.commit_state(barrier.epoch).await?;
463 yield Message::Barrier(barrier);
464
465 continue 'backfill_loop;
467 }
468 }
469 Mutation::Update(UpdateMutation {
470 dropped_actors,
471 ..
472 }) => {
473 if dropped_actors.contains(&self.actor_ctx.id) {
474 tracing::info!(
476 table_id,
477 upstream_table_name,
478 "CdcBackfill has been dropped due to config change"
479 );
480 yield Message::Barrier(barrier);
481 break 'backfill_loop;
482 }
483 }
484 _ => (),
485 }
486 }
487
488 Self::report_metrics(
489 &self.metrics,
490 cur_barrier_snapshot_processed_rows,
491 cur_barrier_upstream_processed_rows,
492 );
493
494 if can_start_new_snapshot {
497 pending_barrier = Some(barrier);
499 tracing::debug!(
500 table_id,
501 ?current_pk_pos,
502 ?snapshot_read_row_cnt,
503 "Prepare to start a new snapshot"
504 );
505 break;
507 } else {
508 state_impl
510 .mutate_state(
511 current_pk_pos.clone(),
512 last_binlog_offset.clone(),
513 total_snapshot_row_count,
514 false,
515 )
516 .await?;
517
518 state_impl.commit_state(barrier.epoch).await?;
519
520 yield Message::Barrier(barrier);
522 }
523 }
524 Message::Chunk(chunk) => {
525 if chunk.cardinality() == 0 {
527 continue;
528 }
529
530 let chunk_binlog_offset =
531 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
532
533 tracing::trace!(
534 "recv changelog chunk: chunk_offset {:?}, capactiy {}",
535 chunk_binlog_offset,
536 chunk.capacity()
537 );
538
539 if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
543 && let Some(chunk_offset) = chunk_binlog_offset
544 && chunk_offset < *last_binlog_offset
545 {
546 tracing::trace!(
547 "skip changelog chunk: chunk_offset {:?}, capacity {}",
548 chunk_offset,
549 chunk.capacity()
550 );
551 continue;
552 }
553 upstream_chunk_buffer.push(chunk.compact());
555 }
556 Message::Watermark(_) => {
557 }
559 }
560 }
561 Either::Right(msg) => {
563 match msg? {
564 None => {
565 tracing::info!(
566 table_id,
567 ?last_binlog_offset,
568 ?current_pk_pos,
569 "snapshot read stream ends"
570 );
571 for chunk in upstream_chunk_buffer.drain(..) {
576 yield Message::Chunk(mapping_chunk(
577 chunk,
578 &self.output_indices,
579 ));
580 }
581
582 break 'backfill_loop;
584 }
585 Some(chunk) => {
586 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
590
591 tracing::trace!(
592 "got a snapshot chunk: len {}, current_pk_pos {:?}",
593 chunk.cardinality(),
594 current_pk_pos
595 );
596 let chunk_cardinality = chunk.cardinality() as u64;
597 cur_barrier_snapshot_processed_rows += chunk_cardinality;
598 total_snapshot_row_count += chunk_cardinality;
599 yield Message::Chunk(mapping_chunk(
600 chunk,
601 &self.output_indices,
602 ));
603 }
604 }
605 }
606 }
607 }
608
609 assert!(pending_barrier.is_some(), "pending_barrier must exist");
610 let pending_barrier = pending_barrier.unwrap();
611
612 let (_, mut snapshot_stream) = backfill_stream.into_inner();
617
618 if !is_snapshot_paused
620 && !rate_limit_to_zero
621 && let Some(msg) = snapshot_stream
622 .next()
623 .instrument_await("consume_snapshot_stream_once")
624 .await
625 {
626 let Either::Right(msg) = msg else {
627 bail!("BUG: snapshot_read contains upstream messages");
628 };
629 match msg? {
630 None => {
631 tracing::info!(
632 table_id,
633 ?last_binlog_offset,
634 ?current_pk_pos,
635 "snapshot read stream ends in the force emit branch"
636 );
637 for chunk in upstream_chunk_buffer.drain(..) {
640 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
641 }
642
643 state_impl
645 .mutate_state(
646 current_pk_pos.clone(),
647 last_binlog_offset.clone(),
648 total_snapshot_row_count,
649 true,
650 )
651 .await?;
652
653 state_impl.commit_state(pending_barrier.epoch).await?;
655 yield Message::Barrier(pending_barrier);
656 break 'backfill_loop;
658 }
659 Some(chunk) => {
660 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
662
663 let row_count = chunk.cardinality() as u64;
664 cur_barrier_snapshot_processed_rows += row_count;
665 total_snapshot_row_count += row_count;
666 snapshot_read_row_cnt += row_count as usize;
667
668 tracing::debug!(
669 table_id,
670 ?current_pk_pos,
671 ?snapshot_read_row_cnt,
672 "force emit a snapshot chunk"
673 );
674 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
675 }
676 }
677 }
678
679 if let Some(current_pos) = ¤t_pk_pos {
682 for chunk in upstream_chunk_buffer.drain(..) {
683 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
684
685 consumed_binlog_offset =
688 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
689
690 yield Message::Chunk(mapping_chunk(
691 mark_cdc_chunk(
692 &offset_parse_func,
693 chunk,
694 current_pos,
695 &pk_indices,
696 &pk_order,
697 last_binlog_offset.clone(),
698 )?,
699 &self.output_indices,
700 ));
701 }
702 } else {
703 upstream_chunk_buffer.clear();
706 }
707
708 if consumed_binlog_offset.is_some() {
710 last_binlog_offset.clone_from(&consumed_binlog_offset);
711 }
712
713 Self::report_metrics(
714 &self.metrics,
715 cur_barrier_snapshot_processed_rows,
716 cur_barrier_upstream_processed_rows,
717 );
718
719 state_impl
721 .mutate_state(
722 current_pk_pos.clone(),
723 last_binlog_offset.clone(),
724 total_snapshot_row_count,
725 false,
726 )
727 .await?;
728
729 state_impl.commit_state(pending_barrier.epoch).await?;
730 yield Message::Barrier(pending_barrier);
731 }
732 } else if self.options.disable_backfill {
733 tracing::info!(
735 table_id,
736 upstream_table_name,
737 "CdcBackfill has been disabled"
738 );
739 state_impl
740 .mutate_state(
741 current_pk_pos.clone(),
742 last_binlog_offset.clone(),
743 total_snapshot_row_count,
744 true,
745 )
746 .await?;
747 }
748
749 upstream_table_reader.disconnect().await?;
750
751 tracing::info!(
752 table_id,
753 upstream_table_name,
754 "CdcBackfill has already finished and will forward messages directly to the downstream"
755 );
756
757 while let Some(Ok(msg)) = upstream.next().await {
760 if let Some(msg) = mapping_message(msg, &self.output_indices) {
761 if let Message::Barrier(barrier) = &msg {
763 state_impl
766 .mutate_state(
767 current_pk_pos.clone(),
768 last_binlog_offset.clone(),
769 total_snapshot_row_count,
770 true,
771 )
772 .await?;
773 state_impl.commit_state(barrier.epoch).await?;
774
775 if let Some(progress) = self.progress.as_mut() {
777 progress.finish(barrier.epoch, total_snapshot_row_count);
778 }
779 yield msg;
780 break;
782 }
783 yield msg;
784 }
785 }
786
787 #[for_await]
791 for msg in upstream {
792 if let Some(msg) = mapping_message(msg?, &self.output_indices) {
795 if let Message::Barrier(barrier) = &msg {
796 state_impl.commit_state(barrier.epoch).await?;
798 }
799 yield msg;
800 }
801 }
802 }
803}
804
805pub(crate) async fn build_reader_and_poll_upstream(
806 upstream: &mut BoxedMessageStream,
807 table_reader: &mut Option<ExternalTableReaderImpl>,
808 future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
809) -> StreamExecutorResult<Option<Message>> {
810 if table_reader.is_some() {
811 return Ok(None);
812 }
813 tokio::select! {
814 biased;
815 reader = &mut *future => {
816 *table_reader = Some(reader);
817 Ok(None)
818 }
819 msg = upstream.next() => {
820 msg.transpose()
821 }
822 }
823}
824
825#[try_stream(ok = Message, error = StreamExecutorError)]
826pub async fn transform_upstream(
827 upstream: BoxedMessageStream,
828 output_columns: Vec<ColumnDesc>,
829 timestamp_handling: Option<TimestampHandling>,
830 timestamptz_handling: Option<TimestamptzHandling>,
831 time_handling: Option<TimeHandling>,
832 handle_toast_columns: bool,
833) {
834 let props = SpecificParserConfig {
835 encoding_config: EncodingProperties::Json(JsonProperties {
836 use_schema_registry: false,
837 timestamp_handling,
838 timestamptz_handling,
839 time_handling,
840 handle_toast_columns,
841 }),
842 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
844 };
845
846 let columns_with_meta = output_columns
848 .iter()
849 .map(SourceColumnDesc::from)
850 .collect_vec();
851 let mut parser = DebeziumParser::new(
852 props,
853 columns_with_meta.clone(),
854 Arc::new(SourceContext::dummy()),
855 )
856 .await
857 .map_err(StreamExecutorError::connector_error)?;
858
859 pin_mut!(upstream);
860 #[for_await]
861 for msg in upstream {
862 let mut msg = msg?;
863 if let Message::Chunk(chunk) = &mut msg {
864 let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
865 let _ = std::mem::replace(chunk, parsed_chunk);
866 }
867 yield msg;
868 }
869}
870
871async fn parse_debezium_chunk(
872 parser: &mut DebeziumParser,
873 chunk: &StreamChunk,
874) -> StreamExecutorResult<StreamChunk> {
875 let mut builder = SourceStreamChunkBuilder::new(
882 parser.columns().to_vec(),
883 SourceCtrlOpts {
884 chunk_size: chunk.capacity(),
885 split_txn: false,
886 },
887 );
888
889 let payloads = chunk.data_chunk().project(&[0]);
893 let offsets = chunk.data_chunk().project(&[1]).compact();
894
895 for payload in payloads.rows() {
897 let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
898 else {
899 panic!("payload must be jsonb");
900 };
901
902 parser
903 .parse_inner(
904 None,
905 Some(jsonb_ref.to_string().as_bytes().to_vec()),
906 builder.row_writer(),
907 )
908 .await
909 .unwrap();
910 }
911 builder.finish_current_chunk();
912
913 let parsed_chunk = {
914 let mut iter = builder.consume_ready_chunks();
915 assert_eq!(1, iter.len());
916 iter.next().unwrap()
917 };
918 assert_eq!(parsed_chunk.capacity(), chunk.capacity()); let (ops, mut columns, vis) = parsed_chunk.into_inner();
920 columns.extend(offsets.into_parts().0);
924
925 Ok(StreamChunk::from_parts(
926 ops,
927 DataChunk::from_parts(columns.into(), vis),
928 ))
929}
930
931impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
932 fn execute(self: Box<Self>) -> BoxedMessageStream {
933 self.execute_inner().boxed()
934 }
935}
936
937#[cfg(test)]
938mod tests {
939 use std::collections::BTreeMap;
940 use std::str::FromStr;
941
942 use futures::{StreamExt, pin_mut};
943 use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
944 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
945 use risingwave_common::types::{DataType, Datum, JsonbVal};
946 use risingwave_common::util::epoch::test_epoch;
947 use risingwave_common::util::iter_util::ZipEqFast;
948 use risingwave_connector::source::cdc::CdcScanOptions;
949 use risingwave_storage::memory::MemoryStateStore;
950
951 use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
952 use crate::executor::monitor::StreamingMetrics;
953 use crate::executor::prelude::StateTable;
954 use crate::executor::source::default_source_internal_table;
955 use crate::executor::test_utils::MockSource;
956 use crate::executor::{
957 ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
958 };
959
960 #[tokio::test]
961 async fn test_transform_upstream_chunk() {
962 let schema = Schema::new(vec![
963 Field::unnamed(DataType::Jsonb), Field::unnamed(DataType::Varchar), Field::unnamed(DataType::Varchar), ]);
967 let pk_indices = vec![1];
968 let (mut tx, source) = MockSource::channel();
969 let source = source.into_executor(schema.clone(), pk_indices.clone());
970 let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
972
973 let datums: Vec<Datum> = vec![
974 Some(JsonbVal::from_str(payload).unwrap().into()),
975 Some("file: 1.binlog, pos: 100".to_owned().into()),
976 Some("mydb.orders".to_owned().into()),
977 ];
978
979 println!("datums: {:?}", datums[1]);
980
981 let mut builders = schema.create_array_builders(8);
982 for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
983 builder.append(datum.clone());
984 }
985 let columns = builders
986 .into_iter()
987 .map(|builder| builder.finish().into())
988 .collect();
989
990 let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));
992
993 tx.push_chunk(chunk);
994 let upstream = Box::new(source).execute();
995
996 let columns = vec![
998 ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
999 ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
1000 ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
1001 ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
1002 ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
1003 ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
1004 ];
1005
1006 let parsed_stream = transform_upstream(upstream, columns, None, None, None, false);
1007 pin_mut!(parsed_stream);
1008 if let Some(message) = parsed_stream.next().await {
1010 println!("chunk: {:#?}", message.unwrap());
1011 }
1012 }
1013
1014 #[tokio::test]
1015 async fn test_build_reader_and_poll_upstream() {
1016 let actor_context = ActorContext::for_test(1);
1017 let external_storage_table = ExternalStorageTable::for_test_undefined();
1018 let schema = Schema::new(vec![
1019 Field::unnamed(DataType::Jsonb), Field::unnamed(DataType::Varchar), Field::unnamed(DataType::Varchar), ]);
1023 let pk_indices = vec![1];
1024 let (mut tx, source) = MockSource::channel();
1025 let source = source.into_executor(schema.clone(), pk_indices.clone());
1026 let output_indices = vec![1, 0, 4]; let output_columns = vec![
1028 ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
1029 ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
1030 ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
1031 ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
1032 ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
1033 ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
1034 ];
1035 let store = MemoryStateStore::new();
1036 let state_table =
1037 StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
1038 .await;
1039 let cdc = CdcBackfillExecutor::new(
1040 actor_context,
1041 external_storage_table,
1042 source,
1043 output_indices,
1044 output_columns,
1045 None,
1046 StreamingMetrics::unused().into(),
1047 state_table,
1048 None,
1049 CdcScanOptions {
1050 disable_backfill: true,
1053 ..CdcScanOptions::default()
1054 },
1055 BTreeMap::default(),
1056 );
1057 let s = cdc.execute_inner();
1061 pin_mut!(s);
1062
1063 tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
1065 {
1067 let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
1068 let datums: Vec<Datum> = vec![
1069 Some(JsonbVal::from_str(payload).unwrap().into()),
1070 Some("file: 1.binlog, pos: 100".to_owned().into()),
1071 Some("mydb.orders".to_owned().into()),
1072 ];
1073 let mut builders = schema.create_array_builders(8);
1074 for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
1075 builder.append(datum.clone());
1076 }
1077 let columns = builders
1078 .into_iter()
1079 .map(|builder| builder.finish().into())
1080 .collect();
1081 let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));
1083
1084 tx.push_chunk(chunk);
1085 }
1086 let _first_barrier = s.next().await.unwrap();
1087 let upstream_change_log = s.next().await.unwrap().unwrap();
1088 let Message::Chunk(chunk) = upstream_change_log else {
1089 panic!("expect chunk");
1090 };
1091 assert_eq!(chunk.columns().len(), 3);
1092 assert_eq!(chunk.rows().count(), 1);
1093 assert_eq!(
1094 chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
1095 vec![Some(44485)]
1096 );
1097 assert_eq!(
1098 chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
1099 vec![Some(5)]
1100 );
1101 assert_eq!(
1102 chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
1103 vec![Some(100)]
1104 );
1105 }
1106}