1mod prometheus;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Debug;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use async_trait::async_trait;
25use await_tree::InstrumentAwait;
26use iceberg::arrow::{
27 RecordBatchPartitionSplitter, arrow_schema_to_schema, schema_to_arrow_schema,
28};
29use iceberg::spec::{
30 DataFile, FormatVersion, MAIN_BRANCH, Operation, PartitionSpecRef,
31 SchemaRef as IcebergSchemaRef, SerializedDataFile, TableProperties, Transform,
32 UnboundPartitionField, UnboundPartitionSpec,
33};
34use iceberg::table::Table;
35use iceberg::transaction::{ApplyTransactionAction, FastAppendAction, Transaction};
36use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
37use iceberg::writer::base_writer::deletion_vector_writer::{
38 DeletionVectorWriter, DeletionVectorWriterBuilder,
39};
40use iceberg::writer::base_writer::equality_delete_writer::{
41 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
42};
43use iceberg::writer::base_writer::position_delete_file_writer::{
44 POSITION_DELETE_SCHEMA, PositionDeleteFileWriter, PositionDeleteFileWriterBuilder,
45 PositionDeleteInput,
46};
47use iceberg::writer::delta_writer::{DELETE_OP, DeltaWriterBuilder, INSERT_OP};
48use iceberg::writer::file_writer::ParquetWriterBuilder;
49use iceberg::writer::file_writer::location_generator::{
50 DefaultFileNameGenerator, DefaultLocationGenerator,
51};
52use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
53use iceberg::writer::task_writer::TaskWriter;
54use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
55use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
56use itertools::Itertools;
57use parquet::basic::Compression;
58use parquet::file::properties::WriterProperties;
59use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
60use regex::Regex;
61use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
62use risingwave_common::array::arrow::arrow_schema_iceberg::{
63 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
64 Schema as ArrowSchema, SchemaRef,
65};
66use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
67use risingwave_common::array::{Op, StreamChunk};
68use risingwave_common::bail;
69use risingwave_common::bitmap::Bitmap;
70use risingwave_common::catalog::{Field, Schema};
71use risingwave_common::error::IcebergError;
72use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
73use risingwave_common_estimate_size::EstimateSize;
74use risingwave_pb::connector_service::SinkMetadata;
75use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
76use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
77use risingwave_pb::stream_plan::PbSinkSchemaChange;
78use serde::de::{self, Deserializer, Visitor};
79use serde::{Deserialize, Serialize};
80use serde_json::from_value;
81use serde_with::{DisplayFromStr, serde_as};
82use thiserror_ext::AsReport;
83use tokio::sync::mpsc::UnboundedSender;
84use tokio_retry::RetryIf;
85use tokio_retry::strategy::{ExponentialBackoff, jitter};
86use tracing::warn;
87use url::Url;
88use uuid::Uuid;
89use with_options::WithOptions;
90
91use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
92use super::{
93 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
94 SinkError, SinkWriterParam,
95};
96use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
97use crate::enforce_secret::EnforceSecret;
98use crate::sink::catalog::SinkId;
99use crate::sink::coordinate::CoordinatedLogSinker;
100use crate::sink::writer::SinkWriter;
101use crate::sink::{
102 Result, SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkParam,
103 TwoPhaseCommitCoordinator,
104};
105use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
106
107pub const ICEBERG_SINK: &str = "iceberg";
108
109pub const ICEBERG_COW_BRANCH: &str = "ingestion";
110pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
111pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
112pub const ICEBERG_COMPACTION_TYPE_FULL: &str = "full";
113pub const ICEBERG_COMPACTION_TYPE_SMALL_FILES: &str = "small-files";
114pub const ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE: &str = "files-with-delete";
115
116pub const PARTITION_DATA_ID_START: i32 = 1000;
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
119#[serde(rename_all = "kebab-case")]
120pub enum IcebergWriteMode {
121 #[default]
122 MergeOnRead,
123 CopyOnWrite,
124}
125
126impl IcebergWriteMode {
127 pub fn as_str(self) -> &'static str {
128 match self {
129 IcebergWriteMode::MergeOnRead => ICEBERG_WRITE_MODE_MERGE_ON_READ,
130 IcebergWriteMode::CopyOnWrite => ICEBERG_WRITE_MODE_COPY_ON_WRITE,
131 }
132 }
133}
134
135impl std::str::FromStr for IcebergWriteMode {
136 type Err = SinkError;
137
138 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
139 match s {
140 ICEBERG_WRITE_MODE_MERGE_ON_READ => Ok(IcebergWriteMode::MergeOnRead),
141 ICEBERG_WRITE_MODE_COPY_ON_WRITE => Ok(IcebergWriteMode::CopyOnWrite),
142 _ => Err(SinkError::Config(anyhow!(format!(
143 "invalid write_mode: {}, must be one of: {}, {}",
144 s, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
145 )))),
146 }
147 }
148}
149
150impl TryFrom<&str> for IcebergWriteMode {
151 type Error = <Self as std::str::FromStr>::Err;
152
153 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
154 value.parse()
155 }
156}
157
158impl TryFrom<String> for IcebergWriteMode {
159 type Error = <Self as std::str::FromStr>::Err;
160
161 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
162 value.as_str().parse()
163 }
164}
165
166impl std::fmt::Display for IcebergWriteMode {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 f.write_str(self.as_str())
169 }
170}
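// Illustrative sketch (not executed anywhere): the write mode round-trips between its
// kebab-case string form and the enum, so user input can be validated with `parse()`:
//
//     let mode: IcebergWriteMode = "merge-on-read".parse()?;
//     assert_eq!(mode, IcebergWriteMode::MergeOnRead);
//     assert_eq!(mode.to_string(), ICEBERG_WRITE_MODE_MERGE_ON_READ);
//
// Any other string yields a `SinkError::Config` listing the two accepted values.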
171
172pub const ENABLE_COMPACTION: &str = "enable_compaction";
174pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
175pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
176pub const WRITE_MODE: &str = "write_mode";
177pub const FORMAT_VERSION: &str = "format_version";
178pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
179pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
180pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
181pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
182 "snapshot_expiration_clear_expired_meta_data";
183pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
184
185pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
186
187pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
188
189pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
190
191pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
192
193pub const COMPACTION_TYPE: &str = "compaction.type";
194
195pub const COMPACTION_WRITE_PARQUET_COMPRESSION: &str = "compaction.write_parquet_compression";
196pub const COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS: &str =
197 "compaction.write_parquet_max_row_group_rows";
198pub const COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: &str = "commit_checkpoint_size_threshold_mb";
199pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB: u64 = 128;
200
201const PARQUET_CREATED_BY: &str = concat!("risingwave version ", env!("CARGO_PKG_VERSION"));
202
203fn default_commit_retry_num() -> u32 {
204 8
205}
206
207fn default_commit_checkpoint_size_threshold_mb() -> Option<u64> {
208 Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
209}
210
211fn default_iceberg_write_mode() -> IcebergWriteMode {
212 IcebergWriteMode::MergeOnRead
213}
214
215fn default_iceberg_format_version() -> FormatVersion {
216 FormatVersion::V2
217}
218
219fn default_true() -> bool {
220 true
221}
222
223fn default_some_true() -> Option<bool> {
224 Some(true)
225}
226
227fn parse_format_version_str(value: &str) -> std::result::Result<FormatVersion, String> {
228 let parsed = value
229 .trim()
230 .parse::<u8>()
231 .map_err(|_| "`format-version` must be one of 1, 2, or 3".to_owned())?;
232 match parsed {
233 1 => Ok(FormatVersion::V1),
234 2 => Ok(FormatVersion::V2),
235 3 => Ok(FormatVersion::V3),
236 _ => Err("`format-version` must be one of 1, 2, or 3".to_owned()),
237 }
238}
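// Illustrative sketch: `parse_format_version_str` trims surrounding whitespace and only
// accepts the literal versions 1, 2, and 3:
//
//     assert_eq!(parse_format_version_str(" 2 "), Ok(FormatVersion::V2));
//     assert!(parse_format_version_str("4").is_err());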
239
240fn deserialize_format_version<'de, D>(
241 deserializer: D,
242) -> std::result::Result<FormatVersion, D::Error>
243where
244 D: Deserializer<'de>,
245{
246 struct FormatVersionVisitor;
247
248 impl<'de> Visitor<'de> for FormatVersionVisitor {
249 type Value = FormatVersion;
250
251 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 formatter.write_str("format-version as 1, 2, or 3")
253 }
254
255 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
256 where
257 E: de::Error,
258 {
259 let value = u8::try_from(value)
260 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
261 parse_format_version_str(&value.to_string()).map_err(E::custom)
262 }
263
264 fn visit_i64<E>(self, value: i64) -> std::result::Result<Self::Value, E>
265 where
266 E: de::Error,
267 {
268 let value = u8::try_from(value)
269 .map_err(|_| E::custom("`format-version` must be one of 1, 2, or 3"))?;
270 parse_format_version_str(&value.to_string()).map_err(E::custom)
271 }
272
273 fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
274 where
275 E: de::Error,
276 {
277 parse_format_version_str(value).map_err(E::custom)
278 }
279
280 fn visit_string<E>(self, value: String) -> std::result::Result<Self::Value, E>
281 where
282 E: de::Error,
283 {
284 self.visit_str(&value)
285 }
286 }
287
288 deserializer.deserialize_any(FormatVersionVisitor)
289}
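// Because the visitor implements both the integer and string hooks, `format_version`
// deserializes from either a numeric or a quoted value (e.g. `2` or `'2'`), depending on
// how the surrounding options map is represented.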
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
293#[serde(rename_all = "kebab-case")]
294pub enum CompactionType {
295 #[default]
297 Full,
298 SmallFiles,
300 FilesWithDelete,
302}
303
304impl CompactionType {
305 pub fn as_str(&self) -> &'static str {
306 match self {
307 CompactionType::Full => ICEBERG_COMPACTION_TYPE_FULL,
308 CompactionType::SmallFiles => ICEBERG_COMPACTION_TYPE_SMALL_FILES,
309 CompactionType::FilesWithDelete => ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE,
310 }
311 }
312}
313
314impl std::str::FromStr for CompactionType {
315 type Err = SinkError;
316
317 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
318 match s {
319 ICEBERG_COMPACTION_TYPE_FULL => Ok(CompactionType::Full),
320 ICEBERG_COMPACTION_TYPE_SMALL_FILES => Ok(CompactionType::SmallFiles),
321 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE => Ok(CompactionType::FilesWithDelete),
322 _ => Err(SinkError::Config(anyhow!(format!(
323 "invalid compaction_type: {}, must be one of: {}, {}, {}",
324 s,
325 ICEBERG_COMPACTION_TYPE_FULL,
326 ICEBERG_COMPACTION_TYPE_SMALL_FILES,
327 ICEBERG_COMPACTION_TYPE_FILES_WITH_DELETE
328 )))),
329 }
330 }
331}
332
333impl TryFrom<&str> for CompactionType {
334 type Error = <Self as std::str::FromStr>::Err;
335
336 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
337 value.parse()
338 }
339}
340
341impl TryFrom<String> for CompactionType {
342 type Error = <Self as std::str::FromStr>::Err;
343
344 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
345 value.as_str().parse()
346 }
347}
348
349impl std::fmt::Display for CompactionType {
350 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 write!(f, "{}", self.as_str())
352 }
353}
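// Illustrative sketch: `CompactionType` follows the same kebab-case convention:
//
//     let ty: CompactionType = "files-with-delete".parse()?;
//     assert_eq!(ty, CompactionType::FilesWithDelete);
//
// Unrecognized values produce a `SinkError::Config` listing the three accepted types.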
354
355#[serde_as]
356#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
357pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
361 pub force_append_only: bool,
362
363 #[serde(flatten)]
364 common: IcebergCommon,
365
366 #[serde(flatten)]
367 table: IcebergTableIdentifier,
368
369 #[serde(
370 rename = "primary_key",
371 default,
372 deserialize_with = "deserialize_optional_string_seq_from_string"
373 )]
374 pub primary_key: Option<Vec<String>>,
375
376 #[serde(skip)]
378 pub java_catalog_props: HashMap<String, String>,
379
380 #[serde(default)]
381 pub partition_by: Option<String>,
382
383 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
385 #[serde_as(as = "DisplayFromStr")]
386 #[with_option(allow_alter_on_fly)]
387 pub commit_checkpoint_interval: u64,
388
389 #[serde(default = "default_commit_checkpoint_size_threshold_mb")]
392 #[serde_as(as = "Option<DisplayFromStr>")]
393 #[with_option(allow_alter_on_fly)]
394 pub commit_checkpoint_size_threshold_mb: Option<u64>,
395
396 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
397 pub create_table_if_not_exists: bool,
398
399 #[serde(default = "default_some_true")]
401 #[serde_as(as = "Option<DisplayFromStr>")]
402 pub is_exactly_once: Option<bool>,
403 #[serde(default = "default_commit_retry_num")]
408 pub commit_retry_num: u32,
409
410 #[serde(
412 rename = "enable_compaction",
413 default,
414 deserialize_with = "deserialize_bool_from_string"
415 )]
416 #[with_option(allow_alter_on_fly)]
417 pub enable_compaction: bool,
418
419 #[serde(rename = "compaction_interval_sec", default)]
421 #[serde_as(as = "Option<DisplayFromStr>")]
422 #[with_option(allow_alter_on_fly)]
423 pub compaction_interval_sec: Option<u64>,
424
425 #[serde(
427 rename = "enable_snapshot_expiration",
428 default,
429 deserialize_with = "deserialize_bool_from_string"
430 )]
431 #[with_option(allow_alter_on_fly)]
432 pub enable_snapshot_expiration: bool,
433
434 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
436 pub write_mode: IcebergWriteMode,
437
438 #[serde(
440 rename = "format_version",
441 default = "default_iceberg_format_version",
442 deserialize_with = "deserialize_format_version"
443 )]
444 pub format_version: FormatVersion,
445
446 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
449 #[serde_as(as = "Option<DisplayFromStr>")]
450 #[with_option(allow_alter_on_fly)]
451 pub snapshot_expiration_max_age_millis: Option<i64>,
452
453 #[serde(rename = "snapshot_expiration_retain_last", default)]
455 #[serde_as(as = "Option<DisplayFromStr>")]
456 #[with_option(allow_alter_on_fly)]
457 pub snapshot_expiration_retain_last: Option<i32>,
458
459 #[serde(
460 rename = "snapshot_expiration_clear_expired_files",
461 default = "default_true",
462 deserialize_with = "deserialize_bool_from_string"
463 )]
464 #[with_option(allow_alter_on_fly)]
465 pub snapshot_expiration_clear_expired_files: bool,
466
467 #[serde(
468 rename = "snapshot_expiration_clear_expired_meta_data",
469 default = "default_true",
470 deserialize_with = "deserialize_bool_from_string"
471 )]
472 #[with_option(allow_alter_on_fly)]
473 pub snapshot_expiration_clear_expired_meta_data: bool,
474
475 #[serde(rename = "compaction.max_snapshots_num", default)]
478 #[serde_as(as = "Option<DisplayFromStr>")]
479 #[with_option(allow_alter_on_fly)]
480 pub max_snapshots_num_before_compaction: Option<usize>,
481
482 #[serde(rename = "compaction.small_files_threshold_mb", default)]
483 #[serde_as(as = "Option<DisplayFromStr>")]
484 #[with_option(allow_alter_on_fly)]
485 pub small_files_threshold_mb: Option<u64>,
486
487 #[serde(rename = "compaction.delete_files_count_threshold", default)]
488 #[serde_as(as = "Option<DisplayFromStr>")]
489 #[with_option(allow_alter_on_fly)]
490 pub delete_files_count_threshold: Option<usize>,
491
492 #[serde(rename = "compaction.trigger_snapshot_count", default)]
493 #[serde_as(as = "Option<DisplayFromStr>")]
494 #[with_option(allow_alter_on_fly)]
495 pub trigger_snapshot_count: Option<usize>,
496
497 #[serde(rename = "compaction.target_file_size_mb", default)]
498 #[serde_as(as = "Option<DisplayFromStr>")]
499 #[with_option(allow_alter_on_fly)]
500 pub target_file_size_mb: Option<u64>,
501
502 #[serde(rename = "compaction.type", default)]
505 #[with_option(allow_alter_on_fly)]
506 pub compaction_type: Option<CompactionType>,
507
508 #[serde(rename = "compaction.write_parquet_compression", default)]
512 #[with_option(allow_alter_on_fly)]
513 pub write_parquet_compression: Option<String>,
514
515 #[serde(rename = "compaction.write_parquet_max_row_group_rows", default)]
518 #[serde_as(as = "Option<DisplayFromStr>")]
519 #[with_option(allow_alter_on_fly)]
520 pub write_parquet_max_row_group_rows: Option<usize>,
521}
522
523impl EnforceSecret for IcebergConfig {
524 fn enforce_secret<'a>(
525 prop_iter: impl Iterator<Item = &'a str>,
526 ) -> crate::error::ConnectorResult<()> {
527 for prop in prop_iter {
528 IcebergCommon::enforce_one(prop)?;
529 }
530 Ok(())
531 }
532
533 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
534 IcebergCommon::enforce_one(prop)
535 }
536}
537
538impl IcebergConfig {
539 fn validate_append_only_write_mode(
542 sink_type: &str,
543 write_mode: IcebergWriteMode,
544 ) -> Result<()> {
545 if sink_type == SINK_TYPE_APPEND_ONLY && write_mode == IcebergWriteMode::CopyOnWrite {
546 return Err(SinkError::Config(anyhow!(
547 "'copy-on-write' mode is not supported for append-only iceberg sink. \
548 Please use 'merge-on-read' instead, which is strictly better for append-only workloads."
549 )));
550 }
551 Ok(())
552 }
553
554 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
555 let mut config =
556 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
557 .map_err(|e| SinkError::Config(anyhow!(e)))?;
558
559 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
560 return Err(SinkError::Config(anyhow!(
561 "`{}` must be {}, or {}",
562 SINK_TYPE_OPTION,
563 SINK_TYPE_APPEND_ONLY,
564 SINK_TYPE_UPSERT
565 )));
566 }
567
568 if config.r#type == SINK_TYPE_UPSERT {
569 if let Some(primary_key) = &config.primary_key {
570 if primary_key.is_empty() {
571 return Err(SinkError::Config(anyhow!(
                        "`primary_key` must not be empty in {}",
573 SINK_TYPE_UPSERT
574 )));
575 }
576 } else {
577 return Err(SinkError::Config(anyhow!(
                    "Must set `primary_key` in {}",
579 SINK_TYPE_UPSERT
580 )));
581 }
582 }
583
584 Self::validate_append_only_write_mode(&config.r#type, config.write_mode)?;
586
587 config.java_catalog_props = values
589 .iter()
590 .filter(|(k, _v)| {
591 k.starts_with("catalog.")
592 && k != &"catalog.uri"
593 && k != &"catalog.type"
594 && k != &"catalog.name"
595 && k != &"catalog.header"
596 })
597 .map(|(k, v)| (k[8..].to_string(), v.clone()))
598 .collect();
599
600 if config.commit_checkpoint_interval == 0 {
601 return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
603 )));
604 }
605
606 if config.commit_checkpoint_size_threshold_mb == Some(0) {
607 return Err(SinkError::Config(anyhow!(
608 "`commit_checkpoint_size_threshold_mb` must be greater than 0"
609 )));
610 }
611
612 config
614 .table
615 .validate()
616 .map_err(|e| SinkError::Config(anyhow!(e)))?;
617
618 Ok(config)
619 }
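    // Illustrative sketch of building a config from WITH-style options. Only keys defined
    // on this struct are shown; the catalog and table-identifier keys come from the
    // flattened `IcebergCommon` / `IcebergTableIdentifier` structs and are elided here.
    //
    //     let mut opts = BTreeMap::new();
    //     opts.insert("type".to_owned(), "upsert".to_owned());
    //     opts.insert("primary_key".to_owned(), "id".to_owned());
    //     opts.insert("write_mode".to_owned(), "merge-on-read".to_owned());
    //     // ... catalog.* and table identifier options ...
    //     let config = IcebergConfig::from_btreemap(opts)?;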
620
621 pub fn catalog_type(&self) -> &str {
622 self.common.catalog_type()
623 }
624
625 pub async fn load_table(&self) -> Result<Table> {
626 self.common
627 .load_table(&self.table, &self.java_catalog_props)
628 .await
629 .map_err(Into::into)
630 }
631
632 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
633 self.common
634 .create_catalog(&self.java_catalog_props)
635 .await
636 .map_err(Into::into)
637 }
638
639 pub fn full_table_name(&self) -> Result<TableIdent> {
640 self.table.to_table_ident().map_err(Into::into)
641 }
642
643 pub fn catalog_name(&self) -> String {
644 self.common.catalog_name()
645 }
646
647 pub fn table_format_version(&self) -> FormatVersion {
648 self.format_version
649 }
650
651 pub fn compaction_interval_sec(&self) -> u64 {
652 self.compaction_interval_sec.unwrap_or(3600)
654 }
655
656 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
659 self.snapshot_expiration_max_age_millis
660 .map(|max_age_millis| current_time_ms - max_age_millis)
661 }
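    // For example, with `snapshot_expiration_max_age_millis = 604_800_000` (7 days) and
    // `current_time_ms = 1_700_000_000_000`, snapshots older than 1_699_395_200_000 ms
    // become eligible for expiration; when the option is unset this returns `None` and no
    // age-based cutoff is applied.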
662
663 pub fn trigger_snapshot_count(&self) -> usize {
664 self.trigger_snapshot_count.unwrap_or(usize::MAX)
665 }
666
667 pub fn small_files_threshold_mb(&self) -> u64 {
668 self.small_files_threshold_mb.unwrap_or(64)
669 }
670
671 pub fn delete_files_count_threshold(&self) -> usize {
672 self.delete_files_count_threshold.unwrap_or(256)
673 }
674
675 pub fn target_file_size_mb(&self) -> u64 {
676 self.target_file_size_mb.unwrap_or(1024)
677 }
678
679 pub fn commit_checkpoint_size_threshold_bytes(&self) -> Option<u64> {
680 self.commit_checkpoint_size_threshold_mb
681 .map(|threshold_mb| threshold_mb.saturating_mul(1024 * 1024))
682 }
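    // For example, the default threshold of 128 MB becomes 128 * 1024 * 1024 =
    // 134_217_728 bytes; `saturating_mul` caps the result at `u64::MAX` instead of
    // overflowing for absurdly large user-supplied values.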
683
684 pub fn compaction_type(&self) -> CompactionType {
687 self.compaction_type.unwrap_or_default()
688 }
689
690 pub fn write_parquet_compression(&self) -> &str {
693 self.write_parquet_compression.as_deref().unwrap_or("zstd")
694 }
695
696 pub fn write_parquet_max_row_group_rows(&self) -> usize {
699 self.write_parquet_max_row_group_rows.unwrap_or(122880)
700 }
701
702 pub fn get_parquet_compression(&self) -> Compression {
705 parse_parquet_compression(self.write_parquet_compression())
706 }
707}
708
709fn parse_parquet_compression(codec: &str) -> Compression {
711 match codec.to_lowercase().as_str() {
712 "uncompressed" => Compression::UNCOMPRESSED,
713 "snappy" => Compression::SNAPPY,
714 "gzip" => Compression::GZIP(Default::default()),
715 "lzo" => Compression::LZO,
716 "brotli" => Compression::BROTLI(Default::default()),
717 "lz4" => Compression::LZ4,
718 "zstd" => Compression::ZSTD(Default::default()),
719 _ => {
720 tracing::warn!(
721 "Unknown compression codec '{}', falling back to SNAPPY",
722 codec
723 );
724 Compression::SNAPPY
725 }
726 }
727}
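// Illustrative sketch: codec matching is case-insensitive, and unknown codecs fall back
// to SNAPPY with a warning instead of failing the sink:
//
//     assert!(matches!(parse_parquet_compression("ZSTD"), Compression::ZSTD(_)));
//     assert!(matches!(parse_parquet_compression("unknown"), Compression::SNAPPY));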
728
729pub struct IcebergSink {
730 pub config: IcebergConfig,
731 param: SinkParam,
732 unique_column_ids: Option<Vec<usize>>,
734}
735
736impl EnforceSecret for IcebergSink {
737 fn enforce_secret<'a>(
738 prop_iter: impl Iterator<Item = &'a str>,
739 ) -> crate::error::ConnectorResult<()> {
740 for prop in prop_iter {
741 IcebergConfig::enforce_one(prop)?;
742 }
743 Ok(())
744 }
745}
746
747impl TryFrom<SinkParam> for IcebergSink {
748 type Error = SinkError;
749
750 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
751 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
752 IcebergSink::new(config, param)
753 }
754}
755
756impl Debug for IcebergSink {
757 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
758 f.debug_struct("IcebergSink")
759 .field("config", &self.config)
760 .finish()
761 }
762}
763
764async fn create_and_validate_table_impl(
765 config: &IcebergConfig,
766 param: &SinkParam,
767) -> Result<Table> {
768 if config.create_table_if_not_exists {
769 create_table_if_not_exists_impl(config, param).await?;
770 }
771
772 let table = config
773 .load_table()
774 .await
775 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
776
777 let sink_schema = param.schema();
778 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
779 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
780
781 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
782 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
783
784 Ok(table)
785}
786
787async fn create_table_if_not_exists_impl(config: &IcebergConfig, param: &SinkParam) -> Result<()> {
788 let catalog = config.create_catalog().await?;
789 let namespace = if let Some(database_name) = config.table.database_name() {
790 let namespace = NamespaceIdent::new(database_name.to_owned());
791 if !catalog
792 .namespace_exists(&namespace)
793 .await
794 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
795 {
796 catalog
797 .create_namespace(&namespace, HashMap::default())
798 .await
799 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
800 .context("failed to create iceberg namespace")?;
801 }
802 namespace
803 } else {
804 bail!("database name must be set if you want to create table")
805 };
806
807 let table_id = config
808 .full_table_name()
809 .context("Unable to parse table name")?;
810 if !catalog
811 .table_exists(&table_id)
812 .await
813 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
814 {
815 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
816 let arrow_fields = param
818 .columns
819 .iter()
820 .map(|column| {
821 Ok(iceberg_create_table_arrow_convert
822 .to_arrow_field(&column.name, &column.data_type)
823 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
824 .context(format!(
825 "failed to convert {}: {} to arrow type",
826 &column.name, &column.data_type
827 ))?)
828 })
829 .collect::<Result<Vec<ArrowField>>>()?;
830 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
831 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
832 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
833 .context("failed to convert arrow schema to iceberg schema")?;
834
835 let location = {
836 let mut names = namespace.clone().inner();
837 names.push(config.table.table_name().to_owned());
838 match &config.common.warehouse_path {
839 Some(warehouse_path) => {
840 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
841 let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
843 let url = Url::parse(warehouse_path);
844 if url.is_err() || is_s3_tables || is_bq_catalog_federation {
845 if config.common.catalog_type() == "rest"
848 || config.common.catalog_type() == "rest_rust"
849 {
850 None
851 } else {
852 bail!(format!("Invalid warehouse path: {}", warehouse_path))
853 }
854 } else if warehouse_path.ends_with('/') {
855 Some(format!("{}{}", warehouse_path, names.join("/")))
856 } else {
857 Some(format!("{}/{}", warehouse_path, names.join("/")))
858 }
859 }
860 None => None,
861 }
862 };
863
864 let partition_spec = match &config.partition_by {
865 Some(partition_by) => {
866 let mut partition_fields = Vec::<UnboundPartitionField>::new();
867 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
868 .into_iter()
869 .enumerate()
870 {
871 match iceberg_schema.field_id_by_name(&column) {
872 Some(id) => partition_fields.push(
873 UnboundPartitionField::builder()
874 .source_id(id)
875 .transform(transform)
876 .name(format!("_p_{}", column))
877 .field_id(PARTITION_DATA_ID_START + i as i32)
878 .build(),
879 ),
880 None => bail!(format!(
881 "Partition source column does not exist in schema: {}",
882 column
883 )),
884 };
885 }
886 Some(
887 UnboundPartitionSpec::builder()
888 .with_spec_id(0)
889 .add_partition_fields(partition_fields)
890 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
891 .context("failed to add partition columns")?
892 .build(),
893 )
894 }
895 None => None,
896 };
897
898 let properties = HashMap::from([(
900 TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
901 (config.format_version as u8).to_string(),
902 )]);
903
904 let table_creation_builder = TableCreation::builder()
905 .name(config.table.table_name().to_owned())
906 .schema(iceberg_schema)
907 .format_version(config.table_format_version())
908 .properties(properties);
909
910 let table_creation = match (location, partition_spec) {
911 (Some(location), Some(partition_spec)) => table_creation_builder
912 .location(location)
913 .partition_spec(partition_spec)
914 .build(),
915 (Some(location), None) => table_creation_builder.location(location).build(),
916 (None, Some(partition_spec)) => table_creation_builder
917 .partition_spec(partition_spec)
918 .build(),
919 (None, None) => table_creation_builder.build(),
920 };
921
922 catalog
923 .create_table(&namespace, table_creation)
924 .await
925 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
926 .context("failed to create iceberg table")?;
927 }
928 Ok(())
929}
930
931impl IcebergSink {
932 pub async fn create_and_validate_table(&self) -> Result<Table> {
933 create_and_validate_table_impl(&self.config, &self.param).await
934 }
935
936 pub async fn create_table_if_not_exists(&self) -> Result<()> {
937 create_table_if_not_exists_impl(&self.config, &self.param).await
938 }
939
940 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
941 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
942 if let Some(pk) = &config.primary_key {
943 let mut unique_column_ids = Vec::with_capacity(pk.len());
944 for col_name in pk {
945 let id = param
946 .columns
947 .iter()
948 .find(|col| col.name.as_str() == col_name)
949 .ok_or_else(|| {
950 SinkError::Config(anyhow!(
951 "Primary key column {} not found in sink schema",
952 col_name
953 ))
954 })?
955 .column_id
956 .get_id() as usize;
957 unique_column_ids.push(id);
958 }
959 Some(unique_column_ids)
960 } else {
                unreachable!("primary key is checked during config validation for upsert sinks")
962 }
963 } else {
964 None
965 };
966 Ok(Self {
967 config,
968 param,
969 unique_column_ids,
970 })
971 }
972}
973
974impl Sink for IcebergSink {
975 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
976
977 const SINK_NAME: &'static str = ICEBERG_SINK;
978
979 async fn validate(&self) -> Result<()> {
980 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
981 bail!("Snowflake catalog only supports iceberg sources");
982 }
983
984 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
985 risingwave_common::license::Feature::IcebergSinkWithGlue
986 .check_available()
987 .map_err(|e| anyhow::anyhow!(e))?;
988 }
989
990 IcebergConfig::validate_append_only_write_mode(
992 &self.config.r#type,
993 self.config.write_mode,
994 )?;
995
996 let compaction_type = self.config.compaction_type();
998
999 if self.config.write_mode == IcebergWriteMode::CopyOnWrite
1002 && compaction_type != CompactionType::Full
1003 {
1004 bail!(
1005 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
1006 compaction_type
1007 );
1008 }
1009
1010 match compaction_type {
1011 CompactionType::SmallFiles => {
1012 risingwave_common::license::Feature::IcebergCompaction
1014 .check_available()
1015 .map_err(|e| anyhow::anyhow!(e))?;
1016
1017 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
1019 bail!(
1020 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
1021 self.config.write_mode
1022 );
1023 }
1024
1025 if self.config.delete_files_count_threshold.is_some() {
1027 bail!(
                        "`compaction.delete_files_count_threshold` is not supported for 'small-files' compaction type"
1029 );
1030 }
1031 }
1032 CompactionType::FilesWithDelete => {
1033 risingwave_common::license::Feature::IcebergCompaction
1035 .check_available()
1036 .map_err(|e| anyhow::anyhow!(e))?;
1037
1038 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
1040 bail!(
1041 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
1042 self.config.write_mode
1043 );
1044 }
1045
1046 if self.config.small_files_threshold_mb.is_some() {
1048 bail!(
                        "`compaction.small_files_threshold_mb` must not be set for 'files-with-delete' compaction type"
1050 );
1051 }
1052 }
1053 CompactionType::Full => {
1054 }
1056 }
1057
1058 let _ = self.create_and_validate_table().await?;
1059 Ok(())
1060 }
1061
1062 fn support_schema_change() -> bool {
1063 true
1064 }
1065
1066 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
1067 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
1068
1069 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
1071 if iceberg_config.enable_compaction && compaction_interval == 0 {
1072 bail!(
                    "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
1074 );
1075 }
1076
1077 tracing::info!(
1079 "Alter config compaction_interval set to {} seconds",
1080 compaction_interval
1081 );
1082 }
1083
1084 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
1086 && max_snapshots < 1
1087 {
1088 bail!(
                "`compaction.max_snapshots_num` must be greater than 0, got: {}",
1090 max_snapshots
1091 );
1092 }
1093
1094 if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
1096 && target_file_size_mb == 0
1097 {
1098 bail!("`compaction.target_file_size_mb` must be greater than 0");
1099 }
1100
1101 if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
1103 && max_row_group_rows == 0
1104 {
1105 bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
1106 }
1107
1108 if let Some(ref compression) = iceberg_config.write_parquet_compression {
1110 let valid_codecs = [
1111 "uncompressed",
1112 "snappy",
1113 "gzip",
1114 "lzo",
1115 "brotli",
1116 "lz4",
1117 "zstd",
1118 ];
1119 if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
1120 bail!(
1121 "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
1122 valid_codecs,
1123 compression
1124 );
1125 }
1126 }
1127
1128 Ok(())
1129 }
1130
1131 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
1132 let writer = IcebergSinkWriter::new(
1133 self.config.clone(),
1134 self.param.clone(),
1135 writer_param.clone(),
1136 self.unique_column_ids.clone(),
1137 );
1138
1139 let commit_checkpoint_interval =
1140 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
1141 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
1142 );
1143 let log_sinker = CoordinatedLogSinker::new(
1144 &writer_param,
1145 self.param.clone(),
1146 writer,
1147 commit_checkpoint_interval,
1148 )
1149 .await?;
1150
1151 Ok(log_sinker)
1152 }
1153
1154 fn is_coordinated_sink(&self) -> bool {
1155 true
1156 }
1157
1158 async fn new_coordinator(
1159 &self,
1160 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1161 ) -> Result<SinkCommitCoordinator> {
1162 let catalog = self.config.create_catalog().await?;
1163 let table = self.create_and_validate_table().await?;
1164 let coordinator = IcebergSinkCommitter {
1165 catalog,
1166 table,
1167 last_commit_epoch: 0,
1168 sink_id: self.param.sink_id,
1169 config: self.config.clone(),
1170 param: self.param.clone(),
1171 commit_retry_num: self.config.commit_retry_num,
1172 iceberg_compact_stat_sender,
1173 };
1174 if self.config.is_exactly_once.unwrap_or_default() {
1175 Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
1176 } else {
1177 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
1178 }
1179 }
1180}
1181
1182enum ProjectIdxVec {
1189 None,
1190 Prepare(usize),
1191 Done(Vec<usize>),
1192}
1193
1194type DataFileWriterBuilderType =
1195 DataFileWriterBuilder<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>;
1196type PositionDeleteFileWriterBuilderType = PositionDeleteFileWriterBuilder<
1197 ParquetWriterBuilder,
1198 DefaultLocationGenerator,
1199 DefaultFileNameGenerator,
1200>;
1201type PositionDeleteFileWriterType = PositionDeleteFileWriter<
1202 ParquetWriterBuilder,
1203 DefaultLocationGenerator,
1204 DefaultFileNameGenerator,
1205>;
1206type DeletionVectorWriterBuilderType =
1207 DeletionVectorWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
1208type DeletionVectorWriterType =
1209 DeletionVectorWriter<DefaultLocationGenerator, DefaultFileNameGenerator>;
1210type EqualityDeleteFileWriterBuilderType = EqualityDeleteFileWriterBuilder<
1211 ParquetWriterBuilder,
1212 DefaultLocationGenerator,
1213 DefaultFileNameGenerator,
1214>;
1215
1216#[derive(Clone)]
1217enum PositionDeleteWriterBuilderType {
1218 PositionDelete(PositionDeleteFileWriterBuilderType),
1219 DeletionVector(DeletionVectorWriterBuilderType),
1220}
1221
1222enum PositionDeleteWriterType {
1223 PositionDelete(PositionDeleteFileWriterType),
1224 DeletionVector(DeletionVectorWriterType),
1225}
1226
1227#[async_trait]
1228impl IcebergWriterBuilder<Vec<PositionDeleteInput>> for PositionDeleteWriterBuilderType {
1229 type R = PositionDeleteWriterType;
1230
1231 async fn build(
1232 &self,
1233 partition_key: Option<iceberg::spec::PartitionKey>,
1234 ) -> iceberg::Result<Self::R> {
1235 match self {
1236 PositionDeleteWriterBuilderType::PositionDelete(builder) => Ok(
1237 PositionDeleteWriterType::PositionDelete(builder.build(partition_key).await?),
1238 ),
1239 PositionDeleteWriterBuilderType::DeletionVector(builder) => Ok(
1240 PositionDeleteWriterType::DeletionVector(builder.build(partition_key).await?),
1241 ),
1242 }
1243 }
1244}
1245
1246#[async_trait]
1247impl IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriterType {
1248 async fn write(&mut self, input: Vec<PositionDeleteInput>) -> iceberg::Result<()> {
1249 match self {
1250 PositionDeleteWriterType::PositionDelete(writer) => writer.write(input).await,
1251 PositionDeleteWriterType::DeletionVector(writer) => writer.write(input).await,
1252 }
1253 }
1254
1255 async fn close(&mut self) -> iceberg::Result<Vec<DataFile>> {
1256 match self {
1257 PositionDeleteWriterType::PositionDelete(writer) => writer.close().await,
1258 PositionDeleteWriterType::DeletionVector(writer) => writer.close().await,
1259 }
1260 }
1261}
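// The two wrappers above mirror how row-level deletes are encoded per table format
// version: `build_upsert` selects `DeletionVector` (Puffin deletion vectors) when the
// table's format version is >= 3, and falls back to `PositionDelete` (position-delete
// Parquet files) for older format versions.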
1262
1263#[derive(Clone)]
1264struct SharedIcebergWriterBuilder<B>(Arc<B>);
1265
1266#[async_trait]
1267impl<B: IcebergWriterBuilder> IcebergWriterBuilder for SharedIcebergWriterBuilder<B> {
1268 type R = B::R;
1269
1270 async fn build(
1271 &self,
1272 partition_key: Option<iceberg::spec::PartitionKey>,
1273 ) -> iceberg::Result<Self::R> {
1274 self.0.build(partition_key).await
1275 }
1276}
1277
1278#[derive(Clone)]
1279struct TaskWriterBuilderWrapper<B: IcebergWriterBuilder> {
1280 inner: Arc<B>,
1281 fanout_enabled: bool,
1282 schema: IcebergSchemaRef,
1283 partition_spec: PartitionSpecRef,
1284 compute_partition: bool,
1285}
1286
1287impl<B: IcebergWriterBuilder> TaskWriterBuilderWrapper<B> {
1288 fn new(
1289 inner: B,
1290 fanout_enabled: bool,
1291 schema: IcebergSchemaRef,
1292 partition_spec: PartitionSpecRef,
1293 compute_partition: bool,
1294 ) -> Self {
1295 Self {
1296 inner: Arc::new(inner),
1297 fanout_enabled,
1298 schema,
1299 partition_spec,
1300 compute_partition,
1301 }
1302 }
1303
1304 fn build(&self) -> iceberg::Result<TaskWriter<SharedIcebergWriterBuilder<B>>> {
1305 let partition_splitter = match (
1306 self.partition_spec.is_unpartitioned(),
1307 self.compute_partition,
1308 ) {
1309 (true, _) => None,
1310 (false, true) => Some(RecordBatchPartitionSplitter::try_new_with_computed_values(
1311 self.schema.clone(),
1312 self.partition_spec.clone(),
1313 )?),
1314 (false, false) => Some(
1315 RecordBatchPartitionSplitter::try_new_with_precomputed_values(
1316 self.schema.clone(),
1317 self.partition_spec.clone(),
1318 )?,
1319 ),
1320 };
1321
1322 Ok(TaskWriter::new_with_partition_splitter(
1323 SharedIcebergWriterBuilder(self.inner.clone()),
1324 self.fanout_enabled,
1325 self.schema.clone(),
1326 self.partition_spec.clone(),
1327 partition_splitter,
1328 ))
1329 }
1330}
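// Splitter selection in `build` above: unpartitioned tables get no splitter; partitioned
// tables either compute partition values from the record batch (`compute_partition ==
// true`, the path used by this sink) or expect precomputed partition values in the input.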
1331
1332pub enum IcebergSinkWriter {
1333 Created(IcebergSinkWriterArgs),
1334 Initialized(IcebergSinkWriterInner),
1335}
1336
1337pub struct IcebergSinkWriterArgs {
1338 config: IcebergConfig,
1339 sink_param: SinkParam,
1340 writer_param: SinkWriterParam,
1341 unique_column_ids: Option<Vec<usize>>,
1342}
1343
1344pub struct IcebergSinkWriterInner {
1345 writer: IcebergWriterDispatch,
1346 arrow_schema: SchemaRef,
1347 metrics: IcebergWriterMetrics,
1349 table: Table,
1351 project_idx_vec: ProjectIdxVec,
1354 commit_checkpoint_size_threshold_bytes: Option<u64>,
1355 uncommitted_write_bytes: u64,
1356}
1357
1358#[allow(clippy::type_complexity)]
1359enum IcebergWriterDispatch {
1360 Append {
1361 writer: Option<Box<dyn IcebergWriter>>,
1362 writer_builder:
1363 TaskWriterBuilderWrapper<MonitoredGeneralWriterBuilder<DataFileWriterBuilderType>>,
1364 },
1365 Upsert {
1366 writer: Option<Box<dyn IcebergWriter>>,
1367 writer_builder: TaskWriterBuilderWrapper<
1368 MonitoredGeneralWriterBuilder<
1369 DeltaWriterBuilder<
1370 DataFileWriterBuilderType,
1371 PositionDeleteWriterBuilderType,
1372 EqualityDeleteFileWriterBuilderType,
1373 >,
1374 >,
1375 >,
1376 arrow_schema_with_op_column: SchemaRef,
1377 },
1378}
1379
1380impl IcebergWriterDispatch {
1381 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
1382 match self {
1383 IcebergWriterDispatch::Append { writer, .. }
1384 | IcebergWriterDispatch::Upsert { writer, .. } => writer.as_mut(),
1385 }
1386 }
1387}
1388
1389pub struct IcebergWriterMetrics {
1390 _write_qps: LabelGuardedIntCounter,
1395 _write_latency: LabelGuardedHistogram,
1396 write_bytes: LabelGuardedIntCounter,
1397}
1398
1399impl IcebergSinkWriter {
1400 pub fn new(
1401 config: IcebergConfig,
1402 sink_param: SinkParam,
1403 writer_param: SinkWriterParam,
1404 unique_column_ids: Option<Vec<usize>>,
1405 ) -> Self {
1406 Self::Created(IcebergSinkWriterArgs {
1407 config,
1408 sink_param,
1409 writer_param,
1410 unique_column_ids,
1411 })
1412 }
1413}
1414
1415impl IcebergSinkWriterInner {
1416 fn should_commit_on_checkpoint(&self) -> bool {
1417 self.commit_checkpoint_size_threshold_bytes
1418 .is_some_and(|threshold| {
1419 self.uncommitted_write_bytes > 0 && self.uncommitted_write_bytes >= threshold
1420 })
1421 }
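    // For example, with the default `commit_checkpoint_size_threshold_mb` of 128, this
    // returns true once at least 134_217_728 uncommitted bytes have been written, letting
    // the coordinated log sinker commit earlier than the configured
    // `commit_checkpoint_interval`; with the threshold unset, only the interval applies.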
1422
1423 fn build_append_only(
1424 config: &IcebergConfig,
1425 table: Table,
1426 writer_param: &SinkWriterParam,
1427 ) -> Result<Self> {
1428 let SinkWriterParam {
1429 extra_partition_col_idx,
1430 actor_id,
1431 sink_id,
1432 sink_name,
1433 ..
1434 } = writer_param;
1435 let metrics_labels = [
1436 &actor_id.to_string(),
1437 &sink_id.to_string(),
1438 sink_name.as_str(),
1439 ];
1440
1441 let write_qps = GLOBAL_SINK_METRICS
1443 .iceberg_write_qps
1444 .with_guarded_label_values(&metrics_labels);
1445 let write_latency = GLOBAL_SINK_METRICS
1446 .iceberg_write_latency
1447 .with_guarded_label_values(&metrics_labels);
1448 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1451 .iceberg_rolling_unflushed_data_file
1452 .with_guarded_label_values(&metrics_labels);
1453 let write_bytes = GLOBAL_SINK_METRICS
1454 .iceberg_write_bytes
1455 .with_guarded_label_values(&metrics_labels);
1456
1457 let schema = table.metadata().current_schema();
1458 let partition_spec = table.metadata().default_partition_spec();
1459 let fanout_enabled = !partition_spec.fields().is_empty();
1460 let unique_uuid_suffix = Uuid::now_v7();
1462
1463 let parquet_writer_properties = WriterProperties::builder()
1464 .set_compression(config.get_parquet_compression())
1465 .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1466 .set_created_by(PARQUET_CREATED_BY.to_owned())
1467 .build();
1468
1469 let parquet_writer_builder =
1470 ParquetWriterBuilder::new(parquet_writer_properties, schema.clone());
1471 let rolling_builder = RollingFileWriterBuilder::new(
1472 parquet_writer_builder,
1473 (config.target_file_size_mb() * 1024 * 1024) as usize,
1474 table.file_io().clone(),
1475 DefaultLocationGenerator::new(table.metadata().clone())
1476 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1477 DefaultFileNameGenerator::new(
1478 writer_param.actor_id.to_string(),
1479 Some(unique_uuid_suffix.to_string()),
1480 iceberg::spec::DataFileFormat::Parquet,
1481 ),
1482 );
1483 let data_file_builder = DataFileWriterBuilder::new(rolling_builder);
1484 let monitored_builder = MonitoredGeneralWriterBuilder::new(
1485 data_file_builder,
1486 write_qps.clone(),
1487 write_latency.clone(),
1488 );
1489 let writer_builder = TaskWriterBuilderWrapper::new(
1490 monitored_builder,
1491 fanout_enabled,
1492 schema.clone(),
1493 partition_spec.clone(),
1494 true,
1495 );
1496 let inner_writer = Some(Box::new(
1497 writer_builder
1498 .build()
1499 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1500 ) as Box<dyn IcebergWriter>);
1501 Ok(Self {
1502 arrow_schema: Arc::new(
1503 schema_to_arrow_schema(table.metadata().current_schema())
1504 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1505 ),
1506 metrics: IcebergWriterMetrics {
1507 _write_qps: write_qps,
1508 _write_latency: write_latency,
1509 write_bytes,
1510 },
1511 writer: IcebergWriterDispatch::Append {
1512 writer: inner_writer,
1513 writer_builder,
1514 },
1515 table,
1516 project_idx_vec: {
1517 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1518 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1519 } else {
1520 ProjectIdxVec::None
1521 }
1522 },
1523 commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
1524 uncommitted_write_bytes: 0,
1525 })
1526 }
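    // The append-only pipeline assembled above is, from the bottom up: Parquet writer ->
    // rolling writer (rolls files at `target_file_size_mb`) -> data-file writer ->
    // monitored wrapper (QPS/latency metrics) -> task writer, which fans out per
    // partition when the table has a partition spec.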
1527
1528 fn build_upsert(
1529 config: &IcebergConfig,
1530 table: Table,
1531 unique_column_ids: Vec<usize>,
1532 writer_param: &SinkWriterParam,
1533 ) -> Result<Self> {
1534 let SinkWriterParam {
1535 extra_partition_col_idx,
1536 actor_id,
1537 sink_id,
1538 sink_name,
1539 ..
1540 } = writer_param;
1541 let metrics_labels = [
1542 &actor_id.to_string(),
1543 &sink_id.to_string(),
1544 sink_name.as_str(),
1545 ];
1546 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1547
1548 let write_qps = GLOBAL_SINK_METRICS
1550 .iceberg_write_qps
1551 .with_guarded_label_values(&metrics_labels);
1552 let write_latency = GLOBAL_SINK_METRICS
1553 .iceberg_write_latency
1554 .with_guarded_label_values(&metrics_labels);
1555 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1558 .iceberg_rolling_unflushed_data_file
1559 .with_guarded_label_values(&metrics_labels);
1560 let write_bytes = GLOBAL_SINK_METRICS
1561 .iceberg_write_bytes
1562 .with_guarded_label_values(&metrics_labels);
1563
1564 let schema = table.metadata().current_schema();
1566 let partition_spec = table.metadata().default_partition_spec();
1567 let fanout_enabled = !partition_spec.fields().is_empty();
1568 let use_deletion_vectors = table.metadata().format_version() >= FormatVersion::V3;
1569
1570 let unique_uuid_suffix = Uuid::now_v7();
1572
1573 let parquet_writer_properties = WriterProperties::builder()
1574 .set_compression(config.get_parquet_compression())
1575 .set_max_row_group_size(config.write_parquet_max_row_group_rows())
1576 .set_created_by(PARQUET_CREATED_BY.to_owned())
1577 .build();
1578
1579 let data_file_builder = {
1580 let parquet_writer_builder =
1581 ParquetWriterBuilder::new(parquet_writer_properties.clone(), schema.clone());
1582 let rolling_writer_builder = RollingFileWriterBuilder::new(
1583 parquet_writer_builder,
1584 (config.target_file_size_mb() * 1024 * 1024) as usize,
1585 table.file_io().clone(),
1586 DefaultLocationGenerator::new(table.metadata().clone())
1587 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1588 DefaultFileNameGenerator::new(
1589 writer_param.actor_id.to_string(),
1590 Some(unique_uuid_suffix.to_string()),
1591 iceberg::spec::DataFileFormat::Parquet,
1592 ),
1593 );
1594 DataFileWriterBuilder::new(rolling_writer_builder)
1595 };
1596 let position_delete_builder = if use_deletion_vectors {
1597 let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
1598 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1599 PositionDeleteWriterBuilderType::DeletionVector(DeletionVectorWriterBuilder::new(
1600 table.file_io().clone(),
1601 location_generator,
1602 DefaultFileNameGenerator::new(
1603 writer_param.actor_id.to_string(),
1604 Some(format!("delvec-{}", unique_uuid_suffix)),
1605 iceberg::spec::DataFileFormat::Puffin,
1606 ),
1607 ))
1608 } else {
1609 let parquet_writer_builder = ParquetWriterBuilder::new(
1610 parquet_writer_properties.clone(),
1611 POSITION_DELETE_SCHEMA.clone().into(),
1612 );
1613 let rolling_writer_builder = RollingFileWriterBuilder::new(
1614 parquet_writer_builder,
1615 (config.target_file_size_mb() * 1024 * 1024) as usize,
1616 table.file_io().clone(),
1617 DefaultLocationGenerator::new(table.metadata().clone())
1618 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1619 DefaultFileNameGenerator::new(
1620 writer_param.actor_id.to_string(),
1621 Some(format!("pos-del-{}", unique_uuid_suffix)),
1622 iceberg::spec::DataFileFormat::Parquet,
1623 ),
1624 );
1625 PositionDeleteWriterBuilderType::PositionDelete(PositionDeleteFileWriterBuilder::new(
1626 rolling_writer_builder,
1627 ))
1628 };
1629 let equality_delete_builder = {
1630 let eq_del_config = EqualityDeleteWriterConfig::new(
1631 unique_column_ids.clone(),
1632 table.metadata().current_schema().clone(),
1633 )
1634 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1635 let parquet_writer_builder = ParquetWriterBuilder::new(
1636 parquet_writer_properties,
1637 Arc::new(
1638 arrow_schema_to_schema(eq_del_config.projected_arrow_schema_ref())
1639 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1640 ),
1641 );
1642 let rolling_writer_builder = RollingFileWriterBuilder::new(
1643 parquet_writer_builder,
1644 (config.target_file_size_mb() * 1024 * 1024) as usize,
1645 table.file_io().clone(),
1646 DefaultLocationGenerator::new(table.metadata().clone())
1647 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1648 DefaultFileNameGenerator::new(
1649 writer_param.actor_id.to_string(),
1650 Some(format!("eq-del-{}", unique_uuid_suffix)),
1651 iceberg::spec::DataFileFormat::Parquet,
1652 ),
1653 );
1654
1655 EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, eq_del_config)
1656 };
1657 let delta_builder = DeltaWriterBuilder::new(
1658 data_file_builder,
1659 position_delete_builder,
1660 equality_delete_builder,
1661 unique_column_ids,
1662 schema.clone(),
1663 );
1664 let original_arrow_schema = Arc::new(
1665 schema_to_arrow_schema(table.metadata().current_schema())
1666 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1667 );
1668 let schema_with_extra_op_column = {
1669 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1670 new_fields.push(Arc::new(ArrowField::new(
1671 "op".to_owned(),
1672 ArrowDataType::Int32,
1673 false,
1674 )));
1675 Arc::new(ArrowSchema::new(new_fields))
1676 };
1677 let writer_builder = TaskWriterBuilderWrapper::new(
1678 MonitoredGeneralWriterBuilder::new(
1679 delta_builder,
1680 write_qps.clone(),
1681 write_latency.clone(),
1682 ),
1683 fanout_enabled,
1684 schema.clone(),
1685 partition_spec.clone(),
1686 true,
1687 );
1688 let inner_writer = Some(Box::new(
1689 writer_builder
1690 .build()
1691 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1692 ) as Box<dyn IcebergWriter>);
1693 Ok(Self {
1694 arrow_schema: original_arrow_schema,
1695 metrics: IcebergWriterMetrics {
1696 _write_qps: write_qps,
1697 _write_latency: write_latency,
1698 write_bytes,
1699 },
1700 table,
1701 writer: IcebergWriterDispatch::Upsert {
1702 writer: inner_writer,
1703 writer_builder,
1704 arrow_schema_with_op_column: schema_with_extra_op_column,
1705 },
1706 project_idx_vec: {
1707 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1708 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1709 } else {
1710 ProjectIdxVec::None
1711 }
1712 },
1713 commit_checkpoint_size_threshold_bytes: config.commit_checkpoint_size_threshold_bytes(),
1714 uncommitted_write_bytes: 0,
1715 })
1716 }
1717}
1718
1719#[async_trait]
1720impl SinkWriter for IcebergSinkWriter {
1721 type CommitMetadata = Option<SinkMetadata>;
1722
1723 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1725 let Self::Created(args) = self else {
1726 return Ok(());
1727 };
1728
1729 let table = create_and_validate_table_impl(&args.config, &args.sink_param).await?;
1730 let inner = match &args.unique_column_ids {
1731 Some(unique_column_ids) => IcebergSinkWriterInner::build_upsert(
1732 &args.config,
1733 table,
1734 unique_column_ids.clone(),
1735 &args.writer_param,
1736 )?,
1737 None => {
1738 IcebergSinkWriterInner::build_append_only(&args.config, table, &args.writer_param)?
1739 }
1740 };
1741
1742 *self = IcebergSinkWriter::Initialized(inner);
1743 Ok(())
1744 }
1745
1746 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1748 let Self::Initialized(inner) = self else {
            unreachable!("IcebergSinkWriter should be initialized before write_batch");
1750 };
1751
1752 match &mut inner.writer {
1754 IcebergWriterDispatch::Append {
1755 writer,
1756 writer_builder,
1757 } => {
1758 if writer.is_none() {
1759 *writer = Some(Box::new(
1760 writer_builder
1761 .build()
1762 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1763 ));
1764 }
1765 }
1766 IcebergWriterDispatch::Upsert {
1767 writer,
1768 writer_builder,
1769 ..
1770 } => {
1771 if writer.is_none() {
1772 *writer = Some(Box::new(
1773 writer_builder
1774 .build()
1775 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1776 ));
1777 }
1778 }
1779 };
1780
1781 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1783 match &mut inner.project_idx_vec {
1784 ProjectIdxVec::None => {}
1785 ProjectIdxVec::Prepare(idx) => {
1786 if *idx >= chunk.columns().len() {
1787 return Err(SinkError::Iceberg(anyhow!(
1788 "invalid extra partition column index {}",
1789 idx
1790 )));
1791 }
1792 let project_idx_vec = (0..*idx)
1793 .chain(*idx + 1..chunk.columns().len())
1794 .collect_vec();
1795 chunk = chunk.project(&project_idx_vec);
1796 inner.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1797 }
1798 ProjectIdxVec::Done(idx_vec) => {
1799 chunk = chunk.project(idx_vec);
1800 }
1801 }
1802 if ops.is_empty() {
1803 return Ok(());
1804 }
1805 let write_batch_size = chunk.estimated_heap_size();
1806 let batch = match &inner.writer {
1807 IcebergWriterDispatch::Append { .. } => {
1808 let filters =
1810 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1811 chunk.set_visibility(filters);
1812 IcebergArrowConvert
1813 .to_record_batch(inner.arrow_schema.clone(), &chunk.compact_vis())
1814 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1815 }
1816 IcebergWriterDispatch::Upsert {
1817 arrow_schema_with_op_column,
1818 ..
1819 } => {
1820 let chunk = IcebergArrowConvert
1821 .to_record_batch(inner.arrow_schema.clone(), &chunk)
1822 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1823 let ops = Arc::new(Int32Array::from(
1824 ops.iter()
1825 .map(|op| match op {
1826 Op::UpdateInsert | Op::Insert => INSERT_OP,
1827 Op::UpdateDelete | Op::Delete => DELETE_OP,
1828 })
1829 .collect_vec(),
1830 ));
1831 let mut columns = chunk.columns().to_vec();
1832 columns.push(ops);
1833 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1834 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1835 }
1836 };
1837
1838 let writer = inner.writer.get_writer().unwrap();
1839 writer
1840 .write(batch)
1841 .instrument_await("iceberg_write")
1842 .await
1843 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1844 inner.metrics.write_bytes.inc_by(write_batch_size as _);
1845 inner.uncommitted_write_bytes = inner
1846 .uncommitted_write_bytes
1847 .saturating_add(write_batch_size as u64);
1848 Ok(())
1849 }
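    // In upsert mode the chunk is converted to a record batch carrying an extra Int32
    // "op" column (INSERT_OP / DELETE_OP) so the delta writer can route rows to data
    // files, equality-delete files, or position deletes; in append-only mode non-insert
    // ops are masked out via the visibility bitmap before conversion.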
1850
1851 fn should_commit_on_checkpoint(&self) -> bool {
1852 match self {
1853 Self::Initialized(inner) => inner.should_commit_on_checkpoint(),
1854 Self::Created(_) => false,
1855 }
1856 }
1857
1858 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1861 let Self::Initialized(inner) = self else {
1862 unreachable!("IcebergSinkWriter should be initialized before barrier");
1863 };
1864
1865 if !is_checkpoint {
1867 return Ok(None);
1868 }
1869
1870 let close_result = match &mut inner.writer {
1871 IcebergWriterDispatch::Append {
1872 writer,
1873 writer_builder,
1874 } => {
1875 let close_result = match writer.take() {
1876 Some(mut writer) => {
1877 Some(writer.close().instrument_await("iceberg_close").await)
1878 }
1879 _ => None,
1880 };
1881 match writer_builder.build() {
1882 Ok(new_writer) => {
1883 *writer = Some(Box::new(new_writer));
1884 }
                    Err(e) => {
                        warn!(error = %e.as_report(), "Failed to build new writer after close");
                    }
1890 }
1891 close_result
1892 }
1893 IcebergWriterDispatch::Upsert {
1894 writer,
1895 writer_builder,
1896 ..
1897 } => {
1898 let close_result = match writer.take() {
1899 Some(mut writer) => {
1900 Some(writer.close().instrument_await("iceberg_close").await)
1901 }
1902 _ => None,
1903 };
1904 match writer_builder.build() {
1905 Ok(new_writer) => {
1906 *writer = Some(Box::new(new_writer));
1907 }
                    Err(e) => {
                        warn!(error = %e.as_report(), "Failed to build new writer after close");
                    }
1913 }
1914 close_result
1915 }
1916 };
1917
1918 match close_result {
1919 Some(Ok(result)) => {
1920 inner.uncommitted_write_bytes = 0;
1921 let format_version = inner.table.metadata().format_version();
1922 let partition_type = inner.table.metadata().default_partition_type();
1923 let data_files = result
1924 .into_iter()
1925 .map(|f| {
1926 let truncated = truncate_datafile(f);
1928 SerializedDataFile::try_from(truncated, partition_type, format_version)
1929 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1930 })
1931 .collect::<Result<Vec<_>>>()?;
1932 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1933 data_files,
1934 schema_id: inner.table.metadata().current_schema_id(),
1935 partition_spec_id: inner.table.metadata().default_partition_spec_id(),
1936 })?))
1937 }
1938 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1939 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1940 }
1941 }
1942}
1943
1944const SCHEMA_ID: &str = "schema_id";
1945const PARTITION_SPEC_ID: &str = "partition_spec_id";
1946const DATA_FILES: &str = "data_files";
1947
const MAX_COLUMN_STAT_SIZE: usize = 10240;

/// Drop oversized per-column lower/upper bound statistics from a data file before it is
/// serialized into sink metadata, so that very large column values do not bloat the commit
/// payload shipped to the coordinator.
fn truncate_datafile(mut data_file: DataFile) -> DataFile {
1972 data_file.lower_bounds.retain(|field_id, datum| {
1974 let size = match datum.to_bytes() {
1976 Ok(bytes) => bytes.len(),
1977 Err(_) => 0,
1978 };
1979
1980 if size > MAX_COLUMN_STAT_SIZE {
1981 tracing::debug!(
1982 field_id = field_id,
1983 size = size,
1984 "Truncating large lower_bound statistic"
1985 );
1986 return false;
1987 }
1988 true
1989 });
1990
1991 data_file.upper_bounds.retain(|field_id, datum| {
1993 let size = match datum.to_bytes() {
1995 Ok(bytes) => bytes.len(),
1996 Err(_) => 0,
1997 };
1998
1999 if size > MAX_COLUMN_STAT_SIZE {
2000 tracing::debug!(
2001 field_id = field_id,
2002 size = size,
2003 "Truncating large upper_bound statistic"
2004 );
2005 return false;
2006 }
2007 true
2008 });
2009
2010 data_file
2011}
2012
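/// Per-parallelism write result collected at checkpoint time: the data files written by one
/// sink writer, together with the schema id and partition spec id they were written against.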
2013#[derive(Default, Clone)]
2014struct IcebergCommitResult {
2015 schema_id: i32,
2016 partition_spec_id: i32,
2017 data_files: Vec<SerializedDataFile>,
2018}
2019
impl IcebergCommitResult {
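    /// Deserialize a commit result from the JSON payload embedded in `SinkMetadata`.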
2021 fn try_from(value: &SinkMetadata) -> Result<Self> {
2022 if let Some(Serialized(v)) = &value.metadata {
2023 let mut values = if let serde_json::Value::Object(v) =
2024 serde_json::from_slice::<serde_json::Value>(&v.metadata)
2025 .context("Can't parse iceberg sink metadata")?
2026 {
2027 v
2028 } else {
2029 bail!("iceberg sink metadata should be an object");
2030 };
2031
2032 let schema_id;
2033 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2034 schema_id = value
2035 .as_u64()
2036 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2037 } else {
2038 bail!("iceberg sink metadata should have schema_id");
2039 }
2040
2041 let partition_spec_id;
2042 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2043 partition_spec_id = value
2044 .as_u64()
2045 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2046 } else {
2047 bail!("iceberg sink metadata should have partition_spec_id");
2048 }
2049
2050 let data_files: Vec<SerializedDataFile>;
2051 if let serde_json::Value::Array(values) = values
2052 .remove(DATA_FILES)
2053 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2054 {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .context("Failed to deserialize data_files in iceberg sink metadata")?;
2060 } else {
2061 bail!("iceberg sink metadata should have data_files object");
2062 }
2063
2064 Ok(Self {
2065 schema_id: schema_id as i32,
2066 partition_spec_id: partition_spec_id as i32,
2067 data_files,
2068 })
2069 } else {
2070 bail!("Can't create iceberg sink write result from empty data!")
2071 }
2072 }
2073
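    /// Deserialize a commit result from raw JSON bytes, as persisted by the two-phase commit
    /// pre-commit path.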
2074 fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
2075 let mut values = if let serde_json::Value::Object(value) =
2076 serde_json::from_slice::<serde_json::Value>(&value)
2077 .context("Can't parse iceberg sink metadata")?
2078 {
2079 value
2080 } else {
2081 bail!("iceberg sink metadata should be an object");
2082 };
2083
2084 let schema_id;
2085 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
2086 schema_id = value
2087 .as_u64()
2088 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
2089 } else {
2090 bail!("iceberg sink metadata should have schema_id");
2091 }
2092
2093 let partition_spec_id;
2094 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
2095 partition_spec_id = value
2096 .as_u64()
2097 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
2098 } else {
2099 bail!("iceberg sink metadata should have partition_spec_id");
2100 }
2101
2102 let data_files: Vec<SerializedDataFile>;
2103 if let serde_json::Value::Array(values) = values
2104 .remove(DATA_FILES)
2105 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
2106 {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .context("Failed to deserialize data_files in iceberg sink metadata")?;
2112 } else {
2113 bail!("iceberg sink metadata should have data_files object");
2114 }
2115
2116 Ok(Self {
2117 schema_id: schema_id as i32,
2118 partition_spec_id: partition_spec_id as i32,
2119 data_files,
2120 })
2121 }
2122}
2123
2124impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
2125 type Error = SinkError;
2126
2127 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
2128 let json_data_files = serde_json::Value::Array(
2129 value
2130 .data_files
2131 .iter()
2132 .map(serde_json::to_value)
2133 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2134 .context("Can't serialize data files to json")?,
2135 );
2136 let json_value = serde_json::Value::Object(
2137 vec![
2138 (
2139 SCHEMA_ID.to_owned(),
2140 serde_json::Value::Number(value.schema_id.into()),
2141 ),
2142 (
2143 PARTITION_SPEC_ID.to_owned(),
2144 serde_json::Value::Number(value.partition_spec_id.into()),
2145 ),
2146 (DATA_FILES.to_owned(), json_data_files),
2147 ]
2148 .into_iter()
2149 .collect(),
2150 );
2151 Ok(SinkMetadata {
2152 metadata: Some(Serialized(SerializedMetadata {
2153 metadata: serde_json::to_vec(&json_value)
2154 .context("Can't serialize iceberg sink metadata")?,
2155 })),
2156 })
2157 }
2158}
2159
2160impl TryFrom<IcebergCommitResult> for Vec<u8> {
2161 type Error = SinkError;
2162
2163 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
2164 let json_data_files = serde_json::Value::Array(
2165 value
2166 .data_files
2167 .iter()
2168 .map(serde_json::to_value)
2169 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
2170 .context("Can't serialize data files to json")?,
2171 );
2172 let json_value = serde_json::Value::Object(
2173 vec![
2174 (
2175 SCHEMA_ID.to_owned(),
2176 serde_json::Value::Number(value.schema_id.into()),
2177 ),
2178 (
2179 PARTITION_SPEC_ID.to_owned(),
2180 serde_json::Value::Number(value.partition_spec_id.into()),
2181 ),
2182 (DATA_FILES.to_owned(), json_data_files),
2183 ]
2184 .into_iter()
2185 .collect(),
2186 );
2187 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
2188 }
2189}
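
/// Coordinator-side committer that appends the writers' pre-committed data files to the Iceberg
/// table as snapshots and applies sink schema changes to the table.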
pub struct IcebergSinkCommitter {
2191 catalog: Arc<dyn Catalog>,
2192 table: Table,
2193 pub last_commit_epoch: u64,
2194 pub(crate) sink_id: SinkId,
2195 pub(crate) config: IcebergConfig,
2196 pub(crate) param: SinkParam,
2197 commit_retry_num: u32,
2198 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
2199}
2200
impl IcebergSinkCommitter {
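    /// Reload the table from the catalog and verify that its current schema id and default
    /// partition spec id still match the ones the data files were written against.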
2202 async fn reload_table(
2205 catalog: &dyn Catalog,
2206 table_ident: &TableIdent,
2207 schema_id: i32,
2208 partition_spec_id: i32,
2209 ) -> Result<Table> {
2210 let table = catalog
2211 .load_table(table_ident)
2212 .await
2213 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2214 if table.metadata().current_schema_id() != schema_id {
2215 return Err(SinkError::Iceberg(anyhow!(
2216 "Schema evolution not supported, expect schema id {}, but got {}",
2217 schema_id,
2218 table.metadata().current_schema_id()
2219 )));
2220 }
2221 if table.metadata().default_partition_spec_id() != partition_spec_id {
2222 return Err(SinkError::Iceberg(anyhow!(
2223 "Partition evolution not supported, expect partition spec id {}, but got {}",
2224 partition_spec_id,
2225 table.metadata().default_partition_spec_id()
2226 )));
2227 }
2228 Ok(table)
2229 }
2230}
2231
2232#[async_trait]
2233impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
2234 async fn init(&mut self) -> Result<()> {
2235 tracing::info!(
2236 sink_id = %self.param.sink_id,
2237 "Iceberg sink coordinator initialized",
2238 );
2239
2240 Ok(())
2241 }
2242
2243 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
2244 tracing::debug!("Starting iceberg direct commit in epoch {epoch}");
2245
2246 if metadata.is_empty() {
2247 tracing::debug!(?epoch, "No datafile to commit");
2248 return Ok(());
2249 }
2250
2251 if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
2253 self.commit_data_impl(epoch, write_results, snapshot_id)
2254 .await?;
2255 }
2256
2257 Ok(())
2258 }
2259
2260 async fn commit_schema_change(
2261 &mut self,
2262 epoch: u64,
2263 schema_change: PbSinkSchemaChange,
2264 ) -> Result<()> {
2265 tracing::info!(
2266 "Committing schema change {:?} in epoch {}",
2267 schema_change,
2268 epoch
2269 );
2270 self.commit_schema_change_impl(schema_change).await?;
2271 tracing::info!("Successfully committed schema change in epoch {}", epoch);
2272
2273 Ok(())
2274 }
2275}
2276
2277#[async_trait]
2278impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
2279 async fn init(&mut self) -> Result<()> {
2280 tracing::info!(
2281 sink_id = %self.param.sink_id,
2282 "Iceberg sink coordinator initialized",
2283 );
2284
2285 Ok(())
2286 }
2287
2288 async fn pre_commit(
2289 &mut self,
2290 epoch: u64,
2291 metadata: Vec<SinkMetadata>,
2292 _schema_change: Option<PbSinkSchemaChange>,
2293 ) -> Result<Option<Vec<u8>>> {
2294 tracing::debug!("Starting iceberg pre commit in epoch {epoch}");
2295
2296 let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
2297 Some((write_results, snapshot_id)) => (write_results, snapshot_id),
2298 None => {
2299 tracing::debug!(?epoch, "no data to pre commit");
2300 return Ok(None);
2301 }
2302 };
2303
2304 let mut write_results_bytes = Vec::new();
2305 for each_parallelism_write_result in write_results {
2306 let each_parallelism_write_result_bytes: Vec<u8> =
2307 each_parallelism_write_result.try_into()?;
2308 write_results_bytes.push(each_parallelism_write_result_bytes);
2309 }
2310
2311 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
2312 write_results_bytes.push(snapshot_id_bytes);
2313
2314 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(write_results_bytes);
2315 Ok(Some(pre_commit_metadata_bytes))
2316 }
2317
2318 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
2319 tracing::debug!("Starting iceberg commit in epoch {epoch}");
2320
2321 if commit_metadata.is_empty() {
2322 tracing::debug!(?epoch, "No datafile to commit");
2323 return Ok(());
2324 }
2325
2326 let mut payload = deserialize_metadata(commit_metadata);
2328 if payload.is_empty() {
2329 return Err(SinkError::Iceberg(anyhow!(
2330 "Invalid commit metadata: empty payload"
2331 )));
2332 }
2333
2334 let snapshot_id_bytes = payload.pop().ok_or_else(|| {
2336 SinkError::Iceberg(anyhow!("Invalid commit metadata: missing snapshot_id"))
2337 })?;
2338 let snapshot_id = i64::from_le_bytes(
2339 snapshot_id_bytes
2340 .try_into()
2341 .map_err(|_| SinkError::Iceberg(anyhow!("Invalid snapshot id bytes")))?,
2342 );
2343
2344 let write_results = payload
2346 .into_iter()
2347 .map(IcebergCommitResult::try_from_serialized_bytes)
2348 .collect::<Result<Vec<_>>>()?;
2349
2350 let snapshot_committed = self
2351 .is_snapshot_id_in_iceberg(&self.config, snapshot_id)
2352 .await?;
2353
2354 if snapshot_committed {
2355 tracing::info!(
2356 "Snapshot id {} already committed in iceberg table, skip committing again.",
2357 snapshot_id
2358 );
2359 return Ok(());
2360 }
2361
2362 self.commit_data_impl(epoch, write_results, snapshot_id)
2363 .await
2364 }
2365
2366 async fn commit_schema_change(
2367 &mut self,
2368 epoch: u64,
2369 schema_change: PbSinkSchemaChange,
2370 ) -> Result<()> {
2371 let schema_updated = self.check_schema_change_applied(&schema_change)?;
2372 if schema_updated {
2373 tracing::info!("Schema change already committed in epoch {}, skip", epoch);
2374 return Ok(());
2375 }
2376
2377 tracing::info!(
2378 "Committing schema change {:?} in epoch {}",
2379 schema_change,
2380 epoch
2381 );
2382 self.commit_schema_change_impl(schema_change).await?;
2383 tracing::info!("Successfully committed schema change in epoch {epoch}");
2384
2385 Ok(())
2386 }
2387
2388 async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
2389 tracing::debug!("Abort not implemented yet");
2391 }
2392}
2393
impl IcebergSinkCommitter {
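    /// Parse the per-parallelism write results, verify that they all share the same schema id
    /// and partition spec id, and pre-generate the snapshot id the later commit will use.
    /// Returns `None` when there is nothing to commit.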
2396 fn pre_commit_inner(
2397 &mut self,
2398 _epoch: u64,
2399 metadata: Vec<SinkMetadata>,
2400 ) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2401 let write_results: Vec<IcebergCommitResult> = metadata
2402 .iter()
2403 .map(IcebergCommitResult::try_from)
2404 .collect::<Result<Vec<IcebergCommitResult>>>()?;
2405
2406 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2408 return Ok(None);
2409 }
2410
2411 let expect_schema_id = write_results[0].schema_id;
2412 let expect_partition_spec_id = write_results[0].partition_spec_id;
2413
2414 if write_results
2416 .iter()
2417 .any(|r| r.schema_id != expect_schema_id)
2418 || write_results
2419 .iter()
2420 .any(|r| r.partition_spec_id != expect_partition_spec_id)
2421 {
2422 return Err(SinkError::Iceberg(anyhow!(
2423 "schema_id and partition_spec_id should be the same in all write results"
2424 )));
2425 }
2426
2427 let snapshot_id = FastAppendAction::generate_snapshot_id(&self.table);
2428
2429 Ok(Some((write_results, snapshot_id)))
2430 }
2431
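    /// After waiting for the snapshot limit, reload the table and commit the collected data
    /// files as a single fast-append snapshot on the target branch, retrying transient commit
    /// failures with exponential backoff.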
2432 async fn commit_data_impl(
2433 &mut self,
2434 epoch: u64,
2435 write_results: Vec<IcebergCommitResult>,
2436 snapshot_id: i64,
2437 ) -> Result<()> {
2438 assert!(
2440 !write_results.is_empty() && !write_results.iter().all(|r| r.data_files.is_empty())
2441 );
2442
2443 self.wait_for_snapshot_limit().await?;
2445
2446 let expect_schema_id = write_results[0].schema_id;
2447 let expect_partition_spec_id = write_results[0].partition_spec_id;
2448
2449 self.table = Self::reload_table(
2451 self.catalog.as_ref(),
2452 self.table.identifier(),
2453 expect_schema_id,
2454 expect_partition_spec_id,
2455 )
2456 .await?;
2457
2458 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2459 return Err(SinkError::Iceberg(anyhow!(
2460 "Can't find schema by id {}",
2461 expect_schema_id
2462 )));
2463 };
2464 let Some(partition_spec) = self
2465 .table
2466 .metadata()
2467 .partition_spec_by_id(expect_partition_spec_id)
2468 else {
2469 return Err(SinkError::Iceberg(anyhow!(
2470 "Can't find partition spec by id {}",
2471 expect_partition_spec_id
2472 )));
2473 };
2474 let partition_type = partition_spec
2475 .as_ref()
2476 .clone()
2477 .partition_type(schema)
2478 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2479
2480 let data_files = write_results
2481 .into_iter()
2482 .flat_map(|r| {
2483 r.data_files.into_iter().map(|f| {
2484 f.try_into(expect_partition_spec_id, &partition_type, schema)
2485 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2486 })
2487 })
2488 .collect::<Result<Vec<DataFile>>>()?;
2489 let retry_strategy = ExponentialBackoff::from_millis(10)
2494 .max_delay(Duration::from_secs(60))
2495 .map(jitter)
2496 .take(self.commit_retry_num as usize);
2497 let catalog = self.catalog.clone();
2498 let table_ident = self.table.identifier().clone();
2499
        enum CommitError {
            ReloadTable(SinkError),
            Commit(SinkError),
        }
2508
2509 let table = RetryIf::spawn(
2510 retry_strategy,
2511 || async {
2512 let table = Self::reload_table(
2514 catalog.as_ref(),
2515 &table_ident,
2516 expect_schema_id,
2517 expect_partition_spec_id,
2518 )
2519 .await
2520 .map_err(|e| {
2521 tracing::error!(error = %e.as_report(), "Failed to reload iceberg table");
2522 CommitError::ReloadTable(e)
2523 })?;
2524
2525 let txn = Transaction::new(&table);
2526 let append_action = txn
2527 .fast_append()
2528 .set_snapshot_id(snapshot_id)
2529 .set_target_branch(commit_branch(
2530 self.config.r#type.as_str(),
2531 self.config.write_mode,
2532 ))
2533 .add_data_files(data_files.clone());
2534
2535 let tx = append_action.apply(txn).map_err(|err| {
2536 let err: IcebergError = err.into();
2537 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2538 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2539 })?;
2540
2541 tx.commit(catalog.as_ref()).await.map_err(|err| {
2542 let err: IcebergError = err.into();
2543 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2544 CommitError::Commit(SinkError::Iceberg(anyhow!(err)))
2545 })
2546 },
2547 |err: &CommitError| {
2548 match err {
2550 CommitError::Commit(_) => {
2551 tracing::warn!("Commit failed, will retry");
2552 true
2553 }
2554 CommitError::ReloadTable(_) => {
2555 tracing::error!(
2556 "reload_table failed with non-retriable error, will not retry"
2557 );
2558 false
2559 }
2560 }
2561 },
2562 )
2563 .await
2564 .map_err(|e| match e {
2565 CommitError::ReloadTable(e) | CommitError::Commit(e) => e,
2566 })?;
2567 self.table = table;
2568
2569 let snapshot_num = self.table.metadata().snapshots().count();
2570 let catalog_name = self.config.common.catalog_name();
2571 let table_name = self.table.identifier().to_string();
2572 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2573 GLOBAL_SINK_METRICS
2574 .iceberg_snapshot_num
2575 .with_guarded_label_values(&metrics_labels)
2576 .set(snapshot_num as i64);
2577
2578 tracing::debug!("Succeeded to commit to iceberg table in epoch {epoch}.");
2579
2580 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2581 && self.config.enable_compaction
2582 && iceberg_compact_stat_sender
2583 .send(IcebergSinkCompactionUpdate {
2584 sink_id: self.sink_id,
2585 compaction_interval: self.config.compaction_interval_sec(),
2586 force_compaction: false,
2587 })
2588 .is_err()
2589 {
2590 warn!("failed to send iceberg compaction stats");
2591 }
2592
2593 Ok(())
2594 }
2595
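    /// Check whether the given snapshot id already exists in the Iceberg table, so that an
    /// already-committed epoch can be skipped idempotently on retry.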
2596 async fn is_snapshot_id_in_iceberg(
2600 &self,
2601 iceberg_config: &IcebergConfig,
2602 snapshot_id: i64,
2603 ) -> Result<bool> {
2604 let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2610 }
2611
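    /// Determine whether a pending `AddColumns` schema change has already been applied, by
    /// comparing the current Iceberg table schema against the original schema and against the
    /// original schema plus the added columns.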
2612 fn check_schema_change_applied(&self, schema_change: &PbSinkSchemaChange) -> Result<bool> {
2615 let current_schema = self.table.metadata().current_schema();
2616 let current_arrow_schema = schema_to_arrow_schema(current_schema.as_ref())
2617 .context("Failed to convert schema")
2618 .map_err(SinkError::Iceberg)?;
2619
2620 let iceberg_arrow_convert = IcebergArrowConvert;
2621
2622 let schema_matches = |expected: &[ArrowField]| {
2623 if current_arrow_schema.fields().len() != expected.len() {
2624 return false;
2625 }
2626
2627 expected.iter().all(|expected_field| {
2628 current_arrow_schema.fields().iter().any(|current_field| {
2629 current_field.name() == expected_field.name()
2630 && current_field.data_type() == expected_field.data_type()
2631 })
2632 })
2633 };
2634
2635 let original_arrow_fields: Vec<ArrowField> = schema_change
2636 .original_schema
2637 .iter()
2638 .map(|pb_field| {
2639 let field = Field::from(pb_field);
2640 iceberg_arrow_convert
2641 .to_arrow_field(&field.name, &field.data_type)
2642 .context("Failed to convert field to arrow")
2643 .map_err(SinkError::Iceberg)
2644 })
2645 .collect::<Result<_>>()?;
2646
2647 if schema_matches(&original_arrow_fields) {
2649 tracing::debug!(
2650 "Current iceberg schema matches original_schema ({} columns); schema change not applied",
2651 original_arrow_fields.len()
2652 );
2653 return Ok(false);
2654 }
2655
2656 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2658 schema_change.op.as_ref()
2659 else {
2660 return Err(SinkError::Iceberg(anyhow!(
2661 "Unsupported sink schema change op in iceberg sink: {:?}",
2662 schema_change.op
2663 )));
2664 };
2665
2666 let add_arrow_fields: Vec<ArrowField> = add_columns_op
2667 .fields
2668 .iter()
2669 .map(|pb_field| {
2670 let field = Field::from(pb_field);
2671 iceberg_arrow_convert
2672 .to_arrow_field(&field.name, &field.data_type)
2673 .context("Failed to convert field to arrow")
2674 .map_err(SinkError::Iceberg)
2675 })
2676 .collect::<Result<_>>()?;
2677
2678 let mut expected_after_change = original_arrow_fields;
2679 expected_after_change.extend(add_arrow_fields);
2680
2681 if schema_matches(&expected_after_change) {
2683 tracing::debug!(
2684 "Current iceberg schema matches original_schema + add_columns ({} columns); schema change already applied",
2685 expected_after_change.len()
2686 );
2687 return Ok(true);
2688 }
2689
2690 Err(SinkError::Iceberg(anyhow!(
2691 "Current iceberg schema does not match either original_schema ({} cols) or original_schema + add_columns; cannot determine whether schema change is applied",
2692 schema_change.original_schema.len()
2693 )))
2694 }
2695
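    /// Apply an `AddColumns` schema change to the Iceberg table by appending the new columns as
    /// optional nested fields and committing the schema update through the catalog.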
2696 async fn commit_schema_change_impl(&mut self, schema_change: PbSinkSchemaChange) -> Result<()> {
2700 use iceberg::spec::NestedField;
2701
2702 let Some(risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns_op)) =
2703 schema_change.op.as_ref()
2704 else {
2705 return Err(SinkError::Iceberg(anyhow!(
2706 "Unsupported sink schema change op in iceberg sink: {:?}",
2707 schema_change.op
2708 )));
2709 };
2710
2711 let add_columns = add_columns_op.fields.iter().map(Field::from).collect_vec();
2712
2713 let metadata = self.table.metadata();
2715 let mut next_field_id = metadata.last_column_id() + 1;
2716 tracing::debug!("Starting schema change, next_field_id: {}", next_field_id);
2717
2718 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
2720 let mut new_fields = Vec::new();
2721
2722 for field in &add_columns {
2723 let arrow_field = iceberg_create_table_arrow_convert
2725 .to_arrow_field(&field.name, &field.data_type)
2726 .with_context(|| format!("Failed to convert field '{}' to arrow", field.name))
2727 .map_err(SinkError::Iceberg)?;
2728
2729 let iceberg_type = iceberg::arrow::arrow_type_to_type(arrow_field.data_type())
2731 .map_err(|err| {
2732 SinkError::Iceberg(
2733 anyhow!(err).context("Failed to convert Arrow type to Iceberg type"),
2734 )
2735 })?;
2736
2737 let nested_field = Arc::new(NestedField::optional(
2739 next_field_id,
2740 &field.name,
2741 iceberg_type,
2742 ));
2743
2744 new_fields.push(nested_field);
2745 tracing::info!("Prepared field '{}' with ID {}", field.name, next_field_id);
2746 next_field_id += 1;
2747 }
2748
2749 tracing::info!(
2751 "Committing schema change to catalog for table {}",
2752 self.table.identifier()
2753 );
2754
2755 let txn = Transaction::new(&self.table);
2756 let action = txn.update_schema().add_fields(new_fields);
2757
2758 let updated_table = action
2759 .apply(txn)
2760 .context("Failed to apply schema update action")
2761 .map_err(SinkError::Iceberg)?
2762 .commit(self.catalog.as_ref())
2763 .await
2764 .context("Failed to commit table schema change")
2765 .map_err(SinkError::Iceberg)?;
2766
2767 self.table = updated_table;
2768
2769 tracing::info!(
2770 "Successfully committed schema change, added {} columns to iceberg table",
2771 add_columns.len()
2772 );
2773
2774 Ok(())
2775 }
2776
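    /// Count snapshots appended since the most recent rewrite (`Replace`) snapshot, walking the
    /// snapshot list from newest to oldest.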
2777 fn count_snapshots_since_rewrite(&self) -> usize {
2780 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2781 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2782
2783 let mut count = 0;
2785 for snapshot in snapshots {
2786 let summary = snapshot.summary();
2788 match &summary.operation {
2789 Operation::Replace => {
2790 break;
2792 }
2793
2794 _ => {
2795 count += 1;
2797 }
2798 }
2799 }
2800
2801 count
2802 }
2803
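    /// When compaction is enabled and `max_snapshots_num_before_compaction` is set, block until
    /// the number of snapshots since the last rewrite drops below the limit, nudging the
    /// compaction scheduler while waiting.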
2804 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2806 if !self.config.enable_compaction {
2807 return Ok(());
2808 }
2809
2810 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2811 loop {
2812 let current_count = self.count_snapshots_since_rewrite();
2813
2814 if current_count < max_snapshots {
2815 tracing::info!(
2816 "Snapshot count check passed: {} < {}",
2817 current_count,
2818 max_snapshots
2819 );
2820 break;
2821 }
2822
2823 tracing::info!(
2824 "Snapshot count {} exceeds limit {}, waiting...",
2825 current_count,
2826 max_snapshots
2827 );
2828
2829 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2830 && iceberg_compact_stat_sender
2831 .send(IcebergSinkCompactionUpdate {
2832 sink_id: self.sink_id,
2833 compaction_interval: self.config.compaction_interval_sec(),
2834 force_compaction: true,
2835 })
2836 .is_err()
2837 {
2838 tracing::warn!("failed to send iceberg compaction stats");
2839 }
2840
2841 tokio::time::sleep(Duration::from_secs(30)).await;
2843
2844 self.table = self.config.load_table().await?;
2846 }
2847 }
2848 Ok(())
2849 }
2850}
2851
2852const MAP_KEY: &str = "key";
2853const MAP_VALUE: &str = "value";
2854
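/// Walk into a nested Arrow type (struct, list, or map), record the corresponding RisingWave
/// child types in `schema_fields` keyed by field name, and return the Arrow struct fields to
/// compare against, or `None` for non-nested types.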
2855fn get_fields<'a>(
2856 our_field_type: &'a risingwave_common::types::DataType,
2857 data_type: &ArrowDataType,
2858 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2859) -> Option<ArrowFields> {
2860 match data_type {
2861 ArrowDataType::Struct(fields) => {
2862 match our_field_type {
2863 risingwave_common::types::DataType::Struct(struct_fields) => {
2864 struct_fields.iter().for_each(|(name, data_type)| {
2865 let res = schema_fields.insert(name, data_type);
2866 assert!(res.is_none())
2868 });
2869 }
2870 risingwave_common::types::DataType::Map(map_fields) => {
2871 schema_fields.insert(MAP_KEY, map_fields.key());
2872 schema_fields.insert(MAP_VALUE, map_fields.value());
2873 }
2874 risingwave_common::types::DataType::List(list) => {
2875 list.elem()
2876 .as_struct()
2877 .iter()
2878 .for_each(|(name, data_type)| {
2879 let res = schema_fields.insert(name, data_type);
2880 assert!(res.is_none())
2882 });
2883 }
2884 _ => {}
2885 };
2886 Some(fields.clone())
2887 }
2888 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2889 get_fields(our_field_type, field.data_type(), schema_fields)
2890 }
        _ => None,
    }
2893}
2894
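/// Check that every Arrow field from the Iceberg schema has a compatible counterpart in the
/// RisingWave schema, recursing into nested struct, list, and map types.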
2895fn check_compatibility(
2896 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2897 fields: &ArrowFields,
2898) -> anyhow::Result<bool> {
2899 for arrow_field in fields {
2900 let our_field_type = schema_fields
2901 .get(arrow_field.name().as_str())
2902 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2903
2904 let converted_arrow_data_type = IcebergArrowConvert
2906 .to_arrow_field("", our_field_type)
2907 .map_err(|e| anyhow!(e))?
2908 .data_type()
2909 .clone();
2910
2911 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2912 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2913 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2914 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2915 (ArrowDataType::List(_), ArrowDataType::List(field))
2916 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2917 let mut schema_fields = HashMap::new();
2918 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2919 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2920 }
2921 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2923 let mut schema_fields = HashMap::new();
2924 our_field_type
2925 .as_struct()
2926 .iter()
2927 .for_each(|(name, data_type)| {
2928 let res = schema_fields.insert(name, data_type);
2929 assert!(res.is_none())
2931 });
2932 check_compatibility(schema_fields, fields)?
2933 }
2934 (left, right) => left.equals_datatype(right),
2942 };
2943 if !compatible {
2944 bail!(
2945 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2946 arrow_field.name(),
2947 converted_arrow_data_type,
2948 arrow_field.data_type()
2949 );
2950 }
2951 }
2952 Ok(true)
2953}
2954
2955pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2957 if rw_schema.fields.len() != arrow_schema.fields().len() {
2958 bail!(
2959 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2960 rw_schema.fields.len(),
2961 arrow_schema.fields.len()
2962 );
2963 }
2964
2965 let mut schema_fields = HashMap::new();
2966 rw_schema.fields.iter().for_each(|field| {
2967 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2968 assert!(res.is_none())
2970 });
2971
2972 check_compatibility(schema_fields, &arrow_schema.fields)?;
2973 Ok(())
2974}
2975
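/// Serialize the per-parallelism pre-commit payloads into a single JSON byte vector.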
2976fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2977 serde_json::to_vec(&metadata).unwrap()
2978}
2979
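/// Inverse of [`serialize_metadata`]; panics if the bytes are not valid JSON produced by it.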
2980fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2981 serde_json::from_slice(&bytes).unwrap()
2982}
2983
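/// Parse a `partition_by` expression such as `"c1, bucket(16,c2), truncate(4,c3)"` into
/// `(column, Transform)` pairs; e.g. `bucket(16,c2)` maps to the `bucket[16]` transform on
/// column `c2`.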
2984pub fn parse_partition_by_exprs(
2985 expr: String,
2986) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2987 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2989 if !re.is_match(&expr) {
2990 bail!(format!(
2991 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2992 expr
2993 ))
2994 }
2995 let caps = re.captures_iter(&expr);
2996
2997 let mut partition_columns = vec![];
2998
2999 for mat in caps {
3000 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
3001 (&mat["transform"], Transform::Identity)
3002 } else {
3003 let mut func = mat["transform"].to_owned();
3004 if func == "bucket" || func == "truncate" {
3005 let n = &mat
3006 .name("n")
3007 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
3008 .as_str();
3009 func = format!("{func}[{n}]");
3010 }
3011 (
3012 &mat["field"],
3013 Transform::from_str(&func)
3014 .with_context(|| format!("invalid transform function {}", func))?,
3015 )
3016 };
3017 partition_columns.push((column.to_owned(), transform));
3018 }
3019 Ok(partition_columns)
3020}
3021
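/// Branch that commits should target: the dedicated copy-on-write branch for upsert sinks in
/// copy-on-write mode, otherwise the main branch.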
3022pub fn commit_branch(sink_type: &str, write_mode: IcebergWriteMode) -> String {
3023 if should_enable_iceberg_cow(sink_type, write_mode) {
3024 ICEBERG_COW_BRANCH.to_owned()
3025 } else {
3026 MAIN_BRANCH.to_owned()
3027 }
3028}
3029
3030pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: IcebergWriteMode) -> bool {
3031 sink_type == SINK_TYPE_UPSERT && write_mode == IcebergWriteMode::CopyOnWrite
3032}
3033
3034impl crate::with_options::WithOptions for IcebergWriteMode {}
3035
3036impl crate::with_options::WithOptions for FormatVersion {}
3037
3038impl crate::with_options::WithOptions for CompactionType {}
3039
3040#[cfg(test)]
3041mod test {
3042 use std::collections::BTreeMap;
3043
3044 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
3045 use risingwave_common::types::{DataType, MapType, StructType};
3046
3047 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
3048 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
3049 use crate::sink::iceberg::{
3050 COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
3051 CompactionType, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION, FormatVersion,
3052 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, IcebergConfig, IcebergWriteMode,
3053 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3054 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
3055 };
3056
    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
3060 fn test_compatible_arrow_schema() {
3061 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
3062
3063 use super::*;
3064 let risingwave_schema = Schema::new(vec![
3065 Field::with_name(DataType::Int32, "a"),
3066 Field::with_name(DataType::Int32, "b"),
3067 Field::with_name(DataType::Int32, "c"),
3068 ]);
3069 let arrow_schema = ArrowSchema::new(vec![
3070 ArrowField::new("a", ArrowDataType::Int32, false),
3071 ArrowField::new("b", ArrowDataType::Int32, false),
3072 ArrowField::new("c", ArrowDataType::Int32, false),
3073 ]);
3074
3075 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3076
3077 let risingwave_schema = Schema::new(vec![
3078 Field::with_name(DataType::Int32, "d"),
3079 Field::with_name(DataType::Int32, "c"),
3080 Field::with_name(DataType::Int32, "a"),
3081 Field::with_name(DataType::Int32, "b"),
3082 ]);
3083 let arrow_schema = ArrowSchema::new(vec![
3084 ArrowField::new("a", ArrowDataType::Int32, false),
3085 ArrowField::new("b", ArrowDataType::Int32, false),
3086 ArrowField::new("d", ArrowDataType::Int32, false),
3087 ArrowField::new("c", ArrowDataType::Int32, false),
3088 ]);
3089 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3090
3091 let risingwave_schema = Schema::new(vec![
3092 Field::with_name(
3093 DataType::Struct(StructType::new(vec![
3094 ("a1", DataType::Int32),
3095 (
3096 "a2",
3097 DataType::Struct(StructType::new(vec![
3098 ("a21", DataType::Bytea),
3099 (
3100 "a22",
3101 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3102 ),
3103 ])),
3104 ),
3105 ])),
3106 "a",
3107 ),
3108 Field::with_name(
3109 DataType::list(DataType::Struct(StructType::new(vec![
3110 ("b1", DataType::Int32),
3111 ("b2", DataType::Bytea),
3112 (
3113 "b3",
3114 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3115 ),
3116 ]))),
3117 "b",
3118 ),
3119 Field::with_name(
3120 DataType::Map(MapType::from_kv(
3121 DataType::Varchar,
3122 DataType::list(DataType::Struct(StructType::new([
3123 ("c1", DataType::Int32),
3124 ("c2", DataType::Bytea),
3125 (
3126 "c3",
3127 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
3128 ),
3129 ]))),
3130 )),
3131 "c",
3132 ),
3133 ]);
3134 let arrow_schema = ArrowSchema::new(vec![
3135 ArrowField::new(
3136 "a",
3137 ArrowDataType::Struct(ArrowFields::from(vec![
3138 ArrowField::new("a1", ArrowDataType::Int32, false),
3139 ArrowField::new(
3140 "a2",
3141 ArrowDataType::Struct(ArrowFields::from(vec![
3142 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
3143 ArrowField::new_map(
3144 "a22",
3145 "entries",
3146 ArrowFieldRef::new(ArrowField::new(
3147 "key",
3148 ArrowDataType::Utf8,
3149 false,
3150 )),
3151 ArrowFieldRef::new(ArrowField::new(
3152 "value",
3153 ArrowDataType::Utf8,
3154 false,
3155 )),
3156 false,
3157 false,
3158 ),
3159 ])),
3160 false,
3161 ),
3162 ])),
3163 false,
3164 ),
3165 ArrowField::new(
3166 "b",
3167 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3168 ArrowDataType::Struct(ArrowFields::from(vec![
3169 ArrowField::new("b1", ArrowDataType::Int32, false),
3170 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
3171 ArrowField::new_map(
3172 "b3",
3173 "entries",
3174 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3175 ArrowFieldRef::new(ArrowField::new(
3176 "value",
3177 ArrowDataType::Utf8,
3178 false,
3179 )),
3180 false,
3181 false,
3182 ),
3183 ])),
3184 false,
3185 ))),
3186 false,
3187 ),
3188 ArrowField::new_map(
3189 "c",
3190 "entries",
3191 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
3192 ArrowFieldRef::new(ArrowField::new(
3193 "value",
3194 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
3195 ArrowDataType::Struct(ArrowFields::from(vec![
3196 ArrowField::new("c1", ArrowDataType::Int32, false),
3197 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
3198 ArrowField::new_map(
3199 "c3",
3200 "entries",
3201 ArrowFieldRef::new(ArrowField::new(
3202 "key",
3203 ArrowDataType::Utf8,
3204 false,
3205 )),
3206 ArrowFieldRef::new(ArrowField::new(
3207 "value",
3208 ArrowDataType::Utf8,
3209 false,
3210 )),
3211 false,
3212 false,
3213 ),
3214 ])),
3215 false,
3216 ))),
3217 false,
3218 )),
3219 false,
3220 false,
3221 ),
3222 ]);
3223 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
3224 }
3225
3226 #[test]
3227 fn test_parse_iceberg_config() {
3228 let values = [
3229 ("connector", "iceberg"),
3230 ("type", "upsert"),
3231 ("primary_key", "v1"),
3232 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
3233 ("warehouse.path", "s3://iceberg"),
3234 ("s3.endpoint", "http://127.0.0.1:9301"),
3235 ("s3.access.key", "hummockadmin"),
3236 ("s3.secret.key", "hummockadmin"),
3237 ("s3.path.style.access", "true"),
3238 ("s3.region", "us-east-1"),
3239 ("catalog.type", "jdbc"),
3240 ("catalog.name", "demo"),
3241 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
3242 ("catalog.jdbc.user", "admin"),
3243 ("catalog.jdbc.password", "123456"),
3244 ("database.name", "demo_db"),
3245 ("table.name", "demo_table"),
3246 ("enable_compaction", "true"),
3247 ("compaction_interval_sec", "1800"),
3248 ("enable_snapshot_expiration", "true"),
3249 ]
3250 .into_iter()
3251 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3252 .collect();
3253
3254 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3255
3256 let expected_iceberg_config = IcebergConfig {
3257 common: IcebergCommon {
3258 warehouse_path: Some("s3://iceberg".to_owned()),
3259 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
3260 s3_region: Some("us-east-1".to_owned()),
3261 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
3262 s3_access_key: Some("hummockadmin".to_owned()),
3263 s3_secret_key: Some("hummockadmin".to_owned()),
3264 s3_iam_role_arn: None,
3265 gcs_credential: None,
3266 catalog_type: Some("jdbc".to_owned()),
3267 glue_id: None,
3268 glue_region: None,
3269 glue_access_key: None,
3270 glue_secret_key: None,
3271 glue_iam_role_arn: None,
3272 catalog_name: Some("demo".to_owned()),
3273 s3_path_style_access: Some(true),
3274 catalog_credential: None,
3275 catalog_oauth2_server_uri: None,
3276 catalog_scope: None,
3277 catalog_token: None,
3278 enable_config_load: None,
3279 rest_signing_name: None,
3280 rest_signing_region: None,
3281 rest_sigv4_enabled: None,
3282 hosted_catalog: None,
3283 azblob_account_name: None,
3284 azblob_account_key: None,
3285 azblob_endpoint_url: None,
3286 catalog_header: None,
3287 adlsgen2_account_name: None,
3288 adlsgen2_account_key: None,
3289 adlsgen2_endpoint: None,
3290 vended_credentials: None,
3291 catalog_security: None,
3292 gcp_auth_scopes: None,
3293 catalog_io_impl: None,
3294 },
3295 table: IcebergTableIdentifier {
3296 database_name: Some("demo_db".to_owned()),
3297 table_name: "demo_table".to_owned(),
3298 },
3299 r#type: "upsert".to_owned(),
3300 force_append_only: false,
3301 primary_key: Some(vec!["v1".to_owned()]),
3302 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
3303 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
3304 .into_iter()
3305 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3306 .collect(),
3307 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
3308 commit_checkpoint_size_threshold_mb: Some(
3309 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
3310 ),
3311 create_table_if_not_exists: false,
3312 is_exactly_once: Some(true),
3313 commit_retry_num: 8,
3314 enable_compaction: true,
3315 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
3316 enable_snapshot_expiration: true,
3317 write_mode: IcebergWriteMode::MergeOnRead,
3318 format_version: FormatVersion::V2,
3319 snapshot_expiration_max_age_millis: None,
3320 snapshot_expiration_retain_last: None,
3321 snapshot_expiration_clear_expired_files: true,
3322 snapshot_expiration_clear_expired_meta_data: true,
3323 max_snapshots_num_before_compaction: None,
3324 small_files_threshold_mb: None,
3325 delete_files_count_threshold: None,
3326 trigger_snapshot_count: None,
3327 target_file_size_mb: None,
3328 compaction_type: None,
3329 write_parquet_compression: None,
3330 write_parquet_max_row_group_rows: None,
3331 };
3332
3333 assert_eq!(iceberg_config, expected_iceberg_config);
3334
3335 assert_eq!(
3336 &iceberg_config.full_table_name().unwrap().to_string(),
3337 "demo_db.demo_table"
3338 );
3339 }
3340
3341 #[test]
3342 fn test_parse_commit_checkpoint_size_threshold() {
3343 let values: BTreeMap<String, String> = [
3344 ("connector", "iceberg"),
3345 ("type", "append-only"),
3346 ("force_append_only", "true"),
3347 ("catalog.name", "test-catalog"),
3348 ("catalog.type", "storage"),
3349 ("warehouse.path", "s3://my-bucket/warehouse"),
3350 ("database.name", "test_db"),
3351 ("table.name", "test_table"),
3352 ("commit_checkpoint_size_threshold_mb", "128"),
3353 ]
3354 .into_iter()
3355 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3356 .collect();
3357
3358 let config = IcebergConfig::from_btreemap(values).unwrap();
3359 assert_eq!(config.commit_checkpoint_size_threshold_mb, Some(128));
3360 assert_eq!(
3361 config.commit_checkpoint_size_threshold_bytes(),
3362 Some(128 * 1024 * 1024)
3363 );
3364 }
3365
3366 #[test]
3367 fn test_default_commit_checkpoint_size_threshold() {
3368 let values: BTreeMap<String, String> = [
3369 ("connector", "iceberg"),
3370 ("type", "append-only"),
3371 ("force_append_only", "true"),
3372 ("catalog.name", "test-catalog"),
3373 ("catalog.type", "storage"),
3374 ("warehouse.path", "s3://my-bucket/warehouse"),
3375 ("database.name", "test_db"),
3376 ("table.name", "test_table"),
3377 ]
3378 .into_iter()
3379 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3380 .collect();
3381
3382 let config = IcebergConfig::from_btreemap(values).unwrap();
3383 assert_eq!(
3384 config.commit_checkpoint_size_threshold_mb,
3385 Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
3386 );
3387 assert_eq!(
3388 config.commit_checkpoint_size_threshold_bytes(),
3389 Some(ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB * 1024 * 1024)
3390 );
3391 }
3392
3393 #[test]
3394 fn test_reject_zero_commit_checkpoint_size_threshold() {
3395 let values: BTreeMap<String, String> = [
3396 ("connector", "iceberg"),
3397 ("type", "append-only"),
3398 ("force_append_only", "true"),
3399 ("catalog.name", "test-catalog"),
3400 ("catalog.type", "storage"),
3401 ("warehouse.path", "s3://my-bucket/warehouse"),
3402 ("database.name", "test_db"),
3403 ("table.name", "test_table"),
3404 ("commit_checkpoint_size_threshold_mb", "0"),
3405 ]
3406 .into_iter()
3407 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3408 .collect();
3409
3410 let err = IcebergConfig::from_btreemap(values).unwrap_err();
3411 assert!(
3412 err.to_string()
3413 .contains("`commit_checkpoint_size_threshold_mb` must be greater than 0")
3414 );
3415 }
3416
3417 async fn test_create_catalog(configs: BTreeMap<String, String>) {
3418 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
3419
3420 let _table = iceberg_config.load_table().await.unwrap();
3421 }
3422
3423 #[tokio::test]
3424 #[ignore]
3425 async fn test_storage_catalog() {
3426 let values = [
3427 ("connector", "iceberg"),
3428 ("type", "append-only"),
3429 ("force_append_only", "true"),
3430 ("s3.endpoint", "http://127.0.0.1:9301"),
3431 ("s3.access.key", "hummockadmin"),
3432 ("s3.secret.key", "hummockadmin"),
3433 ("s3.region", "us-east-1"),
3434 ("s3.path.style.access", "true"),
3435 ("catalog.name", "demo"),
3436 ("catalog.type", "storage"),
3437 ("warehouse.path", "s3://icebergdata/demo"),
3438 ("database.name", "s1"),
3439 ("table.name", "t1"),
3440 ]
3441 .into_iter()
3442 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3443 .collect();
3444
3445 test_create_catalog(values).await;
3446 }
3447
3448 #[tokio::test]
3449 #[ignore]
3450 async fn test_rest_catalog() {
3451 let values = [
3452 ("connector", "iceberg"),
3453 ("type", "append-only"),
3454 ("force_append_only", "true"),
3455 ("s3.endpoint", "http://127.0.0.1:9301"),
3456 ("s3.access.key", "hummockadmin"),
3457 ("s3.secret.key", "hummockadmin"),
3458 ("s3.region", "us-east-1"),
3459 ("s3.path.style.access", "true"),
3460 ("catalog.name", "demo"),
3461 ("catalog.type", "rest"),
3462 ("catalog.uri", "http://192.168.167.4:8181"),
3463 ("warehouse.path", "s3://icebergdata/demo"),
3464 ("database.name", "s1"),
3465 ("table.name", "t1"),
3466 ]
3467 .into_iter()
3468 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3469 .collect();
3470
3471 test_create_catalog(values).await;
3472 }
3473
3474 #[tokio::test]
3475 #[ignore]
3476 async fn test_jdbc_catalog() {
3477 let values = [
3478 ("connector", "iceberg"),
3479 ("type", "append-only"),
3480 ("force_append_only", "true"),
3481 ("s3.endpoint", "http://127.0.0.1:9301"),
3482 ("s3.access.key", "hummockadmin"),
3483 ("s3.secret.key", "hummockadmin"),
3484 ("s3.region", "us-east-1"),
3485 ("s3.path.style.access", "true"),
3486 ("catalog.name", "demo"),
3487 ("catalog.type", "jdbc"),
3488 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
3489 ("catalog.jdbc.user", "admin"),
3490 ("catalog.jdbc.password", "123456"),
3491 ("warehouse.path", "s3://icebergdata/demo"),
3492 ("database.name", "s1"),
3493 ("table.name", "t1"),
3494 ]
3495 .into_iter()
3496 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3497 .collect();
3498
3499 test_create_catalog(values).await;
3500 }
3501
3502 #[tokio::test]
3503 #[ignore]
3504 async fn test_hive_catalog() {
3505 let values = [
3506 ("connector", "iceberg"),
3507 ("type", "append-only"),
3508 ("force_append_only", "true"),
3509 ("s3.endpoint", "http://127.0.0.1:9301"),
3510 ("s3.access.key", "hummockadmin"),
3511 ("s3.secret.key", "hummockadmin"),
3512 ("s3.region", "us-east-1"),
3513 ("s3.path.style.access", "true"),
3514 ("catalog.name", "demo"),
3515 ("catalog.type", "hive"),
3516 ("catalog.uri", "thrift://localhost:9083"),
3517 ("warehouse.path", "s3://icebergdata/demo"),
3518 ("database.name", "s1"),
3519 ("table.name", "t1"),
3520 ]
3521 .into_iter()
3522 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3523 .collect();
3524
3525 test_create_catalog(values).await;
3526 }
3527
3528 #[test]
3530 fn test_parse_google_auth_config() {
3531 let values: BTreeMap<String, String> = [
3532 ("connector", "iceberg"),
3533 ("type", "append-only"),
3534 ("force_append_only", "true"),
3535 ("catalog.name", "biglake-catalog"),
3536 ("catalog.type", "rest"),
3537 ("catalog.uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog"),
3538 ("warehouse.path", "bq://projects/my-gcp-project"),
3539 ("catalog.header", "x-goog-user-project=my-gcp-project"),
3540 ("catalog.security", "google"),
3541 ("gcp.auth.scopes", "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"),
3542 ("database.name", "my_dataset"),
3543 ("table.name", "my_table"),
3544 ]
3545 .into_iter()
3546 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3547 .collect();
3548
3549 let config = IcebergConfig::from_btreemap(values).unwrap();
3550 assert_eq!(config.catalog_type(), "rest");
3551 assert_eq!(config.common.catalog_security.as_deref(), Some("google"));
3552 assert_eq!(
3553 config.common.gcp_auth_scopes.as_deref(),
3554 Some(
3555 "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/bigquery"
3556 )
3557 );
3558 assert_eq!(
3559 config.common.warehouse_path.as_deref(),
3560 Some("bq://projects/my-gcp-project")
3561 );
3562 assert_eq!(
3563 config.common.catalog_header.as_deref(),
3564 Some("x-goog-user-project=my-gcp-project")
3565 );
3566 }
3567
3568 #[test]
3570 fn test_parse_oauth2_security_config() {
3571 let values: BTreeMap<String, String> = [
3572 ("connector", "iceberg"),
3573 ("type", "append-only"),
3574 ("force_append_only", "true"),
3575 ("catalog.name", "oauth2-catalog"),
3576 ("catalog.type", "rest"),
3577 ("catalog.uri", "https://example.com/iceberg/rest"),
3578 ("warehouse.path", "s3://my-bucket/warehouse"),
3579 ("catalog.security", "oauth2"),
3580 ("catalog.credential", "client_id:client_secret"),
3581 ("catalog.token", "bearer-token"),
3582 (
3583 "catalog.oauth2_server_uri",
3584 "https://oauth.example.com/token",
3585 ),
3586 ("catalog.scope", "read write"),
3587 ("database.name", "test_db"),
3588 ("table.name", "test_table"),
3589 ]
3590 .into_iter()
3591 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3592 .collect();
3593
3594 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3595
3596 assert_eq!(iceberg_config.catalog_type(), "rest");
3598
3599 assert_eq!(
3601 iceberg_config.common.catalog_security.as_deref(),
3602 Some("oauth2")
3603 );
3604 assert_eq!(
3605 iceberg_config.common.catalog_credential.as_deref(),
3606 Some("client_id:client_secret")
3607 );
3608 assert_eq!(
3609 iceberg_config.common.catalog_token.as_deref(),
3610 Some("bearer-token")
3611 );
3612 assert_eq!(
3613 iceberg_config.common.catalog_oauth2_server_uri.as_deref(),
3614 Some("https://oauth.example.com/token")
3615 );
3616 assert_eq!(
3617 iceberg_config.common.catalog_scope.as_deref(),
3618 Some("read write")
3619 );
3620 }
3621
3622 #[test]
3624 fn test_parse_invalid_security_config() {
3625 let values: BTreeMap<String, String> = [
3626 ("connector", "iceberg"),
3627 ("type", "append-only"),
3628 ("force_append_only", "true"),
3629 ("catalog.name", "invalid-catalog"),
3630 ("catalog.type", "rest"),
3631 ("catalog.uri", "https://example.com/iceberg/rest"),
3632 ("warehouse.path", "s3://my-bucket/warehouse"),
3633 ("catalog.security", "invalid_security_type"),
3634 ("database.name", "test_db"),
3635 ("table.name", "test_table"),
3636 ]
3637 .into_iter()
3638 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3639 .collect();
3640
3641 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3643
3644 assert_eq!(
3646 iceberg_config.common.catalog_security.as_deref(),
3647 Some("invalid_security_type")
3648 );
3649
3650 assert_eq!(iceberg_config.catalog_type(), "rest");
3652 }
3653
3654 #[test]
3656 fn test_parse_custom_io_impl_config() {
3657 let values: BTreeMap<String, String> = [
3658 ("connector", "iceberg"),
3659 ("type", "append-only"),
3660 ("force_append_only", "true"),
3661 ("catalog.name", "gcs-catalog"),
3662 ("catalog.type", "rest"),
3663 ("catalog.uri", "https://example.com/iceberg/rest"),
3664 ("warehouse.path", "gs://my-bucket/warehouse"),
3665 ("catalog.security", "google"),
3666 ("catalog.io_impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"),
3667 ("database.name", "test_db"),
3668 ("table.name", "test_table"),
3669 ]
3670 .into_iter()
3671 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3672 .collect();
3673
3674 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
3675
3676 assert_eq!(iceberg_config.catalog_type(), "rest");
3678
3679 assert_eq!(
3681 iceberg_config.common.catalog_io_impl.as_deref(),
3682 Some("org.apache.iceberg.gcp.gcs.GCSFileIO")
3683 );
3684
3685 assert_eq!(
3687 iceberg_config.common.catalog_security.as_deref(),
3688 Some("google")
3689 );
3690 }
3691
3692 #[test]
3693 fn test_config_constants_consistency() {
3694 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
3697 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
3698 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
3699 assert_eq!(WRITE_MODE, "write_mode");
3700 assert_eq!(
3701 COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB,
3702 "commit_checkpoint_size_threshold_mb"
3703 );
3704 assert_eq!(
3705 SNAPSHOT_EXPIRATION_RETAIN_LAST,
3706 "snapshot_expiration_retain_last"
3707 );
3708 assert_eq!(
3709 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
3710 "snapshot_expiration_max_age_millis"
3711 );
3712 assert_eq!(
3713 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
3714 "snapshot_expiration_clear_expired_files"
3715 );
3716 assert_eq!(
3717 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
3718 "snapshot_expiration_clear_expired_meta_data"
3719 );
3720 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
3721 }
3722
3723 #[test]
3725 fn test_parse_compaction_config() {
3726 let values: BTreeMap<String, String> = [
3728 ("connector", "iceberg"),
3729 ("type", "upsert"),
3730 ("primary_key", "id"),
3731 ("warehouse.path", "s3://iceberg"),
3732 ("s3.endpoint", "http://127.0.0.1:9301"),
3733 ("s3.access.key", "test"),
3734 ("s3.secret.key", "test"),
3735 ("s3.region", "us-east-1"),
3736 ("catalog.type", "storage"),
3737 ("catalog.name", "demo"),
3738 ("database.name", "test_db"),
3739 ("table.name", "test_table"),
3740 ("enable_compaction", "true"),
3741 ("compaction.max_snapshots_num", "100"),
3742 ("compaction.small_files_threshold_mb", "512"),
3743 ("compaction.delete_files_count_threshold", "50"),
3744 ("compaction.trigger_snapshot_count", "10"),
3745 ("compaction.target_file_size_mb", "256"),
3746 ("compaction.type", "full"),
3747 ("compaction.write_parquet_compression", "zstd"),
3748 ("compaction.write_parquet_max_row_group_rows", "50000"),
3749 ]
3750 .into_iter()
3751 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3752 .collect();
3753
3754 let config = IcebergConfig::from_btreemap(values).unwrap();
3755 assert!(config.enable_compaction);
3756 assert_eq!(config.max_snapshots_num_before_compaction, Some(100));
3757 assert_eq!(config.small_files_threshold_mb, Some(512));
3758 assert_eq!(config.delete_files_count_threshold, Some(50));
3759 assert_eq!(config.trigger_snapshot_count, Some(10));
3760 assert_eq!(config.target_file_size_mb, Some(256));
3761 assert_eq!(config.compaction_type, Some(CompactionType::Full));
3762 assert_eq!(config.target_file_size_mb(), 256);
3763 assert_eq!(config.write_parquet_compression(), "zstd");
3764 assert_eq!(config.write_parquet_max_row_group_rows(), 50000);
3765
3766 let values: BTreeMap<String, String> = [
3768 ("connector", "iceberg"),
3769 ("type", "append-only"),
3770 ("force_append_only", "true"),
3771 ("catalog.name", "test-catalog"),
3772 ("catalog.type", "storage"),
3773 ("warehouse.path", "s3://my-bucket/warehouse"),
3774 ("database.name", "test_db"),
3775 ("table.name", "test_table"),
3776 ]
3777 .into_iter()
3778 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3779 .collect();
3780
3781 let config = IcebergConfig::from_btreemap(values).unwrap();
        // With no compaction options set, the accessors fall back to their defaults.
        assert_eq!(config.target_file_size_mb(), 1024);
        assert_eq!(config.write_parquet_compression(), "zstd");
        assert_eq!(config.write_parquet_max_row_group_rows(), 122880);
    }
3786
3787 #[test]
3789 fn test_parse_parquet_compression() {
3790 use parquet::basic::Compression;
3791
3792 use super::parse_parquet_compression;
3793
3794 assert!(matches!(
3796 parse_parquet_compression("snappy"),
3797 Compression::SNAPPY
3798 ));
3799 assert!(matches!(
3800 parse_parquet_compression("gzip"),
3801 Compression::GZIP(_)
3802 ));
3803 assert!(matches!(
3804 parse_parquet_compression("zstd"),
3805 Compression::ZSTD(_)
3806 ));
3807 assert!(matches!(parse_parquet_compression("lz4"), Compression::LZ4));
3808 assert!(matches!(
3809 parse_parquet_compression("brotli"),
3810 Compression::BROTLI(_)
3811 ));
3812 assert!(matches!(
3813 parse_parquet_compression("uncompressed"),
3814 Compression::UNCOMPRESSED
3815 ));
3816
3817 assert!(matches!(
3819 parse_parquet_compression("SNAPPY"),
3820 Compression::SNAPPY
3821 ));
3822 assert!(matches!(
3823 parse_parquet_compression("Zstd"),
3824 Compression::ZSTD(_)
3825 ));
3826
3827 assert!(matches!(
3829 parse_parquet_compression("invalid"),
3830 Compression::SNAPPY
3831 ));
3832 }
3833
3834 #[test]
3835 fn test_append_only_rejects_copy_on_write() {
3836 let values = [
3838 ("connector", "iceberg"),
3839 ("type", "append-only"),
3840 ("warehouse.path", "s3://iceberg"),
3841 ("s3.endpoint", "http://127.0.0.1:9301"),
3842 ("s3.access.key", "test"),
3843 ("s3.secret.key", "test"),
3844 ("s3.region", "us-east-1"),
3845 ("catalog.type", "storage"),
3846 ("catalog.name", "demo"),
3847 ("database.name", "test_db"),
3848 ("table.name", "test_table"),
3849 ("write_mode", "copy-on-write"),
3850 ]
3851 .into_iter()
3852 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3853 .collect();
3854
3855 let result = IcebergConfig::from_btreemap(values);
3856 assert!(result.is_err());
3857 assert!(
3858 result
3859 .unwrap_err()
3860 .to_string()
3861 .contains("'copy-on-write' mode is not supported for append-only iceberg sink")
3862 );
3863 }
3864
3865 #[test]
3866 fn test_append_only_accepts_merge_on_read() {
3867 let values = [
3869 ("connector", "iceberg"),
3870 ("type", "append-only"),
3871 ("warehouse.path", "s3://iceberg"),
3872 ("s3.endpoint", "http://127.0.0.1:9301"),
3873 ("s3.access.key", "test"),
3874 ("s3.secret.key", "test"),
3875 ("s3.region", "us-east-1"),
3876 ("catalog.type", "storage"),
3877 ("catalog.name", "demo"),
3878 ("database.name", "test_db"),
3879 ("table.name", "test_table"),
3880 ("write_mode", "merge-on-read"),
3881 ]
3882 .into_iter()
3883 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3884 .collect();
3885
3886 let result = IcebergConfig::from_btreemap(values);
3887 assert!(result.is_ok());
3888 let config = result.unwrap();
3889 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3890 }
3891
3892 #[test]
3893 fn test_append_only_defaults_to_merge_on_read() {
3894 let values = [
3896 ("connector", "iceberg"),
3897 ("type", "append-only"),
3898 ("warehouse.path", "s3://iceberg"),
3899 ("s3.endpoint", "http://127.0.0.1:9301"),
3900 ("s3.access.key", "test"),
3901 ("s3.secret.key", "test"),
3902 ("s3.region", "us-east-1"),
3903 ("catalog.type", "storage"),
3904 ("catalog.name", "demo"),
3905 ("database.name", "test_db"),
3906 ("table.name", "test_table"),
3907 ]
3908 .into_iter()
3909 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3910 .collect();
3911
3912 let result = IcebergConfig::from_btreemap(values);
3913 assert!(result.is_ok());
3914 let config = result.unwrap();
3915 assert_eq!(config.write_mode, IcebergWriteMode::MergeOnRead);
3916 }
3917
3918 #[test]
3919 fn test_upsert_accepts_copy_on_write() {
3920 let values = [
3922 ("connector", "iceberg"),
3923 ("type", "upsert"),
3924 ("primary_key", "id"),
3925 ("warehouse.path", "s3://iceberg"),
3926 ("s3.endpoint", "http://127.0.0.1:9301"),
3927 ("s3.access.key", "test"),
3928 ("s3.secret.key", "test"),
3929 ("s3.region", "us-east-1"),
3930 ("catalog.type", "storage"),
3931 ("catalog.name", "demo"),
3932 ("database.name", "test_db"),
3933 ("table.name", "test_table"),
3934 ("write_mode", "copy-on-write"),
3935 ]
3936 .into_iter()
3937 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3938 .collect();
3939
3940 let result = IcebergConfig::from_btreemap(values);
3941 assert!(result.is_ok());
3942 let config = result.unwrap();
3943 assert_eq!(config.write_mode, IcebergWriteMode::CopyOnWrite);
3944 }
3945}