Skip to main content

risingwave_stream/from_proto/
stream_cdc_scan.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::Schema;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::CdcScanOptions;
21use risingwave_connector::source::cdc::external::{
22    ExternalCdcTableType, ExternalTableConfig, SchemaTableName,
23};
24use risingwave_pb::plan_common::ExternalTableDesc;
25use risingwave_pb::stream_plan::StreamCdcScanNode;
26
27use super::*;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::executor::{CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor};
30use crate::task::cdc_progress::CdcProgressReporter;
31
32pub struct StreamCdcScanExecutorBuilder;
33
34impl_stream_node_body!(StreamCdcScan(StreamCdcScanNode) => StreamCdcScanExecutorBuilder);
35
36impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
37    type Node = StreamCdcScanNode;
38
39    async fn new_boxed_executor(
40        params: ExecutorParams,
41        node: &Self::Node,
42        state_store: impl StateStore,
43    ) -> StreamResult<Executor> {
44        let [upstream]: [_; 1] = params.input.try_into().unwrap();
45
46        let output_indices = node
47            .output_indices
48            .iter()
49            .map(|&i| i as usize)
50            .collect_vec();
51
52        let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
53
54        let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
55        assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
56        assert_eq!(output_schema.data_types(), params.info.schema.data_types());
57
58        let properties = table_desc.connect_properties.clone();
59        let table_pk_order_types = table_desc
60            .pk
61            .iter()
62            .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
63            .collect_vec();
64        let table_pk_indices = table_desc
65            .pk
66            .iter()
67            .map(|k| k.column_index as usize)
68            .collect_vec();
69
70        let scan_options = node
71            .options
72            .as_ref()
73            .map(CdcScanOptions::from_proto)
74            .unwrap_or(CdcScanOptions {
75                disable_backfill: node.disable_backfill,
76                ..Default::default()
77            });
78        let table_type = ExternalCdcTableType::from_properties(&properties);
79        // Filter out additional columns to construct the external table schema
80        let table_schema: Schema = table_desc
81            .columns
82            .iter()
83            .filter(|col| {
84                col.additional_column
85                    .as_ref()
86                    .is_none_or(|a_col| a_col.column_type.is_none())
87            })
88            .map(Into::into)
89            .collect();
90
91        let schema_table_name = SchemaTableName::from_properties(&properties);
92        let table_config = ExternalTableConfig::try_from_btreemap(
93            properties.clone(),
94            table_desc.secret_refs.clone(),
95        )
96        .context("failed to parse external table config")?;
97
98        let database_name = table_config.database.clone();
99
100        let external_table = ExternalStorageTable::new(
101            table_desc.table_id,
102            schema_table_name,
103            database_name,
104            table_config,
105            table_type,
106            table_schema,
107            table_pk_order_types,
108            table_pk_indices,
109        );
110
111        let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
112        if scan_options.is_parallelized_backfill() {
113            // Set state table's vnodes to None to allow splits to be assigned to any actors, without following vnode constraints.
114            let vnodes = None;
115            let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
116                .enable_preload_all_rows_by_config(&params.config)
117                .build()
118                .await;
119            let progress = CdcProgressReporter::new(params.local_barrier_manager.clone());
120            let exec = ParallelizedCdcBackfillExecutor::new(
121                params.actor_context.clone(),
122                external_table,
123                upstream,
124                output_indices,
125                output_columns,
126                params.executor_stats,
127                state_table,
128                node.rate_limit,
129                scan_options,
130                properties,
131                Some(progress),
132            );
133            Ok((params.info, exec).into())
134        } else {
135            let vnodes = params.vnode_bitmap.map(Arc::new);
136            // cdc backfill should be singleton, so vnodes must be None.
137            assert_eq!(None, vnodes);
138            let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
139                .enable_preload_all_rows_by_config(&params.config)
140                .build()
141                .await;
142            let exec = CdcBackfillExecutor::new(
143                params.actor_context.clone(),
144                external_table,
145                upstream,
146                output_indices,
147                output_columns,
148                None,
149                params.executor_stats,
150                state_table,
151                node.rate_limit,
152                scan_options,
153                properties,
154            );
155            Ok((params.info, exec).into())
156        }
157    }
158}