risingwave_frontend/optimizer/plan_node/
stream_row_merge.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Schema;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use crate::PlanRef;
23use crate::error::Result;
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
27use crate::optimizer::plan_node::stream::StreamPlanRef;
28use crate::optimizer::plan_node::utils::{Distill, childless_record};
29use crate::optimizer::plan_node::{
30    ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode,
31};
32use crate::optimizer::property::{FunctionalDependencySet, WatermarkColumns};
33use crate::stream_fragmenter::BuildFragmentGraphState;
34
/// `StreamRowMerge` is used for merging two streams with the same stream key and distribution.
///
/// It buffers the outputs from its two input streams until it receives a barrier.
/// On receiving a barrier, it `Project`s their outputs into one output layout
/// according to the provided `lhs_mapping` and `rhs_mapping`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamRowMerge {
    /// Common plan-node properties (schema, stream key, distribution, ...) of this node.
    pub base: PlanBase<Stream>,
    /// Left-hand input stream.
    pub lhs_input: PlanRef,
    /// Right-hand input stream.
    pub rhs_input: PlanRef,
    /// Maps lhs input column indices to output column indices.
    ///
    /// If an output column is covered by both mappings, `new` takes its field from the lhs.
    pub lhs_mapping: ColIndexMapping,
    /// Maps rhs input column indices to output column indices.
    ///
    /// `new` only uses the rhs for output columns not covered by `lhs_mapping`.
    pub rhs_mapping: ColIndexMapping,
}
49
50impl StreamRowMerge {
51    pub fn new(
52        lhs_input: PlanRef,
53        rhs_input: PlanRef,
54        lhs_mapping: ColIndexMapping,
55        rhs_mapping: ColIndexMapping,
56    ) -> Result<Self> {
57        assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
58        assert_eq!(lhs_input.distribution(), rhs_input.distribution());
59        assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
60        let functional_dependency =
61            FunctionalDependencySet::with_key(lhs_mapping.target_size(), &[]);
62        let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
63        let o2i_lhs = lhs_mapping
64            .inverse()
65            .ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
66        let o2i_rhs = rhs_mapping
67            .inverse()
68            .ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
69        for output_idx in 0..lhs_mapping.target_size() {
70            if let Some(lhs_idx) = o2i_lhs.try_map(output_idx) {
71                schema_fields.push(lhs_input.schema().fields()[lhs_idx].clone());
72            } else if let Some(rhs_idx) = o2i_rhs.try_map(output_idx) {
73                schema_fields.push(rhs_input.schema().fields()[rhs_idx].clone());
74            } else {
75                bail!(
76                    "output index {} not found in either lhs or rhs mapping",
77                    output_idx
78                );
79            }
80        }
81        let schema = Schema::new(schema_fields);
82        assert!(!schema.is_empty());
83        let watermark_columns = WatermarkColumns::new();
84
85        let base = PlanBase::new_stream(
86            lhs_input.ctx(),
87            schema,
88            lhs_input.stream_key().map(|k| k.to_vec()),
89            functional_dependency,
90            lhs_input.distribution().clone(),
91            lhs_input.append_only(),
92            lhs_input.emit_on_window_close(),
93            watermark_columns,
94            lhs_input.columns_monotonicity().clone(),
95        );
96        Ok(Self {
97            base,
98            lhs_input,
99            rhs_input,
100            lhs_mapping,
101            rhs_mapping,
102        })
103    }
104}
105
106impl Distill for StreamRowMerge {
107    fn distill<'a>(&self) -> XmlNode<'a> {
108        let mut out = Vec::with_capacity(1);
109
110        if self.base.ctx().is_explain_verbose() {
111            let f = |t| Pretty::debug(&t);
112            let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect());
113            out = vec![("output", e)];
114        }
115        childless_record("StreamRowMerge", out)
116    }
117}
118
119impl PlanTreeNodeBinary for StreamRowMerge {
120    fn left(&self) -> PlanRef {
121        self.lhs_input.clone()
122    }
123
124    fn right(&self) -> PlanRef {
125        self.rhs_input.clone()
126    }
127
128    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
129        Self {
130            base: self.base.clone(),
131            lhs_input: left,
132            rhs_input: right,
133            lhs_mapping: self.lhs_mapping.clone(),
134            rhs_mapping: self.rhs_mapping.clone(),
135        }
136    }
137}
138
139impl_plan_tree_node_for_binary! { StreamRowMerge }
140
141impl StreamNode for StreamRowMerge {
142    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
143        PbNodeBody::RowMerge(Box::new(risingwave_pb::stream_plan::RowMergeNode {
144            lhs_mapping: Some(self.lhs_mapping.to_protobuf()),
145            rhs_mapping: Some(self.rhs_mapping.to_protobuf()),
146        }))
147    }
148}
149
150impl ExprRewritable for StreamRowMerge {
151    fn has_rewritable_expr(&self) -> bool {
152        false
153    }
154
155    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
156        unimplemented!()
157    }
158}
159
160impl ExprVisitable for StreamRowMerge {
161    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
162}