risingwave_connector/connector_common/iceberg/
mock_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use async_trait::async_trait;
18use iceberg::io::FileIO;
19use iceberg::spec::{
20    NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type,
21    UnboundPartitionField, UnboundPartitionSpec,
22};
23use iceberg::table::Table;
24use iceberg::{
25    Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
26};
27
/// A mock iceberg catalog used by plan tests.
///
/// The catalog is stateless: it recognizes two fixed table names and builds the
/// corresponding table definition on demand whenever one of them is loaded.
#[derive(Debug)]
pub struct MockCatalog;
31
32impl MockCatalog {
33    const RANGE_TABLE: &'static str = "range_table";
34    const SPARSE_TABLE: &'static str = "sparse_table";
35}
36
37impl MockCatalog {
38    fn build_table(name: &str, schema: Schema, partition_spec: UnboundPartitionSpec) -> Table {
39        let file_io = FileIO::from_path("memory://").unwrap().build().unwrap();
40        let table_creation = TableCreation {
41            name: "ignore".to_owned(),
42            location: Some("1".to_owned()),
43            schema,
44            partition_spec: Some(partition_spec),
45            sort_order: None,
46            properties: HashMap::new(),
47            format_version: iceberg::spec::FormatVersion::V2,
48        };
49        Table::builder()
50            .identifier(TableIdent::new(
51                NamespaceIdent::new("mock_namespace".to_owned()),
52                name.to_owned(),
53            ))
54            .file_io(file_io)
55            .metadata(
56                TableMetadataBuilder::from_table_creation(table_creation)
57                    .unwrap()
58                    .build()
59                    .unwrap()
60                    .metadata,
61            )
62            .build()
63            .unwrap()
64    }
65
66    fn sparse_table() -> Table {
67        Self::build_table(
68            Self::SPARSE_TABLE,
69            Schema::builder()
70                .with_fields(vec![
71                    NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Int), true).into(),
72                    NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Long), true).into(),
73                    NestedField::new(3, "v3", Type::Primitive(PrimitiveType::String), true).into(),
74                    NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Time), true).into(),
75                ])
76                .build()
77                .unwrap(),
78            UnboundPartitionSpec::builder()
79                .with_spec_id(1)
80                .add_partition_fields(vec![
81                    UnboundPartitionField {
82                        source_id: 1,
83                        field_id: Some(5),
84                        name: "f1".to_owned(),
85                        transform: Transform::Identity,
86                    },
87                    UnboundPartitionField {
88                        source_id: 2,
89                        field_id: Some(6),
90                        name: "f2".to_owned(),
91                        transform: Transform::Bucket(1),
92                    },
93                    UnboundPartitionField {
94                        source_id: 3,
95                        field_id: Some(7),
96                        name: "f3".to_owned(),
97                        transform: Transform::Truncate(1),
98                    },
99                    UnboundPartitionField {
100                        source_id: 4,
101                        field_id: Some(8),
102                        name: "f4".to_owned(),
103                        transform: Transform::Void,
104                    },
105                ])
106                .unwrap()
107                .build(),
108        )
109    }
110
111    fn range_table() -> Table {
112        Self::build_table(
113            Self::RANGE_TABLE,
114            Schema::builder()
115                .with_fields(vec![
116                    NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Date), true).into(),
117                    NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Timestamp), true)
118                        .into(),
119                    NestedField::new(3, "v3", Type::Primitive(PrimitiveType::Timestamptz), true)
120                        .into(),
121                    NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Timestamptz), true)
122                        .into(),
123                ])
124                .build()
125                .unwrap(),
126            UnboundPartitionSpec::builder()
127                .with_spec_id(1)
128                .add_partition_fields(vec![
129                    UnboundPartitionField {
130                        source_id: 1,
131                        field_id: Some(5),
132                        name: "f1".to_owned(),
133                        transform: Transform::Year,
134                    },
135                    UnboundPartitionField {
136                        source_id: 2,
137                        field_id: Some(6),
138                        name: "f2".to_owned(),
139                        transform: Transform::Month,
140                    },
141                    UnboundPartitionField {
142                        source_id: 3,
143                        field_id: Some(7),
144                        name: "f3".to_owned(),
145                        transform: Transform::Day,
146                    },
147                    UnboundPartitionField {
148                        source_id: 4,
149                        field_id: Some(8),
150                        name: "f4".to_owned(),
151                        transform: Transform::Hour,
152                    },
153                ])
154                .unwrap()
155                .build(),
156        )
157    }
158}
159
160#[async_trait]
161impl CatalogV2 for MockCatalog {
162    /// List namespaces from table.
163    async fn list_namespaces(
164        &self,
165        _parent: Option<&NamespaceIdent>,
166    ) -> iceberg::Result<Vec<NamespaceIdent>> {
167        todo!()
168    }
169
170    /// Create a new namespace inside the catalog.
171    async fn create_namespace(
172        &self,
173        _namespace: &iceberg::NamespaceIdent,
174        _properties: HashMap<String, String>,
175    ) -> iceberg::Result<iceberg::Namespace> {
176        todo!()
177    }
178
179    /// Get a namespace information from the catalog.
180    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
181        todo!()
182    }
183
184    /// Check if namespace exists in catalog.
185    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
186        todo!()
187    }
188
189    /// Drop a namespace from the catalog.
190    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
191        todo!()
192    }
193
194    /// List tables from namespace.
195    async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
196        todo!()
197    }
198
199    async fn update_namespace(
200        &self,
201        _namespace: &NamespaceIdent,
202        _properties: HashMap<String, String>,
203    ) -> iceberg::Result<()> {
204        todo!()
205    }
206
207    /// Create a new table inside the namespace.
208    async fn create_table(
209        &self,
210        _namespace: &NamespaceIdent,
211        _creation: TableCreation,
212    ) -> iceberg::Result<Table> {
213        todo!()
214    }
215
216    /// Load table from the catalog.
217    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
218        match table.name.as_ref() {
219            Self::SPARSE_TABLE => Ok(Self::sparse_table()),
220            Self::RANGE_TABLE => Ok(Self::range_table()),
221            _ => unimplemented!("table {} not found", table.name()),
222        }
223    }
224
225    /// Drop a table from the catalog.
226    async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
227        todo!()
228    }
229
230    /// Check if a table exists in the catalog.
231    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
232        match table.name.as_ref() {
233            Self::SPARSE_TABLE => Ok(true),
234            Self::RANGE_TABLE => Ok(true),
235            _ => Ok(false),
236        }
237    }
238
239    /// Rename a table in the catalog.
240    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
241        todo!()
242    }
243
244    /// Update a table to the catalog.
245    async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<Table> {
246        todo!()
247    }
248
249    #[expect(
250        clippy::disallowed_types,
251        reason = "iceberg catalog trait requires returning iceberg::Error"
252    )]
253    async fn register_table(
254        &self,
255        _table_ident: &TableIdent,
256        _metadata_location: String,
257    ) -> iceberg::Result<Table> {
258        Err(iceberg::Error::new(
259            iceberg::ErrorKind::Unexpected,
260            "register_table is not supported in mock catalog",
261        ))
262    }
263}