// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
1415use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::MaxOneRowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
1819use super::batch::prelude::*;
20use super::generic::DistillUnit;
21use super::utils::Distill;
22use super::{
23 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
2930/// [`BatchMaxOneRow`] fetches up to one row from the input, returning an error
31/// if the input contains more than one row at runtime.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchMaxOneRow {
34pub base: PlanBase<Batch>,
35 core: generic::MaxOneRow<PlanRef>,
36}
3738impl BatchMaxOneRow {
39pub fn new(core: generic::MaxOneRow<PlanRef>) -> Self {
40let base = PlanBase::new_batch_with_core(
41&core,
42 core.input.distribution().clone(),
43 core.input.order().clone(),
44 );
45 BatchMaxOneRow { base, core }
46 }
47}
4849impl PlanTreeNodeUnary for BatchMaxOneRow {
50fn input(&self) -> PlanRef {
51self.core.input.clone()
52 }
5354fn clone_with_input(&self, input: PlanRef) -> Self {
55let core = generic::MaxOneRow { input };
5657Self::new(core)
58 }
59}
60impl_plan_tree_node_for_unary! {BatchMaxOneRow}
6162impl Distill for BatchMaxOneRow {
63fn distill<'a>(&self) -> XmlNode<'a> {
64self.core.distill_with_name("BatchMaxOneRow")
65 }
66}
6768impl ToDistributedBatch for BatchMaxOneRow {
69fn to_distributed(&self) -> Result<PlanRef> {
70let new_input = RequiredDist::single()
71 .enforce_if_not_satisfies(self.input().to_distributed()?, &Order::any())?;
72Ok(self.clone_with_input(new_input).into())
73 }
74}
7576impl ToBatchPb for BatchMaxOneRow {
77fn to_batch_prost_body(&self) -> NodeBody {
78 NodeBody::MaxOneRow(MaxOneRowNode {})
79 }
80}
8182impl ToLocalBatch for BatchMaxOneRow {
83fn to_local(&self) -> Result<PlanRef> {
84let new_input = RequiredDist::single()
85 .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
86Ok(self.clone_with_input(new_input).into())
87 }
88}
// Empty impl: nothing is overridden, so the trait's default behavior applies.
// NOTE(review): presumably sufficient because the core node is built from an input
// plan only and holds no expressions to rewrite — confirm against the trait defaults.
impl ExprRewritable for BatchMaxOneRow {}
// Empty impl: nothing is overridden, so the trait's default behavior applies.
// NOTE(review): presumably sufficient because the core node is built from an input
// plan only and holds no expressions to visit — confirm against the trait defaults.
impl ExprVisitable for BatchMaxOneRow {}