risingwave_frontend/optimizer/plan_node/
stream_row_merge.rs1use anyhow::anyhow;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Schema;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use crate::PlanRef;
23use crate::error::Result;
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
27use crate::optimizer::plan_node::stream::StreamPlanRef;
28use crate::optimizer::plan_node::utils::{Distill, childless_record};
29use crate::optimizer::plan_node::{
30 ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode,
31};
32use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns};
33use crate::stream_fragmenter::BuildFragmentGraphState;
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct StreamRowMerge {
41 pub base: PlanBase<Stream>,
42 pub lhs_input: PlanRef,
43 pub rhs_input: PlanRef,
44 pub lhs_mapping: ColIndexMapping,
46 pub rhs_mapping: ColIndexMapping,
48}
49
50impl StreamRowMerge {
51 pub fn new(
52 lhs_input: PlanRef,
53 rhs_input: PlanRef,
54 lhs_mapping: ColIndexMapping,
55 rhs_mapping: ColIndexMapping,
56 ) -> Result<Self> {
57 assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
58 assert_eq!(lhs_input.distribution(), rhs_input.distribution());
59 assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
60 let functional_dependency =
61 FunctionalDependencySet::with_key(lhs_mapping.target_size(), &[]);
62 let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
63 let o2i_lhs = lhs_mapping
64 .inverse()
65 .ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
66 let o2i_rhs = rhs_mapping
67 .inverse()
68 .ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
69 for output_idx in 0..lhs_mapping.target_size() {
70 if let Some(lhs_idx) = o2i_lhs.try_map(output_idx) {
71 schema_fields.push(lhs_input.schema().fields()[lhs_idx].clone());
72 } else if let Some(rhs_idx) = o2i_rhs.try_map(output_idx) {
73 schema_fields.push(rhs_input.schema().fields()[rhs_idx].clone());
74 } else {
75 bail!(
76 "output index {} not found in either lhs or rhs mapping",
77 output_idx
78 );
79 }
80 }
81 let schema = Schema::new(schema_fields);
82 assert!(!schema.is_empty());
83 let watermark_columns = WatermarkColumns::new();
84
85 let base = PlanBase::new_stream(
86 lhs_input.ctx(),
87 schema,
88 lhs_input.stream_key().map(|k| k.to_vec()),
89 functional_dependency,
90 lhs_input.distribution().clone(),
91 lhs_input.append_only(),
92 lhs_input.emit_on_window_close(),
93 watermark_columns,
94 lhs_input.columns_monotonicity().clone(),
95 );
96 Ok(Self {
97 base,
98 lhs_input,
99 rhs_input,
100 lhs_mapping,
101 rhs_mapping,
102 })
103 }
104}
105
106impl Distill for StreamRowMerge {
107 fn distill<'a>(&self) -> XmlNode<'a> {
108 let mut out = Vec::with_capacity(1);
109
110 if self.base.ctx().is_explain_verbose() {
111 let f = |t| Pretty::debug(&t);
112 let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect());
113 out = vec![("output", e)];
114 }
115 childless_record("StreamRowMerge", out)
116 }
117}
118
119impl PlanTreeNodeBinary for StreamRowMerge {
120 fn left(&self) -> PlanRef {
121 self.lhs_input.clone()
122 }
123
124 fn right(&self) -> PlanRef {
125 self.rhs_input.clone()
126 }
127
128 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
129 Self {
130 base: self.base.clone(),
131 lhs_input: left,
132 rhs_input: right,
133 lhs_mapping: self.lhs_mapping.clone(),
134 rhs_mapping: self.rhs_mapping.clone(),
135 }
136 }
137}
138
139impl_plan_tree_node_for_binary! { StreamRowMerge }
140
141impl StreamNode for StreamRowMerge {
142 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
143 PbNodeBody::RowMerge(Box::new(risingwave_pb::stream_plan::RowMergeNode {
144 lhs_mapping: Some(self.lhs_mapping.to_protobuf()),
145 rhs_mapping: Some(self.rhs_mapping.to_protobuf()),
146 }))
147 }
148}
149
150impl ExprRewritable for StreamRowMerge {
151 fn has_rewritable_expr(&self) -> bool {
152 false
153 }
154
155 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
156 unimplemented!()
157 }
158}
159
160impl ExprVisitable for StreamRowMerge {
161 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
162}