risingwave_frontend/optimizer/plan_node/
batch_exchange.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
23 ToDistributedBatch,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, DistributionDisplay, Order, OrderDisplay};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchExchange {
34 pub base: PlanBase<Batch>,
35 input: PlanRef,
36 sequential: bool,
39}
40
41impl BatchExchange {
42 pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
43 Self::new_inner(input, order, dist, false)
44 }
45
46 pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self {
47 Self::new_inner(input, order, dist, true)
48 }
49
50 fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self {
51 let ctx = input.ctx();
52 let schema = input.schema().clone();
53 let base = PlanBase::new_batch(ctx, schema, dist, order);
54 BatchExchange {
55 base,
56 input,
57 sequential,
58 }
59 }
60}
61
62impl Distill for BatchExchange {
63 fn distill<'a>(&self) -> XmlNode<'a> {
64 let input_schema = self.input.schema();
65 let order = OrderDisplay {
66 order: self.base.order(),
67 input_schema,
68 }
69 .distill();
70 let dist = Pretty::display(&DistributionDisplay {
71 distribution: self.base.distribution(),
72 input_schema,
73 });
74 let mut fields = vec![("order", order), ("dist", dist)];
75 if self.sequential {
76 fields.push(("sequential", Pretty::display(&true)));
77 }
78 childless_record("BatchExchange", fields)
79 }
80}
81
82impl PlanTreeNodeUnary<Batch> for BatchExchange {
83 fn input(&self) -> PlanRef {
84 self.input.clone()
85 }
86
87 fn clone_with_input(&self, input: PlanRef) -> Self {
88 Self::new_inner(
89 input,
90 self.order().clone(),
91 self.distribution().clone(),
92 self.sequential,
93 )
94 }
95}
96impl_plan_tree_node_for_unary! { Batch, BatchExchange}
97
98impl ToDistributedBatch for BatchExchange {
99 fn to_distributed(&self) -> Result<PlanRef> {
100 unreachable!()
101 }
102}
103
104impl ToBatchPb for BatchExchange {
106 fn to_batch_prost_body(&self) -> NodeBody {
107 if self.base.order().is_any() {
108 NodeBody::Exchange(ExchangeNode {
109 sources: vec![],
110 sequential: self.sequential,
111 input_schema: self.base.schema().to_prost(),
112 })
113 } else {
114 assert!(!self.sequential);
115 NodeBody::MergeSortExchange(MergeSortExchangeNode {
116 exchange: Some(ExchangeNode {
117 sources: vec![],
118 sequential: self.sequential,
119 input_schema: self.base.schema().to_prost(),
120 }),
121 column_orders: self.base.order().to_protobuf(),
122 })
123 }
124 }
125}
126
127impl ToLocalBatch for BatchExchange {
128 fn to_local(&self) -> Result<PlanRef> {
129 unreachable!()
130 }
131}
132
133impl ExprRewritable<Batch> for BatchExchange {}
134
135impl ExprVisitable for BatchExchange {}