risingwave_frontend/optimizer/plan_node/
batch_sort.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::SortNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
23 ToDistributedBatch,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, OrderDisplay};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchSort {
34 pub base: PlanBase<Batch>,
35 input: PlanRef,
36}
37
38impl BatchSort {
39 pub fn new(input: PlanRef, order: Order) -> Self {
40 let ctx = input.ctx();
41 let schema = input.schema().clone();
42 let dist = input.distribution().clone();
43 let base = PlanBase::new_batch(ctx, schema, dist, order);
44 BatchSort { base, input }
45 }
46}
47
48impl Distill for BatchSort {
49 fn distill<'a>(&self) -> XmlNode<'a> {
50 let data = OrderDisplay {
51 order: self.order(),
52 input_schema: self.input.schema(),
53 };
54 childless_record("BatchSort", vec![("order", data.distill())])
55 }
56}
57
58impl PlanTreeNodeUnary<Batch> for BatchSort {
59 fn input(&self) -> PlanRef {
60 self.input.clone()
61 }
62
63 fn clone_with_input(&self, input: PlanRef) -> Self {
64 Self::new(input, self.base.order().clone())
65 }
66}
67impl_plan_tree_node_for_unary! { Batch, BatchSort}
68
69impl ToDistributedBatch for BatchSort {
70 fn to_distributed(&self) -> Result<PlanRef> {
71 let new_input = self.input().to_distributed()?;
72 Ok(self.clone_with_input(new_input).into())
73 }
74}
75
76impl ToBatchPb for BatchSort {
77 fn to_batch_prost_body(&self) -> NodeBody {
78 let column_orders = self.base.order().to_protobuf();
79 NodeBody::Sort(SortNode { column_orders })
80 }
81}
82
83impl ToLocalBatch for BatchSort {
84 fn to_local(&self) -> Result<PlanRef> {
85 let new_input = self.input().to_local()?;
86 Ok(self.clone_with_input(new_input).into())
87 }
88}
89
90impl ExprRewritable<Batch> for BatchSort {}
91
92impl ExprVisitable for BatchSort {}