risingwave_frontend/optimizer/plan_node/
stream_share.rs1use std::cell::RefCell;
16
17use pretty_xmlish::XmlNode;
18use risingwave_pb::stream_plan::PbStreamNode;
19use risingwave_pb::stream_plan::stream_node::PbNodeBody;
20
21use super::stream::prelude::*;
22use super::utils::Distill;
23use super::{
24 ExprRewritable, PlanTreeNodeUnary, ShareNode, StreamExchange, StreamNode,
25 StreamPlanRef as PlanRef, generic,
26};
27use crate::Explain;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::generic::Share;
30use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
31use crate::scheduler::SchedulerResult;
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamShare {
37 pub base: PlanBase<Stream>,
38 core: generic::Share<PlanRef>,
39}
40
41impl StreamShare {
42 pub fn new(core: generic::Share<PlanRef>) -> Self {
43 let base = {
44 let input = core.input.borrow();
45 let dist = input.distribution().clone();
46 PlanBase::new_stream_with_core(
48 &core,
49 dist,
50 input.stream_kind(),
51 input.emit_on_window_close(),
52 input.watermark_columns().clone(),
53 input.columns_monotonicity().clone(),
54 )
55 };
56
57 StreamShare { base, core }
58 }
59
60 pub fn new_from_input(input: PlanRef) -> Self {
61 let core = generic::Share {
62 input: RefCell::new(input),
63 };
64 Self::new(core)
65 }
66}
67
68impl Distill for StreamShare {
69 fn distill<'a>(&self) -> XmlNode<'a> {
70 LogicalShare::pretty_fields(&self.base, "StreamShare")
71 }
72}
73
74impl PlanTreeNodeUnary<Stream> for StreamShare {
75 fn input(&self) -> PlanRef {
76 self.core.input.borrow().clone()
77 }
78
79 fn clone_with_input(&self, _input: PlanRef) -> Self {
80 unreachable!("shared node should be handled specially in PlanRef::clone_with_input")
81 }
82}
83
84impl ShareNode<Stream> for StreamShare {
85 fn new_share(core: Share<PlanRef>) -> PlanRef {
86 Self::new(core).into()
87 }
88
89 fn replace_input(&self, plan: PlanRef) {
90 self.core.replace_input(plan);
91 }
92}
93
94impl_plan_tree_node_for_unary! { Stream, StreamShare }
95
96impl StreamNode for StreamShare {
97 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
98 unreachable!(
99 "stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead."
100 )
101 }
102}
103
104impl StreamShare {
105 pub fn adhoc_to_stream_prost(
106 &self,
107 state: &mut BuildFragmentGraphState,
108 ) -> SchedulerResult<PbStreamNode> {
109 let operator_id = self.base.id().0 as u32;
110
111 match state.get_share_stream_node(operator_id) {
112 None => {
113 let node_body =
114 StreamExchange::new_no_shuffle(self.input()).to_stream_prost_body(state);
115
116 let input = self
117 .inputs()
118 .into_iter()
119 .map(|plan| plan.to_stream_prost(state))
120 .try_collect()?;
121
122 let stream_node = PbStreamNode {
123 input,
124 identity: self.distill_to_string(),
125 node_body: Some(node_body),
126 operator_id: self.id().0 as _,
127 stream_key: self
128 .stream_key()
129 .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
130 PlanRef::from(self.clone()).explain_to_string()))
131 .iter()
132 .map(|x| *x as u32)
133 .collect(),
134 fields: self.schema().to_prost(),
135 append_only: self.append_only(),
136 };
137
138 state.add_share_stream_node(operator_id, stream_node.clone());
139 Ok(stream_node)
140 }
141
142 Some(stream_node) => Ok(stream_node.clone()),
143 }
144 }
145}
146
147impl ExprRewritable<Stream> for StreamShare {}
148
149impl ExprVisitable for StreamShare {}