risingwave_frontend/optimizer/rule/batch/
batch_iceberg_predicate_pushdown.rs

//  Copyright 2025 RisingWave Labs
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
19
20use chrono::Datelike;
21use iceberg::expr::{Predicate as IcebergPredicate, Reference};
22use iceberg::spec::Datum as IcebergDatum;
23use risingwave_common::catalog::Field;
24use risingwave_common::types::ScalarImpl;
25
26use super::prelude::*;
27use crate::expr::{Expr, ExprImpl, ExprType, Literal};
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary};
30use crate::utils::Condition;
31
32/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here.
33/// zone-map is used to evaluate predicates on iceberg tables.
34/// Without zone-map, iceberg-sdk will still apply the predicate on its own.
35/// See: <https://github.com/apache/iceberg-rust/blob/5c1a9e68da346819072a15327080a498ad91c488/crates/iceberg/src/arrow/reader.rs#L229-L235>.
36pub struct BatchIcebergPredicatePushDownRule {}
37
38impl Rule<Batch> for BatchIcebergPredicatePushDownRule {
39    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
40        let filter: &BatchFilter = plan.as_batch_filter()?;
41        let input = filter.input();
42        let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?;
43        // NOTE(kwannoel): We only fill iceberg predicate here.
44        assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue);
45
46        let predicate = filter.predicate().clone();
47        let (iceberg_predicate, rw_predicate) =
48            rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields());
49        let scan = scan.clone_with_predicate(iceberg_predicate);
50        if rw_predicate.always_true() {
51            Some(scan.into())
52        } else {
53            let filter = filter
54                .clone_with_input(scan.into())
55                .clone_with_predicate(rw_predicate);
56            Some(filter.into())
57        }
58    }
59}
60
61fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
62    let Some(scalar) = literal.get_data() else {
63        return None;
64    };
65    match scalar {
66        ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
67        ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
68        ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
69        ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
70        ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
71        ScalarImpl::Decimal(_) => {
72            // TODO(iceberg): iceberg-rust doesn't support decimal predicate pushdown yet.
73            None
74        }
75        ScalarImpl::Date(d) => {
76            let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
77                return None;
78            };
79            Some(datum)
80        }
81        ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
82            t.0.and_utc().timestamp_micros(),
83        )),
84        ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
85        ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
86        ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
87        _ => None,
88    }
89}
90
91fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
92    match expr {
93        ExprImpl::Literal(l) => match l.get_data() {
94            Some(ScalarImpl::Bool(b)) => {
95                if *b {
96                    Some(IcebergPredicate::AlwaysTrue)
97                } else {
98                    Some(IcebergPredicate::AlwaysFalse)
99                }
100            }
101            _ => None,
102        },
103        ExprImpl::FunctionCall(f) => {
104            let args = f.inputs();
105            match f.func_type() {
106                ExprType::Not => {
107                    let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
108                    Some(IcebergPredicate::negate(arg))
109                }
110                ExprType::And => {
111                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
112                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
113                    Some(IcebergPredicate::and(arg0, arg1))
114                }
115                ExprType::Or => {
116                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
117                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
118                    Some(IcebergPredicate::or(arg0, arg1))
119                }
120                ExprType::Equal if args[0].return_type() == args[1].return_type() => {
121                    match [&args[0], &args[1]] {
122                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
123                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
124                            let column_name = &fields[lhs.index].name;
125                            let reference = Reference::new(column_name);
126                            let datum = rw_literal_to_iceberg_datum(rhs)?;
127                            Some(reference.equal_to(datum))
128                        }
129                        _ => None,
130                    }
131                }
132                ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
133                    match [&args[0], &args[1]] {
134                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
135                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
136                            let column_name = &fields[lhs.index].name;
137                            let reference = Reference::new(column_name);
138                            let datum = rw_literal_to_iceberg_datum(rhs)?;
139                            Some(reference.not_equal_to(datum))
140                        }
141                        _ => None,
142                    }
143                }
144                ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
145                    match [&args[0], &args[1]] {
146                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
147                            let column_name = &fields[lhs.index].name;
148                            let reference = Reference::new(column_name);
149                            let datum = rw_literal_to_iceberg_datum(rhs)?;
150                            Some(reference.greater_than(datum))
151                        }
152                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
153                            let column_name = &fields[lhs.index].name;
154                            let reference = Reference::new(column_name);
155                            let datum = rw_literal_to_iceberg_datum(rhs)?;
156                            Some(reference.less_than_or_equal_to(datum))
157                        }
158                        _ => None,
159                    }
160                }
161                ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
162                    match [&args[0], &args[1]] {
163                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
164                            let column_name = &fields[lhs.index].name;
165                            let reference = Reference::new(column_name);
166                            let datum = rw_literal_to_iceberg_datum(rhs)?;
167                            Some(reference.greater_than_or_equal_to(datum))
168                        }
169                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
170                            let column_name = &fields[lhs.index].name;
171                            let reference = Reference::new(column_name);
172                            let datum = rw_literal_to_iceberg_datum(rhs)?;
173                            Some(reference.less_than(datum))
174                        }
175                        _ => None,
176                    }
177                }
178                ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
179                    match [&args[0], &args[1]] {
180                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
181                            let column_name = &fields[lhs.index].name;
182                            let reference = Reference::new(column_name);
183                            let datum = rw_literal_to_iceberg_datum(rhs)?;
184                            Some(reference.less_than(datum))
185                        }
186                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
187                            let column_name = &fields[lhs.index].name;
188                            let reference = Reference::new(column_name);
189                            let datum = rw_literal_to_iceberg_datum(rhs)?;
190                            Some(reference.greater_than_or_equal_to(datum))
191                        }
192                        _ => None,
193                    }
194                }
195                ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
196                    match [&args[0], &args[1]] {
197                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
198                            let column_name = &fields[lhs.index].name;
199                            let reference = Reference::new(column_name);
200                            let datum = rw_literal_to_iceberg_datum(rhs)?;
201                            Some(reference.less_than_or_equal_to(datum))
202                        }
203                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
204                            let column_name = &fields[lhs.index].name;
205                            let reference = Reference::new(column_name);
206                            let datum = rw_literal_to_iceberg_datum(rhs)?;
207                            Some(reference.greater_than(datum))
208                        }
209                        _ => None,
210                    }
211                }
212                ExprType::IsNull => match &args[0] {
213                    ExprImpl::InputRef(lhs) => {
214                        let column_name = &fields[lhs.index].name;
215                        let reference = Reference::new(column_name);
216                        Some(reference.is_null())
217                    }
218                    _ => None,
219                },
220                ExprType::IsNotNull => match &args[0] {
221                    ExprImpl::InputRef(lhs) => {
222                        let column_name = &fields[lhs.index].name;
223                        let reference = Reference::new(column_name);
224                        Some(reference.is_not_null())
225                    }
226                    _ => None,
227                },
228                ExprType::In => match &args[0] {
229                    ExprImpl::InputRef(lhs) => {
230                        let column_name = &fields[lhs.index].name;
231                        let reference = Reference::new(column_name);
232                        let mut datums = Vec::with_capacity(args.len() - 1);
233                        for arg in &args[1..] {
234                            if args[0].return_type() != arg.return_type() {
235                                return None;
236                            }
237                            if let ExprImpl::Literal(l) = arg {
238                                if let Some(datum) = rw_literal_to_iceberg_datum(l) {
239                                    datums.push(datum);
240                                } else {
241                                    return None;
242                                }
243                            } else {
244                                return None;
245                            }
246                        }
247                        Some(reference.is_in(datums))
248                    }
249                    _ => None,
250                },
251                _ => None,
252            }
253        }
254        _ => None,
255    }
256}
257fn rw_predicate_to_iceberg_predicate(
258    predicate: Condition,
259    fields: &[Field],
260) -> (IcebergPredicate, Condition) {
261    if predicate.always_true() {
262        return (IcebergPredicate::AlwaysTrue, predicate);
263    }
264
265    let mut conjunctions = predicate.conjunctions;
266    let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
267
268    let mut iceberg_condition_root = None;
269    while let Some(conjunction) = conjunctions.pop() {
270        match rw_expr_to_iceberg_predicate(&conjunction, fields) {
271            iceberg_predicate @ Some(_) => {
272                iceberg_condition_root = iceberg_predicate;
273                break;
274            }
275            None => {
276                ignored_conjunctions.push(conjunction);
277                continue;
278            }
279        }
280    }
281
282    let mut iceberg_condition_root = match iceberg_condition_root {
283        Some(p) => p,
284        None => {
285            return (
286                IcebergPredicate::AlwaysTrue,
287                Condition {
288                    conjunctions: ignored_conjunctions,
289                },
290            );
291        }
292    };
293
294    for rw_condition in conjunctions {
295        match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
296            Some(iceberg_predicate) => {
297                iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
298            }
299            None => ignored_conjunctions.push(rw_condition),
300        }
301    }
302    (
303        iceberg_condition_root,
304        Condition {
305            conjunctions: ignored_conjunctions,
306        },
307    )
308}
309
310impl BatchIcebergPredicatePushDownRule {
311    pub fn create() -> BoxedRule {
312        Box::new(BatchIcebergPredicatePushDownRule {})
313    }
314}