mod compaction;
mod prometheus;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::sort_position_delete_writer::{
SortPositionDeleteWriterBuilder, POSITION_DELETE_SCHEMA,
};
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::function_writer::equality_delta_writer::{
EqualityDeltaWriterBuilder, DELETE_OP, INSERT_OP,
};
use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::SinkMetadata;
use serde_derive::Deserialize;
use serde_json::from_value;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
};
use super::{
Sink, SinkError, SinkWriterMetrics, SinkWriterParam, GLOBAL_SINK_METRICS,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::connector_common::IcebergCommon;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::iceberg::compaction::spawn_compaction_client;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
pub const ICEBERG_SINK: &str = "iceberg";
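/// User-facing configuration of the Iceberg sink, deserialized from the sink's `WITH` options.
/// Catalog, warehouse, and table coordinates live in the flattened [`IcebergCommon`]; the
/// remaining fields control sink behavior (append-only vs. upsert, primary key, partitioning,
/// commit interval, and table auto-creation).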
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub force_append_only: bool,
#[serde(flatten)]
common: IcebergCommon,
#[serde(
rename = "primary_key",
default,
deserialize_with = "deserialize_optional_string_seq_from_string"
)]
pub primary_key: Option<Vec<String>>,
#[serde(skip)]
pub java_catalog_props: HashMap<String, String>,
#[serde(default)]
pub partition_by: Option<String>,
#[serde(default = "default_commit_checkpoint_interval")]
#[serde_as(as = "DisplayFromStr")]
pub commit_checkpoint_interval: u64,
#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub create_table_if_not_exists: bool,
}
impl IcebergConfig {
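    /// Parses and validates an [`IcebergConfig`] from raw string options, checking that:
    /// - `type` is either append-only or upsert,
    /// - upsert sinks declare a non-empty `primary_key`,
    /// - `catalog.name` is present for non-storage catalogs,
    /// - `commit_checkpoint_interval` is non-zero.
    ///
    /// All remaining `catalog.*` options (except `catalog.uri`, `catalog.type`, and
    /// `catalog.name`) are collected into `java_catalog_props` with the `catalog.` prefix
    /// stripped.
    ///
    /// A minimal sketch of parsing options, mirroring the values used in the unit tests at the
    /// bottom of this module (the endpoints are placeholders, not a working deployment):
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// let options: BTreeMap<String, String> = [
    ///     ("connector", "iceberg"),
    ///     ("type", "upsert"),
    ///     ("primary_key", "id"),
    ///     ("warehouse.path", "s3://icebergdata/demo"),
    ///     ("catalog.type", "storage"),
    ///     ("catalog.name", "demo"),
    ///     ("database.name", "demo_db"),
    ///     ("table.name", "demo_table"),
    ///     ("s3.endpoint", "http://127.0.0.1:9301"),
    ///     ("s3.access.key", "hummockadmin"),
    ///     ("s3.secret.key", "hummockadmin"),
    ///     ("s3.region", "us-east-1"),
    /// ]
    /// .into_iter()
    /// .map(|(k, v)| (k.to_owned(), v.to_owned()))
    /// .collect();
    ///
    /// let config = IcebergConfig::from_btreemap(options)?;
    /// assert_eq!(config.r#type, "upsert");
    /// ```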
pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
let mut config =
serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
return Err(SinkError::Config(anyhow!(
"`{}` must be {}, or {}",
SINK_TYPE_OPTION,
SINK_TYPE_APPEND_ONLY,
SINK_TYPE_UPSERT
)));
}
if config.r#type == SINK_TYPE_UPSERT {
if let Some(primary_key) = &config.primary_key {
if primary_key.is_empty() {
return Err(SinkError::Config(anyhow!(
"Primary_key must not be empty in {}",
SINK_TYPE_UPSERT
)));
}
} else {
return Err(SinkError::Config(anyhow!(
"Must set primary_key in {}",
SINK_TYPE_UPSERT
)));
}
}
if config.common.catalog_name.is_none()
&& config.common.catalog_type.as_deref() != Some("storage")
{
return Err(SinkError::Config(anyhow!(
"catalog.name must be set for non-storage catalog"
)));
}
config.java_catalog_props = values
.iter()
.filter(|(k, _v)| {
k.starts_with("catalog.")
&& k != &"catalog.uri"
&& k != &"catalog.type"
&& k != &"catalog.name"
})
.map(|(k, v)| (k[8..].to_string(), v.to_string()))
.collect();
if config.commit_checkpoint_interval == 0 {
return Err(SinkError::Config(anyhow!(
"`commit_checkpoint_interval` must be greater than 0"
)));
}
Ok(config)
}
pub fn catalog_type(&self) -> &str {
self.common.catalog_type()
}
pub async fn load_table(&self) -> Result<Table> {
self.common
.load_table(&self.java_catalog_props)
.await
.map_err(Into::into)
}
pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
self.common
.create_catalog(&self.java_catalog_props)
.await
.map_err(Into::into)
}
pub fn full_table_name(&self) -> Result<TableIdent> {
self.common.full_table_name().map_err(Into::into)
}
}
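/// Iceberg sink. `unique_column_ids` is `Some` only for upsert sinks that are not forced to be
/// append-only; it holds the column ids of the user-declared primary key and is later used as
/// the equality-delete key in the upsert writer.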
pub struct IcebergSink {
config: IcebergConfig,
param: SinkParam,
unique_column_ids: Option<Vec<usize>>,
}
impl TryFrom<SinkParam> for IcebergSink {
type Error = SinkError;
fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let config = IcebergConfig::from_btreemap(param.properties.clone())?;
IcebergSink::new(config, param)
}
}
impl Debug for IcebergSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IcebergSink")
.field("config", &self.config)
.finish()
}
}
impl IcebergSink {
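    /// Loads the target table (creating it first when `create_table_if_not_exists` is set) and
    /// verifies that the sink schema is compatible with the table's current Iceberg schema.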
async fn create_and_validate_table(&self) -> Result<Table> {
if self.config.create_table_if_not_exists {
self.create_table_if_not_exists().await?;
}
let table = self
.config
.load_table()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let sink_schema = self.param.schema();
let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
Ok(table)
}
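    /// Creates the Iceberg table if it does not already exist, deriving the Iceberg schema from
    /// the sink columns, the table location from `warehouse.path` (when given), and the
    /// partition spec from `partition_by` (when given).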
async fn create_table_if_not_exists(&self) -> Result<()> {
let catalog = self.config.create_catalog().await?;
let table_id = self
.config
.full_table_name()
.context("Unable to parse table name")?;
if !catalog
.table_exists(&table_id)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e)))?
{
let namespace = if let Some(database_name) = &self.config.common.database_name {
NamespaceIdent::new(database_name.clone())
} else {
bail!("database name must be set if you want to create table")
};
let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
let arrow_fields = self
.param
.columns
.iter()
.map(|column| {
Ok(iceberg_create_table_arrow_convert
.to_arrow_field(&column.name, &column.data_type)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context(format!(
"failed to convert {}: {} to arrow type",
&column.name, &column.data_type
))?)
})
.collect::<Result<Vec<ArrowField>>>()?;
let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to convert arrow schema to iceberg schema")?;
let location = {
let mut names = namespace.clone().inner();
names.push(self.config.common.table_name.clone());
match &self.config.common.warehouse_path {
Some(warehouse_path) => {
let url = Url::parse(warehouse_path);
if url.is_err() {
if self.config.common.catalog_type() == "rest"
|| self.config.common.catalog_type() == "rest_rust"
{
None
} else {
bail!(format!("Invalid warehouse path: {}", warehouse_path))
}
} else if warehouse_path.ends_with('/') {
Some(format!("{}{}", warehouse_path, names.join("/")))
} else {
Some(format!("{}/{}", warehouse_path, names.join("/")))
}
}
None => None,
}
};
let partition_spec = match &self.config.partition_by {
Some(partition_field) => {
let mut partition_fields = Vec::<UnboundPartitionField>::new();
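                    // Accepted forms: `column`, `transform(column)`, `transform(n,column)`,
                    // and `transform(n, column)`, e.g. `bucket(16,id)` or `day(ts)`.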
let re = Regex::new(
r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?",
)
.unwrap();
if !re.is_match(partition_field) {
bail!(format!("Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)", partition_field))
}
let caps = re.captures_iter(partition_field);
for (i, mat) in caps.enumerate() {
let (column, transform) =
if mat.name("n").is_none() && mat.name("field").is_none() {
(&mat["transform"], Transform::Identity)
} else {
let mut func = mat["transform"].to_owned();
if func == "bucket" || func == "truncate" {
let n = &mat
.name("n")
.ok_or_else(|| {
SinkError::Iceberg(anyhow!(
"The `n` must be set with `bucket` and `truncate`"
))
})?
.as_str();
func = format!("{func}[{n}]");
}
(
&mat["field"],
Transform::from_str(&func)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))?,
)
};
match iceberg_schema.field_id_by_name(column) {
Some(id) => partition_fields.push(
UnboundPartitionField::builder()
.source_id(id)
.transform(transform)
.name(column.to_owned())
.field_id(i as i32)
.build(),
),
None => bail!(format!(
"Partition source column does not exist in schema: {}",
column
)),
};
}
Some(
UnboundPartitionSpec::builder()
.with_spec_id(0)
.add_partition_fields(partition_fields)
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to add partition columns")?
.build(),
)
}
None => None,
};
let table_creation_builder = TableCreation::builder()
.name(self.config.common.table_name.clone())
.schema(iceberg_schema);
let table_creation = match (location, partition_spec) {
(Some(location), Some(partition_spec)) => table_creation_builder
.location(location)
.partition_spec(partition_spec)
.build(),
(Some(location), None) => table_creation_builder.location(location).build(),
(None, Some(partition_spec)) => table_creation_builder
.partition_spec(partition_spec)
.build(),
(None, None) => table_creation_builder.build(),
};
catalog
.create_table(&namespace, table_creation)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to create iceberg table")?;
}
Ok(())
}
pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
if let Some(pk) = &config.primary_key {
let mut unique_column_ids = Vec::with_capacity(pk.len());
for col_name in pk {
let id = param
.columns
.iter()
.find(|col| col.name.as_str() == col_name)
.ok_or_else(|| {
SinkError::Config(anyhow!(
"Primary key column {} not found in sink schema",
col_name
))
})?
.column_id
.get_id() as usize;
unique_column_ids.push(id);
}
Some(unique_column_ids)
} else {
unreachable!()
}
} else {
None
};
Ok(Self {
config,
param,
unique_column_ids,
})
}
}
impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type LogSinker = DecoupleCheckpointLogSinkerOf<CoordinatedSinkWriter<IcebergSinkWriter>>;
const SINK_NAME: &'static str = ICEBERG_SINK;
async fn validate(&self) -> Result<()> {
if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
bail!("Snowflake catalog only supports iceberg sources");
}
if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
risingwave_common::license::Feature::IcebergSinkWithGlue
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
}
let _ = self.create_and_validate_table().await?;
Ok(())
}
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let table = self.create_and_validate_table().await?;
let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
} else {
IcebergSinkWriter::new_append_only(table, &writer_param).await?
};
let metrics = SinkWriterMetrics::new(&writer_param);
let writer = CoordinatedSinkWriter::new(
writer_param
.meta_client
.expect("should have meta client")
.sink_coordinate_client()
.await,
self.param.clone(),
writer_param.vnode_bitmap.ok_or_else(|| {
SinkError::Remote(anyhow!(
"sink needs coordination and should not have singleton input"
))
})?,
inner,
)
.await?;
let commit_checkpoint_interval =
NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);
Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
metrics,
commit_checkpoint_interval,
))
}
async fn new_coordinator(&self) -> Result<Self::Coordinator> {
let catalog = self.config.create_catalog().await?;
let table = self.create_and_validate_table().await?;
let (commit_tx, finish_tx) = if self.config.common.enable_config_load.unwrap_or(false) {
let (commit_tx, finish_tx) = spawn_compaction_client(&self.config)?;
(Some(commit_tx), Some(finish_tx))
} else {
(None, None)
};
Ok(IcebergSinkCommitter {
catalog,
table,
commit_notifier: commit_tx,
_compact_task_guard: finish_tx,
})
}
}
pub struct IcebergSinkWriter {
writer: IcebergWriterDispatch,
arrow_schema: SchemaRef,
metrics: IcebergWriterMetrics,
table: Table,
}
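/// Concrete writer pipelines for the four supported combinations of
/// (partitioned | non-partitioned) x (append-only | upsert). The `writer` is rebuilt from
/// `writer_builder` lazily and after every checkpoint, once the previous writer has been closed
/// and its data files collected.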
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
PartitionAppendOnly {
writer: Option<Box<dyn IcebergWriter>>,
writer_builder: MonitoredGeneralWriterBuilder<
FanoutPartitionWriterBuilder<
DataFileWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
>,
>,
},
NonpartitionAppendOnly {
writer: Option<Box<dyn IcebergWriter>>,
writer_builder: MonitoredGeneralWriterBuilder<
DataFileWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
>,
},
PartitionUpsert {
writer: Option<Box<dyn IcebergWriter>>,
writer_builder: MonitoredGeneralWriterBuilder<
FanoutPartitionWriterBuilder<
EqualityDeltaWriterBuilder<
DataFileWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
MonitoredPositionDeleteWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
EqualityDeleteFileWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
>,
>,
>,
arrow_schema_with_op_column: SchemaRef,
},
NonpartitionUpsert {
writer: Option<Box<dyn IcebergWriter>>,
writer_builder: MonitoredGeneralWriterBuilder<
EqualityDeltaWriterBuilder<
DataFileWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
MonitoredPositionDeleteWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
EqualityDeleteFileWriterBuilder<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
>,
>,
arrow_schema_with_op_column: SchemaRef,
},
}
impl IcebergWriterDispatch {
pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
match self {
IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
| IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
| IcebergWriterDispatch::PartitionUpsert { writer, .. }
| IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
}
}
}
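/// Metric handles owned by the writer. `_write_qps` and `_write_latency` are not used directly
/// here; clones of them are driven by the monitored writer builders, and the originals are
/// retained alongside them.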
pub struct IcebergWriterMetrics {
_write_qps: LabelGuardedIntCounter<3>,
_write_latency: LabelGuardedHistogram<3>,
write_bytes: LabelGuardedIntCounter<3>,
}
impl IcebergSinkWriter {
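    /// Builds an append-only writer: a plain data-file writer, wrapped in a fanout partition
    /// writer when the table has a non-empty partition spec.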
pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
let SinkWriterParam {
extra_partition_col_idx,
actor_id,
sink_id,
sink_name,
..
} = writer_param;
let metrics_labels = [
&actor_id.to_string(),
&sink_id.to_string(),
sink_name.as_str(),
];
let write_qps = GLOBAL_SINK_METRICS
.iceberg_write_qps
.with_guarded_label_values(&metrics_labels);
let write_latency = GLOBAL_SINK_METRICS
.iceberg_write_latency
.with_guarded_label_values(&metrics_labels);
let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
.iceberg_rolling_unflushed_data_file
.with_guarded_label_values(&metrics_labels);
let write_bytes = GLOBAL_SINK_METRICS
.iceberg_write_bytes
.with_guarded_label_values(&metrics_labels);
let schema = table.metadata().current_schema();
let partition_spec = table.metadata().default_partition_spec();
let unique_uuid_suffix = Uuid::now_v7();
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
schema.clone(),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
DefaultFileNameGenerator::new(
writer_param.actor_id.to_string(),
Some(unique_uuid_suffix.to_string()),
iceberg::spec::DataFileFormat::Parquet,
),
);
let data_file_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
Err(SinkError::Iceberg(anyhow!(
"Extra partition column is not supported in append-only mode"
)))
} else if partition_spec.fields().is_empty() {
let writer_builder = MonitoredGeneralWriterBuilder::new(
data_file_builder,
write_qps.clone(),
write_latency.clone(),
);
let inner_writer = Some(Box::new(
writer_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
) as Box<dyn IcebergWriter>);
Ok(Self {
arrow_schema: Arc::new(
schema_to_arrow_schema(table.metadata().current_schema())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
),
metrics: IcebergWriterMetrics {
_write_qps: write_qps,
_write_latency: write_latency,
write_bytes,
},
writer: IcebergWriterDispatch::NonpartitionAppendOnly {
writer: inner_writer,
writer_builder,
},
table,
})
} else {
let partition_builder = MonitoredGeneralWriterBuilder::new(
FanoutPartitionWriterBuilder::new(
data_file_builder,
partition_spec.clone(),
schema.clone(),
)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
write_qps.clone(),
write_latency.clone(),
);
let inner_writer = Some(Box::new(
partition_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
) as Box<dyn IcebergWriter>);
Ok(Self {
arrow_schema: Arc::new(
schema_to_arrow_schema(table.metadata().current_schema())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
),
metrics: IcebergWriterMetrics {
_write_qps: write_qps,
_write_latency: write_latency,
write_bytes,
},
writer: IcebergWriterDispatch::PartitionAppendOnly {
writer: inner_writer,
writer_builder: partition_builder,
},
table,
})
}
}
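    /// Builds an upsert writer: an equality-delta writer that combines a data-file writer, a
    /// sorted position-delete writer, and an equality-delete writer keyed on
    /// `unique_column_ids`. Input batches carry an extra `op` column (insert or delete) that
    /// the delta writer consumes; for partitioned tables the whole pipeline is wrapped in a
    /// fanout partition writer.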
pub async fn new_upsert(
table: Table,
unique_column_ids: Vec<usize>,
writer_param: &SinkWriterParam,
) -> Result<Self> {
let SinkWriterParam {
extra_partition_col_idx,
actor_id,
sink_id,
sink_name,
..
} = writer_param;
let metrics_labels = [
&actor_id.to_string(),
&sink_id.to_string(),
sink_name.as_str(),
];
let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
let write_qps = GLOBAL_SINK_METRICS
.iceberg_write_qps
.with_guarded_label_values(&metrics_labels);
let write_latency = GLOBAL_SINK_METRICS
.iceberg_write_latency
.with_guarded_label_values(&metrics_labels);
let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
.iceberg_rolling_unflushed_data_file
.with_guarded_label_values(&metrics_labels);
let position_delete_cache_num = GLOBAL_SINK_METRICS
.iceberg_position_delete_cache_num
.with_guarded_label_values(&metrics_labels);
let write_bytes = GLOBAL_SINK_METRICS
.iceberg_write_bytes
.with_guarded_label_values(&metrics_labels);
let schema = table.metadata().current_schema();
let partition_spec = table.metadata().default_partition_spec();
let unique_uuid_suffix = Uuid::now_v7();
let data_file_builder = {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
schema.clone(),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
DefaultFileNameGenerator::new(
writer_param.actor_id.to_string(),
Some(unique_uuid_suffix.to_string()),
iceberg::spec::DataFileFormat::Parquet,
),
);
DataFileWriterBuilder::new(parquet_writer_builder.clone(), None)
};
let position_delete_builder = {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
POSITION_DELETE_SCHEMA.clone(),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
DefaultFileNameGenerator::new(
writer_param.actor_id.to_string(),
Some(format!("pos-del-{}", unique_uuid_suffix)),
iceberg::spec::DataFileFormat::Parquet,
),
);
MonitoredPositionDeleteWriterBuilder::new(
SortPositionDeleteWriterBuilder::new(parquet_writer_builder.clone(), 1024, None),
position_delete_cache_num,
)
};
let equality_delete_builder = {
let config = EqualityDeleteWriterConfig::new(
unique_column_ids.clone(),
table.metadata().current_schema().clone(),
None,
)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
Arc::new(
arrow_schema_to_schema(config.projected_arrow_schema_ref())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
DefaultFileNameGenerator::new(
writer_param.actor_id.to_string(),
Some(format!("eq-del-{}", unique_uuid_suffix)),
iceberg::spec::DataFileFormat::Parquet,
),
);
EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
};
let delta_builder = EqualityDeltaWriterBuilder::new(
data_file_builder,
position_delete_builder,
equality_delete_builder,
unique_column_ids,
);
if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
Err(SinkError::Iceberg(anyhow!(
"Extra partition column is not supported in upsert mode"
)))
} else if partition_spec.fields().is_empty() {
let writer_builder = MonitoredGeneralWriterBuilder::new(
delta_builder,
write_qps.clone(),
write_latency.clone(),
);
let inner_writer = Some(Box::new(
writer_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
) as Box<dyn IcebergWriter>);
let original_arrow_schema = Arc::new(
schema_to_arrow_schema(table.metadata().current_schema())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
);
let schema_with_extra_op_column = {
let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
new_fields.push(Arc::new(ArrowField::new(
"op".to_owned(),
ArrowDataType::Int32,
false,
)));
Arc::new(ArrowSchema::new(new_fields))
};
Ok(Self {
arrow_schema: original_arrow_schema,
metrics: IcebergWriterMetrics {
_write_qps: write_qps,
_write_latency: write_latency,
write_bytes,
},
table,
writer: IcebergWriterDispatch::NonpartitionUpsert {
writer: inner_writer,
writer_builder,
arrow_schema_with_op_column: schema_with_extra_op_column,
},
})
} else {
let original_arrow_schema = Arc::new(
schema_to_arrow_schema(table.metadata().current_schema())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
);
let schema_with_extra_op_column = {
let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
new_fields.push(Arc::new(ArrowField::new(
"op".to_owned(),
ArrowDataType::Int32,
false,
)));
Arc::new(ArrowSchema::new(new_fields))
};
let partition_builder = MonitoredGeneralWriterBuilder::new(
FanoutPartitionWriterBuilder::new_with_custom_schema(
delta_builder,
schema_with_extra_op_column.clone(),
partition_spec.clone(),
table.metadata().current_schema().clone(),
),
write_qps.clone(),
write_latency.clone(),
);
let inner_writer = Some(Box::new(
partition_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
) as Box<dyn IcebergWriter>);
Ok(Self {
arrow_schema: original_arrow_schema,
metrics: IcebergWriterMetrics {
_write_qps: write_qps,
_write_latency: write_latency,
write_bytes,
},
table,
writer: IcebergWriterDispatch::PartitionUpsert {
writer: inner_writer,
writer_builder: partition_builder,
arrow_schema_with_op_column: schema_with_extra_op_column,
},
})
}
}
}
#[async_trait]
impl SinkWriter for IcebergSinkWriter {
type CommitMetadata = Option<SinkMetadata>;
async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
Ok(())
}
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
match &mut self.writer {
IcebergWriterDispatch::PartitionAppendOnly {
writer,
writer_builder,
} => {
if writer.is_none() {
*writer = Some(Box::new(
writer_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
));
}
}
IcebergWriterDispatch::NonpartitionAppendOnly {
writer,
writer_builder,
} => {
if writer.is_none() {
*writer = Some(Box::new(
writer_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
));
}
}
IcebergWriterDispatch::PartitionUpsert {
writer,
writer_builder,
..
} => {
if writer.is_none() {
*writer = Some(Box::new(
writer_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
));
}
}
IcebergWriterDispatch::NonpartitionUpsert {
writer,
writer_builder,
..
} => {
if writer.is_none() {
*writer = Some(Box::new(
writer_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
));
}
}
};
let (mut chunk, ops) = chunk.compact().into_parts();
        if ops.is_empty() {
return Ok(());
}
let write_batch_size = chunk.estimated_heap_size();
let batch = match &self.writer {
IcebergWriterDispatch::PartitionAppendOnly { .. }
| IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
let filters =
chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
chunk.set_visibility(filters);
IcebergArrowConvert
.to_record_batch(self.arrow_schema.clone(), &chunk.compact())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?
}
IcebergWriterDispatch::PartitionUpsert {
arrow_schema_with_op_column,
..
}
| IcebergWriterDispatch::NonpartitionUpsert {
arrow_schema_with_op_column,
..
} => {
let chunk = IcebergArrowConvert
.to_record_batch(self.arrow_schema.clone(), &chunk)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let ops = Arc::new(Int32Array::from(
ops.iter()
.map(|op| match op {
Op::UpdateInsert | Op::Insert => INSERT_OP,
Op::UpdateDelete | Op::Delete => DELETE_OP,
})
.collect_vec(),
));
let mut columns = chunk.columns().to_vec();
columns.push(ops);
RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?
}
};
let writer = self.writer.get_writer().unwrap();
writer
.write(batch)
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
self.metrics.write_bytes.inc_by(write_batch_size as _);
Ok(())
}
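    /// On a checkpoint barrier, closes the current writer to flush its buffered files, rebuilds
    /// a fresh writer for the next epoch, and returns the produced data files as serialized
    /// metadata for the commit coordinator. Non-checkpoint barriers are no-ops.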
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
if !is_checkpoint {
return Ok(None);
}
let close_result = match &mut self.writer {
IcebergWriterDispatch::PartitionAppendOnly {
writer,
writer_builder,
} => {
let close_result = if let Some(mut writer) = writer.take() {
Some(writer.close().await)
} else {
None
};
if let Ok(new_writer) = writer_builder.clone().build().await {
*writer = Some(Box::new(new_writer));
} else {
warn!("Failed to build new writer after close");
}
close_result
}
IcebergWriterDispatch::NonpartitionAppendOnly {
writer,
writer_builder,
} => {
let close_result = if let Some(mut writer) = writer.take() {
Some(writer.close().await)
} else {
None
};
if let Ok(new_writer) = writer_builder.clone().build().await {
*writer = Some(Box::new(new_writer));
} else {
warn!("Failed to build new writer after close");
}
close_result
}
IcebergWriterDispatch::PartitionUpsert {
writer,
writer_builder,
..
} => {
let close_result = if let Some(mut writer) = writer.take() {
Some(writer.close().await)
} else {
None
};
if let Ok(new_writer) = writer_builder.clone().build().await {
*writer = Some(Box::new(new_writer));
} else {
warn!("Failed to build new writer after close");
}
close_result
}
IcebergWriterDispatch::NonpartitionUpsert {
writer,
writer_builder,
..
} => {
let close_result = if let Some(mut writer) = writer.take() {
Some(writer.close().await)
} else {
None
};
if let Ok(new_writer) = writer_builder.clone().build().await {
*writer = Some(Box::new(new_writer));
} else {
warn!("Failed to build new writer after close");
}
close_result
}
};
match close_result {
Some(Ok(result)) => {
let version = self.table.metadata().format_version() as u8;
let partition_type = self.table.metadata().default_partition_type();
let data_files = result
.into_iter()
.map(|f| {
SerializedDataFile::try_from(f, partition_type, version == 1)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
data_files,
schema_id: self.table.metadata().current_schema_id(),
partition_spec_id: self.table.metadata().default_partition_spec_id(),
})?))
}
Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
}
}
async fn abort(&mut self) -> Result<()> {
Ok(())
}
}
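// JSON keys used to (de)serialize an `IcebergCommitResult` into `SinkMetadata`.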
const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";
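/// Per-writer commit payload exchanged with the coordinator via [`SinkMetadata`]: the data
/// files produced since the last checkpoint, plus the schema id and partition spec id they
/// were written against.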
#[derive(Default)]
struct IcebergCommitResult {
schema_id: i32,
partition_spec_id: i32,
data_files: Vec<SerializedDataFile>,
}
impl IcebergCommitResult {
fn try_from(value: &SinkMetadata) -> Result<Self> {
if let Some(Serialized(v)) = &value.metadata {
let mut values = if let serde_json::Value::Object(v) =
serde_json::from_slice::<serde_json::Value>(&v.metadata)
.context("Can't parse iceberg sink metadata")?
{
v
} else {
bail!("iceberg sink metadata should be an object");
};
let schema_id;
if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
schema_id = value
.as_u64()
.ok_or_else(|| anyhow!("schema_id should be a u64"))?;
} else {
bail!("iceberg sink metadata should have schema_id");
}
let partition_spec_id;
if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
partition_spec_id = value
.as_u64()
.ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
} else {
bail!("iceberg sink metadata should have partition_spec_id");
}
let data_files: Vec<SerializedDataFile>;
if let serde_json::Value::Array(values) = values
.remove(DATA_FILES)
.ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
{
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .context("Failed to parse data_files in iceberg sink metadata")?;
} else {
bail!("iceberg sink metadata should have data_files object");
}
Ok(Self {
schema_id: schema_id as i32,
partition_spec_id: partition_spec_id as i32,
data_files,
})
} else {
bail!("Can't create iceberg sink write result from empty data!")
}
}
}
impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
type Error = SinkError;
fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
let json_data_files = serde_json::Value::Array(
value
.data_files
.iter()
.map(serde_json::to_value)
.collect::<std::result::Result<Vec<serde_json::Value>, _>>()
.context("Can't serialize data files to json")?,
);
let json_value = serde_json::Value::Object(
vec![
(
SCHEMA_ID.to_owned(),
serde_json::Value::Number(value.schema_id.into()),
),
(
PARTITION_SPEC_ID.to_owned(),
serde_json::Value::Number(value.partition_spec_id.into()),
),
(DATA_FILES.to_owned(), json_data_files),
]
.into_iter()
.collect(),
);
Ok(SinkMetadata {
metadata: Some(Serialized(SerializedMetadata {
metadata: serde_json::to_vec(&json_value)
.context("Can't serialize iceberg sink metadata")?,
})),
})
}
}
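/// Commit coordinator that collects metadata from all sink writers at a checkpoint and commits
/// the data files to the Iceberg table in a single `fast_append` transaction. The optional
/// `commit_notifier` signals the background compaction client after each successful commit.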
pub struct IcebergSinkCommitter {
catalog: Arc<dyn Catalog>,
table: Table,
commit_notifier: Option<mpsc::UnboundedSender<()>>,
_compact_task_guard: Option<oneshot::Sender<()>>,
}
#[async_trait::async_trait]
impl SinkCommitCoordinator for IcebergSinkCommitter {
async fn init(&mut self) -> Result<()> {
tracing::info!("Iceberg commit coordinator inited.");
Ok(())
}
async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
tracing::info!("Starting iceberg commit in epoch {epoch}.");
let write_results = metadata
.iter()
.map(IcebergCommitResult::try_from)
.collect::<Result<Vec<IcebergCommitResult>>>()?;
if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
tracing::debug!(?epoch, "no data to commit");
return Ok(());
}
if write_results
.iter()
.any(|r| r.schema_id != write_results[0].schema_id)
|| write_results
.iter()
.any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
{
return Err(SinkError::Iceberg(anyhow!(
"schema_id and partition_spec_id should be the same in all write results"
)));
}
self.table = self
.catalog
.clone()
.load_table(self.table.identifier())
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let Some(schema) = self
.table
.metadata()
.schema_by_id(write_results[0].schema_id)
else {
return Err(SinkError::Iceberg(anyhow!(
"Can't find schema by id {}",
write_results[0].schema_id
)));
};
let Some(partition_spec) = self
.table
.metadata()
.partition_spec_by_id(write_results[0].partition_spec_id)
else {
return Err(SinkError::Iceberg(anyhow!(
"Can't find partition spec by id {}",
write_results[0].partition_spec_id
)));
};
let partition_type = partition_spec
.as_ref()
.clone()
.partition_type(schema)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let data_files = write_results
.into_iter()
.flat_map(|r| {
r.data_files.into_iter().map(|f| {
f.try_into(&partition_type, schema)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))
})
})
.collect::<Result<Vec<DataFile>>>()?;
let txn = Transaction::new(&self.table);
let mut append_action = txn
.fast_append(None, vec![])
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
append_action
.add_data_files(data_files)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let tx = append_action.apply().await.map_err(|err| {
tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
SinkError::Iceberg(anyhow!(err))
})?;
let table = tx
.commit_dyn(self.catalog.as_ref())
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
self.table = table;
if let Some(commit_notifier) = &mut self.commit_notifier {
if commit_notifier.send(()).is_err() {
warn!("failed to notify commit");
}
}
tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
Ok(())
}
}
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
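/// For nested arrow types (struct, list, map), collects the corresponding RisingWave child
/// field types into `schema_fields` and returns the arrow child fields to compare against;
/// returns `None` for non-nested arrow types.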
fn get_fields<'a>(
our_field_type: &'a risingwave_common::types::DataType,
data_type: &ArrowDataType,
schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
) -> Option<ArrowFields> {
match data_type {
ArrowDataType::Struct(fields) => {
match our_field_type {
risingwave_common::types::DataType::Struct(struct_fields) => {
struct_fields.iter().for_each(|(name, data_type)| {
let res = schema_fields.insert(name, data_type);
assert!(res.is_none())
});
}
risingwave_common::types::DataType::Map(map_fields) => {
schema_fields.insert(MAP_KEY, map_fields.key());
schema_fields.insert(MAP_VALUE, map_fields.value());
}
risingwave_common::types::DataType::List(list_field) => {
list_field.as_struct().iter().for_each(|(name, data_type)| {
let res = schema_fields.insert(name, data_type);
assert!(res.is_none())
});
}
_ => {}
};
Some(fields.clone())
}
ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
get_fields(our_field_type, field.data_type(), schema_fields)
}
        _ => None,
    }
}
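/// Checks that every arrow field in `fields` has a RisingWave counterpart in `schema_fields`
/// whose converted arrow type is compatible, descending into nested struct, list, and map
/// types. A few mismatched-but-accepted pairs (e.g. `Binary` vs. `LargeBinary`, any two
/// `Decimal128` widths) are treated as compatible.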
fn check_compatibility(
schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
fields: &ArrowFields,
) -> anyhow::Result<bool> {
for arrow_field in fields {
let our_field_type = schema_fields
.get(arrow_field.name().as_str())
.ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
let converted_arrow_data_type = IcebergArrowConvert
.to_arrow_field("", our_field_type)
.map_err(|e| anyhow!(e))?
.data_type()
.clone();
let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
(ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
(ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
(ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
(ArrowDataType::List(_), ArrowDataType::List(field))
| (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
let mut schema_fields = HashMap::new();
get_fields(our_field_type, field.data_type(), &mut schema_fields)
.map_or(true, |fields| {
check_compatibility(schema_fields, &fields).unwrap()
})
}
(ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
let mut schema_fields = HashMap::new();
our_field_type
.as_struct()
.iter()
.for_each(|(name, data_type)| {
let res = schema_fields.insert(name, data_type);
assert!(res.is_none())
});
check_compatibility(schema_fields, fields)?
}
(left, right) => left.equals_datatype(right),
};
if !compatible {
bail!("field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
arrow_field.name(), converted_arrow_data_type, arrow_field.data_type()
);
}
}
Ok(true)
}
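/// Checks that a RisingWave schema matches an Iceberg schema converted to arrow: the field
/// counts must be equal and every arrow field must have a type-compatible RisingWave field of
/// the same name, while field order may differ.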
pub fn try_matches_arrow_schema(
rw_schema: &Schema,
arrow_schema: &ArrowSchema,
) -> anyhow::Result<()> {
if rw_schema.fields.len() != arrow_schema.fields().len() {
bail!(
"Schema length mismatch, risingwave is {}, and iceberg is {}",
rw_schema.fields.len(),
arrow_schema.fields.len()
);
}
let mut schema_fields = HashMap::new();
rw_schema.fields.iter().for_each(|field| {
let res = schema_fields.insert(field.name.as_str(), &field.data_type);
assert!(res.is_none())
});
check_compatibility(schema_fields, &arrow_schema.fields)?;
Ok(())
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
use risingwave_common::catalog::Field;
use risingwave_common::types::{MapType, StructType};
use crate::connector_common::IcebergCommon;
use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
use crate::sink::iceberg::IcebergConfig;
use crate::source::DataType;
#[test]
fn test_compatible_arrow_schema() {
use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
use super::*;
let risingwave_schema = Schema::new(vec![
Field::with_name(DataType::Int32, "a"),
Field::with_name(DataType::Int32, "b"),
Field::with_name(DataType::Int32, "c"),
]);
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int32, false),
ArrowField::new("b", ArrowDataType::Int32, false),
ArrowField::new("c", ArrowDataType::Int32, false),
]);
try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
let risingwave_schema = Schema::new(vec![
Field::with_name(DataType::Int32, "d"),
Field::with_name(DataType::Int32, "c"),
Field::with_name(DataType::Int32, "a"),
Field::with_name(DataType::Int32, "b"),
]);
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new("a", ArrowDataType::Int32, false),
ArrowField::new("b", ArrowDataType::Int32, false),
ArrowField::new("d", ArrowDataType::Int32, false),
ArrowField::new("c", ArrowDataType::Int32, false),
]);
try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
let risingwave_schema = Schema::new(vec![
Field::with_name(
DataType::Struct(StructType::new(vec![
("a1", DataType::Int32),
(
"a2",
DataType::Struct(StructType::new(vec![
("a21", DataType::Bytea),
(
"a22",
DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
),
])),
),
])),
"a",
),
Field::with_name(
DataType::List(Box::new(DataType::Struct(StructType::new(vec![
("b1", DataType::Int32),
("b2", DataType::Bytea),
(
"b3",
DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
),
])))),
"b",
),
Field::with_name(
DataType::Map(MapType::from_kv(
DataType::Varchar,
DataType::List(Box::new(DataType::Struct(StructType::new([
("c1", DataType::Int32),
("c2", DataType::Bytea),
(
"c3",
DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
),
])))),
)),
"c",
),
]);
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new(
"a",
ArrowDataType::Struct(ArrowFields::from(vec![
ArrowField::new("a1", ArrowDataType::Int32, false),
ArrowField::new(
"a2",
ArrowDataType::Struct(ArrowFields::from(vec![
ArrowField::new("a21", ArrowDataType::LargeBinary, false),
ArrowField::new_map(
"a22",
"entries",
ArrowFieldRef::new(ArrowField::new(
"key",
ArrowDataType::Utf8,
false,
)),
ArrowFieldRef::new(ArrowField::new(
"value",
ArrowDataType::Utf8,
false,
)),
false,
false,
),
])),
false,
),
])),
false,
),
ArrowField::new(
"b",
ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
ArrowDataType::Struct(ArrowFields::from(vec![
ArrowField::new("b1", ArrowDataType::Int32, false),
ArrowField::new("b2", ArrowDataType::LargeBinary, false),
ArrowField::new_map(
"b3",
"entries",
ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
ArrowFieldRef::new(ArrowField::new(
"value",
ArrowDataType::Utf8,
false,
)),
false,
false,
),
])),
false,
))),
false,
),
ArrowField::new_map(
"c",
"entries",
ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
ArrowFieldRef::new(ArrowField::new(
"value",
ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
ArrowDataType::Struct(ArrowFields::from(vec![
ArrowField::new("c1", ArrowDataType::Int32, false),
ArrowField::new("c2", ArrowDataType::LargeBinary, false),
ArrowField::new_map(
"c3",
"entries",
ArrowFieldRef::new(ArrowField::new(
"key",
ArrowDataType::Utf8,
false,
)),
ArrowFieldRef::new(ArrowField::new(
"value",
ArrowDataType::Utf8,
false,
)),
false,
false,
),
])),
false,
))),
false,
)),
false,
false,
),
]);
try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
}
#[test]
fn test_parse_iceberg_config() {
let values = [
("connector", "iceberg"),
("type", "upsert"),
("primary_key", "v1"),
("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
("warehouse.path", "s3://iceberg"),
("s3.endpoint", "http://127.0.0.1:9301"),
("s3.access.key", "hummockadmin"),
("s3.secret.key", "hummockadmin"),
("s3.path.style.access", "true"),
("s3.region", "us-east-1"),
("catalog.type", "jdbc"),
("catalog.name", "demo"),
("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
("catalog.jdbc.user", "admin"),
("catalog.jdbc.password", "123456"),
("database.name", "demo_db"),
("table.name", "demo_table"),
]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();
let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
let expected_iceberg_config = IcebergConfig {
common: IcebergCommon {
warehouse_path: Some("s3://iceberg".to_owned()),
catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
region: Some("us-east-1".to_owned()),
endpoint: Some("http://127.0.0.1:9301".to_owned()),
access_key: Some("hummockadmin".to_owned()),
secret_key: Some("hummockadmin".to_owned()),
gcs_credential: None,
catalog_type: Some("jdbc".to_owned()),
glue_id: None,
catalog_name: Some("demo".to_owned()),
database_name: Some("demo_db".to_owned()),
table_name: "demo_table".to_owned(),
path_style_access: Some(true),
credential: None,
oauth2_server_uri: None,
scope: None,
token: None,
enable_config_load: None,
},
r#type: "upsert".to_owned(),
force_append_only: false,
primary_key: Some(vec!["v1".to_owned()]),
partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect(),
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
create_table_if_not_exists: false,
};
assert_eq!(iceberg_config, expected_iceberg_config);
assert_eq!(
&iceberg_config.common.full_table_name().unwrap().to_string(),
"demo_db.demo_table"
);
}
async fn test_create_catalog(configs: BTreeMap<String, String>) {
let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
let table = iceberg_config.load_table().await.unwrap();
println!("{:?}", table.identifier());
}
#[tokio::test]
#[ignore]
async fn test_storage_catalog() {
let values = [
("connector", "iceberg"),
("type", "append-only"),
("force_append_only", "true"),
("s3.endpoint", "http://127.0.0.1:9301"),
("s3.access.key", "hummockadmin"),
("s3.secret.key", "hummockadmin"),
("s3.region", "us-east-1"),
("s3.path.style.access", "true"),
("catalog.name", "demo"),
("catalog.type", "storage"),
("warehouse.path", "s3://icebergdata/demo"),
("database.name", "s1"),
("table.name", "t1"),
]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();
test_create_catalog(values).await;
}
#[tokio::test]
#[ignore]
async fn test_rest_catalog() {
let values = [
("connector", "iceberg"),
("type", "append-only"),
("force_append_only", "true"),
("s3.endpoint", "http://127.0.0.1:9301"),
("s3.access.key", "hummockadmin"),
("s3.secret.key", "hummockadmin"),
("s3.region", "us-east-1"),
("s3.path.style.access", "true"),
("catalog.name", "demo"),
("catalog.type", "rest"),
("catalog.uri", "http://192.168.167.4:8181"),
("warehouse.path", "s3://icebergdata/demo"),
("database.name", "s1"),
("table.name", "t1"),
]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();
test_create_catalog(values).await;
}
#[tokio::test]
#[ignore]
async fn test_jdbc_catalog() {
let values = [
("connector", "iceberg"),
("type", "append-only"),
("force_append_only", "true"),
("s3.endpoint", "http://127.0.0.1:9301"),
("s3.access.key", "hummockadmin"),
("s3.secret.key", "hummockadmin"),
("s3.region", "us-east-1"),
("s3.path.style.access", "true"),
("catalog.name", "demo"),
("catalog.type", "jdbc"),
("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
("catalog.jdbc.user", "admin"),
("catalog.jdbc.password", "123456"),
("warehouse.path", "s3://icebergdata/demo"),
("database.name", "s1"),
("table.name", "t1"),
]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();
test_create_catalog(values).await;
}
#[tokio::test]
#[ignore]
async fn test_hive_catalog() {
let values = [
("connector", "iceberg"),
("type", "append-only"),
("force_append_only", "true"),
("s3.endpoint", "http://127.0.0.1:9301"),
("s3.access.key", "hummockadmin"),
("s3.secret.key", "hummockadmin"),
("s3.region", "us-east-1"),
("s3.path.style.access", "true"),
("catalog.name", "demo"),
("catalog.type", "hive"),
("catalog.uri", "thrift://localhost:9083"),
("warehouse.path", "s3://icebergdata/demo"),
("database.name", "s1"),
("table.name", "t1"),
]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();
test_create_catalog(values).await;
}
}