risingwave_frontend/optimizer/plan_node/generic/
over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, Str, XmlNode};
17use risingwave_common::catalog::{Field, Schema};
18use risingwave_common::types::DataType;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_expr::window_function::{Frame, WindowFuncKind};
22use risingwave_pb::expr::PbWindowFunction;
23
24use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
25use crate::OptimizerContextRef;
26use crate::expr::{InputRef, InputRefDisplay};
27use crate::optimizer::plan_node::utils::childless_record;
28use crate::optimizer::property::FunctionalDependencySet;
29use crate::utils::ColIndexMappingRewriteExt;
30
31/// Rewritten version of [`crate::expr::WindowFunction`] which uses `InputRef` instead of
32/// `ExprImpl`.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct PlanWindowFunction {
35    pub kind: WindowFuncKind,
36    pub return_type: DataType,
37    pub args: Vec<InputRef>,
38    pub ignore_nulls: bool,
39    pub partition_by: Vec<InputRef>,
40    pub order_by: Vec<ColumnOrder>,
41    pub frame: Frame,
42}
43
44struct PlanWindowFunctionDisplay<'a> {
45    pub window_function: &'a PlanWindowFunction,
46    pub input_schema: &'a Schema,
47}
48
49impl std::fmt::Debug for PlanWindowFunctionDisplay<'_> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        let window_function = self.window_function;
52        if f.alternate() {
53            f.debug_struct("WindowFunction")
54                .field("kind", &window_function.kind)
55                .field("return_type", &window_function.return_type)
56                .field("args", &window_function.args)
57                .field("ignore_nulls", &window_function.ignore_nulls)
58                .field("partition_by", &window_function.partition_by)
59                .field("order_by", &window_function.order_by)
60                .field("frame", &window_function.frame)
61                .finish()
62        } else {
63            write!(f, "{}(", window_function.kind)?;
64            let mut delim = "";
65            for arg in &window_function.args {
66                write!(f, "{}", delim)?;
67                delim = ", ";
68                write!(
69                    f,
70                    "{}",
71                    InputRefDisplay {
72                        input_ref: arg,
73                        input_schema: self.input_schema
74                    }
75                )?;
76            }
77            if window_function.ignore_nulls {
78                write!(f, " IGNORE NULLS")?;
79            }
80            write!(f, ") OVER(")?;
81            let mut delim = "";
82            if !window_function.partition_by.is_empty() {
83                delim = " ";
84                write!(
85                    f,
86                    "PARTITION BY {}",
87                    window_function
88                        .partition_by
89                        .iter()
90                        .format_with(", ", |input_ref, f| {
91                            f(&InputRefDisplay {
92                                input_ref,
93                                input_schema: self.input_schema,
94                            })
95                        })
96                )?;
97            }
98            if !window_function.order_by.is_empty() {
99                write!(
100                    f,
101                    "{delim}ORDER BY {}",
102                    window_function.order_by.iter().format_with(", ", |o, f| {
103                        f(&ColumnOrderDisplay {
104                            column_order: o,
105                            input_schema: self.input_schema,
106                        })
107                    })
108                )?;
109            }
110            write!(f, "{delim}{}", window_function.frame)?;
111            f.write_str(")")?;
112
113            Ok(())
114        }
115    }
116}
117
118impl PlanWindowFunction {
119    pub fn to_protobuf(&self) -> PbWindowFunction {
120        use WindowFuncKind::*;
121        use risingwave_pb::expr::window_function::{PbGeneralType, PbType};
122
123        let r#type = match &self.kind {
124            RowNumber => PbType::General(PbGeneralType::RowNumber as _),
125            Rank => PbType::General(PbGeneralType::Rank as _),
126            DenseRank => PbType::General(PbGeneralType::DenseRank as _),
127            Lag => PbType::General(PbGeneralType::Lag as _),
128            Lead => PbType::General(PbGeneralType::Lead as _),
129            Aggregate(agg_type) => PbType::Aggregate2(agg_type.to_protobuf()),
130        };
131
132        PbWindowFunction {
133            r#type: Some(r#type),
134            args: self.args.iter().map(InputRef::to_proto).collect(),
135            return_type: Some(self.return_type.to_protobuf()),
136            frame: Some(self.frame.to_protobuf()),
137            ignore_nulls: self.ignore_nulls,
138        }
139    }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
143pub struct OverWindow<PlanRef> {
144    pub window_functions: Vec<PlanWindowFunction>,
145    pub input: PlanRef,
146}
147
148impl<PlanRef: GenericPlanRef> OverWindow<PlanRef> {
149    pub fn new(window_functions: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
150        Self {
151            window_functions,
152            input,
153        }
154    }
155
156    pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> OverWindow<OtherPlanRef> {
157        OverWindow {
158            window_functions: self.window_functions.clone(),
159            input,
160        }
161    }
162
163    pub fn input_len(&self) -> usize {
164        self.input.schema().len()
165    }
166
167    pub fn output_len(&self) -> usize {
168        self.input.schema().len() + self.window_functions.len()
169    }
170
171    pub fn window_functions(&self) -> &[PlanWindowFunction] {
172        &self.window_functions
173    }
174
175    pub fn funcs_have_same_partition_and_order(&self) -> bool {
176        self.window_functions
177            .iter()
178            .map(|f| (&f.partition_by, &f.order_by))
179            .all_equal()
180    }
181
182    pub fn partition_key_indices(&self) -> Vec<usize> {
183        assert!(self.funcs_have_same_partition_and_order());
184        self.window_functions[0]
185            .partition_by
186            .iter()
187            .map(|i| i.index())
188            .collect()
189    }
190
191    pub fn order_key(&self) -> &[ColumnOrder] {
192        assert!(self.funcs_have_same_partition_and_order());
193        &self.window_functions[0].order_by
194    }
195
196    pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
197        (self.input, self.window_functions)
198    }
199}
200
201impl<PlanRef: GenericPlanRef> DistillUnit for OverWindow<PlanRef> {
202    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
203        let f = |func| {
204            Pretty::debug(&PlanWindowFunctionDisplay {
205                window_function: func,
206                input_schema: self.input.schema(),
207            })
208        };
209        let wf = Pretty::Array(self.window_functions.iter().map(f).collect());
210        let vec = vec![("window_functions", wf)];
211        childless_record(name, vec)
212    }
213}
214
215impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
216    fn functional_dependency(&self) -> FunctionalDependencySet {
217        let mapping =
218            ColIndexMapping::identity_or_none(self.input.schema().len(), self.output_len());
219        let fd_set = self.input.functional_dependency().clone();
220        mapping.rewrite_functional_dependency_set(fd_set)
221    }
222
223    fn schema(&self) -> Schema {
224        let mut schema = self.input.schema().clone();
225        self.window_functions.iter().for_each(|call| {
226            schema.fields.push(Field::with_name(
227                call.return_type.clone(),
228                call.kind.to_string(),
229            ));
230        });
231        schema
232    }
233
234    fn stream_key(&self) -> Option<Vec<usize>> {
235        let mut output_pk = self.input.stream_key()?.to_vec();
236        for part_key_idx in self
237            .window_functions
238            .iter()
239            .flat_map(|f| f.partition_by.iter().map(|i| i.index))
240        {
241            if !output_pk.contains(&part_key_idx) {
242                output_pk.push(part_key_idx);
243            }
244        }
245        Some(output_pk)
246    }
247
248    fn ctx(&self) -> OptimizerContextRef {
249        self.input.ctx()
250    }
251}