risingwave_frontend/optimizer/rule/batch/batch_iceberg_predicate_pushdown.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use chrono::Datelike;
21use iceberg::expr::{Predicate as IcebergPredicate, Reference};
22use iceberg::spec::Datum as IcebergDatum;
23use risingwave_common::catalog::Field;
24use risingwave_common::types::ScalarImpl;
25
26use crate::expr::{Expr, ExprImpl, ExprType, Literal};
27use crate::optimizer::PlanRef;
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary};
30use crate::optimizer::rule::{BoxedRule, Rule};
31use crate::utils::Condition;
32
33/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here.
34/// zone-map is used to evaluate predicates on iceberg tables.
35/// Without zone-map, iceberg-sdk will still apply the predicate on its own.
36/// See: <https://github.com/apache/iceberg-rust/blob/5c1a9e68da346819072a15327080a498ad91c488/crates/iceberg/src/arrow/reader.rs#L229-L235>.
37pub struct BatchIcebergPredicatePushDownRule {}
38
39impl Rule for BatchIcebergPredicatePushDownRule {
40    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
41        let filter: &BatchFilter = plan.as_batch_filter()?;
42        let input = filter.input();
43        let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?;
44        // NOTE(kwannoel): We only fill iceberg predicate here.
45        assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue);
46
47        let predicate = filter.predicate().clone();
48        let (iceberg_predicate, rw_predicate) =
49            rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields());
50        let scan = scan.clone_with_predicate(iceberg_predicate);
51        if rw_predicate.always_true() {
52            Some(scan.into())
53        } else {
54            let filter = filter
55                .clone_with_input(scan.into())
56                .clone_with_predicate(rw_predicate);
57            Some(filter.into())
58        }
59    }
60}
61
62fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
63    let Some(scalar) = literal.get_data() else {
64        return None;
65    };
66    match scalar {
67        ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
68        ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
69        ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
70        ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
71        ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
72        ScalarImpl::Decimal(_) => {
73            // TODO(iceberg): iceberg-rust doesn't support decimal predicate pushdown yet.
74            None
75        }
76        ScalarImpl::Date(d) => {
77            let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
78                return None;
79            };
80            Some(datum)
81        }
82        ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
83            t.0.and_utc().timestamp_micros(),
84        )),
85        ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
86        ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
87        ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
88        _ => None,
89    }
90}
91
92fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
93    match expr {
94        ExprImpl::Literal(l) => match l.get_data() {
95            Some(ScalarImpl::Bool(b)) => {
96                if *b {
97                    Some(IcebergPredicate::AlwaysTrue)
98                } else {
99                    Some(IcebergPredicate::AlwaysFalse)
100                }
101            }
102            _ => None,
103        },
104        ExprImpl::FunctionCall(f) => {
105            let args = f.inputs();
106            match f.func_type() {
107                ExprType::Not => {
108                    let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
109                    Some(IcebergPredicate::negate(arg))
110                }
111                ExprType::And => {
112                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
113                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
114                    Some(IcebergPredicate::and(arg0, arg1))
115                }
116                ExprType::Or => {
117                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
118                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
119                    Some(IcebergPredicate::or(arg0, arg1))
120                }
121                ExprType::Equal if args[0].return_type() == args[1].return_type() => {
122                    match [&args[0], &args[1]] {
123                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
124                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
125                            let column_name = &fields[lhs.index].name;
126                            let reference = Reference::new(column_name);
127                            let datum = rw_literal_to_iceberg_datum(rhs)?;
128                            Some(reference.equal_to(datum))
129                        }
130                        _ => None,
131                    }
132                }
133                ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
134                    match [&args[0], &args[1]] {
135                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
136                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
137                            let column_name = &fields[lhs.index].name;
138                            let reference = Reference::new(column_name);
139                            let datum = rw_literal_to_iceberg_datum(rhs)?;
140                            Some(reference.not_equal_to(datum))
141                        }
142                        _ => None,
143                    }
144                }
145                ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
146                    match [&args[0], &args[1]] {
147                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
148                            let column_name = &fields[lhs.index].name;
149                            let reference = Reference::new(column_name);
150                            let datum = rw_literal_to_iceberg_datum(rhs)?;
151                            Some(reference.greater_than(datum))
152                        }
153                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
154                            let column_name = &fields[lhs.index].name;
155                            let reference = Reference::new(column_name);
156                            let datum = rw_literal_to_iceberg_datum(rhs)?;
157                            Some(reference.less_than_or_equal_to(datum))
158                        }
159                        _ => None,
160                    }
161                }
162                ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
163                    match [&args[0], &args[1]] {
164                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
165                            let column_name = &fields[lhs.index].name;
166                            let reference = Reference::new(column_name);
167                            let datum = rw_literal_to_iceberg_datum(rhs)?;
168                            Some(reference.greater_than_or_equal_to(datum))
169                        }
170                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
171                            let column_name = &fields[lhs.index].name;
172                            let reference = Reference::new(column_name);
173                            let datum = rw_literal_to_iceberg_datum(rhs)?;
174                            Some(reference.less_than(datum))
175                        }
176                        _ => None,
177                    }
178                }
179                ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
180                    match [&args[0], &args[1]] {
181                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
182                            let column_name = &fields[lhs.index].name;
183                            let reference = Reference::new(column_name);
184                            let datum = rw_literal_to_iceberg_datum(rhs)?;
185                            Some(reference.less_than(datum))
186                        }
187                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
188                            let column_name = &fields[lhs.index].name;
189                            let reference = Reference::new(column_name);
190                            let datum = rw_literal_to_iceberg_datum(rhs)?;
191                            Some(reference.greater_than_or_equal_to(datum))
192                        }
193                        _ => None,
194                    }
195                }
196                ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
197                    match [&args[0], &args[1]] {
198                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
199                            let column_name = &fields[lhs.index].name;
200                            let reference = Reference::new(column_name);
201                            let datum = rw_literal_to_iceberg_datum(rhs)?;
202                            Some(reference.less_than_or_equal_to(datum))
203                        }
204                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
205                            let column_name = &fields[lhs.index].name;
206                            let reference = Reference::new(column_name);
207                            let datum = rw_literal_to_iceberg_datum(rhs)?;
208                            Some(reference.greater_than(datum))
209                        }
210                        _ => None,
211                    }
212                }
213                ExprType::IsNull => match &args[0] {
214                    ExprImpl::InputRef(lhs) => {
215                        let column_name = &fields[lhs.index].name;
216                        let reference = Reference::new(column_name);
217                        Some(reference.is_null())
218                    }
219                    _ => None,
220                },
221                ExprType::IsNotNull => match &args[0] {
222                    ExprImpl::InputRef(lhs) => {
223                        let column_name = &fields[lhs.index].name;
224                        let reference = Reference::new(column_name);
225                        Some(reference.is_not_null())
226                    }
227                    _ => None,
228                },
229                ExprType::In => match &args[0] {
230                    ExprImpl::InputRef(lhs) => {
231                        let column_name = &fields[lhs.index].name;
232                        let reference = Reference::new(column_name);
233                        let mut datums = Vec::with_capacity(args.len() - 1);
234                        for arg in &args[1..] {
235                            if args[0].return_type() != arg.return_type() {
236                                return None;
237                            }
238                            if let ExprImpl::Literal(l) = arg {
239                                if let Some(datum) = rw_literal_to_iceberg_datum(l) {
240                                    datums.push(datum);
241                                } else {
242                                    return None;
243                                }
244                            } else {
245                                return None;
246                            }
247                        }
248                        Some(reference.is_in(datums))
249                    }
250                    _ => None,
251                },
252                _ => None,
253            }
254        }
255        _ => None,
256    }
257}
258fn rw_predicate_to_iceberg_predicate(
259    predicate: Condition,
260    fields: &[Field],
261) -> (IcebergPredicate, Condition) {
262    if predicate.always_true() {
263        return (IcebergPredicate::AlwaysTrue, predicate);
264    }
265
266    let mut conjunctions = predicate.conjunctions;
267    let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
268
269    let mut iceberg_condition_root = None;
270    while let Some(conjunction) = conjunctions.pop() {
271        match rw_expr_to_iceberg_predicate(&conjunction, fields) {
272            iceberg_predicate @ Some(_) => {
273                iceberg_condition_root = iceberg_predicate;
274                break;
275            }
276            None => {
277                ignored_conjunctions.push(conjunction);
278                continue;
279            }
280        }
281    }
282
283    let mut iceberg_condition_root = match iceberg_condition_root {
284        Some(p) => p,
285        None => {
286            return (
287                IcebergPredicate::AlwaysTrue,
288                Condition {
289                    conjunctions: ignored_conjunctions,
290                },
291            );
292        }
293    };
294
295    for rw_condition in conjunctions {
296        match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
297            Some(iceberg_predicate) => {
298                iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
299            }
300            None => ignored_conjunctions.push(rw_condition),
301        }
302    }
303    (
304        iceberg_condition_root,
305        Condition {
306            conjunctions: ignored_conjunctions,
307        },
308    )
309}
310
311impl BatchIcebergPredicatePushDownRule {
312    pub fn create() -> BoxedRule {
313        Box::new(BatchIcebergPredicatePushDownRule {})
314    }
315}