risingwave_frontend/optimizer/plan_node/
stream_share.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16
17use pretty_xmlish::XmlNode;
18use risingwave_pb::stream_plan::PbStreamNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::stream::prelude::*;
22use super::utils::Distill;
23use super::{
24    ExprRewritable, PlanTreeNodeUnary, ShareNode, StreamExchange, StreamNode,
25    StreamPlanRef as PlanRef, generic,
26};
27use crate::Explain;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::generic::Share;
30use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
31use crate::scheduler::SchedulerResult;
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34/// `StreamShare` will be translated into an `ExchangeNode` based on its distribution finally.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamShare {
37    pub base: PlanBase<Stream>,
38    core: generic::Share<PlanRef>,
39}
40
41impl StreamShare {
42    pub fn new(core: generic::Share<PlanRef>) -> Self {
43        let base = {
44            let input = core.input.borrow();
45            let dist = input.distribution().clone();
46            // Filter executor won't change the append-only behavior of the stream.
47            PlanBase::new_stream_with_core(
48                &core,
49                dist,
50                input.stream_kind(),
51                input.emit_on_window_close(),
52                input.watermark_columns().clone(),
53                input.columns_monotonicity().clone(),
54            )
55        };
56
57        StreamShare { base, core }
58    }
59
60    pub fn new_from_input(input: PlanRef) -> Self {
61        let core = generic::Share {
62            input: RefCell::new(input),
63        };
64        Self::new(core)
65    }
66}
67
68impl Distill for StreamShare {
69    fn distill<'a>(&self) -> XmlNode<'a> {
70        LogicalShare::pretty_fields(&self.base, "StreamShare")
71    }
72}
73
74impl PlanTreeNodeUnary<Stream> for StreamShare {
75    fn input(&self) -> PlanRef {
76        self.core.input.borrow().clone()
77    }
78
79    fn clone_with_input(&self, _input: PlanRef) -> Self {
80        unreachable!("shared node should be handled specially in PlanRef::clone_with_input")
81    }
82}
83
84impl ShareNode<Stream> for StreamShare {
85    fn new_share(core: Share<PlanRef>) -> PlanRef {
86        Self::new(core).into()
87    }
88
89    fn replace_input(&self, plan: PlanRef) {
90        self.core.replace_input(plan);
91    }
92}
93
94impl_plan_tree_node_for_unary! { Stream, StreamShare }
95
96impl StreamNode for StreamShare {
97    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
98        unreachable!(
99            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
100        )
101    }
102}
103
104impl StreamShare {
105    pub fn adhoc_to_stream_prost(
106        &self,
107        state: &mut BuildFragmentGraphState,
108    ) -> SchedulerResult<PbStreamNode> {
109        let operator_id = self.base.id().0 as u32;
110
111        match state.get_share_stream_node(operator_id) {
112            None => {
113                let node_body =
114                    StreamExchange::new_no_shuffle(self.input()).to_stream_prost_body(state);
115
116                let input = self
117                    .inputs()
118                    .into_iter()
119                    .map(|plan| plan.to_stream_prost(state))
120                    .try_collect()?;
121
122                let stream_node = PbStreamNode {
123                    input,
124                    identity: self.distill_to_string(),
125                    node_body: Some(node_body),
126                    operator_id: self.id().0 as _,
127                    stream_key: self
128                        .stream_key()
129                        .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
130                       PlanRef::from(self.clone()).explain_to_string()))
131                        .iter()
132                        .map(|x| *x as u32)
133                        .collect(),
134                    fields: self.schema().to_prost(),
135                    append_only: self.append_only(),
136                };
137
138                state.add_share_stream_node(operator_id, stream_node.clone());
139                Ok(stream_node)
140            }
141
142            Some(stream_node) => Ok(stream_node.clone()),
143        }
144    }
145}
146
147impl ExprRewritable<Stream> for StreamShare {}
148
149impl ExprVisitable for StreamShare {}