risingwave_frontend/optimizer/rule/
apply_hop_window_transpose_rule.rs1use risingwave_pb::plan_common::JoinType;
16
17use super::{BoxedRule, Rule};
18use crate::optimizer::PlanRef;
19use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalHopWindow};
20use crate::utils::Condition;
21
/// Optimizer rule that transposes a `LogicalApply` with the `LogicalHopWindow` on its right
/// side, pushing the apply below the hop window.
///
/// Before:
///
/// ```text
///     LogicalApply (on)
///    /                 \
///  left          LogicalHopWindow
///                       |
///                     input
/// ```
///
/// After:
///
/// ```text
///    LogicalFilter (on)
///           |
///    LogicalHopWindow
///           |
///    LogicalApply (true)
///    /                 \
///  left               input
/// ```
///
/// The rule carries no state; see its `Rule::apply` implementation for the exact conditions
/// under which the rewrite is performed.
pub struct ApplyHopWindowTransposeRule {}
44impl Rule for ApplyHopWindowTransposeRule {
45 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
46 let apply: &LogicalApply = plan.as_logical_apply()?;
47 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
48 apply.clone().decompose();
49 let hop_window: &LogicalHopWindow = right.as_logical_hop_window()?;
50 assert_eq!(join_type, JoinType::Inner);
51
52 if !hop_window.output_indices_are_trivial() {
53 return None;
54 }
55
56 let (hop_window_input, time_col, window_slide, window_size, window_offset, _output_indices) =
57 hop_window.clone().into_parts();
58
59 let apply_left_len = left.schema().len() as isize;
60
61 if max_one_row {
62 return None;
63 }
64
65 let new_apply = LogicalApply::create(
66 left,
67 hop_window_input,
68 JoinType::Inner,
69 Condition::true_cond(),
70 correlated_id,
71 correlated_indices,
72 false,
73 );
74
75 let new_hop_window = LogicalHopWindow::create(
76 new_apply,
77 time_col.clone_with_offset(apply_left_len),
78 window_slide,
79 window_size,
80 window_offset,
81 );
82
83 let filter = LogicalFilter::create(new_hop_window, on);
84 Some(filter)
85 }
86}
87
88impl ApplyHopWindowTransposeRule {
89 pub fn create() -> BoxedRule {
90 Box::new(ApplyHopWindowTransposeRule {})
91 }
92}