risingwave_stream/from_proto/source/
fs_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::TableId;
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_connector::source::ConnectorProperties;
20use risingwave_connector::source::filesystem::opendal_source::{
21    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
22};
23use risingwave_connector::source::reader::desc::SourceDescBuilder;
24use risingwave_pb::stream_plan::StreamFsFetchNode;
25use risingwave_storage::StateStore;
26
27use crate::error::StreamResult;
28use crate::executor::source::{
29    BatchPosixFsFetchExecutor, FsFetchExecutor, IcebergFetchExecutor, SourceStateTableHandler,
30    StreamSourceCore,
31};
32use crate::executor::{Execute, Executor};
33use crate::from_proto::ExecutorBuilder;
34use crate::task::ExecutorParams;
35
36pub struct FsFetchExecutorBuilder;
37
38impl ExecutorBuilder for FsFetchExecutorBuilder {
39    type Node = StreamFsFetchNode;
40
41    async fn new_boxed_executor(
42        params: ExecutorParams,
43        node: &Self::Node,
44        store: impl StateStore,
45    ) -> StreamResult<Executor> {
46        let [upstream]: [_; 1] = params.input.try_into().unwrap();
47
48        let source = node.node_inner.as_ref().unwrap();
49
50        let source_id = TableId::new(source.source_id);
51        let source_name = source.source_name.clone();
52        let source_info = source.get_info()?;
53        let source_options_with_secret =
54            WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
55        let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
56        let source_desc_builder = SourceDescBuilder::new(
57            source.columns.clone(),
58            params.env.source_metrics(),
59            source.row_id_index.map(|x| x as _),
60            source_options_with_secret,
61            source_info.clone(),
62            params.env.config().developer.connector_message_buffer_size,
63            params.info.stream_key.clone(),
64        );
65
66        let source_column_ids: Vec<_> = source_desc_builder
67            .column_catalogs_to_source_column_descs()
68            .iter()
69            .map(|column| column.column_id)
70            .collect();
71
72        let vnodes = Some(Arc::new(
73            params
74                .vnode_bitmap
75                .expect("vnodes not set for fetch executor"),
76        ));
77        let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
78            source.state_table.as_ref().unwrap(),
79            store.clone(),
80            vnodes,
81        )
82        .await;
83        let stream_source_core = StreamSourceCore::new(
84            source_id,
85            source_name,
86            source_column_ids,
87            source_desc_builder,
88            state_table_handler,
89        );
90
91        let exec = match properties {
92            risingwave_connector::source::ConnectorProperties::Gcs(_) => {
93                FsFetchExecutor::<_, OpendalGcs>::new(
94                    params.actor_context.clone(),
95                    stream_source_core,
96                    upstream,
97                    source.rate_limit,
98                )
99                .boxed()
100            }
101            risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
102                FsFetchExecutor::<_, OpendalS3>::new(
103                    params.actor_context.clone(),
104                    stream_source_core,
105                    upstream,
106                    source.rate_limit,
107                )
108                .boxed()
109            }
110            risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
111                IcebergFetchExecutor::new(
112                    params.actor_context.clone(),
113                    stream_source_core,
114                    upstream,
115                    source.rate_limit,
116                    params.env.config().clone(),
117                )
118                .boxed()
119            }
120            risingwave_connector::source::ConnectorProperties::Azblob(_) => {
121                FsFetchExecutor::<_, OpendalAzblob>::new(
122                    params.actor_context.clone(),
123                    stream_source_core,
124                    upstream,
125                    source.rate_limit,
126                )
127                .boxed()
128            }
129            risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
130                FsFetchExecutor::<_, OpendalPosixFs>::new(
131                    params.actor_context.clone(),
132                    stream_source_core,
133                    upstream,
134                    source.rate_limit,
135                )
136                .boxed()
137            }
138            risingwave_connector::source::ConnectorProperties::BatchPosixFs(_) => {
139                BatchPosixFsFetchExecutor::new(
140                    params.actor_context.clone(),
141                    stream_source_core,
142                    upstream,
143                    source.rate_limit,
144                    params.local_barrier_manager.clone(),
145                )
146                .boxed()
147            }
148            _ => unreachable!(),
149        };
150        Ok((params.info, exec).into())
151    }
152}