risingwave_frontend/optimizer/rule/
source_to_iceberg_scan_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{
16    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
17};
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
20use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22
23use super::{ApplyResult, BoxedRule, FallibleRule};
24use crate::error::Result;
25use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
26use crate::optimizer::PlanRef;
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
29use crate::optimizer::plan_node::{LogicalIcebergScan, LogicalJoin, LogicalSource};
30use crate::utils::{Condition, FRONTEND_RUNTIME};
31
/// Optimizer rule that rewrites a `LogicalSource` backed by the Iceberg connector into a
/// `LogicalIcebergScan` plan.
///
/// The rule carries no state; its behavior lives in its `FallibleRule` implementation.
pub struct SourceToIcebergScanRule {}
33impl FallibleRule for SourceToIcebergScanRule {
34    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
35        let source: &LogicalSource = match plan.as_logical_source() {
36            Some(s) => s,
37            None => return ApplyResult::NotApplicable,
38        };
39        if source.core.is_iceberg_connector() {
40            let s = if let ConnectorProperties::Iceberg(prop) = ConnectorProperties::extract(
41                source
42                    .core
43                    .catalog
44                    .as_ref()
45                    .unwrap()
46                    .with_properties
47                    .clone(),
48                false,
49            )? {
50                IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
51            } else {
52                return ApplyResult::NotApplicable;
53            };
54
55            #[cfg(madsim)]
56            return ApplyResult::Err(
57                crate::error::ErrorCode::BindError(
58                    "iceberg_scan can't be used in the madsim mode".to_string(),
59                )
60                .into(),
61            );
62            #[cfg(not(madsim))]
63            {
64                let timezone = plan.ctx().get_session_timezone();
65                let time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
66                let (delete_column_names, have_position_delete) =
67                    tokio::task::block_in_place(|| {
68                        FRONTEND_RUNTIME.block_on(s.get_delete_parameters(time_travel_info))
69                    })?;
70                // data file scan
71                let mut data_iceberg_scan: PlanRef =
72                    LogicalIcebergScan::new(source, IcebergScanType::DataScan).into();
73                if !delete_column_names.is_empty() {
74                    data_iceberg_scan = build_equality_delete_hashjoin_scan(
75                        source,
76                        delete_column_names,
77                        data_iceberg_scan,
78                    )?;
79                }
80                if have_position_delete {
81                    data_iceberg_scan =
82                        build_position_delete_hashjoin_scan(source, data_iceberg_scan)?;
83                }
84                ApplyResult::Ok(data_iceberg_scan)
85            }
86        } else {
87            ApplyResult::NotApplicable
88        }
89    }
90}
91
92fn build_equality_delete_hashjoin_scan(
93    source: &LogicalSource,
94    delete_column_names: Vec<String>,
95    data_iceberg_scan: PlanRef,
96) -> Result<PlanRef> {
97    // equality delete scan
98    let column_catalog_map = source
99        .core
100        .column_catalog
101        .iter()
102        .map(|c| (&c.column_desc.name, c))
103        .collect::<std::collections::HashMap<_, _>>();
104    let column_catalog: Vec<_> = delete_column_names
105        .iter()
106        .chain(std::iter::once(
107            &ICEBERG_SEQUENCE_NUM_COLUMN_NAME.to_owned(),
108        ))
109        .map(|name| *column_catalog_map.get(&name).unwrap())
110        .cloned()
111        .collect();
112    let equality_delete_source = source.clone_with_column_catalog(column_catalog)?;
113    let equality_delete_iceberg_scan =
114        LogicalIcebergScan::new(&equality_delete_source, IcebergScanType::EqualityDeleteScan);
115
116    let data_columns_len = data_iceberg_scan.schema().len();
117    // The join condition is delete_column_names is equal and sequence number is less than, join type is left anti
118    let build_inputs = |scan: &PlanRef, offset: usize| {
119        let delete_column_index_map = scan
120            .schema()
121            .fields()
122            .iter()
123            .enumerate()
124            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
125            .collect::<std::collections::HashMap<_, _>>();
126        let delete_column_inputs = delete_column_names
127            .iter()
128            .map(|name| {
129                let (index, data_type) = delete_column_index_map.get(name).unwrap();
130                InputRef {
131                    index: offset + index,
132                    data_type: (*data_type).clone(),
133                }
134            })
135            .collect::<Vec<InputRef>>();
136        let seq_num_inputs = InputRef {
137            index: scan
138                .schema()
139                .fields()
140                .iter()
141                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
142                .unwrap()
143                + offset,
144            data_type: risingwave_common::types::DataType::Int64,
145        };
146        (delete_column_inputs, seq_num_inputs)
147    };
148    let (join_left_delete_column_inputs, join_left_seq_num_input) =
149        build_inputs(&data_iceberg_scan, 0);
150    let equality_delete_iceberg_scan = equality_delete_iceberg_scan.into();
151    let (join_right_delete_column_inputs, join_right_seq_num_input) =
152        build_inputs(&equality_delete_iceberg_scan, data_columns_len);
153
154    let mut eq_join_expr = join_left_delete_column_inputs
155        .iter()
156        .zip_eq_fast(join_right_delete_column_inputs.iter())
157        .map(|(left, right)| {
158            Ok(FunctionCall::new(
159                ExprType::Equal,
160                vec![left.clone().into(), right.clone().into()],
161            )?
162            .into())
163        })
164        .collect::<Result<Vec<ExprImpl>>>()?;
165    eq_join_expr.push(
166        FunctionCall::new(
167            ExprType::LessThan,
168            vec![
169                join_left_seq_num_input.into(),
170                join_right_seq_num_input.into(),
171            ],
172        )?
173        .into(),
174    );
175    let on = Condition {
176        conjunctions: eq_join_expr,
177    };
178    let join = LogicalJoin::new(
179        data_iceberg_scan,
180        equality_delete_iceberg_scan,
181        risingwave_pb::plan_common::JoinType::LeftAnti,
182        on,
183    );
184    Ok(join.into())
185}
186
187fn build_position_delete_hashjoin_scan(
188    source: &LogicalSource,
189    data_iceberg_scan: PlanRef,
190) -> Result<PlanRef> {
191    // FILE_PATH, FILE_POS
192    let column_catalog = source
193        .core
194        .column_catalog
195        .iter()
196        .filter(|c| {
197            c.column_desc.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
198                || c.column_desc.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
199        })
200        .cloned()
201        .collect();
202    let position_delete_source = source.clone_with_column_catalog(column_catalog)?;
203    let position_delete_iceberg_scan =
204        LogicalIcebergScan::new(&position_delete_source, IcebergScanType::PositionDeleteScan);
205    let data_columns_len = data_iceberg_scan.schema().len();
206
207    let build_inputs = |scan: &PlanRef, offset: usize| {
208        scan.schema()
209            .fields()
210            .iter()
211            .enumerate()
212            .filter_map(|(index, data_column)| {
213                if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
214                    || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
215                {
216                    Some(InputRef {
217                        index: offset + index,
218                        data_type: data_column.data_type(),
219                    })
220                } else {
221                    None
222                }
223            })
224            .collect::<Vec<InputRef>>()
225    };
226    let join_left_delete_column_inputs = build_inputs(&data_iceberg_scan, 0);
227    let position_delete_iceberg_scan = position_delete_iceberg_scan.into();
228    let join_right_delete_column_inputs =
229        build_inputs(&position_delete_iceberg_scan, data_columns_len);
230    let eq_join_expr = join_left_delete_column_inputs
231        .iter()
232        .zip_eq_fast(join_right_delete_column_inputs.iter())
233        .map(|(left, right)| {
234            Ok(FunctionCall::new(
235                ExprType::Equal,
236                vec![left.clone().into(), right.clone().into()],
237            )?
238            .into())
239        })
240        .collect::<Result<Vec<ExprImpl>>>()?;
241    let on = Condition {
242        conjunctions: eq_join_expr,
243    };
244    let join = LogicalJoin::new(
245        data_iceberg_scan,
246        position_delete_iceberg_scan,
247        risingwave_pb::plan_common::JoinType::LeftAnti,
248        on,
249    );
250    Ok(join.into())
251}
252
253impl SourceToIcebergScanRule {
254    pub fn create() -> BoxedRule {
255        Box::new(SourceToIcebergScanRule {})
256    }
257}