risingwave_frontend/optimizer/plan_node/generic/
over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::{Pretty, Str, XmlNode};
17use risingwave_common::catalog::{Field, Schema};
18use risingwave_common::types::DataType;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_expr::window_function::{Frame, WindowFuncKind};
22use risingwave_pb::expr::PbWindowFunction;
23
24use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
25use crate::OptimizerContextRef;
26use crate::expr::{InputRef, InputRefDisplay};
27use crate::optimizer::plan_node::utils::childless_record;
28use crate::optimizer::property::FunctionalDependencySet;
29use crate::utils::ColIndexMappingRewriteExt;
30
31/// Rewritten version of [`crate::expr::WindowFunction`] which uses `InputRef` instead of
32/// `ExprImpl`.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct PlanWindowFunction {
35    pub kind: WindowFuncKind,
36    pub return_type: DataType,
37    pub args: Vec<InputRef>,
38    pub ignore_nulls: bool,
39    pub partition_by: Vec<InputRef>,
40    pub order_by: Vec<ColumnOrder>,
41    pub frame: Frame,
42}
43
44struct PlanWindowFunctionDisplay<'a> {
45    pub window_function: &'a PlanWindowFunction,
46    pub input_schema: &'a Schema,
47}
48
49impl std::fmt::Debug for PlanWindowFunctionDisplay<'_> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        let window_function = self.window_function;
52        if f.alternate() {
53            f.debug_struct("WindowFunction")
54                .field("kind", &window_function.kind)
55                .field("return_type", &window_function.return_type)
56                .field("args", &window_function.args)
57                .field("ignore_nulls", &window_function.ignore_nulls)
58                .field("partition_by", &window_function.partition_by)
59                .field("order_by", &window_function.order_by)
60                .field("frame", &window_function.frame)
61                .finish()
62        } else {
63            write!(f, "{}(", window_function.kind)?;
64            let mut delim = "";
65            for arg in &window_function.args {
66                write!(f, "{}", delim)?;
67                delim = ", ";
68                write!(
69                    f,
70                    "{}",
71                    InputRefDisplay {
72                        input_ref: arg,
73                        input_schema: self.input_schema
74                    }
75                )?;
76            }
77            if window_function.ignore_nulls {
78                write!(f, " IGNORE NULLS")?;
79            }
80            write!(f, ") OVER(")?;
81            let mut delim = "";
82            if !window_function.partition_by.is_empty() {
83                delim = " ";
84                write!(
85                    f,
86                    "PARTITION BY {}",
87                    window_function
88                        .partition_by
89                        .iter()
90                        .format_with(", ", |input_ref, f| {
91                            f(&InputRefDisplay {
92                                input_ref,
93                                input_schema: self.input_schema,
94                            })
95                        })
96                )?;
97            }
98            if !window_function.order_by.is_empty() {
99                write!(
100                    f,
101                    "{delim}ORDER BY {}",
102                    window_function.order_by.iter().format_with(", ", |o, f| {
103                        f(&ColumnOrderDisplay {
104                            column_order: o,
105                            input_schema: self.input_schema,
106                        })
107                    })
108                )?;
109            }
110            write!(f, "{delim}{}", window_function.frame)?;
111            f.write_str(")")?;
112
113            Ok(())
114        }
115    }
116}
117
118impl PlanWindowFunction {
119    pub fn to_protobuf(&self) -> PbWindowFunction {
120        use WindowFuncKind::*;
121        use risingwave_pb::expr::window_function::{PbGeneralType, PbType};
122
123        let r#type = match &self.kind {
124            RowNumber => PbType::General(PbGeneralType::RowNumber as _),
125            Rank => PbType::General(PbGeneralType::Rank as _),
126            DenseRank => PbType::General(PbGeneralType::DenseRank as _),
127            Lag => PbType::General(PbGeneralType::Lag as _),
128            Lead => PbType::General(PbGeneralType::Lead as _),
129            Aggregate(agg_type) => PbType::Aggregate2(agg_type.to_protobuf()),
130        };
131
132        PbWindowFunction {
133            r#type: Some(r#type),
134            args: self.args.iter().map(InputRef::to_proto).collect(),
135            return_type: Some(self.return_type.to_protobuf()),
136            frame: Some(self.frame.to_protobuf()),
137            ignore_nulls: self.ignore_nulls,
138        }
139    }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
143pub struct OverWindow<PlanRef> {
144    pub window_functions: Vec<PlanWindowFunction>,
145    pub input: PlanRef,
146}
147
148impl<PlanRef: GenericPlanRef> OverWindow<PlanRef> {
149    pub fn new(window_functions: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
150        Self {
151            window_functions,
152            input,
153        }
154    }
155
156    pub fn input_len(&self) -> usize {
157        self.input.schema().len()
158    }
159
160    pub fn output_len(&self) -> usize {
161        self.input.schema().len() + self.window_functions.len()
162    }
163
164    pub fn window_functions(&self) -> &[PlanWindowFunction] {
165        &self.window_functions
166    }
167
168    pub fn funcs_have_same_partition_and_order(&self) -> bool {
169        self.window_functions
170            .iter()
171            .map(|f| (&f.partition_by, &f.order_by))
172            .all_equal()
173    }
174
175    pub fn partition_key_indices(&self) -> Vec<usize> {
176        assert!(self.funcs_have_same_partition_and_order());
177        self.window_functions[0]
178            .partition_by
179            .iter()
180            .map(|i| i.index())
181            .collect()
182    }
183
184    pub fn order_key(&self) -> &[ColumnOrder] {
185        assert!(self.funcs_have_same_partition_and_order());
186        &self.window_functions[0].order_by
187    }
188
189    pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
190        (self.input, self.window_functions)
191    }
192}
193
194impl<PlanRef: GenericPlanRef> DistillUnit for OverWindow<PlanRef> {
195    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
196        let f = |func| {
197            Pretty::debug(&PlanWindowFunctionDisplay {
198                window_function: func,
199                input_schema: self.input.schema(),
200            })
201        };
202        let wf = Pretty::Array(self.window_functions.iter().map(f).collect());
203        let vec = vec![("window_functions", wf)];
204        childless_record(name, vec)
205    }
206}
207
208impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
209    fn functional_dependency(&self) -> FunctionalDependencySet {
210        let mapping =
211            ColIndexMapping::identity_or_none(self.input.schema().len(), self.output_len());
212        let fd_set = self.input.functional_dependency().clone();
213        mapping.rewrite_functional_dependency_set(fd_set)
214    }
215
216    fn schema(&self) -> Schema {
217        let mut schema = self.input.schema().clone();
218        self.window_functions.iter().for_each(|call| {
219            schema.fields.push(Field::with_name(
220                call.return_type.clone(),
221                call.kind.to_string(),
222            ));
223        });
224        schema
225    }
226
227    fn stream_key(&self) -> Option<Vec<usize>> {
228        let mut output_pk = self.input.stream_key()?.to_vec();
229        for part_key_idx in self
230            .window_functions
231            .iter()
232            .flat_map(|f| f.partition_by.iter().map(|i| i.index))
233        {
234            if !output_pk.contains(&part_key_idx) {
235                output_pk.push(part_key_idx);
236            }
237        }
238        Some(output_pk)
239    }
240
241    fn ctx(&self) -> OptimizerContextRef {
242        self.input.ctx()
243    }
244}