risingwave_frontend/optimizer/rule/stream/split_now_or_rule.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16
17use crate::expr::{ExprImpl, ExprType, FunctionCall};
18use crate::optimizer::PlanRef;
19use crate::optimizer::plan_node::{LogicalFilter, LogicalShare, LogicalUnion, PlanTreeNodeUnary};
20use crate::optimizer::rule::{BoxedRule, Rule};
21
/// Convert a `LogicalFilter` whose predicate is `now() OR others` into a `UNION ALL`
/// of two filters that read from one shared input.
///
/// Before:
/// ```text
/// `LogicalFilter`
///  now() or others
///        |
///      Input
/// ```
///
/// After:
/// ```text
///         `LogicalUnionAll`
///         /              \
/// `LogicalFilter`     `LogicalFilter`
/// now() & !others        others
///         |               |
///         \              /
///         `LogicalShare`
///               |
///             Input
/// ```
pub struct SplitNowOrRule {}
45impl Rule for SplitNowOrRule {
46 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
47 let filter: &LogicalFilter = plan.as_logical_filter()?;
48 let input = filter.input();
49
50 if filter.predicate().conjunctions.len() != 1 {
51 return None;
52 }
53
54 let disjunctions = filter.predicate().conjunctions[0].as_or_disjunctions()?;
55
56 if disjunctions.len() < 2 {
57 return None;
58 }
59
60 let (now, others): (Vec<ExprImpl>, Vec<ExprImpl>) =
61 disjunctions.into_iter().partition(|x| x.count_nows() != 0);
62
63 // Only support now in one arm of disjunctions
64 if now.len() != 1 {
65 return None;
66 }
67
68 // A or B or C ... or Z
69 // =>
70 // + A & !B & !C ... &!Z
71 // + B | C ... | Z
72
73 let arm1 = ExprImpl::and(now.into_iter().chain(others.iter().map(|pred| {
74 FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean).into()
75 })));
76 let arm2 = ExprImpl::or(others);
77
78 let share = LogicalShare::create(input);
79 let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1);
80 let filter2 = LogicalFilter::create_with_expr(share.clone(), arm2);
81 let union_all = LogicalUnion::create(true, vec![filter1, filter2]);
82 Some(union_all)
83 }
84}
85
86impl SplitNowOrRule {
87 pub fn create() -> BoxedRule {
88 Box::new(SplitNowOrRule {})
89 }
90}