risingwave_frontend/optimizer/rule/
source_to_iceberg_scan_rule.rs1use risingwave_common::catalog::{
16 ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
17};
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
20use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22
23use super::{ApplyResult, BoxedRule, FallibleRule};
24use crate::error::Result;
25use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
26use crate::optimizer::PlanRef;
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
29use crate::optimizer::plan_node::{LogicalIcebergScan, LogicalJoin, LogicalSource};
30use crate::utils::{Condition, FRONTEND_RUNTIME};
31
32pub struct SourceToIcebergScanRule {}
33impl FallibleRule for SourceToIcebergScanRule {
34 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
35 let source: &LogicalSource = match plan.as_logical_source() {
36 Some(s) => s,
37 None => return ApplyResult::NotApplicable,
38 };
39 if source.core.is_iceberg_connector() {
40 let s = if let ConnectorProperties::Iceberg(prop) = ConnectorProperties::extract(
41 source
42 .core
43 .catalog
44 .as_ref()
45 .unwrap()
46 .with_properties
47 .clone(),
48 false,
49 )? {
50 IcebergSplitEnumerator::new_inner(*prop, SourceEnumeratorContext::dummy().into())
51 } else {
52 return ApplyResult::NotApplicable;
53 };
54
55 #[cfg(madsim)]
56 return ApplyResult::Err(
57 crate::error::ErrorCode::BindError(
58 "iceberg_scan can't be used in the madsim mode".to_string(),
59 )
60 .into(),
61 );
62 #[cfg(not(madsim))]
63 {
64 let timezone = plan.ctx().get_session_timezone();
65 let time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
66 let (delete_column_names, have_position_delete) =
67 tokio::task::block_in_place(|| {
68 FRONTEND_RUNTIME.block_on(s.get_delete_parameters(time_travel_info))
69 })?;
70 let mut data_iceberg_scan: PlanRef =
72 LogicalIcebergScan::new(source, IcebergScanType::DataScan).into();
73 if !delete_column_names.is_empty() {
74 data_iceberg_scan = build_equality_delete_hashjoin_scan(
75 source,
76 delete_column_names,
77 data_iceberg_scan,
78 )?;
79 }
80 if have_position_delete {
81 data_iceberg_scan =
82 build_position_delete_hashjoin_scan(source, data_iceberg_scan)?;
83 }
84 ApplyResult::Ok(data_iceberg_scan)
85 }
86 } else {
87 ApplyResult::NotApplicable
88 }
89 }
90}
91
92fn build_equality_delete_hashjoin_scan(
93 source: &LogicalSource,
94 delete_column_names: Vec<String>,
95 data_iceberg_scan: PlanRef,
96) -> Result<PlanRef> {
97 let column_catalog_map = source
99 .core
100 .column_catalog
101 .iter()
102 .map(|c| (&c.column_desc.name, c))
103 .collect::<std::collections::HashMap<_, _>>();
104 let column_catalog: Vec<_> = delete_column_names
105 .iter()
106 .chain(std::iter::once(
107 &ICEBERG_SEQUENCE_NUM_COLUMN_NAME.to_owned(),
108 ))
109 .map(|name| *column_catalog_map.get(&name).unwrap())
110 .cloned()
111 .collect();
112 let equality_delete_source = source.clone_with_column_catalog(column_catalog)?;
113 let equality_delete_iceberg_scan =
114 LogicalIcebergScan::new(&equality_delete_source, IcebergScanType::EqualityDeleteScan);
115
116 let data_columns_len = data_iceberg_scan.schema().len();
117 let build_inputs = |scan: &PlanRef, offset: usize| {
119 let delete_column_index_map = scan
120 .schema()
121 .fields()
122 .iter()
123 .enumerate()
124 .map(|(index, data_column)| (&data_column.name, (index, &data_column.data_type)))
125 .collect::<std::collections::HashMap<_, _>>();
126 let delete_column_inputs = delete_column_names
127 .iter()
128 .map(|name| {
129 let (index, data_type) = delete_column_index_map.get(name).unwrap();
130 InputRef {
131 index: offset + index,
132 data_type: (*data_type).clone(),
133 }
134 })
135 .collect::<Vec<InputRef>>();
136 let seq_num_inputs = InputRef {
137 index: scan
138 .schema()
139 .fields()
140 .iter()
141 .position(|f| f.name.eq(ICEBERG_SEQUENCE_NUM_COLUMN_NAME))
142 .unwrap()
143 + offset,
144 data_type: risingwave_common::types::DataType::Int64,
145 };
146 (delete_column_inputs, seq_num_inputs)
147 };
148 let (join_left_delete_column_inputs, join_left_seq_num_input) =
149 build_inputs(&data_iceberg_scan, 0);
150 let equality_delete_iceberg_scan = equality_delete_iceberg_scan.into();
151 let (join_right_delete_column_inputs, join_right_seq_num_input) =
152 build_inputs(&equality_delete_iceberg_scan, data_columns_len);
153
154 let mut eq_join_expr = join_left_delete_column_inputs
155 .iter()
156 .zip_eq_fast(join_right_delete_column_inputs.iter())
157 .map(|(left, right)| {
158 Ok(FunctionCall::new(
159 ExprType::Equal,
160 vec![left.clone().into(), right.clone().into()],
161 )?
162 .into())
163 })
164 .collect::<Result<Vec<ExprImpl>>>()?;
165 eq_join_expr.push(
166 FunctionCall::new(
167 ExprType::LessThan,
168 vec![
169 join_left_seq_num_input.into(),
170 join_right_seq_num_input.into(),
171 ],
172 )?
173 .into(),
174 );
175 let on = Condition {
176 conjunctions: eq_join_expr,
177 };
178 let join = LogicalJoin::new(
179 data_iceberg_scan,
180 equality_delete_iceberg_scan,
181 risingwave_pb::plan_common::JoinType::LeftAnti,
182 on,
183 );
184 Ok(join.into())
185}
186
187fn build_position_delete_hashjoin_scan(
188 source: &LogicalSource,
189 data_iceberg_scan: PlanRef,
190) -> Result<PlanRef> {
191 let column_catalog = source
193 .core
194 .column_catalog
195 .iter()
196 .filter(|c| {
197 c.column_desc.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
198 || c.column_desc.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
199 })
200 .cloned()
201 .collect();
202 let position_delete_source = source.clone_with_column_catalog(column_catalog)?;
203 let position_delete_iceberg_scan =
204 LogicalIcebergScan::new(&position_delete_source, IcebergScanType::PositionDeleteScan);
205 let data_columns_len = data_iceberg_scan.schema().len();
206
207 let build_inputs = |scan: &PlanRef, offset: usize| {
208 scan.schema()
209 .fields()
210 .iter()
211 .enumerate()
212 .filter_map(|(index, data_column)| {
213 if data_column.name.eq(ICEBERG_FILE_PATH_COLUMN_NAME)
214 || data_column.name.eq(ICEBERG_FILE_POS_COLUMN_NAME)
215 {
216 Some(InputRef {
217 index: offset + index,
218 data_type: data_column.data_type(),
219 })
220 } else {
221 None
222 }
223 })
224 .collect::<Vec<InputRef>>()
225 };
226 let join_left_delete_column_inputs = build_inputs(&data_iceberg_scan, 0);
227 let position_delete_iceberg_scan = position_delete_iceberg_scan.into();
228 let join_right_delete_column_inputs =
229 build_inputs(&position_delete_iceberg_scan, data_columns_len);
230 let eq_join_expr = join_left_delete_column_inputs
231 .iter()
232 .zip_eq_fast(join_right_delete_column_inputs.iter())
233 .map(|(left, right)| {
234 Ok(FunctionCall::new(
235 ExprType::Equal,
236 vec![left.clone().into(), right.clone().into()],
237 )?
238 .into())
239 })
240 .collect::<Result<Vec<ExprImpl>>>()?;
241 let on = Condition {
242 conjunctions: eq_join_expr,
243 };
244 let join = LogicalJoin::new(
245 data_iceberg_scan,
246 position_delete_iceberg_scan,
247 risingwave_pb::plan_common::JoinType::LeftAnti,
248 on,
249 );
250 Ok(join.into())
251}
252
253impl SourceToIcebergScanRule {
254 pub fn create() -> BoxedRule {
255 Box::new(SourceToIcebergScanRule {})
256 }
257}