risingwave_frontend/optimizer/rule/
iceberg_engine_storage_selection_rule.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
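//! When `iceberg_query_storage_mode` is `auto`, this rule may rewrite a
//! `LogicalIcebergIntermediateScan` (columnar Iceberg) into a `LogicalScan` (row Hummock)
//! for Iceberg engine tables. The rewrite happens only when the scan's predicate has
//! equality-to-constant conditions on all primary-key columns of the table.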
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use risingwave_common::session_config::IcebergQueryStorageMode;
23
24use super::prelude::{PlanRef, *};
25use crate::TableCatalog;
26use crate::catalog::source_catalog::SourceCatalog;
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::{Logical, LogicalIcebergIntermediateScan, LogicalScan, generic};
29use crate::optimizer::rule::InfallibleRule;
30use crate::session::SessionImpl;
31
/// Optimizer rule that may replace a `LogicalIcebergIntermediateScan` with a
/// row-store `LogicalScan`.
///
/// The rule carries no state; obtain a boxed instance through `create`.
pub struct IcebergEngineStorageSelectionRule;
33
34impl InfallibleRule<Logical> for IcebergEngineStorageSelectionRule {
35    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
36        let scan = plan.as_logical_iceberg_intermediate_scan()?;
37        let ctx = plan.ctx();
38        let session = ctx.session_ctx();
39
40        // Only apply when storage mode is auto.
41        if session.config().iceberg_query_storage_mode() != IcebergQueryStorageMode::Auto {
42            return None;
43        }
44        let source_catalog = scan.source_catalog()?;
45        let table = get_table_from_iceberg_source(session, source_catalog)?;
46
47        let prefer_rowstore = check_point_lookup(scan, &table);
48        if !prefer_rowstore {
49            return None;
50        }
51
52        rewrite_to_table_scan(scan, &table)
53    }
54}
55
56impl IcebergEngineStorageSelectionRule {
57    pub fn create() -> BoxedRule {
58        Box::new(IcebergEngineStorageSelectionRule)
59    }
60}
61
62/// Rewrite the intermediate Iceberg scan to a Hummock `LogicalScan`.
63fn rewrite_to_table_scan(
64    scan: &LogicalIcebergIntermediateScan,
65    table: &Arc<TableCatalog>,
66) -> Option<PlanRef> {
67    // output_column_mapping already maps to table-column indices (built at
68    // construction time), so we can use it and origin_condition directly.
69    let output_col_idx = scan
70        .hummock_rewrite
71        .output_column_mapping
72        .to_parts()
73        .0
74        .iter()
75        .copied()
76        .try_collect()?;
77    let table_scan = generic::TableScan::new(
78        output_col_idx,
79        table.clone(),
80        vec![],
81        vec![],
82        scan.ctx(),
83        scan.hummock_rewrite.origin_condition.clone(),
84        scan.core.as_of.clone(),
85    );
86    Some(LogicalScan::from(table_scan).into())
87}
88
89fn get_table_from_iceberg_source(
90    session: &SessionImpl,
91    source_catalog: &SourceCatalog,
92) -> Option<Arc<TableCatalog>> {
93    let catalog_reader = session.env().catalog_reader().read_guard();
94    let schema = catalog_reader
95        .get_schema_by_id(source_catalog.database_id, source_catalog.schema_id)
96        .ok()?;
97    let table_name = source_catalog.iceberg_table_name()?;
98    let table = schema.get_created_table_by_name(&table_name)?;
99    Some(table.clone())
100}
101
102/// Returns `true` when the predicate has equality-to-constant conditions on
103/// *all* PK columns of the table, making this a point lookup that benefits
104/// from the row store's key-value access pattern.
105fn check_point_lookup(scan: &LogicalIcebergIntermediateScan, table: &TableCatalog) -> bool {
106    let pk_column_names: HashSet<&str> = table.pk_column_names().into_iter().collect();
107    if pk_column_names.is_empty() {
108        return false;
109    }
110
111    // origin_condition is already in table-column index space.
112    let eq_input_refs = scan
113        .hummock_rewrite
114        .origin_condition
115        .get_eq_const_input_refs();
116    let eq_col_names: HashSet<&str> = eq_input_refs
117        .iter()
118        .filter_map(|input_ref| table.columns().get(input_ref.index()))
119        .filter(|c| !c.is_hidden())
120        .map(|c| c.name.as_str())
121        .collect();
122
123    // All PK columns must be covered by equality predicates.
124    pk_column_names.is_subset(&eq_col_names)
125}