risingwave_stream/from_proto/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Build executor from protobuf.
16
17mod agg_common;
18mod append_only_dedup;
19mod asof_join;
20mod barrier_recv;
21mod batch_query;
22mod cdc_filter;
23mod changelog;
24mod dml;
25mod dynamic_filter;
26mod eowc_over_window;
27mod expand;
28mod filter;
29mod group_top_n;
30mod hash_agg;
31mod hash_join;
32mod hop_window;
33mod lookup;
34mod lookup_union;
35mod materialized_exprs;
36mod merge;
37mod mview;
38mod no_op;
39mod now;
40mod over_window;
41mod project;
42mod project_set;
43mod row_id_gen;
44mod simple_agg;
45mod sink;
46mod sort;
47mod source;
48mod source_backfill;
49mod stateless_simple_agg;
50mod stream_cdc_scan;
51mod stream_scan;
52mod temporal_join;
53mod top_n;
54mod union;
55mod values;
56mod watermark_filter;
57
58mod row_merge;
59
60mod approx_percentile;
61
62mod sync_log_store;
63
64// import for submodules
65use itertools::Itertools;
66use risingwave_pb::stream_plan::stream_node::NodeBody;
67use risingwave_pb::stream_plan::{StreamNode, TemporalJoinNode};
68use risingwave_storage::StateStore;
69
70use self::append_only_dedup::*;
71use self::approx_percentile::global::*;
72use self::approx_percentile::local::*;
73use self::asof_join::AsOfJoinExecutorBuilder;
74use self::barrier_recv::*;
75use self::batch_query::*;
76use self::cdc_filter::CdcFilterExecutorBuilder;
77use self::dml::*;
78use self::dynamic_filter::*;
79use self::eowc_over_window::*;
80use self::expand::*;
81use self::filter::*;
82use self::group_top_n::GroupTopNExecutorBuilder;
83use self::hash_agg::*;
84use self::hash_join::*;
85use self::hop_window::*;
86use self::lookup::*;
87use self::lookup_union::*;
88use self::materialized_exprs::MaterializedExprsExecutorBuilder;
89pub(crate) use self::merge::MergeExecutorBuilder;
90use self::mview::*;
91use self::no_op::*;
92use self::now::NowExecutorBuilder;
93use self::over_window::*;
94use self::project::*;
95use self::project_set::*;
96use self::row_id_gen::RowIdGenExecutorBuilder;
97use self::row_merge::*;
98use self::simple_agg::*;
99use self::sink::*;
100use self::sort::*;
101use self::source::*;
102use self::source_backfill::*;
103use self::stateless_simple_agg::*;
104use self::stream_cdc_scan::*;
105use self::stream_scan::*;
106use self::sync_log_store::*;
107use self::temporal_join::*;
108use self::top_n::*;
109use self::union::*;
110use self::watermark_filter::WatermarkFilterBuilder;
111use crate::error::StreamResult;
112use crate::executor::{Execute, Executor, ExecutorInfo};
113use crate::from_proto::changelog::ChangeLogExecutorBuilder;
114use crate::from_proto::values::ValuesExecutorBuilder;
115use crate::task::ExecutorParams;
116
117trait ExecutorBuilder {
118    type Node;
119
120    /// Create an [`Executor`] from [`StreamNode`].
121    async fn new_boxed_executor(
122        params: ExecutorParams,
123        node: &Self::Node,
124        store: impl StateStore,
125    ) -> StreamResult<Executor>;
126}
127
/// Expands to a `match` on the node body of `$node` that forwards to the
/// [`ExecutorBuilder`] registered for the matched [`NodeBody`] variant.
///
/// Arguments:
/// * `$params` - expression passed as the first argument of `new_boxed_executor`.
/// * `$node` - expression on which `get_node_body()` is called.
/// * `$store` - expression passed as the last argument of `new_boxed_executor`.
/// * `$variant => $builder` - comma-separated list (trailing comma allowed) pairing a
///   tuple-variant path with the builder type that handles its payload.
///
/// The expansion contains `.await`, so it can only be used inside an async context. It
/// evaluates to whatever the selected builder's `new_boxed_executor` returns.
///
/// # Panics
///
/// The expansion panics if `get_node_body()` does not yield a body, or if the body is
/// `NodeBody::Exchange` or `NodeBody::DeltaIndexJoin`.
macro_rules! build_executor {
    ($params:expr, $node:expr, $store:expr, $($variant:path => $builder:ty),* $(,)?) => {
        match $node
            .get_node_body()
            .expect("stream node has no node body to build an executor from")
        {
            $(
                // Only the matched arm runs, so `$params` and `$store` are evaluated once.
                $variant(body) => {
                    <$builder>::new_boxed_executor($params, body, $store).await
                },
            )*
            // These variants have no builder in the list; reaching them is treated as a bug.
            NodeBody::Exchange(_) | NodeBody::DeltaIndexJoin(_) => {
                unreachable!("no executor builder is registered for Exchange or DeltaIndexJoin nodes")
            }
        }
    }
}
140
141/// Create an executor from protobuf [`StreamNode`].
142pub async fn create_executor(
143    params: ExecutorParams,
144    node: &StreamNode,
145    store: impl StateStore,
146) -> StreamResult<Executor> {
147    build_executor! {
148        params,
149        node,
150        store,
151        NodeBody::Source => SourceExecutorBuilder,
152        NodeBody::Sink => SinkExecutorBuilder,
153        NodeBody::Project => ProjectExecutorBuilder,
154        NodeBody::TopN => TopNExecutorBuilder::<false>,
155        NodeBody::AppendOnlyTopN => TopNExecutorBuilder::<true>,
156        NodeBody::StatelessSimpleAgg => StatelessSimpleAggExecutorBuilder,
157        NodeBody::SimpleAgg => SimpleAggExecutorBuilder,
158        NodeBody::HashAgg => HashAggExecutorBuilder,
159        NodeBody::HashJoin => HashJoinExecutorBuilder,
160        NodeBody::HopWindow => HopWindowExecutorBuilder,
161        NodeBody::StreamScan => StreamScanExecutorBuilder,
162        NodeBody::StreamCdcScan => StreamCdcScanExecutorBuilder,
163        NodeBody::BatchPlan => BatchQueryExecutorBuilder,
164        NodeBody::Merge => MergeExecutorBuilder,
165        NodeBody::Materialize => MaterializeExecutorBuilder,
166        NodeBody::Filter => FilterExecutorBuilder,
167        NodeBody::CdcFilter => CdcFilterExecutorBuilder,
168        NodeBody::Arrange => ArrangeExecutorBuilder,
169        NodeBody::Lookup => LookupExecutorBuilder,
170        NodeBody::Union => UnionExecutorBuilder,
171        NodeBody::LookupUnion => LookupUnionExecutorBuilder,
172        NodeBody::Expand => ExpandExecutorBuilder,
173        NodeBody::DynamicFilter => DynamicFilterExecutorBuilder,
174        NodeBody::ProjectSet => ProjectSetExecutorBuilder,
175        NodeBody::GroupTopN => GroupTopNExecutorBuilder::<false>,
176        NodeBody::AppendOnlyGroupTopN => GroupTopNExecutorBuilder::<true>,
177        NodeBody::Sort => SortExecutorBuilder,
178        NodeBody::WatermarkFilter => WatermarkFilterBuilder,
179        NodeBody::Dml => DmlExecutorBuilder,
180        NodeBody::RowIdGen => RowIdGenExecutorBuilder,
181        NodeBody::Now => NowExecutorBuilder,
182        NodeBody::TemporalJoin => TemporalJoinExecutorBuilder,
183        NodeBody::Values => ValuesExecutorBuilder,
184        NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
185        NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
186        NodeBody::NoOp => NoOpExecutorBuilder,
187        NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder,
188        NodeBody::OverWindow => OverWindowExecutorBuilder,
189        NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
190        NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
191        NodeBody::Changelog => ChangeLogExecutorBuilder,
192        NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
193        NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
194        NodeBody::RowMerge => RowMergeExecutorBuilder,
195        NodeBody::AsOfJoin => AsOfJoinExecutorBuilder,
196        NodeBody::SyncLogStore => SyncLogStoreExecutorBuilder,
197        NodeBody::MaterializedExprs => MaterializedExprsExecutorBuilder,
198    }
199}