risingwave_stream/executor/backfill/cdc/cdc_backfill.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::Pin;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::catalog::ColumnDesc;
use risingwave_connector::parser::{
    ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
    ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::CdcScanOptions;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{
    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::CreateMviewProgressReporter;
/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
const METADATA_STATE_LEN: usize = 4;

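/// Executor that backfills the historical data of a CDC table by merging snapshot reads
/// of the external table with the upstream changelog stream, emitting a consistent
/// stream of changes to the downstream.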
pub struct CdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled
    external_table: ExternalStorageTable,

    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    upstream: Executor,

    /// The column indices that need to be forwarded to the downstream, from the upstream and the table scan.
    output_indices: Vec<usize>,

    /// The schema of the output chunk, including additional columns if any
    output_columns: Vec<ColumnDesc>,

    /// State table of the `CdcBackfill` executor
    state_impl: CdcBackfillState<S>,

    // TODO: introduce a CdcBackfillProgress to report finish to Meta
    // This object is just a stub right now
    progress: Option<CreateMviewProgressReporter>,

    metrics: CdcBackfillMetrics,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        progress: Option<CreateMviewProgressReporter>,
        metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
    ) -> Self {
        let pk_indices = external_table.pk_indices();
        let upstream_table_id = external_table.table_id().table_id;
        let state_impl = CdcBackfillState::new(
            upstream_table_id,
            state_table,
            pk_indices.len() + METADATA_STATE_LEN,
        );

        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);

        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            state_impl,
            progress,
            metrics,
            rate_limit_rps,
            options,
        }
    }

    fn report_metrics(
        metrics: &CdcBackfillMetrics,
        snapshot_processed_row_count: u64,
        upstream_processed_row_count: u64,
    ) {
        metrics
            .cdc_backfill_snapshot_read_row_count
            .inc_by(snapshot_processed_row_count);

        metrics
            .cdc_backfill_upstream_output_row_count
            .inc_by(upstream_processed_row_count);
    }

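    /// The main backfill entry point: creates the external table reader (with retries),
    /// runs the backfill loop that merges the snapshot read stream with the upstream
    /// changelog, and finally forwards upstream messages directly once backfill is done.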
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The indices of the primary key columns
        let pk_indices = self.external_table.pk_indices().to_vec();
        let pk_order = self.external_table.pk_order_types().to_vec();

        let table_id = self.external_table.table_id().table_id;
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();

        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();

        let mut upstream = self.upstream.execute();

        // Current position of the upstream_table storage primary key.
        // `None` means it starts from the beginning.
        let mut current_pk_pos: Option<OwnedRow>;

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;

        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
        let first_barrier_epoch = first_barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);
        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

        // Check whether this parallelism has been assigned splits;
        // if not, we should bypass the backfill directly.
        let mut state_impl = self.state_impl;

        state_impl.init_epoch(first_barrier_epoch).await?;

        // restore backfill state
        let state = state_impl.restore_state().await?;
        current_pk_pos = state.current_pk_pos.clone();

        let need_backfill = !self.options.disable_backfill && !state.is_finished;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_row_count = state.row_count as u64;

        // After initializing the state table and forwarding the initial barrier to the downstream,
        // we now try to create the table reader with retry.
        // If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
        // if backfill is finished, we should forward the upstream cdc events to the downstream.
        let mut table_reader: Option<ExternalTableReaderImpl> = None;
        let external_table = self.external_table.clone();
        let mut future = Box::pin(async move {
            let backoff = get_infinite_backoff_strategy();
            tokio_retry::Retry::spawn(backoff, || async {
                match external_table.create_table_reader().await {
                    Ok(reader) => Ok(reader),
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                        Err(e)
                    }
                }
            })
            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
            .await
            .expect("Retry create cdc table reader until success.")
        });

        // Make sure to use mapping_message after transform_upstream.
        let mut upstream = transform_upstream(upstream, self.output_columns.clone()).boxed();
        loop {
            if let Some(msg) =
                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                    .await?
            {
                if let Some(msg) = mapping_message(msg, &self.output_indices) {
                    match msg {
                        Message::Barrier(barrier) => {
                            // commit state to bump the epoch of the state table
                            state_impl.commit_state(barrier.epoch).await?;
                            yield Message::Barrier(barrier);
                        }
                        Message::Chunk(chunk) => {
                            if need_backfill {
                                // ignore the chunk if we need backfill, since we can read the data from the snapshot
                            } else {
                                // forward the chunk to the downstream
                                yield Message::Chunk(chunk);
                            }
                        }
                        Message::Watermark(_) => {
                            // ignore watermark
                        }
                    }
                }
            } else {
                assert!(table_reader.is_some(), "table reader must be created");
                tracing::info!(
                    table_id,
                    upstream_table_name,
                    "table reader created successfully"
                );
                break;
            }
        }

        let upstream_table_reader = UpstreamTableReader::new(
            self.external_table.clone(),
            table_reader.expect("table reader must be created"),
        );

        let mut upstream = upstream.peekable();

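        // Note: `Option::map_or` evaluates its default argument eagerly, so the current
        // offset below is always fetched from the external table, even if the restored
        // state already contains a `last_cdc_offset`.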
        let mut last_binlog_offset: Option<CdcOffset> = state
            .last_cdc_offset
            .map_or(upstream_table_reader.current_cdc_offset().await?, Some);

        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
        let mut consumed_binlog_offset: Option<CdcOffset> = None;

        tracing::info!(
            table_id,
            upstream_table_name,
            initial_binlog_offset = ?last_binlog_offset,
            ?current_pk_pos,
            is_finished = state.is_finished,
            is_snapshot_paused,
            snapshot_row_count = total_snapshot_row_count,
            rate_limit = self.rate_limit_rps,
            disable_backfill = self.options.disable_backfill,
            snapshot_interval = self.options.snapshot_interval,
            snapshot_batch_size = self.options.snapshot_batch_size,
            "start cdc backfill",
        );

        // CDC Backfill Algorithm:
        //
        // When the first barrier comes from upstream:
        //  - read the current binlog offset as `binlog_low`
        //  - start a snapshot read upon the upstream table and iterate over the snapshot read stream
        //  - buffer the changelog events from upstream
        //
        // When a new barrier comes from upstream:
        //  - read the current binlog offset as `binlog_high`
        //  - for each row of the upstream changelog, forward it to the downstream if it is in the range
        //    of [binlog_low, binlog_high] and its pk <= `current_pos`, otherwise ignore it
        //  - reconstruct the whole backfill stream with the upstream changelog and a new table snapshot
        //
        // When a chunk comes from the snapshot, we forward it to the downstream and raise
        // `current_pos`.
        // When we reach the end of the snapshot read stream, it means the backfill has
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
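        //
        // For example, with `snapshot_interval == 1`, each barrier closes one round of the
        // backfill loop: snapshot chunks seen during the round raise `current_pos` and are
        // emitted immediately, while upstream chunks are only buffered and then flushed at
        // the barrier, marked against `current_pos` so that rows the snapshot has not yet
        // reached are dropped (they will be picked up by a later snapshot read).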
        if need_backfill {
            // Drive the upstream changelog first to ensure we can receive timely changelog events;
            // otherwise the upstream changelog may be blocked by the snapshot read stream.
            let _ = Pin::new(&mut upstream).peek().await;

            // wait for a barrier to make sure the backfill starts after the upstream source
            #[for_await]
            for msg in upstream.by_ref() {
                match msg? {
                    Message::Barrier(barrier) => {
                        match barrier.mutation.as_deref() {
                            Some(crate::executor::Mutation::Pause) => {
                                is_snapshot_paused = true;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is paused by barrier"
                                );
                            }
                            Some(crate::executor::Mutation::Resume) => {
                                is_snapshot_paused = false;
                                tracing::info!(
                                    table_id,
                                    upstream_table_name,
                                    "snapshot is resumed by barrier"
                                );
                            }
                            _ => {
                                // ignore other mutations
                            }
                        }
                        // commit state just to bump the epoch of the state table
                        state_impl.commit_state(barrier.epoch).await?;
                        yield Message::Barrier(barrier);
                        break;
                    }
                    Message::Chunk(ref chunk) => {
                        last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
                    }
                    Message::Watermark(_) => {
                        // Ignore watermark
                    }
                }
            }

            tracing::info!(table_id,
                upstream_table_name,
                initial_binlog_offset = ?last_binlog_offset,
                ?current_pk_pos,
                is_snapshot_paused,
                "start cdc backfill loop");

            // the buffer will be drained when a barrier comes
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

            'backfill_loop: loop {
                let left_upstream = upstream.by_ref().map(Either::Left);

                let mut snapshot_read_row_cnt: usize = 0;
                let read_args = SnapshotReadArgs::new(
                    current_pk_pos.clone(),
                    self.rate_limit_rps,
                    pk_indices.clone(),
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }

                // Prefer to select the upstream, so we can stop the snapshot stream when a barrier comes.
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut barrier_count: u32 = 0;
                let mut pending_barrier = None;

                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    // increase the barrier count and check whether we need to start a new snapshot
                                    barrier_count += 1;
                                    let can_start_new_snapshot =
                                        barrier_count == self.options.snapshot_interval;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                if let Some(new_rate_limit) =
                                                    some.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    rate_limit_to_zero = self
                                                        .rate_limit_rps
                                                        .is_some_and(|val| val == 0);
                                                    // update and persist the current backfill progress without draining the buffered upstream chunks
                                                    state_impl
                                                        .mutate_state(
                                                            current_pk_pos.clone(),
                                                            last_binlog_offset.clone(),
                                                            total_snapshot_row_count,
                                                            false,
                                                        )
                                                        .await?;
                                                    state_impl.commit_state(barrier.epoch).await?;
                                                    yield Message::Barrier(barrier);

                                                    // rebuild the snapshot stream with the new rate limit
                                                    continue 'backfill_loop;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    // the actor has been dropped, exit the backfill loop
                                                    tracing::info!(
                                                        table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    yield Message::Barrier(barrier);
                                                    break 'backfill_loop;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    Self::report_metrics(
                                        &self.metrics,
                                        cur_barrier_snapshot_processed_rows,
                                        cur_barrier_upstream_processed_rows,
                                    );

                                    // when processing a barrier, check whether we can start a new snapshot,
                                    // i.e. whether the number of barriers has reached the snapshot interval
                                    if can_start_new_snapshot {
                                        // stage the barrier
                                        pending_barrier = Some(barrier);
                                        tracing::debug!(
                                            table_id,
                                            ?current_pk_pos,
                                            ?snapshot_read_row_cnt,
                                            "Prepare to start a new snapshot"
                                        );
                                        // Break the loop for consuming the snapshot and prepare to start a new one
                                        break;
                                    } else {
                                        // update and persist the current backfill progress
                                        state_impl
                                            .mutate_state(
                                                current_pk_pos.clone(),
                                                last_binlog_offset.clone(),
                                                total_snapshot_row_count,
                                                false,
                                            )
                                            .await?;

                                        state_impl.commit_state(barrier.epoch).await?;

                                        // emit the barrier and continue consuming the backfill stream
                                        yield Message::Barrier(barrier);
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // skip empty upstream chunks
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    let chunk_binlog_offset =
                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                                    tracing::trace!(
                                        "recv changelog chunk: chunk_offset {:?}, capacity {}",
                                        chunk_binlog_offset,
                                        chunk.capacity()
                                    );

                                    // Since we don't need the changelog before the
                                    // `last_binlog_offset`, skip the chunk that *only* contains
                                    // events before `last_binlog_offset`.
                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
                                        && let Some(chunk_offset) = chunk_binlog_offset
                                        && chunk_offset < *last_binlog_offset
                                    {
                                        tracing::trace!(
                                            "skip changelog chunk: chunk_offset {:?}, capacity {}",
                                            chunk_offset,
                                            chunk.capacity()
                                        );
                                        continue;
                                    }
                                    // Buffer the upstream chunk.
                                    upstream_chunk_buffer.push(chunk.compact());
                                }
                                Message::Watermark(_) => {
                                    // Ignore watermark during backfill.
                                }
                            }
                        }
                        // Snapshot read
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        table_id,
                                        ?last_binlog_offset,
                                        ?current_pk_pos,
                                        "snapshot read stream ends"
                                    );
                                    // If the snapshot read stream ends, it means all historical
                                    // data has been loaded.
                                    // We should not mark the chunk anymore,
                                    // otherwise, we will ignore some rows in the buffer.
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(mapping_chunk(
                                            chunk,
                                            &self.output_indices,
                                        ));
                                    }

                                    // backfill has finished, exit the backfill loop and persist the state when we receive a barrier
                                    break 'backfill_loop;
                                }
                                Some(chunk) => {
                                    // Raise the current position.
                                    // Snapshot read streams are ordered by pk, so we can
                                    // just use the last row to update `current_pos`.
                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                                    tracing::trace!(
                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
                                        chunk.cardinality(),
                                        current_pk_pos
                                    );
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
                                    total_snapshot_row_count += chunk_cardinality;
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }

                assert!(pending_barrier.is_some(), "pending_barrier must exist");
                let pending_barrier = pending_barrier.unwrap();

                // Here we have to ensure the snapshot stream is consumed at least once,
                // since the barrier event can kick in anytime.
                // Otherwise, the result set of the new snapshot stream may become empty.
                // It may be a cancellation bug of the mysql driver.
                let (_, mut snapshot_stream) = backfill_stream.into_inner();

                // Skip consuming the snapshot stream if it is paused or the rate limit is zero.
                if !is_snapshot_paused
                    && !rate_limit_to_zero
                    && let Some(msg) = snapshot_stream
                        .next()
                        .instrument_await("consume_snapshot_stream_once")
                        .await
                {
                    let Either::Right(msg) = msg else {
                        bail!("BUG: snapshot_read contains upstream messages");
                    };
                    match msg? {
                        None => {
                            tracing::info!(
                                table_id,
                                ?last_binlog_offset,
                                ?current_pk_pos,
                                "snapshot read stream ends in the force emit branch"
                            );
                            // End of the snapshot read stream.
                            // Consume the buffered upstream chunks without filtering by `binlog_low`.
                            for chunk in upstream_chunk_buffer.drain(..) {
                                yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                            }

                            // mark backfill as finished
                            state_impl
                                .mutate_state(
                                    current_pk_pos.clone(),
                                    last_binlog_offset.clone(),
                                    total_snapshot_row_count,
                                    true,
                                )
                                .await?;

                            // commit state because we have received a barrier message
                            state_impl.commit_state(pending_barrier.epoch).await?;
                            yield Message::Barrier(pending_barrier);
                            // end of the backfill loop, since backfill has finished
                            break 'backfill_loop;
                        }
                        Some(chunk) => {
                            // Raise the current pk position.
                            current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                            let row_count = chunk.cardinality() as u64;
                            cur_barrier_snapshot_processed_rows += row_count;
                            total_snapshot_row_count += row_count;
                            snapshot_read_row_cnt += row_count as usize;

                            tracing::debug!(
                                table_id,
                                ?current_pk_pos,
                                ?snapshot_read_row_cnt,
                                "force emit a snapshot chunk"
                            );
                            yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                        }
                    }
                }

                // If the number of barriers reaches the snapshot interval,
                // consume the buffered upstream chunks.
                if let Some(current_pos) = &current_pk_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;

                        // record the consumed binlog offset that will be
                        // persisted later
                        consumed_binlog_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                        yield Message::Chunk(mapping_chunk(
                            mark_cdc_chunk(
                                &offset_parse_func,
                                chunk,
                                current_pos,
                                &pk_indices,
                                &pk_order,
                                last_binlog_offset.clone(),
                            )?,
                            &self.output_indices,
                        ));
                    }
                } else {
                    // If there is no `current_pos`, we have not processed any snapshot data yet,
                    // so we can just drop the buffered upstream chunks in that case.
                    upstream_chunk_buffer.clear();
                }

                // Update the last seen binlog offset
                if consumed_binlog_offset.is_some() {
                    last_binlog_offset.clone_from(&consumed_binlog_offset);
                }

                Self::report_metrics(
                    &self.metrics,
                    cur_barrier_snapshot_processed_rows,
                    cur_barrier_upstream_processed_rows,
                );

                // update and persist the current backfill progress
                state_impl
                    .mutate_state(
                        current_pk_pos.clone(),
                        last_binlog_offset.clone(),
                        total_snapshot_row_count,
                        false,
                    )
                    .await?;

                state_impl.commit_state(pending_barrier.epoch).await?;
                yield Message::Barrier(pending_barrier);
            }
        } else if self.options.disable_backfill {
            // If backfill is disabled, we just mark the backfill as finished
            tracing::info!(
                table_id,
                upstream_table_name,
                "CdcBackfill has been disabled"
            );
            state_impl
                .mutate_state(
                    current_pk_pos.clone(),
                    last_binlog_offset.clone(),
                    total_snapshot_row_count,
                    true,
                )
                .await?;
        }

        upstream_table_reader.disconnect().await?;

        tracing::info!(
            table_id,
            upstream_table_name,
            "CdcBackfill has already finished and will forward messages directly to the downstream"
        );

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress and persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished then we need to update state, otherwise no need.
                if let Message::Barrier(barrier) = &msg {
                    // finalize the backfill state
                    // TODO: unify `mutate_state` and `commit_state` into one method
                    state_impl
                        .mutate_state(
                            current_pk_pos.clone(),
                            last_binlog_offset.clone(),
                            total_snapshot_row_count,
                            true,
                        )
                        .await?;
                    state_impl.commit_state(barrier.epoch).await?;

                    // mark progress as finished
                    if let Some(progress) = self.progress.as_mut() {
                        progress.finish(barrier.epoch, total_snapshot_row_count);
                    }
                    yield msg;
                    // break after the state has been saved
                    break;
                }
                yield msg;
            }
        }

        // After the backfill is finished, we can forward messages
        // directly to the downstream.
        #[for_await]
        for msg in upstream {
            // upstream offsets will be removed from the message before forwarding to
            // the downstream
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // commit state just to bump the epoch of the state table
                    state_impl.commit_state(barrier.epoch).await?;
                }
                yield msg;
            }
        }
    }
}

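/// Polls the upstream while the external table reader is still being created, so that
/// upstream barriers are not blocked by a slow or retrying connection attempt. The
/// `biased` select polls the reader future first; once it resolves, the reader is
/// stored and subsequent calls return `Ok(None)` immediately.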
async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    table_reader: &mut Option<ExternalTableReaderImpl>,
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        biased;
        reader = &mut *future => {
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            msg.transpose()
        }
    }
}

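/// Parses the raw Debezium JSON payloads emitted by the CDC source into chunks matching
/// the downstream table schema, carrying the `_rw_offset` column over to the output.
/// `mapping_message` must only be applied after this transformation.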
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: Vec<ColumnDesc>) {
    let props = SpecificParserConfig {
        encoding_config: EncodingProperties::Json(JsonProperties {
            use_schema_registry: false,
            timestamptz_handling: None,
        }),
        // the cdc message is generated internally so the key must exist.
        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
    };

    // convert to source column descs to feed into the parser
    let columns_with_meta = output_columns
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

    let mut parser = DebeziumParser::new(
        props,
        columns_with_meta.clone(),
        Arc::new(SourceContext::dummy()),
    )
    .await
    .map_err(StreamExecutorError::connector_error)?;

    pin_mut!(upstream);
    #[for_await]
    for msg in upstream {
        let mut msg = msg?;
        if let Message::Chunk(chunk) = &mut msg {
            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
            let _ = std::mem::replace(chunk, parsed_chunk);
        }
        yield msg;
    }
}

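/// Feeds each Debezium payload of the chunk through the parser and re-attaches the
/// corresponding `_rw_offset` column, yielding a chunk in the downstream table schema.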
async fn parse_debezium_chunk(
    parser: &mut DebeziumParser,
    chunk: &StreamChunk,
) -> StreamExecutorResult<StreamChunk> {
    // Here we transform the input chunk with schema `(payload jsonb, _rw_offset varchar, _rw_table_name varchar)`
    // into a chunk with the downstream table schema, i.e. the `info.schema` of the MergeNode,
    // which is the schema of the table job with `_rw_offset` appended at the end.
    // See `gen_create_table_plan_for_cdc_source` for details.

    // use `SourceStreamChunkBuilder` for convenience
    let mut builder = SourceStreamChunkBuilder::new(
        parser.columns().to_vec(),
        SourceCtrlOpts {
            chunk_size: chunk.capacity(),
            split_txn: false,
        },
    );

    // The schema of the input chunk is `(payload jsonb, _rw_offset varchar, _rw_table_name varchar, _row_id)`.
    // We should use the debezium parser to parse the first column,
    // then chain the parsed row with the `_rw_offset` row to get a new row.
    let payloads = chunk.data_chunk().project(&[0]);
    let offsets = chunk.data_chunk().project(&[1]).compact();

    // TODO: preserve the transaction semantics
    for payload in payloads.rows() {
        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
        else {
            panic!("payload must be jsonb");
        };

        parser
            .parse_inner(
                None,
                Some(jsonb_ref.to_string().as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await
            .unwrap();
    }
    builder.finish_current_chunk();

    let parsed_chunk = {
        let mut iter = builder.consume_ready_chunks();
        assert_eq!(1, iter.len());
        iter.next().unwrap()
    };
    assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row
    let (ops, mut columns, vis) = parsed_chunk.into_inner();
    // note that `vis` is not necessarily the same as the original chunk's visibility

    // concat the rows in the parsed chunk with the `_rw_offset` column
    columns.extend(offsets.into_parts().0);

    Ok(StreamChunk::from_parts(
        ops,
        DataChunk::from_parts(columns.into(), vis),
    ))
}

impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::iter_util::ZipEqFast;
    use risingwave_storage::memory::MemoryStateStore;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::prelude::StateTable;
    use crate::executor::source::default_source_internal_table;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{
        ActorContext, Barrier, CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable, Message,
    };

    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        println!("datums: {:?}", datums[1]);

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        // one-row chunk
        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // the schema passed to the debezium parser
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, columns);
        pin_mut!(parsed_stream);
        // the output chunk must contain the offset column
        if let Some(message) = parsed_stream.next().await {
            println!("chunk: {:#?}", message.unwrap());
        }
    }

    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        let output_indices = vec![1, 0, 4]; // reorder the columns: O_CUSTKEY, O_ORDERKEY, O_DUMMY
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                // We want to mark backfill as finished. However, it's not straightforward to do so.
                // Here we disable_backfill instead.
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
        );
        // cdc.state_impl.init_epoch(EpochPair::new(test_epoch(4), test_epoch(3))).await.unwrap();
        // cdc.state_impl.mutate_state(None, None, 0, true).await.unwrap();
        // cdc.state_impl.commit_state(EpochPair::new(test_epoch(5), test_epoch(4))).await.unwrap();
        let s = cdc.execute_inner();
        pin_mut!(s);

        // send the first barrier
        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        // send a chunk
        {
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            // one-row chunk
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
}
1059}