risingwave_stream/from_proto/
stream_cdc_scan.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{Schema, TableId};
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::CdcScanOptions;
21use risingwave_connector::source::cdc::external::{
22 ExternalCdcTableType, ExternalTableConfig, SchemaTableName,
23};
24use risingwave_pb::plan_common::ExternalTableDesc;
25use risingwave_pb::stream_plan::StreamCdcScanNode;
26
27use super::*;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::executor::{CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor};
30use crate::task::cdc_progress::CdcProgressReporter;
31
32pub struct StreamCdcScanExecutorBuilder;
33
34impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
35 type Node = StreamCdcScanNode;
36
37 async fn new_boxed_executor(
38 params: ExecutorParams,
39 node: &Self::Node,
40 state_store: impl StateStore,
41 ) -> StreamResult<Executor> {
42 let [upstream]: [_; 1] = params.input.try_into().unwrap();
43
44 let output_indices = node
45 .output_indices
46 .iter()
47 .map(|&i| i as usize)
48 .collect_vec();
49
50 let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
51
52 let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
53 assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
54 assert_eq!(output_schema.data_types(), params.info.schema.data_types());
55
56 let properties = table_desc.connect_properties.clone();
57 let table_pk_order_types = table_desc
58 .pk
59 .iter()
60 .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
61 .collect_vec();
62 let table_pk_indices = table_desc
63 .pk
64 .iter()
65 .map(|k| k.column_index as usize)
66 .collect_vec();
67
68 let scan_options = node
69 .options
70 .as_ref()
71 .map(CdcScanOptions::from_proto)
72 .unwrap_or(CdcScanOptions {
73 disable_backfill: node.disable_backfill,
74 ..Default::default()
75 });
76 let table_type = ExternalCdcTableType::from_properties(&properties);
77 let table_schema: Schema = table_desc
79 .columns
80 .iter()
81 .filter(|col| {
82 col.additional_column
83 .as_ref()
84 .is_none_or(|a_col| a_col.column_type.is_none())
85 })
86 .map(Into::into)
87 .collect();
88
89 let schema_table_name = SchemaTableName::from_properties(&properties);
90 let table_config = ExternalTableConfig::try_from_btreemap(
91 properties.clone(),
92 table_desc.secret_refs.clone(),
93 )
94 .context("failed to parse external table config")?;
95
96 let database_name = table_config.database.clone();
97
98 let external_table = ExternalStorageTable::new(
99 TableId::new(table_desc.table_id),
100 schema_table_name,
101 database_name,
102 table_config,
103 table_type,
104 table_schema,
105 table_pk_order_types,
106 table_pk_indices,
107 );
108
109 let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
110 if scan_options.is_parallelized_backfill() {
111 let vnodes = None;
113 let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
114 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
115 .build()
116 .await;
117 let progress = CdcProgressReporter::new(params.local_barrier_manager.clone());
118 let exec = ParallelizedCdcBackfillExecutor::new(
119 params.actor_context.clone(),
120 external_table,
121 upstream,
122 output_indices,
123 output_columns,
124 params.executor_stats,
125 state_table,
126 node.rate_limit,
127 scan_options,
128 properties,
129 Some(progress),
130 );
131 Ok((params.info, exec).into())
132 } else {
133 let vnodes = params.vnode_bitmap.map(Arc::new);
134 assert_eq!(None, vnodes);
136 let state_table = StateTableBuilder::new(node.get_state_table()?, state_store, vnodes)
137 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
138 .build()
139 .await;
140 let exec = CdcBackfillExecutor::new(
141 params.actor_context.clone(),
142 external_table,
143 upstream,
144 output_indices,
145 output_columns,
146 None,
147 params.executor_stats,
148 state_table,
149 node.rate_limit,
150 scan_options,
151 properties,
152 );
153 Ok((params.info, exec).into())
154 }
155 }
156}