risingwave_connector/connector_common/iceberg/
mock_catalog.rs1use std::collections::HashMap;
16
17use async_trait::async_trait;
18use iceberg::io::FileIO;
19use iceberg::spec::{
20 NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type,
21 UnboundPartitionField, UnboundPartitionSpec,
22};
23use iceberg::table::Table;
24use iceberg::{
25 Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
26};
27
/// Stateless mock catalog that serves a fixed set of tables built on demand.
///
/// The type has no fields, so it is trivially copyable and can be created
/// either as `MockCatalog` or through `MockCatalog::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct MockCatalog;
31
32impl MockCatalog {
33 const RANGE_TABLE: &'static str = "range_table";
34 const SPARSE_TABLE: &'static str = "sparse_table";
35}
36
37impl MockCatalog {
38 fn build_table(name: &str, schema: Schema, partition_spec: UnboundPartitionSpec) -> Table {
39 let file_io = FileIO::from_path("memory://").unwrap().build().unwrap();
40 let table_creation = TableCreation {
41 name: "ignore".to_owned(),
42 location: Some("1".to_owned()),
43 schema,
44 partition_spec: Some(partition_spec),
45 sort_order: None,
46 properties: HashMap::new(),
47 };
48 Table::builder()
49 .identifier(TableIdent::new(
50 NamespaceIdent::new("mock_namespace".to_owned()),
51 name.to_owned(),
52 ))
53 .file_io(file_io)
54 .metadata(
55 TableMetadataBuilder::from_table_creation(table_creation)
56 .unwrap()
57 .build()
58 .unwrap()
59 .metadata,
60 )
61 .build()
62 .unwrap()
63 }
64
65 fn sparse_table() -> Table {
66 Self::build_table(
67 Self::SPARSE_TABLE,
68 Schema::builder()
69 .with_fields(vec![
70 NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Int), true).into(),
71 NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Long), true).into(),
72 NestedField::new(3, "v3", Type::Primitive(PrimitiveType::String), true).into(),
73 NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Time), true).into(),
74 ])
75 .build()
76 .unwrap(),
77 UnboundPartitionSpec::builder()
78 .with_spec_id(1)
79 .add_partition_fields(vec![
80 UnboundPartitionField {
81 source_id: 1,
82 field_id: Some(5),
83 name: "f1".to_owned(),
84 transform: Transform::Identity,
85 },
86 UnboundPartitionField {
87 source_id: 2,
88 field_id: Some(6),
89 name: "f2".to_owned(),
90 transform: Transform::Bucket(1),
91 },
92 UnboundPartitionField {
93 source_id: 3,
94 field_id: Some(7),
95 name: "f3".to_owned(),
96 transform: Transform::Truncate(1),
97 },
98 UnboundPartitionField {
99 source_id: 4,
100 field_id: Some(8),
101 name: "f4".to_owned(),
102 transform: Transform::Void,
103 },
104 ])
105 .unwrap()
106 .build(),
107 )
108 }
109
110 fn range_table() -> Table {
111 Self::build_table(
112 Self::RANGE_TABLE,
113 Schema::builder()
114 .with_fields(vec![
115 NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Date), true).into(),
116 NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Timestamp), true)
117 .into(),
118 NestedField::new(3, "v3", Type::Primitive(PrimitiveType::Timestamptz), true)
119 .into(),
120 NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Timestamptz), true)
121 .into(),
122 ])
123 .build()
124 .unwrap(),
125 UnboundPartitionSpec::builder()
126 .with_spec_id(1)
127 .add_partition_fields(vec![
128 UnboundPartitionField {
129 source_id: 1,
130 field_id: Some(5),
131 name: "f1".to_owned(),
132 transform: Transform::Year,
133 },
134 UnboundPartitionField {
135 source_id: 2,
136 field_id: Some(6),
137 name: "f2".to_owned(),
138 transform: Transform::Month,
139 },
140 UnboundPartitionField {
141 source_id: 3,
142 field_id: Some(7),
143 name: "f3".to_owned(),
144 transform: Transform::Day,
145 },
146 UnboundPartitionField {
147 source_id: 4,
148 field_id: Some(8),
149 name: "f4".to_owned(),
150 transform: Transform::Hour,
151 },
152 ])
153 .unwrap()
154 .build(),
155 )
156 }
157}
158
159#[async_trait]
160impl CatalogV2 for MockCatalog {
161 async fn list_namespaces(
163 &self,
164 _parent: Option<&NamespaceIdent>,
165 ) -> iceberg::Result<Vec<NamespaceIdent>> {
166 todo!()
167 }
168
169 async fn create_namespace(
171 &self,
172 _namespace: &iceberg::NamespaceIdent,
173 _properties: HashMap<String, String>,
174 ) -> iceberg::Result<iceberg::Namespace> {
175 todo!()
176 }
177
178 async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
180 todo!()
181 }
182
183 async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
185 todo!()
186 }
187
188 async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
190 todo!()
191 }
192
193 async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
195 todo!()
196 }
197
198 async fn update_namespace(
199 &self,
200 _namespace: &NamespaceIdent,
201 _properties: HashMap<String, String>,
202 ) -> iceberg::Result<()> {
203 todo!()
204 }
205
206 async fn create_table(
208 &self,
209 _namespace: &NamespaceIdent,
210 _creation: TableCreation,
211 ) -> iceberg::Result<Table> {
212 todo!()
213 }
214
215 async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
217 match table.name.as_ref() {
218 Self::SPARSE_TABLE => Ok(Self::sparse_table()),
219 Self::RANGE_TABLE => Ok(Self::range_table()),
220 _ => unimplemented!("table {} not found", table.name()),
221 }
222 }
223
224 async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
226 todo!()
227 }
228
229 async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
231 match table.name.as_ref() {
232 Self::SPARSE_TABLE => Ok(true),
233 Self::RANGE_TABLE => Ok(true),
234 _ => Ok(false),
235 }
236 }
237
238 async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
240 todo!()
241 }
242
243 async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<Table> {
245 todo!()
246 }
247}