1use std::collections::BTreeMap;
16use std::future::Future;
17use std::pin::Pin;
18
19use either::Either;
20use futures::stream;
21use futures::stream::select_with_strategy;
22use itertools::Itertools;
23use risingwave_common::array::DataChunk;
24use risingwave_common::bail;
25use risingwave_common::catalog::ColumnDesc;
26use risingwave_connector::parser::{
27 BigintUnsignedHandlingMode, ByteStreamSourceParser, DebeziumParser, DebeziumProps,
28 EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder,
29 SpecificParserConfig, TimeHandling, TimestampHandling, TimestamptzHandling,
30};
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_connector::source::cdc::external::{
33 CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
34};
35use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
36use rw_futures_util::pausable;
37use thiserror_ext::AsReport;
38use tracing::Instrument;
39
40use crate::executor::UpdateMutation;
41use crate::executor::backfill::cdc::state::CdcBackfillState;
42use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
43use crate::executor::backfill::cdc::upstream_table::snapshot::{
44 SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
45};
46use crate::executor::backfill::utils::{
47 get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
48};
49use crate::executor::monitor::CdcBackfillMetrics;
50use crate::executor::prelude::*;
51use crate::executor::source::get_infinite_backoff_strategy;
52use crate::task::CreateMviewProgressReporter;
53
/// Number of columns in a backfill state row beyond the upstream table's primary-key columns.
///
/// `CdcBackfillExecutor::new` sizes its `CdcBackfillState` as
/// `pk_indices.len() + METADATA_STATE_LEN`.
const METADATA_STATE_LEN: usize = 4;
57pub struct CdcBackfillExecutor<S: StateStore> {
58 actor_ctx: ActorContextRef,
59
60 external_table: ExternalStorageTable,
62
63 upstream: Executor,
65
66 output_indices: Vec<usize>,
68
69 output_columns: Vec<ColumnDesc>,
71
72 state_impl: CdcBackfillState<S>,
74
75 progress: Option<CreateMviewProgressReporter>,
78
79 metrics: CdcBackfillMetrics,
80
81 rate_limit_rps: Option<u32>,
83
84 options: CdcScanOptions,
85
86 properties: BTreeMap<String, String>,
87}
88
89impl<S: StateStore> CdcBackfillExecutor<S> {
90 #[allow(clippy::too_many_arguments)]
91 pub fn new(
92 actor_ctx: ActorContextRef,
93 external_table: ExternalStorageTable,
94 upstream: Executor,
95 output_indices: Vec<usize>,
96 output_columns: Vec<ColumnDesc>,
97 progress: Option<CreateMviewProgressReporter>,
98 metrics: Arc<StreamingMetrics>,
99 state_table: StateTable<S>,
100 rate_limit_rps: Option<u32>,
101 options: CdcScanOptions,
102 properties: BTreeMap<String, String>,
103 ) -> Self {
104 let pk_indices = external_table.pk_indices();
105 let upstream_table_id = external_table.table_id();
106 let state_impl = CdcBackfillState::new(
107 upstream_table_id,
108 state_table,
109 pk_indices.len() + METADATA_STATE_LEN,
110 );
111
112 let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
113 Self {
114 actor_ctx,
115 external_table,
116 upstream,
117 output_indices,
118 output_columns,
119 state_impl,
120 progress,
121 metrics,
122 rate_limit_rps,
123 options,
124 properties,
125 }
126 }
127
128 fn report_metrics(
129 metrics: &CdcBackfillMetrics,
130 snapshot_processed_row_count: u64,
131 upstream_processed_row_count: u64,
132 ) {
133 metrics
134 .cdc_backfill_snapshot_read_row_count
135 .inc_by(snapshot_processed_row_count);
136
137 metrics
138 .cdc_backfill_upstream_output_row_count
139 .inc_by(upstream_processed_row_count);
140 }
141
142 #[try_stream(ok = Message, error = StreamExecutorError)]
143 async fn execute_inner(mut self) {
144 let pk_indices = self.external_table.pk_indices().to_vec();
146 let pk_order = self.external_table.pk_order_types().to_vec();
147
148 let table_id = self.external_table.table_id();
149 let upstream_table_name = self.external_table.qualified_table_name();
150 let schema_table_name = self.external_table.schema_table_name().clone();
151 let external_database_name = self.external_table.database_name().to_owned();
152
153 let additional_columns = self
154 .output_columns
155 .iter()
156 .filter(|col| col.additional_column.column_type.is_some())
157 .cloned()
158 .collect_vec();
159
160 let mut upstream = self.upstream.execute();
161
162 let mut current_pk_pos: Option<OwnedRow>;
165
166 let first_barrier = expect_first_barrier(&mut upstream).await?;
168
169 let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
170 let first_barrier_epoch = first_barrier.epoch;
171 yield Message::Barrier(first_barrier);
173 let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
174
175 let mut state_impl = self.state_impl;
178
179 state_impl.init_epoch(first_barrier_epoch).await?;
180
181 let state = state_impl.restore_state().await?;
183 current_pk_pos = state.current_pk_pos.clone();
184
185 let need_backfill = !self.options.disable_backfill && !state.is_finished;
186
187 let mut total_snapshot_row_count = state.row_count as u64;
189
190 let mut table_reader: Option<ExternalTableReaderImpl> = None;
195 let external_table = self.external_table.clone();
196 let mut future = Box::pin(async move {
197 let backoff = get_infinite_backoff_strategy();
198 tokio_retry::Retry::spawn(backoff, || async {
199 match external_table.create_table_reader().await {
200 Ok(reader) => Ok(reader),
201 Err(e) => {
202 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
203 Err(e)
204 }
205 }
206 })
207 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
208 .await
209 .expect("Retry create cdc table reader until success.")
210 });
211 let timestamp_handling: Option<TimestampHandling> = self
212 .properties
213 .get("debezium.time.precision.mode")
214 .map(|v| v == "connect")
215 .unwrap_or(false)
216 .then_some(TimestampHandling::Milli);
217 let timestamptz_handling: Option<TimestamptzHandling> = self
218 .properties
219 .get("debezium.time.precision.mode")
220 .map(|v| v == "connect")
221 .unwrap_or(false)
222 .then_some(TimestamptzHandling::Milli);
223 let time_handling: Option<TimeHandling> = self
224 .properties
225 .get("debezium.time.precision.mode")
226 .map(|v| v == "connect")
227 .unwrap_or(false)
228 .then_some(TimeHandling::Milli);
229 let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
230 .properties
231 .get("debezium.bigint.unsigned.handling.mode")
232 .map(|v| v == "precise")
233 .unwrap_or(false)
234 .then_some(BigintUnsignedHandlingMode::Precise);
235 let handle_toast_columns: bool =
237 self.external_table.table_type() == &ExternalCdcTableType::Postgres;
238 let mut upstream = transform_upstream(
240 upstream,
241 self.output_columns.clone(),
242 timestamp_handling,
243 timestamptz_handling,
244 time_handling,
245 bigint_unsigned_handling,
246 handle_toast_columns,
247 )
248 .boxed();
249 loop {
250 if let Some(msg) =
251 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
252 .await?
253 {
254 if let Some(msg) = mapping_message(msg, &self.output_indices) {
255 match msg {
256 Message::Barrier(barrier) => {
257 state_impl.commit_state(barrier.epoch).await?;
259 yield Message::Barrier(barrier);
260 }
261 Message::Chunk(chunk) => {
262 if need_backfill {
263 } else {
265 yield Message::Chunk(chunk);
267 }
268 }
269 Message::Watermark(_) => {
270 }
272 }
273 }
274 } else {
275 assert!(table_reader.is_some(), "table reader must created");
276 tracing::info!(
277 %table_id,
278 upstream_table_name,
279 "table reader created successfully"
280 );
281 break;
282 }
283 }
284
285 let upstream_table_reader = UpstreamTableReader::new(
286 self.external_table.clone(),
287 table_reader.expect("table reader must created"),
288 );
289
290 let mut upstream = upstream.peekable();
291
292 let mut last_binlog_offset: Option<CdcOffset> = {
293 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
295 tokio::sync::Semaphore::const_new(10);
296
297 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
298 state
299 .last_cdc_offset
300 .map_or(upstream_table_reader.current_cdc_offset().await?, Some)
301 };
302
303 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
304 let mut consumed_binlog_offset: Option<CdcOffset> = None;
305
306 tracing::info!(
307 %table_id,
308 upstream_table_name,
309 initial_binlog_offset = ?last_binlog_offset,
310 ?current_pk_pos,
311 is_finished = state.is_finished,
312 is_snapshot_paused,
313 snapshot_row_count = total_snapshot_row_count,
314 rate_limit = self.rate_limit_rps,
315 disable_backfill = self.options.disable_backfill,
316 snapshot_barrier_interval = self.options.snapshot_barrier_interval,
317 snapshot_batch_size = self.options.snapshot_batch_size,
318 "start cdc backfill",
319 );
320
321 if need_backfill {
341 let _ = Pin::new(&mut upstream).peek().await;
344
345 #[for_await]
347 for msg in upstream.by_ref() {
348 match msg? {
349 Message::Barrier(barrier) => {
350 match barrier.mutation.as_deref() {
351 Some(crate::executor::Mutation::Pause) => {
352 is_snapshot_paused = true;
353 tracing::info!(
354 %table_id,
355 upstream_table_name,
356 "snapshot is paused by barrier"
357 );
358 }
359 Some(crate::executor::Mutation::Resume) => {
360 is_snapshot_paused = false;
361 tracing::info!(
362 %table_id,
363 upstream_table_name,
364 "snapshot is resumed by barrier"
365 );
366 }
367 _ => {
368 }
370 }
371 state_impl.commit_state(barrier.epoch).await?;
373 yield Message::Barrier(barrier);
374 break;
375 }
376 Message::Chunk(ref chunk) => {
377 last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
378 }
379 Message::Watermark(_) => {
380 }
382 }
383 }
384
385 tracing::info!(%table_id,
386 upstream_table_name,
387 initial_binlog_offset = ?last_binlog_offset,
388 ?current_pk_pos,
389 is_snapshot_paused,
390 "start cdc backfill loop");
391
392 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
394
395 'backfill_loop: loop {
396 let left_upstream = upstream.by_ref().map(Either::Left);
397
398 let mut snapshot_read_row_cnt: usize = 0;
399 let read_args = SnapshotReadArgs::new(
400 current_pk_pos.clone(),
401 self.rate_limit_rps,
402 pk_indices.clone(),
403 additional_columns.clone(),
404 schema_table_name.clone(),
405 external_database_name.clone(),
406 );
407 let right_snapshot = pin!(
408 upstream_table_reader
409 .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
410 .map(Either::Right)
411 );
412 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
413 if is_snapshot_paused {
414 snapshot_valve.pause();
415 }
416
417 let mut backfill_stream =
419 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
420 stream::PollNext::Left
421 });
422
423 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
424 let mut cur_barrier_upstream_processed_rows: u64 = 0;
425 let mut barrier_count: u32 = 0;
426 let mut pending_barrier = None;
427
428 #[for_await]
429 for either in &mut backfill_stream {
430 match either {
431 Either::Left(msg) => {
433 match msg? {
434 Message::Barrier(barrier) => {
435 barrier_count += 1;
437 let can_start_new_snapshot =
438 barrier_count == self.options.snapshot_barrier_interval;
439
440 if let Some(mutation) = barrier.mutation.as_deref() {
441 use crate::executor::Mutation;
442 match mutation {
443 Mutation::Pause => {
444 is_snapshot_paused = true;
445 snapshot_valve.pause();
446 }
447 Mutation::Resume => {
448 is_snapshot_paused = false;
449 snapshot_valve.resume();
450 }
451 Mutation::Throttle(some) => {
452 if let Some(new_rate_limit) =
453 some.get(&self.actor_ctx.id)
454 && *new_rate_limit != self.rate_limit_rps
455 {
456 self.rate_limit_rps = *new_rate_limit;
457 rate_limit_to_zero = self
458 .rate_limit_rps
459 .is_some_and(|val| val == 0);
460 state_impl
462 .mutate_state(
463 current_pk_pos.clone(),
464 last_binlog_offset.clone(),
465 total_snapshot_row_count,
466 false,
467 )
468 .await?;
469 state_impl.commit_state(barrier.epoch).await?;
470 yield Message::Barrier(barrier);
471
472 continue 'backfill_loop;
474 }
475 }
476 Mutation::Update(UpdateMutation {
477 dropped_actors,
478 ..
479 }) => {
480 if dropped_actors.contains(&self.actor_ctx.id) {
481 tracing::info!(
483 %table_id,
484 upstream_table_name,
485 "CdcBackfill has been dropped due to config change"
486 );
487 yield Message::Barrier(barrier);
488 break 'backfill_loop;
489 }
490 }
491 _ => (),
492 }
493 }
494
495 Self::report_metrics(
496 &self.metrics,
497 cur_barrier_snapshot_processed_rows,
498 cur_barrier_upstream_processed_rows,
499 );
500
501 if can_start_new_snapshot {
504 pending_barrier = Some(barrier);
506 tracing::debug!(
507 %table_id,
508 ?current_pk_pos,
509 ?snapshot_read_row_cnt,
510 "Prepare to start a new snapshot"
511 );
512 break;
514 } else {
515 state_impl
517 .mutate_state(
518 current_pk_pos.clone(),
519 last_binlog_offset.clone(),
520 total_snapshot_row_count,
521 false,
522 )
523 .await?;
524
525 state_impl.commit_state(barrier.epoch).await?;
526
527 yield Message::Barrier(barrier);
529 }
530 }
531 Message::Chunk(chunk) => {
532 if chunk.cardinality() == 0 {
534 continue;
535 }
536
537 let chunk_binlog_offset =
538 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
539
540 tracing::trace!(
541 "recv changelog chunk: chunk_offset {:?}, capactiy {}",
542 chunk_binlog_offset,
543 chunk.capacity()
544 );
545
546 if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
550 && let Some(chunk_offset) = chunk_binlog_offset
551 && chunk_offset < *last_binlog_offset
552 {
553 tracing::trace!(
554 "skip changelog chunk: chunk_offset {:?}, capacity {}",
555 chunk_offset,
556 chunk.capacity()
557 );
558 continue;
559 }
560 upstream_chunk_buffer.push(chunk.compact_vis());
562 }
563 Message::Watermark(_) => {
564 }
566 }
567 }
568 Either::Right(msg) => {
570 match msg? {
571 None => {
572 tracing::info!(
573 %table_id,
574 ?last_binlog_offset,
575 ?current_pk_pos,
576 "snapshot read stream ends"
577 );
578 for chunk in upstream_chunk_buffer.drain(..) {
583 yield Message::Chunk(mapping_chunk(
584 chunk,
585 &self.output_indices,
586 ));
587 }
588
589 break 'backfill_loop;
591 }
592 Some(chunk) => {
593 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
597
598 tracing::trace!(
599 "got a snapshot chunk: len {}, current_pk_pos {:?}",
600 chunk.cardinality(),
601 current_pk_pos
602 );
603 let chunk_cardinality = chunk.cardinality() as u64;
604 cur_barrier_snapshot_processed_rows += chunk_cardinality;
605 total_snapshot_row_count += chunk_cardinality;
606 yield Message::Chunk(mapping_chunk(
607 chunk,
608 &self.output_indices,
609 ));
610 }
611 }
612 }
613 }
614 }
615
616 assert!(pending_barrier.is_some(), "pending_barrier must exist");
617 let pending_barrier = pending_barrier.unwrap();
618
619 let (_, mut snapshot_stream) = backfill_stream.into_inner();
624
625 if !is_snapshot_paused
627 && !rate_limit_to_zero
628 && let Some(msg) = snapshot_stream
629 .next()
630 .instrument_await("consume_snapshot_stream_once")
631 .await
632 {
633 let Either::Right(msg) = msg else {
634 bail!("BUG: snapshot_read contains upstream messages");
635 };
636 match msg? {
637 None => {
638 tracing::info!(
639 %table_id,
640 ?last_binlog_offset,
641 ?current_pk_pos,
642 "snapshot read stream ends in the force emit branch"
643 );
644 for chunk in upstream_chunk_buffer.drain(..) {
647 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
648 }
649
650 state_impl
652 .mutate_state(
653 current_pk_pos.clone(),
654 last_binlog_offset.clone(),
655 total_snapshot_row_count,
656 true,
657 )
658 .await?;
659
660 state_impl.commit_state(pending_barrier.epoch).await?;
662 yield Message::Barrier(pending_barrier);
663 break 'backfill_loop;
665 }
666 Some(chunk) => {
667 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
669
670 let row_count = chunk.cardinality() as u64;
671 cur_barrier_snapshot_processed_rows += row_count;
672 total_snapshot_row_count += row_count;
673 snapshot_read_row_cnt += row_count as usize;
674
675 tracing::debug!(
676 %table_id,
677 ?current_pk_pos,
678 ?snapshot_read_row_cnt,
679 "force emit a snapshot chunk"
680 );
681 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
682 }
683 }
684 }
685
686 if let Some(current_pos) = ¤t_pk_pos {
689 for chunk in upstream_chunk_buffer.drain(..) {
690 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
691
692 consumed_binlog_offset =
695 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
696
697 yield Message::Chunk(mapping_chunk(
698 mark_cdc_chunk(
699 &offset_parse_func,
700 chunk,
701 current_pos,
702 &pk_indices,
703 &pk_order,
704 last_binlog_offset.clone(),
705 )?,
706 &self.output_indices,
707 ));
708 }
709 } else {
710 upstream_chunk_buffer.clear();
713 }
714
715 if consumed_binlog_offset.is_some() {
717 last_binlog_offset.clone_from(&consumed_binlog_offset);
718 }
719
720 Self::report_metrics(
721 &self.metrics,
722 cur_barrier_snapshot_processed_rows,
723 cur_barrier_upstream_processed_rows,
724 );
725
726 state_impl
728 .mutate_state(
729 current_pk_pos.clone(),
730 last_binlog_offset.clone(),
731 total_snapshot_row_count,
732 false,
733 )
734 .await?;
735
736 state_impl.commit_state(pending_barrier.epoch).await?;
737 yield Message::Barrier(pending_barrier);
738 }
739 } else if self.options.disable_backfill {
740 tracing::info!(
742 %table_id,
743 upstream_table_name,
744 "CdcBackfill has been disabled"
745 );
746 state_impl
747 .mutate_state(
748 current_pk_pos.clone(),
749 last_binlog_offset.clone(),
750 total_snapshot_row_count,
751 true,
752 )
753 .await?;
754 }
755
756 upstream_table_reader.disconnect().await?;
757
758 tracing::info!(
759 %table_id,
760 upstream_table_name,
761 "CdcBackfill has already finished and will forward messages directly to the downstream"
762 );
763
764 while let Some(Ok(msg)) = upstream.next().await {
767 if let Some(msg) = mapping_message(msg, &self.output_indices) {
768 if let Message::Barrier(barrier) = &msg {
770 state_impl
773 .mutate_state(
774 current_pk_pos.clone(),
775 last_binlog_offset.clone(),
776 total_snapshot_row_count,
777 true,
778 )
779 .await?;
780 state_impl.commit_state(barrier.epoch).await?;
781
782 if let Some(progress) = self.progress.as_mut() {
784 progress.finish(barrier.epoch, total_snapshot_row_count);
785 }
786 yield msg;
787 break;
789 }
790 yield msg;
791 }
792 }
793
794 #[for_await]
798 for msg in upstream {
799 if let Some(msg) = mapping_message(msg?, &self.output_indices) {
802 if let Message::Barrier(barrier) = &msg {
803 state_impl.commit_state(barrier.epoch).await?;
805 }
806 yield msg;
807 }
808 }
809 }
810}
811
812pub(crate) async fn build_reader_and_poll_upstream(
813 upstream: &mut BoxedMessageStream,
814 table_reader: &mut Option<ExternalTableReaderImpl>,
815 future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
816) -> StreamExecutorResult<Option<Message>> {
817 if table_reader.is_some() {
818 return Ok(None);
819 }
820 tokio::select! {
821 biased;
822 reader = &mut *future => {
823 *table_reader = Some(reader);
824 Ok(None)
825 }
826 msg = upstream.next() => {
827 msg.transpose()
828 }
829 }
830}
831
832#[try_stream(ok = Message, error = StreamExecutorError)]
833pub async fn transform_upstream(
834 upstream: BoxedMessageStream,
835 output_columns: Vec<ColumnDesc>,
836 timestamp_handling: Option<TimestampHandling>,
837 timestamptz_handling: Option<TimestamptzHandling>,
838 time_handling: Option<TimeHandling>,
839 bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
840 handle_toast_columns: bool,
841) {
842 let props = SpecificParserConfig {
843 encoding_config: EncodingProperties::Json(JsonProperties {
844 use_schema_registry: false,
845 timestamp_handling,
846 timestamptz_handling,
847 time_handling,
848 bigint_unsigned_handling,
849 handle_toast_columns,
850 }),
851 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
853 };
854
855 let columns_with_meta = output_columns
857 .iter()
858 .map(SourceColumnDesc::from)
859 .collect_vec();
860 let mut parser = DebeziumParser::new(
861 props,
862 columns_with_meta.clone(),
863 Arc::new(SourceContext::dummy()),
864 )
865 .await
866 .map_err(StreamExecutorError::connector_error)?;
867
868 pin_mut!(upstream);
869 #[for_await]
870 for msg in upstream {
871 let mut msg = msg?;
872 if let Message::Chunk(chunk) = &mut msg {
873 let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
874 let _ = std::mem::replace(chunk, parsed_chunk);
875 }
876 yield msg;
877 }
878}
879
880async fn parse_debezium_chunk(
881 parser: &mut DebeziumParser,
882 chunk: &StreamChunk,
883) -> StreamExecutorResult<StreamChunk> {
884 let mut builder = SourceStreamChunkBuilder::new(
891 parser.columns().to_vec(),
892 SourceCtrlOpts {
893 chunk_size: chunk.capacity(),
894 split_txn: false,
895 },
896 );
897
898 let payloads = chunk.data_chunk().project(&[0]);
902 let offsets = chunk.data_chunk().project(&[1]).compact_vis();
903
904 for payload in payloads.rows() {
906 let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
907 else {
908 panic!("payload must be jsonb");
909 };
910
911 parser
912 .parse_inner(
913 None,
914 Some(jsonb_ref.to_string().as_bytes().to_vec()),
915 builder.row_writer(),
916 )
917 .await
918 .unwrap();
919 }
920 builder.finish_current_chunk();
921
922 let parsed_chunk = {
923 let mut iter = builder.consume_ready_chunks();
924 assert_eq!(1, iter.len());
925 iter.next().unwrap()
926 };
927 assert_eq!(parsed_chunk.capacity(), chunk.capacity()); let (ops, mut columns, vis) = parsed_chunk.into_inner();
929 columns.extend(offsets.into_parts().0);
933
934 Ok(StreamChunk::from_parts(
935 ops,
936 DataChunk::from_parts(columns.into(), vis),
937 ))
938}
939
940impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
941 fn execute(self: Box<Self>) -> BoxedMessageStream {
942 self.execute_inner().boxed()
943 }
944}
945
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::iter_util::ZipEqFast;
    use risingwave_connector::source::cdc::CdcScanOptions;
    use risingwave_storage::memory::MemoryStateStore;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::prelude::StateTable;
    use crate::executor::source::default_source_internal_table;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{
        ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
    };

    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // Debezium payload
            Field::unnamed(DataType::Varchar), // CDC offset
            Field::unnamed(DataType::Varchar), // full table name
        ]);
        let stream_key = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // Columns the Debezium payload is parsed into.
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let column_count = columns.len();

        let parsed_stream = transform_upstream(upstream, columns, None, None, None, None, false);
        pin_mut!(parsed_stream);
        let message = parsed_stream
            .next()
            .await
            .expect("the pushed chunk should be forwarded")
            .unwrap();
        let Message::Chunk(chunk) = message else {
            panic!("expect chunk");
        };

        // One parsed insert row: the table columns followed by the original offset column.
        assert_eq!(chunk.ops(), &[Op::Insert]);
        assert_eq!(chunk.cardinality(), 1);
        assert_eq!(chunk.columns().len(), column_count + 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[column_count].as_utf8().value_at(0),
            Some("file: 1.binlog, pos: 100")
        );
    }

    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // Debezium payload
            Field::unnamed(DataType::Varchar), // CDC offset
            Field::unnamed(DataType::Varchar), // full table name
        ]);
        let stream_key = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());
        // Project O_CUSTKEY, O_ORDERKEY and O_DUMMY out of the parsed columns.
        let output_indices = vec![1, 0, 4];
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
            BTreeMap::default(),
        );
        let s = cdc.execute_inner();
        pin_mut!(s);

        // First barrier, then a single changelog chunk.
        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        {
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
}