risingwave_frontend/optimizer/rule/stream/split_now_or_rule.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16
17use crate::expr::{ExprImpl, ExprType, FunctionCall};
18use crate::optimizer::plan_node::{LogicalFilter, LogicalShare, LogicalUnion, PlanTreeNodeUnary};
19use crate::optimizer::rule::prelude::{PlanRef, *};
20
21/// Convert `LogicalFilter` with now or others predicates to a `UNION ALL`
22///
23/// Before:
24/// ```text
25/// `LogicalFilter`
26/// now() or others
27/// |
28/// Input
29/// ```
30///
31/// After:
32/// ```text
33/// `LogicalUnionAll`
34/// / \
35/// `LogicalFilter` `LogicalFilter`
36/// now() & !others others
37/// | |
38/// \ /
39/// `LogicalShare`
40/// |
41/// Input
42/// ```text
43pub struct SplitNowOrRule {}
44impl Rule<Logical> for SplitNowOrRule {
45 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
46 let filter: &LogicalFilter = plan.as_logical_filter()?;
47 let input = filter.input();
48
49 if filter.predicate().conjunctions.len() != 1 {
50 return None;
51 }
52
53 let disjunctions = filter.predicate().conjunctions[0].as_or_disjunctions()?;
54
55 if disjunctions.len() < 2 {
56 return None;
57 }
58
59 let (now, others): (Vec<ExprImpl>, Vec<ExprImpl>) =
60 disjunctions.into_iter().partition(|x| x.count_nows() != 0);
61
62 // Only support now in one arm of disjunctions
63 if now.len() != 1 {
64 return None;
65 }
66
67 // A or B or C ... or Z
68 // =>
69 // + A & !B & !C ... &!Z
70 // + B | C ... | Z
71
72 let arm1 = ExprImpl::and(now.into_iter().chain(others.iter().map(|pred| {
73 FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean).into()
74 })));
75 let arm2 = ExprImpl::or(others);
76
77 let share = LogicalShare::create(input);
78 let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1);
79 let filter2 = LogicalFilter::create_with_expr(share.clone(), arm2);
80 let union_all = LogicalUnion::create(true, vec![filter1, filter2]);
81 Some(union_all)
82 }
83}
84
85impl SplitNowOrRule {
86 pub fn create() -> BoxedRule {
87 Box::new(SplitNowOrRule {})
88 }
89}