1use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use itertools::Itertools;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{
22 ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
23};
24use risingwave_common::session_config::IcebergQueryStorageMode;
25use risingwave_common::types::{DataType, Interval, ScalarImpl};
26use risingwave_connector::source::ConnectorProperties;
27use risingwave_connector::source::iceberg::IcebergTimeTravelInfo;
28use risingwave_sqlparser::ast::AsOf;
29
30use crate::TableCatalog;
31use crate::binder::{
32 BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput, BoundSource,
33 BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
34};
35use crate::catalog::source_catalog::SourceCatalog;
36use crate::error::{ErrorCode, Result};
37use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
38use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind};
39use crate::optimizer::plan_node::utils::to_iceberg_time_travel_as_of;
40use crate::optimizer::plan_node::{
41 LogicalApply, LogicalGapFill, LogicalHopWindow, LogicalIcebergIntermediateScan, LogicalJoin,
42 LogicalPlanRef as PlanRef, LogicalProject, LogicalScan, LogicalShare, LogicalSource,
43 LogicalSysScan, LogicalTableFunction, LogicalValues,
44};
45use crate::optimizer::property::Cardinality;
46use crate::planner::{PlanFor, Planner};
47use crate::utils::{ColIndexMapping, Condition};
48
/// Bind-error message shared by TUMBLE/HOP planning whenever a window
/// argument (size, slide, or offset) is not an interval literal.
const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";
51
52impl Planner {
53 pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
54 match relation {
55 Relation::BaseTable(t) => self.plan_base_table(&t),
56 Relation::SystemTable(st) => self.plan_sys_table(*st),
57 Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
59 Relation::Join(join) => self.plan_join(*join),
60 Relation::Apply(join) => self.plan_apply(*join),
61 Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
62 Relation::Source(s) => self.plan_source(*s),
63 Relation::TableFunction {
64 expr: tf,
65 with_ordinality,
66 } => self.plan_table_function(tf, with_ordinality),
67 Relation::Watermark(tf) => self.plan_watermark(*tf),
68 Relation::Share(share) => self.plan_share(*share),
69 Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
70 }
71 }
72
73 pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
74 Ok(LogicalSysScan::create(
75 sys_table.sys_table_catalog,
76 self.ctx(),
77 Cardinality::unknown(), )
79 .into())
80 }
81
82 pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
83 let as_of = base_table.as_of.clone();
84 let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());
85
86 match base_table.table_catalog.engine {
87 Engine::Hummock => {
88 match as_of {
89 None
90 | Some(AsOf::ProcessTime)
91 | Some(AsOf::TimestampNum(_))
92 | Some(AsOf::TimestampString(_))
93 | Some(AsOf::ProcessTimeWithInterval(_)) => {}
94 Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
95 bail_not_implemented!("As Of Version is not supported yet.")
96 }
97 };
98 Ok(scan.into())
99 }
100 Engine::Iceberg => self.plan_iceberg_table(base_table, scan, as_of),
101 }
102 }
103
    /// Plans a read of an iceberg-engine table.
    ///
    /// Depending on what the plan is for (stream vs. batch, internal engine
    /// plan vs. user DQL) and the session's iceberg query storage mode, the
    /// read is served by one of three access paths:
    /// - the Hummock table scan built by the caller (`scan`),
    /// - a `LogicalSource` over the backing iceberg source (streaming from
    ///   append-only tables only), or
    /// - a `LogicalIcebergIntermediateScan` (batch reads of iceberg files).
    fn plan_iceberg_table(
        &mut self,
        base_table: &BoundBaseTable,
        scan: LogicalScan,
        as_of: Option<AsOf>,
    ) -> Result<PlanRef> {
        let is_append_only = base_table.table_catalog.append_only;
        let iceberg_query_storage_mode = self
            .ctx()
            .session_ctx()
            .config()
            .iceberg_query_storage_mode();

        // Which physical access path serves this read.
        enum PlanTarget {
            TableScan,
            Source,
            IntermediateScan,
        }
        let plan_target = match self.plan_for() {
            // Internal iceberg-engine plans always read the Hummock copy.
            PlanFor::StreamIcebergEngineInternal => PlanTarget::TableScan,
            PlanFor::BatchDql => match iceberg_query_storage_mode {
                IcebergQueryStorageMode::Hummock => PlanTarget::TableScan,
                _ => PlanTarget::IntermediateScan,
            },
            PlanFor::Stream => {
                // Streaming can only consume the source when the table is
                // append-only; otherwise fall back to the table scan.
                if is_append_only {
                    PlanTarget::Source
                } else {
                    PlanTarget::TableScan
                }
            }
            PlanFor::Batch => {
                if is_append_only {
                    PlanTarget::IntermediateScan
                } else {
                    PlanTarget::TableScan
                }
            }
        };
        // Validate the AS OF clause: version-number and timestamp time travel
        // are accepted; process-time and version-string forms are not
        // implemented for iceberg.
        match as_of {
            None
            | Some(AsOf::VersionNum(_))
            | Some(AsOf::TimestampString(_))
            | Some(AsOf::TimestampNum(_)) => {}
            Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                bail_not_implemented!("As Of ProcessTime() is not supported yet.")
            }
            Some(AsOf::VersionString(_)) => {
                bail_not_implemented!("As Of Version is not supported yet.")
            }
        }

        if matches!(plan_target, PlanTarget::TableScan) {
            return Ok(scan.into());
        }

        // Both remaining targets read through the backing iceberg source.
        let source_catalog = self.get_iceberg_source_by_table_catalog(&base_table.table_catalog)
            .ok_or_else(|| {
                ErrorCode::BindError(format!(
                    "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                    base_table.table_catalog.name()
                ))
            })?;

        // Collect source columns whose data type differs from the same-named
        // table column; the intermediate scan uses this mapping to restore the
        // table-side types. The hidden iceberg row-id column is matched
        // against the table's row-id column.
        let mut table_column_type_mapping = HashMap::new();
        let table_column_map: HashMap<&str, &DataType> = base_table
            .table_catalog
            .columns
            .iter()
            .map(|c| (c.name.as_str(), &c.column_desc.data_type))
            .collect();
        for source_col in &source_catalog.columns {
            let source_name = source_col.name();
            let table_name = if source_name == RISINGWAVE_ICEBERG_ROW_ID {
                ROW_ID_COLUMN_NAME
            } else {
                source_name
            };
            if let Some(&table_type) = table_column_map.get(table_name)
                && source_col.column_desc.data_type != *table_type
            {
                table_column_type_mapping.insert(source_name.to_owned(), table_type.clone());
            }
        }

        // Index source columns by name so table columns can be positioned
        // against the source output below.
        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
            .columns
            .clone()
            .into_iter()
            .enumerate()
            .map(|(i, column)| (column.name().to_owned(), (i, column)))
            .collect();
        // One projection expression per table column: read from the matching
        // source column (casting when reading the raw source and the types
        // differ), or produce NULL when the source lacks the column.
        let exprs = scan
            .table()
            .column_schema()
            .fields()
            .iter()
            .map(|field| {
                // NOTE(review): "filed" is a typo for "field"; kept as-is here.
                let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
                    RISINGWAVE_ICEBERG_ROW_ID
                } else {
                    &field.name
                };
                if let Some((i, source_column)) = column_map.get(source_filed_name) {
                    let mut input_ref =
                        ExprImpl::InputRef(InputRef::new(*i, field.data_type.clone()).into());
                    if matches!(plan_target, PlanTarget::Source)
                        && source_column.column_desc.data_type != field.data_type
                    {
                        // NOTE(review): the explicit source-type -> table-type
                        // cast is assumed infallible here (unwrap).
                        FunctionCall::cast_mut(
                            &mut input_ref,
                            &field.data_type(),
                            CastContext::Explicit,
                        )
                        .unwrap();
                    }
                    input_ref
                } else {
                    // Column absent from the source: pad with a NULL literal of
                    // the table column's type.
                    ExprImpl::Literal(Literal::new(None, field.data_type.clone()).into())
                }
            })
            .collect_vec();

        // Map each source column index to the same-named (row-id-normalized)
        // table column index; source columns with no table counterpart map to
        // None.
        let table_col_index: HashMap<&str, usize> = base_table
            .table_catalog
            .columns
            .iter()
            .enumerate()
            .map(|(i, c)| (c.name.as_str(), i))
            .collect();
        let source_to_table_mapping = ColIndexMapping::new(
            source_catalog
                .columns
                .iter()
                .map(|c| {
                    let table_name = if c.name() == RISINGWAVE_ICEBERG_ROW_ID {
                        ROW_ID_COLUMN_NAME
                    } else {
                        c.name()
                    };
                    table_col_index.get(table_name).copied()
                })
                .collect(),
            base_table.table_catalog.columns.len(),
        );

        let logical_source = LogicalSource::with_catalog(
            Rc::new(source_catalog),
            SourceNodeKind::CreateMViewOrBatch,
            self.ctx(),
            as_of,
        )?;
        if matches!(plan_target, PlanTarget::Source) {
            return Ok(LogicalProject::new(logical_source.into(), exprs).into());
        }

        // Remaining target: batch read from iceberg files, re-projected into
        // the table's schema.
        let logical_iceberg_intermediate_scan = self.plan_iceberg_intermediate_scan(
            &logical_source,
            table_column_type_mapping,
            source_to_table_mapping,
        )?;
        Ok(LogicalProject::new(logical_iceberg_intermediate_scan, exprs).into())
    }
