risingwave_frontend/optimizer/rule/
apply_limit_transpose_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
17use risingwave_pb::plan_common::JoinType;
18
19use super::{BoxedRule, Rule};
20use crate::optimizer::PlanRef;
21use crate::optimizer::plan_node::{
22    LogicalApply, LogicalFilter, LogicalLimit, LogicalTopN, PlanTreeNodeUnary,
23};
24use crate::optimizer::property::Order;
25use crate::utils::Condition;
26
/// Transpose `LogicalApply` and `LogicalLimit`.
///
/// The rule fires only when the right child of the apply is a `LogicalLimit` and the apply
/// is not marked `max_one_row`; otherwise the plan is left untouched.
///
/// Before:
///
/// ```text
///     LogicalApply
///    /            \
///  Domain      LogicalLimit
///                  |
///                Input
/// ```
///
/// After:
///
/// ```text
///     LogicalFilter    (the original apply's `on` condition)
///          |
///      LogicalTopN     (limit/offset taken from the removed `LogicalLimit`)
///          |
///     LogicalApply     (condition replaced by `true`)
///    /            \
///  Domain        Input
/// ```
///
/// The `LogicalTopN` is ordered by the first output column (ascending) so that streaming
/// queries get a deterministic result.
pub struct ApplyLimitTransposeRule {}
49impl Rule for ApplyLimitTransposeRule {
50    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
51        let apply: &LogicalApply = plan.as_logical_apply()?;
52        let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
53            apply.clone().decompose();
54        assert_eq!(join_type, JoinType::Inner);
55        let logical_limit: &LogicalLimit = right.as_logical_limit()?;
56        let limit_input = logical_limit.input();
57        let limit = logical_limit.limit();
58        let offset = logical_limit.offset();
59
60        let apply_left_len = left.schema().len();
61
62        if max_one_row {
63            return None;
64        }
65
66        let new_apply = LogicalApply::create(
67            left,
68            limit_input,
69            JoinType::Inner,
70            Condition::true_cond(),
71            correlated_id,
72            correlated_indices,
73            false,
74        );
75
76        let new_topn = {
77            // use the first column as an order to provide determinism for streaming queries.
78            let order = Order::new(vec![ColumnOrder::new(0, OrderType::ascending())]);
79            LogicalTopN::new(
80                new_apply,
81                limit,
82                offset,
83                false,
84                order,
85                (0..apply_left_len).collect_vec(),
86            )
87        };
88
89        let filter = LogicalFilter::create(new_topn.into(), on);
90        Some(filter)
91    }
92}
93
94impl ApplyLimitTransposeRule {
95    pub fn create() -> BoxedRule {
96        Box::new(ApplyLimitTransposeRule {})
97    }
98}