risingwave_meta_service/
hosted_iceberg_catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::MetaSrvEnv;
16use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogService;
17use risingwave_pb::meta::{
18    ListIcebergTablesRequest, ListIcebergTablesResponse, list_iceberg_tables_response,
19};
20use sea_orm::{ConnectionTrait, DeriveIden, EnumIter, Statement, TryGetableMany};
21use thiserror_ext::AsReport;
22use tonic::{Request, Response, Status};
23
24#[derive(Clone)]
25pub struct HostedIcebergCatalogServiceImpl {
26    env: MetaSrvEnv,
27}
28
29impl HostedIcebergCatalogServiceImpl {
30    pub fn new(env: MetaSrvEnv) -> Self {
31        HostedIcebergCatalogServiceImpl { env }
32    }
33}
34
35#[derive(EnumIter, DeriveIden)]
36enum IcebergTableCols {
37    CatalogName,
38    TableNamespace,
39    TableName,
40    MetadataLocation,
41    PreviousMetadataLocation,
42}
43
44#[async_trait::async_trait]
45impl HostedIcebergCatalogService for HostedIcebergCatalogServiceImpl {
46    async fn list_iceberg_tables(
47        &self,
48        _request: Request<ListIcebergTablesRequest>,
49    ) -> Result<Response<ListIcebergTablesResponse>, Status> {
50        let rows: Vec<(String, String, String, Option<String>, Option<String>)> =
51            <(String, String, String, Option<String>, Option<String>)>::find_by_statement::<IcebergTableCols>(Statement::from_sql_and_values(
52                self.env.meta_store().conn.get_database_backend(),
53                r#"SELECT catalog_name, table_namespace, table_name, metadata_location, previous_metadata_location FROM iceberg_tables"#,
54                [],
55            ))
56                .all(&self.env.meta_store().conn)
57                .await.map_err(|e| Status::internal(format!("Failed to list iceberg tables: {}", e.as_report())))?;
58
59        let iceberg_tables = rows
60            .into_iter()
61            .map(|row| list_iceberg_tables_response::IcebergTable {
62                catalog_name: row.0,
63                table_namespace: row.1,
64                table_name: row.2,
65                metadata_location: row.3,
66                previous_metadata_location: row.4,
67                iceberg_type: None,
68            })
69            .collect();
70
71        return Ok(Response::new(ListIcebergTablesResponse { iceberg_tables }));
72    }
73}