risingwave_connector/connector_common/iceberg/
mock_catalog.rsuse std::collections::HashMap;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::{
NestedField, PrimitiveType, Schema, TableMetadataBuilder, Transform, Type,
UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::{
Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
};
#[derive(Debug)]
pub struct MockCatalog;
impl MockCatalog {
const RANGE_TABLE: &'static str = "range_table";
const SPARSE_TABLE: &'static str = "sparse_table";
}
impl MockCatalog {
fn build_table(name: &str, schema: Schema, partition_spec: UnboundPartitionSpec) -> Table {
let file_io = FileIO::from_path("memory://").unwrap().build().unwrap();
let table_creation = TableCreation {
name: "ignore".to_owned(),
location: Some("1".to_owned()),
schema,
partition_spec: Some(partition_spec),
sort_order: None,
properties: HashMap::new(),
};
Table::builder()
.identifier(TableIdent::new(
NamespaceIdent::new("mock_namespace".to_owned()),
name.to_owned(),
))
.file_io(file_io)
.metadata(
TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap()
.metadata,
)
.build()
.unwrap()
}
fn sparse_table() -> Table {
Self::build_table(
Self::SPARSE_TABLE,
Schema::builder()
.with_fields(vec![
NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Int), true).into(),
NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Long), true).into(),
NestedField::new(3, "v3", Type::Primitive(PrimitiveType::String), true).into(),
NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Time), true).into(),
])
.build()
.unwrap(),
UnboundPartitionSpec::builder()
.with_spec_id(1)
.add_partition_fields(vec![
UnboundPartitionField {
source_id: 1,
field_id: Some(5),
name: "f1".to_owned(),
transform: Transform::Identity,
},
UnboundPartitionField {
source_id: 2,
field_id: Some(6),
name: "f2".to_owned(),
transform: Transform::Bucket(1),
},
UnboundPartitionField {
source_id: 3,
field_id: Some(7),
name: "f3".to_owned(),
transform: Transform::Truncate(1),
},
UnboundPartitionField {
source_id: 4,
field_id: Some(8),
name: "f4".to_owned(),
transform: Transform::Void,
},
])
.unwrap()
.build(),
)
}
fn range_table() -> Table {
Self::build_table(
Self::RANGE_TABLE,
Schema::builder()
.with_fields(vec![
NestedField::new(1, "v1", Type::Primitive(PrimitiveType::Date), true).into(),
NestedField::new(2, "v2", Type::Primitive(PrimitiveType::Timestamp), true)
.into(),
NestedField::new(3, "v3", Type::Primitive(PrimitiveType::Timestamptz), true)
.into(),
NestedField::new(4, "v4", Type::Primitive(PrimitiveType::Timestamptz), true)
.into(),
])
.build()
.unwrap(),
UnboundPartitionSpec::builder()
.with_spec_id(1)
.add_partition_fields(vec![
UnboundPartitionField {
source_id: 1,
field_id: Some(5),
name: "f1".to_owned(),
transform: Transform::Year,
},
UnboundPartitionField {
source_id: 2,
field_id: Some(6),
name: "f2".to_owned(),
transform: Transform::Month,
},
UnboundPartitionField {
source_id: 3,
field_id: Some(7),
name: "f3".to_owned(),
transform: Transform::Day,
},
UnboundPartitionField {
source_id: 4,
field_id: Some(8),
name: "f4".to_owned(),
transform: Transform::Hour,
},
])
.unwrap()
.build(),
)
}
}
#[async_trait]
impl CatalogV2 for MockCatalog {
async fn list_namespaces(
&self,
_parent: Option<&NamespaceIdent>,
) -> iceberg::Result<Vec<NamespaceIdent>> {
todo!()
}
async fn create_namespace(
&self,
_namespace: &iceberg::NamespaceIdent,
_properties: HashMap<String, String>,
) -> iceberg::Result<iceberg::Namespace> {
todo!()
}
async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Namespace> {
todo!()
}
async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result<bool> {
todo!()
}
async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> {
todo!()
}
async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result<Vec<TableIdent>> {
todo!()
}
async fn update_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
) -> iceberg::Result<()> {
todo!()
}
async fn create_table(
&self,
_namespace: &NamespaceIdent,
_creation: TableCreation,
) -> iceberg::Result<Table> {
todo!()
}
async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
match table.name.as_ref() {
Self::SPARSE_TABLE => Ok(Self::sparse_table()),
Self::RANGE_TABLE => Ok(Self::range_table()),
_ => unimplemented!("table {} not found", table.name()),
}
}
async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
todo!()
}
async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
match table.name.as_ref() {
Self::SPARSE_TABLE => Ok(true),
Self::RANGE_TABLE => Ok(true),
_ => Ok(false),
}
}
async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> {
todo!()
}
async fn update_table(&self, _commit: TableCommit) -> iceberg::Result<Table> {
todo!()
}
}