risingwave_stream/executor/source/batch_source/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod batch_posix_fs_list;
16pub use batch_posix_fs_list::*;
17mod batch_posix_fs_fetch;
18pub use batch_posix_fs_fetch::*;
19mod batch_iceberg_list;
20pub use batch_iceberg_list::*;
21mod batch_iceberg_fetch;
22pub use batch_iceberg_fetch::*;
23
24/// Define a stream executor module that is gated by a feature.
25///
26/// This is similar to `feature_gated_source_mod` in the connector crate, allowing heavy or
27/// unpopular source implementations (and their dependencies) to be disabled at compile time
28/// to decrease compilation time and binary size.
29///
30/// When the feature is disabled, this macro generates a dummy executor implementation that
31/// returns an error indicating the feature is not enabled.
32///
33/// # Example
34/// ```ignore
35/// feature_gated_executor_mod!(
36///     batch_adbc_snowflake_list,
37///     BatchAdbcSnowflakeListExecutor<S: StateStore>,
38///     "adbc_snowflake",
39///     (
40///         _actor_ctx: ActorContextRef,
41///         _stream_source_core: StreamSourceCore<S>,
42///         _metrics: Arc<StreamingMetrics>,
43///         _barrier_receiver: UnboundedReceiver<Barrier>,
44///         _barrier_manager: LocalBarrierManager,
45///         _associated_table_id: Option<TableId>,
46///     )
47/// );
48/// ```
49macro_rules! feature_gated_executor_mod {
50    (
51        $mod_name:ident,
52        $executor_name:ident <S: StateStore>,
53        $source_name:literal,
54        ( $( $param_name:ident : $param_type:ty ),* $(,)? )
55    ) => {
56        paste::paste! {
57            #[cfg(feature = "source-" $source_name)]
58            mod $mod_name;
59            #[cfg(feature = "source-" $source_name)]
60            pub use $mod_name::*;
61
62            #[cfg(not(feature = "source-" $source_name))]
63            #[doc = "Dummy implementation for executor when the feature `source-" $source_name "` is not enabled."]
64            mod [<$mod_name _stub>] {
65                #![allow(unused_imports)]
66                use std::sync::Arc;
67
68                use risingwave_common::id::TableId;
69                use risingwave_storage::StateStore;
70                use tokio::sync::mpsc::UnboundedReceiver;
71
72                use crate::executor::prelude::*;
73                use crate::executor::source::StreamSourceCore;
74                use crate::task::LocalBarrierManager;
75
76                fn err_feature_not_enabled() -> StreamExecutorError {
77                    StreamExecutorError::from(anyhow::anyhow!(
78                        "Feature `source-{}` is not enabled at compile time. \
79                        Please enable it in `Cargo.toml` and rebuild.",
80                        $source_name
81                    ))
82                }
83
84                #[doc = "A dummy executor that returns an error, as the feature `source-" $source_name "` is currently not enabled."]
85                pub struct $executor_name<S: StateStore> {
86                    _marker: std::marker::PhantomData<S>,
87                }
88
89                impl<S: StateStore> $executor_name<S> {
90                    #[allow(clippy::too_many_arguments)]
91                    pub fn new( $( $param_name : $param_type ),* ) -> Self {
92                        // Suppress unused variable warnings
93                        $( let _ = $param_name; )*
94                        Self {
95                            _marker: std::marker::PhantomData,
96                        }
97                    }
98                }
99
100                impl<S: StateStore> Execute for $executor_name<S> {
101                    fn execute(self: Box<Self>) -> BoxedMessageStream {
102                        futures::stream::once(async { Err(err_feature_not_enabled()) }).boxed()
103                    }
104                }
105
106                impl<S: StateStore> std::fmt::Debug for $executor_name<S> {
107                    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108                        f.debug_struct(concat!(stringify!($executor_name), " (stub)"))
109                            .finish()
110                    }
111                }
112            }
113            #[cfg(not(feature = "source-" $source_name))]
114            pub use [<$mod_name _stub>]::*;
115        }
116    };
117}
118
119feature_gated_executor_mod!(
120    batch_adbc_snowflake_list,
121    BatchAdbcSnowflakeListExecutor<S: StateStore>,
122    "adbc_snowflake",
123    (
124        _actor_ctx: ActorContextRef,
125        _stream_source_core: StreamSourceCore<S>,
126        _metrics: Arc<StreamingMetrics>,
127        _barrier_receiver: UnboundedReceiver<Barrier>,
128        _barrier_manager: LocalBarrierManager,
129        _associated_table_id: Option<TableId>,
130    )
131);
132
133feature_gated_executor_mod!(
134    batch_adbc_snowflake_fetch,
135    BatchAdbcSnowflakeFetchExecutor<S: StateStore>,
136    "adbc_snowflake",
137    (
138        _actor_ctx: ActorContextRef,
139        _stream_source_core: StreamSourceCore<S>,
140        _upstream: Executor,
141        _barrier_manager: LocalBarrierManager,
142        _associated_table_id: Option<TableId>,
143    )
144);