risingwave_frontend/optimizer/plan_node/
stream_project_set.rs1use itertools::Itertools;
16use risingwave_pb::stream_plan::ProjectSetNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{
27 MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
28};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30use crate::utils::ColIndexMappingRewriteExt;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamProjectSet {
34 pub base: PlanBase<Stream>,
35 core: generic::ProjectSet<PlanRef>,
36 watermark_derivations: Vec<(usize, usize)>,
39 nondecreasing_exprs: Vec<usize>,
42}
43
44impl StreamProjectSet {
45 pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
46 let ctx = core.input.ctx();
47 let input = core.input.clone();
48 let distribution = core
49 .i2o_col_mapping()
50 .rewrite_provided_distribution(input.distribution());
51
52 let mut watermark_derivations = vec![];
53 let mut nondecreasing_exprs = vec![];
54 let mut out_watermark_columns = WatermarkColumns::new();
55 for (expr_idx, expr) in core.select_list.iter().enumerate() {
56 let out_expr_idx = expr_idx + 1;
57
58 use monotonicity_variants::*;
59 match analyze_monotonicity(expr) {
60 Inherent(monotonicity) => {
61 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
62 nondecreasing_exprs.push(expr_idx);
66 out_watermark_columns.insert(out_expr_idx, ctx.next_watermark_group_id());
68 }
69 }
70 FollowingInput(input_idx) => {
71 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
72 watermark_derivations.push((input_idx, expr_idx));
74 out_watermark_columns.insert(out_expr_idx, wtmk_group);
76 }
77 }
78 _FollowingInputInversely(_) => {}
79 }
80 }
81
82 let base = PlanBase::new_stream_with_core(
85 &core,
86 distribution,
87 input.stream_kind(),
88 input.emit_on_window_close(),
89 out_watermark_columns,
90 MonotonicityMap::new(), );
92 StreamProjectSet {
93 base,
94 core,
95 watermark_derivations,
96 nondecreasing_exprs,
97 }
98 }
99}
100impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
101impl_plan_tree_node_for_unary! { Stream, StreamProjectSet }
102
103impl PlanTreeNodeUnary<Stream> for StreamProjectSet {
104 fn input(&self) -> PlanRef {
105 self.core.input.clone()
106 }
107
108 fn clone_with_input(&self, input: PlanRef) -> Self {
109 let mut core = self.core.clone();
110 core.input = input;
111 Self::new(core)
112 }
113}
114
115impl StreamNode for StreamProjectSet {
116 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
117 let (watermark_input_cols, watermark_expr_indices) = self
118 .watermark_derivations
119 .iter()
120 .map(|(i, o)| (*i as u32, *o as u32))
121 .unzip();
122 PbNodeBody::ProjectSet(Box::new(ProjectSetNode {
123 select_list: self
124 .core
125 .select_list
126 .iter()
127 .map(|select_item| select_item.to_project_set_select_item_proto())
128 .collect_vec(),
129 watermark_input_cols,
130 watermark_expr_indices,
131 nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
132 }))
133 }
134}
135
136impl ExprRewritable<Stream> for StreamProjectSet {
137 fn has_rewritable_expr(&self) -> bool {
138 true
139 }
140
141 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
142 let mut core = self.core.clone();
143 core.rewrite_exprs(r);
144 Self::new(core).into()
145 }
146}
147
148impl ExprVisitable for StreamProjectSet {
149 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
150 self.core.visit_exprs(v);
151 }
152}