risingwave_frontend/optimizer/rule/
iceberg_intermediate_scan_rule.rs1use std::collections::HashMap;
34
35use anyhow::Context;
36use iceberg::scan::FileScanTask;
37use iceberg::spec::{DataContentType, FormatVersion};
38use risingwave_common::catalog::{
39 ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
40 ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
41};
42use risingwave_common::util::iter_util::ZipEqFast;
43use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergSplitEnumerator};
44use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
45
46use super::prelude::{PlanRef, *};
47use crate::error::Result;
48use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
49use crate::optimizer::plan_node::generic::GenericPlanRef;
50use crate::optimizer::plan_node::{
51 Logical, LogicalIcebergIntermediateScan, LogicalIcebergScan, LogicalJoin, LogicalProject,
52 LogicalValues,
53};
54use crate::optimizer::rule::{ApplyResult, FallibleRule};
55use crate::utils::{ColIndexMapping, Condition, FRONTEND_RUNTIME};
56
/// Rewrites a [`LogicalIcebergIntermediateScan`] into an executable plan of
/// [`LogicalIcebergScan`] nodes (data files, plus anti-joins against equality /
/// position delete files when present). See the [`FallibleRule`] impl below.
pub struct IcebergIntermediateScanRule;
58
impl FallibleRule<Logical> for IcebergIntermediateScanRule {
    /// Expands an intermediate Iceberg scan into the final scan plan:
    /// 1. lists the scan tasks (data / equality-delete / position-delete files)
    ///    from the Iceberg catalog,
    /// 2. scans the data files with the projected columns,
    /// 3. removes deleted rows via LEFT ANTI joins against the delete files
    ///    (see `build_equality_delete_hashjoin_scan` / `build_position_delete_hashjoin_scan`),
    /// 4. projects back to exactly `scan.output_columns` if the plan schema differs.
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        // Only fires on LogicalIcebergIntermediateScan nodes.
        let scan: &LogicalIcebergIntermediateScan =
            match plan.as_logical_iceberg_intermediate_scan() {
                Some(s) => s,
                None => return ApplyResult::NotApplicable,
            };

        let Some(catalog) = scan.source_catalog() else {
            return ApplyResult::NotApplicable;
        };

        // Build the split enumerator from the source's connector properties;
        // only Iceberg sources are handled by this rule.
        let enumerator = if let ConnectorProperties::Iceberg(prop) =
            ConnectorProperties::extract(catalog.with_properties.clone(), false)?
        {
            IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
        } else {
            return ApplyResult::NotApplicable;
        };

        // Listing scan tasks blocks on a tokio runtime below, which is not
        // supported under madsim, so the rule always errors in that mode.
        #[cfg(madsim)]
        return ApplyResult::Err(
            crate::error::ErrorCode::BindError(
                "iceberg_scan can't be used in the madsim mode".to_string(),
            )
            .into(),
        );

