risingwave_frontend/optimizer/plan_node/
stream_row_merge.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail;
18use risingwave_common::catalog::Schema;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
26use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29    ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::optimizer::property::{
32    Distribution, FunctionalDependencySet, MonotonicityMap, StreamKind, WatermarkColumns,
33};
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36/// `StreamRowMerge` is used for merging two streams with the same stream key and distribution.
37/// It will buffer the outputs from its input streams until we receive a barrier.
38/// On receiving a barrier, it will `Project` their outputs according
39/// to the provided `lhs_mapping` and `rhs_mapping`.
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct StreamRowMerge {
42    pub base: PlanBase<Stream>,
43    pub lhs_input: PlanRef,
44    pub rhs_input: PlanRef,
45    /// Maps input from the lhs to the output.
46    pub lhs_mapping: ColIndexMapping,
47    /// Maps input from the rhs to the output.
48    pub rhs_mapping: ColIndexMapping,
49}
50
51impl StreamRowMerge {
52    pub fn new(
53        lhs_input: PlanRef,
54        rhs_input: PlanRef,
55        lhs_mapping: ColIndexMapping,
56        rhs_mapping: ColIndexMapping,
57    ) -> Result<Self> {
58        assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
59        assert_eq!(lhs_input.distribution(), rhs_input.distribution());
60        assert_eq!(lhs_input.stream_key(), rhs_input.stream_key());
61        assert_eq!(lhs_input.stream_kind(), rhs_input.stream_kind());
62
63        // Currently, `RowMerge` only supports and is only used for merging singleton inputs, typically
64        // simple agg and approx percentile agg. We check the input's properties here for sanity.
65        assert_eq!(lhs_input.distribution(), &Distribution::Single);
66        assert_eq!(lhs_input.stream_key(), Some(&[][..]));
67        assert_eq!(lhs_input.stream_kind(), StreamKind::Retract);
68
69        let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
70        let o2i_lhs = lhs_mapping
71            .inverse()
72            .ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
73        let o2i_rhs = rhs_mapping
74            .inverse()
75            .ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
76        for output_idx in 0..lhs_mapping.target_size() {
77            if let Some(lhs_idx) = o2i_lhs.try_map(output_idx) {
78                schema_fields.push(lhs_input.schema().fields()[lhs_idx].clone());
79            } else if let Some(rhs_idx) = o2i_rhs.try_map(output_idx) {
80                schema_fields.push(rhs_input.schema().fields()[rhs_idx].clone());
81            } else {
82                bail!(
83                    "output index {} not found in either lhs or rhs mapping",
84                    output_idx
85                );
86            }
87        }
88        let schema = Schema::new(schema_fields);
89        assert!(!schema.is_empty());
90        let base = PlanBase::new_stream(
91            lhs_input.ctx(),
92            schema,
93            Some(vec![]),
94            FunctionalDependencySet::new(lhs_mapping.target_size()),
95            Distribution::Single,
96            StreamKind::Retract,
97            lhs_input.emit_on_window_close(),
98            WatermarkColumns::new(),
99            MonotonicityMap::new(),
100        );
101        Ok(Self {
102            base,
103            lhs_input,
104            rhs_input,
105            lhs_mapping,
106            rhs_mapping,
107        })
108    }
109}
110
111impl Distill for StreamRowMerge {
112    fn distill<'a>(&self) -> XmlNode<'a> {
113        let mut out = Vec::with_capacity(1);
114
115        if self.base.ctx().is_explain_verbose() {
116            let f = |t| Pretty::debug(&t);
117            let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect());
118            out = vec![("output", e)];
119        }
120        childless_record("StreamRowMerge", out)
121    }
122}
123
124impl PlanTreeNodeBinary<Stream> for StreamRowMerge {
125    fn left(&self) -> PlanRef {
126        self.lhs_input.clone()
127    }
128
129    fn right(&self) -> PlanRef {
130        self.rhs_input.clone()
131    }
132
133    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
134        Self {
135            base: self.base.clone(),
136            lhs_input: left,
137            rhs_input: right,
138            lhs_mapping: self.lhs_mapping.clone(),
139            rhs_mapping: self.rhs_mapping.clone(),
140        }
141    }
142}
143
144impl_plan_tree_node_for_binary! { Stream, StreamRowMerge }
145
146impl StreamNode for StreamRowMerge {
147    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
148        PbNodeBody::RowMerge(Box::new(risingwave_pb::stream_plan::RowMergeNode {
149            lhs_mapping: Some(self.lhs_mapping.to_protobuf()),
150            rhs_mapping: Some(self.rhs_mapping.to_protobuf()),
151        }))
152    }
153}
154
155impl ExprRewritable<Stream> for StreamRowMerge {
156    fn has_rewritable_expr(&self) -> bool {
157        false
158    }
159
160    fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
161        unimplemented!()
162    }
163}
164
165impl ExprVisitable for StreamRowMerge {
166    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
167}