risingwave_frontend/optimizer/plan_node/
stream_union.rs1use std::ops::BitAnd;
16
17use fixedbitset::FixedBitSet;
18use pretty_xmlish::XmlNode;
19use risingwave_pb::stream_plan::UnionNode;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{Distill, childless_record, watermark_pretty};
24use super::{ExprRewritable, PlanRef, generic};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode};
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamUnion {
34 pub base: PlanBase<Stream>,
35 core: generic::Union<PlanRef>,
36}
37
38impl StreamUnion {
39 pub fn new(core: generic::Union<PlanRef>) -> Self {
40 let inputs = &core.inputs;
41 let dist = inputs[0].distribution().clone();
42 assert!(inputs.iter().all(|input| *input.distribution() == dist));
43 Self::new_with_dist(core, dist)
44 }
45
46 pub fn new_with_dist(core: generic::Union<PlanRef>, dist: Distribution) -> Self {
47 let inputs = &core.inputs;
48 let ctx = core.ctx();
49
50 let watermark_indices = inputs
51 .iter()
52 .map(|x| x.watermark_columns().index_set().to_bitset())
53 .fold(
54 {
55 let mut bitset = FixedBitSet::with_capacity(core.schema().len());
56 bitset.toggle_range(..);
57 bitset
58 },
59 |acc, x| acc.bitand(&x),
60 );
61 let mut watermark_columns = WatermarkColumns::new();
62 for idx in watermark_indices.ones() {
63 watermark_columns.insert(idx, ctx.next_watermark_group_id());
65 }
66
67 let base = PlanBase::new_stream_with_core(
68 &core,
69 dist,
70 inputs.iter().all(|x| x.append_only()),
71 inputs.iter().all(|x| x.emit_on_window_close()),
72 watermark_columns,
73 MonotonicityMap::new(),
74 );
75
76 StreamUnion { base, core }
77 }
78}
79
80impl Distill for StreamUnion {
81 fn distill<'a>(&self) -> XmlNode<'a> {
82 let mut vec = self.core.fields_pretty();
83 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
84 vec.push(("output_watermarks", ow));
85 }
86 childless_record("StreamUnion", vec)
87 }
88}
89
90impl PlanTreeNode for StreamUnion {
91 fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
92 smallvec::SmallVec::from_vec(self.core.inputs.clone())
93 }
94
95 fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
96 let mut new = self.core.clone();
97 new.inputs = inputs.to_vec();
98 let dist = self.distribution().clone();
99 Self::new_with_dist(new, dist).into()
100 }
101}
102
103impl StreamNode for StreamUnion {
104 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
105 PbNodeBody::Union(UnionNode {})
106 }
107}
108
109impl ExprRewritable for StreamUnion {}
110
111impl ExprVisitable for StreamUnion {}