risingwave_stream/executor/backfill/cdc/cdc_backfill.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17
18use either::Either;
19use futures::stream;
20use futures::stream::select_with_strategy;
21use itertools::Itertools;
22use risingwave_common::array::DataChunk;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnDesc;
25use risingwave_connector::parser::{
26 ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
27 ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
28};
29use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
30use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
31use rw_futures_util::pausable;
32use thiserror_ext::AsReport;
33use tracing::Instrument;
34
35use crate::executor::UpdateMutation;
36use crate::executor::backfill::CdcScanOptions;
37use crate::executor::backfill::cdc::state::CdcBackfillState;
38use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
39use crate::executor::backfill::cdc::upstream_table::snapshot::{
40 SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
41};
42use crate::executor::backfill::utils::{
43 get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
44};
45use crate::executor::monitor::CdcBackfillMetrics;
46use crate::executor::prelude::*;
47use crate::executor::source::get_infinite_backoff_strategy;
48use crate::task::CreateMviewProgressReporter;
49
50/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
51const METADATA_STATE_LEN: usize = 4;
52
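/// Executor that backfills the snapshot of an external CDC table while buffering the
/// upstream changelog, and forwards the changelog directly to the downstream once the
/// backfill is finished.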
53pub struct CdcBackfillExecutor<S: StateStore> {
54 actor_ctx: ActorContextRef,
55
56 /// The external table to be backfilled
57 external_table: ExternalStorageTable,
58
59 /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
60 upstream: Executor,
61
62 /// The column indices that need to be forwarded to the downstream, from both the upstream and the table scan.
63 output_indices: Vec<usize>,
64
65 /// The schema of the output chunk, including additional columns if any
66 output_columns: Vec<ColumnDesc>,
67
68 /// State table of the `CdcBackfill` executor
69 state_impl: CdcBackfillState<S>,
70
71 // TODO: introduce a CdcBackfillProgress to report finish to Meta
72 // This object is just a stub right now
73 progress: Option<CreateMviewProgressReporter>,
74
75 metrics: CdcBackfillMetrics,
76
77 /// Rate limit in rows/s.
78 rate_limit_rps: Option<u32>,
79
80 options: CdcScanOptions,
81}
82
83impl<S: StateStore> CdcBackfillExecutor<S> {
84 #[allow(clippy::too_many_arguments)]
85 pub fn new(
86 actor_ctx: ActorContextRef,
87 external_table: ExternalStorageTable,
88 upstream: Executor,
89 output_indices: Vec<usize>,
90 output_columns: Vec<ColumnDesc>,
91 progress: Option<CreateMviewProgressReporter>,
92 metrics: Arc<StreamingMetrics>,
93 state_table: StateTable<S>,
94 rate_limit_rps: Option<u32>,
95 options: CdcScanOptions,
96 ) -> Self {
97 let pk_indices = external_table.pk_indices();
98 let upstream_table_id = external_table.table_id().table_id;
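 // The state row consists of the upstream table's pk columns followed by the
 // `METADATA_STATE_LEN` metadata columns.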
99 let state_impl = CdcBackfillState::new(
100 upstream_table_id,
101 state_table,
102 pk_indices.len() + METADATA_STATE_LEN,
103 );
104
105 let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
106
107 Self {
108 actor_ctx,
109 external_table,
110 upstream,
111 output_indices,
112 output_columns,
113 state_impl,
114 progress,
115 metrics,
116 rate_limit_rps,
117 options,
118 }
119 }
120
121 fn report_metrics(
122 metrics: &CdcBackfillMetrics,
123 snapshot_processed_row_count: u64,
124 upstream_processed_row_count: u64,
125 ) {
126 metrics
127 .cdc_backfill_snapshot_read_row_count
128 .inc_by(snapshot_processed_row_count);
129
130 metrics
131 .cdc_backfill_upstream_output_row_count
132 .inc_by(upstream_processed_row_count);
133 }
134
135 #[try_stream(ok = Message, error = StreamExecutorError)]
136 async fn execute_inner(mut self) {
137 // The indices to primary key columns
138 let pk_indices = self.external_table.pk_indices().to_vec();
139 let pk_order = self.external_table.pk_order_types().to_vec();
140
141 let table_id = self.external_table.table_id().table_id;
142 let upstream_table_name = self.external_table.qualified_table_name();
143 let schema_table_name = self.external_table.schema_table_name().clone();
144 let external_database_name = self.external_table.database_name().to_owned();
145
146 let additional_columns = self
147 .output_columns
148 .iter()
149 .filter(|col| col.additional_column.column_type.is_some())
150 .cloned()
151 .collect_vec();
152
153 let mut upstream = self.upstream.execute();
154
155 // Current position of the upstream_table storage primary key.
156 // `None` means it starts from the beginning.
157 let mut current_pk_pos: Option<OwnedRow>;
158
159 // Poll the upstream to get the first barrier.
160 let first_barrier = expect_first_barrier(&mut upstream).await?;
161
162 let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
163 let first_barrier_epoch = first_barrier.epoch;
164 // The first barrier message should be propagated.
165 yield Message::Barrier(first_barrier);
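 // A rate limit of 0 effectively pauses the snapshot read; we track it so that the
 // force-consume step of the snapshot stream below can be skipped in that case.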
166 let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
167
168 // Check whether this parallelism has been assigned splits,
169 // if not, we should bypass the backfill directly.
170 let mut state_impl = self.state_impl;
171
172 state_impl.init_epoch(first_barrier_epoch).await?;
173
174 // restore backfill state
175 let state = state_impl.restore_state().await?;
176 current_pk_pos = state.current_pk_pos.clone();
177
178 let need_backfill = !self.options.disable_backfill && !state.is_finished;
179
180 // Keep track of rows from the snapshot.
181 let mut total_snapshot_row_count = state.row_count as u64;
182
 183 // After initializing the state table and forwarding the initial barrier to the downstream,
 184 // we try to create the table reader with retries.
 185 // If backfill hasn't finished, we can ignore upstream cdc events until the table reader is created;
 186 // if backfill is finished, we should forward the upstream cdc events to the downstream.
187 let mut table_reader: Option<ExternalTableReaderImpl> = None;
188 let external_table = self.external_table.clone();
189 let mut future = Box::pin(async move {
190 let backoff = get_infinite_backoff_strategy();
191 tokio_retry::Retry::spawn(backoff, || async {
192 match external_table.create_table_reader().await {
193 Ok(reader) => Ok(reader),
194 Err(e) => {
195 tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
196 Err(e)
197 }
198 }
199 })
200 .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
201 .await
202 .expect("Retry create cdc table reader until success.")
203 });
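 // Keep polling the upstream while the table reader is being created, so that barriers
 // are not blocked and checkpoints can still complete.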
204 loop {
205 if let Some(msg) =
206 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
207 .await?
208 {
209 match msg {
210 Message::Barrier(barrier) => {
211 // commit state to bump the epoch of state table
212 state_impl.commit_state(barrier.epoch).await?;
213 yield Message::Barrier(barrier);
214 }
215 Message::Chunk(chunk) => {
216 if need_backfill {
217 // ignore chunk if we need backfill, since we can read the data from the snapshot
218 } else {
219 // forward the chunk to downstream
220 yield Message::Chunk(chunk);
221 }
222 }
223 Message::Watermark(_) => {
224 // ignore watermark
225 }
226 }
227 } else {
 228 assert!(table_reader.is_some(), "table reader must be created");
229 tracing::info!(
230 table_id,
231 upstream_table_name,
232 "table reader created successfully"
233 );
234 break;
235 }
236 }
237
238 let upstream_table_reader = UpstreamTableReader::new(
239 self.external_table.clone(),
 240 table_reader.expect("table reader must be created"),
241 );
242
243 let mut upstream = transform_upstream(upstream, &self.output_columns)
244 .boxed()
245 .peekable();
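 // Use the CDC offset persisted in the state if there is one; otherwise read the
 // current offset from the external table as the starting point.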
246 let mut last_binlog_offset: Option<CdcOffset> = state
247 .last_cdc_offset
248 .map_or(upstream_table_reader.current_cdc_offset().await?, Some);
249
250 let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
251 let mut consumed_binlog_offset: Option<CdcOffset> = None;
252
253 tracing::info!(
254 table_id,
255 upstream_table_name,
256 initial_binlog_offset = ?last_binlog_offset,
257 ?current_pk_pos,
258 is_finished = state.is_finished,
259 is_snapshot_paused,
260 snapshot_row_count = total_snapshot_row_count,
261 rate_limit = self.rate_limit_rps,
262 disable_backfill = self.options.disable_backfill,
263 snapshot_interval = self.options.snapshot_interval,
264 snapshot_batch_size = self.options.snapshot_batch_size,
265 "start cdc backfill",
266 );
267
268 // CDC Backfill Algorithm:
269 //
270 // When the first barrier comes from upstream:
271 // - read the current binlog offset as `binlog_low`
272 // - start a snapshot read upon upstream table and iterate over the snapshot read stream
273 // - buffer the changelog event from upstream
274 //
275 // When a new barrier comes from upstream:
276 // - read the current binlog offset as `binlog_high`
 277 // - for each row of the upstream change log, forward it to downstream if it is in the range
 278 // of [binlog_low, binlog_high] and its pk <= `current_pos`; otherwise ignore it
279 // - reconstruct the whole backfill stream with upstream changelog and a new table snapshot
280 //
281 // When a chunk comes from snapshot, we forward it to the downstream and raise
282 // `current_pos`.
283 // When we reach the end of the snapshot read stream, it means backfill has been
284 // finished.
285 //
286 // Once the backfill loop ends, we forward the upstream directly to the downstream.
287 if need_backfill {
 288 // drive the upstream changelog first to ensure we receive changelog events in a timely manner,
 289 // otherwise the upstream changelog may be blocked by the snapshot read stream
290 let _ = Pin::new(&mut upstream).peek().await;
291
 292 // wait for a barrier to make sure the backfill starts after the upstream source
293 #[for_await]
294 for msg in upstream.by_ref() {
295 match msg? {
296 Message::Barrier(barrier) => {
297 match barrier.mutation.as_deref() {
298 Some(crate::executor::Mutation::Pause) => {
299 is_snapshot_paused = true;
300 tracing::info!(
301 table_id,
302 upstream_table_name,
303 "snapshot is paused by barrier"
304 );
305 }
306 Some(crate::executor::Mutation::Resume) => {
307 is_snapshot_paused = false;
308 tracing::info!(
309 table_id,
310 upstream_table_name,
311 "snapshot is resumed by barrier"
312 );
313 }
314 _ => {
315 // ignore other mutations
316 }
317 }
318 // commit state just to bump the epoch of state table
319 state_impl.commit_state(barrier.epoch).await?;
320 yield Message::Barrier(barrier);
321 break;
322 }
323 Message::Chunk(ref chunk) => {
324 last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
325 }
326 Message::Watermark(_) => {
327 // Ignore watermark
328 }
329 }
330 }
331
332 tracing::info!(table_id,
333 upstream_table_name,
334 initial_binlog_offset = ?last_binlog_offset,
335 ?current_pk_pos,
336 is_snapshot_paused,
337 "start cdc backfill loop");
338
339 // the buffer will be drained when a barrier comes
340 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
341
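 // Each iteration of 'backfill_loop starts a fresh snapshot stream from `current_pk_pos`
 // and consumes it together with the upstream changelog, until either the snapshot ends
 // or the stream needs to be rebuilt (every `snapshot_interval` barriers, or when the
 // rate limit changes).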
342 'backfill_loop: loop {
343 let left_upstream = upstream.by_ref().map(Either::Left);
344
345 let mut snapshot_read_row_cnt: usize = 0;
346 let read_args = SnapshotReadArgs::new(
347 current_pk_pos.clone(),
348 self.rate_limit_rps,
349 pk_indices.clone(),
350 additional_columns.clone(),
351 schema_table_name.clone(),
352 external_database_name.clone(),
353 );
354
355 let right_snapshot = pin!(
356 upstream_table_reader
357 .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
358 .map(Either::Right)
359 );
360
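 // Wrap the snapshot stream in a pausable valve so that Pause/Resume mutations (and the
 // pause-on-startup flag) can suspend snapshot reads without affecting the upstream changelog.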
361 let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
362 if is_snapshot_paused {
363 snapshot_valve.pause();
364 }
365
366 // Prefer to select upstream, so we can stop snapshot stream when barrier comes.
367 let mut backfill_stream =
368 select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
369 stream::PollNext::Left
370 });
371
372 let mut cur_barrier_snapshot_processed_rows: u64 = 0;
373 let mut cur_barrier_upstream_processed_rows: u64 = 0;
374 let mut barrier_count: u32 = 0;
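 // The barrier that triggers rebuilding the snapshot stream; it is yielded only after the
 // buffered upstream chunks are flushed and the backfill progress is persisted.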
375 let mut pending_barrier = None;
376
377 #[for_await]
378 for either in &mut backfill_stream {
379 match either {
380 // Upstream
381 Either::Left(msg) => {
382 match msg? {
383 Message::Barrier(barrier) => {
 384 // increase the barrier count and check whether we need to start a new snapshot
385 barrier_count += 1;
386 let can_start_new_snapshot =
387 barrier_count == self.options.snapshot_interval;
388
389 if let Some(mutation) = barrier.mutation.as_deref() {
390 use crate::executor::Mutation;
391 match mutation {
392 Mutation::Pause => {
393 is_snapshot_paused = true;
394 snapshot_valve.pause();
395 }
396 Mutation::Resume => {
397 is_snapshot_paused = false;
398 snapshot_valve.resume();
399 }
400 Mutation::Throttle(some) => {
401 if let Some(new_rate_limit) =
402 some.get(&self.actor_ctx.id)
403 && *new_rate_limit != self.rate_limit_rps
404 {
405 self.rate_limit_rps = *new_rate_limit;
406 rate_limit_to_zero = self
407 .rate_limit_rps
408 .is_some_and(|val| val == 0);
409
410 // update and persist current backfill progress without draining the buffered upstream chunks
411 state_impl
412 .mutate_state(
413 current_pk_pos.clone(),
414 last_binlog_offset.clone(),
415 total_snapshot_row_count,
416 false,
417 )
418 .await?;
419 state_impl.commit_state(barrier.epoch).await?;
420 yield Message::Barrier(barrier);
421
422 // rebuild the snapshot stream with new rate limit
423 continue 'backfill_loop;
424 }
425 }
426 Mutation::Update(UpdateMutation {
427 dropped_actors,
428 ..
429 }) => {
430 if dropped_actors.contains(&self.actor_ctx.id) {
431 // the actor has been dropped, exit the backfill loop
432 tracing::info!(
433 table_id,
434 upstream_table_name,
435 "CdcBackfill has been dropped due to config change"
436 );
437 yield Message::Barrier(barrier);
438 break 'backfill_loop;
439 }
440 }
441 _ => (),
442 }
443 }
444
445 Self::report_metrics(
446 &self.metrics,
447 cur_barrier_snapshot_processed_rows,
448 cur_barrier_upstream_processed_rows,
449 );
450
 451 // when processing a barrier, check whether we can start a new snapshot,
 452 // i.e. whether the number of barriers has reached the snapshot interval
453 if can_start_new_snapshot {
 454 // stage the barrier
455 pending_barrier = Some(barrier);
456 tracing::debug!(
457 table_id,
458 ?current_pk_pos,
459 ?snapshot_read_row_cnt,
460 "Prepare to start a new snapshot"
461 );
 462 // Break the loop that consumes the snapshot and prepare to start a new one
463 break;
464 } else {
465 // update and persist current backfill progress
466 state_impl
467 .mutate_state(
468 current_pk_pos.clone(),
469 last_binlog_offset.clone(),
470 total_snapshot_row_count,
471 false,
472 )
473 .await?;
474
475 state_impl.commit_state(barrier.epoch).await?;
476
 477 // emit the barrier and continue consuming the backfill stream
478 yield Message::Barrier(barrier);
479 }
480 }
481 Message::Chunk(chunk) => {
482 // skip empty upstream chunk
483 if chunk.cardinality() == 0 {
484 continue;
485 }
486
487 let chunk_binlog_offset =
488 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
489
490 tracing::trace!(
 491 "recv changelog chunk: chunk_offset {:?}, capacity {}",
492 chunk_binlog_offset,
493 chunk.capacity()
494 );
495
496 // Since we don't need changelog before the
497 // `last_binlog_offset`, skip the chunk that *only* contains
498 // events before `last_binlog_offset`.
499 if let Some(last_binlog_offset) = last_binlog_offset.as_ref() {
500 if let Some(chunk_offset) = chunk_binlog_offset
501 && chunk_offset < *last_binlog_offset
502 {
503 tracing::trace!(
504 "skip changelog chunk: chunk_offset {:?}, capacity {}",
505 chunk_offset,
506 chunk.capacity()
507 );
508 continue;
509 }
510 }
511 // Buffer the upstream chunk.
512 upstream_chunk_buffer.push(chunk.compact());
513 }
514 Message::Watermark(_) => {
515 // Ignore watermark during backfill.
516 }
517 }
518 }
519 // Snapshot read
520 Either::Right(msg) => {
521 match msg? {
522 None => {
523 tracing::info!(
524 table_id,
525 ?last_binlog_offset,
526 ?current_pk_pos,
527 "snapshot read stream ends"
528 );
529 // If the snapshot read stream ends, it means all historical
530 // data has been loaded.
 531 // We should not mark the chunks anymore;
 532 // otherwise, we would ignore some rows in the buffer.
533 for chunk in upstream_chunk_buffer.drain(..) {
534 yield Message::Chunk(mapping_chunk(
535 chunk,
536 &self.output_indices,
537 ));
538 }
539
540 // backfill has finished, exit the backfill loop and persist the state when we recv a barrier
541 break 'backfill_loop;
542 }
543 Some(chunk) => {
544 // Raise the current position.
 545 // Since snapshot read streams are ordered by pk, we can
 546 // just use the last row to update `current_pos`.
547 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
548
549 tracing::trace!(
550 "got a snapshot chunk: len {}, current_pk_pos {:?}",
551 chunk.cardinality(),
552 current_pk_pos
553 );
554 let chunk_cardinality = chunk.cardinality() as u64;
555 cur_barrier_snapshot_processed_rows += chunk_cardinality;
556 total_snapshot_row_count += chunk_cardinality;
557 yield Message::Chunk(mapping_chunk(
558 chunk,
559 &self.output_indices,
560 ));
561 }
562 }
563 }
564 }
565 }
566
567 assert!(pending_barrier.is_some(), "pending_barrier must exist");
568 let pending_barrier = pending_barrier.unwrap();
569
570 // Here we have to ensure the snapshot stream is consumed at least once,
571 // since the barrier event can kick in anytime.
572 // Otherwise, the result set of the new snapshot stream may become empty.
 573 // It may be a cancellation bug of the MySQL driver.
574 let (_, mut snapshot_stream) = backfill_stream.into_inner();
575
 576 // skip consuming the snapshot stream if it is paused or the rate limit is 0
577 if !is_snapshot_paused
578 && !rate_limit_to_zero
579 && let Some(msg) = snapshot_stream
580 .next()
581 .instrument_await("consume_snapshot_stream_once")
582 .await
583 {
584 let Either::Right(msg) = msg else {
585 bail!("BUG: snapshot_read contains upstream messages");
586 };
587 match msg? {
588 None => {
589 tracing::info!(
590 table_id,
591 ?last_binlog_offset,
592 ?current_pk_pos,
593 "snapshot read stream ends in the force emit branch"
594 );
595 // End of the snapshot read stream.
596 // Consume the buffered upstream chunk without filtering by `binlog_low`.
597 for chunk in upstream_chunk_buffer.drain(..) {
598 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
599 }
600
 601 // mark backfill as finished
602 state_impl
603 .mutate_state(
604 current_pk_pos.clone(),
605 last_binlog_offset.clone(),
606 total_snapshot_row_count,
607 true,
608 )
609 .await?;
610
611 // commit state because we have received a barrier message
612 state_impl.commit_state(pending_barrier.epoch).await?;
613 yield Message::Barrier(pending_barrier);
614 // end of backfill loop, since backfill has finished
615 break 'backfill_loop;
616 }
617 Some(chunk) => {
618 // Raise the current pk position.
619 current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
620
621 let row_count = chunk.cardinality() as u64;
622 cur_barrier_snapshot_processed_rows += row_count;
623 total_snapshot_row_count += row_count;
624 snapshot_read_row_cnt += row_count as usize;
625
626 tracing::debug!(
627 table_id,
628 ?current_pk_pos,
629 ?snapshot_read_row_cnt,
630 "force emit a snapshot chunk"
631 );
632 yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
633 }
634 }
635 }
636
637 // If the number of barriers reaches the snapshot interval,
638 // consume the buffered upstream chunks.
 639 if let Some(current_pos) = &current_pk_pos {
640 for chunk in upstream_chunk_buffer.drain(..) {
641 cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
642
643 // record the consumed binlog offset that will be
644 // persisted later
645 consumed_binlog_offset =
646 get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
647
648 yield Message::Chunk(mapping_chunk(
649 mark_cdc_chunk(
650 &offset_parse_func,
651 chunk,
652 current_pos,
653 &pk_indices,
654 &pk_order,
655 last_binlog_offset.clone(),
656 )?,
657 &self.output_indices,
658 ));
659 }
660 } else {
 661 // If there is no current_pos, it means we have not processed any snapshot data yet,
 662 // so we can just discard the buffered upstream chunks in that case.
663 upstream_chunk_buffer.clear();
664 }
665
666 // Update last seen binlog offset
667 if consumed_binlog_offset.is_some() {
668 last_binlog_offset.clone_from(&consumed_binlog_offset);
669 }
670
671 Self::report_metrics(
672 &self.metrics,
673 cur_barrier_snapshot_processed_rows,
674 cur_barrier_upstream_processed_rows,
675 );
676
677 // update and persist current backfill progress
678 state_impl
679 .mutate_state(
680 current_pk_pos.clone(),
681 last_binlog_offset.clone(),
682 total_snapshot_row_count,
683 false,
684 )
685 .await?;
686
687 state_impl.commit_state(pending_barrier.epoch).await?;
688 yield Message::Barrier(pending_barrier);
689 }
690 } else if self.options.disable_backfill {
691 // If backfill is disabled, we just mark the backfill as finished
692 tracing::info!(
693 table_id,
694 upstream_table_name,
695 "CdcBackfill has been disabled"
696 );
697 state_impl
698 .mutate_state(
699 current_pk_pos.clone(),
700 last_binlog_offset.clone(),
701 total_snapshot_row_count,
702 true,
703 )
704 .await?;
705 }
706
707 // drop reader to release db connection
708 drop(upstream_table_reader);
709
710 tracing::info!(
711 table_id,
712 upstream_table_name,
713 "CdcBackfill has already finished and will forward messages directly to the downstream"
714 );
715
 716 // Wait for the first barrier to come after backfill is finished,
 717 // so we can update our progress and persist the state.
718 while let Some(Ok(msg)) = upstream.next().await {
719 if let Some(msg) = mapping_message(msg, &self.output_indices) {
 720 // If not finished yet, we need to update the state; otherwise there is no need.
721 if let Message::Barrier(barrier) = &msg {
 722 // finalize the backfill state
723 // TODO: unify `mutate_state` and `commit_state` into one method
724 state_impl
725 .mutate_state(
726 current_pk_pos.clone(),
727 last_binlog_offset.clone(),
728 total_snapshot_row_count,
729 true,
730 )
731 .await?;
732 state_impl.commit_state(barrier.epoch).await?;
733
734 // mark progress as finished
735 if let Some(progress) = self.progress.as_mut() {
736 progress.finish(barrier.epoch, total_snapshot_row_count);
737 }
738 yield msg;
 739 // break after the state has been saved
740 break;
741 }
742 yield msg;
743 }
744 }
745
 746 // After the backfill progress is finished,
 747 // we can forward messages directly to the downstream.
749 #[for_await]
750 for msg in upstream {
751 // upstream offsets will be removed from the message before forwarding to
752 // downstream
753 if let Some(msg) = mapping_message(msg?, &self.output_indices) {
754 if let Message::Barrier(barrier) = &msg {
755 // commit state just to bump the epoch of state table
756 state_impl.commit_state(barrier.epoch).await?;
757 }
758 yield msg;
759 }
760 }
761 }
762}
763
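/// Drives the table-reader creation future while also polling the upstream.
/// Returns `Ok(None)` when the reader becomes ready, when it was already created, or when
/// the upstream terminates; returns `Ok(Some(msg))` for each upstream message received
/// while waiting.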
764async fn build_reader_and_poll_upstream(
765 upstream: &mut BoxedMessageStream,
766 table_reader: &mut Option<ExternalTableReaderImpl>,
767 future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
768) -> StreamExecutorResult<Option<Message>> {
769 if table_reader.is_some() {
770 return Ok(None);
771 }
772 tokio::select! {
773 biased;
774 reader = &mut *future => {
775 *table_reader = Some(reader);
776 Ok(None)
777 }
778 msg = upstream.next() => {
779 msg.transpose()
780 }
781 }
782}
783
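/// Transforms the upstream changelog chunks, whose payload column carries Debezium JSON,
/// into chunks that match the downstream table schema (`output_columns`), with the
/// `_rw_offset` column appended at the end.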
784#[try_stream(ok = Message, error = StreamExecutorError)]
785pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
786 let props = SpecificParserConfig {
787 encoding_config: EncodingProperties::Json(JsonProperties {
788 use_schema_registry: false,
789 timestamptz_handling: None,
790 }),
791 // the cdc message is generated internally so the key must exist.
792 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
793 };
794
795 // convert to source column desc to feed into parser
796 let columns_with_meta = output_columns
797 .iter()
798 .map(SourceColumnDesc::from)
799 .collect_vec();
800
801 let mut parser = DebeziumParser::new(
802 props,
803 columns_with_meta.clone(),
804 Arc::new(SourceContext::dummy()),
805 )
806 .await
807 .map_err(StreamExecutorError::connector_error)?;
808
809 pin_mut!(upstream);
810 #[for_await]
811 for msg in upstream {
812 let mut msg = msg?;
813 if let Message::Chunk(chunk) = &mut msg {
814 let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
815 let _ = std::mem::replace(chunk, parsed_chunk);
816 }
817 yield msg;
818 }
819}
820
821async fn parse_debezium_chunk(
822 parser: &mut DebeziumParser,
823 chunk: &StreamChunk,
824) -> StreamExecutorResult<StreamChunk> {
 825 // Here we transform the input chunk with schema `(payload varchar, _rw_offset varchar, _rw_table_name varchar)`
 826 // into a chunk with the downstream table schema: the `info.schema` of the MergeNode contains the schema of
 827 // the table job, with `_rw_offset` appended at the end.
 828 // See `gen_create_table_plan_for_cdc_source` for details.
829
830 // use `SourceStreamChunkBuilder` for convenience
831 let mut builder = SourceStreamChunkBuilder::new(
832 parser.columns().to_vec(),
833 SourceCtrlOpts {
834 chunk_size: chunk.capacity(),
835 split_txn: false,
836 },
837 );
838
 839 // The schema of the input chunk is `(payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id)`.
 840 // We use the debezium parser to parse the first column,
 841 // then chain the parsed row with the `_rw_offset` column to get a new row.
842 let payloads = chunk.data_chunk().project(&[0]);
843 let offsets = chunk.data_chunk().project(&[1]).compact();
844
845 // TODO: preserve the transaction semantics
846 for payload in payloads.rows() {
847 let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
848 else {
849 panic!("payload must be jsonb");
850 };
851
852 parser
853 .parse_inner(
854 None,
855 Some(jsonb_ref.to_string().as_bytes().to_vec()),
856 builder.row_writer(),
857 )
858 .await
859 .unwrap();
860 }
861 builder.finish_current_chunk();
862
863 let parsed_chunk = {
864 let mut iter = builder.consume_ready_chunks();
865 assert_eq!(1, iter.len());
866 iter.next().unwrap()
867 };
868 assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row
869 let (ops, mut columns, vis) = parsed_chunk.into_inner();
870 // note that `vis` is not necessarily the same as the original chunk's visibilities
871
872 // concat the rows in the parsed chunk with the `_rw_offset` column
873 columns.extend(offsets.into_parts().0);
874
875 Ok(StreamChunk::from_parts(
876 ops,
877 DataChunk::from_parts(columns.into(), vis),
878 ))
879}
880
881impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
882 fn execute(self: Box<Self>) -> BoxedMessageStream {
883 self.execute_inner().boxed()
884 }
885}
886
887#[cfg(test)]
888mod tests {
889 use std::str::FromStr;
890
891 use futures::{StreamExt, pin_mut};
892 use risingwave_common::array::{DataChunk, Op, StreamChunk};
893 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
894 use risingwave_common::types::{DataType, Datum, JsonbVal};
895 use risingwave_common::util::iter_util::ZipEqFast;
896
897 use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
898 use crate::executor::test_utils::MockSource;
899
900 #[tokio::test]
901 async fn test_transform_upstream_chunk() {
902 let schema = Schema::new(vec![
903 Field::unnamed(DataType::Jsonb), // debezium json payload
904 Field::unnamed(DataType::Varchar), // _rw_offset
905 Field::unnamed(DataType::Varchar), // _rw_table_name
906 ]);
907 let pk_indices = vec![1];
908 let (mut tx, source) = MockSource::channel();
909 let source = source.into_executor(schema.clone(), pk_indices.clone());
910 // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();
911 let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
912
913 let datums: Vec<Datum> = vec![
914 Some(JsonbVal::from_str(payload).unwrap().into()),
915 Some("file: 1.binlog, pos: 100".to_owned().into()),
916 Some("mydb.orders".to_owned().into()),
917 ];
918
919 println!("datums: {:?}", datums[1]);
920
921 let mut builders = schema.create_array_builders(8);
922 for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
923 builder.append(datum.clone());
924 }
925 let columns = builders
926 .into_iter()
927 .map(|builder| builder.finish().into())
928 .collect();
929
930 // one row chunk
931 let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));
932
933 tx.push_chunk(chunk);
934 let upstream = Box::new(source).execute();
935
936 // schema to the debezium parser
937 let columns = vec![
938 ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
939 ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
940 ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
941 ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
942 ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
943 ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
944 ];
945
946 let parsed_stream = transform_upstream(upstream, &columns);
947 pin_mut!(parsed_stream);
948 // the output chunk must contain the offset column
949 if let Some(message) = parsed_stream.next().await {
950 println!("chunk: {:#?}", message.unwrap());
951 }
952 }
953}