risingwave_frontend/optimizer/rule/iceberg_intermediate_scan_rule.rs

use std::collections::HashMap;

use anyhow::Context;
use iceberg::scan::FileScanTask;
use risingwave_common::catalog::{
    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
    ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergSplitEnumerator};
use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};

use super::prelude::{PlanRef, *};
use crate::error::Result;
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
    Logical, LogicalIcebergIntermediateScan, LogicalIcebergScan, LogicalJoin, LogicalProject,
    LogicalValues,
};
use crate::optimizer::rule::{ApplyResult, FallibleRule};
use crate::utils::{ColIndexMapping, Condition, FRONTEND_RUNTIME};

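/// Expands a [`LogicalIcebergIntermediateScan`] into an executable plan: it lists the
/// Iceberg scan tasks through the split enumerator, scans the data files, applies
/// equality and position delete files via `LEFT ANTI` joins, and projects the result
/// back to the requested output columns.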
pub struct IcebergIntermediateScanRule;

impl FallibleRule<Logical> for IcebergIntermediateScanRule {
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        let scan: &LogicalIcebergIntermediateScan =
            match plan.as_logical_iceberg_intermediate_scan() {
                Some(s) => s,
                None => return ApplyResult::NotApplicable,
            };

        let Some(catalog) = scan.source_catalog() else {
            return ApplyResult::NotApplicable;
        };

        let enumerator = if let ConnectorProperties::Iceberg(prop) =
            ConnectorProperties::extract(catalog.with_properties.clone(), false)?
        {
            IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
        } else {
            return ApplyResult::NotApplicable;
        };

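        // The scan-task listing below blocks on the frontend runtime; the madsim
        // simulation build cannot do this, so `iceberg_scan` is rejected there.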
        #[cfg(madsim)]
        return ApplyResult::Err(
            crate::error::ErrorCode::BindError(
                "iceberg_scan can't be used in the madsim mode".to_string(),
            )
            .into(),
        );

        #[cfg(not(madsim))]
        {
            use risingwave_connector::source::iceberg::IcebergListResult;

            let list_result =
                tokio::task::block_in_place(|| {
                    FRONTEND_RUNTIME.block_on(enumerator.list_scan_tasks(
                        Some(scan.time_travel_info.clone()),
                        scan.predicate.clone(),
                    ))
                })?;
            let Some(IcebergListResult {
                mut data_files,
                mut equality_delete_files,
                mut position_delete_files,
                equality_delete_columns,
            }) = list_result
            else {
                tracing::info!(
                    "There is no valid snapshot for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            };
            if data_files.is_empty() {
                tracing::info!(
                    "There is no data file for the Iceberg table, returning empty table plan"
                );
                return ApplyResult::Ok(empty_table_plan(&plan, scan));
            }

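            // Project the requested output columns plus the columns referenced by
            // equality delete files. Sorting by field id keeps the projection in schema
            // order and makes duplicate names adjacent so `dedup` can drop them.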
            let schema = data_files[0].schema.clone();
            let mut projection_columns: Vec<&str> = scan
                .output_columns
                .iter()
                .chain(&equality_delete_columns)
                .map(|col| col.as_str())
                .collect();
            projection_columns.sort_unstable_by_key(|&s| schema.field_id_by_name(s));
            projection_columns.dedup();
            set_project_field_ids(&mut data_files, &schema, projection_columns.iter())?;
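            // The delete files are applied via the anti joins built below, so the data
            // scan tasks themselves should not carry delete references.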
            for file in &mut data_files {
                file.deletes.clear();
            }

            let column_catalog_map: HashMap<&str, &ColumnCatalog> = catalog
                .columns
                .iter()
                .map(|c| (c.column_desc.name.as_str(), c))
                .collect();
            if !equality_delete_files.is_empty() {
                projection_columns.push(ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            }
            if !position_delete_files.is_empty() {
                projection_columns.push(ICEBERG_FILE_PATH_COLUMN_NAME);
                projection_columns.push(ICEBERG_FILE_POS_COLUMN_NAME);
            }
            let column_catalogs =
                build_column_catalogs(projection_columns.iter(), &column_catalog_map)?;
            let core = scan.core.clone_with_column_catalog(column_catalogs);
            let mut plan: PlanRef =
                LogicalIcebergScan::new(core, IcebergFileScanTask::Data(data_files)).into();

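            // Apply equality delete files: anti-join the data scan against a scan of the
            // delete files, dropping data rows that match on the equality delete columns
            // and have an older sequence number than the delete row.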
            if !equality_delete_files.is_empty() {
                set_project_field_ids(
                    &mut equality_delete_files,
                    &schema,
                    equality_delete_columns.iter(),
                )?;
                plan = build_equality_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    equality_delete_files,
                    equality_delete_columns,
                )?;
            }

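            // Apply position delete files: anti-join on the (file path, file position)
            // metadata columns.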
            if !position_delete_files.is_empty() {
                set_project_field_ids(
                    &mut position_delete_files,
                    &schema,
                    std::iter::empty::<&str>(),
                )?;
                plan = build_position_delete_hashjoin_scan(
                    scan,
                    &column_catalog_map,
                    plan,
                    position_delete_files,
                )?;
            }

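            // The intermediate plan may expose extra or reordered columns (e.g. delete
            // and metadata columns), so project back to exactly the requested output
            // columns when the schemas differ.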
            let schema_len = plan.schema().len();
            let schema_names = plan.schema().fields.iter().map(|f| f.name.as_str());
            let output_names = scan.output_columns.iter().map(|s| s.as_str());
            if schema_len != scan.output_columns.len()
                || !itertools::equal(schema_names, output_names)
            {
                let col_map: HashMap<&str, usize> = plan
                    .schema()
                    .fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| (field.name.as_str(), idx))
                    .collect();
                let output_col_idx: Vec<_> = scan
                    .output_columns
                    .iter()
                    .map(|col| {
                        col_map.get(col.as_str()).copied().with_context(|| {
                            format!("Output column {} not found in scan schema", col)
                        })
                    })
                    .try_collect()?;
                let mapping = ColIndexMapping::with_remaining_columns(&output_col_idx, schema_len);
                plan = LogicalProject::with_mapping(plan, mapping).into();
            }

            ApplyResult::Ok(plan)
        }
    }
}

impl IcebergIntermediateScanRule {
    pub fn create() -> BoxedRule {
        Box::new(IcebergIntermediateScanRule)
    }
}

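/// Builds an empty `LogicalValues` plan with the scan's schema, used when the table has
/// no snapshot or no data files.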
fn empty_table_plan(plan: &PlanRef, scan: &LogicalIcebergIntermediateScan) -> PlanRef {
    LogicalValues::new(vec![], scan.schema().clone(), plan.ctx()).into()
}

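/// Looks up the `ColumnCatalog` for each requested column name in the source catalog.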
fn build_column_catalogs(
    column_names: impl Iterator<Item = impl AsRef<str>>,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
) -> Result<Vec<ColumnCatalog>> {
    let res = column_names
        .map(|name| {
            let name = name.as_ref();
            column_catalog_map
                .get(name)
                .map(|&c| c.clone())
                .with_context(|| format!("Column catalog not found for column {}", name))
        })
        .try_collect()?;
    Ok(res)
}

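/// Resolves the given column names to Iceberg field ids and sets them as the projection
/// on every scan task.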
fn set_project_field_ids(
    files: &mut [FileScanTask],
    schema: &iceberg::spec::Schema,
    column_names: impl Iterator<Item = impl AsRef<str>>,
) -> Result<()> {
    let project_field_ids: Vec<i32> = column_names
        .map(|name| {
            let name = name.as_ref();
            schema
                .field_id_by_name(name)
                .with_context(|| format!("Column {} not found in data file schema", name))
        })
        .try_collect()?;
    for file in files {
        file.project_field_ids = project_field_ids.clone();
    }
    Ok(())
}

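/// Zips the left and right input refs into a list of `left = right` join conditions.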
fn build_equal_conditions(
    left_inputs: Vec<InputRef>,
    right_inputs: Vec<InputRef>,
) -> Result<Vec<ExprImpl>> {
    left_inputs
        .into_iter()
        .zip_eq_fast(right_inputs.into_iter())
        .map(|(left, right)| {
            Ok(FunctionCall::new(ExprType::Equal, vec![left.into(), right.into()])?.into())
        })
        .collect()
}

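/// Builds a `LEFT ANTI` join between `child` and a scan of the equality delete files.
/// The join matches on the equality delete columns and keeps a data row only if no
/// delete row with a newer sequence number matches it.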
pub fn build_equality_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    equality_delete_files: Vec<FileScanTask>,
    equality_delete_columns: Vec<String>,
) -> Result<PlanRef> {
    let column_names = equality_delete_columns
        .iter()
        .map(|s| s.as_str())
        .chain(std::iter::once(ICEBERG_SEQUENCE_NUM_COLUMN_NAME));
    let column_catalogs = build_column_catalogs(column_names, column_catalog_map)?;
    let source = scan.core.clone_with_column_catalog(column_catalogs);

    let equality_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        source,
        IcebergFileScanTask::EqualityDelete(equality_delete_files),
    )
    .into();

    let data_columns_len = child.schema().len();
    let build_inputs = |scan: &PlanRef, offset: usize| -> Result<(Vec<InputRef>, InputRef)> {
        let delete_column_index_map = scan
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
            .collect::<std::collections::HashMap<_, _>>();
        let delete_column_inputs = equality_delete_columns
            .iter()
            .map(|name| {
                let (index, data_type) = delete_column_index_map
                    .get(name)
                    .with_context(|| format!("Delete column {} not found in scan schema", name))?;
                Ok(InputRef {
                    index: offset + index,
                    data_type: (*data_type).clone(),
                })
            })
            .collect::<Result<Vec<InputRef>>>()?;
        let seq_num_inputs = InputRef {
            index: scan
                .schema()
                .fields()
                .iter()
                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
                .context("Sequence number column not found in scan schema")?
                + offset,
            data_type: risingwave_common::types::DataType::Int64,
        };
        Ok((delete_column_inputs, seq_num_inputs))
    };
    let (left_delete_column_inputs, left_seq_num_input) = build_inputs(&child, 0)?;
    let (right_delete_column_inputs, right_seq_num_input) =
        build_inputs(&equality_delete_iceberg_scan, data_columns_len)?;

    let mut eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    eq_join_expr.push(
        FunctionCall::new(
            ExprType::LessThan,
            vec![left_seq_num_input.into(), right_seq_num_input.into()],
        )?
        .into(),
    );
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    let join = LogicalJoin::new(
        child,
        equality_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}

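/// Builds a `LEFT ANTI` join between `child` and a scan of the position delete files,
/// matching on the file path and file position metadata columns.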
pub fn build_position_delete_hashjoin_scan(
    scan: &LogicalIcebergIntermediateScan,
    column_catalog_map: &HashMap<&str, &ColumnCatalog>,
    child: PlanRef,
    position_delete_files: Vec<FileScanTask>,
) -> Result<PlanRef> {
    let delete_column_names = [ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME];
    let column_catalogs = build_column_catalogs(delete_column_names.iter(), column_catalog_map)?;
    let position_delete_source = scan.core.clone_with_column_catalog(column_catalogs);

    let position_delete_iceberg_scan: PlanRef = LogicalIcebergScan::new(
        position_delete_source,
        IcebergFileScanTask::PositionDelete(position_delete_files),
    )
    .into();
    let data_columns_len = child.schema().len();

    let build_inputs = |scan: &PlanRef, offset: usize| {
        scan.schema()
            .fields()
            .iter()
            .enumerate()
            .filter_map(|(index, data_column)| {
                if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
                    || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
                {
                    Some(InputRef {
                        index: offset + index,
                        data_type: data_column.data_type(),
                    })
                } else {
                    None
                }
            })
            .collect::<Vec<InputRef>>()
    };
    let left_delete_column_inputs = build_inputs(&child, 0);
    let right_delete_column_inputs = build_inputs(&position_delete_iceberg_scan, data_columns_len);
    let eq_join_expr =
        build_equal_conditions(left_delete_column_inputs, right_delete_column_inputs)?;
    let on = Condition {
        conjunctions: eq_join_expr,
    };
    let join = LogicalJoin::new(
        child,
        position_delete_iceberg_scan,
        risingwave_pb::plan_common::JoinType::LeftAnti,
        on,
    );
    Ok(join.into())
}