risingwave_connector/connector_common/iceberg/
mock_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use async_trait::async_trait;
18use iceberg::io::FileIO;
19use iceberg::spec::{
20    NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type,
21    UnboundPartitionField, UnboundPartitionSpec,
22};
23use iceberg::table::Table;
24use iceberg::{
25    Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
26};
27
28/// A mock catalog for iceberg used for plan test.
29#[derive(Debug)]
30pub struct MockCatalog;
31
32impl MockCatalog {
33    const RANGE_TABLE: &'static str = "range_table";
34    const SPARSE_TABLE: &'static str = "sparse_table";
35}
36
37impl MockCatalog {
38    fn build_table(name: &str, schema: Schema, partition_spec: UnboundPartitionSpec) -> Table {
39        let file_io = FileIO::from_path("memory://").unwrap().build().unwrap();
40        let table_creation = TableCreation {
41            name: "ignore".to_owned(),
42            location: Some("1".to_owned()),
43            schema,
44            partition_spec: Some(partition_spec),
45            sort_order: None,
46            properties: HashMap::new(),
47        };
48        Table::builder()
49            .identifier(TableIdent::new(
50                NamespaceIdent::new("mock_namespace".to_owned()),
51                name.to_owned(),
52            ))
53            .file_io(file_io)
54            .metadata(
55                TableMetadataBuilder::from_table_creation(table_creation)
56                    .unwrap()
57                    .build()
58                    .unwrap()
59                    .metadata,
60            )
61            .build()
62            .unwrap()
63    }
64
65    fn sparse_table() -> Table {
66        Self::build_table(
67            Self::SPARSE_TABLE,
68            Schema::builder()
69                .with_fields(vec![
70                    NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Int), true).into(),
71                    NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Long), true).into(),
72                    NestedField::new(3, "v3", Type::Primitive(PrimitiveType::String), true).into(),
73                    NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Time), true).into(),
74                ])
75                .build()
76                .unwrap(),
77            UnboundPartitionSpec::builder()
78                .with_spec_id(1)
79                .add_partition_fields(vec![
80                    UnboundPartitionField {
81                        source_id: 1,
82                        field_id: Some(5),
83                        name: "f1".to_owned(),
84                        transform: Transform::Identity,
85                    },
86                    UnboundPartitionField {
87                        source_id: 2,
88                        field_id: Some(6),
89                        name: "f2".to_owned(),
90                        transform: Transform::Bucket(1),
91                    },
92                    UnboundPartitionField {
93                        source_id: 3,
94                        field_id: Some(7),
95                        name: "f3".to_owned(),
96                        transform: Transform::Truncate(1),
97                    },
98                    UnboundPartitionField {
99                        source_id: 4,
100                        field_id: Some(8),
101                        name: "f4".to_owned(),
102                        transform: Transform::Void,
103                    },
104                ])
105                .unwrap()
106                .build(),
107        )
108    }
109
110    fn range_table() -> Table {
111        Self::build_table(
112            Self::RANGE_TABLE,
113            Schema::builder()
114                .with_fields(vec![
115                    NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Date), true).into(),
116                    NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Timestamp), true)
117                        .into(),
118                    NestedField::new(3, "v3", Type::Primitive(PrimitiveType::Timestamptz), true)
119                        .into(),
120                    NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Timestamptz), true)
121                        .into(),
122                ])
123                .build()
124                .unwrap(),
125            UnboundPartitionSpec::builder()
126                .with_spec_id(1)
127                .add_partition_fields(vec![
128                    UnboundPartitionField {
129                        source_id: 1,
130                        field_id: Some(5),
131                        name: "f1".to_owned(),
132                        transform: Transform::Year,
133                    },
134                    UnboundPartitionField {
135                        source_id: 2,
136                        field_id: Some(6),
137                        name: "f2".to_owned(),
138                        transform: Transform::Month,
139                    },
140                    UnboundPartitionField {
141                        source_id: 3,
142                        field_id: Some(7),
143                        name: "f3".to_owned(),
144                        transform: Transform::Day,
145                    },
146                    UnboundPartitionField {
147                        source_id: 4,
148                        field_id: Some(8),
149                        name: "f4".to_owned(),
150                        transform: Transform::Hour,
151                    },
152                ])
153                .unwrap()
154                .build(),
155        )
156    }
157}
158
159#[async_trait]
160impl CatalogV2 for MockCatalog {
161    /// List namespaces from table.
162    async fn list_namespaces(
163        &self,
164        _parent: Option<&NamespaceIdent>,
165    ) -> iceberg::Result<Vec<NamespaceIdent>> {
166        todo!()
167    }
168
169    /// Create a new namespace inside the catalog.
170    async fn create_namespace(
171        &self,
172        _namespace: &iceberg::NamespaceIdent,
173        _properties: HashMap<String, String>,
174    ) -> iceberg::Result<iceberg::Namespace> {
175        todo!()
176    }
177
178    /// Get a namespace information from the catalog.
179    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
180        todo!()
181    }
182
183    /// Check if namespace exists in catalog.
184    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
185        todo!()
186    }
187
188    /// Drop a namespace from the catalog.
189    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
190        todo!()
191    }
192
193    /// List tables from namespace.
194    async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
195        todo!()
196    }
197
198    async fn update_namespace(
199        &self,
200        _namespace: &NamespaceIdent,
201        _properties: HashMap<String, String>,
202    ) -> iceberg::Result<()> {
203        todo!()
204    }
205
206    /// Create a new table inside the namespace.
207    async fn create_table(
208        &self,
209        _namespace: &NamespaceIdent,
210        _creation: TableCreation,
211    ) -> iceberg::Result<Table> {
212        todo!()
213    }
214
215    /// Load table from the catalog.
216    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
217        match table.name.as_ref() {
218            Self::SPARSE_TABLE => Ok(Self::sparse_table()),
219            Self::RANGE_TABLE => Ok(Self::range_table()),
220            _ => unimplemented!("table {} not found", table.name()),
221        }
222    }
223
224    /// Drop a table from the catalog.
225    async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
226        todo!()
227    }
228
229    /// Check if a table exists in the catalog.
230    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
231        match table.name.as_ref() {
232            Self::SPARSE_TABLE => Ok(true),
233            Self::RANGE_TABLE => Ok(true),
234            _ => Ok(false),
235        }
236    }
237
238    /// Rename a table in the catalog.
239    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
240        todo!()
241    }
242
243    /// Update a table to the catalog.
244    async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<Table> {
245        todo!()
246    }
247}