risingwave_frontend/optimizer/plan_node/
to_prost.rs1use anyhow::anyhow;
16use paste::paste;
17use risingwave_pb::batch_plan::plan_node as pb_batch_node;
18use risingwave_pb::stream_plan::stream_node as pb_stream_node;
19
20use super::*;
21use crate::{
22 for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes,
23};
24
25pub trait ToPb: TryToBatchPb + TryToStreamPb {}
26
27pub trait TryToBatchPb {
28 fn try_to_batch_prost_body(&self) -> SchedulerResult<pb_batch_node::NodeBody> {
29 Err(anyhow!(
32 "Node {} cannot be convert to batch node",
33 std::any::type_name::<Self>()
34 )
35 .into())
36 }
37}
38
39pub trait ToBatchPb {
40 fn to_batch_prost_body(&self) -> pb_batch_node::NodeBody;
41}
42
43impl<T: ToBatchPb> TryToBatchPb for T {
44 fn try_to_batch_prost_body(&self) -> SchedulerResult<pb_batch_node::NodeBody> {
45 Ok(self.to_batch_prost_body())
46 }
47}
48
49pub trait TryToStreamPb {
50 fn try_to_stream_prost_body(
51 &self,
52 _state: &mut BuildFragmentGraphState,
53 ) -> SchedulerResult<pb_stream_node::NodeBody> {
54 Err(anyhow!(
57 "Node {} cannot be convert to stream node",
58 std::any::type_name::<Self>()
59 )
60 .into())
61 }
62}
63
64impl<T: StreamNode> TryToStreamPb for T {
65 fn try_to_stream_prost_body(
66 &self,
67 state: &mut BuildFragmentGraphState,
68 ) -> SchedulerResult<pb_stream_node::NodeBody> {
69 Ok(self.to_stream_prost_body(state))
70 }
71}
72
73pub trait StreamNode {
74 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState)
75 -> pb_stream_node::NodeBody;
76}
77
78macro_rules! impl_to_prost {
80 ($( { $convention:ident, $name:ident }),*) => {
81 paste!{
82 $(impl ToPb for [<$convention $name>] { })*
83 }
84 }
85}
86for_all_plan_nodes! { impl_to_prost }
87macro_rules! ban_to_batch_prost {
89 ($( { $convention:ident, $name:ident }),*) => {
90 paste!{
91 $(impl TryToBatchPb for [<$convention $name>] {})*
92 }
93 }
94}
95for_logical_plan_nodes! { ban_to_batch_prost }
96for_stream_plan_nodes! { ban_to_batch_prost }
97macro_rules! ban_to_stream_prost {
99 ($( { $convention:ident, $name:ident }),*) => {
100 paste!{
101 $(impl TryToStreamPb for [<$convention $name>] {})*
102 }
103 }
104}
105for_logical_plan_nodes! { ban_to_stream_prost }
106for_batch_plan_nodes! { ban_to_stream_prost }