risingwave_frontend/optimizer/rule/
iceberg_engine_storage_selection_rule.rs1use std::collections::HashSet;
20use std::sync::Arc;
21
22use risingwave_common::session_config::IcebergQueryStorageMode;
23
24use super::prelude::{PlanRef, *};
25use crate::TableCatalog;
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::{Logical, LogicalIcebergIntermediateScan, LogicalScan, generic};
29use crate::optimizer::rule::InfallibleRule;
30use crate::session::SessionImpl;
31
32pub struct IcebergEngineStorageSelectionRule;
33
34impl InfallibleRule<Logical> for IcebergEngineStorageSelectionRule {
35 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
36 let scan = plan.as_logical_iceberg_intermediate_scan()?;
37 let ctx = plan.ctx();
38 let session = ctx.session_ctx();
39
40 if session.config().iceberg_query_storage_mode() != IcebergQueryStorageMode::Auto {
42 return None;
43 }
44 let source_catalog = scan.source_catalog()?;
45 let table = get_table_from_iceberg_source(session, source_catalog)?;
46
47 let prefer_rowstore = check_point_lookup(scan, &table);
48 if !prefer_rowstore {
49 return None;
50 }
51
52 rewrite_to_table_scan(scan, &table)
53 }
54}
55
56impl IcebergEngineStorageSelectionRule {
57 pub fn create() -> BoxedRule {
58 Box::new(IcebergEngineStorageSelectionRule)
59 }
60}
61
62fn rewrite_to_table_scan(
64 scan: &LogicalIcebergIntermediateScan,
65 table: &Arc<TableCatalog>,
66) -> Option<PlanRef> {
67 let output_col_idx = scan
70 .hummock_rewrite
71 .output_column_mapping
72 .to_parts()
73 .0
74 .iter()
75 .copied()
76 .try_collect()?;
77 let table_scan = generic::TableScan::new(
78 output_col_idx,
79 table.clone(),
80 vec![],
81 vec![],
82 scan.ctx(),
83 scan.hummock_rewrite.origin_condition.clone(),
84 scan.core.as_of.clone(),
85 );
86 Some(LogicalScan::from(table_scan).into())
87}
88
89fn get_table_from_iceberg_source(
90 session: &SessionImpl,
91 source_catalog: &SourceCatalog,
92) -> Option<Arc<TableCatalog>> {
93 let catalog_reader = session.env().catalog_reader().read_guard();
94 let schema = catalog_reader
95 .get_schema_by_id(source_catalog.database_id, source_catalog.schema_id)
96 .ok()?;
97 let table_name = source_catalog.iceberg_table_name()?;
98 let table = schema.get_created_table_by_name(&table_name)?;
99 Some(table.clone())
100}
101
102fn check_point_lookup(scan: &LogicalIcebergIntermediateScan, table: &TableCatalog) -> bool {
106 let pk_column_names: HashSet<&str> = table.pk_column_names().into_iter().collect();
107 if pk_column_names.is_empty() {
108 return false;
109 }
110
111 let eq_input_refs = scan
113 .hummock_rewrite
114 .origin_condition
115 .get_eq_const_input_refs();
116 let eq_col_names: HashSet<&str> = eq_input_refs
117 .iter()
118 .filter_map(|input_ref| table.columns().get(input_ref.index()))
119 .filter(|c| !c.is_hidden())
120 .map(|c| c.name.as_str())
121 .collect();
122
123 pk_column_names.is_subset(&eq_col_names)
125}