risingwave_frontend/optimizer/rule/
apply_limit_transpose_rule.rs1use itertools::Itertools;
16use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
17use risingwave_pb::plan_common::JoinType;
18
19use super::prelude::{PlanRef, *};
20use crate::optimizer::plan_node::{
21 LogicalApply, LogicalFilter, LogicalLimit, LogicalTopN, PlanTreeNodeUnary,
22};
23use crate::optimizer::property::Order;
24use crate::utils::Condition;
25
/// Optimizer rule that pulls a `LogicalLimit` out of the right side of a `LogicalApply`.
///
/// As implemented by this type's `Rule::apply`, a plan shaped like
///
/// ```text
///     LogicalApply
///    /            \
///  left       LogicalLimit
///                  |
///                input
/// ```
///
/// is rewritten to
///
/// ```text
///    LogicalFilter (original apply condition)
///          |
///     LogicalTopN (same limit/offset, keyed by the left columns)
///          |
///     LogicalApply (condition = true)
///    /            \
///  left          input
/// ```
///
/// The rule carries no state.
pub struct ApplyLimitTransposeRule {}
48impl Rule<Logical> for ApplyLimitTransposeRule {
49 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
50 let apply: &LogicalApply = plan.as_logical_apply()?;
51 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
52 apply.clone().decompose();
53 assert_eq!(join_type, JoinType::Inner);
54 let logical_limit: &LogicalLimit = right.as_logical_limit()?;
55 let limit_input = logical_limit.input();
56 let limit = logical_limit.limit();
57 let offset = logical_limit.offset();
58
59 let apply_left_len = left.schema().len();
60
61 if max_one_row {
62 return None;
63 }
64
65 let new_apply = LogicalApply::create(
66 left,
67 limit_input,
68 JoinType::Inner,
69 Condition::true_cond(),
70 correlated_id,
71 correlated_indices,
72 false,
73 );
74
75 let new_topn = {
76 let order = Order::new(vec![ColumnOrder::new(0, OrderType::ascending())]);
78 LogicalTopN::new(
79 new_apply,
80 limit,
81 offset,
82 false,
83 order,
84 (0..apply_left_len).collect_vec(),
85 )
86 };
87
88 let filter = LogicalFilter::create(new_topn.into(), on);
89 Some(filter)
90 }
91}
92
93impl ApplyLimitTransposeRule {
94 pub fn create() -> BoxedRule {
95 Box::new(ApplyLimitTransposeRule {})
96 }
97}