        #[cfg(not(madsim))]
        {
            use risingwave_connector::source::iceberg::IcebergListResult;

            // Synchronously list the files to scan; `block_in_place` keeps the
            // current tokio worker from stalling other tasks while we block.
            let list_result =
                tokio::task::block_in_place(|| {
                    FRONTEND_RUNTIME.block_on(enumerator.list_scan_tasks(
                        Some(scan.time_travel_info.clone()),
                        scan.predicate.clone(),
                    ))
                })?;
            // `None` means there is no snapshot to read from: plan degenerates
            // to an empty VALUES node.
            let Some(IcebergListResult {
                mut data_files,
                mut equality_delete_files,
                mut position_delete_files,
                equality_delete_columns,
                format_version,
            }) = list_result
            else {
                tracing::info!(
                    "There is no valid snapshot for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            };
            if data_files.is_empty() {
                tracing::info!(
                    "There is no data file for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            }

            // All data files are assumed to share one schema; the first file's
            // schema is used to resolve field ids below.
            let schema = data_files[0].schema.clone();
            // The data scan must produce the requested output columns plus the
            // columns needed to evaluate equality deletes.
            let mut projection_columns: Vec<&str> = scan
                .output_columns
                .iter()
                .chain(&equality_delete_columns)
                .map(|col| col.as_str())
                .collect();
            // Sort by field id so that duplicates are adjacent for `dedup`
            // (and the projection follows schema order).
            projection_columns.sort_unstable_by_key(|&s| schema.field_id_by_name(s));
            projection_columns.dedup();
            set_project_field_ids(&mut data_files, &schema, projection_columns.iter())?;
            match format_version {
                // Pre-V3: deletes are applied via the explicit joins built
                // below, so drop the per-file delete attachments entirely.
                FormatVersion::V1 | FormatVersion::V2 => {
                    for file in &mut data_files {
                        file.deletes.clear();
                    }
                }
                // V3: position deletes stay attached to the file tasks (the
                // reader applies them); only equality deletes are joined.
                FormatVersion::V3 => {
                    for file in &mut data_files {
                        file.deletes.retain(|delete| {
                            delete.data_file_content == DataContentType::PositionDeletes
                        });
                    }
                }
            }

            // Lookup table from column name to its catalog entry.
            let column_catalog_map: HashMap<&str, &ColumnCatalog> = catalog
                .columns
                .iter()
                .map(|c| (c.column_desc.name.as_str(), c))
                .collect();
            // Equality-delete join needs each row's sequence number.
            if !equality_delete_files.is_empty() {
                projection_columns.push(ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            }
            // Position deletes are joined only pre-V3 (V3 keeps them attached
            // to the file tasks above); the join keys are file path + row pos.
            let use_position_delete_join =
                !position_delete_files.is_empty() && format_version < FormatVersion::V3;
            if use_position_delete_join {
                projection_columns.push(ICEBERG_FILE_PATH_COLUMN_NAME);
                projection_columns.push(ICEBERG_FILE_POS_COLUMN_NAME);
            }
            let column_catalogs =
                build_column_catalogs(projection_columns.iter(), &column_catalog_map)?;
            let core = scan.core.clone_with_column_catalog(column_catalogs);
            // Base plan: scan over the data files.
            let mut plan: PlanRef =
                LogicalIcebergScan::new(core, IcebergFileScanTask::Data(data_files)).into();

            if !equality_delete_files.is_empty() {
                set_project_field_ids(
                    &mut equality_delete_files,
                    &schema,
                    equality_delete_columns.iter(),
                )?;
                plan = build_equality_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    equality_delete_files,
                    equality_delete_columns,
                )?;
            }

            if use_position_delete_join {
                // Position-delete files get an empty projection here; the scan
                // node reads its fixed (file path, pos) columns.
                set_project_field_ids(
                    &mut position_delete_files,
                    &schema,
                    std::iter::empty::<&str>(),
                )?;
                plan = build_position_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    position_delete_files,
                )?;
            }

            // If the plan's schema doesn't already match the requested output
            // columns exactly (names and order), add a reordering projection.
            let schema_len = plan.schema().len();
            let schema_names = plan.schema().fields.iter().map(|f| f.name.as_str());
            let output_names = scan.output_columns.iter().map(|s| s.as_str());
            if schema_len != scan.output_columns.len()
                || !itertools::equal(schema_names, output_names)
            {
                let col_map: HashMap<&str, usize> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| (field.name.as_str(), idx))
                    .collect();
                let output_col_idx: Vec<_> = scan
                    .output_columns
                    .iter()
                    .map(|col| {
                        col_map.get(col.as_str()).copied().with_context(|| {
                            format!("Output column {} not found in scan schema", col)
                        })
                    })
                    .try_collect()?;
                let mapping = ColIndexMapping::with_remaining_columns(&output_col_idx, schema_len);
                plan = LogicalProject::with_mapping(plan, mapping).into();
            }

            ApplyResult::Ok(plan)
        }
    }
}
227
228impl IcebergIntermediateScanRule {
229 pub fn create() -> BoxedRule {
230 Box::new(IcebergIntermediateScanRule)
231 }
232}
233
234fn empty_table_plan(plan: &PlanRef, scan: &LogicalIcebergIntermediateScan) -> PlanRef {
236 LogicalValues::new(vec![], scan.schema().clone(), plan.ctx()).into()
237}
238
239fn build_column_catalogs(
241 column_names: impl Iterator<Item = impl AsRef<str>>,
242 column_catalog_map: &HashMap<&str, &ColumnCatalog>,
243) -> Result<Vec<ColumnCatalog>> {
244 let res = column_names
245 .map(|name| {
246 let name = name.as_ref();
247 column_catalog_map
248 .get(name)
249 .map(|&c| c.clone())
250 .with_context(|| format!("Column catalog not found for column {}", name))
251 })
252 .try_collect()?;
253 Ok(res)
254}
255
256fn set_project_field_ids(
258 files: &mut [FileScanTask],
259 schema: &iceberg::spec::Schema,
260 column_names: impl Iterator<Item = impl AsRef<str>>,
261) -> Result<()> {
262 let project_field_ids: Vec<i32> = column_names
263 .map(|name| {
264 let name = name.as_ref();
265 schema
266 .field_id_by_name(name)
267 .with_context(|| format!("Column {} not found in data file schema", name))
268 })
269 .try_collect()?;
270 for file in files {
271 file.project_field_ids = project_field_ids.clone();
272 }
273 Ok(())
274}
275
276fn build_equal_conditions(
278 left_inputs: Vec<InputRef>,
279 right_inputs: Vec<InputRef>,
280) -> Result<Vec<ExprImpl>> {
281 left_inputs
282 .into_iter()
283 .zip_eq_fast(right_inputs.into_iter())
284 .map(|(left, right)| {
285 Ok(FunctionCall::new(ExprType::Equal, vec![left.into(), right.into()])?.into())
286 })
287 .collect()
288}
289
/// Wraps `child` in a LEFT ANTI hash join against a scan of the
/// equality-delete files: a data row is eliminated when a delete row matches
/// on all `equality_delete_columns` AND has a strictly greater sequence number
/// (i.e. the delete was written after the data row).
///
/// Both join sides must expose the equality-delete columns and the
/// `_rw_iceberg_sequence_num` column; errors if either is missing.
pub fn build_equality_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    equality_delete_files: Vec<FileScanTask>,
    equality_delete_columns: Vec<String>,
) -> Result<PlanRef> {
    // Right side of the join: scan of the equality-delete files, producing the
    // delete key columns plus the sequence number.
    let column_names = equality_delete_columns
        .iter()
        .map(|s| s.as_str())
        .chain(std::iter::once(ICEBERG_SEQUENCE_NUM_COLUMN_NAME));
    let column_catalogs = build_column_catalogs(column_names, column_catalog_map)?;
    let source = scan.core.clone_with_column_catalog(column_catalogs);

    let equality_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        source,
        IcebergFileScanTask::EqualityDelete(equality_delete_files),
    )
    .into();

    let data_columns_len = child.schema().len();
    // Builds, for one join side, the `InputRef`s of the delete key columns and
    // of the sequence-number column. `offset` shifts the indices into the
    // join's combined schema (0 for the left side, left-schema length for the
    // right side).
    let build_inputs = |scan: &PlanRef, offset: usize| -> Result<(Vec<InputRef>, InputRef)> {
        let delete_column_index_map = scan
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
            .collect::<std::collections::HashMap<_, _>>();
        let delete_column_inputs = equality_delete_columns
            .iter()
            .map(|name| {
                let (index, data_type) = delete_column_index_map
                    .get(name)
                    .with_context(|| format!("Delete column {} not found in scan schema", name))?;
                Ok(InputRef {
                    index: offset + index,
                    data_type: (*data_type).clone(),
                })
            })
            .collect::<Result<Vec<InputRef>>>()?;
        let seq_num_inputs = InputRef {
            index: scan
                .schema()
                .fields()
                .iter()
                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
                .context("Sequence number column not found in scan schema")?
                + offset,
            data_type: risingwave_common::types::DataType::Int64,
        };
        Ok((delete_column_inputs, seq_num_inputs))
    };
    let (left_delete_column_inputs, left_seq_num_input) = build_inputs(&child, 0)?;
    let (right_delete_column_inputs, right_seq_num_input) =
        build_inputs(&equality_delete_iceberg_scan, data_columns_len)?;

    // Join condition: key columns equal, and left (data) sequence number is
    // strictly less than right (delete) sequence number.
    let mut eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    eq_join_expr.push(
        FunctionCall::new(
            ExprType::LessThan,
            vec![left_seq_num_input.into(), right_seq_num_input.into()],
        )?
        .into(),
    );
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    // LEFT ANTI: keep only data rows with no matching delete row.
    let join = LogicalJoin::new(
        child,
        equality_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}
