risingwave_frontend/optimizer/plan_node/
stream_project_set.rs1use itertools::Itertools;
16use risingwave_pb::stream_plan::ProjectSetNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::expr::{ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::property::{
25 MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
26};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28use crate::utils::ColIndexMappingRewriteExt;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamProjectSet {
32 pub base: PlanBase<Stream>,
33 core: generic::ProjectSet<PlanRef>,
34 watermark_derivations: Vec<(usize, usize)>,
37 nondecreasing_exprs: Vec<usize>,
40}
41
42impl StreamProjectSet {
43 pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
44 let ctx = core.input.ctx();
45 let input = core.input.clone();
46 let distribution = core
47 .i2o_col_mapping()
48 .rewrite_provided_distribution(input.distribution());
49
50 let mut watermark_derivations = vec![];
51 let mut nondecreasing_exprs = vec![];
52 let mut out_watermark_columns = WatermarkColumns::new();
53 for (expr_idx, expr) in core.select_list.iter().enumerate() {
54 let out_expr_idx = expr_idx + 1;
55
56 use monotonicity_variants::*;
57 match analyze_monotonicity(expr) {
58 Inherent(monotonicity) => {
59 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
60 nondecreasing_exprs.push(expr_idx);
64 out_watermark_columns.insert(out_expr_idx, ctx.next_watermark_group_id());
66 }
67 }
68 FollowingInput(input_idx) => {
69 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
70 watermark_derivations.push((input_idx, expr_idx));
72 out_watermark_columns.insert(out_expr_idx, wtmk_group);
74 }
75 }
76 _FollowingInputInversely(_) => {}
77 }
78 }
79
80 let base = PlanBase::new_stream_with_core(
83 &core,
84 distribution,
85 input.append_only(),
86 input.emit_on_window_close(),
87 out_watermark_columns,
88 MonotonicityMap::new(), );
90 StreamProjectSet {
91 base,
92 core,
93 watermark_derivations,
94 nondecreasing_exprs,
95 }
96 }
97}
98impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
99impl_plan_tree_node_for_unary! { StreamProjectSet }
100
101impl PlanTreeNodeUnary for StreamProjectSet {
102 fn input(&self) -> PlanRef {
103 self.core.input.clone()
104 }
105
106 fn clone_with_input(&self, input: PlanRef) -> Self {
107 let mut core = self.core.clone();
108 core.input = input;
109 Self::new(core)
110 }
111}
112
113impl StreamNode for StreamProjectSet {
114 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
115 let (watermark_input_cols, watermark_expr_indices) = self
116 .watermark_derivations
117 .iter()
118 .map(|(i, o)| (*i as u32, *o as u32))
119 .unzip();
120 PbNodeBody::ProjectSet(Box::new(ProjectSetNode {
121 select_list: self
122 .core
123 .select_list
124 .iter()
125 .map(|select_item| select_item.to_project_set_select_item_proto())
126 .collect_vec(),
127 watermark_input_cols,
128 watermark_expr_indices,
129 nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
130 }))
131 }
132}
133
134impl ExprRewritable for StreamProjectSet {
135 fn has_rewritable_expr(&self) -> bool {
136 true
137 }
138
139 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
140 let mut core = self.core.clone();
141 core.rewrite_exprs(r);
142 Self::new(core).into()
143 }
144}
145
146impl ExprVisitable for StreamProjectSet {
147 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
148 self.core.visit_exprs(v);
149 }
150}