risingwave_frontend/optimizer/plan_node/
batch_sort.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::SortNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
23    ToDistributedBatch,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, OrderDisplay};
29
30/// `BatchSort` buffers all data from input and sort these rows by specified order, providing the
31/// collation required by user or parent plan node.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchSort {
34    pub base: PlanBase<Batch>,
35    input: PlanRef,
36}
37
38impl BatchSort {
39    pub fn new(input: PlanRef, order: Order) -> Self {
40        let ctx = input.ctx();
41        let schema = input.schema().clone();
42        let dist = input.distribution().clone();
43        let base = PlanBase::new_batch(ctx, schema, dist, order);
44        BatchSort { base, input }
45    }
46}
47
48impl Distill for BatchSort {
49    fn distill<'a>(&self) -> XmlNode<'a> {
50        let data = OrderDisplay {
51            order: self.order(),
52            input_schema: self.input.schema(),
53        };
54        childless_record("BatchSort", vec![("order", data.distill())])
55    }
56}
57
58impl PlanTreeNodeUnary<Batch> for BatchSort {
59    fn input(&self) -> PlanRef {
60        self.input.clone()
61    }
62
63    fn clone_with_input(&self, input: PlanRef) -> Self {
64        Self::new(input, self.base.order().clone())
65    }
66}
67impl_plan_tree_node_for_unary! { Batch, BatchSort}
68
69impl ToDistributedBatch for BatchSort {
70    fn to_distributed(&self) -> Result<PlanRef> {
71        let new_input = self.input().to_distributed()?;
72        Ok(self.clone_with_input(new_input).into())
73    }
74}
75
76impl ToBatchPb for BatchSort {
77    fn to_batch_prost_body(&self) -> NodeBody {
78        let column_orders = self.base.order().to_protobuf();
79        NodeBody::Sort(SortNode { column_orders })
80    }
81}
82
83impl ToLocalBatch for BatchSort {
84    fn to_local(&self) -> Result<PlanRef> {
85        let new_input = self.input().to_local()?;
86        Ok(self.clone_with_input(new_input).into())
87    }
88}
89
90impl ExprRewritable<Batch> for BatchSort {}
91
92impl ExprVisitable for BatchSort {}