risingwave_stream/from_proto/source/
fs_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_connector::WithOptionsSecResolved;
18use risingwave_connector::source::ConnectorProperties;
19use risingwave_connector::source::filesystem::opendal_source::{
20    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
21};
22use risingwave_connector::source::reader::desc::SourceDescBuilder;
23use risingwave_pb::stream_plan::StreamFsFetchNode;
24use risingwave_storage::StateStore;
25
26use crate::error::StreamResult;
27use crate::executor::source::{
28    BatchIcebergFetchExecutor, BatchPosixFsFetchExecutor, FsFetchExecutor, IcebergFetchExecutor,
29    SourceStateTableHandler, StreamSourceCore,
30};
31use crate::executor::{Execute, Executor};
32use crate::from_proto::ExecutorBuilder;
33use crate::from_proto::source::is_full_reload_refresh;
34use crate::task::ExecutorParams;
35
36pub struct FsFetchExecutorBuilder;
37
38impl ExecutorBuilder for FsFetchExecutorBuilder {
39    type Node = StreamFsFetchNode;
40
41    async fn new_boxed_executor(
42        params: ExecutorParams,
43        node: &Self::Node,
44        store: impl StateStore,
45    ) -> StreamResult<Executor> {
46        let [upstream]: [_; 1] = params.input.try_into().unwrap();
47
48        let source = node.node_inner.as_ref().unwrap();
49        let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
50
51        let source_id = source.source_id;
52        let source_name = source.source_name.clone();
53        let source_info = source.get_info()?;
54        let source_options_with_secret =
55            WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
56        let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
57        let source_desc_builder = SourceDescBuilder::new(
58            source.columns.clone(),
59            params.env.source_metrics(),
60            source.row_id_index.map(|x| x as _),
61            source_options_with_secret,
62            source_info.clone(),
63            params.config.developer.connector_message_buffer_size,
64            params.info.stream_key.clone(),
65        );
66
67        let source_column_ids: Vec<_> = source_desc_builder
68            .column_catalogs_to_source_column_descs()
69            .iter()
70            .map(|column| column.column_id)
71            .collect();
72
73        let vnodes = Some(Arc::new(
74            params
75                .vnode_bitmap
76                .expect("vnodes not set for fetch executor"),
77        ));
78        let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
79            source.state_table.as_ref().unwrap(),
80            store.clone(),
81            vnodes,
82        )
83        .await;
84        let stream_source_core = StreamSourceCore::new(
85            source_id,
86            source_name,
87            source_column_ids,
88            source_desc_builder,
89            state_table_handler,
90        );
91
92        let exec = match properties {
93            risingwave_connector::source::ConnectorProperties::Gcs(_) => {
94                FsFetchExecutor::<_, OpendalGcs>::new(
95                    params.actor_context.clone(),
96                    stream_source_core,
97                    upstream,
98                    source.rate_limit,
99                )
100                .boxed()
101            }
102            risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
103                FsFetchExecutor::<_, OpendalS3>::new(
104                    params.actor_context.clone(),
105                    stream_source_core,
106                    upstream,
107                    source.rate_limit,
108                )
109                .boxed()
110            }
111            risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
112                if is_full_reload_refresh {
113                    BatchIcebergFetchExecutor::new(
114                        params.actor_context.clone(),
115                        stream_source_core,
116                        upstream,
117                        params.local_barrier_manager.clone(),
118                        params.config.clone(),
119                        source.associated_table_id,
120                    )
121                    .boxed()
122                } else {
123                    IcebergFetchExecutor::new(
124                        params.actor_context.clone(),
125                        stream_source_core,
126                        upstream,
127                        source.rate_limit,
128                        params.config.clone(),
129                    )
130                    .boxed()
131                }
132            }
133            risingwave_connector::source::ConnectorProperties::Azblob(_) => {
134                FsFetchExecutor::<_, OpendalAzblob>::new(
135                    params.actor_context.clone(),
136                    stream_source_core,
137                    upstream,
138                    source.rate_limit,
139                )
140                .boxed()
141            }
142            risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
143                FsFetchExecutor::<_, OpendalPosixFs>::new(
144                    params.actor_context.clone(),
145                    stream_source_core,
146                    upstream,
147                    source.rate_limit,
148                )
149                .boxed()
150            }
151            risingwave_connector::source::ConnectorProperties::BatchPosixFs(_) => {
152                BatchPosixFsFetchExecutor::new(
153                    params.actor_context.clone(),
154                    stream_source_core,
155                    upstream,
156                    source.rate_limit,
157                    params.local_barrier_manager.clone(),
158                    source.associated_table_id,
159                )
160                .boxed()
161            }
162            _ => unreachable!(),
163        };
164        Ok((params.info, exec).into())
165    }
166}