risingwave_frontend/optimizer/property/
order.rs1use std::fmt;
16
17use itertools::Itertools;
18use pretty_xmlish::Pretty;
19use risingwave_common::catalog::Schema;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_pb::common::PbColumnOrder;
22
23use super::super::plan_node::*;
24use crate::error::Result;
25
26#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
28pub struct Order {
29 pub column_orders: Vec<ColumnOrder>,
30}
31
32impl Order {
33 pub const fn new(column_orders: Vec<ColumnOrder>) -> Self {
34 Self { column_orders }
35 }
36
37 pub fn to_protobuf(&self) -> Vec<PbColumnOrder> {
38 self.column_orders
39 .iter()
40 .map(ColumnOrder::to_protobuf)
41 .collect_vec()
42 }
43
44 #[allow(clippy::len_without_is_empty)]
45 pub fn len(&self) -> usize {
46 self.column_orders.len()
47 }
48
49 pub fn concat(self, other: Self) -> Self {
50 Self {
51 column_orders: self
52 .column_orders
53 .into_iter()
54 .chain(other.column_orders)
55 .collect(),
56 }
57 }
58}
59
60impl fmt::Display for Order {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "[")?;
63 for (i, column_order) in self.column_orders.iter().enumerate() {
64 if i > 0 {
65 write!(f, ", ")?;
66 }
67 write!(f, "{}", column_order)?;
68 }
69 write!(f, "]")
70 }
71}
72
73pub struct OrderDisplay<'a> {
74 pub order: &'a Order,
75 pub input_schema: &'a Schema,
76}
77
78impl OrderDisplay<'_> {
79 pub fn distill<'a>(self) -> Pretty<'a> {
80 let iter = self.order.column_orders.iter();
81 let vec = iter.map(|column_order| {
82 Pretty::display(&ColumnOrderDisplay {
83 column_order,
84 input_schema: self.input_schema,
85 })
86 });
87 Pretty::Array(vec.collect())
88 }
89}
90
91const ANY_ORDER: Order = Order {
92 column_orders: vec![],
93};
94
95impl Order {
96 pub fn enforce_if_not_satisfies(&self, plan: PlanRef) -> Result<PlanRef> {
97 use crate::optimizer::plan_node::batch::prelude::*;
98
99 if !plan.order().satisfies(self) {
100 Ok(self.enforce(plan))
101 } else {
102 Ok(plan)
103 }
104 }
105
106 fn enforce(&self, plan: PlanRef) -> PlanRef {
107 assert_eq!(plan.convention(), Convention::Batch);
108 BatchSort::new(plan, self.clone()).into()
109 }
110
111 pub fn satisfies(&self, other: &Order) -> bool {
112 if self.column_orders.len() < other.column_orders.len() {
113 return false;
114 }
115 #[expect(clippy::disallowed_methods)]
116 for (order, other_order) in self.column_orders.iter().zip(other.column_orders.iter()) {
117 if order != other_order {
118 return false;
119 }
120 }
121 true
122 }
123
124 #[inline(always)]
125 pub const fn any() -> Self {
126 ANY_ORDER
127 }
128
129 #[inline(always)]
130 pub fn is_any(&self) -> bool {
131 self.column_orders.is_empty()
132 }
133}
134
#[cfg(test)]
mod tests {
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

    use super::Order;

    /// `satisfies` must hold exactly when the argument's column orders are a prefix of
    /// the receiver's.
    #[test]
    fn test_order_satisfy() {
        // Columns 0 (ascending), 1 (descending), 2 (ascending).
        let asc_desc_asc = Order::new(vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
            ColumnOrder {
                column_index: 2,
                order_type: OrderType::ascending(),
            },
        ]);
        // Columns 0 (ascending), 1 (descending): a prefix of the order above.
        let asc_desc = Order::new(vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::descending(),
            },
        ]);
        // Columns 0 (ascending), 1 (ascending): differs from the prefix in column 1.
        let asc_asc = Order::new(vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
        ]);

        // A longer order satisfies its own prefix, but not the other way round.
        assert!(asc_desc_asc.satisfies(&asc_desc));
        assert!(!asc_desc.satisfies(&asc_desc_asc));
        assert!(asc_desc_asc.satisfies(&asc_desc_asc));

        // Same length, different order type on one column: neither satisfies the other.
        assert!(!asc_desc.satisfies(&asc_asc));
        assert!(!asc_asc.satisfies(&asc_desc));
    }
}