risingwave_frontend/utils/
iceberg_predicate.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::Datelike;
16use iceberg::expr::{Predicate as IcebergPredicate, Reference};
17use iceberg::spec::Datum as IcebergDatum;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::ScalarImpl;
20
21use crate::expr::{Expr, ExprImpl, ExprType, Literal};
22use crate::utils::Condition;
23
24pub struct ExtractIcebergPredicateResult {
25    pub iceberg_predicate: IcebergPredicate,
26    pub extracted_condition: Condition,
27    pub remaining_condition: Condition,
28}
29
30/// NOTE(kwannoel): We do predicate pushdown to the iceberg-sdk here.
31/// zone-map is used to evaluate predicates on iceberg tables.
32/// Without zone-map, iceberg-sdk will still apply the predicate on its own.
33/// See: <https://github.com/apache/iceberg-rust/blob/5c1a9e68da346819072a15327080a498ad91c488/crates/iceberg/src/arrow/reader.rs#L229-L235>.
34pub fn extract_iceberg_predicate(
35    predicate: Condition,
36    fields: &[Field],
37) -> ExtractIcebergPredicateResult {
38    if predicate.always_true() {
39        return ExtractIcebergPredicateResult {
40            iceberg_predicate: IcebergPredicate::AlwaysTrue,
41            extracted_condition: Condition {
42                conjunctions: vec![],
43            },
44            remaining_condition: Condition {
45                conjunctions: vec![],
46            },
47        };
48    }
49
50    let mut iceberg_predicates = Vec::new();
51    let mut extracted_conjunctions = Vec::new();
52    let mut remaining_conjunctions = Vec::new();
53
54    for conjunction in predicate.conjunctions {
55        match rw_expr_to_iceberg_predicate(&conjunction, fields) {
56            Some(iceberg_predicate) => {
57                iceberg_predicates.push(iceberg_predicate);
58                extracted_conjunctions.push(conjunction);
59            }
60            None => remaining_conjunctions.push(conjunction),
61        }
62    }
63
64    let iceberg_predicate = iceberg_predicates
65        .into_iter()
66        .reduce(IcebergPredicate::and)
67        .unwrap_or(IcebergPredicate::AlwaysTrue);
68
69    ExtractIcebergPredicateResult {
70        iceberg_predicate,
71        extracted_condition: Condition {
72            conjunctions: extracted_conjunctions,
73        },
74        remaining_condition: Condition {
75            conjunctions: remaining_conjunctions,
76        },
77    }
78}
79
80fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
81    let Some(scalar) = literal.get_data() else {
82        return None;
83    };
84    match scalar {
85        ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
86        ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
87        ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
88        ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
89        ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
90        ScalarImpl::Decimal(_) => {
91            // TODO(iceberg): iceberg-rust doesn't support decimal predicate pushdown yet.
92            None
93        }
94        ScalarImpl::Date(d) => {
95            let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
96                return None;
97            };
98            Some(datum)
99        }
100        ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
101            t.0.and_utc().timestamp_micros(),
102        )),
103        ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
104        ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
105        ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
106        _ => None,
107    }
108}
109
110fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
111    match expr {
112        ExprImpl::Literal(l) => match l.get_data() {
113            Some(ScalarImpl::Bool(b)) => {
114                if *b {
115                    Some(IcebergPredicate::AlwaysTrue)
116                } else {
117                    Some(IcebergPredicate::AlwaysFalse)
118                }
119            }
120            _ => None,
121        },
122        ExprImpl::FunctionCall(f) => {
123            let args = f.inputs();
124            match f.func_type() {
125                ExprType::Not => {
126                    let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
127                    Some(IcebergPredicate::negate(arg))
128                }
129                ExprType::And => {
130                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
131                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
132                    Some(IcebergPredicate::and(arg0, arg1))
133                }
134                ExprType::Or => {
135                    let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
136                    let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
137                    Some(IcebergPredicate::or(arg0, arg1))
138                }
139                ExprType::Equal if args[0].return_type() == args[1].return_type() => {
140                    match [&args[0], &args[1]] {
141                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
142                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
143                            let column_name = &fields[lhs.index].name;
144                            let reference = Reference::new(column_name);
145                            let datum = rw_literal_to_iceberg_datum(rhs)?;
146                            Some(reference.equal_to(datum))
147                        }
148                        _ => None,
149                    }
150                }
151                ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
152                    match [&args[0], &args[1]] {
153                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
154                        | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
155                            let column_name = &fields[lhs.index].name;
156                            let reference = Reference::new(column_name);
157                            let datum = rw_literal_to_iceberg_datum(rhs)?;
158                            Some(reference.not_equal_to(datum))
159                        }
160                        _ => None,
161                    }
162                }
163                ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
164                    match [&args[0], &args[1]] {
165                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
166                            let column_name = &fields[lhs.index].name;
167                            let reference = Reference::new(column_name);
168                            let datum = rw_literal_to_iceberg_datum(rhs)?;
169                            Some(reference.greater_than(datum))
170                        }
171                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
172                            let column_name = &fields[lhs.index].name;
173                            let reference = Reference::new(column_name);
174                            let datum = rw_literal_to_iceberg_datum(rhs)?;
175                            Some(reference.less_than_or_equal_to(datum))
176                        }
177                        _ => None,
178                    }
179                }
180                ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
181                    match [&args[0], &args[1]] {
182                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
183                            let column_name = &fields[lhs.index].name;
184                            let reference = Reference::new(column_name);
185                            let datum = rw_literal_to_iceberg_datum(rhs)?;
186                            Some(reference.greater_than_or_equal_to(datum))
187                        }
188                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
189                            let column_name = &fields[lhs.index].name;
190                            let reference = Reference::new(column_name);
191                            let datum = rw_literal_to_iceberg_datum(rhs)?;
192                            Some(reference.less_than(datum))
193                        }
194                        _ => None,
195                    }
196                }
197                ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
198                    match [&args[0], &args[1]] {
199                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
200                            let column_name = &fields[lhs.index].name;
201                            let reference = Reference::new(column_name);
202                            let datum = rw_literal_to_iceberg_datum(rhs)?;
203                            Some(reference.less_than(datum))
204                        }
205                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
206                            let column_name = &fields[lhs.index].name;
207                            let reference = Reference::new(column_name);
208                            let datum = rw_literal_to_iceberg_datum(rhs)?;
209                            Some(reference.greater_than_or_equal_to(datum))
210                        }
211                        _ => None,
212                    }
213                }
214                ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
215                    match [&args[0], &args[1]] {
216                        [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
217                            let column_name = &fields[lhs.index].name;
218                            let reference = Reference::new(column_name);
219                            let datum = rw_literal_to_iceberg_datum(rhs)?;
220                            Some(reference.less_than_or_equal_to(datum))
221                        }
222                        [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
223                            let column_name = &fields[lhs.index].name;
224                            let reference = Reference::new(column_name);
225                            let datum = rw_literal_to_iceberg_datum(rhs)?;
226                            Some(reference.greater_than(datum))
227                        }
228                        _ => None,
229                    }
230                }
231                ExprType::IsNull => match &args[0] {
232                    ExprImpl::InputRef(lhs) => {
233                        let column_name = &fields[lhs.index].name;
234                        let reference = Reference::new(column_name);
235                        Some(reference.is_null())
236                    }
237                    _ => None,
238                },
239                ExprType::IsNotNull => match &args[0] {
240                    ExprImpl::InputRef(lhs) => {
241                        let column_name = &fields[lhs.index].name;
242                        let reference = Reference::new(column_name);
243                        Some(reference.is_not_null())
244                    }
245                    _ => None,
246                },
247                ExprType::In => match &args[0] {
248                    ExprImpl::InputRef(lhs) => {
249                        let column_name = &fields[lhs.index].name;
250                        let reference = Reference::new(column_name);
251                        let mut datums = Vec::with_capacity(args.len() - 1);
252                        for arg in &args[1..] {
253                            if args[0].return_type() != arg.return_type() {
254                                return None;
255                            }
256                            if let ExprImpl::Literal(l) = arg {
257                                if let Some(datum) = rw_literal_to_iceberg_datum(l) {
258                                    datums.push(datum);
259                                } else {
260                                    return None;
261                                }
262                            } else {
263                                return None;
264                            }
265                        }
266                        Some(reference.is_in(datums))
267                    }
268                    _ => None,
269                },
270                _ => None,
271            }
272        }
273        _ => None,
274    }
275}