risingwave_frontend/planner/relation.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::session_config::IcebergQueryStorageMode;
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
use risingwave_sqlparser::ast::AsOf;

use crate::TableCatalog;
use crate::binder::{
    BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput, BoundSource,
    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
};
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind};
use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalGapFill, LogicalHopWindow, LogicalIcebergIntermediateScan, LogicalJoin,
    LogicalPlanRef as PlanRef, LogicalProject, LogicalScan, LogicalShare, LogicalSource,
    LogicalSysScan, LogicalTableFunction, LogicalValues,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::{ColIndexMapping, Condition};

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of the window table function should be an interval literal.";

impl Planner {
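    /// Plans a bound [`Relation`] by dispatching to the planning method for
    /// its variant (base table, system table, subquery, join, source, etc.).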
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            // TODO: order is ignored in the subquery
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            Relation::Share(share) => self.plan_share(*share),
            Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
        }
    }

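    /// Plans a scan over a system table as a [`LogicalSysScan`].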
    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog,
            self.ctx(),
            Cardinality::unknown(), // TODO(card): cardinality of system table
        )
        .into())
    }

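    /// Plans a scan over a user table. Hummock tables become a plain
    /// [`LogicalScan`]; iceberg tables are further dispatched by
    /// [`Self::plan_iceberg_table`].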
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => self.plan_iceberg_table(base_table, scan, as_of),
        }
    }

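    /// Plans a scan over a table with the iceberg engine. Depending on what
    /// the plan is for, the scan is served by the Hummock table, the iceberg
    /// source, or an iceberg intermediate scan (see `PlanTarget` below).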
    fn plan_iceberg_table(
        &mut self,
        base_table: &BoundBaseTable,
        scan: LogicalScan,
        as_of: Option<AsOf>,
    ) -> Result<PlanRef> {
        let is_append_only = base_table.table_catalog.append_only;
        let iceberg_query_storage_mode = self
            .ctx()
            .session_ctx()
            .config()
            .iceberg_query_storage_mode();

        enum PlanTarget {
            TableScan,
            Source,
            IntermediateScan,
        }
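        // Choose the scan target based on what the plan is for, whether the
        // table is append-only, and the session's iceberg query storage mode.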
        let plan_target = match self.plan_for() {
            PlanFor::StreamIcebergEngineInternal => PlanTarget::TableScan,
            PlanFor::BatchDql => match iceberg_query_storage_mode {
                IcebergQueryStorageMode::Hummock => PlanTarget::TableScan,
                _ => PlanTarget::IntermediateScan,
            },
            PlanFor::Stream => {
                if is_append_only {
                    PlanTarget::Source
                } else {
                    PlanTarget::TableScan
                }
            }
            PlanFor::Batch => {
                if is_append_only {
                    PlanTarget::IntermediateScan
                } else {
                    PlanTarget::TableScan
                }
            }
        };
        match as_of {
            None
            | Some(AsOf::VersionNum(_))
            | Some(AsOf::TimestampString(_))
            | Some(AsOf::TimestampNum(_)) => {}
            Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                bail_not_implemented!("As Of ProcessTime() is not supported yet.")
            }
            Some(AsOf::VersionString(_)) => {
                bail_not_implemented!("As Of Version is not supported yet.")
            }
        }

        if matches!(plan_target, PlanTarget::TableScan) {
            return Ok(scan.into());
        }

        let source_catalog = self
            .get_iceberg_source_by_table_catalog(&base_table.table_catalog)
            .ok_or_else(|| {
                ErrorCode::BindError(format!(
                    "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                    base_table.table_catalog.name()
                ))
            })?;

        // Build type mapping: source column name → Hummock table type.
        // This lets the intermediate scan output Hummock types directly,
        // while the Iceberg scan path still adds explicit casts after
        // materialization to match the expected output types.
        let mut table_column_type_mapping = HashMap::new();
        let table_column_map: HashMap<&str, &DataType> = base_table
            .table_catalog
            .columns
            .iter()
            .map(|c| (c.name.as_str(), &c.column_desc.data_type))
            .collect();
        for source_col in &source_catalog.columns {
            let source_name = source_col.name();
            let table_name = if source_name == RISINGWAVE_ICEBERG_ROW_ID {
                ROW_ID_COLUMN_NAME
            } else {
                source_name
            };
            if let Some(&table_type) = table_column_map.get(table_name)
                && source_col.column_desc.data_type != *table_type
            {
                table_column_type_mapping.insert(source_name.to_owned(), table_type.clone());
            }
        }

        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
            .columns
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, column)| (column.name().to_owned(), (i, column)))
            .collect();
        // The intermediate scan outputs Hummock types directly, but the source
        // still outputs the original types. So a cast is only needed when the
        // plan target is the source and the column types differ.
        let exprs = scan
            .table()
            .column_schema()
            .fields()
            .iter()
            .map(|field| {
                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                    RISINGWAVE_ICEBERG_ROW_ID
                } else {
                    &field.name
                };
                if let Some((i, source_column)) = column_map.get(source_field_name) {
                    let input_type = &source_column.column_desc.data_type;
                    if matches!(plan_target, PlanTarget::Source) && input_type != &field.data_type {
                        let mut input_ref =
                            ExprImpl::InputRef(InputRef::new(*i, input_type.clone()).into());
                        FunctionCall::cast_mut(
                            &mut input_ref,
                            &field.data_type,
                            CastContext::Explicit,
                        )
                        .unwrap();
                        input_ref
                    } else {
                        ExprImpl::InputRef(InputRef::new(*i, field.data_type.clone()).into())
                    }
                } else {
                    // Fields like `_rw_timestamp` would not be found in the source.
                    ExprImpl::Literal(Literal::new(None, field.data_type.clone()).into())
                }
            })
            .collect_vec();

        // Build source→table column index mapping for Hummock rewrite.
        // Must be built before source_catalog is moved into Rc.
        let table_col_index: HashMap<&str, usize> = base_table
            .table_catalog
            .columns
            .iter()
            .enumerate()
            .map(|(i, c)| (c.name.as_str(), i))
            .collect();
        let source_to_table_mapping = ColIndexMapping::new(
            source_catalog
                .columns
                .iter()
                .map(|c| {
                    let table_name = if c.name() == RISINGWAVE_ICEBERG_ROW_ID {
                        ROW_ID_COLUMN_NAME
                    } else {
                        c.name()
                    };
                    table_col_index.get(table_name).copied()
                })
                .collect(),
            base_table.table_catalog.columns.len(),
        );

        let logical_source = LogicalSource::with_catalog(
            Rc::new(source_catalog),
            SourceNodeKind::CreateMViewOrBatch,
            self.ctx(),
            as_of,
        )?;
        if matches!(plan_target, PlanTarget::Source) {
            return Ok(LogicalProject::new(logical_source.into(), exprs).into());
        }

        let logical_iceberg_intermediate_scan = self.plan_iceberg_intermediate_scan(
            &logical_source,
            table_column_type_mapping,
            source_to_table_mapping,
        )?;
        Ok(LogicalProject::new(logical_iceberg_intermediate_scan, exprs).into())
    }

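    /// Plans a scan over a source. Shared CDC sources are rejected, `AS OF`
    /// variants are validated, and iceberg sources in non-streaming plans are
    /// lowered to an iceberg intermediate scan.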
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }
            let is_iceberg = source.catalog.is_iceberg_connector();

            // Validate that the source has a primary key. We raise an error
            // here, instead of panicking in `expect_stream_key` later, for a
            // nicer error message.
            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    // In older versions, an iceberg source doesn't have a row_id and
                    // thus may hit this; only iceberg sources should hit this.
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }

            let source = LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?;
            if is_iceberg && !matches!(self.plan_for(), PlanFor::Stream) {
                let num_cols = source.core.column_catalog.len();
                let intermediate_scan = self.plan_iceberg_intermediate_scan(
                    &source,
                    HashMap::new(),
                    ColIndexMapping::identity(num_cols),
                )?;
                Ok(intermediate_scan)
            } else {
                Ok(source.into())
            }
        }
    }

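    /// Plans an uncorrelated join of two relations as a [`LogicalJoin`].
    /// Subqueries in the ON condition are not supported yet.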
    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

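    /// Plans a correlated join as a [`LogicalApply`]. The correlated indices
    /// in the right side are collected and assigned a fresh correlated id
    /// before either side is planned.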
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

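    /// Plans a window table function (`TUMBLE` or `HOP`) by dispatching on
    /// its kind.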
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

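    /// Plans a table function in the FROM clause. A real table function
    /// becomes a [`LogicalTableFunction`]; any other expression is wrapped in
    /// a single-row [`LogicalValues`] and, if it returns a struct, unnested
    /// into one output column per struct field.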
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    // TODO: should be named
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

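    /// Plans a shared relation (e.g. a CTE referenced multiple times). Query
    /// inputs are planned once and cached by share id, so later references
    /// reuse the same [`LogicalShare`] node.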
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(query) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self.plan_query(query)?.into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

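    /// Plans a gap-fill table function over its input relation as a
    /// [`LogicalGapFill`].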
    pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
        let input = self.plan_relation(gap_fill.input)?;
        Ok(LogicalGapFill::new(
            input,
            gap_fill.time_col,
            gap_fill.interval,
            gap_fill.fill_strategies,
        )
        .into())
    }

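    /// Collects the column data types of a relation that is valid as input
    /// to `TUMBLE`, i.e. a source, base table, subquery, or share.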
    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

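    /// Plans `TUMBLE` as a projection over the input: all input columns plus
    /// computed `window_start` and `window_end` columns. The window size (and
    /// optional offset) must be interval literals.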
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                // TODO: `window_end` may be optimized to avoid double calculation of
                // `tumble_start`, or we could rely on common expression
                // optimization.
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                // TODO: `window_end` may be optimized to avoid double calculation of
                // `tumble_start`, or we could rely on common expression
                // optimization.
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

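    /// Plans `HOP` as a [`LogicalHopWindow`]. The slide, size, and optional
    /// offset must be interval literals, and the size must be an exact
    /// multiple of the slide.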
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }

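    /// Plans a [`LogicalIcebergIntermediateScan`] over an iceberg source,
    /// pinned to a concrete snapshot. If the table has no snapshot yet, an
    /// empty [`LogicalValues`] with the expected schema is returned instead.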
    fn plan_iceberg_intermediate_scan(
        &self,
        source: &LogicalSource,
        table_column_type_mapping: HashMap<String, DataType>,
        source_to_table_mapping: ColIndexMapping,
    ) -> Result<PlanRef> {
        // If time travel is not specified, pin the scan to the current
        // snapshot ID so that it reads the latest snapshot.
        let timezone = self.ctx().get_session_timezone();
        let mut time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
        if time_travel_info.is_none() {
            time_travel_info = self
                .fetch_current_snapshot_id(source)?
                .map(IcebergTimeTravelInfo::Version);
        }
        let Some(time_travel_info) = time_travel_info else {
            // No snapshot exists yet (e.g., an empty table): plan an empty
            // values node with the expected output schema.
            let mut schema = source.schema().clone();
            for field in &mut schema.fields {
                if let Some(target_type) = table_column_type_mapping.get(&field.name) {
                    field.data_type = target_type.clone();
                }
            }
            return Ok(LogicalValues::new(vec![], schema, self.ctx()).into());
        };
        let intermediate_scan = LogicalIcebergIntermediateScan::new(
            source,
            time_travel_info,
            table_column_type_mapping,
            source_to_table_mapping,
        );
        Ok(intermediate_scan.into())
    }

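    /// Fetches the current snapshot ID of an iceberg source's table, caching
    /// it per source name in the query context so repeated lookups reuse the
    /// same snapshot.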
    fn fetch_current_snapshot_id(&self, source: &LogicalSource) -> Result<Option<i64>> {
        let mut map = self.ctx.iceberg_snapshot_id_map();
        let catalog = source.source_catalog().ok_or_else(|| {
            crate::error::ErrorCode::InternalError(
                "Iceberg source must have a valid source catalog".to_owned(),
            )
        })?;
        let name = catalog.name.as_str();
        if let Some(&snapshot_id) = map.get(name) {
            return Ok(snapshot_id);
        }

        #[cfg(madsim)]
        return Err(crate::error::ErrorCode::BindError(
            "iceberg source time travel can't be used in madsim mode".to_owned(),
        )
        .into());

        #[cfg(not(madsim))]
        {
            let ConnectorProperties::Iceberg(prop) =
                ConnectorProperties::extract(catalog.with_properties.clone(), false)?
            else {
                return Err(crate::error::ErrorCode::InternalError(
                    "Iceberg source must have Iceberg connector properties".to_owned(),
                )
                .into());
            };

            let snapshot_id = tokio::task::block_in_place(|| {
                crate::utils::FRONTEND_RUNTIME.block_on(async {
                    prop.load_table()
                        .await
                        .map(|table| table.metadata().current_snapshot_id())
                })
            })?;
            map.insert(name.to_owned(), snapshot_id);
            Ok(snapshot_id)
        }
    }

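    /// Looks up the iceberg source that backs an iceberg engine table in the
    /// same schema, returning `None` if it no longer exists.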
    fn get_iceberg_source_by_table_catalog(
        &self,
        table_catalog: &TableCatalog,
    ) -> Option<SourceCatalog> {
        let catalog_reader = self.ctx.session_ctx().env().catalog_reader().read_guard();

        let iceberg_source_name = table_catalog.iceberg_source_name()?;
        let schema = catalog_reader
            .get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)
            .ok()?;
        let source_catalog = schema.get_source_by_name(&iceberg_source_name)?;
        Some(source_catalog.deref().clone())
    }
}