risingwave_stream/executor/source/batch_source/
mod.rs1mod batch_posix_fs_list;
16pub use batch_posix_fs_list::*;
17mod batch_posix_fs_fetch;
18pub use batch_posix_fs_fetch::*;
19mod batch_iceberg_list;
20pub use batch_iceberg_list::*;
21mod batch_iceberg_fetch;
22pub use batch_iceberg_fetch::*;
23
24macro_rules! feature_gated_executor_mod {
50 (
51 $mod_name:ident,
52 $executor_name:ident <S: StateStore>,
53 $source_name:literal,
54 ( $( $param_name:ident : $param_type:ty ),* $(,)? )
55 ) => {
56 paste::paste! {
57 #[cfg(feature = "source-" $source_name)]
58 mod $mod_name;
59 #[cfg(feature = "source-" $source_name)]
60 pub use $mod_name::*;
61
62 #[cfg(not(feature = "source-" $source_name))]
63 #[doc = "Dummy implementation for executor when the feature `source-" $source_name "` is not enabled."]
64 mod [<$mod_name _stub>] {
65 #![allow(unused_imports)]
66 use std::sync::Arc;
67
68 use risingwave_common::id::TableId;
69 use risingwave_storage::StateStore;
70 use tokio::sync::mpsc::UnboundedReceiver;
71
72 use crate::executor::prelude::*;
73 use crate::executor::source::StreamSourceCore;
74 use crate::task::LocalBarrierManager;
75
76 fn err_feature_not_enabled() -> StreamExecutorError {
77 StreamExecutorError::from(anyhow::anyhow!(
78 "Feature `source-{}` is not enabled at compile time. \
79 Please enable it in `Cargo.toml` and rebuild.",
80 $source_name
81 ))
82 }
83
84 #[doc = "A dummy executor that returns an error, as the feature `source-" $source_name "` is currently not enabled."]
85 pub struct $executor_name<S: StateStore> {
86 _marker: std::marker::PhantomData<S>,
87 }
88
89 impl<S: StateStore> $executor_name<S> {
90 #[allow(clippy::too_many_arguments)]
91 pub fn new( $( $param_name : $param_type ),* ) -> Self {
92 $( let _ = $param_name; )*
94 Self {
95 _marker: std::marker::PhantomData,
96 }
97 }
98 }
99
100 impl<S: StateStore> Execute for $executor_name<S> {
101 fn execute(self: Box<Self>) -> BoxedMessageStream {
102 futures::stream::once(async { Err(err_feature_not_enabled()) }).boxed()
103 }
104 }
105
106 impl<S: StateStore> std::fmt::Debug for $executor_name<S> {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct(concat!(stringify!($executor_name), " (stub)"))
109 .finish()
110 }
111 }
112 }
113 #[cfg(not(feature = "source-" $source_name))]
114 pub use [<$mod_name _stub>]::*;
115 }
116 };
117}
118
119feature_gated_executor_mod!(
120 batch_adbc_snowflake_list,
121 BatchAdbcSnowflakeListExecutor<S: StateStore>,
122 "adbc_snowflake",
123 (
124 _actor_ctx: ActorContextRef,
125 _stream_source_core: StreamSourceCore<S>,
126 _metrics: Arc<StreamingMetrics>,
127 _barrier_receiver: UnboundedReceiver<Barrier>,
128 _barrier_manager: LocalBarrierManager,
129 _associated_table_id: Option<TableId>,
130 )
131);
132
133feature_gated_executor_mod!(
134 batch_adbc_snowflake_fetch,
135 BatchAdbcSnowflakeFetchExecutor<S: StateStore>,
136 "adbc_snowflake",
137 (
138 _actor_ctx: ActorContextRef,
139 _stream_source_core: StreamSourceCore<S>,
140 _upstream: Executor,
141 _barrier_manager: LocalBarrierManager,
142 _associated_table_id: Option<TableId>,
143 )
144);