risingwave_frontend/optimizer/plan_node/
logical_limit.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16
17use super::utils::impl_distill_by_unit;
18use super::{
19    BatchLimit, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, PlanTreeNodeUnary,
20    PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
21};
22use crate::error::Result;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::{
25    ColumnPruningContext, LogicalTopN, PredicatePushdownContext, RewriteStreamContext,
26    ToStreamContext,
27};
28use crate::optimizer::property::Order;
29use crate::utils::{ColIndexMapping, Condition};
30
31/// `LogicalLimit` fetches up to `limit` rows from `offset`
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct LogicalLimit {
34    pub base: PlanBase<Logical>,
35    pub(super) core: generic::Limit<PlanRef>,
36}
37
38impl LogicalLimit {
39    pub fn new(core: generic::Limit<PlanRef>) -> Self {
40        let base = PlanBase::new_logical_with_core(&core);
41        LogicalLimit { base, core }
42    }
43
44    /// the function will check if the cond is bool expression
45    pub fn create(input: PlanRef, limit: u64, offset: u64) -> PlanRef {
46        Self::new(generic::Limit::new(input, limit, offset)).into()
47    }
48
49    pub fn limit(&self) -> u64 {
50        self.core.limit
51    }
52
53    pub fn offset(&self) -> u64 {
54        self.core.offset
55    }
56}
57
58impl PlanTreeNodeUnary for LogicalLimit {
59    fn input(&self) -> PlanRef {
60        self.core.input.clone()
61    }
62
63    fn clone_with_input(&self, input: PlanRef) -> Self {
64        let mut core = self.core.clone();
65        core.input = input;
66        Self::new(core)
67    }
68
69    fn rewrite_with_input(
70        &self,
71        input: PlanRef,
72        input_col_change: ColIndexMapping,
73    ) -> (Self, ColIndexMapping) {
74        (self.clone_with_input(input), input_col_change)
75    }
76}
77impl_plan_tree_node_for_unary! {LogicalLimit}
78impl_distill_by_unit!(LogicalLimit, core, "LogicalLimit");
79
80impl ColPrunable for LogicalLimit {
81    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
82        let new_input = self.input().prune_col(required_cols, ctx);
83        self.clone_with_input(new_input).into()
84    }
85}
86
87impl ExprRewritable for LogicalLimit {}
88
89impl ExprVisitable for LogicalLimit {}
90
91impl PredicatePushdown for LogicalLimit {
92    fn predicate_pushdown(
93        &self,
94        predicate: Condition,
95        ctx: &mut PredicatePushdownContext,
96    ) -> PlanRef {
97        // filter can not transpose limit
98        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
99    }
100}
101
102impl ToBatch for LogicalLimit {
103    fn to_batch(&self) -> Result<PlanRef> {
104        let new_input = self.input().to_batch()?;
105        let mut new_logical = self.core.clone();
106        new_logical.input = new_input;
107        Ok(BatchLimit::new(new_logical).into())
108    }
109}
110
111impl ToStream for LogicalLimit {
112    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
113        // use the first column as an order to provide determinism for streaming queries.
114        let order = Order::new(vec![ColumnOrder::new(0, OrderType::ascending())]);
115        let topn = LogicalTopN::new(
116            self.input(),
117            self.limit(),
118            self.offset(),
119            false,
120            order,
121            vec![],
122        );
123        topn.to_stream(ctx)
124    }
125
126    fn logical_rewrite_for_stream(
127        &self,
128        ctx: &mut RewriteStreamContext,
129    ) -> Result<(PlanRef, ColIndexMapping)> {
130        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
131        let (filter, out_col_change) = self.rewrite_with_input(input, input_col_change);
132        Ok((filter.into(), out_col_change))
133    }
134}