risingwave_frontend/catalog/system_catalog/rw_catalog/
iceberg_tables.rs1use anyhow::anyhow;
16use risingwave_common::acl::AclMode;
17use risingwave_common::types::Fields;
18use risingwave_frontend_macro::system_catalog;
19
20use crate::catalog::root_catalog::SchemaPath;
21use crate::catalog::system_catalog::SysCatalogReaderImpl;
22use crate::error::Result;
23
24#[derive(Fields)]
30#[primary_key(catalog_name, table_namespace, table_name)]
31struct IcebergTables {
32 pub catalog_name: String,
33 pub table_namespace: String,
34 pub table_name: String,
35 pub metadata_location: Option<String>,
36 pub previous_metadata_location: Option<String>,
37 pub iceberg_type: Option<String>,
38}
39
40#[system_catalog(table, "rw_catalog.iceberg_tables")]
41async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<IcebergTables>> {
42 let rows = reader.meta_client.list_hosted_iceberg_tables().await?;
43
44 let catalog_reader = reader.catalog_reader.read_guard();
45 let user_reader = reader.user_info_reader.read_guard();
46 let user = user_reader
47 .get_user_by_name(&reader.auth_context.user_name)
48 .ok_or_else(|| anyhow!("User not found"))?;
49
50 let mut res = Vec::new();
51 for row in rows {
52 let record = IcebergTables {
53 catalog_name: row.catalog_name,
54 table_namespace: row.table_namespace,
55 table_name: row.table_name,
56 metadata_location: row.metadata_location,
57 previous_metadata_location: row.previous_metadata_location,
58 iceberg_type: row.iceberg_type,
59 };
60 let table = catalog_reader
61 .get_created_table_by_name(
62 &record.catalog_name,
63 SchemaPath::Name(&record.table_namespace),
64 &record.table_name,
65 )?
66 .0;
67
68 if user.is_super
69 || table.owner == user.id
70 || user.has_privilege(table.id(), AclMode::Select)
71 {
72 res.push(record);
73 }
74 }
75
76 Ok(res)
77}