1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30 UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43 DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59 Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112 "snapshot_expiration_clear_expired_meta_data";
113pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
114
115pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
116
117pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
118
119pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
120
121pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
122
123fn deserialize_and_normalize_string<'de, D>(
124 deserializer: D,
125) -> std::result::Result<String, D::Error>
126where
127 D: serde::Deserializer<'de>,
128{
129 let s = String::deserialize(deserializer)?;
130 Ok(s.to_lowercase().replace('_', "-"))
131}
132
133fn deserialize_and_normalize_optional_string<'de, D>(
134 deserializer: D,
135) -> std::result::Result<Option<String>, D::Error>
136where
137 D: serde::Deserializer<'de>,
138{
139 let opt = Option::<String>::deserialize(deserializer)?;
140 Ok(opt.map(|s| s.to_lowercase().replace('_', "-")))
141}
142
143fn default_commit_retry_num() -> u32 {
144 8
145}
146
147fn default_iceberg_write_mode() -> String {
148 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
149}
150
151fn default_true() -> bool {
152 true
153}
154
155fn default_some_true() -> Option<bool> {
156 Some(true)
157}
158
159#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Default)]
161#[serde(rename_all = "kebab-case")]
162pub enum CompactionType {
163 #[default]
165 Full,
166 SmallFiles,
168 FilesWithDelete,
170}
171
172impl CompactionType {
173 pub fn as_str(&self) -> &'static str {
174 match self {
175 CompactionType::Full => "full",
176 CompactionType::SmallFiles => "small-files",
177 CompactionType::FilesWithDelete => "files-with-delete",
178 }
179 }
180}
181
182impl std::fmt::Display for CompactionType {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 write!(f, "{}", self.as_str())
185 }
186}
187
188impl FromStr for CompactionType {
189 type Err = String;
190
191 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
192 let normalized = s.to_lowercase().replace('_', "-");
194
195 match normalized.as_str() {
196 "full" => Ok(CompactionType::Full),
197 "small-files" => Ok(CompactionType::SmallFiles),
198 "files-with-delete" => Ok(CompactionType::FilesWithDelete),
199 _ => Err(format!(
200 "Unknown compaction type '{}', must be one of: 'full', 'small-files', 'files-with-delete'",
201 s )),
203 }
204 }
205}
206
207#[serde_as]
208#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
209pub struct IcebergConfig {
210 pub r#type: String, #[serde(default, deserialize_with = "deserialize_bool_from_string")]
213 pub force_append_only: bool,
214
215 #[serde(flatten)]
216 common: IcebergCommon,
217
218 #[serde(flatten)]
219 table: IcebergTableIdentifier,
220
221 #[serde(
222 rename = "primary_key",
223 default,
224 deserialize_with = "deserialize_optional_string_seq_from_string"
225 )]
226 pub primary_key: Option<Vec<String>>,
227
228 #[serde(skip)]
230 pub java_catalog_props: HashMap<String, String>,
231
232 #[serde(default)]
233 pub partition_by: Option<String>,
234
235 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
237 #[serde_as(as = "DisplayFromStr")]
238 #[with_option(allow_alter_on_fly)]
239 pub commit_checkpoint_interval: u64,
240
241 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
242 pub create_table_if_not_exists: bool,
243
244 #[serde(default = "default_some_true")]
246 #[serde_as(as = "Option<DisplayFromStr>")]
247 pub is_exactly_once: Option<bool>,
248 #[serde(default = "default_commit_retry_num")]
253 pub commit_retry_num: u32,
254
255 #[serde(
257 rename = "enable_compaction",
258 default,
259 deserialize_with = "deserialize_bool_from_string"
260 )]
261 #[with_option(allow_alter_on_fly)]
262 pub enable_compaction: bool,
263
264 #[serde(rename = "compaction_interval_sec", default)]
266 #[serde_as(as = "Option<DisplayFromStr>")]
267 #[with_option(allow_alter_on_fly)]
268 pub compaction_interval_sec: Option<u64>,
269
270 #[serde(
272 rename = "enable_snapshot_expiration",
273 default,
274 deserialize_with = "deserialize_bool_from_string"
275 )]
276 #[with_option(allow_alter_on_fly)]
277 pub enable_snapshot_expiration: bool,
278
279 #[serde(
281 rename = "write_mode",
282 default = "default_iceberg_write_mode",
283 deserialize_with = "deserialize_and_normalize_string"
284 )]
285 pub write_mode: String,
286
287 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
290 #[serde_as(as = "Option<DisplayFromStr>")]
291 #[with_option(allow_alter_on_fly)]
292 pub snapshot_expiration_max_age_millis: Option<i64>,
293
294 #[serde(rename = "snapshot_expiration_retain_last", default)]
296 #[serde_as(as = "Option<DisplayFromStr>")]
297 #[with_option(allow_alter_on_fly)]
298 pub snapshot_expiration_retain_last: Option<i32>,
299
300 #[serde(
301 rename = "snapshot_expiration_clear_expired_files",
302 default = "default_true",
303 deserialize_with = "deserialize_bool_from_string"
304 )]
305 #[with_option(allow_alter_on_fly)]
306 pub snapshot_expiration_clear_expired_files: bool,
307
308 #[serde(
309 rename = "snapshot_expiration_clear_expired_meta_data",
310 default = "default_true",
311 deserialize_with = "deserialize_bool_from_string"
312 )]
313 #[with_option(allow_alter_on_fly)]
314 pub snapshot_expiration_clear_expired_meta_data: bool,
315
316 #[serde(rename = "compaction.max_snapshots_num", default)]
319 #[serde_as(as = "Option<DisplayFromStr>")]
320 #[with_option(allow_alter_on_fly)]
321 pub max_snapshots_num_before_compaction: Option<usize>,
322
323 #[serde(rename = "compaction.small_files_threshold_mb", default)]
324 #[serde_as(as = "Option<DisplayFromStr>")]
325 #[with_option(allow_alter_on_fly)]
326 pub small_files_threshold_mb: Option<u64>,
327
328 #[serde(rename = "compaction.delete_files_count_threshold", default)]
329 #[serde_as(as = "Option<DisplayFromStr>")]
330 #[with_option(allow_alter_on_fly)]
331 pub delete_files_count_threshold: Option<usize>,
332
333 #[serde(rename = "compaction.trigger_snapshot_count", default)]
334 #[serde_as(as = "Option<DisplayFromStr>")]
335 #[with_option(allow_alter_on_fly)]
336 pub trigger_snapshot_count: Option<usize>,
337
338 #[serde(rename = "compaction.target_file_size_mb", default)]
339 #[serde_as(as = "Option<DisplayFromStr>")]
340 #[with_option(allow_alter_on_fly)]
341 pub target_file_size_mb: Option<u64>,
342
343 #[serde(
346 rename = "compaction.type",
347 default,
348 deserialize_with = "deserialize_and_normalize_optional_string"
349 )]
350 #[with_option(allow_alter_on_fly)]
351 pub compaction_type: Option<String>,
352}
353
354impl EnforceSecret for IcebergConfig {
355 fn enforce_secret<'a>(
356 prop_iter: impl Iterator<Item = &'a str>,
357 ) -> crate::error::ConnectorResult<()> {
358 for prop in prop_iter {
359 IcebergCommon::enforce_one(prop)?;
360 }
361 Ok(())
362 }
363
364 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
365 IcebergCommon::enforce_one(prop)
366 }
367}
368
369impl IcebergConfig {
370 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
371 let mut config =
372 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
373 .map_err(|e| SinkError::Config(anyhow!(e)))?;
374
375 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
376 return Err(SinkError::Config(anyhow!(
377 "`{}` must be {}, or {}",
378 SINK_TYPE_OPTION,
379 SINK_TYPE_APPEND_ONLY,
380 SINK_TYPE_UPSERT
381 )));
382 }
383
384 if config.r#type == SINK_TYPE_UPSERT {
385 if let Some(primary_key) = &config.primary_key {
386 if primary_key.is_empty() {
387 return Err(SinkError::Config(anyhow!(
388 "`primary-key` must not be empty in {}",
389 SINK_TYPE_UPSERT
390 )));
391 }
392 } else {
393 return Err(SinkError::Config(anyhow!(
394 "Must set `primary-key` in {}",
395 SINK_TYPE_UPSERT
396 )));
397 }
398 }
399
400 config.java_catalog_props = values
402 .iter()
403 .filter(|(k, _v)| {
404 k.starts_with("catalog.")
405 && k != &"catalog.uri"
406 && k != &"catalog.type"
407 && k != &"catalog.name"
408 && k != &"catalog.header"
409 })
410 .map(|(k, v)| (k[8..].to_string(), v.clone()))
411 .collect();
412
413 if config.commit_checkpoint_interval == 0 {
414 return Err(SinkError::Config(anyhow!(
415 "`commit-checkpoint-interval` must be greater than 0"
416 )));
417 }
418
419 if let Some(ref compaction_type_str) = config.compaction_type {
421 CompactionType::from_str(compaction_type_str)
422 .map_err(|e| SinkError::Config(anyhow!(e)))?;
423 }
424
425 Ok(config)
426 }
427
428 pub fn catalog_type(&self) -> &str {
429 self.common.catalog_type()
430 }
431
432 pub async fn load_table(&self) -> Result<Table> {
433 self.common
434 .load_table(&self.table, &self.java_catalog_props)
435 .await
436 .map_err(Into::into)
437 }
438
439 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
440 self.common
441 .create_catalog(&self.java_catalog_props)
442 .await
443 .map_err(Into::into)
444 }
445
446 pub fn full_table_name(&self) -> Result<TableIdent> {
447 self.table.to_table_ident().map_err(Into::into)
448 }
449
450 pub fn catalog_name(&self) -> String {
451 self.common.catalog_name()
452 }
453
454 pub fn compaction_interval_sec(&self) -> u64 {
455 self.compaction_interval_sec.unwrap_or(3600)
457 }
458
459 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
462 self.snapshot_expiration_max_age_millis
463 .map(|max_age_millis| current_time_ms - max_age_millis)
464 }
465
466 pub fn trigger_snapshot_count(&self) -> usize {
467 self.trigger_snapshot_count.unwrap_or(16)
468 }
469
470 pub fn small_files_threshold_mb(&self) -> u64 {
471 self.small_files_threshold_mb.unwrap_or(64)
472 }
473
474 pub fn delete_files_count_threshold(&self) -> usize {
475 self.delete_files_count_threshold.unwrap_or(256)
476 }
477
478 pub fn target_file_size_mb(&self) -> u64 {
479 self.target_file_size_mb.unwrap_or(1024)
480 }
481
482 pub fn compaction_type(&self) -> CompactionType {
485 self.compaction_type
486 .as_deref()
487 .and_then(|s| CompactionType::from_str(s).ok())
488 .unwrap_or_default()
489 }
490}
491
492pub struct IcebergSink {
493 pub config: IcebergConfig,
494 param: SinkParam,
495 unique_column_ids: Option<Vec<usize>>,
497}
498
499impl EnforceSecret for IcebergSink {
500 fn enforce_secret<'a>(
501 prop_iter: impl Iterator<Item = &'a str>,
502 ) -> crate::error::ConnectorResult<()> {
503 for prop in prop_iter {
504 IcebergConfig::enforce_one(prop)?;
505 }
506 Ok(())
507 }
508}
509
510impl TryFrom<SinkParam> for IcebergSink {
511 type Error = SinkError;
512
513 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
514 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
515 IcebergSink::new(config, param)
516 }
517}
518
519impl Debug for IcebergSink {
520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521 f.debug_struct("IcebergSink")
522 .field("config", &self.config)
523 .finish()
524 }
525}
526
527impl IcebergSink {
528 pub async fn create_and_validate_table(&self) -> Result<Table> {
529 if self.config.create_table_if_not_exists {
530 self.create_table_if_not_exists().await?;
531 }
532
533 let table = self
534 .config
535 .load_table()
536 .await
537 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
538
539 let sink_schema = self.param.schema();
540 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
541 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
542
543 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
544 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
545
546 Ok(table)
547 }
548
549 pub async fn create_table_if_not_exists(&self) -> Result<()> {
550 let catalog = self.config.create_catalog().await?;
551 let namespace = if let Some(database_name) = self.config.table.database_name() {
552 let namespace = NamespaceIdent::new(database_name.to_owned());
553 if !catalog
554 .namespace_exists(&namespace)
555 .await
556 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
557 {
558 catalog
559 .create_namespace(&namespace, HashMap::default())
560 .await
561 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
562 .context("failed to create iceberg namespace")?;
563 }
564 namespace
565 } else {
566 bail!("database name must be set if you want to create table")
567 };
568
569 let table_id = self
570 .config
571 .full_table_name()
572 .context("Unable to parse table name")?;
573 if !catalog
574 .table_exists(&table_id)
575 .await
576 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
577 {
578 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
579 let arrow_fields = self
581 .param
582 .columns
583 .iter()
584 .map(|column| {
585 Ok(iceberg_create_table_arrow_convert
586 .to_arrow_field(&column.name, &column.data_type)
587 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
588 .context(format!(
589 "failed to convert {}: {} to arrow type",
590 &column.name, &column.data_type
591 ))?)
592 })
593 .collect::<Result<Vec<ArrowField>>>()?;
594 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
595 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
596 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
597 .context("failed to convert arrow schema to iceberg schema")?;
598
599 let location = {
600 let mut names = namespace.clone().inner();
601 names.push(self.config.table.table_name().to_owned());
602 match &self.config.common.warehouse_path {
603 Some(warehouse_path) => {
604 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
605 let url = Url::parse(warehouse_path);
606 if url.is_err() || is_s3_tables {
607 if self.config.common.catalog_type() == "rest"
610 || self.config.common.catalog_type() == "rest_rust"
611 {
612 None
613 } else {
614 bail!(format!("Invalid warehouse path: {}", warehouse_path))
615 }
616 } else if warehouse_path.ends_with('/') {
617 Some(format!("{}{}", warehouse_path, names.join("/")))
618 } else {
619 Some(format!("{}/{}", warehouse_path, names.join("/")))
620 }
621 }
622 None => None,
623 }
624 };
625
626 let partition_spec = match &self.config.partition_by {
627 Some(partition_by) => {
628 let mut partition_fields = Vec::<UnboundPartitionField>::new();
629 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
630 .into_iter()
631 .enumerate()
632 {
633 match iceberg_schema.field_id_by_name(&column) {
634 Some(id) => partition_fields.push(
635 UnboundPartitionField::builder()
636 .source_id(id)
637 .transform(transform)
638 .name(format!("_p_{}", column))
639 .field_id(i as i32)
640 .build(),
641 ),
642 None => bail!(format!(
643 "Partition source column does not exist in schema: {}",
644 column
645 )),
646 };
647 }
648 Some(
649 UnboundPartitionSpec::builder()
650 .with_spec_id(0)
651 .add_partition_fields(partition_fields)
652 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
653 .context("failed to add partition columns")?
654 .build(),
655 )
656 }
657 None => None,
658 };
659
660 let table_creation_builder = TableCreation::builder()
661 .name(self.config.table.table_name().to_owned())
662 .schema(iceberg_schema);
663
664 let table_creation = match (location, partition_spec) {
665 (Some(location), Some(partition_spec)) => table_creation_builder
666 .location(location)
667 .partition_spec(partition_spec)
668 .build(),
669 (Some(location), None) => table_creation_builder.location(location).build(),
670 (None, Some(partition_spec)) => table_creation_builder
671 .partition_spec(partition_spec)
672 .build(),
673 (None, None) => table_creation_builder.build(),
674 };
675
676 catalog
677 .create_table(&namespace, table_creation)
678 .await
679 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
680 .context("failed to create iceberg table")?;
681 }
682 Ok(())
683 }
684
685 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
686 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
687 if let Some(pk) = &config.primary_key {
688 let mut unique_column_ids = Vec::with_capacity(pk.len());
689 for col_name in pk {
690 let id = param
691 .columns
692 .iter()
693 .find(|col| col.name.as_str() == col_name)
694 .ok_or_else(|| {
695 SinkError::Config(anyhow!(
696 "Primary key column {} not found in sink schema",
697 col_name
698 ))
699 })?
700 .column_id
701 .get_id() as usize;
702 unique_column_ids.push(id);
703 }
704 Some(unique_column_ids)
705 } else {
706 unreachable!()
707 }
708 } else {
709 None
710 };
711 Ok(Self {
712 config,
713 param,
714 unique_column_ids,
715 })
716 }
717}
718
719impl Sink for IcebergSink {
720 type Coordinator = IcebergSinkCommitter;
721 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
722
723 const SINK_NAME: &'static str = ICEBERG_SINK;
724
725 async fn validate(&self) -> Result<()> {
726 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
727 bail!("Snowflake catalog only supports iceberg sources");
728 }
729
730 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
731 risingwave_common::license::Feature::IcebergSinkWithGlue
732 .check_available()
733 .map_err(|e| anyhow::anyhow!(e))?;
734 }
735
736 let compaction_type = self.config.compaction_type();
738
739 if self.config.write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
742 && compaction_type != CompactionType::Full
743 {
744 bail!(
745 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
746 compaction_type
747 );
748 }
749
750 match compaction_type {
751 CompactionType::SmallFiles => {
752 risingwave_common::license::Feature::IcebergCompaction
754 .check_available()
755 .map_err(|e| anyhow::anyhow!(e))?;
756
757 if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
759 bail!(
760 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
761 self.config.write_mode
762 );
763 }
764
765 if self.config.delete_files_count_threshold.is_some() {
767 bail!(
768 "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
769 );
770 }
771 }
772 CompactionType::FilesWithDelete => {
773 risingwave_common::license::Feature::IcebergCompaction
775 .check_available()
776 .map_err(|e| anyhow::anyhow!(e))?;
777
778 if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
780 bail!(
781 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
782 self.config.write_mode
783 );
784 }
785
786 if self.config.small_files_threshold_mb.is_some() {
788 bail!(
789 "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
790 );
791 }
792 }
793 CompactionType::Full => {
794 }
796 }
797
798 let _ = self.create_and_validate_table().await?;
799 Ok(())
800 }
801
802 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
803 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
804
805 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
807 if iceberg_config.enable_compaction && compaction_interval == 0 {
808 bail!(
809 "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
810 );
811 }
812
813 tracing::info!(
815 "Alter config compaction_interval set to {} seconds",
816 compaction_interval
817 );
818 }
819
820 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
822 && max_snapshots < 1
823 {
824 bail!(
825 "`compaction.max-snapshots-num` must be greater than 0, got: {}",
826 max_snapshots
827 );
828 }
829
830 Ok(())
831 }
832
833 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
834 let table = self.create_and_validate_table().await?;
835 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
836 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
837 } else {
838 IcebergSinkWriter::new_append_only(table, &writer_param).await?
839 };
840
841 let commit_checkpoint_interval =
842 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
843 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
844 );
845 let writer = CoordinatedLogSinker::new(
846 &writer_param,
847 self.param.clone(),
848 inner,
849 commit_checkpoint_interval,
850 )
851 .await?;
852
853 Ok(writer)
854 }
855
856 fn is_coordinated_sink(&self) -> bool {
857 true
858 }
859
860 async fn new_coordinator(
861 &self,
862 db: DatabaseConnection,
863 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
864 ) -> Result<Self::Coordinator> {
865 let catalog = self.config.create_catalog().await?;
866 let table = self.create_and_validate_table().await?;
867 Ok(IcebergSinkCommitter {
868 catalog,
869 table,
870 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
871 last_commit_epoch: 0,
872 sink_id: self.param.sink_id,
873 config: self.config.clone(),
874 param: self.param.clone(),
875 db,
876 commit_retry_num: self.config.commit_retry_num,
877 committed_epoch_subscriber: None,
878 iceberg_compact_stat_sender,
879 })
880 }
881}
882
883enum ProjectIdxVec {
890 None,
891 Prepare(usize),
892 Done(Vec<usize>),
893}
894
895pub struct IcebergSinkWriter {
896 writer: IcebergWriterDispatch,
897 arrow_schema: SchemaRef,
898 metrics: IcebergWriterMetrics,
900 table: Table,
902 project_idx_vec: ProjectIdxVec,
905}
906
907#[allow(clippy::type_complexity)]
908enum IcebergWriterDispatch {
909 PartitionAppendOnly {
910 writer: Option<Box<dyn IcebergWriter>>,
911 writer_builder: MonitoredGeneralWriterBuilder<
912 FanoutPartitionWriterBuilder<
913 DataFileWriterBuilder<
914 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
915 >,
916 >,
917 >,
918 },
919 NonPartitionAppendOnly {
920 writer: Option<Box<dyn IcebergWriter>>,
921 writer_builder: MonitoredGeneralWriterBuilder<
922 DataFileWriterBuilder<
923 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
924 >,
925 >,
926 },
927 PartitionUpsert {
928 writer: Option<Box<dyn IcebergWriter>>,
929 writer_builder: MonitoredGeneralWriterBuilder<
930 FanoutPartitionWriterBuilder<
931 EqualityDeltaWriterBuilder<
932 DataFileWriterBuilder<
933 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
934 >,
935 MonitoredPositionDeleteWriterBuilder<
936 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
937 >,
938 EqualityDeleteFileWriterBuilder<
939 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
940 >,
941 >,
942 >,
943 >,
944 arrow_schema_with_op_column: SchemaRef,
945 },
946 NonpartitionUpsert {
947 writer: Option<Box<dyn IcebergWriter>>,
948 writer_builder: MonitoredGeneralWriterBuilder<
949 EqualityDeltaWriterBuilder<
950 DataFileWriterBuilder<
951 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
952 >,
953 MonitoredPositionDeleteWriterBuilder<
954 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
955 >,
956 EqualityDeleteFileWriterBuilder<
957 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
958 >,
959 >,
960 >,
961 arrow_schema_with_op_column: SchemaRef,
962 },
963}
964
965impl IcebergWriterDispatch {
966 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
967 match self {
968 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
969 | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
970 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
971 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
972 }
973 }
974}
975
976pub struct IcebergWriterMetrics {
977 _write_qps: LabelGuardedIntCounter,
982 _write_latency: LabelGuardedHistogram,
983 write_bytes: LabelGuardedIntCounter,
984}
985
986impl IcebergSinkWriter {
987 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
988 let SinkWriterParam {
989 extra_partition_col_idx,
990 actor_id,
991 sink_id,
992 sink_name,
993 ..
994 } = writer_param;
995 let metrics_labels = [
996 &actor_id.to_string(),
997 &sink_id.to_string(),
998 sink_name.as_str(),
999 ];
1000
1001 let write_qps = GLOBAL_SINK_METRICS
1003 .iceberg_write_qps
1004 .with_guarded_label_values(&metrics_labels);
1005 let write_latency = GLOBAL_SINK_METRICS
1006 .iceberg_write_latency
1007 .with_guarded_label_values(&metrics_labels);
1008 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1011 .iceberg_rolling_unflushed_data_file
1012 .with_guarded_label_values(&metrics_labels);
1013 let write_bytes = GLOBAL_SINK_METRICS
1014 .iceberg_write_bytes
1015 .with_guarded_label_values(&metrics_labels);
1016
1017 let schema = table.metadata().current_schema();
1018 let partition_spec = table.metadata().default_partition_spec();
1019
1020 let unique_uuid_suffix = Uuid::now_v7();
1022
1023 let parquet_writer_properties = WriterProperties::builder()
1024 .set_max_row_group_size(
1025 writer_param
1026 .streaming_config
1027 .developer
1028 .iceberg_sink_write_parquet_max_row_group_rows,
1029 )
1030 .build();
1031
1032 let parquet_writer_builder = ParquetWriterBuilder::new(
1033 parquet_writer_properties,
1034 schema.clone(),
1035 table.file_io().clone(),
1036 DefaultLocationGenerator::new(table.metadata().clone())
1037 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1038 DefaultFileNameGenerator::new(
1039 writer_param.actor_id.to_string(),
1040 Some(unique_uuid_suffix.to_string()),
1041 iceberg::spec::DataFileFormat::Parquet,
1042 ),
1043 );
1044 let data_file_builder =
1045 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
1046 if partition_spec.fields().is_empty() {
1047 let writer_builder = MonitoredGeneralWriterBuilder::new(
1048 data_file_builder,
1049 write_qps.clone(),
1050 write_latency.clone(),
1051 );
1052 let inner_writer = Some(Box::new(
1053 writer_builder
1054 .clone()
1055 .build()
1056 .await
1057 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1058 ) as Box<dyn IcebergWriter>);
1059 Ok(Self {
1060 arrow_schema: Arc::new(
1061 schema_to_arrow_schema(table.metadata().current_schema())
1062 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1063 ),
1064 metrics: IcebergWriterMetrics {
1065 _write_qps: write_qps,
1066 _write_latency: write_latency,
1067 write_bytes,
1068 },
1069 writer: IcebergWriterDispatch::NonPartitionAppendOnly {
1070 writer: inner_writer,
1071 writer_builder,
1072 },
1073 table,
1074 project_idx_vec: {
1075 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1076 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1077 } else {
1078 ProjectIdxVec::None
1079 }
1080 },
1081 })
1082 } else {
1083 let partition_builder = MonitoredGeneralWriterBuilder::new(
1084 FanoutPartitionWriterBuilder::new(
1085 data_file_builder,
1086 partition_spec.clone(),
1087 schema.clone(),
1088 )
1089 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1090 write_qps.clone(),
1091 write_latency.clone(),
1092 );
1093 let inner_writer = Some(Box::new(
1094 partition_builder
1095 .clone()
1096 .build()
1097 .await
1098 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1099 ) as Box<dyn IcebergWriter>);
1100 Ok(Self {
1101 arrow_schema: Arc::new(
1102 schema_to_arrow_schema(table.metadata().current_schema())
1103 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1104 ),
1105 metrics: IcebergWriterMetrics {
1106 _write_qps: write_qps,
1107 _write_latency: write_latency,
1108 write_bytes,
1109 },
1110 writer: IcebergWriterDispatch::PartitionAppendOnly {
1111 writer: inner_writer,
1112 writer_builder: partition_builder,
1113 },
1114 table,
1115 project_idx_vec: {
1116 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1117 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1118 } else {
1119 ProjectIdxVec::None
1120 }
1121 },
1122 })
1123 }
1124 }
1125
1126 pub async fn new_upsert(
1127 table: Table,
1128 unique_column_ids: Vec<usize>,
1129 writer_param: &SinkWriterParam,
1130 ) -> Result<Self> {
1131 let SinkWriterParam {
1132 extra_partition_col_idx,
1133 actor_id,
1134 sink_id,
1135 sink_name,
1136 ..
1137 } = writer_param;
1138 let metrics_labels = [
1139 &actor_id.to_string(),
1140 &sink_id.to_string(),
1141 sink_name.as_str(),
1142 ];
1143 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1144
1145 let write_qps = GLOBAL_SINK_METRICS
1147 .iceberg_write_qps
1148 .with_guarded_label_values(&metrics_labels);
1149 let write_latency = GLOBAL_SINK_METRICS
1150 .iceberg_write_latency
1151 .with_guarded_label_values(&metrics_labels);
1152 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1155 .iceberg_rolling_unflushed_data_file
1156 .with_guarded_label_values(&metrics_labels);
1157 let position_delete_cache_num = GLOBAL_SINK_METRICS
1158 .iceberg_position_delete_cache_num
1159 .with_guarded_label_values(&metrics_labels);
1160 let write_bytes = GLOBAL_SINK_METRICS
1161 .iceberg_write_bytes
1162 .with_guarded_label_values(&metrics_labels);
1163
1164 let schema = table.metadata().current_schema();
1166 let partition_spec = table.metadata().default_partition_spec();
1167
1168 let unique_uuid_suffix = Uuid::now_v7();
1170
1171 let parquet_writer_properties = WriterProperties::builder()
1172 .set_max_row_group_size(
1173 writer_param
1174 .streaming_config
1175 .developer
1176 .iceberg_sink_write_parquet_max_row_group_rows,
1177 )
1178 .build();
1179
1180 let data_file_builder = {
1181 let parquet_writer_builder = ParquetWriterBuilder::new(
1182 parquet_writer_properties.clone(),
1183 schema.clone(),
1184 table.file_io().clone(),
1185 DefaultLocationGenerator::new(table.metadata().clone())
1186 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1187 DefaultFileNameGenerator::new(
1188 writer_param.actor_id.to_string(),
1189 Some(unique_uuid_suffix.to_string()),
1190 iceberg::spec::DataFileFormat::Parquet,
1191 ),
1192 );
1193 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id())
1194 };
1195 let position_delete_builder = {
1196 let parquet_writer_builder = ParquetWriterBuilder::new(
1197 parquet_writer_properties.clone(),
1198 POSITION_DELETE_SCHEMA.clone(),
1199 table.file_io().clone(),
1200 DefaultLocationGenerator::new(table.metadata().clone())
1201 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1202 DefaultFileNameGenerator::new(
1203 writer_param.actor_id.to_string(),
1204 Some(format!("pos-del-{}", unique_uuid_suffix)),
1205 iceberg::spec::DataFileFormat::Parquet,
1206 ),
1207 );
1208 MonitoredPositionDeleteWriterBuilder::new(
1209 SortPositionDeleteWriterBuilder::new(
1210 parquet_writer_builder,
1211 writer_param
1212 .streaming_config
1213 .developer
1214 .iceberg_sink_positional_delete_cache_size,
1215 None,
1216 None,
1217 ),
1218 position_delete_cache_num,
1219 )
1220 };
1221 let equality_delete_builder = {
1222 let config = EqualityDeleteWriterConfig::new(
1223 unique_column_ids.clone(),
1224 table.metadata().current_schema().clone(),
1225 None,
1226 partition_spec.spec_id(),
1227 )
1228 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1229 let parquet_writer_builder = ParquetWriterBuilder::new(
1230 parquet_writer_properties.clone(),
1231 Arc::new(
1232 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1233 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1234 ),
1235 table.file_io().clone(),
1236 DefaultLocationGenerator::new(table.metadata().clone())
1237 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1238 DefaultFileNameGenerator::new(
1239 writer_param.actor_id.to_string(),
1240 Some(format!("eq-del-{}", unique_uuid_suffix)),
1241 iceberg::spec::DataFileFormat::Parquet,
1242 ),
1243 );
1244
1245 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
1246 };
1247 let delta_builder = EqualityDeltaWriterBuilder::new(
1248 data_file_builder,
1249 position_delete_builder,
1250 equality_delete_builder,
1251 unique_column_ids,
1252 );
1253 if partition_spec.fields().is_empty() {
1254 let writer_builder = MonitoredGeneralWriterBuilder::new(
1255 delta_builder,
1256 write_qps.clone(),
1257 write_latency.clone(),
1258 );
1259 let inner_writer = Some(Box::new(
1260 writer_builder
1261 .clone()
1262 .build()
1263 .await
1264 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1265 ) as Box<dyn IcebergWriter>);
1266 let original_arrow_schema = Arc::new(
1267 schema_to_arrow_schema(table.metadata().current_schema())
1268 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1269 );
1270 let schema_with_extra_op_column = {
1271 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1272 new_fields.push(Arc::new(ArrowField::new(
1273 "op".to_owned(),
1274 ArrowDataType::Int32,
1275 false,
1276 )));
1277 Arc::new(ArrowSchema::new(new_fields))
1278 };
1279 Ok(Self {
1280 arrow_schema: original_arrow_schema,
1281 metrics: IcebergWriterMetrics {
1282 _write_qps: write_qps,
1283 _write_latency: write_latency,
1284 write_bytes,
1285 },
1286 table,
1287 writer: IcebergWriterDispatch::NonpartitionUpsert {
1288 writer: inner_writer,
1289 writer_builder,
1290 arrow_schema_with_op_column: schema_with_extra_op_column,
1291 },
1292 project_idx_vec: {
1293 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1294 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1295 } else {
1296 ProjectIdxVec::None
1297 }
1298 },
1299 })
1300 } else {
1301 let original_arrow_schema = Arc::new(
1302 schema_to_arrow_schema(table.metadata().current_schema())
1303 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1304 );
1305 let schema_with_extra_op_column = {
1306 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1307 new_fields.push(Arc::new(ArrowField::new(
1308 "op".to_owned(),
1309 ArrowDataType::Int32,
1310 false,
1311 )));
1312 Arc::new(ArrowSchema::new(new_fields))
1313 };
1314 let partition_builder = MonitoredGeneralWriterBuilder::new(
1315 FanoutPartitionWriterBuilder::new_with_custom_schema(
1316 delta_builder,
1317 schema_with_extra_op_column.clone(),
1318 partition_spec.clone(),
1319 table.metadata().current_schema().clone(),
1320 ),
1321 write_qps.clone(),
1322 write_latency.clone(),
1323 );
1324 let inner_writer = Some(Box::new(
1325 partition_builder
1326 .clone()
1327 .build()
1328 .await
1329 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1330 ) as Box<dyn IcebergWriter>);
1331 Ok(Self {
1332 arrow_schema: original_arrow_schema,
1333 metrics: IcebergWriterMetrics {
1334 _write_qps: write_qps,
1335 _write_latency: write_latency,
1336 write_bytes,
1337 },
1338 table,
1339 writer: IcebergWriterDispatch::PartitionUpsert {
1340 writer: inner_writer,
1341 writer_builder: partition_builder,
1342 arrow_schema_with_op_column: schema_with_extra_op_column,
1343 },
1344 project_idx_vec: {
1345 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1346 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1347 } else {
1348 ProjectIdxVec::None
1349 }
1350 },
1351 })
1352 }
1353 }
1354}
1355
1356#[async_trait]
1357impl SinkWriter for IcebergSinkWriter {
1358 type CommitMetadata = Option<SinkMetadata>;
1359
1360 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1362 Ok(())
1364 }
1365
1366 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1368 match &mut self.writer {
1370 IcebergWriterDispatch::PartitionAppendOnly {
1371 writer,
1372 writer_builder,
1373 } => {
1374 if writer.is_none() {
1375 *writer = Some(Box::new(
1376 writer_builder
1377 .clone()
1378 .build()
1379 .await
1380 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1381 ));
1382 }
1383 }
1384 IcebergWriterDispatch::NonPartitionAppendOnly {
1385 writer,
1386 writer_builder,
1387 } => {
1388 if writer.is_none() {
1389 *writer = Some(Box::new(
1390 writer_builder
1391 .clone()
1392 .build()
1393 .await
1394 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1395 ));
1396 }
1397 }
1398 IcebergWriterDispatch::PartitionUpsert {
1399 writer,
1400 writer_builder,
1401 ..
1402 } => {
1403 if writer.is_none() {
1404 *writer = Some(Box::new(
1405 writer_builder
1406 .clone()
1407 .build()
1408 .await
1409 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1410 ));
1411 }
1412 }
1413 IcebergWriterDispatch::NonpartitionUpsert {
1414 writer,
1415 writer_builder,
1416 ..
1417 } => {
1418 if writer.is_none() {
1419 *writer = Some(Box::new(
1420 writer_builder
1421 .clone()
1422 .build()
1423 .await
1424 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1425 ));
1426 }
1427 }
1428 };
1429
1430 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1432 match &self.project_idx_vec {
1433 ProjectIdxVec::None => {}
1434 ProjectIdxVec::Prepare(idx) => {
1435 if *idx >= chunk.columns().len() {
1436 return Err(SinkError::Iceberg(anyhow!(
1437 "invalid extra partition column index {}",
1438 idx
1439 )));
1440 }
1441 let project_idx_vec = (0..*idx)
1442 .chain(*idx + 1..chunk.columns().len())
1443 .collect_vec();
1444 chunk = chunk.project(&project_idx_vec);
1445 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1446 }
1447 ProjectIdxVec::Done(idx_vec) => {
1448 chunk = chunk.project(idx_vec);
1449 }
1450 }
1451 if ops.is_empty() {
1452 return Ok(());
1453 }
1454 let write_batch_size = chunk.estimated_heap_size();
1455 let batch = match &self.writer {
1456 IcebergWriterDispatch::PartitionAppendOnly { .. }
1457 | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
1458 let filters =
1460 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1461 chunk.set_visibility(filters);
1462 IcebergArrowConvert
1463 .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
1464 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1465 }
1466 IcebergWriterDispatch::PartitionUpsert {
1467 arrow_schema_with_op_column,
1468 ..
1469 }
1470 | IcebergWriterDispatch::NonpartitionUpsert {
1471 arrow_schema_with_op_column,
1472 ..
1473 } => {
1474 let chunk = IcebergArrowConvert
1475 .to_record_batch(self.arrow_schema.clone(), &chunk)
1476 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1477 let ops = Arc::new(Int32Array::from(
1478 ops.iter()
1479 .map(|op| match op {
1480 Op::UpdateInsert | Op::Insert => INSERT_OP,
1481 Op::UpdateDelete | Op::Delete => DELETE_OP,
1482 })
1483 .collect_vec(),
1484 ));
1485 let mut columns = chunk.columns().to_vec();
1486 columns.push(ops);
1487 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1488 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1489 }
1490 };
1491
1492 let writer = self.writer.get_writer().unwrap();
1493 writer
1494 .write(batch)
1495 .instrument_await("iceberg_write")
1496 .await
1497 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1498 self.metrics.write_bytes.inc_by(write_batch_size as _);
1499 Ok(())
1500 }
1501
1502 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1505 if !is_checkpoint {
1507 return Ok(None);
1508 }
1509
1510 let close_result = match &mut self.writer {
1511 IcebergWriterDispatch::PartitionAppendOnly {
1512 writer,
1513 writer_builder,
1514 } => {
1515 let close_result = match writer.take() {
1516 Some(mut writer) => {
1517 Some(writer.close().instrument_await("iceberg_close").await)
1518 }
1519 _ => None,
1520 };
1521 match writer_builder.clone().build().await {
1522 Ok(new_writer) => {
1523 *writer = Some(Box::new(new_writer));
1524 }
1525 _ => {
1526 warn!("Failed to build new writer after close");
1529 }
1530 }
1531 close_result
1532 }
1533 IcebergWriterDispatch::NonPartitionAppendOnly {
1534 writer,
1535 writer_builder,
1536 } => {
1537 let close_result = match writer.take() {
1538 Some(mut writer) => {
1539 Some(writer.close().instrument_await("iceberg_close").await)
1540 }
1541 _ => None,
1542 };
1543 match writer_builder.clone().build().await {
1544 Ok(new_writer) => {
1545 *writer = Some(Box::new(new_writer));
1546 }
1547 _ => {
1548 warn!("Failed to build new writer after close");
1551 }
1552 }
1553 close_result
1554 }
1555 IcebergWriterDispatch::PartitionUpsert {
1556 writer,
1557 writer_builder,
1558 ..
1559 } => {
1560 let close_result = match writer.take() {
1561 Some(mut writer) => {
1562 Some(writer.close().instrument_await("iceberg_close").await)
1563 }
1564 _ => None,
1565 };
1566 match writer_builder.clone().build().await {
1567 Ok(new_writer) => {
1568 *writer = Some(Box::new(new_writer));
1569 }
1570 _ => {
1571 warn!("Failed to build new writer after close");
1574 }
1575 }
1576 close_result
1577 }
1578 IcebergWriterDispatch::NonpartitionUpsert {
1579 writer,
1580 writer_builder,
1581 ..
1582 } => {
1583 let close_result = match writer.take() {
1584 Some(mut writer) => {
1585 Some(writer.close().instrument_await("iceberg_close").await)
1586 }
1587 _ => None,
1588 };
1589 match writer_builder.clone().build().await {
1590 Ok(new_writer) => {
1591 *writer = Some(Box::new(new_writer));
1592 }
1593 _ => {
1594 warn!("Failed to build new writer after close");
1597 }
1598 }
1599 close_result
1600 }
1601 };
1602
1603 match close_result {
1604 Some(Ok(result)) => {
1605 let version = self.table.metadata().format_version() as u8;
1606 let partition_type = self.table.metadata().default_partition_type();
1607 let data_files = result
1608 .into_iter()
1609 .map(|f| {
1610 SerializedDataFile::try_from(f, partition_type, version == 1)
1611 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1612 })
1613 .collect::<Result<Vec<_>>>()?;
1614 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1615 data_files,
1616 schema_id: self.table.metadata().current_schema_id(),
1617 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1618 })?))
1619 }
1620 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1621 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1622 }
1623 }
1624
1625 async fn abort(&mut self) -> Result<()> {
1627 Ok(())
1629 }
1630}
1631
1632const SCHEMA_ID: &str = "schema_id";
1633const PARTITION_SPEC_ID: &str = "partition_spec_id";
1634const DATA_FILES: &str = "data_files";
1635
1636#[derive(Default, Clone)]
1637struct IcebergCommitResult {
1638 schema_id: i32,
1639 partition_spec_id: i32,
1640 data_files: Vec<SerializedDataFile>,
1641}
1642
1643impl IcebergCommitResult {
1644 fn try_from(value: &SinkMetadata) -> Result<Self> {
1645 if let Some(Serialized(v)) = &value.metadata {
1646 let mut values = if let serde_json::Value::Object(v) =
1647 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1648 .context("Can't parse iceberg sink metadata")?
1649 {
1650 v
1651 } else {
1652 bail!("iceberg sink metadata should be an object");
1653 };
1654
1655 let schema_id;
1656 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1657 schema_id = value
1658 .as_u64()
1659 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1660 } else {
1661 bail!("iceberg sink metadata should have schema_id");
1662 }
1663
1664 let partition_spec_id;
1665 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1666 partition_spec_id = value
1667 .as_u64()
1668 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1669 } else {
1670 bail!("iceberg sink metadata should have partition_spec_id");
1671 }
1672
1673 let data_files: Vec<SerializedDataFile>;
1674 if let serde_json::Value::Array(values) = values
1675 .remove(DATA_FILES)
1676 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1677 {
1678 data_files = values
1679 .into_iter()
1680 .map(from_value::<SerializedDataFile>)
1681 .collect::<std::result::Result<_, _>>()
1682 .unwrap();
1683 } else {
1684 bail!("iceberg sink metadata should have data_files object");
1685 }
1686
1687 Ok(Self {
1688 schema_id: schema_id as i32,
1689 partition_spec_id: partition_spec_id as i32,
1690 data_files,
1691 })
1692 } else {
1693 bail!("Can't create iceberg sink write result from empty data!")
1694 }
1695 }
1696
1697 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1698 let mut values = if let serde_json::Value::Object(value) =
1699 serde_json::from_slice::<serde_json::Value>(&value)
1700 .context("Can't parse iceberg sink metadata")?
1701 {
1702 value
1703 } else {
1704 bail!("iceberg sink metadata should be an object");
1705 };
1706
1707 let schema_id;
1708 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1709 schema_id = value
1710 .as_u64()
1711 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1712 } else {
1713 bail!("iceberg sink metadata should have schema_id");
1714 }
1715
1716 let partition_spec_id;
1717 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1718 partition_spec_id = value
1719 .as_u64()
1720 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1721 } else {
1722 bail!("iceberg sink metadata should have partition_spec_id");
1723 }
1724
1725 let data_files: Vec<SerializedDataFile>;
1726 if let serde_json::Value::Array(values) = values
1727 .remove(DATA_FILES)
1728 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1729 {
1730 data_files = values
1731 .into_iter()
1732 .map(from_value::<SerializedDataFile>)
1733 .collect::<std::result::Result<_, _>>()
1734 .unwrap();
1735 } else {
1736 bail!("iceberg sink metadata should have data_files object");
1737 }
1738
1739 Ok(Self {
1740 schema_id: schema_id as i32,
1741 partition_spec_id: partition_spec_id as i32,
1742 data_files,
1743 })
1744 }
1745}
1746
1747impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1748 type Error = SinkError;
1749
1750 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1751 let json_data_files = serde_json::Value::Array(
1752 value
1753 .data_files
1754 .iter()
1755 .map(serde_json::to_value)
1756 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1757 .context("Can't serialize data files to json")?,
1758 );
1759 let json_value = serde_json::Value::Object(
1760 vec![
1761 (
1762 SCHEMA_ID.to_owned(),
1763 serde_json::Value::Number(value.schema_id.into()),
1764 ),
1765 (
1766 PARTITION_SPEC_ID.to_owned(),
1767 serde_json::Value::Number(value.partition_spec_id.into()),
1768 ),
1769 (DATA_FILES.to_owned(), json_data_files),
1770 ]
1771 .into_iter()
1772 .collect(),
1773 );
1774 Ok(SinkMetadata {
1775 metadata: Some(Serialized(SerializedMetadata {
1776 metadata: serde_json::to_vec(&json_value)
1777 .context("Can't serialize iceberg sink metadata")?,
1778 })),
1779 })
1780 }
1781}
1782
1783impl TryFrom<IcebergCommitResult> for Vec<u8> {
1784 type Error = SinkError;
1785
1786 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1787 let json_data_files = serde_json::Value::Array(
1788 value
1789 .data_files
1790 .iter()
1791 .map(serde_json::to_value)
1792 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1793 .context("Can't serialize data files to json")?,
1794 );
1795 let json_value = serde_json::Value::Object(
1796 vec![
1797 (
1798 SCHEMA_ID.to_owned(),
1799 serde_json::Value::Number(value.schema_id.into()),
1800 ),
1801 (
1802 PARTITION_SPEC_ID.to_owned(),
1803 serde_json::Value::Number(value.partition_spec_id.into()),
1804 ),
1805 (DATA_FILES.to_owned(), json_data_files),
1806 ]
1807 .into_iter()
1808 .collect(),
1809 );
1810 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1811 }
1812}
1813pub struct IcebergSinkCommitter {
1814 catalog: Arc<dyn Catalog>,
1815 table: Table,
1816 pub last_commit_epoch: u64,
1817 pub(crate) is_exactly_once: bool,
1818 pub(crate) sink_id: SinkId,
1819 pub(crate) config: IcebergConfig,
1820 pub(crate) param: SinkParam,
1821 pub(crate) db: DatabaseConnection,
1822 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1823 commit_retry_num: u32,
1824 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1825}
1826
1827impl IcebergSinkCommitter {
1828 async fn reload_table(
1831 catalog: &dyn Catalog,
1832 table_ident: &TableIdent,
1833 schema_id: i32,
1834 partition_spec_id: i32,
1835 ) -> Result<Table> {
1836 let table = catalog
1837 .load_table(table_ident)
1838 .await
1839 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1840 if table.metadata().current_schema_id() != schema_id {
1841 return Err(SinkError::Iceberg(anyhow!(
1842 "Schema evolution not supported, expect schema id {}, but got {}",
1843 schema_id,
1844 table.metadata().current_schema_id()
1845 )));
1846 }
1847 if table.metadata().default_partition_spec_id() != partition_spec_id {
1848 return Err(SinkError::Iceberg(anyhow!(
1849 "Partition evolution not supported, expect partition spec id {}, but got {}",
1850 partition_spec_id,
1851 table.metadata().default_partition_spec_id()
1852 )));
1853 }
1854 Ok(table)
1855 }
1856}
1857
1858#[async_trait::async_trait]
1859impl SinkCommitCoordinator for IcebergSinkCommitter {
1860 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1861 if self.is_exactly_once {
1862 self.committed_epoch_subscriber = Some(subscriber);
1863 tracing::info!(
1864 "Sink id = {}: iceberg sink coordinator initing.",
1865 self.param.sink_id
1866 );
1867 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id).await? {
1868 let ordered_metadata_list_by_end_epoch =
1869 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id).await?;
1870
1871 let mut last_recommit_epoch = 0;
1872 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1873 ordered_metadata_list_by_end_epoch
1874 {
1875 let write_results_bytes = deserialize_metadata(sealized_bytes);
1876 let mut write_results = vec![];
1877
1878 for each in write_results_bytes {
1879 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1880 write_results.push(write_result);
1881 }
1882
1883 match (
1884 committed,
1885 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1886 .await?,
1887 ) {
1888 (true, _) => {
1889 tracing::info!(
1890 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1891 self.param.sink_id
1892 );
1893 }
1894 (false, true) => {
1895 tracing::info!(
1897 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1898 self.param.sink_id
1899 );
1900 mark_row_is_committed_by_sink_id_and_end_epoch(
1901 &self.db,
1902 self.sink_id,
1903 end_epoch,
1904 )
1905 .await?;
1906 }
1907 (false, false) => {
1908 tracing::info!(
1909 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1910 self.param.sink_id
1911 );
1912 self.re_commit(end_epoch, write_results, snapshot_id)
1913 .await?;
1914 }
1915 }
1916
1917 last_recommit_epoch = end_epoch;
1918 }
1919 tracing::info!(
1920 "Sink id = {}: iceberg commit coordinator inited.",
1921 self.param.sink_id
1922 );
1923 return Ok(Some(last_recommit_epoch));
1924 } else {
1925 tracing::info!(
1926 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1927 self.param.sink_id
1928 );
1929 return Ok(None);
1930 }
1931 }
1932
1933 tracing::info!("Iceberg commit coordinator inited.");
1934 return Ok(None);
1935 }
1936
1937 async fn commit(
1938 &mut self,
1939 epoch: u64,
1940 metadata: Vec<SinkMetadata>,
1941 add_columns: Option<Vec<Field>>,
1942 ) -> Result<()> {
1943 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1944 if let Some(add_columns) = add_columns {
1945 return Err(anyhow!(
1946 "Iceberg sink not support add columns, but got: {:?}",
1947 add_columns
1948 )
1949 .into());
1950 }
1951
1952 let write_results: Vec<IcebergCommitResult> = metadata
1953 .iter()
1954 .map(IcebergCommitResult::try_from)
1955 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1956
1957 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1959 tracing::debug!(?epoch, "no data to commit");
1960 return Ok(());
1961 }
1962
1963 if write_results
1965 .iter()
1966 .any(|r| r.schema_id != write_results[0].schema_id)
1967 || write_results
1968 .iter()
1969 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1970 {
1971 return Err(SinkError::Iceberg(anyhow!(
1972 "schema_id and partition_spec_id should be the same in all write results"
1973 )));
1974 }
1975
1976 self.wait_for_snapshot_limit().await?;
1978
1979 if self.is_exactly_once {
1980 assert!(self.committed_epoch_subscriber.is_some());
1981 match self.committed_epoch_subscriber.clone() {
1982 Some(committed_epoch_subscriber) => {
1983 let (committed_epoch, mut rw_futures_utilrx) =
1985 committed_epoch_subscriber(self.param.sink_id).await?;
1986 if committed_epoch >= epoch {
1988 self.commit_iceberg_inner(epoch, write_results, None)
1989 .await?;
1990 } else {
1991 tracing::info!(
1992 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1993 committed_epoch,
1994 epoch
1995 );
1996 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1997 tracing::info!(
1998 "Received next committed epoch: {}",
1999 next_committed_epoch
2000 );
2001 if next_committed_epoch >= epoch {
2003 self.commit_iceberg_inner(epoch, write_results, None)
2004 .await?;
2005 break;
2006 }
2007 }
2008 }
2009 }
2010 None => unreachable!(
2011 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
2012 ),
2013 }
2014 } else {
2015 self.commit_iceberg_inner(epoch, write_results, None)
2016 .await?;
2017 }
2018
2019 Ok(())
2020 }
2021}
2022
2023impl IcebergSinkCommitter {
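    /// Re-commits write results recovered from the system table, reusing the snapshot id
    /// that was persisted before the failure so the retried commit stays idempotent.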
2025 async fn re_commit(
2026 &mut self,
2027 epoch: u64,
2028 write_results: Vec<IcebergCommitResult>,
2029 snapshot_id: i64,
2030 ) -> Result<()> {
2031 tracing::info!("Starting iceberg re commit in epoch {epoch}.");
2032
2033 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2035 tracing::debug!(?epoch, "no data to commit");
2036 return Ok(());
2037 }
2038 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
2039 .await?;
2040 Ok(())
2041 }
2042
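    /// Performs the actual iceberg commit: reloads the table at the expected schema and
    /// partition spec, persists pre-commit metadata first when exactly-once is enabled
    /// (only on the first attempt for an epoch), then appends the data files in a
    /// fast-append transaction with retries, and finally updates metrics and cleans up
    /// the pre-commit rows.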
2043 async fn commit_iceberg_inner(
2044 &mut self,
2045 epoch: u64,
2046 write_results: Vec<IcebergCommitResult>,
2047 snapshot_id: Option<i64>,
2048 ) -> Result<()> {
2049 let is_first_commit = snapshot_id.is_none();
2053 self.last_commit_epoch = epoch;
2054 let expect_schema_id = write_results[0].schema_id;
2055 let expect_partition_spec_id = write_results[0].partition_spec_id;
2056
2057 self.table = Self::reload_table(
2059 self.catalog.as_ref(),
2060 self.table.identifier(),
2061 expect_schema_id,
2062 expect_partition_spec_id,
2063 )
2064 .await?;
2065 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2066 return Err(SinkError::Iceberg(anyhow!(
2067 "Can't find schema by id {}",
2068 expect_schema_id
2069 )));
2070 };
2071 let Some(partition_spec) = self
2072 .table
2073 .metadata()
2074 .partition_spec_by_id(expect_partition_spec_id)
2075 else {
2076 return Err(SinkError::Iceberg(anyhow!(
2077 "Can't find partition spec by id {}",
2078 expect_partition_spec_id
2079 )));
2080 };
2081 let partition_type = partition_spec
2082 .as_ref()
2083 .clone()
2084 .partition_type(schema)
2085 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2086
2087 let txn = Transaction::new(&self.table);
2088 let snapshot_id = match snapshot_id {
2090 Some(previous_snapshot_id) => previous_snapshot_id,
2091 None => txn.generate_unique_snapshot_id(),
2092 };
2093 if self.is_exactly_once && is_first_commit {
2094 let mut pre_commit_metadata_bytes = Vec::new();
2096 for each_parallelism_write_result in write_results.clone() {
2097 let each_parallelism_write_result_bytes: Vec<u8> =
2098 each_parallelism_write_result.try_into()?;
2099 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
2100 }
2101
2102 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
2103
2104 persist_pre_commit_metadata(
2105 self.sink_id,
2106 self.db.clone(),
2107 self.last_commit_epoch,
2108 epoch,
2109 pre_commit_metadata_bytes,
2110 snapshot_id,
2111 )
2112 .await?;
2113 }
2114
2115 let data_files = write_results
2116 .into_iter()
2117 .flat_map(|r| {
2118 r.data_files.into_iter().map(|f| {
2119 f.try_into(expect_partition_spec_id, &partition_type, schema)
2120 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2121 })
2122 })
2123 .collect::<Result<Vec<DataFile>>>()?;
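        // Commit with exponential backoff (10 ms base delay, capped at 60 s, with jitter),
        // retrying up to `commit_retry_num` times. The table is reloaded before every
        // attempt so the transaction is built on fresh metadata.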
2124 let retry_strategy = ExponentialBackoff::from_millis(10)
2129 .max_delay(Duration::from_secs(60))
2130 .map(jitter)
2131 .take(self.commit_retry_num as usize);
2132 let catalog = self.catalog.clone();
2133 let table_ident = self.table.identifier().clone();
2134 let table = Retry::spawn(retry_strategy, || async {
2135 let table = Self::reload_table(
2136 catalog.as_ref(),
2137 &table_ident,
2138 expect_schema_id,
2139 expect_partition_spec_id,
2140 )
2141 .await?;
2142 let txn = Transaction::new(&table);
2143 let mut append_action = txn
2144 .fast_append(Some(snapshot_id), None, vec![])
2145 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
2146 .with_to_branch(commit_branch(
2147 self.config.r#type.as_str(),
2148 self.config.write_mode.as_str(),
2149 ));
2150 append_action
2151 .add_data_files(data_files.clone())
2152 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2153 let tx = append_action.apply().await.map_err(|err| {
                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table update");
2155 SinkError::Iceberg(anyhow!(err))
2156 })?;
2157 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
2158 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2159 SinkError::Iceberg(anyhow!(err))
2160 })
2161 })
2162 .await?;
2163 self.table = table;
2164
2165 let snapshot_num = self.table.metadata().snapshots().count();
2166 let catalog_name = self.config.common.catalog_name();
2167 let table_name = self.table.identifier().to_string();
2168 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2169 GLOBAL_SINK_METRICS
2170 .iceberg_snapshot_num
2171 .with_guarded_label_values(&metrics_labels)
2172 .set(snapshot_num as i64);
2173
2174 tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
2175
2176 if self.is_exactly_once {
2177 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2178 tracing::info!(
2179 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
2180 self.sink_id,
2181 epoch
2182 );
2183
2184 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2185 }
2186
2187 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2188 && self.config.enable_compaction
2189 && iceberg_compact_stat_sender
2190 .send(IcebergSinkCompactionUpdate {
2191 sink_id: self.sink_id,
2192 compaction_interval: self.config.compaction_interval_sec(),
2193 force_compaction: false,
2194 })
2195 .is_err()
2196 {
2197 warn!("failed to send iceberg compaction stats");
2198 }
2199
2200 Ok(())
2201 }
2202
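    /// Returns whether a snapshot with the given id is already present in the iceberg
    /// table. Used during recovery to decide whether a pre-committed epoch needs to be
    /// re-committed.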
2203 async fn is_snapshot_id_in_iceberg(
2207 &self,
2208 iceberg_config: &IcebergConfig,
2209 snapshot_id: i64,
2210 ) -> Result<bool> {
        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2217 }
2218
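    /// Counts the snapshots created since the most recent `Replace` (rewrite/compaction)
    /// snapshot, walking the snapshot list from newest to oldest.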
2219 fn count_snapshots_since_rewrite(&self) -> usize {
2222 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2223 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2224
2225 let mut count = 0;
2227 for snapshot in snapshots {
2228 let summary = snapshot.summary();
2230 match &summary.operation {
2231 Operation::Replace => {
2232 break;
2234 }
2235
2236 _ => {
2237 count += 1;
2239 }
2240 }
2241 }
2242
2243 count
2244 }
2245
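    /// When compaction is enabled and `max_snapshots_num_before_compaction` is set, blocks
    /// until the number of snapshots since the last rewrite falls below that limit, nudging
    /// the compactor with `force_compaction` and polling the table every 30 seconds.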
2246 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2248 if !self.config.enable_compaction {
2249 return Ok(());
2250 }
2251
2252 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2253 loop {
2254 let current_count = self.count_snapshots_since_rewrite();
2255
2256 if current_count < max_snapshots {
2257 tracing::info!(
2258 "Snapshot count check passed: {} < {}",
2259 current_count,
2260 max_snapshots
2261 );
2262 break;
2263 }
2264
2265 tracing::info!(
2266 "Snapshot count {} exceeds limit {}, waiting...",
2267 current_count,
2268 max_snapshots
2269 );
2270
2271 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2272 && iceberg_compact_stat_sender
2273 .send(IcebergSinkCompactionUpdate {
2274 sink_id: self.sink_id,
2275 compaction_interval: self.config.compaction_interval_sec(),
2276 force_compaction: true,
2277 })
2278 .is_err()
2279 {
2280 tracing::warn!("failed to send iceberg compaction stats");
2281 }
2282
2283 tokio::time::sleep(Duration::from_secs(30)).await;
2285
2286 self.table = self.config.load_table().await?;
2288 }
2289 }
2290 Ok(())
2291 }
2292}
2293
2294const MAP_KEY: &str = "key";
2295const MAP_VALUE: &str = "value";
2296
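/// Collects the child field types of a struct-like RisingWave type (struct, map, or list of
/// struct) into `schema_fields`, keyed by field name, and returns the corresponding arrow
/// struct fields when `data_type` is a struct, possibly nested inside a list or map.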
2297fn get_fields<'a>(
2298 our_field_type: &'a risingwave_common::types::DataType,
2299 data_type: &ArrowDataType,
2300 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2301) -> Option<ArrowFields> {
2302 match data_type {
2303 ArrowDataType::Struct(fields) => {
2304 match our_field_type {
2305 risingwave_common::types::DataType::Struct(struct_fields) => {
2306 struct_fields.iter().for_each(|(name, data_type)| {
2307 let res = schema_fields.insert(name, data_type);
2308 assert!(res.is_none())
2310 });
2311 }
2312 risingwave_common::types::DataType::Map(map_fields) => {
2313 schema_fields.insert(MAP_KEY, map_fields.key());
2314 schema_fields.insert(MAP_VALUE, map_fields.value());
2315 }
2316 risingwave_common::types::DataType::List(list) => {
2317 list.elem()
2318 .as_struct()
2319 .iter()
2320 .for_each(|(name, data_type)| {
2321 let res = schema_fields.insert(name, data_type);
2322 assert!(res.is_none())
2324 });
2325 }
2326 _ => {}
2327 };
2328 Some(fields.clone())
2329 }
2330 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2331 get_fields(our_field_type, field.data_type(), schema_fields)
2332 }
        _ => None,
    }
2335}
2336
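/// Checks that every arrow field in `fields` has a compatible counterpart in
/// `schema_fields` (the RisingWave types keyed by field name). A few representation
/// differences are tolerated, e.g. decimal precision/scale and Binary vs. LargeBinary,
/// and nested lists, maps, and structs are checked recursively.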
2337fn check_compatibility(
2338 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2339 fields: &ArrowFields,
2340) -> anyhow::Result<bool> {
2341 for arrow_field in fields {
2342 let our_field_type = schema_fields
2343 .get(arrow_field.name().as_str())
2344 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2345
2346 let converted_arrow_data_type = IcebergArrowConvert
2348 .to_arrow_field("", our_field_type)
2349 .map_err(|e| anyhow!(e))?
2350 .data_type()
2351 .clone();
2352
2353 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2354 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2355 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2356 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2357 (ArrowDataType::List(_), ArrowDataType::List(field))
2358 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2359 let mut schema_fields = HashMap::new();
2360 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2361 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2362 }
2363 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2365 let mut schema_fields = HashMap::new();
2366 our_field_type
2367 .as_struct()
2368 .iter()
2369 .for_each(|(name, data_type)| {
2370 let res = schema_fields.insert(name, data_type);
2371 assert!(res.is_none())
2373 });
2374 check_compatibility(schema_fields, fields)?
2375 }
2376 (left, right) => left.equals_datatype(right),
2384 };
2385 if !compatible {
2386 bail!(
2387 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2388 arrow_field.name(),
2389 converted_arrow_data_type,
2390 arrow_field.data_type()
2391 );
2392 }
2393 }
2394 Ok(true)
2395}
2396
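/// Checks that a RisingWave schema is compatible with an iceberg table's arrow schema:
/// the field count must match and every field must pass `check_compatibility`. Field
/// order does not matter since fields are matched by name.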
2397pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2399 if rw_schema.fields.len() != arrow_schema.fields().len() {
2400 bail!(
2401 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2402 rw_schema.fields.len(),
2403 arrow_schema.fields.len()
2404 );
2405 }
2406
2407 let mut schema_fields = HashMap::new();
2408 rw_schema.fields.iter().for_each(|field| {
2409 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2410 assert!(res.is_none())
2412 });
2413
2414 check_compatibility(schema_fields, &arrow_schema.fields)?;
2415 Ok(())
2416}
2417
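/// Serializes the per-parallelism pre-commit payloads as a JSON array of byte arrays;
/// `deserialize_metadata` below is its inverse.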
2418pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2419 serde_json::to_vec(&metadata).unwrap()
2420}
2421
2422pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2423 serde_json::from_slice(&bytes).unwrap()
2424}
2425
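/// Parses a `partition_by` option into `(column, Transform)` pairs. Accepted forms are
/// `column`, `transform(column)`, and `transform(n, column)` (the latter required for
/// `bucket` and `truncate`); for example, `"bucket(16, c1), day(c2)"` is expected to yield
/// `[("c1", Transform::Bucket(16)), ("c2", Transform::Day)]`.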
2426pub fn parse_partition_by_exprs(
2427 expr: String,
2428) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2429 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2431 if !re.is_match(&expr) {
        bail!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        )
2436 }
2437 let caps = re.captures_iter(&expr);
2438
2439 let mut partition_columns = vec![];
2440
2441 for mat in caps {
2442 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2443 (&mat["transform"], Transform::Identity)
2444 } else {
2445 let mut func = mat["transform"].to_owned();
2446 if func == "bucket" || func == "truncate" {
2447 let n = &mat
2448 .name("n")
2449 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2450 .as_str();
2451 func = format!("{func}[{n}]");
2452 }
2453 (
2454 &mat["field"],
2455 Transform::from_str(&func)
2456 .with_context(|| format!("invalid transform function {}", func))?,
2457 )
2458 };
2459 partition_columns.push((column.to_owned(), transform));
2460 }
2461 Ok(partition_columns)
2462}
2463
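/// Returns the branch that commits should target: the dedicated copy-on-write branch when
/// iceberg copy-on-write mode is in effect (upsert sinks configured with the copy-on-write
/// write mode), otherwise the main branch.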
2464pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2465 if should_enable_iceberg_cow(sink_type, write_mode) {
2466 ICEBERG_COW_BRANCH.to_owned()
2467 } else {
2468 MAIN_BRANCH.to_owned()
2469 }
2470}
2471
2472pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2473 sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2474}
2475
2476#[cfg(test)]
2477mod test {
2478 use std::collections::BTreeMap;
2479
2480 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2481 use risingwave_common::types::{DataType, MapType, StructType};
2482
2483 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2484 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2485 use crate::sink::iceberg::{
2486 COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, ENABLE_COMPACTION,
2487 ENABLE_SNAPSHOT_EXPIRATION, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig,
2488 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2489 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2490 };
2491
    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
2495 fn test_compatible_arrow_schema() {
2496 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2497
2498 use super::*;
2499 let risingwave_schema = Schema::new(vec![
2500 Field::with_name(DataType::Int32, "a"),
2501 Field::with_name(DataType::Int32, "b"),
2502 Field::with_name(DataType::Int32, "c"),
2503 ]);
2504 let arrow_schema = ArrowSchema::new(vec![
2505 ArrowField::new("a", ArrowDataType::Int32, false),
2506 ArrowField::new("b", ArrowDataType::Int32, false),
2507 ArrowField::new("c", ArrowDataType::Int32, false),
2508 ]);
2509
2510 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2511
2512 let risingwave_schema = Schema::new(vec![
2513 Field::with_name(DataType::Int32, "d"),
2514 Field::with_name(DataType::Int32, "c"),
2515 Field::with_name(DataType::Int32, "a"),
2516 Field::with_name(DataType::Int32, "b"),
2517 ]);
2518 let arrow_schema = ArrowSchema::new(vec![
2519 ArrowField::new("a", ArrowDataType::Int32, false),
2520 ArrowField::new("b", ArrowDataType::Int32, false),
2521 ArrowField::new("d", ArrowDataType::Int32, false),
2522 ArrowField::new("c", ArrowDataType::Int32, false),
2523 ]);
2524 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2525
2526 let risingwave_schema = Schema::new(vec![
2527 Field::with_name(
2528 DataType::Struct(StructType::new(vec![
2529 ("a1", DataType::Int32),
2530 (
2531 "a2",
2532 DataType::Struct(StructType::new(vec![
2533 ("a21", DataType::Bytea),
2534 (
2535 "a22",
2536 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2537 ),
2538 ])),
2539 ),
2540 ])),
2541 "a",
2542 ),
2543 Field::with_name(
2544 DataType::list(DataType::Struct(StructType::new(vec![
2545 ("b1", DataType::Int32),
2546 ("b2", DataType::Bytea),
2547 (
2548 "b3",
2549 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2550 ),
2551 ]))),
2552 "b",
2553 ),
2554 Field::with_name(
2555 DataType::Map(MapType::from_kv(
2556 DataType::Varchar,
2557 DataType::list(DataType::Struct(StructType::new([
2558 ("c1", DataType::Int32),
2559 ("c2", DataType::Bytea),
2560 (
2561 "c3",
2562 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2563 ),
2564 ]))),
2565 )),
2566 "c",
2567 ),
2568 ]);
2569 let arrow_schema = ArrowSchema::new(vec![
2570 ArrowField::new(
2571 "a",
2572 ArrowDataType::Struct(ArrowFields::from(vec![
2573 ArrowField::new("a1", ArrowDataType::Int32, false),
2574 ArrowField::new(
2575 "a2",
2576 ArrowDataType::Struct(ArrowFields::from(vec![
2577 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2578 ArrowField::new_map(
2579 "a22",
2580 "entries",
2581 ArrowFieldRef::new(ArrowField::new(
2582 "key",
2583 ArrowDataType::Utf8,
2584 false,
2585 )),
2586 ArrowFieldRef::new(ArrowField::new(
2587 "value",
2588 ArrowDataType::Utf8,
2589 false,
2590 )),
2591 false,
2592 false,
2593 ),
2594 ])),
2595 false,
2596 ),
2597 ])),
2598 false,
2599 ),
2600 ArrowField::new(
2601 "b",
2602 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2603 ArrowDataType::Struct(ArrowFields::from(vec![
2604 ArrowField::new("b1", ArrowDataType::Int32, false),
2605 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2606 ArrowField::new_map(
2607 "b3",
2608 "entries",
2609 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2610 ArrowFieldRef::new(ArrowField::new(
2611 "value",
2612 ArrowDataType::Utf8,
2613 false,
2614 )),
2615 false,
2616 false,
2617 ),
2618 ])),
2619 false,
2620 ))),
2621 false,
2622 ),
2623 ArrowField::new_map(
2624 "c",
2625 "entries",
2626 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2627 ArrowFieldRef::new(ArrowField::new(
2628 "value",
2629 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2630 ArrowDataType::Struct(ArrowFields::from(vec![
2631 ArrowField::new("c1", ArrowDataType::Int32, false),
2632 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2633 ArrowField::new_map(
2634 "c3",
2635 "entries",
2636 ArrowFieldRef::new(ArrowField::new(
2637 "key",
2638 ArrowDataType::Utf8,
2639 false,
2640 )),
2641 ArrowFieldRef::new(ArrowField::new(
2642 "value",
2643 ArrowDataType::Utf8,
2644 false,
2645 )),
2646 false,
2647 false,
2648 ),
2649 ])),
2650 false,
2651 ))),
2652 false,
2653 )),
2654 false,
2655 false,
2656 ),
2657 ]);
2658 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2659 }
2660
2661 #[test]
2662 fn test_parse_iceberg_config() {
2663 let values = [
2664 ("connector", "iceberg"),
2665 ("type", "upsert"),
2666 ("primary_key", "v1"),
2667 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2668 ("warehouse.path", "s3://iceberg"),
2669 ("s3.endpoint", "http://127.0.0.1:9301"),
2670 ("s3.access.key", "hummockadmin"),
2671 ("s3.secret.key", "hummockadmin"),
2672 ("s3.path.style.access", "true"),
2673 ("s3.region", "us-east-1"),
2674 ("catalog.type", "jdbc"),
2675 ("catalog.name", "demo"),
2676 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2677 ("catalog.jdbc.user", "admin"),
2678 ("catalog.jdbc.password", "123456"),
2679 ("database.name", "demo_db"),
2680 ("table.name", "demo_table"),
2681 ("enable_compaction", "true"),
2682 ("compaction_interval_sec", "1800"),
2683 ("enable_snapshot_expiration", "true"),
2684 ]
2685 .into_iter()
2686 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2687 .collect();
2688
2689 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2690
2691 let expected_iceberg_config = IcebergConfig {
2692 common: IcebergCommon {
2693 warehouse_path: Some("s3://iceberg".to_owned()),
2694 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2695 s3_region: Some("us-east-1".to_owned()),
2696 s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2697 s3_access_key: Some("hummockadmin".to_owned()),
2698 s3_secret_key: Some("hummockadmin".to_owned()),
2699 s3_iam_role_arn: None,
2700 gcs_credential: None,
2701 catalog_type: Some("jdbc".to_owned()),
2702 glue_id: None,
2703 glue_region: None,
2704 glue_access_key: None,
2705 glue_secret_key: None,
2706 glue_iam_role_arn: None,
2707 catalog_name: Some("demo".to_owned()),
2708 s3_path_style_access: Some(true),
2709 catalog_credential: None,
2710 catalog_oauth2_server_uri: None,
2711 catalog_scope: None,
2712 catalog_token: None,
2713 enable_config_load: None,
2714 rest_signing_name: None,
2715 rest_signing_region: None,
2716 rest_sigv4_enabled: None,
2717 hosted_catalog: None,
2718 azblob_account_name: None,
2719 azblob_account_key: None,
2720 azblob_endpoint_url: None,
2721 catalog_header: None,
2722 adlsgen2_account_name: None,
2723 adlsgen2_account_key: None,
2724 adlsgen2_endpoint: None,
2725 vended_credentials: None,
2726 },
2727 table: IcebergTableIdentifier {
2728 database_name: Some("demo_db".to_owned()),
2729 table_name: "demo_table".to_owned(),
2730 },
2731 r#type: "upsert".to_owned(),
2732 force_append_only: false,
2733 primary_key: Some(vec!["v1".to_owned()]),
2734 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2735 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2736 .into_iter()
2737 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2738 .collect(),
2739 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2740 create_table_if_not_exists: false,
2741 is_exactly_once: Some(true),
2742 commit_retry_num: 8,
2743 enable_compaction: true,
2744 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2745 enable_snapshot_expiration: true,
2746 write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2747 snapshot_expiration_max_age_millis: None,
2748 snapshot_expiration_retain_last: None,
2749 snapshot_expiration_clear_expired_files: true,
2750 snapshot_expiration_clear_expired_meta_data: true,
2751 max_snapshots_num_before_compaction: None,
2752 small_files_threshold_mb: None,
2753 delete_files_count_threshold: None,
2754 trigger_snapshot_count: None,
2755 target_file_size_mb: None,
2756 compaction_type: None,
2757 };
2758
2759 assert_eq!(iceberg_config, expected_iceberg_config);
2760
2761 assert_eq!(
2762 &iceberg_config.full_table_name().unwrap().to_string(),
2763 "demo_db.demo_table"
2764 );
2765 }
2766
2767 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2768 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2769
2770 let _table = iceberg_config.load_table().await.unwrap();
2771 }
2772
2773 #[tokio::test]
2774 #[ignore]
2775 async fn test_storage_catalog() {
2776 let values = [
2777 ("connector", "iceberg"),
2778 ("type", "append-only"),
2779 ("force_append_only", "true"),
2780 ("s3.endpoint", "http://127.0.0.1:9301"),
2781 ("s3.access.key", "hummockadmin"),
2782 ("s3.secret.key", "hummockadmin"),
2783 ("s3.region", "us-east-1"),
2784 ("s3.path.style.access", "true"),
2785 ("catalog.name", "demo"),
2786 ("catalog.type", "storage"),
2787 ("warehouse.path", "s3://icebergdata/demo"),
2788 ("database.name", "s1"),
2789 ("table.name", "t1"),
2790 ]
2791 .into_iter()
2792 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2793 .collect();
2794
2795 test_create_catalog(values).await;
2796 }
2797
2798 #[tokio::test]
2799 #[ignore]
2800 async fn test_rest_catalog() {
2801 let values = [
2802 ("connector", "iceberg"),
2803 ("type", "append-only"),
2804 ("force_append_only", "true"),
2805 ("s3.endpoint", "http://127.0.0.1:9301"),
2806 ("s3.access.key", "hummockadmin"),
2807 ("s3.secret.key", "hummockadmin"),
2808 ("s3.region", "us-east-1"),
2809 ("s3.path.style.access", "true"),
2810 ("catalog.name", "demo"),
2811 ("catalog.type", "rest"),
2812 ("catalog.uri", "http://192.168.167.4:8181"),
2813 ("warehouse.path", "s3://icebergdata/demo"),
2814 ("database.name", "s1"),
2815 ("table.name", "t1"),
2816 ]
2817 .into_iter()
2818 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2819 .collect();
2820
2821 test_create_catalog(values).await;
2822 }
2823
2824 #[tokio::test]
2825 #[ignore]
2826 async fn test_jdbc_catalog() {
2827 let values = [
2828 ("connector", "iceberg"),
2829 ("type", "append-only"),
2830 ("force_append_only", "true"),
2831 ("s3.endpoint", "http://127.0.0.1:9301"),
2832 ("s3.access.key", "hummockadmin"),
2833 ("s3.secret.key", "hummockadmin"),
2834 ("s3.region", "us-east-1"),
2835 ("s3.path.style.access", "true"),
2836 ("catalog.name", "demo"),
2837 ("catalog.type", "jdbc"),
2838 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2839 ("catalog.jdbc.user", "admin"),
2840 ("catalog.jdbc.password", "123456"),
2841 ("warehouse.path", "s3://icebergdata/demo"),
2842 ("database.name", "s1"),
2843 ("table.name", "t1"),
2844 ]
2845 .into_iter()
2846 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2847 .collect();
2848
2849 test_create_catalog(values).await;
2850 }
2851
2852 #[tokio::test]
2853 #[ignore]
2854 async fn test_hive_catalog() {
2855 let values = [
2856 ("connector", "iceberg"),
2857 ("type", "append-only"),
2858 ("force_append_only", "true"),
2859 ("s3.endpoint", "http://127.0.0.1:9301"),
2860 ("s3.access.key", "hummockadmin"),
2861 ("s3.secret.key", "hummockadmin"),
2862 ("s3.region", "us-east-1"),
2863 ("s3.path.style.access", "true"),
2864 ("catalog.name", "demo"),
2865 ("catalog.type", "hive"),
2866 ("catalog.uri", "thrift://localhost:9083"),
2867 ("warehouse.path", "s3://icebergdata/demo"),
2868 ("database.name", "s1"),
2869 ("table.name", "t1"),
2870 ]
2871 .into_iter()
2872 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2873 .collect();
2874
2875 test_create_catalog(values).await;
2876 }
2877
2878 #[test]
2879 fn test_config_constants_consistency() {
2880 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2883 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2884 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2885 assert_eq!(WRITE_MODE, "write_mode");
2886 assert_eq!(
2887 SNAPSHOT_EXPIRATION_RETAIN_LAST,
2888 "snapshot_expiration_retain_last"
2889 );
2890 assert_eq!(
2891 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2892 "snapshot_expiration_max_age_millis"
2893 );
2894 assert_eq!(
2895 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2896 "snapshot_expiration_clear_expired_files"
2897 );
2898 assert_eq!(
2899 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2900 "snapshot_expiration_clear_expired_meta_data"
2901 );
2902 assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
2903 }
2904
2905 #[test]
2906 fn test_parse_iceberg_compaction_config() {
2907 let values = [
2909 ("connector", "iceberg"),
2910 ("type", "upsert"),
2911 ("primary_key", "id"),
2912 ("warehouse.path", "s3://iceberg"),
2913 ("s3.endpoint", "http://127.0.0.1:9301"),
2914 ("s3.access.key", "test"),
2915 ("s3.secret.key", "test"),
2916 ("s3.region", "us-east-1"),
2917 ("catalog.type", "storage"),
2918 ("catalog.name", "demo"),
2919 ("database.name", "test_db"),
2920 ("table.name", "test_table"),
2921 ("enable_compaction", "true"),
2922 ("compaction.max_snapshots_num", "100"),
2923 ("compaction.small_files_threshold_mb", "512"),
2924 ("compaction.delete_files_count_threshold", "50"),
2925 ("compaction.trigger_snapshot_count", "10"),
2926 ("compaction.target_file_size_mb", "256"),
2927 ("compaction.type", "full"),
2928 ]
2929 .into_iter()
2930 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2931 .collect();
2932
2933 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2934
2935 assert!(iceberg_config.enable_compaction);
2937 assert_eq!(
2938 iceberg_config.max_snapshots_num_before_compaction,
2939 Some(100)
2940 );
2941 assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
2942 assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
2943 assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
2944 assert_eq!(iceberg_config.target_file_size_mb, Some(256));
2945 assert_eq!(iceberg_config.compaction_type, Some("full".to_owned()));
2946 }
2947}