risingwave_frontend/planner/
relation.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use itertools::Itertools;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{
22    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
23};
24use risingwave_common::session_config::IcebergQueryStorageMode;
25use risingwave_common::types::{DataType, Interval, ScalarImpl};
26use risingwave_connector::source::ConnectorProperties;
27use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
28use risingwave_sqlparser::ast::AsOf;
29
30use crate::TableCatalog;
31use crate::binder::{
32    BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput, BoundSource,
33    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
34};
35use crate::catalog::source_catalog::SourceCatalog;
36use crate::error::{ErrorCode, Result};
37use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
38use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind};
39use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
40use crate::optimizer::plan_node::{
41    LogicalApply, LogicalGapFill, LogicalHopWindow, LogicalIcebergIntermediateScan, LogicalJoin,
42    LogicalPlanRef as PlanRef, LogicalProject, LogicalScan, LogicalShare, LogicalSource,
43    LogicalSysScan, LogicalTableFunction, LogicalValues,
44};
45use crate::optimizer::property::Cardinality;
46use crate::planner::{PlanFor, Planner};
47use crate::utils::{ColIndexMapping, Condition};
48
49const ERROR_WINDOW_SIZE_ARG: &str =
50    "The size arg of window table function should be an interval literal.";
51
52impl Planner {
53    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
54        match relation {
55            Relation::BaseTable(t) => self.plan_base_table(&t),
56            Relation::SystemTable(st) => self.plan_sys_table(*st),
57            // TODO: order is ignored in the subquery
58            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
59            Relation::Join(join) => self.plan_join(*join),
60            Relation::Apply(join) => self.plan_apply(*join),
61            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
62            Relation::Source(s) => self.plan_source(*s),
63            Relation::TableFunction {
64                expr: tf,
65                with_ordinality,
66            } => self.plan_table_function(tf, with_ordinality),
67            Relation::Watermark(tf) => self.plan_watermark(*tf),
68            Relation::Share(share) => self.plan_share(*share),
69            Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
70        }
71    }
72
73    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
74        Ok(LogicalSysScan::create(
75            sys_table.sys_table_catalog,
76            self.ctx(),
77            Cardinality::unknown(), // TODO(card): cardinality of system table
78        )
79        .into())
80    }
81
82    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
83        let as_of = base_table.as_of.clone();
84        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());
85
86        match base_table.table_catalog.engine {
87            Engine::Hummock => {
88                match as_of {
89                    None
90                    | Some(AsOf::ProcessTime)
91                    | Some(AsOf::TimestampNum(_))
92                    | Some(AsOf::TimestampString(_))
93                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
94                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
95                        bail_not_implemented!("As Of Version is not supported yet.")
96                    }
97                };
98                Ok(scan.into())
99            }
100            Engine::Iceberg => self.plan_iceberg_table(base_table, scan, as_of),
101        }
102    }
103
104    fn plan_iceberg_table(
105        &mut self,
106        base_table: &BoundBaseTable,
107        scan: LogicalScan,
108        as_of: Option<AsOf>,
109    ) -> Result<PlanRef> {
110        let is_append_only = base_table.table_catalog.append_only;
111        let iceberg_query_storage_mode = self
112            .ctx()
113            .session_ctx()
114            .config()
115            .iceberg_query_storage_mode();
116
117        enum PlanTarget {
118            TableScan,
119            Source,
120            IntermediateScan,
121        }
122        let plan_target = match self.plan_for() {
123            PlanFor::StreamIcebergEngineInternal => PlanTarget::TableScan,
124            PlanFor::BatchDql => match iceberg_query_storage_mode {
125                IcebergQueryStorageMode::Hummock => PlanTarget::TableScan,
126                _ => PlanTarget::IntermediateScan,
127            },
128            PlanFor::Stream => {
129                if is_append_only {
130                    PlanTarget::Source
131                } else {
132                    PlanTarget::TableScan
133                }
134            }
135            PlanFor::Batch => {
136                if is_append_only {
137                    PlanTarget::IntermediateScan
138                } else {
139                    PlanTarget::TableScan
140                }
141            }
142        };
143        match as_of {
144            None
145            | Some(AsOf::VersionNum(_))
146            | Some(AsOf::TimestampString(_))
147            | Some(AsOf::TimestampNum(_)) => {}
148            Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
149                bail_not_implemented!("As Of ProcessTime() is not supported yet.")
150            }
151            Some(AsOf::VersionString(_)) => {
152                bail_not_implemented!("As Of Version is not supported yet.")
153            }
154        }
155
156        if matches!(plan_target, PlanTarget::TableScan) {
157            return Ok(scan.into());
158        }
159
160        let source_catalog = self.get_iceberg_source_by_table_catalog(&base_table.table_catalog)
161            .ok_or_else(|| {
162                ErrorCode::BindError(format!(
163                    "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
164                    base_table.table_catalog.name()
165                    ))
166            })?;
167
168        // Build type mapping: source column name → Hummock table type.
169        // This lets the intermediate scan output Hummock types directly,
170        // while the Iceberg scan path still adds explicit casts after
171        // materialization to match the expected output types.
172        let mut table_column_type_mapping = HashMap::new();
173        let table_column_map: HashMap<&str, &DataType> = base_table
174            .table_catalog
175            .columns
176            .iter()
177            .map(|c| (c.name.as_str(), &c.column_desc.data_type))
178            .collect();
179        for source_col in &source_catalog.columns {
180            let source_name = source_col.name();
181            let table_name = if source_name == RISINGWAVE_ICEBERG_ROW_ID {
182                ROW_ID_COLUMN_NAME
183            } else {
184                source_name
185            };
186            if let Some(&table_type) = table_column_map.get(table_name)
187                && source_col.column_desc.data_type != *table_type
188            {
189                table_column_type_mapping.insert(source_name.to_owned(), table_type.clone());
190            }
191        }
192
193        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
194            .columns
195            .clone()
196            .into_iter()
197            .enumerate()
198            .map(|(i, column)| (column.name().to_owned(), (i, column)))
199            .collect();
200        // For intermediate scan, it will output Hummock types directly. But for source, it still
201        // outputs original types. So only when the plan target is source and the column type is
202        // different, we need to add cast.
203        let exprs = scan
204            .table()
205            .column_schema()
206            .fields()
207            .iter()
208            .map(|field| {
209                let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
210                    RISINGWAVE_ICEBERG_ROW_ID
211                } else {
212                    &field.name
213                };
214                if let Some((i, source_column)) = column_map.get(source_filed_name) {
215                    let mut input_ref =
216                        ExprImpl::InputRef(InputRef::new(*i, field.data_type.clone()).into());
217                    if matches!(plan_target, PlanTarget::Source)
218                        && source_column.column_desc.data_type != field.data_type
219                    {
220                        FunctionCall::cast_mut(
221                            &mut input_ref,
222                            &field.data_type(),
223                            CastContext::Explicit,
224                        )
225                        .unwrap();
226                    }
227                    input_ref
228                } else {
229                    // fields like `_rw_timestamp`, would not be found in source.
230                    ExprImpl::Literal(Literal::new(None, field.data_type.clone()).into())
231                }
232            })
233            .collect_vec();
234
235        // Build source→table column index mapping for Hummock rewrite.
236        // Must be built before source_catalog is moved into Rc.
237        let table_col_index: HashMap<&str, usize> = base_table
238            .table_catalog
239            .columns
240            .iter()
241            .enumerate()
242            .map(|(i, c)| (c.name.as_str(), i))
243            .collect();
244        let source_to_table_mapping = ColIndexMapping::new(
245            source_catalog
246                .columns
247                .iter()
248                .map(|c| {
249                    let table_name = if c.name() == RISINGWAVE_ICEBERG_ROW_ID {
250                        ROW_ID_COLUMN_NAME
251                    } else {
252                        c.name()
253                    };
254                    table_col_index.get(table_name).copied()
255                })
256                .collect(),
257            base_table.table_catalog.columns.len(),
258        );
259
260        let logical_source = LogicalSource::with_catalog(
261            Rc::new(source_catalog),
262            SourceNodeKind::CreateMViewOrBatch,
263            self.ctx(),
264            as_of,
265        )?;
266        if matches!(plan_target, PlanTarget::Source) {
267            return Ok(LogicalProject::new(logical_source.into(), exprs).into());
268        }
269
270        let logical_iceberg_intermediate_scan = self.plan_iceberg_intermediate_scan(
271            &logical_source,
272            table_column_type_mapping,
273            source_to_table_mapping,
274        )?;
275        Ok(LogicalProject::new(logical_iceberg_intermediate_scan, exprs).into())
276    }
277
278    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
279        if source.is_shareable_cdc_connector() {
280            Err(ErrorCode::InternalError(
281                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
282            )
283            .into())
284        } else {
285            let as_of = source.as_of.clone();
286            match as_of {
287                None
288                | Some(AsOf::VersionNum(_))
289                | Some(AsOf::TimestampString(_))
290                | Some(AsOf::TimestampNum(_)) => {}
291                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
292                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
293                }
294                Some(AsOf::VersionString(_)) => {
295                    bail_not_implemented!("As Of Version is not supported yet.")
296                }
297            }
298            let is_iceberg = source.catalog.is_iceberg_connector();
299
300            // validate the source has pk. We raise an error here to avoid panic in expect_stream_key later
301            // for a nicer error message.
302            if matches!(self.plan_for(), PlanFor::Stream) {
303                let has_pk =
304                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
305                if !has_pk {
306                    // in older version, iceberg source doesn't have row_id, thus may hit this
307                    // only iceberg should hit this.
308                    debug_assert!(is_iceberg);
309                    if is_iceberg {
310                        return Err(ErrorCode::BindError(format!(
311                        "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
312                        source.catalog
313                    ))
314                    .into());
315                    } else {
316                        return Err(ErrorCode::BindError(format!(
317                            "Cannot create a stream job from a source without a primary key.
318This is a bug. We would appreciate a bug report at:
319https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml
320
321source: {:?}",
322                            source.catalog
323                        ))
324                        .into());
325                    }
326                }
327            }
328
329            let source = LogicalSource::with_catalog(
330                Rc::new(source.catalog),
331                SourceNodeKind::CreateMViewOrBatch,
332                self.ctx(),
333                as_of,
334            )?;
335            if is_iceberg && !matches!(self.plan_for(), PlanFor::Stream) {
336                let num_cols = source.core.column_catalog.len();
337                let intermediate_scan = self.plan_iceberg_intermediate_scan(
338                    &source,
339                    HashMap::new(),
340                    ColIndexMapping::identity(num_cols),
341                )?;
342                Ok(intermediate_scan)
343            } else {
344                Ok(source.into())
345            }
346        }
347    }
348
349    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
350        let left = self.plan_relation(join.left)?;
351        let right = self.plan_relation(join.right)?;
352        let join_type = join.join_type;
353        let on_clause = join.cond;
354        if on_clause.has_subquery() {
355            bail_not_implemented!("Subquery in join on condition");
356        } else {
357            Ok(LogicalJoin::create(left, right, join_type, on_clause))
358        }
359    }
360
361    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
362        let join_type = join.join_type;
363        let on_clause = join.cond;
364        if on_clause.has_subquery() {
365            bail_not_implemented!("Subquery in join on condition");
366        }
367
368        let correlated_id = self.ctx.next_correlated_id();
369        let correlated_indices = join
370            .right
371            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
372        let left = self.plan_relation(join.left)?;
373        let right = self.plan_relation(join.right)?;
374
375        Ok(LogicalApply::create(
376            left,
377            right,
378            join_type,
379            Condition::with_expr(on_clause),
380            correlated_id,
381            correlated_indices,
382            false,
383        ))
384    }
385
386    pub(super) fn plan_window_table_function(
387        &mut self,
388        table_function: BoundWindowTableFunction,
389    ) -> Result<PlanRef> {
390        use WindowTableFunctionKind::*;
391        match table_function.kind {
392            Tumble => self.plan_tumble_window(
393                table_function.input,
394                table_function.time_col,
395                table_function.args,
396            ),
397            Hop => self.plan_hop_window(
398                table_function.input,
399                table_function.time_col,
400                table_function.args,
401            ),
402        }
403    }
404
405    pub(super) fn plan_table_function(
406        &mut self,
407        table_function: ExprImpl,
408        with_ordinality: bool,
409    ) -> Result<PlanRef> {
410        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
411        match table_function {
412            ExprImpl::TableFunction(tf) => {
413                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
414            }
415            expr => {
416                let schema = Schema {
417                    // TODO: should be named
418                    fields: vec![Field::unnamed(expr.return_type())],
419                };
420                let expr_return_type = expr.return_type();
421                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
422                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
423                let mut exprs = if let DataType::Struct(st) = expr_return_type {
424                    st.iter()
425                        .enumerate()
426                        .map(|(i, (_, ty))| {
427                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
428                            let args = vec![input_ref.clone(), idx];
429                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
430                        })
431                        .collect()
432                } else {
433                    vec![input_ref]
434                };
435                if with_ordinality {
436                    exprs.push(ExprImpl::literal_bigint(1));
437                }
438                Ok(LogicalProject::create(root, exprs))
439            }
440        }
441    }
442
443    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
444        match share.input {
445            BoundShareInput::Query(query) => {
446                let id = share.share_id;
447                match self.share_cache.get(&id) {
448                    None => {
449                        let result = self.plan_query(query)?.into_unordered_subplan();
450                        let logical_share = LogicalShare::create(result);
451                        self.share_cache.insert(id, logical_share.clone());
452                        Ok(logical_share)
453                    }
454                    Some(result) => Ok(result.clone()),
455                }
456            }
457            BoundShareInput::ChangeLog(relation) => {
458                let id = share.share_id;
459                let result = self.plan_changelog(relation)?;
460                let logical_share = LogicalShare::create(result);
461                self.share_cache.insert(id, logical_share.clone());
462                Ok(logical_share)
463            }
464        }
465    }
466
467    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
468        todo!("plan watermark");
469    }
470
471    pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
472        let input = self.plan_relation(gap_fill.input)?;
473        Ok(LogicalGapFill::new(
474            input,
475            gap_fill.time_col,
476            gap_fill.interval,
477            gap_fill.fill_strategies,
478        )
479        .into())
480    }
481
482    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
483        let col_data_types = match relation {
484            Relation::Source(s) => s
485                .catalog
486                .columns
487                .iter()
488                .map(|col| col.data_type().clone())
489                .collect(),
490            Relation::BaseTable(t) => t
491                .table_catalog
492                .columns
493                .iter()
494                .map(|col| col.data_type().clone())
495                .collect(),
496            Relation::Subquery(q) => q.query.schema().data_types(),
497            Relation::Share(share) => share
498                .input
499                .fields()?
500                .into_iter()
501                .map(|(_, f)| f.data_type)
502                .collect(),
503            r => {
504                return Err(ErrorCode::BindError(format!(
505                    "Invalid input relation to tumble: {r:?}"
506                ))
507                .into());
508            }
509        };
510        Ok(col_data_types)
511    }
512
513    fn plan_tumble_window(
514        &mut self,
515        input: Relation,
516        time_col: InputRef,
517        args: Vec<ExprImpl>,
518    ) -> Result<PlanRef> {
519        let mut args = args.into_iter();
520        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
521
522        match (args.next(), args.next(), args.next()) {
523            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
524                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
525                for (idx, col_dt) in col_data_types.iter().enumerate() {
526                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
527                }
528                let window_start: ExprImpl = FunctionCall::new(
529                    ExprType::TumbleStart,
530                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
531                )?
532                .into();
533                // TODO: `window_end` may be optimized to avoid double calculation of
534                // `tumble_start`, or we can depends on common expression
535                // optimization.
536                let window_end =
537                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
538                        .into();
539                exprs.push(window_start);
540                exprs.push(window_end);
541                let base = self.plan_relation(input)?;
542                let project = LogicalProject::create(base, exprs);
543                Ok(project)
544            }
545            (
546                Some(window_size @ ExprImpl::Literal(_)),
547                Some(window_offset @ ExprImpl::Literal(_)),
548                None,
549            ) => {
550                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
551                for (idx, col_dt) in col_data_types.iter().enumerate() {
552                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
553                }
554                let window_start: ExprImpl = FunctionCall::new(
555                    ExprType::TumbleStart,
556                    vec![
557                        ExprImpl::InputRef(Box::new(time_col)),
558                        window_size.clone(),
559                        window_offset,
560                    ],
561                )?
562                .into();
563                // TODO: `window_end` may be optimized to avoid double calculation of
564                // `tumble_start`, or we can depends on common expression
565                // optimization.
566                let window_end =
567                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
568                        .into();
569                exprs.push(window_start);
570                exprs.push(window_end);
571                let base = self.plan_relation(input)?;
572                let project = LogicalProject::create(base, exprs);
573                Ok(project)
574            }
575            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
576        }
577    }
578
579    fn plan_hop_window(
580        &mut self,
581        input: Relation,
582        time_col: InputRef,
583        args: Vec<ExprImpl>,
584    ) -> Result<PlanRef> {
585        let input = self.plan_relation(input)?;
586        let mut args = args.into_iter();
587        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
588            args.next_tuple()
589        else {
590            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
591        };
592
593        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
594            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
595        };
596        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
597            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
598        };
599
600        let window_offset = match (args.next(), args.next()) {
601            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
602                Some(ScalarImpl::Interval(window_offset)) => window_offset,
603                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
604            },
605            (None, None) => Interval::from_month_day_usec(0, 0, 0),
606            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
607        };
608
609        if !window_size.is_positive() || !window_slide.is_positive() {
610            return Err(ErrorCode::BindError(format!(
611                "window_size {} and window_slide {} must be positive",
612                window_size, window_slide
613            ))
614            .into());
615        }
616
617        if window_size.exact_div(&window_slide).is_none() {
618            return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
619        }
620
621        Ok(LogicalHopWindow::create(
622            input,
623            time_col,
624            window_slide,
625            window_size,
626            window_offset,
627        ))
628    }
629
630    fn plan_iceberg_intermediate_scan(
631        &self,
632        source: &LogicalSource,
633        table_column_type_mapping: HashMap<String, DataType>,
634        source_to_table_mapping: ColIndexMapping,
635    ) -> Result<PlanRef> {
636        // If time travel is not specified, we use current timestamp to get the latest snapshot
637        let timezone = self.ctx().get_session_timezone();
638        let mut time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
639        if time_travel_info.is_none() {
640            time_travel_info = self
641                .fetch_current_snapshot_id(source)?
642                .map(IcebergTimeTravelInfo::Version);
643        }
644        let Some(time_travel_info) = time_travel_info else {
645            let mut schema = source.schema().clone();
646            for field in &mut schema.fields {
647                if let Some(target_type) = table_column_type_mapping.get(&field.name) {
648                    field.data_type = target_type.clone();
649                }
650            }
651            return Ok(LogicalValues::new(vec![], schema, self.ctx()).into());
652        };
653        let intermediate_scan = LogicalIcebergIntermediateScan::new(
654            source,
655            time_travel_info,
656            table_column_type_mapping,
657            source_to_table_mapping,
658        );
659        Ok(intermediate_scan.into())
660    }
661
662    fn fetch_current_snapshot_id(&self, source: &LogicalSource) -> Result<Option<i64>> {
663        let mut map = self.ctx.iceberg_snapshot_id_map();
664        let catalog = source.source_catalog().ok_or_else(|| {
665            crate::error::ErrorCode::InternalError(
666                "Iceberg source must have a valid source catalog".to_owned(),
667            )
668        })?;
669        let name = catalog.name.as_str();
670        if let Some(&snapshot_id) = map.get(name) {
671            return Ok(snapshot_id);
672        }
673
674        #[cfg(madsim)]
675        return Err(crate::error::ErrorCode::BindError(
676            "iceberg source time travel can't be used in the madsim mode".to_string(),
677        )
678        .into());
679
680        #[cfg(not(madsim))]
681        {
682            let ConnectorProperties::Iceberg(prop) =
683                ConnectorProperties::extract(catalog.with_properties.clone(), false)?
684            else {
685                return Err(crate::error::ErrorCode::InternalError(
686                    "Iceberg source must have Iceberg connector properties".to_owned(),
687                )
688                .into());
689            };
690
691            let snapshot_id = tokio::task::block_in_place(|| {
692                crate::utils::FRONTEND_RUNTIME.block_on(async {
693                    prop.load_table()
694                        .await
695                        .map(|table| table.metadata().current_snapshot_id())
696                })
697            })?;
698            map.insert(name.to_owned(), snapshot_id);
699            Ok(snapshot_id)
700        }
701    }
702
703    fn get_iceberg_source_by_table_catalog(
704        &self,
705        table_catalog: &TableCatalog,
706    ) -> Option<SourceCatalog> {
707        let catalog_reader = self.ctx.session_ctx().env().catalog_reader().read_guard();
708
709        let iceberg_source_name = table_catalog.iceberg_source_name()?;
710        let schema = catalog_reader
711            .get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)
712            .ok()?;
713        let source_catalog = schema.get_source_by_name(&iceberg_source_name)?;
714        Some(source_catalog.deref().clone())
715    }
716}