risingwave_frontend/optimizer/plan_node/
batch_max_one_row.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::MaxOneRowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::generic::DistillUnit;
21use super::utils::Distill;
22use super::{
23 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
24 ToDistributedBatch, generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::ToLocalBatch;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::property::{Order, RequiredDist};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchMaxOneRow {
35 pub base: PlanBase<Batch>,
36 core: generic::MaxOneRow<PlanRef>,
37}
38
39impl BatchMaxOneRow {
40 pub fn new(core: generic::MaxOneRow<PlanRef>) -> Self {
41 let base = PlanBase::new_batch_with_core(
42 &core,
43 core.input.distribution().clone(),
44 core.input.order().clone(),
45 );
46 BatchMaxOneRow { base, core }
47 }
48}
49
50impl PlanTreeNodeUnary<Batch> for BatchMaxOneRow {
51 fn input(&self) -> PlanRef {
52 self.core.input.clone()
53 }
54
55 fn clone_with_input(&self, input: PlanRef) -> Self {
56 let core = generic::MaxOneRow { input };
57
58 Self::new(core)
59 }
60}
61impl_plan_tree_node_for_unary! { Batch, BatchMaxOneRow}
62
63impl Distill for BatchMaxOneRow {
64 fn distill<'a>(&self) -> XmlNode<'a> {
65 self.core.distill_with_name("BatchMaxOneRow")
66 }
67}
68
69impl ToDistributedBatch for BatchMaxOneRow {
70 fn to_distributed(&self) -> Result<PlanRef> {
71 let new_input = RequiredDist::single()
72 .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
73 Ok(self.clone_with_input(new_input).into())
74 }
75}
76
77impl ToBatchPb for BatchMaxOneRow {
78 fn to_batch_prost_body(&self) -> NodeBody {
79 NodeBody::MaxOneRow(MaxOneRowNode {})
80 }
81}
82
83impl ToLocalBatch for BatchMaxOneRow {
84 fn to_local(&self) -> Result<PlanRef> {
85 let new_input = RequiredDist::single()
86 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
87 Ok(self.clone_with_input(new_input).into())
88 }
89}
90
91impl ExprRewritable<Batch> for BatchMaxOneRow {}
92
93impl ExprVisitable for BatchMaxOneRow {}