Skip to main content

risingwave_stream/from_proto/source/
fs_fetch.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_connector::WithOptionsSecResolved;
18use risingwave_connector::source::ConnectorProperties;
19use risingwave_connector::source::filesystem::opendal_source::{
20    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
21};
22use risingwave_connector::source::reader::desc::SourceDescBuilder;
23use risingwave_pb::stream_plan::StreamFsFetchNode;
24use risingwave_storage::StateStore;
25
26use crate::error::StreamResult;
27use crate::executor::source::{
28    BatchAdbcSnowflakeFetchExecutor, BatchIcebergFetchExecutor, BatchPosixFsFetchExecutor,
29    FsFetchExecutor, IcebergFetchExecutor, SourceStateTableHandler, StreamSourceCore,
30};
31use crate::executor::{Execute, Executor};
32use crate::from_proto::ExecutorBuilder;
33use crate::from_proto::source::is_full_reload_refresh;
34use crate::task::ExecutorParams;
35
36pub struct FsFetchExecutorBuilder;
37
38impl_stream_node_body!(StreamFsFetch(StreamFsFetchNode) => FsFetchExecutorBuilder);
39
40impl ExecutorBuilder for FsFetchExecutorBuilder {
41    type Node = StreamFsFetchNode;
42
43    async fn new_boxed_executor(
44        params: ExecutorParams,
45        node: &Self::Node,
46        store: impl StateStore,
47    ) -> StreamResult<Executor> {
48        let [upstream]: [_; 1] = params.input.try_into().unwrap();
49
50        let source = node.node_inner.as_ref().unwrap();
51        let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
52
53        let source_id = source.source_id;
54        let source_name = source.source_name.clone();
55        let source_info = source.get_info()?;
56        let source_options_with_secret =
57            WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
58        let properties = ConnectorProperties::extract(source_options_with_secret.clone(), false)?;
59        let source_desc_builder = SourceDescBuilder::new(
60            source.columns.clone(),
61            params.env.source_metrics(),
62            source.row_id_index.map(|x| x as _),
63            source_options_with_secret,
64            source_info.clone(),
65            params.config.developer.connector_message_buffer_size,
66            params.info.stream_key.clone(),
67        );
68
69        let source_column_ids: Vec<_> = source_desc_builder
70            .column_catalogs_to_source_column_descs()
71            .iter()
72            .map(|column| column.column_id)
73            .collect();
74
75        let vnodes = Some(Arc::new(
76            params
77                .vnode_bitmap
78                .expect("vnodes not set for fetch executor"),
79        ));
80        let state_table_handler = SourceStateTableHandler::from_table_catalog_with_vnodes(
81            source.state_table.as_ref().unwrap(),
82            store.clone(),
83            vnodes,
84        )
85        .await;
86        let stream_source_core = StreamSourceCore::new(
87            source_id,
88            source_name,
89            source_column_ids,
90            source_desc_builder,
91            state_table_handler,
92        );
93
94        let exec = match properties {
95            risingwave_connector::source::ConnectorProperties::Gcs(_) => {
96                FsFetchExecutor::<_, OpendalGcs>::new(
97                    params.actor_context.clone(),
98                    stream_source_core,
99                    upstream,
100                    source.rate_limit,
101                )
102                .boxed()
103            }
104            risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
105                FsFetchExecutor::<_, OpendalS3>::new(
106                    params.actor_context.clone(),
107                    stream_source_core,
108                    upstream,
109                    source.rate_limit,
110                )
111                .boxed()
112            }
113            risingwave_connector::source::ConnectorProperties::Iceberg(_) => {
114                if is_full_reload_refresh {
115                    BatchIcebergFetchExecutor::new(
116                        params.actor_context.clone(),
117                        stream_source_core,
118                        upstream,
119                        params.local_barrier_manager.clone(),
120                        params.config.clone(),
121                        source.associated_table_id,
122                    )
123                    .boxed()
124                } else {
125                    IcebergFetchExecutor::new(
126                        params.actor_context.clone(),
127                        stream_source_core,
128                        upstream,
129                        source.rate_limit,
130                        params.config.clone(),
131                    )
132                    .boxed()
133                }
134            }
135            risingwave_connector::source::ConnectorProperties::Azblob(_) => {
136                FsFetchExecutor::<_, OpendalAzblob>::new(
137                    params.actor_context.clone(),
138                    stream_source_core,
139                    upstream,
140                    source.rate_limit,
141                )
142                .boxed()
143            }
144            risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
145                FsFetchExecutor::<_, OpendalPosixFs>::new(
146                    params.actor_context.clone(),
147                    stream_source_core,
148                    upstream,
149                    source.rate_limit,
150                )
151                .boxed()
152            }
153            risingwave_connector::source::ConnectorProperties::BatchPosixFs(_) => {
154                BatchPosixFsFetchExecutor::new(
155                    params.actor_context.clone(),
156                    stream_source_core,
157                    upstream,
158                    source.rate_limit,
159                    params.local_barrier_manager.clone(),
160                    source.associated_table_id,
161                )
162                .boxed()
163            }
164            risingwave_connector::source::ConnectorProperties::AdbcSnowflake(_) => {
165                if is_full_reload_refresh {
166                    BatchAdbcSnowflakeFetchExecutor::new(
167                        params.actor_context.clone(),
168                        stream_source_core,
169                        upstream,
170                        params.local_barrier_manager.clone(),
171                        source.associated_table_id,
172                    )
173                    .boxed()
174                } else {
175                    unreachable!("AdbcSnowflake connector only supports FULL_RELOAD refresh mode")
176                }
177            }
178            _ => unreachable!(),
179        };
180        Ok((params.info, exec).into())
181    }
182}