risingwave_frontend/optimizer/plan_node/
logical_max_one_row.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16
17use super::generic::DistillUnit;
18use super::utils::Distill;
19use super::{
20    BatchMaxOneRow, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PlanTreeNodeUnary,
21    PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
22};
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
27};
28use crate::utils::{ColIndexMapping, Condition};
29
30/// [`LogicalMaxOneRow`] fetches up to one row from the input, returning an error
31/// if the input contains more than one row at runtime. Only available in batch mode.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct LogicalMaxOneRow {
34    pub base: PlanBase<Logical>,
35    pub(super) core: generic::MaxOneRow<PlanRef>,
36}
37
38impl LogicalMaxOneRow {
39    fn new(core: generic::MaxOneRow<PlanRef>) -> Self {
40        let base = PlanBase::new_logical_with_core(&core);
41        LogicalMaxOneRow { base, core }
42    }
43
44    pub fn create(input: PlanRef) -> PlanRef {
45        let core = generic::MaxOneRow { input };
46        Self::new(core).into()
47    }
48}
49
50impl PlanTreeNodeUnary for LogicalMaxOneRow {
51    fn input(&self) -> PlanRef {
52        self.core.input.clone()
53    }
54
55    fn clone_with_input(&self, input: PlanRef) -> Self {
56        let core = generic::MaxOneRow { input };
57        Self::new(core)
58    }
59
60    fn rewrite_with_input(
61        &self,
62        input: PlanRef,
63        input_col_change: ColIndexMapping,
64    ) -> (Self, ColIndexMapping) {
65        (self.clone_with_input(input), input_col_change)
66    }
67}
68impl_plan_tree_node_for_unary! {LogicalMaxOneRow}
69
70impl Distill for LogicalMaxOneRow {
71    fn distill<'a>(&self) -> XmlNode<'a> {
72        self.core.distill_with_name("LogicalMaxOneRow")
73    }
74}
75
76impl ColPrunable for LogicalMaxOneRow {
77    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
78        let new_input = self.input().prune_col(required_cols, ctx);
79        self.clone_with_input(new_input).into()
80    }
81}
82
83impl ExprRewritable for LogicalMaxOneRow {}
84
85impl ExprVisitable for LogicalMaxOneRow {}
86
87impl PredicatePushdown for LogicalMaxOneRow {
88    fn predicate_pushdown(
89        &self,
90        predicate: Condition,
91        ctx: &mut PredicatePushdownContext,
92    ) -> PlanRef {
93        // Filter can not transpose MaxOneRow
94        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
95    }
96}
97
98impl ToBatch for LogicalMaxOneRow {
99    fn to_batch(&self) -> Result<PlanRef> {
100        let input = self.input().to_batch()?;
101        let core = generic::MaxOneRow { input };
102        Ok(BatchMaxOneRow::new(core).into())
103    }
104}
105
106impl ToStream for LogicalMaxOneRow {
107    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
108        // Check `LogicalOptimizer::gen_optimized_logical_plan_for_stream`.
109        unreachable!("should already bail out after subquery unnesting")
110    }
111
112    fn logical_rewrite_for_stream(
113        &self,
114        ctx: &mut RewriteStreamContext,
115    ) -> Result<(PlanRef, ColIndexMapping)> {
116        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
117        let (this, out_col_change) = self.rewrite_with_input(input, input_col_change);
118        Ok((this.into(), out_col_change))
119    }
120}