risingwave_frontend/utils/
iceberg_predicate.rs1use chrono::Datelike;
16use iceberg::expr::{Predicate as IcebergPredicate, Reference};
17use iceberg::spec::Datum as IcebergDatum;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::ScalarImpl;
20
21use crate::expr::{Expr, ExprImpl, ExprType, Literal};
22use crate::utils::Condition;
23
24pub fn to_iceberg_predicate(
29 predicate: Condition,
30 fields: &[Field],
31) -> (IcebergPredicate, Condition) {
32 if predicate.always_true() {
33 return (IcebergPredicate::AlwaysTrue, predicate);
34 }
35
36 let mut conjunctions = predicate.conjunctions;
37 let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
38
39 let mut iceberg_condition_root = None;
40 while let Some(conjunction) = conjunctions.pop() {
41 match rw_expr_to_iceberg_predicate(&conjunction, fields) {
42 iceberg_predicate @ Some(_) => {
43 iceberg_condition_root = iceberg_predicate;
44 break;
45 }
46 None => {
47 ignored_conjunctions.push(conjunction);
48 continue;
49 }
50 }
51 }
52
53 let mut iceberg_condition_root = match iceberg_condition_root {
54 Some(p) => p,
55 None => {
56 return (
57 IcebergPredicate::AlwaysTrue,
58 Condition {
59 conjunctions: ignored_conjunctions,
60 },
61 );
62 }
63 };
64
65 for rw_condition in conjunctions {
66 match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
67 Some(iceberg_predicate) => {
68 iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
69 }
70 None => ignored_conjunctions.push(rw_condition),
71 }
72 }
73 (
74 iceberg_condition_root,
75 Condition {
76 conjunctions: ignored_conjunctions,
77 },
78 )
79}
80
81fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
82 let Some(scalar) = literal.get_data() else {
83 return None;
84 };
85 match scalar {
86 ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
87 ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
88 ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
89 ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
90 ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
91 ScalarImpl::Decimal(_) => {
92 None
94 }
95 ScalarImpl::Date(d) => {
96 let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
97 return None;
98 };
99 Some(datum)
100 }
101 ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
102 t.0.and_utc().timestamp_micros(),
103 )),
104 ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
105 ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
106 ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
107 _ => None,
108 }
109}
110
111fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
112 match expr {
113 ExprImpl::Literal(l) => match l.get_data() {
114 Some(ScalarImpl::Bool(b)) => {
115 if *b {
116 Some(IcebergPredicate::AlwaysTrue)
117 } else {
118 Some(IcebergPredicate::AlwaysFalse)
119 }
120 }
121 _ => None,
122 },
123 ExprImpl::FunctionCall(f) => {
124 let args = f.inputs();
125 match f.func_type() {
126 ExprType::Not => {
127 let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
128 Some(IcebergPredicate::negate(arg))
129 }
130 ExprType::And => {
131 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
132 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
133 Some(IcebergPredicate::and(arg0, arg1))
134 }
135 ExprType::Or => {
136 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
137 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
138 Some(IcebergPredicate::or(arg0, arg1))
139 }
140 ExprType::Equal if args[0].return_type() == args[1].return_type() => {
141 match [&args[0], &args[1]] {
142 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
143 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
144 let column_name = &fields[lhs.index].name;
145 let reference = Reference::new(column_name);
146 let datum = rw_literal_to_iceberg_datum(rhs)?;
147 Some(reference.equal_to(datum))
148 }
149 _ => None,
150 }
151 }
152 ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
153 match [&args[0], &args[1]] {
154 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
155 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
156 let column_name = &fields[lhs.index].name;
157 let reference = Reference::new(column_name);
158 let datum = rw_literal_to_iceberg_datum(rhs)?;
159 Some(reference.not_equal_to(datum))
160 }
161 _ => None,
162 }
163 }
164 ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
165 match [&args[0], &args[1]] {
166 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
167 let column_name = &fields[lhs.index].name;
168 let reference = Reference::new(column_name);
169 let datum = rw_literal_to_iceberg_datum(rhs)?;
170 Some(reference.greater_than(datum))
171 }
172 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
173 let column_name = &fields[lhs.index].name;
174 let reference = Reference::new(column_name);
175 let datum = rw_literal_to_iceberg_datum(rhs)?;
176 Some(reference.less_than_or_equal_to(datum))
177 }
178 _ => None,
179 }
180 }
181 ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
182 match [&args[0], &args[1]] {
183 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
184 let column_name = &fields[lhs.index].name;
185 let reference = Reference::new(column_name);
186 let datum = rw_literal_to_iceberg_datum(rhs)?;
187 Some(reference.greater_than_or_equal_to(datum))
188 }
189 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
190 let column_name = &fields[lhs.index].name;
191 let reference = Reference::new(column_name);
192 let datum = rw_literal_to_iceberg_datum(rhs)?;
193 Some(reference.less_than(datum))
194 }
195 _ => None,
196 }
197 }
198 ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
199 match [&args[0], &args[1]] {
200 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
201 let column_name = &fields[lhs.index].name;
202 let reference = Reference::new(column_name);
203 let datum = rw_literal_to_iceberg_datum(rhs)?;
204 Some(reference.less_than(datum))
205 }
206 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
207 let column_name = &fields[lhs.index].name;
208 let reference = Reference::new(column_name);
209 let datum = rw_literal_to_iceberg_datum(rhs)?;
210 Some(reference.greater_than_or_equal_to(datum))
211 }
212 _ => None,
213 }
214 }
215 ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
216 match [&args[0], &args[1]] {
217 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
218 let column_name = &fields[lhs.index].name;
219 let reference = Reference::new(column_name);
220 let datum = rw_literal_to_iceberg_datum(rhs)?;
221 Some(reference.less_than_or_equal_to(datum))
222 }
223 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
224 let column_name = &fields[lhs.index].name;
225 let reference = Reference::new(column_name);
226 let datum = rw_literal_to_iceberg_datum(rhs)?;
227 Some(reference.greater_than(datum))
228 }
229 _ => None,
230 }
231 }
232 ExprType::IsNull => match &args[0] {
233 ExprImpl::InputRef(lhs) => {
234 let column_name = &fields[lhs.index].name;
235 let reference = Reference::new(column_name);
236 Some(reference.is_null())
237 }
238 _ => None,
239 },
240 ExprType::IsNotNull => match &args[0] {
241 ExprImpl::InputRef(lhs) => {
242 let column_name = &fields[lhs.index].name;
243 let reference = Reference::new(column_name);
244 Some(reference.is_not_null())
245 }
246 _ => None,
247 },
248 ExprType::In => match &args[0] {
249 ExprImpl::InputRef(lhs) => {
250 let column_name = &fields[lhs.index].name;
251 let reference = Reference::new(column_name);
252 let mut datums = Vec::with_capacity(args.len() - 1);
253 for arg in &args[1..] {
254 if args[0].return_type() != arg.return_type() {
255 return None;
256 }
257 if let ExprImpl::Literal(l) = arg {
258 if let Some(datum) = rw_literal_to_iceberg_datum(l) {
259 datums.push(datum);
260 } else {
261 return None;
262 }
263 } else {
264 return None;
265 }
266 }
267 Some(reference.is_in(datums))
268 }
269 _ => None,
270 },
271 _ => None,
272 }
273 }
274 _ => None,
275 }
276}