risingwave_frontend/optimizer/rule/
source_to_iceberg_scan_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{
16    ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
17};
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_connector::source::iceberg::{IcebergDeleteParameters, IcebergSplitEnumerator};
20use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22
23use super::prelude::{PlanRef, *};
24use crate::error::Result;
25use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
26use crate::optimizer::plan_node::generic::GenericPlanRef;
27use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
28use crate::optimizer::plan_node::{Logical, LogicalIcebergScan, LogicalJoin, LogicalSource};
29use crate::optimizer::rule::{ApplyResult, FallibleRule};
30use crate::utils::{Condition, FRONTEND_RUNTIME};
31
32pub struct SourceToIcebergScanRule {}
33impl FallibleRule<Logical> for SourceToIcebergScanRule {
34    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
35        let source: &LogicalSource = match plan.as_logical_source() {
36            Some(s) => s,
37            None => return ApplyResult::NotApplicable,
38        };
39        if source.core.is_iceberg_connector() {
40            let s = if let ConnectorProperties::Iceberg(prop) = ConnectorProperties::extract(
41                source
42                    .core
43                    .catalog
44                    .as_ref()
45                    .unwrap()
46                    .with_properties
47                    .clone(),
48                false,
49            )? {
50                IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
51            } else {
52                return ApplyResult::NotApplicable;
53            };
54
55            #[cfg(madsim)]
56            return ApplyResult::Err(
57                crate::error::ErrorCode::BindError(
58                    "iceberg_scan can't be used in the madsim mode".to_string(),
59                )
60                .into(),
61            );
62            #[cfg(not(madsim))]
63            {
64                let timezone = plan.ctx().get_session_timezone();
65                let time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
66                let delete_parameters: IcebergDeleteParameters =
67                    tokio::task::block_in_place(|| {
68                        FRONTEND_RUNTIME.block_on(s.get_delete_parameters(time_travel_info))
69                    })?;
70                // data file scan
71                let mut data_iceberg_scan: PlanRef = LogicalIcebergScan::new(
72                    source,
73                    IcebergScanType::DataScan,
74                    delete_parameters.snapshot_id,
75                )
76                .into();
77                if !delete_parameters.equality_delete_columns.is_empty() {
78                    data_iceberg_scan = build_equality_delete_hashjoin_scan(
79                        source,
80                        delete_parameters.equality_delete_columns,
81                        data_iceberg_scan,
82                        delete_parameters.snapshot_id,
83                    )?;
84                }
85                if delete_parameters.has_position_delete {
86                    data_iceberg_scan = build_position_delete_hashjoin_scan(
87                        source,
88                        data_iceberg_scan,
89                        delete_parameters.snapshot_id,
90                    )?;
91                }
92                ApplyResult::Ok(data_iceberg_scan)
93            }
94        } else {
95            ApplyResult::NotApplicable
96        }
97    }
98}
99
100fn build_equality_delete_hashjoin_scan(
101    source: &LogicalSource,
102    delete_column_names: Vec<String>,
103    data_iceberg_scan: PlanRef,
104    snapshot_id: Option<i64>,
105) -> Result<PlanRef> {
106    // equality delete scan
107    let column_catalog_map = source
108        .core
109        .column_catalog
110        .iter()
111        .map(|c| (&c.column_desc.name, c))
112        .collect::<std::collections::HashMap<_, _>>();
113    let column_catalog: Vec<_> = delete_column_names
114        .iter()
115        .chain(std::iter::once(
116            &ICEBERG_SEQUENCE_NUM_COLUMN_NAME.to_owned(),
117        ))
118        .map(|name| *column_catalog_map.get(&name).unwrap())
119        .cloned()
120        .collect();
121    let equality_delete_source = source.clone_with_column_catalog(column_catalog)?;
122    let equality_delete_iceberg_scan = LogicalIcebergScan::new(
123        &equality_delete_source,
124        IcebergScanType::EqualityDeleteScan,
125        snapshot_id,
126    );
127
128    let data_columns_len = data_iceberg_scan.schema().len();
129    // The join condition is delete_column_names is equal and sequence number is less than, join type is left anti
130    let build_inputs = |scan: &PlanRef, offset: usize| {
131        let delete_column_index_map = scan
132            .schema()
133            .fields()
134            .iter()
135            .enumerate()
136            .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
137            .collect::<std::collections::HashMap<_, _>>();
138        let delete_column_inputs = delete_column_names
139            .iter()
140            .map(|name| {
141                let (index, data_type) = delete_column_index_map.get(name).unwrap();
142                InputRef {
143                    index: offset + index,
144                    data_type: (*data_type).clone(),
145                }
146            })
147            .collect::<Vec<InputRef>>();
148        let seq_num_inputs = InputRef {
149            index: scan
150                .schema()
151                .fields()
152                .iter()
153                .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
154                .unwrap()
155                + offset,
156            data_type: risingwave_common::types::DataType::Int64,
157        };
158        (delete_column_inputs, seq_num_inputs)
159    };
160    let (join_left_delete_column_inputs, join_left_seq_num_input) =
161        build_inputs(&data_iceberg_scan, 0);
162    let equality_delete_iceberg_scan = equality_delete_iceberg_scan.into();
163    let (join_right_delete_column_inputs, join_right_seq_num_input) =
164        build_inputs(&equality_delete_iceberg_scan, data_columns_len);
165
166    let mut eq_join_expr = join_left_delete_column_inputs
167        .iter()
168        .zip_eq_fast(join_right_delete_column_inputs.iter())
169        .map(|(left, right)| {
170            Ok(FunctionCall::new(
171                ExprType::Equal,
172                vec![left.clone().into(), right.clone().into()],
173            )?
174            .into())
175        })
176        .collect::<Result<Vec<ExprImpl>>>()?;
177    eq_join_expr.push(
178        FunctionCall::new(
179            ExprType::LessThan,
180            vec![
181                join_left_seq_num_input.into(),
182                join_right_seq_num_input.into(),
183            ],
184        )?
185        .into(),
186    );
187    let on = Condition {
188        conjunctions: eq_join_expr,
189    };
190    let join = LogicalJoin::new(
191        data_iceberg_scan,
192        equality_delete_iceberg_scan,
193        risingwave_pb::plan_common::JoinType::LeftAnti,
194        on,
195    );
196    Ok(join.into())
197}
198
199fn build_position_delete_hashjoin_scan(
200    source: &LogicalSource,
201    data_iceberg_scan: PlanRef,
202    snapshot_id: Option<i64>,
203) -> Result<PlanRef> {
204    // FILE_PATH, FILE_POS
205    let column_catalog = source
206        .core
207        .column_catalog
208        .iter()
209        .filter(|c| {
210            c.column_desc.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
211                || c.column_desc.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
212        })
213        .cloned()
214        .collect();
215    let position_delete_source = source.clone_with_column_catalog(column_catalog)?;
216    let position_delete_iceberg_scan = LogicalIcebergScan::new(
217        &position_delete_source,
218        IcebergScanType::PositionDeleteScan,
219        snapshot_id,
220    );
221    let data_columns_len = data_iceberg_scan.schema().len();
222
223    let build_inputs = |scan: &PlanRef, offset: usize| {
224        scan.schema()
225            .fields()
226            .iter()
227            .enumerate()
228            .filter_map(|(index, data_column)| {
229                if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
230                    || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
231                {
232                    Some(InputRef {
233                        index: offset + index,
234                        data_type: data_column.data_type(),
235                    })
236                } else {
237                    None
238                }
239            })
240            .collect::<Vec<InputRef>>()
241    };
242    let join_left_delete_column_inputs = build_inputs(&data_iceberg_scan, 0);
243    let position_delete_iceberg_scan = position_delete_iceberg_scan.into();
244    let join_right_delete_column_inputs =
245        build_inputs(&position_delete_iceberg_scan, data_columns_len);
246    let eq_join_expr = join_left_delete_column_inputs
247        .iter()
248        .zip_eq_fast(join_right_delete_column_inputs.iter())
249        .map(|(left, right)| {
250            Ok(FunctionCall::new(
251                ExprType::Equal,
252                vec![left.clone().into(), right.clone().into()],
253            )?
254            .into())
255        })
256        .collect::<Result<Vec<ExprImpl>>>()?;
257    let on = Condition {
258        conjunctions: eq_join_expr,
259    };
260    let join = LogicalJoin::new(
261        data_iceberg_scan,
262        position_delete_iceberg_scan,
263        risingwave_pb::plan_common::JoinType::LeftAnti,
264        on,
265    );
266    Ok(join.into())
267}
268
269impl SourceToIcebergScanRule {
270    pub fn create() -> BoxedRule {
271        Box::new(SourceToIcebergScanRule {})
272    }
273}