risingwave_frontend/optimizer/rule/
iceberg_intermediate_scan_rule.rs1use std::collections::HashMap;
34
35use anyhow::Context;
36use iceberg::scan::FileScanTask;
37use iceberg::spec::{DataContentType, FormatVersion};
38use risingwave_common::catalog::{
39 ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
40 ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
41};
42use risingwave_common::util::iter_util::ZipEqFast;
43use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergSplitEnumerator};
44use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
45
46use super::prelude::{PlanRef, *};
47use crate::error::Result;
48use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
49use crate::optimizer::plan_node::generic::GenericPlanRef;
50use crate::optimizer::plan_node::{
51 Logical, LogicalIcebergIntermediateScan, LogicalIcebergScan, LogicalJoin, LogicalProject,
52 LogicalValues,
53};
54use crate::optimizer::rule::{ApplyResult, FallibleRule};
55use crate::utils::{ColIndexMapping, Condition, FRONTEND_RUNTIME};
56
/// Optimizer rule that lowers a `LogicalIcebergIntermediateScan` into a concrete
/// scan plan: it lists the Iceberg scan tasks for the table and, when delete files
/// exist, wraps the data scan in LEFT ANTI joins that filter out deleted rows.
pub struct IcebergIntermediateScanRule;
58
impl FallibleRule<Logical> for IcebergIntermediateScanRule {
    /// Rewrites a `LogicalIcebergIntermediateScan` into an executable plan:
    ///
    /// 1. Lists the Iceberg scan tasks (data / equality-delete / position-delete files)
    ///    via a blocking call into the frontend runtime.
    /// 2. Builds a `LogicalIcebergScan` over the data files, projected to the columns
    ///    the scan outputs plus any columns needed for delete handling.
    /// 3. Applies equality deletes and (for format < V3) position deletes as
    ///    LEFT ANTI joins.
    /// 4. Re-projects to the requested output columns and applies explicit casts if
    ///    the scan carries a column-type mapping.
    ///
    /// Returns `NotApplicable` for non-Iceberg plans/catalogs, an empty `LogicalValues`
    /// plan when the table has no snapshot or no data files, and `Err` on listing or
    /// planning failures.
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        let scan: &LogicalIcebergIntermediateScan =
            match plan.as_logical_iceberg_intermediate_scan() {
                Some(s) => s,
                None => return ApplyResult::NotApplicable,
            };

        let Some(catalog) = scan.source_catalog() else {
            return ApplyResult::NotApplicable;
        };

        // Only Iceberg connectors are handled; anything else leaves the plan untouched.
        let enumerator = if let ConnectorProperties::Iceberg(prop) =
            ConnectorProperties::extract(catalog.with_properties.clone(), false)?
        {
            IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
        } else {
            return ApplyResult::NotApplicable;
        };

        // The listing below blocks on a runtime, which is unsupported under madsim.
        #[cfg(madsim)]
        return ApplyResult::Err(
            crate::error::ErrorCode::BindError(
                "iceberg_scan can't be used in the madsim mode".to_string(),
            )
            .into(),
        );

