risingwave_frontend/optimizer/rule/stream/split_now_and_rule.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::optimizer::PlanRef;
16use crate::optimizer::plan_node::{LogicalFilter, PlanTreeNodeUnary};
17use crate::optimizer::rule::{BoxedRule, Rule};
18use crate::utils::Condition;
19
/// Splits a `LogicalFilter` whose predicate is an AND of several conjuncts, at least one of which
/// contains `now()`, into a chain of `LogicalFilter`s: one filter per `now()`-bearing conjunct,
/// placed above a single filter that holds all the other expressions. This prepares the plan
/// for `SplitNowOrRule`.
///
/// The rule does nothing when the node is not a `LogicalFilter`, when the predicate has exactly
/// one conjunct, or when no conjunct contains `now()`.
///
/// Before:
/// ```text
/// `LogicalFilter`
/// (now() or c11 or c12 ..) and (now() or c21 or c22 ...) and .. and other exprs
///                  |
///                Input
/// ```
///
/// After:
/// ```text
/// `LogicalFilter`(now() or c11 or c12 ..)
///                  |
/// `LogicalFilter`(now() or c21 or c22 ...)
///                  |
///               ......
///                  |
/// `LogicalFilter` other exprs
///                  |
///                Input
/// ```
pub struct SplitNowAndRule {}
43impl Rule for SplitNowAndRule {
44 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
45 let filter: &LogicalFilter = plan.as_logical_filter()?;
46 let input = filter.input();
47 if filter.predicate().conjunctions.len() == 1 {
48 return None;
49 }
50
51 if filter
52 .predicate()
53 .conjunctions
54 .iter()
55 .all(|e| e.count_nows() == 0)
56 {
57 return None;
58 }
59
60 let [with_now, others] = filter
61 .predicate()
62 .clone()
63 .group_by::<_, 2>(|e| if e.count_nows() > 0 { 0 } else { 1 });
64
65 let mut plan = LogicalFilter::create(input, others);
66 for e in with_now {
67 plan = LogicalFilter::new(
68 plan,
69 Condition {
70 conjunctions: vec![e],
71 },
72 )
73 .into();
74 }
75 Some(plan)
76 }
77}
78
79impl SplitNowAndRule {
80 pub fn create() -> BoxedRule {
81 Box::new(SplitNowAndRule {})
82 }
83}