risingwave_frontend/optimizer/rule/
apply_limit_transpose_rule.rs1use itertools::Itertools;
16use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
17use risingwave_pb::plan_common::JoinType;
18
19use super::{BoxedRule, Rule};
20use crate::optimizer::PlanRef;
21use crate::optimizer::plan_node::{
22 LogicalApply, LogicalFilter, LogicalLimit, LogicalTopN, PlanTreeNodeUnary,
23};
24use crate::optimizer::property::Order;
25use crate::utils::Condition;
26
27pub struct ApplyLimitTransposeRule {}
49impl Rule for ApplyLimitTransposeRule {
50 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
51 let apply: &LogicalApply = plan.as_logical_apply()?;
52 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
53 apply.clone().decompose();
54 assert_eq!(join_type, JoinType::Inner);
55 let logical_limit: &LogicalLimit = right.as_logical_limit()?;
56 let limit_input = logical_limit.input();
57 let limit = logical_limit.limit();
58 let offset = logical_limit.offset();
59
60 let apply_left_len = left.schema().len();
61
62 if max_one_row {
63 return None;
64 }
65
66 let new_apply = LogicalApply::create(
67 left,
68 limit_input,
69 JoinType::Inner,
70 Condition::true_cond(),
71 correlated_id,
72 correlated_indices,
73 false,
74 );
75
76 let new_topn = {
77 let order = Order::new(vec![ColumnOrder::new(0, OrderType::ascending())]);
79 LogicalTopN::new(
80 new_apply,
81 limit,
82 offset,
83 false,
84 order,
85 (0..apply_left_len).collect_vec(),
86 )
87 };
88
89 let filter = LogicalFilter::create(new_topn.into(), on);
90 Some(filter)
91 }
92}
93
94impl ApplyLimitTransposeRule {
95 pub fn create() -> BoxedRule {
96 Box::new(ApplyLimitTransposeRule {})
97 }
98}