risingwave_frontend/optimizer/rule/batch/
batch_iceberg_predicate_pushdown.rs1use chrono::Datelike;
21use iceberg::expr::{Predicate as IcebergPredicate, Reference};
22use iceberg::spec::Datum as IcebergDatum;
23use risingwave_common::catalog::Field;
24use risingwave_common::types::{Decimal, ScalarImpl};
25
26use crate::expr::{Expr, ExprImpl, ExprType, Literal};
27use crate::optimizer::PlanRef;
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary};
30use crate::optimizer::rule::{BoxedRule, Rule};
31use crate::utils::Condition;
32
/// Optimizer rule that matches a `BatchFilter` whose input is a `BatchIcebergScan` and
/// moves as much of the filter predicate as possible into the scan as an Iceberg predicate.
///
/// Conjuncts that cannot be expressed as an Iceberg predicate stay in a `BatchFilter`
/// above the scan; if every conjunct is converted, the filter node is removed entirely.
pub struct BatchIcebergPredicatePushDownRule {}
38
39impl Rule for BatchIcebergPredicatePushDownRule {
40 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
41 let filter: &BatchFilter = plan.as_batch_filter()?;
42 let input = filter.input();
43 let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?;
44 assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue);
46
47 let predicate = filter.predicate().clone();
48 let (iceberg_predicate, rw_predicate) =
49 rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields());
50 let scan = scan.clone_with_predicate(iceberg_predicate);
51 if rw_predicate.always_true() {
52 Some(scan.into())
53 } else {
54 let filter = filter
55 .clone_with_input(scan.into())
56 .clone_with_predicate(rw_predicate);
57 Some(filter.into())
58 }
59 }
60}
61
62fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
63 let Some(scalar) = literal.get_data() else {
64 return None;
65 };
66 match scalar {
67 ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
68 ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
69 ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
70 ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
71 ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
72 ScalarImpl::Decimal(d) => {
73 let Decimal::Normalized(d) = d else {
74 return None;
75 };
76 let Ok(d) = IcebergDatum::decimal(*d) else {
77 return None;
78 };
79 Some(d)
80 }
81 ScalarImpl::Date(d) => {
82 let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
83 return None;
84 };
85 Some(datum)
86 }
87 ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
88 t.0.and_utc().timestamp_micros(),
89 )),
90 ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
91 ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
92 ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
93 _ => None,
94 }
95}
96
97fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
98 match expr {
99 ExprImpl::Literal(l) => match l.get_data() {
100 Some(ScalarImpl::Bool(b)) => {
101 if *b {
102 Some(IcebergPredicate::AlwaysTrue)
103 } else {
104 Some(IcebergPredicate::AlwaysFalse)
105 }
106 }
107 _ => None,
108 },
109 ExprImpl::FunctionCall(f) => {
110 let args = f.inputs();
111 match f.func_type() {
112 ExprType::Not => {
113 let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
114 Some(IcebergPredicate::negate(arg))
115 }
116 ExprType::And => {
117 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
118 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
119 Some(IcebergPredicate::and(arg0, arg1))
120 }
121 ExprType::Or => {
122 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
123 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
124 Some(IcebergPredicate::or(arg0, arg1))
125 }
126 ExprType::Equal if args[0].return_type() == args[1].return_type() => {
127 match [&args[0], &args[1]] {
128 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
129 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
130 let column_name = &fields[lhs.index].name;
131 let reference = Reference::new(column_name);
132 let datum = rw_literal_to_iceberg_datum(rhs)?;
133 Some(reference.equal_to(datum))
134 }
135 _ => None,
136 }
137 }
138 ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
139 match [&args[0], &args[1]] {
140 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
141 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
142 let column_name = &fields[lhs.index].name;
143 let reference = Reference::new(column_name);
144 let datum = rw_literal_to_iceberg_datum(rhs)?;
145 Some(reference.not_equal_to(datum))
146 }
147 _ => None,
148 }
149 }
150 ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
151 match [&args[0], &args[1]] {
152 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
153 let column_name = &fields[lhs.index].name;
154 let reference = Reference::new(column_name);
155 let datum = rw_literal_to_iceberg_datum(rhs)?;
156 Some(reference.greater_than(datum))
157 }
158 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
159 let column_name = &fields[lhs.index].name;
160 let reference = Reference::new(column_name);
161 let datum = rw_literal_to_iceberg_datum(rhs)?;
162 Some(reference.less_than_or_equal_to(datum))
163 }
164 _ => None,
165 }
166 }
167 ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
168 match [&args[0], &args[1]] {
169 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
170 let column_name = &fields[lhs.index].name;
171 let reference = Reference::new(column_name);
172 let datum = rw_literal_to_iceberg_datum(rhs)?;
173 Some(reference.greater_than_or_equal_to(datum))
174 }
175 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
176 let column_name = &fields[lhs.index].name;
177 let reference = Reference::new(column_name);
178 let datum = rw_literal_to_iceberg_datum(rhs)?;
179 Some(reference.less_than(datum))
180 }
181 _ => None,
182 }
183 }
184 ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
185 match [&args[0], &args[1]] {
186 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
187 let column_name = &fields[lhs.index].name;
188 let reference = Reference::new(column_name);
189 let datum = rw_literal_to_iceberg_datum(rhs)?;
190 Some(reference.less_than(datum))
191 }
192 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
193 let column_name = &fields[lhs.index].name;
194 let reference = Reference::new(column_name);
195 let datum = rw_literal_to_iceberg_datum(rhs)?;
196 Some(reference.greater_than_or_equal_to(datum))
197 }
198 _ => None,
199 }
200 }
201 ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
202 match [&args[0], &args[1]] {
203 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
204 let column_name = &fields[lhs.index].name;
205 let reference = Reference::new(column_name);
206 let datum = rw_literal_to_iceberg_datum(rhs)?;
207 Some(reference.less_than_or_equal_to(datum))
208 }
209 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
210 let column_name = &fields[lhs.index].name;
211 let reference = Reference::new(column_name);
212 let datum = rw_literal_to_iceberg_datum(rhs)?;
213 Some(reference.greater_than(datum))
214 }
215 _ => None,
216 }
217 }
218 ExprType::IsNull => match &args[0] {
219 ExprImpl::InputRef(lhs) => {
220 let column_name = &fields[lhs.index].name;
221 let reference = Reference::new(column_name);
222 Some(reference.is_null())
223 }
224 _ => None,
225 },
226 ExprType::IsNotNull => match &args[0] {
227 ExprImpl::InputRef(lhs) => {
228 let column_name = &fields[lhs.index].name;
229 let reference = Reference::new(column_name);
230 Some(reference.is_not_null())
231 }
232 _ => None,
233 },
234 ExprType::In => match &args[0] {
235 ExprImpl::InputRef(lhs) => {
236 let column_name = &fields[lhs.index].name;
237 let reference = Reference::new(column_name);
238 let mut datums = Vec::with_capacity(args.len() - 1);
239 for arg in &args[1..] {
240 if args[0].return_type() != arg.return_type() {
241 return None;
242 }
243 if let ExprImpl::Literal(l) = arg {
244 if let Some(datum) = rw_literal_to_iceberg_datum(l) {
245 datums.push(datum);
246 } else {
247 return None;
248 }
249 } else {
250 return None;
251 }
252 }
253 Some(reference.is_in(datums))
254 }
255 _ => None,
256 },
257 _ => None,
258 }
259 }
260 _ => None,
261 }
262}
263fn rw_predicate_to_iceberg_predicate(
264 predicate: Condition,
265 fields: &[Field],
266) -> (IcebergPredicate, Condition) {
267 if predicate.always_true() {
268 return (IcebergPredicate::AlwaysTrue, predicate);
269 }
270
271 let mut conjunctions = predicate.conjunctions;
272 let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
273
274 let mut iceberg_condition_root = None;
275 while let Some(conjunction) = conjunctions.pop() {
276 match rw_expr_to_iceberg_predicate(&conjunction, fields) {
277 iceberg_predicate @ Some(_) => {
278 iceberg_condition_root = iceberg_predicate;
279 break;
280 }
281 None => {
282 ignored_conjunctions.push(conjunction);
283 continue;
284 }
285 }
286 }
287
288 let mut iceberg_condition_root = match iceberg_condition_root {
289 Some(p) => p,
290 None => {
291 return (
292 IcebergPredicate::AlwaysTrue,
293 Condition {
294 conjunctions: ignored_conjunctions,
295 },
296 );
297 }
298 };
299
300 for rw_condition in conjunctions {
301 match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
302 Some(iceberg_predicate) => {
303 iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
304 }
305 None => ignored_conjunctions.push(rw_condition),
306 }
307 }
308 (
309 iceberg_condition_root,
310 Condition {
311 conjunctions: ignored_conjunctions,
312 },
313 )
314}
315
316impl BatchIcebergPredicatePushDownRule {
317 pub fn create() -> BoxedRule {
318 Box::new(BatchIcebergPredicatePushDownRule {})
319 }
320}