risingwave_stream/from_proto/
stream_cdc_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{Schema, TableId};
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::CdcScanOptions;
21use risingwave_connector::source::cdc::external::{
22    CdcTableType, ExternalTableConfig, SchemaTableName,
23};
24use risingwave_pb::plan_common::ExternalTableDesc;
25use risingwave_pb::stream_plan::StreamCdcScanNode;
26
27use super::*;
28use crate::common::table::state_table::StateTable;
29use crate::executor::{CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor};
30
/// Stateless [`ExecutorBuilder`] for [`StreamCdcScanNode`] plan nodes.
///
/// Its `new_boxed_executor` implementation produces either a
/// `CdcBackfillExecutor` or a `ParallelizedCdcBackfillExecutor`, depending on
/// the scan options carried by the node.
pub struct StreamCdcScanExecutorBuilder;
32
33impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
34    type Node = StreamCdcScanNode;
35
36    async fn new_boxed_executor(
37        params: ExecutorParams,
38        node: &Self::Node,
39        state_store: impl StateStore,
40    ) -> StreamResult<Executor> {
41        let [upstream]: [_; 1] = params.input.try_into().unwrap();
42
43        let output_indices = node
44            .output_indices
45            .iter()
46            .map(|&i| i as usize)
47            .collect_vec();
48
49        let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
50
51        let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
52        assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
53        assert_eq!(output_schema.data_types(), params.info.schema.data_types());
54
55        let properties = table_desc.connect_properties.clone();
56        let table_pk_order_types = table_desc
57            .pk
58            .iter()
59            .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
60            .collect_vec();
61        let table_pk_indices = table_desc
62            .pk
63            .iter()
64            .map(|k| k.column_index as usize)
65            .collect_vec();
66
67        let scan_options = node
68            .options
69            .as_ref()
70            .map(CdcScanOptions::from_proto)
71            .unwrap_or(CdcScanOptions {
72                disable_backfill: node.disable_backfill,
73                ..Default::default()
74            });
75        let table_type = CdcTableType::from_properties(&properties);
76        // Filter out additional columns to construct the external table schema
77        let table_schema: Schema = table_desc
78            .columns
79            .iter()
80            .filter(|col| {
81                col.additional_column
82                    .as_ref()
83                    .is_none_or(|a_col| a_col.column_type.is_none())
84            })
85            .map(Into::into)
86            .collect();
87
88        let schema_table_name = SchemaTableName::from_properties(&properties);
89        let table_config = ExternalTableConfig::try_from_btreemap(
90            properties.clone(),
91            table_desc.secret_refs.clone(),
92        )
93        .context("failed to parse external table config")?;
94
95        let database_name = table_config.database.clone();
96
97        let external_table = ExternalStorageTable::new(
98            TableId::new(table_desc.table_id),
99            schema_table_name,
100            database_name,
101            table_config,
102            table_type,
103            table_schema,
104            table_pk_order_types,
105            table_pk_indices,
106        );
107
108        let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
109        if scan_options.is_parallelized_backfill() {
110            // Set state table's vnodes to None to allow splits to be assigned to any actors, without following vnode constraints.
111            let vnodes = None;
112            let state_table =
113                StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
114            let exec = ParallelizedCdcBackfillExecutor::new(
115                params.actor_context.clone(),
116                external_table,
117                upstream,
118                output_indices,
119                output_columns,
120                params.executor_stats,
121                state_table,
122                node.rate_limit,
123                scan_options,
124                properties,
125            );
126            Ok((params.info, exec).into())
127        } else {
128            let vnodes = params.vnode_bitmap.map(Arc::new);
129            // cdc backfill should be singleton, so vnodes must be None.
130            assert_eq!(None, vnodes);
131            let state_table =
132                StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
133            let exec = CdcBackfillExecutor::new(
134                params.actor_context.clone(),
135                external_table,
136                upstream,
137                output_indices,
138                output_columns,
139                None,
140                params.executor_stats,
141                state_table,
142                node.rate_limit,
143                scan_options,
144                properties,
145            );
146            Ok((params.info, exec).into())
147        }
148    }
149}