risingwave_stream/executor/backfill/cdc/cdc_backill_v2.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field};
use risingwave_common::row::RowDeserializer;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{OrderType, cmp_datum};
use risingwave_connector::parser::{
    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{
    CdcOffset, ExternalCdcTableType, ExternalTableReaderImpl,
};
use risingwave_connector::source::{CdcTableSnapshotSplit, CdcTableSnapshotSplitRaw};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::UpdateMutation;
use crate::executor::backfill::cdc::cdc_backfill::{
    build_reader_and_poll_upstream, transform_upstream,
};
use crate::executor::backfill::cdc::state_v2::ParallelizedCdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
    SplitSnapshotReadArgs, UpstreamTableRead, UpstreamTableReader,
};
use crate::executor::backfill::utils::{get_cdc_chunk_last_offset, mapping_chunk, mapping_message};
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::task::cdc_progress::CdcProgressReporter;
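
/// Executor that backfills a CDC table in parallel across key-range snapshot splits.
/// Each actor receives its split assignment via barrier mutations (`Add`/`Update`),
/// backfills the assigned splits sequentially, and merges the upstream changelog
/// stream while doing so.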
pub struct ParallelizedCdcBackfillExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// The external table to be backfilled
    external_table: ExternalStorageTable,

    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    upstream: Executor,

    /// The column indices that need to be forwarded to the downstream, from both the upstream and the table scan.
    output_indices: Vec<usize>,

    /// The schema of the output chunk, including additional columns if any
    output_columns: Vec<ColumnDesc>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    options: CdcScanOptions,

    state_table: StateTable<S>,

    properties: BTreeMap<String, String>,

    progress: Option<CdcProgressReporter>,
}

impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        external_table: ExternalStorageTable,
        upstream: Executor,
        output_indices: Vec<usize>,
        output_columns: Vec<ColumnDesc>,
        _metrics: Arc<StreamingMetrics>,
        state_table: StateTable<S>,
        rate_limit_rps: Option<u32>,
        options: CdcScanOptions,
        properties: BTreeMap<String, String>,
        progress: Option<CdcProgressReporter>,
    ) -> Self {
        Self {
            actor_ctx,
            external_table,
            upstream,
            output_indices,
            output_columns,
            rate_limit_rps,
            options,
            state_table,
            properties,
            progress,
        }
    }

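    /// Main backfill loop. On every (re)assignment of snapshot splits, the executor
    /// restores per-split state, creates the external table reader with retries,
    /// backfills each unfinished split by merging the split's snapshot read with the
    /// upstream changelog, and finally forwards the upstream stream directly,
    /// reporting progress and completion once the observed CDC offset catches up
    /// with the high watermark recorded during backfill.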
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        assert!(!self.options.disable_backfill);
        // The indices of the primary key columns.
        let pk_indices = self.external_table.pk_indices().to_vec();
        let table_id = self.external_table.table_id();
        let upstream_table_name = self.external_table.qualified_table_name();
        let schema_table_name = self.external_table.schema_table_name().clone();
        let external_database_name = self.external_table.database_name().to_owned();
        let additional_columns = self
            .output_columns
            .iter()
            .filter(|col| col.additional_column.column_type.is_some())
            .cloned()
            .collect_vec();
        assert!(
            (self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
            "split pk column index {} out of bounds",
            self.options.backfill_split_pk_column_index
        );
        let snapshot_split_column_index =
            pk_indices[self.options.backfill_split_pk_column_index as usize];
        let cdc_table_snapshot_split_column =
            vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];

        let mut upstream = self.upstream.execute();
        // Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        // Make sure to use mapping_message after transform_upstream.

        // If user sets debezium.time.precision.mode to "connect", it means the user can guarantee
        // that the upstream data precision is MilliSecond. In this case, we don't use GuessNumberUnit
        // mode to guess precision, but use Milli mode directly, which can handle extreme timestamps.
        let timestamp_handling: Option<TimestampHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimestampHandling::Milli);
        let timestamptz_handling: Option<TimestamptzHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimestamptzHandling::Milli);
        let time_handling: Option<TimeHandling> = self
            .properties
            .get("debezium.time.precision.mode")
            .map(|v| v == "connect")
            .unwrap_or(false)
            .then_some(TimeHandling::Milli);
        let bigint_unsigned_handling: Option<BigintUnsignedHandlingMode> = self
            .properties
            .get("debezium.bigint.unsigned.handling.mode")
            .map(|v| v == "precise")
            .unwrap_or(false)
            .then_some(BigintUnsignedHandlingMode::Precise);
        // Only postgres-cdc connector may trigger TOAST.
        let handle_toast_columns: bool =
            self.external_table.table_type() == &ExternalCdcTableType::Postgres;
        let mut upstream = transform_upstream(
            upstream,
            self.output_columns.clone(),
            timestamp_handling,
            timestamptz_handling,
            time_handling,
            bigint_unsigned_handling,
            handle_toast_columns,
        )
        .boxed();
        let mut next_reset_barrier = Some(first_barrier);
        let mut is_reset = false;
        let mut state_impl = ParallelizedCdcBackfillState::new(self.state_table);
        // The buffered chunks have already been mapped.
        let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

        // A reset is needed whenever CDC table snapshot splits are rescheduled.
        'with_cdc_table_snapshot_splits: loop {
            assert!(upstream_chunk_buffer.is_empty());
            let reset_barrier = next_reset_barrier.take().unwrap();
            let all_snapshot_splits = match reset_barrier.mutation.as_deref() {
                Some(Mutation::Add(add)) => &add.actor_cdc_table_snapshot_splits.splits,

                Some(Mutation::Update(update)) => &update.actor_cdc_table_snapshot_splits.splits,
                _ => {
                    return Err(anyhow::anyhow!("ParallelizedCdcBackfillExecutor expects either Mutation::Add or Mutation::Update to initialize CDC table snapshot splits.").into());
                }
            };
            let mut actor_snapshot_splits = vec![];
            let mut generation = None;
            // TODO(zw): optimization: remove consumed splits to reduce barrier size for downstream.
            if let Some((splits, snapshot_generation)) = all_snapshot_splits.get(&self.actor_ctx.id)
            {
                actor_snapshot_splits = splits
                    .iter()
                    .map(|s: &CdcTableSnapshotSplitRaw| {
                        let de = RowDeserializer::new(
                            cdc_table_snapshot_split_column
                                .iter()
                                .map(Field::data_type)
                                .collect_vec(),
                        );
                        let left_bound_inclusive =
                            de.deserialize(s.left_bound_inclusive.as_ref()).unwrap();
                        let right_bound_exclusive =
                            de.deserialize(s.right_bound_exclusive.as_ref()).unwrap();
                        CdcTableSnapshotSplit {
                            split_id: s.split_id,
                            left_bound_inclusive,
                            right_bound_exclusive,
                        }
                    })
                    .collect();
                generation = Some(*snapshot_generation);
            }
            tracing::debug!(?actor_snapshot_splits, ?generation, "actor splits");
            assert_consecutive_splits(&actor_snapshot_splits);

            let mut is_snapshot_paused = reset_barrier.is_pause_on_startup();
            let barrier_epoch = reset_barrier.epoch;
            yield Message::Barrier(reset_barrier);
            if !is_reset {
                state_impl.init_epoch(barrier_epoch).await?;
                is_reset = true;
                tracing::info!(%table_id, "Initialize executor.");
            } else {
                tracing::info!(%table_id, "Reset executor.");
            }

            let mut current_actor_bounds = None;
            let mut actor_cdc_offset_high: Option<CdcOffset> = None;
            let mut actor_cdc_offset_low: Option<CdcOffset> = None;
            // Find the next split that needs backfill.
            let mut next_split_idx = actor_snapshot_splits.len();
            for (idx, split) in actor_snapshot_splits.iter().enumerate() {
                let state = state_impl.restore_state(split.split_id).await?;
                if !state.is_finished {
                    next_split_idx = idx;
                    break;
                }
                extends_current_actor_bound(&mut current_actor_bounds, split);
                if let Some(ref cdc_offset) = state.cdc_offset_low {
                    if let Some(ref cur) = actor_cdc_offset_low {
                        if *cur > *cdc_offset {
                            actor_cdc_offset_low = state.cdc_offset_low.clone();
                        }
                    } else {
                        actor_cdc_offset_low = state.cdc_offset_low.clone();
                    }
                }
                if let Some(ref cdc_offset) = state.cdc_offset_high {
                    if let Some(ref cur) = actor_cdc_offset_high {
                        if *cur < *cdc_offset {
                            actor_cdc_offset_high = state.cdc_offset_high.clone();
                        }
                    } else {
                        actor_cdc_offset_high = state.cdc_offset_high.clone();
                    }
                }
            }
            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
                // Initialize state so that overall progress can be measured.
                state_impl
                    .mutate_state(split.split_id, false, 0, None, None)
                    .await?;
            }
            let mut should_report_actor_backfill_progress = if next_split_idx > 0 {
                Some((
                    actor_snapshot_splits[0].split_id,
                    actor_snapshot_splits[next_split_idx - 1].split_id,
                ))
            } else {
                None
            };

            // After initializing the state table and forwarding the initial barrier to the
            // downstream, try to create the table reader with retries.
            let mut table_reader: Option<ExternalTableReaderImpl> = None;
            let external_table = self.external_table.clone();
            let mut future = Box::pin(async move {
                let backoff = get_infinite_backoff_strategy();
                tokio_retry::Retry::spawn(backoff, || async {
                    match external_table.create_table_reader().await {
                        Ok(reader) => Ok(reader),
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
                            Err(e)
                        }
                    }
                })
                    .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
                    .await
                    .expect("Retry create cdc table reader until success.")
            });
            loop {
                if let Some(msg) =
                    build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                        .await?
                {
                    if let Some(msg) = mapping_message(msg, &self.output_indices) {
                        match msg {
                            Message::Barrier(barrier) => {
                                state_impl.commit_state(barrier.epoch).await?;
                                if is_reset_barrier(&barrier, self.actor_ctx.id) {
                                    next_reset_barrier = Some(barrier);
                                    continue 'with_cdc_table_snapshot_splits;
                                }
                                yield Message::Barrier(barrier);
                            }
                            Message::Chunk(chunk) => {
                                if chunk.cardinality() == 0 {
                                    continue;
                                }
                                if let Some(filtered_chunk) = filter_stream_chunk(
                                    chunk,
                                    &current_actor_bounds,
                                    snapshot_split_column_index,
                                ) && filtered_chunk.cardinality() > 0
                                {
                                    yield Message::Chunk(filtered_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermark, like the `CdcBackfillExecutor`.
                            }
                        }
                    }
                } else {
                    assert!(table_reader.is_some(), "table reader must be created");
                    tracing::info!(
                        %table_id,
                        upstream_table_name,
                        "table reader created successfully"
                    );
                    break;
                }
            }
            let upstream_table_reader = UpstreamTableReader::new(
                self.external_table.clone(),
                table_reader.expect("table reader must be created"),
            );
            // let mut upstream = upstream.peekable();
            let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();

            // Backfill snapshot splits sequentially.
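            // For each split: merge the upstream changelog with the split's snapshot read,
            // buffer upstream chunks that fall into the bounds claimed so far, and flush
            // the buffer once the split's snapshot stream ends.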
            for split in actor_snapshot_splits.iter().skip(next_split_idx) {
                tracing::info!(
                    %table_id,
                    upstream_table_name,
                    ?split,
                    is_snapshot_paused,
                    "start cdc backfill split"
                );
                extends_current_actor_bound(&mut current_actor_bounds, split);

                let split_cdc_offset_low = {
                    // Limit concurrent CDC connections globally to 10 using a semaphore.
                    static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                        tokio::sync::Semaphore::const_new(10);

                    let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
                    upstream_table_reader.current_cdc_offset().await?
                };
                if let Some(ref cdc_offset) = split_cdc_offset_low {
                    if let Some(ref cur) = actor_cdc_offset_low {
                        if *cur > *cdc_offset {
                            actor_cdc_offset_low = split_cdc_offset_low.clone();
                        }
                    } else {
                        actor_cdc_offset_low = split_cdc_offset_low.clone();
                    }
                }
                let mut split_cdc_offset_high = None;

                let left_upstream = upstream.by_ref().map(Either::Left);
                let read_args = SplitSnapshotReadArgs::new(
                    split.left_bound_inclusive.clone(),
                    split.right_bound_exclusive.clone(),
                    cdc_table_snapshot_split_column.clone(),
                    self.rate_limit_rps,
                    additional_columns.clone(),
                    schema_table_name.clone(),
                    external_database_name.clone(),
                );
                let right_snapshot = pin!(
                    upstream_table_reader
                        .snapshot_read_table_split(read_args)
                        .map(Either::Right)
                );
                let (right_snapshot, snapshot_valve) = pausable(right_snapshot);
                if is_snapshot_paused {
                    snapshot_valve.pause();
                }
                let mut backfill_stream =
                    select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                        stream::PollNext::Left
                    });
                let mut row_count: u64 = 0;
                #[for_await]
                for either in &mut backfill_stream {
                    match either {
                        // Upstream
                        Either::Left(msg) => {
                            match msg? {
                                Message::Barrier(barrier) => {
                                    state_impl.commit_state(barrier.epoch).await?;
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        use crate::executor::Mutation;
                                        match mutation {
                                            Mutation::Pause => {
                                                is_snapshot_paused = true;
                                                snapshot_valve.pause();
                                            }
                                            Mutation::Resume => {
                                                is_snapshot_paused = false;
                                                snapshot_valve.resume();
                                            }
                                            Mutation::Throttle(some) => {
                                                // TODO(zw): optimization: improve throttle.
                                                // 1. Handle rate limit 0. Currently, to resume the process, the actor must be rebuilt.
                                                // 2. Apply new rate limit immediately.
                                                if let Some(new_rate_limit) =
                                                    some.get(&self.actor_ctx.fragment_id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    // The new rate limit will take effect from the next split.
                                                    self.rate_limit_rps = *new_rate_limit;
                                                }
                                            }
                                            Mutation::Update(UpdateMutation {
                                                dropped_actors,
                                                ..
                                            }) => {
                                                if dropped_actors.contains(&self.actor_ctx.id) {
                                                    tracing::info!(
                                                        %table_id,
                                                        upstream_table_name,
                                                        "CdcBackfill has been dropped due to config change"
                                                    );
                                                    for chunk in upstream_chunk_buffer.drain(..) {
                                                        yield Message::Chunk(chunk);
                                                    }
                                                    yield Message::Barrier(barrier);
                                                    let () = futures::future::pending().await;
                                                    unreachable!();
                                                }
                                            }
                                            _ => (),
                                        }
                                    }
                                    if is_reset_barrier(&barrier, self.actor_ctx.id) {
                                        next_reset_barrier = Some(barrier);
                                        for chunk in upstream_chunk_buffer.drain(..) {
                                            yield Message::Chunk(chunk);
                                        }
                                        continue 'with_cdc_table_snapshot_splits;
                                    }
                                    if let Some(split_range) =
                                        should_report_actor_backfill_progress.take()
                                        && let Some(ref progress) = self.progress
                                    {
                                        progress.update(
                                            self.actor_ctx.fragment_id,
                                            self.actor_ctx.id,
                                            barrier.epoch,
                                            generation.expect("should have set generation when having progress to report"),
                                            split_range,
                                        );
                                    }
                                    // emit barrier and continue to consume the backfill stream
                                    yield Message::Barrier(barrier);
                                }
                                Message::Chunk(chunk) => {
                                    // skip empty upstream chunk
                                    if chunk.cardinality() == 0 {
                                        continue;
                                    }

                                    // TODO(zw): re-enable
                                    // let chunk_cdc_offset =
                                    //     get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
                                    // if *self.external_table.table_type()
                                    //     == ExternalCdcTableType::Postgres
                                    //     && let Some(cur) = actor_cdc_offset_low.as_ref()
                                    //     && let Some(chunk_offset) = chunk_cdc_offset
                                    //     && chunk_offset < *cur
                                    // {
                                    //     continue;
                                    // }

                                    let chunk = mapping_chunk(chunk, &self.output_indices);
                                    if let Some(filtered_chunk) = filter_stream_chunk(
                                        chunk,
                                        &current_actor_bounds,
                                        snapshot_split_column_index,
                                    ) && filtered_chunk.cardinality() > 0
                                    {
                                        // Buffer the upstream chunk.
                                        upstream_chunk_buffer.push(filtered_chunk.compact_vis());
                                    }
                                }
                                Message::Watermark(_) => {
                                    // Ignore watermark during backfill, like the `CdcBackfillExecutor`.
                                }
                            }
                        }
                        // Snapshot read
                        Either::Right(msg) => {
                            match msg? {
                                None => {
                                    tracing::info!(
                                        %table_id,
                                        split_id = split.split_id,
                                        "snapshot read stream ends"
                                    );
                                    for chunk in upstream_chunk_buffer.drain(..) {
                                        yield Message::Chunk(chunk);
                                    }

                                    split_cdc_offset_high = {
                                        // Limit concurrent CDC connections globally to 10 using a semaphore.
                                        static CDC_CONN_SEMAPHORE: tokio::sync::Semaphore =
                                            tokio::sync::Semaphore::const_new(10);

                                        let _permit = CDC_CONN_SEMAPHORE.acquire().await.unwrap();
                                        upstream_table_reader.current_cdc_offset().await?
                                    };
                                    if let Some(ref cdc_offset) = split_cdc_offset_high {
                                        if let Some(ref cur) = actor_cdc_offset_high {
                                            if *cur < *cdc_offset {
                                                actor_cdc_offset_high =
                                                    split_cdc_offset_high.clone();
                                            }
                                        } else {
                                            actor_cdc_offset_high = split_cdc_offset_high.clone();
                                        }
                                    }
                                    // Next split.
                                    break;
                                }
                                Some(chunk) => {
                                    let chunk_cardinality = chunk.cardinality() as u64;
                                    row_count = row_count.saturating_add(chunk_cardinality);
                                    yield Message::Chunk(mapping_chunk(
                                        chunk,
                                        &self.output_indices,
                                    ));
                                }
                            }
                        }
                    }
                }
                // Mark the current split's backfill as finished. The state will be persisted by the next barrier.
                state_impl
                    .mutate_state(
                        split.split_id,
                        true,
                        row_count,
                        split_cdc_offset_low,
                        split_cdc_offset_high,
                    )
                    .await?;
                if let Some((_, right_split)) = &mut should_report_actor_backfill_progress {
                    assert!(
                        *right_split < split.split_id,
                        "{} {}",
                        *right_split,
                        split.split_id
                    );
                    *right_split = split.split_id;
                } else {
                    should_report_actor_backfill_progress = Some((split.split_id, split.split_id));
                }
            }

            upstream_table_reader.disconnect().await?;
            tracing::info!(
                %table_id,
                upstream_table_name,
                "CdcBackfill has already finished and will forward messages directly to the downstream"
            );

            let mut should_report_actor_backfill_done = false;
            // All assigned splits have been backfilled, so upstream messages can be
            // forwarded directly to the downstream from here on.
            #[for_await]
            for msg in &mut upstream {
                let msg = msg?;
                match msg {
                    Message::Barrier(barrier) => {
                        state_impl.commit_state(barrier.epoch).await?;
                        if is_reset_barrier(&barrier, self.actor_ctx.id) {
                            next_reset_barrier = Some(barrier);
                            continue 'with_cdc_table_snapshot_splits;
                        }
                        if let Some(split_range) = should_report_actor_backfill_progress.take()
                            && let Some(ref progress) = self.progress
                        {
                            progress.update(
                                self.actor_ctx.fragment_id,
                                self.actor_ctx.id,
                                barrier.epoch,
                                generation.expect(
                                    "should have set generation when having progress to report",
                                ),
                                split_range,
                            );
                        }
                        if should_report_actor_backfill_done {
                            should_report_actor_backfill_done = false;
                            assert!(!actor_snapshot_splits.is_empty());
                            if let Some(ref progress) = self.progress {
                                progress.finish(
                                    self.actor_ctx.fragment_id,
                                    self.actor_ctx.id,
                                    barrier.epoch,
                                    generation.expect(
                                        "should have set generation when having progress to report",
                                    ),
                                    (
                                        actor_snapshot_splits[0].split_id,
                                        actor_snapshot_splits[actor_snapshot_splits.len() - 1]
                                            .split_id,
                                    ),
                                );
                            }
                        }
                        yield Message::Barrier(barrier);
                    }
                    Message::Chunk(chunk) => {
                        if actor_snapshot_splits.is_empty() {
                            continue;
                        }
                        if chunk.cardinality() == 0 {
                            continue;
                        }

                        let chunk_cdc_offset =
                            get_cdc_chunk_last_offset(&offset_parse_func, &chunk)?;
                        // // TODO(zw): re-enable
                        // if *self.external_table.table_type() == ExternalCdcTableType::Postgres
                        //     && let Some(cur) = actor_cdc_offset_low.as_ref()
                        //     && let Some(ref chunk_offset) = chunk_cdc_offset
                        //     && *chunk_offset < *cur
                        // {
                        //     continue;
                        // }

                        // should_report_actor_backfill_done is set to true at most once.
                        if let Some(high) = actor_cdc_offset_high.as_ref() {
                            if state_impl.is_legacy_state() {
                                // Since the legacy state does not track CDC offsets, report backfill completion immediately.
                                actor_cdc_offset_high = None;
                                should_report_actor_backfill_done = true;
                            } else if let Some(ref chunk_offset) = chunk_cdc_offset
                                && *chunk_offset >= *high
                            {
                                // Report backfill completion once the latest CDC offset reaches or exceeds the highest offset tracked during the backfill.
                                actor_cdc_offset_high = None;
                                should_report_actor_backfill_done = true;
                            }
                        }
                        let chunk = mapping_chunk(chunk, &self.output_indices);
                        if let Some(filtered_chunk) = filter_stream_chunk(
                            chunk,
                            &current_actor_bounds,
                            snapshot_split_column_index,
                        ) && filtered_chunk.cardinality() > 0
                        {
                            yield Message::Chunk(filtered_chunk);
                        }
                    }
                    msg @ Message::Watermark(_) => {
                        if let Some(msg) = mapping_message(msg, &self.output_indices) {
                            yield msg;
                        }
                    }
                }
            }
        }
    }
}

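/// Keeps only the rows of `chunk` whose value in column `snapshot_split_column_index` falls
/// within the half-open range `[left, right)` of `bound`. Returns `None` when no bound has
/// been claimed yet; an all-NULL bound row marks that side as unbounded.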
fn filter_stream_chunk(
    chunk: StreamChunk,
    bound: &Option<(OwnedRow, OwnedRow)>,
    snapshot_split_column_index: usize,
) -> Option<StreamChunk> {
    let Some((left, right)) = bound else {
        return None;
    };
    assert_eq!(left.len(), 1, "multiple split columns are not supported yet");
    assert_eq!(
        right.len(),
        1,
        "multiple split columns are not supported yet"
    );
    let left_split_key = left.datum_at(0);
    let right_split_key = right.datum_at(0);
    let is_leftmost_bound = is_leftmost_bound(left);
    let is_rightmost_bound = is_rightmost_bound(right);
    if is_leftmost_bound && is_rightmost_bound {
        return Some(chunk);
    }
    let mut new_bitmap = BitmapBuilder::with_capacity(chunk.capacity());
    let (ops, columns, visibility) = chunk.into_inner();
    for (row_split_key, v) in columns[snapshot_split_column_index]
        .iter()
        .zip_eq_fast(visibility.iter())
    {
        if !v {
            new_bitmap.append(false);
            continue;
        }
        let mut is_in_range = true;
        if !is_leftmost_bound {
            is_in_range = cmp_datum(
                row_split_key,
                left_split_key,
                OrderType::ascending_nulls_first(),
            )
            .is_ge();
        }
        if is_in_range && !is_rightmost_bound {
            is_in_range = cmp_datum(
                row_split_key,
                right_split_key,
                OrderType::ascending_nulls_first(),
            )
            .is_lt();
        }
        if !is_in_range {
            tracing::trace!(?row_split_key, ?left_split_key, ?right_split_key, snapshot_split_column_index, data_type = ?columns[snapshot_split_column_index].data_type(), "filter out row")
        }
        new_bitmap.append(is_in_range);
    }
    Some(StreamChunk::with_visibility(
        ops,
        columns,
        new_bitmap.finish(),
    ))
}

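/// An all-NULL bound row encodes an unbounded (leftmost) boundary.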
fn is_leftmost_bound(row: &OwnedRow) -> bool {
    row.iter().all(|d| d.is_none())
}

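/// An all-NULL bound row encodes an unbounded (rightmost) boundary.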
fn is_rightmost_bound(row: &OwnedRow) -> bool {
    row.iter().all(|d| d.is_none())
}

impl<S: StateStore> Execute for ParallelizedCdcBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

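/// Extends the actor's claimed key range with `split`: sets the initial bound on the first
/// split, and afterwards only advances the exclusive right bound (splits are consumed in order).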
fn extends_current_actor_bound(
    current: &mut Option<(OwnedRow, OwnedRow)>,
    split: &CdcTableSnapshotSplit,
) {
    if current.is_none() {
        *current = Some((
            split.left_bound_inclusive.clone(),
            split.right_bound_exclusive.clone(),
        ));
    } else {
        current.as_mut().unwrap().1 = split.right_bound_exclusive.clone();
    }
}

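/// A barrier resets this executor when its `Update` mutation carries a new snapshot split
/// assignment for this actor.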
fn is_reset_barrier(barrier: &Barrier, actor_id: ActorId) -> bool {
    match barrier.mutation.as_deref() {
        Some(Mutation::Update(update)) => update
            .actor_cdc_table_snapshot_splits
            .splits
            .contains_key(&actor_id),
        _ => false,
    }
}

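/// Asserts that the splits assigned to an actor have consecutive split ids and strictly
/// increasing right bounds.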
fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
    for i in 1..actor_snapshot_splits.len() {
        assert_eq!(
            actor_snapshot_splits[i].split_id,
            actor_snapshot_splits[i - 1].split_id + 1,
            "{:?}",
            actor_snapshot_splits
        );
        assert!(
            cmp_datum(
                actor_snapshot_splits[i - 1]
                    .right_bound_exclusive
                    .datum_at(0),
                actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
                OrderType::ascending_nulls_last(),
            )
            .is_lt()
        );
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::array::StreamChunk;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::ScalarImpl;

    use crate::executor::backfill::cdc::cdc_backill_v2::filter_stream_chunk;

    #[test]
    fn test_filter_stream_chunk() {
        use risingwave_common::array::StreamChunkTestExt;
        let chunk = StreamChunk::from_pretty(
            "  I I
             + 1 6
             - 2 .
            U- 3 7
            U+ 4 .",
        );
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(3))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
            U- 3 7
            U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(2))]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(4))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 0);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
             - 2 .
            U- 3 7",
            )
        );

        // Test NULL value.
        let bound = None;
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert!(c.is_none());

        let bound = Some((OwnedRow::new(vec![None]), OwnedRow::new(vec![None])));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(c.unwrap().compact_vis(), chunk);

        let bound = Some((
            OwnedRow::new(vec![None]),
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
        ));
        let c = filter_stream_chunk(chunk.clone(), &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
             + 1 6
             - 2 .
            U+ 4 .",
            )
        );

        let bound = Some((
            OwnedRow::new(vec![Some(ScalarImpl::Int64(7))]),
            OwnedRow::new(vec![None]),
        ));
        let c = filter_stream_chunk(chunk, &bound, 1);
        assert_eq!(
            c.unwrap().compact_vis(),
            StreamChunk::from_pretty(
                "  I I
            U- 3 7",
            )
        );
    }
915}