risingwave_frontend/optimizer/plan_node/
stream_share.rs1use std::cell::RefCell;
16
17use pretty_xmlish::XmlNode;
18use risingwave_pb::stream_plan::PbStreamNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::stream::prelude::*;
22use super::utils::Distill;
23use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode, generic};
24use crate::Explain;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamShare {
33 pub base: PlanBase<Stream>,
34 core: generic::Share<PlanRef>,
35}
36
37impl StreamShare {
38 pub fn new(core: generic::Share<PlanRef>) -> Self {
39 let base = {
40 let input = core.input.borrow();
41 let dist = input.distribution().clone();
42 PlanBase::new_stream_with_core(
44 &core,
45 dist,
46 input.append_only(),
47 input.emit_on_window_close(),
48 input.watermark_columns().clone(),
49 input.columns_monotonicity().clone(),
50 )
51 };
52
53 StreamShare { base, core }
54 }
55
56 pub fn new_from_input(input: PlanRef) -> Self {
57 let core = generic::Share {
58 input: RefCell::new(input),
59 };
60 Self::new(core)
61 }
62}
63
64impl Distill for StreamShare {
65 fn distill<'a>(&self) -> XmlNode<'a> {
66 LogicalShare::pretty_fields(&self.base, "StreamShare")
67 }
68}
69
70impl PlanTreeNodeUnary for StreamShare {
71 fn input(&self) -> PlanRef {
72 self.core.input.borrow().clone()
73 }
74
75 fn clone_with_input(&self, input: PlanRef) -> Self {
76 let core = self.core.clone();
77 core.replace_input(input);
78 Self::new(core)
79 }
80}
81
82impl StreamShare {
83 pub fn replace_input(&self, plan: PlanRef) {
84 self.core.replace_input(plan);
85 }
86}
87
88impl_plan_tree_node_for_unary! { StreamShare }
89
90impl StreamNode for StreamShare {
91 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
92 unreachable!(
93 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
94 )
95 }
96}
97
98impl StreamShare {
99 pub fn adhoc_to_stream_prost(
100 &self,
101 state: &mut BuildFragmentGraphState,
102 ) -> SchedulerResult<PbStreamNode> {
103 let operator_id = self.base.id().0 as u32;
104
105 match state.get_share_stream_node(operator_id) {
106 None => {
107 let node_body =
108 StreamExchange::new_no_shuffle(self.input()).to_stream_prost_body(state);
109
110 let input = self
111 .inputs()
112 .into_iter()
113 .map(|plan| plan.to_stream_prost(state))
114 .try_collect()?;
115
116 let stream_node = PbStreamNode {
117 input,
118 identity: self.distill_to_string(),
119 node_body: Some(node_body),
120 operator_id: self.id().0 as _,
121 stream_key: self
122 .stream_key()
123 .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
124 PlanRef::from(self.clone()).explain_to_string()))
125 .iter()
126 .map(|x| *x as u32)
127 .collect(),
128 fields: self.schema().to_prost(),
129 append_only: self.append_only(),
130 };
131
132 state.add_share_stream_node(operator_id, stream_node.clone());
133 Ok(stream_node)
134 }
135
136 Some(stream_node) => Ok(stream_node.clone()),
137 }
138 }
139}
140
141impl ExprRewritable for StreamShare {}
142
143impl ExprVisitable for StreamShare {}