risingwave_stream/from_proto/
stream_cdc_scan.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{Schema, TableId};
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::source::cdc::external::{
21 CdcTableType, ExternalTableConfig, SchemaTableName,
22};
23use risingwave_pb::plan_common::ExternalTableDesc;
24use risingwave_pb::stream_plan::StreamCdcScanNode;
25
26use super::*;
27use crate::common::table::state_table::StateTable;
28use crate::executor::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
29
30pub struct StreamCdcScanExecutorBuilder;
31
32impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
33 type Node = StreamCdcScanNode;
34
35 async fn new_boxed_executor(
36 params: ExecutorParams,
37 node: &Self::Node,
38 state_store: impl StateStore,
39 ) -> StreamResult<Executor> {
40 let [upstream]: [_; 1] = params.input.try_into().unwrap();
41
42 let output_indices = node
43 .output_indices
44 .iter()
45 .map(|&i| i as usize)
46 .collect_vec();
47
48 let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?;
49
50 let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect();
51 assert_eq!(output_indices, (0..output_schema.len()).collect_vec());
52 assert_eq!(output_schema.data_types(), params.info.schema.data_types());
53
54 let properties = table_desc.connect_properties.clone();
55
56 let table_pk_order_types = table_desc
57 .pk
58 .iter()
59 .map(|desc| OrderType::from_protobuf(desc.get_order_type().unwrap()))
60 .collect_vec();
61 let table_pk_indices = table_desc
62 .pk
63 .iter()
64 .map(|k| k.column_index as usize)
65 .collect_vec();
66
67 let scan_options = node
68 .options
69 .as_ref()
70 .map(CdcScanOptions::from_proto)
71 .unwrap_or(CdcScanOptions {
72 disable_backfill: node.disable_backfill,
73 ..Default::default()
74 });
75 let table_type = CdcTableType::from_properties(&properties);
76
77 let table_schema: Schema = table_desc
79 .columns
80 .iter()
81 .filter(|col| {
82 col.additional_column
83 .as_ref()
84 .is_none_or(|a_col| a_col.column_type.is_none())
85 })
86 .map(Into::into)
87 .collect();
88
89 let schema_table_name = SchemaTableName::from_properties(&properties);
90 let table_config = ExternalTableConfig::try_from_btreemap(
91 properties.clone(),
92 table_desc.secret_refs.clone(),
93 )
94 .context("failed to parse external table config")?;
95
96 let database_name = table_config.database.clone();
97
98 let external_table = ExternalStorageTable::new(
99 TableId::new(table_desc.table_id),
100 schema_table_name,
101 database_name,
102 table_config,
103 table_type,
104 table_schema,
105 table_pk_order_types,
106 table_pk_indices,
107 );
108
109 let vnodes = params.vnode_bitmap.map(Arc::new);
110 assert_eq!(None, vnodes);
112 let state_table =
113 StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await;
114
115 let output_columns = table_desc.columns.iter().map(Into::into).collect_vec();
116 let exec = CdcBackfillExecutor::new(
117 params.actor_context.clone(),
118 external_table,
119 upstream,
120 output_indices,
121 output_columns,
122 None,
123 params.executor_stats,
124 state_table,
125 node.rate_limit,
126 scan_options,
127 );
128 Ok((params.info, exec).into())
129 }
130}