risingwave_frontend/optimizer/plan_node/
stream_row_merge.rs1use anyhow::anyhow;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Schema;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29 ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::optimizer::property::{
32 Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
33};
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct StreamRowMerge {
42 pub base: PlanBase<Stream>,
43 pub lhs_input: PlanRef,
44 pub rhs_input: PlanRef,
45 pub lhs_mapping: ColIndexMapping,
47 pub rhs_mapping: ColIndexMapping,
49}
50
51impl StreamRowMerge {
52 pub fn new(
53 lhs_input: PlanRef,
54 rhs_input: PlanRef,
55 lhs_mapping: ColIndexMapping,
56 rhs_mapping: ColIndexMapping,
57 ) -> Result<Self> {
58 assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
59 assert_eq!(lhs_input.distribution(), rhs_input.distribution());
60 assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
61 assert_eq!(lhs_input.stream_kind(), rhs_input.stream_kind());
62
63 assert_eq!(lhs_input.distribution(), &Distribution::Single);
66 assert_eq!(lhs_input.stream_key(), Some(&[][..]));
67 assert_eq!(lhs_input.stream_kind(), StreamKind::Retract);
68
69 let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
70 let o2i_lhs = lhs_mapping
71 .inverse()
72 .ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
73 let o2i_rhs = rhs_mapping
74 .inverse()
75 .ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
76 for output_idx in 0..lhs_mapping.target_size() {
77 if let Some(lhs_idx) = o2i_lhs.try_map(output_idx) {
78 schema_fields.push(lhs_input.schema().fields()[lhs_idx].clone());
79 } else if let Some(rhs_idx) = o2i_rhs.try_map(output_idx) {
80 schema_fields.push(rhs_input.schema().fields()[rhs_idx].clone());
81 } else {
82 bail!(
83 "output index {} not found in either lhs or rhs mapping",
84 output_idx
85 );
86 }
87 }
88 let schema = Schema::new(schema_fields);
89 assert!(!schema.is_empty());
90 let base = PlanBase::new_stream(
91 lhs_input.ctx(),
92 schema,
93 Some(vec![]),
94 FunctionalDependencySet::new(lhs_mapping.target_size()),
95 Distribution::Single,
96 StreamKind::Retract,
97 lhs_input.emit_on_window_close(),
98 WatermarkColumns::new(),
99 MonotonicityMap::new(),
100 );
101 Ok(Self {
102 base,
103 lhs_input,
104 rhs_input,
105 lhs_mapping,
106 rhs_mapping,
107 })
108 }
109}
110
111impl Distill for StreamRowMerge {
112 fn distill<'a>(&self) -> XmlNode<'a> {
113 let mut out = Vec::with_capacity(1);
114
115 if self.base.ctx().is_explain_verbose() {
116 let f = |t| Pretty::debug(&t);
117 let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect());
118 out = vec![("output", e)];
119 }
120 childless_record("StreamRowMerge", out)
121 }
122}
123
124impl PlanTreeNodeBinary<Stream> for StreamRowMerge {
125 fn left(&self) -> PlanRef {
126 self.lhs_input.clone()
127 }
128
129 fn right(&self) -> PlanRef {
130 self.rhs_input.clone()
131 }
132
133 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
134 Self {
135 base: self.base.clone(),
136 lhs_input: left,
137 rhs_input: right,
138 lhs_mapping: self.lhs_mapping.clone(),
139 rhs_mapping: self.rhs_mapping.clone(),
140 }
141 }
142}
143
144impl_plan_tree_node_for_binary! { Stream, StreamRowMerge }
145
146impl StreamNode for StreamRowMerge {
147 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
148 PbNodeBody::RowMerge(Box::new(risingwave_pb::stream_plan::RowMergeNode {
149 lhs_mapping: Some(self.lhs_mapping.to_protobuf()),
150 rhs_mapping: Some(self.rhs_mapping.to_protobuf()),
151 }))
152 }
153}
154
155impl ExprRewritable<Stream> for StreamRowMerge {
156 fn has_rewritable_expr(&self) -> bool {
157 false
158 }
159
160 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
161 unimplemented!()
162 }
163}
164
165impl ExprVisitable for StreamRowMerge {
166 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
167}