risingwave_frontend/optimizer/rule/
apply_topn_transpose_rule.rs1use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17
18use super::{BoxedRule, Rule};
19use crate::optimizer::PlanRef;
20use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalTopN};
21use crate::utils::Condition;
22
23pub struct ApplyTopNTransposeRule {}
45impl Rule for ApplyTopNTransposeRule {
46 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
47 let apply: &LogicalApply = plan.as_logical_apply()?;
48 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
49 apply.clone().decompose();
50 assert_eq!(join_type, JoinType::Inner);
51 let topn: &LogicalTopN = right.as_logical_top_n()?;
52 let (topn_input, limit, offset, with_ties, mut order, mut group_key) =
53 topn.clone().decompose();
54
55 let apply_left_len = left.schema().len();
56
57 if max_one_row {
58 return None;
59 }
60
61 let new_apply = LogicalApply::create(
62 left,
63 topn_input,
64 JoinType::Inner,
65 Condition::true_cond(),
66 correlated_id,
67 correlated_indices,
68 false,
69 );
70
71 let new_topn = {
72 order
74 .column_orders
75 .iter_mut()
76 .for_each(|ord| ord.column_index += apply_left_len);
77 group_key.iter_mut().for_each(|idx| *idx += apply_left_len);
78 let new_group_key = (0..apply_left_len).chain(group_key).collect_vec();
79 LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key)
80 };
81
82 let filter = LogicalFilter::create(new_topn.into(), on);
83 Some(filter)
84 }
85}
86
87impl ApplyTopNTransposeRule {
88 pub fn create() -> BoxedRule {
89 Box::new(ApplyTopNTransposeRule {})
90 }
91}