// risingwave_frontend/optimizer/plan_node/stream_union.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::BitAnd;
16
17use fixedbitset::FixedBitSet;
18use pretty_xmlish::XmlNode;
19use risingwave_pb::stream_plan::UnionNode;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{Distill, childless_record, watermark_pretty};
24use super::{ExprRewritable, StreamPlanRef as PlanRef, generic};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode};
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31/// `StreamUnion` implements [`super::LogicalUnion`]
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamUnion {
34    pub base: PlanBase<Stream>,
35    core: generic::Union<PlanRef>,
36}
37
38impl StreamUnion {
39    pub fn new(core: generic::Union<PlanRef>) -> Self {
40        let inputs = &core.inputs;
41        let dist = inputs[0].distribution().clone();
42        assert!(inputs.iter().all(|input| *input.distribution() == dist));
43        Self::new_with_dist(core, dist)
44    }
45
46    pub fn new_with_dist(core: generic::Union<PlanRef>, dist: Distribution) -> Self {
47        assert!(
48            core.all,
49            "After UnionToDistinctRule, union should become union all"
50        );
51        assert!(!core.inputs.is_empty());
52
53        let inputs = &core.inputs;
54        let ctx = core.ctx();
55
56        let watermark_indices = inputs
57            .iter()
58            .map(|x| x.watermark_columns().index_set().to_bitset())
59            .fold(
60                {
61                    let mut bitset = FixedBitSet::with_capacity(core.schema().len());
62                    bitset.toggle_range(..);
63                    bitset
64                },
65                |acc, x| acc.bitand(&x),
66            );
67        let mut watermark_columns = WatermarkColumns::new();
68        for idx in watermark_indices.ones() {
69            // XXX(rc): for the sake of simplicity, we assign each watermark column a new group
70            watermark_columns.insert(idx, ctx.next_watermark_group_id());
71        }
72
73        let kind = if core.source_col.is_some() {
74            // There's no handling on key conflict in executor implementation. However, in most cases
75            // there's a `source_col` as a part of stream key, indicating which input the row comes from,
76            // there won't be actual key conflict and we can safely call `merge`.
77            (inputs.iter().map(|i| i.stream_kind()))
78                .reduce(StreamKind::merge)
79                .unwrap()
80        } else {
81            // No `source_col`, typically used in a `TABLE` plan to merge inputs from external source,
82            // upstream sink-into-table jobs, and DML.
83            if inputs.len() == 1 {
84                // Single input. Follow the input's kind.
85                inputs[0].stream_kind()
86            } else if inputs.iter().all(|x| x.stream_kind().is_append_only()) {
87                // Special case for append-only table. Either there will be a `RowIdGen` following the `Union`,
88                // or there will be a `Materialize` with conflict handling enabled. In both cases there
89                // will be no key conflict, so we can treat the merged stream as append-only here.
90                StreamKind::AppendOnly
91            } else {
92                // Otherwise we must treat it as upsert.
93                StreamKind::Upsert
94            }
95        };
96
97        let base = PlanBase::new_stream_with_core(
98            &core,
99            dist,
100            kind,
101            inputs.iter().all(|x| x.emit_on_window_close()),
102            watermark_columns,
103            MonotonicityMap::new(),
104        );
105
106        StreamUnion { base, core }
107    }
108}
109
110impl Distill for StreamUnion {
111    fn distill<'a>(&self) -> XmlNode<'a> {
112        let mut vec = self.core.fields_pretty();
113        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
114            vec.push(("output_watermarks", ow));
115        }
116        childless_record("StreamUnion", vec)
117    }
118}
119
120impl PlanTreeNode<Stream> for StreamUnion {
121    fn inputs(&self) -> smallvec::SmallVec<[PlanRef; 2]> {
122        smallvec::SmallVec::from_vec(self.core.inputs.clone())
123    }
124
125    fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
126        let mut new = self.core.clone();
127        new.inputs = inputs.to_vec();
128        let dist = self.distribution().clone();
129        Self::new_with_dist(new, dist).into()
130    }
131}
132
133impl StreamNode for StreamUnion {
134    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
135        PbNodeBody::Union(UnionNode {})
136    }
137}
138
139impl ExprRewritable<Stream> for StreamUnion {}
140
141impl ExprVisitable for StreamUnion {}