        #[cfg(not(madsim))]
        {
            use risingwave_connector::source::iceberg::IcebergListResult;

            // Synchronously list scan tasks; `block_in_place` keeps the current
            // tokio worker usable while we block on the frontend runtime.
            let list_result = tokio::task::block_in_place(|| {
                FRONTEND_RUNTIME.block_on(enumerator.list_scan_tasks(
                    Some(scan.time_travel_info.clone()),
                    scan.iceberg_predicate.clone(),
                ))
            })?;
            // `None` means there is no valid snapshot to read from.
            let Some(IcebergListResult {
                mut data_files,
                mut equality_delete_files,
                mut position_delete_files,
                equality_delete_columns,
                format_version,
                schema: table_schema,
            }) = list_result
            else {
                tracing::info!(
                    "There is no valid snapshot for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            };
            if data_files.is_empty() {
                tracing::info!(
                    "There is no data file for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            }

            // Columns the data scan must read: the scan's outputs plus the columns
            // referenced by equality deletes. Sorting by field id before `dedup`
            // makes duplicate names adjacent so they are removed.
            let mut projection_columns: Vec<&str> = scan
                .output_columns()
                .chain(equality_delete_columns.iter().map(|s| s.as_str()))
                .collect();
            projection_columns.sort_unstable_by_key(|&s| table_schema.field_id_by_name(s));
            projection_columns.dedup();
            set_project_field_ids(
                &mut data_files,
                table_schema.as_ref(),
                projection_columns.iter(),
            )?;
            // For V1/V2, deletes are applied via the anti-joins built below, so the
            // per-file delete lists are cleared. For V3, position deletes are kept on
            // the file tasks (presumably applied by the reader itself) and only
            // equality deletes go through the join path.
            match format_version {
                FormatVersion::V1 | FormatVersion::V2 => {
                    for file in &mut data_files {
                        file.deletes.clear();
                    }
                }
                FormatVersion::V3 => {
                    for file in &mut data_files {
                        file.deletes.retain(|delete| {
                            delete.data_file_content == DataContentType::PositionDeletes
                        });
                    }
                }
            }

            // Name -> catalog lookup used when building projected scan schemas.
            let column_catalog_map: HashMap<&str, &ColumnCatalog> = catalog
                .columns
                .iter()
                .map(|c| (c.column_desc.name.as_str(), c))
                .collect();
            // Metadata columns are appended AFTER `set_project_field_ids` on purpose:
            // they are not part of the Iceberg table schema.
            if !equality_delete_files.is_empty() {
                projection_columns.push(ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            }
            let use_position_delete_join =
                !position_delete_files.is_empty() && format_version < FormatVersion::V3;
            if use_position_delete_join {
                projection_columns.push(ICEBERG_FILE_PATH_COLUMN_NAME);
                projection_columns.push(ICEBERG_FILE_POS_COLUMN_NAME);
            }
            let column_catalogs =
                build_column_catalogs(projection_columns.iter(), &column_catalog_map)?;
            let core = scan.core.clone_with_column_catalog(column_catalogs);
            let mut plan: PlanRef =
                LogicalIcebergScan::new(core, IcebergFileScanTask::Data(data_files)).into();

            // Filter out rows matched by equality-delete files (anti join on the
            // delete columns plus a sequence-number comparison).
            if !equality_delete_files.is_empty() {
                set_project_field_ids(
                    &mut equality_delete_files,
                    table_schema.as_ref(),
                    equality_delete_columns.iter(),
                )?;
                plan = build_equality_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    equality_delete_files,
                    equality_delete_columns,
                )?;
            }

            // Filter out rows matched by position-delete files (anti join on
            // file path + row position). Field ids are cleared via an empty iterator.
            if use_position_delete_join {
                set_project_field_ids(
                    &mut position_delete_files,
                    table_schema.as_ref(),
                    std::iter::empty::<&str>(),
                )?;
                plan = build_position_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    position_delete_files,
                )?;
            }

            // The joins may have added metadata columns or changed column order;
            // project back to exactly the scan's declared output columns.
            let schema_len = plan.schema().len();
            let schema_names = plan.schema().fields.iter().map(|f| f.name.as_str());
            let output_columns = scan.output_columns();
            if schema_len != output_columns.len() || !itertools::equal(schema_names, output_columns)
            {
                let col_map: HashMap<&str, usize> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| (field.name.as_str(), idx))
                    .collect();
                let output_col_idx: Vec<_> = scan
                    .output_columns()
                    .map(|col| {
                        col_map.get(col).copied().with_context(|| {
                            format!("Output column {} not found in scan schema", col)
                        })
                    })
                    .try_collect()?;
                let mapping = ColIndexMapping::with_remaining_columns(&output_col_idx, schema_len);
                plan = LogicalProject::with_mapping(plan, mapping).into();
            }

            // Apply explicit casts for columns whose declared table type differs from
            // the scanned type. NOTE(review): a failed cast silently falls back to the
            // uncast column — confirm this best-effort behavior is intended.
            if scan.has_type_mapping() {
                let cast_exprs: Vec<ExprImpl> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(i, field)| {
                        let input_ref: ExprImpl = InputRef::new(i, field.data_type.clone()).into();
                        if let Some(target_type) = scan.table_column_type_mapping.get(&field.name) {
                            if &field.data_type != target_type {
                                match input_ref.cast_explicit(target_type) {
                                    Ok(casted) => casted,
                                    Err(_) => InputRef::new(i, field.data_type.clone()).into(),
                                }
                            } else {
                                input_ref
                            }
                        } else {
                            input_ref
                        }
                    })
                    .collect();
                plan = LogicalProject::create(plan, cast_exprs);
            }

            ApplyResult::Ok(plan)
        }
    }
}
255
256impl IcebergIntermediateScanRule {
257 pub fn create() -> BoxedRule {
258 Box::new(IcebergIntermediateScanRule)
259 }
260}
261
262fn empty_table_plan(plan: &PlanRef, scan: &LogicalIcebergIntermediateScan) -> PlanRef {
264 LogicalValues::new(vec![], scan.schema().clone(), plan.ctx()).into()
265}
266
267fn build_column_catalogs(
269 column_names: impl Iterator<Item = impl AsRef<str>>,
270 column_catalog_map: &HashMap<&str, &ColumnCatalog>,
271) -> Result<Vec<ColumnCatalog>> {
272 let res = column_names
273 .map(|name| {
274 let name = name.as_ref();
275 column_catalog_map
276 .get(name)
277 .map(|&c| c.clone())
278 .with_context(|| format!("Column catalog not found for column {}", name))
279 })
280 .try_collect()?;
281 Ok(res)
282}
283
284fn set_project_field_ids(
286 files: &mut [FileScanTask],
287 schema: &iceberg::spec::Schema,
288 column_names: impl Iterator<Item = impl AsRef<str>>,
289) -> Result<()> {
290 let project_field_ids: Vec<i32> = column_names
291 .map(|name| {
292 let name = name.as_ref();
293 schema
294 .field_id_by_name(name)
295 .with_context(|| format!("Column {} not found in table schema", name))
296 })
297 .try_collect()?;
298 for file in files {
299 file.project_field_ids = project_field_ids.clone();
300 }
301 Ok(())
302}
303
304fn build_equal_conditions(
306 left_inputs: Vec<InputRef>,
307 right_inputs: Vec<InputRef>,
308) -> Result<Vec<ExprImpl>> {
309 left_inputs
310 .into_iter()
311 .zip_eq_fast(right_inputs.into_iter())
312 .map(|(left, right)| {
313 Ok(FunctionCall::new(ExprType::Equal, vec![left.into(), right.into()])?.into())
314 })
315 .collect()
316}
317
/// Wraps `child` in a LEFT ANTI hash join against a scan of the equality-delete
/// files, removing data rows that match a delete row on all equality-delete
/// columns AND have a smaller sequence number (i.e. the delete was written after
/// the data row).
///
/// Both join sides must expose the equality-delete columns and the
/// `ICEBERG_SEQUENCE_NUM` metadata column; errors are returned if either is
/// missing from a side's schema.
pub fn build_equality_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    equality_delete_files: Vec<FileScanTask>,
    equality_delete_columns: Vec<String>,
) -> Result<PlanRef> {
    // The delete-side scan reads the equality columns plus the sequence number.
    let column_names = equality_delete_columns
        .iter()
        .map(|s| s.as_str())
        .chain(std::iter::once(ICEBERG_SEQUENCE_NUM_COLUMN_NAME));
    let column_catalogs = build_column_catalogs(column_names, column_catalog_map)?;
    let source = scan.core.clone_with_column_catalog(column_catalogs);

    let equality_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        source,
        IcebergFileScanTask::EqualityDelete(equality_delete_files),
    )
    .into();

    let data_columns_len = child.schema().len();
    // Builds (equality-column InputRefs, sequence-number InputRef) for one join
    // side; `offset` shifts indices for the right side of the join.
    let build_inputs = |scan: &PlanRef, offset: usize| -> Result<(Vec<InputRef>, InputRef)> {
        let delete_column_index_map = scan
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
            .collect::<std::collections::HashMap<_, _>>();
        // Iterate `equality_delete_columns` (not the schema) so both sides
        // produce their InputRefs in the same column order.
        let delete_column_inputs = equality_delete_columns
            .iter()
            .map(|name| {
                let (index, data_type) = delete_column_index_map
                    .get(name)
                    .with_context(|| format!("Delete column {} not found in scan schema", name))?;
                Ok(InputRef {
                    index: offset + index,
                    data_type: (*data_type).clone(),
                })
            })
            .collect::<Result<Vec<InputRef>>>()?;
        let seq_num_inputs = InputRef {
            index: scan
                .schema()
                .fields()
                .iter()
                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
                .context("Sequence number column not found in scan schema")?
                + offset,
            data_type: risingwave_common::types::DataType::Int64,
        };
        Ok((delete_column_inputs, seq_num_inputs))
    };
    let (left_delete_column_inputs, left_seq_num_input) = build_inputs(&child, 0)?;
    let (right_delete_column_inputs, right_seq_num_input) =
        build_inputs(&equality_delete_iceberg_scan, data_columns_len)?;

    // ON clause: pairwise equality on delete columns, plus data.seq < delete.seq.
    let mut eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    eq_join_expr.push(
        FunctionCall::new(
            ExprType::LessThan,
            vec![left_seq_num_input.into(), right_seq_num_input.into()],
        )?
        .into(),
    );
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    let join = LogicalJoin::new(
        child,
        equality_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}
