use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::catalog::ColumnDesc;
use risingwave_connector::parser::{
    ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
    ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, TimeHandling,
    TimestampHandling, TimestamptzHandling,
};
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{
    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::CreateMviewProgressReporter;

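/// Number of metadata columns stored alongside the pk position in the backfill
/// state row (cf. `CdcBackfillState::new` below): split id, `is_finished` flag,
/// row count, and cdc offset, one column each.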
const METADATA_STATE_LEN: usize = 4;

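/// Executor that backfills a snapshot of an external (upstream) table while
/// buffering its CDC changelog, then switches to forwarding the changelog
/// directly to the downstream once the snapshot is complete.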
pub struct CdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled.
    external_table: ExternalStorageTable,

    /// Upstream changelog stream, which may contain metadata columns such as `_rw_offset`.
    upstream: Executor,

    /// Indices of the columns to be forwarded to the downstream.
    output_indices: Vec<usize>,

    /// Schema of the output chunk, including additional columns if any.
    output_columns: Vec<ColumnDesc>,

    /// Persistent state of the backfill progress.
    state_impl: CdcBackfillState<S>,

    /// Reporter for the progress of creating the materialized view.
    progress: Option<CreateMviewProgressReporter>,

    metrics: CdcBackfillMetrics,

    /// Rate limit in rows/s, if any.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,

    properties: BTreeMap<String, String>,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        progress: Option<CreateMviewProgressReporter>,
        metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
        properties: BTreeMap<String, String>,
    ) -> Self {
        let pk_indices = external_table.pk_indices();
        let upstream_table_id = external_table.table_id().table_id;
        let state_impl = CdcBackfillState::new(
            upstream_table_id,
            state_table,
            pk_indices.len() + METADATA_STATE_LEN,
        );

        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);

        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            state_impl,
            progress,
            metrics,
            rate_limit_rps,
            options,
            properties,
        }
    }

    fn report_metrics(
        metrics: &CdcBackfillMetrics,
        snapshot_processed_row_count: u64,
        upstream_processed_row_count: u64,
    ) {
        metrics
            .cdc_backfill_snapshot_read_row_count
            .inc_by(snapshot_processed_row_count);

        metrics
            .cdc_backfill_upstream_output_row_count
            .inc_by(upstream_processed_row_count);
    }

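    /// Main routine of the CDC backfill executor: create the external table
    /// reader (with retry), optionally run the snapshot backfill loop while
    /// buffering the upstream changelog, then forward upstream messages directly.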
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let pk_indices = self.external_table.pk_indices().to_vec();
        let pk_order = self.external_table.pk_order_types().to_vec();

        let table_id = self.external_table.table_id().table_id;
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();

        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();

        let mut upstream = self.upstream.execute();

        // Current position of the upstream table's pk; `None` means the snapshot
        // starts from the beginning.
        let mut current_pk_pos: Option<OwnedRow>;

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;

        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
        let first_barrier_epoch = first_barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);
        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

        // Initialize the state to the first barrier's epoch and restore any
        // previously persisted backfill progress.
        let mut state_impl = self.state_impl;

        state_impl.init_epoch(first_barrier_epoch).await?;

        let state = state_impl.restore_state().await?;
        current_pk_pos = state.current_pk_pos.clone();

        let need_backfill = !self.options.disable_backfill && !state.is_finished;

        // Total number of snapshot rows read so far, restored from the state.
        let mut total_snapshot_row_count = state.row_count as u64;

        // Create the external table reader with an infinite-backoff retry. The
        // future is polled concurrently with the upstream below, so barriers are
        // not blocked while the external database is unreachable.
        let mut table_reader: Option<ExternalTableReaderImpl> = None;
        let external_table = self.external_table.clone();
        let mut future = Box::pin(async move {
            let backoff = get_infinite_backoff_strategy();
            tokio_retry::Retry::spawn(backoff, || async {
                match external_table.create_table_reader().await {
                    Ok(reader) => Ok(reader),
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                        Err(e)
                    }
                }
            })
            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
            .await
            .expect("Retry create cdc table reader until success.")
        });

        // With `debezium.time.precision.mode = connect`, temporal values are
        // encoded as epoch milliseconds, so the parser must decode them as such.
        let is_connect_precision = self
            .properties
            .get("debezium.time.precision.mode")
            .is_some_and(|v| v == "connect");
        let timestamp_handling = is_connect_precision.then_some(TimestampHandling::Milli);
        let timestamptz_handling = is_connect_precision.then_some(TimestamptzHandling::Milli);
        let time_handling = is_connect_precision.then_some(TimeHandling::Milli);

        let mut upstream = transform_upstream(
            upstream,
            self.output_columns.clone(),
            timestamp_handling,
            timestamptz_handling,
            time_handling,
        )
        .boxed();
        loop {
            if let Some(msg) =
                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                    .await?
            {
                if let Some(msg) = mapping_message(msg, &self.output_indices) {
                    match msg {
                        Message::Barrier(barrier) => {
                            // commit state to bump the epoch of the state table
                            state_impl.commit_state(barrier.epoch).await?;
                            yield Message::Barrier(barrier);
                        }
                        Message::Chunk(chunk) => {
                            if need_backfill {
                                // ignore the chunk: its data will be read again
                                // by the snapshot
                            } else {
                                yield Message::Chunk(chunk);
                            }
                        }
                        Message::Watermark(_) => {
                            // ignore watermark
                        }
                    }
                }
            } else {
                assert!(table_reader.is_some(), "table reader must be created");
                tracing::info!(
                    table_id,
                    upstream_table_name,
                    "table reader created successfully"
                );
                break;
            }
        }

        let upstream_table_reader = UpstreamTableReader::new(
            self.external_table.clone(),
            table_reader.expect("table reader must be created"),
        );

        let mut upstream = upstream.peekable();

        let mut last_binlog_offset: Option<CdcOffset> = {
            // Limit the number of concurrent connections when querying the
            // current cdc offset from the upstream database.
            static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                tokio::sync::Semaphore::const_new(10);

            let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
            state
                .last_cdc_offset
                .map_or(upstream_table_reader.current_cdc_offset().await?, Some)
        };

        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
        let mut consumed_binlog_offset: Option<CdcOffset> = None;

        tracing::info!(
            table_id,
            upstream_table_name,
            initial_binlog_offset = ?last_binlog_offset,
            ?current_pk_pos,
            is_finished = state.is_finished,
            is_snapshot_paused,
            snapshot_row_count = total_snapshot_row_count,
            rate_limit = self.rate_limit_rps,
            disable_backfill = self.options.disable_backfill,
            snapshot_barrier_interval = self.options.snapshot_barrier_interval,
            snapshot_batch_size = self.options.snapshot_batch_size,
            "start cdc backfill",
        );

        if need_backfill {
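            // Backfill overview:
            // 1. Wait for the first upstream message and record the starting
            //    changelog (binlog) offset.
            // 2. Read the snapshot in batches; while a batch is in flight, buffer
            //    incoming upstream changelog chunks.
            // 3. On the barrier that closes a batch, emit the buffered changelog
            //    rows marked against the backfilled pk range (`mark_cdc_chunk`),
            //    persist the new pk position, and start the next batch.
            // 4. When the snapshot stream is exhausted, drain the buffer and leave
            //    the backfill loop to forward the changelog directly.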
            // Wait until the upstream has produced at least one message before
            // reading the starting offset, so the connector has actually started.
            let _ = Pin::new(&mut upstream).peek().await;

            // Consume upstream events until the next barrier: record the latest
            // changelog offset (earlier events are covered by the snapshot) and
            // honor pause/resume mutations.
            #[for_await]
            for msg in upstream.by_ref() {
                match msg? {
                    Message::Barrier(barrier) => {
                        match barrier.mutation.as_deref() {
                            Some(crate::executor::Mutation::Pause) => {
                                is_snapshot_paused = true;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is paused by barrier"
                                );
                            }
                            Some(crate::executor::Mutation::Resume) => {
                                is_snapshot_paused = false;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is resumed by barrier"
                                );
                            }
                            _ => {
                                // ignore other mutations
                            }
                        }
                        // commit state just to bump the epoch of the state table
                        state_impl.commit_state(barrier.epoch).await?;
                        yield Message::Barrier(barrier);
                        break;
                    }
                    Message::Chunk(ref chunk) => {
                        last_binlog_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
                    }
                    Message::Watermark(_) => {
                        // ignore watermark
                    }
                }
            }

            tracing::info!(
                table_id,
                upstream_table_name,
                initial_binlog_offset = ?last_binlog_offset,
                ?current_pk_pos,
                is_snapshot_paused,
                "start cdc backfill loop"
            );

            // Buffer for upstream changelog chunks received while a snapshot
            // batch is in flight.
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

            'backfill_loop: loop {
                let left_upstream = upstream.by_ref().map(Either::Left);

                // Number of rows read from the snapshot in the current batch.
                let mut snapshot_read_row_cnt: usize = 0;
                let read_args = SnapshotReadArgs::new(
                    current_pk_pos.clone(),
                    self.rate_limit_rps,
                    pk_indices.clone(),
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }

                // Prefer to poll the upstream changelog over the snapshot stream.
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut barrier_count: u32 = 0;
                let mut pending_barrier = None;

                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream changelog.
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    // increase the barrier count and check whether
                                    // a new snapshot batch should start
                                    barrier_count += 1;
                                    let can_start_new_snapshot =
                                        barrier_count == self.options.snapshot_barrier_interval;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                if let Some(new_rate_limit) =
                                                    some.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    rate_limit_to_zero = self
                                                        .rate_limit_rps
                                                        .is_some_and(|val| val == 0);

                                                    // The rate limit changed: persist
                                                    // the progress, emit the barrier,
                                                    // and rebuild the snapshot stream.
                                                    state_impl
                                                        .mutate_state(
                                                            current_pk_pos.clone(),
                                                            last_binlog_offset.clone(),
                                                            total_snapshot_row_count,
                                                            false,
                                                        )
                                                        .await?;
                                                    state_impl.commit_state(barrier.epoch).await?;
                                                    yield Message::Barrier(barrier);

                                                    continue 'backfill_loop;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    tracing::info!(
                                                        table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    yield Message::Barrier(barrier);
                                                    break 'backfill_loop;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    Self::report_metrics(
                                        &self.metrics,
                                        cur_barrier_snapshot_processed_rows,
                                        cur_barrier_upstream_processed_rows,
                                    );

                                    if can_start_new_snapshot {
                                        // Break out of the loop to drain the buffered
                                        // upstream chunks and start a new snapshot batch.
                                        pending_barrier = Some(barrier);
                                        tracing::debug!(
                                            table_id,
                                            ?current_pk_pos,
                                            ?snapshot_read_row_cnt,
                                            "Prepare to start a new snapshot"
                                        );
                                        break;
                                    } else {
                                        // persist the backfill progress
                                        state_impl
                                            .mutate_state(
                                                current_pk_pos.clone(),
                                                last_binlog_offset.clone(),
                                                total_snapshot_row_count,
                                                false,
                                            )
                                            .await?;

                                        state_impl.commit_state(barrier.epoch).await?;

                                        // emit the barrier and continue consuming
                                        // the backfill stream
                                        yield Message::Barrier(barrier);
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // skip empty upstream chunks
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    let chunk_binlog_offset =
                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                                    tracing::trace!(
                                        "recv changelog chunk: chunk_offset {:?}, capacity {}",
                                        chunk_binlog_offset,
                                        chunk.capacity()
                                    );

                                    // Skip chunks that only contain events before
                                    // `last_binlog_offset`: they are already covered
                                    // by the snapshot.
                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
                                        && let Some(chunk_offset) = chunk_binlog_offset
                                        && chunk_offset < *last_binlog_offset
                                    {
                                        tracing::trace!(
                                            "skip changelog chunk: chunk_offset {:?}, capacity {}",
                                            chunk_offset,
                                            chunk.capacity()
                                        );
                                        continue;
                                    }
                                    // Buffer the chunk; it is emitted after the
                                    // current snapshot batch completes.
                                    upstream_chunk_buffer.push(chunk.compact());
                                }
                                Message::Watermark(_) => {
                                    // ignore watermark
                                }
                            }
                        }
                        // Snapshot read.
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        table_id,
                                        ?last_binlog_offset,
                                        ?current_pk_pos,
                                        "snapshot read stream ends"
                                    );
                                    // The snapshot stream is exhausted: all historical
                                    // data is loaded, so the buffered upstream chunks
                                    // can be drained unconditionally.
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(mapping_chunk(
                                            chunk,
                                            &self.output_indices,
                                        ));
                                    }

                                    break 'backfill_loop;
                                }
                                Some(chunk) => {
                                    // Raise the current pk position to the last pk
                                    // of the chunk.
                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                                    tracing::trace!(
                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
                                        chunk.cardinality(),
                                        current_pk_pos
                                    );
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
                                    total_snapshot_row_count += chunk_cardinality;
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }

                // A barrier must be pending here: the loop above only breaks on a
                // barrier, and the stream-end case exits `'backfill_loop` directly.
                assert!(pending_barrier.is_some(), "pending_barrier must exist");
                let pending_barrier = pending_barrier.unwrap();

                // A barrier can kick in at any time, so ensure the snapshot stream
                // is consumed at least once per batch; otherwise a flood of barriers
                // could starve the snapshot read entirely.
                let (_, mut snapshot_stream) = backfill_stream.into_inner();

                if !is_snapshot_paused
                    && !rate_limit_to_zero
                    && let Some(msg) = snapshot_stream
                        .next()
                        .instrument_await("consume_snapshot_stream_once")
                        .await
                {
                    let Either::Right(msg) = msg else {
                        bail!("BUG: snapshot_read contains upstream messages");
                    };
                    match msg? {
                        None => {
                            tracing::info!(
                                table_id,
                                ?last_binlog_offset,
                                ?current_pk_pos,
                                "snapshot read stream ends in the force emit branch"
                            );
                            // End of the snapshot stream: drain the buffered upstream
                            // chunks and mark the backfill as finished.
                            for chunk in upstream_chunk_buffer.drain(..) {
                                yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                            }

                            state_impl
                                .mutate_state(
                                    current_pk_pos.clone(),
                                    last_binlog_offset.clone(),
                                    total_snapshot_row_count,
                                    true,
                                )
                                .await?;

                            // commit state because we have received a barrier message
                            state_impl.commit_state(pending_barrier.epoch).await?;
                            yield Message::Barrier(pending_barrier);
                            break 'backfill_loop;
                        }
                        Some(chunk) => {
                            // Raise the current pk position to the last pk of the chunk.
                            current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                            let row_count = chunk.cardinality() as u64;
                            cur_barrier_snapshot_processed_rows += row_count;
                            total_snapshot_row_count += row_count;
                            snapshot_read_row_cnt += row_count as usize;

                            tracing::debug!(
                                table_id,
                                ?current_pk_pos,
                                ?snapshot_read_row_cnt,
                                "force emit a snapshot chunk"
                            );
                            yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                        }
                    }
                }

                // Drain the buffered upstream chunks. If the snapshot has made
                // progress, mark each changelog row against the backfilled pk range
                // before emitting; otherwise drop the buffer, since unmarked rows
                // could duplicate data that the snapshot will emit again.
                if let Some(current_pos) = &current_pk_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;

                        // record the consumed binlog offset that will be
                        // persisted later
                        consumed_binlog_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                        yield Message::Chunk(mapping_chunk(
                            mark_cdc_chunk(
                                &offset_parse_func,
                                chunk,
                                current_pos,
                                &pk_indices,
                                &pk_order,
                                last_binlog_offset.clone(),
                            )?,
                            &self.output_indices,
                        ));
                    }
                } else {
                    upstream_chunk_buffer.clear();
                }

                // Advance the recorded offset if some changelog was consumed.
                if consumed_binlog_offset.is_some() {
                    last_binlog_offset.clone_from(&consumed_binlog_offset);
                }

                Self::report_metrics(
                    &self.metrics,
                    cur_barrier_snapshot_processed_rows,
                    cur_barrier_upstream_processed_rows,
                );

                // persist the backfill progress for this batch
                state_impl
                    .mutate_state(
                        current_pk_pos.clone(),
                        last_binlog_offset.clone(),
                        total_snapshot_row_count,
                        false,
                    )
                    .await?;

                state_impl.commit_state(pending_barrier.epoch).await?;
                yield Message::Barrier(pending_barrier);
            }
        } else if self.options.disable_backfill {
            // If backfill is disabled, mark the backfill as finished immediately.
            tracing::info!(
                table_id,
                upstream_table_name,
                "CdcBackfill has been disabled"
            );
            state_impl
                .mutate_state(
                    current_pk_pos.clone(),
                    last_binlog_offset.clone(),
                    total_snapshot_row_count,
                    true,
                )
                .await?;
        }

        // drop the table reader to release the connection to the upstream database
        upstream_table_reader.disconnect().await?;

        tracing::info!(
            table_id,
            upstream_table_name,
            "CdcBackfill has already finished and will forward messages directly to the downstream"
        );

        // Forward messages until the finished state is committed and the
        // create-MV progress is reported on a barrier.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // persist the finished state when a barrier arrives
                if let Message::Barrier(barrier) = &msg {
                    state_impl
                        .mutate_state(
                            current_pk_pos.clone(),
                            last_binlog_offset.clone(),
                            total_snapshot_row_count,
                            true,
                        )
                        .await?;
                    state_impl.commit_state(barrier.epoch).await?;

                    // mark the create-MV progress as finished
                    if let Some(progress) = self.progress.as_mut() {
                        progress.finish(barrier.epoch, total_snapshot_row_count);
                    }
                    yield msg;
                    // exit after the state is committed and the progress reported
                    break;
                }
                yield msg;
            }
        }

        // From now on, simply forward upstream messages to the downstream,
        // committing the state table on every barrier.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // commit state just to bump the epoch of the state table
                    state_impl.commit_state(barrier.epoch).await?;
                }
                yield msg;
            }
        }
    }
}

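/// Polls the table-reader creation future and the upstream stream concurrently.
/// Returns `Ok(None)` once the reader is ready (or was already created); otherwise
/// returns the next upstream message so the caller can keep processing barriers.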
pub(crate) async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    table_reader: &mut Option<ExternalTableReaderImpl>,
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        biased;
        reader = &mut *future => {
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            msg.transpose()
        }
    }
}

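/// Parses the Debezium JSON payload column (column 0) of every upstream chunk
/// into a typed chunk matching `output_columns`, carrying the `_rw_offset`
/// column over to the output.
///
/// A minimal usage sketch (assuming `upstream` is a changelog stream whose
/// first column is the Debezium JSONB payload):
///
/// ```ignore
/// let mut parsed = transform_upstream(upstream, output_columns, None, None, None).boxed();
/// while let Some(msg) = parsed.next().await { /* ... */ }
/// ```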
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(
    upstream: BoxedMessageStream,
    output_columns: Vec<ColumnDesc>,
    timestamp_handling: Option<TimestampHandling>,
    timestamptz_handling: Option<TimestamptzHandling>,
    time_handling: Option<TimeHandling>,
) {
    let props = SpecificParserConfig {
        encoding_config: EncodingProperties::Json(JsonProperties {
            use_schema_registry: false,
            timestamp_handling,
            timestamptz_handling,
            time_handling,
        }),
        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
    };

    let columns_with_meta = output_columns
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();
    let mut parser = DebeziumParser::new(
        props,
        columns_with_meta.clone(),
        Arc::new(SourceContext::dummy()),
    )
    .await
    .map_err(StreamExecutorError::connector_error)?;

    pin_mut!(upstream);
    #[for_await]
    for msg in upstream {
        let mut msg = msg?;
        if let Message::Chunk(chunk) = &mut msg {
            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
            let _ = std::mem::replace(chunk, parsed_chunk);
        }
        yield msg;
    }
}

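/// Feeds each JSONB payload of `chunk` through the Debezium parser and returns a
/// `StreamChunk` in the table's schema, with the `_rw_offset` column appended.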
async fn parse_debezium_chunk(
    parser: &mut DebeziumParser,
    chunk: &StreamChunk,
) -> StreamExecutorResult<StreamChunk> {
    // Build a new chunk of at most `chunk.capacity()` rows in the schema
    // expected by the parser.
    let mut builder = SourceStreamChunkBuilder::new(
        parser.columns().to_vec(),
        SourceCtrlOpts {
            chunk_size: chunk.capacity(),
            split_txn: false,
        },
    );

    // The first column of the input chunk is the jsonb payload; the second is
    // the `_rw_offset` metadata column, which is carried over to the output.
    let payloads = chunk.data_chunk().project(&[0]);
    let offsets = chunk.data_chunk().project(&[1]).compact();

    // Parse each payload row and write the result into the chunk builder.
    for payload in payloads.rows() {
        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
        else {
            panic!("payload must be jsonb");
        };

        parser
            .parse_inner(
                None,
                Some(jsonb_ref.to_string().as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await
            .unwrap();
    }
    builder.finish_current_chunk();

    let parsed_chunk = {
        let mut iter = builder.consume_ready_chunks();
        assert_eq!(1, iter.len());
        iter.next().unwrap()
    };
    assert_eq!(parsed_chunk.capacity(), chunk.capacity());
    let (ops, mut columns, vis) = parsed_chunk.into_inner();

    // Append the `_rw_offset` column to the parsed chunk.
    columns.extend(offsets.into_parts().0);

    Ok(StreamChunk::from_parts(
        ops,
        DataChunk::from_parts(columns.into(), vis),
    ))
}

impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::iter_util::ZipEqFast;
    use risingwave_connector::source::cdc::CdcScanOptions;
    use risingwave_storage::memory::MemoryStateStore;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::prelude::StateTable;
    use crate::executor::source::default_source_internal_table;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{
        ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
    };

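    /// Feeds a single Debezium payload through `transform_upstream` and prints
    /// the parsed chunk for manual inspection.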
    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        // Debezium "read" (snapshot) event for one row of the orders table.
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        println!("datums: {:?}", datums[1]);

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        // one-row chunk
        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // schema of the CDC table
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, columns, None, None, None);
        pin_mut!(parsed_stream);
        if let Some(message) = parsed_stream.next().await {
            println!("chunk: {:#?}", message.unwrap());
        }
    }

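    /// Verifies that with `disable_backfill` set the executor forwards parsed
    /// changelog chunks directly, remapping columns according to `output_indices`.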
    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        // reorder the columns: output (O_CUSTKEY, O_ORDERKEY, O_DUMMY)
        let output_indices = vec![1, 0, 4];
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                // disable backfill so that the executor forwards upstream
                // chunks directly
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
            BTreeMap::default(),
        );
        let s = cdc.execute_inner();
        pin_mut!(s);

        // send the first barrier
        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        // send a chunk; the executor should forward it to the downstream
        {
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            // one-row chunk
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
        // the output should be reordered as (O_CUSTKEY, O_ORDERKEY, O_DUMMY)
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
}