risingwave_stream/executor/backfill/cdc/
cdc_backfill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17
18use either::Either;
19use futures::stream;
20use futures::stream::select_with_strategy;
21use itertools::Itertools;
22use risingwave_common::array::DataChunk;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnDesc;
25use risingwave_connector::parser::{
26    ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
27    ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
28};
29use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
30use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
31use rw_futures_util::pausable;
32use thiserror_ext::AsReport;
33use tracing::Instrument;
34
35use crate::executor::UpdateMutation;
36use crate::executor::backfill::CdcScanOptions;
37use crate::executor::backfill::cdc::state::CdcBackfillState;
38use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
39use crate::executor::backfill::cdc::upstream_table::snapshot::{
40    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
41};
42use crate::executor::backfill::utils::{
43    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
44};
45use crate::executor::monitor::CdcBackfillMetrics;
46use crate::executor::prelude::*;
47use crate::executor::source::get_infinite_backoff_strategy;
48use crate::task::CreateMviewProgressReporter;
49
/// Number of metadata columns kept in the backfill state besides the primary-key columns.
///
/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
/// This value is added to the number of primary-key columns when the state length is
/// computed for `CdcBackfillState::new`.
const METADATA_STATE_LEN: usize = 4;
52
53pub struct CdcBackfillExecutor<S: StateStore> {
54    actor_ctx: ActorContextRef,
55
56    /// The external table to be backfilled
57    external_table: ExternalStorageTable,
58
59    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
60    upstream: Executor,
61
62    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
63    output_indices: Vec<usize>,
64
65    /// The schema of output chunk, including additional columns if any
66    output_columns: Vec<ColumnDesc>,
67
68    /// State table of the `CdcBackfill` executor
69    state_impl: CdcBackfillState<S>,
70
71    // TODO: introduce a CdcBackfillProgress to report finish to Meta
72    // This object is just a stub right now
73    progress: Option<CreateMviewProgressReporter>,
74
75    metrics: CdcBackfillMetrics,
76
77    /// Rate limit in rows/s.
78    rate_limit_rps: Option<u32>,
79
80    options: CdcScanOptions,
81}
82
83impl<S: StateStore> CdcBackfillExecutor<S> {
84    #[allow(clippy::too_many_arguments)]
85    pub fn new(
86        actor_ctx: ActorContextRef,
87        external_table: ExternalStorageTable,
88        upstream: Executor,
89        output_indices: Vec<usize>,
90        output_columns: Vec<ColumnDesc>,
91        progress: Option<CreateMviewProgressReporter>,
92        metrics: Arc<StreamingMetrics>,
93        state_table: StateTable<S>,
94        rate_limit_rps: Option<u32>,
95        options: CdcScanOptions,
96    ) -> Self {
97        let pk_indices = external_table.pk_indices();
98        let upstream_table_id = external_table.table_id().table_id;
99        let state_impl = CdcBackfillState::new(
100            upstream_table_id,
101            state_table,
102            pk_indices.len() + METADATA_STATE_LEN,
103        );
104
105        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
106
107        Self {
108            actor_ctx,
109            external_table,
110            upstream,
111            output_indices,
112            output_columns,
113            state_impl,
114            progress,
115            metrics,
116            rate_limit_rps,
117            options,
118        }
119    }
120
121    fn report_metrics(
122        metrics: &CdcBackfillMetrics,
123        snapshot_processed_row_count: u64,
124        upstream_processed_row_count: u64,
125    ) {
126        metrics
127            .cdc_backfill_snapshot_read_row_count
128            .inc_by(snapshot_processed_row_count);
129
130        metrics
131            .cdc_backfill_upstream_output_row_count
132            .inc_by(upstream_processed_row_count);
133    }
134
135    #[try_stream(ok = Message, error = StreamExecutorError)]
136    async fn execute_inner(mut self) {
137        // The indices to primary key columns
138        let pk_indices = self.external_table.pk_indices().to_vec();
139        let pk_order = self.external_table.pk_order_types().to_vec();
140
141        let table_id = self.external_table.table_id().table_id;
142        let upstream_table_name = self.external_table.qualified_table_name();
143        let schema_table_name = self.external_table.schema_table_name().clone();
144        let external_database_name = self.external_table.database_name().to_owned();
145
146        let additional_columns = self
147            .output_columns
148            .iter()
149            .filter(|col| col.additional_column.column_type.is_some())
150            .cloned()
151            .collect_vec();
152
153        let mut upstream = self.upstream.execute();
154
155        // Current position of the upstream_table storage primary key.
156        // `None` means it starts from the beginning.
157        let mut current_pk_pos: Option<OwnedRow>;
158
159        // Poll the upstream to get the first barrier.
160        let first_barrier = expect_first_barrier(&mut upstream).await?;
161
162        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
163        let first_barrier_epoch = first_barrier.epoch;
164        // The first barrier message should be propagated.
165        yield Message::Barrier(first_barrier);
166        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
167
168        // Check whether this parallelism has been assigned splits,
169        // if not, we should bypass the backfill directly.
170        let mut state_impl = self.state_impl;
171
172        state_impl.init_epoch(first_barrier_epoch).await?;
173
174        // restore backfill state
175        let state = state_impl.restore_state().await?;
176        current_pk_pos = state.current_pk_pos.clone();
177
178        let need_backfill = !self.options.disable_backfill && !state.is_finished;
179
180        // Keep track of rows from the snapshot.
181        let mut total_snapshot_row_count = state.row_count as u64;
182
183        // After init the state table and forward the initial barrier to downstream,
184        // we now try to create the table reader with retry.
185        // If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
186        // If backfill is finished, we should forward the upstream cdc events to downstream.
187        let mut table_reader: Option<ExternalTableReaderImpl> = None;
188        let external_table = self.external_table.clone();
189        let mut future = Box::pin(async move {
190            let backoff = get_infinite_backoff_strategy();
191            tokio_retry::Retry::spawn(backoff, || async {
192                match external_table.create_table_reader().await {
193                    Ok(reader) => Ok(reader),
194                    Err(e) => {
195                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
196                        Err(e)
197                    }
198                }
199            })
200            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
201            .await
202            .expect("Retry create cdc table reader until success.")
203        });
204        loop {
205            if let Some(msg) =
206                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
207                    .await?
208            {
209                match msg {
210                    Message::Barrier(barrier) => {
211                        // commit state to bump the epoch of state table
212                        state_impl.commit_state(barrier.epoch).await?;
213                        yield Message::Barrier(barrier);
214                    }
215                    Message::Chunk(chunk) => {
216                        if need_backfill {
217                            // ignore chunk if we need backfill, since we can read the data from the snapshot
218                        } else {
219                            // forward the chunk to downstream
220                            yield Message::Chunk(chunk);
221                        }
222                    }
223                    Message::Watermark(_) => {
224                        // ignore watermark
225                    }
226                }
227            } else {
228                assert!(table_reader.is_some(), "table reader must created");
229                tracing::info!(
230                    table_id,
231                    upstream_table_name,
232                    "table reader created successfully"
233                );
234                break;
235            }
236        }
237
238        let upstream_table_reader = UpstreamTableReader::new(
239            self.external_table.clone(),
240            table_reader.expect("table reader must created"),
241        );
242
243        let mut upstream = transform_upstream(upstream, &self.output_columns)
244            .boxed()
245            .peekable();
246        let mut last_binlog_offset: Option<CdcOffset> = state
247            .last_cdc_offset
248            .map_or(upstream_table_reader.current_cdc_offset().await?, Some);
249
250        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
251        let mut consumed_binlog_offset: Option<CdcOffset> = None;
252
253        tracing::info!(
254            table_id,
255            upstream_table_name,
256            initial_binlog_offset = ?last_binlog_offset,
257            ?current_pk_pos,
258            is_finished = state.is_finished,
259            is_snapshot_paused,
260            snapshot_row_count = total_snapshot_row_count,
261            rate_limit = self.rate_limit_rps,
262            disable_backfill = self.options.disable_backfill,
263            snapshot_interval = self.options.snapshot_interval,
264            snapshot_batch_size = self.options.snapshot_batch_size,
265            "start cdc backfill",
266        );
267
268        // CDC Backfill Algorithm:
269        //
270        // When the first barrier comes from upstream:
271        //  - read the current binlog offset as `binlog_low`
272        //  - start a snapshot read upon upstream table and iterate over the snapshot read stream
273        //  - buffer the changelog event from upstream
274        //
275        // When a new barrier comes from upstream:
276        //  - read the current binlog offset as `binlog_high`
277        //  - for each row of the upstream change log, forward it to downstream if it in the range
278        //    of [binlog_low, binlog_high] and its pk <= `current_pos`, otherwise ignore it
279        //  - reconstruct the whole backfill stream with upstream changelog and a new table snapshot
280        //
281        // When a chunk comes from snapshot, we forward it to the downstream and raise
282        // `current_pos`.
283        // When we reach the end of the snapshot read stream, it means backfill has been
284        // finished.
285        //
286        // Once the backfill loop ends, we forward the upstream directly to the downstream.
287        if need_backfill {
288            // drive the upstream changelog first to ensure we can receive timely changelog event,
289            // otherwise the upstream changelog may be blocked by the snapshot read stream
290            let _ = Pin::new(&mut upstream).peek().await;
291
292            // wait for a barrier to make sure the backfill starts after upstream source
293            #[for_await]
294            for msg in upstream.by_ref() {
295                match msg? {
296                    Message::Barrier(barrier) => {
297                        match barrier.mutation.as_deref() {
298                            Some(crate::executor::Mutation::Pause) => {
299                                is_snapshot_paused = true;
300                                tracing::info!(
301                                    table_id,
302                                    upstream_table_name,
303                                    "snapshot is paused by barrier"
304                                );
305                            }
306                            Some(crate::executor::Mutation::Resume) => {
307                                is_snapshot_paused = false;
308                                tracing::info!(
309                                    table_id,
310                                    upstream_table_name,
311                                    "snapshot is resumed by barrier"
312                                );
313                            }
314                            _ => {
315                                // ignore other mutations
316                            }
317                        }
318                        // commit state just to bump the epoch of state table
319                        state_impl.commit_state(barrier.epoch).await?;
320                        yield Message::Barrier(barrier);
321                        break;
322                    }
323                    Message::Chunk(ref chunk) => {
324                        last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
325                    }
326                    Message::Watermark(_) => {
327                        // Ignore watermark
328                    }
329                }
330            }
331
332            tracing::info!(table_id,
333                upstream_table_name,
334                initial_binlog_offset = ?last_binlog_offset,
335                ?current_pk_pos,
336                is_snapshot_paused,
337                "start cdc backfill loop");
338
339            // the buffer will be drained when a barrier comes
340            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
341
342            'backfill_loop: loop {
343                let left_upstream = upstream.by_ref().map(Either::Left);
344
345                let mut snapshot_read_row_cnt: usize = 0;
346                let read_args = SnapshotReadArgs::new(
347                    current_pk_pos.clone(),
348                    self.rate_limit_rps,
349                    pk_indices.clone(),
350                    additional_columns.clone(),
351                    schema_table_name.clone(),
352                    external_database_name.clone(),
353                );
354
355                let right_snapshot = pin!(
356                    upstream_table_reader
357                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
358                        .map(Either::Right)
359                );
360
361                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
362                if is_snapshot_paused {
363                    snapshot_valve.pause();
364                }
365
366                // Prefer to select upstream, so we can stop snapshot stream when barrier comes.
367                let mut backfill_stream =
368                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
369                        stream::PollNext::Left
370                    });
371
372                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
373                let mut cur_barrier_upstream_processed_rows: u64 = 0;
374                let mut barrier_count: u32 = 0;
375                let mut pending_barrier = None;
376
377                #[for_await]
378                for either in &mut backfill_stream {
379                    match either {
380                        // Upstream
381                        Either::Left(msg) => {
382                            match msg? {
383                                Message::Barrier(barrier) => {
384                                    // increase the barrier count and check whether need to start a new snapshot
385                                    barrier_count += 1;
386                                    let can_start_new_snapshot =
387                                        barrier_count == self.options.snapshot_interval;
388
389                                    if let Some(mutation) = barrier.mutation.as_deref() {
390                                        use crate::executor::Mutation;
391                                        match mutation {
392                                            Mutation::Pause => {
393                                                is_snapshot_paused = true;
394                                                snapshot_valve.pause();
395                                            }
396                                            Mutation::Resume => {
397                                                is_snapshot_paused = false;
398                                                snapshot_valve.resume();
399                                            }
400                                            Mutation::Throttle(some) => {
401                                                if let Some(new_rate_limit) =
402                                                    some.get(&self.actor_ctx.id)
403                                                    && *new_rate_limit != self.rate_limit_rps
404                                                {
405                                                    self.rate_limit_rps = *new_rate_limit;
406                                                    rate_limit_to_zero = self
407                                                        .rate_limit_rps
408                                                        .is_some_and(|val| val == 0);
409
410                                                    // update and persist current backfill progress without draining the buffered upstream chunks
411                                                    state_impl
412                                                        .mutate_state(
413                                                            current_pk_pos.clone(),
414                                                            last_binlog_offset.clone(),
415                                                            total_snapshot_row_count,
416                                                            false,
417                                                        )
418                                                        .await?;
419                                                    state_impl.commit_state(barrier.epoch).await?;
420                                                    yield Message::Barrier(barrier);
421
422                                                    // rebuild the snapshot stream with new rate limit
423                                                    continue 'backfill_loop;
424                                                }
425                                            }
426                                            Mutation::Update(UpdateMutation {
427                                                dropped_actors,
428                                                ..
429                                            }) => {
430                                                if dropped_actors.contains(&self.actor_ctx.id) {
431                                                    // the actor has been dropped, exit the backfill loop
432                                                    tracing::info!(
433                                                        table_id,
434                                                        upstream_table_name,
435                                                        "CdcBackfill has been dropped due to config change"
436                                                    );
437                                                    yield Message::Barrier(barrier);
438                                                    break 'backfill_loop;
439                                                }
440                                            }
441                                            _ => (),
442                                        }
443                                    }
444
445                                    Self::report_metrics(
446                                        &self.metrics,
447                                        cur_barrier_snapshot_processed_rows,
448                                        cur_barrier_upstream_processed_rows,
449                                    );
450
451                                    // when processing a barrier, check whether can start a new snapshot
452                                    // if the number of barriers reaches the snapshot interval
453                                    if can_start_new_snapshot {
454                                        // staging the barrier
455                                        pending_barrier = Some(barrier);
456                                        tracing::debug!(
457                                            table_id,
458                                            ?current_pk_pos,
459                                            ?snapshot_read_row_cnt,
460                                            "Prepare to start a new snapshot"
461                                        );
462                                        // Break the loop for consuming snapshot and prepare to start a new snapshot
463                                        break;
464                                    } else {
465                                        // update and persist current backfill progress
466                                        state_impl
467                                            .mutate_state(
468                                                current_pk_pos.clone(),
469                                                last_binlog_offset.clone(),
470                                                total_snapshot_row_count,
471                                                false,
472                                            )
473                                            .await?;
474
475                                        state_impl.commit_state(barrier.epoch).await?;
476
477                                        // emit barrier and continue consume the backfill stream
478                                        yield Message::Barrier(barrier);
479                                    }
480                                }
481                                Message::Chunk(chunk) => {
482                                    // skip empty upstream chunk
483                                    if chunk.cardinality() == 0 {
484                                        continue;
485                                    }
486
487                                    let chunk_binlog_offset =
488                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
489
490                                    tracing::trace!(
491                                        "recv changelog chunk: chunk_offset {:?}, capactiy {}",
492                                        chunk_binlog_offset,
493                                        chunk.capacity()
494                                    );
495
496                                    // Since we don't need changelog before the
497                                    // `last_binlog_offset`, skip the chunk that *only* contains
498                                    // events before `last_binlog_offset`.
499                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref() {
500                                        if let Some(chunk_offset) = chunk_binlog_offset
501                                            && chunk_offset < *last_binlog_offset
502                                        {
503                                            tracing::trace!(
504                                                "skip changelog chunk: chunk_offset {:?}, capacity {}",
505                                                chunk_offset,
506                                                chunk.capacity()
507                                            );
508                                            continue;
509                                        }
510                                    }
511                                    // Buffer the upstream chunk.
512                                    upstream_chunk_buffer.push(chunk.compact());
513                                }
514                                Message::Watermark(_) => {
515                                    // Ignore watermark during backfill.
516                                }
517                            }
518                        }
519                        // Snapshot read
520                        Either::Right(msg) => {
521                            match msg? {
522                                None => {
523                                    tracing::info!(
524                                        table_id,
525                                        ?last_binlog_offset,
526                                        ?current_pk_pos,
527                                        "snapshot read stream ends"
528                                    );
529                                    // If the snapshot read stream ends, it means all historical
530                                    // data has been loaded.
531                                    // We should not mark the chunk anymore,
532                                    // otherwise, we will ignore some rows in the buffer.
533                                    for chunk in upstream_chunk_buffer.drain(..) {
534                                        yield Message::Chunk(mapping_chunk(
535                                            chunk,
536                                            &self.output_indices,
537                                        ));
538                                    }
539
540                                    // backfill has finished, exit the backfill loop and persist the state when we recv a barrier
541                                    break 'backfill_loop;
542                                }
543                                Some(chunk) => {
544                                    // Raise the current position.
545                                    // As snapshot read streams are ordered by pk, so we can
546                                    // just use the last row to update `current_pos`.
547                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
548
549                                    tracing::trace!(
550                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
551                                        chunk.cardinality(),
552                                        current_pk_pos
553                                    );
554                                    let chunk_cardinality = chunk.cardinality() as u64;
555                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
556                                    total_snapshot_row_count += chunk_cardinality;
557                                    yield Message::Chunk(mapping_chunk(
558                                        chunk,
559                                        &self.output_indices,
560                                    ));
561                                }
562                            }
563                        }
564                    }
565                }
566
567                assert!(pending_barrier.is_some(), "pending_barrier must exist");
568                let pending_barrier = pending_barrier.unwrap();
569
570                // Here we have to ensure the snapshot stream is consumed at least once,
571                // since the barrier event can kick in anytime.
572                // Otherwise, the result set of the new snapshot stream may become empty.
573                // It maybe a cancellation bug of the mysql driver.
574                let (_, mut snapshot_stream) = backfill_stream.into_inner();
575
576                // skip consume the snapshot stream if it is paused or rate limit to 0
577                if !is_snapshot_paused
578                    && !rate_limit_to_zero
579                    && let Some(msg) = snapshot_stream
580                        .next()
581                        .instrument_await("consume_snapshot_stream_once")
582                        .await
583                {
584                    let Either::Right(msg) = msg else {
585                        bail!("BUG: snapshot_read contains upstream messages");
586                    };
587                    match msg? {
588                        None => {
589                            tracing::info!(
590                                table_id,
591                                ?last_binlog_offset,
592                                ?current_pk_pos,
593                                "snapshot read stream ends in the force emit branch"
594                            );
595                            // End of the snapshot read stream.
596                            // Consume the buffered upstream chunk without filtering by `binlog_low`.
597                            for chunk in upstream_chunk_buffer.drain(..) {
598                                yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
599                            }
600
601                            // mark backfill has finished
602                            state_impl
603                                .mutate_state(
604                                    current_pk_pos.clone(),
605                                    last_binlog_offset.clone(),
606                                    total_snapshot_row_count,
607                                    true,
608                                )
609                                .await?;
610
611                            // commit state because we have received a barrier message
612                            state_impl.commit_state(pending_barrier.epoch).await?;
613                            yield Message::Barrier(pending_barrier);
614                            // end of backfill loop, since backfill has finished
615                            break 'backfill_loop;
616                        }
617                        Some(chunk) => {
618                            // Raise the current pk position.
619                            current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
620
621                            let row_count = chunk.cardinality() as u64;
622                            cur_barrier_snapshot_processed_rows += row_count;
623                            total_snapshot_row_count += row_count;
624                            snapshot_read_row_cnt += row_count as usize;
625
626                            tracing::debug!(
627                                table_id,
628                                ?current_pk_pos,
629                                ?snapshot_read_row_cnt,
630                                "force emit a snapshot chunk"
631                            );
632                            yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
633                        }
634                    }
635                }
636
637                // If the number of barriers reaches the snapshot interval,
638                // consume the buffered upstream chunks.
639                if let Some(current_pos) = &current_pk_pos {
640                    for chunk in upstream_chunk_buffer.drain(..) {
641                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
642
643                        // record the consumed binlog offset that will be
644                        // persisted later
645                        consumed_binlog_offset =
646                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
647
648                        yield Message::Chunk(mapping_chunk(
649                            mark_cdc_chunk(
650                                &offset_parse_func,
651                                chunk,
652                                current_pos,
653                                &pk_indices,
654                                &pk_order,
655                                last_binlog_offset.clone(),
656                            )?,
657                            &self.output_indices,
658                        ));
659                    }
660                } else {
661                    // If no current_pos, means we did not process any snapshot yet.
662                    // we can just ignore the upstream buffer chunk in that case.
663                    upstream_chunk_buffer.clear();
664                }
665
666                // Update last seen binlog offset
667                if consumed_binlog_offset.is_some() {
668                    last_binlog_offset.clone_from(&consumed_binlog_offset);
669                }
670
671                Self::report_metrics(
672                    &self.metrics,
673                    cur_barrier_snapshot_processed_rows,
674                    cur_barrier_upstream_processed_rows,
675                );
676
677                // update and persist current backfill progress
678                state_impl
679                    .mutate_state(
680                        current_pk_pos.clone(),
681                        last_binlog_offset.clone(),
682                        total_snapshot_row_count,
683                        false,
684                    )
685                    .await?;
686
687                state_impl.commit_state(pending_barrier.epoch).await?;
688                yield Message::Barrier(pending_barrier);
689            }
690        } else if self.options.disable_backfill {
691            // If backfill is disabled, we just mark the backfill as finished
692            tracing::info!(
693                table_id,
694                upstream_table_name,
695                "CdcBackfill has been disabled"
696            );
697            state_impl
698                .mutate_state(
699                    current_pk_pos.clone(),
700                    last_binlog_offset.clone(),
701                    total_snapshot_row_count,
702                    true,
703                )
704                .await?;
705        }
706
707        // drop reader to release db connection
708        drop(upstream_table_reader);
709
710        tracing::info!(
711            table_id,
712            upstream_table_name,
713            "CdcBackfill has already finished and will forward messages directly to the downstream"
714        );
715
716        // Wait for first barrier to come after backfill is finished.
717        // So we can update our progress + persist the status.
718        while let Some(Ok(msg)) = upstream.next().await {
719            if let Some(msg) = mapping_message(msg, &self.output_indices) {
720                // If not finished then we need to update state, otherwise no need.
721                if let Message::Barrier(barrier) = &msg {
722                    // finalized the backfill state
723                    // TODO: unify `mutate_state` and `commit_state` into one method
724                    state_impl
725                        .mutate_state(
726                            current_pk_pos.clone(),
727                            last_binlog_offset.clone(),
728                            total_snapshot_row_count,
729                            true,
730                        )
731                        .await?;
732                    state_impl.commit_state(barrier.epoch).await?;
733
734                    // mark progress as finished
735                    if let Some(progress) = self.progress.as_mut() {
736                        progress.finish(barrier.epoch, total_snapshot_row_count);
737                    }
738                    yield msg;
739                    // break after the state have been saved
740                    break;
741                }
742                yield msg;
743            }
744        }
745
746        // After backfill progress finished
747        // we can forward messages directly to the downstream,
748        // as backfill is finished.
749        #[for_await]
750        for msg in upstream {
751            // upstream offsets will be removed from the message before forwarding to
752            // downstream
753            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
754                if let Message::Barrier(barrier) = &msg {
755                    // commit state just to bump the epoch of state table
756                    state_impl.commit_state(barrier.epoch).await?;
757                }
758                yield msg;
759            }
760        }
761    }
762}
763
764async fn build_reader_and_poll_upstream(
765    upstream: &mut BoxedMessageStream,
766    table_reader: &mut Option<ExternalTableReaderImpl>,
767    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
768) -> StreamExecutorResult<Option<Message>> {
769    if table_reader.is_some() {
770        return Ok(None);
771    }
772    tokio::select! {
773        biased;
774        reader = &mut *future => {
775            *table_reader = Some(reader);
776            Ok(None)
777        }
778        msg = upstream.next() => {
779            msg.transpose()
780        }
781    }
782}
783
784#[try_stream(ok = Message, error = StreamExecutorError)]
785pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
786    let props = SpecificParserConfig {
787        encoding_config: EncodingProperties::Json(JsonProperties {
788            use_schema_registry: false,
789            timestamptz_handling: None,
790        }),
791        // the cdc message is generated internally so the key must exist.
792        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
793    };
794
795    // convert to source column desc to feed into parser
796    let columns_with_meta = output_columns
797        .iter()
798        .map(SourceColumnDesc::from)
799        .collect_vec();
800
801    let mut parser = DebeziumParser::new(
802        props,
803        columns_with_meta.clone(),
804        Arc::new(SourceContext::dummy()),
805    )
806    .await
807    .map_err(StreamExecutorError::connector_error)?;
808
809    pin_mut!(upstream);
810    #[for_await]
811    for msg in upstream {
812        let mut msg = msg?;
813        if let Message::Chunk(chunk) = &mut msg {
814            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
815            let _ = std::mem::replace(chunk, parsed_chunk);
816        }
817        yield msg;
818    }
819}
820
821async fn parse_debezium_chunk(
822    parser: &mut DebeziumParser,
823    chunk: &StreamChunk,
824) -> StreamExecutorResult<StreamChunk> {
825    // here we transform the input chunk in `(payload varchar, _rw_offset varchar, _rw_table_name varchar)` schema
826    // to chunk with downstream table schema `info.schema` of MergeNode contains the schema of the
827    // table job with `_rw_offset` in the end
828    // see `gen_create_table_plan_for_cdc_source` for details
829
830    // use `SourceStreamChunkBuilder` for convenience
831    let mut builder = SourceStreamChunkBuilder::new(
832        parser.columns().to_vec(),
833        SourceCtrlOpts {
834            chunk_size: chunk.capacity(),
835            split_txn: false,
836        },
837    );
838
839    // The schema of input chunk `(payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id)`
840    // We should use the debezium parser to parse the first column,
841    // then chain the parsed row with `_rw_offset` row to get a new row.
842    let payloads = chunk.data_chunk().project(&[0]);
843    let offsets = chunk.data_chunk().project(&[1]).compact();
844
845    // TODO: preserve the transaction semantics
846    for payload in payloads.rows() {
847        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
848        else {
849            panic!("payload must be jsonb");
850        };
851
852        parser
853            .parse_inner(
854                None,
855                Some(jsonb_ref.to_string().as_bytes().to_vec()),
856                builder.row_writer(),
857            )
858            .await
859            .unwrap();
860    }
861    builder.finish_current_chunk();
862
863    let parsed_chunk = {
864        let mut iter = builder.consume_ready_chunks();
865        assert_eq!(1, iter.len());
866        iter.next().unwrap()
867    };
868    assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row
869    let (ops, mut columns, vis) = parsed_chunk.into_inner();
870    // note that `vis` is not necessarily the same as the original chunk's visibilities
871
872    // concat the rows in the parsed chunk with the `_rw_offset` column
873    columns.extend(offsets.into_parts().0);
874
875    Ok(StreamChunk::from_parts(
876        ops,
877        DataChunk::from_parts(columns.into(), vis),
878    ))
879}
880
impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    /// Consumes the executor and returns the message stream produced by
    /// `execute_inner`, boxed into a `BoxedMessageStream`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
886
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::iter_util::ZipEqFast;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::test_utils::MockSource;

    /// Pushes a single-row chunk `(payload, _rw_offset, _rw_table_name)` through
    /// `transform_upstream` and checks that the result is one row made of the
    /// parsed columns followed by the offset column.
    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let pk_indices = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        // one row chunk
        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // schema to the debezium parser
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, &columns);
        pin_mut!(parsed_stream);

        // The pushed chunk must come out of the transformed stream; a missing or
        // failed message is a test failure rather than a silent pass.
        let message = parsed_stream
            .next()
            .await
            .expect("the transformed stream must yield the pushed chunk")
            .expect("transforming the chunk must not fail");
        println!("chunk: {:#?}", message);

        let parsed_chunk = message
            .into_chunk()
            .expect("the first message must be a chunk");
        // one input row yields one output row
        assert_eq!(parsed_chunk.capacity(), 1);
        // the output chunk must contain the offset column after the parsed columns
        assert_eq!(parsed_chunk.data_chunk().columns().len(), columns.len() + 1);
    }
}