risingwave_frontend/optimizer/rule/
source_to_iceberg_intermediate_scan_rule.rs1use std::collections::HashMap;
26
27#[cfg(not(madsim))]
28use risingwave_connector::source::ConnectorProperties;
29#[cfg(not(madsim))]
30use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
31
32use super::prelude::{PlanRef, *};
33use crate::error::Result;
34#[cfg(not(madsim))]
35use crate::optimizer::plan_node::LogicalSource;
36#[cfg(not(madsim))]
37use crate::optimizer::plan_node::LogicalValues;
38use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
39use crate::optimizer::plan_node::{Logical, LogicalIcebergIntermediateScan};
40use crate::optimizer::rule::{ApplyResult, FallibleRule};
41
/// Optimizer rule that rewrites a `LogicalSource` using the Iceberg connector into a
/// `LogicalIcebergIntermediateScan` bound to a concrete time-travel target (an explicit
/// `AS OF`, or otherwise the table's current snapshot id). When no snapshot can be resolved,
/// the source is replaced by an empty `LogicalValues` with the same schema.
pub struct SourceToIcebergIntermediateScanRule;
43
44impl FallibleRule<Logical> for SourceToIcebergIntermediateScanRule {
45 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
46 let Some(source) = plan.as_logical_source() else {
47 return ApplyResult::NotApplicable;
48 };
49
50 if !source.core.is_iceberg_connector() {
51 return ApplyResult::NotApplicable;
52 }
53
54 #[cfg(madsim)]
55 return ApplyResult::Err(
56 crate::error::ErrorCode::BindError(
57 "iceberg_scan can't be used in the madsim mode".to_string(),
58 )
59 .into(),
60 );
61
62 #[cfg(not(madsim))]
63 {
64 let timezone = plan.ctx().get_session_timezone();
66 let mut time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
67 if time_travel_info.is_none() {
68 time_travel_info =
69 fetch_current_snapshot_id(&mut plan.ctx().iceberg_snapshot_id_map(), source)?
70 .map(IcebergTimeTravelInfo::Version);
71 }
72 let Some(time_travel_info) = time_travel_info else {
73 return ApplyResult::Ok(
74 LogicalValues::new(vec![], plan.schema().clone(), plan.ctx()).into(),
75 );
76 };
77 let intermediate_scan = LogicalIcebergIntermediateScan::new(source, time_travel_info);
78 ApplyResult::Ok(intermediate_scan.into())
79 }
80 }
81}
82
83#[cfg(not(madsim))]
84fn fetch_current_snapshot_id(
85 map: &mut HashMap<String, Option<i64>>,
86 source: &LogicalSource,
87) -> Result<Option<i64>> {
88 let catalog = source.source_catalog().ok_or_else(|| {
89 crate::error::ErrorCode::InternalError(
90 "Iceberg source must have a valid source catalog".to_owned(),
91 )
92 })?;
93 let name = catalog.name.as_str();
94 if let Some(&snapshot_id) = map.get(name) {
95 return Ok(snapshot_id);
96 }
97
98 let ConnectorProperties::Iceberg(prop) =
99 ConnectorProperties::extract(catalog.with_properties.clone(), false)?
100 else {
101 return Err(crate::error::ErrorCode::InternalError(
102 "Iceberg source must have Iceberg connector properties".to_owned(),
103 )
104 .into());
105 };
106
107 let snapshot_id = tokio::task::block_in_place(|| {
108 crate::utils::FRONTEND_RUNTIME.block_on(async {
109 prop.load_table()
110 .await
111 .map(|table| table.metadata().current_snapshot_id())
112 })
113 })?;
114 map.insert(name.to_owned(), snapshot_id);
115 Ok(snapshot_id)
116}
117
118impl SourceToIcebergIntermediateScanRule {
119 pub fn create() -> BoxedRule {
120 Box::new(SourceToIcebergIntermediateScanRule)
121 }
122}