1mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27 RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30 DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
31 SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
32};
33use iceberg::table::Table;
34use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
35use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
36use iceberg::writer::base_writer::equality_delete_writer::{
37 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
38};
39use iceberg::writer::base_writer::position_delete_file_writer::{
40 POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
41};
42use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
43use iceberg::writer::file_writer::ParquetWriterBuilder;
44use iceberg::writer::file_writer::location_generator::{
45 DefaultFileNameGenerator, DefaultLocationGenerator,
46};
47use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
48use iceberg::writer::task_writer::TaskWriter;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58 Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::{Field, Schema};
65use risingwave_common::error::IcebergError;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use serde::{Deserialize, Serialize};
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::Retry;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
84use super::{
85 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86 SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::writer::SinkWriter;
93use crate::sink::{
94 Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
95 TwoPhaseCommitCoordinator,
96};
97use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
98
99pub const ICEBERG_SINK: &str = "iceberg";
100pub const ICEBERG_COW_BRANCH: &str = "ingestion";
101pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
102pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
103pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
104pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
105pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
108#[serde(rename_all = "kebab-case")]
109pub enum IcebergWriteMode {
110 #[default]
111 MergeOnRead,
112 CopyOnWrite,
113}
114
115impl IcebergWriteMode {
116 pub fn as_str(self) -> &'static str {
117 match self {
118 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
119 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
120 }
121 }
122}
123
124impl std::str::FromStr for IcebergWriteMode {
125 type Err = SinkError;
126
127 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
128 match s {
129 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
130 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
131 _ => Err(SinkError::Config(anyhow!(format!(
132 "invalid write_mode: {}, must be one of: {}, {}",
133 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
134 )))),
135 }
136 }
137}
138
139impl TryFrom<&str> for IcebergWriteMode {
140 type Error = <Self as std::str::FromStr>::Err;
141
142 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
143 value.parse()
144 }
145}
146
147impl TryFrom<String> for IcebergWriteMode {
148 type Error = <Self as std::str::FromStr>::Err;
149
150 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
151 value.as_str().parse()
152 }
153}
154
155impl std::fmt::Display for IcebergWriteMode {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.write_str(self.as_str())
158 }
159}
160
161pub const ENABLE_COMPACTION: &str = "enable_compaction";
163pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
164pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
165pub const WRITE_MODE: &str = "write_mode";
166pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
167pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
168pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
169pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
170 "snapshot_expiration_clear_expired_meta_data";
171pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
172
173pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
174
175pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
176
177pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
178
179pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
180
181pub const COMPACTION_TYPE: &str = "compaction.type";
182
183fn default_commit_retry_num() -> u32 {
184 8
185}
186
187fn default_iceberg_write_mode() -> IcebergWriteMode {
188 IcebergWriteMode::MergeOnRead
189}
190
191fn default_true() -> bool {
192 true
193}
194
195fn default_some_true() -> Option<bool> {
196 Some(true)
197}
198
199#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
201#[serde(rename_all = "kebab-case")]
202pub enum CompactionType {
203 #[default]
205 Full,
206 SmallFiles,
208 FilesWithDelete,
210}
211
212impl CompactionType {
213 pub fn as_str(&self) -> &'static str {
214 match self {
215 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
216 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
217 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
218 }
219 }
220}
221
222impl std::str::FromStr for CompactionType {
223 type Err = SinkError;
224
225 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
226 match s {
227 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
228 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
229 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
230 _ => Err(SinkError::Config(anyhow!(format!(
231 "invalid compaction_type: {}, must be one of: {}, {}, {}",
232 s,
233 ICEBERG_COMPACTION_TYPE_FULL,
234 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
235 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
236 )))),
237 }
238 }
239}
240
241impl TryFrom<&str> for CompactionType {
242 type Error = <Self as std::str::FromStr>::Err;
243
244 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
245 value.parse()
246 }
247}
248
249impl TryFrom<String> for CompactionType {
250 type Error = <Self as std::str::FromStr>::Err;
251
252 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
253 value.as_str().parse()
254 }
255}
256
257impl std::fmt::Display for CompactionType {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 write!(f, "{}", self.as_str())
260 }
261}
262
263#[serde_as]
264#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
265pub struct IcebergConfig {
266 pub r#type: String, #[serde(default, deserialize_with = "deserialize_bool_from_string")]
269 pub force_append_only: bool,
270
271 #[serde(flatten)]
272 common: IcebergCommon,
273
274 #[serde(flatten)]
275 table: IcebergTableIdentifier,
276
277 #[serde(
278 rename = "primary_key",
279 default,
280 deserialize_with = "deserialize_optional_string_seq_from_string"
281 )]
282 pub primary_key: Option<Vec<String>>,
283
284 #[serde(skip)]
286 pub java_catalog_props: HashMap<String, String>,
287
288 #[serde(default)]
289 pub partition_by: Option<String>,
290
291 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
293 #[serde_as(as = "DisplayFromStr")]
294 #[with_option(allow_alter_on_fly)]
295 pub commit_checkpoint_interval: u64,
296
297 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
298 pub create_table_if_not_exists: bool,
299
300 #[serde(default = "default_some_true")]
302 #[serde_as(as = "Option<DisplayFromStr>")]
303 pub is_exactly_once: Option<bool>,
304 #[serde(default = "default_commit_retry_num")]
309 pub commit_retry_num: u32,
310
311 #[serde(
313 rename = "enable_compaction",
314 default,
315 deserialize_with = "deserialize_bool_from_string"
316 )]
317 #[with_option(allow_alter_on_fly)]
318 pub enable_compaction: bool,
319
320 #[serde(rename = "compaction_interval_sec", default)]
322 #[serde_as(as = "Option<DisplayFromStr>")]
323 #[with_option(allow_alter_on_fly)]
324 pub compaction_interval_sec: Option<u64>,
325
326 #[serde(
328 rename = "enable_snapshot_expiration",
329 default,
330 deserialize_with = "deserialize_bool_from_string"
331 )]
332 #[with_option(allow_alter_on_fly)]
333 pub enable_snapshot_expiration: bool,
334
335 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
337 pub write_mode: IcebergWriteMode,
338
339 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
342 #[serde_as(as = "Option<DisplayFromStr>")]
343 #[with_option(allow_alter_on_fly)]
344 pub snapshot_expiration_max_age_millis: Option<i64>,
345
346 #[serde(rename = "snapshot_expiration_retain_last", default)]
348 #[serde_as(as = "Option<DisplayFromStr>")]
349 #[with_option(allow_alter_on_fly)]
350 pub snapshot_expiration_retain_last: Option<i32>,
351
352 #[serde(
353 rename = "snapshot_expiration_clear_expired_files",
354 default = "default_true",
355 deserialize_with = "deserialize_bool_from_string"
356 )]
357 #[with_option(allow_alter_on_fly)]
358 pub snapshot_expiration_clear_expired_files: bool,
359
360 #[serde(
361 rename = "snapshot_expiration_clear_expired_meta_data",
362 default = "default_true",
363 deserialize_with = "deserialize_bool_from_string"
364 )]
365 #[with_option(allow_alter_on_fly)]
366 pub snapshot_expiration_clear_expired_meta_data: bool,
367
368 #[serde(rename = "compaction.max_snapshots_num", default)]
371 #[serde_as(as = "Option<DisplayFromStr>")]
372 #[with_option(allow_alter_on_fly)]
373 pub max_snapshots_num_before_compaction: Option<usize>,
374
375 #[serde(rename = "compaction.small_files_threshold_mb", default)]
376 #[serde_as(as = "Option<DisplayFromStr>")]
377 #[with_option(allow_alter_on_fly)]
378 pub small_files_threshold_mb: Option<u64>,
379
380 #[serde(rename = "compaction.delete_files_count_threshold", default)]
381 #[serde_as(as = "Option<DisplayFromStr>")]
382 #[with_option(allow_alter_on_fly)]
383 pub delete_files_count_threshold: Option<usize>,
384
385 #[serde(rename = "compaction.trigger_snapshot_count", default)]
386 #[serde_as(as = "Option<DisplayFromStr>")]
387 #[with_option(allow_alter_on_fly)]
388 pub trigger_snapshot_count: Option<usize>,
389
390 #[serde(rename = "compaction.target_file_size_mb", default)]
391 #[serde_as(as = "Option<DisplayFromStr>")]
392 #[with_option(allow_alter_on_fly)]
393 pub target_file_size_mb: Option<u64>,
394
395 #[serde(rename = "compaction.type", default)]
398 #[with_option(allow_alter_on_fly)]
399 pub compaction_type: Option<CompactionType>,
400}
401
402impl EnforceSecret for IcebergConfig {
403 fn enforce_secret<'a>(
404 prop_iter: impl Iterator<Item = &'a str>,
405 ) -> crate::error::ConnectorResult<()> {
406 for prop in prop_iter {
407 IcebergCommon::enforce_one(prop)?;
408 }
409 Ok(())
410 }
411
412 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
413 IcebergCommon::enforce_one(prop)
414 }
415}
416
417impl IcebergConfig {
418 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
419 let mut config =
420 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
421 .map_err(|e| SinkError::Config(anyhow!(e)))?;
422
423 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
424 return Err(SinkError::Config(anyhow!(
425 "`{}` must be {}, or {}",
426 SINK_TYPE_OPTION,
427 SINK_TYPE_APPEND_ONLY,
428 SINK_TYPE_UPSERT
429 )));
430 }
431
432 if config.r#type == SINK_TYPE_UPSERT {
433 if let Some(primary_key) = &config.primary_key {
434 if primary_key.is_empty() {
435 return Err(SinkError::Config(anyhow!(
436 "`primary-key` must not be empty in {}",
437 SINK_TYPE_UPSERT
438 )));
439 }
440 } else {
441 return Err(SinkError::Config(anyhow!(
442 "Must set `primary-key` in {}",
443 SINK_TYPE_UPSERT
444 )));
445 }
446 }
447
448 config.java_catalog_props = values
450 .iter()
451 .filter(|(k, _v)| {
452 k.starts_with("catalog.")
453 && k != &"catalog.uri"
454 && k != &"catalog.type"
455 && k != &"catalog.name"
456 && k != &"catalog.header"
457 })
458 .map(|(k, v)| (k[8..].to_string(), v.clone()))
459 .collect();
460
461 if config.commit_checkpoint_interval == 0 {
462 return Err(SinkError::Config(anyhow!(
463 "`commit-checkpoint-interval` must be greater than 0"
464 )));
465 }
466
467 Ok(config)
468 }
469
470 pub fn catalog_type(&self) -> &str {
471 self.common.catalog_type()
472 }
473
474 pub async fn load_table(&self) -> Result<Table> {
475 self.common
476 .load_table(&self.table, &self.java_catalog_props)
477 .await
478 .map_err(Into::into)
479 }
480
481 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
482 self.common
483 .create_catalog(&self.java_catalog_props)
484 .await
485 .map_err(Into::into)
486 }
487
488 pub fn full_table_name(&self) -> Result<TableIdent> {
489 self.table.to_table_ident().map_err(Into::into)
490 }
491
492 pub fn catalog_name(&self) -> String {
493 self.common.catalog_name()
494 }
495
496 pub fn compaction_interval_sec(&self) -> u64 {
497 self.compaction_interval_sec.unwrap_or(3600)
499 }
500
501 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
504 self.snapshot_expiration_max_age_millis
505 .map(|max_age_millis| current_time_ms - max_age_millis)
506 }
507
508 pub fn trigger_snapshot_count(&self) -> usize {
509 self.trigger_snapshot_count.unwrap_or(16)
510 }
511
512 pub fn small_files_threshold_mb(&self) -> u64 {
513 self.small_files_threshold_mb.unwrap_or(64)
514 }
515
516 pub fn delete_files_count_threshold(&self) -> usize {
517 self.delete_files_count_threshold.unwrap_or(256)
518 }
519
520 pub fn target_file_size_mb(&self) -> u64 {
521 self.target_file_size_mb.unwrap_or(1024)
522 }
523
524 pub fn compaction_type(&self) -> CompactionType {
527 self.compaction_type.unwrap_or_default()
528 }
529}
530
531pub struct IcebergSink {
532 pub config: IcebergConfig,
533 param: SinkParam,
534 unique_column_ids: Option<Vec<usize>>,
536}
537
538impl EnforceSecret for IcebergSink {
539 fn enforce_secret<'a>(
540 prop_iter: impl Iterator<Item = &'a str>,
541 ) -> crate::error::ConnectorResult<()> {
542 for prop in prop_iter {
543 IcebergConfig::enforce_one(prop)?;
544 }
545 Ok(())
546 }
547}
548
549impl TryFrom<SinkParam> for IcebergSink {
550 type Error = SinkError;
551
552 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
553 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
554 IcebergSink::new(config, param)
555 }
556}
557
558impl Debug for IcebergSink {
559 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560 f.debug_struct("IcebergSink")
561 .field("config", &self.config)
562 .finish()
563 }
564}
565
566async fn create_and_validate_table_impl(
567 config: &IcebergConfig,
568 param: &SinkParam,
569) -> Result<Table> {
570 if config.create_table_if_not_exists {
571 create_table_if_not_exists_impl(config, param).await?;
572 }
573
574 let table = config
575 .load_table()
576 .await
577 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
578
579 let sink_schema = param.schema();
580 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
581 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
582
583 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
584 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
585
586 Ok(table)
587}
588
589async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
590 let catalog = config.create_catalog().await?;
591 let namespace = if let Some(database_name) = config.table.database_name() {
592 let namespace = NamespaceIdent::new(database_name.to_owned());
593 if !catalog
594 .namespace_exists(&namespace)
595 .await
596 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
597 {
598 catalog
599 .create_namespace(&namespace, HashMap::default())
600 .await
601 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
602 .context("failed to create iceberg namespace")?;
603 }
604 namespace
605 } else {
606 bail!("database name must be set if you want to create table")
607 };
608
609 let table_id = config
610 .full_table_name()
611 .context("Unable to parse table name")?;
612 if !catalog
613 .table_exists(&table_id)
614 .await
615 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
616 {
617 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
618 let arrow_fields = param
620 .columns
621 .iter()
622 .map(|column| {
623 Ok(iceberg_create_table_arrow_convert
624 .to_arrow_field(&column.name, &column.data_type)
625 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
626 .context(format!(
627 "failed to convert {}: {} to arrow type",
628 &column.name, &column.data_type
629 ))?)
630 })
631 .collect::<Result<Vec<ArrowField>>>()?;
632 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
633 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
634 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
635 .context("failed to convert arrow schema to iceberg schema")?;
636
637 let location = {
638 let mut names = namespace.clone().inner();
639 names.push(config.table.table_name().to_owned());
640 match &config.common.warehouse_path {
641 Some(warehouse_path) => {
642 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
643 let url = Url::parse(warehouse_path);
644 if url.is_err() || is_s3_tables {
645 if config.common.catalog_type() == "rest"
648 || config.common.catalog_type() == "rest_rust"
649 {
650 None
651 } else {
652 bail!(format!("Invalid warehouse path: {}", warehouse_path))
653 }
654 } else if warehouse_path.ends_with('/') {
655 Some(format!("{}{}", warehouse_path, names.join("/")))
656 } else {
657 Some(format!("{}/{}", warehouse_path, names.join("/")))
658 }
659 }
660 None => None,
661 }
662 };
663
664 let partition_spec = match &config.partition_by {
665 Some(partition_by) => {
666 let mut partition_fields = Vec::<UnboundPartitionField>::new();
667 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
668 .into_iter()
669 .enumerate()
670 {
671 match iceberg_schema.field_id_by_name(&column) {
672 Some(id) => partition_fields.push(
673 UnboundPartitionField::builder()
674 .source_id(id)
675 .transform(transform)
676 .name(format!("_p_{}", column))
677 .field_id(i as i32)
678 .build(),
679 ),
680 None => bail!(format!(
681 "Partition source column does not exist in schema: {}",
682 column
683 )),
684 };
685 }
686 Some(
687 UnboundPartitionSpec::builder()
688 .with_spec_id(0)
689 .add_partition_fields(partition_fields)
690 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
691 .context("failed to add partition columns")?
692 .build(),
693 )
694 }
695 None => None,
696 };
697
698 let table_creation_builder = TableCreation::builder()
699 .name(config.table.table_name().to_owned())
700 .schema(iceberg_schema);
701
702 let table_creation = match (location, partition_spec) {
703 (Some(location), Some(partition_spec)) => table_creation_builder
704 .location(location)
705 .partition_spec(partition_spec)
706 .build(),
707 (Some(location), None) => table_creation_builder.location(location).build(),
708 (None, Some(partition_spec)) => table_creation_builder
709 .partition_spec(partition_spec)
710 .build(),
711 (None, None) => table_creation_builder.build(),
712 };
713
714 catalog
715 .create_table(&namespace, table_creation)
716 .await
717 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
718 .context("failed to create iceberg table")?;
719 }
720 Ok(())
721}
722
723impl IcebergSink {
724 pub async fn create_and_validate_table(&self) -> Result<Table> {
725 create_and_validate_table_impl(&self.config, &self.param).await
726 }
727
728 pub async fn create_table_if_not_exists(&self) -> Result<()> {
729 create_table_if_not_exists_impl(&self.config, &self.param).await
730 }
731
732 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
733 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
734 if let Some(pk) = &config.primary_key {
735 let mut unique_column_ids = Vec::with_capacity(pk.len());
736 for col_name in pk {
737 let id = param
738 .columns
739 .iter()
740 .find(|col| col.name.as_str() == col_name)
741 .ok_or_else(|| {
742 SinkError::Config(anyhow!(
743 "Primary key column {} not found in sink schema",
744 col_name
745 ))
746 })?
747 .column_id
748 .get_id() as usize;
749 unique_column_ids.push(id);
750 }
751 Some(unique_column_ids)
752 } else {
753 unreachable!()
754 }
755 } else {
756 None
757 };
758 Ok(Self {
759 config,
760 param,
761 unique_column_ids,
762 })
763 }
764}
765
766impl Sink for IcebergSink {
767 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
768
769 const SINK_NAME: &'static str = ICEBERG_SINK;
770
771 async fn validate(&self) -> Result<()> {
772 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
773 bail!("Snowflake catalog only supports iceberg sources");
774 }
775
776 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
777 risingwave_common::license::Feature::IcebergSinkWithGlue
778 .check_available()
779 .map_err(|e| anyhow::anyhow!(e))?;
780 }
781
782 let compaction_type = self.config.compaction_type();
784
785 if self.config.write_mode == IcebergWriteMode::CopyOnWrite
788 && compaction_type != CompactionType::Full
789 {
790 bail!(
791 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
792 compaction_type
793 );
794 }
795
796 match compaction_type {
797 CompactionType::SmallFiles => {
798 risingwave_common::license::Feature::IcebergCompaction
800 .check_available()
801 .map_err(|e| anyhow::anyhow!(e))?;
802
803 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
805 bail!(
806 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
807 self.config.write_mode
808 );
809 }
810
811 if self.config.delete_files_count_threshold.is_some() {
813 bail!(
814 "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
815 );
816 }
817 }
818 CompactionType::FilesWithDelete => {
819 risingwave_common::license::Feature::IcebergCompaction
821 .check_available()
822 .map_err(|e| anyhow::anyhow!(e))?;
823
824 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
826 bail!(
827 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
828 self.config.write_mode
829 );
830 }
831
832 if self.config.small_files_threshold_mb.is_some() {
834 bail!(
835 "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
836 );
837 }
838 }
839 CompactionType::Full => {
840 }
842 }
843
844 let _ = self.create_and_validate_table().await?;
845 Ok(())
846 }
847
848 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
849 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
850
851 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
853 if iceberg_config.enable_compaction && compaction_interval == 0 {
854 bail!(
855 "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
856 );
857 }
858
859 tracing::info!(
861 "Alter config compaction_interval set to {} seconds",
862 compaction_interval
863 );
864 }
865
866 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
868 && max_snapshots < 1
869 {
870 bail!(
871 "`compaction.max-snapshots-num` must be greater than 0, got: {}",
872 max_snapshots
873 );
874 }
875
876 Ok(())
877 }
878
879 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
880 let writer = IcebergSinkWriter::new(
881 self.config.clone(),
882 self.param.clone(),
883 writer_param.clone(),
884 self.unique_column_ids.clone(),
885 );
886
887 let commit_checkpoint_interval =
888 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
889 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
890 );
891 let log_sinker = CoordinatedLogSinker::new(
892 &writer_param,
893 self.param.clone(),
894 writer,
895 commit_checkpoint_interval,
896 )
897 .await?;
898
899 Ok(log_sinker)
900 }
901
902 fn is_coordinated_sink(&self) -> bool {
903 true
904 }
905
906 async fn new_coordinator(
907 &self,
908 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
909 ) -> Result<SinkCommitCoordinator> {
910 let catalog = self.config.create_catalog().await?;
911 let table = self.create_and_validate_table().await?;
912 let coordinator = IcebergSinkCommitter {
913 catalog,
914 table,
915 last_commit_epoch: 0,
916 sink_id: self.param.sink_id,
917 config: self.config.clone(),
918 param: self.param.clone(),
919 commit_retry_num: self.config.commit_retry_num,
920 iceberg_compact_stat_sender,
921 };
922 if self.config.is_exactly_once.unwrap_or_default() {
923 Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
924 } else {
925 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
926 }
927 }
928}
929
930enum ProjectIdxVec {
937 None,
938 Prepare(usize),
939 Done(Vec<usize>),
940}
941
942type DataFileWriterBuilderType =
943 DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
944type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
945 ParquetWriterBuilder,
946 DefaultLocationGenerator,
947 DefaultFileNameGenerator,
948>;
949type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
950 ParquetWriterBuilder,
951 DefaultLocationGenerator,
952 DefaultFileNameGenerator,
953>;
954
955#[derive(Clone)]
956struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
957 inner: B,
958 fanout_enabled: bool,
959 schema: IcebergSchemaRef,
960 partition_spec: PartitionSpecRef,
961 compute_partition: bool,
962}
963
964impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
965 fn new(
966 inner: B,
967 fanout_enabled: bool,
968 schema: IcebergSchemaRef,
969 partition_spec: PartitionSpecRef,
970 compute_partition: bool,
971 ) -> Self {
972 Self {
973 inner,
974 fanout_enabled,
975 schema,
976 partition_spec,
977 compute_partition,
978 }
979 }
980
981 fn build(self) -> iceberg::Result<TaskWriter<B>> {
982 let partition_splitter = match (
983 self.partition_spec.is_unpartitioned(),
984 self.compute_partition,
985 ) {
986 (true, _) => None,
987 (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
988 self.schema.clone(),
989 self.partition_spec.clone(),
990 )?),
991 (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
992 self.schema.clone(),
993 self.partition_spec.clone(),
994 )?),
995 };
996
997 Ok(TaskWriter::new_with_partition_splitter(
998 self.inner,
999 self.fanout_enabled,
1000 self.schema,
1001 self.partition_spec,
1002 partition_splitter,
1003 ))
1004 }
1005}
1006
1007pub enum IcebergSinkWriter {
1008 Created(IcebergSinkWriterArgs),
1009 Initialized(IcebergSinkWriterInner),
1010}
1011
1012pub struct IcebergSinkWriterArgs {
1013 config: IcebergConfig,
1014 sink_param: SinkParam,
1015 writer_param: SinkWriterParam,
1016 unique_column_ids: Option<Vec<usize>>,
1017}
1018
1019pub struct IcebergSinkWriterInner {
1020 writer: IcebergWriterDispatch,
1021 arrow_schema: SchemaRef,
1022 metrics: IcebergWriterMetrics,
1024 table: Table,
1026 project_idx_vec: ProjectIdxVec,
1029}
1030
1031#[allow(clippy::type_complexity)]
1032enum IcebergWriterDispatch {
1033 Append {
1034 writer: Option<Box<dyn IcebergWriter>>,
1035 writer_builder:
1036 TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1037 },
1038 Upsert {
1039 writer: Option<Box<dyn IcebergWriter>>,
1040 writer_builder: TaskWriterBuilderWrapper<
1041 MonitoredGeneralWriterBuilder<
1042 DeltaWriterBuilder<
1043 DataFileWriterBuilderType,
1044 PositionDeleteFileWriterBuilderType,
1045 EqualityDeleteFileWriterBuilderType,
1046 >,
1047 >,
1048 >,
1049 arrow_schema_with_op_column: SchemaRef,
1050 },
1051}
1052
1053impl IcebergWriterDispatch {
1054 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1055 match self {
1056 IcebergWriterDispatch::Append { writer, .. }
1057 | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1058 }
1059 }
1060}
1061
1062pub struct IcebergWriterMetrics {
1063 _write_qps: LabelGuardedIntCounter,
1068 _write_latency: LabelGuardedHistogram,
1069 write_bytes: LabelGuardedIntCounter,
1070}
1071
1072impl IcebergSinkWriter {
1073 pub fn new(
1074 config: IcebergConfig,
1075 sink_param: SinkParam,
1076 writer_param: SinkWriterParam,
1077 unique_column_ids: Option<Vec<usize>>,
1078 ) -> Self {
1079 Self::Created(IcebergSinkWriterArgs {
1080 config,
1081 sink_param,
1082 writer_param,
1083 unique_column_ids,
1084 })
1085 }
1086}
1087
1088impl IcebergSinkWriterInner {
1089 fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
1090 let SinkWriterParam {
1091 extra_partition_col_idx,
1092 actor_id,
1093 sink_id,
1094 sink_name,
1095 ..
1096 } = writer_param;
1097 let metrics_labels = [
1098 &actor_id.to_string(),
1099 &sink_id.to_string(),
1100 sink_name.as_str(),
1101 ];
1102
1103 let write_qps = GLOBAL_SINK_METRICS
1105 .iceberg_write_qps
1106 .with_guarded_label_values(&metrics_labels);
1107 let write_latency = GLOBAL_SINK_METRICS
1108 .iceberg_write_latency
1109 .with_guarded_label_values(&metrics_labels);
1110 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1113 .iceberg_rolling_unflushed_data_file
1114 .with_guarded_label_values(&metrics_labels);
1115 let write_bytes = GLOBAL_SINK_METRICS
1116 .iceberg_write_bytes
1117 .with_guarded_label_values(&metrics_labels);
1118
1119 let schema = table.metadata().current_schema();
1120 let partition_spec = table.metadata().default_partition_spec();
1121 let fanout_enabled = !partition_spec.fields().is_empty();
1122
1123 let unique_uuid_suffix = Uuid::now_v7();
1125
1126 let parquet_writer_properties = WriterProperties::builder()
1127 .set_max_row_group_size(
1128 writer_param
1129 .streaming_config
1130 .developer
1131 .iceberg_sink_write_parquet_max_row_group_rows,
1132 )
1133 .build();
1134
1135 let parquet_writer_builder =
1136 ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1137 let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
1138 parquet_writer_builder,
1139 table.file_io().clone(),
1140 DefaultLocationGenerator::new(table.metadata().clone())
1141 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1142 DefaultFileNameGenerator::new(
1143 writer_param.actor_id.to_string(),
1144 Some(unique_uuid_suffix.to_string()),
1145 iceberg::spec::DataFileFormat::Parquet,
1146 ),
1147 );
1148 let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1149 let monitored_builder = MonitoredGeneralWriterBuilder::new(
1150 data_file_builder,
1151 write_qps.clone(),
1152 write_latency.clone(),
1153 );
1154 let writer_builder = TaskWriterBuilderWrapper::new(
1155 monitored_builder,
1156 fanout_enabled,
1157 schema.clone(),
1158 partition_spec.clone(),
1159 true,
1160 );
1161 let inner_writer = Some(Box::new(
1162 writer_builder
1163 .clone()
1164 .build()
1165 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1166 ) as Box<dyn IcebergWriter>);
1167 Ok(Self {
1168 arrow_schema: Arc::new(
1169 schema_to_arrow_schema(table.metadata().current_schema())
1170 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1171 ),
1172 metrics: IcebergWriterMetrics {
1173 _write_qps: write_qps,
1174 _write_latency: write_latency,
1175 write_bytes,
1176 },
1177 writer: IcebergWriterDispatch::Append {
1178 writer: inner_writer,
1179 writer_builder,
1180 },
1181 table,
1182 project_idx_vec: {
1183 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1184 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1185 } else {
1186 ProjectIdxVec::None
1187 }
1188 },
1189 })
1190 }
1191
1192 fn build_upsert(
1193 table: Table,
1194 unique_column_ids: Vec<usize>,
1195 writer_param: &SinkWriterParam,
1196 ) -> Result<Self> {
1197 let SinkWriterParam {
1198 extra_partition_col_idx,
1199 actor_id,
1200 sink_id,
1201 sink_name,
1202 ..
1203 } = writer_param;
1204 let metrics_labels = [
1205 &actor_id.to_string(),
1206 &sink_id.to_string(),
1207 sink_name.as_str(),
1208 ];
1209 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1210
1211 let write_qps = GLOBAL_SINK_METRICS
1213 .iceberg_write_qps
1214 .with_guarded_label_values(&metrics_labels);
1215 let write_latency = GLOBAL_SINK_METRICS
1216 .iceberg_write_latency
1217 .with_guarded_label_values(&metrics_labels);
1218 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1221 .iceberg_rolling_unflushed_data_file
1222 .with_guarded_label_values(&metrics_labels);
1223 let write_bytes = GLOBAL_SINK_METRICS
1224 .iceberg_write_bytes
1225 .with_guarded_label_values(&metrics_labels);
1226
1227 let schema = table.metadata().current_schema();
1229 let partition_spec = table.metadata().default_partition_spec();
1230 let fanout_enabled = !partition_spec.fields().is_empty();
1231
1232 let unique_uuid_suffix = Uuid::now_v7();
1234
1235 let parquet_writer_properties = WriterProperties::builder()
1236 .set_max_row_group_size(
1237 writer_param
1238 .streaming_config
1239 .developer
1240 .iceberg_sink_write_parquet_max_row_group_rows,
1241 )
1242 .build();
1243
1244 let data_file_builder = {
1245 let parquet_writer_builder =
1246 ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1247 let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1248 parquet_writer_builder,
1249 table.file_io().clone(),
1250 DefaultLocationGenerator::new(table.metadata().clone())
1251 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1252 DefaultFileNameGenerator::new(
1253 writer_param.actor_id.to_string(),
1254 Some(unique_uuid_suffix.to_string()),
1255 iceberg::spec::DataFileFormat::Parquet,
1256 ),
1257 );
1258 DataFileWriterBuilder::new(rolling_writer_builder)
1259 };
1260 let position_delete_builder = {
1261 let parquet_writer_builder = ParquetWriterBuilder::new(
1262 parquet_writer_properties.clone(),
1263 POSITION_DELETE_SCHEMA.clone().into(),
1264 );
1265 let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1266 parquet_writer_builder,
1267 table.file_io().clone(),
1268 DefaultLocationGenerator::new(table.metadata().clone())
1269 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1270 DefaultFileNameGenerator::new(
1271 writer_param.actor_id.to_string(),
1272 Some(format!("pos-del-{}", unique_uuid_suffix)),
1273 iceberg::spec::DataFileFormat::Parquet,
1274 ),
1275 );
1276 PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
1277 };
1278 let equality_delete_builder = {
1279 let config = EqualityDeleteWriterConfig::new(
1280 unique_column_ids.clone(),
1281 table.metadata().current_schema().clone(),
1282 )
1283 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1284 let parquet_writer_builder = ParquetWriterBuilder::new(
1285 parquet_writer_properties,
1286 Arc::new(
1287 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1288 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1289 ),
1290 );
1291 let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1292 parquet_writer_builder,
1293 table.file_io().clone(),
1294 DefaultLocationGenerator::new(table.metadata().clone())
1295 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1296 DefaultFileNameGenerator::new(
1297 writer_param.actor_id.to_string(),
1298 Some(format!("eq-del-{}", unique_uuid_suffix)),
1299 iceberg::spec::DataFileFormat::Parquet,
1300 ),
1301 );
1302
1303 EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
1304 };
1305 let delta_builder = DeltaWriterBuilder::new(
1306 data_file_builder,
1307 position_delete_builder,
1308 equality_delete_builder,
1309 unique_column_ids,
1310 schema.clone(),
1311 );
1312 let original_arrow_schema = Arc::new(
1313 schema_to_arrow_schema(table.metadata().current_schema())
1314 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1315 );
1316 let schema_with_extra_op_column = {
1317 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1318 new_fields.push(Arc::new(ArrowField::new(
1319 "op".to_owned(),
1320 ArrowDataType::Int32,
1321 false,
1322 )));
1323 Arc::new(ArrowSchema::new(new_fields))
1324 };
1325 let writer_builder = TaskWriterBuilderWrapper::new(
1326 MonitoredGeneralWriterBuilder::new(
1327 delta_builder,
1328 write_qps.clone(),
1329 write_latency.clone(),
1330 ),
1331 fanout_enabled,
1332 schema.clone(),
1333 partition_spec.clone(),
1334 true,
1335 );
1336 let inner_writer = Some(Box::new(
1337 writer_builder
1338 .clone()
1339 .build()
1340 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1341 ) as Box<dyn IcebergWriter>);
1342 Ok(Self {
1343 arrow_schema: original_arrow_schema,
1344 metrics: IcebergWriterMetrics {
1345 _write_qps: write_qps,
1346 _write_latency: write_latency,
1347 write_bytes,
1348 },
1349 table,
1350 writer: IcebergWriterDispatch::Upsert {
1351 writer: inner_writer,
1352 writer_builder,
1353 arrow_schema_with_op_column: schema_with_extra_op_column,
1354 },
1355 project_idx_vec: {
1356 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1357 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1358 } else {
1359 ProjectIdxVec::None
1360 }
1361 },
1362 })
1363 }
1364}
1365
1366#[async_trait]
1367impl SinkWriter for IcebergSinkWriter {
1368 type CommitMetadata = Option<SinkMetadata>;
1369
1370 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1372 let Self::Created(args) = self else {
1373 return Ok(());
1374 };
1375
1376 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1377 let inner = match &args.unique_column_ids {
1378 Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1379 table,
1380 unique_column_ids.clone(),
1381 &args.writer_param,
1382 )?,
1383 None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1384 };
1385
1386 *self = IcebergSinkWriter::Initialized(inner);
1387 Ok(())
1388 }
1389
1390 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1392 let Self::Initialized(inner) = self else {
1393 unreachable!("IcebergSinkWriter should be initialized before barrier");
1394 };
1395
1396 match &mut inner.writer {
1398 IcebergWriterDispatch::Append {
1399 writer,
1400 writer_builder,
1401 } => {
1402 if writer.is_none() {
1403 *writer = Some(Box::new(
1404 writer_builder
1405 .clone()
1406 .build()
1407 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1408 ));
1409 }
1410 }
1411 IcebergWriterDispatch::Upsert {
1412 writer,
1413 writer_builder,
1414 ..
1415 } => {
1416 if writer.is_none() {
1417 *writer = Some(Box::new(
1418 writer_builder
1419 .clone()
1420 .build()
1421 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1422 ));
1423 }
1424 }
1425 };
1426
1427 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1429 match &mut inner.project_idx_vec {
1430 ProjectIdxVec::None => {}
1431 ProjectIdxVec::Prepare(idx) => {
1432 if *idx >= chunk.columns().len() {
1433 return Err(SinkError::Iceberg(anyhow!(
1434 "invalid extra partition column index {}",
1435 idx
1436 )));
1437 }
1438 let project_idx_vec = (0..*idx)
1439 .chain(*idx + 1..chunk.columns().len())
1440 .collect_vec();
1441 chunk = chunk.project(&project_idx_vec);
1442 inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1443 }
1444 ProjectIdxVec::Done(idx_vec) => {
1445 chunk = chunk.project(idx_vec);
1446 }
1447 }
1448 if ops.is_empty() {
1449 return Ok(());
1450 }
1451 let write_batch_size = chunk.estimated_heap_size();
1452 let batch = match &inner.writer {
1453 IcebergWriterDispatch::Append { .. } => {
1454 let filters =
1456 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1457 chunk.set_visibility(filters);
1458 IcebergArrowConvert
1459 .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1460 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1461 }
1462 IcebergWriterDispatch::Upsert {
1463 arrow_schema_with_op_column,
1464 ..
1465 } => {
1466 let chunk = IcebergArrowConvert
1467 .to_record_batch(inner.arrow_schema.clone(), &chunk)
1468 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1469 let ops = Arc::new(Int32Array::from(
1470 ops.iter()
1471 .map(|op| match op {
1472 Op::UpdateInsert | Op::Insert => INSERT_OP,
1473 Op::UpdateDelete | Op::Delete => DELETE_OP,
1474 })
1475 .collect_vec(),
1476 ));
1477 let mut columns = chunk.columns().to_vec();
1478 columns.push(ops);
1479 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1480 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1481 }
1482 };
1483
1484 let writer = inner.writer.get_writer().unwrap();
1485 writer
1486 .write(batch)
1487 .instrument_await("iceberg_write")
1488 .await
1489 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1490 inner.metrics.write_bytes.inc_by(write_batch_size as _);
1491 Ok(())
1492 }
1493
1494 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1497 let Self::Initialized(inner) = self else {
1498 unreachable!("IcebergSinkWriter should be initialized before barrier");
1499 };
1500
1501 if !is_checkpoint {
1503 return Ok(None);
1504 }
1505
1506 let close_result = match &mut inner.writer {
1507 IcebergWriterDispatch::Append {
1508 writer,
1509 writer_builder,
1510 } => {
1511 let close_result = match writer.take() {
1512 Some(mut writer) => {
1513 Some(writer.close().instrument_await("iceberg_close").await)
1514 }
1515 _ => None,
1516 };
1517 match writer_builder.clone().build() {
1518 Ok(new_writer) => {
1519 *writer = Some(Box::new(new_writer));
1520 }
1521 _ => {
1522 warn!("Failed to build new writer after close");
1525 }
1526 }
1527 close_result
1528 }
1529 IcebergWriterDispatch::Upsert {
1530 writer,
1531 writer_builder,
1532 ..
1533 } => {
1534 let close_result = match writer.take() {
1535 Some(mut writer) => {
1536 Some(writer.close().instrument_await("iceberg_close").await)
1537 }
1538 _ => None,
1539 };
1540 match writer_builder.clone().build() {
1541 Ok(new_writer) => {
1542 *writer = Some(Box::new(new_writer));
1543 }
1544 _ => {
1545 warn!("Failed to build new writer after close");
1548 }
1549 }
1550 close_result
1551 }
1552 };
1553
1554 match close_result {
1555 Some(Ok(result)) => {
1556 let format_version = inner.table.metadata().format_version();
1557 let partition_type = inner.table.metadata().default_partition_type();
1558 let data_files = result
1559 .into_iter()
1560 .map(|f| {
1561 SerializedDataFile::try_from(f, partition_type, format_version)
1562 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1563 })
1564 .collect::<Result<Vec<_>>>()?;
1565 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1566 data_files,
1567 schema_id: inner.table.metadata().current_schema_id(),
1568 partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1569 })?))
1570 }
1571 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1572 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1573 }
1574 }
1575}
1576
1577const SCHEMA_ID: &str = "schema_id";
1578const PARTITION_SPEC_ID: &str = "partition_spec_id";
1579const DATA_FILES: &str = "data_files";
1580
1581#[derive(Default, Clone)]
1582struct IcebergCommitResult {
1583 schema_id: i32,
1584 partition_spec_id: i32,
1585 data_files: Vec<SerializedDataFile>,
1586}
1587
1588impl IcebergCommitResult {
1589 fn try_from(value: &SinkMetadata) -> Result<Self> {
1590 if let Some(Serialized(v)) = &value.metadata {
1591 let mut values = if let serde_json::Value::Object(v) =
1592 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1593 .context("Can't parse iceberg sink metadata")?
1594 {
1595 v
1596 } else {
1597 bail!("iceberg sink metadata should be an object");
1598 };
1599
1600 let schema_id;
1601 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1602 schema_id = value
1603 .as_u64()
1604 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1605 } else {
1606 bail!("iceberg sink metadata should have schema_id");
1607 }
1608
1609 let partition_spec_id;
1610 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1611 partition_spec_id = value
1612 .as_u64()
1613 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1614 } else {
1615 bail!("iceberg sink metadata should have partition_spec_id");
1616 }
1617
1618 let data_files: Vec<SerializedDataFile>;
1619 if let serde_json::Value::Array(values) = values
1620 .remove(DATA_FILES)
1621 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1622 {
1623 data_files = values
1624 .into_iter()
1625 .map(from_value::<SerializedDataFile>)
1626 .collect::<std::result::Result<_, _>>()
1627 .unwrap();
1628 } else {
1629 bail!("iceberg sink metadata should have data_files object");
1630 }
1631
1632 Ok(Self {
1633 schema_id: schema_id as i32,
1634 partition_spec_id: partition_spec_id as i32,
1635 data_files,
1636 })
1637 } else {
1638 bail!("Can't create iceberg sink write result from empty data!")
1639 }
1640 }
1641
1642 fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1643 let mut values = if let serde_json::Value::Object(value) =
1644 serde_json::from_slice::<serde_json::Value>(&value)
1645 .context("Can't parse iceberg sink metadata")?
1646 {
1647 value
1648 } else {
1649 bail!("iceberg sink metadata should be an object");
1650 };
1651
1652 let schema_id;
1653 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1654 schema_id = value
1655 .as_u64()
1656 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1657 } else {
1658 bail!("iceberg sink metadata should have schema_id");
1659 }
1660
1661 let partition_spec_id;
1662 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1663 partition_spec_id = value
1664 .as_u64()
1665 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1666 } else {
1667 bail!("iceberg sink metadata should have partition_spec_id");
1668 }
1669
1670 let data_files: Vec<SerializedDataFile>;
1671 if let serde_json::Value::Array(values) = values
1672 .remove(DATA_FILES)
1673 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1674 {
1675 data_files = values
1676 .into_iter()
1677 .map(from_value::<SerializedDataFile>)
1678 .collect::<std::result::Result<_, _>>()
1679 .unwrap();
1680 } else {
1681 bail!("iceberg sink metadata should have data_files object");
1682 }
1683
1684 Ok(Self {
1685 schema_id: schema_id as i32,
1686 partition_spec_id: partition_spec_id as i32,
1687 data_files,
1688 })
1689 }
1690}
1691
1692impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1693 type Error = SinkError;
1694
1695 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1696 let json_data_files = serde_json::Value::Array(
1697 value
1698 .data_files
1699 .iter()
1700 .map(serde_json::to_value)
1701 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1702 .context("Can't serialize data files to json")?,
1703 );
1704 let json_value = serde_json::Value::Object(
1705 vec![
1706 (
1707 SCHEMA_ID.to_owned(),
1708 serde_json::Value::Number(value.schema_id.into()),
1709 ),
1710 (
1711 PARTITION_SPEC_ID.to_owned(),
1712 serde_json::Value::Number(value.partition_spec_id.into()),
1713 ),
1714 (DATA_FILES.to_owned(), json_data_files),
1715 ]
1716 .into_iter()
1717 .collect(),
1718 );
1719 Ok(SinkMetadata {
1720 metadata: Some(Serialized(SerializedMetadata {
1721 metadata: serde_json::to_vec(&json_value)
1722 .context("Can't serialize iceberg sink metadata")?,
1723 })),
1724 })
1725 }
1726}
1727
1728impl TryFrom<IcebergCommitResult> for Vec<u8> {
1729 type Error = SinkError;
1730
1731 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1732 let json_data_files = serde_json::Value::Array(
1733 value
1734 .data_files
1735 .iter()
1736 .map(serde_json::to_value)
1737 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1738 .context("Can't serialize data files to json")?,
1739 );
1740 let json_value = serde_json::Value::Object(
1741 vec![
1742 (
1743 SCHEMA_ID.to_owned(),
1744 serde_json::Value::Number(value.schema_id.into()),
1745 ),
1746 (
1747 PARTITION_SPEC_ID.to_owned(),
1748 serde_json::Value::Number(value.partition_spec_id.into()),
1749 ),
1750 (DATA_FILES.to_owned(), json_data_files),
1751 ]
1752 .into_iter()
1753 .collect(),
1754 );
1755 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1756 }
1757}
1758pub struct IcebergSinkCommitter {
1759 catalog: Arc<dyn Catalog>,
1760 table: Table,
1761 pub last_commit_epoch: u64,
1762 pub(crate) sink_id: SinkId,
1763 pub(crate) config: IcebergConfig,
1764 pub(crate) param: SinkParam,
1765 commit_retry_num: u32,
1766 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1767}
1768
1769impl IcebergSinkCommitter {
1770 async fn reload_table(
1773 catalog: &dyn Catalog,
1774 table_ident: &TableIdent,
1775 schema_id: i32,
1776 partition_spec_id: i32,
1777 ) -> Result<Table> {
1778 let table = catalog
1779 .load_table(table_ident)
1780 .await
1781 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1782 if table.metadata().current_schema_id() != schema_id {
1783 return Err(SinkError::Iceberg(anyhow!(
1784 "Schema evolution not supported, expect schema id {}, but got {}",
1785 schema_id,
1786 table.metadata().current_schema_id()
1787 )));
1788 }
1789 if table.metadata().default_partition_spec_id() != partition_spec_id {
1790 return Err(SinkError::Iceberg(anyhow!(
1791 "Partition evolution not supported, expect partition spec id {}, but got {}",
1792 partition_spec_id,
1793 table.metadata().default_partition_spec_id()
1794 )));
1795 }
1796 Ok(table)
1797 }
1798}
1799
1800#[async_trait]
1801impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1802 async fn init(&mut self) -> Result<()> {
1803 tracing::info!(
1804 "Sink id = {}: iceberg sink coordinator initing.",
1805 self.param.sink_id
1806 );
1807
1808 Ok(())
1809 }
1810
1811 async fn commit(
1812 &mut self,
1813 epoch: u64,
1814 metadata: Vec<SinkMetadata>,
1815 add_columns: Option<Vec<Field>>,
1816 ) -> Result<()> {
1817 tracing::info!("Starting iceberg direct commit in epoch {epoch}");
1818
1819 let (write_results, snapshot_id) =
1820 match self.pre_commit_inner(epoch, metadata, add_columns)? {
1821 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1822 None => {
1823 tracing::debug!(?epoch, "no data to commit");
1824 return Ok(());
1825 }
1826 };
1827
1828 self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1829 .await
1830 }
1831}
1832
1833#[async_trait]
1834impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1835 async fn init(&mut self) -> Result<()> {
1836 tracing::info!(
1837 "Sink id = {}: iceberg sink coordinator initing.",
1838 self.param.sink_id
1839 );
1840
1841 Ok(())
1842 }
1843
1844 async fn pre_commit(
1845 &mut self,
1846 epoch: u64,
1847 metadata: Vec<SinkMetadata>,
1848 add_columns: Option<Vec<Field>>,
1849 ) -> Result<Vec<u8>> {
1850 tracing::info!("Starting iceberg pre commit in epoch {epoch}");
1851
1852 let (write_results, snapshot_id) =
1853 match self.pre_commit_inner(epoch, metadata, add_columns)? {
1854 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1855 None => {
1856 tracing::debug!(?epoch, "no data to commit");
1857 return Ok(vec![]);
1858 }
1859 };
1860
1861 let mut write_results_bytes = Vec::new();
1862 for each_parallelism_write_result in write_results {
1863 let each_parallelism_write_result_bytes: Vec<u8> =
1864 each_parallelism_write_result.try_into()?;
1865 write_results_bytes.push(each_parallelism_write_result_bytes);
1866 }
1867
1868 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1869 write_results_bytes.push(snapshot_id_bytes);
1870
1871 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1872 Ok(pre_commit_metadata_bytes)
1873 }
1874
1875 async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1876 tracing::info!("Starting iceberg commit in epoch {epoch}");
1877 if commit_metadata.is_empty() {
1878 tracing::debug!(?epoch, "no data to commit");
1879 return Ok(());
1880 }
1881
1882 let mut write_results_bytes = deserialize_metadata(commit_metadata);
1883
1884 let snapshot_id_bytes = write_results_bytes.pop().unwrap();
1885 let snapshot_id = i64::from_le_bytes(
1886 snapshot_id_bytes
1887 .try_into()
1888 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1889 );
1890
1891 if self
1892 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1893 .await?
1894 {
1895 tracing::info!(
1896 "Snapshot id {} already committed in iceberg table, skip committing again.",
1897 snapshot_id
1898 );
1899 return Ok(());
1900 }
1901
1902 let mut write_results = vec![];
1903 for each in write_results_bytes {
1904 let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1905 write_results.push(write_result);
1906 }
1907
1908 self.commit_iceberg_inner(epoch, write_results, snapshot_id)
1909 .await?;
1910
1911 Ok(())
1912 }
1913
1914 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
1915 tracing::debug!("Abort not implemented yet");
1917 }
1918}
1919
impl IcebergSinkCommitter {
    fn pre_commit_inner(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
        if let Some(add_columns) = add_columns {
            return Err(anyhow!(
                "Iceberg sink does not support adding columns, but got: {:?}",
                add_columns
            )
            .into());
        }

        let write_results: Vec<IcebergCommitResult> = metadata
            .iter()
            .map(IcebergCommitResult::try_from)
            .collect::<Result<Vec<IcebergCommitResult>>>()?;

        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
            return Ok(None);
        }

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        if write_results
            .iter()
            .any(|r| r.schema_id != expect_schema_id)
            || write_results
                .iter()
                .any(|r| r.partition_spec_id != expect_partition_spec_id)
        {
            return Err(SinkError::Iceberg(anyhow!(
                "schema_id and partition_spec_id should be the same in all write results"
            )));
        }

        let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);

        Ok(Some((write_results, snapshot_id)))
    }

    async fn commit_iceberg_inner(
        &mut self,
        epoch: u64,
        write_results: Vec<IcebergCommitResult>,
        snapshot_id: i64,
    ) -> Result<()> {
        assert!(
            !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
        );

        self.wait_for_snapshot_limit().await?;

        let expect_schema_id = write_results[0].schema_id;
        let expect_partition_spec_id = write_results[0].partition_spec_id;

        self.table = Self::reload_table(
            self.catalog.as_ref(),
            self.table.identifier(),
            expect_schema_id,
            expect_partition_spec_id,
        )
        .await?;

        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find schema by id {}",
                expect_schema_id
            )));
        };
        let Some(partition_spec) = self
            .table
            .metadata()
            .partition_spec_by_id(expect_partition_spec_id)
        else {
            return Err(SinkError::Iceberg(anyhow!(
                "Can't find partition spec by id {}",
                expect_partition_spec_id
            )));
        };
        let partition_type = partition_spec
            .as_ref()
            .clone()
            .partition_type(schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        let data_files = write_results
            .into_iter()
            .flat_map(|r| {
                r.data_files.into_iter().map(|f| {
                    f.try_into(expect_partition_spec_id, &partition_type, schema)
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                })
            })
            .collect::<Result<Vec<DataFile>>>()?;
        let retry_strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(60))
            .map(jitter)
            .take(self.commit_retry_num as usize);
        let catalog = self.catalog.clone();
        let table_ident = self.table.identifier().clone();
        let table = Retry::spawn(retry_strategy, || async {
            let table = Self::reload_table(
                catalog.as_ref(),
                &table_ident,
                expect_schema_id,
                expect_partition_spec_id,
            )
            .await?;
            let txn = Transaction::new(&table);
            let append_action = txn
                .fast_append()
                .set_snapshot_id(snapshot_id)
                .set_target_branch(commit_branch(
                    self.config.r#type.as_str(),
                    self.config.write_mode,
                ))
                .add_data_files(data_files.clone());

            let tx = append_action.apply(txn).map_err(|err| {
                let err: IcebergError = err.into();
                tracing::error!(error = %err.as_report(), "Failed to apply iceberg transaction");
                SinkError::Iceberg(anyhow!(err))
            })?;
            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
                let err: IcebergError = err.into();
                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
                SinkError::Iceberg(anyhow!(err))
            })
        })
        .await?;
        self.table = table;

        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");

        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: self.sink_id,
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }

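    /// Loads the table via the sink's iceberg config and returns whether `snapshot_id`
    /// already exists in its metadata, so that `commit` can skip snapshots that were
    /// committed in a previous attempt.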
    async fn is_snapshot_id_in_iceberg(
        &self,
        iceberg_config: &IcebergConfig,
        snapshot_id: i64,
    ) -> Result<bool> {
        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
    }

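    /// Counts the snapshots committed since the most recent `Replace` (rewrite/compaction)
    /// snapshot, scanning snapshots from newest to oldest.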
    fn count_snapshots_since_rewrite(&self) -> usize {
        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));

        let mut count = 0;
        for snapshot in snapshots {
            let summary = snapshot.summary();
            match &summary.operation {
                Operation::Replace => {
                    break;
                }
                _ => {
                    count += 1;
                }
            }
        }

        count
    }

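    /// When compaction is enabled and `max_snapshots_num_before_compaction` is set, blocks
    /// until the number of snapshots accumulated since the last rewrite drops below that
    /// limit, requesting a forced compaction and reloading the table while waiting.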
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} exceeds limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: self.sink_id,
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                tokio::time::sleep(Duration::from_secs(30)).await;

                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
}

const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";

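/// For a nested Arrow type, collects the child field types of the corresponding RisingWave
/// struct/map/list type into `schema_fields` and returns the Arrow struct fields to compare
/// against, recursing through list and map element types; returns `None` for leaf types.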
fn get_fields<'a>(
    our_field_type: &'a risingwave_common::types::DataType,
    data_type: &ArrowDataType,
    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
) -> Option<ArrowFields> {
    match data_type {
        ArrowDataType::Struct(fields) => {
            match our_field_type {
                risingwave_common::types::DataType::Struct(struct_fields) => {
                    struct_fields.iter().for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                }
                risingwave_common::types::DataType::Map(map_fields) => {
                    schema_fields.insert(MAP_KEY, map_fields.key());
                    schema_fields.insert(MAP_VALUE, map_fields.value());
                }
                risingwave_common::types::DataType::List(list) => {
                    list.elem()
                        .as_struct()
                        .iter()
                        .for_each(|(name, data_type)| {
                            let res = schema_fields.insert(name, data_type);
                            assert!(res.is_none())
                        });
                }
                _ => {}
            };
            Some(fields.clone())
        }
        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
            get_fields(our_field_type, field.data_type(), schema_fields)
        }
        _ => None,
    }
}

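/// Checks that every Arrow field in the iceberg schema has a type-compatible counterpart in
/// the RisingWave schema, tolerating a few representation differences (e.g. `Binary` vs
/// `LargeBinary`) and recursing into struct, list and map types.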
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

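/// Checks field-by-field type compatibility between a RisingWave schema and the iceberg
/// table's Arrow schema. Field order may differ, but the field counts must match and every
/// iceberg field must exist in the RisingWave schema with a compatible type.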
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

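/// Serializes the pre-commit metadata (a list of byte vectors) into JSON bytes.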
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

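/// Inverse of [`serialize_metadata`]: parses JSON bytes back into a list of byte vectors.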
pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

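/// Parses a `partition_by` option string such as
/// `"v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3)"` into `(column, Transform)`
/// pairs. Bare columns default to the identity transform; `bucket` and `truncate` require
/// the numeric parameter `n`.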
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("`n` must be set for `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

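/// Returns the branch that commits should target: the dedicated copy-on-write branch for
/// upsert sinks running in copy-on-write mode, and the main branch otherwise.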
pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
    if should_enable_iceberg_cow(sink_type, write_mode) {
        ICEBERG_COW_BRANCH.to_owned()
    } else {
        MAIN_BRANCH.to_owned()
    }
}

2361
2362pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2363 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2364}
2365
2366impl crate::with_options::WithOptions for IcebergWriteMode {}
2367
2368impl crate::with_options::WithOptions for CompactionType {}
2369
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
    use crate::sink::iceberg::{
        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
        ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    };

    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::list(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ]))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ]))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                s3_region: Some("us-east-1".to_owned()),
                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
                s3_access_key: Some("hummockadmin".to_owned()),
                s3_secret_key: Some("hummockadmin".to_owned()),
                s3_iam_role_arn: None,
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                glue_region: None,
                glue_access_key: None,
                glue_secret_key: None,
                glue_iam_role_arn: None,
                catalog_name: Some("demo".to_owned()),
                s3_path_style_access: Some(true),
                catalog_credential: None,
                catalog_oauth2_server_uri: None,
                catalog_scope: None,
                catalog_token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                catalog_header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: IcebergWriteMode::MergeOnRead,
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let _table = iceberg_config.load_table().await.unwrap();
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[test]
    fn test_config_constants_consistency() {
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
    }

    #[test]
    fn test_parse_iceberg_compaction_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert!(iceberg_config.enable_compaction);
        assert_eq!(
            iceberg_config.max_snapshots_num_before_compaction,
            Some(100)
        );
        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
        assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
    }
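
    /// An illustrative example added as a usage sketch, not an exhaustive specification: it
    /// exercises the pure helpers defined in this module (`parse_partition_by_exprs`, the
    /// metadata serialization round-trip used by the commit coordinator, and
    /// `commit_branch`). The expectations below are derived from reading those helpers'
    /// current behavior.
    #[test]
    fn test_partition_and_metadata_helpers() {
        use super::*;

        // Bare columns default to the identity transform; `bucket`/`truncate` carry `n`.
        let parsed =
            parse_partition_by_exprs("v1, bucket(5,v1), truncate(4,v2), year(v3)".to_owned())
                .unwrap();
        let columns: Vec<_> = parsed.iter().map(|(c, _)| c.as_str()).collect();
        assert_eq!(columns, ["v1", "v1", "v2", "v3"]);
        assert!(matches!(&parsed[0].1, Transform::Identity));
        assert!(matches!(&parsed[1].1, Transform::Bucket(5)));
        assert!(matches!(&parsed[2].1, Transform::Truncate(4)));
        assert!(matches!(&parsed[3].1, Transform::Year));

        // Pre-commit metadata is a JSON array of byte vectors and must round-trip.
        let metadata = vec![vec![1u8, 2, 3], vec![4, 5]];
        assert_eq!(
            deserialize_metadata(serialize_metadata(metadata.clone())),
            metadata
        );

        // Upsert + copy-on-write commits target the COW branch; everything else targets main.
        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, IcebergWriteMode::CopyOnWrite),
            ICEBERG_COW_BRANCH
        );
        assert_eq!(
            commit_branch(SINK_TYPE_APPEND_ONLY, IcebergWriteMode::MergeOnRead),
            MAIN_BRANCH
        );
    }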
}