risingwave_frontend/optimizer/rule/stream/
split_now_and_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::optimizer::plan_node::{LogicalFilter, PlanTreeNodeUnary};
16use crate::optimizer::rule::prelude::{PlanRef, *};
17use crate::utils::Condition;
18
19/// Split `LogicalFilter` with many AND conjunctions with now into multiple `LogicalFilter`, prepared for `SplitNowOrRule`
20///
21/// Before:
22/// ```text
23/// `LogicalFilter`
24///  (now() or c11 or c12 ..) and (now() or c21 or c22 ...) and .. and other exprs
25///        |
26///      Input
27/// ```
28///
29/// After:
30/// ```text
31/// `LogicalFilter`(now() or c11 or c12 ..)
32///        |
33/// `LogicalFilter`(now() or c21 or c22 ...)
34///        |
35///      ......
36///        |
37/// `LogicalFilter` other exprs
38///        |
39///      Input
40/// ```
41pub struct SplitNowAndRule {}
42impl Rule<Logical> for SplitNowAndRule {
43    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
44        let filter: &LogicalFilter = plan.as_logical_filter()?;
45        let input = filter.input();
46        if filter.predicate().conjunctions.len() == 1 {
47            return None;
48        }
49
50        if filter
51            .predicate()
52            .conjunctions
53            .iter()
54            .all(|e| e.count_nows() == 0)
55        {
56            return None;
57        }
58
59        let [with_now, others] = filter
60            .predicate()
61            .clone()
62            .group_by::<_, 2>(|e| if e.count_nows() > 0 { 0 } else { 1 });
63
64        let mut plan = LogicalFilter::create(input, others);
65        for e in with_now {
66            plan = LogicalFilter::new(
67                plan,
68                Condition {
69                    conjunctions: vec![e],
70                },
71            )
72            .into();
73        }
74        Some(plan)
75    }
76}
77
78impl SplitNowAndRule {
79    pub fn create() -> BoxedRule {
80        Box::new(SplitNowAndRule {})
81    }
82}