risingwave_frontend/optimizer/rule/
source_to_iceberg_scan_rule.rs1use risingwave_common::catalog::{
16 ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
17};
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_connector::source::iceberg::{IcebergDeleteParameters, IcebergSplitEnumerator};
20use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22
23use super::prelude::{PlanRef, *};
24use crate::error::Result;
25use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
26use crate::optimizer::plan_node::generic::GenericPlanRef;
27use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
28use crate::optimizer::plan_node::{Logical, LogicalIcebergScan, LogicalJoin, LogicalSource};
29use crate::optimizer::rule::{ApplyResult, FallibleRule};
30use crate::utils::{Condition, FRONTEND_RUNTIME};
31
32pub struct SourceToIcebergScanRule {}
33impl FallibleRule<Logical> for SourceToIcebergScanRule {
34 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
35 let source: &LogicalSource = match plan.as_logical_source() {
36 Some(s) => s,
37 None => return ApplyResult::NotApplicable,
38 };
39 if source.core.is_iceberg_connector() {
40 let s = if let ConnectorProperties::Iceberg(prop) = ConnectorProperties::extract(
41 source
42 .core
43 .catalog
44 .as_ref()
45 .unwrap()
46 .with_properties
47 .clone(),
48 false,
49 )? {
50 IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
51 } else {
52 return ApplyResult::NotApplicable;
53 };
54
55 #[cfg(madsim)]
56 return ApplyResult::Err(
57 crate::error::ErrorCode::BindError(
58 "iceberg_scan can't be used in the madsim mode".to_string(),
59 )
60 .into(),
61 );
62 #[cfg(not(madsim))]
63 {
64 let timezone = plan.ctx().get_session_timezone();
65 let time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
66 let delete_parameters: IcebergDeleteParameters =
67 tokio::task::block_in_place(|| {
68 FRONTEND_RUNTIME.block_on(s.get_delete_parameters(time_travel_info))
69 })?;
70 let mut data_iceberg_scan: PlanRef = LogicalIcebergScan::new(
72 source,
73 IcebergScanType::DataScan,
74 delete_parameters.snapshot_id,
75 )
76 .into();
77 if !delete_parameters.equality_delete_columns.is_empty() {
78 data_iceberg_scan = build_equality_delete_hashjoin_scan(
79 source,
80 delete_parameters.equality_delete_columns,
81 data_iceberg_scan,
82 delete_parameters.snapshot_id,
83 )?;
84 }
85 if delete_parameters.has_position_delete {
86 data_iceberg_scan = build_position_delete_hashjoin_scan(
87 source,
88 data_iceberg_scan,
89 delete_parameters.snapshot_id,
90 )?;
91 }
92 ApplyResult::Ok(data_iceberg_scan)
93 }
94 } else {
95 ApplyResult::NotApplicable
96 }
97 }
98}
99
100fn build_equality_delete_hashjoin_scan(
101 source: &LogicalSource,
102 delete_column_names: Vec<String>,
103 data_iceberg_scan: PlanRef,
104 snapshot_id: Option<i64>,
105) -> Result<PlanRef> {
106 let column_catalog_map = source
108 .core
109 .column_catalog
110 .iter()
111 .map(|c| (&c.column_desc.name, c))
112 .collect::<std::collections::HashMap<_, _>>();
113 let column_catalog: Vec<_> = delete_column_names
114 .iter()
115 .chain(std::iter::once(
116 &ICEBERG_SEQUENCE_NUM_COLUMN_NAME.to_owned(),
117 ))
118 .map(|name| *column_catalog_map.get(&name).unwrap())
119 .cloned()
120 .collect();
121 let equality_delete_source = source.clone_with_column_catalog(column_catalog)?;
122 let equality_delete_iceberg_scan = LogicalIcebergScan::new(
123 &equality_delete_source,
124 IcebergScanType::EqualityDeleteScan,
125 snapshot_id,
126 );
127
128 let data_columns_len = data_iceberg_scan.schema().len();
129 let build_inputs = |scan: &PlanRef, offset: usize| {
131 let delete_column_index_map = scan
132 .schema()
133 .fields()
134 .iter()
135 .enumerate()
136 .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
137 .collect::<std::collections::HashMap<_, _>>();
138 let delete_column_inputs = delete_column_names
139 .iter()
140 .map(|name| {
141 let (index, data_type) = delete_column_index_map.get(name).unwrap();
142 InputRef {
143 index: offset + index,
144 data_type: (*data_type).clone(),
145 }
146 })
147 .collect::<Vec<InputRef>>();
148 let seq_num_inputs = InputRef {
149 index: scan
150 .schema()
151 .fields()
152 .iter()
153 .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
154 .unwrap()
155 + offset,
156 data_type: risingwave_common::types::DataType::Int64,
157 };
158 (delete_column_inputs, seq_num_inputs)
159 };
160 let (join_left_delete_column_inputs, join_left_seq_num_input) =
161 build_inputs(&data_iceberg_scan, 0);
162 let equality_delete_iceberg_scan = equality_delete_iceberg_scan.into();
163 let (join_right_delete_column_inputs, join_right_seq_num_input) =
164 build_inputs(&equality_delete_iceberg_scan, data_columns_len);
165
166 let mut eq_join_expr = join_left_delete_column_inputs
167 .iter()
168 .zip_eq_fast(join_right_delete_column_inputs.iter())
169 .map(|(left, right)| {
170 Ok(FunctionCall::new(
171 ExprType::Equal,
172 vec![left.clone().into(), right.clone().into()],
173 )?
174 .into())
175 })
176 .collect::<Result<Vec<ExprImpl>>>()?;
177 eq_join_expr.push(
178 FunctionCall::new(
179 ExprType::LessThan,
180 vec![
181 join_left_seq_num_input.into(),
182 join_right_seq_num_input.into(),
183 ],
184 )?
185 .into(),
186 );
187 let on = Condition {
188 conjunctions: eq_join_expr,
189 };
190 let join = LogicalJoin::new(
191 data_iceberg_scan,
192 equality_delete_iceberg_scan,
193 risingwave_pb::plan_common::JoinType::LeftAnti,
194 on,
195 );
196 Ok(join.into())
197}
198
199fn build_position_delete_hashjoin_scan(
200 source: &LogicalSource,
201 data_iceberg_scan: PlanRef,
202 snapshot_id: Option<i64>,
203) -> Result<PlanRef> {
204 let column_catalog = source
206 .core
207 .column_catalog
208 .iter()
209 .filter(|c| {
210 c.column_desc.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
211 || c.column_desc.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
212 })
213 .cloned()
214 .collect();
215 let position_delete_source = source.clone_with_column_catalog(column_catalog)?;
216 let position_delete_iceberg_scan = LogicalIcebergScan::new(
217 &position_delete_source,
218 IcebergScanType::PositionDeleteScan,
219 snapshot_id,
220 );
221 let data_columns_len = data_iceberg_scan.schema().len();
222
223 let build_inputs = |scan: &PlanRef, offset: usize| {
224 scan.schema()
225 .fields()
226 .iter()
227 .enumerate()
228 .filter_map(|(index, data_column)| {
229 if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
230 || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
231 {
232 Some(InputRef {
233 index: offset + index,
234 data_type: data_column.data_type(),
235 })
236 } else {
237 None
238 }
239 })
240 .collect::<Vec<InputRef>>()
241 };
242 let join_left_delete_column_inputs = build_inputs(&data_iceberg_scan, 0);
243 let position_delete_iceberg_scan = position_delete_iceberg_scan.into();
244 let join_right_delete_column_inputs =
245 build_inputs(&position_delete_iceberg_scan, data_columns_len);
246 let eq_join_expr = join_left_delete_column_inputs
247 .iter()
248 .zip_eq_fast(join_right_delete_column_inputs.iter())
249 .map(|(left, right)| {
250 Ok(FunctionCall::new(
251 ExprType::Equal,
252 vec![left.clone().into(), right.clone().into()],
253 )?
254 .into())
255 })
256 .collect::<Result<Vec<ExprImpl>>>()?;
257 let on = Condition {
258 conjunctions: eq_join_expr,
259 };
260 let join = LogicalJoin::new(
261 data_iceberg_scan,
262 position_delete_iceberg_scan,
263 risingwave_pb::plan_common::JoinType::LeftAnti,
264 on,
265 );
266 Ok(join.into())
267}
268
269impl SourceToIcebergScanRule {
270 pub fn create() -> BoxedRule {
271 Box::new(SourceToIcebergScanRule {})
272 }
273}