risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30    UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43    DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59    Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103// Configuration constants
104pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112    "snapshot_expiration_clear_expired_meta_data";
113pub const COMPACTION_MAX_SNAPSHOTS_NUM: &str = "compaction.max_snapshots_num";
114
115pub const COMPACTION_SMALL_FILES_THRESHOLD_MB: &str = "compaction.small_files_threshold_mb";
116
117pub const COMPACTION_DELETE_FILES_COUNT_THRESHOLD: &str = "compaction.delete_files_count_threshold";
118
119pub const COMPACTION_TRIGGER_SNAPSHOT_COUNT: &str = "compaction.trigger_snapshot_count";
120
121pub const COMPACTION_TARGET_FILE_SIZE_MB: &str = "compaction.target_file_size_mb";
122
123fn deserialize_and_normalize_string<'de, D>(
124    deserializer: D,
125) -> std::result::Result<String, D::Error>
126where
127    D: serde::Deserializer<'de>,
128{
129    let s = String::deserialize(deserializer)?;
130    Ok(s.to_lowercase().replace('_', "-"))
131}
132
133fn deserialize_and_normalize_optional_string<'de, D>(
134    deserializer: D,
135) -> std::result::Result<Option<String>, D::Error>
136where
137    D: serde::Deserializer<'de>,
138{
139    let opt = Option::<String>::deserialize(deserializer)?;
140    Ok(opt.map(|s| s.to_lowercase().replace('_', "-")))
141}
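// Note (illustrative input only): both helpers above normalize option values; e.g. an input
// like "Merge_On_Read" becomes "merge-on-read" before it reaches the typed config fields below.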
142
143fn default_commit_retry_num() -> u32 {
144    8
145}
146
147fn default_iceberg_write_mode() -> String {
148    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
149}
150
151fn default_true() -> bool {
152    true
153}
154
155fn default_some_true() -> Option<bool> {
156    Some(true)
157}
158
159/// Compaction type for Iceberg sink
160#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Default)]
161#[serde(rename_all = "kebab-case")]
162pub enum CompactionType {
163    /// Full compaction - rewrites all data files
164    #[default]
165    Full,
166    /// Small files compaction - only compact small files
167    SmallFiles,
168    /// Files with delete compaction - only compact files that have associated delete files
169    FilesWithDelete,
170}
171
172impl CompactionType {
173    pub fn as_str(&self) -> &'static str {
174        match self {
175            CompactionType::Full => "full",
176            CompactionType::SmallFiles => "small-files",
177            CompactionType::FilesWithDelete => "files-with-delete",
178        }
179    }
180}
181
182impl std::fmt::Display for CompactionType {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        write!(f, "{}", self.as_str())
185    }
186}
187
188impl FromStr for CompactionType {
189    type Err = String;
190
191    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
192        // Normalize the input string first: lowercase and replace underscores with hyphens
193        let normalized = s.to_lowercase().replace('_', "-");
194
195        match normalized.as_str() {
196            "full" => Ok(CompactionType::Full),
197            "small-files" => Ok(CompactionType::SmallFiles),
198            "files-with-delete" => Ok(CompactionType::FilesWithDelete),
199            _ => Err(format!(
200                "Unknown compaction type '{}', must be one of: 'full', 'small-files', 'files-with-delete'",
201                s // show the original input string in the error, not the normalized one
202            )),
203        }
204    }
205}
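
// A minimal sketch (not part of the original test suite) showing how the parsing above
// normalizes its input; the module name and inputs are illustrative only.
#[cfg(test)]
mod compaction_type_parse_example {
    use std::str::FromStr;

    use super::CompactionType;

    #[test]
    fn parse_normalizes_case_and_separators() {
        // Lowercasing and `_` -> `-` replacement happen before matching.
        assert_eq!(
            CompactionType::from_str("Small_Files").unwrap(),
            CompactionType::SmallFiles
        );
        assert_eq!(
            CompactionType::from_str("FULL").unwrap(),
            CompactionType::Full
        );
        // Unknown values are rejected, and the error reports the original (non-normalized) input.
        assert!(CompactionType::from_str("partial").is_err());
    }
}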
206
207#[serde_as]
208#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
209pub struct IcebergConfig {
210    pub r#type: String, // accepts "append-only" or "upsert"
211
212    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
213    pub force_append_only: bool,
214
215    #[serde(flatten)]
216    common: IcebergCommon,
217
218    #[serde(flatten)]
219    table: IcebergTableIdentifier,
220
221    #[serde(
222        rename = "primary_key",
223        default,
224        deserialize_with = "deserialize_optional_string_seq_from_string"
225    )]
226    pub primary_key: Option<Vec<String>>,
227
228    // Properties passed through to the java catalog ("catalog."-prefixed keys).
229    #[serde(skip)]
230    pub java_catalog_props: HashMap<String, String>,
231
232    #[serde(default)]
233    pub partition_by: Option<String>,
234
235    /// Commit every n (> 0) checkpoints; the default is 60.
236    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
237    #[serde_as(as = "DisplayFromStr")]
238    #[with_option(allow_alter_on_fly)]
239    pub commit_checkpoint_interval: u64,
240
241    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
242    pub create_table_if_not_exists: bool,
243
244    /// Whether exactly-once delivery is enabled; the default is true.
245    #[serde(default = "default_some_true")]
246    #[serde_as(as = "Option<DisplayFromStr>")]
247    pub is_exactly_once: Option<bool>,
248    // Number of times to retry when an iceberg commit fails. The default is 8.
249    // # TODO
250    // The iceberg table may store the retry commit num in its table metadata.
251    // We should try to find that value and use it as the default commit retry num first.
252    #[serde(default = "default_commit_retry_num")]
253    pub commit_retry_num: u32,
254
255    /// Whether to enable iceberg compaction.
256    #[serde(
257        rename = "enable_compaction",
258        default,
259        deserialize_with = "deserialize_bool_from_string"
260    )]
261    #[with_option(allow_alter_on_fly)]
262    pub enable_compaction: bool,
263
264    /// The interval (in seconds) between iceberg compaction runs.
265    #[serde(rename = "compaction_interval_sec", default)]
266    #[serde_as(as = "Option<DisplayFromStr>")]
267    #[with_option(allow_alter_on_fly)]
268    pub compaction_interval_sec: Option<u64>,
269
270    /// Whether to enable iceberg snapshot expiration.
271    #[serde(
272        rename = "enable_snapshot_expiration",
273        default,
274        deserialize_with = "deserialize_bool_from_string"
275    )]
276    #[with_option(allow_alter_on_fly)]
277    pub enable_snapshot_expiration: bool,
278
279    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
280    #[serde(
281        rename = "write_mode",
282        default = "default_iceberg_write_mode",
283        deserialize_with = "deserialize_and_normalize_string"
284    )]
285    pub write_mode: String,
286
287    /// The maximum age (in milliseconds) of snapshots before they expire.
288    /// For example, if set to 3600000, snapshots older than 1 hour will be expired.
289    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
290    #[serde_as(as = "Option<DisplayFromStr>")]
291    #[with_option(allow_alter_on_fly)]
292    pub snapshot_expiration_max_age_millis: Option<i64>,
293
294    /// The number of snapshots to retain
295    #[serde(rename = "snapshot_expiration_retain_last", default)]
296    #[serde_as(as = "Option<DisplayFromStr>")]
297    #[with_option(allow_alter_on_fly)]
298    pub snapshot_expiration_retain_last: Option<i32>,
299
300    #[serde(
301        rename = "snapshot_expiration_clear_expired_files",
302        default = "default_true",
303        deserialize_with = "deserialize_bool_from_string"
304    )]
305    #[with_option(allow_alter_on_fly)]
306    pub snapshot_expiration_clear_expired_files: bool,
307
308    #[serde(
309        rename = "snapshot_expiration_clear_expired_meta_data",
310        default = "default_true",
311        deserialize_with = "deserialize_bool_from_string"
312    )]
313    #[with_option(allow_alter_on_fly)]
314    pub snapshot_expiration_clear_expired_meta_data: bool,
315
316    /// The maximum number of snapshots allowed since the last rewrite operation.
317    /// If set, the sink checks the snapshot count and waits if the limit is exceeded.
318    #[serde(rename = "compaction.max_snapshots_num", default)]
319    #[serde_as(as = "Option<DisplayFromStr>")]
320    #[with_option(allow_alter_on_fly)]
321    pub max_snapshots_num_before_compaction: Option<usize>,
322
323    #[serde(rename = "compaction.small_files_threshold_mb", default)]
324    #[serde_as(as = "Option<DisplayFromStr>")]
325    #[with_option(allow_alter_on_fly)]
326    pub small_files_threshold_mb: Option<u64>,
327
328    #[serde(rename = "compaction.delete_files_count_threshold", default)]
329    #[serde_as(as = "Option<DisplayFromStr>")]
330    #[with_option(allow_alter_on_fly)]
331    pub delete_files_count_threshold: Option<usize>,
332
333    #[serde(rename = "compaction.trigger_snapshot_count", default)]
334    #[serde_as(as = "Option<DisplayFromStr>")]
335    #[with_option(allow_alter_on_fly)]
336    pub trigger_snapshot_count: Option<usize>,
337
338    #[serde(rename = "compaction.target_file_size_mb", default)]
339    #[serde_as(as = "Option<DisplayFromStr>")]
340    #[with_option(allow_alter_on_fly)]
341    pub target_file_size_mb: Option<u64>,
342
343    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
344    /// If not set, defaults to `full`.
345    #[serde(
346        rename = "compaction.type",
347        default,
348        deserialize_with = "deserialize_and_normalize_optional_string"
349    )]
350    #[with_option(allow_alter_on_fly)]
351    pub compaction_type: Option<String>,
352}
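// For illustration only (values are made up; required catalog/table options from the flattened
// `IcebergCommon` / `IcebergTableIdentifier` are omitted), a property map like
//   { "type": "upsert", "primary_key": "id,ts",
//     "write_mode": "copy_on_write", "enable_compaction": "true",
//     "compaction_interval_sec": "3600", "compaction.type": "full" }
// deserializes into an `IcebergConfig` with `write_mode == "copy-on-write"` thanks to the
// normalizing deserializers above.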
353
354impl EnforceSecret for IcebergConfig {
355    fn enforce_secret<'a>(
356        prop_iter: impl Iterator<Item = &'a str>,
357    ) -> crate::error::ConnectorResult<()> {
358        for prop in prop_iter {
359            IcebergCommon::enforce_one(prop)?;
360        }
361        Ok(())
362    }
363
364    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
365        IcebergCommon::enforce_one(prop)
366    }
367}
368
369impl IcebergConfig {
370    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
371        let mut config =
372            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
373                .map_err(|e| SinkError::Config(anyhow!(e)))?;
374
375        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
376            return Err(SinkError::Config(anyhow!(
377                "`{}` must be {} or {}",
378                SINK_TYPE_OPTION,
379                SINK_TYPE_APPEND_ONLY,
380                SINK_TYPE_UPSERT
381            )));
382        }
383
384        if config.r#type == SINK_TYPE_UPSERT {
385            if let Some(primary_key) = &config.primary_key {
386                if primary_key.is_empty() {
387                    return Err(SinkError::Config(anyhow!(
388                        "`primary-key` must not be empty in {}",
389                        SINK_TYPE_UPSERT
390                    )));
391                }
392            } else {
393                return Err(SinkError::Config(anyhow!(
394                    "Must set `primary-key` in {}",
395                    SINK_TYPE_UPSERT
396                )));
397            }
398        }
399
400        // All configs that start with "catalog." are treated as java catalog configs.
401        config.java_catalog_props = values
402            .iter()
403            .filter(|(k, _v)| {
404                k.starts_with("catalog.")
405                    && k != &"catalog.uri"
406                    && k != &"catalog.type"
407                    && k != &"catalog.name"
408                    && k != &"catalog.header"
409            })
410            .map(|(k, v)| (k[8..].to_string(), v.clone()))
411            .collect();
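        // For illustration (hypothetical keys): {"catalog.io-impl": "...", "catalog.uri": "..."}
        // yields java_catalog_props == {"io-impl": "..."}; the reserved keys ("catalog.uri",
        // "catalog.type", "catalog.name", "catalog.header") are excluded and the "catalog."
        // prefix (the first 8 characters) is stripped from the rest.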
412
413        if config.commit_checkpoint_interval == 0 {
414            return Err(SinkError::Config(anyhow!(
415                "`commit-checkpoint-interval` must be greater than 0"
416            )));
417        }
418
419        // Validate compaction_type early
420        if let Some(ref compaction_type_str) = config.compaction_type {
421            CompactionType::from_str(compaction_type_str)
422                .map_err(|e| SinkError::Config(anyhow!(e)))?;
423        }
424
425        Ok(config)
426    }
427
428    pub fn catalog_type(&self) -> &str {
429        self.common.catalog_type()
430    }
431
432    pub async fn load_table(&self) -> Result<Table> {
433        self.common
434            .load_table(&self.table, &self.java_catalog_props)
435            .await
436            .map_err(Into::into)
437    }
438
439    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
440        self.common
441            .create_catalog(&self.java_catalog_props)
442            .await
443            .map_err(Into::into)
444    }
445
446    pub fn full_table_name(&self) -> Result<TableIdent> {
447        self.table.to_table_ident().map_err(Into::into)
448    }
449
450    pub fn catalog_name(&self) -> String {
451        self.common.catalog_name()
452    }
453
454    pub fn compaction_interval_sec(&self) -> u64 {
455        // default to 1 hour
456        self.compaction_interval_sec.unwrap_or(3600)
457    }
458
459    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
460    /// Returns `current_time_ms` - `max_age_millis`
461    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
462        self.snapshot_expiration_max_age_millis
463            .map(|max_age_millis| current_time_ms - max_age_millis)
464    }
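    // For example (illustrative numbers): with `current_time_ms = 1_700_003_600_000` and
    // `snapshot_expiration_max_age_millis = Some(3_600_000)`, snapshots older than
    // `1_700_000_000_000` become eligible for expiration.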
465
466    pub fn trigger_snapshot_count(&self) -> usize {
467        self.trigger_snapshot_count.unwrap_or(16)
468    }
469
470    pub fn small_files_threshold_mb(&self) -> u64 {
471        self.small_files_threshold_mb.unwrap_or(64)
472    }
473
474    pub fn delete_files_count_threshold(&self) -> usize {
475        self.delete_files_count_threshold.unwrap_or(256)
476    }
477
478    pub fn target_file_size_mb(&self) -> u64 {
479        self.target_file_size_mb.unwrap_or(1024)
480    }
481
482    /// Get the compaction type as an enum.
483    /// Parses the configured string and falls back to the default (`full`) if unset or invalid.
484    pub fn compaction_type(&self) -> CompactionType {
485        self.compaction_type
486            .as_deref()
487            .and_then(|s| CompactionType::from_str(s).ok())
488            .unwrap_or_default()
489    }
490}
491
492pub struct IcebergSink {
493    pub config: IcebergConfig,
494    param: SinkParam,
495    // In upsert mode, this is never `None` or empty.
496    unique_column_ids: Option<Vec<usize>>,
497}
498
499impl EnforceSecret for IcebergSink {
500    fn enforce_secret<'a>(
501        prop_iter: impl Iterator<Item = &'a str>,
502    ) -> crate::error::ConnectorResult<()> {
503        for prop in prop_iter {
504            IcebergConfig::enforce_one(prop)?;
505        }
506        Ok(())
507    }
508}
509
510impl TryFrom<SinkParam> for IcebergSink {
511    type Error = SinkError;
512
513    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
514        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
515        IcebergSink::new(config, param)
516    }
517}
518
519impl Debug for IcebergSink {
520    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521        f.debug_struct("IcebergSink")
522            .field("config", &self.config)
523            .finish()
524    }
525}
526
527impl IcebergSink {
528    pub async fn create_and_validate_table(&self) -> Result<Table> {
529        if self.config.create_table_if_not_exists {
530            self.create_table_if_not_exists().await?;
531        }
532
533        let table = self
534            .config
535            .load_table()
536            .await
537            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
538
539        let sink_schema = self.param.schema();
540        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
541            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
542
543        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
544            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
545
546        Ok(table)
547    }
548
549    pub async fn create_table_if_not_exists(&self) -> Result<()> {
550        let catalog = self.config.create_catalog().await?;
551        let namespace = if let Some(database_name) = self.config.table.database_name() {
552            let namespace = NamespaceIdent::new(database_name.to_owned());
553            if !catalog
554                .namespace_exists(&namespace)
555                .await
556                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
557            {
558                catalog
559                    .create_namespace(&namespace, HashMap::default())
560                    .await
561                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
562                    .context("failed to create iceberg namespace")?;
563            }
564            namespace
565        } else {
566            bail!("database name must be set if you want to create table")
567        };
568
569        let table_id = self
570            .config
571            .full_table_name()
572            .context("Unable to parse table name")?;
573        if !catalog
574            .table_exists(&table_id)
575            .await
576            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
577        {
578            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
579            // convert risingwave schema -> arrow schema -> iceberg schema
580            let arrow_fields = self
581                .param
582                .columns
583                .iter()
584                .map(|column| {
585                    Ok(iceberg_create_table_arrow_convert
586                        .to_arrow_field(&column.name, &column.data_type)
587                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
588                        .context(format!(
589                            "failed to convert {}: {} to arrow type",
590                            &column.name, &column.data_type
591                        ))?)
592                })
593                .collect::<Result<Vec<ArrowField>>>()?;
594            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
595            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
596                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
597                .context("failed to convert arrow schema to iceberg schema")?;
598
599            let location = {
600                let mut names = namespace.clone().inner();
601                names.push(self.config.table.table_name().to_owned());
602                match &self.config.common.warehouse_path {
603                    Some(warehouse_path) => {
604                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
605                        let url = Url::parse(warehouse_path);
606                        if url.is_err() || is_s3_tables {
607                            // For a rest catalog, the warehouse_path could be a warehouse name.
608                            // In this case, we should not specify the location when creating a table.
609                            if self.config.common.catalog_type() == "rest"
610                                || self.config.common.catalog_type() == "rest_rust"
611                            {
612                                None
613                            } else {
614                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
615                            }
616                        } else if warehouse_path.ends_with('/') {
617                            Some(format!("{}{}", warehouse_path, names.join("/")))
618                        } else {
619                            Some(format!("{}/{}", warehouse_path, names.join("/")))
620                        }
621                    }
622                    None => None,
623                }
624            };
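            // For illustration (hypothetical values): with warehouse_path "s3://bucket/warehouse",
            // namespace "db", and table "t", the location above becomes "s3://bucket/warehouse/db/t".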
625
626            let partition_spec = match &self.config.partition_by {
627                Some(partition_by) => {
628                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
629                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
630                        .into_iter()
631                        .enumerate()
632                    {
633                        match iceberg_schema.field_id_by_name(&column) {
634                            Some(id) => partition_fields.push(
635                                UnboundPartitionField::builder()
636                                    .source_id(id)
637                                    .transform(transform)
638                                    .name(format!("_p_{}", column))
639                                    .field_id(i as i32)
640                                    .build(),
641                            ),
642                            None => bail!(format!(
643                                "Partition source column does not exist in schema: {}",
644                                column
645                            )),
646                        };
647                    }
648                    Some(
649                        UnboundPartitionSpec::builder()
650                            .with_spec_id(0)
651                            .add_partition_fields(partition_fields)
652                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
653                            .context("failed to add partition columns")?
654                            .build(),
655                    )
656                }
657                None => None,
658            };
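            // For illustration (hypothetical column names; the accepted expression syntax is
            // whatever `parse_partition_by_exprs` supports): `partition_by = "c1, bucket(4, c2)"`
            // would yield unbound partition fields named `_p_c1` and `_p_c2` bound to those
            // source columns.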
659
660            let table_creation_builder = TableCreation::builder()
661                .name(self.config.table.table_name().to_owned())
662                .schema(iceberg_schema);
663
664            let table_creation = match (location, partition_spec) {
665                (Some(location), Some(partition_spec)) => table_creation_builder
666                    .location(location)
667                    .partition_spec(partition_spec)
668                    .build(),
669                (Some(location), None) => table_creation_builder.location(location).build(),
670                (None, Some(partition_spec)) => table_creation_builder
671                    .partition_spec(partition_spec)
672                    .build(),
673                (None, None) => table_creation_builder.build(),
674            };
675
676            catalog
677                .create_table(&namespace, table_creation)
678                .await
679                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
680                .context("failed to create iceberg table")?;
681        }
682        Ok(())
683    }
684
685    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
686        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
687            if let Some(pk) = &config.primary_key {
688                let mut unique_column_ids = Vec::with_capacity(pk.len());
689                for col_name in pk {
690                    let id = param
691                        .columns
692                        .iter()
693                        .find(|col| col.name.as_str() == col_name)
694                        .ok_or_else(|| {
695                            SinkError::Config(anyhow!(
696                                "Primary key column {} not found in sink schema",
697                                col_name
698                            ))
699                        })?
700                        .column_id
701                        .get_id() as usize;
702                    unique_column_ids.push(id);
703                }
704                Some(unique_column_ids)
705            } else {
706                unreachable!()
707            }
708        } else {
709            None
710        };
711        Ok(Self {
712            config,
713            param,
714            unique_column_ids,
715        })
716    }
717}
718
719impl Sink for IcebergSink {
720    type Coordinator = IcebergSinkCommitter;
721    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
722
723    const SINK_NAME: &'static str = ICEBERG_SINK;
724
725    async fn validate(&self) -> Result<()> {
726        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
727            bail!("Snowflake catalog only supports iceberg sources");
728        }
729
730        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
731            risingwave_common::license::Feature::IcebergSinkWithGlue
732                .check_available()
733                .map_err(|e| anyhow::anyhow!(e))?;
734        }
735
736        // Validate compaction type configuration
737        let compaction_type = self.config.compaction_type();
738
739        // Check COW mode constraints
740        // COW mode only supports 'full' compaction type
741        if self.config.write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
742            && compaction_type != CompactionType::Full
743        {
744            bail!(
745                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
746                compaction_type
747            );
748        }
749
750        match compaction_type {
751            CompactionType::SmallFiles => {
752                // 1. check license
753                risingwave_common::license::Feature::IcebergCompaction
754                    .check_available()
755                    .map_err(|e| anyhow::anyhow!(e))?;
756
757                // 2. check write mode
758                if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
759                    bail!(
760                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
761                        self.config.write_mode
762                    );
763                }
764
765                // 3. check conflicting parameters
766                if self.config.delete_files_count_threshold.is_some() {
767                    bail!(
768                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
769                    );
770                }
771            }
772            CompactionType::FilesWithDelete => {
773                // 1. check license
774                risingwave_common::license::Feature::IcebergCompaction
775                    .check_available()
776                    .map_err(|e| anyhow::anyhow!(e))?;
777
778                // 2. check write mode
779                if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
780                    bail!(
781                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
782                        self.config.write_mode
783                    );
784                }
785
786                // 3. check conflicting parameters
787                if self.config.small_files_threshold_mb.is_some() {
788                    bail!(
789                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
790                    );
791                }
792            }
793            CompactionType::Full => {
794                // Full compaction has no special requirements
795            }
796        }
797
798        let _ = self.create_and_validate_table().await?;
799        Ok(())
800    }
801
802    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
803        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
804
805        // Validate compaction interval
806        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
807            if iceberg_config.enable_compaction && compaction_interval == 0 {
808                bail!(
809                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
810                );
811            }
812
813            // log compaction_interval
814            tracing::info!(
815                "Alter config compaction_interval set to {} seconds",
816                compaction_interval
817            );
818        }
819
820        // Validate max snapshots
821        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
822            && max_snapshots < 1
823        {
824            bail!(
825                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
826                max_snapshots
827            );
828        }
829
830        Ok(())
831    }
832
833    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
834        let table = self.create_and_validate_table().await?;
835        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
836            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
837        } else {
838            IcebergSinkWriter::new_append_only(table, &writer_param).await?
839        };
840
841        let commit_checkpoint_interval =
842            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
843                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
844            );
845        let writer = CoordinatedLogSinker::new(
846            &writer_param,
847            self.param.clone(),
848            inner,
849            commit_checkpoint_interval,
850        )
851        .await?;
852
853        Ok(writer)
854    }
855
856    fn is_coordinated_sink(&self) -> bool {
857        true
858    }
859
860    async fn new_coordinator(
861        &self,
862        db: DatabaseConnection,
863        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
864    ) -> Result<Self::Coordinator> {
865        let catalog = self.config.create_catalog().await?;
866        let table = self.create_and_validate_table().await?;
867        Ok(IcebergSinkCommitter {
868            catalog,
869            table,
870            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
871            last_commit_epoch: 0,
872            sink_id: self.param.sink_id,
873            config: self.config.clone(),
874            param: self.param.clone(),
875            db,
876            commit_retry_num: self.config.commit_retry_num,
877            committed_epoch_subscriber: None,
878            iceberg_compact_stat_sender,
879        })
880    }
881}
882
883/// `None` means no projection.
884/// `Prepare` holds the extra partition column index.
885/// `Done` holds the projection index vec.
886///
887/// The `ProjectIdxVec` is evaluated lazily: the first time we encounter the `Prepare` state, we use the data chunk schema
888/// to build the projection index vec.
889enum ProjectIdxVec {
890    None,
891    Prepare(usize),
892    Done(Vec<usize>),
893}
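// For example (illustrative shape): with a 4-column chunk and the extra partition column at
// index 2, `Prepare(2)` becomes `Done(vec![0, 1, 3])` the first time a chunk is written, and
// the cached indices are reused for later chunks.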
894
895pub struct IcebergSinkWriter {
896    writer: IcebergWriterDispatch,
897    arrow_schema: SchemaRef,
898    // See comments below
899    metrics: IcebergWriterMetrics,
900    // State of iceberg table for this writer
901    table: Table,
902    // For chunks with an extra partition column, we should remove that column before writing.
903    // This projection index vec is cached to avoid recomputing it for each chunk.
904    project_idx_vec: ProjectIdxVec,
905}
906
907#[allow(clippy::type_complexity)]
908enum IcebergWriterDispatch {
909    PartitionAppendOnly {
910        writer: Option<Box<dyn IcebergWriter>>,
911        writer_builder: MonitoredGeneralWriterBuilder<
912            FanoutPartitionWriterBuilder<
913                DataFileWriterBuilder<
914                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
915                >,
916            >,
917        >,
918    },
919    NonPartitionAppendOnly {
920        writer: Option<Box<dyn IcebergWriter>>,
921        writer_builder: MonitoredGeneralWriterBuilder<
922            DataFileWriterBuilder<
923                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
924            >,
925        >,
926    },
927    PartitionUpsert {
928        writer: Option<Box<dyn IcebergWriter>>,
929        writer_builder: MonitoredGeneralWriterBuilder<
930            FanoutPartitionWriterBuilder<
931                EqualityDeltaWriterBuilder<
932                    DataFileWriterBuilder<
933                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
934                    >,
935                    MonitoredPositionDeleteWriterBuilder<
936                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
937                    >,
938                    EqualityDeleteFileWriterBuilder<
939                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
940                    >,
941                >,
942            >,
943        >,
944        arrow_schema_with_op_column: SchemaRef,
945    },
946    NonpartitionUpsert {
947        writer: Option<Box<dyn IcebergWriter>>,
948        writer_builder: MonitoredGeneralWriterBuilder<
949            EqualityDeltaWriterBuilder<
950                DataFileWriterBuilder<
951                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
952                >,
953                MonitoredPositionDeleteWriterBuilder<
954                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
955                >,
956                EqualityDeleteFileWriterBuilder<
957                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
958                >,
959            >,
960        >,
961        arrow_schema_with_op_column: SchemaRef,
962    },
963}
964
965impl IcebergWriterDispatch {
966    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
967        match self {
968            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
969            | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
970            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
971            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
972        }
973    }
974}
975
976pub struct IcebergWriterMetrics {
977    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
978    // They are actually used in `PrometheusWriterBuilder`:
979    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
980    // We keep them here so that the guards clean the labels from the metrics registry when dropped.
981    _write_qps: LabelGuardedIntCounter,
982    _write_latency: LabelGuardedHistogram,
983    write_bytes: LabelGuardedIntCounter,
984}
985
986impl IcebergSinkWriter {
987    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
988        let SinkWriterParam {
989            extra_partition_col_idx,
990            actor_id,
991            sink_id,
992            sink_name,
993            ..
994        } = writer_param;
995        let metrics_labels = [
996            &actor_id.to_string(),
997            &sink_id.to_string(),
998            sink_name.as_str(),
999        ];
1000
1001        // Metrics
1002        let write_qps = GLOBAL_SINK_METRICS
1003            .iceberg_write_qps
1004            .with_guarded_label_values(&metrics_labels);
1005        let write_latency = GLOBAL_SINK_METRICS
1006            .iceberg_write_latency
1007            .with_guarded_label_values(&metrics_labels);
1008        // # TODO
1009        // Unused. Add this metric later.
1010        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1011            .iceberg_rolling_unflushed_data_file
1012            .with_guarded_label_values(&metrics_labels);
1013        let write_bytes = GLOBAL_SINK_METRICS
1014            .iceberg_write_bytes
1015            .with_guarded_label_values(&metrics_labels);
1016
1017        let schema = table.metadata().current_schema();
1018        let partition_spec = table.metadata().default_partition_spec();
1019
1020        // To avoid duplicate file names, each created sink generates a unique uuid as the file name suffix.
1021        let unique_uuid_suffix = Uuid::now_v7();
1022
1023        let parquet_writer_properties = WriterProperties::builder()
1024            .set_max_row_group_size(
1025                writer_param
1026                    .streaming_config
1027                    .developer
1028                    .iceberg_sink_write_parquet_max_row_group_rows,
1029            )
1030            .build();
1031
1032        let parquet_writer_builder = ParquetWriterBuilder::new(
1033            parquet_writer_properties,
1034            schema.clone(),
1035            table.file_io().clone(),
1036            DefaultLocationGenerator::new(table.metadata().clone())
1037                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1038            DefaultFileNameGenerator::new(
1039                writer_param.actor_id.to_string(),
1040                Some(unique_uuid_suffix.to_string()),
1041                iceberg::spec::DataFileFormat::Parquet,
1042            ),
1043        );
1044        let data_file_builder =
1045            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
1046        if partition_spec.fields().is_empty() {
1047            let writer_builder = MonitoredGeneralWriterBuilder::new(
1048                data_file_builder,
1049                write_qps.clone(),
1050                write_latency.clone(),
1051            );
1052            let inner_writer = Some(Box::new(
1053                writer_builder
1054                    .clone()
1055                    .build()
1056                    .await
1057                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1058            ) as Box<dyn IcebergWriter>);
1059            Ok(Self {
1060                arrow_schema: Arc::new(
1061                    schema_to_arrow_schema(table.metadata().current_schema())
1062                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1063                ),
1064                metrics: IcebergWriterMetrics {
1065                    _write_qps: write_qps,
1066                    _write_latency: write_latency,
1067                    write_bytes,
1068                },
1069                writer: IcebergWriterDispatch::NonPartitionAppendOnly {
1070                    writer: inner_writer,
1071                    writer_builder,
1072                },
1073                table,
1074                project_idx_vec: {
1075                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1076                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1077                    } else {
1078                        ProjectIdxVec::None
1079                    }
1080                },
1081            })
1082        } else {
1083            let partition_builder = MonitoredGeneralWriterBuilder::new(
1084                FanoutPartitionWriterBuilder::new(
1085                    data_file_builder,
1086                    partition_spec.clone(),
1087                    schema.clone(),
1088                )
1089                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1090                write_qps.clone(),
1091                write_latency.clone(),
1092            );
1093            let inner_writer = Some(Box::new(
1094                partition_builder
1095                    .clone()
1096                    .build()
1097                    .await
1098                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1099            ) as Box<dyn IcebergWriter>);
1100            Ok(Self {
1101                arrow_schema: Arc::new(
1102                    schema_to_arrow_schema(table.metadata().current_schema())
1103                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1104                ),
1105                metrics: IcebergWriterMetrics {
1106                    _write_qps: write_qps,
1107                    _write_latency: write_latency,
1108                    write_bytes,
1109                },
1110                writer: IcebergWriterDispatch::PartitionAppendOnly {
1111                    writer: inner_writer,
1112                    writer_builder: partition_builder,
1113                },
1114                table,
1115                project_idx_vec: {
1116                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1117                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1118                    } else {
1119                        ProjectIdxVec::None
1120                    }
1121                },
1122            })
1123        }
1124    }
1125
1126    pub async fn new_upsert(
1127        table: Table,
1128        unique_column_ids: Vec<usize>,
1129        writer_param: &SinkWriterParam,
1130    ) -> Result<Self> {
1131        let SinkWriterParam {
1132            extra_partition_col_idx,
1133            actor_id,
1134            sink_id,
1135            sink_name,
1136            ..
1137        } = writer_param;
1138        let metrics_labels = [
1139            &actor_id.to_string(),
1140            &sink_id.to_string(),
1141            sink_name.as_str(),
1142        ];
1143        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1144
1145        // Metrics
1146        let write_qps = GLOBAL_SINK_METRICS
1147            .iceberg_write_qps
1148            .with_guarded_label_values(&metrics_labels);
1149        let write_latency = GLOBAL_SINK_METRICS
1150            .iceberg_write_latency
1151            .with_guarded_label_values(&metrics_labels);
1152        // # TODO
1153        // Unused. Add this metric later.
1154        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1155            .iceberg_rolling_unflushed_data_file
1156            .with_guarded_label_values(&metrics_labels);
1157        let position_delete_cache_num = GLOBAL_SINK_METRICS
1158            .iceberg_position_delete_cache_num
1159            .with_guarded_label_values(&metrics_labels);
1160        let write_bytes = GLOBAL_SINK_METRICS
1161            .iceberg_write_bytes
1162            .with_guarded_label_values(&metrics_labels);
1163
1164        // Determine the current schema and the default partition spec
1165        let schema = table.metadata().current_schema();
1166        let partition_spec = table.metadata().default_partition_spec();
1167
1168        // To avoid duplicate file names, each created sink generates a unique uuid as the file name suffix.
1169        let unique_uuid_suffix = Uuid::now_v7();
1170
1171        let parquet_writer_properties = WriterProperties::builder()
1172            .set_max_row_group_size(
1173                writer_param
1174                    .streaming_config
1175                    .developer
1176                    .iceberg_sink_write_parquet_max_row_group_rows,
1177            )
1178            .build();
1179
1180        let data_file_builder = {
1181            let parquet_writer_builder = ParquetWriterBuilder::new(
1182                parquet_writer_properties.clone(),
1183                schema.clone(),
1184                table.file_io().clone(),
1185                DefaultLocationGenerator::new(table.metadata().clone())
1186                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1187                DefaultFileNameGenerator::new(
1188                    writer_param.actor_id.to_string(),
1189                    Some(unique_uuid_suffix.to_string()),
1190                    iceberg::spec::DataFileFormat::Parquet,
1191                ),
1192            );
1193            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id())
1194        };
1195        let position_delete_builder = {
1196            let parquet_writer_builder = ParquetWriterBuilder::new(
1197                parquet_writer_properties.clone(),
1198                POSITION_DELETE_SCHEMA.clone(),
1199                table.file_io().clone(),
1200                DefaultLocationGenerator::new(table.metadata().clone())
1201                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1202                DefaultFileNameGenerator::new(
1203                    writer_param.actor_id.to_string(),
1204                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1205                    iceberg::spec::DataFileFormat::Parquet,
1206                ),
1207            );
1208            MonitoredPositionDeleteWriterBuilder::new(
1209                SortPositionDeleteWriterBuilder::new(
1210                    parquet_writer_builder,
1211                    writer_param
1212                        .streaming_config
1213                        .developer
1214                        .iceberg_sink_positional_delete_cache_size,
1215                    None,
1216                    None,
1217                ),
1218                position_delete_cache_num,
1219            )
1220        };
1221        let equality_delete_builder = {
1222            let config = EqualityDeleteWriterConfig::new(
1223                unique_column_ids.clone(),
1224                table.metadata().current_schema().clone(),
1225                None,
1226                partition_spec.spec_id(),
1227            )
1228            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1229            let parquet_writer_builder = ParquetWriterBuilder::new(
1230                parquet_writer_properties.clone(),
1231                Arc::new(
1232                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1233                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1234                ),
1235                table.file_io().clone(),
1236                DefaultLocationGenerator::new(table.metadata().clone())
1237                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1238                DefaultFileNameGenerator::new(
1239                    writer_param.actor_id.to_string(),
1240                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1241                    iceberg::spec::DataFileFormat::Parquet,
1242                ),
1243            );
1244
1245            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
1246        };
1247        let delta_builder = EqualityDeltaWriterBuilder::new(
1248            data_file_builder,
1249            position_delete_builder,
1250            equality_delete_builder,
1251            unique_column_ids,
1252        );
1253        if partition_spec.fields().is_empty() {
1254            let writer_builder = MonitoredGeneralWriterBuilder::new(
1255                delta_builder,
1256                write_qps.clone(),
1257                write_latency.clone(),
1258            );
1259            let inner_writer = Some(Box::new(
1260                writer_builder
1261                    .clone()
1262                    .build()
1263                    .await
1264                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1265            ) as Box<dyn IcebergWriter>);
1266            let original_arrow_schema = Arc::new(
1267                schema_to_arrow_schema(table.metadata().current_schema())
1268                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1269            );
1270            let schema_with_extra_op_column = {
1271                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1272                new_fields.push(Arc::new(ArrowField::new(
1273                    "op".to_owned(),
1274                    ArrowDataType::Int32,
1275                    false,
1276                )));
1277                Arc::new(ArrowSchema::new(new_fields))
1278            };
1279            Ok(Self {
1280                arrow_schema: original_arrow_schema,
1281                metrics: IcebergWriterMetrics {
1282                    _write_qps: write_qps,
1283                    _write_latency: write_latency,
1284                    write_bytes,
1285                },
1286                table,
1287                writer: IcebergWriterDispatch::NonpartitionUpsert {
1288                    writer: inner_writer,
1289                    writer_builder,
1290                    arrow_schema_with_op_column: schema_with_extra_op_column,
1291                },
1292                project_idx_vec: {
1293                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1294                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1295                    } else {
1296                        ProjectIdxVec::None
1297                    }
1298                },
1299            })
1300        } else {
1301            let original_arrow_schema = Arc::new(
1302                schema_to_arrow_schema(table.metadata().current_schema())
1303                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1304            );
1305            let schema_with_extra_op_column = {
1306                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1307                new_fields.push(Arc::new(ArrowField::new(
1308                    "op".to_owned(),
1309                    ArrowDataType::Int32,
1310                    false,
1311                )));
1312                Arc::new(ArrowSchema::new(new_fields))
1313            };
1314            let partition_builder = MonitoredGeneralWriterBuilder::new(
1315                FanoutPartitionWriterBuilder::new_with_custom_schema(
1316                    delta_builder,
1317                    schema_with_extra_op_column.clone(),
1318                    partition_spec.clone(),
1319                    table.metadata().current_schema().clone(),
1320                ),
1321                write_qps.clone(),
1322                write_latency.clone(),
1323            );
1324            let inner_writer = Some(Box::new(
1325                partition_builder
1326                    .clone()
1327                    .build()
1328                    .await
1329                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1330            ) as Box<dyn IcebergWriter>);
1331            Ok(Self {
1332                arrow_schema: original_arrow_schema,
1333                metrics: IcebergWriterMetrics {
1334                    _write_qps: write_qps,
1335                    _write_latency: write_latency,
1336                    write_bytes,
1337                },
1338                table,
1339                writer: IcebergWriterDispatch::PartitionUpsert {
1340                    writer: inner_writer,
1341                    writer_builder: partition_builder,
1342                    arrow_schema_with_op_column: schema_with_extra_op_column,
1343                },
1344                project_idx_vec: {
1345                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1346                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1347                    } else {
1348                        ProjectIdxVec::None
1349                    }
1350                },
1351            })
1352        }
1353    }
1354}
1355
1356#[async_trait]
1357impl SinkWriter for IcebergSinkWriter {
1358    type CommitMetadata = Option<SinkMetadata>;
1359
1360    /// Begin a new epoch
1361    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1362        // Just skip it.
1363        Ok(())
1364    }
1365
1366    /// Write a stream chunk to sink
1367    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1368        // Try to build writer if it's None.
1369        match &mut self.writer {
1370            IcebergWriterDispatch::PartitionAppendOnly {
1371                writer,
1372                writer_builder,
1373            } => {
1374                if writer.is_none() {
1375                    *writer = Some(Box::new(
1376                        writer_builder
1377                            .clone()
1378                            .build()
1379                            .await
1380                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1381                    ));
1382                }
1383            }
1384            IcebergWriterDispatch::NonPartitionAppendOnly {
1385                writer,
1386                writer_builder,
1387            } => {
1388                if writer.is_none() {
1389                    *writer = Some(Box::new(
1390                        writer_builder
1391                            .clone()
1392                            .build()
1393                            .await
1394                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1395                    ));
1396                }
1397            }
1398            IcebergWriterDispatch::PartitionUpsert {
1399                writer,
1400                writer_builder,
1401                ..
1402            } => {
1403                if writer.is_none() {
1404                    *writer = Some(Box::new(
1405                        writer_builder
1406                            .clone()
1407                            .build()
1408                            .await
1409                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1410                    ));
1411                }
1412            }
1413            IcebergWriterDispatch::NonpartitionUpsert {
1414                writer,
1415                writer_builder,
1416                ..
1417            } => {
1418                if writer.is_none() {
1419                    *writer = Some(Box::new(
1420                        writer_builder
1421                            .clone()
1422                            .build()
1423                            .await
1424                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1425                    ));
1426                }
1427            }
1428        };
1429
1430        // Process the chunk.
1431        let (mut chunk, ops) = chunk.compact_vis().into_parts();
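        // Drop the extra partition column (if one is configured) before converting the chunk to
        // Arrow; the projection index vector is computed on the first chunk and reused afterwards.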
1432        match &self.project_idx_vec {
1433            ProjectIdxVec::None => {}
1434            ProjectIdxVec::Prepare(idx) => {
1435                if *idx >= chunk.columns().len() {
1436                    return Err(SinkError::Iceberg(anyhow!(
1437                        "invalid extra partition column index {}",
1438                        idx
1439                    )));
1440                }
1441                let project_idx_vec = (0..*idx)
1442                    .chain(*idx + 1..chunk.columns().len())
1443                    .collect_vec();
1444                chunk = chunk.project(&project_idx_vec);
1445                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1446            }
1447            ProjectIdxVec::Done(idx_vec) => {
1448                chunk = chunk.project(idx_vec);
1449            }
1450        }
1451        if ops.is_empty() {
1452            return Ok(());
1453        }
1454        let write_batch_size = chunk.estimated_heap_size();
1455        let batch = match &self.writer {
1456            IcebergWriterDispatch::PartitionAppendOnly { .. }
1457            | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
1458                // Keep only the `Insert` rows for the append-only writer.
1459                let filters =
1460                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1461                chunk.set_visibility(filters);
1462                IcebergArrowConvert
1463                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
1464                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1465            }
1466            IcebergWriterDispatch::PartitionUpsert {
1467                arrow_schema_with_op_column,
1468                ..
1469            }
1470            | IcebergWriterDispatch::NonpartitionUpsert {
1471                arrow_schema_with_op_column,
1472                ..
1473            } => {
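                // Append an extra `op` column (INSERT_OP / DELETE_OP) to the record batch so the
                // downstream equality delta writer can tell inserts apart from deletes.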
1474                let chunk = IcebergArrowConvert
1475                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1476                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1477                let ops = Arc::new(Int32Array::from(
1478                    ops.iter()
1479                        .map(|op| match op {
1480                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1481                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1482                        })
1483                        .collect_vec(),
1484                ));
1485                let mut columns = chunk.columns().to_vec();
1486                columns.push(ops);
1487                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1488                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1489            }
1490        };
1491
1492        let writer = self.writer.get_writer().unwrap();
1493        writer
1494            .write(batch)
1495            .instrument_await("iceberg_write")
1496            .await
1497            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1498        self.metrics.write_bytes.inc_by(write_batch_size as _);
1499        Ok(())
1500    }
1501
1502    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the sink
1503    /// writer should commit the current epoch.
1504    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1505        // Skip it if not checkpoint
1506        if !is_checkpoint {
1507            return Ok(None);
1508        }
1509
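        // On checkpoint, close the current writer to flush and collect its data files, then
        // immediately rebuild a fresh writer from the builder for the next epoch.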
1510        let close_result = match &mut self.writer {
1511            IcebergWriterDispatch::PartitionAppendOnly {
1512                writer,
1513                writer_builder,
1514            } => {
1515                let close_result = match writer.take() {
1516                    Some(mut writer) => {
1517                        Some(writer.close().instrument_await("iceberg_close").await)
1518                    }
1519                    _ => None,
1520                };
1521                match writer_builder.clone().build().await {
1522                    Ok(new_writer) => {
1523                        *writer = Some(Box::new(new_writer));
1524                    }
1525                    _ => {
1526                        // In this case, the writer is closed and we can't build a new one. We don't return the error
1527                        // here because the current writer may have closed successfully, so we just log it.
1528                        warn!("Failed to build new writer after close");
1529                    }
1530                }
1531                close_result
1532            }
1533            IcebergWriterDispatch::NonPartitionAppendOnly {
1534                writer,
1535                writer_builder,
1536            } => {
1537                let close_result = match writer.take() {
1538                    Some(mut writer) => {
1539                        Some(writer.close().instrument_await("iceberg_close").await)
1540                    }
1541                    _ => None,
1542                };
1543                match writer_builder.clone().build().await {
1544                    Ok(new_writer) => {
1545                        *writer = Some(Box::new(new_writer));
1546                    }
1547                    _ => {
1548                        // In this case, the writer is closed and we can't build a new one. We don't return the error
1549                        // here because the current writer may have closed successfully, so we just log it.
1550                        warn!("Failed to build new writer after close");
1551                    }
1552                }
1553                close_result
1554            }
1555            IcebergWriterDispatch::PartitionUpsert {
1556                writer,
1557                writer_builder,
1558                ..
1559            } => {
1560                let close_result = match writer.take() {
1561                    Some(mut writer) => {
1562                        Some(writer.close().instrument_await("iceberg_close").await)
1563                    }
1564                    _ => None,
1565                };
1566                match writer_builder.clone().build().await {
1567                    Ok(new_writer) => {
1568                        *writer = Some(Box::new(new_writer));
1569                    }
1570                    _ => {
1571                        // In this case, the writer is closed and we can't build a new one. We don't return the error
1572                        // here because the current writer may have closed successfully, so we just log it.
1573                        warn!("Failed to build new writer after close");
1574                    }
1575                }
1576                close_result
1577            }
1578            IcebergWriterDispatch::NonpartitionUpsert {
1579                writer,
1580                writer_builder,
1581                ..
1582            } => {
1583                let close_result = match writer.take() {
1584                    Some(mut writer) => {
1585                        Some(writer.close().instrument_await("iceberg_close").await)
1586                    }
1587                    _ => None,
1588                };
1589                match writer_builder.clone().build().await {
1590                    Ok(new_writer) => {
1591                        *writer = Some(Box::new(new_writer));
1592                    }
1593                    _ => {
1594                        // In this case, the writer is closed and we can't build a new one. We don't return the error
1595                        // here because the current writer may have closed successfully, so we just log it.
1596                        warn!("Failed to build new writer after close");
1597                    }
1598                }
1599                close_result
1600            }
1601        };
1602
1603        match close_result {
1604            Some(Ok(result)) => {
1605                let version = self.table.metadata().format_version() as u8;
1606                let partition_type = self.table.metadata().default_partition_type();
1607                let data_files = result
1608                    .into_iter()
1609                    .map(|f| {
1610                        SerializedDataFile::try_from(f, partition_type, version == 1)
1611                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1612                    })
1613                    .collect::<Result<Vec<_>>>()?;
1614                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1615                    data_files,
1616                    schema_id: self.table.metadata().current_schema_id(),
1617                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1618                })?))
1619            }
1620            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1621            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1622        }
1623    }
1624
1625    /// Clean up
1626    async fn abort(&mut self) -> Result<()> {
1627        // TODO: abort should clean up all the data written in this epoch.
1628        Ok(())
1629    }
1630}
1631
1632const SCHEMA_ID: &str = "schema_id";
1633const PARTITION_SPEC_ID: &str = "partition_spec_id";
1634const DATA_FILES: &str = "data_files";
1635
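/// Per-writer commit payload passed from the sink writers to the commit coordinator.
///
/// When converted to `SinkMetadata` or raw bytes (see the `TryFrom` impls below), it is encoded as
/// a JSON object of roughly the following shape, where `data_files` holds the serde-serialized
/// `SerializedDataFile` entries (illustrative sketch only; the numeric values are placeholders):
///
/// ```json
/// { "schema_id": 0, "partition_spec_id": 0, "data_files": [] }
/// ```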
1636#[derive(Default, Clone)]
1637struct IcebergCommitResult {
1638    schema_id: i32,
1639    partition_spec_id: i32,
1640    data_files: Vec<SerializedDataFile>,
1641}
1642
1643impl IcebergCommitResult {
1644    fn try_from(value: &SinkMetadata) -> Result<Self> {
1645        if let Some(Serialized(v)) = &value.metadata {
1646            let mut values = if let serde_json::Value::Object(v) =
1647                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1648                    .context("Can't parse iceberg sink metadata")?
1649            {
1650                v
1651            } else {
1652                bail!("iceberg sink metadata should be an object");
1653            };
1654
1655            let schema_id;
1656            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1657                schema_id = value
1658                    .as_u64()
1659                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1660            } else {
1661                bail!("iceberg sink metadata should have schema_id");
1662            }
1663
1664            let partition_spec_id;
1665            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1666                partition_spec_id = value
1667                    .as_u64()
1668                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1669            } else {
1670                bail!("iceberg sink metadata should have partition_spec_id");
1671            }
1672
1673            let data_files: Vec<SerializedDataFile>;
1674            if let serde_json::Value::Array(values) = values
1675                .remove(DATA_FILES)
1676                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1677            {
1678                data_files = values
1679                    .into_iter()
1680                    .map(from_value::<SerializedDataFile>)
1681                    .collect::<std::result::Result<_, _>>()
1682                    .context("Failed to parse data_files in iceberg sink metadata")?;
1683            } else {
1684                bail!("iceberg sink metadata should have data_files object");
1685            }
1686
1687            Ok(Self {
1688                schema_id: schema_id as i32,
1689                partition_spec_id: partition_spec_id as i32,
1690                data_files,
1691            })
1692        } else {
1693            bail!("Can't create iceberg sink write result from empty data!")
1694        }
1695    }
1696
1697    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1698        let mut values = if let serde_json::Value::Object(value) =
1699            serde_json::from_slice::<serde_json::Value>(&value)
1700                .context("Can't parse iceberg sink metadata")?
1701        {
1702            value
1703        } else {
1704            bail!("iceberg sink metadata should be an object");
1705        };
1706
1707        let schema_id;
1708        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1709            schema_id = value
1710                .as_u64()
1711                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1712        } else {
1713            bail!("iceberg sink metadata should have schema_id");
1714        }
1715
1716        let partition_spec_id;
1717        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1718            partition_spec_id = value
1719                .as_u64()
1720                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1721        } else {
1722            bail!("iceberg sink metadata should have partition_spec_id");
1723        }
1724
1725        let data_files: Vec<SerializedDataFile>;
1726        if let serde_json::Value::Array(values) = values
1727            .remove(DATA_FILES)
1728            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1729        {
1730            data_files = values
1731                .into_iter()
1732                .map(from_value::<SerializedDataFile>)
1733                .collect::<std::result::Result<_, _>>()
1734                .context("Failed to parse data_files in iceberg sink metadata")?;
1735        } else {
1736            bail!("iceberg sink metadata should have data_files object");
1737        }
1738
1739        Ok(Self {
1740            schema_id: schema_id as i32,
1741            partition_spec_id: partition_spec_id as i32,
1742            data_files,
1743        })
1744    }
1745}
1746
1747impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1748    type Error = SinkError;
1749
1750    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1751        let json_data_files = serde_json::Value::Array(
1752            value
1753                .data_files
1754                .iter()
1755                .map(serde_json::to_value)
1756                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1757                .context("Can't serialize data files to json")?,
1758        );
1759        let json_value = serde_json::Value::Object(
1760            vec![
1761                (
1762                    SCHEMA_ID.to_owned(),
1763                    serde_json::Value::Number(value.schema_id.into()),
1764                ),
1765                (
1766                    PARTITION_SPEC_ID.to_owned(),
1767                    serde_json::Value::Number(value.partition_spec_id.into()),
1768                ),
1769                (DATA_FILES.to_owned(), json_data_files),
1770            ]
1771            .into_iter()
1772            .collect(),
1773        );
1774        Ok(SinkMetadata {
1775            metadata: Some(Serialized(SerializedMetadata {
1776                metadata: serde_json::to_vec(&json_value)
1777                    .context("Can't serialize iceberg sink metadata")?,
1778            })),
1779        })
1780    }
1781}
1782
1783impl TryFrom<IcebergCommitResult> for Vec<u8> {
1784    type Error = SinkError;
1785
1786    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1787        let json_data_files = serde_json::Value::Array(
1788            value
1789                .data_files
1790                .iter()
1791                .map(serde_json::to_value)
1792                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1793                .context("Can't serialize data files to json")?,
1794        );
1795        let json_value = serde_json::Value::Object(
1796            vec![
1797                (
1798                    SCHEMA_ID.to_owned(),
1799                    serde_json::Value::Number(value.schema_id.into()),
1800                ),
1801                (
1802                    PARTITION_SPEC_ID.to_owned(),
1803                    serde_json::Value::Number(value.partition_spec_id.into()),
1804                ),
1805                (DATA_FILES.to_owned(), json_data_files),
1806            ]
1807            .into_iter()
1808            .collect(),
1809        );
1810        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1811    }
1812}
1813pub struct IcebergSinkCommitter {
1814    catalog: Arc<dyn Catalog>,
1815    table: Table,
1816    pub last_commit_epoch: u64,
1817    pub(crate) is_exactly_once: bool,
1818    pub(crate) sink_id: SinkId,
1819    pub(crate) config: IcebergConfig,
1820    pub(crate) param: SinkParam,
1821    pub(crate) db: DatabaseConnection,
1822    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1823    commit_retry_num: u32,
1824    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1825}
1826
1827impl IcebergSinkCommitter {
1828    // Reload the table and guarantee that its current schema_id and partition_spec_id match the
1829    // given `schema_id` and `partition_spec_id`.
1830    async fn reload_table(
1831        catalog: &dyn Catalog,
1832        table_ident: &TableIdent,
1833        schema_id: i32,
1834        partition_spec_id: i32,
1835    ) -> Result<Table> {
1836        let table = catalog
1837            .load_table(table_ident)
1838            .await
1839            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1840        if table.metadata().current_schema_id() != schema_id {
1841            return Err(SinkError::Iceberg(anyhow!(
1842                "Schema evolution not supported, expect schema id {}, but got {}",
1843                schema_id,
1844                table.metadata().current_schema_id()
1845            )));
1846        }
1847        if table.metadata().default_partition_spec_id() != partition_spec_id {
1848            return Err(SinkError::Iceberg(anyhow!(
1849                "Partition evolution not supported, expect partition spec id {}, but got {}",
1850                partition_spec_id,
1851                table.metadata().default_partition_spec_id()
1852            )));
1853        }
1854        Ok(table)
1855    }
1856}
1857
1858#[async_trait::async_trait]
1859impl SinkCommitCoordinator for IcebergSinkCommitter {
1860    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1861        if self.is_exactly_once {
1862            self.committed_epoch_subscriber = Some(subscriber);
1863            tracing::info!(
1864                "Sink id = {}: iceberg sink coordinator initing.",
1865                self.param.sink_id
1866            );
1867            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id).await? {
1868                let ordered_metadata_list_by_end_epoch =
1869                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id).await?;
1870
1871                let mut last_recommit_epoch = 0;
1872                for (end_epoch, sealized_bytes, snapshot_id, committed) in
1873                    ordered_metadata_list_by_end_epoch
1874                {
1875                    let write_results_bytes = deserialize_metadata(sealized_bytes);
1876                    let mut write_results = vec![];
1877
1878                    for each in write_results_bytes {
1879                        let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1880                        write_results.push(write_result);
1881                    }
1882
1883                    match (
1884                        committed,
1885                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1886                            .await?,
1887                    ) {
1888                        (true, _) => {
1889                            tracing::info!(
1890                                "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1891                                self.param.sink_id
1892                            );
1893                        }
1894                        (false, true) => {
1895                            // skip
1896                            tracing::info!(
1897                                "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1898                                self.param.sink_id
1899                            );
1900                            mark_row_is_committed_by_sink_id_and_end_epoch(
1901                                &self.db,
1902                                self.sink_id,
1903                                end_epoch,
1904                            )
1905                            .await?;
1906                        }
1907                        (false, false) => {
1908                            tracing::info!(
1909                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1910                                self.param.sink_id
1911                            );
1912                            self.re_commit(end_epoch, write_results, snapshot_id)
1913                                .await?;
1914                        }
1915                    }
1916
1917                    last_recommit_epoch = end_epoch;
1918                }
1919                tracing::info!(
1920                    "Sink id = {}: iceberg commit coordinator inited.",
1921                    self.param.sink_id
1922                );
1923                return Ok(Some(last_recommit_epoch));
1924            } else {
1925                tracing::info!(
1926                    "Sink id = {}: init iceberg coodinator, and system table is empty.",
1927                    self.param.sink_id
1928                );
1929                return Ok(None);
1930            }
1931        }
1932
1933        tracing::info!("Iceberg commit coordinator inited.");
1934        return Ok(None);
1935    }
1936
1937    async fn commit(
1938        &mut self,
1939        epoch: u64,
1940        metadata: Vec<SinkMetadata>,
1941        add_columns: Option<Vec<Field>>,
1942    ) -> Result<()> {
1943        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1944        if let Some(add_columns) = add_columns {
1945            return Err(anyhow!(
1946                "Iceberg sink not support add columns, but got: {:?}",
1947                add_columns
1948            )
1949            .into());
1950        }
1951
1952        let write_results: Vec<IcebergCommitResult> = metadata
1953            .iter()
1954            .map(IcebergCommitResult::try_from)
1955            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1956
1957        // Skip if no data to commit
1958        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1959            tracing::debug!(?epoch, "no data to commit");
1960            return Ok(());
1961        }
1962
1963        // Guarantee that all write results have the same schema_id and partition_spec_id.
1964        if write_results
1965            .iter()
1966            .any(|r| r.schema_id != write_results[0].schema_id)
1967            || write_results
1968                .iter()
1969                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1970        {
1971            return Err(SinkError::Iceberg(anyhow!(
1972                "schema_id and partition_spec_id should be the same in all write results"
1973            )));
1974        }
1975
1976        // Check snapshot limit before proceeding with commit
1977        self.wait_for_snapshot_limit().await?;
1978
1979        if self.is_exactly_once {
1980            assert!(self.committed_epoch_subscriber.is_some());
1981            match self.committed_epoch_subscriber.clone() {
1982                Some(committed_epoch_subscriber) => {
1983                    // Get the latest committed_epoch and the receiver
1984                    let (committed_epoch, mut rw_futures_utilrx) =
1985                        committed_epoch_subscriber(self.param.sink_id).await?;
1986                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1987                    if committed_epoch >= epoch {
1988                        self.commit_iceberg_inner(epoch, write_results, None)
1989                            .await?;
1990                    } else {
1991                        tracing::info!(
1992                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1993                            committed_epoch,
1994                            epoch
1995                        );
1996                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1997                            tracing::info!(
1998                                "Received next committed epoch: {}",
1999                                next_committed_epoch
2000                            );
2001                            // If next_committed_epoch meets the condition, execute the commit immediately.
2002                            if next_committed_epoch >= epoch {
2003                                self.commit_iceberg_inner(epoch, write_results, None)
2004                                    .await?;
2005                                break;
2006                            }
2007                        }
2008                    }
2009                }
2010                None => unreachable!(
2011                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
2012                ),
2013            }
2014        } else {
2015            self.commit_iceberg_inner(epoch, write_results, None)
2016                .await?;
2017        }
2018
2019        Ok(())
2020    }
2021}
2022
2023/// Methods Required to Achieve Exactly Once Semantics
2024impl IcebergSinkCommitter {
2025    async fn re_commit(
2026        &mut self,
2027        epoch: u64,
2028        write_results: Vec<IcebergCommitResult>,
2029        snapshot_id: i64,
2030    ) -> Result<()> {
2031        tracing::info!("Starting iceberg re commit in epoch {epoch}.");
2032
2033        // Skip if no data to commit
2034        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2035            tracing::debug!(?epoch, "no data to commit");
2036            return Ok(());
2037        }
2038        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
2039            .await?;
2040        Ok(())
2041    }
2042
2043    async fn commit_iceberg_inner(
2044        &mut self,
2045        epoch: u64,
2046        write_results: Vec<IcebergCommitResult>,
2047        snapshot_id: Option<i64>,
2048    ) -> Result<()> {
2049        // If the provided `snapshot_id` is not None, it indicates that this commit is a re-commit
2050        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
2051        // that was previously persisted in the system table to commit.
2052        let is_first_commit = snapshot_id.is_none();
2053        self.last_commit_epoch = epoch;
2054        let expect_schema_id = write_results[0].schema_id;
2055        let expect_partition_spec_id = write_results[0].partition_spec_id;
2056
2057        // Reload the latest table metadata to avoid concurrent modification, on a best-effort basis.
2058        self.table = Self::reload_table(
2059            self.catalog.as_ref(),
2060            self.table.identifier(),
2061            expect_schema_id,
2062            expect_partition_spec_id,
2063        )
2064        .await?;
2065        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2066            return Err(SinkError::Iceberg(anyhow!(
2067                "Can't find schema by id {}",
2068                expect_schema_id
2069            )));
2070        };
2071        let Some(partition_spec) = self
2072            .table
2073            .metadata()
2074            .partition_spec_by_id(expect_partition_spec_id)
2075        else {
2076            return Err(SinkError::Iceberg(anyhow!(
2077                "Can't find partition spec by id {}",
2078                expect_partition_spec_id
2079            )));
2080        };
2081        let partition_type = partition_spec
2082            .as_ref()
2083            .clone()
2084            .partition_type(schema)
2085            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2086
2087        let txn = Transaction::new(&self.table);
2088        // Only generate a new snapshot id on the first commit; a re-commit reuses the persisted one.
2089        let snapshot_id = match snapshot_id {
2090            Some(previous_snapshot_id) => previous_snapshot_id,
2091            None => txn.generate_unique_snapshot_id(),
2092        };
2093        if self.is_exactly_once && is_first_commit {
2094            // Persist the pre-commit metadata and snapshot id in the system table.
2095            let mut pre_commit_metadata_bytes = Vec::new();
2096            for each_parallelism_write_result in write_results.clone() {
2097                let each_parallelism_write_result_bytes: Vec<u8> =
2098                    each_parallelism_write_result.try_into()?;
2099                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
2100            }
2101
2102            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
2103
2104            persist_pre_commit_metadata(
2105                self.sink_id,
2106                self.db.clone(),
2107                self.last_commit_epoch,
2108                epoch,
2109                pre_commit_metadata_bytes,
2110                snapshot_id,
2111            )
2112            .await?;
2113        }
2114
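        // Convert the serialized data files back into `DataFile`s bound to the expected partition
        // spec and schema before appending them to the table.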
2115        let data_files = write_results
2116            .into_iter()
2117            .flat_map(|r| {
2118                r.data_files.into_iter().map(|f| {
2119                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2120                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2121                })
2122            })
2123            .collect::<Result<Vec<DataFile>>>()?;
2124        // TODO:
2125        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
2126        // because a proper retry needs to reapply the commit metadata. For now, we just retry the whole
2127        // commit operation: up to `commit_retry_num` attempts with jittered exponential backoff starting at ~10ms and capped at 60s.
2128        let retry_strategy = ExponentialBackoff::from_millis(10)
2129            .max_delay(Duration::from_secs(60))
2130            .map(jitter)
2131            .take(self.commit_retry_num as usize);
2132        let catalog = self.catalog.clone();
2133        let table_ident = self.table.identifier().clone();
2134        let table = Retry::spawn(retry_strategy, || async {
2135            let table = Self::reload_table(
2136                catalog.as_ref(),
2137                &table_ident,
2138                expect_schema_id,
2139                expect_partition_spec_id,
2140            )
2141            .await?;
2142            let txn = Transaction::new(&table);
2143            let mut append_action = txn
2144                .fast_append(Some(snapshot_id), None, vec![])
2145                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
2146                .with_to_branch(commit_branch(
2147                    self.config.r#type.as_str(),
2148                    self.config.write_mode.as_str(),
2149                ));
2150            append_action
2151                .add_data_files(data_files.clone())
2152                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2153            let tx = append_action.apply().await.map_err(|err| {
2154                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2155                SinkError::Iceberg(anyhow!(err))
2156            })?;
2157            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
2158                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2159                SinkError::Iceberg(anyhow!(err))
2160            })
2161        })
2162        .await?;
2163        self.table = table;
2164
2165        let snapshot_num = self.table.metadata().snapshots().count();
2166        let catalog_name = self.config.common.catalog_name();
2167        let table_name = self.table.identifier().to_string();
2168        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2169        GLOBAL_SINK_METRICS
2170            .iceberg_snapshot_num
2171            .with_guarded_label_values(&metrics_labels)
2172            .set(snapshot_num as i64);
2173
2174        tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
2175
2176        if self.is_exactly_once {
2177            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2178            tracing::info!(
2179                "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
2180                self.sink_id,
2181                epoch
2182            );
2183
2184            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2185        }
2186
2187        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2188            && self.config.enable_compaction
2189            && iceberg_compact_stat_sender
2190                .send(IcebergSinkCompactionUpdate {
2191                    sink_id: self.sink_id,
2192                    compaction_interval: self.config.compaction_interval_sec(),
2193                    force_compaction: false,
2194                })
2195                .is_err()
2196        {
2197            warn!("failed to send iceberg compaction stats");
2198        }
2199
2200        Ok(())
2201    }
2202
2203    /// During the pre-commit phase, we record the `snapshot_id` corresponding to each batch of files.
2204    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
2205    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
2206    async fn is_snapshot_id_in_iceberg(
2207        &self,
2208        iceberg_config: &IcebergConfig,
2209        snapshot_id: i64,
2210    ) -> Result<bool> {
2211        let table = iceberg_config.load_table().await?;
2212        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2217    }
2218
2219    /// Count the snapshots created since the last rewrite/overwrite (`Replace`) operation.
2220    /// The caller uses this count to decide whether the pre-compaction snapshot limit has been exceeded.
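    ///
    /// For example, with a (hypothetical) snapshot history ordered newest-first of
    /// `[Append, Append, Replace, Append]`, this returns 2, since counting stops at the first `Replace`.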
2221    fn count_snapshots_since_rewrite(&self) -> usize {
2222        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2223        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2224
2225        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2226        let mut count = 0;
2227        for snapshot in snapshots {
2228            // Check if this snapshot represents a rewrite or overwrite operation
2229            let summary = snapshot.summary();
2230            match &summary.operation {
2231                Operation::Replace => {
2232                    // Found a rewrite/overwrite operation, stop counting
2233                    break;
2234                }
2235
2236                _ => {
2237                    // Increment count for each snapshot that is not a rewrite/overwrite
2238                    count += 1;
2239                }
2240            }
2241        }
2242
2243        count
2244    }
2245
2246    /// Wait until the snapshot count since the last rewrite/overwrite drops below `max_snapshots_num_before_compaction`.
2247    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2248        if !self.config.enable_compaction {
2249            return Ok(());
2250        }
2251
2252        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2253            loop {
2254                let current_count = self.count_snapshots_since_rewrite();
2255
2256                if current_count < max_snapshots {
2257                    tracing::info!(
2258                        "Snapshot count check passed: {} < {}",
2259                        current_count,
2260                        max_snapshots
2261                    );
2262                    break;
2263                }
2264
2265                tracing::info!(
2266                    "Snapshot count {} exceeds limit {}, waiting...",
2267                    current_count,
2268                    max_snapshots
2269                );
2270
2271                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2272                    && iceberg_compact_stat_sender
2273                        .send(IcebergSinkCompactionUpdate {
2274                            sink_id: self.sink_id,
2275                            compaction_interval: self.config.compaction_interval_sec(),
2276                            force_compaction: true,
2277                        })
2278                        .is_err()
2279                {
2280                    tracing::warn!("failed to send iceberg compaction stats");
2281                }
2282
2283                // Wait for 30 seconds before checking again
2284                tokio::time::sleep(Duration::from_secs(30)).await;
2285
2286                // Reload table to get latest snapshots
2287                self.table = self.config.load_table().await?;
2288            }
2289        }
2290        Ok(())
2291    }
2292}
2293
2294const MAP_KEY: &str = "key";
2295const MAP_VALUE: &str = "value";
2296
2297fn get_fields<'a>(
2298    our_field_type: &'a risingwave_common::types::DataType,
2299    data_type: &ArrowDataType,
2300    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2301) -> Option<ArrowFields> {
2302    match data_type {
2303        ArrowDataType::Struct(fields) => {
2304            match our_field_type {
2305                risingwave_common::types::DataType::Struct(struct_fields) => {
2306                    struct_fields.iter().for_each(|(name, data_type)| {
2307                        let res = schema_fields.insert(name, data_type);
2308                        // This assert is to make sure there is no duplicate field name in the schema.
2309                        assert!(res.is_none())
2310                    });
2311                }
2312                risingwave_common::types::DataType::Map(map_fields) => {
2313                    schema_fields.insert(MAP_KEY, map_fields.key());
2314                    schema_fields.insert(MAP_VALUE, map_fields.value());
2315                }
2316                risingwave_common::types::DataType::List(list) => {
2317                    list.elem()
2318                        .as_struct()
2319                        .iter()
2320                        .for_each(|(name, data_type)| {
2321                            let res = schema_fields.insert(name, data_type);
2322                            // This assert is to make sure there is no duplicate field name in the schema.
2323                            assert!(res.is_none())
2324                        });
2325                }
2326                _ => {}
2327            };
2328            Some(fields.clone())
2329        }
2330        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2331            get_fields(our_field_type, field.data_type(), schema_fields)
2332        }
2333        _ => None, // not a supported complex type and unlikely to show up
2334    }
2335}
2336
2337fn check_compatibility(
2338    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2339    fields: &ArrowFields,
2340) -> anyhow::Result<bool> {
2341    for arrow_field in fields {
2342        let our_field_type = schema_fields
2343            .get(arrow_field.name().as_str())
2344            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2345
2346        // Iceberg source should be able to read iceberg decimal type.
2347        let converted_arrow_data_type = IcebergArrowConvert
2348            .to_arrow_field("", our_field_type)
2349            .map_err(|e| anyhow!(e))?
2350            .data_type()
2351            .clone();
2352
2353        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2354            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2355            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2356            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2357            (ArrowDataType::List(_), ArrowDataType::List(field))
2358            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2359                let mut schema_fields = HashMap::new();
2360                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2361                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2362            }
2363            // validate nested structs
2364            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2365                let mut schema_fields = HashMap::new();
2366                our_field_type
2367                    .as_struct()
2368                    .iter()
2369                    .for_each(|(name, data_type)| {
2370                        let res = schema_fields.insert(name, data_type);
2371                        // This assert is to make sure there is no duplicate field name in the schema.
2372                        assert!(res.is_none())
2373                    });
2374                check_compatibility(schema_fields, fields)?
2375            }
2376            // cases where left != right (metadata, field name mismatch)
2377            //
2378            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2379            // {"PARQUET:field_id": ".."}
2380            //
2381            // map: The standard name in arrow is "entries", "key", "value".
2382            // in iceberg-rs, it's called "key_value"
2383            (left, right) => left.equals_datatype(right),
2384        };
2385        if !compatible {
2386            bail!(
2387                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2388                arrow_field.name(),
2389                converted_arrow_data_type,
2390                arrow_field.data_type()
2391            );
2392        }
2393    }
2394    Ok(true)
2395}
2396
2397/// Try to match our schema with iceberg schema.
2398pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2399    if rw_schema.fields.len() != arrow_schema.fields().len() {
2400        bail!(
2401            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2402            rw_schema.fields.len(),
2403            arrow_schema.fields.len()
2404        );
2405    }
2406
2407    let mut schema_fields = HashMap::new();
2408    rw_schema.fields.iter().for_each(|field| {
2409        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2410        // This assert is to make sure there is no duplicate field name in the schema.
2411        assert!(res.is_none())
2412    });
2413
2414    check_compatibility(schema_fields, &arrow_schema.fields)?;
2415    Ok(())
2416}
2417
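/// Serialize the per-writer pre-commit metadata blobs into a single byte vector (a JSON-encoded
/// `Vec<Vec<u8>>`) so they can be persisted in the exactly-once system table.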
2418pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2419    serde_json::to_vec(&metadata).unwrap()
2420}
2421
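/// Inverse of [`serialize_metadata`]: decode the JSON-encoded `Vec<Vec<u8>>` read back from the
/// exactly-once system table.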
2422pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2423    serde_json::from_slice(&bytes).unwrap()
2424}
2425
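/// Parse a `partition_by` expression such as `"v1, bucket(5,v1), year(v3)"` into
/// `(column, Transform)` pairs.
///
/// A minimal sketch of the expected output for that (hypothetical) input, assuming the transform
/// names map directly onto `iceberg::spec::Transform`:
///
/// ```ignore
/// vec![
///     ("v1".to_owned(), Transform::Identity),
///     ("v1".to_owned(), Transform::Bucket(5)),
///     ("v3".to_owned(), Transform::Year),
/// ]
/// ```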
2426pub fn parse_partition_by_exprs(
2427    expr: String,
2428) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2429    // captures column, transform(column), transform(n,column), transform(n, column)
2430    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2431    if !re.is_match(&expr) {
2432        bail!(format!(
2433            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2434            expr
2435        ))
2436    }
2437    let caps = re.captures_iter(&expr);
2438
2439    let mut partition_columns = vec![];
2440
2441    for mat in caps {
2442        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2443            (&mat["transform"], Transform::Identity)
2444        } else {
2445            let mut func = mat["transform"].to_owned();
2446            if func == "bucket" || func == "truncate" {
2447                let n = &mat
2448                    .name("n")
2449                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2450                    .as_str();
2451                func = format!("{func}[{n}]");
2452            }
2453            (
2454                &mat["field"],
2455                Transform::from_str(&func)
2456                    .with_context(|| format!("invalid transform function {}", func))?,
2457            )
2458        };
2459        partition_columns.push((column.to_owned(), transform));
2460    }
2461    Ok(partition_columns)
2462}
2463
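/// Branch that commits should target: `ICEBERG_COW_BRANCH` for upsert sinks running in
/// copy-on-write mode, and `MAIN_BRANCH` otherwise.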
2464pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2465    if should_enable_iceberg_cow(sink_type, write_mode) {
2466        ICEBERG_COW_BRANCH.to_owned()
2467    } else {
2468        MAIN_BRANCH.to_owned()
2469    }
2470}
2471
2472pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2473    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2474}
2475
2476#[cfg(test)]
2477mod test {
2478    use std::collections::BTreeMap;
2479
2480    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2481    use risingwave_common::types::{DataType, MapType, StructType};
2482
2483    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2484    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2485    use crate::sink::iceberg::{
2486        COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, ENABLE_COMPACTION,
2487        ENABLE_SNAPSHOT_EXPIRATION, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig,
2488        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2489        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2490    };
2491
2492    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2493
2494    #[test]
2495    fn test_compatible_arrow_schema() {
2496        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2497
2498        use super::*;
2499        let risingwave_schema = Schema::new(vec![
2500            Field::with_name(DataType::Int32, "a"),
2501            Field::with_name(DataType::Int32, "b"),
2502            Field::with_name(DataType::Int32, "c"),
2503        ]);
2504        let arrow_schema = ArrowSchema::new(vec![
2505            ArrowField::new("a", ArrowDataType::Int32, false),
2506            ArrowField::new("b", ArrowDataType::Int32, false),
2507            ArrowField::new("c", ArrowDataType::Int32, false),
2508        ]);
2509
2510        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2511
2512        let risingwave_schema = Schema::new(vec![
2513            Field::with_name(DataType::Int32, "d"),
2514            Field::with_name(DataType::Int32, "c"),
2515            Field::with_name(DataType::Int32, "a"),
2516            Field::with_name(DataType::Int32, "b"),
2517        ]);
2518        let arrow_schema = ArrowSchema::new(vec![
2519            ArrowField::new("a", ArrowDataType::Int32, false),
2520            ArrowField::new("b", ArrowDataType::Int32, false),
2521            ArrowField::new("d", ArrowDataType::Int32, false),
2522            ArrowField::new("c", ArrowDataType::Int32, false),
2523        ]);
2524        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2525
2526        let risingwave_schema = Schema::new(vec![
2527            Field::with_name(
2528                DataType::Struct(StructType::new(vec![
2529                    ("a1", DataType::Int32),
2530                    (
2531                        "a2",
2532                        DataType::Struct(StructType::new(vec![
2533                            ("a21", DataType::Bytea),
2534                            (
2535                                "a22",
2536                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2537                            ),
2538                        ])),
2539                    ),
2540                ])),
2541                "a",
2542            ),
2543            Field::with_name(
2544                DataType::list(DataType::Struct(StructType::new(vec![
2545                    ("b1", DataType::Int32),
2546                    ("b2", DataType::Bytea),
2547                    (
2548                        "b3",
2549                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2550                    ),
2551                ]))),
2552                "b",
2553            ),
2554            Field::with_name(
2555                DataType::Map(MapType::from_kv(
2556                    DataType::Varchar,
2557                    DataType::list(DataType::Struct(StructType::new([
2558                        ("c1", DataType::Int32),
2559                        ("c2", DataType::Bytea),
2560                        (
2561                            "c3",
2562                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2563                        ),
2564                    ]))),
2565                )),
2566                "c",
2567            ),
2568        ]);
2569        let arrow_schema = ArrowSchema::new(vec![
2570            ArrowField::new(
2571                "a",
2572                ArrowDataType::Struct(ArrowFields::from(vec![
2573                    ArrowField::new("a1", ArrowDataType::Int32, false),
2574                    ArrowField::new(
2575                        "a2",
2576                        ArrowDataType::Struct(ArrowFields::from(vec![
2577                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2578                            ArrowField::new_map(
2579                                "a22",
2580                                "entries",
2581                                ArrowFieldRef::new(ArrowField::new(
2582                                    "key",
2583                                    ArrowDataType::Utf8,
2584                                    false,
2585                                )),
2586                                ArrowFieldRef::new(ArrowField::new(
2587                                    "value",
2588                                    ArrowDataType::Utf8,
2589                                    false,
2590                                )),
2591                                false,
2592                                false,
2593                            ),
2594                        ])),
2595                        false,
2596                    ),
2597                ])),
2598                false,
2599            ),
2600            ArrowField::new(
2601                "b",
2602                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2603                    ArrowDataType::Struct(ArrowFields::from(vec![
2604                        ArrowField::new("b1", ArrowDataType::Int32, false),
2605                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2606                        ArrowField::new_map(
2607                            "b3",
2608                            "entries",
2609                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2610                            ArrowFieldRef::new(ArrowField::new(
2611                                "value",
2612                                ArrowDataType::Utf8,
2613                                false,
2614                            )),
2615                            false,
2616                            false,
2617                        ),
2618                    ])),
2619                    false,
2620                ))),
2621                false,
2622            ),
2623            ArrowField::new_map(
2624                "c",
2625                "entries",
2626                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2627                ArrowFieldRef::new(ArrowField::new(
2628                    "value",
2629                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2630                        ArrowDataType::Struct(ArrowFields::from(vec![
2631                            ArrowField::new("c1", ArrowDataType::Int32, false),
2632                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2633                            ArrowField::new_map(
2634                                "c3",
2635                                "entries",
2636                                ArrowFieldRef::new(ArrowField::new(
2637                                    "key",
2638                                    ArrowDataType::Utf8,
2639                                    false,
2640                                )),
2641                                ArrowFieldRef::new(ArrowField::new(
2642                                    "value",
2643                                    ArrowDataType::Utf8,
2644                                    false,
2645                                )),
2646                                false,
2647                                false,
2648                            ),
2649                        ])),
2650                        false,
2651                    ))),
2652                    false,
2653                )),
2654                false,
2655                false,
2656            ),
2657        ]);
2658        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2659    }
2660
2661    #[test]
2662    fn test_parse_iceberg_config() {
2663        let values = [
2664            ("connector", "iceberg"),
2665            ("type", "upsert"),
2666            ("primary_key", "v1"),
2667            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2668            ("warehouse.path", "s3://iceberg"),
2669            ("s3.endpoint", "http://127.0.0.1:9301"),
2670            ("s3.access.key", "hummockadmin"),
2671            ("s3.secret.key", "hummockadmin"),
2672            ("s3.path.style.access", "true"),
2673            ("s3.region", "us-east-1"),
2674            ("catalog.type", "jdbc"),
2675            ("catalog.name", "demo"),
2676            ("catalog.uri", "jdbc:postgresql://postgres:5432/iceberg"),
2677            ("catalog.jdbc.user", "admin"),
2678            ("catalog.jdbc.password", "123456"),
2679            ("database.name", "demo_db"),
2680            ("table.name", "demo_table"),
2681            ("enable_compaction", "true"),
2682            ("compaction_interval_sec", "1800"),
2683            ("enable_snapshot_expiration", "true"),
2684        ]
2685        .into_iter()
2686        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2687        .collect();
2688
2689        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2690
2691        let expected_iceberg_config = IcebergConfig {
2692            common: IcebergCommon {
2693                warehouse_path: Some("s3://iceberg".to_owned()),
2694                catalog_uri: Some("jdbc:postgresql://postgres:5432/iceberg".to_owned()),
2695                s3_region: Some("us-east-1".to_owned()),
2696                s3_endpoint: Some("http://127.0.0.1:9301".to_owned()),
2697                s3_access_key: Some("hummockadmin".to_owned()),
2698                s3_secret_key: Some("hummockadmin".to_owned()),
2699                s3_iam_role_arn: None,
2700                gcs_credential: None,
2701                catalog_type: Some("jdbc".to_owned()),
2702                glue_id: None,
2703                glue_region: None,
2704                glue_access_key: None,
2705                glue_secret_key: None,
2706                glue_iam_role_arn: None,
2707                catalog_name: Some("demo".to_owned()),
2708                s3_path_style_access: Some(true),
2709                catalog_credential: None,
2710                catalog_oauth2_server_uri: None,
2711                catalog_scope: None,
2712                catalog_token: None,
2713                enable_config_load: None,
2714                rest_signing_name: None,
2715                rest_signing_region: None,
2716                rest_sigv4_enabled: None,
2717                hosted_catalog: None,
2718                azblob_account_name: None,
2719                azblob_account_key: None,
2720                azblob_endpoint_url: None,
2721                catalog_header: None,
2722                adlsgen2_account_name: None,
2723                adlsgen2_account_key: None,
2724                adlsgen2_endpoint: None,
2725                vended_credentials: None,
2726            },
2727            table: IcebergTableIdentifier {
2728                database_name: Some("demo_db".to_owned()),
2729                table_name: "demo_table".to_owned(),
2730            },
2731            r#type: "upsert".to_owned(),
2732            force_append_only: false,
2733            primary_key: Some(vec!["v1".to_owned()]),
2734            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2735            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2736                .into_iter()
2737                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2738                .collect(),
2739            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2740            create_table_if_not_exists: false,
2741            is_exactly_once: Some(true),
2742            commit_retry_num: 8,
2743            enable_compaction: true,
2744            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2), // equals the 1800 set in `values` above
2745            enable_snapshot_expiration: true,
2746            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2747            snapshot_expiration_max_age_millis: None,
2748            snapshot_expiration_retain_last: None,
2749            snapshot_expiration_clear_expired_files: true,
2750            snapshot_expiration_clear_expired_meta_data: true,
2751            max_snapshots_num_before_compaction: None,
2752            small_files_threshold_mb: None,
2753            delete_files_count_threshold: None,
2754            trigger_snapshot_count: None,
2755            target_file_size_mb: None,
2756            compaction_type: None,
2757        };
2758
2759        assert_eq!(iceberg_config, expected_iceberg_config);
2760
2761        assert_eq!(
2762            &iceberg_config.full_table_name().unwrap().to_string(),
2763            "demo_db.demo_table"
2764        );
2765    }
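
    // Editor's note: the test below is a hedged sketch, not part of the original
    // suite. It assumes `IcebergConfig::from_btreemap` accepts the same
    // append-only keys used by the ignored catalog tests further down, and that
    // the `type` / `force_append_only` options map onto the `r#type` and
    // `force_append_only` fields exercised by the upsert test above.
    #[test]
    fn test_parse_iceberg_config_append_only_sketch() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // An append-only sink should surface both the sink type and the
        // force_append_only flag after parsing.
        assert_eq!(iceberg_config.r#type, "append-only");
        assert!(iceberg_config.force_append_only);
        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }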
2766
2767    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2768        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2769
2770        let _table = iceberg_config.load_table().await.unwrap();
2771    }
2772
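    // The catalog tests below are `#[ignore]`d: they expect live services (a
    // local MinIO/S3 endpoint at 127.0.0.1:9301 plus the respective storage,
    // REST, JDBC/Postgres, or Hive metastore catalog), so they are only run
    // manually, e.g. via `cargo test -- --ignored`.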
2773    #[tokio::test]
2774    #[ignore]
2775    async fn test_storage_catalog() {
2776        let values = [
2777            ("connector", "iceberg"),
2778            ("type", "append-only"),
2779            ("force_append_only", "true"),
2780            ("s3.endpoint", "http://127.0.0.1:9301"),
2781            ("s3.access.key", "hummockadmin"),
2782            ("s3.secret.key", "hummockadmin"),
2783            ("s3.region", "us-east-1"),
2784            ("s3.path.style.access", "true"),
2785            ("catalog.name", "demo"),
2786            ("catalog.type", "storage"),
2787            ("warehouse.path", "s3://icebergdata/demo"),
2788            ("database.name", "s1"),
2789            ("table.name", "t1"),
2790        ]
2791        .into_iter()
2792        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2793        .collect();
2794
2795        test_create_catalog(values).await;
2796    }
2797
2798    #[tokio::test]
2799    #[ignore]
2800    async fn test_rest_catalog() {
2801        let values = [
2802            ("connector", "iceberg"),
2803            ("type", "append-only"),
2804            ("force_append_only", "true"),
2805            ("s3.endpoint", "http://127.0.0.1:9301"),
2806            ("s3.access.key", "hummockadmin"),
2807            ("s3.secret.key", "hummockadmin"),
2808            ("s3.region", "us-east-1"),
2809            ("s3.path.style.access", "true"),
2810            ("catalog.name", "demo"),
2811            ("catalog.type", "rest"),
2812            ("catalog.uri", "http://192.168.167.4:8181"),
2813            ("warehouse.path", "s3://icebergdata/demo"),
2814            ("database.name", "s1"),
2815            ("table.name", "t1"),
2816        ]
2817        .into_iter()
2818        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2819        .collect();
2820
2821        test_create_catalog(values).await;
2822    }
2823
2824    #[tokio::test]
2825    #[ignore]
2826    async fn test_jdbc_catalog() {
2827        let values = [
2828            ("connector", "iceberg"),
2829            ("type", "append-only"),
2830            ("force_append_only", "true"),
2831            ("s3.endpoint", "http://127.0.0.1:9301"),
2832            ("s3.access.key", "hummockadmin"),
2833            ("s3.secret.key", "hummockadmin"),
2834            ("s3.region", "us-east-1"),
2835            ("s3.path.style.access", "true"),
2836            ("catalog.name", "demo"),
2837            ("catalog.type", "jdbc"),
2838            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2839            ("catalog.jdbc.user", "admin"),
2840            ("catalog.jdbc.password", "123456"),
2841            ("warehouse.path", "s3://icebergdata/demo"),
2842            ("database.name", "s1"),
2843            ("table.name", "t1"),
2844        ]
2845        .into_iter()
2846        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2847        .collect();
2848
2849        test_create_catalog(values).await;
2850    }
2851
2852    #[tokio::test]
2853    #[ignore]
2854    async fn test_hive_catalog() {
2855        let values = [
2856            ("connector", "iceberg"),
2857            ("type", "append-only"),
2858            ("force_append_only", "true"),
2859            ("s3.endpoint", "http://127.0.0.1:9301"),
2860            ("s3.access.key", "hummockadmin"),
2861            ("s3.secret.key", "hummockadmin"),
2862            ("s3.region", "us-east-1"),
2863            ("s3.path.style.access", "true"),
2864            ("catalog.name", "demo"),
2865            ("catalog.type", "hive"),
2866            ("catalog.uri", "thrift://localhost:9083"),
2867            ("warehouse.path", "s3://icebergdata/demo"),
2868            ("database.name", "s1"),
2869            ("table.name", "t1"),
2870        ]
2871        .into_iter()
2872        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2873        .collect();
2874
2875        test_create_catalog(values).await;
2876    }
2877
2878    #[test]
2879    fn test_config_constants_consistency() {
2880        // This test ensures our constants match the user-facing configuration key names.
2881        // If you change a constant, this test will remind you to update it here as well.
2882        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2883        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2884        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2885        assert_eq!(WRITE_MODE, "write_mode");
2886        assert_eq!(
2887            SNAPSHOT_EXPIRATION_RETAIN_LAST,
2888            "snapshot_expiration_retain_last"
2889        );
2890        assert_eq!(
2891            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2892            "snapshot_expiration_max_age_millis"
2893        );
2894        assert_eq!(
2895            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2896            "snapshot_expiration_clear_expired_files"
2897        );
2898        assert_eq!(
2899            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2900            "snapshot_expiration_clear_expired_meta_data"
2901        );
2902        assert_eq!(COMPACTION_MAX_SNAPSHOTS_NUM, "compaction.max_snapshots_num");
2903    }
2904
2905    #[test]
2906    fn test_parse_iceberg_compaction_config() {
2907        // Test parsing of the new `compaction.*`-prefixed config names
2908        let values = [
2909            ("connector", "iceberg"),
2910            ("type", "upsert"),
2911            ("primary_key", "id"),
2912            ("warehouse.path", "s3://iceberg"),
2913            ("s3.endpoint", "http://127.0.0.1:9301"),
2914            ("s3.access.key", "test"),
2915            ("s3.secret.key", "test"),
2916            ("s3.region", "us-east-1"),
2917            ("catalog.type", "storage"),
2918            ("catalog.name", "demo"),
2919            ("database.name", "test_db"),
2920            ("table.name", "test_table"),
2921            ("enable_compaction", "true"),
2922            ("compaction.max_snapshots_num", "100"),
2923            ("compaction.small_files_threshold_mb", "512"),
2924            ("compaction.delete_files_count_threshold", "50"),
2925            ("compaction.trigger_snapshot_count", "10"),
2926            ("compaction.target_file_size_mb", "256"),
2927            ("compaction.type", "full"),
2928        ]
2929        .into_iter()
2930        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2931        .collect();
2932
2933        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2934
2935        // Verify all compaction config fields are parsed correctly
2936        assert!(iceberg_config.enable_compaction);
2937        assert_eq!(
2938            iceberg_config.max_snapshots_num_before_compaction,
2939            Some(100)
2940        );
2941        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
2942        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
2943        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
2944        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
2945        assert_eq!(iceberg_config.compaction_type, Some("full".to_owned()));
2946    }
2947}