risingwave_frontend/optimizer/plan_node/
logical_limit.rs1use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16
17use super::utils::impl_distill_by_unit;
18use super::{
19 BatchLimit, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase,
20 PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
21};
22use crate::error::Result;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::Limit;
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, LogicalTopN, PredicatePushdownContext, RewriteStreamContext,
27 ToStreamContext,
28};
29use crate::optimizer::property::Order;
30use crate::utils::{ColIndexMapping, Condition};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalLimit {
35 pub base: PlanBase<Logical>,
36 pub(super) core: generic::Limit<PlanRef>,
37}
38
39impl LogicalLimit {
40 pub fn new(core: generic::Limit<PlanRef>) -> Self {
41 let base = PlanBase::new_logical_with_core(&core);
42 LogicalLimit { base, core }
43 }
44
45 pub fn create(input: PlanRef, limit: u64, offset: u64) -> PlanRef {
47 Self::new(generic::Limit::new(input, limit, offset)).into()
48 }
49
50 pub fn limit(&self) -> u64 {
51 self.core.limit
52 }
53
54 pub fn offset(&self) -> u64 {
55 self.core.offset
56 }
57}
58
59impl PlanTreeNodeUnary<Logical> for LogicalLimit {
60 fn input(&self) -> PlanRef {
61 self.core.input.clone()
62 }
63
64 fn clone_with_input(&self, input: PlanRef) -> Self {
65 let mut core = self.core.clone();
66 core.input = input;
67 Self::new(core)
68 }
69
70 fn rewrite_with_input(
71 &self,
72 input: PlanRef,
73 input_col_change: ColIndexMapping,
74 ) -> (Self, ColIndexMapping) {
75 (self.clone_with_input(input), input_col_change)
76 }
77}
78impl_plan_tree_node_for_unary! { Logical, LogicalLimit}
79impl_distill_by_unit!(LogicalLimit, core, "LogicalLimit");
80
81impl ColPrunable for LogicalLimit {
82 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
83 let new_input = self.input().prune_col(required_cols, ctx);
84 self.clone_with_input(new_input).into()
85 }
86}
87
88impl ExprRewritable<Logical> for LogicalLimit {}
89
90impl ExprVisitable for LogicalLimit {}
91
92impl PredicatePushdown for LogicalLimit {
93 fn predicate_pushdown(
94 &self,
95 predicate: Condition,
96 ctx: &mut PredicatePushdownContext,
97 ) -> PlanRef {
98 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
100 }
101}
102
103impl ToBatch for LogicalLimit {
104 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
105 let new_input = self.input().to_batch()?;
106 let core = Limit::new(new_input, self.core.limit, self.core.offset);
107 Ok(BatchLimit::new(core).into())
108 }
109}
110
111impl ToStream for LogicalLimit {
112 fn to_stream(
113 &self,
114 ctx: &mut ToStreamContext,
115 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
116 let order = Order::new(vec![ColumnOrder::new(0, OrderType::ascending())]);
118 let topn = LogicalTopN::new(
119 self.input(),
120 self.limit(),
121 self.offset(),
122 false,
123 order,
124 vec![],
125 );
126 topn.to_stream(ctx)
127 }
128
129 fn logical_rewrite_for_stream(
130 &self,
131 ctx: &mut RewriteStreamContext,
132 ) -> Result<(PlanRef, ColIndexMapping)> {
133 let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
134 let (filter, out_col_change) = self.rewrite_with_input(input, input_col_change);
135 Ok((filter.into(), out_col_change))
136 }
137}