277
    /// Plans a read directly on a source.
    ///
    /// Shared CDC sources are rejected (they must be read through a table).
    /// Streaming plans on sources without any primary key are rejected.
    /// Iceberg sources in non-streaming plans are lowered to an iceberg
    /// intermediate scan; everything else becomes a plain `LogicalSource`.
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            // Sources accept version-number / timestamp AS OF only;
            // process-time and version-string time travel are unimplemented.
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }
            let is_iceberg = source.catalog.is_iceberg_connector();

            if matches!(self.plan_for(), PlanFor::Stream) {
                // A streaming job needs a primary key: either the hidden row-id
                // column or user-declared pk columns.
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    // Only iceberg sources are expected to reach this state
                    // (sources created by an older version without a pk); a
                    // non-iceberg source here indicates a bug.
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }

            let source = LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?;
            if is_iceberg && !matches!(self.plan_for(), PlanFor::Stream) {
                // Non-streaming reads on iceberg go through the intermediate
                // scan with an identity mapping (no table-side renames/casts).
                let num_cols = source.core.column_catalog.len();
                let intermediate_scan = self.plan_iceberg_intermediate_scan(
                    &source,
                    HashMap::new(),
                    ColIndexMapping::identity(num_cols),
                )?;
                Ok(intermediate_scan)
            } else {
                Ok(source.into())
            }
        }
    }
