risingwave_frontend/optimizer/plan_node/
stream_union.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::BitAnd;
16
17use fixedbitset::FixedBitSet;
18use pretty_xmlish::XmlNode;
19use risingwave_pb::stream_plan::UnionNode;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{Distill, childless_record, watermark_pretty};
24use super::{ExprRewritable, PlanRef, generic};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode};
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
/// `StreamUnion` implements [`super::LogicalUnion`]
///
/// It pairs the stream-level plan properties (`base`) with the generic union
/// description (`core`), which owns the list of input plans.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamUnion {
    /// Stream plan properties, built by `PlanBase::new_stream_with_core` in
    /// `new_with_dist`.
    pub base: PlanBase<Stream>,
    /// The generic union node; its `inputs` are the plans being unioned.
    core: generic::Union<PlanRef>,
}
37
38impl StreamUnion {
39    pub fn new(core: generic::Union<PlanRef>) -> Self {
40        let inputs = &core.inputs;
41        let dist = inputs[0].distribution().clone();
42        assert!(inputs.iter().all(|input| *input.distribution() == dist));
43        Self::new_with_dist(core, dist)
44    }
45
46    pub fn new_with_dist(core: generic::Union<PlanRef>, dist: Distribution) -> Self {
47        let inputs = &core.inputs;
48        let ctx = core.ctx();
49
50        let watermark_indices = inputs
51            .iter()
52            .map(|x| x.watermark_columns().index_set().to_bitset())
53            .fold(
54                {
55                    let mut bitset = FixedBitSet::with_capacity(core.schema().len());
56                    bitset.toggle_range(..);
57                    bitset
58                },
59                |acc, x| acc.bitand(&x),
60            );
61        let mut watermark_columns = WatermarkColumns::new();
62        for idx in watermark_indices.ones() {
63            // XXX(rc): for the sake of simplicity, we assign each watermark column a new group
64            watermark_columns.insert(idx, ctx.next_watermark_group_id());
65        }
66
67        let base = PlanBase::new_stream_with_core(
68            &core,
69            dist,
70            inputs.iter().all(|x| x.append_only()),
71            inputs.iter().all(|x| x.emit_on_window_close()),
72            watermark_columns,
73            MonotonicityMap::new(),
74        );
75
76        StreamUnion { base, core }
77    }
78}
79
80impl Distill for StreamUnion {
81    fn distill<'a>(&self) -> XmlNode<'a> {
82        let mut vec = self.core.fields_pretty();
83        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
84            vec.push(("output_watermarks", ow));
85        }
86        childless_record("StreamUnion", vec)
87    }
88}
89
90impl PlanTreeNode for StreamUnion {
91    fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
92        smallvec::SmallVec::from_vec(self.core.inputs.clone())
93    }
94
95    fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
96        let mut new = self.core.clone();
97        new.inputs = inputs.to_vec();
98        let dist = self.distribution().clone();
99        Self::new_with_dist(new, dist).into()
100    }
101}
102
103impl StreamNode for StreamUnion {
104    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
105        PbNodeBody::Union(UnionNode {})
106    }
107}
108
// No methods are overridden here: whatever defaults `ExprRewritable` provides apply.
impl ExprRewritable for StreamUnion {}
110
// No methods are overridden here: whatever defaults `ExprVisitable` provides apply.
impl ExprVisitable for StreamUnion {}