risingwave_frontend/optimizer/plan_node/
stream_union.rs1use std::ops::BitAnd;
16
17use fixedbitset::FixedBitSet;
18use pretty_xmlish::XmlNode;
19use risingwave_pb::stream_plan::UnionNode;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{Distill, childless_record, watermark_pretty};
24use super::{ExprRewritable, StreamPlanRef as PlanRef, generic};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode};
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamUnion {
34 pub base: PlanBase<Stream>,
35 core: generic::Union<PlanRef>,
36}
37
38impl StreamUnion {
39 pub fn new(core: generic::Union<PlanRef>) -> Self {
40 let inputs = &core.inputs;
41 let dist = inputs[0].distribution().clone();
42 assert!(inputs.iter().all(|input| *input.distribution() == dist));
43 Self::new_with_dist(core, dist)
44 }
45
46 pub fn new_with_dist(core: generic::Union<PlanRef>, dist: Distribution) -> Self {
47 assert!(
48 core.all,
49 "After UnionToDistinctRule, union should become union all"
50 );
51 assert!(!core.inputs.is_empty());
52
53 let inputs = &core.inputs;
54 let ctx = core.ctx();
55
56 let watermark_indices = inputs
57 .iter()
58 .map(|x| x.watermark_columns().index_set().to_bitset())
59 .fold(
60 {
61 let mut bitset = FixedBitSet::with_capacity(core.schema().len());
62 bitset.toggle_range(..);
63 bitset
64 },
65 |acc, x| acc.bitand(&x),
66 );
67 let mut watermark_columns = WatermarkColumns::new();
68 for idx in watermark_indices.ones() {
69 watermark_columns.insert(idx, ctx.next_watermark_group_id());
71 }
72
73 let kind = if core.source_col.is_some() {
74 (inputs.iter().map(|i| i.stream_kind()))
78 .reduce(StreamKind::merge)
79 .unwrap()
80 } else {
81 if inputs.len() == 1 {
84 inputs[0].stream_kind()
86 } else if inputs.iter().all(|x| x.stream_kind().is_append_only()) {
87 StreamKind::AppendOnly
91 } else {
92 StreamKind::Upsert
94 }
95 };
96
97 let base = PlanBase::new_stream_with_core(
98 &core,
99 dist,
100 kind,
101 inputs.iter().all(|x| x.emit_on_window_close()),
102 watermark_columns,
103 MonotonicityMap::new(),
104 );
105
106 StreamUnion { base, core }
107 }
108}
109
110impl Distill for StreamUnion {
111 fn distill<'a>(&self) -> XmlNode<'a> {
112 let mut vec = self.core.fields_pretty();
113 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
114 vec.push(("output_watermarks", ow));
115 }
116 childless_record("StreamUnion", vec)
117 }
118}
119
120impl PlanTreeNode<Stream> for StreamUnion {
121 fn inputs(&self) -> smallvec::SmallVec<[PlanRef; 2]> {
122 smallvec::SmallVec::from_vec(self.core.inputs.clone())
123 }
124
125 fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
126 let mut new = self.core.clone();
127 new.inputs = inputs.to_vec();
128 let dist = self.distribution().clone();
129 Self::new_with_dist(new, dist).into()
130 }
131}
132
133impl StreamNode for StreamUnion {
134 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
135 PbNodeBody::Union(UnionNode {})
136 }
137}
138
139impl ExprRewritable<Stream> for StreamUnion {}
140
141impl ExprVisitable for StreamUnion {}