risingwave_frontend/optimizer/rule/batch/
batch_iceberg_predicate_pushdown.rs1use chrono::Datelike;
21use iceberg::expr::{Predicate as IcebergPredicate, Reference};
22use iceberg::spec::Datum as IcebergDatum;
23use risingwave_common::catalog::Field;
24use risingwave_common::types::ScalarImpl;
25
26use crate::expr::{Expr, ExprImpl, ExprType, Literal};
27use crate::optimizer::PlanRef;
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{BatchFilter, BatchIcebergScan, PlanTreeNodeUnary};
30use crate::optimizer::rule::{BoxedRule, Rule};
31use crate::utils::Condition;
32
/// Batch optimizer rule that moves the convertible conjuncts of a `BatchFilter` sitting
/// directly on a `BatchIcebergScan` into the scan's Iceberg predicate, keeping only the
/// non-convertible conjuncts in the RisingWave filter.
pub struct BatchIcebergPredicatePushDownRule {}
38
39impl Rule for BatchIcebergPredicatePushDownRule {
40 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
41 let filter: &BatchFilter = plan.as_batch_filter()?;
42 let input = filter.input();
43 let scan: &BatchIcebergScan = input.as_batch_iceberg_scan()?;
44 assert_eq!(scan.predicate, IcebergPredicate::AlwaysTrue);
46
47 let predicate = filter.predicate().clone();
48 let (iceberg_predicate, rw_predicate) =
49 rw_predicate_to_iceberg_predicate(predicate, scan.schema().fields());
50 let scan = scan.clone_with_predicate(iceberg_predicate);
51 if rw_predicate.always_true() {
52 Some(scan.into())
53 } else {
54 let filter = filter
55 .clone_with_input(scan.into())
56 .clone_with_predicate(rw_predicate);
57 Some(filter.into())
58 }
59 }
60}
61
62fn rw_literal_to_iceberg_datum(literal: &Literal) -> Option<IcebergDatum> {
63 let Some(scalar) = literal.get_data() else {
64 return None;
65 };
66 match scalar {
67 ScalarImpl::Bool(b) => Some(IcebergDatum::bool(*b)),
68 ScalarImpl::Int32(i) => Some(IcebergDatum::int(*i)),
69 ScalarImpl::Int64(i) => Some(IcebergDatum::long(*i)),
70 ScalarImpl::Float32(f) => Some(IcebergDatum::float(*f)),
71 ScalarImpl::Float64(f) => Some(IcebergDatum::double(*f)),
72 ScalarImpl::Decimal(_) => {
73 None
75 }
76 ScalarImpl::Date(d) => {
77 let Ok(datum) = IcebergDatum::date_from_ymd(d.0.year(), d.0.month(), d.0.day()) else {
78 return None;
79 };
80 Some(datum)
81 }
82 ScalarImpl::Timestamp(t) => Some(IcebergDatum::timestamp_micros(
83 t.0.and_utc().timestamp_micros(),
84 )),
85 ScalarImpl::Timestamptz(t) => Some(IcebergDatum::timestamptz_micros(t.timestamp_micros())),
86 ScalarImpl::Utf8(s) => Some(IcebergDatum::string(s)),
87 ScalarImpl::Bytea(b) => Some(IcebergDatum::binary(b.clone())),
88 _ => None,
89 }
90}
91
92fn rw_expr_to_iceberg_predicate(expr: &ExprImpl, fields: &[Field]) -> Option<IcebergPredicate> {
93 match expr {
94 ExprImpl::Literal(l) => match l.get_data() {
95 Some(ScalarImpl::Bool(b)) => {
96 if *b {
97 Some(IcebergPredicate::AlwaysTrue)
98 } else {
99 Some(IcebergPredicate::AlwaysFalse)
100 }
101 }
102 _ => None,
103 },
104 ExprImpl::FunctionCall(f) => {
105 let args = f.inputs();
106 match f.func_type() {
107 ExprType::Not => {
108 let arg = rw_expr_to_iceberg_predicate(&args[0], fields)?;
109 Some(IcebergPredicate::negate(arg))
110 }
111 ExprType::And => {
112 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
113 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
114 Some(IcebergPredicate::and(arg0, arg1))
115 }
116 ExprType::Or => {
117 let arg0 = rw_expr_to_iceberg_predicate(&args[0], fields)?;
118 let arg1 = rw_expr_to_iceberg_predicate(&args[1], fields)?;
119 Some(IcebergPredicate::or(arg0, arg1))
120 }
121 ExprType::Equal if args[0].return_type() == args[1].return_type() => {
122 match [&args[0], &args[1]] {
123 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
124 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
125 let column_name = &fields[lhs.index].name;
126 let reference = Reference::new(column_name);
127 let datum = rw_literal_to_iceberg_datum(rhs)?;
128 Some(reference.equal_to(datum))
129 }
130 _ => None,
131 }
132 }
133 ExprType::NotEqual if args[0].return_type() == args[1].return_type() => {
134 match [&args[0], &args[1]] {
135 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)]
136 | [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
137 let column_name = &fields[lhs.index].name;
138 let reference = Reference::new(column_name);
139 let datum = rw_literal_to_iceberg_datum(rhs)?;
140 Some(reference.not_equal_to(datum))
141 }
142 _ => None,
143 }
144 }
145 ExprType::GreaterThan if args[0].return_type() == args[1].return_type() => {
146 match [&args[0], &args[1]] {
147 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
148 let column_name = &fields[lhs.index].name;
149 let reference = Reference::new(column_name);
150 let datum = rw_literal_to_iceberg_datum(rhs)?;
151 Some(reference.greater_than(datum))
152 }
153 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
154 let column_name = &fields[lhs.index].name;
155 let reference = Reference::new(column_name);
156 let datum = rw_literal_to_iceberg_datum(rhs)?;
157 Some(reference.less_than_or_equal_to(datum))
158 }
159 _ => None,
160 }
161 }
162 ExprType::GreaterThanOrEqual if args[0].return_type() == args[1].return_type() => {
163 match [&args[0], &args[1]] {
164 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
165 let column_name = &fields[lhs.index].name;
166 let reference = Reference::new(column_name);
167 let datum = rw_literal_to_iceberg_datum(rhs)?;
168 Some(reference.greater_than_or_equal_to(datum))
169 }
170 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
171 let column_name = &fields[lhs.index].name;
172 let reference = Reference::new(column_name);
173 let datum = rw_literal_to_iceberg_datum(rhs)?;
174 Some(reference.less_than(datum))
175 }
176 _ => None,
177 }
178 }
179 ExprType::LessThan if args[0].return_type() == args[1].return_type() => {
180 match [&args[0], &args[1]] {
181 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
182 let column_name = &fields[lhs.index].name;
183 let reference = Reference::new(column_name);
184 let datum = rw_literal_to_iceberg_datum(rhs)?;
185 Some(reference.less_than(datum))
186 }
187 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
188 let column_name = &fields[lhs.index].name;
189 let reference = Reference::new(column_name);
190 let datum = rw_literal_to_iceberg_datum(rhs)?;
191 Some(reference.greater_than_or_equal_to(datum))
192 }
193 _ => None,
194 }
195 }
196 ExprType::LessThanOrEqual if args[0].return_type() == args[1].return_type() => {
197 match [&args[0], &args[1]] {
198 [ExprImpl::InputRef(lhs), ExprImpl::Literal(rhs)] => {
199 let column_name = &fields[lhs.index].name;
200 let reference = Reference::new(column_name);
201 let datum = rw_literal_to_iceberg_datum(rhs)?;
202 Some(reference.less_than_or_equal_to(datum))
203 }
204 [ExprImpl::Literal(rhs), ExprImpl::InputRef(lhs)] => {
205 let column_name = &fields[lhs.index].name;
206 let reference = Reference::new(column_name);
207 let datum = rw_literal_to_iceberg_datum(rhs)?;
208 Some(reference.greater_than(datum))
209 }
210 _ => None,
211 }
212 }
213 ExprType::IsNull => match &args[0] {
214 ExprImpl::InputRef(lhs) => {
215 let column_name = &fields[lhs.index].name;
216 let reference = Reference::new(column_name);
217 Some(reference.is_null())
218 }
219 _ => None,
220 },
221 ExprType::IsNotNull => match &args[0] {
222 ExprImpl::InputRef(lhs) => {
223 let column_name = &fields[lhs.index].name;
224 let reference = Reference::new(column_name);
225 Some(reference.is_not_null())
226 }
227 _ => None,
228 },
229 ExprType::In => match &args[0] {
230 ExprImpl::InputRef(lhs) => {
231 let column_name = &fields[lhs.index].name;
232 let reference = Reference::new(column_name);
233 let mut datums = Vec::with_capacity(args.len() - 1);
234 for arg in &args[1..] {
235 if args[0].return_type() != arg.return_type() {
236 return None;
237 }
238 if let ExprImpl::Literal(l) = arg {
239 if let Some(datum) = rw_literal_to_iceberg_datum(l) {
240 datums.push(datum);
241 } else {
242 return None;
243 }
244 } else {
245 return None;
246 }
247 }
248 Some(reference.is_in(datums))
249 }
250 _ => None,
251 },
252 _ => None,
253 }
254 }
255 _ => None,
256 }
257}
258fn rw_predicate_to_iceberg_predicate(
259 predicate: Condition,
260 fields: &[Field],
261) -> (IcebergPredicate, Condition) {
262 if predicate.always_true() {
263 return (IcebergPredicate::AlwaysTrue, predicate);
264 }
265
266 let mut conjunctions = predicate.conjunctions;
267 let mut ignored_conjunctions: Vec<ExprImpl> = Vec::with_capacity(conjunctions.len());
268
269 let mut iceberg_condition_root = None;
270 while let Some(conjunction) = conjunctions.pop() {
271 match rw_expr_to_iceberg_predicate(&conjunction, fields) {
272 iceberg_predicate @ Some(_) => {
273 iceberg_condition_root = iceberg_predicate;
274 break;
275 }
276 None => {
277 ignored_conjunctions.push(conjunction);
278 continue;
279 }
280 }
281 }
282
283 let mut iceberg_condition_root = match iceberg_condition_root {
284 Some(p) => p,
285 None => {
286 return (
287 IcebergPredicate::AlwaysTrue,
288 Condition {
289 conjunctions: ignored_conjunctions,
290 },
291 );
292 }
293 };
294
295 for rw_condition in conjunctions {
296 match rw_expr_to_iceberg_predicate(&rw_condition, fields) {
297 Some(iceberg_predicate) => {
298 iceberg_condition_root = iceberg_condition_root.and(iceberg_predicate)
299 }
300 None => ignored_conjunctions.push(rw_condition),
301 }
302 }
303 (
304 iceberg_condition_root,
305 Condition {
306 conjunctions: ignored_conjunctions,
307 },
308 )
309}
310
311impl BatchIcebergPredicatePushDownRule {
312 pub fn create() -> BoxedRule {
313 Box::new(BatchIcebergPredicatePushDownRule {})
314 }
315}