risingwave_stream/executor/backfill/cdc/cdc_backfill.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::catalog::ColumnDesc;
use risingwave_connector::parser::{
    BigintUnsignedHandlingMode, ByteStreamSourceParser, DebeziumParser, DebeziumProps,
    EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder,
    SpecificParserConfig, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
};
use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};
use risingwave_pb::common::ThrottleType;
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{
    get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::CreateMviewProgressReporter;

/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
const METADATA_STATE_LEN: usize = 4;

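/// Executor that backfills a CDC table: it reads a snapshot of the external upstream table
/// while buffering the upstream changelog, and switches to forwarding the changelog directly
/// once the snapshot has been fully consumed.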
pub struct CdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled
    external_table: ExternalStorageTable,

    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    upstream: Executor,

    /// The column indices that need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// The schema of the output chunk, including additional columns if any
    output_columns: Vec<ColumnDesc>,

    /// State table of the `CdcBackfill` executor
    state_impl: CdcBackfillState<S>,

    // TODO: introduce a CdcBackfillProgress to report finish to Meta
    // This object is just a stub right now
    progress: Option<CreateMviewProgressReporter>,

    metrics: CdcBackfillMetrics,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,

    properties: BTreeMap<String, String>,
}

impl<S: StateStore> CdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        progress: Option<CreateMviewProgressReporter>,
        metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
        properties: BTreeMap<String, String>,
    ) -> Self {
        let pk_indices = external_table.pk_indices();
        let upstream_table_id = external_table.table_id();
        let state_impl = CdcBackfillState::new(
            upstream_table_id,
            state_table,
            pk_indices.len() + METADATA_STATE_LEN,
        );

        let metrics = metrics.new_cdc_backfill_metrics(external_table.table_id(), actor_ctx.id);
        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            state_impl,
            progress,
            metrics,
            rate_limit_rps,
            options,
            properties,
        }
    }

    fn report_metrics(
        metrics: &CdcBackfillMetrics,
        snapshot_processed_row_count: u64,
        upstream_processed_row_count: u64,
    ) {
        metrics
            .cdc_backfill_snapshot_read_row_count
            .inc_by(snapshot_processed_row_count);

        metrics
            .cdc_backfill_upstream_output_row_count
            .inc_by(upstream_processed_row_count);
    }

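    /// Core generator of this executor: restores the backfill state, creates the external
    /// table reader with retries, runs the snapshot/changelog backfill loop, and finally
    /// forwards upstream messages directly once backfill has finished.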
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        // The indices of primary key columns
        let pk_indices = self.external_table.pk_indices().to_vec();
        let pk_order = self.external_table.pk_order_types().to_vec();

        let table_id = self.external_table.table_id();
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();

        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();

        let mut upstream = self.upstream.execute();

        // Current position of the upstream_table storage primary key.
        // `None` means it starts from the beginning.
        let mut current_pk_pos: Option<OwnedRow>;

        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;

        let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
        let first_barrier_epoch = first_barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier);
        let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

        // Check whether this parallelism has been assigned splits,
        // if not, we should bypass the backfill directly.
        let mut state_impl = self.state_impl;

        state_impl.init_epoch(first_barrier_epoch).await?;

        // restore backfill state
        let state = state_impl.restore_state().await?;
        current_pk_pos = state.current_pk_pos.clone();

        let need_backfill = !self.options.disable_backfill && !state.is_finished;

        // Keep track of rows from the snapshot.
        let mut total_snapshot_row_count = state.row_count as u64;

        // After initializing the state table and forwarding the initial barrier to downstream,
        // we now try to create the table reader with retry.
        // If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
        // If backfill is finished, we should forward the upstream cdc events to downstream.
        let mut table_reader: Option<ExternalTableReaderImpl> = None;
        let external_table = self.external_table.clone();
        let mut future = Box::pin(async move {
            let backoff = get_infinite_backoff_strategy();
            tokio_retry::Retry::spawn(backoff, || async {
                match external_table.create_table_reader().await {
                    Ok(reader) => Ok(reader),
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                        Err(e)
                    }
                }
            })
            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
            .await
            .expect("Retry create cdc table reader until success.")
        });
        let timestamp_handling: Option<TimestampHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimestampHandling::Milli);
        let timestamptz_handling: Option<TimestamptzHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimestamptzHandling::Milli);
        let time_handling: Option<TimeHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimeHandling::Milli);
        let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
            .properties
            .get("debezium.bigint.unsigned.handling.mode")
            .map(|v| v == "precise")
            .unwrap_or(false)
            .then_some(BigintUnsignedHandlingMode::Precise);
        // Only the postgres-cdc connector may trigger TOAST.
        let handle_toast_columns: bool =
            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
        // Make sure to use mapping_message after transform_upstream.
        let mut upstream = transform_upstream(
            upstream,
            self.output_columns.clone(),
            timestamp_handling,
            timestamptz_handling,
            time_handling,
            bigint_unsigned_handling,
            handle_toast_columns,
        )
        .boxed();
        loop {
            if let Some(msg) =
                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                    .await?
            {
                if let Some(msg) = mapping_message(msg, &self.output_indices) {
                    match msg {
                        Message::Barrier(barrier) => {
                            // commit state to bump the epoch of state table
                            state_impl.commit_state(barrier.epoch).await?;
                            yield Message::Barrier(barrier);
                        }
                        Message::Chunk(chunk) => {
                            if need_backfill {
                                // ignore chunk if we need backfill, since we can read the data from the snapshot
                            } else {
                                // forward the chunk to downstream
                                yield Message::Chunk(chunk);
                            }
                        }
                        Message::Watermark(_) => {
                            // ignore watermark
                        }
                    }
                }
            } else {
                assert!(table_reader.is_some(), "table reader must be created");
                tracing::info!(
                    %table_id,
                    upstream_table_name,
                    "table reader created successfully"
                );
                break;
            }
        }

        let upstream_table_reader = UpstreamTableReader::new(
            self.external_table.clone(),
            table_reader.expect("table reader must be created"),
        );

        let mut upstream = upstream.peekable();

        let mut last_binlog_offset: Option<CdcOffset> = state
            .last_cdc_offset
            .map_or(upstream_table_reader.current_cdc_offset().await?, Some);

        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
        let mut consumed_binlog_offset: Option<CdcOffset> = None;

        tracing::info!(
            %table_id,
            upstream_table_name,
            initial_binlog_offset = ?last_binlog_offset,
            ?current_pk_pos,
            is_finished = state.is_finished,
            is_snapshot_paused,
            snapshot_row_count = total_snapshot_row_count,
            rate_limit = self.rate_limit_rps,
            disable_backfill = self.options.disable_backfill,
            snapshot_barrier_interval = self.options.snapshot_barrier_interval,
            snapshot_batch_size = self.options.snapshot_batch_size,
            "start cdc backfill",
        );

        // CDC Backfill Algorithm:
        //
        // When the first barrier comes from upstream:
        //  - read the current binlog offset as `binlog_low`
        //  - start a snapshot read upon the upstream table and iterate over the snapshot read stream
        //  - buffer the changelog events from upstream
        //
        // When a new barrier comes from upstream:
        //  - read the current binlog offset as `binlog_high`
        //  - for each row of the upstream change log, forward it to downstream if it is in the range
        //    of [binlog_low, binlog_high] and its pk <= `current_pos`, otherwise ignore it
        //  - reconstruct the whole backfill stream with upstream changelog and a new table snapshot
        //
        // When a chunk comes from snapshot, we forward it to the downstream and raise
        // `current_pos`.
        // When we reach the end of the snapshot read stream, it means backfill has been
        // finished.
        //
        // Once the backfill loop ends, we forward the upstream directly to the downstream.
        if need_backfill {
            // drive the upstream changelog first to ensure we can receive timely changelog events,
            // otherwise the upstream changelog may be blocked by the snapshot read stream
            let _ = Pin::new(&mut upstream).peek().await;

            // wait for a barrier to make sure the backfill starts after the upstream source
            #[for_await]
            for msg in upstream.by_ref() {
                match msg? {
                    Message::Barrier(barrier) => {
                        match barrier.mutation.as_deref() {
                            Some(crate::executor::Mutation::Pause) => {
                                is_snapshot_paused = true;
                                tracing::info!(
                                    %table_id,
                                    upstream_table_name,
                                    "snapshot is paused by barrier"
                                );
                            }
                            Some(crate::executor::Mutation::Resume) => {
                                is_snapshot_paused = false;
                                tracing::info!(
                                    %table_id,
                                    upstream_table_name,
                                    "snapshot is resumed by barrier"
                                );
                            }
                            _ => {
                                // ignore other mutations
                            }
                        }
                        // commit state just to bump the epoch of state table
                        state_impl.commit_state(barrier.epoch).await?;
                        yield Message::Barrier(barrier);
                        break;
                    }
                    Message::Chunk(ref chunk) => {
                        last_binlog_offset = get_cdc_chunk_last_offset(&offset_parse_func, chunk)?;
                    }
                    Message::Watermark(_) => {
                        // Ignore watermark
                    }
                }
            }

            tracing::info!(%table_id,
                upstream_table_name,
                initial_binlog_offset = ?last_binlog_offset,
                ?current_pk_pos,
                is_snapshot_paused,
                "start cdc backfill loop");

            // the buffer will be drained when a barrier comes
            let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

            'backfill_loop: loop {
                let left_upstream = upstream.by_ref().map(Either::Left);

                let mut snapshot_read_row_cnt: usize = 0;
                let read_args = SnapshotReadArgs::new(
                    current_pk_pos.clone(),
                    self.rate_limit_rps,
                    pk_indices.clone(),
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_full_table(read_args, self.options.snapshot_batch_size)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }

                // Prefer to select upstream, so we can stop the snapshot stream when a barrier comes.
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                let mut cur_barrier_snapshot_processed_rows: u64 = 0;
                let mut cur_barrier_upstream_processed_rows: u64 = 0;
                let mut barrier_count: u32 = 0;
                let mut pending_barrier = None;

                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    // increase the barrier count and check whether we need to start a new snapshot
                                    barrier_count += 1;
                                    let can_start_new_snapshot =
                                        barrier_count == self.options.snapshot_barrier_interval;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                if let Some(entry) =
                                                    some.get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type()
                                                        == ThrottleType::Backfill
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    rate_limit_to_zero = self
                                                        .rate_limit_rps
                                                        .is_some_and(|val| val == 0);
                                                    // update and persist current backfill progress without draining the buffered upstream chunks
                                                    state_impl
                                                        .mutate_state(
                                                            current_pk_pos.clone(),
                                                            last_binlog_offset.clone(),
                                                            total_snapshot_row_count,
                                                            false,
                                                        )
                                                        .await?;
                                                    state_impl.commit_state(barrier.epoch).await?;
                                                    yield Message::Barrier(barrier);

                                                    // rebuild the snapshot stream with the new rate limit
                                                    continue 'backfill_loop;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    // the actor has been dropped, exit the backfill loop
                                                    tracing::info!(
                                                        %table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    yield Message::Barrier(barrier);
                                                    break 'backfill_loop;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    Self::report_metrics(
                                        &self.metrics,
                                        cur_barrier_snapshot_processed_rows,
                                        cur_barrier_upstream_processed_rows,
                                    );

                                    // when processing a barrier, check whether we can start a new snapshot
                                    // if the number of barriers reaches the snapshot interval
                                    if can_start_new_snapshot {
                                        // stage the barrier
                                        pending_barrier = Some(barrier);
                                        tracing::debug!(
                                            %table_id,
                                            ?current_pk_pos,
                                            ?snapshot_read_row_cnt,
                                            "Prepare to start a new snapshot"
                                        );
                                        // Break the loop for consuming the snapshot and prepare to start a new snapshot
                                        break;
                                    } else {
                                        // update and persist current backfill progress
                                        state_impl
                                            .mutate_state(
                                                current_pk_pos.clone(),
                                                last_binlog_offset.clone(),
                                                total_snapshot_row_count,
                                                false,
                                            )
                                            .await?;

                                        state_impl.commit_state(barrier.epoch).await?;

                                        // emit the barrier and continue consuming the backfill stream
                                        yield Message::Barrier(barrier);
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // skip empty upstream chunk
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    let chunk_binlog_offset =
                                        get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                                    tracing::trace!(
                                        "recv changelog chunk: chunk_offset {:?}, capacity {}",
                                        chunk_binlog_offset,
                                        chunk.capacity()
                                    );

                                    // Since we don't need changelog before the
                                    // `last_binlog_offset`, skip the chunk that *only* contains
                                    // events before `last_binlog_offset`.
                                    if let Some(last_binlog_offset) = last_binlog_offset.as_ref()
                                        && let Some(chunk_offset) = chunk_binlog_offset
                                        && chunk_offset < *last_binlog_offset
                                    {
                                        tracing::trace!(
                                            "skip changelog chunk: chunk_offset {:?}, capacity {}",
                                            chunk_offset,
                                            chunk.capacity()
                                        );
                                        continue;
                                    }
                                    // Buffer the upstream chunk.
                                    upstream_chunk_buffer.push(chunk.compact_vis());
                                }
                                Message::Watermark(_) => {
                                    // Ignore watermark during backfill.
                                }
                            }
                        }
                        // Snapshot read
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        %table_id,
                                        ?last_binlog_offset,
                                        ?current_pk_pos,
                                        "snapshot read stream ends"
                                    );
                                    // If the snapshot read stream ends, it means all historical
                                    // data has been loaded.
                                    // We should not mark the chunk anymore,
                                    // otherwise, we will ignore some rows in the buffer.
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(mapping_chunk(
                                            chunk,
                                            &self.output_indices,
                                        ));
                                    }

                                    // backfill has finished, exit the backfill loop and persist the state when we recv a barrier
                                    break 'backfill_loop;
                                }
                                Some(chunk) => {
                                    // Raise the current position.
                                    // Since snapshot read streams are ordered by pk, we can
                                    // just use the last row to update `current_pos`.
                                    current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                                    tracing::trace!(
                                        "got a snapshot chunk: len {}, current_pk_pos {:?}",
                                        chunk.cardinality(),
                                        current_pk_pos
                                    );
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
                                    total_snapshot_row_count += chunk_cardinality;
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }

                assert!(pending_barrier.is_some(), "pending_barrier must exist");
                let pending_barrier = pending_barrier.unwrap();

                // Here we have to ensure the snapshot stream is consumed at least once,
                // since the barrier event can kick in anytime.
                // Otherwise, the result set of the new snapshot stream may become empty.
                // It may be a cancellation bug of the mysql driver.
                let (_, mut snapshot_stream) = backfill_stream.into_inner();

                // skip consuming the snapshot stream if it is paused or the rate limit is 0
                if !is_snapshot_paused
                    && !rate_limit_to_zero
                    && let Some(msg) = snapshot_stream
                        .next()
                        .instrument_await("consume_snapshot_stream_once")
                        .await
                {
                    let Either::Right(msg) = msg else {
                        bail!("BUG: snapshot_read contains upstream messages");
                    };
                    match msg? {
                        None => {
                            tracing::info!(
                                %table_id,
                                ?last_binlog_offset,
                                ?current_pk_pos,
                                "snapshot read stream ends in the force emit branch"
                            );
                            // End of the snapshot read stream.
                            // Consume the buffered upstream chunk without filtering by `binlog_low`.
                            for chunk in upstream_chunk_buffer.drain(..) {
                                yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                            }

                            // mark backfill as finished
                            state_impl
                                .mutate_state(
                                    current_pk_pos.clone(),
                                    last_binlog_offset.clone(),
                                    total_snapshot_row_count,
                                    true,
                                )
                                .await?;

                            // commit state because we have received a barrier message
                            state_impl.commit_state(pending_barrier.epoch).await?;
                            yield Message::Barrier(pending_barrier);
                            // end of backfill loop, since backfill has finished
                            break 'backfill_loop;
                        }
                        Some(chunk) => {
                            // Raise the current pk position.
                            current_pk_pos = Some(get_new_pos(&chunk, &pk_indices));

                            let row_count = chunk.cardinality() as u64;
                            cur_barrier_snapshot_processed_rows += row_count;
                            total_snapshot_row_count += row_count;
                            snapshot_read_row_cnt += row_count as usize;

                            tracing::debug!(
                                %table_id,
                                ?current_pk_pos,
                                ?snapshot_read_row_cnt,
                                "force emit a snapshot chunk"
                            );
                            yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
                        }
                    }
                }

                // If the number of barriers reaches the snapshot interval,
                // consume the buffered upstream chunks.
                if let Some(current_pos) = &current_pk_pos {
                    for chunk in upstream_chunk_buffer.drain(..) {
                        cur_barrier_upstream_processed_rows += chunk.cardinality() as u64;

                        // record the consumed binlog offset that will be
                        // persisted later
                        consumed_binlog_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;

                        yield Message::Chunk(mapping_chunk(
                            mark_cdc_chunk(
                                &offset_parse_func,
                                chunk,
                                current_pos,
                                &pk_indices,
                                &pk_order,
                                last_binlog_offset.clone(),
                            )?,
                            &self.output_indices,
                        ));
                    }
                } else {
                    // If there is no current_pos, it means we have not processed any snapshot data yet,
                    // so we can just ignore the buffered upstream chunks in that case.
                    upstream_chunk_buffer.clear();
                }

                // Update the last seen binlog offset
                if consumed_binlog_offset.is_some() {
                    last_binlog_offset.clone_from(&consumed_binlog_offset);
                }

                Self::report_metrics(
                    &self.metrics,
                    cur_barrier_snapshot_processed_rows,
                    cur_barrier_upstream_processed_rows,
                );

                // update and persist current backfill progress
                state_impl
                    .mutate_state(
                        current_pk_pos.clone(),
                        last_binlog_offset.clone(),
                        total_snapshot_row_count,
                        false,
                    )
                    .await?;

                state_impl.commit_state(pending_barrier.epoch).await?;
                yield Message::Barrier(pending_barrier);
            }
        } else if self.options.disable_backfill {
            // If backfill is disabled, we just mark the backfill as finished
            tracing::info!(
                %table_id,
                upstream_table_name,
                "CdcBackfill has been disabled"
            );
            state_impl
                .mutate_state(
                    current_pk_pos.clone(),
                    last_binlog_offset.clone(),
                    total_snapshot_row_count,
                    true,
                )
                .await?;
        }

        upstream_table_reader.disconnect().await?;

        tracing::info!(
            %table_id,
            upstream_table_name,
            "CdcBackfill has already finished and will forward messages directly to the downstream"
        );

        // Wait for the first barrier to come after backfill is finished,
        // so we can update our progress and persist the status.
        while let Some(Ok(msg)) = upstream.next().await {
            if let Some(msg) = mapping_message(msg, &self.output_indices) {
                // If not finished, we need to update the state; otherwise there is no need.
                if let Message::Barrier(barrier) = &msg {
                    // finalize the backfill state
                    // TODO: unify `mutate_state` and `commit_state` into one method
                    state_impl
                        .mutate_state(
                            current_pk_pos.clone(),
                            last_binlog_offset.clone(),
                            total_snapshot_row_count,
                            true,
                        )
                        .await?;
                    state_impl.commit_state(barrier.epoch).await?;

                    // mark progress as finished
                    if let Some(progress) = self.progress.as_mut() {
                        progress.finish(barrier.epoch, total_snapshot_row_count);
                    }
                    yield msg;
                    // break after the state has been saved
                    break;
                }
                yield msg;
            }
        }

        // After the backfill progress is finished,
        // we can forward messages directly to the downstream,
        // as backfill is done.
        #[for_await]
        for msg in upstream {
            // upstream offsets will be removed from the message before forwarding to
            // downstream
            if let Some(msg) = mapping_message(msg?, &self.output_indices) {
                if let Message::Barrier(barrier) = &msg {
                    // commit state just to bump the epoch of state table
                    state_impl.commit_state(barrier.epoch).await?;
                }
                yield msg;
            }
        }
    }
}

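/// Drive the table-reader creation future and the upstream stream concurrently.
///
/// Returns `Ok(None)` once the reader has been created (and stored into `table_reader`),
/// or `Ok(Some(msg))` for an upstream message received while the reader is still being created.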
pub(crate) async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    table_reader: &mut Option<ExternalTableReaderImpl>,
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        biased;
        reader = &mut *future => {
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            msg.transpose()
        }
    }
}

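/// Transform the raw upstream CDC stream into chunks in the downstream table schema by
/// parsing the Debezium JSON payload column of each chunk with a `DebeziumParser`.
/// Barriers and watermarks are passed through unchanged.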
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(
    upstream: BoxedMessageStream,
    output_columns: Vec<ColumnDesc>,
    timestamp_handling: Option<TimestampHandling>,
    timestamptz_handling: Option<TimestamptzHandling>,
    time_handling: Option<TimeHandling>,
    bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
    handle_toast_columns: bool,
) {
    let props = SpecificParserConfig {
        encoding_config: EncodingProperties::Json(JsonProperties {
            use_schema_registry: false,
            timestamp_handling,
            timestamptz_handling,
            time_handling,
            bigint_unsigned_handling,
            handle_toast_columns,
        }),
        // the cdc message is generated internally so the key must exist.
        protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
    };

    // convert to source column desc to feed into parser
    let columns_with_meta = output_columns
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();
    let mut parser = DebeziumParser::new(
        props,
        columns_with_meta.clone(),
        Arc::new(SourceContext::dummy()),
    )
    .await
    .map_err(StreamExecutorError::connector_error)?;

    pin_mut!(upstream);
    #[for_await]
    for msg in upstream {
        let mut msg = msg?;
        if let Message::Chunk(chunk) = &mut msg {
            let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?;
            let _ = std::mem::replace(chunk, parsed_chunk);
        }
        yield msg;
    }
}

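/// Parse a chunk of Debezium JSON payloads into a chunk in the downstream table schema,
/// carrying the `_rw_offset` column over from the input chunk.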
async fn parse_debezium_chunk(
    parser: &mut DebeziumParser,
    chunk: &StreamChunk,
) -> StreamExecutorResult<StreamChunk> {
    // Here we transform the input chunk with schema `(payload varchar, _rw_offset varchar, _rw_table_name varchar)`
    // into a chunk with the downstream table schema (the `info.schema` of the MergeNode, i.e.
    // the schema of the table job with `_rw_offset` appended at the end).
    // See `gen_create_table_plan_for_cdc_source` for details.

    // use `SourceStreamChunkBuilder` for convenience
    let mut builder = SourceStreamChunkBuilder::new(
        parser.columns().to_vec(),
        SourceCtrlOpts {
            chunk_size: chunk.capacity(),
            split_txn: false,
        },
    );

    // The schema of the input chunk is `(payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id)`.
    // We should use the debezium parser to parse the first column,
    // then chain the parsed row with the `_rw_offset` row to get a new row.
    let payloads = chunk.data_chunk().project(&[0]);
    let offsets = chunk.data_chunk().project(&[1]).compact_vis();

    // TODO: preserve the transaction semantics
    for payload in payloads.rows() {
        let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist")
        else {
            panic!("payload must be jsonb");
        };

        parser
            .parse_inner(
                None,
                Some(jsonb_ref.to_string().as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await
            .unwrap();
    }
    builder.finish_current_chunk();

    let parsed_chunk = {
        let mut iter = builder.consume_ready_chunks();
        assert_eq!(1, iter.len());
        iter.next().unwrap()
    };
    assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row
    let (ops, mut columns, vis) = parsed_chunk.into_inner();
    // note that `vis` is not necessarily the same as the original chunk's visibility

    // concat the rows in the parsed chunk with the `_rw_offset` column
    columns.extend(offsets.into_parts().0);

    Ok(StreamChunk::from_parts(
        ops,
        DataChunk::from_parts(columns.into(), vis),
    ))
}

impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;

    use futures::{StreamExt, pin_mut};
    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::{DataType, Datum, JsonbVal};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::iter_util::ZipEqFast;
    use risingwave_connector::source::cdc::CdcScanOptions;
    use risingwave_storage::memory::MemoryStateStore;

    use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::prelude::StateTable;
    use crate::executor::source::default_source_internal_table;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{
        ActorContext, Barrier, CdcBackfillExecutor, ExternalStorageTable, Message,
    };

    #[tokio::test]
    async fn test_transform_upstream_chunk() {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let stream_key = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());
        // let payload = r#"{"before": null,"after":{"O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" },"source":{"version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null},"op":"r","ts_ms":1695277757017,"transaction":null}"#.to_string();
        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;

        let datums: Vec<Datum> = vec![
            Some(JsonbVal::from_str(payload).unwrap().into()),
            Some("file: 1.binlog, pos: 100".to_owned().into()),
            Some("mydb.orders".to_owned().into()),
        ];

        println!("datums: {:?}", datums[1]);

        let mut builders = schema.create_array_builders(8);
        for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
            builder.append(datum.clone());
        }
        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into())
            .collect();

        // one row chunk
        let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

        tx.push_chunk(chunk);
        let upstream = Box::new(source).execute();

        // schema for the debezium parser
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];

        let parsed_stream = transform_upstream(upstream, columns, None, None, None, None, false);
        pin_mut!(parsed_stream);
        // the output chunk must contain the offset column
        if let Some(message) = parsed_stream.next().await {
            println!("chunk: {:#?}", message.unwrap());
        }
    }

    #[tokio::test]
    async fn test_build_reader_and_poll_upstream() {
        let actor_context = ActorContext::for_test(1);
        let external_storage_table = ExternalStorageTable::for_test_undefined();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Jsonb),   // debezium json payload
            Field::unnamed(DataType::Varchar), // _rw_offset
            Field::unnamed(DataType::Varchar), // _rw_table_name
        ]);
        let stream_key = vec![1];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());
        let output_indices = vec![1, 0, 4]; // reorder
        let output_columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
        ];
        let store = MemoryStateStore::new();
        let state_table =
            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
                .await;
        let cdc = CdcBackfillExecutor::new(
            actor_context,
            external_storage_table,
            source,
            output_indices,
            output_columns,
            None,
            StreamingMetrics::unused().into(),
            state_table,
            None,
            CdcScanOptions {
                // We want to mark backfill as finished. However it's not straightforward to do so.
                // Here we use `disable_backfill` instead.
                disable_backfill: true,
                ..CdcScanOptions::default()
            },
            BTreeMap::default(),
        );
        // cdc.state_impl.init_epoch(EpochPair::new(test_epoch(4), test_epoch(3))).await.unwrap();
        // cdc.state_impl.mutate_state(None, None, 0, true).await.unwrap();
        // cdc.state_impl.commit_state(EpochPair::new(test_epoch(5), test_epoch(4))).await.unwrap();
        let s = cdc.execute_inner();
        pin_mut!(s);

        // send the first barrier
        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
        // send a chunk
        {
            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
            let datums: Vec<Datum> = vec![
                Some(JsonbVal::from_str(payload).unwrap().into()),
                Some("file: 1.binlog, pos: 100".to_owned().into()),
                Some("mydb.orders".to_owned().into()),
            ];
            let mut builders = schema.create_array_builders(8);
            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
                builder.append(datum.clone());
            }
            let columns = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            // one row chunk
            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));

            tx.push_chunk(chunk);
        }
        let _first_barrier = s.next().await.unwrap();
        let upstream_change_log = s.next().await.unwrap().unwrap();
        let Message::Chunk(chunk) = upstream_change_log else {
            panic!("expect chunk");
        };
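        // The parsed chunk should be reordered according to `output_indices` [1, 0, 4]:
        // O_CUSTKEY, O_ORDERKEY, O_DUMMY.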
        assert_eq!(chunk.columns().len(), 3);
        assert_eq!(chunk.rows().count(), 1);
        assert_eq!(
            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(44485)]
        );
        assert_eq!(
            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(5)]
        );
        assert_eq!(
            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
            vec![Some(100)]
        );
    }
1118}