use std::future::Future;
use std::pin::Pin;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::catalog::ColumnDesc;
use risingwave_connector::parser::{
    ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
    ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::CdcScanOptions;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{
    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::CreateMviewProgressReporter;

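/// Number of metadata columns persisted in the backfill state table besides the upstream
/// pk columns: `split_id`, `backfill_finished`, `row_count` and `cdc_offset` (see
/// `CdcBackfillState` for the exact layout).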
const METADATA_STATE_LEN: usize = 4;

pub struct CdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled.
    external_table: ExternalStorageTable,

    /// Upstream changelog stream, which may contain metadata columns such as `_rw_offset`.
    upstream: Executor,

    /// Indices of the columns to be forwarded to the downstream.
    output_indices: Vec<usize>,

    /// Schema of the output chunks, including additional columns if any.
    output_columns: Vec<ColumnDesc>,

    /// State of this backfill executor, persisted in a state table.
    state_impl: CdcBackfillState<S>,

    /// Reports materialized-view creation progress to the meta service, if present.
    progress: Option<CreateMviewProgressReporter>,

    metrics: CdcBackfillMetrics,

    /// Rate limit in rows/s, `None` means unlimited.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        progress: Option<CreateMviewProgressReporter>,
        metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
    ) -> Self {
        let pk_indices = external_table.pk_indices();
        let upstream_table_id = external_table.table_id().table_id;
        let state_impl = CdcBackfillState::new(
            upstream_table_id,
            state_table,
            pk_indices.len() + METADATA_STATE_LEN,
        );

        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);

        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            state_impl,
            progress,
            metrics,
            rate_limit_rps,
            options,
        }
    }

    fn report_metrics(
        metrics: &CdcBackfillMetrics,
        snapshot_processed_row_count: u64,
        upstream_processed_row_count: u64,
    ) {
        metrics
            .cdc_backfill_snapshot_read_row_count
            .inc_by(snapshot_processed_row_count);

        metrics
            .cdc_backfill_upstream_output_row_count
            .inc_by(upstream_processed_row_count);
    }

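    // CDC backfill runs in two phases:
    //  1. Snapshot phase: read the upstream table in pk order batch by batch, while
    //     buffering concurrent changelog events from upstream; at batch boundaries,
    //     replay the buffered events that fall within the backfilled range and persist
    //     the backfill position to the state table.
    //  2. Forward phase: once the snapshot is exhausted (or backfill is disabled or
    //     already finished), forward upstream messages directly to the downstream.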
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The indices and order types of the upstream table's primary key columns.
        let pk_indices = self.external_table.pk_indices().to_vec();
        let pk_order = self.external_table.pk_order_types().to_vec();

        let table_id = self.external_table.table_id().table_id;
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();

        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();

        let mut upstream = self.upstream.execute();

        // Current backfill position in the upstream table, `None` means not started yet.
        let mut current_pk_pos: Option<OwnedRow>;

        // The first barrier message should be propagated before any state access.
        let first_barrier = expect_first_barrier(&mut upstream).await?;

        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
        let first_barrier_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier);
        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

        let mut state_impl = self.state_impl;

        state_impl.init_epoch(first_barrier_epoch).await?;

        // Restore the backfill state persisted by a previous run, if any.
        let state = state_impl.restore_state().await?;
        current_pk_pos = state.current_pk_pos.clone();

        // Backfill is needed unless it is disabled or has already finished.
        let need_backfill = !self.options.disable_backfill && !state.is_finished;

        // Keep track of the total number of rows read from the snapshot.
        let mut total_snapshot_row_count = state.row_count as u64;

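        // Prepare a future that creates the table reader with infinite retry; the loop
        // below polls it together with the upstream, so that barriers keep flowing while
        // the connection to the external database is being established.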
        let mut table_reader: Option<ExternalTableReaderImpl> = None;
        let external_table = self.external_table.clone();
        let mut future = Box::pin(async move {
            let backoff = get_infinite_backoff_strategy();
            tokio_retry::Retry::spawn(backoff, || async {
                match external_table.create_table_reader().await {
                    Ok(reader) => Ok(reader),
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                        Err(e)
                    }
                }
            })
            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
            .await
            .expect("retrying to create cdc table reader until success, should not fail")
        });

        let mut upstream = transform_upstream(upstream, self.output_columns.clone()).boxed();
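        // Poll the reader-creation future and the upstream together until the reader is
        // ready, committing and forwarding barriers in the meantime.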
        loop {
            if let Some(msg) =
                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                    .await?
            {
                if let Some(msg) = mapping_message(msg, &self.output_indices) {
                    match msg {
                        Message::Barrier(barrier) => {
                            // Commit the state just to bump the epoch of the state table.
                            state_impl.commit_state(barrier.epoch).await?;
                            yield Message::Barrier(barrier);
                        }
                        Message::Chunk(chunk) => {
                            if need_backfill {
                                // Ignore the chunk: its data will be covered by the
                                // snapshot read from the external table.
                            } else {
                                // Forward the chunk to the downstream directly.
                                yield Message::Chunk(chunk);
                            }
                        }
                        Message::Watermark(_) => {
                            // Ignore watermarks.
                        }
                    }
                }
            } else {
                assert!(table_reader.is_some(), "table reader must be created");
                tracing::info!(
                    table_id,
                    upstream_table_name,
                    "table reader created successfully"
                );
                break;
            }
        }

        let upstream_table_reader = UpstreamTableReader::new(
            self.external_table.clone(),
            table_reader.expect("table reader must be created"),
        );

        let mut upstream = upstream.peekable();

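        // Limit the number of concurrent connections fetching the start offset, so that a
        // burst of newly created backfill actors does not overwhelm the upstream database.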
        let mut last_binlog_offset: Option<CdcOffset> = {
            static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                tokio::sync::Semaphore::const_new(10);

            let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
            state
                .last_cdc_offset
                .map_or(upstream_table_reader.current_cdc_offset().await?, Some)
        };

        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
        let mut consumed_binlog_offset: Option<CdcOffset> = None;

        tracing::info!(
            table_id,
            upstream_table_name,
            initial_binlog_offset = ?last_binlog_offset,
            ?current_pk_pos,
            is_finished = state.is_finished,
            is_snapshot_paused,
            snapshot_row_count = total_snapshot_row_count,
            rate_limit = self.rate_limit_rps,
            disable_backfill = self.options.disable_backfill,
            snapshot_interval = self.options.snapshot_interval,
            snapshot_batch_size = self.options.snapshot_batch_size,
            "start cdc backfill",
        );

        if need_backfill {
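            // Backfill batch algorithm:
            //  - start a snapshot read from `current_pk_pos`, buffering upstream changelog
            //    events that arrive while the snapshot is in flight;
            //  - after `snapshot_interval` barriers, stop the snapshot stream, replay the
            //    buffered events up to the current backfill position, persist the state,
            //    and emit the pending barrier;
            //  - repeat with a fresh snapshot stream until the snapshot is exhausted.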
            // Peek the upstream without consuming, to make sure the source starts emitting
            // changelog events before the snapshot read begins.
            let _ = Pin::new(&mut upstream).peek().await;

            // Wait for a barrier before entering the backfill loop, handling pause/resume
            // mutations and tracking the latest binlog offset observed in the meantime.
            #[for_await]
            for msg in upstream.by_ref() {
                match msg? {
                    Message::Barrier(barrier) => {
                        match barrier.mutation.as_deref() {
                            Some(crate::executor::Mutation::Pause) => {
                                is_snapshot_paused = true;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is paused by barrier"
                                );
                            }
                            Some(crate::executor::Mutation::Resume) => {
                                is_snapshot_paused = false;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is resumed by barrier"
                                );
                            }
                            _ => {
                                // Ignore other mutations.
                            }
                        }
                        // Commit the state just to bump the epoch of the state table.
                        state_impl.commit_state(barrier.epoch).await?;
                        yield Message::Barrier(barrier);
                        break;
                    }
                    Message::Chunk(ref chunk) => {
                        last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
                    }
                    Message::Watermark(_) => {
                        // Ignore watermarks.
                    }
                }
            }

            tracing::info!(
                table_id,
                upstream_table_name,
                initial_binlog_offset = ?last_binlog_offset,
                ?current_pk_pos,
                is_snapshot_paused,
                "start cdc backfill loop"
            );

            // Buffer for changelog events received while a snapshot batch is in flight.
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

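            // Each iteration of `backfill_loop` consumes one snapshot batch; the position
            // `current_pk_pos` carries the backfill progress across iterations.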
            'backfill_loop: loop {
                let left_upstream = upstream.by_ref().map(Either::Left);

                let mut snapshot_read_row_cnt: usize = 0;
                let read_args = SnapshotReadArgs::new(
                    current_pk_pos.clone(),
                    self.rate_limit_rps,
                    pk_indices.clone(),
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }

                // Prefer the upstream changelog over the snapshot stream, so that barriers
                // are handled promptly.
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut barrier_count: u32 = 0;
                let mut pending_barrier = None;

                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream changelog events.
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    // A new snapshot batch starts every `snapshot_interval` barriers.
                                    barrier_count += 1;
                                    let can_start_new_snapshot =
                                        barrier_count == self.options.snapshot_interval;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                if let Some(new_rate_limit) =
                                                    some.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    rate_limit_to_zero = self
                                                        .rate_limit_rps
                                                        .is_some_and(|val| val == 0);

                                                    // The snapshot stream must be rebuilt to apply
                                                    // the new rate limit: persist the state, emit
                                                    // the barrier and restart the backfill loop.
                                                    state_impl
                                                        .mutate_state(
                                                            current_pk_pos.clone(),
                                                            last_binlog_offset.clone(),
                                                            total_snapshot_row_count,
                                                            false,
                                                        )
                                                        .await?;
                                                    state_impl.commit_state(barrier.epoch).await?;
                                                    yield Message::Barrier(barrier);

                                                    continue 'backfill_loop;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    tracing::info!(
                                                        table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    yield Message::Barrier(barrier);
                                                    break 'backfill_loop;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    Self::report_metrics(
                                        &self.metrics,
                                        cur_barrier_snapshot_processed_rows,
                                        cur_barrier_upstream_processed_rows,
                                    );

                                    if can_start_new_snapshot {
                                        // Stash the barrier: it will be emitted after the
                                        // buffered upstream chunks are flushed.
                                        pending_barrier = Some(barrier);
                                        tracing::debug!(
                                            table_id,
                                            ?current_pk_pos,
                                            ?snapshot_read_row_cnt,
                                            "Prepare to start a new snapshot"
                                        );
                                        // Break the loop to drop the current snapshot stream.
                                        break;
                                    } else {
                                        // Persist the position and emit the barrier directly.
                                        state_impl
                                            .mutate_state(
                                                current_pk_pos.clone(),
                                                last_binlog_offset.clone(),
                                                total_snapshot_row_count,
                                                false,
                                            )
                                            .await?;

                                        state_impl.commit_state(barrier.epoch).await?;

                                        yield Message::Barrier(barrier);
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // Skip empty upstream chunks.
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    let chunk_binlog_offset =
                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                                    tracing::trace!(
                                        "recv changelog chunk: chunk_offset {:?}, capacity {}",
                                        chunk_binlog_offset,
                                        chunk.capacity()
                                    );

                                    // Chunks that precede the start offset of the snapshot are
                                    // already covered by the snapshot read and can be dropped.
                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
                                        && let Some(chunk_offset) = chunk_binlog_offset
                                        && chunk_offset < *last_binlog_offset
                                    {
                                        tracing::trace!(
                                            "skip changelog chunk: chunk_offset {:?}, capacity {}",
                                            chunk_offset,
                                            chunk.capacity()
                                        );
                                        continue;
                                    }
                                    // Buffer the chunk; it is replayed at the batch boundary.
                                    upstream_chunk_buffer.push(chunk.compact());
                                }
                                Message::Watermark(_) => {
                                    // Ignore watermarks.
                                }
                            }
                        }
                        // Snapshot read chunks.
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        table_id,
                                        ?last_binlog_offset,
                                        ?current_pk_pos,
                                        "snapshot read stream ends"
                                    );
                                    // The snapshot read stream has ended: the whole upstream
                                    // table has been consumed, so the backfill is finished.
                                    // Buffered upstream events can be emitted as-is from now on.
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(mapping_chunk(
                                            chunk,
                                            &self.output_indices,
                                        ));
                                    }

                                    break 'backfill_loop;
                                }
                                Some(chunk) => {
                                    // Advance the backfill position to the pk of the last row.
                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                                    tracing::trace!(
                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
                                        chunk.cardinality(),
                                        current_pk_pos
                                    );
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
                                    total_snapshot_row_count += chunk_cardinality;
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }

                // A barrier that allows starting a new snapshot batch must have arrived.
                assert!(pending_barrier.is_some(), "pending_barrier must exist");
                let pending_barrier = pending_barrier.unwrap();

                // Take the joint stream apart to access the snapshot stream directly.
                let (_, mut snapshot_stream) = backfill_stream.into_inner();
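                // Consume at most one more message from the snapshot stream before it is
                // dropped, unless the snapshot is paused or the rate limit is zero (the
                // stream would block). This keeps the backfill making forward progress
                // even when upstream messages dominate the select above.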
582
583 if !is_snapshot_paused
585 && !rate_limit_to_zero
586 && let Some(msg) = snapshot_stream
587 .next()
588 .instrument_await("consume_snapshot_stream_once")
589 .await
590 {
591 let Either::Right(msg) = msg else {
592 bail!("BUG: snapshot_read contains upstream messages");
593 };
594 match msg? {
595 None => {
596 tracing::info!(
597 table_id,
598 ?last_binlog_offset,
599 ?current_pk_pos,
600 "snapshot read stream ends in the force emit branch"
601 );
602 for chunk in upstream_chunk_buffer.drain(..) {
605 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
606 }
607
608 state_impl
610 .mutate_state(
611 current_pk_pos.clone(),
612 last_binlog_offset.clone(),
613 total_snapshot_row_count,
614 true,
615 )
616 .await?;
617
618 state_impl.commit_state(pending_barrier.epoch).await?;
620 yield Message::Barrier(pending_barrier);
621 break 'backfill_loop;
623 }
624 Some(chunk) => {
625 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
627
628 let row_count = chunk.cardinality() as u64;
629 cur_barrier_snapshot_processed_rows += row_count;
630 total_snapshot_row_count += row_count;
631 snapshot_read_row_cnt += row_count as usize;
632
633 tracing::debug!(
634 table_id,
635 ?current_pk_pos,
636 ?snapshot_read_row_cnt,
637 "force emit a snapshot chunk"
638 );
639 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
640 }
641 }
642 }
643
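                // Replay the buffered upstream events: `mark_cdc_chunk` drops the rows
                // whose pk lies beyond `current_pos`, as those rows will be covered by
                // the upcoming snapshot batches.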
                if let Some(current_pos) = &current_pk_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;

                        // Record the latest binlog offset that has been emitted downstream.
                        consumed_binlog_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                        yield Message::Chunk(mapping_chunk(
                            mark_cdc_chunk(
                                &offset_parse_func,
                                chunk,
                                current_pos,
                                &pk_indices,
                                &pk_order,
                                last_binlog_offset.clone(),
                            )?,
                            &self.output_indices,
                        ));
                    }
                } else {
                    // The snapshot has not emitted any row yet: the buffered events are
                    // fully covered by the upcoming snapshot read, so drop them.
                    upstream_chunk_buffer.clear();
                }

                // Track the latest offset emitted to the downstream, if any.
                if consumed_binlog_offset.is_some() {
                    last_binlog_offset.clone_from(&consumed_binlog_offset);
                }

                Self::report_metrics(
                    &self.metrics,
                    cur_barrier_snapshot_processed_rows,
                    cur_barrier_upstream_processed_rows,
                );

                // Persist the backfill position, then emit the pending barrier and start
                // the next snapshot batch.
                state_impl
                    .mutate_state(
                        current_pk_pos.clone(),
                        last_binlog_offset.clone(),
                        total_snapshot_row_count,
                        false,
                    )
                    .await?;

                state_impl.commit_state(pending_barrier.epoch).await?;
                yield Message::Barrier(pending_barrier);
            }
        } else if self.options.disable_backfill {
            // When backfill is disabled, mark it as finished directly.
            tracing::info!(
                table_id,
                upstream_table_name,
                "CdcBackfill has been disabled"
            );
            state_impl
                .mutate_state(
                    current_pk_pos.clone(),
                    last_binlog_offset.clone(),
                    total_snapshot_row_count,
                    true,
                )
                .await?;
        }

        // The backfill (if any) is complete: drop the connection to the external database.
        upstream_table_reader.disconnect().await?;

        tracing::info!(
            table_id,
            upstream_table_name,
            "CdcBackfill has already finished and will forward messages directly to the downstream"
        );

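        // Wait for the first barrier after the backfill finishes, so that the finished
        // state is persisted and the creation progress reported to meta exactly once.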
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // Persist the finished state.
                    state_impl
                        .mutate_state(
                            current_pk_pos.clone(),
                            last_binlog_offset.clone(),
                            total_snapshot_row_count,
                            true,
                        )
                        .await?;
                    state_impl.commit_state(barrier.epoch).await?;

                    // Report the backfill progress to the meta service.
                    if let Some(progress) = self.progress.as_mut() {
                        progress.finish(barrier.epoch, total_snapshot_row_count);
                    }
                    yield msg;
                    break;
                }
                yield msg;
            }
        }

        // From now on, forward messages directly to the downstream, only committing the
        // state table epoch on barriers.
        #[for_await]
        for msg in upstream {
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    state_impl.commit_state(barrier.epoch).await?;
                }
                yield msg;
            }
        }
    }
}

/// Polls the upstream while the table-reader creation future is pending: stores the reader
/// into `table_reader` and returns `Ok(None)` once it is ready, otherwise returns the next
/// upstream message.
async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    table_reader: &mut Option<ExternalTableReaderImpl>,
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        biased;
        reader = &mut *future => {
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            msg.transpose()
        }
    }
}
789
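/// Transforms a raw upstream chunk in the `(payload jsonb, _rw_offset varchar, ...)` layout
/// into a chunk matching `output_columns`, by running the Debezium JSON parser over the
/// payload column.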
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: Vec<ColumnDesc>) {
    let props = SpecificParserConfig {
        encoding_config: EncodingProperties::Json(JsonProperties {
            use_schema_registry: false,
            timestamptz_handling: None,
        }),
        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
    };

    let columns_with_meta = output_columns
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

    let mut parser = DebeziumParser::new(
        props,
        columns_with_meta.clone(),
        Arc::new(SourceContext::dummy()),
    )
    .await
    .map_err(StreamExecutorError::connector_error)?;

    pin_mut!(upstream);
    #[for_await]
    for msg in upstream {
        let mut msg = msg?;
        if let Message::Chunk(chunk) = &mut msg {
            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
            let _ = std::mem::replace(chunk, parsed_chunk);
        }
        yield msg;
    }
}

async fn parse_debezium_chunk(
    parser: &mut DebeziumParser,
    chunk: &StreamChunk,
) -> StreamExecutorResult<StreamChunk> {
    // Build an output chunk in the schema expected by the downstream, with the same
    // capacity as the input chunk so that exactly one chunk is produced.
    let mut builder = SourceStreamChunkBuilder::new(
        parser.columns().to_vec(),
        SourceCtrlOpts {
            chunk_size: chunk.capacity(),
            split_txn: false,
        },
    );

    // The input chunk is in the `(payload, _rw_offset)` layout: parse the payload column
    // with the Debezium parser, then re-attach the offset column to the parsed rows.
    let payloads = chunk.data_chunk().project(&[0]);
    let offsets = chunk.data_chunk().project(&[1]).compact();

    for payload in payloads.rows() {
        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
        else {
            panic!("payload must be jsonb");
        };

        parser
            .parse_inner(
                None,
                Some(jsonb_ref.to_string().as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await
            .unwrap();
    }
    builder.finish_current_chunk();

    let parsed_chunk = {
        let mut iter = builder.consume_ready_chunks();
        assert_eq!(1, iter.len());
        iter.next().unwrap()
    };
    // The parsed chunk must line up row-for-row with the offset column.
    assert_eq!(parsed_chunk.capacity(), chunk.capacity());
    let (ops, mut columns, vis) = parsed_chunk.into_inner();
    // Append the `_rw_offset` column to the parsed columns.
    columns.extend(offsets.into_parts().0);

    Ok(StreamChunk::from_parts(
        ops,
        DataChunk::from_parts(columns.into(), vis),
    ))
}

impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::iter_util::ZipEqFast;
    use risingwave_storage::memory::MemoryStateStore;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::prelude::StateTable;
    use crate::executor::source::default_source_internal_table;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{
        ActorContext, Barrier, CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable, Message,
    };

    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        // A Debezium "read" (snapshot) event for one row of the orders table.
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        println!("datums: {:?}", datums[1]);

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        // A single-row chunk carrying the payload above.
        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // The schema of the table that the backfill targets.
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, columns);
        pin_mut!(parsed_stream);
        if let Some(message) = parsed_stream.next().await {
            println!("chunk: {:#?}", message.unwrap());
        }
    }

    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        // Select (O_CUSTKEY, O_ORDERKEY, O_DUMMY) from the output columns below.
        let output_indices = vec![1, 0, 4];
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                // Disable backfill so that upstream chunks are forwarded directly while
                // the table reader is still being created.
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
        );
        let s = cdc.execute_inner();
        pin_mut!(s);

        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        // Send a chunk to the upstream.
        {
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        // O_CUSTKEY
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        // O_ORDERKEY
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        // O_DUMMY
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
}