risingwave_frontend/optimizer/rule/stream/
split_now_or_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16
17use crate::expr::{ExprImpl, ExprType, FunctionCall};
18use crate::optimizer::plan_node::{LogicalFilter, LogicalShare, LogicalUnion, PlanTreeNodeUnary};
19use crate::optimizer::rule::prelude::{PlanRef, *};
20
21/// Convert `LogicalFilter` with now or others predicates to a `UNION ALL`
22///
23/// Before:
24/// ```text
25/// `LogicalFilter`
26///  now() or others
27///        |
28///      Input
29/// ```
30///
31/// After:
32/// ```text
33///         `LogicalUnionAll`
34///         /              \
35/// `LogicalFilter`     `LogicalFilter`
36/// now() & !others        others
37///         |               |
38///         \              /
39///         `LogicalShare`
40///               |
41///             Input
42/// ```text
43pub struct SplitNowOrRule {}
44impl Rule<Logical> for SplitNowOrRule {
45    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
46        let filter: &LogicalFilter = plan.as_logical_filter()?;
47        let input = filter.input();
48
49        if filter.predicate().conjunctions.len() != 1 {
50            return None;
51        }
52
53        let disjunctions = filter.predicate().conjunctions[0].as_or_disjunctions()?;
54
55        if disjunctions.len() < 2 {
56            return None;
57        }
58
59        let (now, others): (Vec<ExprImpl>, Vec<ExprImpl>) =
60            disjunctions.into_iter().partition(|x| x.count_nows() != 0);
61
62        // Only support now in one arm of disjunctions
63        if now.len() != 1 {
64            return None;
65        }
66
67        // A or B or C ... or Z
68        // =>
69        // + A & !B & !C ... &!Z
70        // + B | C ... | Z
71
72        let arm1 = ExprImpl::and(now.into_iter().chain(others.iter().map(|pred| {
73            FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean).into()
74        })));
75        let arm2 = ExprImpl::or(others);
76
77        let share = LogicalShare::create(input);
78        let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1);
79        let filter2 = LogicalFilter::create_with_expr(share.clone(), arm2);
80        let union_all = LogicalUnion::create(true, vec![filter1, filter2]);
81        Some(union_all)
82    }
83}
84
85impl SplitNowOrRule {
86    pub fn create() -> BoxedRule {
87        Box::new(SplitNowOrRule {})
88    }
89}