mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{
    RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
};
use iceberg::spec::{
    DataFile, MAIN_BRANCH, Operation, PartitionSpecRef, SchemaRef as IcebergSchemaRef,
    SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_file_writer::{
    POSITION_DELETE_SCHEMA, PositionDeleteFileWriterBuilder,
};
use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::task_writer::TaskWriter;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::IcebergError;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::{Deserialize, Serialize};
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::RetryIf;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
    TwoPhaseCommitCoordinator,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";

pub const PARTITION_DATA_ID_START: i32 = 1000;

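/// How the sink materializes changes in the Iceberg table: `merge-on-read` (the default) or
/// `copy-on-write`. See the `ICEBERG_WRITE_MODE_*` constants for the accepted string values.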
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum IcebergWriteMode {
    #[default]
    MergeOnRead,
    CopyOnWrite,
}

impl IcebergWriteMode {
    pub fn as_str(self) -> &'static str {
        match self {
            IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
            IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
        }
    }
}

impl std::str::FromStr for IcebergWriteMode {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
            ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid write_mode: {}, must be one of: {}, {}",
                s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
            )))),
        }
    }
}

impl TryFrom<&str> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for IcebergWriteMode {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for IcebergWriteMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";
pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";

pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";

pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";

pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";

pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";

pub const COMPACTION_TYPE: &str = "compaction.type";

fn default_commit_retry_num() -> u32 {
    8
}

fn default_iceberg_write_mode() -> IcebergWriteMode {
    IcebergWriteMode::MergeOnRead
}

fn default_true() -> bool {
    true
}

fn default_some_true() -> Option<bool> {
    Some(true)
}

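/// Which compaction strategy the sink requests: `full`, `small-files`, or `files-with-delete`.
/// Parsed from the `compaction.type` option; defaults to `Full`.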
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum CompactionType {
    #[default]
    Full,
    SmallFiles,
    FilesWithDelete,
}

impl CompactionType {
    pub fn as_str(&self) -> &'static str {
        match self {
            CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
            CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
            CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
        }
    }
}

impl std::str::FromStr for CompactionType {
    type Err = SinkError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
            ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
            ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
            _ => Err(SinkError::Config(anyhow!(format!(
                "invalid compaction_type: {}, must be one of: {}, {}, {}",
                s,
                ICEBERG_COMPACTION_TYPE_FULL,
                ICEBERG_COMPACTION_TYPE_SMALL_FILES,
                ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
            )))),
        }
    }
}

impl TryFrom<&str> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        value.parse()
    }
}

impl TryFrom<String> for CompactionType {
    type Error = <Self as std::str::FromStr>::Err;

    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
        value.as_str().parse()
    }
}

impl std::fmt::Display for CompactionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

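/// All user-facing options of the Iceberg sink, deserialized from the `WITH` clause. Fields
/// marked `allow_alter_on_fly` may be altered after the sink is created (see
/// `validate_alter_config`).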
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(flatten)]
    table: IcebergTableIdentifier,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default = "default_some_true")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: IcebergWriteMode,

    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,

    #[serde(rename = "compaction.max_snapshots_num", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub max_snapshots_num_before_compaction: Option<usize>,

    #[serde(rename = "compaction.small_files_threshold_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub small_files_threshold_mb: Option<u64>,

    #[serde(rename = "compaction.delete_files_count_threshold", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub delete_files_count_threshold: Option<usize>,

    #[serde(rename = "compaction.trigger_snapshot_count", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub trigger_snapshot_count: Option<usize>,

    #[serde(rename = "compaction.target_file_size_mb", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub target_file_size_mb: Option<u64>,

    #[serde(rename = "compaction.type", default)]
    #[with_option(allow_alter_on_fly)]
    pub compaction_type: Option<CompactionType>,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
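    /// Parses the raw `WITH` options into an `IcebergConfig`: validates the sink type, requires a
    /// non-empty `primary_key` for upsert sinks, collects `catalog.*` entries into
    /// `java_catalog_props`, and rejects a zero `commit_checkpoint_interval`.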
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary-key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary-key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
                    && k != &"catalog.header"
            })
            .map(|(k, v)| (k[8..].to_string(), v.clone()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit-checkpoint-interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.table, &self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.table.to_table_ident().map_err(Into::into)
    }

    pub fn catalog_name(&self) -> String {
        self.common.catalog_name()
    }

    pub fn compaction_interval_sec(&self) -> u64 {
        self.compaction_interval_sec.unwrap_or(3600)
    }

    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
        self.snapshot_expiration_max_age_millis
            .map(|max_age_millis| current_time_ms - max_age_millis)
    }

    pub fn trigger_snapshot_count(&self) -> usize {
        self.trigger_snapshot_count.unwrap_or(16)
    }

    pub fn small_files_threshold_mb(&self) -> u64 {
        self.small_files_threshold_mb.unwrap_or(64)
    }

    pub fn delete_files_count_threshold(&self) -> usize {
        self.delete_files_count_threshold.unwrap_or(256)
    }

    pub fn target_file_size_mb(&self) -> u64 {
        self.target_file_size_mb.unwrap_or(1024)
    }

    pub fn compaction_type(&self) -> CompactionType {
        self.compaction_type.unwrap_or_default()
    }
}

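/// The Iceberg sink: the parsed configuration plus the sink parameters, and the column ids used
/// as the equality-delete key when writing in upsert mode.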
pub struct IcebergSink {
    pub config: IcebergConfig,
    param: SinkParam,
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}

async fn create_and_validate_table_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<Table> {
    if config.create_table_if_not_exists {
        create_table_if_not_exists_impl(config, param).await?;
    }

    let table = config
        .load_table()
        .await
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    let sink_schema = param.schema();
    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

    Ok(table)
}

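/// Creates the namespace (if missing) and the table itself when `create_table_if_not_exists` is
/// set, deriving the table location from the warehouse path and the partition spec from the
/// `partition_by` option.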
async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema);

        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}

impl IcebergSink {
    pub async fn create_and_validate_table(&self) -> Result<Table> {
        create_and_validate_table_impl(&self.config, &self.param).await
    }

    pub async fn create_table_if_not_exists(&self) -> Result<()> {
        create_table_if_not_exists_impl(&self.config, &self.param).await
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        let compaction_type = self.config.compaction_type();

        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {}
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}

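/// Tracks how the optional extra partition column is projected away before writing: `None` if
/// there is no such column, `Prepare` holds its index until the first chunk is seen, and `Done`
/// caches the resulting projection.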
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}

type DataFileWriterBuilderType =
    DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;
type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
    ParquetWriterBuilder,
    DefaultLocationGenerator,
    DefaultFileNameGenerator,
>;

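/// Bundles an `IcebergWriterBuilder` with the schema and partition spec so that a fresh
/// `TaskWriter` (with an optional partition splitter) can be rebuilt after every checkpoint.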
#[derive(Clone)]
struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
    inner: B,
    fanout_enabled: bool,
    schema: IcebergSchemaRef,
    partition_spec: PartitionSpecRef,
    compute_partition: bool,
}

impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
    fn new(
        inner: B,
        fanout_enabled: bool,
        schema: IcebergSchemaRef,
        partition_spec: PartitionSpecRef,
        compute_partition: bool,
    ) -> Self {
        Self {
            inner,
            fanout_enabled,
            schema,
            partition_spec,
            compute_partition,
        }
    }

    fn build(self) -> iceberg::Result<TaskWriter<B>> {
        let partition_splitter = match (
            self.partition_spec.is_unpartitioned(),
            self.compute_partition,
        ) {
            (true, _) => None,
            (false, true) => Some(RecordBatchPartitionSplitter::new_with_computed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
            (false, false) => Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
                self.schema.clone(),
                self.partition_spec.clone(),
            )?),
        };

        Ok(TaskWriter::new_with_partition_splitter(
            self.inner,
            self.fanout_enabled,
            self.schema,
            self.partition_spec,
            partition_splitter,
        ))
    }
}

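/// The sink writer is constructed lazily: it starts as `Created` with only the construction
/// arguments and becomes `Initialized`, holding the actual Iceberg writer, on the first
/// `begin_epoch` call.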
pub enum IcebergSinkWriter {
    Created(IcebergSinkWriterArgs),
    Initialized(IcebergSinkWriterInner),
}

pub struct IcebergSinkWriterArgs {
    config: IcebergConfig,
    sink_param: SinkParam,
    writer_param: SinkWriterParam,
    unique_column_ids: Option<Vec<usize>>,
}

pub struct IcebergSinkWriterInner {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
}

#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    Append {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder:
            TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
    },
    Upsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: TaskWriterBuilderWrapper<
            MonitoredGeneralWriterBuilder<
                DeltaWriterBuilder<
                    DataFileWriterBuilderType,
                    PositionDeleteFileWriterBuilderType,
                    EqualityDeleteFileWriterBuilderType,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::Append { writer, .. }
            | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
        }
    }
}

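/// Label-guarded writer metrics. The QPS and latency handles are kept only to hold their labels
/// alive; they are recorded by the monitored writer builder itself.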
pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
    pub fn new(
        config: IcebergConfig,
        sink_param: SinkParam,
        writer_param: SinkWriterParam,
        unique_column_ids: Option<Vec<usize>>,
    ) -> Self {
        Self::Created(IcebergSinkWriterArgs {
            config,
            sink_param,
            writer_param,
            unique_column_ids,
        })
    }
}

impl IcebergSinkWriterInner {
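    /// Builds the append-only pipeline: a Parquet writer wrapped in a rolling file writer and a
    /// data-file writer, monitored for QPS/latency, and driven by a `TaskWriter` that fans out by
    /// partition when the table is partitioned.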
    fn build_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let parquet_writer_builder =
            ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
        let monitored_builder = MonitoredGeneralWriterBuilder::new(
            data_file_builder,
            write_qps.clone(),
            write_latency.clone(),
        );
        let writer_builder = TaskWriterBuilderWrapper::new(
            monitored_builder,
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ),
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            writer: IcebergWriterDispatch::Append {
                writer: inner_writer,
                writer_builder,
            },
            table,
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }

    fn build_upsert(
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();
        let fanout_enabled = !partition_spec.fields().is_empty();

        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_properties = WriterProperties::builder()
            .set_max_row_group_size(
                writer_param
                    .streaming_config
                    .developer
                    .iceberg_sink_write_parquet_max_row_group_rows,
            )
            .build();

        let data_file_builder = {
            let parquet_writer_builder =
                ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(rolling_writer_builder)
        };
        let position_delete_builder = {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties.clone(),
                POSITION_DELETE_SCHEMA.clone().into(),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
        };
        let equality_delete_builder = {
            let config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                parquet_writer_properties,
                Arc::new(
                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
            );
            let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
                parquet_writer_builder,
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
        };
        let delta_builder = DeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
            schema.clone(),
        );
        let original_arrow_schema = Arc::new(
            schema_to_arrow_schema(table.metadata().current_schema())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        );
        let schema_with_extra_op_column = {
            let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
            new_fields.push(Arc::new(ArrowField::new(
                "op".to_owned(),
                ArrowDataType::Int32,
                false,
            )));
            Arc::new(ArrowSchema::new(new_fields))
        };
        let writer_builder = TaskWriterBuilderWrapper::new(
            MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            ),
            fanout_enabled,
            schema.clone(),
            partition_spec.clone(),
            true,
        );
        let inner_writer = Some(Box::new(
            writer_builder
                .clone()
                .build()
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ) as Box<dyn IcebergWriter>);
        Ok(Self {
            arrow_schema: original_arrow_schema,
            metrics: IcebergWriterMetrics {
                _write_qps: write_qps,
                _write_latency: write_latency,
                write_bytes,
            },
            table,
            writer: IcebergWriterDispatch::Upsert {
                writer: inner_writer,
                writer_builder,
                arrow_schema_with_op_column: schema_with_extra_op_column,
            },
            project_idx_vec: {
                if let Some(extra_partition_col_idx) = extra_partition_col_idx {
                    ProjectIdxVec::Prepare(*extra_partition_col_idx)
                } else {
                    ProjectIdxVec::None
                }
            },
        })
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        let Self::Created(args) = self else {
            return Ok(());
        };

        let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
        let inner = match &args.unique_column_ids {
            Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
                table,
                unique_column_ids.clone(),
                &args.writer_param,
            )?,
            None => IcebergSinkWriterInner::build_append_only(table, &args.writer_param)?,
        };

        *self = IcebergSinkWriter::Initialized(inner);
        Ok(())
    }

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        let (mut chunk, ops) = chunk.compact_vis().into_parts();
        match &mut inner.project_idx_vec {
            ProjectIdxVec::None => {}
            ProjectIdxVec::Prepare(idx) => {
                if *idx >= chunk.columns().len() {
                    return Err(SinkError::Iceberg(anyhow!(
                        "invalid extra partition column index {}",
                        idx
                    )));
                }
                let project_idx_vec = (0..*idx)
                    .chain(*idx + 1..chunk.columns().len())
                    .collect_vec();
                chunk = chunk.project(&project_idx_vec);
                inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
            }
            ProjectIdxVec::Done(idx_vec) => {
                chunk = chunk.project(idx_vec);
            }
        }
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &inner.writer {
            IcebergWriterDispatch::Append { .. } => {
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::Upsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(inner.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = inner.writer.get_writer().unwrap();
        writer
            .write(batch)
            .instrument_await("iceberg_write")
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        inner.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before barrier");
        };

        if !is_checkpoint {
            return Ok(None);
        }

        let close_result = match &mut inner.writer {
            IcebergWriterDispatch::Append {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::Upsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => {
                        Some(writer.close().instrument_await("iceberg_close").await)
                    }
                    _ => None,
                };
                match writer_builder.clone().build() {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                let format_version = inner.table.metadata().format_version();
                let partition_type = inner.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        let truncated = truncate_datafile(f);
                        SerializedDataFile::try_from(truncated, partition_type, format_version)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: inner.table.metadata().current_schema_id(),
                    partition_spec_id: inner.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }
}

const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

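/// Maximum size in bytes of a single per-column bound statistic kept in the data file metadata;
/// `truncate_datafile` drops larger lower/upper bounds so the serialized commit metadata stays
/// small.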
const MAX_COLUMN_STAT_SIZE: usize = 10240;

fn truncate_datafile(mut data_file: DataFile) -> DataFile {
    data_file.lower_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large lower_bound statistic"
            );
            return false;
        }
        true
    });

    data_file.upper_bounds.retain(|field_id, datum| {
        let size = match datum.to_bytes() {
            Ok(bytes) => bytes.len(),
            Err(_) => 0,
        };

        if size > MAX_COLUMN_STAT_SIZE {
            tracing::debug!(
                field_id = field_id,
                size = size,
                "Truncating large upper_bound statistic"
            );
            return false;
        }
        true
    });

    data_file
}

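/// Per-checkpoint write result shipped from each writer to the commit coordinator: the schema id
/// and partition spec id the files were written against, plus the serialized data files to
/// append.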
#[derive(Default, Clone)]
struct IcebergCommitResult {
    schema_id: i32,
    partition_spec_id: i32,
    data_files: Vec<SerializedDataFile>,
}

impl IcebergCommitResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let mut values = if let serde_json::Value::Object(v) =
                serde_json::from_slice::<serde_json::Value>(&v.metadata)
                    .context("Can't parse iceberg sink metadata")?
            {
                v
            } else {
                bail!("iceberg sink metadata should be an object");
            };

            let schema_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
                schema_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have schema_id");
            }

            let partition_spec_id;
            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
                partition_spec_id = value
                    .as_u64()
                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
            } else {
                bail!("iceberg sink metadata should have partition_spec_id");
            }

            let data_files: Vec<SerializedDataFile>;
            if let serde_json::Value::Array(values) = values
                .remove(DATA_FILES)
                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
            {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .unwrap();
            } else {
                bail!("iceberg sink metadata should have data_files object");
            }

            Ok(Self {
                schema_id: schema_id as i32,
                partition_spec_id: partition_spec_id as i32,
                data_files,
            })
        } else {
            bail!("Can't create iceberg sink write result from empty data!")
        }
    }

    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
        let mut values = if let serde_json::Value::Object(value) =
            serde_json::from_slice::<serde_json::Value>(&value)
                .context("Can't parse iceberg sink metadata")?
        {
            value
        } else {
            bail!("iceberg sink metadata should be an object");
        };

        let schema_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
            schema_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have schema_id");
        }

        let partition_spec_id;
        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
            partition_spec_id = value
                .as_u64()
                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
        } else {
            bail!("iceberg sink metadata should have partition_spec_id");
        }

        let data_files: Vec<SerializedDataFile>;
        if let serde_json::Value::Array(values) = values
            .remove(DATA_FILES)
            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
        {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .unwrap();
        } else {
            bail!("iceberg sink metadata should have data_files object");
        }

        Ok(Self {
            schema_id: schema_id as i32,
            partition_spec_id: partition_spec_id as i32,
            data_files,
        })
    }
}

impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata {
                metadata: serde_json::to_vec(&json_value)
                    .context("Can't serialize iceberg sink metadata")?,
            })),
        })
    }
}

impl TryFrom<IcebergCommitResult> for Vec<u8> {
    type Error = SinkError;

    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
        let json_data_files = serde_json::Value::Array(
            value
                .data_files
                .iter()
                .map(serde_json::to_value)
                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
                .context("Can't serialize data files to json")?,
        );
        let json_value = serde_json::Value::Object(
            vec![
                (
                    SCHEMA_ID.to_owned(),
                    serde_json::Value::Number(value.schema_id.into()),
                ),
                (
                    PARTITION_SPEC_ID.to_owned(),
                    serde_json::Value::Number(value.partition_spec_id.into()),
                ),
                (DATA_FILES.to_owned(), json_data_files),
            ]
            .into_iter()
            .collect(),
        );
        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
    }
}
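
/// Commit coordinator for the Iceberg sink. It gathers the writers' results, appends the data
/// files to the table, and in two-phase (exactly-once) mode uses a pre-generated snapshot id to
/// make the commit idempotent.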
pub struct IcebergSinkCommitter {
    catalog: Arc<dyn Catalog>,
    table: Table,
    pub last_commit_epoch: u64,
    pub(crate) sink_id: SinkId,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    commit_retry_num: u32,
    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
}

impl IcebergSinkCommitter {
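    /// Reloads the table from the catalog and fails the commit if the current schema id or the
    /// default partition spec id no longer matches what the writers produced.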
1845 async fn reload_table(
1848 catalog: &dyn Catalog,
1849 table_ident: &TableIdent,
1850 schema_id: i32,
1851 partition_spec_id: i32,
1852 ) -> Result<Table> {
1853 let table = catalog
1854 .load_table(table_ident)
1855 .await
1856 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1857 if table.metadata().current_schema_id() != schema_id {
1858 return Err(SinkError::Iceberg(anyhow!(
1859 "Schema evolution not supported, expect schema id {}, but got {}",
1860 schema_id,
1861 table.metadata().current_schema_id()
1862 )));
1863 }
1864 if table.metadata().default_partition_spec_id() != partition_spec_id {
1865 return Err(SinkError::Iceberg(anyhow!(
1866 "Partition evolution not supported, expect partition spec id {}, but got {}",
1867 partition_spec_id,
1868 table.metadata().default_partition_spec_id()
1869 )));
1870 }
1871 Ok(table)
1872 }
1873}
1874
1875#[async_trait]
1876impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
1877 async fn init(&mut self) -> Result<()> {
1878 tracing::info!(
1879 sink_id = %self.param.sink_id,
1880 "Iceberg sink coordinator initialized",
1881 );
1882
1883 Ok(())
1884 }
1885
1886 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1887 tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
1888
1889 if metadata.is_empty() {
1890 tracing::debug!(?epoch, "No datafile to commit");
1891 return Ok(());
1892 }
1893
1894 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
1896 self.commit_data_impl(epoch, write_results, snapshot_id)
1897 .await?;
1898 }
1899
1900 Ok(())
1901 }
1902
1903 async fn commit_schema_change(
1904 &mut self,
1905 epoch: u64,
1906 schema_change: PbSinkSchemaChange,
1907 ) -> Result<()> {
1908 tracing::info!(
1909 "Committing schema change {:?} in epoch {}",
1910 schema_change,
1911 epoch
1912 );
1913 self.commit_schema_change_impl(schema_change).await?;
1914 tracing::info!("Successfully committed schema change in epoch {}", epoch);
1915
1916 Ok(())
1917 }
1918}
1919
1920#[async_trait]
1921impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
1922 async fn init(&mut self) -> Result<()> {
1923 tracing::info!(
1924 sink_id = %self.param.sink_id,
1925 "Iceberg sink coordinator initialized",
1926 );
1927
1928 Ok(())
1929 }
1930
1931 async fn pre_commit(
1932 &mut self,
1933 epoch: u64,
1934 metadata: Vec<SinkMetadata>,
1935 _schema_change: Option<PbSinkSchemaChange>,
1936 ) -> Result<Option<Vec<u8>>> {
1937 tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
1938
1939 let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
1940 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1941 None => {
1942 tracing::debug!(?epoch, "no data to pre commit");
1943 return Ok(None);
1944 }
1945 };
1946
1947 let mut write_results_bytes = Vec::new();
1948 for each_parallelism_write_result in write_results {
1949 let each_parallelism_write_result_bytes: Vec<u8> =
1950 each_parallelism_write_result.try_into()?;
1951 write_results_bytes.push(each_parallelism_write_result_bytes);
1952 }
1953
1954 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
1955 write_results_bytes.push(snapshot_id_bytes);
1956
1957 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
1958 Ok(Some(pre_commit_metadata_bytes))
1959 }
1960
1961 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1962 tracing::debug!("Starting iceberg commit in epoch {epoch}");
1963
1964 if commit_metadata.is_empty() {
1965 tracing::debug!(?epoch, "No datafile to commit");
1966 return Ok(());
1967 }
1968
1969 let mut payload = deserialize_metadata(commit_metadata);
1971 if payload.is_empty() {
1972 return Err(SinkError::Iceberg(anyhow!(
1973 "Invalid commit metadata: empty payload"
1974 )));
1975 }
1976
1977 let snapshot_id_bytes = payload.pop().ok_or_else(|| {
1979 SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
1980 })?;
1981 let snapshot_id = i64::from_le_bytes(
1982 snapshot_id_bytes
1983 .try_into()
1984 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
1985 );
1986
1987 let write_results = payload
1989 .into_iter()
1990 .map(IcebergCommitResult::try_from_serialized_bytes)
1991 .collect::<Result<Vec<_>>>()?;
1992
1993 let snapshot_committed = self
1994 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1995 .await?;
1996
1997 if snapshot_committed {
1998 tracing::info!(
1999 "Snapshot id {} already committed in iceberg table, skip committing again.",
2000 snapshot_id
2001 );
2002 return Ok(());
2003 }
2004
2005 self.commit_data_impl(epoch, write_results, snapshot_id)
2006 .await
2007 }
2008
2009 async fn commit_schema_change(
2010 &mut self,
2011 epoch: u64,
2012 schema_change: PbSinkSchemaChange,
2013 ) -> Result<()> {
2014 let schema_updated = self.check_schema_change_applied(&schema_change)?;
2015 if schema_updated {
2016 tracing::info!("Schema change already committed in epoch {}, skipping", epoch);
2017 return Ok(());
2018 }
2019
2020 tracing::info!(
2021 "Committing schema change {:?} in epoch {}",
2022 schema_change,
2023 epoch
2024 );
2025 self.commit_schema_change_impl(schema_change).await?;
2026 tracing::info!("Successfully committed schema change in epoch {epoch}");
2027
2028 Ok(())
2029 }
2030
2031 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2032 tracing::debug!("Abort not implemented yet");
2034 }
2035}
2036
2037impl IcebergSinkCommitter {
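/// Decodes the per-parallelism `SinkMetadata` into `IcebergCommitResult`s, verifies that all
/// results share the same schema id and partition spec id, and pre-generates the snapshot id
/// for the upcoming commit. Returns `None` when there is nothing to commit.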
2039 fn pre_commit_inner(
2040 &mut self,
2041 _epoch: u64,
2042 metadata: Vec<SinkMetadata>,
2043 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2044 let write_results: Vec<IcebergCommitResult> = metadata
2045 .iter()
2046 .map(IcebergCommitResult::try_from)
2047 .collect::<Result<Vec<IcebergCommitResult>>>()?;
2048
2049 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2051 return Ok(None);
2052 }
2053
2054 let expect_schema_id = write_results[0].schema_id;
2055 let expect_partition_spec_id = write_results[0].partition_spec_id;
2056
2057 if write_results
2059 .iter()
2060 .any(|r| r.schema_id != expect_schema_id)
2061 || write_results
2062 .iter()
2063 .any(|r| r.partition_spec_id != expect_partition_spec_id)
2064 {
2065 return Err(SinkError::Iceberg(anyhow!(
2066 "schema_id and partition_spec_id should be the same in all write results"
2067 )));
2068 }
2069
2070 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2071
2072 Ok(Some((write_results, snapshot_id)))
2073 }
2074
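/// Commits the collected data files to the iceberg table: waits for the snapshot limit,
/// reloads the table to validate the schema and partition spec ids, converts the serialized
/// data files, and applies a fast-append transaction with retry (commit failures are retried,
/// reload failures are not). Finally reports the snapshot-count metric and nudges the
/// compaction scheduler when compaction is enabled.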
2075 async fn commit_data_impl(
2076 &mut self,
2077 epoch: u64,
2078 write_results: Vec<IcebergCommitResult>,
2079 snapshot_id: i64,
2080 ) -> Result<()> {
2081 assert!(
2083 !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2084 );
2085
2086 self.wait_for_snapshot_limit().await?;
2088
2089 let expect_schema_id = write_results[0].schema_id;
2090 let expect_partition_spec_id = write_results[0].partition_spec_id;
2091
2092 self.table = Self::reload_table(
2094 self.catalog.as_ref(),
2095 self.table.identifier(),
2096 expect_schema_id,
2097 expect_partition_spec_id,
2098 )
2099 .await?;
2100
2101 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2102 return Err(SinkError::Iceberg(anyhow!(
2103 "Can't find schema by id {}",
2104 expect_schema_id
2105 )));
2106 };
2107 let Some(partition_spec) = self
2108 .table
2109 .metadata()
2110 .partition_spec_by_id(expect_partition_spec_id)
2111 else {
2112 return Err(SinkError::Iceberg(anyhow!(
2113 "Can't find partition spec by id {}",
2114 expect_partition_spec_id
2115 )));
2116 };
2117 let partition_type = partition_spec
2118 .as_ref()
2119 .clone()
2120 .partition_type(schema)
2121 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2122
2123 let data_files = write_results
2124 .into_iter()
2125 .flat_map(|r| {
2126 r.data_files.into_iter().map(|f| {
2127 f.try_into(expect_partition_spec_id, &partition_type, schema)
2128 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2129 })
2130 })
2131 .collect::<Result<Vec<DataFile>>>()?;
2132 let retry_strategy = ExponentialBackoff::from_millis(10)
2137 .max_delay(Duration::from_secs(60))
2138 .map(jitter)
2139 .take(self.commit_retry_num as usize);
2140 let catalog = self.catalog.clone();
2141 let table_ident = self.table.identifier().clone();
2142
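// Distinguish reload failures from commit failures: only commit failures are retried below;
// a failed table reload aborts the retry loop immediately.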
2143 enum CommitError {
2148 ReloadTable(SinkError),
2149 Commit(SinkError),
2150 }
2151
2152 let table = RetryIf::spawn(
2153 retry_strategy,
2154 || async {
2155 let table = Self::reload_table(
2157 catalog.as_ref(),
2158 &table_ident,
2159 expect_schema_id,
2160 expect_partition_spec_id,
2161 )
2162 .await
2163 .map_err(|e| {
2164 tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2165 CommitError::ReloadTable(e)
2166 })?;
2167
2168 let txn = Transaction::new(&table);
2169 let append_action = txn
2170 .fast_append()
2171 .set_snapshot_id(snapshot_id)
2172 .set_target_branch(commit_branch(
2173 self.config.r#type.as_str(),
2174 self.config.write_mode,
2175 ))
2176 .add_data_files(data_files.clone());
2177
2178 let tx = append_action.apply(txn).map_err(|err| {
2179 let err: IcebergError = err.into();
2180 tracing::error!(error = %err.as_report(), "Failed to apply iceberg fast-append action");
2181 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2182 })?;
2183
2184 tx.commit(catalog.as_ref()).await.map_err(|err| {
2185 let err: IcebergError = err.into();
2186 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2187 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2188 })
2189 },
2190 |err: &CommitError| {
2191 match err {
2193 CommitError::Commit(_) => {
2194 tracing::warn!("Commit failed, will retry");
2195 true
2196 }
2197 CommitError::ReloadTable(_) => {
2198 tracing::error!(
2199 "reload_table failed with non-retriable error, will not retry"
2200 );
2201 false
2202 }
2203 }
2204 },
2205 )
2206 .await
2207 .map_err(|e| match e {
2208 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2209 })?;
2210 self.table = table;
2211
2212 let snapshot_num = self.table.metadata().snapshots().count();
2213 let catalog_name = self.config.common.catalog_name();
2214 let table_name = self.table.identifier().to_string();
2215 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2216 GLOBAL_SINK_METRICS
2217 .iceberg_snapshot_num
2218 .with_guarded_label_values(&metrics_labels)
2219 .set(snapshot_num as i64);
2220
2221 tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");
2222
2223 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2224 && self.config.enable_compaction
2225 && iceberg_compact_stat_sender
2226 .send(IcebergSinkCompactionUpdate {
2227 sink_id: self.sink_id,
2228 compaction_interval: self.config.compaction_interval_sec(),
2229 force_compaction: false,
2230 })
2231 .is_err()
2232 {
2233 warn!("failed to send iceberg compaction stats");
2234 }
2235
2236 Ok(())
2237 }
2238
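/// Returns whether `snapshot_id` already exists in the iceberg table. Used by the two-phase
/// commit path to make `commit_data` idempotent across recoveries.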
2239 async fn is_snapshot_id_in_iceberg(
2243 &self,
2244 iceberg_config: &IcebergConfig,
2245 snapshot_id: i64,
2246 ) -> Result<bool> {
2247 let table = iceberg_config.load_table().await?;
2248 Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2253 }
2254
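/// Determines whether `schema_change` has already been applied to the iceberg table by
/// comparing the table's current schema against the original schema and against the original
/// schema plus the added columns. Errors if the table matches neither, since we then cannot
/// tell whether the change was applied.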
2255 fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2258 let current_schema = self.table.metadata().current_schema();
2259 let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2260 .context("Failed to convert schema")
2261 .map_err(SinkError::Iceberg)?;
2262
2263 let iceberg_arrow_convert = IcebergArrowConvert;
2264
2265 let schema_matches = |expected: &[ArrowField]| {
2266 if current_arrow_schema.fields().len() != expected.len() {
2267 return false;
2268 }
2269
2270 expected.iter().all(|expected_field| {
2271 current_arrow_schema.fields().iter().any(|current_field| {
2272 current_field.name() == expected_field.name()
2273 && current_field.data_type() == expected_field.data_type()
2274 })
2275 })
2276 };
2277
2278 let original_arrow_fields: Vec<ArrowField> = schema_change
2279 .original_schema
2280 .iter()
2281 .map(|pb_field| {
2282 let field = Field::from(pb_field);
2283 iceberg_arrow_convert
2284 .to_arrow_field(&field.name, &field.data_type)
2285 .context("Failed to convert field to arrow")
2286 .map_err(SinkError::Iceberg)
2287 })
2288 .collect::<Result<_>>()?;
2289
2290 if schema_matches(&original_arrow_fields) {
2292 tracing::debug!(
2293 "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2294 original_arrow_fields.len()
2295 );
2296 return Ok(false);
2297 }
2298
2299 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2301 schema_change.op.as_ref()
2302 else {
2303 return Err(SinkError::Iceberg(anyhow!(
2304 "Unsupported sink schema change op in iceberg sink: {:?}",
2305 schema_change.op
2306 )));
2307 };
2308
2309 let add_arrow_fields: Vec<ArrowField> = add_columns_op
2310 .fields
2311 .iter()
2312 .map(|pb_field| {
2313 let field = Field::from(pb_field);
2314 iceberg_arrow_convert
2315 .to_arrow_field(&field.name, &field.data_type)
2316 .context("Failed to convert field to arrow")
2317 .map_err(SinkError::Iceberg)
2318 })
2319 .collect::<Result<_>>()?;
2320
2321 let mut expected_after_change = original_arrow_fields;
2322 expected_after_change.extend(add_arrow_fields);
2323
2324 if schema_matches(&expected_after_change) {
2326 tracing::debug!(
2327 "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2328 expected_after_change.len()
2329 );
2330 return Ok(true);
2331 }
2332
2333 Err(SinkError::Iceberg(anyhow!(
2334 "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2335 schema_change.original_schema.len()
2336 )))
2337 }
2338
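/// Applies an `AddColumns` schema change to the iceberg table: converts each new column to an
/// iceberg type, assigns it the next field id after the table's last column id, and commits an
/// `update_schema` transaction through the catalog. Other schema change ops are rejected.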
2339 async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2343 use iceberg::spec::NestedField;
2344
2345 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2346 schema_change.op.as_ref()
2347 else {
2348 return Err(SinkError::Iceberg(anyhow!(
2349 "Unsupported sink schema change op in iceberg sink: {:?}",
2350 schema_change.op
2351 )));
2352 };
2353
2354 let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2355
2356 let metadata = self.table.metadata();
2358 let mut next_field_id = metadata.last_column_id() + 1;
2359 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2360
2361 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2363 let mut new_fields = Vec::new();
2364
2365 for field in &add_columns {
2366 let arrow_field = iceberg_create_table_arrow_convert
2368 .to_arrow_field(&field.name, &field.data_type)
2369 .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2370 .map_err(SinkError::Iceberg)?;
2371
2372 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2374 .map_err(|err| {
2375 SinkError::Iceberg(
2376 anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2377 )
2378 })?;
2379
2380 let nested_field = Arc::new(NestedField::optional(
2382 next_field_id,
2383 &field.name,
2384 iceberg_type,
2385 ));
2386
2387 new_fields.push(nested_field);
2388 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2389 next_field_id += 1;
2390 }
2391
2392 tracing::info!(
2394 "Committing schema change to catalog for table {}",
2395 self.table.identifier()
2396 );
2397
2398 let txn = Transaction::new(&self.table);
2399 let action = txn.update_schema().add_fields(new_fields);
2400
2401 let updated_table = action
2402 .apply(txn)
2403 .context("Failed to apply schema update action")
2404 .map_err(SinkError::Iceberg)?
2405 .commit(self.catalog.as_ref())
2406 .await
2407 .context("Failed to commit table schema change")
2408 .map_err(SinkError::Iceberg)?;
2409
2410 self.table = updated_table;
2411
2412 tracing::info!(
2413 "Successfully committed schema change, added {} columns to iceberg table",
2414 add_columns.len()
2415 );
2416
2417 Ok(())
2418 }
2419
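/// Counts snapshots newer than the most recent rewrite (`Operation::Replace`) snapshot,
/// scanning from newest to oldest. Used to decide whether commits should be throttled until
/// compaction catches up.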
2420 fn count_snapshots_since_rewrite(&self) -> usize {
2423 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2424 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2425
2426 let mut count = 0;
2428 for snapshot in snapshots {
2429 let summary = snapshot.summary();
2431 match &summary.operation {
2432 Operation::Replace => {
2433 break;
2435 }
2436
2437 _ => {
2438 count += 1;
2440 }
2441 }
2442 }
2443
2444 count
2445 }
2446
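/// Backpressure before committing: if compaction is enabled and
/// `max_snapshots_num_before_compaction` is set, block until the number of snapshots since the
/// last rewrite drops below the limit, asking the compaction scheduler to force a compaction
/// and reloading the table every 30 seconds while waiting.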
2447 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2449 if !self.config.enable_compaction {
2450 return Ok(());
2451 }
2452
2453 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2454 loop {
2455 let current_count = self.count_snapshots_since_rewrite();
2456
2457 if current_count < max_snapshots {
2458 tracing::info!(
2459 "Snapshot count check passed: {} < {}",
2460 current_count,
2461 max_snapshots
2462 );
2463 break;
2464 }
2465
2466 tracing::info!(
2467 "Snapshot count {} exceeds limit {}, waiting...",
2468 current_count,
2469 max_snapshots
2470 );
2471
2472 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2473 && iceberg_compact_stat_sender
2474 .send(IcebergSinkCompactionUpdate {
2475 sink_id: self.sink_id,
2476 compaction_interval: self.config.compaction_interval_sec(),
2477 force_compaction: true,
2478 })
2479 .is_err()
2480 {
2481 tracing::warn!("failed to send iceberg compaction stats");
2482 }
2483
2484 tokio::time::sleep(Duration::from_secs(30)).await;
2486
2487 self.table = self.config.load_table().await?;
2489 }
2490 }
2491 Ok(())
2492 }
2493}
2494
2495const MAP_KEY: &str = "key";
2496const MAP_VALUE: &str = "value";
2497
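/// Collects the child field types of a nested RisingWave type (struct, map, or list of struct)
/// into `schema_fields`, and returns the corresponding arrow struct fields so the caller can
/// recurse with `check_compatibility`. Returns `None` for non-nested arrow types.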
2498fn get_fields<'a>(
2499 our_field_type: &'a risingwave_common::types::DataType,
2500 data_type: &ArrowDataType,
2501 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2502) -> Option<ArrowFields> {
2503 match data_type {
2504 ArrowDataType::Struct(fields) => {
2505 match our_field_type {
2506 risingwave_common::types::DataType::Struct(struct_fields) => {
2507 struct_fields.iter().for_each(|(name, data_type)| {
2508 let res = schema_fields.insert(name, data_type);
2509 assert!(res.is_none())
2511 });
2512 }
2513 risingwave_common::types::DataType::Map(map_fields) => {
2514 schema_fields.insert(MAP_KEY, map_fields.key());
2515 schema_fields.insert(MAP_VALUE, map_fields.value());
2516 }
2517 risingwave_common::types::DataType::List(list) => {
2518 list.elem()
2519 .as_struct()
2520 .iter()
2521 .for_each(|(name, data_type)| {
2522 let res = schema_fields.insert(name, data_type);
2523 assert!(res.is_none())
2525 });
2526 }
2527 _ => {}
2528 };
2529 Some(fields.clone())
2530 }
2531 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2532 get_fields(our_field_type, field.data_type(), schema_fields)
2533 }
2534 _ => None,
2535 }
2536}
2537
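/// Checks that every iceberg arrow field has a compatible RisingWave counterpart in
/// `schema_fields`: decimals match regardless of precision/scale, Binary and LargeBinary are
/// interchangeable, and lists/maps/structs are compared recursively; all other types must
/// match exactly.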
2538fn check_compatibility(
2539 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2540 fields: &ArrowFields,
2541) -> anyhow::Result<bool> {
2542 for arrow_field in fields {
2543 let our_field_type = schema_fields
2544 .get(arrow_field.name().as_str())
2545 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2546
2547 let converted_arrow_data_type = IcebergArrowConvert
2549 .to_arrow_field("", our_field_type)
2550 .map_err(|e| anyhow!(e))?
2551 .data_type()
2552 .clone();
2553
2554 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2555 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2556 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2557 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2558 (ArrowDataType::List(_), ArrowDataType::List(field))
2559 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2560 let mut schema_fields = HashMap::new();
2561 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2562 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2563 }
2564 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2566 let mut schema_fields = HashMap::new();
2567 our_field_type
2568 .as_struct()
2569 .iter()
2570 .for_each(|(name, data_type)| {
2571 let res = schema_fields.insert(name, data_type);
2572 assert!(res.is_none())
2574 });
2575 check_compatibility(schema_fields, fields)?
2576 }
2577 (left, right) => left.equals_datatype(right),
2585 };
2586 if !compatible {
2587 bail!(
2588 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2589 arrow_field.name(),
2590 converted_arrow_data_type,
2591 arrow_field.data_type()
2592 );
2593 }
2594 }
2595 Ok(true)
2596}
2597
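/// Checks that the RisingWave schema matches the iceberg table's arrow schema: the same number
/// of fields and a compatible type for every field, independent of field order.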
2598pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2600 if rw_schema.fields.len() != arrow_schema.fields().len() {
2601 bail!(
2602 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2603 rw_schema.fields.len(),
2604 arrow_schema.fields.len()
2605 );
2606 }
2607
2608 let mut schema_fields = HashMap::new();
2609 rw_schema.fields.iter().for_each(|field| {
2610 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2611 assert!(res.is_none())
2613 });
2614
2615 check_compatibility(schema_fields, &arrow_schema.fields)?;
2616 Ok(())
2617}
2618
2619fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2620 serde_json::to_vec(&metadata).unwrap()
2621}
2622
2623fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2624 serde_json::from_slice(&bytes).unwrap()
2625}
2626
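/// Parses the `partition_by` sink option into `(column, Transform)` pairs. Supported formats
/// are `column`, `transform(column)`, `transform(n,column)`, and `transform(n, column)`; `n`
/// is required for `bucket` and `truncate`.
///
/// Illustrative only (not a doctest):
///
/// ```ignore
/// let parts = parse_partition_by_exprs("bucket(16,id), day(ts)".to_owned())?;
/// // parts == vec![("id".to_owned(), Transform::Bucket(16)), ("ts".to_owned(), Transform::Day)]
/// ```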
2627pub fn parse_partition_by_exprs(
2628 expr: String,
2629) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2630 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2632 if !re.is_match(&expr) {
2633 bail!(format!(
2634 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2635 expr
2636 ))
2637 }
2638 let caps = re.captures_iter(&expr);
2639
2640 let mut partition_columns = vec![];
2641
2642 for mat in caps {
2643 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2644 (&mat["transform"], Transform::Identity)
2645 } else {
2646 let mut func = mat["transform"].to_owned();
2647 if func == "bucket" || func == "truncate" {
2648 let n = &mat
2649 .name("n")
2650 .ok_or_else(|| anyhow!("`n` must be specified for the `bucket` and `truncate` transforms"))?
2651 .as_str();
2652 func = format!("{func}[{n}]");
2653 }
2654 (
2655 &mat["field"],
2656 Transform::from_str(&func)
2657 .with_context(|| format!("invalid transform function {}", func))?,
2658 )
2659 };
2660 partition_columns.push((column.to_owned(), transform));
2661 }
2662 Ok(partition_columns)
2663}
2664
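/// Returns the branch to commit to: the copy-on-write staging branch for upsert sinks in
/// `CopyOnWrite` mode (see `should_enable_iceberg_cow` below), otherwise the main branch.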
2665pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
2666 if should_enable_iceberg_cow(sink_type, write_mode) {
2667 ICEBERG_COW_BRANCH.to_owned()
2668 } else {
2669 MAIN_BRANCH.to_owned()
2670 }
2671}
2672
2673pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
2674 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
2675}
2676
2677impl crate::with_options::WithOptions for IcebergWriteMode {}
2678
2679impl crate::with_options::WithOptions for CompactionType {}
2680
2681#[cfg(test)]
2682mod test {
2683 use std::collections::BTreeMap;
2684
2685 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2686 use risingwave_common::types::{DataType, MapType, StructType};
2687
2688 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2689 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2690 use crate::sink::iceberg::{
2691 COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, CompactionType, ENABLE_COMPACTION,
2692 ENABLE_SNAPSHOT_EXPIRATION, IcebergConfig, IcebergWriteMode,
2693 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2694 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2695 };
2696
2697 pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;
2698
2699 #[test]
2700 fn test_compatible_arrow_schema() {
2701 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2702
2703 use super::*;
2704 let risingwave_schema = Schema::new(vec![
2705 Field::with_name(DataType::Int32, "a"),
2706 Field::with_name(DataType::Int32, "b"),
2707 Field::with_name(DataType::Int32, "c"),
2708 ]);
2709 let arrow_schema = ArrowSchema::new(vec![
2710 ArrowField::new("a", ArrowDataType::Int32, false),
2711 ArrowField::new("b", ArrowDataType::Int32, false),
2712 ArrowField::new("c", ArrowDataType::Int32, false),
2713 ]);
2714
2715 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2716
2717 let risingwave_schema = Schema::new(vec![
2718 Field::with_name(DataType::Int32, "d"),
2719 Field::with_name(DataType::Int32, "c"),
2720 Field::with_name(DataType::Int32, "a"),
2721 Field::with_name(DataType::Int32, "b"),
2722 ]);
2723 let arrow_schema = ArrowSchema::new(vec![
2724 ArrowField::new("a", ArrowDataType::Int32, false),
2725 ArrowField::new("b", ArrowDataType::Int32, false),
2726 ArrowField::new("d", ArrowDataType::Int32, false),
2727 ArrowField::new("c", ArrowDataType::Int32, false),
2728 ]);
2729 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2730
2731 let risingwave_schema = Schema::new(vec![
2732 Field::with_name(
2733 DataType::Struct(StructType::new(vec![
2734 ("a1", DataType::Int32),
2735 (
2736 "a2",
2737 DataType::Struct(StructType::new(vec![
2738 ("a21", DataType::Bytea),
2739 (
2740 "a22",
2741 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2742 ),
2743 ])),
2744 ),
2745 ])),
2746 "a",
2747 ),
2748 Field::with_name(
2749 DataType::list(DataType::Struct(StructType::new(vec![
2750 ("b1", DataType::Int32),
2751 ("b2", DataType::Bytea),
2752 (
2753 "b3",
2754 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2755 ),
2756 ]))),
2757 "b",
2758 ),
2759 Field::with_name(
2760 DataType::Map(MapType::from_kv(
2761 DataType::Varchar,
2762 DataType::list(DataType::Struct(StructType::new([
2763 ("c1", DataType::Int32),
2764 ("c2", DataType::Bytea),
2765 (
2766 "c3",
2767 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2768 ),
2769 ]))),
2770 )),
2771 "c",
2772 ),
2773 ]);
2774 let arrow_schema = ArrowSchema::new(vec![
2775 ArrowField::new(
2776 "a",
2777 ArrowDataType::Struct(ArrowFields::from(vec![
2778 ArrowField::new("a1", ArrowDataType::Int32, false),
2779 ArrowField::new(
2780 "a2",
2781 ArrowDataType::Struct(ArrowFields::from(vec![
2782 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2783 ArrowField::new_map(
2784 "a22",
2785 "entries",
2786 ArrowFieldRef::new(ArrowField::new(
2787 "key",
2788 ArrowDataType::Utf8,
2789 false,
2790 )),
2791 ArrowFieldRef::new(ArrowField::new(
2792 "value",
2793 ArrowDataType::Utf8,
2794 false,
2795 )),
2796 false,
2797 false,
2798 ),
2799 ])),
2800 false,
2801 ),
2802 ])),
2803 false,
2804 ),
2805 ArrowField::new(
2806 "b",
2807 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2808 ArrowDataType::Struct(ArrowFields::from(vec![
2809 ArrowField::new("b1", ArrowDataType::Int32, false),
2810 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2811 ArrowField::new_map(
2812 "b3",
2813 "entries",
2814 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2815 ArrowFieldRef::new(ArrowField::new(
2816 "value",
2817 ArrowDataType::Utf8,
2818 false,
2819 )),
2820 false,
2821 false,
2822 ),
2823 ])),
2824 false,
2825 ))),
2826 false,
2827 ),
2828 ArrowField::new_map(
2829 "c",
2830 "entries",
2831 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2832 ArrowFieldRef::new(ArrowField::new(
2833 "value",
2834 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2835 ArrowDataType::Struct(ArrowFields::from(vec![
2836 ArrowField::new("c1", ArrowDataType::Int32, false),
2837 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2838 ArrowField::new_map(
2839 "c3",
2840 "entries",
2841 ArrowFieldRef::new(ArrowField::new(
2842 "key",
2843 ArrowDataType::Utf8,
2844 false,
2845 )),
2846 ArrowFieldRef::new(ArrowField::new(
2847 "value",
2848 ArrowDataType::Utf8,
2849 false,
2850 )),
2851 false,
2852 false,
2853 ),
2854 ])),
2855 false,
2856 ))),
2857 false,
2858 )),
2859 false,
2860 false,
2861 ),
2862 ]);
2863 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2864 }
2865
2866 #[test]
2867 fn test_parse_iceberg_config() {
2868 let values = [
2869 ("connector", "iceberg"),
2870 ("type", "upsert"),
2871 ("primary_key", "v1"),
2872 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2873 ("warehouse.path", "s3://iceberg"),
2874 ("s3.endpoint", "http://127.0.0.1:9301"),
2875 ("s3.access.key", "hummockadmin"),
2876 ("s3.secret.key", "hummockadmin"),
2877 ("s3.path.style.access", "true"),
2878 ("s3.region", "us-east-1"),
2879 ("catalog.type", "jdbc"),
2880 ("catalog.name", "demo"),
2881 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2882 ("catalog.jdbc.user", "admin"),
2883 ("catalog.jdbc.password", "123456"),
2884 ("database.name", "demo_db"),
2885 ("table.name", "demo_table"),
2886 ("enable_compaction", "true"),
2887 ("compaction_interval_sec", "1800"),
2888 ("enable_snapshot_expiration", "true"),
2889 ]
2890 .into_iter()
2891 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2892 .collect();
2893
2894 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2895
2896 let expected_iceberg_config = IcebergConfig {
2897 common: IcebergCommon {
2898 warehouse_path: Some("s3://iceberg".to_owned()),
2899 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2900 s3_region: Some("us-east-1".to_owned()),
2901 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2902 s3_access_key: Some("hummockadmin".to_owned()),
2903 s3_secret_key: Some("hummockadmin".to_owned()),
2904 s3_iam_role_arn: None,
2905 gcs_credential: None,
2906 catalog_type: Some("jdbc".to_owned()),
2907 glue_id: None,
2908 glue_region: None,
2909 glue_access_key: None,
2910 glue_secret_key: None,
2911 glue_iam_role_arn: None,
2912 catalog_name: Some("demo".to_owned()),
2913 s3_path_style_access: Some(true),
2914 catalog_credential: None,
2915 catalog_oauth2_server_uri: None,
2916 catalog_scope: None,
2917 catalog_token: None,
2918 enable_config_load: None,
2919 rest_signing_name: None,
2920 rest_signing_region: None,
2921 rest_sigv4_enabled: None,
2922 hosted_catalog: None,
2923 azblob_account_name: None,
2924 azblob_account_key: None,
2925 azblob_endpoint_url: None,
2926 catalog_header: None,
2927 adlsgen2_account_name: None,
2928 adlsgen2_account_key: None,
2929 adlsgen2_endpoint: None,
2930 vended_credentials: None,
2931 },
2932 table: IcebergTableIdentifier {
2933 database_name: Some("demo_db".to_owned()),
2934 table_name: "demo_table".to_owned(),
2935 },
2936 r#type: "upsert".to_owned(),
2937 force_append_only: false,
2938 primary_key: Some(vec!["v1".to_owned()]),
2939 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2940 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2941 .into_iter()
2942 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2943 .collect(),
2944 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2945 create_table_if_not_exists: false,
2946 is_exactly_once: Some(true),
2947 commit_retry_num: 8,
2948 enable_compaction: true,
2949 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2950 enable_snapshot_expiration: true,
2951 write_mode: IcebergWriteMode::MergeOnRead,
2952 snapshot_expiration_max_age_millis: None,
2953 snapshot_expiration_retain_last: None,
2954 snapshot_expiration_clear_expired_files: true,
2955 snapshot_expiration_clear_expired_meta_data: true,
2956 max_snapshots_num_before_compaction: None,
2957 small_files_threshold_mb: None,
2958 delete_files_count_threshold: None,
2959 trigger_snapshot_count: None,
2960 target_file_size_mb: None,
2961 compaction_type: None,
2962 };
2963
2964 assert_eq!(iceberg_config, expected_iceberg_config);
2965
2966 assert_eq!(
2967 &iceberg_config.full_table_name().unwrap().to_string(),
2968 "demo_db.demo_table"
2969 );
2970 }
2971
2972 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2973 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2974
2975 let _table = iceberg_config.load_table().await.unwrap();
2976 }
2977
2978 #[tokio::test]
2979 #[ignore]
2980 async fn test_storage_catalog() {
2981 let values = [
2982 ("connector", "iceberg"),
2983 ("type", "append-only"),
2984 ("force_append_only", "true"),
2985 ("s3.endpoint", "http://127.0.0.1:9301"),
2986 ("s3.access.key", "hummockadmin"),
2987 ("s3.secret.key", "hummockadmin"),
2988 ("s3.region", "us-east-1"),
2989 ("s3.path.style.access", "true"),
2990 ("catalog.name", "demo"),
2991 ("catalog.type", "storage"),
2992 ("warehouse.path", "s3://icebergdata/demo"),
2993 ("database.name", "s1"),
2994 ("table.name", "t1"),
2995 ]
2996 .into_iter()
2997 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2998 .collect();
2999
3000 test_create_catalog(values).await;
3001 }
3002
3003 #[tokio::test]
3004 #[ignore]
3005 async fn test_rest_catalog() {
3006 let values = [
3007 ("connector", "iceberg"),
3008 ("type", "append-only"),
3009 ("force_append_only", "true"),
3010 ("s3.endpoint", "http://127.0.0.1:9301"),
3011 ("s3.access.key", "hummockadmin"),
3012 ("s3.secret.key", "hummockadmin"),
3013 ("s3.region", "us-east-1"),
3014 ("s3.path.style.access", "true"),
3015 ("catalog.name", "demo"),
3016 ("catalog.type", "rest"),
3017 ("catalog.uri", "http://192.168.167.4:8181"),
3018 ("warehouse.path", "s3://icebergdata/demo"),
3019 ("database.name", "s1"),
3020 ("table.name", "t1"),
3021 ]
3022 .into_iter()
3023 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3024 .collect();
3025
3026 test_create_catalog(values).await;
3027 }
3028
3029 #[tokio::test]
3030 #[ignore]
3031 async fn test_jdbc_catalog() {
3032 let values = [
3033 ("connector", "iceberg"),
3034 ("type", "append-only"),
3035 ("force_append_only", "true"),
3036 ("s3.endpoint", "http://127.0.0.1:9301"),
3037 ("s3.access.key", "hummockadmin"),
3038 ("s3.secret.key", "hummockadmin"),
3039 ("s3.region", "us-east-1"),
3040 ("s3.path.style.access", "true"),
3041 ("catalog.name", "demo"),
3042 ("catalog.type", "jdbc"),
3043 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3044 ("catalog.jdbc.user", "admin"),
3045 ("catalog.jdbc.password", "123456"),
3046 ("warehouse.path", "s3://icebergdata/demo"),
3047 ("database.name", "s1"),
3048 ("table.name", "t1"),
3049 ]
3050 .into_iter()
3051 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3052 .collect();
3053
3054 test_create_catalog(values).await;
3055 }
3056
3057 #[tokio::test]
3058 #[ignore]
3059 async fn test_hive_catalog() {
3060 let values = [
3061 ("connector", "iceberg"),
3062 ("type", "append-only"),
3063 ("force_append_only", "true"),
3064 ("s3.endpoint", "http://127.0.0.1:9301"),
3065 ("s3.access.key", "hummockadmin"),
3066 ("s3.secret.key", "hummockadmin"),
3067 ("s3.region", "us-east-1"),
3068 ("s3.path.style.access", "true"),
3069 ("catalog.name", "demo"),
3070 ("catalog.type", "hive"),
3071 ("catalog.uri", "thrift://localhost:9083"),
3072 ("warehouse.path", "s3://icebergdata/demo"),
3073 ("database.name", "s1"),
3074 ("table.name", "t1"),
3075 ]
3076 .into_iter()
3077 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3078 .collect();
3079
3080 test_create_catalog(values).await;
3081 }
3082
3083 #[test]
3084 fn test_config_constants_consistency() {
3085 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3088 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3089 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3090 assert_eq!(WRITE_MODE, "write_mode");
3091 assert_eq!(
3092 SNAPSHOT_EXPIRATION_RETAIN_LAST,
3093 "snapshot_expiration_retain_last"
3094 );
3095 assert_eq!(
3096 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3097 "snapshot_expiration_max_age_millis"
3098 );
3099 assert_eq!(
3100 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3101 "snapshot_expiration_clear_expired_files"
3102 );
3103 assert_eq!(
3104 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3105 "snapshot_expiration_clear_expired_meta_data"
3106 );
3107 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3108 }
3109
3110 #[test]
3111 fn test_parse_iceberg_compaction_config() {
3112 let values = [
3114 ("connector", "iceberg"),
3115 ("type", "upsert"),
3116 ("primary_key", "id"),
3117 ("warehouse.path", "s3://iceberg"),
3118 ("s3.endpoint", "http://127.0.0.1:9301"),
3119 ("s3.access.key", "test"),
3120 ("s3.secret.key", "test"),
3121 ("s3.region", "us-east-1"),
3122 ("catalog.type", "storage"),
3123 ("catalog.name", "demo"),
3124 ("database.name", "test_db"),
3125 ("table.name", "test_table"),
3126 ("enable_compaction", "true"),
3127 ("compaction.max_snapshots_num", "100"),
3128 ("compaction.small_files_threshold_mb", "512"),
3129 ("compaction.delete_files_count_threshold", "50"),
3130 ("compaction.trigger_snapshot_count", "10"),
3131 ("compaction.target_file_size_mb", "256"),
3132 ("compaction.type", "full"),
3133 ]
3134 .into_iter()
3135 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3136 .collect();
3137
3138 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3139
3140 assert!(iceberg_config.enable_compaction);
3142 assert_eq!(
3143 iceberg_config.max_snapshots_num_before_compaction,
3144 Some(100)
3145 );
3146 assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
3147 assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
3148 assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
3149 assert_eq!(iceberg_config.target_file_size_mb, Some(256));
3150 assert_eq!(iceberg_config.compaction_type, Some(CompactionType::Full));
3151 }
3152}