// risingwave_stream/from_proto/stream_cdc_scan.rs
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::Schema;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::cdc::external::{
    ExternalCdcTableType, ExternalTableConfig, SchemaTableName,
};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::stream_plan::StreamCdcScanNode;

use super::*;
use crate::common::table::state_table::StateTableBuilder;
use crate::executor::{CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor};
use crate::task::cdc_progress::CdcProgressReporter;
31
/// Stateless builder that turns a `StreamCdcScanNode` plan node into a CDC backfill executor.
pub struct StreamCdcScanExecutorBuilder;
33
34impl_stream_node_body!(StreamCdcScan(StreamCdcScanNode) => StreamCdcScanExecutorBuilder);
35
36impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
37 type Node = StreamCdcScanNode;
38
39 async fn new_boxed_executor(
40 params: ExecutorParams,
41 node: &Self::Node,
42 state_store: impl StateStore,
43 ) -> StreamResult<Executor> {
44 let [upstream]: [_; 1] = params.input.try_into().unwrap();
45
46 let output_indices = node
47 .output_indices
48 .iter()
49 .map(|&i| i as usize)
50 .collect_vec();
51
52 let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
53
54 let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
55 assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
56 assert_eq!(output_schema.data_types(), params.info.schema.data_types());
57
58 let properties = table_desc.connect_properties.clone();
59 let table_pk_order_types = table_desc
60 .pk
61 .iter()
62 .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
63 .collect_vec();
64 let table_pk_indices = table_desc
65 .pk
66 .iter()
67 .map(|k| k.column_index as usize)
68 .collect_vec();
69
70 let scan_options = node
71 .options
72 .as_ref()
73 .map(CdcScanOptions::from_proto)
74 .unwrap_or(CdcScanOptions {
75 disable_backfill: node.disable_backfill,
76 ..Default::default()
77 });
78 let table_type = ExternalCdcTableType::from_properties(&properties);
79 let table_schema: Schema = table_desc
81 .columns
82 .iter()
83 .filter(|col| {
84 col.additional_column
85 .as_ref()
86 .is_none_or(|a_col| a_col.column_type.is_none())
87 })
88 .map(Into::into)
89 .collect();
90
91 let schema_table_name = SchemaTableName::from_properties(&properties);
92 let table_config = ExternalTableConfig::try_from_btreemap(
93 properties.clone(),
94 table_desc.secret_refs.clone(),
95 )
96 .context("failed to parse external table config")?;
97
98 let database_name = table_config.database.clone();
99
100 let external_table = ExternalStorageTable::new(
101 table_desc.table_id,
102 schema_table_name,
103 database_name,
104 table_config,
105 table_type,
106 table_schema,
107 table_pk_order_types,
108 table_pk_indices,
109 );
110
111 let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
112 if scan_options.is_parallelized_backfill() {
113 let vnodes = None;
115 let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
116 .enable_preload_all_rows_by_config(¶ms.config)
117 .build()
118 .await;
119 let progress = CdcProgressReporter::new(params.local_barrier_manager.clone());
120 let exec = ParallelizedCdcBackfillExecutor::new(
121 params.actor_context.clone(),
122 external_table,
123 upstream,
124 output_indices,
125 output_columns,
126 params.executor_stats,
127 state_table,
128 node.rate_limit,
129 scan_options,
130 properties,
131 Some(progress),
132 );
133 Ok((params.info, exec).into())
134 } else {
135 let vnodes = params.vnode_bitmap.map(Arc::new);
136 assert_eq!(None, vnodes);
138 let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
139 .enable_preload_all_rows_by_config(¶ms.config)
140 .build()
141 .await;
142 let exec = CdcBackfillExecutor::new(
143 params.actor_context.clone(),
144 external_table,
145 upstream,
146 output_indices,
147 output_columns,
148 None,
149 params.executor_stats,
150 state_table,
151 node.rate_limit,
152 scan_options,
153 properties,
154 );
155 Ok((params.info, exec).into())
156 }
157 }
158}