risingwave_frontend/optimizer/property/
order.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16
17use itertools::Itertools;
18use pretty_xmlish::Pretty;
19use risingwave_common::catalog::Schema;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_pb::common::PbColumnOrder;
22
23use super::super::plan_node::*;
24use crate::error::Result;
25
26#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
27pub struct Order {
28    pub column_orders: Vec<ColumnOrder>,
29}
30
31impl Order {
32    pub const fn new(column_orders: Vec<ColumnOrder>) -> Self {
33        Self { column_orders }
34    }
35
36    pub fn to_protobuf(&self) -> Vec<PbColumnOrder> {
37        self.column_orders
38            .iter()
39            .map(ColumnOrder::to_protobuf)
40            .collect_vec()
41    }
42
43    #[allow(clippy::len_without_is_empty)]
44    pub fn len(&self) -> usize {
45        self.column_orders.len()
46    }
47
48    pub fn concat(self, other: Self) -> Self {
49        Self {
50            column_orders: self
51                .column_orders
52                .into_iter()
53                .chain(other.column_orders)
54                .collect(),
55        }
56    }
57}
58
59impl fmt::Display for Order {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(f, "[")?;
62        for (i, column_order) in self.column_orders.iter().enumerate() {
63            if i > 0 {
64                write!(f, ", ")?;
65            }
66            write!(f, "{}", column_order)?;
67        }
68        write!(f, "]")
69    }
70}
71
72pub struct OrderDisplay<'a> {
73    pub order: &'a Order,
74    pub input_schema: &'a Schema,
75}
76
77impl OrderDisplay<'_> {
78    pub fn distill<'a>(self) -> Pretty<'a> {
79        let iter = self.order.column_orders.iter();
80        let vec = iter.map(|column_order| {
81            Pretty::display(&ColumnOrderDisplay {
82                column_order,
83                input_schema: self.input_schema,
84            })
85        });
86        Pretty::Array(vec.collect())
87    }
88}
89
90const ANY_ORDER: Order = Order {
91    column_orders: vec![],
92};
93
94impl Order {
95    pub fn enforce_if_not_satisfies(&self, plan: PlanRef) -> Result<PlanRef> {
96        use crate::optimizer::plan_node::batch::prelude::*;
97
98        if !plan.order().satisfies(self) {
99            Ok(self.enforce(plan))
100        } else {
101            Ok(plan)
102        }
103    }
104
105    fn enforce(&self, plan: PlanRef) -> PlanRef {
106        assert_eq!(plan.convention(), Convention::Batch);
107        BatchSort::new(plan, self.clone()).into()
108    }
109
110    pub fn satisfies(&self, other: &Order) -> bool {
111        if self.column_orders.len() < other.column_orders.len() {
112            return false;
113        }
114        #[expect(clippy::disallowed_methods)]
115        for (order, other_order) in self.column_orders.iter().zip(other.column_orders.iter()) {
116            if order != other_order {
117                return false;
118            }
119        }
120        true
121    }
122
123    #[inline(always)]
124    pub const fn any() -> Self {
125        ANY_ORDER
126    }
127
128    #[inline(always)]
129    pub fn is_any(&self) -> bool {
130        self.column_orders.is_empty()
131    }
132}
133
134#[cfg(test)]
135mod tests {
136    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
137
138    use super::Order;
139
140    #[test]
141    fn test_order_satisfy() {
142        let o1 = Order {
143            column_orders: vec![
144                ColumnOrder {
145                    column_index: 0,
146                    order_type: OrderType::ascending(),
147                },
148                ColumnOrder {
149                    column_index: 1,
150                    order_type: OrderType::descending(),
151                },
152                ColumnOrder {
153                    column_index: 2,
154                    order_type: OrderType::ascending(),
155                },
156            ],
157        };
158        let o2 = Order {
159            column_orders: vec![
160                ColumnOrder {
161                    column_index: 0,
162                    order_type: OrderType::ascending(),
163                },
164                ColumnOrder {
165                    column_index: 1,
166                    order_type: OrderType::descending(),
167                },
168            ],
169        };
170        let o3 = Order {
171            column_orders: vec![
172                ColumnOrder {
173                    column_index: 0,
174                    order_type: OrderType::ascending(),
175                },
176                ColumnOrder {
177                    column_index: 1,
178                    order_type: OrderType::ascending(),
179                },
180            ],
181        };
182
183        assert!(o1.satisfies(&o2));
184        assert!(!o2.satisfies(&o1));
185        assert!(o1.satisfies(&o1));
186
187        assert!(!o2.satisfies(&o3));
188        assert!(!o3.satisfies(&o2));
189    }
190}