risingwave_stream/from_proto/
stream_cdc_scan.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{Schema, TableId};
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::CdcScanOptions;
21use risingwave_connector::source::cdc::external::{
22 CdcTableType, ExternalTableConfig, SchemaTableName,
23};
24use risingwave_pb::plan_common::ExternalTableDesc;
25use risingwave_pb::stream_plan::StreamCdcScanNode;
26
27use super::*;
28use crate::common::table::state_table::StateTable;
29use crate::executor::{CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor};
30
/// Stateless builder that turns a `StreamCdcScanNode` plan node into a CDC backfill executor.
///
/// The type carries no data; all behavior lives in its `ExecutorBuilder` implementation.
pub struct StreamCdcScanExecutorBuilder;
32
33impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
34 type Node = StreamCdcScanNode;
35
36 async fn new_boxed_executor(
37 params: ExecutorParams,
38 node: &Self::Node,
39 state_store: impl StateStore,
40 ) -> StreamResult<Executor> {
41 let [upstream]: [_; 1] = params.input.try_into().unwrap();
42
43 let output_indices = node
44 .output_indices
45 .iter()
46 .map(|&i| i as usize)
47 .collect_vec();
48
49 let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
50
51 let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
52 assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
53 assert_eq!(output_schema.data_types(), params.info.schema.data_types());
54
55 let properties = table_desc.connect_properties.clone();
56 let table_pk_order_types = table_desc
57 .pk
58 .iter()
59 .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
60 .collect_vec();
61 let table_pk_indices = table_desc
62 .pk
63 .iter()
64 .map(|k| k.column_index as usize)
65 .collect_vec();
66
67 let scan_options = node
68 .options
69 .as_ref()
70 .map(CdcScanOptions::from_proto)
71 .unwrap_or(CdcScanOptions {
72 disable_backfill: node.disable_backfill,
73 ..Default::default()
74 });
75 let table_type = CdcTableType::from_properties(&properties);
76 let table_schema: Schema = table_desc
78 .columns
79 .iter()
80 .filter(|col| {
81 col.additional_column
82 .as_ref()
83 .is_none_or(|a_col| a_col.column_type.is_none())
84 })
85 .map(Into::into)
86 .collect();
87
88 let schema_table_name = SchemaTableName::from_properties(&properties);
89 let table_config = ExternalTableConfig::try_from_btreemap(
90 properties.clone(),
91 table_desc.secret_refs.clone(),
92 )
93 .context("failed to parse external table config")?;
94
95 let database_name = table_config.database.clone();
96
97 let external_table = ExternalStorageTable::new(
98 TableId::new(table_desc.table_id),
99 schema_table_name,
100 database_name,
101 table_config,
102 table_type,
103 table_schema,
104 table_pk_order_types,
105 table_pk_indices,
106 );
107
108 let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
109 if scan_options.is_parallelized_backfill() {
110 let vnodes = None;
112 let state_table =
113 StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
114 let exec = ParallelizedCdcBackfillExecutor::new(
115 params.actor_context.clone(),
116 external_table,
117 upstream,
118 output_indices,
119 output_columns,
120 params.executor_stats,
121 state_table,
122 node.rate_limit,
123 scan_options,
124 properties,
125 );
126 Ok((params.info, exec).into())
127 } else {
128 let vnodes = params.vnode_bitmap.map(Arc::new);
129 assert_eq!(None, vnodes);
131 let state_table =
132 StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
133 let exec = CdcBackfillExecutor::new(
134 params.actor_context.clone(),
135 external_table,
136 upstream,
137 output_indices,
138 output_columns,
139 None,
140 params.executor_stats,
141 state_table,
142 node.rate_limit,
143 scan_options,
144 properties,
145 );
146 Ok((params.info, exec).into())
147 }
148 }
149}