risingwave_frontend/optimizer/plan_node/
logical_limit.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16
17use super::utils::impl_distill_by_unit;
18use super::{
19    BatchLimit, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase,
20    PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
21};
22use crate::error::Result;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::Limit;
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, LogicalTopN, PredicatePushdownContext, RewriteStreamContext,
27    ToStreamContext,
28};
29use crate::optimizer::property::Order;
30use crate::utils::{ColIndexMapping, Condition};
31
/// `LogicalLimit` fetches up to `limit` rows from `offset`.
///
/// The node is a thin logical wrapper around [`generic::Limit`]: the input plan, the `limit`
/// and the `offset` all live in `core`, while `base` is derived from `core` when the node is
/// constructed (see `LogicalLimit::new`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalLimit {
    // Logical plan base built from `core` in `new`.
    pub base: PlanBase<Logical>,
    // Generic limit core holding `input`, `limit` and `offset`.
    pub(super) core: generic::Limit<PlanRef>,
}
38
39impl LogicalLimit {
40    pub fn new(core: generic::Limit<PlanRef>) -> Self {
41        let base = PlanBase::new_logical_with_core(&core);
42        LogicalLimit { base, core }
43    }
44
45    /// the function will check if the cond is bool expression
46    pub fn create(input: PlanRef, limit: u64, offset: u64) -> PlanRef {
47        Self::new(generic::Limit::new(input, limit, offset)).into()
48    }
49
50    pub fn limit(&self) -> u64 {
51        self.core.limit
52    }
53
54    pub fn offset(&self) -> u64 {
55        self.core.offset
56    }
57}
58
59impl PlanTreeNodeUnary<Logical> for LogicalLimit {
60    fn input(&self) -> PlanRef {
61        self.core.input.clone()
62    }
63
64    fn clone_with_input(&self, input: PlanRef) -> Self {
65        let mut core = self.core.clone();
66        core.input = input;
67        Self::new(core)
68    }
69
70    fn rewrite_with_input(
71        &self,
72        input: PlanRef,
73        input_col_change: ColIndexMapping,
74    ) -> (Self, ColIndexMapping) {
75        (self.clone_with_input(input), input_col_change)
76    }
77}
// Macro-generated impls for this node.
// NOTE(review): the macro definitions are not visible from this file. Presumably the first
// derives the general plan-tree-node impl from the `PlanTreeNodeUnary` impl above, and the
// second implements the distill (pretty-print) output from the `core` field under the name
// "LogicalLimit" — confirm against the macro definitions.
impl_plan_tree_node_for_unary! { Logical, LogicalLimit}
impl_distill_by_unit!(LogicalLimit, core, "LogicalLimit");
80
81impl ColPrunable for LogicalLimit {
82    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
83        let new_input = self.input().prune_col(required_cols, ctx);
84        self.clone_with_input(new_input).into()
85    }
86}
87
// No methods are overridden: the trait's provided defaults are used as-is.
impl ExprRewritable<Logical> for LogicalLimit {}
89
// No methods are overridden: the trait's provided defaults are used as-is.
impl ExprVisitable for LogicalLimit {}
91
92impl PredicatePushdown for LogicalLimit {
93    fn predicate_pushdown(
94        &self,
95        predicate: Condition,
96        ctx: &mut PredicatePushdownContext,
97    ) -> PlanRef {
98        // filter can not transpose limit
99        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
100    }
101}
102
103impl ToBatch for LogicalLimit {
104    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
105        let new_input = self.input().to_batch()?;
106        let core = Limit::new(new_input, self.core.limit, self.core.offset);
107        Ok(BatchLimit::new(core).into())
108    }
109}
110
111impl ToStream for LogicalLimit {
112    fn to_stream(
113        &self,
114        ctx: &mut ToStreamContext,
115    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
116        // use the first column as an order to provide determinism for streaming queries.
117        let order = Order::new(vec![ColumnOrder::new(0, OrderType::ascending())]);
118        let topn = LogicalTopN::new(
119            self.input(),
120            self.limit(),
121            self.offset(),
122            false,
123            order,
124            vec![],
125        );
126        topn.to_stream(ctx)
127    }
128
129    fn logical_rewrite_for_stream(
130        &self,
131        ctx: &mut RewriteStreamContext,
132    ) -> Result<(PlanRef, ColIndexMapping)> {
133        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
134        let (filter, out_col_change) = self.rewrite_with_input(input, input_col_change);
135        Ok((filter.into(), out_col_change))
136    }
137}