risingwave_stream/executor/backfill/cdc/
cdc_backill_v2.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use either::Either;
18use futures::stream;
19use futures::stream::select_with_strategy;
20use itertools::Itertools;
21use risingwave_common::bitmap::BitmapBuilder;
22use risingwave_common::catalog::{ColumnDesc, Field};
23use risingwave_common::row::RowDeserializer;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::sort_util::{OrderType, cmp_datum};
26use risingwave_connector::parser::{TimeHandling, TimestampHandling, TimestamptzHandling};
27use risingwave_connector::source::cdc::CdcScanOptions;
28use risingwave_connector::source::cdc::external::{
29    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
30};
31use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
32use rw_futures_util::pausable;
33use thiserror_ext::AsReport;
34use tracing::Instrument;
35
36use crate::executor::UpdateMutation;
37use crate::executor::backfill::cdc::cdc_backfill::{
38    build_reader_and_poll_upstream, transform_upstream,
39};
40use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
41use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
42use crate::executor::backfill::cdc::upstream_table::snapshot::{
43    SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
44};
45use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
46use crate::executor::prelude::*;
47use crate::executor::source::get_infinite_backoff_strategy;
48use crate::task::cdc_progress::CdcProgressReporter;
49pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
50    actor_ctx: ActorContextRef,
51
52    /// The external table to be backfilled
53    external_table: ExternalStorageTable,
54
55    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
56    upstream: Executor,
57
58    /// The column indices need to be forwarded to the downstream from the upstream and table scan.
59    output_indices: Vec<usize>,
60
61    /// The schema of output chunk, including additional columns if any
62    output_columns: Vec<ColumnDesc>,
63
64    /// Rate limit in rows/s.
65    rate_limit_rps: Option<u32>,
66
67    options: CdcScanOptions,
68
69    state_table: StateTable<S>,
70
71    properties: BTreeMap<String, String>,
72
73    progress: Option<CdcProgressReporter>,
74}
75
76impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
77    #[allow(clippy::too_many_arguments)]
78    pub fn new(
79        actor_ctx: ActorContextRef,
80        external_table: ExternalStorageTable,
81        upstream: Executor,
82        output_indices: Vec<usize>,
83        output_columns: Vec<ColumnDesc>,
84        _metrics: Arc<StreamingMetrics>,
85        state_table: StateTable<S>,
86        rate_limit_rps: Option<u32>,
87        options: CdcScanOptions,
88        properties: BTreeMap<String, String>,
89        progress: Option<CdcProgressReporter>,
90    ) -> Self {
91        Self {
92            actor_ctx,
93            external_table,
94            upstream,
95            output_indices,
96            output_columns,
97            rate_limit_rps,
98            options,
99            state_table,
100            properties,
101            progress,
102        }
103    }
104
105    #[try_stream(ok = Message, error = StreamExecutorError)]
106    async fn execute_inner(mut self) {
107        assert!(!self.options.disable_backfill);
108        // The indices to primary key columns
109        let pk_indices = self.external_table.pk_indices().to_vec();
110        let table_id = self.external_table.table_id().table_id;
111        let upstream_table_name = self.external_table.qualified_table_name();
112        let schema_table_name = self.external_table.schema_table_name().clone();
113        let external_database_name = self.external_table.database_name().to_owned();
114        let additional_columns = self
115            .output_columns
116            .iter()
117            .filter(|col| col.additional_column.column_type.is_some())
118            .cloned()
119            .collect_vec();
120        assert!(
121            (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
122            "split pk column index {} out of bound",
123            self.options.backfill_split_pk_column_index
124        );
125        let snapshot_split_column_index =
126            pk_indices[self.options.backfill_split_pk_column_index as usize];
127        let cdc_table_snapshot_split_column =
128            vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];
129
130        let mut upstream = self.upstream.execute();
131        // Poll the upstream to get the first barrier.
132        let first_barrier = expect_first_barrier(&mut upstream).await?;
133        // Make sure to use mapping_message after transform_upstream.
134
135        // If user sets debezium.time.precision.mode to "connect", it means the user can guarantee
136        // that the upstream data precision is MilliSecond. In this case, we don't use GuessNumberUnit
137        // mode to guess precision, but use Milli mode directly, which can handle extreme timestamps.
138        let timestamp_handling: Option<TimestampHandling> = self
139            .properties
140            .get("debezium.time.precision.mode")
141            .map(|v| v == "connect")
142            .unwrap_or(false)
143            .then_some(TimestampHandling::Milli);
144        let timestamptz_handling: Option<TimestamptzHandling> = self
145            .properties
146            .get("debezium.time.precision.mode")
147            .map(|v| v == "connect")
148            .unwrap_or(false)
149            .then_some(TimestamptzHandling::Milli);
150        let time_handling: Option<TimeHandling> = self
151            .properties
152            .get("debezium.time.precision.mode")
153            .map(|v| v == "connect")
154            .unwrap_or(false)
155            .then_some(TimeHandling::Milli);
156        // Only postgres-cdc connector may trigger TOAST.
157        let handle_toast_columns: bool =
158            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
159        let mut upstream = transform_upstream(
160            upstream,
161            self.output_columns.clone(),
162            timestamp_handling,
163            timestamptz_handling,
164            time_handling,
165            handle_toast_columns,
166        )
167        .boxed();
168        let mut next_reset_barrier = Some(first_barrier);
169        let mut is_reset = false;
170        let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
171        // The buffered chunks have already been mapped.
172        let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
173
174        // Need reset on CDC table snapshot splits reschedule.
175        'with_cdc_table_snapshot_splits: loop {
176            assert!(upstream_chunk_buffer.is_empty());
177            let reset_barrier = next_reset_barrier.take().unwrap();
178            let (all_snapshot_splits, generation) = match reset_barrier.mutation.as_deref() {
179                Some(Mutation::Add(add)) => (
180                    &add.actor_cdc_table_snapshot_splits.splits,
181                    add.actor_cdc_table_snapshot_splits.generation,
182                ),
183                Some(Mutation::Update(update)) => (
184                    &update.actor_cdc_table_snapshot_splits.splits,
185                    update.actor_cdc_table_snapshot_splits.generation,
186                ),
187                _ => {
188                    return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
189                }
190            };
191            let mut actor_snapshot_splits = vec![];
192            // TODO(zw): optimization: remove consumed splits to reduce barrier size for downstream.
193            if let Some(splits) = all_snapshot_splits.get(&self.actor_ctx.id) {
194                actor_snapshot_splits = splits
195                    .iter()
196                    .map(|s: &CdcTableSnapshotSplitRaw| {
197                        let de = RowDeserializer::new(
198                            cdc_table_snapshot_split_column
199                                .iter()
200                                .map(Field::data_type)
201                                .collect_vec(),
202                        );
203                        let left_bound_inclusive =
204                            de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
205                        let right_bound_exclusive =
206                            de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
207                        CdcTableSnapshotSplit {
208                            split_id: s.split_id,
209                            left_bound_inclusive,
210                            right_bound_exclusive,
211                        }
212                    })
213                    .collect();
214            }
215            tracing::debug!(?actor_snapshot_splits, "actor splits");
216            assert_consecutive_splits(&actor_snapshot_splits);
217
218            let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
219            let barrier_epoch = reset_barrier.epoch;
220            yield Message::Barrier(reset_barrier);
221            if !is_reset {
222                state_impl.init_epoch(barrier_epoch).await?;
223                is_reset = true;
224                tracing::info!(table_id, "Initialize executor.");
225            } else {
226                tracing::info!(table_id, "Reset executor.");
227            }
228
229            let mut current_actor_bounds = None;
230            let mut actor_cdc_offset_high: Option<CdcOffset> = None;
231            let mut actor_cdc_offset_low: Option<CdcOffset> = None;
232            // Find next split that need backfill.
233            let mut next_split_idx = actor_snapshot_splits.len();
234            for (idx, split) in actor_snapshot_splits.iter().enumerate() {
235                let state = state_impl.restore_state(split.split_id).await?;
236                if !state.is_finished {
237                    next_split_idx = idx;
238                    break;
239                }
240                extends_current_actor_bound(&mut current_actor_bounds, split);
241                if let Some(ref cdc_offset) = state.cdc_offset_low {
242                    if let Some(ref cur) = actor_cdc_offset_low {
243                        if *cur > *cdc_offset {
244                            actor_cdc_offset_low = state.cdc_offset_low.clone();
245                        }
246                    } else {
247                        actor_cdc_offset_low = state.cdc_offset_low.clone();
248                    }
249                }
250                if let Some(ref cdc_offset) = state.cdc_offset_high {
251                    if let Some(ref cur) = actor_cdc_offset_high {
252                        if *cur < *cdc_offset {
253                            actor_cdc_offset_high = state.cdc_offset_high.clone();
254                        }
255                    } else {
256                        actor_cdc_offset_high = state.cdc_offset_high.clone();
257                    }
258                }
259            }
260            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
261                // Initialize state so that overall progress can be measured.
262                state_impl
263                    .mutate_state(split.split_id, false, 0, None, None)
264                    .await?;
265            }
266            let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
267                Some((
268                    actor_snapshot_splits[0].split_id,
269                    actor_snapshot_splits[next_split_idx - 1].split_id,
270                ))
271            } else {
272                None
273            };
274
275            // After init the state table and forward the initial barrier to downstream,
276            // we now try to create the table reader with retry.
277            let mut table_reader: Option<ExternalTableReaderImpl> = None;
278            let external_table = self.external_table.clone();
279            let mut future = Box::pin(async move {
280                let backoff = get_infinite_backoff_strategy();
281                tokio_retry::Retry::spawn(backoff, || async {
282                    match external_table.create_table_reader().await {
283                        Ok(reader) => Ok(reader),
284                        Err(e) => {
285                            tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
286                            Err(e)
287                        }
288                    }
289                })
290                    .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
291                    .await
292                    .expect("Retry create cdc table reader until success.")
293            });
294            loop {
295                if let Some(msg) =
296                    build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
297                        .await?
298                {
299                    if let Some(msg) = mapping_message(msg, &self.output_indices) {
300                        match msg {
301                            Message::Barrier(barrier) => {
302                                state_impl.commit_state(barrier.epoch).await?;
303                                if is_reset_barrier(&barrier, self.actor_ctx.id) {
304                                    next_reset_barrier = Some(barrier);
305                                    continue 'with_cdc_table_snapshot_splits;
306                                }
307                                yield Message::Barrier(barrier);
308                            }
309                            Message::Chunk(chunk) => {
310                                if chunk.cardinality() == 0 {
311                                    continue;
312                                }
313                                if let Some(filtered_chunk) = filter_stream_chunk(
314                                    chunk,
315                                    &current_actor_bounds,
316                                    snapshot_split_column_index,
317                                ) && filtered_chunk.cardinality() > 0
318                                {
319                                    yield Message::Chunk(filtered_chunk);
320                                }
321                            }
322                            Message::Watermark(_) => {
323                                // Ignore watermark, like the `CdcBackfillExecutor`.
324                            }
325                        }
326                    }
327                } else {
328                    assert!(table_reader.is_some(), "table reader must created");
329                    tracing::info!(
330                        table_id,
331                        upstream_table_name,
332                        "table reader created successfully"
333                    );
334                    break;
335                }
336            }
337            let upstream_table_reader = UpstreamTableReader::new(
338                self.external_table.clone(),
339                table_reader.expect("table reader must created"),
340            );
341            // let mut upstream = upstream.peekable();
342            let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
343
344            // Backfill snapshot splits sequentially.
345            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
346                tracing::info!(
347                    table_id,
348                    upstream_table_name,
349                    ?split,
350                    is_snapshot_paused,
351                    "start cdc backfill split"
352                );
353                extends_current_actor_bound(&mut current_actor_bounds, split);
354
355                let split_cdc_offset_low = {
356                    // Limit concurrent CDC connections globally to 10 using a semaphore.
357                    static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
358                        tokio::sync::Semaphore::const_new(10);
359
360                    let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
361                    upstream_table_reader.current_cdc_offset().await?
362                };
363                if let Some(ref cdc_offset) = split_cdc_offset_low {
364                    if let Some(ref cur) = actor_cdc_offset_low {
365                        if *cur > *cdc_offset {
366                            actor_cdc_offset_low = split_cdc_offset_low.clone();
367                        }
368                    } else {
369                        actor_cdc_offset_low = split_cdc_offset_low.clone();
370                    }
371                }
372                let mut split_cdc_offset_high = None;
373
374                let left_upstream = upstream.by_ref().map(Either::Left);
375                let read_args = SplitSnapshotReadArgs::new(
376                    split.left_bound_inclusive.clone(),
377                    split.right_bound_exclusive.clone(),
378                    cdc_table_snapshot_split_column.clone(),
379                    self.rate_limit_rps,
380                    additional_columns.clone(),
381                    schema_table_name.clone(),
382                    external_database_name.clone(),
383                );
384                let right_snapshot = pin!(
385                    upstream_table_reader
386                        .snapshot_read_table_split(read_args)
387                        .map(Either::Right)
388                );
389                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
390                if is_snapshot_paused {
391                    snapshot_valve.pause();
392                }
393                let mut backfill_stream =
394                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
395                        stream::PollNext::Left
396                    });
397                let mut row_count: u64 = 0;
398                #[for_await]
399                for either in &mut backfill_stream {
400                    match either {
401                        // Upstream
402                        Either::Left(msg) => {
403                            match msg? {
404                                Message::Barrier(barrier) => {
405                                    state_impl.commit_state(barrier.epoch).await?;
406                                    if let Some(mutation) = barrier.mutation.as_deref() {
407                                        use crate::executor::Mutation;
408                                        match mutation {
409                                            Mutation::Pause => {
410                                                is_snapshot_paused = true;
411                                                snapshot_valve.pause();
412                                            }
413                                            Mutation::Resume => {
414                                                is_snapshot_paused = false;
415                                                snapshot_valve.resume();
416                                            }
417                                            Mutation::Throttle(some) => {
418                                                // TODO(zw): optimization: improve throttle.
419                                                // 1. Handle rate limit 0. Currently, to resume the process, the actor must be rebuilt.
420                                                // 2. Apply new rate limit immediately.
421                                                if let Some(new_rate_limit) =
422                                                    some.get(&self.actor_ctx.id)
423                                                    && *new_rate_limit != self.rate_limit_rps
424                                                {
425                                                    // The new rate limit will take effect since next split.
426                                                    self.rate_limit_rps = *new_rate_limit;
427                                                }
428                                            }
429                                            Mutation::Update(UpdateMutation {
430                                                dropped_actors,
431                                                ..
432                                            }) => {
433                                                if dropped_actors.contains(&self.actor_ctx.id) {
434                                                    tracing::info!(
435                                                        table_id,
436                                                        upstream_table_name,
437                                                        "CdcBackfill has been dropped due to config change"
438                                                    );
439                                                    for chunk in upstream_chunk_buffer.drain(..) {
440                                                        yield Message::Chunk(chunk);
441                                                    }
442                                                    yield Message::Barrier(barrier);
443                                                    let () = futures::future::pending().await;
444                                                    unreachable!();
445                                                }
446                                            }
447                                            _ => (),
448                                        }
449                                    }
450                                    if is_reset_barrier(&barrier, self.actor_ctx.id) {
451                                        next_reset_barrier = Some(barrier);
452                                        for chunk in upstream_chunk_buffer.drain(..) {
453                                            yield Message::Chunk(chunk);
454                                        }
455                                        continue 'with_cdc_table_snapshot_splits;
456                                    }
457                                    if let Some(split_range) =
458                                        should_report_actor_backfill_progress.take()
459                                        && let Some(ref progress) = self.progress
460                                    {
461                                        progress.update(
462                                            self.actor_ctx.fragment_id,
463                                            self.actor_ctx.id,
464                                            barrier.epoch,
465                                            generation,
466                                            split_range,
467                                        );
468                                    }
469                                    // emit barrier and continue to consume the backfill stream
470                                    yield Message::Barrier(barrier);
471                                }
472                                Message::Chunk(chunk) => {
473                                    // skip empty upstream chunk
474                                    if chunk.cardinality() == 0 {
475                                        continue;
476                                    }
477
478                                    // TODO(zw): re-enable
479                                    // let chunk_cdc_offset =
480                                    //     get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
481                                    // if *self.external_table.table_type()
482                                    //     == ExternalCdcTableType::Postgres
483                                    //     && let Some(cur) = actor_cdc_offset_low.as_ref()
484                                    //     && let Some(chunk_offset) = chunk_cdc_offset
485                                    //     && chunk_offset < *cur
486                                    // {
487                                    //     continue;
488                                    // }
489
490                                    let chunk = mapping_chunk(chunk, &self.output_indices);
491                                    if let Some(filtered_chunk) = filter_stream_chunk(
492                                        chunk,
493                                        &current_actor_bounds,
494                                        snapshot_split_column_index,
495                                    ) && filtered_chunk.cardinality() > 0
496                                    {
497                                        // Buffer the upstream chunk.
498                                        upstream_chunk_buffer.push(filtered_chunk.compact());
499                                    }
500                                }
501                                Message::Watermark(_) => {
502                                    // Ignore watermark during backfill, like the `CdcBackfillExecutor`.
503                                }
504                            }
505                        }
506                        // Snapshot read
507                        Either::Right(msg) => {
508                            match msg? {
509                                None => {
510                                    tracing::info!(
511                                        table_id,
512                                        split_id = split.split_id,
513                                        "snapshot read stream ends"
514                                    );
515                                    for chunk in upstream_chunk_buffer.drain(..) {
516                                        yield Message::Chunk(chunk);
517                                    }
518
519                                    split_cdc_offset_high = {
520                                        // Limit concurrent CDC connections globally to 10 using a semaphore.
521                                        static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
522                                            tokio::sync::Semaphore::const_new(10);
523
524                                        let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
525                                        upstream_table_reader.current_cdc_offset().await?
526                                    };
527                                    if let Some(ref cdc_offset) = split_cdc_offset_high {
528                                        if let Some(ref cur) = actor_cdc_offset_high {
529                                            if *cur < *cdc_offset {
530                                                actor_cdc_offset_high =
531                                                    split_cdc_offset_high.clone();
532                                            }
533                                        } else {
534                                            actor_cdc_offset_high = split_cdc_offset_high.clone();
535                                        }
536                                    }
537                                    // Next split.
538                                    break;
539                                }
540                                Some(chunk) => {
541                                    let chunk_cardinality = chunk.cardinality() as u64;
542                                    row_count = row_count.saturating_add(chunk_cardinality);
543                                    yield Message::Chunk(mapping_chunk(
544                                        chunk,
545                                        &self.output_indices,
546                                    ));
547                                }
548                            }
549                        }
550                    }
551                }
552                // Mark current split backfill as finished. The state will be persisted by next barrier.
553                state_impl
554                    .mutate_state(
555                        split.split_id,
556                        true,
557                        row_count,
558                        split_cdc_offset_low,
559                        split_cdc_offset_high,
560                    )
561                    .await?;
562                if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
563                    assert!(
564                        *right_split < split.split_id,
565                        "{} {}",
566                        *right_split,
567                        split.split_id
568                    );
569                    *right_split = split.split_id;
570                } else {
571                    should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
572                }
573            }
574
575            upstream_table_reader.disconnect().await?;
576            tracing::info!(
577                table_id,
578                upstream_table_name,
579                "CdcBackfill has already finished and will forward messages directly to the downstream"
580            );
581
582            let mut should_report_actor_backfill_done = false;
583            // After backfill progress finished
584            // we can forward messages directly to the downstream,
585            // as backfill is finished.
586            #[for_await]
587            for msg in &mut upstream {
588                let msg = msg?;
589                match msg {
590                    Message::Barrier(barrier) => {
591                        state_impl.commit_state(barrier.epoch).await?;
592                        if is_reset_barrier(&barrier, self.actor_ctx.id) {
593                            next_reset_barrier = Some(barrier);
594                            continue 'with_cdc_table_snapshot_splits;
595                        }
596                        if let Some(split_range) = should_report_actor_backfill_progress.take()
597                            && let Some(ref progress) = self.progress
598                        {
599                            progress.update(
600                                self.actor_ctx.fragment_id,
601                                self.actor_ctx.id,
602                                barrier.epoch,
603                                generation,
604                                split_range,
605                            );
606                        }
607                        if should_report_actor_backfill_done {
608                            should_report_actor_backfill_done = false;
609                            assert!(!actor_snapshot_splits.is_empty());
610                            if let Some(ref progress) = self.progress {
611                                progress.finish(
612                                    self.actor_ctx.fragment_id,
613                                    self.actor_ctx.id,
614                                    barrier.epoch,
615                                    generation,
616                                    (
617                                        actor_snapshot_splits[0].split_id,
618                                        actor_snapshot_splits[actor_snapshot_splits.len() - 1]
619                                            .split_id,
620                                    ),
621                                );
622                            }
623                        }
624                        yield Message::Barrier(barrier);
625                    }
626                    Message::Chunk(chunk) => {
627                        if actor_snapshot_splits.is_empty() {
628                            continue;
629                        }
630                        if chunk.cardinality() == 0 {
631                            continue;
632                        }
633
634                        let chunk_cdc_offset =
635                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
636                        // // TODO(zw): re-enable
637                        // if *self.external_table.table_type() == ExternalCdcTableType::Postgres
638                        //     && let Some(cur) = actor_cdc_offset_low.as_ref()
639                        //     && let Some(ref chunk_offset) = chunk_cdc_offset
640                        //     && *chunk_offset < *cur
641                        // {
642                        //     continue;
643                        // }
644
645                        // should_report_actor_backfill_done is set to true at most once.
646                        if let Some(high) = actor_cdc_offset_high.as_ref() {
647                            if state_impl.is_legacy_state() {
648                                // Since the legacy state does not track CDC offsets, report backfill completion immediately.
649                                actor_cdc_offset_high = None;
650                                should_report_actor_backfill_done = true;
651                            } else if let Some(ref chunk_offset) = chunk_cdc_offset
652                                && *chunk_offset >= *high
653                            {
654                                // Report backfill completion once the latest CDC offset exceeds the highest offset tracked during the backfill.
655                                actor_cdc_offset_high = None;
656                                should_report_actor_backfill_done = true;
657                            }
658                        }
659                        let chunk = mapping_chunk(chunk, &self.output_indices);
660                        if let Some(filtered_chunk) = filter_stream_chunk(
661                            chunk,
662                            &current_actor_bounds,
663                            snapshot_split_column_index,
664                        ) && filtered_chunk.cardinality() > 0
665                        {
666                            yield Message::Chunk(filtered_chunk);
667                        }
668                    }
669                    msg @ Message::Watermark(_) => {
670                        if let Some(msg) = mapping_message(msg, &self.output_indices) {
671                            yield msg;
672                        }
673                    }
674                }
675            }
676        }
677    }
678}
679
680fn filter_stream_chunk(
681    chunk: StreamChunk,
682    bound: &Option<(OwnedRow, OwnedRow)>,
683    snapshot_split_column_index: usize,
684) -> Option<StreamChunk> {
685    let Some((left, right)) = bound else {
686        return None;
687    };
688    assert_eq!(left.len(), 1, "multiple split columns is not supported yet");
689    assert_eq!(
690        right.len(),
691        1,
692        "multiple split columns is not supported yet"
693    );
694    let left_split_key = left.datum_at(0);
695    let right_split_key = right.datum_at(0);
696    let is_leftmost_bound = is_leftmost_bound(left);
697    let is_rightmost_bound = is_rightmost_bound(right);
698    if is_leftmost_bound && is_rightmost_bound {
699        return Some(chunk);
700    }
701    let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
702    let (ops, columns, visibility) = chunk.into_inner();
703    for (row_split_key, v) in columns[snapshot_split_column_index]
704        .iter()
705        .zip_eq_fast(visibility.iter())
706    {
707        if !v {
708            new_bitmap.append(false);
709            continue;
710        }
711        let mut is_in_range = true;
712        if !is_leftmost_bound {
713            is_in_range = cmp_datum(
714                row_split_key,
715                left_split_key,
716                OrderType::ascending_nulls_first(),
717            )
718            .is_ge();
719        }
720        if is_in_range && !is_rightmost_bound {
721            is_in_range = cmp_datum(
722                row_split_key,
723                right_split_key,
724                OrderType::ascending_nulls_first(),
725            )
726            .is_lt();
727        }
728        if !is_in_range {
729            tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
730        }
731        new_bitmap.append(is_in_range);
732    }
733    Some(StreamChunk::with_visibility(
734        ops,
735        columns,
736        new_bitmap.finish(),
737    ))
738}
739
740fn is_leftmost_bound(row: &OwnedRow) -> bool {
741    row.iter().all(|d| d.is_none())
742}
743
744fn is_rightmost_bound(row: &OwnedRow) -> bool {
745    row.iter().all(|d| d.is_none())
746}
747
748impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
749    fn execute(self: Box<Self>) -> BoxedMessageStream {
750        self.execute_inner().boxed()
751    }
752}
753
754fn extends_current_actor_bound(
755    current: &mut Option<(OwnedRow, OwnedRow)>,
756    split: &CdcTableSnapshotSplit,
757) {
758    if current.is_none() {
759        *current = Some((
760            split.left_bound_inclusive.clone(),
761            split.right_bound_exclusive.clone(),
762        ));
763    } else {
764        current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
765    }
766}
767
768fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
769    match barrier.mutation.as_deref() {
770        Some(Mutation::Update(update)) => update
771            .actor_cdc_table_snapshot_splits
772            .splits
773            .contains_key(&actor_id),
774        _ => false,
775    }
776}
777
778fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
779    for i in 1..actor_snapshot_splits.len() {
780        assert_eq!(
781            actor_snapshot_splits[i].split_id,
782            actor_snapshot_splits[i - 1].split_id + 1,
783            "{:?}",
784            actor_snapshot_splits
785        );
786        assert!(
787            cmp_datum(
788                actor_snapshot_splits[i - 1]
789                    .right_bound_exclusive
790                    .datum_at(0),
791                actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
792                OrderType::ascending_nulls_last(),
793            )
794            .is_lt()
795        );
796    }
797}
798
#[cfg(test)]
mod tests {
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use super::filter_stream_chunk;

