risingwave_frontend/optimizer/plan_node/
batch_exchange.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
22use crate::error::Result;
23use crate::optimizer::plan_node::ToLocalBatch;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, DistributionDisplay, Order, OrderDisplay};
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchExchange {
31 pub base: PlanBase<Batch>,
32 input: PlanRef,
33 sequential: bool,
36}
37
38impl BatchExchange {
39 pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
40 Self::new_inner(input, order, dist, false)
41 }
42
43 pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self {
44 Self::new_inner(input, order, dist, true)
45 }
46
47 fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self {
48 let ctx = input.ctx();
49 let schema = input.schema().clone();
50 let base = PlanBase::new_batch(ctx, schema, dist, order);
51 BatchExchange {
52 base,
53 input,
54 sequential,
55 }
56 }
57}
58
59impl Distill for BatchExchange {
60 fn distill<'a>(&self) -> XmlNode<'a> {
61 let input_schema = self.input.schema();
62 let order = OrderDisplay {
63 order: self.base.order(),
64 input_schema,
65 }
66 .distill();
67 let dist = Pretty::display(&DistributionDisplay {
68 distribution: self.base.distribution(),
69 input_schema,
70 });
71 let mut fields = vec![("order", order), ("dist", dist)];
72 if self.sequential {
73 fields.push(("sequential", Pretty::display(&true)));
74 }
75 childless_record("BatchExchange", fields)
76 }
77}
78
79impl PlanTreeNodeUnary for BatchExchange {
80 fn input(&self) -> PlanRef {
81 self.input.clone()
82 }
83
84 fn clone_with_input(&self, input: PlanRef) -> Self {
85 Self::new_inner(
86 input,
87 self.order().clone(),
88 self.distribution().clone(),
89 self.sequential,
90 )
91 }
92}
93impl_plan_tree_node_for_unary! {BatchExchange}
94
95impl ToDistributedBatch for BatchExchange {
96 fn to_distributed(&self) -> Result<PlanRef> {
97 unreachable!()
98 }
99}
100
101impl ToBatchPb for BatchExchange {
103 fn to_batch_prost_body(&self) -> NodeBody {
104 if self.base.order().is_any() {
105 NodeBody::Exchange(ExchangeNode {
106 sources: vec![],
107 sequential: self.sequential,
108 input_schema: self.base.schema().to_prost(),
109 })
110 } else {
111 assert!(!self.sequential);
112 NodeBody::MergeSortExchange(MergeSortExchangeNode {
113 exchange: Some(ExchangeNode {
114 sources: vec![],
115 sequential: self.sequential,
116 input_schema: self.base.schema().to_prost(),
117 }),
118 column_orders: self.base.order().to_protobuf(),
119 })
120 }
121 }
122}
123
124impl ToLocalBatch for BatchExchange {
125 fn to_local(&self) -> Result<PlanRef> {
126 unreachable!()
127 }
128}
129
130impl ExprRewritable for BatchExchange {}
131
132impl ExprVisitable for BatchExchange {}