risingwave_frontend/optimizer/plan_node/generic/
over_window.rs
1use itertools::Itertools;
16use pretty_xmlish::{Pretty, Str, XmlNode};
17use risingwave_common::catalog::{Field, Schema};
18use risingwave_common::types::DataType;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_expr::window_function::{Frame, WindowFuncKind};
22use risingwave_pb::expr::PbWindowFunction;
23
24use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
25use crate::OptimizerContextRef;
26use crate::expr::{InputRef, InputRefDisplay};
27use crate::optimizer::plan_node::utils::childless_record;
28use crate::optimizer::property::FunctionalDependencySet;
29use crate::utils::ColIndexMappingRewriteExt;
30
/// Plan-level description of a single window function call.
///
/// Arguments and partition keys are stored as [`InputRef`]s and the ordering as
/// [`ColumnOrder`]s; both are rendered against the input schema when the call is displayed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PlanWindowFunction {
    /// Which window function is being called.
    pub kind: WindowFuncKind,
    /// Data type of the value produced by the call.
    pub return_type: DataType,
    /// Arguments of the call.
    pub args: Vec<InputRef>,
    /// Whether the call is displayed with `IGNORE NULLS`; forwarded as-is to the protobuf form.
    pub ignore_nulls: bool,
    /// Columns listed in the `PARTITION BY` clause.
    pub partition_by: Vec<InputRef>,
    /// Column orders listed in the `ORDER BY` clause.
    pub order_by: Vec<ColumnOrder>,
    /// Window frame of the call.
    pub frame: Frame,
}
43
/// Display helper pairing a [`PlanWindowFunction`] with the schema used to render its
/// column references (arguments, partition keys and order keys).
struct PlanWindowFunctionDisplay<'a> {
    /// The window function call to render.
    pub window_function: &'a PlanWindowFunction,
    /// Schema handed to `InputRefDisplay` / `ColumnOrderDisplay` when rendering columns.
    pub input_schema: &'a Schema,
}
48
49impl std::fmt::Debug for PlanWindowFunctionDisplay<'_> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 let window_function = self.window_function;
52 if f.alternate() {
53 f.debug_struct("WindowFunction")
54 .field("kind", &window_function.kind)
55 .field("return_type", &window_function.return_type)
56 .field("args", &window_function.args)
57 .field("ignore_nulls", &window_function.ignore_nulls)
58 .field("partition_by", &window_function.partition_by)
59 .field("order_by", &window_function.order_by)
60 .field("frame", &window_function.frame)
61 .finish()
62 } else {
63 write!(f, "{}(", window_function.kind)?;
64 let mut delim = "";
65 for arg in &window_function.args {
66 write!(f, "{}", delim)?;
67 delim = ", ";
68 write!(
69 f,
70 "{}",
71 InputRefDisplay {
72 input_ref: arg,
73 input_schema: self.input_schema
74 }
75 )?;
76 }
77 if window_function.ignore_nulls {
78 write!(f, " IGNORE NULLS")?;
79 }
80 write!(f, ") OVER(")?;
81 let mut delim = "";
82 if !window_function.partition_by.is_empty() {
83 delim = " ";
84 write!(
85 f,
86 "PARTITION BY {}",
87 window_function
88 .partition_by
89 .iter()
90 .format_with(", ", |input_ref, f| {
91 f(&InputRefDisplay {
92 input_ref,
93 input_schema: self.input_schema,
94 })
95 })
96 )?;
97 }
98 if !window_function.order_by.is_empty() {
99 write!(
100 f,
101 "{delim}ORDER BY {}",
102 window_function.order_by.iter().format_with(", ", |o, f| {
103 f(&ColumnOrderDisplay {
104 column_order: o,
105 input_schema: self.input_schema,
106 })
107 })
108 )?;
109 }
110 write!(f, "{delim}{}", window_function.frame)?;
111 f.write_str(")")?;
112
113 Ok(())
114 }
115 }
116}
117
118impl PlanWindowFunction {
119 pub fn to_protobuf(&self) -> PbWindowFunction {
120 use WindowFuncKind::*;
121 use risingwave_pb::expr::window_function::{PbGeneralType, PbType};
122
123 let r#type = match &self.kind {
124 RowNumber => PbType::General(PbGeneralType::RowNumber as _),
125 Rank => PbType::General(PbGeneralType::Rank as _),
126 DenseRank => PbType::General(PbGeneralType::DenseRank as _),
127 Lag => PbType::General(PbGeneralType::Lag as _),
128 Lead => PbType::General(PbGeneralType::Lead as _),
129 Aggregate(agg_type) => PbType::Aggregate2(agg_type.to_protobuf()),
130 };
131
132 PbWindowFunction {
133 r#type: Some(r#type),
134 args: self.args.iter().map(InputRef::to_proto).collect(),
135 return_type: Some(self.return_type.to_protobuf()),
136 frame: Some(self.frame.to_protobuf()),
137 ignore_nulls: self.ignore_nulls,
138 }
139 }
140}
141
/// A list of window function calls together with the plan they read from.
///
/// The output schema consists of the input columns followed by one column per window function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OverWindow<PlanRef> {
    /// Window function calls, in output-column order.
    pub window_functions: Vec<PlanWindowFunction>,
    /// The input plan.
    pub input: PlanRef,
}
147
148impl<PlanRef: GenericPlanRef> OverWindow<PlanRef> {
149 pub fn new(window_functions: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
150 Self {
151 window_functions,
152 input,
153 }
154 }
155
156 pub fn input_len(&self) -> usize {
157 self.input.schema().len()
158 }
159
160 pub fn output_len(&self) -> usize {
161 self.input.schema().len() + self.window_functions.len()
162 }
163
164 pub fn window_functions(&self) -> &[PlanWindowFunction] {
165 &self.window_functions
166 }
167
168 pub fn funcs_have_same_partition_and_order(&self) -> bool {
169 self.window_functions
170 .iter()
171 .map(|f| (&f.partition_by, &f.order_by))
172 .all_equal()
173 }
174
175 pub fn partition_key_indices(&self) -> Vec<usize> {
176 assert!(self.funcs_have_same_partition_and_order());
177 self.window_functions[0]
178 .partition_by
179 .iter()
180 .map(|i| i.index())
181 .collect()
182 }
183
184 pub fn order_key(&self) -> &[ColumnOrder] {
185 assert!(self.funcs_have_same_partition_and_order());
186 &self.window_functions[0].order_by
187 }
188
189 pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
190 (self.input, self.window_functions)
191 }
192}
193
194impl<PlanRef: GenericPlanRef> DistillUnit for OverWindow<PlanRef> {
195 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
196 let f = |func| {
197 Pretty::debug(&PlanWindowFunctionDisplay {
198 window_function: func,
199 input_schema: self.input.schema(),
200 })
201 };
202 let wf = Pretty::Array(self.window_functions.iter().map(f).collect());
203 let vec = vec![("window_functions", wf)];
204 childless_record(name, vec)
205 }
206}
207
208impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
209 fn functional_dependency(&self) -> FunctionalDependencySet {
210 let mapping =
211 ColIndexMapping::identity_or_none(self.input.schema().len(), self.output_len());
212 let fd_set = self.input.functional_dependency().clone();
213 mapping.rewrite_functional_dependency_set(fd_set)
214 }
215
216 fn schema(&self) -> Schema {
217 let mut schema = self.input.schema().clone();
218 self.window_functions.iter().for_each(|call| {
219 schema.fields.push(Field::with_name(
220 call.return_type.clone(),
221 call.kind.to_string(),
222 ));
223 });
224 schema
225 }
226
227 fn stream_key(&self) -> Option<Vec<usize>> {
228 let mut output_pk = self.input.stream_key()?.to_vec();
229 for part_key_idx in self
230 .window_functions
231 .iter()
232 .flat_map(|f| f.partition_by.iter().map(|i| i.index))
233 {
234 if !output_pk.contains(&part_key_idx) {
235 output_pk.push(part_key_idx);
236 }
237 }
238 Some(output_pk)
239 }
240
241 fn ctx(&self) -> OptimizerContextRef {
242 self.input.ctx()
243 }
244}