risingwave_stream/from_proto/
mod.rs1macro_rules! impl_stream_node_body {
18 ($variant:ident($node:ty) => $executor_builder:ty) => {
19 paste::paste! {
20 impl crate::from_proto::StreamNodeBody
21 for risingwave_pb::stream_plan::stream_node::[<$variant Variant>]
22 {
23 type Node = $node;
24 type ExecutorBuilder = $executor_builder;
25 }
26 }
27 };
28}
29
30mod agg_common;
31mod append_only_dedup;
32mod asof_join;
33mod barrier_recv;
34mod batch_query;
35mod cdc_filter;
36mod changelog;
37mod dml;
38mod dynamic_filter;
39mod eowc_gap_fill;
40mod eowc_over_window;
41mod expand;
42mod filter;
43mod gap_fill;
44mod group_top_n;
45mod hash_agg;
46mod hash_join;
47mod hop_window;
48mod iceberg_with_pk_index;
49mod locality_provider;
50mod lookup;
51mod lookup_union;
52mod materialized_exprs;
53mod merge;
54mod mview;
55mod no_op;
56mod now;
57mod over_window;
58mod project;
59mod project_set;
60mod row_id_gen;
61mod simple_agg;
62mod sink;
63mod sort;
64mod source;
65mod source_backfill;
66mod stateless_simple_agg;
67mod stream_cdc_scan;
68mod stream_scan;
69mod temporal_join;
70mod top_n;
71mod union;
72mod upstream_sink_union;
73mod values;
74mod watermark_filter;
75
76mod row_merge;
77
78mod approx_percentile;
79
80mod sync_log_store;
81mod vector_index_lookup_join;
82mod vector_index_write;
83
84use itertools::Itertools;
86use risingwave_common::dispatch_stream_node_body;
87use risingwave_pb::stream_plan::{self, StreamNode, TemporalJoinNode};
88use risingwave_storage::StateStore;
89
90pub(crate) use self::merge::MergeExecutorBuilder;
91use crate::error::StreamResult;
92use crate::executor::{Execute, Executor, ExecutorInfo};
93use crate::task::ExecutorParams;
94
95trait ExecutorBuilder {
96 type Node;
97
98 async fn new_boxed_executor(
100 params: ExecutorParams,
101 node: &Self::Node,
102 store: impl StateStore,
103 ) -> StreamResult<Executor>;
104}
105
106trait StreamNodeBody {
107 type Node;
108 type ExecutorBuilder: ExecutorBuilder<Node = Self::Node>;
109}
110
111struct UnreachableExecutorBuilder<T>(std::marker::PhantomData<T>);
112
113impl<T> ExecutorBuilder for UnreachableExecutorBuilder<T> {
114 type Node = T;
115
116 async fn new_boxed_executor(
117 _params: ExecutorParams,
118 _node: &Self::Node,
119 _store: impl StateStore,
120 ) -> StreamResult<Executor> {
121 unreachable!()
122 }
123}
124
125impl_stream_node_body!(
126 Exchange(stream_plan::ExchangeNode) => UnreachableExecutorBuilder<stream_plan::ExchangeNode>
127);
128impl_stream_node_body!(
129 DeltaIndexJoin(stream_plan::DeltaIndexJoinNode) => UnreachableExecutorBuilder<stream_plan::DeltaIndexJoinNode>
130);
131
132macro_rules! create_executor {
133 ($params:expr, $node:expr, $store:expr) => {
134 dispatch_stream_node_body!($node.get_node_body().unwrap(), NodeVariant, node_body => {
135 <NodeVariant as StreamNodeBody>::ExecutorBuilder::new_boxed_executor(
136 $params, node_body, $store,
137 )
138 .await
139 })
140 };
141}
142
143pub async fn create_executor(
145 params: ExecutorParams,
146 node: &StreamNode,
147 store: impl StateStore,
148) -> StreamResult<Executor> {
149 create_executor!(params, node, store)
150}