risingwave_frontend/optimizer/plan_node/
stream_row_merge.rs1use anyhow::anyhow;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Schema;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29 ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns, reject_upsert_input};
32use crate::stream_fragmenter::BuildFragmentGraphState;
33
/// Stream plan node that combines the columns of two inputs (`lhs_input` and `rhs_input`) into
/// one output schema.
///
/// Both mappings share the same target (output) size. For every output column, `new` takes the
/// field from the lhs input when `lhs_mapping` covers that column, and otherwise from the rhs
/// input via `rhs_mapping`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamRowMerge {
    /// Shared stream-plan properties (schema, stream key, distribution, ...) built in `new`.
    pub base: PlanBase<Stream>,
    /// Left-hand input plan; `new` derives most of the `base` properties from this side.
    pub lhs_input: PlanRef,
    /// Right-hand input plan.
    pub rhs_input: PlanRef,
    /// Maps column indices of `lhs_input` to output column indices.
    pub lhs_mapping: ColIndexMapping,
    /// Maps column indices of `rhs_input` to output column indices.
    pub rhs_mapping: ColIndexMapping,
}
48
49impl StreamRowMerge {
50 pub fn new(
51 lhs_input: PlanRef,
52 rhs_input: PlanRef,
53 lhs_mapping: ColIndexMapping,
54 rhs_mapping: ColIndexMapping,
55 ) -> Result<Self> {
56 assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
57 assert_eq!(lhs_input.distribution(), rhs_input.distribution());
58 assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
59 let functional_dependency =
60 FunctionalDependencySet::with_key(lhs_mapping.target_size(), &[]);
61 let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
62 let o2i_lhs = lhs_mapping
63 .inverse()
64 .ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
65 let o2i_rhs = rhs_mapping
66 .inverse()
67 .ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
68 for output_idx in 0..lhs_mapping.target_size() {
69 if let Some(lhs_idx) = o2i_lhs.try_map(output_idx) {
70 schema_fields.push(lhs_input.schema().fields()[lhs_idx].clone());
71 } else if let Some(rhs_idx) = o2i_rhs.try_map(output_idx) {
72 schema_fields.push(rhs_input.schema().fields()[rhs_idx].clone());
73 } else {
74 bail!(
75 "output index {} not found in either lhs or rhs mapping",
76 output_idx
77 );
78 }
79 }
80 let schema = Schema::new(schema_fields);
81 assert!(!schema.is_empty());
82 let watermark_columns = WatermarkColumns::new();
83
84 let base = PlanBase::new_stream(
85 lhs_input.ctx(),
86 schema,
87 lhs_input.stream_key().map(|k| k.to_vec()),
88 functional_dependency,
89 lhs_input.distribution().clone(),
90 reject_upsert_input!(lhs_input),
91 lhs_input.emit_on_window_close(),
92 watermark_columns,
93 lhs_input.columns_monotonicity().clone(),
94 );
95 Ok(Self {
96 base,
97 lhs_input,
98 rhs_input,
99 lhs_mapping,
100 rhs_mapping,
101 })
102 }
103}
104
105impl Distill for StreamRowMerge {
106 fn distill<'a>(&self) -> XmlNode<'a> {
107 let mut out = Vec::with_capacity(1);
108
109 if self.base.ctx().is_explain_verbose() {
110 let f = |t| Pretty::debug(&t);
111 let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect());
112 out = vec![("output", e)];
113 }
114 childless_record("StreamRowMerge", out)
115 }
116}
117
118impl PlanTreeNodeBinary<Stream> for StreamRowMerge {
119 fn left(&self) -> PlanRef {
120 self.lhs_input.clone()
121 }
122
123 fn right(&self) -> PlanRef {
124 self.rhs_input.clone()
125 }
126
127 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
128 Self {
129 base: self.base.clone(),
130 lhs_input: left,
131 rhs_input: right,
132 lhs_mapping: self.lhs_mapping.clone(),
133 rhs_mapping: self.rhs_mapping.clone(),
134 }
135 }
136}
137
// NOTE(review): presumably generates the generic plan-tree-node impl for `StreamRowMerge` from
// its `PlanTreeNodeBinary<Stream>` impl; the macro is defined elsewhere, so confirm there.
impl_plan_tree_node_for_binary! { Stream, StreamRowMerge }
139
140impl StreamNode for StreamRowMerge {
141 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
142 PbNodeBody::RowMerge(Box::new(risingwave_pb::stream_plan::RowMergeNode {
143 lhs_mapping: Some(self.lhs_mapping.to_protobuf()),
144 rhs_mapping: Some(self.rhs_mapping.to_protobuf()),
145 }))
146 }
147}
148
149impl ExprRewritable<Stream> for StreamRowMerge {
150 fn has_rewritable_expr(&self) -> bool {
151 false
152 }
153
154 fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
155 unimplemented!()
156 }
157}
158
159impl ExprVisitable for StreamRowMerge {
160 fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
161}