risingwave_frontend/optimizer/plan_node/generic/
over_window.rs1use itertools::Itertools;
16use pretty_xmlish::{Pretty, Str, XmlNode};
17use risingwave_common::catalog::{Field, Schema};
18use risingwave_common::types::DataType;
19use risingwave_common::util::column_index_mapping::ColIndexMapping;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_expr::window_function::{Frame, WindowFuncKind};
22use risingwave_pb::expr::PbWindowFunction;
23
24use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
25use crate::OptimizerContextRef;
26use crate::expr::{InputRef, InputRefDisplay};
27use crate::optimizer::plan_node::utils::childless_record;
28use crate::optimizer::property::FunctionalDependencySet;
29use crate::utils::ColIndexMappingRewriteExt;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct PlanWindowFunction {
35 pub kind: WindowFuncKind,
36 pub return_type: DataType,
37 pub args: Vec<InputRef>,
38 pub ignore_nulls: bool,
39 pub partition_by: Vec<InputRef>,
40 pub order_by: Vec<ColumnOrder>,
41 pub frame: Frame,
42}
43
44struct PlanWindowFunctionDisplay<'a> {
45 pub window_function: &'a PlanWindowFunction,
46 pub input_schema: &'a Schema,
47}
48
49impl std::fmt::Debug for PlanWindowFunctionDisplay<'_> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 let window_function = self.window_function;
52 if f.alternate() {
53 f.debug_struct("WindowFunction")
54 .field("kind", &window_function.kind)
55 .field("return_type", &window_function.return_type)
56 .field("args", &window_function.args)
57 .field("ignore_nulls", &window_function.ignore_nulls)
58 .field("partition_by", &window_function.partition_by)
59 .field("order_by", &window_function.order_by)
60 .field("frame", &window_function.frame)
61 .finish()
62 } else {
63 write!(f, "{}(", window_function.kind)?;
64 let mut delim = "";
65 for arg in &window_function.args {
66 write!(f, "{}", delim)?;
67 delim = ", ";
68 write!(
69 f,
70 "{}",
71 InputRefDisplay {
72 input_ref: arg,
73 input_schema: self.input_schema
74 }
75 )?;
76 }
77 if window_function.ignore_nulls {
78 write!(f, " IGNORE NULLS")?;
79 }
80 write!(f, ") OVER(")?;
81 let mut delim = "";
82 if !window_function.partition_by.is_empty() {
83 delim = " ";
84 write!(
85 f,
86 "PARTITION BY {}",
87 window_function
88 .partition_by
89 .iter()
90 .format_with(", ", |input_ref, f| {
91 f(&InputRefDisplay {
92 input_ref,
93 input_schema: self.input_schema,
94 })
95 })
96 )?;
97 }
98 if !window_function.order_by.is_empty() {
99 write!(
100 f,
101 "{delim}ORDER BY {}",
102 window_function.order_by.iter().format_with(", ", |o, f| {
103 f(&ColumnOrderDisplay {
104 column_order: o,
105 input_schema: self.input_schema,
106 })
107 })
108 )?;
109 }
110 write!(f, "{delim}{}", window_function.frame)?;
111 f.write_str(")")?;
112
113 Ok(())
114 }
115 }
116}
117
118impl PlanWindowFunction {
119 pub fn to_protobuf(&self) -> PbWindowFunction {
120 use WindowFuncKind::*;
121 use risingwave_pb::expr::window_function::{PbGeneralType, PbType};
122
123 let r#type = match &self.kind {
124 RowNumber => PbType::General(PbGeneralType::RowNumber as _),
125 Rank => PbType::General(PbGeneralType::Rank as _),
126 DenseRank => PbType::General(PbGeneralType::DenseRank as _),
127 Lag => PbType::General(PbGeneralType::Lag as _),
128 Lead => PbType::General(PbGeneralType::Lead as _),
129 Aggregate(agg_type) => PbType::Aggregate2(agg_type.to_protobuf()),
130 };
131
132 PbWindowFunction {
133 r#type: Some(r#type),
134 args: self.args.iter().map(InputRef::to_proto).collect(),
135 return_type: Some(self.return_type.to_protobuf()),
136 frame: Some(self.frame.to_protobuf()),
137 ignore_nulls: self.ignore_nulls,
138 }
139 }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
143pub struct OverWindow<PlanRef> {
144 pub window_functions: Vec<PlanWindowFunction>,
145 pub input: PlanRef,
146}
147
148impl<PlanRef: GenericPlanRef> OverWindow<PlanRef> {
149 pub fn new(window_functions: Vec<PlanWindowFunction>, input: PlanRef) -> Self {
150 Self {
151 window_functions,
152 input,
153 }
154 }
155
156 pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> OverWindow<OtherPlanRef> {
157 OverWindow {
158 window_functions: self.window_functions.clone(),
159 input,
160 }
161 }
162
163 pub fn input_len(&self) -> usize {
164 self.input.schema().len()
165 }
166
167 pub fn output_len(&self) -> usize {
168 self.input.schema().len() + self.window_functions.len()
169 }
170
171 pub fn window_functions(&self) -> &[PlanWindowFunction] {
172 &self.window_functions
173 }
174
175 pub fn funcs_have_same_partition_and_order(&self) -> bool {
176 self.window_functions
177 .iter()
178 .map(|f| (&f.partition_by, &f.order_by))
179 .all_equal()
180 }
181
182 pub fn partition_key_indices(&self) -> Vec<usize> {
183 assert!(self.funcs_have_same_partition_and_order());
184 self.window_functions[0]
185 .partition_by
186 .iter()
187 .map(|i| i.index())
188 .collect()
189 }
190
191 pub fn order_key(&self) -> &[ColumnOrder] {
192 assert!(self.funcs_have_same_partition_and_order());
193 &self.window_functions[0].order_by
194 }
195
196 pub fn decompose(self) -> (PlanRef, Vec<PlanWindowFunction>) {
197 (self.input, self.window_functions)
198 }
199}
200
201impl<PlanRef: GenericPlanRef> DistillUnit for OverWindow<PlanRef> {
202 fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
203 let f = |func| {
204 Pretty::debug(&PlanWindowFunctionDisplay {
205 window_function: func,
206 input_schema: self.input.schema(),
207 })
208 };
209 let wf = Pretty::Array(self.window_functions.iter().map(f).collect());
210 let vec = vec![("window_functions", wf)];
211 childless_record(name, vec)
212 }
213}
214
215impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
216 fn functional_dependency(&self) -> FunctionalDependencySet {
217 let mapping =
218 ColIndexMapping::identity_or_none(self.input.schema().len(), self.output_len());
219 let fd_set = self.input.functional_dependency().clone();
220 mapping.rewrite_functional_dependency_set(fd_set)
221 }
222
223 fn schema(&self) -> Schema {
224 let mut schema = self.input.schema().clone();
225 self.window_functions.iter().for_each(|call| {
226 schema.fields.push(Field::with_name(
227 call.return_type.clone(),
228 call.kind.to_string(),
229 ));
230 });
231 schema
232 }
233
234 fn stream_key(&self) -> Option<Vec<usize>> {
235 let mut output_pk = self.input.stream_key()?.to_vec();
236 for part_key_idx in self
237 .window_functions
238 .iter()
239 .flat_map(|f| f.partition_by.iter().map(|i| i.index))
240 {
241 if !output_pk.contains(&part_key_idx) {
242 output_pk.push(part_key_idx);
243 }
244 }
245 Some(output_pk)
246 }
247
248 fn ctx(&self) -> OptimizerContextRef {
249 self.input.ctx()
250 }
251}