risingwave_stream/executor/backfill/cdc/cdc_backfill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::future::Future;
17use std::pin::Pin;
18
19use either::Either;
20use futures::stream;
21use futures::stream::select_with_strategy;
22use itertools::Itertools;
23use risingwave_common::array::DataChunk;
24use risingwave_common::bail;
25use risingwave_common::catalog::ColumnDesc;
26use risingwave_connector::parser::{
27    BigintUnsignedHandlingMode, ByteStreamSourceParser, DebeziumParser, DebeziumProps,
28    EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder,
29    SpecificParserConfig, TimeHandling, TimestampHandling, TimestamptzHandling,
30};
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_connector::source::cdc::external::{
33    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
34};
35use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
36use rw_futures_util::pausable;
37use thiserror_ext::AsReport;
38use tracing::Instrument;
39
40use crate::executor::UpdateMutation;
41use crate::executor::backfill::cdc::state::CdcBackfillState;
42use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
43use crate::executor::backfill::cdc::upstream_table::snapshot::{
44    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
45};
46use crate::executor::backfill::utils::{
47    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
48};
49use crate::executor::monitor::CdcBackfillMetrics;
50use crate::executor::prelude::*;
51use crate::executor::source::get_infinite_backoff_strategy;
52use crate::task::CreateMviewProgressReporter;
53
54/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
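///
/// A rough illustration (not the authoritative layout, which is defined by `CdcBackfillState`
/// in `state.rs`): the backfill state row conceptually holds the current pk position plus these
/// four metadata columns, so for a table with a 2-column primary key the state width passed to
/// `CdcBackfillState::new` below is `pk_indices.len() + METADATA_STATE_LEN = 2 + 4 = 6`.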
55const METADATA_STATE_LEN: usize = 4;
56
57pub struct CdcBackfillExecutor<S: StateStore> {
58    actor_ctx: ActorContextRef,
59
60    /// The external table to be backfilled
61    external_table: ExternalStorageTable,
62
63    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
64    upstream: Executor,
65
66    /// The column indices that need to be forwarded to the downstream from the upstream and table scan.
67    output_indices: Vec<usize>,
68
69    /// The schema of output chunk, including additional columns if any
70    output_columns: Vec<ColumnDesc>,
71
72    /// State table of the `CdcBackfill` executor
73    state_impl: CdcBackfillState<S>,
74
75    // TODO: introduce a CdcBackfillProgress to report finish to Meta
76    // This object is just a stub right now
77    progress: Option<CreateMviewProgressReporter>,
78
79    metrics: CdcBackfillMetrics,
80
81    /// Rate limit in rows/s.
82    rate_limit_rps: Option<u32>,
83
84    options: CdcScanOptions,
85
86    properties: BTreeMap<String, String>,
87}
88
89impl<S: StateStore> CdcBackfillExecutor<S> {
90    #[allow(clippy::too_many_arguments)]
91    pub fn new(
92        actor_ctx: ActorContextRef,
93        external_table: ExternalStorageTable,
94        upstream: Executor,
95        output_indices: Vec<usize>,
96        output_columns: Vec<ColumnDesc>,
97        progress: Option<CreateMviewProgressReporter>,
98        metrics: Arc<StreamingMetrics>,
99        state_table: StateTable<S>,
100        rate_limit_rps: Option<u32>,
101        options: CdcScanOptions,
102        properties: BTreeMap<String, String>,
103    ) -> Self {
104        let pk_indices = external_table.pk_indices();
105        let upstream_table_id = external_table.table_id();
106        let state_impl = CdcBackfillState::new(
107            upstream_table_id,
108            state_table,
109            pk_indices.len() + METADATA_STATE_LEN,
110        );
111
112        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
113        Self {
114            actor_ctx,
115            external_table,
116            upstream,
117            output_indices,
118            output_columns,
119            state_impl,
120            progress,
121            metrics,
122            rate_limit_rps,
123            options,
124            properties,
125        }
126    }
127
128    fn report_metrics(
129        metrics: &CdcBackfillMetrics,
130        snapshot_processed_row_count: u64,
131        upstream_processed_row_count: u64,
132    ) {
133        metrics
134            .cdc_backfill_snapshot_read_row_count
135            .inc_by(snapshot_processed_row_count);
136
137        metrics
138            .cdc_backfill_upstream_output_row_count
139            .inc_by(upstream_processed_row_count);
140    }
141
142    #[try_stream(ok = Message, error = StreamExecutorError)]
143    async fn execute_inner(mut self) {
144        // The indices of the primary key columns
145        let pk_indices = self.external_table.pk_indices().to_vec();
146        let pk_order = self.external_table.pk_order_types().to_vec();
147
148        let table_id = self.external_table.table_id();
149        let upstream_table_name = self.external_table.qualified_table_name();
150        let schema_table_name = self.external_table.schema_table_name().clone();
151        let external_database_name = self.external_table.database_name().to_owned();
152
153        let additional_columns = self
154            .output_columns
155            .iter()
156            .filter(|col| col.additional_column.column_type.is_some())
157            .cloned()
158            .collect_vec();
159
160        let mut upstream = self.upstream.execute();
161
162        // Current position of the upstream_table storage primary key.
163        // `None` means it starts from the beginning.
164        let mut current_pk_pos: Option<OwnedRow>;
165
166        // Poll the upstream to get the first barrier.
167        let first_barrier = expect_first_barrier(&mut upstream).await?;
168
169        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
170        let first_barrier_epoch = first_barrier.epoch;
171        // The first barrier message should be propagated.
172        yield Message::Barrier(first_barrier);
173        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);
174
175        // Check whether this parallelism has been assigned splits,
176        // if not, we should bypass the backfill directly.
177        let mut state_impl = self.state_impl;
178
179        state_impl.init_epoch(first_barrier_epoch).await?;
180
181        // restore backfill state
182        let state = state_impl.restore_state().await?;
183        current_pk_pos = state.current_pk_pos.clone();
184
185        let need_backfill = !self.options.disable_backfill && !state.is_finished;
186
187        // Keep track of rows from the snapshot.
188        let mut total_snapshot_row_count = state.row_count as u64;
189
190        // After initializing the state table and forwarding the initial barrier to the downstream,
191        // we now try to create the table reader with retry.
192        // If backfill hasn't finished, we can ignore upstream cdc events before the table reader is created;
193        // If backfill is finished, we should forward the upstream cdc events to the downstream.
194        let mut table_reader: Option<ExternalTableReaderImpl> = None;
195        let external_table = self.external_table.clone();
196        let mut future = Box::pin(async move {
197            let backoff = get_infinite_backoff_strategy();
198            tokio_retry::Retry::spawn(backoff, || async {
199                match external_table.create_table_reader().await {
200                    Ok(reader) => Ok(reader),
201                    Err(e) => {
202                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
203                        Err(e)
204                    }
205                }
206            })
207            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
208            .await
209            .expect("Retry create cdc table reader until success.")
210        });
211        let timestamp_handling: Option<TimestampHandling> = self
212            .properties
213            .get("debezium.time.precision.mode")
214            .map(|v| v == "connect")
215            .unwrap_or(false)
216            .then_some(TimestampHandling::Milli);
217        let timestamptz_handling: Option<TimestamptzHandling> = self
218            .properties
219            .get("debezium.time.precision.mode")
220            .map(|v| v == "connect")
221            .unwrap_or(false)
222            .then_some(TimestamptzHandling::Milli);
223        let time_handling: Option<TimeHandling> = self
224            .properties
225            .get("debezium.time.precision.mode")
226            .map(|v| v == "connect")
227            .unwrap_or(false)
228            .then_some(TimeHandling::Milli);
229        let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
230            .properties
231            .get("debezium.bigint.unsigned.handling.mode")
232            .map(|v| v == "precise")
233            .unwrap_or(false)
234            .then_some(BigintUnsignedHandlingMode::Precise);
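        // For example (hypothetical connector properties, not a complete WITH clause): a table
        // created with `debezium.time.precision.mode = 'connect'` and
        // `debezium.bigint.unsigned.handling.mode = 'precise'` yields `Some(TimestampHandling::Milli)`,
        // `Some(TimestamptzHandling::Milli)`, `Some(TimeHandling::Milli)` and
        // `Some(BigintUnsignedHandlingMode::Precise)` above; any other or missing value leaves the
        // corresponding option as `None`.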
235        // Only postgres-cdc connector may trigger TOAST.
236        let handle_toast_columns: bool =
237            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
238        // Make sure to use mapping_message after transform_upstream.
239        let mut upstream = transform_upstream(
240            upstream,
241            self.output_columns.clone(),
242            timestamp_handling,
243            timestamptz_handling,
244            time_handling,
245            bigint_unsigned_handling,
246            handle_toast_columns,
247        )
248        .boxed();
249        loop {
250            if let Some(msg) =
251                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
252                    .await?
253            {
254                if let Some(msg) = mapping_message(msg, &self.output_indices) {
255                    match msg {
256                        Message::Barrier(barrier) => {
257                            // commit state to bump the epoch of state table
258                            state_impl.commit_state(barrier.epoch).await?;
259                            yield Message::Barrier(barrier);
260                        }
261                        Message::Chunk(chunk) => {
262                            if need_backfill {
263                                // ignore chunk if we need backfill, since we can read the data from the snapshot
264                            } else {
265                                // forward the chunk to downstream
266                                yield Message::Chunk(chunk);
267                            }
268                        }
269                        Message::Watermark(_) => {
270                            // ignore watermark
271                        }
272                    }
273                }
274            } else {
275                assert!(table_reader.is_some(), "table reader must be created");
276                tracing::info!(
277                    %table_id,
278                    upstream_table_name,
279                    "table reader created successfully"
280                );
281                break;
282            }
283        }
284
285        let upstream_table_reader = UpstreamTableReader::new(
286            self.external_table.clone(),
287            table_reader.expect("table reader must be created"),
288        );
289
290        let mut upstream = upstream.peekable();
291
292        let mut last_binlog_offset: Option<CdcOffset> = state
299            .last_cdc_offset
300            .map_or(upstream_table_reader.current_cdc_offset().await?, Some);
302
303        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
304        let mut consumed_binlog_offset: Option<CdcOffset> = None;
305
306        tracing::info!(
307            %table_id,
308            upstream_table_name,
309            initial_binlog_offset = ?last_binlog_offset,
310            ?current_pk_pos,
311            is_finished = state.is_finished,
312            is_snapshot_paused,
313            snapshot_row_count = total_snapshot_row_count,
314            rate_limit = self.rate_limit_rps,
315            disable_backfill = self.options.disable_backfill,
316            snapshot_barrier_interval = self.options.snapshot_barrier_interval,
317            snapshot_batch_size = self.options.snapshot_batch_size,
318            "start cdc backfill",
319        );
320
321        // CDC Backfill Algorithm:
322        //
323        // When the first barrier comes from upstream:
324        //  - read the current binlog offset as `binlog_low`
325        //  - start a snapshot read upon upstream table and iterate over the snapshot read stream
326        //  - buffer the changelog event from upstream
327        //
328        // When a new barrier comes from upstream:
329        //  - read the current binlog offset as `binlog_high`
330        //  - for each row of the upstream change log, forward it to the downstream if it is in the range
331        //    of [binlog_low, binlog_high] and its pk <= `current_pos`, otherwise ignore it
332        //  - reconstruct the whole backfill stream with upstream changelog and a new table snapshot
333        //
334        // When a chunk comes from snapshot, we forward it to the downstream and raise
335        // `current_pos`.
336        // When we reach the end of the snapshot read stream, it means backfill has been
337        // finished.
338        //
339        // Once the backfill loop ends, we forward the upstream directly to the downstream.
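        // A rough sketch of the per-row filtering rule described above (the actual marking is
        // done by `mark_cdc_chunk` in `backfill::utils`; the names and comparisons here are
        // illustrative only):
        //
        //     let forward = binlog_low <= row_offset && row_offset <= binlog_high
        //         && row_pk <= current_pos;
        //
        // Rows with pk > `current_pos` can be dropped because the ongoing snapshot read will
        // emit their up-to-date values once it reaches them.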
340        if need_backfill {
341            // drive the upstream changelog first to ensure we receive changelog events in a timely manner,
342            // otherwise the upstream changelog may be blocked by the snapshot read stream
343            let _ = Pin::new(&mut upstream).peek().await;
344
345            // wait for a barrier to make sure the backfill starts after upstream source
346            #[for_await]
347            for msg in upstream.by_ref() {
348                match msg? {
349                    Message::Barrier(barrier) => {
350                        match barrier.mutation.as_deref() {
351                            Some(crate::executor::Mutation::Pause) => {
352                                is_snapshot_paused = true;
353                                tracing::info!(
354                                    %table_id,
355                                    upstream_table_name,
356                                    "snapshot is paused by barrier"
357                                );
358                            }
359                            Some(crate::executor::Mutation::Resume) => {
360                                is_snapshot_paused = false;
361                                tracing::info!(
362                                    %table_id,
363                                    upstream_table_name,
364                                    "snapshot is resumed by barrier"
365                                );
366                            }
367                            _ => {
368                                // ignore other mutations
369                            }
370                        }
371                        // commit state just to bump the epoch of state table
372                        state_impl.commit_state(barrier.epoch).await?;
373                        yield Message::Barrier(barrier);
374                        break;
375                    }
376                    Message::Chunk(ref chunk) => {
377                        last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
378                    }
379                    Message::Watermark(_) => {
380                        // Ignore watermark
381                    }
382                }
383            }
384
385            tracing::info!(%table_id,
386                upstream_table_name,
387                initial_binlog_offset = ?last_binlog_offset,
388                ?current_pk_pos,
389                is_snapshot_paused,
390                "start cdc backfill loop");
391
392            // the buffer will be drained when a barrier comes
393            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
394
395            'backfill_loop: loop {
396                let left_upstream = upstream.by_ref().map(Either::Left);
397
398                let mut snapshot_read_row_cnt: usize = 0;
399                let read_args = SnapshotReadArgs::new(
400                    current_pk_pos.clone(),
401                    self.rate_limit_rps,
402                    pk_indices.clone(),
403                    additional_columns.clone(),
404                    schema_table_name.clone(),
405                    external_database_name.clone(),
406                );
407                let right_snapshot = pin!(
408                    upstream_table_reader
409                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
410                        .map(Either::Right)
411                );
412                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
413                if is_snapshot_paused {
414                    snapshot_valve.pause();
415                }
416
417                // Prefer to select the upstream, so we can stop the snapshot stream when a barrier comes.
418                let mut backfill_stream =
419                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
420                        stream::PollNext::Left
421                    });
422
423                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
424                let mut cur_barrier_upstream_processed_rows: u64 = 0;
425                let mut barrier_count: u32 = 0;
426                let mut pending_barrier = None;
427
428                #[for_await]
429                for either in &mut backfill_stream {
430                    match either {
431                        // Upstream
432                        Either::Left(msg) => {
433                            match msg? {
434                                Message::Barrier(barrier) => {
435                                    // increase the barrier count and check whether we need to start a new snapshot
436                                    barrier_count += 1;
437                                    let can_start_new_snapshot =
438                                        barrier_count == self.options.snapshot_barrier_interval;
439
440                                    if let Some(mutation) = barrier.mutation.as_deref() {
441                                        use crate::executor::Mutation;
442                                        match mutation {
443                                            Mutation::Pause => {
444                                                is_snapshot_paused = true;
445                                                snapshot_valve.pause();
446                                            }
447                                            Mutation::Resume => {
448                                                is_snapshot_paused = false;
449                                                snapshot_valve.resume();
450                                            }
451                                            Mutation::Throttle(some) => {
452                                                if let Some(new_rate_limit) =
453                                                    some.get(&self.actor_ctx.id)
454                                                    && *new_rate_limit != self.rate_limit_rps
455                                                {
456                                                    self.rate_limit_rps = *new_rate_limit;
457                                                    rate_limit_to_zero = self
458                                                        .rate_limit_rps
459                                                        .is_some_and(|val| val == 0);
460                                                    // update and persist current backfill progress without draining the buffered upstream chunks
461                                                    state_impl
462                                                        .mutate_state(
463                                                            current_pk_pos.clone(),
464                                                            last_binlog_offset.clone(),
465                                                            total_snapshot_row_count,
466                                                            false,
467                                                        )
468                                                        .await?;
469                                                    state_impl.commit_state(barrier.epoch).await?;
470                                                    yield Message::Barrier(barrier);
471
472                                                    // rebuild the snapshot stream with new rate limit
473                                                    continue 'backfill_loop;
474                                                }
475                                            }
476                                            Mutation::Update(UpdateMutation {
477                                                dropped_actors,
478                                                ..
479                                            }) => {
480                                                if dropped_actors.contains(&self.actor_ctx.id) {
481                                                    // the actor has been dropped, exit the backfill loop
482                                                    tracing::info!(
483                                                        %table_id,
484                                                        upstream_table_name,
485                                                        "CdcBackfill has been dropped due to config change"
486                                                    );
487                                                    yield Message::Barrier(barrier);
488                                                    break 'backfill_loop;
489                                                }
490                                            }
491                                            _ => (),
492                                        }
493                                    }
494
495                                    Self::report_metrics(
496                                        &self.metrics,
497                                        cur_barrier_snapshot_processed_rows,
498                                        cur_barrier_upstream_processed_rows,
499                                    );
500
501                                    // when processing a barrier, check whether we can start a new snapshot
502                                    // if the number of barriers reaches the snapshot interval
503                                    if can_start_new_snapshot {
504                                        // staging the barrier
505                                        pending_barrier = Some(barrier);
506                                        tracing::debug!(
507                                            %table_id,
508                                            ?current_pk_pos,
509                                            ?snapshot_read_row_cnt,
510                                            "Prepare to start a new snapshot"
511                                        );
512                                        // Break the loop for consuming snapshot and prepare to start a new snapshot
513                                        break;
514                                    } else {
515                                        // update and persist current backfill progress
516                                        state_impl
517                                            .mutate_state(
518                                                current_pk_pos.clone(),
519                                                last_binlog_offset.clone(),
520                                                total_snapshot_row_count,
521                                                false,
522                                            )
523                                            .await?;
524
525                                        state_impl.commit_state(barrier.epoch).await?;
526
527                                        // emit the barrier and continue consuming the backfill stream
528                                        yield Message::Barrier(barrier);
529                                    }
530                                }
531                                Message::Chunk(chunk) => {
532                                    // skip empty upstream chunk
533                                    if chunk.cardinality() == 0 {
534                                        continue;
535                                    }
536
537                                    let chunk_binlog_offset =
538                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
539
540                                    tracing::trace!(
541                                        "recv changelog chunk: chunk_offset {:?}, capacity {}",
542                                        chunk_binlog_offset,
543                                        chunk.capacity()
544                                    );
545
546                                    // Since we don't need changelog before the
547                                    // `last_binlog_offset`, skip the chunk that *only* contains
548                                    // events before `last_binlog_offset`.
549                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
550                                        && let Some(chunk_offset) = chunk_binlog_offset
551                                        && chunk_offset < *last_binlog_offset
552                                    {
553                                        tracing::trace!(
554                                            "skip changelog chunk: chunk_offset {:?}, capacity {}",
555                                            chunk_offset,
556                                            chunk.capacity()
557                                        );
558                                        continue;
559                                    }
560                                    // Buffer the upstream chunk.
561                                    upstream_chunk_buffer.push(chunk.compact_vis());
562                                }
563                                Message::Watermark(_) => {
564                                    // Ignore watermark during backfill.
565                                }
566                            }
567                        }
568                        // Snapshot read
569                        Either::Right(msg) => {
570                            match msg? {
571                                None => {
572                                    tracing::info!(
573                                        %table_id,
574                                        ?last_binlog_offset,
575                                        ?current_pk_pos,
576                                        "snapshot read stream ends"
577                                    );
578                                    // If the snapshot read stream ends, it means all historical
579                                    // data has been loaded.
580                                    // We should not mark the chunk anymore,
581                                    // otherwise, we will ignore some rows in the buffer.
582                                    for chunk in upstream_chunk_buffer.drain(..) {
583                                        yield Message::Chunk(mapping_chunk(
584                                            chunk,
585                                            &self.output_indices,
586                                        ));
587                                    }
588
589                                    // backfill has finished, exit the backfill loop and persist the state when we recv a barrier
590                                    break 'backfill_loop;
591                                }
592                                Some(chunk) => {
593                                    // Raise the current position.
594                                    // Since snapshot read streams are ordered by pk, we can
595                                    // just use the last row to update `current_pos`.
596                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
597
598                                    tracing::trace!(
599                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
600                                        chunk.cardinality(),
601                                        current_pk_pos
602                                    );
603                                    let chunk_cardinality = chunk.cardinality() as u64;
604                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
605                                    total_snapshot_row_count += chunk_cardinality;
606                                    yield Message::Chunk(mapping_chunk(
607                                        chunk,
608                                        &self.output_indices,
609                                    ));
610                                }
611                            }
612                        }
613                    }
614                }
615
616                assert!(pending_barrier.is_some(), "pending_barrier must exist");
617                let pending_barrier = pending_barrier.unwrap();
618
619                // Here we have to ensure the snapshot stream is consumed at least once,
620                // since a barrier event can kick in at any time.
621                // Otherwise, the result set of the new snapshot stream may become empty.
622                // It may be a cancellation bug of the MySQL driver.
623                let (_, mut snapshot_stream) = backfill_stream.into_inner();
624
625                // skip consuming the snapshot stream if it is paused or the rate limit is set to 0
626                if !is_snapshot_paused
627                    && !rate_limit_to_zero
628                    && let Some(msg) = snapshot_stream
629                        .next()
630                        .instrument_await("consume_snapshot_stream_once")
631                        .await
632                {
633                    let Either::Right(msg) = msg else {
634                        bail!("BUG: snapshot_read contains upstream messages");
635                    };
636                    match msg? {
637                        None => {
638                            tracing::info!(
639                                %table_id,
640                                ?last_binlog_offset,
641                                ?current_pk_pos,
642                                "snapshot read stream ends in the force emit branch"
643                            );
644                            // End of the snapshot read stream.
645                            // Consume the buffered upstream chunk without filtering by `binlog_low`.
646                            for chunk in upstream_chunk_buffer.drain(..) {
647                                yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
648                            }
649
650                            // mark backfill has finished
651                            state_impl
652                                .mutate_state(
653                                    current_pk_pos.clone(),
654                                    last_binlog_offset.clone(),
655                                    total_snapshot_row_count,
656                                    true,
657                                )
658                                .await?;
659
660                            // commit state because we have received a barrier message
661                            state_impl.commit_state(pending_barrier.epoch).await?;
662                            yield Message::Barrier(pending_barrier);
663                            // end of backfill loop, since backfill has finished
664                            break 'backfill_loop;
665                        }
666                        Some(chunk) => {
667                            // Raise the current pk position.
668                            current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));
669
670                            let row_count = chunk.cardinality() as u64;
671                            cur_barrier_snapshot_processed_rows += row_count;
672                            total_snapshot_row_count += row_count;
673                            snapshot_read_row_cnt += row_count as usize;
674
675                            tracing::debug!(
676                                %table_id,
677                                ?current_pk_pos,
678                                ?snapshot_read_row_cnt,
679                                "force emit a snapshot chunk"
680                            );
681                            yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
682                        }
683                    }
684                }
685
686                // If the number of barriers reaches the snapshot interval,
687                // consume the buffered upstream chunks.
688                if let Some(current_pos) = &current_pk_pos {
689                    for chunk in upstream_chunk_buffer.drain(..) {
690                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;
691
692                        // record the consumed binlog offset that will be
693                        // persisted later
694                        consumed_binlog_offset =
695                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
696
697                        yield Message::Chunk(mapping_chunk(
698                            mark_cdc_chunk(
699                                &offset_parse_func,
700                                chunk,
701                                current_pos,
702                                &pk_indices,
703                                &pk_order,
704                                last_binlog_offset.clone(),
705                            )?,
706                            &self.output_indices,
707                        ));
708                    }
709                } else {
710                    // If there is no current_pos, it means we have not processed any snapshot data yet,
711                    // so we can just discard the buffered upstream chunks in that case.
712                    upstream_chunk_buffer.clear();
713                }
714
715                // Update last seen binlog offset
716                if consumed_binlog_offset.is_some() {
717                    last_binlog_offset.clone_from(&consumed_binlog_offset);
718                }
719
720                Self::report_metrics(
721                    &self.metrics,
722                    cur_barrier_snapshot_processed_rows,
723                    cur_barrier_upstream_processed_rows,
724                );
725
726                // update and persist current backfill progress
727                state_impl
728                    .mutate_state(
729                        current_pk_pos.clone(),
730                        last_binlog_offset.clone(),
731                        total_snapshot_row_count,
732                        false,
733                    )
734                    .await?;
735
736                state_impl.commit_state(pending_barrier.epoch).await?;
737                yield Message::Barrier(pending_barrier);
738            }
739        } else if self.options.disable_backfill {
740            // If backfill is disabled, we just mark the backfill as finished
741            tracing::info!(
742                %table_id,
743                upstream_table_name,
744                "CdcBackfill has been disabled"
745            );
746            state_impl
747                .mutate_state(
748                    current_pk_pos.clone(),
749                    last_binlog_offset.clone(),
750                    total_snapshot_row_count,
751                    true,
752                )
753                .await?;
754        }
755
756        upstream_table_reader.disconnect().await?;
757
758        tracing::info!(
759            %table_id,
760            upstream_table_name,
761            "CdcBackfill has already finished and will forward messages directly to the downstream"
762        );
763
764        // Wait for the first barrier after backfill has finished,
765        // so we can update our progress and persist the state.
766        while let Some(Ok(msg)) = upstream.next().await {
767            if let Some(msg) = mapping_message(msg, &self.output_indices) {
768                // If not finished then we need to update state, otherwise no need.
769                if let Message::Barrier(barrier) = &msg {
770                    // finalize the backfill state
771                    // TODO: unify `mutate_state` and `commit_state` into one method
772                    state_impl
773                        .mutate_state(
774                            current_pk_pos.clone(),
775                            last_binlog_offset.clone(),
776                            total_snapshot_row_count,
777                            true,
778                        )
779                        .await?;
780                    state_impl.commit_state(barrier.epoch).await?;
781
782                    // mark progress as finished
783                    if let Some(progress) = self.progress.as_mut() {
784                        progress.finish(barrier.epoch, total_snapshot_row_count);
785                    }
786                    yield msg;
787                    // break after the state has been saved
788                    break;
789                }
790                yield msg;
791            }
792        }
793
794        // After the backfill has finished, we can forward
795        // messages from the upstream directly to the downstream,
796        // since no snapshot data remains to be read.
797        #[for_await]
798        for msg in upstream {
799            // upstream offsets will be removed from the message before forwarding to
800            // downstream
801            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
802                if let Message::Barrier(barrier) = &msg {
803                    // commit state just to bump the epoch of state table
804                    state_impl.commit_state(barrier.epoch).await?;
805                }
806                yield msg;
807            }
808        }
809    }
810}
811
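/// Polls the upstream while concurrently driving the future that creates the external table
/// reader. Returns `Ok(None)` as soon as the reader is (or already was) created, and
/// `Ok(Some(msg))` for each upstream message received in the meantime; the `biased` select
/// prefers completing the reader future over consuming the upstream.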
812pub(crate) async fn build_reader_and_poll_upstream(
813    upstream: &mut BoxedMessageStream,
814    table_reader: &mut Option<ExternalTableReaderImpl>,
815    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
816) -> StreamExecutorResult<Option<Message>> {
817    if table_reader.is_some() {
818        return Ok(None);
819    }
820    tokio::select! {
821        biased;
822        reader = &mut *future => {
823            *table_reader = Some(reader);
824            Ok(None)
825        }
826        msg = upstream.next() => {
827            msg.transpose()
828        }
829    }
830}
831
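/// Parses the upstream Debezium JSON payloads into chunks matching the downstream table schema,
/// with the `_rw_offset` column re-attached at the end (see `parse_debezium_chunk`). The
/// `*_handling` options mirror the corresponding `debezium.*` connector properties derived in
/// `execute_inner`.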
832#[try_stream(ok = Message, error = StreamExecutorError)]
833pub async fn transform_upstream(
834    upstream: BoxedMessageStream,
835    output_columns: Vec<ColumnDesc>,
836    timestamp_handling: Option<TimestampHandling>,
837    timestamptz_handling: Option<TimestamptzHandling>,
838    time_handling: Option<TimeHandling>,
839    bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
840    handle_toast_columns: bool,
841) {
842    let props = SpecificParserConfig {
843        encoding_config: EncodingProperties::Json(JsonProperties {
844            use_schema_registry: false,
845            timestamp_handling,
846            timestamptz_handling,
847            time_handling,
848            bigint_unsigned_handling,
849            handle_toast_columns,
850        }),
851        // the cdc message is generated internally so the key must exist.
852        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
853    };
854
855    // convert to source column desc to feed into parser
856    let columns_with_meta = output_columns
857        .iter()
858        .map(SourceColumnDesc::from)
859        .collect_vec();
860    let mut parser = DebeziumParser::new(
861        props,
862        columns_with_meta.clone(),
863        Arc::new(SourceContext::dummy()),
864    )
865    .await
866    .map_err(StreamExecutorError::connector_error)?;
867
868    pin_mut!(upstream);
869    #[for_await]
870    for msg in upstream {
871        let mut msg = msg?;
872        if let Message::Chunk(chunk) = &mut msg {
873            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
874            let _ = std::mem::replace(chunk, parsed_chunk);
875        }
876        yield msg;
877    }
878}
879
880async fn parse_debezium_chunk(
881    parser: &mut DebeziumParser,
882    chunk: &StreamChunk,
883) -> StreamExecutorResult<StreamChunk> {
884    // Here we transform the input chunk, whose schema is `(payload jsonb, _rw_offset varchar, _rw_table_name varchar)`,
885    // into a chunk with the downstream table schema (the `info.schema` of the MergeNode, i.e. the schema
886    // of the table job) with `_rw_offset` appended at the end.
887    // See `gen_create_table_plan_for_cdc_source` for details.
888
889    // use `SourceStreamChunkBuilder` for convenience
890    let mut builder = SourceStreamChunkBuilder::new(
891        parser.columns().to_vec(),
892        SourceCtrlOpts {
893            chunk_size: chunk.capacity(),
894            split_txn: false,
895        },
896    );
897
898    // The schema of the input chunk is `(payload jsonb, _rw_offset varchar, _rw_table_name varchar, _row_id)`.
899    // We use the Debezium parser to parse the first column,
900    // then chain each parsed row with its `_rw_offset` value to get a new row.
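    // For illustration (hypothetical values): an input row such as
    //     (JSONB '{"payload": {"before": null, "after": {...}, "op": "r", ...}}',
    //      '<offset string>', 'mydb.orders', <row id>)
    // is parsed against the downstream columns and then re-joined with its `_rw_offset` value,
    // yielding `(<parsed data columns...>, '<offset string>')`. The `_rw_table_name` and
    // `_row_id` columns are intentionally not carried over.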
901    let payloads = chunk.data_chunk().project(&[0]);
902    let offsets = chunk.data_chunk().project(&[1]).compact_vis();
903
904    // TODO: preserve the transaction semantics
905    for payload in payloads.rows() {
906        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
907        else {
908            panic!("payload must be jsonb");
909        };
910
911        parser
912            .parse_inner(
913                None,
914                Some(jsonb_ref.to_string().as_bytes().to_vec()),
915                builder.row_writer(),
916            )
917            .await
918            .unwrap();
919    }
920    builder.finish_current_chunk();
921
922    let parsed_chunk = {
923        let mut iter = builder.consume_ready_chunks();
924        assert_eq!(1, iter.len());
925        iter.next().unwrap()
926    };
927    assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row
928    let (ops, mut columns, vis) = parsed_chunk.into_inner();
929    // note that `vis` is not necessarily the same as the original chunk's visibilities
930
931    // concat the rows in the parsed chunk with the `_rw_offset` column
932    columns.extend(offsets.into_parts().0);
933
934    Ok(StreamChunk::from_parts(
935        ops,
936        DataChunk::from_parts(columns.into(), vis),
937    ))
938}
939
940impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
941    fn execute(self: Box<Self>) -> BoxedMessageStream {
942        self.execute_inner().boxed()
943    }
944}
945
946#[cfg(test)]
947mod tests {
948    use std::collections::BTreeMap;
949    use std::str::FromStr;
950
951    use futures::{StreamExt, pin_mut};
952    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
953    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
954    use risingwave_common::types::{DataType, Datum, JsonbVal};
955    use risingwave_common::util::epoch::test_epoch;
956    use risingwave_common::util::iter_util::ZipEqFast;
957    use risingwave_connector::source::cdc::CdcScanOptions;
958    use risingwave_storage::memory::MemoryStateStore;
959
960    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
961    use crate::executor::monitor::StreamingMetrics;
962    use crate::executor::prelude::StateTable;
963    use crate::executor::source::default_source_internal_table;
964    use crate::executor::test_utils::MockSource;
965    use crate::executor::{
966        ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
967    };
968
969    #[tokio::test]
970    async fn test_transform_upstream_chunk() {
971        let schema = Schema::new(vec![
972            Field::unnamed(DataType::Jsonb),   // debezium json payload
973            Field::unnamed(DataType::Varchar), // _rw_offset
974            Field::unnamed(DataType::Varchar), // _rw_table_name
975        ]);
976        let stream_key = vec![1];
977        let (mut tx, source) = MockSource::channel();
978        let source = source.into_executor(schema.clone(), stream_key.clone());
979        // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();
980        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
981
982        let datums: Vec<Datum> = vec![
983            Some(JsonbVal::from_str(payload).unwrap().into()),
984            Some("file: 1.binlog, pos: 100".to_owned().into()),
985            Some("mydb.orders".to_owned().into()),
986        ];
987
988        println!("datums: {:?}", datums[1]);
989
990        let mut builders = schema.create_array_builders(8);
991        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
992            builder.append(datum.clone());
993        }
994        let columns = builders
995            .into_iter()
996            .map(|builder| builder.finish().into())
997            .collect();
998
999        // one row chunk
1000        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));
1001
1002        tx.push_chunk(chunk);
1003        let upstream = Box::new(source).execute();
1004
1005        // schema to the debezium parser
1006        let columns = vec![
1007            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
1008            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
1009            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
1010            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
1011            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
1012            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
1013        ];
1014
1015        let parsed_stream = transform_upstream(upstream, columns, None, None, None, None, false);
1016        pin_mut!(parsed_stream);
1017        // the output chunk must contain the offset column
1018        if let Some(message) = parsed_stream.next().await {
1019            println!("chunk: {:#?}", message.unwrap());
1020        }
1021    }
1022
1023    #[tokio::test]
1024    async fn test_build_reader_and_poll_upstream() {
1025        let actor_context = ActorContext::for_test(1);
1026        let external_storage_table = ExternalStorageTable::for_test_undefined();
1027        let schema = Schema::new(vec![
1028            Field::unnamed(DataType::Jsonb),   // debezium json payload
1029            Field::unnamed(DataType::Varchar), // _rw_offset
1030            Field::unnamed(DataType::Varchar), // _rw_table_name
1031        ]);
1032        let stream_key = vec![1];
1033        let (mut tx, source) = MockSource::channel();
1034        let source = source.into_executor(schema.clone(), stream_key.clone());
1035        let output_indices = vec![1, 0, 4]; //reorder
1036        let output_columns = vec![
1037            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
1038            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
1039            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
1040            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
1041            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
1042            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
1043        ];
1044        let store = MemoryStateStore::new();
1045        let state_table =
1046            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
1047                .await;
1048        let cdc = CdcBackfillExecutor::new(
1049            actor_context,
1050            external_storage_table,
1051            source,
1052            output_indices,
1053            output_columns,
1054            None,
1055            StreamingMetrics::unused().into(),
1056            state_table,
1057            None,
1058            CdcScanOptions {
1059                // We want to mark backfill as finished. However, it's not straightforward to do so,
1060                // so we use `disable_backfill` instead.
1061                disable_backfill: true,
1062                ..CdcScanOptions::default()
1063            },
1064            BTreeMap::default(),
1065        );
1066        // cdc.state_impl.init_epoch(EpochPair::new(test_epoch(4), test_epoch(3))).await.unwrap();
1067        // cdc.state_impl.mutate_state(None, None, 0, true).await.unwrap();
1068        // cdc.state_impl.commit_state(EpochPair::new(test_epoch(5), test_epoch(4))).await.unwrap();
1069        let s = cdc.execute_inner();
1070        pin_mut!(s);
1071
1072        // send first barrier
1073        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
1074        // send chunk
1075        {
1076            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
1077            let datums: Vec<Datum> = vec![
1078                Some(JsonbVal::from_str(payload).unwrap().into()),
1079                Some("file: 1.binlog, pos: 100".to_owned().into()),
1080                Some("mydb.orders".to_owned().into()),
1081            ];
1082            let mut builders = schema.create_array_builders(8);
1083            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
1084                builder.append(datum.clone());
1085            }
1086            let columns = builders
1087                .into_iter()
1088                .map(|builder| builder.finish().into())
1089                .collect();
1090            // one row chunk
1091            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));
1092
1093            tx.push_chunk(chunk);
1094        }
1095        let _first_barrier = s.next().await.unwrap();
1096        let upstream_change_log = s.next().await.unwrap().unwrap();
1097        let Message::Chunk(chunk) = upstream_change_log else {
1098            panic!("expect chunk");
1099        };
1100        assert_eq!(chunk.columns().len(), 3);
1101        assert_eq!(chunk.rows().count(), 1);
1102        assert_eq!(
1103            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
1104            vec![Some(44485)]
1105        );
1106        assert_eq!(
1107            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
1108            vec![Some(5)]
1109        );
1110        assert_eq!(
1111            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
1112            vec![Some(100)]
1113        );
1114    }
1115}