risingwave_frontend/optimizer/property/
order.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16
17use itertools::Itertools;
18use pretty_xmlish::Pretty;
19use risingwave_common::catalog::Schema;
20use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
21use risingwave_pb::common::PbColumnOrder;
22
23use super::super::plan_node::*;
24use crate::error::Result;
25
26// TODO(rc): use this type to replace all `Vec<ColumnOrder>`
27#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
28pub struct Order {
29    pub column_orders: Vec<ColumnOrder>,
30}
31
32impl Order {
33    pub const fn new(column_orders: Vec<ColumnOrder>) -> Self {
34        Self { column_orders }
35    }
36
37    pub fn to_protobuf(&self) -> Vec<PbColumnOrder> {
38        self.column_orders
39            .iter()
40            .map(ColumnOrder::to_protobuf)
41            .collect_vec()
42    }
43
44    #[allow(clippy::len_without_is_empty)]
45    pub fn len(&self) -> usize {
46        self.column_orders.len()
47    }
48
49    pub fn concat(self, other: Self) -> Self {
50        Self {
51            column_orders: self
52                .column_orders
53                .into_iter()
54                .chain(other.column_orders)
55                .collect(),
56        }
57    }
58}
59
60impl fmt::Display for Order {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "[")?;
63        for (i, column_order) in self.column_orders.iter().enumerate() {
64            if i > 0 {
65                write!(f, ", ")?;
66            }
67            write!(f, "{}", column_order)?;
68        }
69        write!(f, "]")
70    }
71}
72
73pub struct OrderDisplay<'a> {
74    pub order: &'a Order,
75    pub input_schema: &'a Schema,
76}
77
78impl OrderDisplay<'_> {
79    pub fn distill<'a>(self) -> Pretty<'a> {
80        let iter = self.order.column_orders.iter();
81        let vec = iter.map(|column_order| {
82            Pretty::display(&ColumnOrderDisplay {
83                column_order,
84                input_schema: self.input_schema,
85            })
86        });
87        Pretty::Array(vec.collect())
88    }
89}
90
91const ANY_ORDER: Order = Order {
92    column_orders: vec![],
93};
94
95impl Order {
96    pub fn enforce_if_not_satisfies(&self, plan: PlanRef) -> Result<PlanRef> {
97        use crate::optimizer::plan_node::batch::prelude::*;
98
99        if !plan.order().satisfies(self) {
100            Ok(self.enforce(plan))
101        } else {
102            Ok(plan)
103        }
104    }
105
106    fn enforce(&self, plan: PlanRef) -> PlanRef {
107        assert_eq!(plan.convention(), Convention::Batch);
108        BatchSort::new(plan, self.clone()).into()
109    }
110
111    pub fn satisfies(&self, other: &Order) -> bool {
112        if self.column_orders.len() < other.column_orders.len() {
113            return false;
114        }
115        #[expect(clippy::disallowed_methods)]
116        for (order, other_order) in self.column_orders.iter().zip(other.column_orders.iter()) {
117            if order != other_order {
118                return false;
119            }
120        }
121        true
122    }
123
124    #[inline(always)]
125    pub const fn any() -> Self {
126        ANY_ORDER
127    }
128
129    #[inline(always)]
130    pub fn is_any(&self) -> bool {
131        self.column_orders.is_empty()
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
138
139    use super::Order;
140
141    #[test]
142    fn test_order_satisfy() {
143        let o1 = Order {
144            column_orders: vec![
145                ColumnOrder {
146                    column_index: 0,
147                    order_type: OrderType::ascending(),
148                },
149                ColumnOrder {
150                    column_index: 1,
151                    order_type: OrderType::descending(),
152                },
153                ColumnOrder {
154                    column_index: 2,
155                    order_type: OrderType::ascending(),
156                },
157            ],
158        };
159        let o2 = Order {
160            column_orders: vec![
161                ColumnOrder {
162                    column_index: 0,
163                    order_type: OrderType::ascending(),
164                },
165                ColumnOrder {
166                    column_index: 1,
167                    order_type: OrderType::descending(),
168                },
169            ],
170        };
171        let o3 = Order {
172            column_orders: vec![
173                ColumnOrder {
174                    column_index: 0,
175                    order_type: OrderType::ascending(),
176                },
177                ColumnOrder {
178                    column_index: 1,
179                    order_type: OrderType::ascending(),
180                },
181            ],
182        };
183
184        assert!(o1.satisfies(&o2));
185        assert!(!o2.satisfies(&o1));
186        assert!(o1.satisfies(&o1));
187
188        assert!(!o2.satisfies(&o3));
189        assert!(!o3.satisfies(&o2));
190    }
191}