risingwave_stream/executor/backfill/cdc/
cdc_backill_v2.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field};
use risingwave_common::row::RowDeserializer;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{OrderType, cmp_datum};
use risingwave_connector::parser::{
    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use risingwave_pb::common::ThrottleType;
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::cdc::cdc_backfill::{
    build_reader_and_poll_upstream, transform_upstream,
};
use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::cdc_progress::CdcProgressReporter;
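
/// Backfills an external CDC table in parallel: the table's key space is divided into
/// snapshot splits, each actor sequentially backfills the consecutive splits assigned to
/// it, and upstream changelog events are forwarded only for the key range that has
/// already been covered.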
pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled
    external_table: ExternalStorageTable,

    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    upstream: Executor,

    /// The column indices that need to be forwarded to the downstream from the upstream and table scan.
    output_indices: Vec<usize>,

    /// The schema of the output chunk, including additional columns if any
    output_columns: Vec<ColumnDesc>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,

    state_table: StateTable<S>,

    properties: BTreeMap<String, String>,

    progress: Option<CdcProgressReporter>,
}

impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        _metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
        properties: BTreeMap<String, String>,
        progress: Option<CdcProgressReporter>,
    ) -> Self {
        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            rate_limit_rps,
            options,
            state_table,
            properties,
            progress,
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        assert!(!self.options.disable_backfill);
        // The indices of the primary key columns
        let pk_indices = self.external_table.pk_indices().to_vec();
        let table_id = self.external_table.table_id();
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();
        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();
        assert!(
            (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
            "split pk column index {} out of bounds",
            self.options.backfill_split_pk_column_index
        );
        let snapshot_split_column_index =
            pk_indices[self.options.backfill_split_pk_column_index as usize];
        let cdc_table_snapshot_split_column =
            vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];

        let mut upstream = self.upstream.execute();
        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        // Make sure to use `mapping_message` after `transform_upstream`.

        // If the user sets `debezium.time.precision.mode` to "connect", they guarantee that the
        // upstream data precision is milliseconds. In that case we don't guess the precision with
        // `GuessNumberUnit` mode but use `Milli` mode directly, which can also handle extreme timestamps.
        let timestamp_handling: Option<TimestampHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimestampHandling::Milli);
        let timestamptz_handling: Option<TimestamptzHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimestamptzHandling::Milli);
        let time_handling: Option<TimeHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimeHandling::Milli);
        let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
            .properties
            .get("debezium.bigint.unsigned.handling.mode")
            .map(|v| v == "precise")
            .unwrap_or(false)
            .then_some(BigintUnsignedHandlingMode::Precise);
        // Only the postgres-cdc connector may trigger TOAST.
        let handle_toast_columns: bool =
            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
        let mut upstream = transform_upstream(
            upstream,
            self.output_columns.clone(),
            timestamp_handling,
            timestamptz_handling,
            time_handling,
            bigint_unsigned_handling,
            handle_toast_columns,
        )
        .boxed();
        let mut next_reset_barrier = Some(first_barrier);
        let mut is_reset = false;
        let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
        // The buffered chunks have already been mapped.
        let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

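        // Each iteration of the loop below handles one snapshot-split assignment:
        // 1. read this actor's splits and generation from the reset barrier,
        // 2. backfill unfinished splits sequentially, buffering overlapping upstream changes
        //    until each split's snapshot read completes,
        // 3. forward upstream messages directly once all assigned splits are finished.
        // A new split assignment (e.g. on reschedule) restarts the loop from step 1.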
        // Needs a reset when the CDC table snapshot splits are rescheduled.
        'with_cdc_table_snapshot_splits: loop {
            assert!(upstream_chunk_buffer.is_empty());
            let reset_barrier = next_reset_barrier.take().unwrap();
            let all_snapshot_splits = match reset_barrier.mutation.as_deref() {
                Some(Mutation::Add(add)) => &add.actor_cdc_table_snapshot_splits.splits,
                Some(Mutation::Update(update)) => &update.actor_cdc_table_snapshot_splits.splits,
                _ => {
                    return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
                }
            };
            let mut actor_snapshot_splits = vec![];
            let mut generation = None;
            // TODO(zw): optimization: remove consumed splits to reduce barrier size for downstream.
            if let Some((splits, snapshot_generation)) = all_snapshot_splits.get(&self.actor_ctx.id)
            {
                actor_snapshot_splits = splits
                    .iter()
                    .map(|s: &CdcTableSnapshotSplitRaw| {
                        let de = RowDeserializer::new(
                            cdc_table_snapshot_split_column
                                .iter()
                                .map(Field::data_type)
                                .collect_vec(),
                        );
                        let left_bound_inclusive =
                            de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
                        let right_bound_exclusive =
                            de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
                        CdcTableSnapshotSplit {
                            split_id: s.split_id,
                            left_bound_inclusive,
                            right_bound_exclusive,
                        }
                    })
                    .collect();
                generation = Some(*snapshot_generation);
            }
            tracing::debug!(?actor_snapshot_splits, ?generation, "actor splits");
            assert_consecutive_splits(&actor_snapshot_splits);

            let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
            let barrier_epoch = reset_barrier.epoch;
            yield Message::Barrier(reset_barrier);
            if !is_reset {
                state_impl.init_epoch(barrier_epoch).await?;
                is_reset = true;
                tracing::info!(%table_id, "Initialize executor.");
            } else {
                tracing::info!(%table_id, "Reset executor.");
            }

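            // `actor_cdc_offset_low` / `actor_cdc_offset_high` track the lowest CDC offset observed
            // before and the highest CDC offset observed after the snapshot reads of this actor's
            // splits. The high watermark is later used to decide when the forwarded changelog has
            // caught up, so that backfill completion can be reported.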
            let mut current_actor_bounds = None;
            let mut actor_cdc_offset_high: Option<CdcOffset> = None;
            let mut actor_cdc_offset_low: Option<CdcOffset> = None;
            // Find the next split that needs backfill.
            let mut next_split_idx = actor_snapshot_splits.len();
            for (idx, split) in actor_snapshot_splits.iter().enumerate() {
                let state = state_impl.restore_state(split.split_id).await?;
                if !state.is_finished {
                    next_split_idx = idx;
                    break;
                }
                extends_current_actor_bound(&mut current_actor_bounds, split);
                if let Some(ref cdc_offset) = state.cdc_offset_low {
                    if let Some(ref cur) = actor_cdc_offset_low {
                        if *cur > *cdc_offset {
                            actor_cdc_offset_low = state.cdc_offset_low.clone();
                        }
                    } else {
                        actor_cdc_offset_low = state.cdc_offset_low.clone();
                    }
                }
                if let Some(ref cdc_offset) = state.cdc_offset_high {
                    if let Some(ref cur) = actor_cdc_offset_high {
                        if *cur < *cdc_offset {
                            actor_cdc_offset_high = state.cdc_offset_high.clone();
                        }
                    } else {
                        actor_cdc_offset_high = state.cdc_offset_high.clone();
                    }
                }
            }
            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
                // Initialize state so that overall progress can be measured.
                state_impl
                    .mutate_state(split.split_id, false, 0, None, None)
                    .await?;
            }
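            // The inclusive range of split ids whose completion has not been reported yet.
            // It is taken and reported to the `CdcProgressReporter` on the next barrier;
            // `None` means there is currently nothing to report.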
            let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
                Some((
                    actor_snapshot_splits[0].split_id,
                    actor_snapshot_splits[next_split_idx - 1].split_id,
                ))
            } else {
                None
            };

            // After initializing the state table and forwarding the initial barrier to the downstream,
            // try to create the table reader with retries.
            let mut table_reader: Option<ExternalTableReaderImpl> = None;
            let external_table = self.external_table.clone();
            let mut future = Box::pin(async move {
                let backoff = get_infinite_backoff_strategy();
                tokio_retry::Retry::spawn(backoff, || async {
                    match external_table.create_table_reader().await {
                        Ok(reader) => Ok(reader),
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                            Err(e)
                        }
                    }
                })
                    .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
                    .await
                    .expect("Retry create cdc table reader until success.")
            });
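            // While the reader is being created, keep polling the upstream so that barriers are
            // not blocked; forwarded chunks are filtered to the key range already covered by
            // previously finished splits.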
            loop {
                if let Some(msg) =
                    build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                        .await?
                {
                    if let Some(msg) = mapping_message(msg, &self.output_indices) {
                        match msg {
                            Message::Barrier(barrier) => {
                                state_impl.commit_state(barrier.epoch).await?;
                                if is_reset_barrier(&barrier, self.actor_ctx.id) {
                                    next_reset_barrier = Some(barrier);
                                    continue 'with_cdc_table_snapshot_splits;
                                }
                                yield Message::Barrier(barrier);
                            }
                            Message::Chunk(chunk) => {
                                if chunk.cardinality() == 0 {
                                    continue;
                                }
                                if let Some(filtered_chunk) = filter_stream_chunk(
                                    chunk,
                                    &current_actor_bounds,
                                    snapshot_split_column_index,
                                ) && filtered_chunk.cardinality() > 0
                                {
                                    yield Message::Chunk(filtered_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermark, like the `CdcBackfillExecutor`.
                            }
                        }
                    }
                } else {
                    assert!(table_reader.is_some(), "table reader must be created");
                    tracing::info!(
                        %table_id,
                        upstream_table_name,
                        "table reader created successfully"
                    );
                    break;
                }
            }
            let upstream_table_reader = UpstreamTableReader::new(
                self.external_table.clone(),
                table_reader.expect("table reader must be created"),
            );
            // let mut upstream = upstream.peekable();
            let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();

            // Backfill snapshot splits sequentially.
            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
                tracing::info!(
                    %table_id,
                    upstream_table_name,
                    ?split,
                    is_snapshot_paused,
                    "start cdc backfill split"
                );
                extends_current_actor_bound(&mut current_actor_bounds, split);

                let split_cdc_offset_low = {
                    // Limit concurrent CDC connections globally to 10 using a semaphore.
                    static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                        tokio::sync::Semaphore::const_new(10);

                    let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
                    upstream_table_reader.current_cdc_offset().await?
                };
                if let Some(ref cdc_offset) = split_cdc_offset_low {
                    if let Some(ref cur) = actor_cdc_offset_low {
                        if *cur > *cdc_offset {
                            actor_cdc_offset_low = split_cdc_offset_low.clone();
                        }
                    } else {
                        actor_cdc_offset_low = split_cdc_offset_low.clone();
                    }
                }
                let mut split_cdc_offset_high = None;

                let left_upstream = upstream.by_ref().map(Either::Left);
                let read_args = SplitSnapshotReadArgs::new(
                    split.left_bound_inclusive.clone(),
                    split.right_bound_exclusive.clone(),
                    cdc_table_snapshot_split_column.clone(),
                    self.rate_limit_rps,
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_table_split(read_args)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }
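                // Merge the upstream changelog with the bounded snapshot read. The upstream side
                // is always polled first so that barriers are never delayed behind snapshot chunks.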
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });
                let mut row_count: u64 = 0;
                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    state_impl.commit_state(barrier.epoch).await?;
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                // TODO(zw): optimization: improve throttle.
                                                // 1. Handle rate limit 0. Currently, to resume the process, the actor must be rebuilt.
                                                // 2. Apply new rate limit immediately.
                                                if let Some(entry) =
                                                    some.get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type()
                                                        == ThrottleType::Backfill
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    // The new rate limit takes effect from the next split onwards.
                                                    self.rate_limit_rps = entry.rate_limit;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    tracing::info!(
                                                        %table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    for chunk in upstream_chunk_buffer.drain(..) {
                                                        yield Message::Chunk(chunk);
                                                    }
                                                    yield Message::Barrier(barrier);
                                                    let () = futures::future::pending().await;
                                                    unreachable!();
                                                }
                                            }
                                            _ => (),
                                        }
                                    }
                                    if is_reset_barrier(&barrier, self.actor_ctx.id) {
                                        next_reset_barrier = Some(barrier);
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            yield Message::Chunk(chunk);
                                        }
                                        continue 'with_cdc_table_snapshot_splits;
                                    }
                                    if let Some(split_range) =
                                        should_report_actor_backfill_progress.take()
                                        && let Some(ref progress) = self.progress
                                    {
                                        progress.update(
                                            self.actor_ctx.fragment_id,
                                            self.actor_ctx.id,
                                            barrier.epoch,
                                            generation.expect("should have set generation when having progress to report"),
                                            split_range,
                                        );
                                    }
                                    // Emit the barrier and continue to consume the backfill stream.
                                    yield Message::Barrier(barrier);
                                }
                                Message::Chunk(chunk) => {
                                    // Skip empty upstream chunks.
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    // TODO(zw): re-enable
                                    // let chunk_cdc_offset =
                                    //     get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
                                    // if *self.external_table.table_type()
                                    //     == ExternalCdcTableType::Postgres
                                    //     && let Some(cur) = actor_cdc_offset_low.as_ref()
                                    //     && let Some(chunk_offset) = chunk_cdc_offset
                                    //     && chunk_offset < *cur
                                    // {
                                    //     continue;
                                    // }

                                    let chunk = mapping_chunk(chunk, &self.output_indices);
                                    if let Some(filtered_chunk) = filter_stream_chunk(
                                        chunk,
                                        &current_actor_bounds,
                                        snapshot_split_column_index,
                                    ) && filtered_chunk.cardinality() > 0
                                    {
                                        // Buffer the upstream chunk.
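                                        // The buffer is flushed to the downstream only after this
                                        // split's snapshot read completes (or on reset/drop), so
                                        // snapshot rows are emitted before the overlapping changelog.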
                                        upstream_chunk_buffer.push(filtered_chunk.compact_vis());
                                    }
                                }
                                Message::Watermark(_) => {
                                    // Ignore watermark during backfill, like the `CdcBackfillExecutor`.
                                }
                            }
                        }
                        // Snapshot read
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        %table_id,
                                        split_id = split.split_id,
                                        "snapshot read stream ends"
                                    );
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(chunk);
                                    }

                                    split_cdc_offset_high = {
                                        // Limit concurrent CDC connections globally to 10 using a semaphore.
                                        static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                                            tokio::sync::Semaphore::const_new(10);

                                        let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
                                        upstream_table_reader.current_cdc_offset().await?
                                    };
                                    if let Some(ref cdc_offset) = split_cdc_offset_high {
                                        if let Some(ref cur) = actor_cdc_offset_high {
                                            if *cur < *cdc_offset {
                                                actor_cdc_offset_high =
                                                    split_cdc_offset_high.clone();
                                            }
                                        } else {
                                            actor_cdc_offset_high = split_cdc_offset_high.clone();
                                        }
                                    }
                                    // Next split.
                                    break;
                                }
                                Some(chunk) => {
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    row_count = row_count.saturating_add(chunk_cardinality);
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }
                // Mark the current split's backfill as finished. The state will be persisted by the next barrier.
                state_impl
                    .mutate_state(
                        split.split_id,
                        true,
                        row_count,
                        split_cdc_offset_low,
                        split_cdc_offset_high,
                    )
                    .await?;
                if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
                    assert!(
                        *right_split < split.split_id,
                        "{} {}",
                        *right_split,
                        split.split_id
                    );
                    *right_split = split.split_id;
                } else {
                    should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
                }
            }

            upstream_table_reader.disconnect().await?;
            tracing::info!(
                %table_id,
                upstream_table_name,
                "CdcBackfill has already finished and will forward messages directly to the downstream"
            );

            let mut should_report_actor_backfill_done = false;
            // After the backfill has finished, forward upstream messages directly to the downstream.
            #[for_await]
            for msg in &mut upstream {
                let msg = msg?;
                match msg {
                    Message::Barrier(barrier) => {
                        state_impl.commit_state(barrier.epoch).await?;
                        if is_reset_barrier(&barrier, self.actor_ctx.id) {
                            next_reset_barrier = Some(barrier);
                            continue 'with_cdc_table_snapshot_splits;
                        }
                        if let Some(split_range) = should_report_actor_backfill_progress.take()
                            && let Some(ref progress) = self.progress
                        {
                            progress.update(
                                self.actor_ctx.fragment_id,
                                self.actor_ctx.id,
                                barrier.epoch,
                                generation.expect(
                                    "should have set generation when having progress to report",
                                ),
                                split_range,
                            );
                        }
                        if should_report_actor_backfill_done {
                            should_report_actor_backfill_done = false;
                            assert!(!actor_snapshot_splits.is_empty());
                            if let Some(ref progress) = self.progress {
                                progress.finish(
                                    self.actor_ctx.fragment_id,
                                    self.actor_ctx.id,
                                    barrier.epoch,
                                    generation.expect(
                                        "should have set generation when having progress to report",
                                    ),
                                    (
                                        actor_snapshot_splits[0].split_id,
                                        actor_snapshot_splits[actor_snapshot_splits.len() - 1]
                                            .split_id,
                                    ),
                                );
                            }
                        }
                        yield Message::Barrier(barrier);
                    }
                    Message::Chunk(chunk) => {
                        if actor_snapshot_splits.is_empty() {
                            continue;
                        }
                        if chunk.cardinality() == 0 {
                            continue;
                        }

                        let chunk_cdc_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
                        // // TODO(zw): re-enable
                        // if *self.external_table.table_type() == ExternalCdcTableType::Postgres
                        //     && let Some(cur) = actor_cdc_offset_low.as_ref()
                        //     && let Some(ref chunk_offset) = chunk_cdc_offset
                        //     && *chunk_offset < *cur
                        // {
                        //     continue;
                        // }

                        // `should_report_actor_backfill_done` is set to true at most once.
                        if let Some(high) = actor_cdc_offset_high.as_ref() {
                            if state_impl.is_legacy_state() {
                                // Since the legacy state does not track CDC offsets, report backfill completion immediately.
                                actor_cdc_offset_high = None;
                                should_report_actor_backfill_done = true;
                            } else if let Some(ref chunk_offset) = chunk_cdc_offset
                                && *chunk_offset >= *high
                            {
                                // Report backfill completion once the latest CDC offset reaches or exceeds the highest offset tracked during the backfill.
                                actor_cdc_offset_high = None;
                                should_report_actor_backfill_done = true;
                            }
                        }
                        let chunk = mapping_chunk(chunk, &self.output_indices);
                        if let Some(filtered_chunk) = filter_stream_chunk(
                            chunk,
                            &current_actor_bounds,
                            snapshot_split_column_index,
                        ) && filtered_chunk.cardinality() > 0
                        {
                            yield Message::Chunk(filtered_chunk);
                        }
                    }
                    msg @ Message::Watermark(_) => {
                        if let Some(msg) = mapping_message(msg, &self.output_indices) {
                            yield msg;
                        }
                    }
                }
            }
        }
    }
}

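/// Restricts `chunk` to the rows whose split-key column falls into the half-open range
/// `[left, right)` of `bound`, by rewriting the visibility bitmap. Returns `None` if no
/// bound has been established yet. An all-`NULL` bound row marks the corresponding side
/// as unbounded.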
fn filter_stream_chunk(
    chunk: StreamChunk,
    bound: &Option<(OwnedRow, OwnedRow)>,
    snapshot_split_column_index: usize,
) -> Option<StreamChunk> {
    let Some((left, right)) = bound else {
        return None;
    };
    assert_eq!(left.len(), 1, "multiple split columns are not supported yet");
    assert_eq!(
        right.len(),
        1,
        "multiple split columns are not supported yet"
    );
    let left_split_key = left.datum_at(0);
    let right_split_key = right.datum_at(0);
    let is_leftmost_bound = is_leftmost_bound(left);
    let is_rightmost_bound = is_rightmost_bound(right);
    if is_leftmost_bound && is_rightmost_bound {
        return Some(chunk);
    }
    let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
    let (ops, columns, visibility) = chunk.into_inner();
    for (row_split_key, v) in columns[snapshot_split_column_index]
        .iter()
        .zip_eq_fast(visibility.iter())
    {
        if !v {
            new_bitmap.append(false);
            continue;
        }
        let mut is_in_range = true;
        if !is_leftmost_bound {
            is_in_range = cmp_datum(
                row_split_key,
                left_split_key,
                OrderType::ascending_nulls_first(),
            )
            .is_ge();
        }
        if is_in_range && !is_rightmost_bound {
            is_in_range = cmp_datum(
                row_split_key,
                right_split_key,
                OrderType::ascending_nulls_first(),
            )
            .is_lt();
        }
        if !is_in_range {
            tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
        }
        new_bitmap.append(is_in_range);
    }
    Some(StreamChunk::with_visibility(
        ops,
        columns,
        new_bitmap.finish(),
    ))
}

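/// An all-`NULL` bound row encodes an unbounded (open) side of the split range.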
fn is_leftmost_bound(row: &OwnedRow) -> bool {
    row.iter().all(|d| d.is_none())
}

fn is_rightmost_bound(row: &OwnedRow) -> bool {
    row.iter().all(|d| d.is_none())
}

impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

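/// Extends the actor's covered key range to include `split`. Splits are consumed in
/// ascending order (see `assert_consecutive_splits`), so only the right bound advances.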
fn extends_current_actor_bound(
    current: &mut Option<(OwnedRow, OwnedRow)>,
    split: &CdcTableSnapshotSplit,
) {
    if current.is_none() {
        *current = Some((
            split.left_bound_inclusive.clone(),
            split.right_bound_exclusive.clone(),
        ));
    } else {
        current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
    }
}

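/// A barrier resets this actor's backfill if its `Update` mutation carries a new
/// snapshot-split assignment for the actor; the main loop then restarts with the new splits.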
fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
    match barrier.mutation.as_deref() {
        Some(Mutation::Update(update)) => update
            .actor_cdc_table_snapshot_splits
            .splits
            .contains_key(&actor_id),
        _ => false,
    }
}

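/// Asserts that the splits assigned to an actor have consecutive split ids and strictly
/// increasing right bounds.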
fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
    for i in 1..actor_snapshot_splits.len() {
        assert_eq!(
            actor_snapshot_splits[i].split_id,
            actor_snapshot_splits[i - 1].split_id + 1,
            "{:?}",
            actor_snapshot_splits
        );
        assert!(
            cmp_datum(
                actor_snapshot_splits[i - 1]
                    .right_bound_exclusive
                    .datum_at(0),
                actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
                OrderType::ascending_nulls_last(),
            )
            .is_lt()
        );
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use crate::executor::backfill::cdc::cdc_backill_v2::filter_stream_chunk;

    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
            U- 3 7
            U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
             - 2 .
            U- 3 7",
            )
        );

        // Test NULL value.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .
            U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk, &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
            U- 3 7",
            )
        );
    }
}