risingwave_stream/from_proto/source/
fs_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::TableId;
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_connector::source::ConnectorProperties;
20use risingwave_connector::source::filesystem::opendal_source::{
21    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
22};
23use risingwave_connector::source::reader::desc::SourceDescBuilder;
24use risingwave_pb::stream_plan::StreamFsFetchNode;
25use risingwave_storage::StateStore;
26
27use crate::error::StreamResult;
28use crate::executor::source::{
29    FsFetchExecutor, IcebergFetchExecutor, SourceStateTableHandler, StreamSourceCore,
30};
31use crate::executor::{Execute, Executor};
32use crate::from_proto::ExecutorBuilder;
33use crate::task::ExecutorParams;
34
35pub struct FsFetchExecutorBuilder;
36
37impl ExecutorBuilder for FsFetchExecutorBuilder {
38    type Node = StreamFsFetchNode;
39
40    async fn new_boxed_executor(
41        params: ExecutorParams,
42        node: &Self::Node,
43        store: impl StateStore,
44    ) -> StreamResult<Executor> {
45        let [upstream]: [_; 1] = params.input.try_into().unwrap();
46
47        let source = node.node_inner.as_ref().unwrap();
48
49        let source_id = TableId::new(source.source_id);
50        let source_name = source.source_name.clone();
51        let source_info = source.get_info()?;
52        let source_options_with_secret =
53            WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
54        let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
55        let source_desc_builder = SourceDescBuilder::new(
56            source.columns.clone(),
57            params.env.source_metrics(),
58            source.row_id_index.map(|x| x as _),
59            source_options_with_secret,
60            source_info.clone(),
61            params.env.config().developer.connector_message_buffer_size,
62            params.info.pk_indices.clone(),
63        );
64
65        let source_column_ids: Vec<_> = source_desc_builder
66            .column_catalogs_to_source_column_descs()
67            .iter()
68            .map(|column| column.column_id)
69            .collect();
70
71        let vnodes = Some(Arc::new(
72            params
73                .vnode_bitmap
74                .expect("vnodes not set for fetch executor"),
75        ));
76        let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
77            source.state_table.as_ref().unwrap(),
78            store.clone(),
79            vnodes,
80        )
81        .await;
82        let stream_source_core = StreamSourceCore::new(
83            source_id,
84            source_name,
85            source_column_ids,
86            source_desc_builder,
87            state_table_handler,
88        );
89
90        let exec = match properties {
91            risingwave_connector::source::ConnectorProperties::Gcs(_) => {
92                FsFetchExecutor::<_, OpendalGcs>::new(
93                    params.actor_context.clone(),
94                    stream_source_core,
95                    upstream,
96                    source.rate_limit,
97                )
98                .boxed()
99            }
100            risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
101                FsFetchExecutor::<_, OpendalS3>::new(
102                    params.actor_context.clone(),
103                    stream_source_core,
104                    upstream,
105                    source.rate_limit,
106                )
107                .boxed()
108            }
109            risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
110                IcebergFetchExecutor::new(
111                    params.actor_context.clone(),
112                    stream_source_core,
113                    upstream,
114                    source.rate_limit,
115                    params.env.config().clone(),
116                )
117                .boxed()
118            }
119            risingwave_connector::source::ConnectorProperties::Azblob(_) => {
120                FsFetchExecutor::<_, OpendalAzblob>::new(
121                    params.actor_context.clone(),
122                    stream_source_core,
123                    upstream,
124                    source.rate_limit,
125                )
126                .boxed()
127            }
128            risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
129                FsFetchExecutor::<_, OpendalPosixFs>::new(
130                    params.actor_context.clone(),
131                    stream_source_core,
132                    upstream,
133                    source.rate_limit,
134                )
135                .boxed()
136            }
137            _ => unreachable!(),
138        };
139        Ok((params.info, exec).into())
140    }
141}