1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30 UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43 DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59 Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
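// Write modes: `merge-on-read` is the default; `copy-on-write` is only allowed together
// with the `full` compaction type (enforced in `IcebergSink::validate`).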
102
103pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112 "snapshot_expiration_clear_expired_meta_data";
113pub const MAX_SNAPSHOTS_NUM: &str = "max_snapshots_num_before_compaction";
114
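/// Normalizes a user-supplied option value to kebab-case: lowercased, with `_` replaced
/// by `-`, so that e.g. `"Merge_On_Read"` is accepted as `"merge-on-read"`.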
115fn deserialize_and_normalize_string<'de, D>(
116 deserializer: D,
117) -> std::result::Result<String, D::Error>
118where
119 D: serde::Deserializer<'de>,
120{
121 let s = String::deserialize(deserializer)?;
122 Ok(s.to_lowercase().replace('_', "-"))
123}
124
125fn deserialize_and_normalize_optional_string<'de, D>(
126 deserializer: D,
127) -> std::result::Result<Option<String>, D::Error>
128where
129 D: serde::Deserializer<'de>,
130{
131 let opt = Option::<String>::deserialize(deserializer)?;
132 Ok(opt.map(|s| s.to_lowercase().replace('_', "-")))
133}
134
135fn default_commit_retry_num() -> u32 {
136 8
137}
138
139fn default_iceberg_write_mode() -> String {
140 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
141}
142
143fn default_true() -> bool {
144 true
145}
146
147fn default_some_true() -> Option<bool> {
148 Some(true)
149}
150
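/// Compaction strategy for the iceberg sink. Parsing is case-insensitive and accepts
/// either `_` or `-` as the separator.
///
/// ```ignore
/// // Illustrative usage based on the impls below.
/// assert_eq!("Small_Files".parse::<CompactionType>(), Ok(CompactionType::SmallFiles));
/// assert_eq!(CompactionType::Full.to_string(), "full");
/// ```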
151#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Default)]
153#[serde(rename_all = "kebab-case")]
154pub enum CompactionType {
155 #[default]
157 Full,
158 SmallFiles,
160 FilesWithDelete,
162}
163
164impl CompactionType {
165 pub fn as_str(&self) -> &'static str {
166 match self {
167 CompactionType::Full => "full",
168 CompactionType::SmallFiles => "small-files",
169 CompactionType::FilesWithDelete => "files-with-delete",
170 }
171 }
172}
173
174impl std::fmt::Display for CompactionType {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 write!(f, "{}", self.as_str())
177 }
178}
179
180impl FromStr for CompactionType {
181 type Err = String;
182
183 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
184 let normalized = s.to_lowercase().replace('_', "-");
186
187 match normalized.as_str() {
188 "full" => Ok(CompactionType::Full),
189 "small-files" => Ok(CompactionType::SmallFiles),
190 "files-with-delete" => Ok(CompactionType::FilesWithDelete),
            _ => Err(format!(
                "Unknown compaction type '{}', must be one of: 'full', 'small-files', 'files-with-delete'",
                s
            )),
195 }
196 }
197}
198
199#[serde_as]
200#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
201pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,
206
207 #[serde(flatten)]
208 common: IcebergCommon,
209
210 #[serde(flatten)]
211 table: IcebergTableIdentifier,
212
213 #[serde(
214 rename = "primary_key",
215 default,
216 deserialize_with = "deserialize_optional_string_seq_from_string"
217 )]
218 pub primary_key: Option<Vec<String>>,
219
220 #[serde(skip)]
222 pub java_catalog_props: HashMap<String, String>,
223
224 #[serde(default)]
225 pub partition_by: Option<String>,
226
227 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
229 #[serde_as(as = "DisplayFromStr")]
230 #[with_option(allow_alter_on_fly)]
231 pub commit_checkpoint_interval: u64,
232
233 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
234 pub create_table_if_not_exists: bool,
235
236 #[serde(default = "default_some_true")]
238 #[serde_as(as = "Option<DisplayFromStr>")]
239 pub is_exactly_once: Option<bool>,
240 #[serde(default = "default_commit_retry_num")]
245 pub commit_retry_num: u32,
246
247 #[serde(
249 rename = "enable_compaction",
250 default,
251 deserialize_with = "deserialize_bool_from_string"
252 )]
253 #[with_option(allow_alter_on_fly)]
254 pub enable_compaction: bool,
255
256 #[serde(rename = "compaction_interval_sec", default)]
258 #[serde_as(as = "Option<DisplayFromStr>")]
259 #[with_option(allow_alter_on_fly)]
260 pub compaction_interval_sec: Option<u64>,
261
262 #[serde(
264 rename = "enable_snapshot_expiration",
265 default,
266 deserialize_with = "deserialize_bool_from_string"
267 )]
268 #[with_option(allow_alter_on_fly)]
269 pub enable_snapshot_expiration: bool,
270
271 #[serde(
273 rename = "write_mode",
274 default = "default_iceberg_write_mode",
275 deserialize_with = "deserialize_and_normalize_string"
276 )]
277 pub write_mode: String,
278
279 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
282 #[serde_as(as = "Option<DisplayFromStr>")]
283 #[with_option(allow_alter_on_fly)]
284 pub snapshot_expiration_max_age_millis: Option<i64>,
285
286 #[serde(rename = "snapshot_expiration_retain_last", default)]
288 #[serde_as(as = "Option<DisplayFromStr>")]
289 #[with_option(allow_alter_on_fly)]
290 pub snapshot_expiration_retain_last: Option<i32>,
291
292 #[serde(
293 rename = "snapshot_expiration_clear_expired_files",
294 default = "default_true",
295 deserialize_with = "deserialize_bool_from_string"
296 )]
297 #[with_option(allow_alter_on_fly)]
298 pub snapshot_expiration_clear_expired_files: bool,
299
300 #[serde(
301 rename = "snapshot_expiration_clear_expired_meta_data",
302 default = "default_true",
303 deserialize_with = "deserialize_bool_from_string"
304 )]
305 #[with_option(allow_alter_on_fly)]
306 pub snapshot_expiration_clear_expired_meta_data: bool,
307
308 #[serde(rename = "compaction.max_snapshots_num", default)]
311 #[serde_as(as = "Option<DisplayFromStr>")]
312 #[with_option(allow_alter_on_fly)]
313 pub max_snapshots_num_before_compaction: Option<usize>,
314
315 #[serde(rename = "compaction.small_files_threshold_mb", default)]
316 #[serde_as(as = "Option<DisplayFromStr>")]
317 #[with_option(allow_alter_on_fly)]
318 pub small_files_threshold_mb: Option<u64>,
319
320 #[serde(rename = "compaction.delete_files_count_threshold", default)]
321 #[serde_as(as = "Option<DisplayFromStr>")]
322 #[with_option(allow_alter_on_fly)]
323 pub delete_files_count_threshold: Option<usize>,
324
325 #[serde(rename = "compaction.trigger_snapshot_count", default)]
326 #[serde_as(as = "Option<DisplayFromStr>")]
327 #[with_option(allow_alter_on_fly)]
328 pub trigger_snapshot_count: Option<usize>,
329
330 #[serde(rename = "compaction.target_file_size_mb", default)]
331 #[serde_as(as = "Option<DisplayFromStr>")]
332 #[with_option(allow_alter_on_fly)]
333 pub target_file_size_mb: Option<u64>,
334
335 #[serde(
338 rename = "compaction.type",
339 default,
340 deserialize_with = "deserialize_and_normalize_optional_string"
341 )]
342 #[with_option(allow_alter_on_fly)]
343 pub compaction_type: Option<String>,
344}
345
346impl EnforceSecret for IcebergConfig {
347 fn enforce_secret<'a>(
348 prop_iter: impl Iterator<Item = &'a str>,
349 ) -> crate::error::ConnectorResult<()> {
350 for prop in prop_iter {
351 IcebergCommon::enforce_one(prop)?;
352 }
353 Ok(())
354 }
355
356 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
357 IcebergCommon::enforce_one(prop)
358 }
359}
360
361impl IcebergConfig {
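    /// Builds an `IcebergConfig` from the raw `WITH` options. Keys under `catalog.*`
    /// (except `catalog.uri`, `catalog.type`, `catalog.name` and `catalog.header`) are
    /// forwarded verbatim to the Java catalog.
    ///
    /// ```ignore
    /// // Sketch only: `props` must also carry the catalog, warehouse and table options
    /// // required by `IcebergCommon` / `IcebergTableIdentifier`.
    /// props.insert("type".to_owned(), "upsert".to_owned());
    /// props.insert("primary_key".to_owned(), "id".to_owned());
    /// let config = IcebergConfig::from_btreemap(props)?;
    /// assert_eq!(config.primary_key, Some(vec!["id".to_owned()]));
    /// ```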
362 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
363 let mut config =
364 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
365 .map_err(|e| SinkError::Config(anyhow!(e)))?;
366
367 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
368 return Err(SinkError::Config(anyhow!(
369 "`{}` must be {}, or {}",
370 SINK_TYPE_OPTION,
371 SINK_TYPE_APPEND_ONLY,
372 SINK_TYPE_UPSERT
373 )));
374 }
375
376 if config.r#type == SINK_TYPE_UPSERT {
377 if let Some(primary_key) = &config.primary_key {
378 if primary_key.is_empty() {
379 return Err(SinkError::Config(anyhow!(
380 "`primary-key` must not be empty in {}",
381 SINK_TYPE_UPSERT
382 )));
383 }
384 } else {
385 return Err(SinkError::Config(anyhow!(
386 "Must set `primary-key` in {}",
387 SINK_TYPE_UPSERT
388 )));
389 }
390 }
391
392 config.java_catalog_props = values
394 .iter()
395 .filter(|(k, _v)| {
396 k.starts_with("catalog.")
397 && k != &"catalog.uri"
398 && k != &"catalog.type"
399 && k != &"catalog.name"
400 && k != &"catalog.header"
401 })
402 .map(|(k, v)| (k[8..].to_string(), v.clone()))
403 .collect();
404
405 if config.commit_checkpoint_interval == 0 {
406 return Err(SinkError::Config(anyhow!(
407 "`commit-checkpoint-interval` must be greater than 0"
408 )));
409 }
410
411 if let Some(ref compaction_type_str) = config.compaction_type {
413 CompactionType::from_str(compaction_type_str)
414 .map_err(|e| SinkError::Config(anyhow!(e)))?;
415 }
416
417 Ok(config)
418 }
419
420 pub fn catalog_type(&self) -> &str {
421 self.common.catalog_type()
422 }
423
424 pub async fn load_table(&self) -> Result<Table> {
425 self.common
426 .load_table(&self.table, &self.java_catalog_props)
427 .await
428 .map_err(Into::into)
429 }
430
431 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
432 self.common
433 .create_catalog(&self.java_catalog_props)
434 .await
435 .map_err(Into::into)
436 }
437
438 pub fn full_table_name(&self) -> Result<TableIdent> {
439 self.table.to_table_ident().map_err(Into::into)
440 }
441
442 pub fn catalog_name(&self) -> String {
443 self.common.catalog_name()
444 }
445
446 pub fn compaction_interval_sec(&self) -> u64 {
447 self.compaction_interval_sec.unwrap_or(3600)
449 }
450
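    /// Cutoff timestamp for snapshot expiration: snapshots older than
    /// `current_time_ms - snapshot_expiration_max_age_millis` become eligible for expiry.
    /// For example, a max age of 86_400_000 ms (one day) at `current_time_ms =
    /// 1_700_086_400_000` yields a cutoff of 1_700_000_000_000. Returns `None` when no
    /// max age is configured.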
451 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
454 self.snapshot_expiration_max_age_millis
455 .map(|max_age_millis| current_time_ms - max_age_millis)
456 }
457
458 pub fn trigger_snapshot_count(&self) -> usize {
459 self.trigger_snapshot_count.unwrap_or(16)
460 }
461
462 pub fn small_files_threshold_mb(&self) -> u64 {
463 self.small_files_threshold_mb.unwrap_or(64)
464 }
465
466 pub fn delete_files_count_threshold(&self) -> usize {
467 self.delete_files_count_threshold.unwrap_or(256)
468 }
469
470 pub fn target_file_size_mb(&self) -> u64 {
471 self.target_file_size_mb.unwrap_or(1024)
472 }
473
474 pub fn compaction_type(&self) -> CompactionType {
477 self.compaction_type
478 .as_deref()
479 .and_then(|s| CompactionType::from_str(s).ok())
480 .unwrap_or_default()
481 }
482}
483
484pub struct IcebergSink {
485 pub config: IcebergConfig,
486 param: SinkParam,
487 unique_column_ids: Option<Vec<usize>>,
489}
490
491impl EnforceSecret for IcebergSink {
492 fn enforce_secret<'a>(
493 prop_iter: impl Iterator<Item = &'a str>,
494 ) -> crate::error::ConnectorResult<()> {
495 for prop in prop_iter {
496 IcebergConfig::enforce_one(prop)?;
497 }
498 Ok(())
499 }
500}
501
502impl TryFrom<SinkParam> for IcebergSink {
503 type Error = SinkError;
504
505 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
506 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
507 IcebergSink::new(config, param)
508 }
509}
510
511impl Debug for IcebergSink {
512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513 f.debug_struct("IcebergSink")
514 .field("config", &self.config)
515 .finish()
516 }
517}
518
519impl IcebergSink {
520 pub async fn create_and_validate_table(&self) -> Result<Table> {
521 if self.config.create_table_if_not_exists {
522 self.create_table_if_not_exists().await?;
523 }
524
525 let table = self
526 .config
527 .load_table()
528 .await
529 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
530
531 let sink_schema = self.param.schema();
532 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
533 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
534
535 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
536 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
537
538 Ok(table)
539 }
540
541 pub async fn create_table_if_not_exists(&self) -> Result<()> {
542 let catalog = self.config.create_catalog().await?;
543 let namespace = if let Some(database_name) = self.config.table.database_name() {
544 let namespace = NamespaceIdent::new(database_name.to_owned());
545 if !catalog
546 .namespace_exists(&namespace)
547 .await
548 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
549 {
550 catalog
551 .create_namespace(&namespace, HashMap::default())
552 .await
553 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
554 .context("failed to create iceberg namespace")?;
555 }
556 namespace
557 } else {
558 bail!("database name must be set if you want to create table")
559 };
560
561 let table_id = self
562 .config
563 .full_table_name()
564 .context("Unable to parse table name")?;
565 if !catalog
566 .table_exists(&table_id)
567 .await
568 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
569 {
570 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
571 let arrow_fields = self
573 .param
574 .columns
575 .iter()
576 .map(|column| {
577 Ok(iceberg_create_table_arrow_convert
578 .to_arrow_field(&column.name, &column.data_type)
579 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
580 .context(format!(
581 "failed to convert {}: {} to arrow type",
582 &column.name, &column.data_type
583 ))?)
584 })
585 .collect::<Result<Vec<ArrowField>>>()?;
586 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
587 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
588 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
589 .context("failed to convert arrow schema to iceberg schema")?;
590
591 let location = {
592 let mut names = namespace.clone().inner();
593 names.push(self.config.table.table_name().to_owned());
594 match &self.config.common.warehouse_path {
595 Some(warehouse_path) => {
596 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
597 let url = Url::parse(warehouse_path);
598 if url.is_err() || is_s3_tables {
599 if self.config.common.catalog_type() == "rest"
602 || self.config.common.catalog_type() == "rest_rust"
603 {
604 None
605 } else {
606 bail!(format!("Invalid warehouse path: {}", warehouse_path))
607 }
608 } else if warehouse_path.ends_with('/') {
609 Some(format!("{}{}", warehouse_path, names.join("/")))
610 } else {
611 Some(format!("{}/{}", warehouse_path, names.join("/")))
612 }
613 }
614 None => None,
615 }
616 };
617
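            // Build an unbound partition spec from the user-supplied `partition_by`
            // expressions (e.g. something like `bucket(16, id), day(ts)`); every
            // referenced source column must exist in the iceberg schema.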
618 let partition_spec = match &self.config.partition_by {
619 Some(partition_by) => {
620 let mut partition_fields = Vec::<UnboundPartitionField>::new();
621 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
622 .into_iter()
623 .enumerate()
624 {
625 match iceberg_schema.field_id_by_name(&column) {
626 Some(id) => partition_fields.push(
627 UnboundPartitionField::builder()
628 .source_id(id)
629 .transform(transform)
630 .name(format!("_p_{}", column))
631 .field_id(i as i32)
632 .build(),
633 ),
634 None => bail!(format!(
635 "Partition source column does not exist in schema: {}",
636 column
637 )),
638 };
639 }
640 Some(
641 UnboundPartitionSpec::builder()
642 .with_spec_id(0)
643 .add_partition_fields(partition_fields)
644 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
645 .context("failed to add partition columns")?
646 .build(),
647 )
648 }
649 None => None,
650 };
651
652 let table_creation_builder = TableCreation::builder()
653 .name(self.config.table.table_name().to_owned())
654 .schema(iceberg_schema);
655
656 let table_creation = match (location, partition_spec) {
657 (Some(location), Some(partition_spec)) => table_creation_builder
658 .location(location)
659 .partition_spec(partition_spec)
660 .build(),
661 (Some(location), None) => table_creation_builder.location(location).build(),
662 (None, Some(partition_spec)) => table_creation_builder
663 .partition_spec(partition_spec)
664 .build(),
665 (None, None) => table_creation_builder.build(),
666 };
667
668 catalog
669 .create_table(&namespace, table_creation)
670 .await
671 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
672 .context("failed to create iceberg table")?;
673 }
674 Ok(())
675 }
676
677 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
678 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
679 if let Some(pk) = &config.primary_key {
680 let mut unique_column_ids = Vec::with_capacity(pk.len());
681 for col_name in pk {
682 let id = param
683 .columns
684 .iter()
685 .find(|col| col.name.as_str() == col_name)
686 .ok_or_else(|| {
687 SinkError::Config(anyhow!(
688 "Primary key column {} not found in sink schema",
689 col_name
690 ))
691 })?
692 .column_id
693 .get_id() as usize;
694 unique_column_ids.push(id);
695 }
696 Some(unique_column_ids)
697 } else {
698 unreachable!()
699 }
700 } else {
701 None
702 };
703 Ok(Self {
704 config,
705 param,
706 unique_column_ids,
707 })
708 }
709}
710
711impl Sink for IcebergSink {
712 type Coordinator = IcebergSinkCommitter;
713 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
714
715 const SINK_NAME: &'static str = ICEBERG_SINK;
716
717 async fn validate(&self) -> Result<()> {
718 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
719 bail!("Snowflake catalog only supports iceberg sources");
720 }
721
722 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
723 risingwave_common::license::Feature::IcebergSinkWithGlue
724 .check_available()
725 .map_err(|e| anyhow::anyhow!(e))?;
726 }
727
728 let compaction_type = self.config.compaction_type();
730
731 if self.config.write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
734 && compaction_type != CompactionType::Full
735 {
736 bail!(
737 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
738 compaction_type
739 );
740 }
741
742 match compaction_type {
743 CompactionType::SmallFiles => {
744 risingwave_common::license::Feature::IcebergCompaction
746 .check_available()
747 .map_err(|e| anyhow::anyhow!(e))?;
748
749 if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
751 bail!(
752 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
753 self.config.write_mode
754 );
755 }
756
757 if self.config.delete_files_count_threshold.is_some() {
759 bail!(
760 "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
761 );
762 }
763 }
764 CompactionType::FilesWithDelete => {
765 risingwave_common::license::Feature::IcebergCompaction
767 .check_available()
768 .map_err(|e| anyhow::anyhow!(e))?;
769
770 if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
772 bail!(
773 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
774 self.config.write_mode
775 );
776 }
777
778 if self.config.small_files_threshold_mb.is_some() {
780 bail!(
781 "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
782 );
783 }
784 }
            CompactionType::Full => {}
        }
789
790 let _ = self.create_and_validate_table().await?;
791 Ok(())
792 }
793
794 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
795 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
796
797 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
799 if iceberg_config.enable_compaction && compaction_interval == 0 {
800 bail!(
801 "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
802 );
803 }
804
805 tracing::info!(
807 "Alter config compaction_interval set to {} seconds",
808 compaction_interval
809 );
810 }
811
812 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
814 && max_snapshots < 1
815 {
816 bail!(
817 "`compaction.max-snapshots-num` must be greater than 0, got: {}",
818 max_snapshots
819 );
820 }
821
822 Ok(())
823 }
824
825 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
826 let table = self.create_and_validate_table().await?;
827 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
828 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
829 } else {
830 IcebergSinkWriter::new_append_only(table, &writer_param).await?
831 };
832
833 let commit_checkpoint_interval =
834 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
835 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
836 );
837 let writer = CoordinatedLogSinker::new(
838 &writer_param,
839 self.param.clone(),
840 inner,
841 commit_checkpoint_interval,
842 )
843 .await?;
844
845 Ok(writer)
846 }
847
848 fn is_coordinated_sink(&self) -> bool {
849 true
850 }
851
852 async fn new_coordinator(
853 &self,
854 db: DatabaseConnection,
855 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
856 ) -> Result<Self::Coordinator> {
857 let catalog = self.config.create_catalog().await?;
858 let table = self.create_and_validate_table().await?;
859 Ok(IcebergSinkCommitter {
860 catalog,
861 table,
862 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
863 last_commit_epoch: 0,
864 sink_id: self.param.sink_id,
865 config: self.config.clone(),
866 param: self.param.clone(),
867 db,
868 commit_retry_num: self.config.commit_retry_num,
869 committed_epoch_subscriber: None,
870 iceberg_compact_stat_sender,
871 })
872 }
873}
874
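/// Tracks how the writer projects away the extra partition column, if one is configured:
/// `None` means there is no extra column, `Prepare(idx)` holds the index of the column to
/// drop, and `Done(v)` caches the computed projection index vector.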
875enum ProjectIdxVec {
882 None,
883 Prepare(usize),
884 Done(Vec<usize>),
885}
886
887pub struct IcebergSinkWriter {
888 writer: IcebergWriterDispatch,
889 arrow_schema: SchemaRef,
890 metrics: IcebergWriterMetrics,
892 table: Table,
894 project_idx_vec: ProjectIdxVec,
897}
898
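/// The concrete writer stack, chosen when the sink writer is created based on whether the
/// table is partitioned and whether the sink is append-only or upsert. The inner `writer`
/// is taken out and closed at every checkpoint barrier and then rebuilt from
/// `writer_builder`.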
899#[allow(clippy::type_complexity)]
900enum IcebergWriterDispatch {
901 PartitionAppendOnly {
902 writer: Option<Box<dyn IcebergWriter>>,
903 writer_builder: MonitoredGeneralWriterBuilder<
904 FanoutPartitionWriterBuilder<
905 DataFileWriterBuilder<
906 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
907 >,
908 >,
909 >,
910 },
911 NonPartitionAppendOnly {
912 writer: Option<Box<dyn IcebergWriter>>,
913 writer_builder: MonitoredGeneralWriterBuilder<
914 DataFileWriterBuilder<
915 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
916 >,
917 >,
918 },
919 PartitionUpsert {
920 writer: Option<Box<dyn IcebergWriter>>,
921 writer_builder: MonitoredGeneralWriterBuilder<
922 FanoutPartitionWriterBuilder<
923 EqualityDeltaWriterBuilder<
924 DataFileWriterBuilder<
925 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
926 >,
927 MonitoredPositionDeleteWriterBuilder<
928 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
929 >,
930 EqualityDeleteFileWriterBuilder<
931 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
932 >,
933 >,
934 >,
935 >,
936 arrow_schema_with_op_column: SchemaRef,
937 },
938 NonpartitionUpsert {
939 writer: Option<Box<dyn IcebergWriter>>,
940 writer_builder: MonitoredGeneralWriterBuilder<
941 EqualityDeltaWriterBuilder<
942 DataFileWriterBuilder<
943 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
944 >,
945 MonitoredPositionDeleteWriterBuilder<
946 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
947 >,
948 EqualityDeleteFileWriterBuilder<
949 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
950 >,
951 >,
952 >,
953 arrow_schema_with_op_column: SchemaRef,
954 },
955}
956
957impl IcebergWriterDispatch {
958 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
959 match self {
960 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
961 | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
962 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
963 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
964 }
965 }
966}
967
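/// Metrics owned by the sink writer. `_write_qps` and `_write_latency` are shared with the
/// monitored writer builders and are kept here mainly to hold their label guards alive;
/// `write_bytes` is increased by the estimated heap size of every written chunk.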
968pub struct IcebergWriterMetrics {
969 _write_qps: LabelGuardedIntCounter,
974 _write_latency: LabelGuardedHistogram,
975 write_bytes: LabelGuardedIntCounter,
976}
977
978impl IcebergSinkWriter {
979 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
980 let SinkWriterParam {
981 extra_partition_col_idx,
982 actor_id,
983 sink_id,
984 sink_name,
985 ..
986 } = writer_param;
987 let metrics_labels = [
988 &actor_id.to_string(),
989 &sink_id.to_string(),
990 sink_name.as_str(),
991 ];
992
993 let write_qps = GLOBAL_SINK_METRICS
995 .iceberg_write_qps
996 .with_guarded_label_values(&metrics_labels);
997 let write_latency = GLOBAL_SINK_METRICS
998 .iceberg_write_latency
999 .with_guarded_label_values(&metrics_labels);
1000 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1003 .iceberg_rolling_unflushed_data_file
1004 .with_guarded_label_values(&metrics_labels);
1005 let write_bytes = GLOBAL_SINK_METRICS
1006 .iceberg_write_bytes
1007 .with_guarded_label_values(&metrics_labels);
1008
1009 let schema = table.metadata().current_schema();
1010 let partition_spec = table.metadata().default_partition_spec();
1011
1012 let unique_uuid_suffix = Uuid::now_v7();
1014
1015 let parquet_writer_properties = WriterProperties::builder()
1016 .set_max_row_group_size(
1017 writer_param
1018 .streaming_config
1019 .developer
1020 .iceberg_sink_write_parquet_max_row_group_rows,
1021 )
1022 .build();
1023
1024 let parquet_writer_builder = ParquetWriterBuilder::new(
1025 parquet_writer_properties,
1026 schema.clone(),
1027 table.file_io().clone(),
1028 DefaultLocationGenerator::new(table.metadata().clone())
1029 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1030 DefaultFileNameGenerator::new(
1031 writer_param.actor_id.to_string(),
1032 Some(unique_uuid_suffix.to_string()),
1033 iceberg::spec::DataFileFormat::Parquet,
1034 ),
1035 );
1036 let data_file_builder =
1037 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
1038 if partition_spec.fields().is_empty() {
1039 let writer_builder = MonitoredGeneralWriterBuilder::new(
1040 data_file_builder,
1041 write_qps.clone(),
1042 write_latency.clone(),
1043 );
1044 let inner_writer = Some(Box::new(
1045 writer_builder
1046 .clone()
1047 .build()
1048 .await
1049 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1050 ) as Box<dyn IcebergWriter>);
1051 Ok(Self {
1052 arrow_schema: Arc::new(
1053 schema_to_arrow_schema(table.metadata().current_schema())
1054 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1055 ),
1056 metrics: IcebergWriterMetrics {
1057 _write_qps: write_qps,
1058 _write_latency: write_latency,
1059 write_bytes,
1060 },
1061 writer: IcebergWriterDispatch::NonPartitionAppendOnly {
1062 writer: inner_writer,
1063 writer_builder,
1064 },
1065 table,
1066 project_idx_vec: {
1067 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1068 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1069 } else {
1070 ProjectIdxVec::None
1071 }
1072 },
1073 })
1074 } else {
1075 let partition_builder = MonitoredGeneralWriterBuilder::new(
1076 FanoutPartitionWriterBuilder::new(
1077 data_file_builder,
1078 partition_spec.clone(),
1079 schema.clone(),
1080 )
1081 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1082 write_qps.clone(),
1083 write_latency.clone(),
1084 );
1085 let inner_writer = Some(Box::new(
1086 partition_builder
1087 .clone()
1088 .build()
1089 .await
1090 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1091 ) as Box<dyn IcebergWriter>);
1092 Ok(Self {
1093 arrow_schema: Arc::new(
1094 schema_to_arrow_schema(table.metadata().current_schema())
1095 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1096 ),
1097 metrics: IcebergWriterMetrics {
1098 _write_qps: write_qps,
1099 _write_latency: write_latency,
1100 write_bytes,
1101 },
1102 writer: IcebergWriterDispatch::PartitionAppendOnly {
1103 writer: inner_writer,
1104 writer_builder: partition_builder,
1105 },
1106 table,
1107 project_idx_vec: {
1108 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1109 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1110 } else {
1111 ProjectIdxVec::None
1112 }
1113 },
1114 })
1115 }
1116 }
1117
1118 pub async fn new_upsert(
1119 table: Table,
1120 unique_column_ids: Vec<usize>,
1121 writer_param: &SinkWriterParam,
1122 ) -> Result<Self> {
1123 let SinkWriterParam {
1124 extra_partition_col_idx,
1125 actor_id,
1126 sink_id,
1127 sink_name,
1128 ..
1129 } = writer_param;
1130 let metrics_labels = [
1131 &actor_id.to_string(),
1132 &sink_id.to_string(),
1133 sink_name.as_str(),
1134 ];
1135 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1136
1137 let write_qps = GLOBAL_SINK_METRICS
1139 .iceberg_write_qps
1140 .with_guarded_label_values(&metrics_labels);
1141 let write_latency = GLOBAL_SINK_METRICS
1142 .iceberg_write_latency
1143 .with_guarded_label_values(&metrics_labels);
1144 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1147 .iceberg_rolling_unflushed_data_file
1148 .with_guarded_label_values(&metrics_labels);
1149 let position_delete_cache_num = GLOBAL_SINK_METRICS
1150 .iceberg_position_delete_cache_num
1151 .with_guarded_label_values(&metrics_labels);
1152 let write_bytes = GLOBAL_SINK_METRICS
1153 .iceberg_write_bytes
1154 .with_guarded_label_values(&metrics_labels);
1155
1156 let schema = table.metadata().current_schema();
1158 let partition_spec = table.metadata().default_partition_spec();
1159
1160 let unique_uuid_suffix = Uuid::now_v7();
1162
1163 let parquet_writer_properties = WriterProperties::builder()
1164 .set_max_row_group_size(
1165 writer_param
1166 .streaming_config
1167 .developer
1168 .iceberg_sink_write_parquet_max_row_group_rows,
1169 )
1170 .build();
1171
1172 let data_file_builder = {
1173 let parquet_writer_builder = ParquetWriterBuilder::new(
1174 parquet_writer_properties.clone(),
1175 schema.clone(),
1176 table.file_io().clone(),
1177 DefaultLocationGenerator::new(table.metadata().clone())
1178 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1179 DefaultFileNameGenerator::new(
1180 writer_param.actor_id.to_string(),
1181 Some(unique_uuid_suffix.to_string()),
1182 iceberg::spec::DataFileFormat::Parquet,
1183 ),
1184 );
1185 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id())
1186 };
1187 let position_delete_builder = {
1188 let parquet_writer_builder = ParquetWriterBuilder::new(
1189 parquet_writer_properties.clone(),
1190 POSITION_DELETE_SCHEMA.clone(),
1191 table.file_io().clone(),
1192 DefaultLocationGenerator::new(table.metadata().clone())
1193 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1194 DefaultFileNameGenerator::new(
1195 writer_param.actor_id.to_string(),
1196 Some(format!("pos-del-{}", unique_uuid_suffix)),
1197 iceberg::spec::DataFileFormat::Parquet,
1198 ),
1199 );
1200 MonitoredPositionDeleteWriterBuilder::new(
1201 SortPositionDeleteWriterBuilder::new(
1202 parquet_writer_builder,
1203 writer_param
1204 .streaming_config
1205 .developer
1206 .iceberg_sink_positional_delete_cache_size,
1207 None,
1208 None,
1209 ),
1210 position_delete_cache_num,
1211 )
1212 };
1213 let equality_delete_builder = {
1214 let config = EqualityDeleteWriterConfig::new(
1215 unique_column_ids.clone(),
1216 table.metadata().current_schema().clone(),
1217 None,
1218 partition_spec.spec_id(),
1219 )
1220 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1221 let parquet_writer_builder = ParquetWriterBuilder::new(
1222 parquet_writer_properties.clone(),
1223 Arc::new(
1224 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1225 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1226 ),
1227 table.file_io().clone(),
1228 DefaultLocationGenerator::new(table.metadata().clone())
1229 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1230 DefaultFileNameGenerator::new(
1231 writer_param.actor_id.to_string(),
1232 Some(format!("eq-del-{}", unique_uuid_suffix)),
1233 iceberg::spec::DataFileFormat::Parquet,
1234 ),
1235 );
1236
1237 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
1238 };
1239 let delta_builder = EqualityDeltaWriterBuilder::new(
1240 data_file_builder,
1241 position_delete_builder,
1242 equality_delete_builder,
1243 unique_column_ids,
1244 );
1245 if partition_spec.fields().is_empty() {
1246 let writer_builder = MonitoredGeneralWriterBuilder::new(
1247 delta_builder,
1248 write_qps.clone(),
1249 write_latency.clone(),
1250 );
1251 let inner_writer = Some(Box::new(
1252 writer_builder
1253 .clone()
1254 .build()
1255 .await
1256 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1257 ) as Box<dyn IcebergWriter>);
1258 let original_arrow_schema = Arc::new(
1259 schema_to_arrow_schema(table.metadata().current_schema())
1260 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1261 );
1262 let schema_with_extra_op_column = {
1263 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1264 new_fields.push(Arc::new(ArrowField::new(
1265 "op".to_owned(),
1266 ArrowDataType::Int32,
1267 false,
1268 )));
1269 Arc::new(ArrowSchema::new(new_fields))
1270 };
1271 Ok(Self {
1272 arrow_schema: original_arrow_schema,
1273 metrics: IcebergWriterMetrics {
1274 _write_qps: write_qps,
1275 _write_latency: write_latency,
1276 write_bytes,
1277 },
1278 table,
1279 writer: IcebergWriterDispatch::NonpartitionUpsert {
1280 writer: inner_writer,
1281 writer_builder,
1282 arrow_schema_with_op_column: schema_with_extra_op_column,
1283 },
1284 project_idx_vec: {
1285 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1286 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1287 } else {
1288 ProjectIdxVec::None
1289 }
1290 },
1291 })
1292 } else {
1293 let original_arrow_schema = Arc::new(
1294 schema_to_arrow_schema(table.metadata().current_schema())
1295 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1296 );
1297 let schema_with_extra_op_column = {
1298 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1299 new_fields.push(Arc::new(ArrowField::new(
1300 "op".to_owned(),
1301 ArrowDataType::Int32,
1302 false,
1303 )));
1304 Arc::new(ArrowSchema::new(new_fields))
1305 };
1306 let partition_builder = MonitoredGeneralWriterBuilder::new(
1307 FanoutPartitionWriterBuilder::new_with_custom_schema(
1308 delta_builder,
1309 schema_with_extra_op_column.clone(),
1310 partition_spec.clone(),
1311 table.metadata().current_schema().clone(),
1312 ),
1313 write_qps.clone(),
1314 write_latency.clone(),
1315 );
1316 let inner_writer = Some(Box::new(
1317 partition_builder
1318 .clone()
1319 .build()
1320 .await
1321 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1322 ) as Box<dyn IcebergWriter>);
1323 Ok(Self {
1324 arrow_schema: original_arrow_schema,
1325 metrics: IcebergWriterMetrics {
1326 _write_qps: write_qps,
1327 _write_latency: write_latency,
1328 write_bytes,
1329 },
1330 table,
1331 writer: IcebergWriterDispatch::PartitionUpsert {
1332 writer: inner_writer,
1333 writer_builder: partition_builder,
1334 arrow_schema_with_op_column: schema_with_extra_op_column,
1335 },
1336 project_idx_vec: {
1337 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1338 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1339 } else {
1340 ProjectIdxVec::None
1341 }
1342 },
1343 })
1344 }
1345 }
1346}
1347
1348#[async_trait]
1349impl SinkWriter for IcebergSinkWriter {
1350 type CommitMetadata = Option<SinkMetadata>;
1351
1352 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1354 Ok(())
1356 }
1357
1358 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1360 match &mut self.writer {
1362 IcebergWriterDispatch::PartitionAppendOnly {
1363 writer,
1364 writer_builder,
1365 } => {
1366 if writer.is_none() {
1367 *writer = Some(Box::new(
1368 writer_builder
1369 .clone()
1370 .build()
1371 .await
1372 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1373 ));
1374 }
1375 }
1376 IcebergWriterDispatch::NonPartitionAppendOnly {
1377 writer,
1378 writer_builder,
1379 } => {
1380 if writer.is_none() {
1381 *writer = Some(Box::new(
1382 writer_builder
1383 .clone()
1384 .build()
1385 .await
1386 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1387 ));
1388 }
1389 }
1390 IcebergWriterDispatch::PartitionUpsert {
1391 writer,
1392 writer_builder,
1393 ..
1394 } => {
1395 if writer.is_none() {
1396 *writer = Some(Box::new(
1397 writer_builder
1398 .clone()
1399 .build()
1400 .await
1401 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1402 ));
1403 }
1404 }
1405 IcebergWriterDispatch::NonpartitionUpsert {
1406 writer,
1407 writer_builder,
1408 ..
1409 } => {
1410 if writer.is_none() {
1411 *writer = Some(Box::new(
1412 writer_builder
1413 .clone()
1414 .build()
1415 .await
1416 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1417 ));
1418 }
1419 }
1420 };
1421
1422 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1424 match &self.project_idx_vec {
1425 ProjectIdxVec::None => {}
1426 ProjectIdxVec::Prepare(idx) => {
1427 if *idx >= chunk.columns().len() {
1428 return Err(SinkError::Iceberg(anyhow!(
1429 "invalid extra partition column index {}",
1430 idx
1431 )));
1432 }
1433 let project_idx_vec = (0..*idx)
1434 .chain(*idx + 1..chunk.columns().len())
1435 .collect_vec();
1436 chunk = chunk.project(&project_idx_vec);
1437 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1438 }
1439 ProjectIdxVec::Done(idx_vec) => {
1440 chunk = chunk.project(idx_vec);
1441 }
1442 }
1443 if ops.is_empty() {
1444 return Ok(());
1445 }
1446 let write_batch_size = chunk.estimated_heap_size();
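        // Append-only sinks keep only `Insert` rows; upsert sinks keep every row and append
        // an extra `op` column (INSERT_OP / DELETE_OP) that the equality-delta writer consumes.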
1447 let batch = match &self.writer {
1448 IcebergWriterDispatch::PartitionAppendOnly { .. }
1449 | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
1450 let filters =
1452 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1453 chunk.set_visibility(filters);
1454 IcebergArrowConvert
1455 .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
1456 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1457 }
1458 IcebergWriterDispatch::PartitionUpsert {
1459 arrow_schema_with_op_column,
1460 ..
1461 }
1462 | IcebergWriterDispatch::NonpartitionUpsert {
1463 arrow_schema_with_op_column,
1464 ..
1465 } => {
1466 let chunk = IcebergArrowConvert
1467 .to_record_batch(self.arrow_schema.clone(), &chunk)
1468 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1469 let ops = Arc::new(Int32Array::from(
1470 ops.iter()
1471 .map(|op| match op {
1472 Op::UpdateInsert | Op::Insert => INSERT_OP,
1473 Op::UpdateDelete | Op::Delete => DELETE_OP,
1474 })
1475 .collect_vec(),
1476 ));
1477 let mut columns = chunk.columns().to_vec();
1478 columns.push(ops);
1479 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1480 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1481 }
1482 };
1483
1484 let writer = self.writer.get_writer().unwrap();
1485 writer
1486 .write(batch)
1487 .instrument_await("iceberg_write")
1488 .await
1489 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1490 self.metrics.write_bytes.inc_by(write_batch_size as _);
1491 Ok(())
1492 }
1493
1494 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1497 if !is_checkpoint {
1499 return Ok(None);
1500 }
1501
1502 let close_result = match &mut self.writer {
1503 IcebergWriterDispatch::PartitionAppendOnly {
1504 writer,
1505 writer_builder,
1506 } => {
1507 let close_result = match writer.take() {
1508 Some(mut writer) => {
1509 Some(writer.close().instrument_await("iceberg_close").await)
1510 }
1511 _ => None,
1512 };
1513 match writer_builder.clone().build().await {
1514 Ok(new_writer) => {
1515 *writer = Some(Box::new(new_writer));
1516 }
1517 _ => {
1518 warn!("Failed to build new writer after close");
1521 }
1522 }
1523 close_result
1524 }
1525 IcebergWriterDispatch::NonPartitionAppendOnly {
1526 writer,
1527 writer_builder,
1528 } => {
1529 let close_result = match writer.take() {
1530 Some(mut writer) => {
1531 Some(writer.close().instrument_await("iceberg_close").await)
1532 }
1533 _ => None,
1534 };
1535 match writer_builder.clone().build().await {
1536 Ok(new_writer) => {
1537 *writer = Some(Box::new(new_writer));
1538 }
1539 _ => {
1540 warn!("Failed to build new writer after close");
1543 }
1544 }
1545 close_result
1546 }
1547 IcebergWriterDispatch::PartitionUpsert {
1548 writer,
1549 writer_builder,
1550 ..
1551 } => {
1552 let close_result = match writer.take() {
1553 Some(mut writer) => {
1554 Some(writer.close().instrument_await("iceberg_close").await)
1555 }
1556 _ => None,
1557 };
1558 match writer_builder.clone().build().await {
1559 Ok(new_writer) => {
1560 *writer = Some(Box::new(new_writer));
1561 }
1562 _ => {
1563 warn!("Failed to build new writer after close");
1566 }
1567 }
1568 close_result
1569 }
1570 IcebergWriterDispatch::NonpartitionUpsert {
1571 writer,
1572 writer_builder,
1573 ..
1574 } => {
1575 let close_result = match writer.take() {
1576 Some(mut writer) => {
1577 Some(writer.close().instrument_await("iceberg_close").await)
1578 }
1579 _ => None,
1580 };
1581 match writer_builder.clone().build().await {
1582 Ok(new_writer) => {
1583 *writer = Some(Box::new(new_writer));
1584 }
1585 _ => {
1586 warn!("Failed to build new writer after close");
1589 }
1590 }
1591 close_result
1592 }
1593 };
1594
1595 match close_result {
1596 Some(Ok(result)) => {
1597 let version = self.table.metadata().format_version() as u8;
1598 let partition_type = self.table.metadata().default_partition_type();
1599 let data_files = result
1600 .into_iter()
1601 .map(|f| {
1602 SerializedDataFile::try_from(f, partition_type, version == 1)
1603 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1604 })
1605 .collect::<Result<Vec<_>>>()?;
1606 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1607 data_files,
1608 schema_id: self.table.metadata().current_schema_id(),
1609 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1610 })?))
1611 }
1612 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1613 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1614 }
1615 }
1616
1617 async fn abort(&mut self) -> Result<()> {
1619 Ok(())
1621 }
1622}
1623
1624const SCHEMA_ID: &str = "schema_id";
1625const PARTITION_SPEC_ID: &str = "partition_spec_id";
1626const DATA_FILES: &str = "data_files";
1627
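/// Commit payload produced by each sink writer at a checkpoint and collected by the commit
/// coordinator. It round-trips through JSON as
/// `{"schema_id": ..., "partition_spec_id": ..., "data_files": [...]}`; see the `TryFrom`
/// impls below.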
1628#[derive(Default, Clone)]
1629struct IcebergCommitResult {
1630 schema_id: i32,
1631 partition_spec_id: i32,
1632 data_files: Vec<SerializedDataFile>,
1633}
1634
1635impl IcebergCommitResult {
1636 fn try_from(value: &SinkMetadata) -> Result<Self> {
1637 if let Some(Serialized(v)) = &value.metadata {
1638 let mut values = if let serde_json::Value::Object(v) =
1639 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1640 .context("Can't parse iceberg sink metadata")?
1641 {
1642 v
1643 } else {
1644 bail!("iceberg sink metadata should be an object");
1645 };
1646
1647 let schema_id;
1648 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1649 schema_id = value
1650 .as_u64()
1651 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1652 } else {
1653 bail!("iceberg sink metadata should have schema_id");
1654 }
1655
1656 let partition_spec_id;
1657 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1658 partition_spec_id = value
1659 .as_u64()
1660 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1661 } else {
1662 bail!("iceberg sink metadata should have partition_spec_id");
1663 }
1664
1665 let data_files: Vec<SerializedDataFile>;
1666 if let serde_json::Value::Array(values) = values
1667 .remove(DATA_FILES)
1668 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1669 {
                data_files = values
                    .into_iter()
                    .map(from_value::<SerializedDataFile>)
                    .collect::<std::result::Result<_, _>>()
                    .context("Can't deserialize data files in iceberg sink metadata")?;
1675 } else {
1676 bail!("iceberg sink metadata should have data_files object");
1677 }
1678
1679 Ok(Self {
1680 schema_id: schema_id as i32,
1681 partition_spec_id: partition_spec_id as i32,
1682 data_files,
1683 })
1684 } else {
1685 bail!("Can't create iceberg sink write result from empty data!")
1686 }
1687 }
1688
1689 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1690 let mut values = if let serde_json::Value::Object(value) =
1691 serde_json::from_slice::<serde_json::Value>(&value)
1692 .context("Can't parse iceberg sink metadata")?
1693 {
1694 value
1695 } else {
1696 bail!("iceberg sink metadata should be an object");
1697 };
1698
1699 let schema_id;
1700 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1701 schema_id = value
1702 .as_u64()
1703 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1704 } else {
1705 bail!("iceberg sink metadata should have schema_id");
1706 }
1707
1708 let partition_spec_id;
1709 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1710 partition_spec_id = value
1711 .as_u64()
1712 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1713 } else {
1714 bail!("iceberg sink metadata should have partition_spec_id");
1715 }
1716
1717 let data_files: Vec<SerializedDataFile>;
1718 if let serde_json::Value::Array(values) = values
1719 .remove(DATA_FILES)
1720 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1721 {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .context("Can't deserialize data files in iceberg sink metadata")?;
1727 } else {
1728 bail!("iceberg sink metadata should have data_files object");
1729 }
1730
1731 Ok(Self {
1732 schema_id: schema_id as i32,
1733 partition_spec_id: partition_spec_id as i32,
1734 data_files,
1735 })
1736 }
1737}
1738
1739impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1740 type Error = SinkError;
1741
1742 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1743 let json_data_files = serde_json::Value::Array(
1744 value
1745 .data_files
1746 .iter()
1747 .map(serde_json::to_value)
1748 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1749 .context("Can't serialize data files to json")?,
1750 );
1751 let json_value = serde_json::Value::Object(
1752 vec![
1753 (
1754 SCHEMA_ID.to_owned(),
1755 serde_json::Value::Number(value.schema_id.into()),
1756 ),
1757 (
1758 PARTITION_SPEC_ID.to_owned(),
1759 serde_json::Value::Number(value.partition_spec_id.into()),
1760 ),
1761 (DATA_FILES.to_owned(), json_data_files),
1762 ]
1763 .into_iter()
1764 .collect(),
1765 );
1766 Ok(SinkMetadata {
1767 metadata: Some(Serialized(SerializedMetadata {
1768 metadata: serde_json::to_vec(&json_value)
1769 .context("Can't serialize iceberg sink metadata")?,
1770 })),
1771 })
1772 }
1773}
1774
1775impl TryFrom<IcebergCommitResult> for Vec<u8> {
1776 type Error = SinkError;
1777
1778 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1779 let json_data_files = serde_json::Value::Array(
1780 value
1781 .data_files
1782 .iter()
1783 .map(serde_json::to_value)
1784 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1785 .context("Can't serialize data files to json")?,
1786 );
1787 let json_value = serde_json::Value::Object(
1788 vec![
1789 (
1790 SCHEMA_ID.to_owned(),
1791 serde_json::Value::Number(value.schema_id.into()),
1792 ),
1793 (
1794 PARTITION_SPEC_ID.to_owned(),
1795 serde_json::Value::Number(value.partition_spec_id.into()),
1796 ),
1797 (DATA_FILES.to_owned(), json_data_files),
1798 ]
1799 .into_iter()
1800 .collect(),
1801 );
1802 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1803 }
1804}
1805pub struct IcebergSinkCommitter {
1806 catalog: Arc<dyn Catalog>,
1807 table: Table,
1808 pub last_commit_epoch: u64,
1809 pub(crate) is_exactly_once: bool,
1810 pub(crate) sink_id: SinkId,
1811 pub(crate) config: IcebergConfig,
1812 pub(crate) param: SinkParam,
1813 pub(crate) db: DatabaseConnection,
1814 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1815 commit_retry_num: u32,
1816 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1817}
1818
1819impl IcebergSinkCommitter {
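    /// Reloads the table from the catalog and verifies that its current schema id and
    /// default partition spec id still match what the writers observed; schema or
    /// partition evolution between write and commit is rejected.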
1820 async fn reload_table(
1823 catalog: &dyn Catalog,
1824 table_ident: &TableIdent,
1825 schema_id: i32,
1826 partition_spec_id: i32,
1827 ) -> Result<Table> {
1828 let table = catalog
1829 .load_table(table_ident)
1830 .await
1831 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1832 if table.metadata().current_schema_id() != schema_id {
1833 return Err(SinkError::Iceberg(anyhow!(
1834 "Schema evolution not supported, expect schema id {}, but got {}",
1835 schema_id,
1836 table.metadata().current_schema_id()
1837 )));
1838 }
1839 if table.metadata().default_partition_spec_id() != partition_spec_id {
1840 return Err(SinkError::Iceberg(anyhow!(
1841 "Partition evolution not supported, expect partition spec id {}, but got {}",
1842 partition_spec_id,
1843 table.metadata().default_partition_spec_id()
1844 )));
1845 }
1846 Ok(table)
1847 }
1848}
1849
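// Exactly-once recovery, per persisted pre-commit row:
//   (committed = true,  _)                         -> already fully committed, nothing to do.
//   (committed = false, snapshot found in iceberg) -> the commit landed; mark the row committed.
//   (committed = false, snapshot missing)          -> re-commit the files under the same snapshot id.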
1850#[async_trait::async_trait]
1851impl SinkCommitCoordinator for IcebergSinkCommitter {
1852 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1853 if self.is_exactly_once {
1854 self.committed_epoch_subscriber = Some(subscriber);
1855 tracing::info!(
1856 "Sink id = {}: iceberg sink coordinator initing.",
1857 self.param.sink_id
1858 );
1859 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id).await? {
1860 let ordered_metadata_list_by_end_epoch =
1861 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id).await?;
1862
1863 let mut last_recommit_epoch = 0;
1864 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1865 ordered_metadata_list_by_end_epoch
1866 {
1867 let write_results_bytes = deserialize_metadata(sealized_bytes);
1868 let mut write_results = vec![];
1869
1870 for each in write_results_bytes {
1871 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1872 write_results.push(write_result);
1873 }
1874
1875 match (
1876 committed,
1877 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1878 .await?,
1879 ) {
1880 (true, _) => {
1881 tracing::info!(
1882 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1883 self.param.sink_id
1884 );
1885 }
1886 (false, true) => {
1887 tracing::info!(
1889 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1890 self.param.sink_id
1891 );
1892 mark_row_is_committed_by_sink_id_and_end_epoch(
1893 &self.db,
1894 self.sink_id,
1895 end_epoch,
1896 )
1897 .await?;
1898 }
1899 (false, false) => {
1900 tracing::info!(
1901 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1902 self.param.sink_id
1903 );
1904 self.re_commit(end_epoch, write_results, snapshot_id)
1905 .await?;
1906 }
1907 }
1908
1909 last_recommit_epoch = end_epoch;
1910 }
1911 tracing::info!(
1912 "Sink id = {}: iceberg commit coordinator inited.",
1913 self.param.sink_id
1914 );
1915 return Ok(Some(last_recommit_epoch));
1916 } else {
1917 tracing::info!(
1918 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1919 self.param.sink_id
1920 );
1921 return Ok(None);
1922 }
1923 }
1924
        tracing::info!("Iceberg commit coordinator initialized.");
1926 return Ok(None);
1927 }
1928
1929 async fn commit(
1930 &mut self,
1931 epoch: u64,
1932 metadata: Vec<SinkMetadata>,
1933 add_columns: Option<Vec<Field>>,
1934 ) -> Result<()> {
1935 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1936 if let Some(add_columns) = add_columns {
1937 return Err(anyhow!(
1938 "Iceberg sink not support add columns, but got: {:?}",
1939 add_columns
1940 )
1941 .into());
1942 }
1943
1944 let write_results: Vec<IcebergCommitResult> = metadata
1945 .iter()
1946 .map(IcebergCommitResult::try_from)
1947 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1948
1949 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1951 tracing::debug!(?epoch, "no data to commit");
1952 return Ok(());
1953 }
1954
1955 if write_results
1957 .iter()
1958 .any(|r| r.schema_id != write_results[0].schema_id)
1959 || write_results
1960 .iter()
1961 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1962 {
1963 return Err(SinkError::Iceberg(anyhow!(
1964 "schema_id and partition_spec_id should be the same in all write results"
1965 )));
1966 }
1967
1968 self.wait_for_snapshot_limit().await?;
1970
1971 if self.is_exactly_once {
1972 assert!(self.committed_epoch_subscriber.is_some());
1973 match self.committed_epoch_subscriber.clone() {
1974 Some(committed_epoch_subscriber) => {
1975 let (committed_epoch, mut rw_futures_utilrx) =
1977 committed_epoch_subscriber(self.param.sink_id).await?;
1978 if committed_epoch >= epoch {
1980 self.commit_iceberg_inner(epoch, write_results, None)
1981 .await?;
1982 } else {
1983 tracing::info!(
1984 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1985 committed_epoch,
1986 epoch
1987 );
1988 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1989 tracing::info!(
1990 "Received next committed epoch: {}",
1991 next_committed_epoch
1992 );
1993 if next_committed_epoch >= epoch {
1995 self.commit_iceberg_inner(epoch, write_results, None)
1996 .await?;
1997 break;
1998 }
1999 }
2000 }
2001 }
2002 None => unreachable!(
2003 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
2004 ),
2005 }
2006 } else {
2007 self.commit_iceberg_inner(epoch, write_results, None)
2008 .await?;
2009 }
2010
2011 Ok(())
2012 }
2013}
2014
2015impl IcebergSinkCommitter {
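    /// Re-commits data files that were pre-committed by a previous run but never landed in an
    /// iceberg snapshot, reusing the snapshot id recorded in the exactly-once metadata table so
    /// the retried commit stays idempotent.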
2017 async fn re_commit(
2018 &mut self,
2019 epoch: u64,
2020 write_results: Vec<IcebergCommitResult>,
2021 snapshot_id: i64,
2022 ) -> Result<()> {
2023 tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
2024
2025 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2027 tracing::debug!(?epoch, "no data to commit");
2028 return Ok(());
2029 }
2030 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
2031 .await?;
2032 Ok(())
2033 }
2034
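    /// Performs the actual iceberg commit: reloads the table at the expected schema and
    /// partition spec, persists the pre-commit metadata first when exactly-once is enabled,
    /// then appends the data files under a fresh (or previously reserved) snapshot id with
    /// retries, and finally updates metrics and the exactly-once bookkeeping rows.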
2035 async fn commit_iceberg_inner(
2036 &mut self,
2037 epoch: u64,
2038 write_results: Vec<IcebergCommitResult>,
2039 snapshot_id: Option<i64>,
2040 ) -> Result<()> {
2041 let is_first_commit = snapshot_id.is_none();
2045 self.last_commit_epoch = epoch;
2046 let expect_schema_id = write_results[0].schema_id;
2047 let expect_partition_spec_id = write_results[0].partition_spec_id;
2048
2049 self.table = Self::reload_table(
2051 self.catalog.as_ref(),
2052 self.table.identifier(),
2053 expect_schema_id,
2054 expect_partition_spec_id,
2055 )
2056 .await?;
2057 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2058 return Err(SinkError::Iceberg(anyhow!(
2059 "Can't find schema by id {}",
2060 expect_schema_id
2061 )));
2062 };
2063 let Some(partition_spec) = self
2064 .table
2065 .metadata()
2066 .partition_spec_by_id(expect_partition_spec_id)
2067 else {
2068 return Err(SinkError::Iceberg(anyhow!(
2069 "Can't find partition spec by id {}",
2070 expect_partition_spec_id
2071 )));
2072 };
2073 let partition_type = partition_spec
2074 .as_ref()
2075 .clone()
2076 .partition_type(schema)
2077 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2078
2079 let txn = Transaction::new(&self.table);
2080 let snapshot_id = match snapshot_id {
2082 Some(previous_snapshot_id) => previous_snapshot_id,
2083 None => txn.generate_unique_snapshot_id(),
2084 };
2085 if self.is_exactly_once && is_first_commit {
2086 let mut pre_commit_metadata_bytes = Vec::new();
2088 for each_parallelism_write_result in write_results.clone() {
2089 let each_parallelism_write_result_bytes: Vec<u8> =
2090 each_parallelism_write_result.try_into()?;
2091 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
2092 }
2093
2094 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
2095
2096 persist_pre_commit_metadata(
2097 self.sink_id,
2098 self.db.clone(),
2099 self.last_commit_epoch,
2100 epoch,
2101 pre_commit_metadata_bytes,
2102 snapshot_id,
2103 )
2104 .await?;
2105 }
2106
2107 let data_files = write_results
2108 .into_iter()
2109 .flat_map(|r| {
2110 r.data_files.into_iter().map(|f| {
2111 f.try_into(expect_partition_spec_id, &partition_type, schema)
2112 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2113 })
2114 })
2115 .collect::<Result<Vec<DataFile>>>()?;
2116 let retry_strategy = ExponentialBackoff::from_millis(10)
2121 .max_delay(Duration::from_secs(60))
2122 .map(jitter)
2123 .take(self.commit_retry_num as usize);
2124 let catalog = self.catalog.clone();
2125 let table_ident = self.table.identifier().clone();
2126 let table = Retry::spawn(retry_strategy, || async {
2127 let table = Self::reload_table(
2128 catalog.as_ref(),
2129 &table_ident,
2130 expect_schema_id,
2131 expect_partition_spec_id,
2132 )
2133 .await?;
2134 let txn = Transaction::new(&table);
2135 let mut append_action = txn
2136 .fast_append(Some(snapshot_id), None, vec![])
2137 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
2138 .with_to_branch(commit_branch(
2139 self.config.r#type.as_str(),
2140 self.config.write_mode.as_str(),
2141 ));
2142 append_action
2143 .add_data_files(data_files.clone())
2144 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2145 let tx = append_action.apply().await.map_err(|err| {
2146 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2147 SinkError::Iceberg(anyhow!(err))
2148 })?;
2149 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
2150 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2151 SinkError::Iceberg(anyhow!(err))
2152 })
2153 })
2154 .await?;
2155 self.table = table;
2156
2157 let snapshot_num = self.table.metadata().snapshots().count();
2158 let catalog_name = self.config.common.catalog_name();
2159 let table_name = self.table.identifier().to_string();
2160 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2161 GLOBAL_SINK_METRICS
2162 .iceberg_snapshot_num
2163 .with_guarded_label_values(&metrics_labels)
2164 .set(snapshot_num as i64);
2165
2166 tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
2167
2168 if self.is_exactly_once {
2169 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2170 tracing::info!(
2171 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
2172 self.sink_id,
2173 epoch
2174 );
2175
2176 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2177 }
2178
2179 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2180 && self.config.enable_compaction
2181 && iceberg_compact_stat_sender
2182 .send(IcebergSinkCompactionUpdate {
2183 sink_id: self.sink_id,
2184 compaction_interval: self.config.compaction_interval_sec(),
2185 force_compaction: false,
2186 })
2187 .is_err()
2188 {
2189 warn!("failed to send iceberg compaction stats");
2190 }
2191
2192 Ok(())
2193 }
2194
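    /// Returns whether `snapshot_id` already exists in the iceberg table metadata. Used during
    /// recovery to decide whether pre-committed files merely need to be marked as committed or
    /// have to be re-committed.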
2195 async fn is_snapshot_id_in_iceberg(
2199 &self,
2200 iceberg_config: &IcebergConfig,
2201 snapshot_id: i64,
2202 ) -> Result<bool> {
2203 let table = iceberg_config.load_table().await?;
2204 Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2209 }
2210
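    /// Counts the snapshots that are newer than the most recent `Replace` (rewrite/compaction)
    /// snapshot. The count is used to decide whether the committer should pause and wait for
    /// compaction to catch up.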
2211 fn count_snapshots_since_rewrite(&self) -> usize {
2214 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2215 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2216
2217 let mut count = 0;
2219 for snapshot in snapshots {
2220 let summary = snapshot.summary();
2222 match &summary.operation {
2223 Operation::Replace => {
2224 break;
2226 }
2227
2228 _ => {
2229 count += 1;
2231 }
2232 }
2233 }
2234
2235 count
2236 }
2237
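    /// If compaction is enabled and `max_snapshots_num_before_compaction` is set, blocks until
    /// the number of snapshots since the last rewrite drops below the limit, asking the
    /// compactor for a forced compaction and re-loading the table every 30 seconds while
    /// waiting.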
2238 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2240 if !self.config.enable_compaction {
2241 return Ok(());
2242 }
2243
2244 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2245 loop {
2246 let current_count = self.count_snapshots_since_rewrite();
2247
2248 if current_count < max_snapshots {
2249 tracing::info!(
2250 "Snapshot count check passed: {} < {}",
2251 current_count,
2252 max_snapshots
2253 );
2254 break;
2255 }
2256
2257 tracing::info!(
2258 "Snapshot count {} exceeds limit {}, waiting...",
2259 current_count,
2260 max_snapshots
2261 );
2262
2263 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2264 && iceberg_compact_stat_sender
2265 .send(IcebergSinkCompactionUpdate {
2266 sink_id: self.sink_id,
2267 compaction_interval: self.config.compaction_interval_sec(),
2268 force_compaction: true,
2269 })
2270 .is_err()
2271 {
2272 tracing::warn!("failed to send iceberg compaction stats");
2273 }
2274
2275 tokio::time::sleep(Duration::from_secs(30)).await;
2277
2278 self.table = self.config.load_table().await?;
2280 }
2281 }
2282 Ok(())
2283 }
2284}
2285
2286const MAP_KEY: &str = "key";
2287const MAP_VALUE: &str = "value";
2288
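/// Collects the child types of a nested RisingWave type (struct, map, or list of structs) into
/// `schema_fields`, keyed by field name, and returns the Arrow child fields when the Arrow side
/// is a struct (possibly wrapped in a list or map). Returns `None` for non-nested Arrow types,
/// in which case the caller falls back to a direct datatype comparison.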
2289fn get_fields<'a>(
2290 our_field_type: &'a risingwave_common::types::DataType,
2291 data_type: &ArrowDataType,
2292 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2293) -> Option<ArrowFields> {
2294 match data_type {
2295 ArrowDataType::Struct(fields) => {
2296 match our_field_type {
2297 risingwave_common::types::DataType::Struct(struct_fields) => {
2298 struct_fields.iter().for_each(|(name, data_type)| {
2299 let res = schema_fields.insert(name, data_type);
2300 assert!(res.is_none())
2302 });
2303 }
2304 risingwave_common::types::DataType::Map(map_fields) => {
2305 schema_fields.insert(MAP_KEY, map_fields.key());
2306 schema_fields.insert(MAP_VALUE, map_fields.value());
2307 }
2308 risingwave_common::types::DataType::List(list) => {
2309 list.elem()
2310 .as_struct()
2311 .iter()
2312 .for_each(|(name, data_type)| {
2313 let res = schema_fields.insert(name, data_type);
2314 assert!(res.is_none())
2316 });
2317 }
2318 _ => {}
2319 };
2320 Some(fields.clone())
2321 }
2322 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2323 get_fields(our_field_type, field.data_type(), schema_fields)
2324 }
2325 _ => None,
    }
2327}
2328
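/// Checks that every Arrow field in `fields` has a counterpart of a compatible type in
/// `schema_fields` (RisingWave fields keyed by name). Nested lists, maps and structs are
/// compared recursively, and a few physical-encoding differences (e.g. `Binary` vs.
/// `LargeBinary`, decimals with different precision/scale) are tolerated.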
2329fn check_compatibility(
2330 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2331 fields: &ArrowFields,
2332) -> anyhow::Result<bool> {
2333 for arrow_field in fields {
2334 let our_field_type = schema_fields
2335 .get(arrow_field.name().as_str())
2336 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2337
2338 let converted_arrow_data_type = IcebergArrowConvert
2340 .to_arrow_field("", our_field_type)
2341 .map_err(|e| anyhow!(e))?
2342 .data_type()
2343 .clone();
2344
2345 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2346 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2347 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2348 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2349 (ArrowDataType::List(_), ArrowDataType::List(field))
2350 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2351 let mut schema_fields = HashMap::new();
2352 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2353 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2354 }
2355 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2357 let mut schema_fields = HashMap::new();
2358 our_field_type
2359 .as_struct()
2360 .iter()
2361 .for_each(|(name, data_type)| {
2362 let res = schema_fields.insert(name, data_type);
2363 assert!(res.is_none())
2365 });
2366 check_compatibility(schema_fields, fields)?
2367 }
2368 (left, right) => left.equals_datatype(right),
2376 };
2377 if !compatible {
2378 bail!(
2379 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2380 arrow_field.name(),
2381 converted_arrow_data_type,
2382 arrow_field.data_type()
2383 );
2384 }
2385 }
2386 Ok(true)
2387}
2388
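/// Checks that `rw_schema` is compatible with the Arrow schema derived from the iceberg table:
/// the field counts must match and every iceberg field must have a compatible RisingWave field
/// of the same name. Matching is done by name, so field order does not matter.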
2389pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2391 if rw_schema.fields.len() != arrow_schema.fields().len() {
2392 bail!(
2393 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2394 rw_schema.fields.len(),
2395 arrow_schema.fields.len()
2396 );
2397 }
2398
2399 let mut schema_fields = HashMap::new();
2400 rw_schema.fields.iter().for_each(|field| {
2401 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2402 assert!(res.is_none())
2404 });
2405
2406 check_compatibility(schema_fields, &arrow_schema.fields)?;
2407 Ok(())
2408}
2409
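/// JSON-encodes the per-parallelism pre-commit blobs for storage in the exactly-once system
/// table; `deserialize_metadata` is the inverse. Both unwrap on malformed input, which assumes
/// the bytes are only ever produced and consumed by this sink.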
2410pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2411 serde_json::to_vec(&metadata).unwrap()
2412}
2413
2414pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2415 serde_json::from_slice(&bytes).unwrap()
2416}
2417
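/// Parses a `partition_by` expression into `(column, Transform)` pairs. Accepted forms are
/// `column`, `transform(column)` and `transform(n,column)`; `bucket` and `truncate` require the
/// numeric parameter. As an illustration (not an exhaustive spec), an input like
/// `"v1, bucket(16,v2), year(v3)"` is expected to yield identity, bucket[16] and year
/// transforms for `v1`, `v2` and `v3` respectively.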
2418pub fn parse_partition_by_exprs(
2419 expr: String,
2420) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2421 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2423 if !re.is_match(&expr) {
2424 bail!(
2425 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2426 expr
2427 )
2428 }
2429 let caps = re.captures_iter(&expr);
2430
2431 let mut partition_columns = vec![];
2432
2433 for mat in caps {
2434 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2435 (&mat["transform"], Transform::Identity)
2436 } else {
2437 let mut func = mat["transform"].to_owned();
2438 if func == "bucket" || func == "truncate" {
2439 let n = &mat
2440 .name("n")
2441 .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate`"))?
2442 .as_str();
2443 func = format!("{func}[{n}]");
2444 }
2445 (
2446 &mat["field"],
2447 Transform::from_str(&func)
2448 .with_context(|| format!("invalid transform function {}", func))?,
2449 )
2450 };
2451 partition_columns.push((column.to_owned(), transform));
2452 }
2453 Ok(partition_columns)
2454}
2455
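/// Returns the branch that commits should target: copy-on-write upsert sinks write to the
/// dedicated `ICEBERG_COW_BRANCH`, every other sink type / write mode combination writes to
/// `MAIN_BRANCH`.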
2456pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2457 if should_enable_iceberg_cow(sink_type, write_mode) {
2458 ICEBERG_COW_BRANCH.to_owned()
2459 } else {
2460 MAIN_BRANCH.to_owned()
2461 }
2462}
2463
2464pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2465 sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2466}
2467
2468#[cfg(test)]
2469mod test {
2470 use std::collections::BTreeMap;
2471
2472 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2473 use risingwave_common::types::{DataType, MapType, StructType};
2474
2475 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2476 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2477 use crate::sink::iceberg::{
2478 COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2479 ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, MAX_SNAPSHOTS_NUM,
2480 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2481 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2482 };
2483
2484 pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
2487 fn test_compatible_arrow_schema() {
2488 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2489
2490 use super::*;
2491 let risingwave_schema = Schema::new(vec![
2492 Field::with_name(DataType::Int32, "a"),
2493 Field::with_name(DataType::Int32, "b"),
2494 Field::with_name(DataType::Int32, "c"),
2495 ]);
2496 let arrow_schema = ArrowSchema::new(vec![
2497 ArrowField::new("a", ArrowDataType::Int32, false),
2498 ArrowField::new("b", ArrowDataType::Int32, false),
2499 ArrowField::new("c", ArrowDataType::Int32, false),
2500 ]);
2501
2502 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2503
2504 let risingwave_schema = Schema::new(vec![
2505 Field::with_name(DataType::Int32, "d"),
2506 Field::with_name(DataType::Int32, "c"),
2507 Field::with_name(DataType::Int32, "a"),
2508 Field::with_name(DataType::Int32, "b"),
2509 ]);
2510 let arrow_schema = ArrowSchema::new(vec![
2511 ArrowField::new("a", ArrowDataType::Int32, false),
2512 ArrowField::new("b", ArrowDataType::Int32, false),
2513 ArrowField::new("d", ArrowDataType::Int32, false),
2514 ArrowField::new("c", ArrowDataType::Int32, false),
2515 ]);
2516 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2517
2518 let risingwave_schema = Schema::new(vec![
2519 Field::with_name(
2520 DataType::Struct(StructType::new(vec![
2521 ("a1", DataType::Int32),
2522 (
2523 "a2",
2524 DataType::Struct(StructType::new(vec![
2525 ("a21", DataType::Bytea),
2526 (
2527 "a22",
2528 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2529 ),
2530 ])),
2531 ),
2532 ])),
2533 "a",
2534 ),
2535 Field::with_name(
2536 DataType::list(DataType::Struct(StructType::new(vec![
2537 ("b1", DataType::Int32),
2538 ("b2", DataType::Bytea),
2539 (
2540 "b3",
2541 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2542 ),
2543 ]))),
2544 "b",
2545 ),
2546 Field::with_name(
2547 DataType::Map(MapType::from_kv(
2548 DataType::Varchar,
2549 DataType::list(DataType::Struct(StructType::new([
2550 ("c1", DataType::Int32),
2551 ("c2", DataType::Bytea),
2552 (
2553 "c3",
2554 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2555 ),
2556 ]))),
2557 )),
2558 "c",
2559 ),
2560 ]);
2561 let arrow_schema = ArrowSchema::new(vec![
2562 ArrowField::new(
2563 "a",
2564 ArrowDataType::Struct(ArrowFields::from(vec![
2565 ArrowField::new("a1", ArrowDataType::Int32, false),
2566 ArrowField::new(
2567 "a2",
2568 ArrowDataType::Struct(ArrowFields::from(vec![
2569 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2570 ArrowField::new_map(
2571 "a22",
2572 "entries",
2573 ArrowFieldRef::new(ArrowField::new(
2574 "key",
2575 ArrowDataType::Utf8,
2576 false,
2577 )),
2578 ArrowFieldRef::new(ArrowField::new(
2579 "value",
2580 ArrowDataType::Utf8,
2581 false,
2582 )),
2583 false,
2584 false,
2585 ),
2586 ])),
2587 false,
2588 ),
2589 ])),
2590 false,
2591 ),
2592 ArrowField::new(
2593 "b",
2594 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2595 ArrowDataType::Struct(ArrowFields::from(vec![
2596 ArrowField::new("b1", ArrowDataType::Int32, false),
2597 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2598 ArrowField::new_map(
2599 "b3",
2600 "entries",
2601 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2602 ArrowFieldRef::new(ArrowField::new(
2603 "value",
2604 ArrowDataType::Utf8,
2605 false,
2606 )),
2607 false,
2608 false,
2609 ),
2610 ])),
2611 false,
2612 ))),
2613 false,
2614 ),
2615 ArrowField::new_map(
2616 "c",
2617 "entries",
2618 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2619 ArrowFieldRef::new(ArrowField::new(
2620 "value",
2621 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2622 ArrowDataType::Struct(ArrowFields::from(vec![
2623 ArrowField::new("c1", ArrowDataType::Int32, false),
2624 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2625 ArrowField::new_map(
2626 "c3",
2627 "entries",
2628 ArrowFieldRef::new(ArrowField::new(
2629 "key",
2630 ArrowDataType::Utf8,
2631 false,
2632 )),
2633 ArrowFieldRef::new(ArrowField::new(
2634 "value",
2635 ArrowDataType::Utf8,
2636 false,
2637 )),
2638 false,
2639 false,
2640 ),
2641 ])),
2642 false,
2643 ))),
2644 false,
2645 )),
2646 false,
2647 false,
2648 ),
2649 ]);
2650 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2651 }
2652
2653 #[test]
2654 fn test_parse_iceberg_config() {
2655 let values = [
2656 ("connector", "iceberg"),
2657 ("type", "upsert"),
2658 ("primary_key", "v1"),
2659 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2660 ("warehouse.path", "s3://iceberg"),
2661 ("s3.endpoint", "http://127.0.0.1:9301"),
2662 ("s3.access.key", "hummockadmin"),
2663 ("s3.secret.key", "hummockadmin"),
2664 ("s3.path.style.access", "true"),
2665 ("s3.region", "us-east-1"),
2666 ("catalog.type", "jdbc"),
2667 ("catalog.name", "demo"),
2668 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2669 ("catalog.jdbc.user", "admin"),
2670 ("catalog.jdbc.password", "123456"),
2671 ("database.name", "demo_db"),
2672 ("table.name", "demo_table"),
2673 ("enable_compaction", "true"),
2674 ("compaction_interval_sec", "1800"),
2675 ("enable_snapshot_expiration", "true"),
2676 ]
2677 .into_iter()
2678 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2679 .collect();
2680
2681 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2682
2683 let expected_iceberg_config = IcebergConfig {
2684 common: IcebergCommon {
2685 warehouse_path: Some("s3://iceberg".to_owned()),
2686 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2687 region: Some("us-east-1".to_owned()),
2688 endpoint: Some("http://127.0.0.1:9301".to_owned()),
2689 access_key: Some("hummockadmin".to_owned()),
2690 secret_key: Some("hummockadmin".to_owned()),
2691 gcs_credential: None,
2692 catalog_type: Some("jdbc".to_owned()),
2693 glue_id: None,
2694 catalog_name: Some("demo".to_owned()),
2695 path_style_access: Some(true),
2696 credential: None,
2697 oauth2_server_uri: None,
2698 scope: None,
2699 token: None,
2700 enable_config_load: None,
2701 rest_signing_name: None,
2702 rest_signing_region: None,
2703 rest_sigv4_enabled: None,
2704 hosted_catalog: None,
2705 azblob_account_name: None,
2706 azblob_account_key: None,
2707 azblob_endpoint_url: None,
2708 header: None,
2709 adlsgen2_account_name: None,
2710 adlsgen2_account_key: None,
2711 adlsgen2_endpoint: None,
2712 vended_credentials: None,
2713 },
2714 table: IcebergTableIdentifier {
2715 database_name: Some("demo_db".to_owned()),
2716 table_name: "demo_table".to_owned(),
2717 },
2718 r#type: "upsert".to_owned(),
2719 force_append_only: false,
2720 primary_key: Some(vec!["v1".to_owned()]),
2721 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2722 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2723 .into_iter()
2724 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2725 .collect(),
2726 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2727 create_table_if_not_exists: false,
2728 is_exactly_once: Some(true),
2729 commit_retry_num: 8,
2730 enable_compaction: true,
2731 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2732 enable_snapshot_expiration: true,
2733 write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2734 snapshot_expiration_max_age_millis: None,
2735 snapshot_expiration_retain_last: None,
2736 snapshot_expiration_clear_expired_files: true,
2737 snapshot_expiration_clear_expired_meta_data: true,
2738 max_snapshots_num_before_compaction: None,
2739 small_files_threshold_mb: None,
2740 delete_files_count_threshold: None,
2741 trigger_snapshot_count: None,
2742 target_file_size_mb: None,
2743 compaction_type: None,
2744 };
2745
2746 assert_eq!(iceberg_config, expected_iceberg_config);
2747
2748 assert_eq!(
2749 &iceberg_config.full_table_name().unwrap().to_string(),
2750 "demo_db.demo_table"
2751 );
2752 }
2753
2754 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2755 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2756
2757 let _table = iceberg_config.load_table().await.unwrap();
2758 }
2759
2760 #[tokio::test]
2761 #[ignore]
2762 async fn test_storage_catalog() {
2763 let values = [
2764 ("connector", "iceberg"),
2765 ("type", "append-only"),
2766 ("force_append_only", "true"),
2767 ("s3.endpoint", "http://127.0.0.1:9301"),
2768 ("s3.access.key", "hummockadmin"),
2769 ("s3.secret.key", "hummockadmin"),
2770 ("s3.region", "us-east-1"),
2771 ("s3.path.style.access", "true"),
2772 ("catalog.name", "demo"),
2773 ("catalog.type", "storage"),
2774 ("warehouse.path", "s3://icebergdata/demo"),
2775 ("database.name", "s1"),
2776 ("table.name", "t1"),
2777 ]
2778 .into_iter()
2779 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2780 .collect();
2781
2782 test_create_catalog(values).await;
2783 }
2784
2785 #[tokio::test]
2786 #[ignore]
2787 async fn test_rest_catalog() {
2788 let values = [
2789 ("connector", "iceberg"),
2790 ("type", "append-only"),
2791 ("force_append_only", "true"),
2792 ("s3.endpoint", "http://127.0.0.1:9301"),
2793 ("s3.access.key", "hummockadmin"),
2794 ("s3.secret.key", "hummockadmin"),
2795 ("s3.region", "us-east-1"),
2796 ("s3.path.style.access", "true"),
2797 ("catalog.name", "demo"),
2798 ("catalog.type", "rest"),
2799 ("catalog.uri", "http://192.168.167.4:8181"),
2800 ("warehouse.path", "s3://icebergdata/demo"),
2801 ("database.name", "s1"),
2802 ("table.name", "t1"),
2803 ]
2804 .into_iter()
2805 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2806 .collect();
2807
2808 test_create_catalog(values).await;
2809 }
2810
2811 #[tokio::test]
2812 #[ignore]
2813 async fn test_jdbc_catalog() {
2814 let values = [
2815 ("connector", "iceberg"),
2816 ("type", "append-only"),
2817 ("force_append_only", "true"),
2818 ("s3.endpoint", "http://127.0.0.1:9301"),
2819 ("s3.access.key", "hummockadmin"),
2820 ("s3.secret.key", "hummockadmin"),
2821 ("s3.region", "us-east-1"),
2822 ("s3.path.style.access", "true"),
2823 ("catalog.name", "demo"),
2824 ("catalog.type", "jdbc"),
2825 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2826 ("catalog.jdbc.user", "admin"),
2827 ("catalog.jdbc.password", "123456"),
2828 ("warehouse.path", "s3://icebergdata/demo"),
2829 ("database.name", "s1"),
2830 ("table.name", "t1"),
2831 ]
2832 .into_iter()
2833 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2834 .collect();
2835
2836 test_create_catalog(values).await;
2837 }
2838
2839 #[tokio::test]
2840 #[ignore]
2841 async fn test_hive_catalog() {
2842 let values = [
2843 ("connector", "iceberg"),
2844 ("type", "append-only"),
2845 ("force_append_only", "true"),
2846 ("s3.endpoint", "http://127.0.0.1:9301"),
2847 ("s3.access.key", "hummockadmin"),
2848 ("s3.secret.key", "hummockadmin"),
2849 ("s3.region", "us-east-1"),
2850 ("s3.path.style.access", "true"),
2851 ("catalog.name", "demo"),
2852 ("catalog.type", "hive"),
2853 ("catalog.uri", "thrift://localhost:9083"),
2854 ("warehouse.path", "s3://icebergdata/demo"),
2855 ("database.name", "s1"),
2856 ("table.name", "t1"),
2857 ]
2858 .into_iter()
2859 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2860 .collect();
2861
2862 test_create_catalog(values).await;
2863 }
2864
2865 #[test]
2866 fn test_config_constants_consistency() {
2867 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2870 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2871 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2872 assert_eq!(WRITE_MODE, "write_mode");
2873 assert_eq!(
2874 SNAPSHOT_EXPIRATION_RETAIN_LAST,
2875 "snapshot_expiration_retain_last"
2876 );
2877 assert_eq!(
2878 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2879 "snapshot_expiration_max_age_millis"
2880 );
2881 assert_eq!(
2882 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2883 "snapshot_expiration_clear_expired_files"
2884 );
2885 assert_eq!(
2886 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2887 "snapshot_expiration_clear_expired_meta_data"
2888 );
2889 assert_eq!(MAX_SNAPSHOTS_NUM, "max_snapshots_num_before_compaction");
2890 }
2891
2892 #[test]
2893 fn test_parse_iceberg_compaction_config() {
2894 let values = [
2896 ("connector", "iceberg"),
2897 ("type", "upsert"),
2898 ("primary_key", "id"),
2899 ("warehouse.path", "s3://iceberg"),
2900 ("s3.endpoint", "http://127.0.0.1:9301"),
2901 ("s3.access.key", "test"),
2902 ("s3.secret.key", "test"),
2903 ("s3.region", "us-east-1"),
2904 ("catalog.type", "storage"),
2905 ("catalog.name", "demo"),
2906 ("database.name", "test_db"),
2907 ("table.name", "test_table"),
2908 ("enable_compaction", "true"),
2909 ("compaction.max_snapshots_num", "100"),
2910 ("compaction.small_files_threshold_mb", "512"),
2911 ("compaction.delete_files_count_threshold", "50"),
2912 ("compaction.trigger_snapshot_count", "10"),
2913 ("compaction.target_file_size_mb", "256"),
2914 ("compaction.type", "full"),
2915 ]
2916 .into_iter()
2917 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2918 .collect();
2919
2920 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2921
2922 assert!(iceberg_config.enable_compaction);
2924 assert_eq!(
2925 iceberg_config.max_snapshots_num_before_compaction,
2926 Some(100)
2927 );
2928 assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
2929 assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
2930 assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
2931 assert_eq!(iceberg_config.target_file_size_mb, Some(256));
2932 assert_eq!(iceberg_config.compaction_type, Some("full".to_owned()));
2933 }
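
    // The tests below are minimal sketches exercising the standalone helpers defined earlier in
    // this file (`serialize_metadata`/`deserialize_metadata`, `parse_partition_by_exprs`, and
    // `commit_branch`); they rely only on items already in scope in this module.

    #[test]
    fn test_serialize_metadata_roundtrip() {
        use super::*;

        // The per-parallelism pre-commit blobs are JSON-encoded for the exactly-once system
        // table and must round-trip unchanged.
        let metadata = vec![vec![1u8, 2, 3], vec![], vec![255u8]];
        let bytes = serialize_metadata(metadata.clone());
        assert_eq!(deserialize_metadata(bytes), metadata);
    }

    #[test]
    fn test_parse_partition_by_exprs() {
        use super::*;

        // Covers the three accepted forms: bare column, transform(column) and
        // transform(n,column).
        let parsed = parse_partition_by_exprs("v1, bucket(16,v2), year(v3)".to_owned()).unwrap();
        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed[0].0, "v1");
        assert!(matches!(parsed[0].1, Transform::Identity));
        assert_eq!(parsed[1].0, "v2");
        assert!(matches!(parsed[1].1, Transform::Bucket(16)));
        assert_eq!(parsed[2].0, "v3");
        assert!(matches!(parsed[2].1, Transform::Year));

        // `bucket`/`truncate` without the numeric parameter are rejected.
        assert!(parse_partition_by_exprs("bucket(v1)".to_owned()).is_err());
    }

    #[test]
    fn test_commit_branch_selection() {
        use super::*;

        // Only upsert sinks in copy-on-write mode commit to the dedicated COW branch; every
        // other type/write-mode combination commits to the main branch.
        assert!(should_enable_iceberg_cow(
            SINK_TYPE_UPSERT,
            ICEBERG_WRITE_MODE_COPY_ON_WRITE
        ));
        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, ICEBERG_WRITE_MODE_COPY_ON_WRITE),
            ICEBERG_COW_BRANCH
        );
        assert_eq!(
            commit_branch(SINK_TYPE_APPEND_ONLY, ICEBERG_WRITE_MODE_MERGE_ON_READ),
            MAIN_BRANCH
        );
    }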
2934}