risingwave_frontend/utils/
iceberg_predicate.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::Datelike;
16use iceberg::expr::{Predicate as IcebergPredicate, Reference};
17use iceberg::spec::Datum as IcebergDatum;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::ScalarImpl;
20
21use crate::expr::{Expr, ExprImpl, ExprType, Literal};
22use crate::utils::Condition;
23
24/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here.
25/// zone-map is used to evaluate predicates on iceberg tables.
26/// Without zone-map, iceberg-sdk will still apply the predicate on its own.
27/// See: <https://github.com/apache/iceberg-rust/blob/5c1a9e68da346819072a15327080a498ad91c488/crates/iceberg/src/arrow/reader.rs#L229-L235>.
28pub fn to_iceberg_predicate(
29    predicate: Condition,
30    fields: &[Field],
31) -> (IcebergPredicate, Condition) {
32    if predicate.always_true() {
33        return (IcebergPredicate::AlwaysTrue, predicate);
34    }
35
36    let mut conjunctions = predicate.conjunctions;
37    let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
38
39    let mut iceberg_condition_root = None;
40    while let Some(conjunction) = conjunctions.pop() {
41        match rw_expr_to_iceberg_predicate(&conjunction, fields) {
42            iceberg_predicate @ Some(_) => {
43                iceberg_condition_root = iceberg_predicate;
44                break;
45            }
46            None => {
47                ignored_conjunctions.push(conjunction);
48                continue;
49            }
50        }
51    }
52
53    let mut iceberg_condition_root = match iceberg_condition_root {
54        Some(p) => p,
55        None => {
56            return (
57                IcebergPredicate::AlwaysTrue,
58                Condition {
59                    conjunctions: ignored_conjunctions,
60                },
61            );
62        }
63    };
64
65    for rw_condition in conjunctions {
66        match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
67            Some(iceberg_predicate) => {
68                iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
69            }
70            None => ignored_conjunctions.push(rw_condition),
71        }
72    }
73    (
74        iceberg_condition_root,
75        Condition {
76            conjunctions: ignored_conjunctions,
77        },
78    )
79}
80
81fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
82    let Some(scalar) = literal.get_data() else {
83        return None;
84    };
85    match scalar {
86        ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
87        ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
88        ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
89        ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
90        ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
91        ScalarImpl::Decimal(_) => {
92            // TODO(iceberg): iceberg-rust doesn't support decimal predicate pushdown yet.
93            None
94        }
95        ScalarImpl::Date(d) => {
96            let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
97                return None;
98            };
99            Some(datum)
100        }
101        ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
102            t.0.and_utc().timestamp_micros(),
103        )),
104        ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
105        ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
106        ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
107        _ => None,
108    }
109}
110
111fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
112    match expr {
113        ExprImpl::Literal(l) => match l.get_data() {
114            Some(ScalarImpl::Bool(b)) => {
115                if *b {
116                    Some(IcebergPredicate::AlwaysTrue)
117                } else {
118                    Some(IcebergPredicate::AlwaysFalse)
119                }
120            }
121            _ => None,
122        },
123        ExprImpl::FunctionCall(f) => {
124            let args = f.inputs();
125            match f.func_type() {
126                ExprType::Not => {
127                    let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
128                    Some(IcebergPredicate::negate(arg))
129                }
130                ExprType::And => {
131                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
132                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
133                    Some(IcebergPredicate::and(arg0, arg1))
134                }
135                ExprType::Or => {
136                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
137                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
138                    Some(IcebergPredicate::or(arg0, arg1))
139                }
140                ExprType::Equal if args[0].return_type() == args[1].return_type() => {
141                    match [&args[0], &args[1]] {
142                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
143                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
144                            let column_name = &fields[lhs.index].name;
145                            let reference = Reference::new(column_name);
146                            let datum = rw_literal_to_iceberg_datum(rhs)?;
147                            Some(reference.equal_to(datum))
148                        }
149                        _ => None,
150                    }
151                }
152                ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
153                    match [&args[0], &args[1]] {
154                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
155                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
156                            let column_name = &fields[lhs.index].name;
157                            let reference = Reference::new(column_name);
158                            let datum = rw_literal_to_iceberg_datum(rhs)?;
159                            Some(reference.not_equal_to(datum))
160                        }
161                        _ => None,
162                    }
163                }
164                ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
165                    match [&args[0], &args[1]] {
166                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
167                            let column_name = &fields[lhs.index].name;
168                            let reference = Reference::new(column_name);
169                            let datum = rw_literal_to_iceberg_datum(rhs)?;
170                            Some(reference.greater_than(datum))
171                        }
172                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
173                            let column_name = &fields[lhs.index].name;
174                            let reference = Reference::new(column_name);
175                            let datum = rw_literal_to_iceberg_datum(rhs)?;
176                            Some(reference.less_than_or_equal_to(datum))
177                        }
178                        _ => None,
179                    }
180                }
181                ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
182                    match [&args[0], &args[1]] {
183                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
184                            let column_name = &fields[lhs.index].name;
185                            let reference = Reference::new(column_name);
186                            let datum = rw_literal_to_iceberg_datum(rhs)?;
187                            Some(reference.greater_than_or_equal_to(datum))
188                        }
189                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
190                            let column_name = &fields[lhs.index].name;
191                            let reference = Reference::new(column_name);
192                            let datum = rw_literal_to_iceberg_datum(rhs)?;
193                            Some(reference.less_than(datum))
194                        }
195                        _ => None,
196                    }
197                }
198                ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
199                    match [&args[0], &args[1]] {
200                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
201                            let column_name = &fields[lhs.index].name;
202                            let reference = Reference::new(column_name);
203                            let datum = rw_literal_to_iceberg_datum(rhs)?;
204                            Some(reference.less_than(datum))
205                        }
206                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
207                            let column_name = &fields[lhs.index].name;
208                            let reference = Reference::new(column_name);
209                            let datum = rw_literal_to_iceberg_datum(rhs)?;
210                            Some(reference.greater_than_or_equal_to(datum))
211                        }
212                        _ => None,
213                    }
214                }
215                ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
216                    match [&args[0], &args[1]] {
217                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
218                            let column_name = &fields[lhs.index].name;
219                            let reference = Reference::new(column_name);
220                            let datum = rw_literal_to_iceberg_datum(rhs)?;
221                            Some(reference.less_than_or_equal_to(datum))
222                        }
223                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
224                            let column_name = &fields[lhs.index].name;
225                            let reference = Reference::new(column_name);
226                            let datum = rw_literal_to_iceberg_datum(rhs)?;
227                            Some(reference.greater_than(datum))
228                        }
229                        _ => None,
230                    }
231                }
232                ExprType::IsNull => match &args[0] {
233                    ExprImpl::InputRef(lhs) => {
234                        let column_name = &fields[lhs.index].name;
235                        let reference = Reference::new(column_name);
236                        Some(reference.is_null())
237                    }
238                    _ => None,
239                },
240                ExprType::IsNotNull => match &args[0] {
241                    ExprImpl::InputRef(lhs) => {
242                        let column_name = &fields[lhs.index].name;
243                        let reference = Reference::new(column_name);
244                        Some(reference.is_not_null())
245                    }
246                    _ => None,
247                },
248                ExprType::In => match &args[0] {
249                    ExprImpl::InputRef(lhs) => {
250                        let column_name = &fields[lhs.index].name;
251                        let reference = Reference::new(column_name);
252                        let mut datums = Vec::with_capacity(args.len() - 1);
253                        for arg in &args[1..] {
254                            if args[0].return_type() != arg.return_type() {
255                                return None;
256                            }
257                            if let ExprImpl::Literal(l) = arg {
258                                if let Some(datum) = rw_literal_to_iceberg_datum(l) {
259                                    datums.push(datum);
260                                } else {
261                                    return None;
262                                }
263                            } else {
264                                return None;
265                            }
266                        }
267                        Some(reference.is_in(datums))
268                    }
269                    _ => None,
270                },
271                _ => None,
272            }
273        }
274        _ => None,
275    }
276}