// risingwave_stream/from_proto/mod.rs
mod agg_common;
18mod append_only_dedup;
19mod asof_join;
20mod barrier_recv;
21mod batch_query;
22mod cdc_filter;
23mod changelog;
24mod dml;
25mod dynamic_filter;
26mod eowc_gap_fill;
27mod eowc_over_window;
28mod expand;
29mod filter;
30mod gap_fill;
31mod group_top_n;
32mod hash_agg;
33mod hash_join;
34mod hop_window;
35mod locality_provider;
36mod lookup;
37mod lookup_union;
38mod materialized_exprs;
39mod merge;
40mod mview;
41mod no_op;
42mod now;
43mod over_window;
44mod project;
45mod project_set;
46mod row_id_gen;
47mod simple_agg;
48mod sink;
49mod sort;
50mod source;
51mod source_backfill;
52mod stateless_simple_agg;
53mod stream_cdc_scan;
54mod stream_scan;
55mod temporal_join;
56mod top_n;
57mod union;
58mod upstream_sink_union;
59mod values;
60mod watermark_filter;
61
62mod row_merge;
63
64mod approx_percentile;
65
66mod sync_log_store;
67mod vector_index;
68
69use itertools::Itertools;
71use risingwave_pb::stream_plan::stream_node::NodeBody;
72use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
73use risingwave_storage::StateStore;
74
75use self::append_only_dedup::*;
76use self::approx_percentile::global::*;
77use self::approx_percentile::local::*;
78use self::asof_join::AsOfJoinExecutorBuilder;
79use self::barrier_recv::*;
80use self::batch_query::*;
81use self::cdc_filter::CdcFilterExecutorBuilder;
82use self::dml::*;
83use self::dynamic_filter::*;
84use self::eowc_gap_fill::EowcGapFillExecutorBuilder;
85use self::eowc_over_window::*;
86use self::expand::*;
87use self::filter::*;
88use self::gap_fill::GapFillExecutorBuilder;
89use self::group_top_n::GroupTopNExecutorBuilder;
90use self::hash_agg::*;
91use self::hash_join::*;
92use self::hop_window::*;
93use self::locality_provider::*;
94use self::lookup::*;
95use self::lookup_union::*;
96use self::materialized_exprs::MaterializedExprsExecutorBuilder;
97pub(crate) use self::merge::MergeExecutorBuilder;
98use self::mview::*;
99use self::no_op::*;
100use self::now::NowExecutorBuilder;
101use self::over_window::*;
102use self::project::*;
103use self::project_set::*;
104use self::row_id_gen::RowIdGenExecutorBuilder;
105use self::row_merge::*;
106use self::simple_agg::*;
107use self::sink::*;
108use self::sort::*;
109use self::source::*;
110use self::source_backfill::*;
111use self::stateless_simple_agg::*;
112use self::stream_cdc_scan::*;
113use self::stream_scan::*;
114use self::sync_log_store::*;
115use self::temporal_join::*;
116use self::top_n::*;
117use self::union::*;
118use self::upstream_sink_union::*;
119use self::watermark_filter::WatermarkFilterBuilder;
120use crate::error::StreamResult;
121use crate::executor::{Execute, Executor, ExecutorInfo};
122use crate::from_proto::changelog::ChangeLogExecutorBuilder;
123use crate::from_proto::values::ValuesExecutorBuilder;
124use crate::from_proto::vector_index::VectorIndexWriteExecutorBuilder;
125use crate::task::ExecutorParams;
126
127trait ExecutorBuilder {
128 type Node;
129
130 async fn new_boxed_executor(
132 params: ExecutorParams,
133 node: &Self::Node,
134 store: impl StateStore,
135 ) -> StreamResult<Executor>;
136}
137
/// Expands to a `match` on the body of `$node` that forwards the matched
/// payload to the builder registered for its `NodeBody` variant.
///
/// Arguments, in order:
/// * `$params` - forwarded as the first argument of `new_boxed_executor`.
/// * `$node` - an expression on which `get_node_body()` is called.
/// * `$store` - forwarded as the third argument of `new_boxed_executor`.
/// * a comma-separated list of `Variant => BuilderType` pairs; a trailing
///   comma is accepted.
///
/// The expansion contains `.await`, so it must be used inside an async
/// context. It evaluates to whatever the selected builder's
/// `new_boxed_executor` returns.
///
/// # Panics
///
/// * If `get_node_body()` does not yield a body.
/// * If the body is `NodeBody::Exchange` or `NodeBody::DeltaIndexJoin`; these
///   two variants are hard-wired as unreachable and take no builder.
macro_rules! build_executor {
    ($params:expr, $node:expr, $store:expr, $($variant:path => $builder:ty),* $(,)?) => {
        match $node
            .get_node_body()
            // State the violated invariant instead of a bare `unwrap` panic.
            .expect("stream node has no node body")
        {
            $(
                $variant(node) => {
                    <$builder>::new_boxed_executor($params, node, $store).await
                },
            )*
            NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => {
                unreachable!(
                    "no executor builder exists for `Exchange` or `DeltaIndexJoin` node bodies"
                )
            }
        }
    }
}
150
151pub async fn create_executor(
153 params: ExecutorParams,
154 node: &StreamNode,
155 store: impl StateStore,
156) -> StreamResult<Executor> {
157 build_executor! {
158 params,
159 node,
160 store,
161 NodeBody::Source => SourceExecutorBuilder,
162 NodeBody::Sink => SinkExecutorBuilder,
163 NodeBody::Project => ProjectExecutorBuilder,
164 NodeBody::TopN => TopNExecutorBuilder::<false>,
165 NodeBody::AppendOnlyTopN => TopNExecutorBuilder::<true>,
166 NodeBody::StatelessSimpleAgg => StatelessSimpleAggExecutorBuilder,
167 NodeBody::SimpleAgg => SimpleAggExecutorBuilder,
168 NodeBody::HashAgg => HashAggExecutorBuilder,
169 NodeBody::HashJoin => HashJoinExecutorBuilder,
170 NodeBody::HopWindow => HopWindowExecutorBuilder,
171 NodeBody::StreamScan => StreamScanExecutorBuilder,
172 NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
173 NodeBody::BatchPlan => BatchQueryExecutorBuilder,
174 NodeBody::Merge => MergeExecutorBuilder,
175 NodeBody::Materialize => MaterializeExecutorBuilder,
176 NodeBody::Filter => FilterExecutorBuilder,
177 NodeBody::CdcFilter => CdcFilterExecutorBuilder,
178 NodeBody::Arrange => ArrangeExecutorBuilder,
179 NodeBody::Lookup => LookupExecutorBuilder,
180 NodeBody::Union => UnionExecutorBuilder,
181 NodeBody::LookupUnion => LookupUnionExecutorBuilder,
182 NodeBody::Expand => ExpandExecutorBuilder,
183 NodeBody::DynamicFilter => DynamicFilterExecutorBuilder,
184 NodeBody::ProjectSet => ProjectSetExecutorBuilder,
185 NodeBody::GroupTopN => GroupTopNExecutorBuilder::<false>,
186 NodeBody::AppendOnlyGroupTopN => GroupTopNExecutorBuilder::<true>,
187 NodeBody::Sort => SortExecutorBuilder,
188 NodeBody::WatermarkFilter => WatermarkFilterBuilder,
189 NodeBody::Dml => DmlExecutorBuilder,
190 NodeBody::RowIdGen => RowIdGenExecutorBuilder,
191 NodeBody::Now => NowExecutorBuilder,
192 NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
193 NodeBody::Values => ValuesExecutorBuilder,
194 NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
195 NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
196 NodeBody::NoOp => NoOpExecutorBuilder,
197 NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder,
198 NodeBody::OverWindow => OverWindowExecutorBuilder,
199 NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
200 NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
201 NodeBody::Changelog => ChangeLogExecutorBuilder,
202 NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
203 NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
204 NodeBody::RowMerge => RowMergeExecutorBuilder,
205 NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
206 NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
207 NodeBody::MaterializedExprs => MaterializedExprsExecutorBuilder,
208 NodeBody::VectorIndexWrite => VectorIndexWriteExecutorBuilder,
209 NodeBody::UpstreamSinkUnion => UpstreamSinkUnionExecutorBuilder,
210 NodeBody::LocalityProvider => LocalityProviderBuilder,
211 NodeBody::EowcGapFill => EowcGapFillExecutorBuilder,
212 NodeBody::GapFill => GapFillExecutorBuilder,
213 }
214}