risingwave_frontend/catalog/system_catalog/rw_catalog/
iceberg_tables.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::acl::AclMode;
17use risingwave_common::types::Fields;
18use risingwave_frontend_macro::system_catalog;
19use risingwave_pb::user::grant_privilege::Object as GrantObject;
20
21use crate::catalog::root_catalog::SchemaPath;
22use crate::catalog::system_catalog::SysCatalogReaderImpl;
23use crate::error::Result;
24
25// JDBC/SQL catalog integration docs: https://iceberg.apache.org/docs/latest/jdbc/#configurations
26// `iceberg_tables` definition in iceberg java sdk https://github.com/apache/iceberg/blob/4850b622c778deb4b234880bfd7643070e0a5458/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java#L125-L146
// This system table stores the Iceberg tables' metadata and only shows the tables that the user has access to,
// so it can be used by other query engines to fetch the Iceberg catalog, and it also provides an access control layer.
29
// One row of the `rw_catalog.iceberg_tables` system table.
//
// The (catalog_name, table_namespace, table_name) triple is declared as the
// primary key. Field order and names are consumed by `#[derive(Fields)]`, so
// they should not be reordered or renamed casually.
#[derive(Fields)]
#[primary_key(catalog_name, table_namespace, table_name)]
struct IcebergTables {
    // Catalog name; `read` passes it as the first (database-name) argument when
    // resolving the table in the local catalog.
    pub catalog_name: String,
    // Namespace of the table; `read` uses it as the schema name
    // (`SchemaPath::Name`) when resolving the table.
    pub table_namespace: String,
    // Name of the table inside the namespace.
    pub table_name: String,
    // Optional; presumably the location of the current Iceberg metadata file,
    // as in the Iceberg JDBC catalog -- TODO confirm.
    pub metadata_location: Option<String>,
    // Optional; presumably the location of the previous Iceberg metadata file
    // -- TODO confirm.
    pub previous_metadata_location: Option<String>,
    // Optional; presumably the Iceberg object type recorded by the JDBC catalog
    // -- TODO confirm.
    pub iceberg_type: Option<String>,
}
40
41#[system_catalog(table, "rw_catalog.iceberg_tables")]
42async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<IcebergTables>> {
43    let rows = reader.meta_client.list_hosted_iceberg_tables().await?;
44
45    let catalog_reader = reader.catalog_reader.read_guard();
46    let user_reader = reader.user_info_reader.read_guard();
47    let user = user_reader
48        .get_user_by_name(&reader.auth_context.user_name)
49        .ok_or_else(|| anyhow!("User not found"))?;
50
51    let mut res = Vec::new();
52    for row in rows {
53        let record = IcebergTables {
54            catalog_name: row.catalog_name,
55            table_namespace: row.table_namespace,
56            table_name: row.table_name,
57            metadata_location: row.metadata_location,
58            previous_metadata_location: row.previous_metadata_location,
59            iceberg_type: row.iceberg_type,
60        };
61        let table = catalog_reader
62            .get_created_table_by_name(
63                &record.catalog_name,
64                SchemaPath::Name(&record.table_namespace),
65                &record.table_name,
66            )?
67            .0;
68
69        if user.is_super
70            || table.owner == user.id
71            || user.has_privilege(
72                &GrantObject::TableId(table.id().table_id()),
73                AclMode::Select,
74            )
75        {
76            res.push(record);
77        }
78    }
79
80    Ok(res)
81}