risingwave_stream/from_proto/
stream_cdc_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{Schema, TableId};
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::external::{
21    CdcTableType, ExternalTableConfig, SchemaTableName,
22};
23use risingwave_pb::plan_common::ExternalTableDesc;
24use risingwave_pb::stream_plan::StreamCdcScanNode;
25
26use super::*;
27use crate::common::table::state_table::StateTable;
28use crate::executor::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
29
/// Stateless [`ExecutorBuilder`] that turns a [`StreamCdcScanNode`] plan node into a
/// [`CdcBackfillExecutor`].
pub struct StreamCdcScanExecutorBuilder;
31
32impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
33    type Node = StreamCdcScanNode;
34
35    async fn new_boxed_executor(
36        params: ExecutorParams,
37        node: &Self::Node,
38        state_store: impl StateStore,
39    ) -> StreamResult<Executor> {
40        let [upstream]: [_; 1] = params.input.try_into().unwrap();
41
42        let output_indices = node
43            .output_indices
44            .iter()
45            .map(|&i| i as usize)
46            .collect_vec();
47
48        let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
49
50        let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
51        assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
52        assert_eq!(output_schema.data_types(), params.info.schema.data_types());
53
54        let properties = table_desc.connect_properties.clone();
55
56        let table_pk_order_types = table_desc
57            .pk
58            .iter()
59            .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
60            .collect_vec();
61        let table_pk_indices = table_desc
62            .pk
63            .iter()
64            .map(|k| k.column_index as usize)
65            .collect_vec();
66
67        let scan_options = node
68            .options
69            .as_ref()
70            .map(CdcScanOptions::from_proto)
71            .unwrap_or(CdcScanOptions {
72                disable_backfill: node.disable_backfill,
73                ..Default::default()
74            });
75        let table_type = CdcTableType::from_properties(&properties);
76
77        // Filter out additional columns to construct the external table schema
78        let table_schema: Schema = table_desc
79            .columns
80            .iter()
81            .filter(|col| {
82                col.additional_column
83                    .as_ref()
84                    .is_none_or(|a_col| a_col.column_type.is_none())
85            })
86            .map(Into::into)
87            .collect();
88
89        let schema_table_name = SchemaTableName::from_properties(&properties);
90        let table_config = ExternalTableConfig::try_from_btreemap(
91            properties.clone(),
92            table_desc.secret_refs.clone(),
93        )
94        .context("failed to parse external table config")?;
95
96        let database_name = table_config.database.clone();
97
98        let external_table = ExternalStorageTable::new(
99            TableId::new(table_desc.table_id),
100            schema_table_name,
101            database_name,
102            table_config,
103            table_type,
104            table_schema,
105            table_pk_order_types,
106            table_pk_indices,
107        );
108
109        let vnodes = params.vnode_bitmap.map(Arc::new);
110        // cdc backfill should be singleton, so vnodes must be None.
111        assert_eq!(None, vnodes);
112        let state_table =
113            StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
114
115        let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
116        let exec = CdcBackfillExecutor::new(
117            params.actor_context.clone(),
118            external_table,
119            upstream,
120            output_indices,
121            output_columns,
122            None,
123            params.executor_stats,
124            state_table,
125            node.rate_limit,
126            scan_options,
127        );
128        Ok((params.info, exec).into())
129    }
130}