risingwave_frontend/optimizer/plan_node/
stream_share.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16
17use pretty_xmlish::XmlNode;
18use risingwave_pb::stream_plan::PbStreamNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::stream::prelude::*;
22use super::utils::Distill;
23use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode, generic};
24use crate::Explain;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30/// `StreamShare` will be translated into an `ExchangeNode` based on its distribution finally.
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamShare {
33    pub base: PlanBase<Stream>,
34    core: generic::Share<PlanRef>,
35}
36
37impl StreamShare {
38    pub fn new(core: generic::Share<PlanRef>) -> Self {
39        let base = {
40            let input = core.input.borrow();
41            let dist = input.distribution().clone();
42            // Filter executor won't change the append-only behavior of the stream.
43            PlanBase::new_stream_with_core(
44                &core,
45                dist,
46                input.append_only(),
47                input.emit_on_window_close(),
48                input.watermark_columns().clone(),
49                input.columns_monotonicity().clone(),
50            )
51        };
52
53        StreamShare { base, core }
54    }
55
56    pub fn new_from_input(input: PlanRef) -> Self {
57        let core = generic::Share {
58            input: RefCell::new(input),
59        };
60        Self::new(core)
61    }
62}
63
64impl Distill for StreamShare {
65    fn distill<'a>(&self) -> XmlNode<'a> {
66        LogicalShare::pretty_fields(&self.base, "StreamShare")
67    }
68}
69
70impl PlanTreeNodeUnary for StreamShare {
71    fn input(&self) -> PlanRef {
72        self.core.input.borrow().clone()
73    }
74
75    fn clone_with_input(&self, input: PlanRef) -> Self {
76        let core = self.core.clone();
77        core.replace_input(input);
78        Self::new(core)
79    }
80}
81
82impl StreamShare {
83    pub fn replace_input(&self, plan: PlanRef) {
84        self.core.replace_input(plan);
85    }
86}
87
88impl_plan_tree_node_for_unary! { StreamShare }
89
90impl StreamNode for StreamShare {
91    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
92        unreachable!(
93            "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
94        )
95    }
96}
97
98impl StreamShare {
99    pub fn adhoc_to_stream_prost(
100        &self,
101        state: &mut BuildFragmentGraphState,
102    ) -> SchedulerResult<PbStreamNode> {
103        let operator_id = self.base.id().0 as u32;
104
105        match state.get_share_stream_node(operator_id) {
106            None => {
107                let node_body =
108                    StreamExchange::new_no_shuffle(self.input()).to_stream_prost_body(state);
109
110                let input = self
111                    .inputs()
112                    .into_iter()
113                    .map(|plan| plan.to_stream_prost(state))
114                    .try_collect()?;
115
116                let stream_node = PbStreamNode {
117                    input,
118                    identity: self.distill_to_string(),
119                    node_body: Some(node_body),
120                    operator_id: self.id().0 as _,
121                    stream_key: self
122                        .stream_key()
123                        .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
124                       PlanRef::from(self.clone()).explain_to_string()))
125                        .iter()
126                        .map(|x| *x as u32)
127                        .collect(),
128                    fields: self.schema().to_prost(),
129                    append_only: self.append_only(),
130                };
131
132                state.add_share_stream_node(operator_id, stream_node.clone());
133                Ok(stream_node)
134            }
135
136            Some(stream_node) => Ok(stream_node.clone()),
137        }
138    }
139}
140
141impl ExprRewritable for StreamShare {}
142
143impl ExprVisitable for StreamShare {}