risingwave_stream/from_proto/
mod.rs1mod agg_common;
18mod append_only_dedup;
19mod asof_join;
20mod barrier_recv;
21mod batch_query;
22mod cdc_filter;
23mod changelog;
24mod dml;
25mod dynamic_filter;
26mod eowc_gap_fill;
27mod eowc_over_window;
28mod expand;
29mod filter;
30mod gap_fill;
31mod group_top_n;
32mod hash_agg;
33mod hash_join;
34mod hop_window;
35mod iceberg_with_pk_index;
36mod locality_provider;
37mod lookup;
38mod lookup_union;
39mod materialized_exprs;
40mod merge;
41mod mview;
42mod no_op;
43mod now;
44mod over_window;
45mod project;
46mod project_set;
47mod row_id_gen;
48mod simple_agg;
49mod sink;
50mod sort;
51mod source;
52mod source_backfill;
53mod stateless_simple_agg;
54mod stream_cdc_scan;
55mod stream_scan;
56mod temporal_join;
57mod top_n;
58mod union;
59mod upstream_sink_union;
60mod values;
61mod watermark_filter;
62
63mod row_merge;
64
65mod approx_percentile;
66
67mod sync_log_store;
68mod vector_index_lookup_join;
69mod vector_index_write;
70
71use itertools::Itertools;
73use risingwave_pb::stream_plan::stream_node::NodeBody;
74use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
75use risingwave_storage::StateStore;
76
77use self::append_only_dedup::*;
78use self::approx_percentile::global::*;
79use self::approx_percentile::local::*;
80use self::asof_join::AsOfJoinExecutorBuilder;
81use self::barrier_recv::*;
82use self::batch_query::*;
83use self::cdc_filter::CdcFilterExecutorBuilder;
84use self::dml::*;
85use self::dynamic_filter::*;
86use self::eowc_gap_fill::EowcGapFillExecutorBuilder;
87use self::eowc_over_window::*;
88use self::expand::*;
89use self::filter::*;
90use self::gap_fill::GapFillExecutorBuilder;
91use self::group_top_n::GroupTopNExecutorBuilder;
92use self::hash_agg::*;
93use self::hash_join::*;
94use self::hop_window::*;
95use self::iceberg_with_pk_index::*;
96use self::locality_provider::*;
97use self::lookup::*;
98use self::lookup_union::*;
99use self::materialized_exprs::MaterializedExprsExecutorBuilder;
100pub(crate) use self::merge::MergeExecutorBuilder;
101use self::mview::*;
102use self::no_op::*;
103use self::now::NowExecutorBuilder;
104use self::over_window::*;
105use self::project::*;
106use self::project_set::*;
107use self::row_id_gen::RowIdGenExecutorBuilder;
108use self::row_merge::*;
109use self::simple_agg::*;
110use self::sink::*;
111use self::sort::*;
112use self::source::*;
113use self::source_backfill::*;
114use self::stateless_simple_agg::*;
115use self::stream_cdc_scan::*;
116use self::stream_scan::*;
117use self::sync_log_store::*;
118use self::temporal_join::*;
119use self::top_n::*;
120use self::union::*;
121use self::upstream_sink_union::*;
122use self::watermark_filter::WatermarkFilterBuilder;
123use crate::error::StreamResult;
124use crate::executor::{Execute, Executor, ExecutorInfo};
125use crate::from_proto::changelog::ChangeLogExecutorBuilder;
126use crate::from_proto::values::ValuesExecutorBuilder;
127use crate::from_proto::vector_index_lookup_join::VectorIndexLookupJoinBuilder;
128use crate::from_proto::vector_index_write::VectorIndexWriteExecutorBuilder;
129use crate::task::ExecutorParams;
130
131trait ExecutorBuilder {
132 type Node;
133
134 async fn new_boxed_executor(
136 params: ExecutorParams,
137 node: &Self::Node,
138 store: impl StateStore,
139 ) -> StreamResult<Executor>;
140}
141
/// Expands to a `match` over the node body of `$stream_node` that forwards the payload of the
/// matched variant to the builder type registered for it.
///
/// Invocation shape (a trailing comma after the last entry is optional):
///
/// ```ignore
/// build_executor! { params, node, store, NodeBody::Variant => BuilderType, ... }
/// ```
///
/// The expansion contains `.await`, so the macro must be invoked inside an async context.
/// `$params` and `$state_store` are spliced into every arm; only the arm that matches is
/// evaluated.
///
/// # Panics
///
/// The expansion panics if `get_node_body()` fails, or if the body is `NodeBody::Exchange`
/// or `NodeBody::DeltaIndexJoin`, for which no builder is dispatched here.
macro_rules! build_executor {
    (
        $params:expr,
        $stream_node:expr,
        $state_store:expr,
        $($variant:path => $builder:ty),* $(,)?
    ) => {
        match $stream_node
            .get_node_body()
            .expect("failed to get the node body of the stream node")
        {
            $(
                $variant(body) => {
                    <$builder>::new_boxed_executor($params, body, $state_store).await
                }
            )*
            // These two variants are never expected to reach executor construction.
            NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => unreachable!(
                "`Exchange` and `DeltaIndexJoin` nodes have no executor builder"
            ),
        }
    };
}
154
155pub async fn create_executor(
157 params: ExecutorParams,
158 node: &StreamNode,
159 store: impl StateStore,
160) -> StreamResult<Executor> {
161 build_executor! {
162 params,
163 node,
164 store,
165 NodeBody::Source => SourceExecutorBuilder,
166 NodeBody::Sink => SinkExecutorBuilder,
167 NodeBody::Project => ProjectExecutorBuilder,
168 NodeBody::TopN => TopNExecutorBuilder::<false>,
169 NodeBody::AppendOnlyTopN => TopNExecutorBuilder::<true>,
170 NodeBody::StatelessSimpleAgg => StatelessSimpleAggExecutorBuilder,
171 NodeBody::SimpleAgg => SimpleAggExecutorBuilder,
172 NodeBody::HashAgg => HashAggExecutorBuilder,
173 NodeBody::HashJoin => HashJoinExecutorBuilder,
174 NodeBody::HopWindow => HopWindowExecutorBuilder,
175 NodeBody::StreamScan => StreamScanExecutorBuilder,
176 NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
177 NodeBody::BatchPlan => BatchQueryExecutorBuilder,
178 NodeBody::Merge => MergeExecutorBuilder,
179 NodeBody::Materialize => MaterializeExecutorBuilder,
180 NodeBody::Filter => FilterExecutorBuilder,
181 NodeBody::CdcFilter => CdcFilterExecutorBuilder,
182 NodeBody::Arrange => ArrangeExecutorBuilder,
183 NodeBody::Lookup => LookupExecutorBuilder,
184 NodeBody::Union => UnionExecutorBuilder,
185 NodeBody::LookupUnion => LookupUnionExecutorBuilder,
186 NodeBody::Expand => ExpandExecutorBuilder,
187 NodeBody::DynamicFilter => DynamicFilterExecutorBuilder,
188 NodeBody::ProjectSet => ProjectSetExecutorBuilder,
189 NodeBody::GroupTopN => GroupTopNExecutorBuilder::<false>,
190 NodeBody::AppendOnlyGroupTopN => GroupTopNExecutorBuilder::<true>,
191 NodeBody::Sort => SortExecutorBuilder,
192 NodeBody::WatermarkFilter => WatermarkFilterBuilder,
193 NodeBody::Dml => DmlExecutorBuilder,
194 NodeBody::RowIdGen => RowIdGenExecutorBuilder,
195 NodeBody::Now => NowExecutorBuilder,
196 NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
197 NodeBody::Values => ValuesExecutorBuilder,
198 NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
199 NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
200 NodeBody::NoOp => NoOpExecutorBuilder,
201 NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder,
202 NodeBody::OverWindow => OverWindowExecutorBuilder,
203 NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
204 NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
205 NodeBody::Changelog => ChangeLogExecutorBuilder,
206 NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
207 NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
208 NodeBody::RowMerge => RowMergeExecutorBuilder,
209 NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
210 NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
211 NodeBody::MaterializedExprs => MaterializedExprsExecutorBuilder,
212 NodeBody::VectorIndexWrite => VectorIndexWriteExecutorBuilder,
213 NodeBody::UpstreamSinkUnion => UpstreamSinkUnionExecutorBuilder,
214 NodeBody::LocalityProvider => LocalityProviderBuilder,
215 NodeBody::EowcGapFill => EowcGapFillExecutorBuilder,
216 NodeBody::GapFill => GapFillExecutorBuilder,
217 NodeBody::VectorIndexLookupJoin => VectorIndexLookupJoinBuilder,
218 NodeBody::IcebergWithPkIndexWriter => IcebergWithPkIndexWriterExecutorBuilder,
219 NodeBody::IcebergWithPkIndexDvMerger => IcebergWithPkIndexDvMergerExecutorBuilder,
220 }
221}