397
398pub fn build_position_delete_hashjoin_scan(
399 scan: &LogicalIcebergIntermediateScan,
400 column_catalog_map: &HashMap<&str, &ColumnCatalog>,
401 child: PlanRef,
402 position_delete_files: Vec<FileScanTask>,
403) -> Result<PlanRef> {
404 let delete_column_names = [ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME];
406 let column_catalogs = build_column_catalogs(delete_column_names.iter(), column_catalog_map)?;
407 let position_delete_source = scan.core.clone_with_column_catalog(column_catalogs);
408
409 let position_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
410 position_delete_source,
411 IcebergFileScanTask::PositionDelete(position_delete_files),
412 )
413 .into();
414 let data_columns_len = child.schema().len();
415
416 let build_inputs = |scan: &PlanRef, offset: usize| {
417 scan.schema()
418 .fields()
419 .iter()
420 .enumerate()
421 .filter_map(|(index, data_column)| {
422 if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
423 || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
424 {
425 Some(InputRef {
426 index: offset + index,
427 data_type: data_column.data_type(),
428 })
429 } else {
430 None
431 }
432 })
433 .collect::<Vec<InputRef>>()
434 };
435 let left_delete_column_inputs = build_inputs(&child, 0);
436 let right_delete_column_inputs = build_inputs(&position_delete_iceberg_scan, data_columns_len);
437 let eq_join_expr =
438 build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
439 let on = Condition {
440 conjunctions: eq_join_expr,
441 };
442 let join = LogicalJoin::new(
443 child,
444 position_delete_iceberg_scan,
445 risingwave_pb::plan_common::JoinType::LeftAnti,
446 on,
447 );
448 Ok(join.into())
449}