risingwave_meta_service/
hosted_iceberg_catalog_service_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::MetaSrvEnv;
16use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogService;
17use risingwave_pb::meta::{
18    ListIcebergTablesRequest, ListIcebergTablesResponse, list_iceberg_tables_response,
19};
20use sea_orm::{ConnectionTrait, DeriveIden, EnumIter, Statement, TryGetableMany};
21use thiserror_ext::AsReport;
22use tonic::{Request, Response, Status};
23
24#[derive(Clone)]
25pub struct HostedIcebergCatalogServiceImpl {
26    env: MetaSrvEnv,
27}
28
29impl HostedIcebergCatalogServiceImpl {
30    pub fn new(env: MetaSrvEnv) -> Self {
31        HostedIcebergCatalogServiceImpl { env }
32    }
33}
34
35#[derive(EnumIter, DeriveIden)]
36enum IcebergTableCols {
37    CatalogName,
38    TableNamespace,
39    TableName,
40    MetadataLocation,
41    PreviousMetadataLocation,
42}
43
44#[async_trait::async_trait]
45impl HostedIcebergCatalogService for HostedIcebergCatalogServiceImpl {
46    #[cfg_attr(coverage, coverage(off))]
47    async fn list_iceberg_tables(
48        &self,
49        _request: Request<ListIcebergTablesRequest>,
50    ) -> Result<Response<ListIcebergTablesResponse>, Status> {
51        let rows: Vec<(String, String, String, Option<String>, Option<String>)> =
52            <(String, String, String, Option<String>, Option<String>)>::find_by_statement::<IcebergTableCols>(Statement::from_sql_and_values(
53                self.env.meta_store().conn.get_database_backend(),
54                r#"SELECT catalog_name, table_namespace, table_name, metadata_location, previous_metadata_location FROM iceberg_tables"#,
55                [],
56            ))
57                .all(&self.env.meta_store().conn)
58                .await.map_err(|e| Status::internal(format!("Failed to list iceberg tables: {}", e.as_report())))?;
59
60        let iceberg_tables = rows
61            .into_iter()
62            .map(|row| list_iceberg_tables_response::IcebergTable {
63                catalog_name: row.0,
64                table_namespace: row.1,
65                table_name: row.2,
66                metadata_location: row.3,
67                previous_metadata_location: row.4,
68                iceberg_type: None,
69            })
70            .collect();
71
72        return Ok(Response::new(ListIcebergTablesResponse { iceberg_tables }));
73    }
74}