risingwave_frontend/optimizer/rule/
apply_topn_transpose_rule.rs1use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17
18use super::prelude::{PlanRef, *};
19use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalTopN};
20use crate::utils::Condition;
21
22pub struct ApplyTopNTransposeRule {}
44impl Rule<Logical> for ApplyTopNTransposeRule {
45 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
46 let apply: &LogicalApply = plan.as_logical_apply()?;
47 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
48 apply.clone().decompose();
49 assert_eq!(join_type, JoinType::Inner);
50 let topn: &LogicalTopN = right.as_logical_top_n()?;
51 let (topn_input, limit, offset, with_ties, mut order, mut group_key) =
52 topn.clone().decompose();
53
54 let apply_left_len = left.schema().len();
55
56 if max_one_row {
57 return None;
58 }
59
60 let new_apply = LogicalApply::create(
61 left,
62 topn_input,
63 JoinType::Inner,
64 Condition::true_cond(),
65 correlated_id,
66 correlated_indices,
67 false,
68 );
69
70 let new_topn = {
71 order
73 .column_orders
74 .iter_mut()
75 .for_each(|ord| ord.column_index += apply_left_len);
76 group_key.iter_mut().for_each(|idx| *idx += apply_left_len);
77 let new_group_key = (0..apply_left_len).chain(group_key).collect_vec();
78 LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key)
79 };
80
81 let filter = LogicalFilter::create(new_topn.into(), on);
82 Some(filter)
83 }
84}
85
86impl ApplyTopNTransposeRule {
87 pub fn create() -> BoxedRule {
88 Box::new(ApplyTopNTransposeRule {})
89 }
90}