risingwave_frontend/optimizer/plan_node/
batch_max_one_row.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::MaxOneRowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::generic::DistillUnit;
21use super::utils::Distill;
22use super::{
23    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
24    ToDistributedBatch, generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::ToLocalBatch;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::property::{Order, RequiredDist};
30
31/// [`BatchMaxOneRow`] fetches up to one row from the input, returning an error
32/// if the input contains more than one row at runtime.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchMaxOneRow {
35    pub base: PlanBase<Batch>,
36    core: generic::MaxOneRow<PlanRef>,
37}
38
39impl BatchMaxOneRow {
40    pub fn new(core: generic::MaxOneRow<PlanRef>) -> Self {
41        let base = PlanBase::new_batch_with_core(
42            &core,
43            core.input.distribution().clone(),
44            core.input.order().clone(),
45        );
46        BatchMaxOneRow { base, core }
47    }
48}
49
50impl PlanTreeNodeUnary<Batch> for BatchMaxOneRow {
51    fn input(&self) -> PlanRef {
52        self.core.input.clone()
53    }
54
55    fn clone_with_input(&self, input: PlanRef) -> Self {
56        let core = generic::MaxOneRow { input };
57
58        Self::new(core)
59    }
60}
61impl_plan_tree_node_for_unary! { Batch, BatchMaxOneRow}
62
63impl Distill for BatchMaxOneRow {
64    fn distill<'a>(&self) -> XmlNode<'a> {
65        self.core.distill_with_name("BatchMaxOneRow")
66    }
67}
68
69impl ToDistributedBatch for BatchMaxOneRow {
70    fn to_distributed(&self) -> Result<PlanRef> {
71        let new_input = RequiredDist::single()
72            .batch_enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
73        Ok(self.clone_with_input(new_input).into())
74    }
75}
76
77impl ToBatchPb for BatchMaxOneRow {
78    fn to_batch_prost_body(&self) -> NodeBody {
79        NodeBody::MaxOneRow(MaxOneRowNode {})
80    }
81}
82
83impl ToLocalBatch for BatchMaxOneRow {
84    fn to_local(&self) -> Result<PlanRef> {
85        let new_input = RequiredDist::single()
86            .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
87        Ok(self.clone_with_input(new_input).into())
88    }
89}
90
91impl ExprRewritable<Batch> for BatchMaxOneRow {}
92
93impl ExprVisitable for BatchMaxOneRow {}