risingwave_frontend/optimizer/rule/stream/
split_now_or_rule.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16
17use crate::expr::{ExprImpl, ExprType, FunctionCall};
18use crate::optimizer::plan_node::{LogicalFilter, LogicalShare, LogicalUnion, PlanTreeNodeUnary};
19use crate::optimizer::rule::prelude::{PlanRef, *};
20
21/// Convert `LogicalFilter` with now or others predicates to a `UNION ALL`
22///
23/// Before:
24/// ```text
25/// `LogicalFilter`
26///  now() or others
27///        |
28///      Input
29/// ```
30///
31/// After:
32/// ```text
33///         `LogicalUnionAll`
34///         /              \
35/// `LogicalFilter`     `LogicalFilter`
36/// now() & others IS      others
37///        NOT TRUE
38///         |               |
39///         \              /
40///         `LogicalShare`
41///               |
42///             Input
43/// ```text
44pub struct SplitNowOrRule {}
45impl Rule<Logical> for SplitNowOrRule {
46    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
47        let filter: &LogicalFilter = plan.as_logical_filter()?;
48        let input = filter.input();
49
50        if filter.predicate().conjunctions.len() != 1 {
51            return None;
52        }
53
54        let disjunctions = filter.predicate().conjunctions[0].as_or_disjunctions()?;
55
56        if disjunctions.len() < 2 {
57            return None;
58        }
59
60        let (now, others): (Vec<ExprImpl>, Vec<ExprImpl>) =
61            disjunctions.into_iter().partition(|x| x.count_nows() != 0);
62
63        // Only support now in one arm of disjunctions
64        if now.len() != 1 {
65            return None;
66        }
67
68        // A or B or C ... or Z, where A is the now() arm.
69        // =>
70        // + A & (B | C ... | Z) IS NOT TRUE
71        // + B | C ... | Z
72        //
73        // Do not use `NOT (B | C ... | Z)` here: in SQL three-valued logic,
74        // `NOT NULL` is still `NULL`, while this branch must keep rows where
75        // the non-now arms are not true and the now() arm is true.
76
77        let arm2 = ExprImpl::or(others);
78        let arm1 = ExprImpl::and([
79            now.into_iter()
80                .next()
81                .expect("there should be exactly one now() arm"),
82            FunctionCall::new_unchecked(ExprType::IsNotTrue, vec![arm2.clone()], DataType::Boolean)
83                .into(),
84        ]);
85
86        let share = LogicalShare::create(input);
87        let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1);
88        let filter2 = LogicalFilter::create_with_expr(share, arm2);
89        let union_all = LogicalUnion::create(true, vec![filter1, filter2]);
90        Some(union_all)
91    }
92}
93
94impl SplitNowOrRule {
95    pub fn create() -> BoxedRule {
96        Box::new(SplitNowOrRule {})
97    }
98}