    /// Builds a single-column `(left, right)` bound; `None` on a side yields a NULL datum.
    fn split_bound(left: Option<i64>, right: Option<i64>) -> Option<(OwnedRow, OwnedRow)> {
        let single = |key: Option<i64>| OwnedRow::new(vec![key.map(ScalarImpl::Int64)]);
        Some((single(left), single(right)))
    }

    #[test]
    fn test_filter_stream_chunk() {
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        // Filters a fresh copy of `chunk` on `column` and compacts whatever comes back.
        let filter = |bound: Option<(OwnedRow, OwnedRow)>, column: usize| {
            filter_stream_chunk(chunk.clone(), &bound, column).map(|kept| kept.compact())
        };

        // Split on column 0, which holds no NULL values.
        assert!(filter(None, 0).is_none());

        assert_eq!(filter(split_bound(None, None), 0).unwrap(), chunk);

        assert_eq!(
            filter(split_bound(None, Some(3)), 0).unwrap(),
            StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .",
            )
        );

        assert_eq!(
            filter(split_bound(Some(3), None), 0).unwrap(),
            StreamChunk::from_pretty(
                "  I I
            U- 3 7
            U+ 4 .",
            )
        );

        assert_eq!(
            filter(split_bound(Some(2), Some(4)), 0).unwrap(),
            StreamChunk::from_pretty(
                "  I I
             - 2 .
            U- 3 7",
            )
        );

        // Test NULL value: split on column 1, where two rows hold NULL.
        assert!(filter(None, 1).is_none());

        assert_eq!(filter(split_bound(None, None), 1).unwrap(), chunk);

        assert_eq!(
            filter(split_bound(None, Some(7)), 1).unwrap(),
            StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .
            U+ 4 .",
            )
        );

        assert_eq!(
            filter(split_bound(Some(7), None), 1).unwrap(),
            StreamChunk::from_pretty(
                "  I I
            U- 3 7",
            )
        );
    }
}