risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30    UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43    DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59    Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103// Configuration constants
104pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112    "snapshot_expiration_clear_expired_meta_data";
113pub const MAX_SNAPSHOTS_NUM: &str = "max_snapshots_num_before_compaction";
114
115fn deserialize_and_normalize_string<'de, D>(
116    deserializer: D,
117) -> std::result::Result<String, D::Error>
118where
119    D: serde::Deserializer<'de>,
120{
121    let s = String::deserialize(deserializer)?;
122    Ok(s.to_lowercase().replace('_', "-"))
123}
124
125fn deserialize_and_normalize_optional_string<'de, D>(
126    deserializer: D,
127) -> std::result::Result<Option<String>, D::Error>
128where
129    D: serde::Deserializer<'de>,
130{
131    let opt = Option::<String>::deserialize(deserializer)?;
132    Ok(opt.map(|s| s.to_lowercase().replace('_', "-")))
133}
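
// For example, a user-supplied "COPY_ON_WRITE" or "Copy_On_Write" is normalized to
// "copy-on-write" by the deserializers above, so these option values are case- and
// separator-insensitive.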
134
135fn default_commit_retry_num() -> u32 {
136    8
137}
138
139fn default_iceberg_write_mode() -> String {
140    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
141}
142
143fn default_true() -> bool {
144    true
145}
146
147fn default_some_true() -> Option<bool> {
148    Some(true)
149}
150
151/// Compaction type for Iceberg sink
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Default)]
153#[serde(rename_all = "kebab-case")]
154pub enum CompactionType {
155    /// Full compaction - rewrites all data files
156    #[default]
157    Full,
158    /// Small files compaction - only compact small files
159    SmallFiles,
160    /// Files with delete compaction - only compact files that have associated delete files
161    FilesWithDelete,
162}
163
164impl CompactionType {
165    pub fn as_str(&self) -> &'static str {
166        match self {
167            CompactionType::Full => "full",
168            CompactionType::SmallFiles => "small-files",
169            CompactionType::FilesWithDelete => "files-with-delete",
170        }
171    }
172}
173
174impl std::fmt::Display for CompactionType {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        write!(f, "{}", self.as_str())
177    }
178}
179
180impl FromStr for CompactionType {
181    type Err = String;
182
183    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
184        // Normalize the input string first: lowercase and replace underscores with hyphens
185        let normalized = s.to_lowercase().replace('_', "-");
186
187        match normalized.as_str() {
188            "full" => Ok(CompactionType::Full),
189            "small-files" => Ok(CompactionType::SmallFiles),
190            "files-with-delete" => Ok(CompactionType::FilesWithDelete),
191            _ => Err(format!(
192                "Unknown compaction type '{}', must be one of: 'full', 'small-files', 'files-with-delete'",
193                s // Use the original input string in the error message, not the normalized one
194            )),
195        }
196    }
197}
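
// A minimal sanity check for the parsing above; a sketch assuming the usual
// `cargo test` setup, showing that both casing and separators are normalized.
#[cfg(test)]
mod compaction_type_parse_tests {
    use std::str::FromStr;

    use super::CompactionType;

    #[test]
    fn parses_normalized_and_raw_spellings() {
        // Underscores and upper case are accepted and normalized.
        assert_eq!(
            CompactionType::from_str("SMALL_FILES").unwrap(),
            CompactionType::SmallFiles
        );
        // The canonical kebab-case spelling works as well.
        assert_eq!(
            CompactionType::from_str("files-with-delete").unwrap(),
            CompactionType::FilesWithDelete
        );
        // Unknown values are rejected with a descriptive error message.
        assert!(CompactionType::from_str("bogus").is_err());
    }
}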
198
199#[serde_as]
200#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
201pub struct IcebergConfig {
202    pub r#type: String, // accept "append-only" or "upsert"
203
204    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
205    pub force_append_only: bool,
206
207    #[serde(flatten)]
208    common: IcebergCommon,
209
210    #[serde(flatten)]
211    table: IcebergTableIdentifier,
212
213    #[serde(
214        rename = "primary_key",
215        default,
216        deserialize_with = "deserialize_optional_string_seq_from_string"
217    )]
218    pub primary_key: Option<Vec<String>>,
219
220    // Properties passed through to the Java catalog.
221    #[serde(skip)]
222    pub java_catalog_props: HashMap<String, String>,
223
224    #[serde(default)]
225    pub partition_by: Option<String>,
226
227    /// Commit every n (> 0) checkpoints; the default is 60.
228    #[serde(default = "iceberg_default_commit_checkpoint_interval")]
229    #[serde_as(as = "DisplayFromStr")]
230    #[with_option(allow_alter_on_fly)]
231    pub commit_checkpoint_interval: u64,
232
233    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
234    pub create_table_if_not_exists: bool,
235
236    /// Whether `exactly_once` delivery is enabled; the default is true.
237    #[serde(default = "default_some_true")]
238    #[serde_as(as = "Option<DisplayFromStr>")]
239    pub is_exactly_once: Option<bool>,
240    // Number of retries when an Iceberg commit fails; the default is 8.
241    // # TODO
242    // The Iceberg table may store the commit retry count in its table metadata.
243    // We should look that up and use it as the default commit retry count first.
244    #[serde(default = "default_commit_retry_num")]
245    pub commit_retry_num: u32,
246
247    /// Whether to enable iceberg compaction.
248    #[serde(
249        rename = "enable_compaction",
250        default,
251        deserialize_with = "deserialize_bool_from_string"
252    )]
253    #[with_option(allow_alter_on_fly)]
254    pub enable_compaction: bool,
255
256    /// The interval (in seconds) between Iceberg compaction runs.
257    #[serde(rename = "compaction_interval_sec", default)]
258    #[serde_as(as = "Option<DisplayFromStr>")]
259    #[with_option(allow_alter_on_fly)]
260    pub compaction_interval_sec: Option<u64>,
261
262    /// Whether to enable Iceberg snapshot expiration.
263    #[serde(
264        rename = "enable_snapshot_expiration",
265        default,
266        deserialize_with = "deserialize_bool_from_string"
267    )]
268    #[with_option(allow_alter_on_fly)]
269    pub enable_snapshot_expiration: bool,
270
271    /// The iceberg write mode, can be `merge-on-read` or `copy-on-write`.
272    #[serde(
273        rename = "write_mode",
274        default = "default_iceberg_write_mode",
275        deserialize_with = "deserialize_and_normalize_string"
276    )]
277    pub write_mode: String,
278
279    /// The maximum age (in milliseconds) for snapshots before they expire
280    /// For example, if set to 3600000, snapshots older than 1 hour will be expired
281    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
282    #[serde_as(as = "Option<DisplayFromStr>")]
283    #[with_option(allow_alter_on_fly)]
284    pub snapshot_expiration_max_age_millis: Option<i64>,
285
286    /// The number of snapshots to retain
287    #[serde(rename = "snapshot_expiration_retain_last", default)]
288    #[serde_as(as = "Option<DisplayFromStr>")]
289    #[with_option(allow_alter_on_fly)]
290    pub snapshot_expiration_retain_last: Option<i32>,
291
292    #[serde(
293        rename = "snapshot_expiration_clear_expired_files",
294        default = "default_true",
295        deserialize_with = "deserialize_bool_from_string"
296    )]
297    #[with_option(allow_alter_on_fly)]
298    pub snapshot_expiration_clear_expired_files: bool,
299
300    #[serde(
301        rename = "snapshot_expiration_clear_expired_meta_data",
302        default = "default_true",
303        deserialize_with = "deserialize_bool_from_string"
304    )]
305    #[with_option(allow_alter_on_fly)]
306    pub snapshot_expiration_clear_expired_meta_data: bool,
307
308    /// The maximum number of snapshots allowed since the last rewrite operation.
309    /// If set, the sink checks the snapshot count and waits when it is exceeded.
310    #[serde(rename = "compaction.max_snapshots_num", default)]
311    #[serde_as(as = "Option<DisplayFromStr>")]
312    #[with_option(allow_alter_on_fly)]
313    pub max_snapshots_num_before_compaction: Option<usize>,
314
315    #[serde(rename = "compaction.small_files_threshold_mb", default)]
316    #[serde_as(as = "Option<DisplayFromStr>")]
317    #[with_option(allow_alter_on_fly)]
318    pub small_files_threshold_mb: Option<u64>,
319
320    #[serde(rename = "compaction.delete_files_count_threshold", default)]
321    #[serde_as(as = "Option<DisplayFromStr>")]
322    #[with_option(allow_alter_on_fly)]
323    pub delete_files_count_threshold: Option<usize>,
324
325    #[serde(rename = "compaction.trigger_snapshot_count", default)]
326    #[serde_as(as = "Option<DisplayFromStr>")]
327    #[with_option(allow_alter_on_fly)]
328    pub trigger_snapshot_count: Option<usize>,
329
330    #[serde(rename = "compaction.target_file_size_mb", default)]
331    #[serde_as(as = "Option<DisplayFromStr>")]
332    #[with_option(allow_alter_on_fly)]
333    pub target_file_size_mb: Option<u64>,
334
335    /// Compaction type: `full`, `small-files`, or `files-with-delete`.
336    /// Defaults to `full` if not set.
337    #[serde(
338        rename = "compaction.type",
339        default,
340        deserialize_with = "deserialize_and_normalize_optional_string"
341    )]
342    #[with_option(allow_alter_on_fly)]
343    pub compaction_type: Option<String>,
344}
345
346impl EnforceSecret for IcebergConfig {
347    fn enforce_secret<'a>(
348        prop_iter: impl Iterator<Item = &'a str>,
349    ) -> crate::error::ConnectorResult<()> {
350        for prop in prop_iter {
351            IcebergCommon::enforce_one(prop)?;
352        }
353        Ok(())
354    }
355
356    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
357        IcebergCommon::enforce_one(prop)
358    }
359}
360
361impl IcebergConfig {
362    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
363        let mut config =
364            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
365                .map_err(|e| SinkError::Config(anyhow!(e)))?;
366
367        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
368            return Err(SinkError::Config(anyhow!(
369                "`{}` must be {}, or {}",
370                SINK_TYPE_OPTION,
371                SINK_TYPE_APPEND_ONLY,
372                SINK_TYPE_UPSERT
373            )));
374        }
375
376        if config.r#type == SINK_TYPE_UPSERT {
377            if let Some(primary_key) = &config.primary_key {
378                if primary_key.is_empty() {
379                    return Err(SinkError::Config(anyhow!(
380                        "`primary-key` must not be empty in {}",
381                        SINK_TYPE_UPSERT
382                    )));
383                }
384            } else {
385                return Err(SinkError::Config(anyhow!(
386                    "Must set `primary-key` in {}",
387                    SINK_TYPE_UPSERT
388                )));
389            }
390        }
391
392        // All configs starting with "catalog." are treated as Java catalog configs.
393        config.java_catalog_props = values
394            .iter()
395            .filter(|(k, _v)| {
396                k.starts_with("catalog.")
397                    && k != &"catalog.uri"
398                    && k != &"catalog.type"
399                    && k != &"catalog.name"
400                    && k != &"catalog.header"
401            })
402            .map(|(k, v)| (k[8..].to_string(), v.clone()))
403            .collect();
404
405        if config.commit_checkpoint_interval == 0 {
406            return Err(SinkError::Config(anyhow!(
407                "`commit-checkpoint-interval` must be greater than 0"
408            )));
409        }
410
411        // Validate compaction_type early
412        if let Some(ref compaction_type_str) = config.compaction_type {
413            CompactionType::from_str(compaction_type_str)
414                .map_err(|e| SinkError::Config(anyhow!(e)))?;
415        }
416
417        Ok(config)
418    }
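
    // Illustrative sketch of the `catalog.` handling above (the `jdbc.user` key is
    // hypothetical): given the options
    //     type              = "upsert"
    //     primary_key       = "id"
    //     catalog.jdbc.user = "foo"
    //     catalog.uri       = "..."
    // `catalog.jdbc.user` ends up in `java_catalog_props` as `jdbc.user = "foo"`, while
    // `catalog.uri`, `catalog.type`, `catalog.name`, and `catalog.header` are reserved
    // keys that are not forwarded to the Java catalog.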
419
420    pub fn catalog_type(&self) -> &str {
421        self.common.catalog_type()
422    }
423
424    pub async fn load_table(&self) -> Result<Table> {
425        self.common
426            .load_table(&self.table, &self.java_catalog_props)
427            .await
428            .map_err(Into::into)
429    }
430
431    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
432        self.common
433            .create_catalog(&self.java_catalog_props)
434            .await
435            .map_err(Into::into)
436    }
437
438    pub fn full_table_name(&self) -> Result<TableIdent> {
439        self.table.to_table_ident().map_err(Into::into)
440    }
441
442    pub fn catalog_name(&self) -> String {
443        self.common.catalog_name()
444    }
445
446    pub fn compaction_interval_sec(&self) -> u64 {
447        // default to 1 hour
448        self.compaction_interval_sec.unwrap_or(3600)
449    }
450
451    /// Calculate the timestamp (in milliseconds) before which snapshots should be expired
452    /// Returns `current_time_ms` - `max_age_millis`
453    pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
454        self.snapshot_expiration_max_age_millis
455            .map(|max_age_millis| current_time_ms - max_age_millis)
456    }
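
    // For example, with `snapshot_expiration_max_age_millis = 3_600_000` (one hour) and
    // `current_time_ms = 1_700_000_000_000`, this returns `Some(1_699_996_400_000)`;
    // snapshots older than that cutoff are eligible for expiration.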
457
458    pub fn trigger_snapshot_count(&self) -> usize {
459        self.trigger_snapshot_count.unwrap_or(16)
460    }
461
462    pub fn small_files_threshold_mb(&self) -> u64 {
463        self.small_files_threshold_mb.unwrap_or(64)
464    }
465
466    pub fn delete_files_count_threshold(&self) -> usize {
467        self.delete_files_count_threshold.unwrap_or(256)
468    }
469
470    pub fn target_file_size_mb(&self) -> u64 {
471        self.target_file_size_mb.unwrap_or(1024)
472    }
473
474    /// Get the compaction type as an enum
475    /// This method parses the string and returns the enum value
476    pub fn compaction_type(&self) -> CompactionType {
477        self.compaction_type
478            .as_deref()
479            .and_then(|s| CompactionType::from_str(s).ok())
480            .unwrap_or_default()
481    }
482}
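
// Defaults applied by the getters above when the corresponding option is unset:
// compaction interval 3600 s, trigger snapshot count 16, small-files threshold 64 MB,
// delete-files count threshold 256, target file size 1024 MB, compaction type `full`.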
483
484pub struct IcebergSink {
485    pub config: IcebergConfig,
486    param: SinkParam,
487    // In upsert mode, this is never `None` or empty.
488    unique_column_ids: Option<Vec<usize>>,
489}
490
491impl EnforceSecret for IcebergSink {
492    fn enforce_secret<'a>(
493        prop_iter: impl Iterator<Item = &'a str>,
494    ) -> crate::error::ConnectorResult<()> {
495        for prop in prop_iter {
496            IcebergConfig::enforce_one(prop)?;
497        }
498        Ok(())
499    }
500}
501
502impl TryFrom<SinkParam> for IcebergSink {
503    type Error = SinkError;
504
505    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
506        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
507        IcebergSink::new(config, param)
508    }
509}
510
511impl Debug for IcebergSink {
512    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513        f.debug_struct("IcebergSink")
514            .field("config", &self.config)
515            .finish()
516    }
517}
518
519impl IcebergSink {
520    pub async fn create_and_validate_table(&self) -> Result<Table> {
521        if self.config.create_table_if_not_exists {
522            self.create_table_if_not_exists().await?;
523        }
524
525        let table = self
526            .config
527            .load_table()
528            .await
529            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
530
531        let sink_schema = self.param.schema();
532        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
533            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
534
535        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
536            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
537
538        Ok(table)
539    }
540
541    pub async fn create_table_if_not_exists(&self) -> Result<()> {
542        let catalog = self.config.create_catalog().await?;
543        let namespace = if let Some(database_name) = self.config.table.database_name() {
544            let namespace = NamespaceIdent::new(database_name.to_owned());
545            if !catalog
546                .namespace_exists(&namespace)
547                .await
548                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
549            {
550                catalog
551                    .create_namespace(&namespace, HashMap::default())
552                    .await
553                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
554                    .context("failed to create iceberg namespace")?;
555            }
556            namespace
557        } else {
558            bail!("database name must be set if you want to create table")
559        };
560
561        let table_id = self
562            .config
563            .full_table_name()
564            .context("Unable to parse table name")?;
565        if !catalog
566            .table_exists(&table_id)
567            .await
568            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
569        {
570            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
571            // convert risingwave schema -> arrow schema -> iceberg schema
572            let arrow_fields = self
573                .param
574                .columns
575                .iter()
576                .map(|column| {
577                    Ok(iceberg_create_table_arrow_convert
578                        .to_arrow_field(&column.name, &column.data_type)
579                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
580                        .context(format!(
581                            "failed to convert {}: {} to arrow type",
582                            &column.name, &column.data_type
583                        ))?)
584                })
585                .collect::<Result<Vec<ArrowField>>>()?;
586            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
587            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
588                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
589                .context("failed to convert arrow schema to iceberg schema")?;
590
591            let location = {
592                let mut names = namespace.clone().inner();
593                names.push(self.config.table.table_name().to_owned());
594                match &self.config.common.warehouse_path {
595                    Some(warehouse_path) => {
596                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
597                        let url = Url::parse(warehouse_path);
598                        if url.is_err() || is_s3_tables {
599                            // For rest catalog, the warehouse_path could be a warehouse name.
600                            // In this case, we should specify the location when creating a table.
601                            if self.config.common.catalog_type() == "rest"
602                                || self.config.common.catalog_type() == "rest_rust"
603                            {
604                                None
605                            } else {
606                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
607                            }
608                        } else if warehouse_path.ends_with('/') {
609                            Some(format!("{}{}", warehouse_path, names.join("/")))
610                        } else {
611                            Some(format!("{}/{}", warehouse_path, names.join("/")))
612                        }
613                    }
614                    None => None,
615                }
616            };
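
            // For example (bucket and names are hypothetical): with
            // `warehouse_path = "s3://bucket/warehouse"`, database `db1`, and table `t1`,
            // the computed location is "s3://bucket/warehouse/db1/t1"; a trailing slash on
            // the warehouse path is handled so that no double slash is produced.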
617
618            let partition_spec = match &self.config.partition_by {
619                Some(partition_by) => {
620                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
621                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
622                        .into_iter()
623                        .enumerate()
624                    {
625                        match iceberg_schema.field_id_by_name(&column) {
626                            Some(id) => partition_fields.push(
627                                UnboundPartitionField::builder()
628                                    .source_id(id)
629                                    .transform(transform)
630                                    .name(format!("_p_{}", column))
631                                    .field_id(i as i32)
632                                    .build(),
633                            ),
634                            None => bail!(format!(
635                                "Partition source column does not exist in schema: {}",
636                                column
637                            )),
638                        };
639                    }
640                    Some(
641                        UnboundPartitionSpec::builder()
642                            .with_spec_id(0)
643                            .add_partition_fields(partition_fields)
644                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
645                            .context("failed to add partition columns")?
646                            .build(),
647                    )
648                }
649                None => None,
650            };
651
652            let table_creation_builder = TableCreation::builder()
653                .name(self.config.table.table_name().to_owned())
654                .schema(iceberg_schema);
655
656            let table_creation = match (location, partition_spec) {
657                (Some(location), Some(partition_spec)) => table_creation_builder
658                    .location(location)
659                    .partition_spec(partition_spec)
660                    .build(),
661                (Some(location), None) => table_creation_builder.location(location).build(),
662                (None, Some(partition_spec)) => table_creation_builder
663                    .partition_spec(partition_spec)
664                    .build(),
665                (None, None) => table_creation_builder.build(),
666            };
667
668            catalog
669                .create_table(&namespace, table_creation)
670                .await
671                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
672                .context("failed to create iceberg table")?;
673        }
674        Ok(())
675    }
676
677    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
678        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
679            if let Some(pk) = &config.primary_key {
680                let mut unique_column_ids = Vec::with_capacity(pk.len());
681                for col_name in pk {
682                    let id = param
683                        .columns
684                        .iter()
685                        .find(|col| col.name.as_str() == col_name)
686                        .ok_or_else(|| {
687                            SinkError::Config(anyhow!(
688                                "Primary key column {} not found in sink schema",
689                                col_name
690                            ))
691                        })?
692                        .column_id
693                        .get_id() as usize;
694                    unique_column_ids.push(id);
695                }
696                Some(unique_column_ids)
697            } else {
698                unreachable!()
699            }
700        } else {
701            None
702        };
703        Ok(Self {
704            config,
705            param,
706            unique_column_ids,
707        })
708    }
709}
710
711impl Sink for IcebergSink {
712    type Coordinator = IcebergSinkCommitter;
713    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
714
715    const SINK_NAME: &'static str = ICEBERG_SINK;
716
717    async fn validate(&self) -> Result<()> {
718        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
719            bail!("Snowflake catalog only supports iceberg sources");
720        }
721
722        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
723            risingwave_common::license::Feature::IcebergSinkWithGlue
724                .check_available()
725                .map_err(|e| anyhow::anyhow!(e))?;
726        }
727
728        // Validate compaction type configuration
729        let compaction_type = self.config.compaction_type();
730
731        // Check COW mode constraints
732        // COW mode only supports 'full' compaction type
733        if self.config.write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
734            && compaction_type != CompactionType::Full
735        {
736            bail!(
737                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
738                compaction_type
739            );
740        }
741
742        match compaction_type {
743            CompactionType::SmallFiles => {
744                // 1. check license
745                risingwave_common::license::Feature::IcebergCompaction
746                    .check_available()
747                    .map_err(|e| anyhow::anyhow!(e))?;
748
749                // 2. check write mode
750                if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
751                    bail!(
752                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
753                        self.config.write_mode
754                    );
755                }
756
757                // 3. check conflicting parameters
758                if self.config.delete_files_count_threshold.is_some() {
759                    bail!(
760                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
761                    );
762                }
763            }
764            CompactionType::FilesWithDelete => {
765                // 1. check license
766                risingwave_common::license::Feature::IcebergCompaction
767                    .check_available()
768                    .map_err(|e| anyhow::anyhow!(e))?;
769
770                // 2. check write mode
771                if self.config.write_mode != ICEBERG_WRITE_MODE_MERGE_ON_READ {
772                    bail!(
773                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
774                        self.config.write_mode
775                    );
776                }
777
778                // 3. check conflicting parameters
779                if self.config.small_files_threshold_mb.is_some() {
780                    bail!(
781                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
782                    );
783                }
784            }
785            CompactionType::Full => {
786                // Full compaction has no special requirements
787            }
788        }
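
        // Summary of the checks above (write mode vs. compaction type):
        //   merge-on-read : `full`, `small-files`, and `files-with-delete` are all allowed;
        //   copy-on-write : only `full` is allowed.
        // In addition, `small-files` rejects `compaction.delete_files_count_threshold`,
        // and `files-with-delete` rejects `compaction.small_files_threshold_mb`.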
789
790        let _ = self.create_and_validate_table().await?;
791        Ok(())
792    }
793
794    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
795        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
796
797        // Validate compaction interval
798        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
799            if iceberg_config.enable_compaction && compaction_interval == 0 {
800                bail!(
801                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
802                );
803            }
804
805            // log compaction_interval
806            tracing::info!(
807                "Alter config compaction_interval set to {} seconds",
808                compaction_interval
809            );
810        }
811
812        // Validate max snapshots
813        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
814            && max_snapshots < 1
815        {
816            bail!(
817                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
818                max_snapshots
819            );
820        }
821
822        Ok(())
823    }
824
825    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
826        let table = self.create_and_validate_table().await?;
827        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
828            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
829        } else {
830            IcebergSinkWriter::new_append_only(table, &writer_param).await?
831        };
832
833        let commit_checkpoint_interval =
834            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
835                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
836            );
837        let writer = CoordinatedLogSinker::new(
838            &writer_param,
839            self.param.clone(),
840            inner,
841            commit_checkpoint_interval,
842        )
843        .await?;
844
845        Ok(writer)
846    }
847
848    fn is_coordinated_sink(&self) -> bool {
849        true
850    }
851
852    async fn new_coordinator(
853        &self,
854        db: DatabaseConnection,
855        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
856    ) -> Result<Self::Coordinator> {
857        let catalog = self.config.create_catalog().await?;
858        let table = self.create_and_validate_table().await?;
859        Ok(IcebergSinkCommitter {
860            catalog,
861            table,
862            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
863            last_commit_epoch: 0,
864            sink_id: self.param.sink_id,
865            config: self.config.clone(),
866            param: self.param.clone(),
867            db,
868            commit_retry_num: self.config.commit_retry_num,
869            committed_epoch_subscriber: None,
870            iceberg_compact_stat_sender,
871        })
872    }
873}
874
875/// `None` means no projection is needed.
876/// `Prepare` holds the index of the extra partition column.
877/// `Done` holds the projection index vec.
878///
879/// `ProjectIdxVec` is evaluated lazily: when the `Prepare` state is first encountered, the data chunk schema
880/// is used to build the projection index vec.
881enum ProjectIdxVec {
882    None,
883    Prepare(usize),
884    Done(Vec<usize>),
885}
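
// For example, with four input columns and the extra partition column at index 2,
// `Prepare(2)` is resolved to `Done(vec![0, 1, 3])` on the first chunk, and later
// chunks reuse that projection to drop the partition column before writing.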
886
887pub struct IcebergSinkWriter {
888    writer: IcebergWriterDispatch,
889    arrow_schema: SchemaRef,
890    // See comments below
891    metrics: IcebergWriterMetrics,
892    // State of iceberg table for this writer
893    table: Table,
894    // For chunks with an extra partition column, we should remove that column before writing.
895    // This projection index vec is cached to avoid recomputing it for every chunk.
896    project_idx_vec: ProjectIdxVec,
897}
898
899#[allow(clippy::type_complexity)]
900enum IcebergWriterDispatch {
901    PartitionAppendOnly {
902        writer: Option<Box<dyn IcebergWriter>>,
903        writer_builder: MonitoredGeneralWriterBuilder<
904            FanoutPartitionWriterBuilder<
905                DataFileWriterBuilder<
906                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
907                >,
908            >,
909        >,
910    },
911    NonPartitionAppendOnly {
912        writer: Option<Box<dyn IcebergWriter>>,
913        writer_builder: MonitoredGeneralWriterBuilder<
914            DataFileWriterBuilder<
915                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
916            >,
917        >,
918    },
919    PartitionUpsert {
920        writer: Option<Box<dyn IcebergWriter>>,
921        writer_builder: MonitoredGeneralWriterBuilder<
922            FanoutPartitionWriterBuilder<
923                EqualityDeltaWriterBuilder<
924                    DataFileWriterBuilder<
925                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
926                    >,
927                    MonitoredPositionDeleteWriterBuilder<
928                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
929                    >,
930                    EqualityDeleteFileWriterBuilder<
931                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
932                    >,
933                >,
934            >,
935        >,
936        arrow_schema_with_op_column: SchemaRef,
937    },
938    NonpartitionUpsert {
939        writer: Option<Box<dyn IcebergWriter>>,
940        writer_builder: MonitoredGeneralWriterBuilder<
941            EqualityDeltaWriterBuilder<
942                DataFileWriterBuilder<
943                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
944                >,
945                MonitoredPositionDeleteWriterBuilder<
946                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
947                >,
948                EqualityDeleteFileWriterBuilder<
949                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
950                >,
951            >,
952        >,
953        arrow_schema_with_op_column: SchemaRef,
954    },
955}
956
957impl IcebergWriterDispatch {
958    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
959        match self {
960            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
961            | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
962            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
963            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
964        }
965    }
966}
967
968pub struct IcebergWriterMetrics {
969    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
970    // They are actually used in `PrometheusWriterBuilder`:
971    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
972    // We keep them here so that the guards clean up the labels in the metrics registry when dropped.
973    _write_qps: LabelGuardedIntCounter,
974    _write_latency: LabelGuardedHistogram,
975    write_bytes: LabelGuardedIntCounter,
976}
977
978impl IcebergSinkWriter {
979    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
980        let SinkWriterParam {
981            extra_partition_col_idx,
982            actor_id,
983            sink_id,
984            sink_name,
985            ..
986        } = writer_param;
987        let metrics_labels = [
988            &actor_id.to_string(),
989            &sink_id.to_string(),
990            sink_name.as_str(),
991        ];
992
993        // Metrics
994        let write_qps = GLOBAL_SINK_METRICS
995            .iceberg_write_qps
996            .with_guarded_label_values(&metrics_labels);
997        let write_latency = GLOBAL_SINK_METRICS
998            .iceberg_write_latency
999            .with_guarded_label_values(&metrics_labels);
1000        // # TODO
1001        // Unused. Add this metric later.
1002        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1003            .iceberg_rolling_unflushed_data_file
1004            .with_guarded_label_values(&metrics_labels);
1005        let write_bytes = GLOBAL_SINK_METRICS
1006            .iceberg_write_bytes
1007            .with_guarded_label_values(&metrics_labels);
1008
1009        let schema = table.metadata().current_schema();
1010        let partition_spec = table.metadata().default_partition_spec();
1011
1012        // To avoid duplicate file names, each sink instance generates a unique UUID as a file name suffix when it is created.
1013        let unique_uuid_suffix = Uuid::now_v7();
1014
1015        let parquet_writer_properties = WriterProperties::builder()
1016            .set_max_row_group_size(
1017                writer_param
1018                    .streaming_config
1019                    .developer
1020                    .iceberg_sink_write_parquet_max_row_group_rows,
1021            )
1022            .build();
1023
1024        let parquet_writer_builder = ParquetWriterBuilder::new(
1025            parquet_writer_properties,
1026            schema.clone(),
1027            table.file_io().clone(),
1028            DefaultLocationGenerator::new(table.metadata().clone())
1029                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1030            DefaultFileNameGenerator::new(
1031                writer_param.actor_id.to_string(),
1032                Some(unique_uuid_suffix.to_string()),
1033                iceberg::spec::DataFileFormat::Parquet,
1034            ),
1035        );
1036        let data_file_builder =
1037            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
1038        if partition_spec.fields().is_empty() {
1039            let writer_builder = MonitoredGeneralWriterBuilder::new(
1040                data_file_builder,
1041                write_qps.clone(),
1042                write_latency.clone(),
1043            );
1044            let inner_writer = Some(Box::new(
1045                writer_builder
1046                    .clone()
1047                    .build()
1048                    .await
1049                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1050            ) as Box<dyn IcebergWriter>);
1051            Ok(Self {
1052                arrow_schema: Arc::new(
1053                    schema_to_arrow_schema(table.metadata().current_schema())
1054                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1055                ),
1056                metrics: IcebergWriterMetrics {
1057                    _write_qps: write_qps,
1058                    _write_latency: write_latency,
1059                    write_bytes,
1060                },
1061                writer: IcebergWriterDispatch::NonPartitionAppendOnly {
1062                    writer: inner_writer,
1063                    writer_builder,
1064                },
1065                table,
1066                project_idx_vec: {
1067                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1068                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1069                    } else {
1070                        ProjectIdxVec::None
1071                    }
1072                },
1073            })
1074        } else {
1075            let partition_builder = MonitoredGeneralWriterBuilder::new(
1076                FanoutPartitionWriterBuilder::new(
1077                    data_file_builder,
1078                    partition_spec.clone(),
1079                    schema.clone(),
1080                )
1081                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1082                write_qps.clone(),
1083                write_latency.clone(),
1084            );
1085            let inner_writer = Some(Box::new(
1086                partition_builder
1087                    .clone()
1088                    .build()
1089                    .await
1090                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1091            ) as Box<dyn IcebergWriter>);
1092            Ok(Self {
1093                arrow_schema: Arc::new(
1094                    schema_to_arrow_schema(table.metadata().current_schema())
1095                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1096                ),
1097                metrics: IcebergWriterMetrics {
1098                    _write_qps: write_qps,
1099                    _write_latency: write_latency,
1100                    write_bytes,
1101                },
1102                writer: IcebergWriterDispatch::PartitionAppendOnly {
1103                    writer: inner_writer,
1104                    writer_builder: partition_builder,
1105                },
1106                table,
1107                project_idx_vec: {
1108                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1109                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1110                    } else {
1111                        ProjectIdxVec::None
1112                    }
1113                },
1114            })
1115        }
1116    }
1117
1118    pub async fn new_upsert(
1119        table: Table,
1120        unique_column_ids: Vec<usize>,
1121        writer_param: &SinkWriterParam,
1122    ) -> Result<Self> {
1123        let SinkWriterParam {
1124            extra_partition_col_idx,
1125            actor_id,
1126            sink_id,
1127            sink_name,
1128            ..
1129        } = writer_param;
1130        let metrics_labels = [
1131            &actor_id.to_string(),
1132            &sink_id.to_string(),
1133            sink_name.as_str(),
1134        ];
1135        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
1136
1137        // Metrics
1138        let write_qps = GLOBAL_SINK_METRICS
1139            .iceberg_write_qps
1140            .with_guarded_label_values(&metrics_labels);
1141        let write_latency = GLOBAL_SINK_METRICS
1142            .iceberg_write_latency
1143            .with_guarded_label_values(&metrics_labels);
1144        // # TODO
1145        // Unused. Add this metric later.
1146        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
1147            .iceberg_rolling_unflushed_data_file
1148            .with_guarded_label_values(&metrics_labels);
1149        let position_delete_cache_num = GLOBAL_SINK_METRICS
1150            .iceberg_position_delete_cache_num
1151            .with_guarded_label_values(&metrics_labels);
1152        let write_bytes = GLOBAL_SINK_METRICS
1153            .iceberg_write_bytes
1154            .with_guarded_label_values(&metrics_labels);
1155
1156        // Determine the schema id and partition spec id
1157        let schema = table.metadata().current_schema();
1158        let partition_spec = table.metadata().default_partition_spec();
1159
1160        // To avoid duplicate file names, each sink instance generates a unique UUID as a file name suffix when it is created.
1161        let unique_uuid_suffix = Uuid::now_v7();
1162
1163        let parquet_writer_properties = WriterProperties::builder()
1164            .set_max_row_group_size(
1165                writer_param
1166                    .streaming_config
1167                    .developer
1168                    .iceberg_sink_write_parquet_max_row_group_rows,
1169            )
1170            .build();
1171
1172        let data_file_builder = {
1173            let parquet_writer_builder = ParquetWriterBuilder::new(
1174                parquet_writer_properties.clone(),
1175                schema.clone(),
1176                table.file_io().clone(),
1177                DefaultLocationGenerator::new(table.metadata().clone())
1178                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1179                DefaultFileNameGenerator::new(
1180                    writer_param.actor_id.to_string(),
1181                    Some(unique_uuid_suffix.to_string()),
1182                    iceberg::spec::DataFileFormat::Parquet,
1183                ),
1184            );
1185            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id())
1186        };
1187        let position_delete_builder = {
1188            let parquet_writer_builder = ParquetWriterBuilder::new(
1189                parquet_writer_properties.clone(),
1190                POSITION_DELETE_SCHEMA.clone(),
1191                table.file_io().clone(),
1192                DefaultLocationGenerator::new(table.metadata().clone())
1193                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1194                DefaultFileNameGenerator::new(
1195                    writer_param.actor_id.to_string(),
1196                    Some(format!("pos-del-{}", unique_uuid_suffix)),
1197                    iceberg::spec::DataFileFormat::Parquet,
1198                ),
1199            );
1200            MonitoredPositionDeleteWriterBuilder::new(
1201                SortPositionDeleteWriterBuilder::new(
1202                    parquet_writer_builder,
1203                    writer_param
1204                        .streaming_config
1205                        .developer
1206                        .iceberg_sink_positional_delete_cache_size,
1207                    None,
1208                    None,
1209                ),
1210                position_delete_cache_num,
1211            )
1212        };
1213        let equality_delete_builder = {
1214            let config = EqualityDeleteWriterConfig::new(
1215                unique_column_ids.clone(),
1216                table.metadata().current_schema().clone(),
1217                None,
1218                partition_spec.spec_id(),
1219            )
1220            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1221            let parquet_writer_builder = ParquetWriterBuilder::new(
1222                parquet_writer_properties.clone(),
1223                Arc::new(
1224                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
1225                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1226                ),
1227                table.file_io().clone(),
1228                DefaultLocationGenerator::new(table.metadata().clone())
1229                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1230                DefaultFileNameGenerator::new(
1231                    writer_param.actor_id.to_string(),
1232                    Some(format!("eq-del-{}", unique_uuid_suffix)),
1233                    iceberg::spec::DataFileFormat::Parquet,
1234                ),
1235            );
1236
1237            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
1238        };
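        // The delta writer below routes each row by its op column: rows tagged `INSERT_OP`
        // go to the data file writer, while rows tagged `DELETE_OP` are turned into position
        // or equality deletes keyed by `unique_column_ids`.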
1239        let delta_builder = EqualityDeltaWriterBuilder::new(
1240            data_file_builder,
1241            position_delete_builder,
1242            equality_delete_builder,
1243            unique_column_ids,
1244        );
1245        if partition_spec.fields().is_empty() {
1246            let writer_builder = MonitoredGeneralWriterBuilder::new(
1247                delta_builder,
1248                write_qps.clone(),
1249                write_latency.clone(),
1250            );
1251            let inner_writer = Some(Box::new(
1252                writer_builder
1253                    .clone()
1254                    .build()
1255                    .await
1256                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1257            ) as Box<dyn IcebergWriter>);
1258            let original_arrow_schema = Arc::new(
1259                schema_to_arrow_schema(table.metadata().current_schema())
1260                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1261            );
1262            let schema_with_extra_op_column = {
1263                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1264                new_fields.push(Arc::new(ArrowField::new(
1265                    "op".to_owned(),
1266                    ArrowDataType::Int32,
1267                    false,
1268                )));
1269                Arc::new(ArrowSchema::new(new_fields))
1270            };
1271            Ok(Self {
1272                arrow_schema: original_arrow_schema,
1273                metrics: IcebergWriterMetrics {
1274                    _write_qps: write_qps,
1275                    _write_latency: write_latency,
1276                    write_bytes,
1277                },
1278                table,
1279                writer: IcebergWriterDispatch::NonpartitionUpsert {
1280                    writer: inner_writer,
1281                    writer_builder,
1282                    arrow_schema_with_op_column: schema_with_extra_op_column,
1283                },
1284                project_idx_vec: {
1285                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1286                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1287                    } else {
1288                        ProjectIdxVec::None
1289                    }
1290                },
1291            })
1292        } else {
1293            let original_arrow_schema = Arc::new(
1294                schema_to_arrow_schema(table.metadata().current_schema())
1295                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1296            );
1297            let schema_with_extra_op_column = {
1298                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1299                new_fields.push(Arc::new(ArrowField::new(
1300                    "op".to_owned(),
1301                    ArrowDataType::Int32,
1302                    false,
1303                )));
1304                Arc::new(ArrowSchema::new(new_fields))
1305            };
1306            let partition_builder = MonitoredGeneralWriterBuilder::new(
1307                FanoutPartitionWriterBuilder::new_with_custom_schema(
1308                    delta_builder,
1309                    schema_with_extra_op_column.clone(),
1310                    partition_spec.clone(),
1311                    table.metadata().current_schema().clone(),
1312                ),
1313                write_qps.clone(),
1314                write_latency.clone(),
1315            );
1316            let inner_writer = Some(Box::new(
1317                partition_builder
1318                    .clone()
1319                    .build()
1320                    .await
1321                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1322            ) as Box<dyn IcebergWriter>);
1323            Ok(Self {
1324                arrow_schema: original_arrow_schema,
1325                metrics: IcebergWriterMetrics {
1326                    _write_qps: write_qps,
1327                    _write_latency: write_latency,
1328                    write_bytes,
1329                },
1330                table,
1331                writer: IcebergWriterDispatch::PartitionUpsert {
1332                    writer: inner_writer,
1333                    writer_builder: partition_builder,
1334                    arrow_schema_with_op_column: schema_with_extra_op_column,
1335                },
1336                project_idx_vec: {
1337                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1338                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1339                    } else {
1340                        ProjectIdxVec::None
1341                    }
1342                },
1343            })
1344        }
1345    }
1346}
1347
1348#[async_trait]
1349impl SinkWriter for IcebergSinkWriter {
1350    type CommitMetadata = Option<SinkMetadata>;
1351
1352    /// Begin a new epoch
1353    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1354        // Just skip it.
1355        Ok(())
1356    }
1357
1358    /// Write a stream chunk to the sink.
1359    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1360        // Try to build writer if it's None.
1361        match &mut self.writer {
1362            IcebergWriterDispatch::PartitionAppendOnly {
1363                writer,
1364                writer_builder,
1365            } => {
1366                if writer.is_none() {
1367                    *writer = Some(Box::new(
1368                        writer_builder
1369                            .clone()
1370                            .build()
1371                            .await
1372                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1373                    ));
1374                }
1375            }
1376            IcebergWriterDispatch::NonPartitionAppendOnly {
1377                writer,
1378                writer_builder,
1379            } => {
1380                if writer.is_none() {
1381                    *writer = Some(Box::new(
1382                        writer_builder
1383                            .clone()
1384                            .build()
1385                            .await
1386                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1387                    ));
1388                }
1389            }
1390            IcebergWriterDispatch::PartitionUpsert {
1391                writer,
1392                writer_builder,
1393                ..
1394            } => {
1395                if writer.is_none() {
1396                    *writer = Some(Box::new(
1397                        writer_builder
1398                            .clone()
1399                            .build()
1400                            .await
1401                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1402                    ));
1403                }
1404            }
1405            IcebergWriterDispatch::NonpartitionUpsert {
1406                writer,
1407                writer_builder,
1408                ..
1409            } => {
1410                if writer.is_none() {
1411                    *writer = Some(Box::new(
1412                        writer_builder
1413                            .clone()
1414                            .build()
1415                            .await
1416                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1417                    ));
1418                }
1419            }
1420        };
1421
1422        // Process the chunk.
1423        let (mut chunk, ops) = chunk.compact_vis().into_parts();
1424        match &self.project_idx_vec {
1425            ProjectIdxVec::None => {}
1426            ProjectIdxVec::Prepare(idx) => {
1427                if *idx >= chunk.columns().len() {
1428                    return Err(SinkError::Iceberg(anyhow!(
1429                        "invalid extra partition column index {}",
1430                        idx
1431                    )));
1432                }
1433                let project_idx_vec = (0..*idx)
1434                    .chain(*idx + 1..chunk.columns().len())
1435                    .collect_vec();
1436                chunk = chunk.project(&project_idx_vec);
1437                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1438            }
1439            ProjectIdxVec::Done(idx_vec) => {
1440                chunk = chunk.project(idx_vec);
1441            }
1442        }
1443        if ops.is_empty() {
1444            return Ok(());
1445        }
1446        let write_batch_size = chunk.estimated_heap_size();
1447        let batch = match &self.writer {
1448            IcebergWriterDispatch::PartitionAppendOnly { .. }
1449            | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
                // For append-only sinks, keep only the insert rows of the chunk.
1451                let filters =
1452                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1453                chunk.set_visibility(filters);
1454                IcebergArrowConvert
1455                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
1456                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1457            }
1458            IcebergWriterDispatch::PartitionUpsert {
1459                arrow_schema_with_op_column,
1460                ..
1461            }
1462            | IcebergWriterDispatch::NonpartitionUpsert {
1463                arrow_schema_with_op_column,
1464                ..
1465            } => {
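                // For upsert sinks, an extra `op` column (INSERT_OP / DELETE_OP) is appended so the
                // equality-delta writer can distinguish inserts from deletes within one batch.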
1466                let chunk = IcebergArrowConvert
1467                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1468                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1469                let ops = Arc::new(Int32Array::from(
1470                    ops.iter()
1471                        .map(|op| match op {
1472                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1473                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1474                        })
1475                        .collect_vec(),
1476                ));
1477                let mut columns = chunk.columns().to_vec();
1478                columns.push(ops);
1479                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1480                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1481            }
1482        };
1483
1484        let writer = self.writer.get_writer().unwrap();
1485        writer
1486            .write(batch)
1487            .instrument_await("iceberg_write")
1488            .await
1489            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1490        self.metrics.write_bytes.inc_by(write_batch_size as _);
1491        Ok(())
1492    }
1493
    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the sink
1495    /// writer should commit the current epoch.
1496    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        // Skip if this barrier is not a checkpoint.
1498        if !is_checkpoint {
1499            return Ok(None);
1500        }
1501
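        // Close the current writer to collect the data files written in this epoch, then eagerly
        // rebuild a fresh writer for the next epoch.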
1502        let close_result = match &mut self.writer {
1503            IcebergWriterDispatch::PartitionAppendOnly {
1504                writer,
1505                writer_builder,
1506            } => {
1507                let close_result = match writer.take() {
1508                    Some(mut writer) => {
1509                        Some(writer.close().instrument_await("iceberg_close").await)
1510                    }
1511                    _ => None,
1512                };
1513                match writer_builder.clone().build().await {
1514                    Ok(new_writer) => {
1515                        *writer = Some(Box::new(new_writer));
1516                    }
1517                    _ => {
                        // Failed to build a new writer, but we can't return this error because the
                        // current writer may have closed successfully. So we just log the error.
1520                        warn!("Failed to build new writer after close");
1521                    }
1522                }
1523                close_result
1524            }
1525            IcebergWriterDispatch::NonPartitionAppendOnly {
1526                writer,
1527                writer_builder,
1528            } => {
1529                let close_result = match writer.take() {
1530                    Some(mut writer) => {
1531                        Some(writer.close().instrument_await("iceberg_close").await)
1532                    }
1533                    _ => None,
1534                };
1535                match writer_builder.clone().build().await {
1536                    Ok(new_writer) => {
1537                        *writer = Some(Box::new(new_writer));
1538                    }
1539                    _ => {
                        // Failed to build a new writer, but we can't return this error because the
                        // current writer may have closed successfully. So we just log the error.
1542                        warn!("Failed to build new writer after close");
1543                    }
1544                }
1545                close_result
1546            }
1547            IcebergWriterDispatch::PartitionUpsert {
1548                writer,
1549                writer_builder,
1550                ..
1551            } => {
1552                let close_result = match writer.take() {
1553                    Some(mut writer) => {
1554                        Some(writer.close().instrument_await("iceberg_close").await)
1555                    }
1556                    _ => None,
1557                };
1558                match writer_builder.clone().build().await {
1559                    Ok(new_writer) => {
1560                        *writer = Some(Box::new(new_writer));
1561                    }
1562                    _ => {
                        // Failed to build a new writer, but we can't return this error because the
                        // current writer may have closed successfully. So we just log the error.
1565                        warn!("Failed to build new writer after close");
1566                    }
1567                }
1568                close_result
1569            }
1570            IcebergWriterDispatch::NonpartitionUpsert {
1571                writer,
1572                writer_builder,
1573                ..
1574            } => {
1575                let close_result = match writer.take() {
1576                    Some(mut writer) => {
1577                        Some(writer.close().instrument_await("iceberg_close").await)
1578                    }
1579                    _ => None,
1580                };
1581                match writer_builder.clone().build().await {
1582                    Ok(new_writer) => {
1583                        *writer = Some(Box::new(new_writer));
1584                    }
1585                    _ => {
                        // Failed to build a new writer, but we can't return this error because the
                        // current writer may have closed successfully. So we just log the error.
1588                        warn!("Failed to build new writer after close");
1589                    }
1590                }
1591                close_result
1592            }
1593        };
1594
1595        match close_result {
1596            Some(Ok(result)) => {
1597                let version = self.table.metadata().format_version() as u8;
1598                let partition_type = self.table.metadata().default_partition_type();
1599                let data_files = result
1600                    .into_iter()
1601                    .map(|f| {
1602                        SerializedDataFile::try_from(f, partition_type, version == 1)
1603                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1604                    })
1605                    .collect::<Result<Vec<_>>>()?;
1606                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1607                    data_files,
1608                    schema_id: self.table.metadata().current_schema_id(),
1609                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1610                })?))
1611            }
1612            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1613            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1614        }
1615    }
1616
1617    /// Clean up
1618    async fn abort(&mut self) -> Result<()> {
1619        // TODO: abort should clean up all the data written in this epoch.
1620        Ok(())
1621    }
1622}
1623
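// The pre-commit metadata exchanged between sink writers and the commit coordinator is a JSON
// object keyed by the constants below. Roughly (an illustrative sketch; the exact layout of each
// entry in `data_files` is defined by `SerializedDataFile`):
//
// {
//     "schema_id": 0,
//     "partition_spec_id": 0,
//     "data_files": [ ... ]
// }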
1624const SCHEMA_ID: &str = "schema_id";
1625const PARTITION_SPEC_ID: &str = "partition_spec_id";
1626const DATA_FILES: &str = "data_files";
1627
1628#[derive(Default, Clone)]
1629struct IcebergCommitResult {
1630    schema_id: i32,
1631    partition_spec_id: i32,
1632    data_files: Vec<SerializedDataFile>,
1633}
1634
1635impl IcebergCommitResult {
1636    fn try_from(value: &SinkMetadata) -> Result<Self> {
1637        if let Some(Serialized(v)) = &value.metadata {
1638            let mut values = if let serde_json::Value::Object(v) =
1639                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1640                    .context("Can't parse iceberg sink metadata")?
1641            {
1642                v
1643            } else {
1644                bail!("iceberg sink metadata should be an object");
1645            };
1646
1647            let schema_id;
1648            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1649                schema_id = value
1650                    .as_u64()
1651                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1652            } else {
1653                bail!("iceberg sink metadata should have schema_id");
1654            }
1655
1656            let partition_spec_id;
1657            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1658                partition_spec_id = value
1659                    .as_u64()
1660                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1661            } else {
1662                bail!("iceberg sink metadata should have partition_spec_id");
1663            }
1664
1665            let data_files: Vec<SerializedDataFile>;
1666            if let serde_json::Value::Array(values) = values
1667                .remove(DATA_FILES)
1668                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1669            {
1670                data_files = values
1671                    .into_iter()
1672                    .map(from_value::<SerializedDataFile>)
1673                    .collect::<std::result::Result<_, _>>()
                    .context("Can't parse data files in iceberg sink metadata")?;
1675            } else {
1676                bail!("iceberg sink metadata should have data_files object");
1677            }
1678
1679            Ok(Self {
1680                schema_id: schema_id as i32,
1681                partition_spec_id: partition_spec_id as i32,
1682                data_files,
1683            })
1684        } else {
1685            bail!("Can't create iceberg sink write result from empty data!")
1686        }
1687    }
1688
1689    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1690        let mut values = if let serde_json::Value::Object(value) =
1691            serde_json::from_slice::<serde_json::Value>(&value)
1692                .context("Can't parse iceberg sink metadata")?
1693        {
1694            value
1695        } else {
1696            bail!("iceberg sink metadata should be an object");
1697        };
1698
1699        let schema_id;
1700        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1701            schema_id = value
1702                .as_u64()
1703                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1704        } else {
1705            bail!("iceberg sink metadata should have schema_id");
1706        }
1707
1708        let partition_spec_id;
1709        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1710            partition_spec_id = value
1711                .as_u64()
1712                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1713        } else {
1714            bail!("iceberg sink metadata should have partition_spec_id");
1715        }
1716
1717        let data_files: Vec<SerializedDataFile>;
1718        if let serde_json::Value::Array(values) = values
1719            .remove(DATA_FILES)
1720            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1721        {
1722            data_files = values
1723                .into_iter()
1724                .map(from_value::<SerializedDataFile>)
1725                .collect::<std::result::Result<_, _>>()
                .context("Can't parse data files in iceberg sink metadata")?;
1727        } else {
1728            bail!("iceberg sink metadata should have data_files object");
1729        }
1730
1731        Ok(Self {
1732            schema_id: schema_id as i32,
1733            partition_spec_id: partition_spec_id as i32,
1734            data_files,
1735        })
1736    }
1737}
1738
1739impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1740    type Error = SinkError;
1741
1742    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1743        let json_data_files = serde_json::Value::Array(
1744            value
1745                .data_files
1746                .iter()
1747                .map(serde_json::to_value)
1748                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1749                .context("Can't serialize data files to json")?,
1750        );
1751        let json_value = serde_json::Value::Object(
1752            vec![
1753                (
1754                    SCHEMA_ID.to_owned(),
1755                    serde_json::Value::Number(value.schema_id.into()),
1756                ),
1757                (
1758                    PARTITION_SPEC_ID.to_owned(),
1759                    serde_json::Value::Number(value.partition_spec_id.into()),
1760                ),
1761                (DATA_FILES.to_owned(), json_data_files),
1762            ]
1763            .into_iter()
1764            .collect(),
1765        );
1766        Ok(SinkMetadata {
1767            metadata: Some(Serialized(SerializedMetadata {
1768                metadata: serde_json::to_vec(&json_value)
1769                    .context("Can't serialize iceberg sink metadata")?,
1770            })),
1771        })
1772    }
1773}
1774
1775impl TryFrom<IcebergCommitResult> for Vec<u8> {
1776    type Error = SinkError;
1777
1778    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1779        let json_data_files = serde_json::Value::Array(
1780            value
1781                .data_files
1782                .iter()
1783                .map(serde_json::to_value)
1784                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1785                .context("Can't serialize data files to json")?,
1786        );
1787        let json_value = serde_json::Value::Object(
1788            vec![
1789                (
1790                    SCHEMA_ID.to_owned(),
1791                    serde_json::Value::Number(value.schema_id.into()),
1792                ),
1793                (
1794                    PARTITION_SPEC_ID.to_owned(),
1795                    serde_json::Value::Number(value.partition_spec_id.into()),
1796                ),
1797                (DATA_FILES.to_owned(), json_data_files),
1798            ]
1799            .into_iter()
1800            .collect(),
1801        );
1802        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1803    }
1804}
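
// A minimal round-trip sketch (not part of the original test suite) showing that the bytes
// produced by `TryFrom<IcebergCommitResult> for Vec<u8>` can be read back by
// `try_from_sealized_bytes`; the ids below are arbitrary placeholders.
#[cfg(test)]
#[test]
fn test_iceberg_commit_result_roundtrip_sketch() {
    let original = IcebergCommitResult {
        schema_id: 1,
        partition_spec_id: 2,
        data_files: vec![],
    };
    let bytes: Vec<u8> = original.try_into().expect("serialize iceberg commit result");
    let restored = IcebergCommitResult::try_from_sealized_bytes(bytes)
        .expect("deserialize iceberg commit result");
    assert_eq!(restored.schema_id, 1);
    assert_eq!(restored.partition_spec_id, 2);
    assert!(restored.data_files.is_empty());
}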

pub struct IcebergSinkCommitter {
1806    catalog: Arc<dyn Catalog>,
1807    table: Table,
1808    pub last_commit_epoch: u64,
1809    pub(crate) is_exactly_once: bool,
1810    pub(crate) sink_id: SinkId,
1811    pub(crate) config: IcebergConfig,
1812    pub(crate) param: SinkParam,
1813    pub(crate) db: DatabaseConnection,
1814    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1815    commit_retry_num: u32,
1816    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1817}
1818
1819impl IcebergSinkCommitter {
    // Reload the table and guarantee that the current `schema_id` and `partition_spec_id`
    // match the given `schema_id` and `partition_spec_id`.
1822    async fn reload_table(
1823        catalog: &dyn Catalog,
1824        table_ident: &TableIdent,
1825        schema_id: i32,
1826        partition_spec_id: i32,
1827    ) -> Result<Table> {
1828        let table = catalog
1829            .load_table(table_ident)
1830            .await
1831            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1832        if table.metadata().current_schema_id() != schema_id {
1833            return Err(SinkError::Iceberg(anyhow!(
1834                "Schema evolution not supported, expect schema id {}, but got {}",
1835                schema_id,
1836                table.metadata().current_schema_id()
1837            )));
1838        }
1839        if table.metadata().default_partition_spec_id() != partition_spec_id {
1840            return Err(SinkError::Iceberg(anyhow!(
1841                "Partition evolution not supported, expect partition spec id {}, but got {}",
1842                partition_spec_id,
1843                table.metadata().default_partition_spec_id()
1844            )));
1845        }
1846        Ok(table)
1847    }
1848}
1849
1850#[async_trait::async_trait]
1851impl SinkCommitCoordinator for IcebergSinkCommitter {
1852    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1853        if self.is_exactly_once {
1854            self.committed_epoch_subscriber = Some(subscriber);
1855            tracing::info!(
                "Sink id = {}: iceberg sink coordinator initializing.",
1857                self.param.sink_id
1858            );
1859            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id).await? {
1860                let ordered_metadata_list_by_end_epoch =
1861                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id).await?;
1862
1863                let mut last_recommit_epoch = 0;
1864                for (end_epoch, sealized_bytes, snapshot_id, committed) in
1865                    ordered_metadata_list_by_end_epoch
1866                {
1867                    let write_results_bytes = deserialize_metadata(sealized_bytes);
1868                    let mut write_results = vec![];
1869
1870                    for each in write_results_bytes {
1871                        let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1872                        write_results.push(write_result);
1873                    }
1874
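                    // Recovery decision per pre-commit row: already marked committed -> nothing to
                    // do; not marked but its snapshot exists in iceberg -> just mark it committed;
                    // otherwise -> re-commit the recorded files.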
1875                    match (
1876                        committed,
1877                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1878                            .await?,
1879                    ) {
1880                        (true, _) => {
1881                            tracing::info!(
                                "Sink id = {}: all data in the log store has been written into the external sink; nothing to do during recovery.",
1883                                self.param.sink_id
1884                            );
1885                        }
1886                        (false, true) => {
1887                            // skip
1888                            tracing::info!(
                                "Sink id = {}: all pre-commit files have already been committed into iceberg and do not need to be committed again; mark them as committed.",
1890                                self.param.sink_id
1891                            );
1892                            mark_row_is_committed_by_sink_id_and_end_epoch(
1893                                &self.db,
1894                                self.sink_id,
1895                                end_epoch,
1896                            )
1897                            .await?;
1898                        }
1899                        (false, false) => {
1900                            tracing::info!(
1901                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1902                                self.param.sink_id
1903                            );
1904                            self.re_commit(end_epoch, write_results, snapshot_id)
1905                                .await?;
1906                        }
1907                    }
1908
1909                    last_recommit_epoch = end_epoch;
1910                }
1911                tracing::info!(
                    "Sink id = {}: iceberg commit coordinator initialized.",
1913                    self.param.sink_id
1914                );
1915                return Ok(Some(last_recommit_epoch));
1916            } else {
1917                tracing::info!(
                    "Sink id = {}: initialized iceberg coordinator; the system table is empty.",
1919                    self.param.sink_id
1920                );
1921                return Ok(None);
1922            }
1923        }
1924
        tracing::info!("Iceberg commit coordinator initialized.");
1926        return Ok(None);
1927    }
1928
1929    async fn commit(
1930        &mut self,
1931        epoch: u64,
1932        metadata: Vec<SinkMetadata>,
1933        add_columns: Option<Vec<Field>>,
1934    ) -> Result<()> {
1935        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1936        if let Some(add_columns) = add_columns {
1937            return Err(anyhow!(
                "Iceberg sink does not support adding columns, but got: {:?}",
1939                add_columns
1940            )
1941            .into());
1942        }
1943
1944        let write_results: Vec<IcebergCommitResult> = metadata
1945            .iter()
1946            .map(IcebergCommitResult::try_from)
1947            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1948
1949        // Skip if no data to commit
1950        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1951            tracing::debug!(?epoch, "no data to commit");
1952            return Ok(());
1953        }
1954
        // Guarantee that all write results have the same schema_id and partition_spec_id.
1956        if write_results
1957            .iter()
1958            .any(|r| r.schema_id != write_results[0].schema_id)
1959            || write_results
1960                .iter()
1961                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1962        {
1963            return Err(SinkError::Iceberg(anyhow!(
1964                "schema_id and partition_spec_id should be the same in all write results"
1965            )));
1966        }
1967
1968        // Check snapshot limit before proceeding with commit
1969        self.wait_for_snapshot_limit().await?;
1970
1971        if self.is_exactly_once {
1972            assert!(self.committed_epoch_subscriber.is_some());
1973            match self.committed_epoch_subscriber.clone() {
1974                Some(committed_epoch_subscriber) => {
1975                    // Get the latest committed_epoch and the receiver
1976                    let (committed_epoch, mut rw_futures_utilrx) =
1977                        committed_epoch_subscriber(self.param.sink_id).await?;
1978                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1979                    if committed_epoch >= epoch {
1980                        self.commit_iceberg_inner(epoch, write_results, None)
1981                            .await?;
1982                    } else {
1983                        tracing::info!(
1984                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1985                            committed_epoch,
1986                            epoch
1987                        );
1988                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1989                            tracing::info!(
1990                                "Received next committed epoch: {}",
1991                                next_committed_epoch
1992                            );
                            // If the next committed epoch meets the condition, execute the commit immediately.
1994                            if next_committed_epoch >= epoch {
1995                                self.commit_iceberg_inner(epoch, write_results, None)
1996                                    .await?;
1997                                break;
1998                            }
1999                        }
2000                    }
2001                }
2002                None => unreachable!(
2003                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
2004                ),
2005            }
2006        } else {
2007            self.commit_iceberg_inner(epoch, write_results, None)
2008                .await?;
2009        }
2010
2011        Ok(())
2012    }
2013}
2014
2015/// Methods Required to Achieve Exactly Once Semantics
2016impl IcebergSinkCommitter {
2017    async fn re_commit(
2018        &mut self,
2019        epoch: u64,
2020        write_results: Vec<IcebergCommitResult>,
2021        snapshot_id: i64,
2022    ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
2024
2025        // Skip if no data to commit
2026        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
2027            tracing::debug!(?epoch, "no data to commit");
2028            return Ok(());
2029        }
2030        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
2031            .await?;
2032        Ok(())
2033    }
2034
2035    async fn commit_iceberg_inner(
2036        &mut self,
2037        epoch: u64,
2038        write_results: Vec<IcebergCommitResult>,
2039        snapshot_id: Option<i64>,
2040    ) -> Result<()> {
        // If the provided `snapshot_id` is not None, it indicates that this commit is a re-commit
2042        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
2043        // that was previously persisted in the system table to commit.
2044        let is_first_commit = snapshot_id.is_none();
2045        self.last_commit_epoch = epoch;
2046        let expect_schema_id = write_results[0].schema_id;
2047        let expect_partition_spec_id = write_results[0].partition_spec_id;
2048
        // Load the latest table to avoid concurrent modification, on a best-effort basis.
2050        self.table = Self::reload_table(
2051            self.catalog.as_ref(),
2052            self.table.identifier(),
2053            expect_schema_id,
2054            expect_partition_spec_id,
2055        )
2056        .await?;
2057        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
2058            return Err(SinkError::Iceberg(anyhow!(
2059                "Can't find schema by id {}",
2060                expect_schema_id
2061            )));
2062        };
2063        let Some(partition_spec) = self
2064            .table
2065            .metadata()
2066            .partition_spec_by_id(expect_partition_spec_id)
2067        else {
2068            return Err(SinkError::Iceberg(anyhow!(
2069                "Can't find partition spec by id {}",
2070                expect_partition_spec_id
2071            )));
2072        };
2073        let partition_type = partition_spec
2074            .as_ref()
2075            .clone()
2076            .partition_type(schema)
2077            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2078
2079        let txn = Transaction::new(&self.table);
        // Only generate a new snapshot id on the first commit.
2081        let snapshot_id = match snapshot_id {
2082            Some(previous_snapshot_id) => previous_snapshot_id,
2083            None => txn.generate_unique_snapshot_id(),
2084        };
2085        if self.is_exactly_once && is_first_commit {
            // Persist the pre-commit metadata and snapshot id in the system table.
2087            let mut pre_commit_metadata_bytes = Vec::new();
2088            for each_parallelism_write_result in write_results.clone() {
2089                let each_parallelism_write_result_bytes: Vec<u8> =
2090                    each_parallelism_write_result.try_into()?;
2091                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
2092            }
2093
2094            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
2095
2096            persist_pre_commit_metadata(
2097                self.sink_id,
2098                self.db.clone(),
2099                self.last_commit_epoch,
2100                epoch,
2101                pre_commit_metadata_bytes,
2102                snapshot_id,
2103            )
2104            .await?;
2105        }
2106
2107        let data_files = write_results
2108            .into_iter()
2109            .flat_map(|r| {
2110                r.data_files.into_iter().map(|f| {
2111                    f.try_into(expect_partition_spec_id, &partition_type, schema)
2112                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
2113                })
2114            })
2115            .collect::<Result<Vec<DataFile>>>()?;
        // TODO:
        // This retry behavior should be reverted and handled in iceberg-rust once it supports
        // retry (tracked in: https://github.com/apache/iceberg-rust/issues/964), because the
        // retry logic involves reapplying the commit metadata.
        // For now, we just retry the commit operation here.
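        // The strategy below backs off exponentially from 10 ms up to a 60 s cap, adds jitter,
        // and retries the commit at most `commit_retry_num` times.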
2120        let retry_strategy = ExponentialBackoff::from_millis(10)
2121            .max_delay(Duration::from_secs(60))
2122            .map(jitter)
2123            .take(self.commit_retry_num as usize);
2124        let catalog = self.catalog.clone();
2125        let table_ident = self.table.identifier().clone();
2126        let table = Retry::spawn(retry_strategy, || async {
2127            let table = Self::reload_table(
2128                catalog.as_ref(),
2129                &table_ident,
2130                expect_schema_id,
2131                expect_partition_spec_id,
2132            )
2133            .await?;
2134            let txn = Transaction::new(&table);
2135            let mut append_action = txn
2136                .fast_append(Some(snapshot_id), None, vec![])
2137                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
2138                .with_to_branch(commit_branch(
2139                    self.config.r#type.as_str(),
2140                    self.config.write_mode.as_str(),
2141                ));
2142            append_action
2143                .add_data_files(data_files.clone())
2144                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
2145            let tx = append_action.apply().await.map_err(|err| {
2146                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
2147                SinkError::Iceberg(anyhow!(err))
2148            })?;
2149            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
2150                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
2151                SinkError::Iceberg(anyhow!(err))
2152            })
2153        })
2154        .await?;
2155        self.table = table;
2156
2157        let snapshot_num = self.table.metadata().snapshots().count();
2158        let catalog_name = self.config.common.catalog_name();
2159        let table_name = self.table.identifier().to_string();
2160        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
2161        GLOBAL_SINK_METRICS
2162            .iceberg_snapshot_num
2163            .with_guarded_label_values(&metrics_labels)
2164            .set(snapshot_num as i64);
2165
2166        tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
2167
2168        if self.is_exactly_once {
2169            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2170            tracing::info!(
                "Sink id = {}: succeeded in marking pre-commit metadata in epoch {} as committed.",
2172                self.sink_id,
2173                epoch
2174            );
2175
2176            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
2177        }
2178
2179        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2180            && self.config.enable_compaction
2181            && iceberg_compact_stat_sender
2182                .send(IcebergSinkCompactionUpdate {
2183                    sink_id: self.sink_id,
2184                    compaction_interval: self.config.compaction_interval_sec(),
2185                    force_compaction: false,
2186                })
2187                .is_err()
2188        {
2189            warn!("failed to send iceberg compaction stats");
2190        }
2191
2192        Ok(())
2193    }
2194
    /// When persisting pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
    /// Therefore, checking whether all files in a batch are already present in Iceberg reduces to
    /// verifying whether their corresponding `snapshot_id` exists in Iceberg.
2198    async fn is_snapshot_id_in_iceberg(
2199        &self,
2200        iceberg_config: &IcebergConfig,
2201        snapshot_id: i64,
2202    ) -> Result<bool> {
2203        let table = iceberg_config.load_table().await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2209    }
2210
    /// Count the number of snapshots added since the last rewrite/overwrite (`Replace`) operation.
    /// For example, if the snapshots from newest to oldest were produced by
    /// `[Append, Append, Replace, Append]`, this returns 2.
2213    fn count_snapshots_since_rewrite(&self) -> usize {
2214        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2215        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2216
2217        // Iterate through snapshots in reverse order (newest first) to find the last rewrite/overwrite
2218        let mut count = 0;
2219        for snapshot in snapshots {
2220            // Check if this snapshot represents a rewrite or overwrite operation
2221            let summary = snapshot.summary();
2222            match &summary.operation {
2223                Operation::Replace => {
2224                    // Found a rewrite/overwrite operation, stop counting
2225                    break;
2226                }
2227
2228                _ => {
2229                    // Increment count for each snapshot that is not a rewrite/overwrite
2230                    count += 1;
2231                }
2232            }
2233        }
2234
2235        count
2236    }
2237
2238    /// Wait until snapshot count since last rewrite is below the limit
2239    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2240        if !self.config.enable_compaction {
2241            return Ok(());
2242        }
2243
2244        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2245            loop {
2246                let current_count = self.count_snapshots_since_rewrite();
2247
2248                if current_count < max_snapshots {
2249                    tracing::info!(
2250                        "Snapshot count check passed: {} < {}",
2251                        current_count,
2252                        max_snapshots
2253                    );
2254                    break;
2255                }
2256
2257                tracing::info!(
2258                    "Snapshot count {} exceeds limit {}, waiting...",
2259                    current_count,
2260                    max_snapshots
2261                );
2262
2263                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2264                    && iceberg_compact_stat_sender
2265                        .send(IcebergSinkCompactionUpdate {
2266                            sink_id: self.sink_id,
2267                            compaction_interval: self.config.compaction_interval_sec(),
2268                            force_compaction: true,
2269                        })
2270                        .is_err()
2271                {
2272                    tracing::warn!("failed to send iceberg compaction stats");
2273                }
2274
2275                // Wait for 30 seconds before checking again
2276                tokio::time::sleep(Duration::from_secs(30)).await;
2277
2278                // Reload table to get latest snapshots
2279                self.table = self.config.load_table().await?;
2280            }
2281        }
2282        Ok(())
2283    }
2284}
2285
2286const MAP_KEY: &str = "key";
2287const MAP_VALUE: &str = "value";
2288
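// Populate `schema_fields` with the child (name -> RisingWave type) entries of a nested RisingWave
// type and return the corresponding arrow struct fields to compare against; returns `None` when
// the arrow type has no nested struct to check.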
2289fn get_fields<'a>(
2290    our_field_type: &'a risingwave_common::types::DataType,
2291    data_type: &ArrowDataType,
2292    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2293) -> Option<ArrowFields> {
2294    match data_type {
2295        ArrowDataType::Struct(fields) => {
2296            match our_field_type {
2297                risingwave_common::types::DataType::Struct(struct_fields) => {
2298                    struct_fields.iter().for_each(|(name, data_type)| {
2299                        let res = schema_fields.insert(name, data_type);
2300                        // This assert is to make sure there is no duplicate field name in the schema.
2301                        assert!(res.is_none())
2302                    });
2303                }
2304                risingwave_common::types::DataType::Map(map_fields) => {
2305                    schema_fields.insert(MAP_KEY, map_fields.key());
2306                    schema_fields.insert(MAP_VALUE, map_fields.value());
2307                }
2308                risingwave_common::types::DataType::List(list) => {
2309                    list.elem()
2310                        .as_struct()
2311                        .iter()
2312                        .for_each(|(name, data_type)| {
2313                            let res = schema_fields.insert(name, data_type);
2314                            // This assert is to make sure there is no duplicate field name in the schema.
2315                            assert!(res.is_none())
2316                        });
2317                }
2318                _ => {}
2319            };
2320            Some(fields.clone())
2321        }
2322        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2323            get_fields(our_field_type, field.data_type(), schema_fields)
2324        }
2325        _ => None, // not a supported complex type and unlikely to show up
2326    }
2327}
2328
2329fn check_compatibility(
2330    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2331    fields: &ArrowFields,
2332) -> anyhow::Result<bool> {
2333    for arrow_field in fields {
2334        let our_field_type = schema_fields
2335            .get(arrow_field.name().as_str())
2336            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2337
2338        // Iceberg source should be able to read iceberg decimal type.
2339        let converted_arrow_data_type = IcebergArrowConvert
2340            .to_arrow_field("", our_field_type)
2341            .map_err(|e| anyhow!(e))?
2342            .data_type()
2343            .clone();
2344
2345        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2346            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2347            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2348            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2349            (ArrowDataType::List(_), ArrowDataType::List(field))
2350            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2351                let mut schema_fields = HashMap::new();
2352                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2353                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2354            }
2355            // validate nested structs
2356            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2357                let mut schema_fields = HashMap::new();
2358                our_field_type
2359                    .as_struct()
2360                    .iter()
2361                    .for_each(|(name, data_type)| {
2362                        let res = schema_fields.insert(name, data_type);
2363                        // This assert is to make sure there is no duplicate field name in the schema.
2364                        assert!(res.is_none())
2365                    });
2366                check_compatibility(schema_fields, fields)?
2367            }
2368            // cases where left != right (metadata, field name mismatch)
2369            //
2370            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2371            // {"PARQUET:field_id": ".."}
2372            //
2373            // map: The standard name in arrow is "entries", "key", "value".
2374            // in iceberg-rs, it's called "key_value"
2375            (left, right) => left.equals_datatype(right),
2376        };
2377        if !compatible {
2378            bail!(
2379                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2380                arrow_field.name(),
2381                converted_arrow_data_type,
2382                arrow_field.data_type()
2383            );
2384        }
2385    }
2386    Ok(true)
2387}
2388
/// Try to match our schema against the iceberg schema.
2390pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2391    if rw_schema.fields.len() != arrow_schema.fields().len() {
2392        bail!(
2393            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2394            rw_schema.fields.len(),
2395            arrow_schema.fields.len()
2396        );
2397    }
2398
2399    let mut schema_fields = HashMap::new();
2400    rw_schema.fields.iter().for_each(|field| {
2401        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2402        // This assert is to make sure there is no duplicate field name in the schema.
2403        assert!(res.is_none())
2404    });
2405
2406    check_compatibility(schema_fields, &arrow_schema.fields)?;
2407    Ok(())
2408}
2409
2410pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2411    serde_json::to_vec(&metadata).unwrap()
2412}
2413
2414pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2415    serde_json::from_slice(&bytes).unwrap()
2416}
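
// A minimal sketch (not part of the original test suite) of the pre-commit metadata round trip
// used for exactly-once handling; the byte payloads are arbitrary placeholders.
#[cfg(test)]
#[test]
fn test_metadata_serialization_roundtrip_sketch() {
    let metadata = vec![vec![1u8, 2, 3], vec![4, 5]];
    let bytes = serialize_metadata(metadata.clone());
    assert_eq!(deserialize_metadata(bytes), metadata);
}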
2417
2418pub fn parse_partition_by_exprs(
2419    expr: String,
2420) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2421    // captures column, transform(column), transform(n,column), transform(n, column)
2422    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2423    if !re.is_match(&expr) {
2424        bail!(format!(
2425            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2426            expr
2427        ))
2428    }
2429    let caps = re.captures_iter(&expr);
2430
2431    let mut partition_columns = vec![];
2432
2433    for mat in caps {
2434        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2435            (&mat["transform"], Transform::Identity)
2436        } else {
2437            let mut func = mat["transform"].to_owned();
2438            if func == "bucket" || func == "truncate" {
2439                let n = &mat
2440                    .name("n")
2441                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2442                    .as_str();
2443                func = format!("{func}[{n}]");
2444            }
2445            (
2446                &mat["field"],
2447                Transform::from_str(&func)
2448                    .with_context(|| format!("invalid transform function {}", func))?,
2449            )
2450        };
2451        partition_columns.push((column.to_owned(), transform));
2452    }
2453    Ok(partition_columns)
2454}
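
// An illustrative sketch (not part of the original test suite; the column names `v1`/`v2`/`v3`
// are hypothetical) of how `parse_partition_by_exprs` maps a `partition_by` string to transforms.
#[cfg(test)]
#[test]
fn test_parse_partition_by_exprs_sketch() {
    let parsed = parse_partition_by_exprs("v1, bucket(5,v2), year(v3)".to_owned()).unwrap();
    assert_eq!(
        parsed,
        vec![
            ("v1".to_owned(), Transform::Identity),
            ("v2".to_owned(), Transform::Bucket(5)),
            ("v3".to_owned(), Transform::Year),
        ]
    );
}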
2455
2456pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2457    if should_enable_iceberg_cow(sink_type, write_mode) {
2458        ICEBERG_COW_BRANCH.to_owned()
2459    } else {
2460        MAIN_BRANCH.to_owned()
2461    }
2462}
2463
2464pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2465    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2466}
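
// A minimal sketch (not part of the original test suite) of the branch-selection rule above:
// only upsert sinks in copy-on-write mode commit to the COW branch; everything else targets main.
#[cfg(test)]
#[test]
fn test_commit_branch_selection_sketch() {
    assert_eq!(
        commit_branch(SINK_TYPE_UPSERT, ICEBERG_WRITE_MODE_COPY_ON_WRITE),
        ICEBERG_COW_BRANCH
    );
    assert_eq!(
        commit_branch(SINK_TYPE_UPSERT, ICEBERG_WRITE_MODE_MERGE_ON_READ),
        MAIN_BRANCH
    );
}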
2467
2468#[cfg(test)]
2469mod test {
2470    use std::collections::BTreeMap;
2471
2472    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2473    use risingwave_common::types::{DataType, MapType, StructType};
2474
2475    use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2476    use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2477    use crate::sink::iceberg::{
2478        COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2479        ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, MAX_SNAPSHOTS_NUM,
2480        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2481        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2482    };
2483
2484    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2485
2486    #[test]
2487    fn test_compatible_arrow_schema() {
2488        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2489
2490        use super::*;
2491        let risingwave_schema = Schema::new(vec![
2492            Field::with_name(DataType::Int32, "a"),
2493            Field::with_name(DataType::Int32, "b"),
2494            Field::with_name(DataType::Int32, "c"),
2495        ]);
2496        let arrow_schema = ArrowSchema::new(vec![
2497            ArrowField::new("a", ArrowDataType::Int32, false),
2498            ArrowField::new("b", ArrowDataType::Int32, false),
2499            ArrowField::new("c", ArrowDataType::Int32, false),
2500        ]);
2501
2502        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2503
2504        let risingwave_schema = Schema::new(vec![
2505            Field::with_name(DataType::Int32, "d"),
2506            Field::with_name(DataType::Int32, "c"),
2507            Field::with_name(DataType::Int32, "a"),
2508            Field::with_name(DataType::Int32, "b"),
2509        ]);
2510        let arrow_schema = ArrowSchema::new(vec![
2511            ArrowField::new("a", ArrowDataType::Int32, false),
2512            ArrowField::new("b", ArrowDataType::Int32, false),
2513            ArrowField::new("d", ArrowDataType::Int32, false),
2514            ArrowField::new("c", ArrowDataType::Int32, false),
2515        ]);
2516        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2517
2518        let risingwave_schema = Schema::new(vec![
2519            Field::with_name(
2520                DataType::Struct(StructType::new(vec![
2521                    ("a1", DataType::Int32),
2522                    (
2523                        "a2",
2524                        DataType::Struct(StructType::new(vec![
2525                            ("a21", DataType::Bytea),
2526                            (
2527                                "a22",
2528                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2529                            ),
2530                        ])),
2531                    ),
2532                ])),
2533                "a",
2534            ),
2535            Field::with_name(
2536                DataType::list(DataType::Struct(StructType::new(vec![
2537                    ("b1", DataType::Int32),
2538                    ("b2", DataType::Bytea),
2539                    (
2540                        "b3",
2541                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2542                    ),
2543                ]))),
2544                "b",
2545            ),
2546            Field::with_name(
2547                DataType::Map(MapType::from_kv(
2548                    DataType::Varchar,
2549                    DataType::list(DataType::Struct(StructType::new([
2550                        ("c1", DataType::Int32),
2551                        ("c2", DataType::Bytea),
2552                        (
2553                            "c3",
2554                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2555                        ),
2556                    ]))),
2557                )),
2558                "c",
2559            ),
2560        ]);
2561        let arrow_schema = ArrowSchema::new(vec![
2562            ArrowField::new(
2563                "a",
2564                ArrowDataType::Struct(ArrowFields::from(vec![
2565                    ArrowField::new("a1", ArrowDataType::Int32, false),
2566                    ArrowField::new(
2567                        "a2",
2568                        ArrowDataType::Struct(ArrowFields::from(vec![
2569                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2570                            ArrowField::new_map(
2571                                "a22",
2572                                "entries",
2573                                ArrowFieldRef::new(ArrowField::new(
2574                                    "key",
2575                                    ArrowDataType::Utf8,
2576                                    false,
2577                                )),
2578                                ArrowFieldRef::new(ArrowField::new(
2579                                    "value",
2580                                    ArrowDataType::Utf8,
2581                                    false,
2582                                )),
2583                                false,
2584                                false,
2585                            ),
2586                        ])),
2587                        false,
2588                    ),
2589                ])),
2590                false,
2591            ),
2592            ArrowField::new(
2593                "b",
2594                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2595                    ArrowDataType::Struct(ArrowFields::from(vec![
2596                        ArrowField::new("b1", ArrowDataType::Int32, false),
2597                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2598                        ArrowField::new_map(
2599                            "b3",
2600                            "entries",
2601                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2602                            ArrowFieldRef::new(ArrowField::new(
2603                                "value",
2604                                ArrowDataType::Utf8,
2605                                false,
2606                            )),
2607                            false,
2608                            false,
2609                        ),
2610                    ])),
2611                    false,
2612                ))),
2613                false,
2614            ),
2615            ArrowField::new_map(
2616                "c",
2617                "entries",
2618                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2619                ArrowFieldRef::new(ArrowField::new(
2620                    "value",
2621                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2622                        ArrowDataType::Struct(ArrowFields::from(vec![
2623                            ArrowField::new("c1", ArrowDataType::Int32, false),
2624                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2625                            ArrowField::new_map(
2626                                "c3",
2627                                "entries",
2628                                ArrowFieldRef::new(ArrowField::new(
2629                                    "key",
2630                                    ArrowDataType::Utf8,
2631                                    false,
2632                                )),
2633                                ArrowFieldRef::new(ArrowField::new(
2634                                    "value",
2635                                    ArrowDataType::Utf8,
2636                                    false,
2637                                )),
2638                                false,
2639                                false,
2640                            ),
2641                        ])),
2642                        false,
2643                    ))),
2644                    false,
2645                )),
2646                false,
2647                false,
2648            ),
2649        ]);
2650        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2651    }
2652
    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc:postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc:postgresql://postgres:5432/iceberg".to_owned()),
                region: Some("us-east-1".to_owned()),
                endpoint: Some("http://127.0.0.1:9301".to_owned()),
                access_key: Some("hummockadmin".to_owned()),
                secret_key: Some("hummockadmin".to_owned()),
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                catalog_name: Some("demo".to_owned()),
                path_style_access: Some(true),
                credential: None,
                oauth2_server_uri: None,
                scope: None,
                token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                header: None,
                adlsgen2_account_name: None,
                adlsgen2_account_key: None,
                adlsgen2_endpoint: None,
                vended_credentials: None,
            },
            table: IcebergTableIdentifier {
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
            is_exactly_once: Some(true),
            commit_retry_num: 8,
            enable_compaction: true,
            // 1800s, as set via `compaction_interval_sec` above (half of the default interval).
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
            small_files_threshold_mb: None,
            delete_files_count_threshold: None,
            trigger_snapshot_count: None,
            target_file_size_mb: None,
            compaction_type: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

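    // A small negative-parsing sketch to complement the test above. It assumes that
    // `table.name` is a required key and that `IcebergConfig::from_btreemap` surfaces its
    // absence as an `Err` rather than falling back to a default; adjust the assertion if
    // the parser turns out to be more lenient than assumed here.
    #[test]
    fn test_parse_iceberg_config_missing_table_name() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "demo_db"),
            // `table.name` is deliberately omitted.
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        assert!(IcebergConfig::from_btreemap(values).is_err());
    }
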
    /// Shared helper for the `#[ignore]`d catalog smoke tests below: parses the given
    /// properties into an `IcebergConfig` and loads the target table from the configured catalog.
    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let _table = iceberg_config.load_table().await.unwrap();
    }

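    // The catalog tests below are `#[ignore]`d, so a plain `cargo test` skips them. They are
    // meant to be run manually against locally running services; the endpoints and the
    // `hummockadmin` credentials appear to target a local MinIO plus the respective catalog
    // backend (storage, REST, JDBC, or Hive Metastore).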
    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[test]
    fn test_config_constants_consistency() {
        // This test ensures our constants match the expected configuration names.
        // If you change a constant, this test will remind you to update both places.
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(MAX_SNAPSHOTS_NUM, "max_snapshots_num_before_compaction");
    }

    #[test]
    fn test_parse_iceberg_compaction_config() {
        // Test parsing with the new `compaction.*`-prefixed config names.
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
            ("compaction.max_snapshots_num", "100"),
            ("compaction.small_files_threshold_mb", "512"),
            ("compaction.delete_files_count_threshold", "50"),
            ("compaction.trigger_snapshot_count", "10"),
            ("compaction.target_file_size_mb", "256"),
            ("compaction.type", "full"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        // Verify all compaction config fields are parsed correctly.
        assert!(iceberg_config.enable_compaction);
        assert_eq!(
            iceberg_config.max_snapshots_num_before_compaction,
            Some(100)
        );
        assert_eq!(iceberg_config.small_files_threshold_mb, Some(512));
        assert_eq!(iceberg_config.delete_files_count_threshold, Some(50));
        assert_eq!(iceberg_config.trigger_snapshot_count, Some(10));
        assert_eq!(iceberg_config.target_file_size_mb, Some(256));
        assert_eq!(iceberg_config.compaction_type, Some("full".to_owned()));
    }
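
    // A companion sketch to the test above: with `enable_compaction` on but none of the
    // `compaction.*` keys set, the optional tuning fields are expected to stay `None`,
    // mirroring the defaults asserted in `test_parse_iceberg_config`. The key set below
    // simply reuses the minimal storage-catalog configuration from the test above.
    #[test]
    fn test_parse_iceberg_compaction_config_defaults() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "id"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "test"),
            ("s3.secret.key", "test"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "storage"),
            ("catalog.name", "demo"),
            ("database.name", "test_db"),
            ("table.name", "test_table"),
            ("enable_compaction", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        assert!(iceberg_config.enable_compaction);
        assert_eq!(iceberg_config.max_snapshots_num_before_compaction, None);
        assert_eq!(iceberg_config.small_files_threshold_mb, None);
        assert_eq!(iceberg_config.delete_files_count_threshold, None);
        assert_eq!(iceberg_config.trigger_snapshot_count, None);
        assert_eq!(iceberg_config.target_file_size_mb, None);
        assert_eq!(iceberg_config.compaction_type, None);
    }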
}