risingwave_connector/connector_common/iceberg/
mock_catalog.rs1use std::collections::HashMap;
16
17use async_trait::async_trait;
18use iceberg::io::FileIO;
19use iceberg::spec::{
20 NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type,
21 UnboundPartitionField, UnboundPartitionSpec,
22};
23use iceberg::table::Table;
24use iceberg::{
25 Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
26};
27
/// Stateless stand-in catalog that only knows the two fixed table names
/// declared below.
#[derive(Debug)]
pub struct MockCatalog;

impl MockCatalog {
    /// Name under which the sparse table is served.
    const SPARSE_TABLE: &'static str = "sparse_table";
    /// Name under which the range table is served.
    const RANGE_TABLE: &'static str = "range_table";
}
36
37impl MockCatalog {
38 fn build_table(name: &str, schema: Schema, partition_spec: UnboundPartitionSpec) -> Table {
39 let file_io = FileIO::from_path("memory://").unwrap().build().unwrap();
40 let table_creation = TableCreation {
41 name: "ignore".to_owned(),
42 location: Some("1".to_owned()),
43 schema,
44 partition_spec: Some(partition_spec),
45 sort_order: None,
46 properties: HashMap::new(),
47 format_version: iceberg::spec::FormatVersion::V2,
48 };
49 Table::builder()
50 .identifier(TableIdent::new(
51 NamespaceIdent::new("mock_namespace".to_owned()),
52 name.to_owned(),
53 ))
54 .file_io(file_io)
55 .metadata(
56 TableMetadataBuilder::from_table_creation(table_creation)
57 .unwrap()
58 .build()
59 .unwrap()
60 .metadata,
61 )
62 .build()
63 .unwrap()
64 }
65
66 fn sparse_table() -> Table {
67 Self::build_table(
68 Self::SPARSE_TABLE,
69 Schema::builder()
70 .with_fields(vec![
71 NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Int), true).into(),
72 NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Long), true).into(),
73 NestedField::new(3, "v3", Type::Primitive(PrimitiveType::String), true).into(),
74 NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Time), true).into(),
75 ])
76 .build()
77 .unwrap(),
78 UnboundPartitionSpec::builder()
79 .with_spec_id(1)
80 .add_partition_fields(vec![
81 UnboundPartitionField {
82 source_id: 1,
83 field_id: Some(5),
84 name: "f1".to_owned(),
85 transform: Transform::Identity,
86 },
87 UnboundPartitionField {
88 source_id: 2,
89 field_id: Some(6),
90 name: "f2".to_owned(),
91 transform: Transform::Bucket(1),
92 },
93 UnboundPartitionField {
94 source_id: 3,
95 field_id: Some(7),
96 name: "f3".to_owned(),
97 transform: Transform::Truncate(1),
98 },
99 UnboundPartitionField {
100 source_id: 4,
101 field_id: Some(8),
102 name: "f4".to_owned(),
103 transform: Transform::Void,
104 },
105 ])
106 .unwrap()
107 .build(),
108 )
109 }
110
111 fn range_table() -> Table {
112 Self::build_table(
113 Self::RANGE_TABLE,
114 Schema::builder()
115 .with_fields(vec![
116 NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Date), true).into(),
117 NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Timestamp), true)
118 .into(),
119 NestedField::new(3, "v3", Type::Primitive(PrimitiveType::Timestamptz), true)
120 .into(),
121 NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Timestamptz), true)
122 .into(),
123 ])
124 .build()
125 .unwrap(),
126 UnboundPartitionSpec::builder()
127 .with_spec_id(1)
128 .add_partition_fields(vec![
129 UnboundPartitionField {
130 source_id: 1,
131 field_id: Some(5),
132 name: "f1".to_owned(),
133 transform: Transform::Year,
134 },
135 UnboundPartitionField {
136 source_id: 2,
137 field_id: Some(6),
138 name: "f2".to_owned(),
139 transform: Transform::Month,
140 },
141 UnboundPartitionField {
142 source_id: 3,
143 field_id: Some(7),
144 name: "f3".to_owned(),
145 transform: Transform::Day,
146 },
147 UnboundPartitionField {
148 source_id: 4,
149 field_id: Some(8),
150 name: "f4".to_owned(),
151 transform: Transform::Hour,
152 },
153 ])
154 .unwrap()
155 .build(),
156 )
157 }
158}
159
160#[async_trait]
161impl CatalogV2 for MockCatalog {
162 async fn list_namespaces(
164 &self,
165 _parent: Option<&NamespaceIdent>,
166 ) -> iceberg::Result<Vec<NamespaceIdent>> {
167 todo!()
168 }
169
170 async fn create_namespace(
172 &self,
173 _namespace: &iceberg::NamespaceIdent,
174 _properties: HashMap<String, String>,
175 ) -> iceberg::Result<iceberg::Namespace> {
176 todo!()
177 }
178
179 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
181 todo!()
182 }
183
184 async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
186 todo!()
187 }
188
189 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
191 todo!()
192 }
193
194 async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
196 todo!()
197 }
198
199 async fn update_namespace(
200 &self,
201 _namespace: &NamespaceIdent,
202 _properties: HashMap<String, String>,
203 ) -> iceberg::Result<()> {
204 todo!()
205 }
206
207 async fn create_table(
209 &self,
210 _namespace: &NamespaceIdent,
211 _creation: TableCreation,
212 ) -> iceberg::Result<Table> {
213 todo!()
214 }
215
216 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
218 match table.name.as_ref() {
219 Self::SPARSE_TABLE => Ok(Self::sparse_table()),
220 Self::RANGE_TABLE => Ok(Self::range_table()),
221 _ => unimplemented!("table {} not found", table.name()),
222 }
223 }
224
225 async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
227 todo!()
228 }
229
230 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
232 match table.name.as_ref() {
233 Self::SPARSE_TABLE => Ok(true),
234 Self::RANGE_TABLE => Ok(true),
235 _ => Ok(false),
236 }
237 }
238
239 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
241 todo!()
242 }
243
244 async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<Table> {
246 todo!()
247 }
248
249 #[expect(
250 clippy::disallowed_types,
251 reason = "iceberg catalog trait requires returning iceberg::Error"
252 )]
253 async fn register_table(
254 &self,
255 _table_ident: &TableIdent,
256 _metadata_location: String,
257 ) -> iceberg::Result<Table> {
258 Err(iceberg::Error::new(
259 iceberg::ErrorKind::Unexpected,
260 "register_table is not supported in mock catalog",
261 ))
262 }
263}