risingwave_frontend/catalog/system_catalog/rw_catalog/
iceberg_tables.rs1use anyhow::anyhow;
16use risingwave_common::acl::AclMode;
17use risingwave_common::types::Fields;
18use risingwave_frontend_macro::system_catalog;
19use risingwave_pb::user::grant_privilege::Object as GrantObject;
20
21use crate::catalog::root_catalog::SchemaPath;
22use crate::catalog::system_catalog::SysCatalogReaderImpl;
23use crate::error::Result;
24
25#[derive(Fields)]
31#[primary_key(catalog_name, table_namespace, table_name)]
32struct IcebergTables {
33 pub catalog_name: String,
34 pub table_namespace: String,
35 pub table_name: String,
36 pub metadata_location: Option<String>,
37 pub previous_metadata_location: Option<String>,
38 pub iceberg_type: Option<String>,
39}
40
41#[system_catalog(table, "rw_catalog.iceberg_tables")]
42async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<IcebergTables>> {
43 let rows = reader.meta_client.list_hosted_iceberg_tables().await?;
44
45 let catalog_reader = reader.catalog_reader.read_guard();
46 let user_reader = reader.user_info_reader.read_guard();
47 let user = user_reader
48 .get_user_by_name(&reader.auth_context.user_name)
49 .ok_or_else(|| anyhow!("User not found"))?;
50
51 let mut res = Vec::new();
52 for row in rows {
53 let record = IcebergTables {
54 catalog_name: row.catalog_name,
55 table_namespace: row.table_namespace,
56 table_name: row.table_name,
57 metadata_location: row.metadata_location,
58 previous_metadata_location: row.previous_metadata_location,
59 iceberg_type: row.iceberg_type,
60 };
61 let table = catalog_reader
62 .get_created_table_by_name(
63 &record.catalog_name,
64 SchemaPath::Name(&record.table_namespace),
65 &record.table_name,
66 )?
67 .0;
68
69 if user.is_super
70 || table.owner == user.id
71 || user.has_privilege(
72 &GrantObject::TableId(table.id().table_id()),
73 AclMode::Select,
74 )
75 {
76 res.push(record);
77 }
78 }
79
80 Ok(res)
81}