risingwave_frontend/optimizer/rule/batch/
batch_iceberg_predicate_pushdown.rs

1//  Copyright 2025 RisingWave Labs
2//
3//  Licensed under the Apache License, Version 2.0 (the "License");
4//  you may not use this file except in compliance with the License.
5//  You may obtain a copy of the License at
6//
7//  http://www.apache.org/licenses/LICENSE-2.0
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14//
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use chrono::Datelike;
21use iceberg::expr::{Predicate as IcebergPredicate, Reference};
22use iceberg::spec::Datum as IcebergDatum;
23use risingwave_common::catalog::Field;
24use risingwave_common::types::{Decimal, ScalarImpl};
25
26use crate::expr::{Expr, ExprImpl, ExprType, Literal};
27use crate::optimizer::PlanRef;
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary};
30use crate::optimizer::rule::{BoxedRule, Rule};
31use crate::utils::Condition;
32
/// Batch optimizer rule that moves the conditions of a `BatchFilter` sitting directly
/// above a `BatchIcebergScan` into the scan's iceberg predicate.
///
/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here.
/// The zone-map is used to evaluate predicates on iceberg tables.
/// Without a zone-map, the iceberg-sdk will still apply the predicate on its own.
/// See: <https://github.com/apache/iceberg-rust/blob/5c1a9e68da346819072a15327080a498ad91c488/crates/iceberg/src/arrow/reader.rs#L229-L235>.
pub struct BatchIcebergPredicatePushDownRule {}
38
39impl Rule for BatchIcebergPredicatePushDownRule {
40    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
41        let filter: &BatchFilter = plan.as_batch_filter()?;
42        let input = filter.input();
43        let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?;
44        // NOTE(kwannoel): We only fill iceberg predicate here.
45        assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue);
46
47        let predicate = filter.predicate().clone();
48        let (iceberg_predicate, rw_predicate) =
49            rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields());
50        let scan = scan.clone_with_predicate(iceberg_predicate);
51        if rw_predicate.always_true() {
52            Some(scan.into())
53        } else {
54            let filter = filter
55                .clone_with_input(scan.into())
56                .clone_with_predicate(rw_predicate);
57            Some(filter.into())
58        }
59    }
60}
61
62fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
63    let Some(scalar) = literal.get_data() else {
64        return None;
65    };
66    match scalar {
67        ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
68        ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
69        ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
70        ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
71        ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
72        ScalarImpl::Decimal(d) => {
73            let Decimal::Normalized(d) = d else {
74                return None;
75            };
76            let Ok(d) = IcebergDatum::decimal(*d) else {
77                return None;
78            };
79            Some(d)
80        }
81        ScalarImpl::Date(d) => {
82            let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
83                return None;
84            };
85            Some(datum)
86        }
87        ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
88            t.0.and_utc().timestamp_micros(),
89        )),
90        ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
91        ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
92        ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
93        _ => None,
94    }
95}
96
97fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
98    match expr {
99        ExprImpl::Literal(l) => match l.get_data() {
100            Some(ScalarImpl::Bool(b)) => {
101                if *b {
102                    Some(IcebergPredicate::AlwaysTrue)
103                } else {
104                    Some(IcebergPredicate::AlwaysFalse)
105                }
106            }
107            _ => None,
108        },
109        ExprImpl::FunctionCall(f) => {
110            let args = f.inputs();
111            match f.func_type() {
112                ExprType::Not => {
113                    let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
114                    Some(IcebergPredicate::negate(arg))
115                }
116                ExprType::And => {
117                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
118                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
119                    Some(IcebergPredicate::and(arg0, arg1))
120                }
121                ExprType::Or => {
122                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
123                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
124                    Some(IcebergPredicate::or(arg0, arg1))
125                }
126                ExprType::Equal if args[0].return_type() == args[1].return_type() => {
127                    match [&args[0], &args[1]] {
128                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
129                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
130                            let column_name = &fields[lhs.index].name;
131                            let reference = Reference::new(column_name);
132                            let datum = rw_literal_to_iceberg_datum(rhs)?;
133                            Some(reference.equal_to(datum))
134                        }
135                        _ => None,
136                    }
137                }
138                ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
139                    match [&args[0], &args[1]] {
140                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
141                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
142                            let column_name = &fields[lhs.index].name;
143                            let reference = Reference::new(column_name);
144                            let datum = rw_literal_to_iceberg_datum(rhs)?;
145                            Some(reference.not_equal_to(datum))
146                        }
147                        _ => None,
148                    }
149                }
150                ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
151                    match [&args[0], &args[1]] {
152                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
153                            let column_name = &fields[lhs.index].name;
154                            let reference = Reference::new(column_name);
155                            let datum = rw_literal_to_iceberg_datum(rhs)?;
156                            Some(reference.greater_than(datum))
157                        }
158                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
159                            let column_name = &fields[lhs.index].name;
160                            let reference = Reference::new(column_name);
161                            let datum = rw_literal_to_iceberg_datum(rhs)?;
162                            Some(reference.less_than_or_equal_to(datum))
163                        }
164                        _ => None,
165                    }
166                }
167                ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
168                    match [&args[0], &args[1]] {
169                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
170                            let column_name = &fields[lhs.index].name;
171                            let reference = Reference::new(column_name);
172                            let datum = rw_literal_to_iceberg_datum(rhs)?;
173                            Some(reference.greater_than_or_equal_to(datum))
174                        }
175                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
176                            let column_name = &fields[lhs.index].name;
177                            let reference = Reference::new(column_name);
178                            let datum = rw_literal_to_iceberg_datum(rhs)?;
179                            Some(reference.less_than(datum))
180                        }
181                        _ => None,
182                    }
183                }
184                ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
185                    match [&args[0], &args[1]] {
186                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
187                            let column_name = &fields[lhs.index].name;
188                            let reference = Reference::new(column_name);
189                            let datum = rw_literal_to_iceberg_datum(rhs)?;
190                            Some(reference.less_than(datum))
191                        }
192                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
193                            let column_name = &fields[lhs.index].name;
194                            let reference = Reference::new(column_name);
195                            let datum = rw_literal_to_iceberg_datum(rhs)?;
196                            Some(reference.greater_than_or_equal_to(datum))
197                        }
198                        _ => None,
199                    }
200                }
201                ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
202                    match [&args[0], &args[1]] {
203                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
204                            let column_name = &fields[lhs.index].name;
205                            let reference = Reference::new(column_name);
206                            let datum = rw_literal_to_iceberg_datum(rhs)?;
207                            Some(reference.less_than_or_equal_to(datum))
208                        }
209                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
210                            let column_name = &fields[lhs.index].name;
211                            let reference = Reference::new(column_name);
212                            let datum = rw_literal_to_iceberg_datum(rhs)?;
213                            Some(reference.greater_than(datum))
214                        }
215                        _ => None,
216                    }
217                }
218                ExprType::IsNull => match &args[0] {
219                    ExprImpl::InputRef(lhs) => {
220                        let column_name = &fields[lhs.index].name;
221                        let reference = Reference::new(column_name);
222                        Some(reference.is_null())
223                    }
224                    _ => None,
225                },
226                ExprType::IsNotNull => match &args[0] {
227                    ExprImpl::InputRef(lhs) => {
228                        let column_name = &fields[lhs.index].name;
229                        let reference = Reference::new(column_name);
230                        Some(reference.is_not_null())
231                    }
232                    _ => None,
233                },
234                ExprType::In => match &args[0] {
235                    ExprImpl::InputRef(lhs) => {
236                        let column_name = &fields[lhs.index].name;
237                        let reference = Reference::new(column_name);
238                        let mut datums = Vec::with_capacity(args.len() - 1);
239                        for arg in &args[1..] {
240                            if args[0].return_type() != arg.return_type() {
241                                return None;
242                            }
243                            if let ExprImpl::Literal(l) = arg {
244                                if let Some(datum) = rw_literal_to_iceberg_datum(l) {
245                                    datums.push(datum);
246                                } else {
247                                    return None;
248                                }
249                            } else {
250                                return None;
251                            }
252                        }
253                        Some(reference.is_in(datums))
254                    }
255                    _ => None,
256                },
257                _ => None,
258            }
259        }
260        _ => None,
261    }
262}
263fn rw_predicate_to_iceberg_predicate(
264    predicate: Condition,
265    fields: &[Field],
266) -> (IcebergPredicate, Condition) {
267    if predicate.always_true() {
268        return (IcebergPredicate::AlwaysTrue, predicate);
269    }
270
271    let mut conjunctions = predicate.conjunctions;
272    let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
273
274    let mut iceberg_condition_root = None;
275    while let Some(conjunction) = conjunctions.pop() {
276        match rw_expr_to_iceberg_predicate(&conjunction, fields) {
277            iceberg_predicate @ Some(_) => {
278                iceberg_condition_root = iceberg_predicate;
279                break;
280            }
281            None => {
282                ignored_conjunctions.push(conjunction);
283                continue;
284            }
285        }
286    }
287
288    let mut iceberg_condition_root = match iceberg_condition_root {
289        Some(p) => p,
290        None => {
291            return (
292                IcebergPredicate::AlwaysTrue,
293                Condition {
294                    conjunctions: ignored_conjunctions,
295                },
296            );
297        }
298    };
299
300    for rw_condition in conjunctions {
301        match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
302            Some(iceberg_predicate) => {
303                iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
304            }
305            None => ignored_conjunctions.push(rw_condition),
306        }
307    }
308    (
309        iceberg_condition_root,
310        Condition {
311            conjunctions: ignored_conjunctions,
312        },
313    )
314}
315
316impl BatchIcebergPredicatePushDownRule {
317    pub fn create() -> BoxedRule {
318        Box::new(BatchIcebergPredicatePushDownRule {})
319    }
320}