mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
    SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::Retry;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";
pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

pub const PARTITION_DATA_ID_START: i32 = 1000;

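/// How changes are written to the Iceberg table.
///
/// `merge-on-read` (the default) emits delete files that are merged at read time, while
/// `copy-on-write` rewrites data files instead and, per `IcebergSink::validate`, only works
/// with the `full` compaction type.
///
/// Both string forms can be parsed; a minimal sketch:
///
/// ```ignore
/// let mode: IcebergWriteMode = "copy-on-write".parse()?;
/// assert_eq!(mode, IcebergWriteMode::CopyOnWrite);
/// ```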
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            )))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

fn default_commit_retry_num() -> u32 {
    8
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}

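/// Compaction strategy selected via `compaction.type`.
///
/// `small-files` and `files-with-delete` require the `IcebergCompaction` license feature and
/// the `merge-on-read` write mode; see `IcebergSink::validate` for the exact constraints.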
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    #[default]
    Full,
    SmallFiles,
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            )))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

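/// Configuration of the Iceberg sink, deserialized from the sink's `WITH` options.
///
/// Options prefixed with `catalog.` (except `catalog.uri`, `catalog.type`, `catalog.name` and
/// `catalog.header`) are collected into `java_catalog_props` and passed through to the catalog
/// implementation.
///
/// A minimal sketch of building the config from raw options (the values below are illustrative;
/// the catalog and table options flattened from `IcebergCommon` and `IcebergTableIdentifier`
/// are omitted for brevity but required in practice):
///
/// ```ignore
/// let props = BTreeMap::from([
///     ("type".to_owned(), "upsert".to_owned()),
///     ("primary_key".to_owned(), "id".to_owned()),
///     ("write_mode".to_owned(), "merge-on-read".to_owned()),
///     ("commit_checkpoint_interval".to_owned(), "10".to_owned()),
///     // ... catalog/table options ...
/// ]);
/// let config = IcebergConfig::from_btreemap(props)?;
/// ```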
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(flatten)]
    table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        self.compaction_interval_sec.unwrap_or(3600)
    }

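    /// Cut-off timestamp (in ms) below which snapshots are eligible for expiration, i.e.
    /// `current_time_ms - snapshot_expiration_max_age_millis`; `None` if no max age is set.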
    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(16)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }
}

pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}

async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}

async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema);

        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}

impl IcebergSink {
    pub async fn create_and_validate_table(&self) -> Result<Table> {
        create_and_validate_table_impl(&self.config, &self.param).await
    }

    pub async fn create_table_if_not_exists(&self) -> Result<()> {
        create_table_if_not_exists_impl(&self.config, &self.param).await
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        let compaction_type = self.config.compaction_type();

        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {}
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}

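/// Lazily-built projection that drops the extra partition column (if any) from incoming chunks
/// before they are converted to Arrow record batches:
/// - `None`: no extra partition column, nothing to project.
/// - `Prepare(idx)`: index of the extra partition column; the projection vector is built on the
///   first chunk.
/// - `Done(vec)`: cached projection indices reused for subsequent chunks.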
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

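/// Bundles an [`IcebergWriterBuilder`] with the table schema and partition spec so that a
/// [`TaskWriter`] can be rebuilt after every checkpoint. For partitioned tables a
/// [`RecordBatchPartitionSplitter`] is attached, either computing partition values from the
/// data (`compute_partition == true`) or reusing precomputed ones.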
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: B,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner,
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(self) -> iceberg::Result<TaskWriter<B>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            self.inner,
            self.fanout_enabled,
            self.schema,
            self.partition_spec,
            partition_splitter,
        ))
    }
}

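/// The sink writer is initialized lazily: it starts in `Created` with only its construction
/// arguments, and is promoted to `Initialized` on the first `begin_epoch`, when the table is
/// loaded (and created if `create_table_if_not_exists` is set) and the underlying Iceberg
/// writers are built.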
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
}

#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteFileWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }

    fn build_upsert(
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
        };
        let equality_delete_builder = {
            let config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

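/// Column statistics (lower/upper bounds) larger than this many bytes are dropped by
/// `truncate_datafile` to keep the serialized commit metadata small.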
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    data_file.lower_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    data_file.upper_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}

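/// Per-writer commit payload: the data files produced in one checkpoint together with the
/// schema id and partition spec id they were written against. It round-trips through JSON,
/// either wrapped in a [`SinkMetadata`] (writer to coordinator) or as raw bytes (two-phase
/// commit metadata).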
#[derive(Default, Clone)]
struct IcebergCommitResult {
    schema_id: i32,
    partition_spec_id: i32,
    data_files: Vec<SerializedDataFile>,
}

impl IcebergCommitResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let mut values = if let serde_json::Value::Object(v) =
                serde_json::from_slice::<serde_json::Value>(&v.metadata)
                    .context("Can't parse iceberg sink metadata")?
            {
                v
            } else {
                bail!("iceberg sink metadata should be an object");
            };

            let schema_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
                schema_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have schema_id");
            }

            let partition_spec_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
                partition_spec_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have partition_spec_id");
            }

            let data_files: Vec<SerializedDataFile>;
            if let serde_json::Value::Array(values) = values
                .remove(DATA_FILES)
                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
            {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .unwrap();
            } else {
                bail!("iceberg sink metadata should have data_files object");
            }

            Ok(Self {
                schema_id: schema_id as i32,
                partition_spec_id: partition_spec_id as i32,
                data_files,
            })
        } else {
            bail!("Can't create iceberg sink write result from empty data!")
        }
    }

    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
        let mut values = if let serde_json::Value::Object(value) =
            serde_json::from_slice::<serde_json::Value>(&value)
                .context("Can't parse iceberg sink metadata")?
        {
            value
        } else {
            bail!("iceberg sink metadata should be an object");
        };

        let schema_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
            schema_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have schema_id");
        }

        let partition_spec_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
            partition_spec_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have partition_spec_id");
        }

        let data_files: Vec<SerializedDataFile>;
        if let serde_json::Value::Array(values) = values
            .remove(DATA_FILES)
            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
        {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .unwrap();
        } else {
            bail!("iceberg sink metadata should have data_files object");
        }

        Ok(Self {
            schema_id: schema_id as i32,
            partition_spec_id: partition_spec_id as i32,
            data_files,
        })
    }
}

impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata {
                metadata: serde_json::to_vec(&json_value)
                    .context("Can't serialize iceberg sink metadata")?,
            })),
        })
    }
}

impl TryFrom<IcebergCommitResult> for Vec<u8> {
    type Error = SinkError;

    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
    }
}
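
/// Commit coordinator of the Iceberg sink: it aggregates the serialized data files from all
/// writers and appends them to the table in a single Iceberg transaction, acting as either a
/// single-phase or a two-phase (exactly-once) coordinator depending on `is_exactly_once`.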
pub struct IcebergSinkCommitter {
    catalog: Arc<dyn Catalog>,
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    commit_retry_num: u32,
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}

impl IcebergSinkCommitter {
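    /// Reloads the table from the catalog and verifies that its current schema id and default
    /// partition spec id still match the ones the data files were written with; schema or
    /// partition evolution between write and commit is rejected.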
1839 async fn reload_table(
1842 catalog: &dyn Catalog,
1843 table_ident: &TableIdent,
1844 schema_id: i32,
1845 partition_spec_id: i32,
1846 ) -> Result<Table> {
1847 let table = catalog
1848 .load_table(table_ident)
1849 .await
1850 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1851 if table.metadata().current_schema_id() != schema_id {
1852 return Err(SinkError::Iceberg(anyhow!(
1853 "Schema evolution not supported, expect schema id {}, but got {}",
1854 schema_id,
1855 table.metadata().current_schema_id()
1856 )));
1857 }
1858 if table.metadata().default_partition_spec_id() != partition_spec_id {
1859 return Err(SinkError::Iceberg(anyhow!(
1860 "Partition evolution not supported, expect partition spec id {}, but got {}",
1861 partition_spec_id,
1862 table.metadata().default_partition_spec_id()
1863 )));
1864 }
1865 Ok(table)
1866 }
1867}
1868
1869#[async_trait]
1870impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1871 async fn init(&mut self) -> Result<()> {
1872 tracing::info!(
1873 "Sink id = {}: iceberg sink coordinator initing.",
1874 self.param.sink_id
1875 );
1876
1877 Ok(())
1878 }
1879
1880 async fn commit(
1881 &mut self,
1882 epoch: u64,
1883 metadata: Vec<SinkMetadata>,
1884 add_columns: Option<Vec<Field>>,
1885 ) -> Result<()> {
1886 tracing::info!("Starting iceberg direct commit in epoch {epoch}");
1887
1888 let (write_results, snapshot_id) =
1889 match self.pre_commit_inner(epoch, metadata, add_columns)? {
1890 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1891 None => {
1892 tracing::debug!(?epoch, "no data to commit");
1893 return Ok(());
1894 }
1895 };
1896
1897 self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1898 .await
1899 }
1900}
1901
1902#[async_trait]
1903impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1904 async fn init(&mut self) -> Result<()> {
1905 tracing::info!(
1906 "Sink id = {}: iceberg sink coordinator initing.",
1907 self.param.sink_id
1908 );
1909
1910 Ok(())
1911 }
1912
1913 async fn pre_commit(
1914 &mut self,
1915 epoch: u64,
1916 metadata: Vec<SinkMetadata>,
1917 add_columns: Option<Vec<Field>>,
1918 ) -> Result<Vec<u8>> {
1919 tracing::info!("Starting iceberg pre commit in epoch {epoch}");
1920
1921 let (write_results, snapshot_id) =
1922 match self.pre_commit_inner(epoch, metadata, add_columns)? {
1923 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1924 None => {
1925 tracing::debug!(?epoch, "no data to commit");
1926 return Ok(vec![]);
1927 }
1928 };
1929
1930 let mut write_results_bytes = Vec::new();
1931 for each_parallelism_write_result in write_results {
1932 let each_parallelism_write_result_bytes: Vec<u8> =
1933 each_parallelism_write_result.try_into()?;
1934 write_results_bytes.push(each_parallelism_write_result_bytes);
1935 }
1936
1937 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1938 write_results_bytes.push(snapshot_id_bytes);
1939
1940 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1941 Ok(pre_commit_metadata_bytes)
1942 }
1943
1944 async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1945 tracing::info!("Starting iceberg commit in epoch {epoch}");
1946 if commit_metadata.is_empty() {
1947 tracing::debug!(?epoch, "no data to commit");
1948 return Ok(());
1949 }
1950
1951 let mut write_results_bytes = deserialize_metadata(commit_metadata);
1952
1953 let snapshot_id_bytes = write_results_bytes.pop().unwrap();
1954 let snapshot_id = i64::from_le_bytes(
1955 snapshot_id_bytes
1956 .try_into()
1957 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1958 );
1959
1960 if self
1961 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1962 .await?
1963 {
1964 tracing::info!(
1965 "Snapshot id {} already committed in iceberg table, skip committing again.",
1966 snapshot_id
1967 );
1968 return Ok(());
1969 }
1970
1971 let mut write_results = vec![];
1972 for each in write_results_bytes {
1973 let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1974 write_results.push(write_result);
1975 }
1976
1977 self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1978 .await?;
1979
1980 Ok(())
1981 }
1982
1983 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
1984 tracing::debug!("Abort not implemented yet");
1986 }
1987}
1988
impl IcebergSinkCommitter {
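    /// Collects the per-parallelism write results from `metadata`, checks that they all
    /// share the same schema id and partition spec id, and generates the snapshot id to
    /// use for the upcoming commit. Returns `None` when there are no data files to
    /// commit. Schema changes (`add_columns`) are rejected.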
    fn pre_commit_inner(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
        if let Some(add_columns) = add_columns {
            return Err(anyhow!(
                "Iceberg sink does not support adding columns, but got: {:?}",
                add_columns
            )
            .into());
        }

        let write_results: Vec<IcebergCommitResult> = metadata
            .iter()
            .map(IcebergCommitResult::try_from)
            .collect::<Result<Vec<IcebergCommitResult>>>()?;

        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
            return Ok(None);
        }

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        if write_results
            .iter()
            .any(|r| r.schema_id != expect_schema_id)
            || write_results
                .iter()
                .any(|r| r.partition_spec_id != expect_partition_spec_id)
        {
            return Err(SinkError::Iceberg(anyhow!(
                "schema_id and partition_spec_id should be the same in all write results"
            )));
        }

        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);

        Ok(Some((write_results, snapshot_id)))
    }

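    /// Commits the collected data files to the iceberg table as a fast-append
    /// transaction on the target branch, retrying with exponential backoff for at most
    /// `commit_retry_num` attempts. The table is reloaded before each attempt so the
    /// expected schema id and partition spec id are re-validated against the catalog.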
    async fn commit_iceberg_inner(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();
        let table = Retry::spawn(retry_strategy, || async {
            let table = Self::reload_table(
                catalog.as_ref(),
                &table_ident,
                expect_schema_id,
                expect_partition_spec_id,
            )
            .await?;
            let txn = Transaction::new(&table);
            let append_action = txn
                .fast_append()
                .set_snapshot_id(snapshot_id)
                .set_target_branch(commit_branch(
                    self.config.r#type.as_str(),
                    self.config.write_mode,
                ))
                .add_data_files(data_files.clone());

            let tx = append_action.apply(txn).map_err(|err| {
                let err: IcebergError = err.into();
                tracing::error!(error = %err.as_report(), "Failed to apply iceberg transaction");
                SinkError::Iceberg(anyhow!(err))
            })?;
            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
                let err: IcebergError = err.into();
                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                SinkError::Iceberg(anyhow!(err))
            })
        })
        .await?;
        self.table = table;

        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");

        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }

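    /// Checks whether `snapshot_id` already exists in the iceberg table; `commit` uses
    /// this to skip snapshots that were committed before a recovery.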
    async fn is_snapshot_id_in_iceberg(
        &self,
        iceberg_config: &IcebergConfig,
        snapshot_id: i64,
    ) -> Result<bool> {
        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
    }

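    /// Counts the snapshots added after the most recent `Replace` (rewrite) snapshot,
    /// scanning from the newest snapshot backwards.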
    fn count_snapshots_since_rewrite(&self) -> usize {
        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));

        let mut count = 0;
        for snapshot in snapshots {
            let summary = snapshot.summary();
            match &summary.operation {
                Operation::Replace => {
                    break;
                }
                _ => {
                    count += 1;
                }
            }
        }

        count
    }

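    /// When compaction is enabled and `max_snapshots_num_before_compaction` is set,
    /// blocks the commit until the number of snapshots since the last rewrite drops
    /// below the limit, nudging the compactor with `force_compaction` requests and
    /// reloading the table every 30 seconds while waiting.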
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} has reached the limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                tokio::time::sleep(Duration::from_secs(30)).await;

                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
}

const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";

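/// Descends into a nested arrow type looking for a struct; when one is found, records the
/// corresponding RisingWave child field types in `schema_fields` (keyed by field name, or
/// by `MAP_KEY`/`MAP_VALUE` for maps) and returns the arrow struct's fields so that
/// `check_compatibility` can recurse into them. Returns `None` for non-nested arrow types.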
fn get_fields<'a>(
    our_field_type: &'a risingwave_common::types::DataType,
    data_type: &ArrowDataType,
    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
) -> Option<ArrowFields> {
    match data_type {
        ArrowDataType::Struct(fields) => {
            match our_field_type {
                risingwave_common::types::DataType::Struct(struct_fields) => {
                    struct_fields.iter().for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                }
                risingwave_common::types::DataType::Map(map_fields) => {
                    schema_fields.insert(MAP_KEY, map_fields.key());
                    schema_fields.insert(MAP_VALUE, map_fields.value());
                }
                risingwave_common::types::DataType::List(list) => {
                    list.elem()
                        .as_struct()
                        .iter()
                        .for_each(|(name, data_type)| {
                            let res = schema_fields.insert(name, data_type);
                            assert!(res.is_none())
                        });
                }
                _ => {}
            };
            Some(fields.clone())
        }
        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
            get_fields(our_field_type, field.data_type(), schema_fields)
        }
        _ => None,
    }
}

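/// Checks that every arrow field in `fields` has a type-compatible counterpart in
/// `schema_fields` (keyed by field name), recursing into structs, lists, and maps. A few
/// representation differences (e.g. `Binary` vs. `LargeBinary`, differing decimal
/// precision/scale) are tolerated; any other mismatch produces an error naming the field.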
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

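/// Checks that the RisingWave schema and the iceberg table's arrow schema have the same
/// number of fields and that every field is type-compatible. Field order does not matter
/// because fields are matched by name.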
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

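/// Serializes the per-parallelism pre-commit payloads into a single JSON byte vector used
/// as the coordinator's commit metadata.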
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

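/// Inverse of [`serialize_metadata`]: decodes the JSON byte vector back into the
/// per-parallelism payloads.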
pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

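/// Parses the `partition_by` sink option into `(column, Transform)` pairs. Supported
/// forms are `column`, `transform(column)`, and `transform(n, column)` (the latter only
/// for `bucket` and `truncate`). For example, `"v1, bucket(5,v1), year(v3)"` is expected
/// to yield `(v1, Identity)`, `(v1, Bucket(5))`, and `(v3, Year)`.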
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

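/// Returns the branch that commits should target: the dedicated copy-on-write branch when
/// copy-on-write is in effect, otherwise the main branch.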
pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
    if should_enable_iceberg_cow(sink_type, write_mode) {
        ICEBERG_COW_BRANCH.to_owned()
    } else {
        MAIN_BRANCH.to_owned()
    }
}

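/// Copy-on-write is only in effect for upsert sinks configured with
/// `IcebergWriteMode::CopyOnWrite`.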
pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
    sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
}

impl crate::with_options::WithOptions for IcebergWriteMode {}

impl crate::with_options::WithOptions for CompactionType {}

2439#[cfg(test)]
2440mod test {
2441 use std::collections::BTreeMap;
2442
2443 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2444 use risingwave_common::types::{DataType, MapType, StructType};
2445
2446 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2447 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2448 use crate::sink::iceberg::{
2449 COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2450 ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2451 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2452 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2453 };
2454
2455 pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::list(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ]))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ]))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

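    // Shared helper for the catalog smoke tests below; those tests are `#[ignore]`d
    // because they expect locally running services at the hard-coded endpoints.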
    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let _table = iceberg_config.load_table().await.unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[test]
    fn test_config_constants_consistency() {
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
    }

    #[test]
    fn test_parse_iceberg_compaction_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert!(iceberg_config.enable_compaction);
        assert_eq!(
            iceberg_config.max_snapshots_num_before_compaction,
            Some(100)
        );
        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
    }
}