1use std::collections::BTreeMap;
16use std::future::Future;
17use std::pin::Pin;
18
19use either::Either;
20use futures::stream;
21use futures::stream::select_with_strategy;
22use itertools::Itertools;
23use risingwave_common::array::DataChunk;
24use risingwave_common::bail;
25use risingwave_common::catalog::ColumnDesc;
26use risingwave_connector::parser::{
27 BigintUnsignedHandlingMode, ByteStreamSourceParser, DebeziumParser, DebeziumProps,
28 EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder,
29 SpecificParserConfig, TimeHandling, TimestampHandling, TimestamptzHandling,
30};
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_connector::source::cdc::external::{
33 CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
34};
35use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
36use risingwave_pb::common::ThrottleType;
37use rw_futures_util::pausable;
38use thiserror_ext::AsReport;
39use tracing::Instrument;
40
41use crate::executor::UpdateMutation;
42use crate::executor::backfill::cdc::state::CdcBackfillState;
43use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
44use crate::executor::backfill::cdc::upstream_table::snapshot::{
45 SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
46};
47use crate::executor::backfill::utils::{
48 get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
49};
50use crate::executor::monitor::CdcBackfillMetrics;
51use crate::executor::prelude::*;
52use crate::executor::source::get_infinite_backoff_strategy;
53use crate::task::CreateMviewProgressReporter;
54
/// Number of metadata columns appended after the upstream PK columns in the
/// backfill state row (see `CdcBackfillState::new` below, which sizes state
/// rows as `pk_len + METADATA_STATE_LEN`). NOTE(review): the exact metadata
/// layout (finished flag / row count / cdc offset / ...) lives in
/// `CdcBackfillState` — confirm there before changing this value.
const METADATA_STATE_LEN: usize = 4;
57
/// Executor that backfills a CDC (change data capture) table: it reads a
/// snapshot of the external upstream table in batches while buffering and
/// forwarding change-log events produced by the `upstream` executor.
pub struct CdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to read snapshots from.
    external_table: ExternalStorageTable,

    /// Upstream executor producing raw Debezium change-log messages.
    upstream: Executor,

    /// Indices selecting/reordering the columns emitted downstream.
    output_indices: Vec<usize>,

    /// Descriptors of all output columns (including additional/metadata columns).
    output_columns: Vec<ColumnDesc>,

    /// Persistent backfill progress state.
    state_impl: CdcBackfillState<S>,

    /// Progress reporter for materialized-view creation; `None` when no
    /// progress reporting is required.
    progress: Option<CreateMviewProgressReporter>,

    metrics: CdcBackfillMetrics,

    /// Snapshot-read rate limit in rows per second; `Some(0)` effectively
    /// pauses snapshot reads (see `rate_limit_to_zero` in `execute_inner`).
    rate_limit_rps: Option<u32>,

    /// Scan options: disable flag, barrier interval, batch size, etc.
    options: CdcScanOptions,

    /// Connector properties (e.g. `debezium.*` options) used to configure the
    /// change-log parser.
    properties: BTreeMap<String, String>,
}
89
90impl<S: StateStore> CdcBackfillExecutor<S> {
91 #[allow(clippy::too_many_arguments)]
92 pub fn new(
93 actor_ctx: ActorContextRef,
94 external_table: ExternalStorageTable,
95 upstream: Executor,
96 output_indices: Vec<usize>,
97 output_columns: Vec<ColumnDesc>,
98 progress: Option<CreateMviewProgressReporter>,
99 metrics: Arc<StreamingMetrics>,
100 state_table: StateTable<S>,
101 rate_limit_rps: Option<u32>,
102 options: CdcScanOptions,
103 properties: BTreeMap<String, String>,
104 ) -> Self {
105 let pk_indices = external_table.pk_indices();
106 let upstream_table_id = external_table.table_id();
107 let state_impl = CdcBackfillState::new(
108 upstream_table_id,
109 state_table,
110 pk_indices.len() + METADATA_STATE_LEN,
111 );
112
113 let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
114 Self {
115 actor_ctx,
116 external_table,
117 upstream,
118 output_indices,
119 output_columns,
120 state_impl,
121 progress,
122 metrics,
123 rate_limit_rps,
124 options,
125 properties,
126 }
127 }
128
129 fn report_metrics(
130 metrics: &CdcBackfillMetrics,
131 snapshot_processed_row_count: u64,
132 upstream_processed_row_count: u64,
133 ) {
134 metrics
135 .cdc_backfill_snapshot_read_row_count
136 .inc_by(snapshot_processed_row_count);
137
138 metrics
139 .cdc_backfill_upstream_output_row_count
140 .inc_by(upstream_processed_row_count);
141 }
142
143 #[try_stream(ok = Message, error = StreamExecutorError)]
144 async fn execute_inner(mut self) {
145 let pk_indices = self.external_table.pk_indices().to_vec();
147 let pk_order = self.external_table.pk_order_types().to_vec();
148
149 let table_id = self.external_table.table_id();
150 let upstream_table_name = self.external_table.qualified_table_name();
151 let schema_table_name = self.external_table.schema_table_name().clone();
152 let external_database_name = self.external_table.database_name().to_owned();
153
154 let additional_columns = self
155 .output_columns
156 .iter()
157 .filter(|col| col.additional_column.column_type.is_some())
158 .cloned()
159 .collect_vec();
160
161 let mut upstream = self.upstream.execute();
162
163 let mut current_pk_pos: Option<OwnedRow>;
166
167 let first_barrier = expect_first_barrier(&mut upstream).await?;
169
170 let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
171 let first_barrier_epoch = first_barrier.epoch;
172 yield Message::Barrier(first_barrier);
174 let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
175
176 let mut state_impl = self.state_impl;
179
180 state_impl.init_epoch(first_barrier_epoch).await?;
181
182 let state = state_impl.restore_state().await?;
184 current_pk_pos = state.current_pk_pos.clone();
185
186 let need_backfill = !self.options.disable_backfill && !state.is_finished;
187
188 let mut total_snapshot_row_count = state.row_count as u64;
190
191 let mut table_reader: Option<ExternalTableReaderImpl> = None;
196 let external_table = self.external_table.clone();
197 let mut future = Box::pin(async move {
198 let backoff = get_infinite_backoff_strategy();
199 tokio_retry::Retry::spawn(backoff, || async {
200 match external_table.create_table_reader().await {
201 Ok(reader) => Ok(reader),
202 Err(e) => {
203 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
204 Err(e)
205 }
206 }
207 })
208 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
209 .await
210 .expect("Retry create cdc table reader until success.")
211 });
212 let timestamp_handling: Option<TimestampHandling> = self
213 .properties
214 .get("debezium.time.precision.mode")
215 .map(|v| v == "connect")
216 .unwrap_or(false)
217 .then_some(TimestampHandling::Milli);
218 let timestamptz_handling: Option<TimestamptzHandling> = self
219 .properties
220 .get("debezium.time.precision.mode")
221 .map(|v| v == "connect")
222 .unwrap_or(false)
223 .then_some(TimestamptzHandling::Milli);
224 let time_handling: Option<TimeHandling> = self
225 .properties
226 .get("debezium.time.precision.mode")
227 .map(|v| v == "connect")
228 .unwrap_or(false)
229 .then_some(TimeHandling::Milli);
230 let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
231 .properties
232 .get("debezium.bigint.unsigned.handling.mode")
233 .map(|v| v == "precise")
234 .unwrap_or(false)
235 .then_some(BigintUnsignedHandlingMode::Precise);
236 let handle_toast_columns: bool =
238 self.external_table.table_type() == &ExternalCdcTableType::Postgres;
239 let mut upstream = transform_upstream(
241 upstream,
242 self.output_columns.clone(),
243 timestamp_handling,
244 timestamptz_handling,
245 time_handling,
246 bigint_unsigned_handling,
247 handle_toast_columns,
248 )
249 .boxed();
250 loop {
251 if let Some(msg) =
252 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
253 .await?
254 {
255 if let Some(msg) = mapping_message(msg, &self.output_indices) {
256 match msg {
257 Message::Barrier(barrier) => {
258 state_impl.commit_state(barrier.epoch).await?;
260 yield Message::Barrier(barrier);
261 }
262 Message::Chunk(chunk) => {
263 if need_backfill {
264 } else {
266 yield Message::Chunk(chunk);
268 }
269 }
270 Message::Watermark(_) => {
271 }
273 }
274 }
275 } else {
276 assert!(table_reader.is_some(), "table reader must created");
277 tracing::info!(
278 %table_id,
279 upstream_table_name,
280 "table reader created successfully"
281 );
282 break;
283 }
284 }
285
286 let upstream_table_reader = UpstreamTableReader::new(
287 self.external_table.clone(),
288 table_reader.expect("table reader must created"),
289 );
290
291 let mut upstream = upstream.peekable();
292
293 let mut last_binlog_offset: Option<CdcOffset> = {
294 static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
296 tokio::sync::Semaphore::const_new(10);
297
298 let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
299 state
300 .last_cdc_offset
301 .map_or(upstream_table_reader.current_cdc_offset().await?, Some)
302 };
303
304 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
305 let mut consumed_binlog_offset: Option<CdcOffset> = None;
306
307 tracing::info!(
308 %table_id,
309 upstream_table_name,
310 initial_binlog_offset = ?last_binlog_offset,
311 ?current_pk_pos,
312 is_finished = state.is_finished,
313 is_snapshot_paused,
314 snapshot_row_count = total_snapshot_row_count,
315 rate_limit = self.rate_limit_rps,
316 disable_backfill = self.options.disable_backfill,
317 snapshot_barrier_interval = self.options.snapshot_barrier_interval,
318 snapshot_batch_size = self.options.snapshot_batch_size,
319 "start cdc backfill",
320 );
321
322 if need_backfill {
342 let _ = Pin::new(&mut upstream).peek().await;
345
346 #[for_await]
348 for msg in upstream.by_ref() {
349 match msg? {
350 Message::Barrier(barrier) => {
351 match barrier.mutation.as_deref() {
352 Some(crate::executor::Mutation::Pause) => {
353 is_snapshot_paused = true;
354 tracing::info!(
355 %table_id,
356 upstream_table_name,
357 "snapshot is paused by barrier"
358 );
359 }
360 Some(crate::executor::Mutation::Resume) => {
361 is_snapshot_paused = false;
362 tracing::info!(
363 %table_id,
364 upstream_table_name,
365 "snapshot is resumed by barrier"
366 );
367 }
368 _ => {
369 }
371 }
372 state_impl.commit_state(barrier.epoch).await?;
374 yield Message::Barrier(barrier);
375 break;
376 }
377 Message::Chunk(ref chunk) => {
378 last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
379 }
380 Message::Watermark(_) => {
381 }
383 }
384 }
385
386 tracing::info!(%table_id,
387 upstream_table_name,
388 initial_binlog_offset = ?last_binlog_offset,
389 ?current_pk_pos,
390 is_snapshot_paused,
391 "start cdc backfill loop");
392
393 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
395
396 'backfill_loop: loop {
397 let left_upstream = upstream.by_ref().map(Either::Left);
398
399 let mut snapshot_read_row_cnt: usize = 0;
400 let read_args = SnapshotReadArgs::new(
401 current_pk_pos.clone(),
402 self.rate_limit_rps,
403 pk_indices.clone(),
404 additional_columns.clone(),
405 schema_table_name.clone(),
406 external_database_name.clone(),
407 );
408 let right_snapshot = pin!(
409 upstream_table_reader
410 .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
411 .map(Either::Right)
412 );
413 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
414 if is_snapshot_paused {
415 snapshot_valve.pause();
416 }
417
418 let mut backfill_stream =
420 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
421 stream::PollNext::Left
422 });
423
424 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
425 let mut cur_barrier_upstream_processed_rows: u64 = 0;
426 let mut barrier_count: u32 = 0;
427 let mut pending_barrier = None;
428
429 #[for_await]
430 for either in &mut backfill_stream {
431 match either {
432 Either::Left(msg) => {
434 match msg? {
435 Message::Barrier(barrier) => {
436 barrier_count += 1;
438 let can_start_new_snapshot =
439 barrier_count == self.options.snapshot_barrier_interval;
440
441 if let Some(mutation) = barrier.mutation.as_deref() {
442 use crate::executor::Mutation;
443 match mutation {
444 Mutation::Pause => {
445 is_snapshot_paused = true;
446 snapshot_valve.pause();
447 }
448 Mutation::Resume => {
449 is_snapshot_paused = false;
450 snapshot_valve.resume();
451 }
452 Mutation::Throttle(some) => {
453 if let Some(entry) =
454 some.get(&self.actor_ctx.fragment_id)
455 && entry.throttle_type()
456 == ThrottleType::Backfill
457 && entry.rate_limit != self.rate_limit_rps
458 {
459 self.rate_limit_rps = entry.rate_limit;
460 rate_limit_to_zero = self
461 .rate_limit_rps
462 .is_some_and(|val| val == 0);
463 state_impl
465 .mutate_state(
466 current_pk_pos.clone(),
467 last_binlog_offset.clone(),
468 total_snapshot_row_count,
469 false,
470 )
471 .await?;
472 state_impl.commit_state(barrier.epoch).await?;
473 yield Message::Barrier(barrier);
474
475 continue 'backfill_loop;
477 }
478 }
479 Mutation::Update(UpdateMutation {
480 dropped_actors,
481 ..
482 }) => {
483 if dropped_actors.contains(&self.actor_ctx.id) {
484 tracing::info!(
486 %table_id,
487 upstream_table_name,
488 "CdcBackfill has been dropped due to config change"
489 );
490 yield Message::Barrier(barrier);
491 break 'backfill_loop;
492 }
493 }
494 _ => (),
495 }
496 }
497
498 Self::report_metrics(
499 &self.metrics,
500 cur_barrier_snapshot_processed_rows,
501 cur_barrier_upstream_processed_rows,
502 );
503
504 if can_start_new_snapshot {
507 pending_barrier = Some(barrier);
509 tracing::debug!(
510 %table_id,
511 ?current_pk_pos,
512 ?snapshot_read_row_cnt,
513 "Prepare to start a new snapshot"
514 );
515 break;
517 } else {
518 state_impl
520 .mutate_state(
521 current_pk_pos.clone(),
522 last_binlog_offset.clone(),
523 total_snapshot_row_count,
524 false,
525 )
526 .await?;
527
528 state_impl.commit_state(barrier.epoch).await?;
529
530 yield Message::Barrier(barrier);
532 }
533 }
534 Message::Chunk(chunk) => {
535 if chunk.cardinality() == 0 {
537 continue;
538 }
539
540 let chunk_binlog_offset =
541 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
542
543 tracing::trace!(
544 "recv changelog chunk: chunk_offset {:?}, capactiy {}",
545 chunk_binlog_offset,
546 chunk.capacity()
547 );
548
549 if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
553 && let Some(chunk_offset) = chunk_binlog_offset
554 && chunk_offset < *last_binlog_offset
555 {
556 tracing::trace!(
557 "skip changelog chunk: chunk_offset {:?}, capacity {}",
558 chunk_offset,
559 chunk.capacity()
560 );
561 continue;
562 }
563 upstream_chunk_buffer.push(chunk.compact_vis());
565 }
566 Message::Watermark(_) => {
567 }
569 }
570 }
571 Either::Right(msg) => {
573 match msg? {
574 None => {
575 tracing::info!(
576 %table_id,
577 ?last_binlog_offset,
578 ?current_pk_pos,
579 "snapshot read stream ends"
580 );
581 for chunk in upstream_chunk_buffer.drain(..) {
586 yield Message::Chunk(mapping_chunk(
587 chunk,
588 &self.output_indices,
589 ));
590 }
591
592 break 'backfill_loop;
594 }
595 Some(chunk) => {
596 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
600
601 tracing::trace!(
602 "got a snapshot chunk: len {}, current_pk_pos {:?}",
603 chunk.cardinality(),
604 current_pk_pos
605 );
606 let chunk_cardinality = chunk.cardinality() as u64;
607 cur_barrier_snapshot_processed_rows += chunk_cardinality;
608 total_snapshot_row_count += chunk_cardinality;
609 yield Message::Chunk(mapping_chunk(
610 chunk,
611 &self.output_indices,
612 ));
613 }
614 }
615 }
616 }
617 }
618
619 assert!(pending_barrier.is_some(), "pending_barrier must exist");
620 let pending_barrier = pending_barrier.unwrap();
621
622 let (_, mut snapshot_stream) = backfill_stream.into_inner();
627
628 if !is_snapshot_paused
630 && !rate_limit_to_zero
631 && let Some(msg) = snapshot_stream
632 .next()
633 .instrument_await("consume_snapshot_stream_once")
634 .await
635 {
636 let Either::Right(msg) = msg else {
637 bail!("BUG: snapshot_read contains upstream messages");
638 };
639 match msg? {
640 None => {
641 tracing::info!(
642 %table_id,
643 ?last_binlog_offset,
644 ?current_pk_pos,
645 "snapshot read stream ends in the force emit branch"
646 );
647 for chunk in upstream_chunk_buffer.drain(..) {
650 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
651 }
652
653 state_impl
655 .mutate_state(
656 current_pk_pos.clone(),
657 last_binlog_offset.clone(),
658 total_snapshot_row_count,
659 true,
660 )
661 .await?;
662
663 state_impl.commit_state(pending_barrier.epoch).await?;
665 yield Message::Barrier(pending_barrier);
666 break 'backfill_loop;
668 }
669 Some(chunk) => {
670 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
672
673 let row_count = chunk.cardinality() as u64;
674 cur_barrier_snapshot_processed_rows += row_count;
675 total_snapshot_row_count += row_count;
676 snapshot_read_row_cnt += row_count as usize;
677
678 tracing::debug!(
679 %table_id,
680 ?current_pk_pos,
681 ?snapshot_read_row_cnt,
682 "force emit a snapshot chunk"
683 );
684 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
685 }
686 }
687 }
688
689 if let Some(current_pos) = ¤t_pk_pos {
692 for chunk in upstream_chunk_buffer.drain(..) {
693 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
694
695 consumed_binlog_offset =
698 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
699
700 yield Message::Chunk(mapping_chunk(
701 mark_cdc_chunk(
702 &offset_parse_func,
703 chunk,
704 current_pos,
705 &pk_indices,
706 &pk_order,
707 last_binlog_offset.clone(),
708 )?,
709 &self.output_indices,
710 ));
711 }
712 } else {
713 upstream_chunk_buffer.clear();
716 }
717
718 if consumed_binlog_offset.is_some() {
720 last_binlog_offset.clone_from(&consumed_binlog_offset);
721 }
722
723 Self::report_metrics(
724 &self.metrics,
725 cur_barrier_snapshot_processed_rows,
726 cur_barrier_upstream_processed_rows,
727 );
728
729 state_impl
731 .mutate_state(
732 current_pk_pos.clone(),
733 last_binlog_offset.clone(),
734 total_snapshot_row_count,
735 false,
736 )
737 .await?;
738
739 state_impl.commit_state(pending_barrier.epoch).await?;
740 yield Message::Barrier(pending_barrier);
741 }
742 } else if self.options.disable_backfill {
743 tracing::info!(
745 %table_id,
746 upstream_table_name,
747 "CdcBackfill has been disabled"
748 );
749 state_impl
750 .mutate_state(
751 current_pk_pos.clone(),
752 last_binlog_offset.clone(),
753 total_snapshot_row_count,
754 true,
755 )
756 .await?;
757 }
758
759 upstream_table_reader.disconnect().await?;
760
761 tracing::info!(
762 %table_id,
763 upstream_table_name,
764 "CdcBackfill has already finished and will forward messages directly to the downstream"
765 );
766
767 while let Some(Ok(msg)) = upstream.next().await {
770 if let Some(msg) = mapping_message(msg, &self.output_indices) {
771 if let Message::Barrier(barrier) = &msg {
773 state_impl
776 .mutate_state(
777 current_pk_pos.clone(),
778 last_binlog_offset.clone(),
779 total_snapshot_row_count,
780 true,
781 )
782 .await?;
783 state_impl.commit_state(barrier.epoch).await?;
784
785 if let Some(progress) = self.progress.as_mut() {
787 progress.finish(barrier.epoch, total_snapshot_row_count);
788 }
789 yield msg;
790 break;
792 }
793 yield msg;
794 }
795 }
796
797 #[for_await]
801 for msg in upstream {
802 if let Some(msg) = mapping_message(msg?, &self.output_indices) {
805 if let Message::Barrier(barrier) = &msg {
806 state_impl.commit_state(barrier.epoch).await?;
808 }
809 yield msg;
810 }
811 }
812 }
813}
814
815pub(crate) async fn build_reader_and_poll_upstream(
816 upstream: &mut BoxedMessageStream,
817 table_reader: &mut Option<ExternalTableReaderImpl>,
818 future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
819) -> StreamExecutorResult<Option<Message>> {
820 if table_reader.is_some() {
821 return Ok(None);
822 }
823 tokio::select! {
824 biased;
825 reader = &mut *future => {
826 *table_reader = Some(reader);
827 Ok(None)
828 }
829 msg = upstream.next() => {
830 msg.transpose()
831 }
832 }
833}
834
835#[try_stream(ok = Message, error = StreamExecutorError)]
836pub async fn transform_upstream(
837 upstream: BoxedMessageStream,
838 output_columns: Vec<ColumnDesc>,
839 timestamp_handling: Option<TimestampHandling>,
840 timestamptz_handling: Option<TimestamptzHandling>,
841 time_handling: Option<TimeHandling>,
842 bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
843 handle_toast_columns: bool,
844) {
845 let props = SpecificParserConfig {
846 encoding_config: EncodingProperties::Json(JsonProperties {
847 use_schema_registry: false,
848 timestamp_handling,
849 timestamptz_handling,
850 time_handling,
851 bigint_unsigned_handling,
852 handle_toast_columns,
853 }),
854 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
856 };
857
858 let columns_with_meta = output_columns
860 .iter()
861 .map(SourceColumnDesc::from)
862 .collect_vec();
863 let mut parser = DebeziumParser::new(
864 props,
865 columns_with_meta.clone(),
866 Arc::new(SourceContext::dummy()),
867 )
868 .await
869 .map_err(StreamExecutorError::connector_error)?;
870
871 pin_mut!(upstream);
872 #[for_await]
873 for msg in upstream {
874 let mut msg = msg?;
875 if let Message::Chunk(chunk) = &mut msg {
876 let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
877 let _ = std::mem::replace(chunk, parsed_chunk);
878 }
879 yield msg;
880 }
881}
882
/// Parses a chunk of raw Debezium JSON payloads into a typed `StreamChunk`,
/// re-attaching the input's offset column to the parsed output.
async fn parse_debezium_chunk(
    parser: &mut DebeziumParser,
    chunk: &StreamChunk,
) -> StreamExecutorResult<StreamChunk> {
    // Output builder sized to the input capacity; `split_txn: false` keeps all
    // rows in a single ready chunk (asserted below).
    let mut builder = SourceStreamChunkBuilder::new(
        parser.columns().to_vec(),
        SourceCtrlOpts {
            chunk_size: chunk.capacity(),
            split_txn: false,
        },
    );

    // Input layout: column 0 is the JSON payload, column 1 is the CDC offset.
    let payloads = chunk.data_chunk().project(&[0]);
    let offsets = chunk.data_chunk().project(&[1]).compact_vis();

    for payload in payloads.rows() {
        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
        else {
            panic!("payload must be jsonb");
        };

        parser
            .parse_inner(
                None,
                Some(jsonb_ref.to_string().as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await
            // NOTE(review): a parse failure panics the actor here — confirm
            // this is intended rather than surfacing a connector error.
            .unwrap();
    }
    builder.finish_current_chunk();

    // With split_txn=false, exactly one ready chunk is expected.
    let parsed_chunk = {
        let mut iter = builder.consume_ready_chunks();
        assert_eq!(1, iter.len());
        iter.next().unwrap()
    };
    // Parsed rows must align positionally with the input so the offset column
    // can simply be appended.
    assert_eq!(parsed_chunk.capacity(), chunk.capacity());
    let (ops, mut columns, vis) = parsed_chunk.into_inner();
    columns.extend(offsets.into_parts().0);

    Ok(StreamChunk::from_parts(
        ops,
        DataChunk::from_parts(columns.into(), vis),
    ))
}
942
impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Box the `execute_inner` generator as this executor's message stream.
        self.execute_inner().boxed()
    }
}
948
949#[cfg(test)]
950mod tests {
951 use std::collections::BTreeMap;
952 use std::str::FromStr;
953
954 use futures::{StreamExt, pin_mut};
955 use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
956 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
957 use risingwave_common::types::{DataType, Datum, JsonbVal};
958 use risingwave_common::util::epoch::test_epoch;
959 use risingwave_common::util::iter_util::ZipEqFast;
960 use risingwave_connector::source::cdc::CdcScanOptions;
961 use risingwave_storage::memory::MemoryStateStore;
962
963 use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
964 use crate::executor::monitor::StreamingMetrics;
965 use crate::executor::prelude::StateTable;
966 use crate::executor::source::default_source_internal_table;
967 use crate::executor::test_utils::MockSource;
968 use crate::executor::{
969 ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
970 };
971
    // Feeds a single Debezium "read" event through `transform_upstream` and
    // prints the parsed chunk (smoke test: parsing must not error).
    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        // Upstream layout: (payload jsonb, offset varchar, table name varchar).
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),
            Field::unnamed(DataType::Varchar),
            Field::unnamed(DataType::Varchar),
        ]);
        let stream_key = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());
        // A sample Debezium snapshot ("op": "r") event from a MySQL source.
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        println!("datums: {:?}", datums[1]);

        // Build a one-row chunk from the datums above.
        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // Target (parsed) column descriptors, matching the payload fields.
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, columns, None, None, None, None, false);
        pin_mut!(parsed_stream);
        if let Some(message) = parsed_stream.next().await {
            println!("chunk: {:#?}", message.unwrap());
        }
    }
1025
    // Runs the executor with backfill disabled and verifies that an upstream
    // change-log chunk is parsed and projected through `output_indices`.
    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        // Upstream layout: (payload jsonb, offset varchar, table name varchar).
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),
            Field::unnamed(DataType::Varchar),
            Field::unnamed(DataType::Varchar),
        ]);
        let stream_key = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());
        // Project parsed columns [O_CUSTKEY, O_ORDERKEY, O_DUMMY] in that order.
        let output_indices = vec![1, 0, 4];
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                // Disabled backfill forwards upstream chunks directly.
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
            BTreeMap::default(),
        );
        let s = cdc.execute_inner();
        pin_mut!(s);

        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        {
            // Push one Debezium snapshot event as the upstream change-log.
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
        // Three projected columns, one row; values follow `output_indices`:
        // O_CUSTKEY=44485, O_ORDERKEY=5, O_DUMMY=100.
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
1118}