risingwave_stream/from_proto/source/
fs_fetch.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::TableId;
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_connector::source::ConnectorProperties;
20use risingwave_connector::source::filesystem::opendal_source::{
21 OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
22};
23use risingwave_connector::source::reader::desc::SourceDescBuilder;
24use risingwave_pb::stream_plan::StreamFsFetchNode;
25use risingwave_storage::StateStore;
26
27use crate::error::StreamResult;
28use crate::executor::source::{
29 BatchPosixFsFetchExecutor, FsFetchExecutor, IcebergFetchExecutor, SourceStateTableHandler,
30 StreamSourceCore,
31};
32use crate::executor::{Execute, Executor};
33use crate::from_proto::ExecutorBuilder;
34use crate::task::ExecutorParams;
35
/// Unit type whose [`ExecutorBuilder`] implementation turns a
/// [`StreamFsFetchNode`] plan node into a concrete fetch executor.
pub struct FsFetchExecutorBuilder;
37
38impl ExecutorBuilder for FsFetchExecutorBuilder {
39 type Node = StreamFsFetchNode;
40
41 async fn new_boxed_executor(
42 params: ExecutorParams,
43 node: &Self::Node,
44 store: impl StateStore,
45 ) -> StreamResult<Executor> {
46 let [upstream]: [_; 1] = params.input.try_into().unwrap();
47
48 let source = node.node_inner.as_ref().unwrap();
49
50 let source_id = TableId::new(source.source_id);
51 let source_name = source.source_name.clone();
52 let source_info = source.get_info()?;
53 let source_options_with_secret =
54 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
55 let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
56 let source_desc_builder = SourceDescBuilder::new(
57 source.columns.clone(),
58 params.env.source_metrics(),
59 source.row_id_index.map(|x| x as _),
60 source_options_with_secret,
61 source_info.clone(),
62 params.env.config().developer.connector_message_buffer_size,
63 params.info.stream_key.clone(),
64 );
65
66 let source_column_ids: Vec<_> = source_desc_builder
67 .column_catalogs_to_source_column_descs()
68 .iter()
69 .map(|column| column.column_id)
70 .collect();
71
72 let vnodes = Some(Arc::new(
73 params
74 .vnode_bitmap
75 .expect("vnodes not set for fetch executor"),
76 ));
77 let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
78 source.state_table.as_ref().unwrap(),
79 store.clone(),
80 vnodes,
81 )
82 .await;
83 let stream_source_core = StreamSourceCore::new(
84 source_id,
85 source_name,
86 source_column_ids,
87 source_desc_builder,
88 state_table_handler,
89 );
90
91 let exec = match properties {
92 risingwave_connector::source::ConnectorProperties::Gcs(_) => {
93 FsFetchExecutor::<_, OpendalGcs>::new(
94 params.actor_context.clone(),
95 stream_source_core,
96 upstream,
97 source.rate_limit,
98 )
99 .boxed()
100 }
101 risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
102 FsFetchExecutor::<_, OpendalS3>::new(
103 params.actor_context.clone(),
104 stream_source_core,
105 upstream,
106 source.rate_limit,
107 )
108 .boxed()
109 }
110 risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
111 IcebergFetchExecutor::new(
112 params.actor_context.clone(),
113 stream_source_core,
114 upstream,
115 source.rate_limit,
116 params.env.config().clone(),
117 )
118 .boxed()
119 }
120 risingwave_connector::source::ConnectorProperties::Azblob(_) => {
121 FsFetchExecutor::<_, OpendalAzblob>::new(
122 params.actor_context.clone(),
123 stream_source_core,
124 upstream,
125 source.rate_limit,
126 )
127 .boxed()
128 }
129 risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
130 FsFetchExecutor::<_, OpendalPosixFs>::new(
131 params.actor_context.clone(),
132 stream_source_core,
133 upstream,
134 source.rate_limit,
135 )
136 .boxed()
137 }
138 risingwave_connector::source::ConnectorProperties::BatchPosixFs(_) => {
139 BatchPosixFsFetchExecutor::new(
140 params.actor_context.clone(),
141 stream_source_core,
142 upstream,
143 source.rate_limit,
144 params.local_barrier_manager.clone(),
145 )
146 .boxed()
147 }
148 _ => unreachable!(),
149 };
150 Ok((params.info, exec).into())
151 }
152}