risingwave_stream/from_proto/source/
fs_fetch.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::TableId;
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_connector::source::ConnectorProperties;
20use risingwave_connector::source::filesystem::opendal_source::{
21 OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
22};
23use risingwave_connector::source::reader::desc::SourceDescBuilder;
24use risingwave_pb::stream_plan::StreamFsFetchNode;
25use risingwave_storage::StateStore;
26
27use crate::error::StreamResult;
28use crate::executor::source::{
29 FsFetchExecutor, IcebergFetchExecutor, SourceStateTableHandler, StreamSourceCore,
30};
31use crate::executor::{Execute, Executor};
32use crate::from_proto::ExecutorBuilder;
33use crate::task::ExecutorParams;
34
35pub struct FsFetchExecutorBuilder;
36
37impl ExecutorBuilder for FsFetchExecutorBuilder {
38 type Node = StreamFsFetchNode;
39
40 async fn new_boxed_executor(
41 params: ExecutorParams,
42 node: &Self::Node,
43 store: impl StateStore,
44 ) -> StreamResult<Executor> {
45 let [upstream]: [_; 1] = params.input.try_into().unwrap();
46
47 let source = node.node_inner.as_ref().unwrap();
48
49 let source_id = TableId::new(source.source_id);
50 let source_name = source.source_name.clone();
51 let source_info = source.get_info()?;
52 let source_options_with_secret =
53 WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
54 let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
55 let source_desc_builder = SourceDescBuilder::new(
56 source.columns.clone(),
57 params.env.source_metrics(),
58 source.row_id_index.map(|x| x as _),
59 source_options_with_secret,
60 source_info.clone(),
61 params.env.config().developer.connector_message_buffer_size,
62 params.info.pk_indices.clone(),
63 );
64
65 let source_column_ids: Vec<_> = source_desc_builder
66 .column_catalogs_to_source_column_descs()
67 .iter()
68 .map(|column| column.column_id)
69 .collect();
70
71 let vnodes = Some(Arc::new(
72 params
73 .vnode_bitmap
74 .expect("vnodes not set for fetch executor"),
75 ));
76 let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
77 source.state_table.as_ref().unwrap(),
78 store.clone(),
79 vnodes,
80 )
81 .await;
82 let stream_source_core = StreamSourceCore::new(
83 source_id,
84 source_name,
85 source_column_ids,
86 source_desc_builder,
87 state_table_handler,
88 );
89
90 let exec = match properties {
91 risingwave_connector::source::ConnectorProperties::Gcs(_) => {
92 FsFetchExecutor::<_, OpendalGcs>::new(
93 params.actor_context.clone(),
94 stream_source_core,
95 upstream,
96 source.rate_limit,
97 )
98 .boxed()
99 }
100 risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
101 FsFetchExecutor::<_, OpendalS3>::new(
102 params.actor_context.clone(),
103 stream_source_core,
104 upstream,
105 source.rate_limit,
106 )
107 .boxed()
108 }
109 risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
110 IcebergFetchExecutor::new(
111 params.actor_context.clone(),
112 stream_source_core,
113 upstream,
114 source.rate_limit,
115 params.env.config().clone(),
116 )
117 .boxed()
118 }
119 risingwave_connector::source::ConnectorProperties::Azblob(_) => {
120 FsFetchExecutor::<_, OpendalAzblob>::new(
121 params.actor_context.clone(),
122 stream_source_core,
123 upstream,
124 source.rate_limit,
125 )
126 .boxed()
127 }
128 risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
129 FsFetchExecutor::<_, OpendalPosixFs>::new(
130 params.actor_context.clone(),
131 stream_source_core,
132 upstream,
133 source.rate_limit,
134 )
135 .boxed()
136 }
137 _ => unreachable!(),
138 };
139 Ok((params.info, exec).into())
140 }
141}