risingwave_frontend/optimizer/rule/stream/
split_now_and_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::optimizer::PlanRef;
16use crate::optimizer::plan_node::{LogicalFilter, PlanTreeNodeUnary};
17use crate::optimizer::rule::{BoxedRule, Rule};
18use crate::utils::Condition;
19
/// Splits a `LogicalFilter` whose predicate is an AND of several conjunctions, at least one of
/// which contains `now()`, into a chain of `LogicalFilter`s, prepared for `SplitNowOrRule`.
///
/// Every conjunction that contains `now()` is moved into a dedicated `LogicalFilter` holding
/// only that conjunction. All remaining conjunctions stay together in a single filter that is
/// built on top of the original input.
///
/// Before:
/// ```text
/// `LogicalFilter`
///  (now() or c11 or c12 ..) and (now() or c21 or c22 ...) and .. and other exprs
///        |
///      Input
/// ```
///
/// After (the relative order of the per-`now()` filters is illustrative):
/// ```text
/// `LogicalFilter`(now() or c11 or c12 ..)
///        |
/// `LogicalFilter`(now() or c21 or c22 ...)
///        |
///      ......
///        |
/// `LogicalFilter` other exprs
///        |
///      Input
/// ```
pub struct SplitNowAndRule {}
43impl Rule for SplitNowAndRule {
44    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
45        let filter: &LogicalFilter = plan.as_logical_filter()?;
46        let input = filter.input();
47        if filter.predicate().conjunctions.len() == 1 {
48            return None;
49        }
50
51        if filter
52            .predicate()
53            .conjunctions
54            .iter()
55            .all(|e| e.count_nows() == 0)
56        {
57            return None;
58        }
59
60        let [with_now, others] = filter
61            .predicate()
62            .clone()
63            .group_by::<_, 2>(|e| if e.count_nows() > 0 { 0 } else { 1 });
64
65        let mut plan = LogicalFilter::create(input, others);
66        for e in with_now {
67            plan = LogicalFilter::new(
68                plan,
69                Condition {
70                    conjunctions: vec![e],
71                },
72            )
73            .into();
74        }
75        Some(plan)
76    }
77}
78
79impl SplitNowAndRule {
80    pub fn create() -> BoxedRule {
81        Box::new(SplitNowAndRule {})
82    }
83}