risingwave_meta_service/
hosted_iceberg_catalog_service.rs1use risingwave_meta::manager::MetaSrvEnv;
16use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogService;
17use risingwave_pb::meta::{
18 ListIcebergTablesRequest, ListIcebergTablesResponse, list_iceberg_tables_response,
19};
20use sea_orm::{ConnectionTrait, DeriveIden, EnumIter, Statement, TryGetableMany};
21use thiserror_ext::AsReport;
22use tonic::{Request, Response, Status};
23
24#[derive(Clone)]
25pub struct HostedIcebergCatalogServiceImpl {
26 env: MetaSrvEnv,
27}
28
29impl HostedIcebergCatalogServiceImpl {
30 pub fn new(env: MetaSrvEnv) -> Self {
31 HostedIcebergCatalogServiceImpl { env }
32 }
33}
34
35#[derive(EnumIter, DeriveIden)]
36enum IcebergTableCols {
37 CatalogName,
38 TableNamespace,
39 TableName,
40 MetadataLocation,
41 PreviousMetadataLocation,
42}
43
44#[async_trait::async_trait]
45impl HostedIcebergCatalogService for HostedIcebergCatalogServiceImpl {
46 async fn list_iceberg_tables(
47 &self,
48 _request: Request<ListIcebergTablesRequest>,
49 ) -> Result<Response<ListIcebergTablesResponse>, Status> {
50 let rows: Vec<(String, String, String, Option<String>, Option<String>)> =
51 <(String, String, String, Option<String>, Option<String>)>::find_by_statement::<IcebergTableCols>(Statement::from_sql_and_values(
52 self.env.meta_store().conn.get_database_backend(),
53 r#"SELECT catalog_name, table_namespace, table_name, metadata_location, previous_metadata_location FROM iceberg_tables"#,
54 [],
55 ))
56 .all(&self.env.meta_store().conn)
57 .await.map_err(|e| Status::internal(format!("Failed to list iceberg tables: {}", e.as_report())))?;
58
59 let iceberg_tables = rows
60 .into_iter()
61 .map(|row| list_iceberg_tables_response::IcebergTable {
62 catalog_name: row.0,
63 table_namespace: row.1,
64 table_name: row.2,
65 metadata_location: row.3,
66 previous_metadata_location: row.4,
67 iceberg_type: None,
68 })
69 .collect();
70
71 return Ok(Response::new(ListIcebergTablesResponse { iceberg_tables }));
72 }
73}