// risingwave_frontend/optimizer/plan_node/stream_project_set.rs
use risingwave_pb::stream_plan::ProjectSetNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::stream::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb, generic,
22};
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{
26 MonotonicityMap, WatermarkColumns, analyze_monotonicity, monotonicity_variants,
27};
28use crate::scheduler::SchedulerResult;
29use crate::stream_fragmenter::BuildFragmentGraphState;
30use crate::utils::ColIndexMappingRewriteExt;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamProjectSet {
34 pub base: PlanBase<Stream>,
35 core: generic::ProjectSet<PlanRef>,
36 watermark_derivations: Vec<(usize, usize)>,
39 nondecreasing_exprs: Vec<usize>,
42}
43
44impl StreamProjectSet {
45 pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
46 let ctx = core.input.ctx();
47 let input = core.input.clone();
48 let distribution = core
49 .i2o_col_mapping()
50 .rewrite_provided_distribution(input.distribution());
51
52 let mut watermark_derivations = vec![];
53 let mut nondecreasing_exprs = vec![];
54 let mut out_watermark_columns = WatermarkColumns::new();
55 for (expr_idx, expr) in core.select_list.iter().enumerate() {
56 let out_expr_idx = expr_idx + 1;
57
58 use monotonicity_variants::*;
59 match analyze_monotonicity(expr) {
60 Inherent(monotonicity) => {
61 if monotonicity.is_non_decreasing() && !monotonicity.is_constant() {
62 nondecreasing_exprs.push(expr_idx);
66 out_watermark_columns.insert(out_expr_idx, ctx.next_watermark_group_id());
68 }
69 }
70 FollowingInput(input_idx) => {
71 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
72 watermark_derivations.push((input_idx, expr_idx));
74 out_watermark_columns.insert(out_expr_idx, wtmk_group);
76 }
77 }
78 _FollowingInputInversely(_) => {}
79 }
80 }
81
82 let base = PlanBase::new_stream_with_core(
85 &core,
86 distribution,
87 input.stream_kind(),
88 input.emit_on_window_close(),
89 out_watermark_columns,
90 MonotonicityMap::new(), );
92 StreamProjectSet {
93 base,
94 core,
95 watermark_derivations,
96 nondecreasing_exprs,
97 }
98 }
99}
// NOTE(review): `impl_distill_by_unit!` is imported from `super::utils`; it presumably generates
// the plan-display implementation by delegating to the `core` field under the name
// "StreamProjectSet" — confirm against the macro definition.
impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
// NOTE(review): presumably generates the plan-tree-node boilerplate for a single-input `Stream`
// node on top of this type's `PlanTreeNodeUnary` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Stream, StreamProjectSet }
102
103impl PlanTreeNodeUnary<Stream> for StreamProjectSet {
104 fn input(&self) -> PlanRef {
105 self.core.input.clone()
106 }
107
108 fn clone_with_input(&self, input: PlanRef) -> Self {
109 let mut core = self.core.clone();
110 core.input = input;
111 Self::new(core)
112 }
113}
114
115impl TryToStreamPb for StreamProjectSet {
116 fn try_to_stream_prost_body(
117 &self,
118 _state: &mut BuildFragmentGraphState,
119 ) -> SchedulerResult<PbNodeBody> {
120 let (watermark_input_cols, watermark_expr_indices) = self
121 .watermark_derivations
122 .iter()
123 .map(|(i, o)| (*i as u32, *o as u32))
124 .unzip();
125 let select_list = self
126 .core
127 .select_list
128 .iter()
129 .map(|select_item| {
130 select_item.to_project_set_select_item_proto_checked_pure(
131 self.input().stream_kind().is_retract(),
132 )
133 })
134 .collect::<crate::error::Result<Vec<_>>>()?;
135 Ok(PbNodeBody::ProjectSet(Box::new(ProjectSetNode {
136 select_list,
137 watermark_input_cols,
138 watermark_expr_indices,
139 nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
140 })))
141 }
142}
143
144impl ExprRewritable<Stream> for StreamProjectSet {
145 fn has_rewritable_expr(&self) -> bool {
146 true
147 }
148
149 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
150 let mut core = self.core.clone();
151 core.rewrite_exprs(r);
152 Self::new(core).into()
153 }
154}
155
156impl ExprVisitable for StreamProjectSet {
157 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
158 self.core.visit_exprs(v);
159 }
160}