risingwave_frontend/optimizer/rule/stream/split_now_and_rule.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::optimizer::plan_node::{LogicalFilter, PlanTreeNodeUnary};
16use crate::optimizer::rule::prelude::{PlanRef, *};
17use crate::utils::Condition;
18
19/// Split `LogicalFilter` with many AND conjunctions with now into multiple `LogicalFilter`, prepared for `SplitNowOrRule`
20///
21/// Before:
22/// ```text
23/// `LogicalFilter`
24/// (now() or c11 or c12 ..) and (now() or c21 or c22 ...) and .. and other exprs
25/// |
26/// Input
27/// ```
28///
29/// After:
30/// ```text
31/// `LogicalFilter`(now() or c11 or c12 ..)
32/// |
33/// `LogicalFilter`(now() or c21 or c22 ...)
34/// |
35/// ......
36/// |
37/// `LogicalFilter` other exprs
38/// |
39/// Input
40/// ```
41pub struct SplitNowAndRule {}
42impl Rule<Logical> for SplitNowAndRule {
43 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
44 let filter: &LogicalFilter = plan.as_logical_filter()?;
45 let input = filter.input();
46 if filter.predicate().conjunctions.len() == 1 {
47 return None;
48 }
49
50 if filter
51 .predicate()
52 .conjunctions
53 .iter()
54 .all(|e| e.count_nows() == 0)
55 {
56 return None;
57 }
58
59 let [with_now, others] = filter
60 .predicate()
61 .clone()
62 .group_by::<_, 2>(|e| if e.count_nows() > 0 { 0 } else { 1 });
63
64 let mut plan = LogicalFilter::create(input, others);
65 for e in with_now {
66 plan = LogicalFilter::new(
67 plan,
68 Condition {
69 conjunctions: vec![e],
70 },
71 )
72 .into();
73 }
74 Some(plan)
75 }
76}
77
78impl SplitNowAndRule {
79 pub fn create() -> BoxedRule {
80 Box::new(SplitNowAndRule {})
81 }
82}