risingwave_stream/from_proto/stream_cdc_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{Schema, TableId};
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::CdcScanOptions;
21use risingwave_connector::source::cdc::external::{
22    ExternalCdcTableType, ExternalTableConfig, SchemaTableName,
23};
24use risingwave_pb::plan_common::ExternalTableDesc;
25use risingwave_pb::stream_plan::StreamCdcScanNode;
26
27use super::*;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::executor::{CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor};
30use crate::task::cdc_progress::CdcProgressReporter;
31
32pub struct StreamCdcScanExecutorBuilder;
33
34impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
35    type Node = StreamCdcScanNode;
36
37    async fn new_boxed_executor(
38        params: ExecutorParams,
39        node: &Self::Node,
40        state_store: impl StateStore,
41    ) -> StreamResult<Executor> {
42        let [upstream]: [_; 1] = params.input.try_into().unwrap();
43
44        let output_indices = node
45            .output_indices
46            .iter()
47            .map(|&i| i as usize)
48            .collect_vec();
49
50        let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
51
52        let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
53        assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
54        assert_eq!(output_schema.data_types(), params.info.schema.data_types());
55
56        let properties = table_desc.connect_properties.clone();
57        let table_pk_order_types = table_desc
58            .pk
59            .iter()
60            .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
61            .collect_vec();
62        let table_pk_indices = table_desc
63            .pk
64            .iter()
65            .map(|k| k.column_index as usize)
66            .collect_vec();
67
68        let scan_options = node
69            .options
70            .as_ref()
71            .map(CdcScanOptions::from_proto)
72            .unwrap_or(CdcScanOptions {
73                disable_backfill: node.disable_backfill,
74                ..Default::default()
75            });
76        let table_type = ExternalCdcTableType::from_properties(&properties);
77        // Filter out additional columns to construct the external table schema
78        let table_schema: Schema = table_desc
79            .columns
80            .iter()
81            .filter(|col| {
82                col.additional_column
83                    .as_ref()
84                    .is_none_or(|a_col| a_col.column_type.is_none())
85            })
86            .map(Into::into)
87            .collect();
88
89        let schema_table_name = SchemaTableName::from_properties(&properties);
90        let table_config = ExternalTableConfig::try_from_btreemap(
91            properties.clone(),
92            table_desc.secret_refs.clone(),
93        )
94        .context("failed to parse external table config")?;
95
96        let database_name = table_config.database.clone();
97
98        let external_table = ExternalStorageTable::new(
99            TableId::new(table_desc.table_id),
100            schema_table_name,
101            database_name,
102            table_config,
103            table_type,
104            table_schema,
105            table_pk_order_types,
106            table_pk_indices,
107        );
108
109        let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
110        if scan_options.is_parallelized_backfill() {
111            // Set state table's vnodes to None to allow splits to be assigned to any actors, without following vnode constraints.
112            let vnodes = None;
113            let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
114                .enable_preload_all_rows_by_config(&params.actor_context.streaming_config)
115                .build()
116                .await;
117            let progress = CdcProgressReporter::new(params.local_barrier_manager.clone());
118            let exec = ParallelizedCdcBackfillExecutor::new(
119                params.actor_context.clone(),
120                external_table,
121                upstream,
122                output_indices,
123                output_columns,
124                params.executor_stats,
125                state_table,
126                node.rate_limit,
127                scan_options,
128                properties,
129                Some(progress),
130            );
131            Ok((params.info, exec).into())
132        } else {
133            let vnodes = params.vnode_bitmap.map(Arc::new);
134            // cdc backfill should be singleton, so vnodes must be None.
135            assert_eq!(None, vnodes);
136            let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
137                .enable_preload_all_rows_by_config(&params.actor_context.streaming_config)
138                .build()
139                .await;
140            let exec = CdcBackfillExecutor::new(
141                params.actor_context.clone(),
142                external_table,
143                upstream,
144                output_indices,
145                output_columns,
146                None,
147                params.executor_stats,
148                state_table,
149                node.rate_limit,
150                scan_options,
151                properties,
152            );
153            Ok((params.info, exec).into())
154        }
155    }
156}