risingwave_frontend/optimizer/rule/stream/
split_now_or_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16
17use crate::expr::{ExprImpl, ExprType, FunctionCall};
18use crate::optimizer::PlanRef;
19use crate::optimizer::plan_node::{LogicalFilter, LogicalShare, LogicalUnion, PlanTreeNodeUnary};
20use crate::optimizer::rule::{BoxedRule, Rule};
21
/// Convert a `LogicalFilter` whose predicate is `now() or others` into a `UNION ALL`
/// of two filters that read from one shared input.
///
/// The rule only fires when the filter predicate is a single disjunction in which
/// exactly one arm references `now()`; every other arm is grouped into `others`.
///
/// Before:
/// ```text
/// `LogicalFilter`
///  now() or others
///        |
///      Input
/// ```
///
/// After:
/// ```text
///         `LogicalUnionAll`
///         /              \
/// `LogicalFilter`     `LogicalFilter`
/// now() & !others        others
///         |               |
///         \              /
///         `LogicalShare`
///               |
///             Input
/// ```
pub struct SplitNowOrRule {}
45impl Rule for SplitNowOrRule {
46    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
47        let filter: &LogicalFilter = plan.as_logical_filter()?;
48        let input = filter.input();
49
50        if filter.predicate().conjunctions.len() != 1 {
51            return None;
52        }
53
54        let disjunctions = filter.predicate().conjunctions[0].as_or_disjunctions()?;
55
56        if disjunctions.len() < 2 {
57            return None;
58        }
59
60        let (now, others): (Vec<ExprImpl>, Vec<ExprImpl>) =
61            disjunctions.into_iter().partition(|x| x.count_nows() != 0);
62
63        // Only support now in one arm of disjunctions
64        if now.len() != 1 {
65            return None;
66        }
67
68        // A or B or C ... or Z
69        // =>
70        // + A & !B & !C ... &!Z
71        // + B | C ... | Z
72
73        let arm1 = ExprImpl::and(now.into_iter().chain(others.iter().map(|pred| {
74            FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean).into()
75        })));
76        let arm2 = ExprImpl::or(others);
77
78        let share = LogicalShare::create(input);
79        let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1);
80        let filter2 = LogicalFilter::create_with_expr(share.clone(), arm2);
81        let union_all = LogicalUnion::create(true, vec![filter1, filter2]);
82        Some(union_all)
83    }
84}
85
86impl SplitNowOrRule {
87    pub fn create() -> BoxedRule {
88        Box::new(SplitNowOrRule {})
89    }
90}