Skip to main content

risingwave_stream/from_proto/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Build executors from protobuf.
16
/// Implements [`StreamNodeBody`](crate::from_proto::StreamNodeBody) for one node-body variant.
///
/// Invoked as `impl_stream_node_body!(Foo(FooNode) => FooExecutorBuilder)`. The type
/// `risingwave_pb::stream_plan::stream_node::FooVariant` (its name is assembled with `paste`)
/// then gets `Node = FooNode` and `ExecutorBuilder = FooExecutorBuilder`.
///
/// The macro is defined ahead of the `mod` items of this file; `macro_rules!` scoping is
/// textual, so this placement makes it available inside those submodules.
macro_rules! impl_stream_node_body {
    ($kind:ident($body:ty) => $builder:ty) => {
        paste::paste! {
            impl crate::from_proto::StreamNodeBody
                for risingwave_pb::stream_plan::stream_node::[<$kind Variant>]
            {
                type Node = $body;
                type ExecutorBuilder = $builder;
            }
        }
    };
}
29
30mod agg_common;
31mod append_only_dedup;
32mod asof_join;
33mod barrier_recv;
34mod batch_query;
35mod cdc_filter;
36mod changelog;
37mod dml;
38mod dynamic_filter;
39mod eowc_gap_fill;
40mod eowc_over_window;
41mod expand;
42mod filter;
43mod gap_fill;
44mod group_top_n;
45mod hash_agg;
46mod hash_join;
47mod hop_window;
48mod iceberg_with_pk_index;
49mod locality_provider;
50mod lookup;
51mod lookup_union;
52mod materialized_exprs;
53mod merge;
54mod mview;
55mod no_op;
56mod now;
57mod over_window;
58mod project;
59mod project_set;
60mod row_id_gen;
61mod simple_agg;
62mod sink;
63mod sort;
64mod source;
65mod source_backfill;
66mod stateless_simple_agg;
67mod stream_cdc_scan;
68mod stream_scan;
69mod temporal_join;
70mod top_n;
71mod union;
72mod upstream_sink_union;
73mod values;
74mod watermark_filter;
75
76mod row_merge;
77
78mod approx_percentile;
79
80mod sync_log_store;
81mod vector_index_lookup_join;
82mod vector_index_write;
83
84// import for submodules
85use itertools::Itertools;
86use risingwave_common::dispatch_stream_node_body;
87use risingwave_pb::stream_plan::{self, StreamNode, TemporalJoinNode};
88use risingwave_storage::StateStore;
89
90pub(crate) use self::merge::MergeExecutorBuilder;
91use crate::error::StreamResult;
92use crate::executor::{Execute, Executor, ExecutorInfo};
93use crate::task::ExecutorParams;
94
/// Builds an [`Executor`] from a single plan node.
///
/// A builder is attached to a node-body variant through [`StreamNodeBody::ExecutorBuilder`],
/// which also requires the builder's `Node` to match the variant's `Node`.
trait ExecutorBuilder {
    /// Node type this builder consumes.
    type Node;

    /// Create an [`Executor`] from the given node.
    ///
    /// * `params` - executor parameters, taken by value.
    /// * `node` - the node to build from, borrowed for the duration of the call.
    /// * `store` - a [`StateStore`] implementation handed to the builder.
    ///
    /// Returns the built [`Executor`], or the error reported through [`StreamResult`].
    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        store: impl StateStore,
    ) -> StreamResult<Executor>;
}
105
/// Associates a node-body variant type with its node type and with the [`ExecutorBuilder`]
/// used for it.
///
/// Implementations are generated by `impl_stream_node_body!`; `create_executor!` goes through
/// this trait to pick the builder for the variant being dispatched.
trait StreamNodeBody {
    /// Node type associated with this variant.
    type Node;
    /// Builder used for this variant; its `Node` must be the same type as [`Self::Node`].
    type ExecutorBuilder: ExecutorBuilder<Node = Self::Node>;
}
110
111struct UnreachableExecutorBuilder<T>(std::marker::PhantomData<T>);
112
113impl<T> ExecutorBuilder for UnreachableExecutorBuilder<T> {
114    type Node = T;
115
116    async fn new_boxed_executor(
117        _params: ExecutorParams,
118        _node: &Self::Node,
119        _store: impl StateStore,
120    ) -> StreamResult<Executor> {
121        unreachable!()
122    }
123}
124
// `Exchange` and `DeltaIndexJoin` are registered with `UnreachableExecutorBuilder`, so
// dispatching either of them through `create_executor` panics instead of building an executor.
// NOTE(review): presumably these nodes are handled before executor creation is reached —
// confirm against the callers.
impl_stream_node_body!(
    Exchange(stream_plan::ExchangeNode) => UnreachableExecutorBuilder<stream_plan::ExchangeNode>
);
impl_stream_node_body!(
    DeltaIndexJoin(stream_plan::DeltaIndexJoinNode) => UnreachableExecutorBuilder<stream_plan::DeltaIndexJoinNode>
);
131
/// Expands to an awaited `new_boxed_executor` call on the [`ExecutorBuilder`] that
/// [`StreamNodeBody`] associates with the body of `$stream_node`.
///
/// The three arguments are forwarded as the executor parameters, the node whose body is
/// dispatched on, and the state store. The expansion uses `.await`, so it must appear in an
/// async context, and it panics if `get_node_body()` cannot be unwrapped.
macro_rules! create_executor {
    ($exec_params:expr, $stream_node:expr, $state_store:expr) => {
        dispatch_stream_node_body!(
            $stream_node.get_node_body().unwrap(),
            NodeVariant,
            node_body => {
                <NodeVariant as StreamNodeBody>::ExecutorBuilder::new_boxed_executor(
                    $exec_params,
                    node_body,
                    $state_store,
                )
                .await
            }
        )
    };
}
142
/// Create an executor from protobuf [`StreamNode`].
///
/// Dispatches on the body of `node` via `create_executor!` and awaits the
/// `new_boxed_executor` of the builder registered for that body, returning its result.
///
/// # Panics
///
/// Panics if `node.get_node_body()` cannot be unwrapped, or if the body is a variant
/// registered with `UnreachableExecutorBuilder`.
pub async fn create_executor(
    params: ExecutorParams,
    node: &StreamNode,
    store: impl StateStore,
) -> StreamResult<Executor> {
    create_executor!(params, node, store)
}