risingwave_frontend/utils/
iceberg_predicate.rs1use chrono::Datelike;
16use iceberg::expr::{Predicate as IcebergPredicate, Reference};
17use iceberg::spec::Datum as IcebergDatum;
18use risingwave_common::catalog::Field;
19use risingwave_common::types::ScalarImpl;
20
21use crate::expr::{Expr, ExprImpl, ExprType, Literal};
22use crate::utils::Condition;
23
24pub struct ExtractIcebergPredicateResult {
25 pub iceberg_predicate: IcebergPredicate,
26 pub extracted_condition: Condition,
27 pub remaining_condition: Condition,
28}
29
30pub fn extract_iceberg_predicate(
35 predicate: Condition,
36 fields: &[Field],
37) -> ExtractIcebergPredicateResult {
38 if predicate.always_true() {
39 return ExtractIcebergPredicateResult {
40 iceberg_predicate: IcebergPredicate::AlwaysTrue,
41 extracted_condition: Condition {
42 conjunctions: vec![],
43 },
44 remaining_condition: Condition {
45 conjunctions: vec![],
46 },
47 };
48 }
49
50 let mut iceberg_predicates = Vec::new();
51 let mut extracted_conjunctions = Vec::new();
52 let mut remaining_conjunctions = Vec::new();
53
54 for conjunction in predicate.conjunctions {
55 match rw_expr_to_iceberg_predicate(&conjunction, fields) {
56 Some(iceberg_predicate) => {
57 iceberg_predicates.push(iceberg_predicate);
58 extracted_conjunctions.push(conjunction);
59 }
60 None => remaining_conjunctions.push(conjunction),
61 }
62 }
63
64 let iceberg_predicate = iceberg_predicates
65 .into_iter()
66 .reduce(IcebergPredicate::and)
67 .unwrap_or(IcebergPredicate::AlwaysTrue);
68
69 ExtractIcebergPredicateResult {
70 iceberg_predicate,
71 extracted_condition: Condition {
72 conjunctions: extracted_conjunctions,
73 },
74 remaining_condition: Condition {
75 conjunctions: remaining_conjunctions,
76 },
77 }
78}
79
80fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
81 let Some(scalar) = literal.get_data() else {
82 return None;
83 };
84 match scalar {
85 ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
86 ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
87 ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
88 ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
89 ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
90 ScalarImpl::Decimal(_) => {
91 None
93 }
94 ScalarImpl::Date(d) => {
95 let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
96 return None;
97 };
98 Some(datum)
99 }
100 ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
101 t.0.and_utc().timestamp_micros(),
102 )),
103 ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
104 ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
105 ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
106 _ => None,
107 }
108}
109
110fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
111 match expr {
112 ExprImpl::Literal(l) => match l.get_data() {
113 Some(ScalarImpl::Bool(b)) => {
114 if *b {
115 Some(IcebergPredicate::AlwaysTrue)
116 } else {
117 Some(IcebergPredicate::AlwaysFalse)
118 }
119 }
120 _ => None,
121 },
122 ExprImpl::FunctionCall(f) => {
123 let args = f.inputs();
124 match f.func_type() {
125 ExprType::Not => {
126 let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
127 Some(IcebergPredicate::negate(arg))
128 }
129 ExprType::And => {
130 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
131 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
132 Some(IcebergPredicate::and(arg0, arg1))
133 }
134 ExprType::Or => {
135 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
136 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
137 Some(IcebergPredicate::or(arg0, arg1))
138 }
139 ExprType::Equal if args[0].return_type() == args[1].return_type() => {
140 match [&args[0], &args[1]] {
141 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
142 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
143 let column_name = &fields[lhs.index].name;
144 let reference = Reference::new(column_name);
145 let datum = rw_literal_to_iceberg_datum(rhs)?;
146 Some(reference.equal_to(datum))
147 }
148 _ => None,
149 }
150 }
151 ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
152 match [&args[0], &args[1]] {
153 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
154 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
155 let column_name = &fields[lhs.index].name;
156 let reference = Reference::new(column_name);
157 let datum = rw_literal_to_iceberg_datum(rhs)?;
158 Some(reference.not_equal_to(datum))
159 }
160 _ => None,
161 }
162 }
163 ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
164 match [&args[0], &args[1]] {
165 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
166 let column_name = &fields[lhs.index].name;
167 let reference = Reference::new(column_name);
168 let datum = rw_literal_to_iceberg_datum(rhs)?;
169 Some(reference.greater_than(datum))
170 }
171 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
172 let column_name = &fields[lhs.index].name;
173 let reference = Reference::new(column_name);
174 let datum = rw_literal_to_iceberg_datum(rhs)?;
175 Some(reference.less_than_or_equal_to(datum))
176 }
177 _ => None,
178 }
179 }
180 ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
181 match [&args[0], &args[1]] {
182 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
183 let column_name = &fields[lhs.index].name;
184 let reference = Reference::new(column_name);
185 let datum = rw_literal_to_iceberg_datum(rhs)?;
186 Some(reference.greater_than_or_equal_to(datum))
187 }
188 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
189 let column_name = &fields[lhs.index].name;
190 let reference = Reference::new(column_name);
191 let datum = rw_literal_to_iceberg_datum(rhs)?;
192 Some(reference.less_than(datum))
193 }
194 _ => None,
195 }
196 }
197 ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
198 match [&args[0], &args[1]] {
199 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
200 let column_name = &fields[lhs.index].name;
201 let reference = Reference::new(column_name);
202 let datum = rw_literal_to_iceberg_datum(rhs)?;
203 Some(reference.less_than(datum))
204 }
205 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
206 let column_name = &fields[lhs.index].name;
207 let reference = Reference::new(column_name);
208 let datum = rw_literal_to_iceberg_datum(rhs)?;
209 Some(reference.greater_than_or_equal_to(datum))
210 }
211 _ => None,
212 }
213 }
214 ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
215 match [&args[0], &args[1]] {
216 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
217 let column_name = &fields[lhs.index].name;
218 let reference = Reference::new(column_name);
219 let datum = rw_literal_to_iceberg_datum(rhs)?;
220 Some(reference.less_than_or_equal_to(datum))
221 }
222 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
223 let column_name = &fields[lhs.index].name;
224 let reference = Reference::new(column_name);
225 let datum = rw_literal_to_iceberg_datum(rhs)?;
226 Some(reference.greater_than(datum))
227 }
228 _ => None,
229 }
230 }
231 ExprType::IsNull => match &args[0] {
232 ExprImpl::InputRef(lhs) => {
233 let column_name = &fields[lhs.index].name;
234 let reference = Reference::new(column_name);
235 Some(reference.is_null())
236 }
237 _ => None,
238 },
239 ExprType::IsNotNull => match &args[0] {
240 ExprImpl::InputRef(lhs) => {
241 let column_name = &fields[lhs.index].name;
242 let reference = Reference::new(column_name);
243 Some(reference.is_not_null())
244 }
245 _ => None,
246 },
247 ExprType::In => match &args[0] {
248 ExprImpl::InputRef(lhs) => {
249 let column_name = &fields[lhs.index].name;
250 let reference = Reference::new(column_name);
251 let mut datums = Vec::with_capacity(args.len() - 1);
252 for arg in &args[1..] {
253 if args[0].return_type() != arg.return_type() {
254 return None;
255 }
256 if let ExprImpl::Literal(l) = arg {
257 if let Some(datum) = rw_literal_to_iceberg_datum(l) {
258 datums.push(datum);
259 } else {
260 return None;
261 }
262 } else {
263 return None;
264 }
265 }
266 Some(reference.is_in(datums))
267 }
268 _ => None,
269 },
270 _ => None,
271 }
272 }
273 _ => None,
274 }
275}