risingwave_frontend/optimizer/rule/
apply_hop_window_transpose_rule.rs1use risingwave_pb::plan_common::JoinType;
16
17use super::prelude::{PlanRef, *};
18use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalHopWindow};
19use crate::utils::Condition;
20
21pub struct ApplyHopWindowTransposeRule {}
43impl Rule<Logical> for ApplyHopWindowTransposeRule {
44 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
45 let apply: &LogicalApply = plan.as_logical_apply()?;
46 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
47 apply.clone().decompose();
48 let hop_window: &LogicalHopWindow = right.as_logical_hop_window()?;
49 assert_eq!(join_type, JoinType::Inner);
50
51 if !hop_window.output_indices_are_trivial() {
52 return None;
53 }
54
55 let (hop_window_input, time_col, window_slide, window_size, window_offset, _output_indices) =
56 hop_window.clone().into_parts();
57
58 let apply_left_len = left.schema().len() as isize;
59
60 if max_one_row {
61 return None;
62 }
63
64 let new_apply = LogicalApply::create(
65 left,
66 hop_window_input,
67 JoinType::Inner,
68 Condition::true_cond(),
69 correlated_id,
70 correlated_indices,
71 false,
72 );
73
74 let new_hop_window = LogicalHopWindow::create(
75 new_apply,
76 time_col.clone_with_offset(apply_left_len),
77 window_slide,
78 window_size,
79 window_offset,
80 );
81
82 let filter = LogicalFilter::create(new_hop_window, on);
83 Some(filter)
84 }
85}
86
87impl ApplyHopWindowTransposeRule {
88 pub fn create() -> BoxedRule {
89 Box::new(ApplyHopWindowTransposeRule {})
90 }
91}