348
349 pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
350 let left = self.plan_relation(join.left)?;
351 let right = self.plan_relation(join.right)?;
352 let join_type = join.join_type;
353 let on_clause = join.cond;
354 if on_clause.has_subquery() {
355 bail_not_implemented!("Subquery in join on condition");
356 } else {
357 Ok(LogicalJoin::create(left, right, join_type, on_clause))
358 }
359 }
360
361 pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
362 let join_type = join.join_type;
363 let on_clause = join.cond;
364 if on_clause.has_subquery() {
365 bail_not_implemented!("Subquery in join on condition");
366 }
367
368 let correlated_id = self.ctx.next_correlated_id();
369 let correlated_indices = join
370 .right
371 .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
372 let left = self.plan_relation(join.left)?;
373 let right = self.plan_relation(join.right)?;
374
375 Ok(LogicalApply::create(
376 left,
377 right,
378 join_type,
379 Condition::with_expr(on_clause),
380 correlated_id,
381 correlated_indices,
382 false,
383 ))
384 }
385
386 pub(super) fn plan_window_table_function(
387 &mut self,
388 table_function: BoundWindowTableFunction,
389 ) -> Result<PlanRef> {
390 use WindowTableFunctionKind::*;
391 match table_function.kind {
392 Tumble => self.plan_tumble_window(
393 table_function.input,
394 table_function.time_col,
395 table_function.args,
396 ),
397 Hop => self.plan_hop_window(
398 table_function.input,
399 table_function.time_col,
400 table_function.args,
401 ),
402 }
403 }
404
405 pub(super) fn plan_table_function(
406 &mut self,
407 table_function: ExprImpl,
408 with_ordinality: bool,
409 ) -> Result<PlanRef> {
410 match table_function {
412 ExprImpl::TableFunction(tf) => {
413 Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
414 }
415 expr => {
416 let schema = Schema {
417 fields: vec![Field::unnamed(expr.return_type())],
419 };
420 let expr_return_type = expr.return_type();
421 let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
422 let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
423 let mut exprs = if let DataType::Struct(st) = expr_return_type {
424 st.iter()
425 .enumerate()
426 .map(|(i, (_, ty))| {
427 let idx = ExprImpl::literal_int(i.try_into().unwrap());
428 let args = vec![input_ref.clone(), idx];
429 FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
430 })
431 .collect()
432 } else {
433 vec![input_ref]
434 };
435 if with_ordinality {
436 exprs.push(ExprImpl::literal_bigint(1));
437 }
438 Ok(LogicalProject::create(root, exprs))
439 }
440 }
441 }
442
443 pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
444 match share.input {
445 BoundShareInput::Query(query) => {
446 let id = share.share_id;
447 match self.share_cache.get(&id) {
448 None => {
449 let result = self.plan_query(query)?.into_unordered_subplan();
450 let logical_share = LogicalShare::create(result);
451 self.share_cache.insert(id, logical_share.clone());
452 Ok(logical_share)
453 }
454 Some(result) => Ok(result.clone()),
455 }
456 }
457 BoundShareInput::ChangeLog(relation) => {
458 let id = share.share_id;
459 let result = self.plan_changelog(relation)?;
460 let logical_share = LogicalShare::create(result);
461 self.share_cache.insert(id, logical_share.clone());
462 Ok(logical_share)
463 }
464 }
465 }
466
    /// Plans a `WATERMARK` relation. Not implemented yet: reaching this
    /// panics via `todo!`.
    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }
