risingwave_frontend/optimizer/property/
order.rs1use std::fmt;
16
17use itertools::Itertools;
18use pretty_xmlish::Pretty;
19use risingwave_common::catalog::Schema;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_pb::common::PbColumnOrder;
22
23use super::super::plan_node::*;
24use crate::error::Result;
25
/// An ordering property, expressed as a sequence of [`ColumnOrder`]s.
///
/// An `Order` whose `column_orders` is empty is the value returned by
/// [`Order::any`] and the case for which [`Order::is_any`] returns `true`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct Order {
    /// The column orders that make up this ordering, in sequence.
    pub column_orders: Vec<ColumnOrder>,
}
30
31impl Order {
32 pub const fn new(column_orders: Vec<ColumnOrder>) -> Self {
33 Self { column_orders }
34 }
35
36 pub fn to_protobuf(&self) -> Vec<PbColumnOrder> {
37 self.column_orders
38 .iter()
39 .map(ColumnOrder::to_protobuf)
40 .collect_vec()
41 }
42
43 #[allow(clippy::len_without_is_empty)]
44 pub fn len(&self) -> usize {
45 self.column_orders.len()
46 }
47
48 pub fn concat(self, other: Self) -> Self {
49 Self {
50 column_orders: self
51 .column_orders
52 .into_iter()
53 .chain(other.column_orders)
54 .collect(),
55 }
56 }
57}
58
59impl fmt::Display for Order {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 write!(f, "[")?;
62 for (i, column_order) in self.column_orders.iter().enumerate() {
63 if i > 0 {
64 write!(f, ", ")?;
65 }
66 write!(f, "{}", column_order)?;
67 }
68 write!(f, "]")
69 }
70}
71
/// Pairs a borrowed [`Order`] with a borrowed [`Schema`] so the order can be
/// rendered with [`OrderDisplay::distill`].
pub struct OrderDisplay<'a> {
    /// The order to render.
    pub order: &'a Order,
    /// Schema handed to every `ColumnOrderDisplay` built while rendering.
    // NOTE(review): presumably used to show column names instead of raw
    // indices; confirm against `ColumnOrderDisplay`.
    pub input_schema: &'a Schema,
}
76
77impl OrderDisplay<'_> {
78 pub fn distill<'a>(self) -> Pretty<'a> {
79 let iter = self.order.column_orders.iter();
80 let vec = iter.map(|column_order| {
81 Pretty::display(&ColumnOrderDisplay {
82 column_order,
83 input_schema: self.input_schema,
84 })
85 });
86 Pretty::Array(vec.collect())
87 }
88}
89
90const ANY_ORDER: Order = Order {
91 column_orders: vec![],
92};
93
94impl Order {
95 pub fn enforce_if_not_satisfies(&self, plan: PlanRef) -> Result<PlanRef> {
96 use crate::optimizer::plan_node::batch::prelude::*;
97
98 if !plan.order().satisfies(self) {
99 Ok(self.enforce(plan))
100 } else {
101 Ok(plan)
102 }
103 }
104
105 fn enforce(&self, plan: PlanRef) -> PlanRef {
106 assert_eq!(plan.convention(), Convention::Batch);
107 BatchSort::new(plan, self.clone()).into()
108 }
109
110 pub fn satisfies(&self, other: &Order) -> bool {
111 if self.column_orders.len() < other.column_orders.len() {
112 return false;
113 }
114 #[expect(clippy::disallowed_methods)]
115 for (order, other_order) in self.column_orders.iter().zip(other.column_orders.iter()) {
116 if order != other_order {
117 return false;
118 }
119 }
120 true
121 }
122
123 #[inline(always)]
124 pub const fn any() -> Self {
125 ANY_ORDER
126 }
127
128 #[inline(always)]
129 pub fn is_any(&self) -> bool {
130 self.column_orders.is_empty()
131 }
132}
133
#[cfg(test)]
mod tests {
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

    use super::Order;

    /// Checks `Order::satisfies` for a longer order against its prefix, the
    /// reverse direction, an order against itself, and two equally long
    /// orders that differ in one order type.
    #[test]
    fn test_order_satisfy() {
        // Shorthand for building a single column order.
        let col = |column_index, order_type| ColumnOrder {
            column_index,
            order_type,
        };

        // (0 asc, 1 desc, 2 asc)
        let o1 = Order {
            column_orders: vec![
                col(0, OrderType::ascending()),
                col(1, OrderType::descending()),
                col(2, OrderType::ascending()),
            ],
        };
        // (0 asc, 1 desc): a strict prefix of `o1`.
        let o2 = Order {
            column_orders: vec![
                col(0, OrderType::ascending()),
                col(1, OrderType::descending()),
            ],
        };
        // (0 asc, 1 asc): same length as `o2`, differs on column 1.
        let o3 = Order {
            column_orders: vec![
                col(0, OrderType::ascending()),
                col(1, OrderType::ascending()),
            ],
        };

        assert!(o1.satisfies(&o2));
        assert!(!o2.satisfies(&o1));
        assert!(o1.satisfies(&o1));

        assert!(!o2.satisfies(&o3));
        assert!(!o3.satisfies(&o2));
    }
}