risingwave_stream/from_proto/source/
fs_fetch.rs1use std::sync::Arc;
16
17use risingwave_connector::WithOptionsSecResolved;
18use risingwave_connector::source::ConnectorProperties;
19use risingwave_connector::source::filesystem::opendal_source::{
20 OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
21};
22use risingwave_connector::source::reader::desc::SourceDescBuilder;
23use risingwave_pb::stream_plan::StreamFsFetchNode;
24use risingwave_storage::StateStore;
25
26use crate::error::StreamResult;
27use crate::executor::source::{
28 BatchAdbcSnowflakeFetchExecutor, BatchIcebergFetchExecutor, BatchPosixFsFetchExecutor,
29 FsFetchExecutor, IcebergFetchExecutor, SourceStateTableHandler, StreamSourceCore,
30};
31use crate::executor::{Execute, Executor};
32use crate::from_proto::ExecutorBuilder;
33use crate::from_proto::source::is_full_reload_refresh;
34use crate::task::ExecutorParams;
35
/// Executor builder for `StreamFsFetch` plan nodes (`StreamFsFetchNode`).
///
/// The concrete fetch executor is selected at build time from the source's
/// connector properties; see the `ExecutorBuilder` impl below.
pub struct FsFetchExecutorBuilder;

// NOTE(review): presumably wires the `StreamFsFetch` node-body variant to this
// builder so the generic node dispatcher can find it — confirm against the
// `impl_stream_node_body!` macro definition.
impl_stream_node_body!(StreamFsFetch(StreamFsFetchNode) => FsFetchExecutorBuilder);
39
40impl ExecutorBuilder for FsFetchExecutorBuilder {
41 type Node = StreamFsFetchNode;
42
43 async fn new_boxed_executor(
44 params: ExecutorParams,
45 node: &Self::Node,
46 store: impl StateStore,
47 ) -> StreamResult<Executor> {
48 let [upstream]: [_; 1] = params.input.try_into().unwrap();
49
50 let source = node.node_inner.as_ref().unwrap();
51 let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
52
53 let source_id = source.source_id;
54 let source_name = source.source_name.clone();
55 let source_info = source.get_info()?;
56 let source_options_with_secret =
57 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
58 let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
59 let source_desc_builder = SourceDescBuilder::new(
60 source.columns.clone(),
61 params.env.source_metrics(),
62 source.row_id_index.map(|x| x as _),
63 source_options_with_secret,
64 source_info.clone(),
65 params.config.developer.connector_message_buffer_size,
66 params.info.stream_key.clone(),
67 );
68
69 let source_column_ids: Vec<_> = source_desc_builder
70 .column_catalogs_to_source_column_descs()
71 .iter()
72 .map(|column| column.column_id)
73 .collect();
74
75 let vnodes = Some(Arc::new(
76 params
77 .vnode_bitmap
78 .expect("vnodes not set for fetch executor"),
79 ));
80 let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
81 source.state_table.as_ref().unwrap(),
82 store.clone(),
83 vnodes,
84 )
85 .await;
86 let stream_source_core = StreamSourceCore::new(
87 source_id,
88 source_name,
89 source_column_ids,
90 source_desc_builder,
91 state_table_handler,
92 );
93
94 let exec = match properties {
95 risingwave_connector::source::ConnectorProperties::Gcs(_) => {
96 FsFetchExecutor::<_, OpendalGcs>::new(
97 params.actor_context.clone(),
98 stream_source_core,
99 upstream,
100 source.rate_limit,
101 )
102 .boxed()
103 }
104 risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
105 FsFetchExecutor::<_, OpendalS3>::new(
106 params.actor_context.clone(),
107 stream_source_core,
108 upstream,
109 source.rate_limit,
110 )
111 .boxed()
112 }
113 risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
114 if is_full_reload_refresh {
115 BatchIcebergFetchExecutor::new(
116 params.actor_context.clone(),
117 stream_source_core,
118 upstream,
119 params.local_barrier_manager.clone(),
120 params.config.clone(),
121 source.associated_table_id,
122 )
123 .boxed()
124 } else {
125 IcebergFetchExecutor::new(
126 params.actor_context.clone(),
127 stream_source_core,
128 upstream,
129 source.rate_limit,
130 params.config.clone(),
131 )
132 .boxed()
133 }
134 }
135 risingwave_connector::source::ConnectorProperties::Azblob(_) => {
136 FsFetchExecutor::<_, OpendalAzblob>::new(
137 params.actor_context.clone(),
138 stream_source_core,
139 upstream,
140 source.rate_limit,
141 )
142 .boxed()
143 }
144 risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
145 FsFetchExecutor::<_, OpendalPosixFs>::new(
146 params.actor_context.clone(),
147 stream_source_core,
148 upstream,
149 source.rate_limit,
150 )
151 .boxed()
152 }
153 risingwave_connector::source::ConnectorProperties::BatchPosixFs(_) => {
154 BatchPosixFsFetchExecutor::new(
155 params.actor_context.clone(),
156 stream_source_core,
157 upstream,
158 source.rate_limit,
159 params.local_barrier_manager.clone(),
160 source.associated_table_id,
161 )
162 .boxed()
163 }
164 risingwave_connector::source::ConnectorProperties::AdbcSnowflake(_) => {
165 if is_full_reload_refresh {
166 BatchAdbcSnowflakeFetchExecutor::new(
167 params.actor_context.clone(),
168 stream_source_core,
169 upstream,
170 params.local_barrier_manager.clone(),
171 source.associated_table_id,
172 )
173 .boxed()
174 } else {
175 unreachable!("AdbcSnowflake connector only supports FULL_RELOAD refresh mode")
176 }
177 }
178 _ => unreachable!(),
179 };
180 Ok((params.info, exec).into())
181 }
182}