risingwave_frontend/catalog/system_catalog/rw_catalog/
iceberg_tables.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::acl::AclMode;
17use risingwave_common::types::Fields;
18use risingwave_frontend_macro::system_catalog;
19
20use crate::catalog::root_catalog::SchemaPath;
21use crate::catalog::system_catalog::SysCatalogReaderImpl;
22use crate::error::Result;
23
24// JDBC/SQL catalog integration docs: https://iceberg.apache.org/docs/latest/jdbc/#configurations
25// `iceberg_tables` definition in iceberg java sdk https://github.com/apache/iceberg/blob/4850b622c778deb4b234880bfd7643070e0a5458/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java#L125-L146
26// This system table is used to store the iceberg tables' metadata and only show the tables that the user has access to,
27// so it can be used by other query engine to fetch iceberg catalog and provide an access control layer.
28
29#[derive(Fields)]
30#[primary_key(catalog_name, table_namespace, table_name)]
31struct IcebergTables {
32    pub catalog_name: String,
33    pub table_namespace: String,
34    pub table_name: String,
35    pub metadata_location: Option<String>,
36    pub previous_metadata_location: Option<String>,
37    pub iceberg_type: Option<String>,
38}
39
40#[system_catalog(table, "rw_catalog.iceberg_tables")]
41async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<IcebergTables>> {
42    let rows = reader.meta_client.list_hosted_iceberg_tables().await?;
43
44    let catalog_reader = reader.catalog_reader.read_guard();
45    let user_reader = reader.user_info_reader.read_guard();
46    let user = user_reader
47        .get_user_by_name(&reader.auth_context.user_name)
48        .ok_or_else(|| anyhow!("User not found"))?;
49
50    let mut res = Vec::new();
51    for row in rows {
52        let record = IcebergTables {
53            catalog_name: row.catalog_name,
54            table_namespace: row.table_namespace,
55            table_name: row.table_name,
56            metadata_location: row.metadata_location,
57            previous_metadata_location: row.previous_metadata_location,
58            iceberg_type: row.iceberg_type,
59        };
60        let table = catalog_reader
61            .get_created_table_by_name(
62                &record.catalog_name,
63                SchemaPath::Name(&record.table_namespace),
64                &record.table_name,
65            )?
66            .0;
67
68        if user.is_super
69            || table.owner == user.id
70            || user.has_privilege(table.id(), AclMode::Select)
71        {
72            res.push(record);
73        }
74    }
75
76    Ok(res)
77}