risingwave_stream/executor/backfill/cdc/cdc_backfill.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::catalog::ColumnDesc;
use risingwave_connector::parser::{
    ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
    ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, TimeHandling,
    TimestampHandling, TimestamptzHandling,
};
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{
    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::CreateMviewProgressReporter;

/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
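/// For example, backfilling an upstream table with a 2-column primary key stores
/// `2 + METADATA_STATE_LEN = 6` columns per state row.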
const METADATA_STATE_LEN: usize = 4;

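/// An executor that backfills a CDC table by interleaving bounded snapshot reads of the
/// external upstream table with its changelog stream. See the "CDC Backfill Algorithm"
/// comment in `execute_inner` for the detailed protocol.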
pub struct CdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled
    external_table: ExternalStorageTable,

    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    upstream: Executor,

    /// The column indices that need to be forwarded to the downstream, from the upstream and the table scan.
    output_indices: Vec<usize>,

    /// The schema of the output chunk, including additional columns if any
    output_columns: Vec<ColumnDesc>,

    /// State table of the `CdcBackfill` executor
    state_impl: CdcBackfillState<S>,

    // TODO: introduce a CdcBackfillProgress to report finish to Meta
    // This object is just a stub right now
    progress: Option<CreateMviewProgressReporter>,

    metrics: CdcBackfillMetrics,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,

    properties: BTreeMap<String, String>,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        progress: Option<CreateMviewProgressReporter>,
        metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
        properties: BTreeMap<String, String>,
    ) -> Self {
        let pk_indices = external_table.pk_indices();
        let upstream_table_id = external_table.table_id().table_id;
        let state_impl = CdcBackfillState::new(
            upstream_table_id,
            state_table,
            pk_indices.len() + METADATA_STATE_LEN,
        );

        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);

        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            state_impl,
            progress,
            metrics,
            rate_limit_rps,
            options,
            properties,
        }
    }

    fn report_metrics(
        metrics: &CdcBackfillMetrics,
        snapshot_processed_row_count: u64,
        upstream_processed_row_count: u64,
    ) {
        metrics
            .cdc_backfill_snapshot_read_row_count
            .inc_by(snapshot_processed_row_count);

        metrics
            .cdc_backfill_upstream_output_row_count
            .inc_by(upstream_processed_row_count);
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The indices of the primary key columns
        let pk_indices = self.external_table.pk_indices().to_vec();
        let pk_order = self.external_table.pk_order_types().to_vec();

        let table_id = self.external_table.table_id().table_id;
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();

        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();

        let mut upstream = self.upstream.execute();

        // Current position of the upstream_table storage primary key.
        // `None` means it starts from the beginning.
        let mut current_pk_pos: Option<OwnedRow>;

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;

        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
        let first_barrier_epoch = first_barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);
        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

        // Check whether this parallelism has been assigned splits,
        // if not, we should bypass the backfill directly.
        let mut state_impl = self.state_impl;

        state_impl.init_epoch(first_barrier_epoch).await?;

        // restore backfill state
        let state = state_impl.restore_state().await?;
        current_pk_pos = state.current_pk_pos.clone();

        let need_backfill = !self.options.disable_backfill && !state.is_finished;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_row_count = state.row_count as u64;

        // After initializing the state table and forwarding the initial barrier to downstream,
        // we now try to create the table reader with retry.
        // If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
        // If backfill is finished, we should forward the upstream cdc events to downstream.
        let mut table_reader: Option<ExternalTableReaderImpl> = None;
        let external_table = self.external_table.clone();
        let mut future = Box::pin(async move {
            let backoff = get_infinite_backoff_strategy();
            tokio_retry::Retry::spawn(backoff, || async {
                match external_table.create_table_reader().await {
                    Ok(reader) => Ok(reader),
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                        Err(e)
                    }
                }
            })
            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
            .await
            .expect("Retry create cdc table reader until success.")
        });
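        // Note: mapping the connector property `debezium.time.precision.mode = "connect"` to the
        // `Milli` handling variants below reflects that connect mode represents temporal values
        // in milliseconds; other modes keep the parser defaults (`None`).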
        let is_connect_time_precision = self
            .properties
            .get("debezium.time.precision.mode")
            .is_some_and(|v| v == "connect");
        let timestamp_handling: Option<TimestampHandling> =
            is_connect_time_precision.then_some(TimestampHandling::Milli);
        let timestamptz_handling: Option<TimestamptzHandling> =
            is_connect_time_precision.then_some(TimestamptzHandling::Milli);
        let time_handling: Option<TimeHandling> =
            is_connect_time_precision.then_some(TimeHandling::Milli);
        // Make sure to use mapping_message after transform_upstream.
        let mut upstream = transform_upstream(
            upstream,
            self.output_columns.clone(),
            timestamp_handling,
            timestamptz_handling,
            time_handling,
        )
        .boxed();
        loop {
            if let Some(msg) =
                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                    .await?
            {
                if let Some(msg) = mapping_message(msg, &self.output_indices) {
                    match msg {
                        Message::Barrier(barrier) => {
                            // commit state to bump the epoch of state table
                            state_impl.commit_state(barrier.epoch).await?;
                            yield Message::Barrier(barrier);
                        }
                        Message::Chunk(chunk) => {
                            if need_backfill {
                                // ignore chunk if we need backfill, since we can read the data from the snapshot
                            } else {
                                // forward the chunk to downstream
                                yield Message::Chunk(chunk);
                            }
                        }
                        Message::Watermark(_) => {
                            // ignore watermark
                        }
                    }
                }
            } else {
                assert!(table_reader.is_some(), "table reader must be created");
                tracing::info!(
                    table_id,
                    upstream_table_name,
                    "table reader created successfully"
                );
                break;
            }
        }

        let upstream_table_reader = UpstreamTableReader::new(
            self.external_table.clone(),
            table_reader.expect("table reader must be created"),
        );

        let mut upstream = upstream.peekable();

        let mut last_binlog_offset: Option<CdcOffset> = state
            .last_cdc_offset
            .map_or(upstream_table_reader.current_cdc_offset().await?, Some);

        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
        let mut consumed_binlog_offset: Option<CdcOffset> = None;

        tracing::info!(
            table_id,
            upstream_table_name,
            initial_binlog_offset = ?last_binlog_offset,
            ?current_pk_pos,
            is_finished = state.is_finished,
            is_snapshot_paused,
            snapshot_row_count = total_snapshot_row_count,
            rate_limit = self.rate_limit_rps,
            disable_backfill = self.options.disable_backfill,
            snapshot_barrier_interval = self.options.snapshot_barrier_interval,
            snapshot_batch_size = self.options.snapshot_batch_size,
            "start cdc backfill",
        );

        // CDC Backfill Algorithm:
        //
        // When the first barrier comes from upstream:
        //  - read the current binlog offset as `binlog_low`
        //  - start a snapshot read upon the upstream table and iterate over the snapshot read stream
        //  - buffer the changelog events from upstream
        //
        // When a new barrier comes from upstream:
        //  - read the current binlog offset as `binlog_high`
        //  - for each row of the upstream change log, forward it to downstream if it is in the range
        //    of [binlog_low, binlog_high] and its pk <= `current_pos`, otherwise ignore it
        //  - reconstruct the whole backfill stream with upstream changelog and a new table snapshot
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
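        //
        // Illustrative example (values are made up): suppose the snapshot has advanced to
        // `current_pos` = pk 300 and the buffered changelog covers [binlog_low, binlog_high].
        // A buffered UPDATE on pk 250 is forwarded, since that row was already emitted by the
        // snapshot; the same event on pk 700 is dropped, because a later snapshot chunk will
        // read the row with its newer value anyway.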
        if need_backfill {
            // drive the upstream changelog first to ensure we can receive timely changelog events,
            // otherwise the upstream changelog may be blocked by the snapshot read stream
            let _ = Pin::new(&mut upstream).peek().await;

            // wait for a barrier to make sure the backfill starts after upstream source
            #[for_await]
            for msg in upstream.by_ref() {
                match msg? {
                    Message::Barrier(barrier) => {
                        match barrier.mutation.as_deref() {
                            Some(crate::executor::Mutation::Pause) => {
                                is_snapshot_paused = true;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is paused by barrier"
                                );
                            }
                            Some(crate::executor::Mutation::Resume) => {
                                is_snapshot_paused = false;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is resumed by barrier"
                                );
                            }
                            _ => {
                                // ignore other mutations
                            }
                        }
                        // commit state just to bump the epoch of state table
                        state_impl.commit_state(barrier.epoch).await?;
                        yield Message::Barrier(barrier);
                        break;
                    }
                    Message::Chunk(ref chunk) => {
                        last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
                    }
                    Message::Watermark(_) => {
                        // Ignore watermark
                    }
                }
            }

            tracing::info!(table_id,
                upstream_table_name,
                initial_binlog_offset = ?last_binlog_offset,
                ?current_pk_pos,
                is_snapshot_paused,
                "start cdc backfill loop");

            // the buffer will be drained when a barrier comes
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

            'backfill_loop: loop {
                let left_upstream = upstream.by_ref().map(Either::Left);

                let mut snapshot_read_row_cnt: usize = 0;
                let read_args = SnapshotReadArgs::new(
                    current_pk_pos.clone(),
                    self.rate_limit_rps,
                    pk_indices.clone(),
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }

                // Prefer to select upstream, so we can stop the snapshot stream when a barrier comes.
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut barrier_count: u32 = 0;
                let mut pending_barrier = None;

                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    // increase the barrier count and check whether we need to start a new snapshot
                                    barrier_count += 1;
                                    let can_start_new_snapshot =
                                        barrier_count == self.options.snapshot_barrier_interval;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                if let Some(new_rate_limit) =
                                                    some.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    rate_limit_to_zero = self
                                                        .rate_limit_rps
                                                        .is_some_and(|val| val == 0);
                                                    // update and persist current backfill progress without draining the buffered upstream chunks
                                                    state_impl
                                                        .mutate_state(
                                                            current_pk_pos.clone(),
                                                            last_binlog_offset.clone(),
                                                            total_snapshot_row_count,
                                                            false,
                                                        )
                                                        .await?;
                                                    state_impl.commit_state(barrier.epoch).await?;
                                                    yield Message::Barrier(barrier);

                                                    // rebuild the snapshot stream with new rate limit
                                                    continue 'backfill_loop;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    // the actor has been dropped, exit the backfill loop
                                                    tracing::info!(
                                                        table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    yield Message::Barrier(barrier);
                                                    break 'backfill_loop;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    Self::report_metrics(
                                        &self.metrics,
                                        cur_barrier_snapshot_processed_rows,
                                        cur_barrier_upstream_processed_rows,
                                    );

                                    // when processing a barrier, check whether we can start a new snapshot
                                    // if the number of barriers reaches the snapshot interval
                                    if can_start_new_snapshot {
                                        // stage the barrier
                                        pending_barrier = Some(barrier);
                                        tracing::debug!(
                                            table_id,
                                            ?current_pk_pos,
                                            ?snapshot_read_row_cnt,
                                            "Prepare to start a new snapshot"
                                        );
                                        // Break the loop for consuming snapshot and prepare to start a new snapshot
                                        break;
                                    } else {
                                        // update and persist current backfill progress
                                        state_impl
                                            .mutate_state(
                                                current_pk_pos.clone(),
                                                last_binlog_offset.clone(),
                                                total_snapshot_row_count,
                                                false,
                                            )
                                            .await?;

                                        state_impl.commit_state(barrier.epoch).await?;

                                        // emit barrier and continue consuming the backfill stream
                                        yield Message::Barrier(barrier);
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // skip empty upstream chunk
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    let chunk_binlog_offset =
                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                                    tracing::trace!(
                                        "recv changelog chunk: chunk_offset {:?}, capacity {}",
                                        chunk_binlog_offset,
                                        chunk.capacity()
                                    );

                                    // Since we don't need changelog before the
                                    // `last_binlog_offset`, skip the chunk that *only* contains
                                    // events before `last_binlog_offset`.
                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
                                        && let Some(chunk_offset) = chunk_binlog_offset
                                        && chunk_offset < *last_binlog_offset
                                    {
                                        tracing::trace!(
                                            "skip changelog chunk: chunk_offset {:?}, capacity {}",
                                            chunk_offset,
                                            chunk.capacity()
                                        );
                                        continue;
                                    }
                                    // Buffer the upstream chunk.
                                    upstream_chunk_buffer.push(chunk.compact());
                                }
                                Message::Watermark(_) => {
                                    // Ignore watermark during backfill.
                                }
                            }
                        }
                        // Snapshot read
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        table_id,
                                        ?last_binlog_offset,
                                        ?current_pk_pos,
                                        "snapshot read stream ends"
                                    );
                                    // If the snapshot read stream ends, it means all historical
                                    // data has been loaded.
                                    // We should not mark the chunk anymore,
                                    // otherwise, we will ignore some rows in the buffer.
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(mapping_chunk(
                                            chunk,
                                            &self.output_indices,
                                        ));
                                    }

                                    // backfill has finished, exit the backfill loop and persist the state when we recv a barrier
                                    break 'backfill_loop;
                                }
                                Some(chunk) => {
                                    // Raise the current position.
                                    // Since snapshot read streams are ordered by pk, we can
                                    // just use the last row to update `current_pos`.
                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                                    tracing::trace!(
                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
                                        chunk.cardinality(),
                                        current_pk_pos
                                    );
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
                                    total_snapshot_row_count += chunk_cardinality;
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }

                assert!(pending_barrier.is_some(), "pending_barrier must exist");
                let pending_barrier = pending_barrier.unwrap();

                // Here we have to ensure the snapshot stream is consumed at least once,
                // since the barrier event can kick in anytime.
                // Otherwise, the result set of the new snapshot stream may become empty.
                // It may be a cancellation bug of the MySQL driver.
                let (_, mut snapshot_stream) = backfill_stream.into_inner();

                // skip consuming the snapshot stream if it is paused or the rate limit is 0
                if !is_snapshot_paused
                    && !rate_limit_to_zero
                    && let Some(msg) = snapshot_stream
                        .next()
                        .instrument_await("consume_snapshot_stream_once")
                        .await
                {
                    let Either::Right(msg) = msg else {
                        bail!("BUG: snapshot_read contains upstream messages");
                    };
                    match msg? {
                        None => {
                            tracing::info!(
                                table_id,
                                ?last_binlog_offset,
                                ?current_pk_pos,
                                "snapshot read stream ends in the force emit branch"
                            );
                            // End of the snapshot read stream.
                            // Consume the buffered upstream chunk without filtering by `binlog_low`.
                            for chunk in upstream_chunk_buffer.drain(..) {
                                yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                            }

                            // mark backfill as finished
                            state_impl
                                .mutate_state(
                                    current_pk_pos.clone(),
                                    last_binlog_offset.clone(),
                                    total_snapshot_row_count,
                                    true,
                                )
                                .await?;

                            // commit state because we have received a barrier message
                            state_impl.commit_state(pending_barrier.epoch).await?;
                            yield Message::Barrier(pending_barrier);
                            // end of backfill loop, since backfill has finished
                            break 'backfill_loop;
                        }
                        Some(chunk) => {
                            // Raise the current pk position.
                            current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                            let row_count = chunk.cardinality() as u64;
                            cur_barrier_snapshot_processed_rows += row_count;
                            total_snapshot_row_count += row_count;
                            snapshot_read_row_cnt += row_count as usize;

                            tracing::debug!(
                                table_id,
                                ?current_pk_pos,
                                ?snapshot_read_row_cnt,
                                "force emit a snapshot chunk"
                            );
                            yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                        }
                    }
                }

                // If the number of barriers reaches the snapshot interval,
                // consume the buffered upstream chunks.
                if let Some(current_pos) = &current_pk_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;

                        // record the consumed binlog offset that will be
                        // persisted later
                        consumed_binlog_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                        yield Message::Chunk(mapping_chunk(
                            mark_cdc_chunk(
                                &offset_parse_func,
                                chunk,
                                current_pos,
                                &pk_indices,
                                &pk_order,
                                last_binlog_offset.clone(),
                            )?,
                            &self.output_indices,
                        ));
                    }
                } else {
                    // If there is no current_pos, we have not processed any snapshot data yet,
                    // so we can just discard the buffered upstream chunks.
                    upstream_chunk_buffer.clear();
                }

                // Update last seen binlog offset
                if consumed_binlog_offset.is_some() {
                    last_binlog_offset.clone_from(&consumed_binlog_offset);
                }

                Self::report_metrics(
                    &self.metrics,
                    cur_barrier_snapshot_processed_rows,
                    cur_barrier_upstream_processed_rows,
                );

                // update and persist current backfill progress
                state_impl
                    .mutate_state(
                        current_pk_pos.clone(),
                        last_binlog_offset.clone(),
                        total_snapshot_row_count,
                        false,
                    )
                    .await?;

                state_impl.commit_state(pending_barrier.epoch).await?;
                yield Message::Barrier(pending_barrier);
            }
        } else if self.options.disable_backfill {
            // If backfill is disabled, we just mark the backfill as finished
            tracing::info!(
                table_id,
                upstream_table_name,
                "CdcBackfill has been disabled"
            );
            state_impl
                .mutate_state(
                    current_pk_pos.clone(),
                    last_binlog_offset.clone(),
                    total_snapshot_row_count,
                    true,
                )
                .await?;
        }

        upstream_table_reader.disconnect().await?;

        tracing::info!(
            table_id,
            upstream_table_name,
            "CdcBackfill has already finished and will forward messages directly to the downstream"
        );

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress and persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished then we need to update state, otherwise no need.
                if let Message::Barrier(barrier) = &msg {
                    // finalize the backfill state
                    // TODO: unify `mutate_state` and `commit_state` into one method
                    state_impl
                        .mutate_state(
                            current_pk_pos.clone(),
                            last_binlog_offset.clone(),
                            total_snapshot_row_count,
                            true,
                        )
                        .await?;
                    state_impl.commit_state(barrier.epoch).await?;

                    // mark progress as finished
                    if let Some(progress) = self.progress.as_mut() {
                        progress.finish(barrier.epoch, total_snapshot_row_count);
                    }
                    yield msg;
                    // break after the state has been saved
                    break;
                }
                yield msg;
            }
        }

        // After the backfill progress is finished,
        // we can forward messages directly to the downstream,
        // since backfill has completed.
        #[for_await]
        for msg in upstream {
            // upstream offsets will be removed from the message before forwarding to
            // downstream
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // commit state just to bump the epoch of state table
                    state_impl.commit_state(barrier.epoch).await?;
                }
                yield msg;
            }
        }
    }
}

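/// Polls the upstream while concurrently driving the table-reader creation future, so that
/// barriers are not blocked while the external reader is being set up (which may retry
/// indefinitely). Returns `Ok(None)` once the reader is ready or already present, and
/// `Ok(Some(msg))` for each upstream message received in the meantime.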
pub(crate) async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    table_reader: &mut Option<ExternalTableReaderImpl>,
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        biased;
        reader = &mut *future => {
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            msg.transpose()
        }
    }
}

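/// Transforms raw upstream chunks of shape `(payload jsonb, _rw_offset varchar,
/// _rw_table_name varchar)` into chunks matching `output_columns`, with the `_rw_offset`
/// column re-attached at the end, by feeding each Debezium payload through a `DebeziumParser`.
///
/// A minimal usage sketch (see `test_transform_upstream_chunk` below for a runnable version):
///
/// ```ignore
/// let mut parsed = transform_upstream(upstream, output_columns, None, None, None).boxed();
/// ```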
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(
    upstream: BoxedMessageStream,
    output_columns: Vec<ColumnDesc>,
    timestamp_handling: Option<TimestampHandling>,
    timestamptz_handling: Option<TimestamptzHandling>,
    time_handling: Option<TimeHandling>,
) {
    let props = SpecificParserConfig {
        encoding_config: EncodingProperties::Json(JsonProperties {
            use_schema_registry: false,
            timestamp_handling,
            timestamptz_handling,
            time_handling,
        }),
        // the cdc message is generated internally so the key must exist.
        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
    };

    // convert to source column desc to feed into parser
    let columns_with_meta = output_columns
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();
    let mut parser = DebeziumParser::new(
        props,
        columns_with_meta.clone(),
        Arc::new(SourceContext::dummy()),
    )
    .await
    .map_err(StreamExecutorError::connector_error)?;

    pin_mut!(upstream);
    #[for_await]
    for msg in upstream {
        let mut msg = msg?;
        if let Message::Chunk(chunk) = &mut msg {
            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
            let _ = std::mem::replace(chunk, parsed_chunk);
        }
        yield msg;
    }
}

async fn parse_debezium_chunk(
    parser: &mut DebeziumParser,
    chunk: &StreamChunk,
) -> StreamExecutorResult<StreamChunk> {
    // Here we transform the input chunk with schema `(payload jsonb, _rw_offset varchar, _rw_table_name varchar)`
    // into a chunk with the downstream table schema (the `info.schema` of MergeNode, i.e. the schema
    // of the table job with `_rw_offset` appended at the end).
    // See `gen_create_table_plan_for_cdc_source` for details.
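    //
    // Illustrative example (values borrowed from the unit tests below): an input row
    //   (`{"payload": {"op": "r", "after": {...}, ...}}`, 'file: 1.binlog, pos: 100', 'mydb.orders')
    // is parsed into the output row
    //   (O_ORDERKEY, O_CUSTKEY, ..., 'file: 1.binlog, pos: 100')
    // i.e. the payload columns defined by the downstream schema, followed by the original
    // `_rw_offset` value.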

    // use `SourceStreamChunkBuilder` for convenience
    let mut builder = SourceStreamChunkBuilder::new(
        parser.columns().to_vec(),
        SourceCtrlOpts {
            chunk_size: chunk.capacity(),
            split_txn: false,
        },
    );

    // The schema of input chunk `(payload jsonb, _rw_offset varchar, _rw_table_name varchar, _row_id)`
    // We should use the debezium parser to parse the first column,
    // then chain the parsed row with the `_rw_offset` column to get a new row.
    let payloads = chunk.data_chunk().project(&[0]);
    let offsets = chunk.data_chunk().project(&[1]).compact();

    // TODO: preserve the transaction semantics
    for payload in payloads.rows() {
        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
        else {
            panic!("payload must be jsonb");
        };

        parser
            .parse_inner(
                None,
                Some(jsonb_ref.to_string().as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await
            .unwrap();
    }
    builder.finish_current_chunk();

    let parsed_chunk = {
        let mut iter = builder.consume_ready_chunks();
        assert_eq!(1, iter.len());
        iter.next().unwrap()
    };
    assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row
    let (ops, mut columns, vis) = parsed_chunk.into_inner();
    // note that `vis` is not necessarily the same as the original chunk's visibility

    // concat the rows in the parsed chunk with the `_rw_offset` column
    columns.extend(offsets.into_parts().0);

    Ok(StreamChunk::from_parts(
        ops,
        DataChunk::from_parts(columns.into(), vis),
    ))
}

impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::iter_util::ZipEqFast;
    use risingwave_connector::source::cdc::CdcScanOptions;
    use risingwave_storage::memory::MemoryStateStore;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::prelude::StateTable;
    use crate::executor::source::default_source_internal_table;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{
        ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
    };

    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        println!("datums: {:?}", datums[1]);

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        // one row chunk
        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // schema to the debezium parser
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, columns, None, None, None);
        pin_mut!(parsed_stream);
        // the output chunk must contain the offset column
        if let Some(message) = parsed_stream.next().await {
            println!("chunk: {:#?}", message.unwrap());
        }
    }

    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        let output_indices = vec![1, 0, 4]; // reorder
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                // We want to mark backfill as finished. However it's not straightforward to do so.
                // Here we disable_backfill instead.
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
            BTreeMap::default(),
        );
        // cdc.state_impl.init_epoch(EpochPair::new(test_epoch(4), test_epoch(3))).await.unwrap();
        // cdc.state_impl.mutate_state(None, None, 0, true).await.unwrap();
        // cdc.state_impl.commit_state(EpochPair::new(test_epoch(5), test_epoch(4))).await.unwrap();
        let s = cdc.execute_inner();
        pin_mut!(s);

        // send first barrier
        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        // send chunk
        {
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            // one row chunk
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
1099}