risingwave_stream/from_proto/
mod.rs1mod agg_common;
18mod append_only_dedup;
19mod asof_join;
20mod barrier_recv;
21mod batch_query;
22mod cdc_filter;
23mod changelog;
24mod dml;
25mod dynamic_filter;
26mod eowc_over_window;
27mod expand;
28mod filter;
29mod group_top_n;
30mod hash_agg;
31mod hash_join;
32mod hop_window;
33mod lookup;
34mod lookup_union;
35mod materialized_exprs;
36mod merge;
37mod mview;
38mod no_op;
39mod now;
40mod over_window;
41mod project;
42mod project_set;
43mod row_id_gen;
44mod simple_agg;
45mod sink;
46mod sort;
47mod source;
48mod source_backfill;
49mod stateless_simple_agg;
50mod stream_cdc_scan;
51mod stream_scan;
52mod temporal_join;
53mod top_n;
54mod union;
55mod values;
56mod watermark_filter;
57
58mod row_merge;
59
60mod approx_percentile;
61
62mod sync_log_store;
63mod vector_index;
64
65use itertools::Itertools;
67use risingwave_pb::stream_plan::stream_node::NodeBody;
68use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
69use risingwave_storage::StateStore;
70
71use self::append_only_dedup::*;
72use self::approx_percentile::global::*;
73use self::approx_percentile::local::*;
74use self::asof_join::AsOfJoinExecutorBuilder;
75use self::barrier_recv::*;
76use self::batch_query::*;
77use self::cdc_filter::CdcFilterExecutorBuilder;
78use self::dml::*;
79use self::dynamic_filter::*;
80use self::eowc_over_window::*;
81use self::expand::*;
82use self::filter::*;
83use self::group_top_n::GroupTopNExecutorBuilder;
84use self::hash_agg::*;
85use self::hash_join::*;
86use self::hop_window::*;
87use self::lookup::*;
88use self::lookup_union::*;
89use self::materialized_exprs::MaterializedExprsExecutorBuilder;
90pub(crate) use self::merge::MergeExecutorBuilder;
91use self::mview::*;
92use self::no_op::*;
93use self::now::NowExecutorBuilder;
94use self::over_window::*;
95use self::project::*;
96use self::project_set::*;
97use self::row_id_gen::RowIdGenExecutorBuilder;
98use self::row_merge::*;
99use self::simple_agg::*;
100use self::sink::*;
101use self::sort::*;
102use self::source::*;
103use self::source_backfill::*;
104use self::stateless_simple_agg::*;
105use self::stream_cdc_scan::*;
106use self::stream_scan::*;
107use self::sync_log_store::*;
108use self::temporal_join::*;
109use self::top_n::*;
110use self::union::*;
111use self::watermark_filter::WatermarkFilterBuilder;
112use crate::error::StreamResult;
113use crate::executor::{Execute, Executor, ExecutorInfo};
114use crate::from_proto::changelog::ChangeLogExecutorBuilder;
115use crate::from_proto::values::ValuesExecutorBuilder;
116use crate::from_proto::vector_index::VectorIndexWriteExecutorBuilder;
117use crate::task::ExecutorParams;
118
119trait ExecutorBuilder {
120 type Node;
121
122 async fn new_boxed_executor(
124 params: ExecutorParams,
125 node: &Self::Node,
126 store: impl StateStore,
127 ) -> StreamResult<Executor>;
128}
129
/// Expands to a `match` over a stream node's body that forwards to the
/// matching [`ExecutorBuilder`].
///
/// Arguments, in order: the executor params expression, the stream node
/// expression, the state store expression, then a comma-separated list of
/// `NodeBody::Variant => BuilderType` pairs (a trailing comma is accepted).
///
/// The expansion uses `.await`, so it must be invoked in an async context.
///
/// # Panics
///
/// * If `get_node_body()` on the node does not yield a body (the result is
///   `unwrap`ped).
/// * If the body is `NodeBody::Exchange` or `NodeBody::DeltaIndexJoin`, which
///   are treated as unreachable here.
///   NOTE(review): presumably these are handled or rewritten before executor
///   creation; confirm against the caller.
macro_rules! build_executor {
    (
        $params:expr,
        $stream_node:expr,
        $state_store:expr,
        $($variant:path => $builder:ty),* $(,)?
    ) => {{
        let body = $stream_node.get_node_body().unwrap();
        match body {
            $(
                $variant(inner) => {
                    <$builder>::new_boxed_executor($params, inner, $state_store).await
                }
            )*
            // No builder is registered for these two variants.
            NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => unreachable!(),
        }
    }};
}
142
143pub async fn create_executor(
145 params: ExecutorParams,
146 node: &StreamNode,
147 store: impl StateStore,
148) -> StreamResult<Executor> {
149 build_executor! {
150 params,
151 node,
152 store,
153 NodeBody::Source => SourceExecutorBuilder,
154 NodeBody::Sink => SinkExecutorBuilder,
155 NodeBody::Project => ProjectExecutorBuilder,
156 NodeBody::TopN => TopNExecutorBuilder::<false>,
157 NodeBody::AppendOnlyTopN => TopNExecutorBuilder::<true>,
158 NodeBody::StatelessSimpleAgg => StatelessSimpleAggExecutorBuilder,
159 NodeBody::SimpleAgg => SimpleAggExecutorBuilder,
160 NodeBody::HashAgg => HashAggExecutorBuilder,
161 NodeBody::HashJoin => HashJoinExecutorBuilder,
162 NodeBody::HopWindow => HopWindowExecutorBuilder,
163 NodeBody::StreamScan => StreamScanExecutorBuilder,
164 NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
165 NodeBody::BatchPlan => BatchQueryExecutorBuilder,
166 NodeBody::Merge => MergeExecutorBuilder,
167 NodeBody::Materialize => MaterializeExecutorBuilder,
168 NodeBody::Filter => FilterExecutorBuilder,
169 NodeBody::CdcFilter => CdcFilterExecutorBuilder,
170 NodeBody::Arrange => ArrangeExecutorBuilder,
171 NodeBody::Lookup => LookupExecutorBuilder,
172 NodeBody::Union => UnionExecutorBuilder,
173 NodeBody::LookupUnion => LookupUnionExecutorBuilder,
174 NodeBody::Expand => ExpandExecutorBuilder,
175 NodeBody::DynamicFilter => DynamicFilterExecutorBuilder,
176 NodeBody::ProjectSet => ProjectSetExecutorBuilder,
177 NodeBody::GroupTopN => GroupTopNExecutorBuilder::<false>,
178 NodeBody::AppendOnlyGroupTopN => GroupTopNExecutorBuilder::<true>,
179 NodeBody::Sort => SortExecutorBuilder,
180 NodeBody::WatermarkFilter => WatermarkFilterBuilder,
181 NodeBody::Dml => DmlExecutorBuilder,
182 NodeBody::RowIdGen => RowIdGenExecutorBuilder,
183 NodeBody::Now => NowExecutorBuilder,
184 NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
185 NodeBody::Values => ValuesExecutorBuilder,
186 NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
187 NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
188 NodeBody::NoOp => NoOpExecutorBuilder,
189 NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder,
190 NodeBody::OverWindow => OverWindowExecutorBuilder,
191 NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
192 NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
193 NodeBody::Changelog => ChangeLogExecutorBuilder,
194 NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
195 NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
196 NodeBody::RowMerge => RowMergeExecutorBuilder,
197 NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
198 NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
199 NodeBody::MaterializedExprs => MaterializedExprsExecutorBuilder,
200 NodeBody::VectorIndexWrite => VectorIndexWriteExecutorBuilder,
201 }
202}