// risingwave_stream/executor/backfill/cdc/cdc_backill_v2.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{
27    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
28};
29use risingwave_connector::source::cdc::CdcScanOptions;
30use risingwave_connector::source::cdc::external::{
31    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
32};
33use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
34use rw_futures_util::pausable;
35use thiserror_ext::AsReport;
36use tracing::Instrument;
37
38use crate::executor::UpdateMutation;
39use crate::executor::backfill::cdc::cdc_backfill::{
40    build_reader_and_poll_upstream, transform_upstream,
41};
42use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
43use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
44use crate::executor::backfill::cdc::upstream_table::snapshot::{
45    SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
46};
47use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
48use crate::executor::prelude::*;
49use crate::executor::source::get_infinite_backoff_strategy;
50use crate::task::cdc_progress::CdcProgressReporter;
51pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
52    actor_ctx: ActorContextRef,
53
54    /// The external table to be backfilled
55    external_table: ExternalStorageTable,
56
57    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
58    upstream: Executor,
59
60    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
61    output_indices: Vec<usize>,
62
63    /// The schema of output chunk, including additional columns if any
64    output_columns: Vec<ColumnDesc>,
65
66    /// Rate limit in rows/s.
67    rate_limit_rps: Option<u32>,
68
69    options: CdcScanOptions,
70
71    state_table: StateTable<S>,
72
73    properties: BTreeMap<String, String>,
74
75    progress: Option<CdcProgressReporter>,
76}
77
78impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
79    #[allow(clippy::too_many_arguments)]
80    pub fn new(
81        actor_ctx: ActorContextRef,
82        external_table: ExternalStorageTable,
83        upstream: Executor,
84        output_indices: Vec<usize>,
85        output_columns: Vec<ColumnDesc>,
86        _metrics: Arc<StreamingMetrics>,
87        state_table: StateTable<S>,
88        rate_limit_rps: Option<u32>,
89        options: CdcScanOptions,
90        properties: BTreeMap<String, String>,
91        progress: Option<CdcProgressReporter>,
92    ) -> Self {
93        Self {
94            actor_ctx,
95            external_table,
96            upstream,
97            output_indices,
98            output_columns,
99            rate_limit_rps,
100            options,
101            state_table,
102            properties,
103            progress,
104        }
105    }
106
107    #[try_stream(ok = Message, error = StreamExecutorError)]
108    async fn execute_inner(mut self) {
109        assert!(!self.options.disable_backfill);
110        // The indices to primary key columns
111        let pk_indices = self.external_table.pk_indices().to_vec();
112        let table_id = self.external_table.table_id().table_id;
113        let upstream_table_name = self.external_table.qualified_table_name();
114        let schema_table_name = self.external_table.schema_table_name().clone();
115        let external_database_name = self.external_table.database_name().to_owned();
116        let additional_columns = self
117            .output_columns
118            .iter()
119            .filter(|col| col.additional_column.column_type.is_some())
120            .cloned()
121            .collect_vec();
122        assert!(
123            (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
124            "split pk column index {} out of bound",
125            self.options.backfill_split_pk_column_index
126        );
127        let snapshot_split_column_index =
128            pk_indices[self.options.backfill_split_pk_column_index as usize];
129        let cdc_table_snapshot_split_column =
130            vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
131
132        let mut upstream = self.upstream.execute();
133        // Poll the upstream to get the first barrier.
134        let first_barrier = expect_first_barrier(&mut upstream).await?;
135        // Make sure to use mapping_message after transform_upstream.
136
137        // If user sets debezium.time.precision.mode to "connect", it means the user can guarantee
138        // that the upstream data precision is MilliSecond. In this case, we don't use GuessNumberUnit
139        // mode to guess precision, but use Milli mode directly, which can handle extreme timestamps.
140        let timestamp_handling: Option<TimestampHandling> = self
141            .properties
142            .get("debezium.time.precision.mode")
143            .map(|v| v == "connect")
144            .unwrap_or(false)
145            .then_some(TimestampHandling::Milli);
146        let timestamptz_handling: Option<TimestamptzHandling> = self
147            .properties
148            .get("debezium.time.precision.mode")
149            .map(|v| v == "connect")
150            .unwrap_or(false)
151            .then_some(TimestamptzHandling::Milli);
152        let time_handling: Option<TimeHandling> = self
153            .properties
154            .get("debezium.time.precision.mode")
155            .map(|v| v == "connect")
156            .unwrap_or(false)
157            .then_some(TimeHandling::Milli);
158        let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
159            .properties
160            .get("debezium.bigint.unsigned.handling.mode")
161            .map(|v| v == "precise")
162            .unwrap_or(false)
163            .then_some(BigintUnsignedHandlingMode::Precise);
164        // Only postgres-cdc connector may trigger TOAST.
165        let handle_toast_columns: bool =
166            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
167        let mut upstream = transform_upstream(
168            upstream,
169            self.output_columns.clone(),
170            timestamp_handling,
171            timestamptz_handling,
172            time_handling,
173            bigint_unsigned_handling,
174            handle_toast_columns,
175        )
176        .boxed();
177        let mut next_reset_barrier = Some(first_barrier);
178        let mut is_reset = false;
179        let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
180        // The buffered chunks have already been mapped.
181        let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
182
183        // Need reset on CDC table snapshot splits reschedule.
184        'with_cdc_table_snapshot_splits: loop {
185            assert!(upstream_chunk_buffer.is_empty());
186            let reset_barrier = next_reset_barrier.take().unwrap();
187            let (all_snapshot_splits, generation) = match reset_barrier.mutation.as_deref() {
188                Some(Mutation::Add(add)) => (
189                    &add.actor_cdc_table_snapshot_splits.splits,
190                    add.actor_cdc_table_snapshot_splits.generation,
191                ),
192                Some(Mutation::Update(update)) => (
193                    &update.actor_cdc_table_snapshot_splits.splits,
194                    update.actor_cdc_table_snapshot_splits.generation,
195                ),
196                _ => {
197                    return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
198                }
199            };
200            let mut actor_snapshot_splits = vec![];
201            // TODO(zw): optimization: remove consumed splits to reduce barrier size for downstream.
202            if let Some(splits) = all_snapshot_splits.get(&self.actor_ctx.id) {
203                actor_snapshot_splits = splits
204                    .iter()
205                    .map(|s: &CdcTableSnapshotSplitRaw| {
206                        let de = RowDeserializer::new(
207                            cdc_table_snapshot_split_column
208                                .iter()
209                                .map(Field::data_type)
210                                .collect_vec(),
211                        );
212                        let left_bound_inclusive =
213                            de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
214                        let right_bound_exclusive =
215                            de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
216                        CdcTableSnapshotSplit {
217                            split_id: s.split_id,
218                            left_bound_inclusive,
219                            right_bound_exclusive,
220                        }
221                    })
222                    .collect();
223            }
224            tracing::debug!(?actor_snapshot_splits, "actor splits");
225            assert_consecutive_splits(&actor_snapshot_splits);
226
227            let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
228            let barrier_epoch = reset_barrier.epoch;
229            yield Message::Barrier(reset_barrier);
230            if !is_reset {
231                state_impl.init_epoch(barrier_epoch).await?;
232                is_reset = true;
233                tracing::info!(table_id, "Initialize executor.");
234            } else {
235                tracing::info!(table_id, "Reset executor.");
236            }
237
238            let mut current_actor_bounds = None;
239            let mut actor_cdc_offset_high: Option<CdcOffset> = None;
240            let mut actor_cdc_offset_low: Option<CdcOffset> = None;
241            // Find next split that need backfill.
242            let mut next_split_idx = actor_snapshot_splits.len();
243            for (idx, split) in actor_snapshot_splits.iter().enumerate() {
244                let state = state_impl.restore_state(split.split_id).await?;
245                if !state.is_finished {
246                    next_split_idx = idx;
247                    break;
248                }
249                extends_current_actor_bound(&mut current_actor_bounds, split);
250                if let Some(ref cdc_offset) = state.cdc_offset_low {
251                    if let Some(ref cur) = actor_cdc_offset_low {
252                        if *cur > *cdc_offset {
253                            actor_cdc_offset_low = state.cdc_offset_low.clone();
254                        }
255                    } else {
256                        actor_cdc_offset_low = state.cdc_offset_low.clone();
257                    }
258                }
259                if let Some(ref cdc_offset) = state.cdc_offset_high {
260                    if let Some(ref cur) = actor_cdc_offset_high {
261                        if *cur < *cdc_offset {
262                            actor_cdc_offset_high = state.cdc_offset_high.clone();
263                        }
264                    } else {
265                        actor_cdc_offset_high = state.cdc_offset_high.clone();
266                    }
267                }
268            }
269            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
270                // Initialize state so that overall progress can be measured.
271                state_impl
272                    .mutate_state(split.split_id, false, 0, None, None)
273                    .await?;
274            }
275            let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
276                Some((
277                    actor_snapshot_splits[0].split_id,
278                    actor_snapshot_splits[next_split_idx - 1].split_id,
279                ))
280            } else {
281                None
282            };
283
284            // After init the state table and forward the initial barrier to downstream,
285            // we now try to create the table reader with retry.
286            let mut table_reader: Option<ExternalTableReaderImpl> = None;
287            let external_table = self.external_table.clone();
288            let mut future = Box::pin(async move {
289                let backoff = get_infinite_backoff_strategy();
290                tokio_retry::Retry::spawn(backoff, || async {
291                    match external_table.create_table_reader().await {
292                        Ok(reader) => Ok(reader),
293                        Err(e) => {
294                            tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
295                            Err(e)
296                        }
297                    }
298                })
299                    .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
300                    .await
301                    .expect("Retry create cdc table reader until success.")
302            });
303            loop {
304                if let Some(msg) =
305                    build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
306                        .await?
307                {
308                    if let Some(msg) = mapping_message(msg, &self.output_indices) {
309                        match msg {
310                            Message::Barrier(barrier) => {
311                                state_impl.commit_state(barrier.epoch).await?;
312                                if is_reset_barrier(&barrier, self.actor_ctx.id) {
313                                    next_reset_barrier = Some(barrier);
314                                    continue 'with_cdc_table_snapshot_splits;
315                                }
316                                yield Message::Barrier(barrier);
317                            }
318                            Message::Chunk(chunk) => {
319                                if chunk.cardinality() == 0 {
320                                    continue;
321                                }
322                                if let Some(filtered_chunk) = filter_stream_chunk(
323                                    chunk,
324                                    &current_actor_bounds,
325                                    snapshot_split_column_index,
326                                ) && filtered_chunk.cardinality() > 0
327                                {
328                                    yield Message::Chunk(filtered_chunk);
329                                }
330                            }
331                            Message::Watermark(_) => {
332                                // Ignore watermark, like the `CdcBackfillExecutor`.
333                            }
334                        }
335                    }
336                } else {
337                    assert!(table_reader.is_some(), "table reader must created");
338                    tracing::info!(
339                        table_id,
340                        upstream_table_name,
341                        "table reader created successfully"
342                    );
343                    break;
344                }
345            }
346            let upstream_table_reader = UpstreamTableReader::new(
347                self.external_table.clone(),
348                table_reader.expect("table reader must created"),
349            );
350            // let mut upstream = upstream.peekable();
351            let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
352
353            // Backfill snapshot splits sequentially.
354            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
355                tracing::info!(
356                    table_id,
357                    upstream_table_name,
358                    ?split,
359                    is_snapshot_paused,
360                    "start cdc backfill split"
361                );
362                extends_current_actor_bound(&mut current_actor_bounds, split);
363
364                let split_cdc_offset_low = {
365                    // Limit concurrent CDC connections globally to 10 using a semaphore.
366                    static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
367                        tokio::sync::Semaphore::const_new(10);
368
369                    let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
370                    upstream_table_reader.current_cdc_offset().await?
371                };
372                if let Some(ref cdc_offset) = split_cdc_offset_low {
373                    if let Some(ref cur) = actor_cdc_offset_low {
374                        if *cur > *cdc_offset {
375                            actor_cdc_offset_low = split_cdc_offset_low.clone();
376                        }
377                    } else {
378                        actor_cdc_offset_low = split_cdc_offset_low.clone();
379                    }
380                }
381                let mut split_cdc_offset_high = None;
382
383                let left_upstream = upstream.by_ref().map(Either::Left);
384                let read_args = SplitSnapshotReadArgs::new(
385                    split.left_bound_inclusive.clone(),
386                    split.right_bound_exclusive.clone(),
387                    cdc_table_snapshot_split_column.clone(),
388                    self.rate_limit_rps,
389                    additional_columns.clone(),
390                    schema_table_name.clone(),
391                    external_database_name.clone(),
392                );
393                let right_snapshot = pin!(
394                    upstream_table_reader
395                        .snapshot_read_table_split(read_args)
396                        .map(Either::Right)
397                );
398                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
399                if is_snapshot_paused {
400                    snapshot_valve.pause();
401                }
402                let mut backfill_stream =
403                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
404                        stream::PollNext::Left
405                    });
406                let mut row_count: u64 = 0;
407                #[for_await]
408                for either in &mut backfill_stream {
409                    match either {
410                        // Upstream
411                        Either::Left(msg) => {
412                            match msg? {
413                                Message::Barrier(barrier) => {
414                                    state_impl.commit_state(barrier.epoch).await?;
415                                    if let Some(mutation) = barrier.mutation.as_deref() {
416                                        use crate::executor::Mutation;
417                                        match mutation {
418                                            Mutation::Pause => {
419                                                is_snapshot_paused = true;
420                                                snapshot_valve.pause();
421                                            }
422                                            Mutation::Resume => {
423                                                is_snapshot_paused = false;
424                                                snapshot_valve.resume();
425                                            }
426                                            Mutation::Throttle(some) => {
427                                                // TODO(zw): optimization: improve throttle.
428                                                // 1. Handle rate limit 0. Currently, to resume the process, the actor must be rebuilt.
429                                                // 2. Apply new rate limit immediately.
430                                                if let Some(new_rate_limit) =
431                                                    some.get(&self.actor_ctx.id)
432                                                    && *new_rate_limit != self.rate_limit_rps
433                                                {
434                                                    // The new rate limit will take effect since next split.
435                                                    self.rate_limit_rps = *new_rate_limit;
436                                                }
437                                            }
438                                            Mutation::Update(UpdateMutation {
439                                                dropped_actors,
440                                                ..
441                                            }) => {
442                                                if dropped_actors.contains(&self.actor_ctx.id) {
443                                                    tracing::info!(
444                                                        table_id,
445                                                        upstream_table_name,
446                                                        "CdcBackfill has been dropped due to config change"
447                                                    );
448                                                    for chunk in upstream_chunk_buffer.drain(..) {
449                                                        yield Message::Chunk(chunk);
450                                                    }
451                                                    yield Message::Barrier(barrier);
452                                                    let () = futures::future::pending().await;
453                                                    unreachable!();
454                                                }
455                                            }
456                                            _ => (),
457                                        }
458                                    }
459                                    if is_reset_barrier(&barrier, self.actor_ctx.id) {
460                                        next_reset_barrier = Some(barrier);
461                                        for chunk in upstream_chunk_buffer.drain(..) {
462                                            yield Message::Chunk(chunk);
463                                        }
464                                        continue 'with_cdc_table_snapshot_splits;
465                                    }
466                                    if let Some(split_range) =
467                                        should_report_actor_backfill_progress.take()
468                                        && let Some(ref progress) = self.progress
469                                    {
470                                        progress.update(
471                                            self.actor_ctx.fragment_id,
472                                            self.actor_ctx.id,
473                                            barrier.epoch,
474                                            generation,
475                                            split_range,
476                                        );
477                                    }
478                                    // emit barrier and continue to consume the backfill stream
479                                    yield Message::Barrier(barrier);
480                                }
481                                Message::Chunk(chunk) => {
482                                    // skip empty upstream chunk
483                                    if chunk.cardinality() == 0 {
484                                        continue;
485                                    }
486
487                                    // TODO(zw): re-enable
488                                    // let chunk_cdc_offset =
489                                    //     get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
490                                    // if *self.external_table.table_type()
491                                    //     == ExternalCdcTableType::Postgres
492                                    //     && let Some(cur) = actor_cdc_offset_low.as_ref()
493                                    //     && let Some(chunk_offset) = chunk_cdc_offset
494                                    //     && chunk_offset < *cur
495                                    // {
496                                    //     continue;
497                                    // }
498
499                                    let chunk = mapping_chunk(chunk, &self.output_indices);
500                                    if let Some(filtered_chunk) = filter_stream_chunk(
501                                        chunk,
502                                        &current_actor_bounds,
503                                        snapshot_split_column_index,
504                                    ) && filtered_chunk.cardinality() > 0
505                                    {
506                                        // Buffer the upstream chunk.
507                                        upstream_chunk_buffer.push(filtered_chunk.compact_vis());
508                                    }
509                                }
510                                Message::Watermark(_) => {
511                                    // Ignore watermark during backfill, like the `CdcBackfillExecutor`.
512                                }
513                            }
514                        }
515                        // Snapshot read
516                        Either::Right(msg) => {
517                            match msg? {
518                                None => {
519                                    tracing::info!(
520                                        table_id,
521                                        split_id = split.split_id,
522                                        "snapshot read stream ends"
523                                    );
524                                    for chunk in upstream_chunk_buffer.drain(..) {
525                                        yield Message::Chunk(chunk);
526                                    }
527
528                                    split_cdc_offset_high = {
529                                        // Limit concurrent CDC connections globally to 10 using a semaphore.
530                                        static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
531                                            tokio::sync::Semaphore::const_new(10);
532
533                                        let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
534                                        upstream_table_reader.current_cdc_offset().await?
535                                    };
536                                    if let Some(ref cdc_offset) = split_cdc_offset_high {
537                                        if let Some(ref cur) = actor_cdc_offset_high {
538                                            if *cur < *cdc_offset {
539                                                actor_cdc_offset_high =
540                                                    split_cdc_offset_high.clone();
541                                            }
542                                        } else {
543                                            actor_cdc_offset_high = split_cdc_offset_high.clone();
544                                        }
545                                    }
546                                    // Next split.
547                                    break;
548                                }
549                                Some(chunk) => {
550                                    let chunk_cardinality = chunk.cardinality() as u64;
551                                    row_count = row_count.saturating_add(chunk_cardinality);
552                                    yield Message::Chunk(mapping_chunk(
553                                        chunk,
554                                        &self.output_indices,
555                                    ));
556                                }
557                            }
558                        }
559                    }
560                }
561                // Mark current split backfill as finished. The state will be persisted by next barrier.
562                state_impl
563                    .mutate_state(
564                        split.split_id,
565                        true,
566                        row_count,
567                        split_cdc_offset_low,
568                        split_cdc_offset_high,
569                    )
570                    .await?;
571                if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
572                    assert!(
573                        *right_split < split.split_id,
574                        "{} {}",
575                        *right_split,
576                        split.split_id
577                    );
578                    *right_split = split.split_id;
579                } else {
580                    should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
581                }
582            }
583
584            upstream_table_reader.disconnect().await?;
585            tracing::info!(
586                table_id,
587                upstream_table_name,
588                "CdcBackfill has already finished and will forward messages directly to the downstream"
589            );
590
591            let mut should_report_actor_backfill_done = false;
592            // After backfill progress finished
593            // we can forward messages directly to the downstream,
594            // as backfill is finished.
595            #[for_await]
596            for msg in &mut upstream {
597                let msg = msg?;
598                match msg {
599                    Message::Barrier(barrier) => {
600                        state_impl.commit_state(barrier.epoch).await?;
601                        if is_reset_barrier(&barrier, self.actor_ctx.id) {
602                            next_reset_barrier = Some(barrier);
603                            continue 'with_cdc_table_snapshot_splits;
604                        }
605                        if let Some(split_range) = should_report_actor_backfill_progress.take()
606                            && let Some(ref progress) = self.progress
607                        {
608                            progress.update(
609                                self.actor_ctx.fragment_id,
610                                self.actor_ctx.id,
611                                barrier.epoch,
612                                generation,
613                                split_range,
614                            );
615                        }
616                        if should_report_actor_backfill_done {
617                            should_report_actor_backfill_done = false;
618                            assert!(!actor_snapshot_splits.is_empty());
619                            if let Some(ref progress) = self.progress {
620                                progress.finish(
621                                    self.actor_ctx.fragment_id,
622                                    self.actor_ctx.id,
623                                    barrier.epoch,
624                                    generation,
625                                    (
626                                        actor_snapshot_splits[0].split_id,
627                                        actor_snapshot_splits[actor_snapshot_splits.len() - 1]
628                                            .split_id,
629                                    ),
630                                );
631                            }
632                        }
633                        yield Message::Barrier(barrier);
634                    }
635                    Message::Chunk(chunk) => {
636                        if actor_snapshot_splits.is_empty() {
637                            continue;
638                        }
639                        if chunk.cardinality() == 0 {
640                            continue;
641                        }
642
643                        let chunk_cdc_offset =
644                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
645                        // // TODO(zw): re-enable
646                        // if *self.external_table.table_type() == ExternalCdcTableType::Postgres
647                        //     && let Some(cur) = actor_cdc_offset_low.as_ref()
648                        //     && let Some(ref chunk_offset) = chunk_cdc_offset
649                        //     && *chunk_offset < *cur
650                        // {
651                        //     continue;
652                        // }
653
654                        // should_report_actor_backfill_done is set to true at most once.
655                        if let Some(high) = actor_cdc_offset_high.as_ref() {
656                            if state_impl.is_legacy_state() {
657                                // Since the legacy state does not track CDC offsets, report backfill completion immediately.
658                                actor_cdc_offset_high = None;
659                                should_report_actor_backfill_done = true;
660                            } else if let Some(ref chunk_offset) = chunk_cdc_offset
661                                && *chunk_offset >= *high
662                            {
663                                // Report backfill completion once the latest CDC offset exceeds the highest offset tracked during the backfill.
664                                actor_cdc_offset_high = None;
665                                should_report_actor_backfill_done = true;
666                            }
667                        }
668                        let chunk = mapping_chunk(chunk, &self.output_indices);
669                        if let Some(filtered_chunk) = filter_stream_chunk(
670                            chunk,
671                            &current_actor_bounds,
672                            snapshot_split_column_index,
673                        ) && filtered_chunk.cardinality() > 0
674                        {
675                            yield Message::Chunk(filtered_chunk);
676                        }
677                    }
678                    msg @ Message::Watermark(_) => {
679                        if let Some(msg) = mapping_message(msg, &self.output_indices) {
680                            yield msg;
681                        }
682                    }
683                }
684            }
685        }
686    }
687}
688
689fn filter_stream_chunk(
690    chunk: StreamChunk,
691    bound: &Option<(OwnedRow, OwnedRow)>,
692    snapshot_split_column_index: usize,
693) -> Option<StreamChunk> {
694    let Some((left, right)) = bound else {
695        return None;
696    };
697    assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
698    assert_eq!(
699        right.len(),
700        1,
701        "multiple split columns is not supported yet"
702    );
703    let left_split_key = left.datum_at(0);
704    let right_split_key = right.datum_at(0);
705    let is_leftmost_bound = is_leftmost_bound(left);
706    let is_rightmost_bound = is_rightmost_bound(right);
707    if is_leftmost_bound && is_rightmost_bound {
708        return Some(chunk);
709    }
710    let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
711    let (ops, columns, visibility) = chunk.into_inner();
712    for (row_split_key, v) in columns[snapshot_split_column_index]
713        .iter()
714        .zip_eq_fast(visibility.iter())
715    {
716        if !v {
717            new_bitmap.append(false);
718            continue;
719        }
720        let mut is_in_range = true;
721        if !is_leftmost_bound {
722            is_in_range = cmp_datum(
723                row_split_key,
724                left_split_key,
725                OrderType::ascending_nulls_first(),
726            )
727            .is_ge();
728        }
729        if is_in_range && !is_rightmost_bound {
730            is_in_range = cmp_datum(
731                row_split_key,
732                right_split_key,
733                OrderType::ascending_nulls_first(),
734            )
735            .is_lt();
736        }
737        if !is_in_range {
738            tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
739        }
740        new_bitmap.append(is_in_range);
741    }
742    Some(StreamChunk::with_visibility(
743        ops,
744        columns,
745        new_bitmap.finish(),
746    ))
747}
748
749fn is_leftmost_bound(row: &OwnedRow) -> bool {
750    row.iter().all(|d| d.is_none())
751}
752
753fn is_rightmost_bound(row: &OwnedRow) -> bool {
754    row.iter().all(|d| d.is_none())
755}
756
757impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
758    fn execute(self: Box<Self>) -> BoxedMessageStream {
759        self.execute_inner().boxed()
760    }
761}
762
763fn extends_current_actor_bound(
764    current: &mut Option<(OwnedRow, OwnedRow)>,
765    split: &CdcTableSnapshotSplit,
766) {
767    if current.is_none() {
768        *current = Some((
769            split.left_bound_inclusive.clone(),
770            split.right_bound_exclusive.clone(),
771        ));
772    } else {
773        current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
774    }
775}
776
777fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
778    match barrier.mutation.as_deref() {
779        Some(Mutation::Update(update)) => update
780            .actor_cdc_table_snapshot_splits
781            .splits
782            .contains_key(&actor_id),
783        _ => false,
784    }
785}
786
787fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
788    for i in 1..actor_snapshot_splits.len() {
789        assert_eq!(
790            actor_snapshot_splits[i].split_id,
791            actor_snapshot_splits[i - 1].split_id + 1,
792            "{:?}",
793            actor_snapshot_splits
794        );
795        assert!(
796            cmp_datum(
797                actor_snapshot_splits[i - 1]
798                    .right_bound_exclusive
799                    .datum_at(0),
800                actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
801                OrderType::ascending_nulls_last(),
802            )
803            .is_lt()
804        );
805    }
806}
807
#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use crate::executor::backfill::cdc::cdc_backill_v2::filter_stream_chunk;

    /// Builds a single-column `[left, right)` bound. `None` on a side yields a NULL edge,
    /// which `filter_stream_chunk` treats as unbounded on that side.
    fn int_bound(left: Option<i64>, right: Option<i64>) -> Option<(OwnedRow, OwnedRow)> {
        Some((
            OwnedRow::new(vec![left.map(ScalarImpl::Int64)]),
            OwnedRow::new(vec![right.map(ScalarImpl::Int64)]),
        ))
    }

    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        // Filters a fresh copy of `chunk` and physically drops the rows that were hidden.
        let filtered = |bound: Option<(OwnedRow, OwnedRow)>, column: usize| {
            filter_stream_chunk(chunk.clone(), &bound, column).map(|c| c.compact_vis())
        };

        // Split key in column 0 (no NULL keys).
        assert!(filtered(None, 0).is_none());
        assert_eq!(filtered(int_bound(None, None), 0), Some(chunk.clone()));
        assert_eq!(
            filtered(int_bound(None, Some(3)), 0),
            Some(StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .",
            ))
        );
        assert_eq!(
            filtered(int_bound(Some(3), None), 0),
            Some(StreamChunk::from_pretty(
                "  I I
            U- 3 7
            U+ 4 .",
            ))
        );
        assert_eq!(
            filtered(int_bound(Some(2), Some(4)), 0),
            Some(StreamChunk::from_pretty(
                "  I I
             - 2 .
            U- 3 7",
            ))
        );

        // Split key in column 1, which contains NULLs: NULL sorts before every value.
        assert!(filtered(None, 1).is_none());
        assert_eq!(filtered(int_bound(None, None), 1), Some(chunk.clone()));
        assert_eq!(
            filtered(int_bound(None, Some(7)), 1),
            Some(StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .
            U+ 4 .",
            ))
        );
        assert_eq!(
            filtered(int_bound(Some(7), None), 1),
            Some(StreamChunk::from_pretty(
                "  I I
            U- 3 7",
            ))
        );
    }
}