1use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use itertools::Itertools;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{
22 ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
23};
24use risingwave_common::session_config::IcebergQueryStorageMode;
25use risingwave_common::types::{DataType, Interval, ScalarImpl};
26use risingwave_connector::source::ConnectorProperties;
27use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
28use risingwave_sqlparser::ast::AsOf;
29
30use crate::TableCatalog;
31use crate::binder::{
32 BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput, BoundSource,
33 BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
34};
35use crate::catalog::source_catalog::SourceCatalog;
36use crate::error::{ErrorCode, Result};
37use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
38use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind};
39use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
40use crate::optimizer::plan_node::{
41 LogicalApply, LogicalGapFill, LogicalHopWindow, LogicalIcebergIntermediateScan, LogicalJoin,
42 LogicalPlanRef as PlanRef, LogicalProject, LogicalScan, LogicalShare, LogicalSource,
43 LogicalSysScan, LogicalTableFunction, LogicalValues,
44};
45use crate::optimizer::property::Cardinality;
46use crate::planner::{PlanFor, Planner};
47use crate::utils::{ColIndexMapping, Condition};
48
49const ERROR_WINDOW_SIZE_ARG: &str =
50 "The size arg of window table function should be an interval literal.";
51
52impl Planner {
53 pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
54 match relation {
55 Relation::BaseTable(t) => self.plan_base_table(&t),
56 Relation::SystemTable(st) => self.plan_sys_table(*st),
57 Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
59 Relation::Join(join) => self.plan_join(*join),
60 Relation::Apply(join) => self.plan_apply(*join),
61 Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
62 Relation::Source(s) => self.plan_source(*s),
63 Relation::TableFunction {
64 expr: tf,
65 with_ordinality,
66 } => self.plan_table_function(tf, with_ordinality),
67 Relation::Watermark(tf) => self.plan_watermark(*tf),
68 Relation::Share(share) => self.plan_share(*share),
69 Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
70 }
71 }
72
73 pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
74 Ok(LogicalSysScan::create(
75 sys_table.sys_table_catalog,
76 self.ctx(),
77 Cardinality::unknown(), )
79 .into())
80 }
81
82 pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
83 let as_of = base_table.as_of.clone();
84 let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());
85
86 match base_table.table_catalog.engine {
87 Engine::Hummock => {
88 match as_of {
89 None
90 | Some(AsOf::ProcessTime)
91 | Some(AsOf::TimestampNum(_))
92 | Some(AsOf::TimestampString(_))
93 | Some(AsOf::ProcessTimeWithInterval(_)) => {}
94 Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
95 bail_not_implemented!("As Of Version is not supported yet.")
96 }
97 };
98 Ok(scan.into())
99 }
100 Engine::Iceberg => self.plan_iceberg_table(base_table, scan, as_of),
101 }
102 }
103
104 fn plan_iceberg_table(
105 &mut self,
106 base_table: &BoundBaseTable,
107 scan: LogicalScan,
108 as_of: Option<AsOf>,
109 ) -> Result<PlanRef> {
110 let is_append_only = base_table.table_catalog.append_only;
111 let iceberg_query_storage_mode = self
112 .ctx()
113 .session_ctx()
114 .config()
115 .iceberg_query_storage_mode();
116
117 enum PlanTarget {
118 TableScan,
119 Source,
120 IntermediateScan,
121 }
122 let plan_target = match self.plan_for() {
123 PlanFor::StreamIcebergEngineInternal => PlanTarget::TableScan,
124 PlanFor::BatchDql => match iceberg_query_storage_mode {
125 IcebergQueryStorageMode::Hummock => PlanTarget::TableScan,
126 _ => PlanTarget::IntermediateScan,
127 },
128 PlanFor::Stream => {
129 if is_append_only {
130 PlanTarget::Source
131 } else {
132 PlanTarget::TableScan
133 }
134 }
135 PlanFor::Batch => {
136 if is_append_only {
137 PlanTarget::IntermediateScan
138 } else {
139 PlanTarget::TableScan
140 }
141 }
142 };
143 match as_of {
144 None
145 | Some(AsOf::VersionNum(_))
146 | Some(AsOf::TimestampString(_))
147 | Some(AsOf::TimestampNum(_)) => {}
148 Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
149 bail_not_implemented!("As Of ProcessTime() is not supported yet.")
150 }
151 Some(AsOf::VersionString(_)) => {
152 bail_not_implemented!("As Of Version is not supported yet.")
153 }
154 }
155
156 if matches!(plan_target, PlanTarget::TableScan) {
157 return Ok(scan.into());
158 }
159
160 let source_catalog = self.get_iceberg_source_by_table_catalog(&base_table.table_catalog)
161 .ok_or_else(|| {
162 ErrorCode::BindError(format!(
163 "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
164 base_table.table_catalog.name()
165 ))
166 })?;
167
168 let mut table_column_type_mapping = HashMap::new();
173 let table_column_map: HashMap<&str, &DataType> = base_table
174 .table_catalog
175 .columns
176 .iter()
177 .map(|c| (c.name.as_str(), &c.column_desc.data_type))
178 .collect();
179 for source_col in &source_catalog.columns {
180 let source_name = source_col.name();
181 let table_name = if source_name == RISINGWAVE_ICEBERG_ROW_ID {
182 ROW_ID_COLUMN_NAME
183 } else {
184 source_name
185 };
186 if let Some(&table_type) = table_column_map.get(table_name)
187 && source_col.column_desc.data_type != *table_type
188 {
189 table_column_type_mapping.insert(source_name.to_owned(), table_type.clone());
190 }
191 }
192
193 let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
194 .columns
195 .clone()
196 .into_iter()
197 .enumerate()
198 .map(|(i, column)| (column.name().to_owned(), (i, column)))
199 .collect();
200 let exprs = scan
204 .table()
205 .column_schema()
206 .fields()
207 .iter()
208 .map(|field| {
209 let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
210 RISINGWAVE_ICEBERG_ROW_ID
211 } else {
212 &field.name
213 };
214 if let Some((i, source_column)) = column_map.get(source_filed_name) {
215 let input_type = &source_column.column_desc.data_type;
216 if matches!(plan_target, PlanTarget::Source) && input_type != &field.data_type {
217 let mut input_ref =
218 ExprImpl::InputRef(InputRef::new(*i, input_type.clone()).into());
219 FunctionCall::cast_mut(
220 &mut input_ref,
221 &field.data_type,
222 CastContext::Explicit,
223 )
224 .unwrap();
225 input_ref
226 } else {
227 ExprImpl::InputRef(InputRef::new(*i, field.data_type.clone()).into())
228 }
229 } else {
230 ExprImpl::Literal(Literal::new(None, field.data_type.clone()).into())
232 }
233 })
234 .collect_vec();
235
236 let table_col_index: HashMap<&str, usize> = base_table
239 .table_catalog
240 .columns
241 .iter()
242 .enumerate()
243 .map(|(i, c)| (c.name.as_str(), i))
244 .collect();
245 let source_to_table_mapping = ColIndexMapping::new(
246 source_catalog
247 .columns
248 .iter()
249 .map(|c| {
250 let table_name = if c.name() == RISINGWAVE_ICEBERG_ROW_ID {
251 ROW_ID_COLUMN_NAME
252 } else {
253 c.name()
254 };
255 table_col_index.get(table_name).copied()
256 })
257 .collect(),
258 base_table.table_catalog.columns.len(),
259 );
260
261 let logical_source = LogicalSource::with_catalog(
262 Rc::new(source_catalog),
263 SourceNodeKind::CreateMViewOrBatch,
264 self.ctx(),
265 as_of,
266 )?;
267 if matches!(plan_target, PlanTarget::Source) {
268 return Ok(LogicalProject::new(logical_source.into(), exprs).into());
269 }
270
271 let logical_iceberg_intermediate_scan = self.plan_iceberg_intermediate_scan(
272 &logical_source,
273 table_column_type_mapping,
274 source_to_table_mapping,
275 )?;
276 Ok(LogicalProject::new(logical_iceberg_intermediate_scan, exprs).into())
277 }
278
279 pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
280 if source.is_shareable_cdc_connector() {
281 Err(ErrorCode::InternalError(
282 "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
283 )
284 .into())
285 } else {
286 let as_of = source.as_of.clone();
287 match as_of {
288 None
289 | Some(AsOf::VersionNum(_))
290 | Some(AsOf::TimestampString(_))
291 | Some(AsOf::TimestampNum(_)) => {}
292 Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
293 bail_not_implemented!("As Of ProcessTime() is not supported yet.")
294 }
295 Some(AsOf::VersionString(_)) => {
296 bail_not_implemented!("As Of Version is not supported yet.")
297 }
298 }
299 let is_iceberg = source.catalog.is_iceberg_connector();
300
301 if matches!(self.plan_for(), PlanFor::Stream) {
304 let has_pk =
305 source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
306 if !has_pk {
307 debug_assert!(is_iceberg);
310 if is_iceberg {
311 return Err(ErrorCode::BindError(format!(
312 "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
313 source.catalog
314 ))
315 .into());
316 } else {
317 return Err(ErrorCode::BindError(format!(
318 "Cannot create a stream job from a source without a primary key.
319This is a bug. We would appreciate a bug report at:
320https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml
321
322source: {:?}",
323 source.catalog
324 ))
325 .into());
326 }
327 }
328 }
329
330 let source = LogicalSource::with_catalog(
331 Rc::new(source.catalog),
332 SourceNodeKind::CreateMViewOrBatch,
333 self.ctx(),
334 as_of,
335 )?;
336 if is_iceberg && !matches!(self.plan_for(), PlanFor::Stream) {
337 let num_cols = source.core.column_catalog.len();
338 let intermediate_scan = self.plan_iceberg_intermediate_scan(
339 &source,
340 HashMap::new(),
341 ColIndexMapping::identity(num_cols),
342 )?;
343 Ok(intermediate_scan)
344 } else {
345 Ok(source.into())
346 }
347 }
348 }
349
350 pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
351 let left = self.plan_relation(join.left)?;
352 let right = self.plan_relation(join.right)?;
353 let join_type = join.join_type;
354 let on_clause = join.cond;
355 if on_clause.has_subquery() {
356 bail_not_implemented!("Subquery in join on condition");
357 } else {
358 Ok(LogicalJoin::create(left, right, join_type, on_clause))
359 }
360 }
361
362 pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
363 let join_type = join.join_type;
364 let on_clause = join.cond;
365 if on_clause.has_subquery() {
366 bail_not_implemented!("Subquery in join on condition");
367 }
368
369 let correlated_id = self.ctx.next_correlated_id();
370 let correlated_indices = join
371 .right
372 .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
373 let left = self.plan_relation(join.left)?;
374 let right = self.plan_relation(join.right)?;
375
376 Ok(LogicalApply::create(
377 left,
378 right,
379 join_type,
380 Condition::with_expr(on_clause),
381 correlated_id,
382 correlated_indices,
383 false,
384 ))
385 }
386
387 pub(super) fn plan_window_table_function(
388 &mut self,
389 table_function: BoundWindowTableFunction,
390 ) -> Result<PlanRef> {
391 use WindowTableFunctionKind::*;
392 match table_function.kind {
393 Tumble => self.plan_tumble_window(
394 table_function.input,
395 table_function.time_col,
396 table_function.args,
397 ),
398 Hop => self.plan_hop_window(
399 table_function.input,
400 table_function.time_col,
401 table_function.args,
402 ),
403 }
404 }
405
406 pub(super) fn plan_table_function(
407 &mut self,
408 table_function: ExprImpl,
409 with_ordinality: bool,
410 ) -> Result<PlanRef> {
411 match table_function {
413 ExprImpl::TableFunction(tf) => {
414 Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
415 }
416 expr => {
417 let schema = Schema {
418 fields: vec![Field::unnamed(expr.return_type())],
420 };
421 let expr_return_type = expr.return_type();
422 let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
423 let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
424 let mut exprs = if let DataType::Struct(st) = expr_return_type {
425 st.iter()
426 .enumerate()
427 .map(|(i, (_, ty))| {
428 let idx = ExprImpl::literal_int(i.try_into().unwrap());
429 let args = vec![input_ref.clone(), idx];
430 FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
431 })
432 .collect()
433 } else {
434 vec![input_ref]
435 };
436 if with_ordinality {
437 exprs.push(ExprImpl::literal_bigint(1));
438 }
439 Ok(LogicalProject::create(root, exprs))
440 }
441 }
442 }
443
444 pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
445 match share.input {
446 BoundShareInput::Query(query) => {
447 let id = share.share_id;
448 match self.share_cache.get(&id) {
449 None => {
450 let result = self.plan_query(query)?.into_unordered_subplan();
451 let logical_share = LogicalShare::create(result);
452 self.share_cache.insert(id, logical_share.clone());
453 Ok(logical_share)
454 }
455 Some(result) => Ok(result.clone()),
456 }
457 }
458 BoundShareInput::ChangeLog(relation) => {
459 let id = share.share_id;
460 let result = self.plan_changelog(relation)?;
461 let logical_share = LogicalShare::create(result);
462 self.share_cache.insert(id, logical_share.clone());
463 Ok(logical_share)
464 }
465 }
466 }
467
468 pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
469 todo!("plan watermark");
470 }
471
472 pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
473 let input = self.plan_relation(gap_fill.input)?;
474 Ok(LogicalGapFill::new(
475 input,
476 gap_fill.time_col,
477 gap_fill.interval,
478 gap_fill.fill_strategies,
479 )
480 .into())
481 }
482
483 fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
484 let col_data_types = match relation {
485 Relation::Source(s) => s
486 .catalog
487 .columns
488 .iter()
489 .map(|col| col.data_type().clone())
490 .collect(),
491 Relation::BaseTable(t) => t
492 .table_catalog
493 .columns
494 .iter()
495 .map(|col| col.data_type().clone())
496 .collect(),
497 Relation::Subquery(q) => q.query.schema().data_types(),
498 Relation::Share(share) => share
499 .input
500 .fields()?
501 .into_iter()
502 .map(|(_, f)| f.data_type)
503 .collect(),
504 r => {
505 return Err(ErrorCode::BindError(format!(
506 "Invalid input relation to tumble: {r:?}"
507 ))
508 .into());
509 }
510 };
511 Ok(col_data_types)
512 }
513
514 fn plan_tumble_window(
515 &mut self,
516 input: Relation,
517 time_col: InputRef,
518 args: Vec<ExprImpl>,
519 ) -> Result<PlanRef> {
520 let mut args = args.into_iter();
521 let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
522
523 match (args.next(), args.next(), args.next()) {
524 (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
525 let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
526 for (idx, col_dt) in col_data_types.iter().enumerate() {
527 exprs.push(InputRef::new(idx, col_dt.clone()).into());
528 }
529 let window_start: ExprImpl = FunctionCall::new(
530 ExprType::TumbleStart,
531 vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
532 )?
533 .into();
534 let window_end =
538 FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
539 .into();
540 exprs.push(window_start);
541 exprs.push(window_end);
542 let base = self.plan_relation(input)?;
543 let project = LogicalProject::create(base, exprs);
544 Ok(project)
545 }
546 (
547 Some(window_size @ ExprImpl::Literal(_)),
548 Some(window_offset @ ExprImpl::Literal(_)),
549 None,
550 ) => {
551 let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
552 for (idx, col_dt) in col_data_types.iter().enumerate() {
553 exprs.push(InputRef::new(idx, col_dt.clone()).into());
554 }
555 let window_start: ExprImpl = FunctionCall::new(
556 ExprType::TumbleStart,
557 vec![
558 ExprImpl::InputRef(Box::new(time_col)),
559 window_size.clone(),
560 window_offset,
561 ],
562 )?
563 .into();
564 let window_end =
568 FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
569 .into();
570 exprs.push(window_start);
571 exprs.push(window_end);
572 let base = self.plan_relation(input)?;
573 let project = LogicalProject::create(base, exprs);
574 Ok(project)
575 }
576 _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
577 }
578 }
579
580 fn plan_hop_window(
581 &mut self,
582 input: Relation,
583 time_col: InputRef,
584 args: Vec<ExprImpl>,
585 ) -> Result<PlanRef> {
586 let input = self.plan_relation(input)?;
587 let mut args = args.into_iter();
588 let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
589 args.next_tuple()
590 else {
591 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
592 };
593
594 let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
595 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
596 };
597 let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
598 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
599 };
600
601 let window_offset = match (args.next(), args.next()) {
602 (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
603 Some(ScalarImpl::Interval(window_offset)) => window_offset,
604 _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
605 },
606 (None, None) => Interval::from_month_day_usec(0, 0, 0),
607 _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
608 };
609
610 if !window_size.is_positive() || !window_slide.is_positive() {
611 return Err(ErrorCode::BindError(format!(
612 "window_size {} and window_slide {} must be positive",
613 window_size, window_slide
614 ))
615 .into());
616 }
617
618 if window_size.exact_div(&window_slide).is_none() {
619 return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
620 }
621
622 Ok(LogicalHopWindow::create(
623 input,
624 time_col,
625 window_slide,
626 window_size,
627 window_offset,
628 ))
629 }
630
631 fn plan_iceberg_intermediate_scan(
632 &self,
633 source: &LogicalSource,
634 table_column_type_mapping: HashMap<String, DataType>,
635 source_to_table_mapping: ColIndexMapping,
636 ) -> Result<PlanRef> {
637 let timezone = self.ctx().get_session_timezone();
639 let mut time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
640 if time_travel_info.is_none() {
641 time_travel_info = self
642 .fetch_current_snapshot_id(source)?
643 .map(IcebergTimeTravelInfo::Version);
644 }
645 let Some(time_travel_info) = time_travel_info else {
646 let mut schema = source.schema().clone();
647 for field in &mut schema.fields {
648 if let Some(target_type) = table_column_type_mapping.get(&field.name) {
649 field.data_type = target_type.clone();
650 }
651 }
652 return Ok(LogicalValues::new(vec![], schema, self.ctx()).into());
653 };
654 let intermediate_scan = LogicalIcebergIntermediateScan::new(
655 source,
656 time_travel_info,
657 table_column_type_mapping,
658 source_to_table_mapping,
659 );
660 Ok(intermediate_scan.into())
661 }
662
663 fn fetch_current_snapshot_id(&self, source: &LogicalSource) -> Result<Option<i64>> {
664 let mut map = self.ctx.iceberg_snapshot_id_map();
665 let catalog = source.source_catalog().ok_or_else(|| {
666 crate::error::ErrorCode::InternalError(
667 "Iceberg source must have a valid source catalog".to_owned(),
668 )
669 })?;
670 let name = catalog.name.as_str();
671 if let Some(&snapshot_id) = map.get(name) {
672 return Ok(snapshot_id);
673 }
674
675 #[cfg(madsim)]
676 return Err(crate::error::ErrorCode::BindError(
677 "iceberg source time travel can't be used in the madsim mode".to_string(),
678 )
679 .into());
680
681 #[cfg(not(madsim))]
682 {
683 let ConnectorProperties::Iceberg(prop) =
684 ConnectorProperties::extract(catalog.with_properties.clone(), false)?
685 else {
686 return Err(crate::error::ErrorCode::InternalError(
687 "Iceberg source must have Iceberg connector properties".to_owned(),
688 )
689 .into());
690 };
691
692 let snapshot_id = tokio::task::block_in_place(|| {
693 crate::utils::FRONTEND_RUNTIME.block_on(async {
694 prop.load_table()
695 .await
696 .map(|table| table.metadata().current_snapshot_id())
697 })
698 })?;
699 map.insert(name.to_owned(), snapshot_id);
700 Ok(snapshot_id)
701 }
702 }
703
704 fn get_iceberg_source_by_table_catalog(
705 &self,
706 table_catalog: &TableCatalog,
707 ) -> Option<SourceCatalog> {
708 let catalog_reader = self.ctx.session_ctx().env().catalog_reader().read_guard();
709
710 let iceberg_source_name = table_catalog.iceberg_source_name()?;
711 let schema = catalog_reader
712 .get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)
713 .ok()?;
714 let source_catalog = schema.get_source_by_name(&iceberg_source_name)?;
715 Some(source_catalog.deref().clone())
716 }
717}