369
370pub fn build_position_delete_hashjoin_scan(
371 scan: &LogicalIcebergIntermediateScan,
372 column_catalog_map: &HashMap<&str, &ColumnCatalog>,
373 child: PlanRef,
374 position_delete_files: Vec<FileScanTask>,
375) -> Result<PlanRef> {
376 let delete_column_names = [ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME];
378 let column_catalogs = build_column_catalogs(delete_column_names.iter(), column_catalog_map)?;
379 let position_delete_source = scan.core.clone_with_column_catalog(column_catalogs);
380
381 let position_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
382 position_delete_source,
383 IcebergFileScanTask::PositionDelete(position_delete_files),
384 )
385 .into();
386 let data_columns_len = child.schema().len();
387
388 let build_inputs = |scan: &PlanRef, offset: usize| {
389 scan.schema()
390 .fields()
391 .iter()
392 .enumerate()
393 .filter_map(|(index, data_column)| {
394 if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
395 || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
396 {
397 Some(InputRef {
398 index: offset + index,
399 data_type: data_column.data_type(),
400 })
401 } else {
402 None
403 }
404 })
405 .collect::<Vec<InputRef>>()
406 };
407 let left_delete_column_inputs = build_inputs(&child, 0);
408 let right_delete_column_inputs = build_inputs(&position_delete_iceberg_scan, data_columns_len);
409 let eq_join_expr =
410 build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
411 let on = Condition {
412 conjunctions: eq_join_expr,
413 };
414 let join = LogicalJoin::new(
415 child,
416 position_delete_iceberg_scan,
417 risingwave_pb::plan_common::JoinType::LeftAnti,
418 on,
419 );
420 Ok(join.into())
421}