risingwave_stream/from_proto/
mod.rs1mod agg_common;
18mod append_only_dedup;
19mod asof_join;
20mod barrier_recv;
21mod batch_query;
22mod cdc_filter;
23mod changelog;
24mod dml;
25mod dynamic_filter;
26mod eowc_over_window;
27mod expand;
28mod filter;
29mod group_top_n;
30mod hash_agg;
31mod hash_join;
32mod hop_window;
33mod locality_provider;
34mod lookup;
35mod lookup_union;
36mod materialized_exprs;
37mod merge;
38mod mview;
39mod no_op;
40mod now;
41mod over_window;
42mod project;
43mod project_set;
44mod row_id_gen;
45mod simple_agg;
46mod sink;
47mod sort;
48mod source;
49mod source_backfill;
50mod stateless_simple_agg;
51mod stream_cdc_scan;
52mod stream_scan;
53mod temporal_join;
54mod top_n;
55mod union;
56mod upstream_sink_union;
57mod values;
58mod watermark_filter;
59
60mod row_merge;
61
62mod approx_percentile;
63
64mod sync_log_store;
65mod vector_index;
66
67use itertools::Itertools;
69use risingwave_pb::stream_plan::stream_node::NodeBody;
70use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
71use risingwave_storage::StateStore;
72
73use self::append_only_dedup::*;
74use self::approx_percentile::global::*;
75use self::approx_percentile::local::*;
76use self::asof_join::AsOfJoinExecutorBuilder;
77use self::barrier_recv::*;
78use self::batch_query::*;
79use self::cdc_filter::CdcFilterExecutorBuilder;
80use self::dml::*;
81use self::dynamic_filter::*;
82use self::eowc_over_window::*;
83use self::expand::*;
84use self::filter::*;
85use self::group_top_n::GroupTopNExecutorBuilder;
86use self::hash_agg::*;
87use self::hash_join::*;
88use self::hop_window::*;
89use self::locality_provider::*;
90use self::lookup::*;
91use self::lookup_union::*;
92use self::materialized_exprs::MaterializedExprsExecutorBuilder;
93pub(crate) use self::merge::MergeExecutorBuilder;
94use self::mview::*;
95use self::no_op::*;
96use self::now::NowExecutorBuilder;
97use self::over_window::*;
98use self::project::*;
99use self::project_set::*;
100use self::row_id_gen::RowIdGenExecutorBuilder;
101use self::row_merge::*;
102use self::simple_agg::*;
103use self::sink::*;
104use self::sort::*;
105use self::source::*;
106use self::source_backfill::*;
107use self::stateless_simple_agg::*;
108use self::stream_cdc_scan::*;
109use self::stream_scan::*;
110use self::sync_log_store::*;
111use self::temporal_join::*;
112use self::top_n::*;
113use self::union::*;
114use self::upstream_sink_union::*;
115use self::watermark_filter::WatermarkFilterBuilder;
116use crate::error::StreamResult;
117use crate::executor::{Execute, Executor, ExecutorInfo};
118use crate::from_proto::changelog::ChangeLogExecutorBuilder;
119use crate::from_proto::values::ValuesExecutorBuilder;
120use crate::from_proto::vector_index::VectorIndexWriteExecutorBuilder;
121use crate::task::ExecutorParams;
122
123trait ExecutorBuilder {
124 type Node;
125
126 async fn new_boxed_executor(
128 params: ExecutorParams,
129 node: &Self::Node,
130 store: impl StateStore,
131 ) -> StreamResult<Executor>;
132}
133
134macro_rules! build_executor {
135 ($source:expr, $node:expr, $store:expr, $($proto_type_name:path => $data_type:ty),* $(,)?) => {
136 match $node.get_node_body().unwrap() {
137 $(
138 $proto_type_name(node) => {
139 <$data_type>::new_boxed_executor($source, node, $store).await
140 },
141 )*
142 NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => unreachable!()
143 }
144 }
145}
146
147pub async fn create_executor(
149 params: ExecutorParams,
150 node: &StreamNode,
151 store: impl StateStore,
152) -> StreamResult<Executor> {
153 build_executor! {
154 params,
155 node,
156 store,
157 NodeBody::Source => SourceExecutorBuilder,
158 NodeBody::Sink => SinkExecutorBuilder,
159 NodeBody::Project => ProjectExecutorBuilder,
160 NodeBody::TopN => TopNExecutorBuilder::<false>,
161 NodeBody::AppendOnlyTopN => TopNExecutorBuilder::<true>,
162 NodeBody::StatelessSimpleAgg => StatelessSimpleAggExecutorBuilder,
163 NodeBody::SimpleAgg => SimpleAggExecutorBuilder,
164 NodeBody::HashAgg => HashAggExecutorBuilder,
165 NodeBody::HashJoin => HashJoinExecutorBuilder,
166 NodeBody::HopWindow => HopWindowExecutorBuilder,
167 NodeBody::StreamScan => StreamScanExecutorBuilder,
168 NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
169 NodeBody::BatchPlan => BatchQueryExecutorBuilder,
170 NodeBody::Merge => MergeExecutorBuilder,
171 NodeBody::Materialize => MaterializeExecutorBuilder,
172 NodeBody::Filter => FilterExecutorBuilder,
173 NodeBody::CdcFilter => CdcFilterExecutorBuilder,
174 NodeBody::Arrange => ArrangeExecutorBuilder,
175 NodeBody::Lookup => LookupExecutorBuilder,
176 NodeBody::Union => UnionExecutorBuilder,
177 NodeBody::LookupUnion => LookupUnionExecutorBuilder,
178 NodeBody::Expand => ExpandExecutorBuilder,
179 NodeBody::DynamicFilter => DynamicFilterExecutorBuilder,
180 NodeBody::ProjectSet => ProjectSetExecutorBuilder,
181 NodeBody::GroupTopN => GroupTopNExecutorBuilder::<false>,
182 NodeBody::AppendOnlyGroupTopN => GroupTopNExecutorBuilder::<true>,
183 NodeBody::Sort => SortExecutorBuilder,
184 NodeBody::WatermarkFilter => WatermarkFilterBuilder,
185 NodeBody::Dml => DmlExecutorBuilder,
186 NodeBody::RowIdGen => RowIdGenExecutorBuilder,
187 NodeBody::Now => NowExecutorBuilder,
188 NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
189 NodeBody::Values => ValuesExecutorBuilder,
190 NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
191 NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
192 NodeBody::NoOp => NoOpExecutorBuilder,
193 NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder,
194 NodeBody::OverWindow => OverWindowExecutorBuilder,
195 NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
196 NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
197 NodeBody::Changelog => ChangeLogExecutorBuilder,
198 NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
199 NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
200 NodeBody::RowMerge => RowMergeExecutorBuilder,
201 NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
202 NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
203 NodeBody::MaterializedExprs => MaterializedExprsExecutorBuilder,
204 NodeBody::VectorIndexWrite => VectorIndexWriteExecutorBuilder,
205 NodeBody::UpstreamSinkUnion => UpstreamSinkUnionExecutorBuilder,
206 NodeBody::LocalityProvider => LocalityProviderBuilder,
207 }
208}