1mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27 RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30 DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
31 SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
32};
33use iceberg::table::Table;
34use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
35use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
36use iceberg::writer::base_writer::equality_delete_writer::{
37 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
38};
39use iceberg::writer::base_writer::position_delete_file_writer::{
40 POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
41};
42use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
43use iceberg::writer::file_writer::ParquetWriterBuilder;
44use iceberg::writer::file_writer::location_generator::{
45 DefaultFileNameGenerator, DefaultLocationGenerator,
46};
47use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
48use iceberg::writer::task_writer::TaskWriter;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58 Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::{Field, Schema};
65use risingwave_common::error::IcebergError;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use serde::{Deserialize, Serialize};
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::RetryIf;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
84use super::{
85 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86 SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::writer::SinkWriter;
93use crate::sink::{
94 Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
95 TwoPhaseCommitCoordinator,
96};
97use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
98
99pub const ICEBERG_SINK: &str = "iceberg";
100
101pub const ICEBERG_COW_BRANCH: &str = "ingestion";
102pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
103pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
104pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
105pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
106pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
107
108pub const PARTITION_DATA_ID_START: i32 = 1000;
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
111#[serde(rename_all = "kebab-case")]
112pub enum IcebergWriteMode {
113 #[default]
114 MergeOnRead,
115 CopyOnWrite,
116}
117
118impl IcebergWriteMode {
119 pub fn as_str(self) -> &'static str {
120 match self {
121 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
122 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
123 }
124 }
125}
126
127impl std::str::FromStr for IcebergWriteMode {
128 type Err = SinkError;
129
130 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
131 match s {
132 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
133 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
134 _ => Err(SinkError::Config(anyhow!(format!(
135 "invalid write_mode: {}, must be one of: {}, {}",
136 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
137 )))),
138 }
139 }
140}
141
142impl TryFrom<&str> for IcebergWriteMode {
143 type Error = <Self as std::str::FromStr>::Err;
144
145 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
146 value.parse()
147 }
148}
149
150impl TryFrom<String> for IcebergWriteMode {
151 type Error = <Self as std::str::FromStr>::Err;
152
153 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
154 value.as_str().parse()
155 }
156}
157
158impl std::fmt::Display for IcebergWriteMode {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.write_str(self.as_str())
161 }
162}
163
164pub const ENABLE_COMPACTION: &str = "enable_compaction";
166pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
167pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
168pub const WRITE_MODE: &str = "write_mode";
169pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
170pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
171pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
172pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
173 "snapshot_expiration_clear_expired_meta_data";
174pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
175
176pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
177
178pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
179
180pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
181
182pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
183
184pub const COMPACTION_TYPE: &str = "compaction.type";
185
186fn default_commit_retry_num() -> u32 {
187 8
188}
189
190fn default_iceberg_write_mode() -> IcebergWriteMode {
191 IcebergWriteMode::MergeOnRead
192}
193
194fn default_true() -> bool {
195 true
196}
197
198fn default_some_true() -> Option<bool> {
199 Some(true)
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
204#[serde(rename_all = "kebab-case")]
205pub enum CompactionType {
206 #[default]
208 Full,
209 SmallFiles,
211 FilesWithDelete,
213}
214
215impl CompactionType {
216 pub fn as_str(&self) -> &'static str {
217 match self {
218 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
219 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
220 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
221 }
222 }
223}
224
225impl std::str::FromStr for CompactionType {
226 type Err = SinkError;
227
228 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
229 match s {
230 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
231 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
232 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
233 _ => Err(SinkError::Config(anyhow!(format!(
234 "invalid compaction_type: {}, must be one of: {}, {}, {}",
235 s,
236 ICEBERG_COMPACTION_TYPE_FULL,
237 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
238 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
239 )))),
240 }
241 }
242}
243
244impl TryFrom<&str> for CompactionType {
245 type Error = <Self as std::str::FromStr>::Err;
246
247 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
248 value.parse()
249 }
250}
251
252impl TryFrom<String> for CompactionType {
253 type Error = <Self as std::str::FromStr>::Err;
254
255 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
256 value.as_str().parse()
257 }
258}
259
260impl std::fmt::Display for CompactionType {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 write!(f, "{}", self.as_str())
263 }
264}
265
266#[serde_as]
267#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
268pub struct IcebergConfig {
269 pub r#type: String, #[serde(default, deserialize_with = "deserialize_bool_from_string")]
272 pub force_append_only: bool,
273
274 #[serde(flatten)]
275 common: IcebergCommon,
276
277 #[serde(flatten)]
278 table: IcebergTableIdentifier,
279
280 #[serde(
281 rename = "primary_key",
282 default,
283 deserialize_with = "deserialize_optional_string_seq_from_string"
284 )]
285 pub primary_key: Option<Vec<String>>,
286
287 #[serde(skip)]
289 pub java_catalog_props: HashMap<String, String>,
290
291 #[serde(default)]
292 pub partition_by: Option<String>,
293
294 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
296 #[serde_as(as = "DisplayFromStr")]
297 #[with_option(allow_alter_on_fly)]
298 pub commit_checkpoint_interval: u64,
299
300 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
301 pub create_table_if_not_exists: bool,
302
303 #[serde(default = "default_some_true")]
305 #[serde_as(as = "Option<DisplayFromStr>")]
306 pub is_exactly_once: Option<bool>,
307 #[serde(default = "default_commit_retry_num")]
312 pub commit_retry_num: u32,
313
314 #[serde(
316 rename = "enable_compaction",
317 default,
318 deserialize_with = "deserialize_bool_from_string"
319 )]
320 #[with_option(allow_alter_on_fly)]
321 pub enable_compaction: bool,
322
323 #[serde(rename = "compaction_interval_sec", default)]
325 #[serde_as(as = "Option<DisplayFromStr>")]
326 #[with_option(allow_alter_on_fly)]
327 pub compaction_interval_sec: Option<u64>,
328
329 #[serde(
331 rename = "enable_snapshot_expiration",
332 default,
333 deserialize_with = "deserialize_bool_from_string"
334 )]
335 #[with_option(allow_alter_on_fly)]
336 pub enable_snapshot_expiration: bool,
337
338 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
340 pub write_mode: IcebergWriteMode,
341
342 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
345 #[serde_as(as = "Option<DisplayFromStr>")]
346 #[with_option(allow_alter_on_fly)]
347 pub snapshot_expiration_max_age_millis: Option<i64>,
348
349 #[serde(rename = "snapshot_expiration_retain_last", default)]
351 #[serde_as(as = "Option<DisplayFromStr>")]
352 #[with_option(allow_alter_on_fly)]
353 pub snapshot_expiration_retain_last: Option<i32>,
354
355 #[serde(
356 rename = "snapshot_expiration_clear_expired_files",
357 default = "default_true",
358 deserialize_with = "deserialize_bool_from_string"
359 )]
360 #[with_option(allow_alter_on_fly)]
361 pub snapshot_expiration_clear_expired_files: bool,
362
363 #[serde(
364 rename = "snapshot_expiration_clear_expired_meta_data",
365 default = "default_true",
366 deserialize_with = "deserialize_bool_from_string"
367 )]
368 #[with_option(allow_alter_on_fly)]
369 pub snapshot_expiration_clear_expired_meta_data: bool,
370
371 #[serde(rename = "compaction.max_snapshots_num", default)]
374 #[serde_as(as = "Option<DisplayFromStr>")]
375 #[with_option(allow_alter_on_fly)]
376 pub max_snapshots_num_before_compaction: Option<usize>,
377
378 #[serde(rename = "compaction.small_files_threshold_mb", default)]
379 #[serde_as(as = "Option<DisplayFromStr>")]
380 #[with_option(allow_alter_on_fly)]
381 pub small_files_threshold_mb: Option<u64>,
382
383 #[serde(rename = "compaction.delete_files_count_threshold", default)]
384 #[serde_as(as = "Option<DisplayFromStr>")]
385 #[with_option(allow_alter_on_fly)]
386 pub delete_files_count_threshold: Option<usize>,
387
388 #[serde(rename = "compaction.trigger_snapshot_count", default)]
389 #[serde_as(as = "Option<DisplayFromStr>")]
390 #[with_option(allow_alter_on_fly)]
391 pub trigger_snapshot_count: Option<usize>,
392
393 #[serde(rename = "compaction.target_file_size_mb", default)]
394 #[serde_as(as = "Option<DisplayFromStr>")]
395 #[with_option(allow_alter_on_fly)]
396 pub target_file_size_mb: Option<u64>,
397
398 #[serde(rename = "compaction.type", default)]
401 #[with_option(allow_alter_on_fly)]
402 pub compaction_type: Option<CompactionType>,
403}
404
405impl EnforceSecret for IcebergConfig {
406 fn enforce_secret<'a>(
407 prop_iter: impl Iterator<Item = &'a str>,
408 ) -> crate::error::ConnectorResult<()> {
409 for prop in prop_iter {
410 IcebergCommon::enforce_one(prop)?;
411 }
412 Ok(())
413 }
414
415 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
416 IcebergCommon::enforce_one(prop)
417 }
418}
419
420impl IcebergConfig {
421 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
422 let mut config =
423 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
424 .map_err(|e| SinkError::Config(anyhow!(e)))?;
425
426 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
427 return Err(SinkError::Config(anyhow!(
428 "`{}` must be {}, or {}",
429 SINK_TYPE_OPTION,
430 SINK_TYPE_APPEND_ONLY,
431 SINK_TYPE_UPSERT
432 )));
433 }
434
435 if config.r#type == SINK_TYPE_UPSERT {
436 if let Some(primary_key) = &config.primary_key {
437 if primary_key.is_empty() {
438 return Err(SinkError::Config(anyhow!(
439 "`primary-key` must not be empty in {}",
440 SINK_TYPE_UPSERT
441 )));
442 }
443 } else {
444 return Err(SinkError::Config(anyhow!(
445 "Must set `primary-key` in {}",
446 SINK_TYPE_UPSERT
447 )));
448 }
449 }
450
451 config.java_catalog_props = values
453 .iter()
454 .filter(|(k, _v)| {
455 k.starts_with("catalog.")
456 && k != &"catalog.uri"
457 && k != &"catalog.type"
458 && k != &"catalog.name"
459 && k != &"catalog.header"
460 })
461 .map(|(k, v)| (k[8..].to_string(), v.clone()))
462 .collect();
463
464 if config.commit_checkpoint_interval == 0 {
465 return Err(SinkError::Config(anyhow!(
466 "`commit-checkpoint-interval` must be greater than 0"
467 )));
468 }
469
470 Ok(config)
471 }
472
473 pub fn catalog_type(&self) -> &str {
474 self.common.catalog_type()
475 }
476
477 pub async fn load_table(&self) -> Result<Table> {
478 self.common
479 .load_table(&self.table, &self.java_catalog_props)
480 .await
481 .map_err(Into::into)
482 }
483
484 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
485 self.common
486 .create_catalog(&self.java_catalog_props)
487 .await
488 .map_err(Into::into)
489 }
490
491 pub fn full_table_name(&self) -> Result<TableIdent> {
492 self.table.to_table_ident().map_err(Into::into)
493 }
494
495 pub fn catalog_name(&self) -> String {
496 self.common.catalog_name()
497 }
498
499 pub fn compaction_interval_sec(&self) -> u64 {
500 self.compaction_interval_sec.unwrap_or(3600)
502 }
503
504 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
507 self.snapshot_expiration_max_age_millis
508 .map(|max_age_millis| current_time_ms - max_age_millis)
509 }
510
511 pub fn trigger_snapshot_count(&self) -> usize {
512 self.trigger_snapshot_count.unwrap_or(16)
513 }
514
515 pub fn small_files_threshold_mb(&self) -> u64 {
516 self.small_files_threshold_mb.unwrap_or(64)
517 }
518
519 pub fn delete_files_count_threshold(&self) -> usize {
520 self.delete_files_count_threshold.unwrap_or(256)
521 }
522
523 pub fn target_file_size_mb(&self) -> u64 {
524 self.target_file_size_mb.unwrap_or(1024)
525 }
526
527 pub fn compaction_type(&self) -> CompactionType {
530 self.compaction_type.unwrap_or_default()
531 }
532}
533
534pub struct IcebergSink {
535 pub config: IcebergConfig,
536 param: SinkParam,
537 unique_column_ids: Option<Vec<usize>>,
539}
540
541impl EnforceSecret for IcebergSink {
542 fn enforce_secret<'a>(
543 prop_iter: impl Iterator<Item = &'a str>,
544 ) -> crate::error::ConnectorResult<()> {
545 for prop in prop_iter {
546 IcebergConfig::enforce_one(prop)?;
547 }
548 Ok(())
549 }
550}
551
552impl TryFrom<SinkParam> for IcebergSink {
553 type Error = SinkError;
554
555 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
556 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
557 IcebergSink::new(config, param)
558 }
559}
560
561impl Debug for IcebergSink {
562 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563 f.debug_struct("IcebergSink")
564 .field("config", &self.config)
565 .finish()
566 }
567}
568
569async fn create_and_validate_table_impl(
570 config: &IcebergConfig,
571 param: &SinkParam,
572) -> Result<Table> {
573 if config.create_table_if_not_exists {
574 create_table_if_not_exists_impl(config, param).await?;
575 }
576
577 let table = config
578 .load_table()
579 .await
580 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
581
582 let sink_schema = param.schema();
583 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
584 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
585
586 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
587 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
588
589 Ok(table)
590}
591
592async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
593 let catalog = config.create_catalog().await?;
594 let namespace = if let Some(database_name) = config.table.database_name() {
595 let namespace = NamespaceIdent::new(database_name.to_owned());
596 if !catalog
597 .namespace_exists(&namespace)
598 .await
599 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
600 {
601 catalog
602 .create_namespace(&namespace, HashMap::default())
603 .await
604 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
605 .context("failed to create iceberg namespace")?;
606 }
607 namespace
608 } else {
609 bail!("database name must be set if you want to create table")
610 };
611
612 let table_id = config
613 .full_table_name()
614 .context("Unable to parse table name")?;
615 if !catalog
616 .table_exists(&table_id)
617 .await
618 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
619 {
620 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
621 let arrow_fields = param
623 .columns
624 .iter()
625 .map(|column| {
626 Ok(iceberg_create_table_arrow_convert
627 .to_arrow_field(&column.name, &column.data_type)
628 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
629 .context(format!(
630 "failed to convert {}: {} to arrow type",
631 &column.name, &column.data_type
632 ))?)
633 })
634 .collect::<Result<Vec<ArrowField>>>()?;
635 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
636 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
637 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
638 .context("failed to convert arrow schema to iceberg schema")?;
639
640 let location = {
641 let mut names = namespace.clone().inner();
642 names.push(config.table.table_name().to_owned());
643 match &config.common.warehouse_path {
644 Some(warehouse_path) => {
645 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
646 let url = Url::parse(warehouse_path);
647 if url.is_err() || is_s3_tables {
648 if config.common.catalog_type() == "rest"
651 || config.common.catalog_type() == "rest_rust"
652 {
653 None
654 } else {
655 bail!(format!("Invalid warehouse path: {}", warehouse_path))
656 }
657 } else if warehouse_path.ends_with('/') {
658 Some(format!("{}{}", warehouse_path, names.join("/")))
659 } else {
660 Some(format!("{}/{}", warehouse_path, names.join("/")))
661 }
662 }
663 None => None,
664 }
665 };
666
667 let partition_spec = match &config.partition_by {
668 Some(partition_by) => {
669 let mut partition_fields = Vec::<UnboundPartitionField>::new();
670 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
671 .into_iter()
672 .enumerate()
673 {
674 match iceberg_schema.field_id_by_name(&column) {
675 Some(id) => partition_fields.push(
676 UnboundPartitionField::builder()
677 .source_id(id)
678 .transform(transform)
679 .name(format!("_p_{}", column))
680 .field_id(PARTITION_DATA_ID_START + i as i32)
681 .build(),
682 ),
683 None => bail!(format!(
684 "Partition source column does not exist in schema: {}",
685 column
686 )),
687 };
688 }
689 Some(
690 UnboundPartitionSpec::builder()
691 .with_spec_id(0)
692 .add_partition_fields(partition_fields)
693 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
694 .context("failed to add partition columns")?
695 .build(),
696 )
697 }
698 None => None,
699 };
700
701 let table_creation_builder = TableCreation::builder()
702 .name(config.table.table_name().to_owned())
703 .schema(iceberg_schema);
704
705 let table_creation = match (location, partition_spec) {
706 (Some(location), Some(partition_spec)) => table_creation_builder
707 .location(location)
708 .partition_spec(partition_spec)
709 .build(),
710 (Some(location), None) => table_creation_builder.location(location).build(),
711 (None, Some(partition_spec)) => table_creation_builder
712 .partition_spec(partition_spec)
713 .build(),
714 (None, None) => table_creation_builder.build(),
715 };
716
717 catalog
718 .create_table(&namespace, table_creation)
719 .await
720 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
721 .context("failed to create iceberg table")?;
722 }
723 Ok(())
724}
725
726impl IcebergSink {
727 pub async fn create_and_validate_table(&self) -> Result<Table> {
728 create_and_validate_table_impl(&self.config, &self.param).await
729 }
730
731 pub async fn create_table_if_not_exists(&self) -> Result<()> {
732 create_table_if_not_exists_impl(&self.config, &self.param).await
733 }
734
735 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
736 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
737 if let Some(pk) = &config.primary_key {
738 let mut unique_column_ids = Vec::with_capacity(pk.len());
739 for col_name in pk {
740 let id = param
741 .columns
742 .iter()
743 .find(|col| col.name.as_str() == col_name)
744 .ok_or_else(|| {
745 SinkError::Config(anyhow!(
746 "Primary key column {} not found in sink schema",
747 col_name
748 ))
749 })?
750 .column_id
751 .get_id() as usize;
752 unique_column_ids.push(id);
753 }
754 Some(unique_column_ids)
755 } else {
756 unreachable!()
757 }
758 } else {
759 None
760 };
761 Ok(Self {
762 config,
763 param,
764 unique_column_ids,
765 })
766 }
767}
768
769impl Sink for IcebergSink {
770 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
771
772 const SINK_NAME: &'static str = ICEBERG_SINK;
773
774 async fn validate(&self) -> Result<()> {
775 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
776 bail!("Snowflake catalog only supports iceberg sources");
777 }
778
779 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
780 risingwave_common::license::Feature::IcebergSinkWithGlue
781 .check_available()
782 .map_err(|e| anyhow::anyhow!(e))?;
783 }
784
785 let compaction_type = self.config.compaction_type();
787
788 if self.config.write_mode == IcebergWriteMode::CopyOnWrite
791 && compaction_type != CompactionType::Full
792 {
793 bail!(
794 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
795 compaction_type
796 );
797 }
798
799 match compaction_type {
800 CompactionType::SmallFiles => {
801 risingwave_common::license::Feature::IcebergCompaction
803 .check_available()
804 .map_err(|e| anyhow::anyhow!(e))?;
805
806 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
808 bail!(
809 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
810 self.config.write_mode
811 );
812 }
813
814 if self.config.delete_files_count_threshold.is_some() {
816 bail!(
817 "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
818 );
819 }
820 }
821 CompactionType::FilesWithDelete => {
822 risingwave_common::license::Feature::IcebergCompaction
824 .check_available()
825 .map_err(|e| anyhow::anyhow!(e))?;
826
827 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
829 bail!(
830 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
831 self.config.write_mode
832 );
833 }
834
835 if self.config.small_files_threshold_mb.is_some() {
837 bail!(
838 "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
839 );
840 }
841 }
842 CompactionType::Full => {
843 }
845 }
846
847 let _ = self.create_and_validate_table().await?;
848 Ok(())
849 }
850
851 fn support_schema_change() -> bool {
852 true
853 }
854
855 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
856 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
857
858 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
860 if iceberg_config.enable_compaction && compaction_interval == 0 {
861 bail!(
862 "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
863 );
864 }
865
866 tracing::info!(
868 "Alter config compaction_interval set to {} seconds",
869 compaction_interval
870 );
871 }
872
873 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
875 && max_snapshots < 1
876 {
877 bail!(
878 "`compaction.max-snapshots-num` must be greater than 0, got: {}",
879 max_snapshots
880 );
881 }
882
883 Ok(())
884 }
885
886 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
887 let writer = IcebergSinkWriter::new(
888 self.config.clone(),
889 self.param.clone(),
890 writer_param.clone(),
891 self.unique_column_ids.clone(),
892 );
893
894 let commit_checkpoint_interval =
895 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
896 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
897 );
898 let log_sinker = CoordinatedLogSinker::new(
899 &writer_param,
900 self.param.clone(),
901 writer,
902 commit_checkpoint_interval,
903 )
904 .await?;
905
906 Ok(log_sinker)
907 }
908
909 fn is_coordinated_sink(&self) -> bool {
910 true
911 }
912
913 async fn new_coordinator(
914 &self,
915 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
916 ) -> Result<SinkCommitCoordinator> {
917 let catalog = self.config.create_catalog().await?;
918 let table = self.create_and_validate_table().await?;
919 let coordinator = IcebergSinkCommitter {
920 catalog,
921 table,
922 last_commit_epoch: 0,
923 sink_id: self.param.sink_id,
924 config: self.config.clone(),
925 param: self.param.clone(),
926 commit_retry_num: self.config.commit_retry_num,
927 iceberg_compact_stat_sender,
928 };
929 if self.config.is_exactly_once.unwrap_or_default() {
930 Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
931 } else {
932 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
933 }
934 }
935}
936
937enum ProjectIdxVec {
944 None,
945 Prepare(usize),
946 Done(Vec<usize>),
947}
948
949type DataFileWriterBuilderType =
950 DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
951type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
952 ParquetWriterBuilder,
953 DefaultLocationGenerator,
954 DefaultFileNameGenerator,
955>;
956type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
957 ParquetWriterBuilder,
958 DefaultLocationGenerator,
959 DefaultFileNameGenerator,
960>;
961
962#[derive(Clone)]
963struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
964 inner: B,
965 fanout_enabled: bool,
966 schema: IcebergSchemaRef,
967 partition_spec: PartitionSpecRef,
968 compute_partition: bool,
969}
970
971impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
972 fn new(
973 inner: B,
974 fanout_enabled: bool,
975 schema: IcebergSchemaRef,
976 partition_spec: PartitionSpecRef,
977 compute_partition: bool,
978 ) -> Self {
979 Self {
980 inner,
981 fanout_enabled,
982 schema,
983 partition_spec,
984 compute_partition,
985 }
986 }
987
988 fn build(self) -> iceberg::Result<TaskWriter<B>> {
989 let partition_splitter = match (
990 self.partition_spec.is_unpartitioned(),
991 self.compute_partition,
992 ) {
993 (true, _) => None,
994 (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
995 self.schema.clone(),
996 self.partition_spec.clone(),
997 )?),
998 (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
999 self.schema.clone(),
1000 self.partition_spec.clone(),
1001 )?),
1002 };
1003
1004 Ok(TaskWriter::new_with_partition_splitter(
1005 self.inner,
1006 self.fanout_enabled,
1007 self.schema,
1008 self.partition_spec,
1009 partition_splitter,
1010 ))
1011 }
1012}
1013
1014pub enum IcebergSinkWriter {
1015 Created(IcebergSinkWriterArgs),
1016 Initialized(IcebergSinkWriterInner),
1017}
1018
1019pub struct IcebergSinkWriterArgs {
1020 config: IcebergConfig,
1021 sink_param: SinkParam,
1022 writer_param: SinkWriterParam,
1023 unique_column_ids: Option<Vec<usize>>,
1024}
1025
1026pub struct IcebergSinkWriterInner {
1027 writer: IcebergWriterDispatch,
1028 arrow_schema: SchemaRef,
1029 metrics: IcebergWriterMetrics,
1031 table: Table,
1033 project_idx_vec: ProjectIdxVec,
1036}
1037
1038#[allow(clippy::type_complexity)]
1039enum IcebergWriterDispatch {
1040 Append {
1041 writer: Option<Box<dyn IcebergWriter>>,
1042 writer_builder:
1043 TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1044 },
1045 Upsert {
1046 writer: Option<Box<dyn IcebergWriter>>,
1047 writer_builder: TaskWriterBuilderWrapper<
1048 MonitoredGeneralWriterBuilder<
1049 DeltaWriterBuilder<
1050 DataFileWriterBuilderType,
1051 PositionDeleteFileWriterBuilderType,
1052 EqualityDeleteFileWriterBuilderType,
1053 >,
1054 >,
1055 >,
1056 arrow_schema_with_op_column: SchemaRef,
1057 },
1058}
1059
1060impl IcebergWriterDispatch {
1061 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1062 match self {
1063 IcebergWriterDispatch::Append { writer, .. }
1064 | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1065 }
1066 }
1067}
1068
1069pub struct IcebergWriterMetrics {
1070 _write_qps: LabelGuardedIntCounter,
1075 _write_latency: LabelGuardedHistogram,
1076 write_bytes: LabelGuardedIntCounter,
1077}
1078
1079impl IcebergSinkWriter {
1080 pub fn new(
1081 config: IcebergConfig,
1082 sink_param: SinkParam,
1083 writer_param: SinkWriterParam,
1084 unique_column_ids: Option<Vec<usize>>,
1085 ) -> Self {
1086 Self::Created(IcebergSinkWriterArgs {
1087 config,
1088 sink_param,
1089 writer_param,
1090 unique_column_ids,
1091 })
1092 }
1093}
1094
1095impl IcebergSinkWriterInner {
1096 fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
1097 let SinkWriterParam {
1098 extra_partition_col_idx,
1099 actor_id,
1100 sink_id,
1101 sink_name,
1102 ..
1103 } = writer_param;
1104 let metrics_labels = [
1105 &actor_id.to_string(),
1106 &sink_id.to_string(),
1107 sink_name.as_str(),
1108 ];
1109
1110 let write_qps = GLOBAL_SINK_METRICS
1112 .iceberg_write_qps
1113 .with_guarded_label_values(&metrics_labels);
1114 let write_latency = GLOBAL_SINK_METRICS
1115 .iceberg_write_latency
1116 .with_guarded_label_values(&metrics_labels);
1117 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1120 .iceberg_rolling_unflushed_data_file
1121 .with_guarded_label_values(&metrics_labels);
1122 let write_bytes = GLOBAL_SINK_METRICS
1123 .iceberg_write_bytes
1124 .with_guarded_label_values(&metrics_labels);
1125
1126 let schema = table.metadata().current_schema();
1127 let partition_spec = table.metadata().default_partition_spec();
1128 let fanout_enabled = !partition_spec.fields().is_empty();
1129
1130 let unique_uuid_suffix = Uuid::now_v7();
1132
1133 let parquet_writer_properties = WriterProperties::builder()
1134 .set_max_row_group_size(
1135 writer_param
1136 .streaming_config
1137 .developer
1138 .iceberg_sink_write_parquet_max_row_group_rows,
1139 )
1140 .build();
1141
1142 let parquet_writer_builder =
1143 ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1144 let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
1145 parquet_writer_builder,
1146 table.file_io().clone(),
1147 DefaultLocationGenerator::new(table.metadata().clone())
1148 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1149 DefaultFileNameGenerator::new(
1150 writer_param.actor_id.to_string(),
1151 Some(unique_uuid_suffix.to_string()),
1152 iceberg::spec::DataFileFormat::Parquet,
1153 ),
1154 );
1155 let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1156 let monitored_builder = MonitoredGeneralWriterBuilder::new(
1157 data_file_builder,
1158 write_qps.clone(),
1159 write_latency.clone(),
1160 );
1161 let writer_builder = TaskWriterBuilderWrapper::new(
1162 monitored_builder,
1163 fanout_enabled,
1164 schema.clone(),
1165 partition_spec.clone(),
1166 true,
1167 );
1168 let inner_writer = Some(Box::new(
1169 writer_builder
1170 .clone()
1171 .build()
1172 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1173 ) as Box<dyn IcebergWriter>);
1174 Ok(Self {
1175 arrow_schema: Arc::new(
1176 schema_to_arrow_schema(table.metadata().current_schema())
1177 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1178 ),
1179 metrics: IcebergWriterMetrics {
1180 _write_qps: write_qps,
1181 _write_latency: write_latency,
1182 write_bytes,
1183 },
1184 writer: IcebergWriterDispatch::Append {
1185 writer: inner_writer,
1186 writer_builder,
1187 },
1188 table,
1189 project_idx_vec: {
1190 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1191 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1192 } else {
1193 ProjectIdxVec::None
1194 }
1195 },
1196 })
1197 }
1198
1199 fn build_upsert(
1200 table: Table,
1201 unique_column_ids: Vec<usize>,
1202 writer_param: &SinkWriterParam,
1203 ) -> Result<Self> {
1204 let SinkWriterParam {
1205 extra_partition_col_idx,
1206 actor_id,
1207 sink_id,
1208 sink_name,
1209 ..
1210 } = writer_param;
1211 let metrics_labels = [
1212 &actor_id.to_string(),
1213 &sink_id.to_string(),
1214 sink_name.as_str(),
1215 ];
1216 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1217
1218 let write_qps = GLOBAL_SINK_METRICS
1220 .iceberg_write_qps
1221 .with_guarded_label_values(&metrics_labels);
1222 let write_latency = GLOBAL_SINK_METRICS
1223 .iceberg_write_latency
1224 .with_guarded_label_values(&metrics_labels);
1225 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1228 .iceberg_rolling_unflushed_data_file
1229 .with_guarded_label_values(&metrics_labels);
1230 let write_bytes = GLOBAL_SINK_METRICS
1231 .iceberg_write_bytes
1232 .with_guarded_label_values(&metrics_labels);
1233
1234 let schema = table.metadata().current_schema();
1236 let partition_spec = table.metadata().default_partition_spec();
1237 let fanout_enabled = !partition_spec.fields().is_empty();
1238
1239 let unique_uuid_suffix = Uuid::now_v7();
1241
1242 let parquet_writer_properties = WriterProperties::builder()
1243 .set_max_row_group_size(
1244 writer_param
1245 .streaming_config
1246 .developer
1247 .iceberg_sink_write_parquet_max_row_group_rows,
1248 )
1249 .build();
1250
1251 let data_file_builder = {
1252 let parquet_writer_builder =
1253 ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1254 let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1255 parquet_writer_builder,
1256 table.file_io().clone(),
1257 DefaultLocationGenerator::new(table.metadata().clone())
1258 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1259 DefaultFileNameGenerator::new(
1260 writer_param.actor_id.to_string(),
1261 Some(unique_uuid_suffix.to_string()),
1262 iceberg::spec::DataFileFormat::Parquet,
1263 ),
1264 );
1265 DataFileWriterBuilder::new(rolling_writer_builder)
1266 };
1267 let position_delete_builder = {
1268 let parquet_writer_builder = ParquetWriterBuilder::new(
1269 parquet_writer_properties.clone(),
1270 POSITION_DELETE_SCHEMA.clone().into(),
1271 );
1272 let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1273 parquet_writer_builder,
1274 table.file_io().clone(),
1275 DefaultLocationGenerator::new(table.metadata().clone())
1276 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1277 DefaultFileNameGenerator::new(
1278 writer_param.actor_id.to_string(),
1279 Some(format!("pos-del-{}", unique_uuid_suffix)),
1280 iceberg::spec::DataFileFormat::Parquet,
1281 ),
1282 );
1283 PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
1284 };
1285 let equality_delete_builder = {
1286 let config = EqualityDeleteWriterConfig::new(
1287 unique_column_ids.clone(),
1288 table.metadata().current_schema().clone(),
1289 )
1290 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1291 let parquet_writer_builder = ParquetWriterBuilder::new(
1292 parquet_writer_properties,
1293 Arc::new(
1294 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1295 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1296 ),
1297 );
1298 let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
1299 parquet_writer_builder,
1300 table.file_io().clone(),
1301 DefaultLocationGenerator::new(table.metadata().clone())
1302 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1303 DefaultFileNameGenerator::new(
1304 writer_param.actor_id.to_string(),
1305 Some(format!("eq-del-{}", unique_uuid_suffix)),
1306 iceberg::spec::DataFileFormat::Parquet,
1307 ),
1308 );
1309
1310 EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
1311 };
1312 let delta_builder = DeltaWriterBuilder::new(
1313 data_file_builder,
1314 position_delete_builder,
1315 equality_delete_builder,
1316 unique_column_ids,
1317 schema.clone(),
1318 );
1319 let original_arrow_schema = Arc::new(
1320 schema_to_arrow_schema(table.metadata().current_schema())
1321 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1322 );
1323 let schema_with_extra_op_column = {
1324 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1325 new_fields.push(Arc::new(ArrowField::new(
1326 "op".to_owned(),
1327 ArrowDataType::Int32,
1328 false,
1329 )));
1330 Arc::new(ArrowSchema::new(new_fields))
1331 };
1332 let writer_builder = TaskWriterBuilderWrapper::new(
1333 MonitoredGeneralWriterBuilder::new(
1334 delta_builder,
1335 write_qps.clone(),
1336 write_latency.clone(),
1337 ),
1338 fanout_enabled,
1339 schema.clone(),
1340 partition_spec.clone(),
1341 true,
1342 );
1343 let inner_writer = Some(Box::new(
1344 writer_builder
1345 .clone()
1346 .build()
1347 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1348 ) as Box<dyn IcebergWriter>);
1349 Ok(Self {
1350 arrow_schema: original_arrow_schema,
1351 metrics: IcebergWriterMetrics {
1352 _write_qps: write_qps,
1353 _write_latency: write_latency,
1354 write_bytes,
1355 },
1356 table,
1357 writer: IcebergWriterDispatch::Upsert {
1358 writer: inner_writer,
1359 writer_builder,
1360 arrow_schema_with_op_column: schema_with_extra_op_column,
1361 },
1362 project_idx_vec: {
1363 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1364 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1365 } else {
1366 ProjectIdxVec::None
1367 }
1368 },
1369 })
1370 }
1371}
1372
1373#[async_trait]
1374impl SinkWriter for IcebergSinkWriter {
1375 type CommitMetadata = Option<SinkMetadata>;
1376
1377 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1379 let Self::Created(args) = self else {
1380 return Ok(());
1381 };
1382
1383 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1384 let inner = match &args.unique_column_ids {
1385 Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1386 table,
1387 unique_column_ids.clone(),
1388 &args.writer_param,
1389 )?,
1390 None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
1391 };
1392
1393 *self = IcebergSinkWriter::Initialized(inner);
1394 Ok(())
1395 }
1396
1397 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1399 let Self::Initialized(inner) = self else {
1400 unreachable!("IcebergSinkWriter should be initialized before barrier");
1401 };
1402
1403 match &mut inner.writer {
1405 IcebergWriterDispatch::Append {
1406 writer,
1407 writer_builder,
1408 } => {
1409 if writer.is_none() {
1410 *writer = Some(Box::new(
1411 writer_builder
1412 .clone()
1413 .build()
1414 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1415 ));
1416 }
1417 }
1418 IcebergWriterDispatch::Upsert {
1419 writer,
1420 writer_builder,
1421 ..
1422 } => {
1423 if writer.is_none() {
1424 *writer = Some(Box::new(
1425 writer_builder
1426 .clone()
1427 .build()
1428 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1429 ));
1430 }
1431 }
1432 };
1433
1434 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1436 match &mut inner.project_idx_vec {
1437 ProjectIdxVec::None => {}
1438 ProjectIdxVec::Prepare(idx) => {
1439 if *idx >= chunk.columns().len() {
1440 return Err(SinkError::Iceberg(anyhow!(
1441 "invalid extra partition column index {}",
1442 idx
1443 )));
1444 }
1445 let project_idx_vec = (0..*idx)
1446 .chain(*idx + 1..chunk.columns().len())
1447 .collect_vec();
1448 chunk = chunk.project(&project_idx_vec);
1449 inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1450 }
1451 ProjectIdxVec::Done(idx_vec) => {
1452 chunk = chunk.project(idx_vec);
1453 }
1454 }
1455 if ops.is_empty() {
1456 return Ok(());
1457 }
1458 let write_batch_size = chunk.estimated_heap_size();
1459 let batch = match &inner.writer {
1460 IcebergWriterDispatch::Append { .. } => {
1461 let filters =
1463 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1464 chunk.set_visibility(filters);
1465 IcebergArrowConvert
1466 .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1467 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1468 }
1469 IcebergWriterDispatch::Upsert {
1470 arrow_schema_with_op_column,
1471 ..
1472 } => {
1473 let chunk = IcebergArrowConvert
1474 .to_record_batch(inner.arrow_schema.clone(), &chunk)
1475 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1476 let ops = Arc::new(Int32Array::from(
1477 ops.iter()
1478 .map(|op| match op {
1479 Op::UpdateInsert | Op::Insert => INSERT_OP,
1480 Op::UpdateDelete | Op::Delete => DELETE_OP,
1481 })
1482 .collect_vec(),
1483 ));
1484 let mut columns = chunk.columns().to_vec();
1485 columns.push(ops);
1486 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1487 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1488 }
1489 };
1490
1491 let writer = inner.writer.get_writer().unwrap();
1492 writer
1493 .write(batch)
1494 .instrument_await("iceberg_write")
1495 .await
1496 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1497 inner.metrics.write_bytes.inc_by(write_batch_size as _);
1498 Ok(())
1499 }
1500
1501 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1504 let Self::Initialized(inner) = self else {
1505 unreachable!("IcebergSinkWriter should be initialized before barrier");
1506 };
1507
1508 if !is_checkpoint {
1510 return Ok(None);
1511 }
1512
1513 let close_result = match &mut inner.writer {
1514 IcebergWriterDispatch::Append {
1515 writer,
1516 writer_builder,
1517 } => {
1518 let close_result = match writer.take() {
1519 Some(mut writer) => {
1520 Some(writer.close().instrument_await("iceberg_close").await)
1521 }
1522 _ => None,
1523 };
1524 match writer_builder.clone().build() {
1525 Ok(new_writer) => {
1526 *writer = Some(Box::new(new_writer));
1527 }
1528 _ => {
1529 warn!("Failed to build new writer after close");
1532 }
1533 }
1534 close_result
1535 }
1536 IcebergWriterDispatch::Upsert {
1537 writer,
1538 writer_builder,
1539 ..
1540 } => {
1541 let close_result = match writer.take() {
1542 Some(mut writer) => {
1543 Some(writer.close().instrument_await("iceberg_close").await)
1544 }
1545 _ => None,
1546 };
1547 match writer_builder.clone().build() {
1548 Ok(new_writer) => {
1549 *writer = Some(Box::new(new_writer));
1550 }
1551 _ => {
1552 warn!("Failed to build new writer after close");
1555 }
1556 }
1557 close_result
1558 }
1559 };
1560
1561 match close_result {
1562 Some(Ok(result)) => {
1563 let format_version = inner.table.metadata().format_version();
1564 let partition_type = inner.table.metadata().default_partition_type();
1565 let data_files = result
1566 .into_iter()
1567 .map(|f| {
1568 let truncated = truncate_datafile(f);
1570 SerializedDataFile::try_from(truncated, partition_type, format_version)
1571 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1572 })
1573 .collect::<Result<Vec<_>>>()?;
1574 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1575 data_files,
1576 schema_id: inner.table.metadata().current_schema_id(),
1577 partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1578 })?))
1579 }
1580 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1581 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1582 }
1583 }
1584}
1585
1586const SCHEMA_ID: &str = "schema_id";
1587const PARTITION_SPEC_ID: &str = "partition_spec_id";
1588const DATA_FILES: &str = "data_files";
1589
1590const MAX_COLUMN_STAT_SIZE: usize = 10240; fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1614 data_file.lower_bounds.retain(|field_id, datum| {
1616 let size = match datum.to_bytes() {
1618 Ok(bytes) => bytes.len(),
1619 Err(_) => 0,
1620 };
1621
1622 if size > MAX_COLUMN_STAT_SIZE {
1623 tracing::debug!(
1624 field_id = field_id,
1625 size = size,
1626 "Truncating large lower_bound statistic"
1627 );
1628 return false;
1629 }
1630 true
1631 });
1632
1633 data_file.upper_bounds.retain(|field_id, datum| {
1635 let size = match datum.to_bytes() {
1637 Ok(bytes) => bytes.len(),
1638 Err(_) => 0,
1639 };
1640
1641 if size > MAX_COLUMN_STAT_SIZE {
1642 tracing::debug!(
1643 field_id = field_id,
1644 size = size,
1645 "Truncating large upper_bound statistic"
1646 );
1647 return false;
1648 }
1649 true
1650 });
1651
1652 data_file
1653}
1654
1655#[derive(Default, Clone)]
1656struct IcebergCommitResult {
1657 schema_id: i32,
1658 partition_spec_id: i32,
1659 data_files: Vec<SerializedDataFile>,
1660}
1661
1662impl IcebergCommitResult {
1663 fn try_from(value: &SinkMetadata) -> Result<Self> {
1664 if let Some(Serialized(v)) = &value.metadata {
1665 let mut values = if let serde_json::Value::Object(v) =
1666 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1667 .context("Can't parse iceberg sink metadata")?
1668 {
1669 v
1670 } else {
1671 bail!("iceberg sink metadata should be an object");
1672 };
1673
1674 let schema_id;
1675 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1676 schema_id = value
1677 .as_u64()
1678 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1679 } else {
1680 bail!("iceberg sink metadata should have schema_id");
1681 }
1682
1683 let partition_spec_id;
1684 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1685 partition_spec_id = value
1686 .as_u64()
1687 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1688 } else {
1689 bail!("iceberg sink metadata should have partition_spec_id");
1690 }
1691
1692 let data_files: Vec<SerializedDataFile>;
1693 if let serde_json::Value::Array(values) = values
1694 .remove(DATA_FILES)
1695 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1696 {
1697 data_files = values
1698 .into_iter()
1699 .map(from_value::<SerializedDataFile>)
1700 .collect::<std::result::Result<_, _>>()
1701 .unwrap();
1702 } else {
1703 bail!("iceberg sink metadata should have data_files object");
1704 }
1705
1706 Ok(Self {
1707 schema_id: schema_id as i32,
1708 partition_spec_id: partition_spec_id as i32,
1709 data_files,
1710 })
1711 } else {
1712 bail!("Can't create iceberg sink write result from empty data!")
1713 }
1714 }
1715
1716 fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1717 let mut values = if let serde_json::Value::Object(value) =
1718 serde_json::from_slice::<serde_json::Value>(&value)
1719 .context("Can't parse iceberg sink metadata")?
1720 {
1721 value
1722 } else {
1723 bail!("iceberg sink metadata should be an object");
1724 };
1725
1726 let schema_id;
1727 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1728 schema_id = value
1729 .as_u64()
1730 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1731 } else {
1732 bail!("iceberg sink metadata should have schema_id");
1733 }
1734
1735 let partition_spec_id;
1736 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1737 partition_spec_id = value
1738 .as_u64()
1739 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1740 } else {
1741 bail!("iceberg sink metadata should have partition_spec_id");
1742 }
1743
1744 let data_files: Vec<SerializedDataFile>;
1745 if let serde_json::Value::Array(values) = values
1746 .remove(DATA_FILES)
1747 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1748 {
1749 data_files = values
1750 .into_iter()
1751 .map(from_value::<SerializedDataFile>)
1752 .collect::<std::result::Result<_, _>>()
1753 .unwrap();
1754 } else {
1755 bail!("iceberg sink metadata should have data_files object");
1756 }
1757
1758 Ok(Self {
1759 schema_id: schema_id as i32,
1760 partition_spec_id: partition_spec_id as i32,
1761 data_files,
1762 })
1763 }
1764}
1765
1766impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1767 type Error = SinkError;
1768
1769 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1770 let json_data_files = serde_json::Value::Array(
1771 value
1772 .data_files
1773 .iter()
1774 .map(serde_json::to_value)
1775 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1776 .context("Can't serialize data files to json")?,
1777 );
1778 let json_value = serde_json::Value::Object(
1779 vec![
1780 (
1781 SCHEMA_ID.to_owned(),
1782 serde_json::Value::Number(value.schema_id.into()),
1783 ),
1784 (
1785 PARTITION_SPEC_ID.to_owned(),
1786 serde_json::Value::Number(value.partition_spec_id.into()),
1787 ),
1788 (DATA_FILES.to_owned(), json_data_files),
1789 ]
1790 .into_iter()
1791 .collect(),
1792 );
1793 Ok(SinkMetadata {
1794 metadata: Some(Serialized(SerializedMetadata {
1795 metadata: serde_json::to_vec(&json_value)
1796 .context("Can't serialize iceberg sink metadata")?,
1797 })),
1798 })
1799 }
1800}
1801
1802impl TryFrom<IcebergCommitResult> for Vec<u8> {
1803 type Error = SinkError;
1804
1805 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1806 let json_data_files = serde_json::Value::Array(
1807 value
1808 .data_files
1809 .iter()
1810 .map(serde_json::to_value)
1811 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1812 .context("Can't serialize data files to json")?,
1813 );
1814 let json_value = serde_json::Value::Object(
1815 vec![
1816 (
1817 SCHEMA_ID.to_owned(),
1818 serde_json::Value::Number(value.schema_id.into()),
1819 ),
1820 (
1821 PARTITION_SPEC_ID.to_owned(),
1822 serde_json::Value::Number(value.partition_spec_id.into()),
1823 ),
1824 (DATA_FILES.to_owned(), json_data_files),
1825 ]
1826 .into_iter()
1827 .collect(),
1828 );
1829 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1830 }
1831}
1832pub struct IcebergSinkCommitter {
1833 catalog: Arc<dyn Catalog>,
1834 table: Table,
1835 pub last_commit_epoch: u64,
1836 pub(crate) sink_id: SinkId,
1837 pub(crate) config: IcebergConfig,
1838 pub(crate) param: SinkParam,
1839 commit_retry_num: u32,
1840 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1841}
1842
1843impl IcebergSinkCommitter {
1844 async fn reload_table(
1847 catalog: &dyn Catalog,
1848 table_ident: &TableIdent,
1849 schema_id: i32,
1850 partition_spec_id: i32,
1851 ) -> Result<Table> {
1852 let table = catalog
1853 .load_table(table_ident)
1854 .await
1855 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1856 if table.metadata().current_schema_id() != schema_id {
1857 return Err(SinkError::Iceberg(anyhow!(
1858 "Schema evolution not supported, expect schema id {}, but got {}",
1859 schema_id,
1860 table.metadata().current_schema_id()
1861 )));
1862 }
1863 if table.metadata().default_partition_spec_id() != partition_spec_id {
1864 return Err(SinkError::Iceberg(anyhow!(
1865 "Partition evolution not supported, expect partition spec id {}, but got {}",
1866 partition_spec_id,
1867 table.metadata().default_partition_spec_id()
1868 )));
1869 }
1870 Ok(table)
1871 }
1872}
1873
1874#[async_trait]
1875impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1876 async fn init(&mut self) -> Result<()> {
1877 tracing::info!(
1878 "Sink id = {}: iceberg sink coordinator initing.",
1879 self.param.sink_id
1880 );
1881
1882 Ok(())
1883 }
1884
1885 async fn commit(
1886 &mut self,
1887 epoch: u64,
1888 metadata: Vec<SinkMetadata>,
1889 add_columns: Option<Vec<Field>>,
1890 ) -> Result<()> {
1891 tracing::info!("Starting iceberg direct commit in epoch {epoch}");
1892
1893 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata, None)? {
1895 self.commit_datafile(epoch, write_results, snapshot_id)
1896 .await?;
1897 }
1898
1899 if let Some(add_columns) = add_columns {
1901 tracing::info!(?epoch, "Committing schema change");
1902 self.commit_schema_change(add_columns).await?;
1903 }
1904
1905 Ok(())
1906 }
1907}
1908
1909#[async_trait]
1910impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1911 async fn init(&mut self) -> Result<()> {
1912 tracing::info!(
1913 "Sink id = {}: iceberg sink coordinator initing.",
1914 self.param.sink_id
1915 );
1916
1917 Ok(())
1918 }
1919
1920 async fn pre_commit(
1921 &mut self,
1922 epoch: u64,
1923 metadata: Vec<SinkMetadata>,
1924 add_columns: Option<Vec<Field>>,
1925 ) -> Result<Vec<u8>> {
1926 tracing::info!("Starting iceberg pre commit in epoch {epoch}");
1927
1928 if let Some(add_columns) = &add_columns {
1930 return Err(SinkError::Iceberg(anyhow!(
1931 "TwoPhaseCommitCoordinator for Iceberg sink does not support schema change yet, \
1932 but got add_columns: {:?}",
1933 add_columns.iter().map(|c| &c.name).collect::<Vec<_>>()
1934 )));
1935 }
1936
1937 let (write_results, snapshot_id) =
1938 match self.pre_commit_inner(epoch, metadata, add_columns)? {
1939 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1940 None => {
1941 tracing::debug!(?epoch, "no data to commit");
1942 return Ok(vec![]);
1943 }
1944 };
1945
1946 let mut write_results_bytes = Vec::new();
1947 for each_parallelism_write_result in write_results {
1948 let each_parallelism_write_result_bytes: Vec<u8> =
1949 each_parallelism_write_result.try_into()?;
1950 write_results_bytes.push(each_parallelism_write_result_bytes);
1951 }
1952
1953 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1954 write_results_bytes.push(snapshot_id_bytes);
1955
1956 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1957 Ok(pre_commit_metadata_bytes)
1958 }
1959
1960 async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1961 tracing::info!("Starting iceberg commit in epoch {epoch}");
1962 if commit_metadata.is_empty() {
1963 tracing::debug!(?epoch, "no data to commit");
1964 return Ok(());
1965 }
1966
1967 let mut write_results_bytes = deserialize_metadata(commit_metadata);
1968
1969 let snapshot_id_bytes = write_results_bytes.pop().unwrap();
1970 let snapshot_id = i64::from_le_bytes(
1971 snapshot_id_bytes
1972 .try_into()
1973 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1974 );
1975
1976 if self
1977 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1978 .await?
1979 {
1980 tracing::info!(
1981 "Snapshot id {} already committed in iceberg table, skip committing again.",
1982 snapshot_id
1983 );
1984 return Ok(());
1985 }
1986
1987 let mut write_results = vec![];
1988 for each in write_results_bytes {
1989 let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1990 write_results.push(write_result);
1991 }
1992
1993 self.commit_datafile(epoch, write_results, snapshot_id)
1994 .await?;
1995
1996 Ok(())
1997 }
1998
1999 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2000 tracing::debug!("Abort not implemented yet");
2002 }
2003}
2004
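    /// Validates the per-parallelism write results (consistent schema id and
    /// partition spec id) and pre-generates a snapshot id. Returns `None` when
    /// there are no data files to commit.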
2005impl IcebergSinkCommitter {
2007 fn pre_commit_inner(
2008 &mut self,
2009 _epoch: u64,
2010 metadata: Vec<SinkMetadata>,
2011 add_columns: Option<Vec<Field>>,
2012 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2013 if let Some(add_columns) = add_columns {
2014 return Err(anyhow!(
                "Iceberg sink does not support adding columns, but got: {:?}",
2016 add_columns
2017 )
2018 .into());
2019 }
2020
2021 let write_results: Vec<IcebergCommitResult> = metadata
2022 .iter()
2023 .map(IcebergCommitResult::try_from)
2024 .collect::<Result<Vec<IcebergCommitResult>>>()?;
2025
2026 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2028 return Ok(None);
2029 }
2030
2031 let expect_schema_id = write_results[0].schema_id;
2032 let expect_partition_spec_id = write_results[0].partition_spec_id;
2033
2034 if write_results
2036 .iter()
2037 .any(|r| r.schema_id != expect_schema_id)
2038 || write_results
2039 .iter()
2040 .any(|r| r.partition_spec_id != expect_partition_spec_id)
2041 {
2042 return Err(SinkError::Iceberg(anyhow!(
2043 "schema_id and partition_spec_id should be the same in all write results"
2044 )));
2045 }
2046
2047 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2048
2049 Ok(Some((write_results, snapshot_id)))
2050 }
2051
2052 async fn commit_datafile(
2053 &mut self,
2054 epoch: u64,
2055 write_results: Vec<IcebergCommitResult>,
2056 snapshot_id: i64,
2057 ) -> Result<()> {
2058 assert!(
2060 !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2061 );
2062
2063 self.wait_for_snapshot_limit().await?;
2065
2066 let expect_schema_id = write_results[0].schema_id;
2067 let expect_partition_spec_id = write_results[0].partition_spec_id;
2068
2069 self.table = Self::reload_table(
2071 self.catalog.as_ref(),
2072 self.table.identifier(),
2073 expect_schema_id,
2074 expect_partition_spec_id,
2075 )
2076 .await?;
2077
2078 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2079 return Err(SinkError::Iceberg(anyhow!(
2080 "Can't find schema by id {}",
2081 expect_schema_id
2082 )));
2083 };
2084 let Some(partition_spec) = self
2085 .table
2086 .metadata()
2087 .partition_spec_by_id(expect_partition_spec_id)
2088 else {
2089 return Err(SinkError::Iceberg(anyhow!(
2090 "Can't find partition spec by id {}",
2091 expect_partition_spec_id
2092 )));
2093 };
2094 let partition_type = partition_spec
2095 .as_ref()
2096 .clone()
2097 .partition_type(schema)
2098 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2099
2100 let data_files = write_results
2101 .into_iter()
2102 .flat_map(|r| {
2103 r.data_files.into_iter().map(|f| {
2104 f.try_into(expect_partition_spec_id, &partition_type, schema)
2105 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2106 })
2107 })
2108 .collect::<Result<Vec<DataFile>>>()?;
2109 let retry_strategy = ExponentialBackoff::from_millis(10)
2114 .max_delay(Duration::from_secs(60))
2115 .map(jitter)
2116 .take(self.commit_retry_num as usize);
2117 let catalog = self.catalog.clone();
2118 let table_ident = self.table.identifier().clone();
2119
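        // Classify failures so the retry predicate below can distinguish commit
        // conflicts (retried with exponential backoff) from reload failures
        // (treated as non-retriable).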
        enum CommitError {
            ReloadTable(SinkError),
            Commit(SinkError),
        }
2128
2129 let table = RetryIf::spawn(
2130 retry_strategy,
2131 || async {
2132 let table = Self::reload_table(
2134 catalog.as_ref(),
2135 &table_ident,
2136 expect_schema_id,
2137 expect_partition_spec_id,
2138 )
2139 .await
2140 .map_err(|e| {
2141 tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2142 CommitError::ReloadTable(e)
2143 })?;
2144
2145 let txn = Transaction::new(&table);
2146 let append_action = txn
2147 .fast_append()
2148 .set_snapshot_id(snapshot_id)
2149 .set_target_branch(commit_branch(
2150 self.config.r#type.as_str(),
2151 self.config.write_mode,
2152 ))
2153 .add_data_files(data_files.clone());
2154
2155 let tx = append_action.apply(txn).map_err(|err| {
2156 let err: IcebergError = err.into();
                    tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
2158 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2159 })?;
2160
2161 tx.commit(catalog.as_ref()).await.map_err(|err| {
2162 let err: IcebergError = err.into();
2163 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2164 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2165 })
2166 },
2167 |err: &CommitError| {
2168 match err {
2170 CommitError::Commit(_) => {
2171 tracing::warn!("Commit failed, will retry");
2172 true
2173 }
2174 CommitError::ReloadTable(_) => {
2175 tracing::error!(
2176 "reload_table failed with non-retriable error, will not retry"
2177 );
2178 false
2179 }
2180 }
2181 },
2182 )
2183 .await
2184 .map_err(|e| match e {
2185 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2186 })?;
2187 self.table = table;
2188
2189 let snapshot_num = self.table.metadata().snapshots().count();
2190 let catalog_name = self.config.common.catalog_name();
2191 let table_name = self.table.identifier().to_string();
2192 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2193 GLOBAL_SINK_METRICS
2194 .iceberg_snapshot_num
2195 .with_guarded_label_values(&metrics_labels)
2196 .set(snapshot_num as i64);
2197
        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
2199
2200 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2201 && self.config.enable_compaction
2202 && iceberg_compact_stat_sender
2203 .send(IcebergSinkCompactionUpdate {
2204 sink_id: self.sink_id,
2205 compaction_interval: self.config.compaction_interval_sec(),
2206 force_compaction: false,
2207 })
2208 .is_err()
2209 {
2210 warn!("failed to send iceberg compaction stats");
2211 }
2212
2213 Ok(())
2214 }
2215
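    /// Checks whether the given snapshot id is already present in the Iceberg
    /// table, so that a replayed commit can be skipped idempotently.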
2216 async fn is_snapshot_id_in_iceberg(
2220 &self,
2221 iceberg_config: &IcebergConfig,
2222 snapshot_id: i64,
2223 ) -> Result<bool> {
2224 let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2230 }
2231
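    /// Adds the given columns to the Iceberg table schema as optional fields and
    /// commits the schema update through the catalog.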
2232 async fn commit_schema_change(&mut self, add_columns: Vec<Field>) -> Result<()> {
2236 use iceberg::spec::NestedField;
2237
2238 tracing::info!(
2239 "Starting to commit schema change with {} columns",
2240 add_columns.len()
2241 );
2242
2243 let metadata = self.table.metadata();
2245 let mut next_field_id = metadata.last_column_id() + 1;
2246 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2247
2248 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2250 let mut new_fields = Vec::new();
2251
2252 for field in &add_columns {
2253 let arrow_field = iceberg_create_table_arrow_convert
2255 .to_arrow_field(&field.name, &field.data_type)
2256 .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2257 .map_err(SinkError::Iceberg)?;
2258
2259 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2261 .map_err(|err| {
2262 SinkError::Iceberg(
2263 anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2264 )
2265 })?;
2266
2267 let nested_field = Arc::new(NestedField::optional(
2269 next_field_id,
2270 &field.name,
2271 iceberg_type,
2272 ));
2273
2274 new_fields.push(nested_field);
2275 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2276 next_field_id += 1;
2277 }
2278
2279 tracing::info!(
2281 "Committing schema change to catalog for table {}",
2282 self.table.identifier()
2283 );
2284
2285 let txn = Transaction::new(&self.table);
2286 let action = txn.update_schema().add_fields(new_fields);
2287
2288 let updated_table = action
2289 .apply(txn)
2290 .context("Failed to apply schema update action")
2291 .map_err(SinkError::Iceberg)?
2292 .commit(self.catalog.as_ref())
2293 .await
2294 .context("Failed to commit table schema change")
2295 .map_err(SinkError::Iceberg)?;
2296
2297 self.table = updated_table;
2298
2299 tracing::info!(
2300 "Successfully committed schema change, added {} columns to iceberg table",
2301 add_columns.len()
2302 );
2303
2304 Ok(())
2305 }
2306
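    /// Counts the snapshots that are newer than the most recent `Replace`
    /// (rewrite/compaction) snapshot, walking snapshots from newest to oldest.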
2307 fn count_snapshots_since_rewrite(&self) -> usize {
2310 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2311 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2312
2313 let mut count = 0;
2315 for snapshot in snapshots {
2316 let summary = snapshot.summary();
2318 match &summary.operation {
2319 Operation::Replace => {
2320 break;
2322 }
2323
2324 _ => {
2325 count += 1;
2327 }
2328 }
2329 }
2330
2331 count
2332 }
2333
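    /// Blocks the commit while the number of snapshots since the last rewrite is
    /// at or above `max_snapshots_num_before_compaction`, nudging the compactor
    /// with `force_compaction` and re-loading the table every 30 seconds.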
2334 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2336 if !self.config.enable_compaction {
2337 return Ok(());
2338 }
2339
2340 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2341 loop {
2342 let current_count = self.count_snapshots_since_rewrite();
2343
2344 if current_count < max_snapshots {
2345 tracing::info!(
2346 "Snapshot count check passed: {} < {}",
2347 current_count,
2348 max_snapshots
2349 );
2350 break;
2351 }
2352
2353 tracing::info!(
                    "Snapshot count {} has reached the limit {}, waiting for compaction...",
2355 current_count,
2356 max_snapshots
2357 );
2358
2359 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2360 && iceberg_compact_stat_sender
2361 .send(IcebergSinkCompactionUpdate {
2362 sink_id: self.sink_id,
2363 compaction_interval: self.config.compaction_interval_sec(),
2364 force_compaction: true,
2365 })
2366 .is_err()
2367 {
2368 tracing::warn!("failed to send iceberg compaction stats");
2369 }
2370
2371 tokio::time::sleep(Duration::from_secs(30)).await;
2373
2374 self.table = self.config.load_table().await?;
2376 }
2377 }
2378 Ok(())
2379 }
2380}
2381
2382const MAP_KEY: &str = "key";
2383const MAP_VALUE: &str = "value";
2384
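/// Resolves the nested Arrow fields for a struct-like RisingWave type (struct,
/// map, or list of structs), registering the child RisingWave types into
/// `schema_fields` keyed by field name. Returns `None` for non-nested Arrow types.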
2385fn get_fields<'a>(
2386 our_field_type: &'a risingwave_common::types::DataType,
2387 data_type: &ArrowDataType,
2388 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2389) -> Option<ArrowFields> {
2390 match data_type {
2391 ArrowDataType::Struct(fields) => {
2392 match our_field_type {
2393 risingwave_common::types::DataType::Struct(struct_fields) => {
2394 struct_fields.iter().for_each(|(name, data_type)| {
2395 let res = schema_fields.insert(name, data_type);
2396 assert!(res.is_none())
2398 });
2399 }
2400 risingwave_common::types::DataType::Map(map_fields) => {
2401 schema_fields.insert(MAP_KEY, map_fields.key());
2402 schema_fields.insert(MAP_VALUE, map_fields.value());
2403 }
2404 risingwave_common::types::DataType::List(list) => {
2405 list.elem()
2406 .as_struct()
2407 .iter()
2408 .for_each(|(name, data_type)| {
2409 let res = schema_fields.insert(name, data_type);
2410 assert!(res.is_none())
2412 });
2413 }
2414 _ => {}
2415 };
2416 Some(fields.clone())
2417 }
2418 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2419 get_fields(our_field_type, field.data_type(), schema_fields)
2420 }
        _ => None,
    }
2423}
2424
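/// Recursively checks that every Arrow field of the Iceberg schema has a
/// compatible RisingWave type, tolerating known representation differences such
/// as Binary vs. LargeBinary and decimal precision/scale.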
2425fn check_compatibility(
2426 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2427 fields: &ArrowFields,
2428) -> anyhow::Result<bool> {
2429 for arrow_field in fields {
2430 let our_field_type = schema_fields
2431 .get(arrow_field.name().as_str())
2432 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2433
2434 let converted_arrow_data_type = IcebergArrowConvert
2436 .to_arrow_field("", our_field_type)
2437 .map_err(|e| anyhow!(e))?
2438 .data_type()
2439 .clone();
2440
2441 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2442 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2443 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2444 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2445 (ArrowDataType::List(_), ArrowDataType::List(field))
2446 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2447 let mut schema_fields = HashMap::new();
2448 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2449 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2450 }
2451 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2453 let mut schema_fields = HashMap::new();
2454 our_field_type
2455 .as_struct()
2456 .iter()
2457 .for_each(|(name, data_type)| {
2458 let res = schema_fields.insert(name, data_type);
2459 assert!(res.is_none())
2461 });
2462 check_compatibility(schema_fields, fields)?
2463 }
2464 (left, right) => left.equals_datatype(right),
2472 };
2473 if !compatible {
2474 bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\nIceberg's data type: {}",
2476 arrow_field.name(),
2477 converted_arrow_data_type,
2478 arrow_field.data_type()
2479 );
2480 }
2481 }
2482 Ok(true)
2483}
2484
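/// Checks that the RisingWave sink schema matches the Iceberg table's Arrow
/// schema: same number of fields and a compatible type for every field
/// (matched by name, order-insensitive).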
2485pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2487 if rw_schema.fields.len() != arrow_schema.fields().len() {
2488 bail!(
            "Schema length mismatch: RisingWave has {} fields, Iceberg has {}",
2490 rw_schema.fields.len(),
2491 arrow_schema.fields.len()
2492 );
2493 }
2494
2495 let mut schema_fields = HashMap::new();
2496 rw_schema.fields.iter().for_each(|field| {
2497 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2498 assert!(res.is_none())
2500 });
2501
2502 check_compatibility(schema_fields, &arrow_schema.fields)?;
2503 Ok(())
2504}
2505
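// The pre-commit payload is a JSON-encoded `Vec<Vec<u8>>`; see `pre_commit` for
// the layout (write results followed by the snapshot id).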
2506fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2507 serde_json::to_vec(&metadata).unwrap()
2508}
2509
2510fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2511 serde_json::from_slice(&bytes).unwrap()
2512}
2513
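/// Parses a `partition_by` option into `(column, Transform)` pairs.
///
/// Accepted forms are `column`, `transform(column)`, `transform(n,column)` and
/// `transform(n, column)`, e.g. `"v1, bucket(5,v1), truncate(4,v2)"` yields
/// identity, bucket[5] and truncate[4] transforms.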
2514pub fn parse_partition_by_exprs(
2515 expr: String,
2516) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2517 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2519 if !re.is_match(&expr) {
2520 bail!(format!(
2521 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2522 expr
2523 ))
2524 }
2525 let caps = re.captures_iter(&expr);
2526
2527 let mut partition_columns = vec![];
2528
2529 for mat in caps {
2530 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2531 (&mat["transform"], Transform::Identity)
2532 } else {
2533 let mut func = mat["transform"].to_owned();
2534 if func == "bucket" || func == "truncate" {
2535 let n = &mat
2536 .name("n")
                    .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate`"))?
2538 .as_str();
2539 func = format!("{func}[{n}]");
2540 }
2541 (
2542 &mat["field"],
2543 Transform::from_str(&func)
2544 .with_context(|| format!("invalid transform function {}", func))?,
2545 )
2546 };
2547 partition_columns.push((column.to_owned(), transform));
2548 }
2549 Ok(partition_columns)
2550}
2551
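/// Returns the branch to commit to: the dedicated copy-on-write branch for
/// upsert sinks running in copy-on-write mode, otherwise the main branch.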
2552pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2553 if should_enable_iceberg_cow(sink_type, write_mode) {
2554 ICEBERG_COW_BRANCH.to_owned()
2555 } else {
2556 MAIN_BRANCH.to_owned()
2557 }
2558}
2559
2560pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2561 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2562}
2563
2564impl crate::with_options::WithOptions for IcebergWriteMode {}
2565
2566impl crate::with_options::WithOptions for CompactionType {}
2567
2568#[cfg(test)]
2569mod test {
2570 use std::collections::BTreeMap;
2571
2572 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2573 use risingwave_common::types::{DataType, MapType, StructType};
2574
2575 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2576 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2577 use crate::sink::iceberg::{
2578 COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2579 ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2580 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2581 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2582 };
2583
    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
2587 fn test_compatible_arrow_schema() {
2588 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2589
2590 use super::*;
2591 let risingwave_schema = Schema::new(vec![
2592 Field::with_name(DataType::Int32, "a"),
2593 Field::with_name(DataType::Int32, "b"),
2594 Field::with_name(DataType::Int32, "c"),
2595 ]);
2596 let arrow_schema = ArrowSchema::new(vec![
2597 ArrowField::new("a", ArrowDataType::Int32, false),
2598 ArrowField::new("b", ArrowDataType::Int32, false),
2599 ArrowField::new("c", ArrowDataType::Int32, false),
2600 ]);
2601
2602 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2603
2604 let risingwave_schema = Schema::new(vec![
2605 Field::with_name(DataType::Int32, "d"),
2606 Field::with_name(DataType::Int32, "c"),
2607 Field::with_name(DataType::Int32, "a"),
2608 Field::with_name(DataType::Int32, "b"),
2609 ]);
2610 let arrow_schema = ArrowSchema::new(vec![
2611 ArrowField::new("a", ArrowDataType::Int32, false),
2612 ArrowField::new("b", ArrowDataType::Int32, false),
2613 ArrowField::new("d", ArrowDataType::Int32, false),
2614 ArrowField::new("c", ArrowDataType::Int32, false),
2615 ]);
2616 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2617
2618 let risingwave_schema = Schema::new(vec![
2619 Field::with_name(
2620 DataType::Struct(StructType::new(vec![
2621 ("a1", DataType::Int32),
2622 (
2623 "a2",
2624 DataType::Struct(StructType::new(vec![
2625 ("a21", DataType::Bytea),
2626 (
2627 "a22",
2628 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2629 ),
2630 ])),
2631 ),
2632 ])),
2633 "a",
2634 ),
2635 Field::with_name(
2636 DataType::list(DataType::Struct(StructType::new(vec![
2637 ("b1", DataType::Int32),
2638 ("b2", DataType::Bytea),
2639 (
2640 "b3",
2641 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2642 ),
2643 ]))),
2644 "b",
2645 ),
2646 Field::with_name(
2647 DataType::Map(MapType::from_kv(
2648 DataType::Varchar,
2649 DataType::list(DataType::Struct(StructType::new([
2650 ("c1", DataType::Int32),
2651 ("c2", DataType::Bytea),
2652 (
2653 "c3",
2654 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2655 ),
2656 ]))),
2657 )),
2658 "c",
2659 ),
2660 ]);
2661 let arrow_schema = ArrowSchema::new(vec![
2662 ArrowField::new(
2663 "a",
2664 ArrowDataType::Struct(ArrowFields::from(vec![
2665 ArrowField::new("a1", ArrowDataType::Int32, false),
2666 ArrowField::new(
2667 "a2",
2668 ArrowDataType::Struct(ArrowFields::from(vec![
2669 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2670 ArrowField::new_map(
2671 "a22",
2672 "entries",
2673 ArrowFieldRef::new(ArrowField::new(
2674 "key",
2675 ArrowDataType::Utf8,
2676 false,
2677 )),
2678 ArrowFieldRef::new(ArrowField::new(
2679 "value",
2680 ArrowDataType::Utf8,
2681 false,
2682 )),
2683 false,
2684 false,
2685 ),
2686 ])),
2687 false,
2688 ),
2689 ])),
2690 false,
2691 ),
2692 ArrowField::new(
2693 "b",
2694 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2695 ArrowDataType::Struct(ArrowFields::from(vec![
2696 ArrowField::new("b1", ArrowDataType::Int32, false),
2697 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2698 ArrowField::new_map(
2699 "b3",
2700 "entries",
2701 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2702 ArrowFieldRef::new(ArrowField::new(
2703 "value",
2704 ArrowDataType::Utf8,
2705 false,
2706 )),
2707 false,
2708 false,
2709 ),
2710 ])),
2711 false,
2712 ))),
2713 false,
2714 ),
2715 ArrowField::new_map(
2716 "c",
2717 "entries",
2718 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2719 ArrowFieldRef::new(ArrowField::new(
2720 "value",
2721 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2722 ArrowDataType::Struct(ArrowFields::from(vec![
2723 ArrowField::new("c1", ArrowDataType::Int32, false),
2724 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2725 ArrowField::new_map(
2726 "c3",
2727 "entries",
2728 ArrowFieldRef::new(ArrowField::new(
2729 "key",
2730 ArrowDataType::Utf8,
2731 false,
2732 )),
2733 ArrowFieldRef::new(ArrowField::new(
2734 "value",
2735 ArrowDataType::Utf8,
2736 false,
2737 )),
2738 false,
2739 false,
2740 ),
2741 ])),
2742 false,
2743 ))),
2744 false,
2745 )),
2746 false,
2747 false,
2748 ),
2749 ]);
2750 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2751 }
2752
2753 #[test]
2754 fn test_parse_iceberg_config() {
2755 let values = [
2756 ("connector", "iceberg"),
2757 ("type", "upsert"),
2758 ("primary_key", "v1"),
2759 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2760 ("warehouse.path", "s3://iceberg"),
2761 ("s3.endpoint", "http://127.0.0.1:9301"),
2762 ("s3.access.key", "hummockadmin"),
2763 ("s3.secret.key", "hummockadmin"),
2764 ("s3.path.style.access", "true"),
2765 ("s3.region", "us-east-1"),
2766 ("catalog.type", "jdbc"),
2767 ("catalog.name", "demo"),
2768 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2769 ("catalog.jdbc.user", "admin"),
2770 ("catalog.jdbc.password", "123456"),
2771 ("database.name", "demo_db"),
2772 ("table.name", "demo_table"),
2773 ("enable_compaction", "true"),
2774 ("compaction_interval_sec", "1800"),
2775 ("enable_snapshot_expiration", "true"),
2776 ]
2777 .into_iter()
2778 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2779 .collect();
2780
2781 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2782
2783 let expected_iceberg_config = IcebergConfig {
2784 common: IcebergCommon {
2785 warehouse_path: Some("s3://iceberg".to_owned()),
2786 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2787 s3_region: Some("us-east-1".to_owned()),
2788 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2789 s3_access_key: Some("hummockadmin".to_owned()),
2790 s3_secret_key: Some("hummockadmin".to_owned()),
2791 s3_iam_role_arn: None,
2792 gcs_credential: None,
2793 catalog_type: Some("jdbc".to_owned()),
2794 glue_id: None,
2795 glue_region: None,
2796 glue_access_key: None,
2797 glue_secret_key: None,
2798 glue_iam_role_arn: None,
2799 catalog_name: Some("demo".to_owned()),
2800 s3_path_style_access: Some(true),
2801 catalog_credential: None,
2802 catalog_oauth2_server_uri: None,
2803 catalog_scope: None,
2804 catalog_token: None,
2805 enable_config_load: None,
2806 rest_signing_name: None,
2807 rest_signing_region: None,
2808 rest_sigv4_enabled: None,
2809 hosted_catalog: None,
2810 azblob_account_name: None,
2811 azblob_account_key: None,
2812 azblob_endpoint_url: None,
2813 catalog_header: None,
2814 adlsgen2_account_name: None,
2815 adlsgen2_account_key: None,
2816 adlsgen2_endpoint: None,
2817 vended_credentials: None,
2818 },
2819 table: IcebergTableIdentifier {
2820 database_name: Some("demo_db".to_owned()),
2821 table_name: "demo_table".to_owned(),
2822 },
2823 r#type: "upsert".to_owned(),
2824 force_append_only: false,
2825 primary_key: Some(vec!["v1".to_owned()]),
2826 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2827 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2828 .into_iter()
2829 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2830 .collect(),
2831 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2832 create_table_if_not_exists: false,
2833 is_exactly_once: Some(true),
2834 commit_retry_num: 8,
2835 enable_compaction: true,
2836 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2837 enable_snapshot_expiration: true,
2838 write_mode: IcebergWriteMode::MergeOnRead,
2839 snapshot_expiration_max_age_millis: None,
2840 snapshot_expiration_retain_last: None,
2841 snapshot_expiration_clear_expired_files: true,
2842 snapshot_expiration_clear_expired_meta_data: true,
2843 max_snapshots_num_before_compaction: None,
2844 small_files_threshold_mb: None,
2845 delete_files_count_threshold: None,
2846 trigger_snapshot_count: None,
2847 target_file_size_mb: None,
2848 compaction_type: None,
2849 };
2850
2851 assert_eq!(iceberg_config, expected_iceberg_config);
2852
2853 assert_eq!(
2854 &iceberg_config.full_table_name().unwrap().to_string(),
2855 "demo_db.demo_table"
2856 );
2857 }
2858
2859 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2860 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2861
2862 let _table = iceberg_config.load_table().await.unwrap();
2863 }
2864
2865 #[tokio::test]
2866 #[ignore]
2867 async fn test_storage_catalog() {
2868 let values = [
2869 ("connector", "iceberg"),
2870 ("type", "append-only"),
2871 ("force_append_only", "true"),
2872 ("s3.endpoint", "http://127.0.0.1:9301"),
2873 ("s3.access.key", "hummockadmin"),
2874 ("s3.secret.key", "hummockadmin"),
2875 ("s3.region", "us-east-1"),
2876 ("s3.path.style.access", "true"),
2877 ("catalog.name", "demo"),
2878 ("catalog.type", "storage"),
2879 ("warehouse.path", "s3://icebergdata/demo"),
2880 ("database.name", "s1"),
2881 ("table.name", "t1"),
2882 ]
2883 .into_iter()
2884 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2885 .collect();
2886
2887 test_create_catalog(values).await;
2888 }
2889
2890 #[tokio::test]
2891 #[ignore]
2892 async fn test_rest_catalog() {
2893 let values = [
2894 ("connector", "iceberg"),
2895 ("type", "append-only"),
2896 ("force_append_only", "true"),
2897 ("s3.endpoint", "http://127.0.0.1:9301"),
2898 ("s3.access.key", "hummockadmin"),
2899 ("s3.secret.key", "hummockadmin"),
2900 ("s3.region", "us-east-1"),
2901 ("s3.path.style.access", "true"),
2902 ("catalog.name", "demo"),
2903 ("catalog.type", "rest"),
2904 ("catalog.uri", "http://192.168.167.4:8181"),
2905 ("warehouse.path", "s3://icebergdata/demo"),
2906 ("database.name", "s1"),
2907 ("table.name", "t1"),
2908 ]
2909 .into_iter()
2910 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2911 .collect();
2912
2913 test_create_catalog(values).await;
2914 }
2915
2916 #[tokio::test]
2917 #[ignore]
2918 async fn test_jdbc_catalog() {
2919 let values = [
2920 ("connector", "iceberg"),
2921 ("type", "append-only"),
2922 ("force_append_only", "true"),
2923 ("s3.endpoint", "http://127.0.0.1:9301"),
2924 ("s3.access.key", "hummockadmin"),
2925 ("s3.secret.key", "hummockadmin"),
2926 ("s3.region", "us-east-1"),
2927 ("s3.path.style.access", "true"),
2928 ("catalog.name", "demo"),
2929 ("catalog.type", "jdbc"),
2930 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2931 ("catalog.jdbc.user", "admin"),
2932 ("catalog.jdbc.password", "123456"),
2933 ("warehouse.path", "s3://icebergdata/demo"),
2934 ("database.name", "s1"),
2935 ("table.name", "t1"),
2936 ]
2937 .into_iter()
2938 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2939 .collect();
2940
2941 test_create_catalog(values).await;
2942 }
2943
2944 #[tokio::test]
2945 #[ignore]
2946 async fn test_hive_catalog() {
2947 let values = [
2948 ("connector", "iceberg"),
2949 ("type", "append-only"),
2950 ("force_append_only", "true"),
2951 ("s3.endpoint", "http://127.0.0.1:9301"),
2952 ("s3.access.key", "hummockadmin"),
2953 ("s3.secret.key", "hummockadmin"),
2954 ("s3.region", "us-east-1"),
2955 ("s3.path.style.access", "true"),
2956 ("catalog.name", "demo"),
2957 ("catalog.type", "hive"),
2958 ("catalog.uri", "thrift://localhost:9083"),
2959 ("warehouse.path", "s3://icebergdata/demo"),
2960 ("database.name", "s1"),
2961 ("table.name", "t1"),
2962 ]
2963 .into_iter()
2964 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2965 .collect();
2966
2967 test_create_catalog(values).await;
2968 }
2969
2970 #[test]
2971 fn test_config_constants_consistency() {
2972 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2975 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2976 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2977 assert_eq!(WRITE_MODE, "write_mode");
2978 assert_eq!(
2979 SNAPSHOT_EXPIRATION_RETAIN_LAST,
2980 "snapshot_expiration_retain_last"
2981 );
2982 assert_eq!(
2983 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2984 "snapshot_expiration_max_age_millis"
2985 );
2986 assert_eq!(
2987 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2988 "snapshot_expiration_clear_expired_files"
2989 );
2990 assert_eq!(
2991 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2992 "snapshot_expiration_clear_expired_meta_data"
2993 );
2994 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
2995 }
2996
2997 #[test]
2998 fn test_parse_iceberg_compaction_config() {
2999 let values = [
3001 ("connector", "iceberg"),
3002 ("type", "upsert"),
3003 ("primary_key", "id"),
3004 ("warehouse.path", "s3://iceberg"),
3005 ("s3.endpoint", "http://127.0.0.1:9301"),
3006 ("s3.access.key", "test"),
3007 ("s3.secret.key", "test"),
3008 ("s3.region", "us-east-1"),
3009 ("catalog.type", "storage"),
3010 ("catalog.name", "demo"),
3011 ("database.name", "test_db"),
3012 ("table.name", "test_table"),
3013 ("enable_compaction", "true"),
3014 ("compaction.max_snapshots_num", "100"),
3015 ("compaction.small_files_threshold_mb", "512"),
3016 ("compaction.delete_files_count_threshold", "50"),
3017 ("compaction.trigger_snapshot_count", "10"),
3018 ("compaction.target_file_size_mb", "256"),
3019 ("compaction.type", "full"),
3020 ]
3021 .into_iter()
3022 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3023 .collect();
3024
3025 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3026
3027 assert!(iceberg_config.enable_compaction);
3029 assert_eq!(
3030 iceberg_config.max_snapshots_num_before_compaction,
3031 Some(100)
3032 );
3033 assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
3034 assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
3035 assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
3036 assert_eq!(iceberg_config.target_file_size_mb, Some(256));
3037 assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
3038 }
3039}