risingwave_frontend/optimizer/rule/batch/
batch_iceberg_predicate_pushdown.rs1use chrono::Datelike;
21use iceberg::expr::{Predicate as IcebergPredicate, Reference};
22use iceberg::spec::Datum as IcebergDatum;
23use risingwave_common::catalog::Field;
24use risingwave_common::types::ScalarImpl;
25
26use super::prelude::*;
27use crate::expr::{Expr, ExprImpl, ExprType, Literal};
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary};
30use crate::utils::Condition;
31
/// Batch optimizer rule matching a `BatchFilter` directly on top of a `BatchIcebergScan`.
///
/// The rule moves the convertible parts of the filter condition into the scan's Iceberg
/// predicate. Any conjuncts it cannot convert stay in a `BatchFilter` above the scan.
pub struct BatchIcebergPredicatePushDownRule {}
37
38impl Rule<Batch> for BatchIcebergPredicatePushDownRule {
39 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
40 let filter: &BatchFilter = plan.as_batch_filter()?;
41 let input = filter.input();
42 let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?;
43 assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue);
45
46 let predicate = filter.predicate().clone();
47 let (iceberg_predicate, rw_predicate) =
48 rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields());
49 let scan = scan.clone_with_predicate(iceberg_predicate);
50 if rw_predicate.always_true() {
51 Some(scan.into())
52 } else {
53 let filter = filter
54 .clone_with_input(scan.into())
55 .clone_with_predicate(rw_predicate);
56 Some(filter.into())
57 }
58 }
59}
60
61fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
62 let Some(scalar) = literal.get_data() else {
63 return None;
64 };
65 match scalar {
66 ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
67 ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
68 ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
69 ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
70 ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
71 ScalarImpl::Decimal(_) => {
72 None
74 }
75 ScalarImpl::Date(d) => {
76 let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
77 return None;
78 };
79 Some(datum)
80 }
81 ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
82 t.0.and_utc().timestamp_micros(),
83 )),
84 ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
85 ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
86 ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
87 _ => None,
88 }
89}
90
91fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
92 match expr {
93 ExprImpl::Literal(l) => match l.get_data() {
94 Some(ScalarImpl::Bool(b)) => {
95 if *b {
96 Some(IcebergPredicate::AlwaysTrue)
97 } else {
98 Some(IcebergPredicate::AlwaysFalse)
99 }
100 }
101 _ => None,
102 },
103 ExprImpl::FunctionCall(f) => {
104 let args = f.inputs();
105 match f.func_type() {
106 ExprType::Not => {
107 let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
108 Some(IcebergPredicate::negate(arg))
109 }
110 ExprType::And => {
111 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
112 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
113 Some(IcebergPredicate::and(arg0, arg1))
114 }
115 ExprType::Or => {
116 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
117 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
118 Some(IcebergPredicate::or(arg0, arg1))
119 }
120 ExprType::Equal if args[0].return_type() == args[1].return_type() => {
121 match [&args[0], &args[1]] {
122 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
123 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
124 let column_name = &fields[lhs.index].name;
125 let reference = Reference::new(column_name);
126 let datum = rw_literal_to_iceberg_datum(rhs)?;
127 Some(reference.equal_to(datum))
128 }
129 _ => None,
130 }
131 }
132 ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
133 match [&args[0], &args[1]] {
134 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
135 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
136 let column_name = &fields[lhs.index].name;
137 let reference = Reference::new(column_name);
138 let datum = rw_literal_to_iceberg_datum(rhs)?;
139 Some(reference.not_equal_to(datum))
140 }
141 _ => None,
142 }
143 }
144 ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
145 match [&args[0], &args[1]] {
146 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
147 let column_name = &fields[lhs.index].name;
148 let reference = Reference::new(column_name);
149 let datum = rw_literal_to_iceberg_datum(rhs)?;
150 Some(reference.greater_than(datum))
151 }
152 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
153 let column_name = &fields[lhs.index].name;
154 let reference = Reference::new(column_name);
155 let datum = rw_literal_to_iceberg_datum(rhs)?;
156 Some(reference.less_than_or_equal_to(datum))
157 }
158 _ => None,
159 }
160 }
161 ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
162 match [&args[0], &args[1]] {
163 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
164 let column_name = &fields[lhs.index].name;
165 let reference = Reference::new(column_name);
166 let datum = rw_literal_to_iceberg_datum(rhs)?;
167 Some(reference.greater_than_or_equal_to(datum))
168 }
169 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
170 let column_name = &fields[lhs.index].name;
171 let reference = Reference::new(column_name);
172 let datum = rw_literal_to_iceberg_datum(rhs)?;
173 Some(reference.less_than(datum))
174 }
175 _ => None,
176 }
177 }
178 ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
179 match [&args[0], &args[1]] {
180 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
181 let column_name = &fields[lhs.index].name;
182 let reference = Reference::new(column_name);
183 let datum = rw_literal_to_iceberg_datum(rhs)?;
184 Some(reference.less_than(datum))
185 }
186 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
187 let column_name = &fields[lhs.index].name;
188 let reference = Reference::new(column_name);
189 let datum = rw_literal_to_iceberg_datum(rhs)?;
190 Some(reference.greater_than_or_equal_to(datum))
191 }
192 _ => None,
193 }
194 }
195 ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
196 match [&args[0], &args[1]] {
197 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
198 let column_name = &fields[lhs.index].name;
199 let reference = Reference::new(column_name);
200 let datum = rw_literal_to_iceberg_datum(rhs)?;
201 Some(reference.less_than_or_equal_to(datum))
202 }
203 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
204 let column_name = &fields[lhs.index].name;
205 let reference = Reference::new(column_name);
206 let datum = rw_literal_to_iceberg_datum(rhs)?;
207 Some(reference.greater_than(datum))
208 }
209 _ => None,
210 }
211 }
212 ExprType::IsNull => match &args[0] {
213 ExprImpl::InputRef(lhs) => {
214 let column_name = &fields[lhs.index].name;
215 let reference = Reference::new(column_name);
216 Some(reference.is_null())
217 }
218 _ => None,
219 },
220 ExprType::IsNotNull => match &args[0] {
221 ExprImpl::InputRef(lhs) => {
222 let column_name = &fields[lhs.index].name;
223 let reference = Reference::new(column_name);
224 Some(reference.is_not_null())
225 }
226 _ => None,
227 },
228 ExprType::In => match &args[0] {
229 ExprImpl::InputRef(lhs) => {
230 let column_name = &fields[lhs.index].name;
231 let reference = Reference::new(column_name);
232 let mut datums = Vec::with_capacity(args.len() - 1);
233 for arg in &args[1..] {
234 if args[0].return_type() != arg.return_type() {
235 return None;
236 }
237 if let ExprImpl::Literal(l) = arg {
238 if let Some(datum) = rw_literal_to_iceberg_datum(l) {
239 datums.push(datum);
240 } else {
241 return None;
242 }
243 } else {
244 return None;
245 }
246 }
247 Some(reference.is_in(datums))
248 }
249 _ => None,
250 },
251 _ => None,
252 }
253 }
254 _ => None,
255 }
256}
257fn rw_predicate_to_iceberg_predicate(
258 predicate: Condition,
259 fields: &[Field],
260) -> (IcebergPredicate, Condition) {
261 if predicate.always_true() {
262 return (IcebergPredicate::AlwaysTrue, predicate);
263 }
264
265 let mut conjunctions = predicate.conjunctions;
266 let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
267
268 let mut iceberg_condition_root = None;
269 while let Some(conjunction) = conjunctions.pop() {
270 match rw_expr_to_iceberg_predicate(&conjunction, fields) {
271 iceberg_predicate @ Some(_) => {
272 iceberg_condition_root = iceberg_predicate;
273 break;
274 }
275 None => {
276 ignored_conjunctions.push(conjunction);
277 continue;
278 }
279 }
280 }
281
282 let mut iceberg_condition_root = match iceberg_condition_root {
283 Some(p) => p,
284 None => {
285 return (
286 IcebergPredicate::AlwaysTrue,
287 Condition {
288 conjunctions: ignored_conjunctions,
289 },
290 );
291 }
292 };
293
294 for rw_condition in conjunctions {
295 match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
296 Some(iceberg_predicate) => {
297 iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
298 }
299 None => ignored_conjunctions.push(rw_condition),
300 }
301 }
302 (
303 iceberg_condition_root,
304 Condition {
305 conjunctions: ignored_conjunctions,
306 },
307 )
308}
309
310impl BatchIcebergPredicatePushDownRule {
311 pub fn create() -> BoxedRule {
312 Box::new(BatchIcebergPredicatePushDownRule {})
313 }
314}