risingwave_stream/from_proto/source/
fs_fetch.rs1use std::sync::Arc;
16
17use risingwave_connector::WithOptionsSecResolved;
18use risingwave_connector::source::ConnectorProperties;
19use risingwave_connector::source::filesystem::opendal_source::{
20 OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
21};
22use risingwave_connector::source::reader::desc::SourceDescBuilder;
23use risingwave_pb::stream_plan::StreamFsFetchNode;
24use risingwave_storage::StateStore;
25
26use crate::error::StreamResult;
27use crate::executor::source::{
28 BatchIcebergFetchExecutor, BatchPosixFsFetchExecutor, FsFetchExecutor, IcebergFetchExecutor,
29 SourceStateTableHandler, StreamSourceCore,
30};
31use crate::executor::{Execute, Executor};
32use crate::from_proto::ExecutorBuilder;
33use crate::from_proto::source::is_full_reload_refresh;
34use crate::task::ExecutorParams;
35
/// Builder that turns a [`StreamFsFetchNode`] plan node into a boxed fetch executor.
///
/// The type carries no state; all construction logic lives in its
/// [`ExecutorBuilder`] implementation.
pub struct FsFetchExecutorBuilder;
37
38impl ExecutorBuilder for FsFetchExecutorBuilder {
39 type Node = StreamFsFetchNode;
40
41 async fn new_boxed_executor(
42 params: ExecutorParams,
43 node: &Self::Node,
44 store: impl StateStore,
45 ) -> StreamResult<Executor> {
46 let [upstream]: [_; 1] = params.input.try_into().unwrap();
47
48 let source = node.node_inner.as_ref().unwrap();
49 let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
50
51 let source_id = source.source_id;
52 let source_name = source.source_name.clone();
53 let source_info = source.get_info()?;
54 let source_options_with_secret =
55 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
56 let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
57 let source_desc_builder = SourceDescBuilder::new(
58 source.columns.clone(),
59 params.env.source_metrics(),
60 source.row_id_index.map(|x| x as _),
61 source_options_with_secret,
62 source_info.clone(),
63 params.config.developer.connector_message_buffer_size,
64 params.info.stream_key.clone(),
65 );
66
67 let source_column_ids: Vec<_> = source_desc_builder
68 .column_catalogs_to_source_column_descs()
69 .iter()
70 .map(|column| column.column_id)
71 .collect();
72
73 let vnodes = Some(Arc::new(
74 params
75 .vnode_bitmap
76 .expect("vnodes not set for fetch executor"),
77 ));
78 let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
79 source.state_table.as_ref().unwrap(),
80 store.clone(),
81 vnodes,
82 )
83 .await;
84 let stream_source_core = StreamSourceCore::new(
85 source_id,
86 source_name,
87 source_column_ids,
88 source_desc_builder,
89 state_table_handler,
90 );
91
92 let exec = match properties {
93 risingwave_connector::source::ConnectorProperties::Gcs(_) => {
94 FsFetchExecutor::<_, OpendalGcs>::new(
95 params.actor_context.clone(),
96 stream_source_core,
97 upstream,
98 source.rate_limit,
99 )
100 .boxed()
101 }
102 risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
103 FsFetchExecutor::<_, OpendalS3>::new(
104 params.actor_context.clone(),
105 stream_source_core,
106 upstream,
107 source.rate_limit,
108 )
109 .boxed()
110 }
111 risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
112 if is_full_reload_refresh {
113 BatchIcebergFetchExecutor::new(
114 params.actor_context.clone(),
115 stream_source_core,
116 upstream,
117 params.local_barrier_manager.clone(),
118 params.config.clone(),
119 source.associated_table_id,
120 )
121 .boxed()
122 } else {
123 IcebergFetchExecutor::new(
124 params.actor_context.clone(),
125 stream_source_core,
126 upstream,
127 source.rate_limit,
128 params.config.clone(),
129 )
130 .boxed()
131 }
132 }
133 risingwave_connector::source::ConnectorProperties::Azblob(_) => {
134 FsFetchExecutor::<_, OpendalAzblob>::new(
135 params.actor_context.clone(),
136 stream_source_core,
137 upstream,
138 source.rate_limit,
139 )
140 .boxed()
141 }
142 risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
143 FsFetchExecutor::<_, OpendalPosixFs>::new(
144 params.actor_context.clone(),
145 stream_source_core,
146 upstream,
147 source.rate_limit,
148 )
149 .boxed()
150 }
151 risingwave_connector::source::ConnectorProperties::BatchPosixFs(_) => {
152 BatchPosixFsFetchExecutor::new(
153 params.actor_context.clone(),
154 stream_source_core,
155 upstream,
156 source.rate_limit,
157 params.local_barrier_manager.clone(),
158 source.associated_table_id,
159 )
160 .boxed()
161 }
162 _ => unreachable!(),
163 };
164 Ok((params.info, exec).into())
165 }
166}