risingwave_frontend/optimizer/plan_node/
to_prost.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use paste::paste;
17use risingwave_pb::batch_plan::plan_node as pb_batch_node;
18use risingwave_pb::stream_plan::stream_node as pb_stream_node;
19
20use super::*;
21use crate::{
22    for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes,
23};
24
25pub trait ToPb: TryToBatchPb + TryToStreamPb {}
26
27pub trait TryToBatchPb {
28    fn try_to_batch_prost_body(&self) -> SchedulerResult<pb_batch_node::NodeBody> {
29        // Originally we panic in the following way
30        // panic!("convert into distributed is only allowed on batch plan")
31        Err(anyhow!(
32            "Node {} cannot be convert to batch node",
33            std::any::type_name::<Self>()
34        )
35        .into())
36    }
37}
38
39pub trait ToBatchPb {
40    fn to_batch_prost_body(&self) -> pb_batch_node::NodeBody;
41}
42
43impl<T: ToBatchPb> TryToBatchPb for T {
44    fn try_to_batch_prost_body(&self) -> SchedulerResult<pb_batch_node::NodeBody> {
45        Ok(self.to_batch_prost_body())
46    }
47}
48
49pub trait TryToStreamPb {
50    fn try_to_stream_prost_body(
51        &self,
52        _state: &mut BuildFragmentGraphState,
53    ) -> SchedulerResult<pb_stream_node::NodeBody> {
54        // Originally we panic in the following way
55        // panic!("convert into distributed is only allowed on stream plan")
56        Err(anyhow!(
57            "Node {} cannot be convert to stream node",
58            std::any::type_name::<Self>()
59        )
60        .into())
61    }
62}
63
64impl<T: StreamNode> TryToStreamPb for T {
65    fn try_to_stream_prost_body(
66        &self,
67        state: &mut BuildFragmentGraphState,
68    ) -> SchedulerResult<pb_stream_node::NodeBody> {
69        Ok(self.to_stream_prost_body(state))
70    }
71}
72
73pub trait StreamNode {
74    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState)
75    -> pb_stream_node::NodeBody;
76}
77
78/// impl `ToPb` nodes which have impl `ToBatchPb` and `ToStreamPb`.
79macro_rules! impl_to_prost {
80    ($( { $convention:ident, $name:ident }),*) => {
81        paste!{
82            $(impl ToPb for [<$convention $name>] { })*
83        }
84    }
85}
86for_all_plan_nodes! { impl_to_prost }
87/// impl a panic `ToBatchPb` for logical and stream node.
88macro_rules! ban_to_batch_prost {
89    ($( { $convention:ident, $name:ident }),*) => {
90        paste!{
91            $(impl TryToBatchPb for [<$convention $name>] {})*
92        }
93    }
94}
95for_logical_plan_nodes! { ban_to_batch_prost }
96for_stream_plan_nodes! { ban_to_batch_prost }
97/// impl a panic `ToStreamPb` for logical and batch node.
98macro_rules! ban_to_stream_prost {
99    ($( { $convention:ident, $name:ident }),*) => {
100        paste!{
101            $(impl TryToStreamPb for [<$convention $name>] {})*
102        }
103    }
104}
105for_logical_plan_nodes! { ban_to_stream_prost }
106for_batch_plan_nodes! { ban_to_stream_prost }