risingwave_stream/from_proto/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Build executor from protobuf.
16
17mod agg_common;
18mod append_only_dedup;
19mod asof_join;
20mod barrier_recv;
21mod batch_query;
22mod cdc_filter;
23mod changelog;
24mod dml;
25mod dynamic_filter;
26mod eowc_gap_fill;
27mod eowc_over_window;
28mod expand;
29mod filter;
30mod gap_fill;
31mod group_top_n;
32mod hash_agg;
33mod hash_join;
34mod hop_window;
35mod locality_provider;
36mod lookup;
37mod lookup_union;
38mod materialized_exprs;
39mod merge;
40mod mview;
41mod no_op;
42mod now;
43mod over_window;
44mod project;
45mod project_set;
46mod row_id_gen;
47mod simple_agg;
48mod sink;
49mod sort;
50mod source;
51mod source_backfill;
52mod stateless_simple_agg;
53mod stream_cdc_scan;
54mod stream_scan;
55mod temporal_join;
56mod top_n;
57mod union;
58mod upstream_sink_union;
59mod values;
60mod watermark_filter;
61
62mod row_merge;
63
64mod approx_percentile;
65
66mod sync_log_store;
67mod vector_index_lookup_join;
68mod vector_index_write;
69
70// import for submodules
71use itertools::Itertools;
72use risingwave_pb::stream_plan::stream_node::NodeBody;
73use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
74use risingwave_storage::StateStore;
75
76use self::append_only_dedup::*;
77use self::approx_percentile::global::*;
78use self::approx_percentile::local::*;
79use self::asof_join::AsOfJoinExecutorBuilder;
80use self::barrier_recv::*;
81use self::batch_query::*;
82use self::cdc_filter::CdcFilterExecutorBuilder;
83use self::dml::*;
84use self::dynamic_filter::*;
85use self::eowc_gap_fill::EowcGapFillExecutorBuilder;
86use self::eowc_over_window::*;
87use self::expand::*;
88use self::filter::*;
89use self::gap_fill::GapFillExecutorBuilder;
90use self::group_top_n::GroupTopNExecutorBuilder;
91use self::hash_agg::*;
92use self::hash_join::*;
93use self::hop_window::*;
94use self::locality_provider::*;
95use self::lookup::*;
96use self::lookup_union::*;
97use self::materialized_exprs::MaterializedExprsExecutorBuilder;
98pub(crate) use self::merge::MergeExecutorBuilder;
99use self::mview::*;
100use self::no_op::*;
101use self::now::NowExecutorBuilder;
102use self::over_window::*;
103use self::project::*;
104use self::project_set::*;
105use self::row_id_gen::RowIdGenExecutorBuilder;
106use self::row_merge::*;
107use self::simple_agg::*;
108use self::sink::*;
109use self::sort::*;
110use self::source::*;
111use self::source_backfill::*;
112use self::stateless_simple_agg::*;
113use self::stream_cdc_scan::*;
114use self::stream_scan::*;
115use self::sync_log_store::*;
116use self::temporal_join::*;
117use self::top_n::*;
118use self::union::*;
119use self::upstream_sink_union::*;
120use self::watermark_filter::WatermarkFilterBuilder;
121use crate::error::StreamResult;
122use crate::executor::{Execute, Executor, ExecutorInfo};
123use crate::from_proto::changelog::ChangeLogExecutorBuilder;
124use crate::from_proto::values::ValuesExecutorBuilder;
125use crate::from_proto::vector_index_lookup_join::VectorIndexLookupJoinBuilder;
126use crate::from_proto::vector_index_write::VectorIndexWriteExecutorBuilder;
127use crate::task::ExecutorParams;
128
129trait ExecutorBuilder {
130    type Node;
131
132    /// Create an [`Executor`] from [`StreamNode`].
133    async fn new_boxed_executor(
134        params: ExecutorParams,
135        node: &Self::Node,
136        store: impl StateStore,
137    ) -> StreamResult<Executor>;
138}
139
/// Dispatch on the [`NodeBody`] of a stream node and delegate to the matching builder.
///
/// Arguments, in order: the executor params expression, the stream node expression,
/// the state store expression, then a comma-separated list of
/// `NodeBody::Variant => BuilderType` pairs (a trailing comma is accepted).
///
/// The expansion `.await`s the selected builder's `new_boxed_executor`, so the macro
/// can only be used inside an async context. It panics if `get_node_body()` does not
/// yield a body (the result is unwrapped), and treats `NodeBody::Exchange` and
/// `NodeBody::DeltaIndexJoin` as unreachable.
macro_rules! build_executor {
    (
        $params:expr,
        $stream_node:expr,
        $state_store:expr,
        $( $variant:path => $builder:ty ),*
        $(,)?
    ) => {
        match $stream_node.get_node_body().unwrap() {
            $(
                $variant(body) => {
                    <$builder>::new_boxed_executor($params, body, $state_store).await
                }
            )*
            NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => unreachable!(),
        }
    };
}
152
153/// Create an executor from protobuf [`StreamNode`].
154pub async fn create_executor(
155    params: ExecutorParams,
156    node: &StreamNode,
157    store: impl StateStore,
158) -> StreamResult<Executor> {
159    build_executor! {
160        params,
161        node,
162        store,
163        NodeBody::Source => SourceExecutorBuilder,
164        NodeBody::Sink => SinkExecutorBuilder,
165        NodeBody::Project => ProjectExecutorBuilder,
166        NodeBody::TopN => TopNExecutorBuilder::<false>,
167        NodeBody::AppendOnlyTopN => TopNExecutorBuilder::<true>,
168        NodeBody::StatelessSimpleAgg => StatelessSimpleAggExecutorBuilder,
169        NodeBody::SimpleAgg => SimpleAggExecutorBuilder,
170        NodeBody::HashAgg => HashAggExecutorBuilder,
171        NodeBody::HashJoin => HashJoinExecutorBuilder,
172        NodeBody::HopWindow => HopWindowExecutorBuilder,
173        NodeBody::StreamScan => StreamScanExecutorBuilder,
174        NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
175        NodeBody::BatchPlan => BatchQueryExecutorBuilder,
176        NodeBody::Merge => MergeExecutorBuilder,
177        NodeBody::Materialize => MaterializeExecutorBuilder,
178        NodeBody::Filter => FilterExecutorBuilder,
179        NodeBody::CdcFilter => CdcFilterExecutorBuilder,
180        NodeBody::Arrange => ArrangeExecutorBuilder,
181        NodeBody::Lookup => LookupExecutorBuilder,
182        NodeBody::Union => UnionExecutorBuilder,
183        NodeBody::LookupUnion => LookupUnionExecutorBuilder,
184        NodeBody::Expand => ExpandExecutorBuilder,
185        NodeBody::DynamicFilter => DynamicFilterExecutorBuilder,
186        NodeBody::ProjectSet => ProjectSetExecutorBuilder,
187        NodeBody::GroupTopN => GroupTopNExecutorBuilder::<false>,
188        NodeBody::AppendOnlyGroupTopN => GroupTopNExecutorBuilder::<true>,
189        NodeBody::Sort => SortExecutorBuilder,
190        NodeBody::WatermarkFilter => WatermarkFilterBuilder,
191        NodeBody::Dml => DmlExecutorBuilder,
192        NodeBody::RowIdGen => RowIdGenExecutorBuilder,
193        NodeBody::Now => NowExecutorBuilder,
194        NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
195        NodeBody::Values => ValuesExecutorBuilder,
196        NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
197        NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
198        NodeBody::NoOp => NoOpExecutorBuilder,
199        NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder,
200        NodeBody::OverWindow => OverWindowExecutorBuilder,
201        NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
202        NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
203        NodeBody::Changelog => ChangeLogExecutorBuilder,
204        NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
205        NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
206        NodeBody::RowMerge => RowMergeExecutorBuilder,
207        NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
208        NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
209        NodeBody::MaterializedExprs => MaterializedExprsExecutorBuilder,
210        NodeBody::VectorIndexWrite => VectorIndexWriteExecutorBuilder,
211        NodeBody::UpstreamSinkUnion => UpstreamSinkUnionExecutorBuilder,
212        NodeBody::LocalityProvider => LocalityProviderBuilder,
213        NodeBody::EowcGapFill => EowcGapFillExecutorBuilder,
214        NodeBody::GapFill => GapFillExecutorBuilder,
215        NodeBody::VectorIndexLookupJoin => VectorIndexLookupJoinBuilder,
216    }
217}