470
471 pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
472 let input = self.plan_relation(gap_fill.input)?;
473 Ok(LogicalGapFill::new(
474 input,
475 gap_fill.time_col,
476 gap_fill.interval,
477 gap_fill.fill_strategies,
478 )
479 .into())
480 }
481
482 fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
483 let col_data_types = match relation {
484 Relation::Source(s) => s
485 .catalog
486 .columns
487 .iter()
488 .map(|col| col.data_type().clone())
489 .collect(),
490 Relation::BaseTable(t) => t
491 .table_catalog
492 .columns
493 .iter()
494 .map(|col| col.data_type().clone())
495 .collect(),
496 Relation::Subquery(q) => q.query.schema().data_types(),
497 Relation::Share(share) => share
498 .input
499 .fields()?
500 .into_iter()
501 .map(|(_, f)| f.data_type)
502 .collect(),
503 r => {
504 return Err(ErrorCode::BindError(format!(
505 "Invalid input relation to tumble: {r:?}"
506 ))
507 .into());
508 }
509 };
510 Ok(col_data_types)
511 }
512
513 fn plan_tumble_window(
514 &mut self,
515 input: Relation,
516 time_col: InputRef,
517 args: Vec<ExprImpl>,
518 ) -> Result<PlanRef> {
519 let mut args = args.into_iter();
520 let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
521
522 match (args.next(), args.next(), args.next()) {
523 (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
524 let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
525 for (idx, col_dt) in col_data_types.iter().enumerate() {
526 exprs.push(InputRef::new(idx, col_dt.clone()).into());
527 }
528 let window_start: ExprImpl = FunctionCall::new(
529 ExprType::TumbleStart,
530 vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
531 )?
532 .into();
533 let window_end =
537 FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
538 .into();
539 exprs.push(window_start);
540 exprs.push(window_end);
541 let base = self.plan_relation(input)?;
542 let project = LogicalProject::create(base, exprs);
543 Ok(project)
544 }
545 (
546 Some(window_size @ ExprImpl::Literal(_)),
547 Some(window_offset @ ExprImpl::Literal(_)),
548 None,
549 ) => {
550 let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
551 for (idx, col_dt) in col_data_types.iter().enumerate() {
552 exprs.push(InputRef::new(idx, col_dt.clone()).into());
553 }
554 let window_start: ExprImpl = FunctionCall::new(
555 ExprType::TumbleStart,
556 vec![
557 ExprImpl::InputRef(Box::new(time_col)),
558 window_size.clone(),
559 window_offset,
560 ],
561 )?
562 .into();
563 let window_end =
567 FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
568 .into();
569 exprs.push(window_start);
570 exprs.push(window_end);
571 let base = self.plan_relation(input)?;
572 let project = LogicalProject::create(base, exprs);
573 Ok(project)
574 }
575 _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
576 }
577 }
578
579 fn plan_hop_window(
580 &mut self,
581 input: Relation,
582 time_col: InputRef,
583 args: Vec<ExprImpl>,
584 ) -> Result<PlanRef> {
585 let input = self.plan_relation(input)?;
586 let mut args = args.into_iter();
587 let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
588 args.next_tuple()
589 else {
590 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
591 };
592
593 let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
594 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
595 };
596 let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
597 return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
598 };
599
600 let window_offset = match (args.next(), args.next()) {
601 (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
602 Some(ScalarImpl::Interval(window_offset)) => window_offset,
603 _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
604 },
605 (None, None) => Interval::from_month_day_usec(0, 0, 0),
606 _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
607 };
608
609 if !window_size.is_positive() || !window_slide.is_positive() {
610 return Err(ErrorCode::BindError(format!(
611 "window_size {} and window_slide {} must be positive",
612 window_size, window_slide
613 ))
614 .into());
615 }
616
617 if window_size.exact_div(&window_slide).is_none() {
618 return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
619 }
620
621 Ok(LogicalHopWindow::create(
622 input,
623 time_col,
624 window_slide,
625 window_size,
626 window_offset,
627 ))
628 }
629
630 fn plan_iceberg_intermediate_scan(
631 &self,
632 source: &LogicalSource,
633 table_column_type_mapping: HashMap<String, DataType>,
634 source_to_table_mapping: ColIndexMapping,
635 ) -> Result<PlanRef> {
636 let timezone = self.ctx().get_session_timezone();
638 let mut time_travel_info = to_iceberg_time_travel_as_of(&source.core.as_of, &timezone)?;
639 if time_travel_info.is_none() {
640 time_travel_info = self
641 .fetch_current_snapshot_id(source)?
642 .map(IcebergTimeTravelInfo::Version);
643 }
644 let Some(time_travel_info) = time_travel_info else {
645 let mut schema = source.schema().clone();
646 for field in &mut schema.fields {
647 if let Some(target_type) = table_column_type_mapping.get(&field.name) {
648 field.data_type = target_type.clone();
649 }
650 }
651 return Ok(LogicalValues::new(vec![], schema, self.ctx()).into());
652 };
653 let intermediate_scan = LogicalIcebergIntermediateScan::new(
654 source,
655 time_travel_info,
656 table_column_type_mapping,
657 source_to_table_mapping,
658 );
659 Ok(intermediate_scan.into())
660 }
661
    /// Returns the current snapshot id of the iceberg table behind `source`,
    /// or `None` if the table has no snapshot yet.
    ///
    /// Results (including `None`) are memoized per source name in the query
    /// context so each table's metadata is loaded at most once per query.
    /// Not available under madsim, since loading metadata does real I/O.
    fn fetch_current_snapshot_id(&self, source: &LogicalSource) -> Result<Option<i64>> {
        // Per-query cache: source name -> Option<snapshot id>.
        // NOTE(review): `iceberg_snapshot_id_map()` presumably returns a guard
        // over shared state; it stays held for the rest of this function.
        let mut map = self.ctx.iceberg_snapshot_id_map();
        let catalog = source.source_catalog().ok_or_else(|| {
            crate::error::ErrorCode::InternalError(
                "Iceberg source must have a valid source catalog".to_owned(),
            )
        })?;
        let name = catalog.name.as_str();
        if let Some(&snapshot_id) = map.get(name) {
            return Ok(snapshot_id);
        }

        #[cfg(madsim)]
        return Err(crate::error::ErrorCode::BindError(
            "iceberg source time travel can't be used in the madsim mode".to_string(),
        )
        .into());

        #[cfg(not(madsim))]
        {
            let ConnectorProperties::Iceberg(prop) =
                ConnectorProperties::extract(catalog.with_properties.clone(), false)?
            else {
                return Err(crate::error::ErrorCode::InternalError(
                    "Iceberg source must have Iceberg connector properties".to_owned(),
                )
                .into());
            };

            // Block on the frontend runtime to load the iceberg table and read
            // its current snapshot id; `block_in_place` keeps the surrounding
            // tokio worker from starving other tasks while we wait.
            let snapshot_id = tokio::task::block_in_place(|| {
                crate::utils::FRONTEND_RUNTIME.block_on(async {
                    prop.load_table()
                        .await
                        .map(|table| table.metadata().current_snapshot_id())
                })
            })?;
            // Cache even a `None` result so an empty table isn't re-loaded.
            map.insert(name.to_owned(), snapshot_id);
            Ok(snapshot_id)
        }
    }
702
703 fn get_iceberg_source_by_table_catalog(
704 &self,
705 table_catalog: &TableCatalog,
706 ) -> Option<SourceCatalog> {
707 let catalog_reader = self.ctx.session_ctx().env().catalog_reader().read_guard();
708
709 let iceberg_source_name = table_catalog.iceberg_source_name()?;
710 let schema = catalog_reader
711 .get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)
712 .ok()?;
713 let source_catalog = schema.get_source_by_name(&iceberg_source_name)?;
714 Some(source_catalog.deref().clone())
715 }
716}