risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, MAIN_BRANCH, SerializedDataFile, Transform, UnboundPartitionField,
30    UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43    DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59    Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::Schema;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde_derive::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
85use super::{
86    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103pub const ENABLE_COMPACTION: &str = "enable_compaction";
104pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
105pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
106pub const WRITE_MODE: &str = "write_mode";
107
108fn default_commit_retry_num() -> u32 {
109    8
110}
111
112fn default_iceberg_write_mode() -> String {
113    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
114}
115
116#[serde_as]
117#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
118pub struct IcebergConfig {
119    pub r#type: String, // accepts "append-only" or "upsert"
120
121    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
122    pub force_append_only: bool,
123
124    #[serde(flatten)]
125    common: IcebergCommon,
126
127    #[serde(
128        rename = "primary_key",
129        default,
130        deserialize_with = "deserialize_optional_string_seq_from_string"
131    )]
132    pub primary_key: Option<Vec<String>>,
133
134    // Properties forwarded to the Java catalog implementation (see `from_btreemap`).
135    #[serde(skip)]
136    pub java_catalog_props: HashMap<String, String>,
137
138    #[serde(default)]
139    pub partition_by: Option<String>,
140
141    /// Commit every n (> 0) checkpoints; the default is 10.
142    #[serde(default = "default_commit_checkpoint_interval")]
143    #[serde_as(as = "DisplayFromStr")]
144    #[with_option(allow_alter_on_fly)]
145    pub commit_checkpoint_interval: u64,
146
147    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
148    pub create_table_if_not_exists: bool,
149
150    /// Whether exactly-once delivery is enabled; disabled by default.
151    #[serde(default)]
152    #[serde_as(as = "Option<DisplayFromStr>")]
153    pub is_exactly_once: Option<bool>,
154    // Number of times to retry when an Iceberg commit fails. The default is 8.
155    // # TODO
156    // The Iceberg table may store the commit retry count in its table metadata.
157    // We should try to find that value and use it as the default commit retry count first.
158    #[serde(default = "default_commit_retry_num")]
159    pub commit_retry_num: u32,
160
161    /// Whether to enable iceberg compaction.
162    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
163    #[with_option(allow_alter_on_fly)]
164    pub enable_compaction: bool,
165
166    /// The interval, in seconds, at which Iceberg compaction is triggered.
167    #[serde(default)]
168    #[serde_as(as = "Option<DisplayFromStr>")]
169    #[with_option(allow_alter_on_fly)]
170    pub compaction_interval_sec: Option<u64>,
171
172    /// Whether to enable expiration of old Iceberg snapshots.
173    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
174    #[with_option(allow_alter_on_fly)]
175    pub enable_snapshot_expiration: bool,
176
177    /// The Iceberg write mode, either `merge-on-read` or `copy-on-write`.
178    #[serde(default = "default_iceberg_write_mode")]
179    pub write_mode: String,
180}
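// A sketch of the flattened option map that `from_btreemap` below deserializes into this struct.
// The `IcebergCommon` keys (e.g. `warehouse.path`, `database.name`, `table.name`) are assumptions
// here, since that struct is defined elsewhere; the remaining keys mirror the fields above:
//
//     "type"                       => "upsert"
//     "primary_key"                => "id"
//     "commit_checkpoint_interval" => "10"
//     "write_mode"                 => "merge-on-read"
//     "catalog.some-java-prop"     => "..."   // hypothetical key, forwarded to `java_catalog_props`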
181
182impl EnforceSecret for IcebergConfig {
183    fn enforce_secret<'a>(
184        prop_iter: impl Iterator<Item = &'a str>,
185    ) -> crate::error::ConnectorResult<()> {
186        for prop in prop_iter {
187            IcebergCommon::enforce_one(prop)?;
188        }
189        Ok(())
190    }
191
192    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
193        IcebergCommon::enforce_one(prop)
194    }
195}
196
197impl IcebergConfig {
198    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
199        let mut config =
200            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
201                .map_err(|e| SinkError::Config(anyhow!(e)))?;
202
203        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
204            return Err(SinkError::Config(anyhow!(
205                "`{}` must be {}, or {}",
206                SINK_TYPE_OPTION,
207                SINK_TYPE_APPEND_ONLY,
208                SINK_TYPE_UPSERT
209            )));
210        }
211
212        if config.r#type == SINK_TYPE_UPSERT {
213            if let Some(primary_key) = &config.primary_key {
214                if primary_key.is_empty() {
215                    return Err(SinkError::Config(anyhow!(
216                        "`primary_key` must not be empty in {}",
217                        SINK_TYPE_UPSERT
218                    )));
219                }
220            } else {
221                return Err(SinkError::Config(anyhow!(
222                    "Must set `primary_key` in {}",
223                    SINK_TYPE_UPSERT
224                )));
225            }
226        }
227
228        // All config keys that start with "catalog." (except the reserved ones below) are treated as Java catalog properties.
229        config.java_catalog_props = values
230            .iter()
231            .filter(|(k, _v)| {
232                k.starts_with("catalog.")
233                    && k != &"catalog.uri"
234                    && k != &"catalog.type"
235                    && k != &"catalog.name"
236                    && k != &"catalog.header"
237            })
238            .map(|(k, v)| (k[8..].to_string(), v.clone()))
239            .collect();
240
241        if config.commit_checkpoint_interval == 0 {
242            return Err(SinkError::Config(anyhow!(
243                "`commit_checkpoint_interval` must be greater than 0"
244            )));
245        }
246
247        Ok(config)
248    }
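    // For illustration (hypothetical keys and values): given
    //     {"catalog.type": "rest", "catalog.uri": "http://host:8181", "catalog.token": "..."}
    // the reserved keys `catalog.type` and `catalog.uri` are skipped, while `catalog.token` is
    // stored in `java_catalog_props` with the 8-character `catalog.` prefix stripped, i.e. as `token`.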
249
250    pub fn catalog_type(&self) -> &str {
251        self.common.catalog_type()
252    }
253
254    pub async fn load_table(&self) -> Result<Table> {
255        self.common
256            .load_table(&self.java_catalog_props)
257            .await
258            .map_err(Into::into)
259    }
260
261    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
262        self.common
263            .create_catalog(&self.java_catalog_props)
264            .await
265            .map_err(Into::into)
266    }
267
268    pub fn full_table_name(&self) -> Result<TableIdent> {
269        self.common.full_table_name().map_err(Into::into)
270    }
271
272    pub fn catalog_name(&self) -> String {
273        self.common.catalog_name()
274    }
275
276    pub fn compaction_interval_sec(&self) -> u64 {
277        // default to 1 hour
278        self.compaction_interval_sec.unwrap_or(3600)
279    }
280}
281
282pub struct IcebergSink {
283    config: IcebergConfig,
284    param: SinkParam,
285    // In upsert mode, this is always `Some` and non-empty.
286    unique_column_ids: Option<Vec<usize>>,
287}
288
289impl EnforceSecret for IcebergSink {
290    fn enforce_secret<'a>(
291        prop_iter: impl Iterator<Item = &'a str>,
292    ) -> crate::error::ConnectorResult<()> {
293        for prop in prop_iter {
294            IcebergConfig::enforce_one(prop)?;
295        }
296        Ok(())
297    }
298}
299
300impl TryFrom<SinkParam> for IcebergSink {
301    type Error = SinkError;
302
303    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
304        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
305        IcebergSink::new(config, param)
306    }
307}
308
309impl Debug for IcebergSink {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        f.debug_struct("IcebergSink")
312            .field("config", &self.config)
313            .finish()
314    }
315}
316
317impl IcebergSink {
318    async fn create_and_validate_table(&self) -> Result<Table> {
319        if self.config.create_table_if_not_exists {
320            self.create_table_if_not_exists().await?;
321        }
322
323        let table = self
324            .config
325            .load_table()
326            .await
327            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
328
329        let sink_schema = self.param.schema();
330        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
331            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
332
333        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
334            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
335
336        Ok(table)
337    }
338
339    async fn create_table_if_not_exists(&self) -> Result<()> {
340        let catalog = self.config.create_catalog().await?;
341        let namespace = if let Some(database_name) = &self.config.common.database_name {
342            let namespace = NamespaceIdent::new(database_name.clone());
343            if !catalog
344                .namespace_exists(&namespace)
345                .await
346                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
347            {
348                catalog
349                    .create_namespace(&namespace, HashMap::default())
350                    .await
351                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
352                    .context("failed to create iceberg namespace")?;
353            }
354            namespace
355        } else {
356            bail!("database name must be set if you want to create table")
357        };
358
359        let table_id = self
360            .config
361            .full_table_name()
362            .context("Unable to parse table name")?;
363        if !catalog
364            .table_exists(&table_id)
365            .await
366            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
367        {
368            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
369            // convert risingwave schema -> arrow schema -> iceberg schema
370            let arrow_fields = self
371                .param
372                .columns
373                .iter()
374                .map(|column| {
375                    Ok(iceberg_create_table_arrow_convert
376                        .to_arrow_field(&column.name, &column.data_type)
377                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
378                        .context(format!(
379                            "failed to convert {}: {} to arrow type",
380                            &column.name, &column.data_type
381                        ))?)
382                })
383                .collect::<Result<Vec<ArrowField>>>()?;
384            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
385            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
386                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
387                .context("failed to convert arrow schema to iceberg schema")?;
388
389            let location = {
390                let mut names = namespace.clone().inner();
391                names.push(self.config.common.table_name.clone());
392                match &self.config.common.warehouse_path {
393                    Some(warehouse_path) => {
394                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
395                        let url = Url::parse(warehouse_path);
396                        if url.is_err() || is_s3_tables {
397                            // For rest catalog, the warehouse_path could be a warehouse name.
398                            // In this case, leave the location unset and let the catalog assign it when creating the table.
399                            if self.config.common.catalog_type() == "rest"
400                                || self.config.common.catalog_type() == "rest_rust"
401                            {
402                                None
403                            } else {
404                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
405                            }
406                        } else if warehouse_path.ends_with('/') {
407                            Some(format!("{}{}", warehouse_path, names.join("/")))
408                        } else {
409                            Some(format!("{}/{}", warehouse_path, names.join("/")))
410                        }
411                    }
412                    None => None,
413                }
414            };
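            // Illustrative example: a warehouse path of "s3://bucket/warehouse" with namespace "db"
            // and table "t" yields the location "s3://bucket/warehouse/db/t"; for REST catalogs an
            // unparsable warehouse string (e.g. a bare warehouse name) leaves the location unset.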
415
416            let partition_spec = match &self.config.partition_by {
417                Some(partition_by) => {
418                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
419                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
420                        .into_iter()
421                        .enumerate()
422                    {
423                        match iceberg_schema.field_id_by_name(&column) {
424                            Some(id) => partition_fields.push(
425                                UnboundPartitionField::builder()
426                                    .source_id(id)
427                                    .transform(transform)
428                                    .name(format!("_p_{}", column))
429                                    .field_id(i as i32)
430                                    .build(),
431                            ),
432                            None => bail!(format!(
433                                "Partition source column does not exist in schema: {}",
434                                column
435                            )),
436                        };
437                    }
438                    Some(
439                        UnboundPartitionSpec::builder()
440                            .with_spec_id(0)
441                            .add_partition_fields(partition_fields)
442                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
443                            .context("failed to add partition columns")?
444                            .build(),
445                    )
446                }
447                None => None,
448            };
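            // Sketch (assuming `parse_partition_by_exprs` accepts Iceberg-style transform syntax):
            // `partition_by = "c1, bucket(16, c2)"` would yield a field `_p_c1` with
            // `Transform::Identity` and a field `_p_c2` with `Transform::Bucket(16)`, each bound to
            // its source column's field id in the schema above.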
449
450            let table_creation_builder = TableCreation::builder()
451                .name(self.config.common.table_name.clone())
452                .schema(iceberg_schema);
453
454            let table_creation = match (location, partition_spec) {
455                (Some(location), Some(partition_spec)) => table_creation_builder
456                    .location(location)
457                    .partition_spec(partition_spec)
458                    .build(),
459                (Some(location), None) => table_creation_builder.location(location).build(),
460                (None, Some(partition_spec)) => table_creation_builder
461                    .partition_spec(partition_spec)
462                    .build(),
463                (None, None) => table_creation_builder.build(),
464            };
465
466            catalog
467                .create_table(&namespace, table_creation)
468                .await
469                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
470                .context("failed to create iceberg table")?;
471        }
472        Ok(())
473    }
474
475    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
476        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
477            if let Some(pk) = &config.primary_key {
478                let mut unique_column_ids = Vec::with_capacity(pk.len());
479                for col_name in pk {
480                    let id = param
481                        .columns
482                        .iter()
483                        .find(|col| col.name.as_str() == col_name)
484                        .ok_or_else(|| {
485                            SinkError::Config(anyhow!(
486                                "Primary key column {} not found in sink schema",
487                                col_name
488                            ))
489                        })?
490                        .column_id
491                        .get_id() as usize;
492                    unique_column_ids.push(id);
493                }
494                Some(unique_column_ids)
495            } else {
496                unreachable!()
497            }
498        } else {
499            None
500        };
501        Ok(Self {
502            config,
503            param,
504            unique_column_ids,
505        })
506    }
507}
508
509impl Sink for IcebergSink {
510    type Coordinator = IcebergSinkCommitter;
511    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
512
513    const SINK_NAME: &'static str = ICEBERG_SINK;
514
515    async fn validate(&self) -> Result<()> {
516        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
517            bail!("Snowflake catalog only supports iceberg sources");
518        }
519        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
520            risingwave_common::license::Feature::IcebergSinkWithGlue
521                .check_available()
522                .map_err(|e| anyhow::anyhow!(e))?;
523        }
524        if self.config.enable_compaction {
525            risingwave_common::license::Feature::IcebergCompaction
526                .check_available()
527                .map_err(|e| anyhow::anyhow!(e))?;
528        }
529
530        let _ = self.create_and_validate_table().await?;
531        Ok(())
532    }
533
534    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
535        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
536
537        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
538            if iceberg_config.enable_compaction {
539                if compaction_interval == 0 {
540                    bail!(
541                        "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
542                    );
543                }
544            } else {
545                bail!("`compaction_interval_sec` can only be set when `enable_compaction` is true");
546            }
547        }
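        // For example: `enable_compaction = true, compaction_interval_sec = 3600` is accepted,
        // while setting `compaction_interval_sec` without `enable_compaction` (or setting it to 0)
        // is rejected by the checks above.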
548
549        Ok(())
550    }
551
552    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
553        let table = self.create_and_validate_table().await?;
554        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
555            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
556        } else {
557            IcebergSinkWriter::new_append_only(table, &writer_param).await?
558        };
559
560        let commit_checkpoint_interval =
561            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
562                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
563            );
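        // e.g. with `commit_checkpoint_interval = 10`, the coordinated log sinker only hands data
        // to the coordinator on every 10th checkpoint, so Iceberg commits happen at roughly one
        // tenth of the checkpoint frequency.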
564        let writer = CoordinatedLogSinker::new(
565            &writer_param,
566            self.param.clone(),
567            inner,
568            commit_checkpoint_interval,
569        )
570        .await?;
571
572        Ok(writer)
573    }
574
575    fn is_coordinated_sink(&self) -> bool {
576        true
577    }
578
579    async fn new_coordinator(
580        &self,
581        db: DatabaseConnection,
582        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
583    ) -> Result<Self::Coordinator> {
584        let catalog = self.config.create_catalog().await?;
585        let table = self.create_and_validate_table().await?;
586        Ok(IcebergSinkCommitter {
587            catalog,
588            table,
589            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
590            last_commit_epoch: 0,
591            sink_id: self.param.sink_id.sink_id(),
592            config: self.config.clone(),
593            param: self.param.clone(),
594            db,
595            commit_retry_num: self.config.commit_retry_num,
596            committed_epoch_subscriber: None,
597            iceberg_compact_stat_sender,
598        })
599    }
600}
601
602/// `None` means no projection is needed.
603/// `Prepare` holds the index of the extra partition column.
604/// `Done` holds the final projection index vec.
605///
606/// `ProjectIdxVec` is evaluated lazily: the first time we hit the `Prepare` state, we use the data chunk schema
607/// to build the projection index vec.
608enum ProjectIdxVec {
609    None,
610    Prepare(usize),
611    Done(Vec<usize>),
612}
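// Illustrative example: with the extra partition column at index 2 in a 4-column chunk, the first
// `write_batch` call turns `Prepare(2)` into `Done(vec![0, 1, 3])`, and later chunks reuse that
// cached projection.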
613
614pub struct IcebergSinkWriter {
615    writer: IcebergWriterDispatch,
616    arrow_schema: SchemaRef,
617    // See the note on `IcebergWriterMetrics` below.
618    metrics: IcebergWriterMetrics,
619    // State of the Iceberg table for this writer.
620    table: Table,
621    // For chunks with an extra partition column, that column must be removed before writing.
622    // The projection index vec is cached here so it is not rebuilt for every chunk.
623    project_idx_vec: ProjectIdxVec,
624}
625
626#[allow(clippy::type_complexity)]
627enum IcebergWriterDispatch {
628    PartitionAppendOnly {
629        writer: Option<Box<dyn IcebergWriter>>,
630        writer_builder: MonitoredGeneralWriterBuilder<
631            FanoutPartitionWriterBuilder<
632                DataFileWriterBuilder<
633                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
634                >,
635            >,
636        >,
637    },
638    NonpartitionAppendOnly {
639        writer: Option<Box<dyn IcebergWriter>>,
640        writer_builder: MonitoredGeneralWriterBuilder<
641            DataFileWriterBuilder<
642                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
643            >,
644        >,
645    },
646    PartitionUpsert {
647        writer: Option<Box<dyn IcebergWriter>>,
648        writer_builder: MonitoredGeneralWriterBuilder<
649            FanoutPartitionWriterBuilder<
650                EqualityDeltaWriterBuilder<
651                    DataFileWriterBuilder<
652                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
653                    >,
654                    MonitoredPositionDeleteWriterBuilder<
655                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
656                    >,
657                    EqualityDeleteFileWriterBuilder<
658                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
659                    >,
660                >,
661            >,
662        >,
663        arrow_schema_with_op_column: SchemaRef,
664    },
665    NonpartitionUpsert {
666        writer: Option<Box<dyn IcebergWriter>>,
667        writer_builder: MonitoredGeneralWriterBuilder<
668            EqualityDeltaWriterBuilder<
669                DataFileWriterBuilder<
670                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
671                >,
672                MonitoredPositionDeleteWriterBuilder<
673                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
674                >,
675                EqualityDeleteFileWriterBuilder<
676                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
677                >,
678            >,
679        >,
680        arrow_schema_with_op_column: SchemaRef,
681    },
682}
683
684impl IcebergWriterDispatch {
685    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
686        match self {
687            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
688            | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
689            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
690            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
691        }
692    }
693}
694
695pub struct IcebergWriterMetrics {
696    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
697    // They are actually used in `PrometheusWriterBuilder`:
698    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
699    // We keep them here so that the guards clean the labels up from the metrics registry when dropped.
700    _write_qps: LabelGuardedIntCounter,
701    _write_latency: LabelGuardedHistogram,
702    write_bytes: LabelGuardedIntCounter,
703}
704
705impl IcebergSinkWriter {
706    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
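        // A rough sketch of the writer stack assembled below (innermost first):
        //     ParquetWriterBuilder -> DataFileWriterBuilder
        //       -> FanoutPartitionWriterBuilder (partitioned tables only)
        //       -> MonitoredGeneralWriterBuilder (write QPS / latency metrics)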
707        let SinkWriterParam {
708            extra_partition_col_idx,
709            actor_id,
710            sink_id,
711            sink_name,
712            ..
713        } = writer_param;
714        let metrics_labels = [
715            &actor_id.to_string(),
716            &sink_id.to_string(),
717            sink_name.as_str(),
718        ];
719
720        // Metrics
721        let write_qps = GLOBAL_SINK_METRICS
722            .iceberg_write_qps
723            .with_guarded_label_values(&metrics_labels);
724        let write_latency = GLOBAL_SINK_METRICS
725            .iceberg_write_latency
726            .with_guarded_label_values(&metrics_labels);
727        // # TODO
728        // Currently unused. Wire up this metric later.
729        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
730            .iceberg_rolling_unflushed_data_file
731            .with_guarded_label_values(&metrics_labels);
732        let write_bytes = GLOBAL_SINK_METRICS
733            .iceberg_write_bytes
734            .with_guarded_label_values(&metrics_labels);
735
736        let schema = table.metadata().current_schema();
737        let partition_spec = table.metadata().default_partition_spec();
738
739        // To avoid duplicate file names, each sink writer generates a unique UUID at creation time and uses it as the file name suffix.
740        let unique_uuid_suffix = Uuid::now_v7();
741
742        let parquet_writer_properties = WriterProperties::builder()
743            .set_max_row_group_size(
744                writer_param
745                    .streaming_config
746                    .developer
747                    .iceberg_sink_write_parquet_max_row_group_rows,
748            )
749            .build();
750
751        let parquet_writer_builder = ParquetWriterBuilder::new(
752            parquet_writer_properties,
753            schema.clone(),
754            table.file_io().clone(),
755            DefaultLocationGenerator::new(table.metadata().clone())
756                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
757            DefaultFileNameGenerator::new(
758                writer_param.actor_id.to_string(),
759                Some(unique_uuid_suffix.to_string()),
760                iceberg::spec::DataFileFormat::Parquet,
761            ),
762        );
763        let data_file_builder =
764            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
765        if partition_spec.fields().is_empty() {
766            let writer_builder = MonitoredGeneralWriterBuilder::new(
767                data_file_builder,
768                write_qps.clone(),
769                write_latency.clone(),
770            );
771            let inner_writer = Some(Box::new(
772                writer_builder
773                    .clone()
774                    .build()
775                    .await
776                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
777            ) as Box<dyn IcebergWriter>);
778            Ok(Self {
779                arrow_schema: Arc::new(
780                    schema_to_arrow_schema(table.metadata().current_schema())
781                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
782                ),
783                metrics: IcebergWriterMetrics {
784                    _write_qps: write_qps,
785                    _write_latency: write_latency,
786                    write_bytes,
787                },
788                writer: IcebergWriterDispatch::NonpartitionAppendOnly {
789                    writer: inner_writer,
790                    writer_builder,
791                },
792                table,
793                project_idx_vec: {
794                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
795                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
796                    } else {
797                        ProjectIdxVec::None
798                    }
799                },
800            })
801        } else {
802            let partition_builder = MonitoredGeneralWriterBuilder::new(
803                FanoutPartitionWriterBuilder::new(
804                    data_file_builder,
805                    partition_spec.clone(),
806                    schema.clone(),
807                )
808                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
809                write_qps.clone(),
810                write_latency.clone(),
811            );
812            let inner_writer = Some(Box::new(
813                partition_builder
814                    .clone()
815                    .build()
816                    .await
817                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
818            ) as Box<dyn IcebergWriter>);
819            Ok(Self {
820                arrow_schema: Arc::new(
821                    schema_to_arrow_schema(table.metadata().current_schema())
822                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
823                ),
824                metrics: IcebergWriterMetrics {
825                    _write_qps: write_qps,
826                    _write_latency: write_latency,
827                    write_bytes,
828                },
829                writer: IcebergWriterDispatch::PartitionAppendOnly {
830                    writer: inner_writer,
831                    writer_builder: partition_builder,
832                },
833                table,
834                project_idx_vec: {
835                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
836                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
837                    } else {
838                        ProjectIdxVec::None
839                    }
840                },
841            })
842        }
843    }
844
845    pub async fn new_upsert(
846        table: Table,
847        unique_column_ids: Vec<usize>,
848        writer_param: &SinkWriterParam,
849    ) -> Result<Self> {
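        // A rough sketch of the writer stack assembled below (innermost first):
        //     ParquetWriterBuilder -> { DataFileWriterBuilder,
        //                               SortPositionDeleteWriterBuilder (monitored),
        //                               EqualityDeleteFileWriterBuilder }
        //       -> EqualityDeltaWriterBuilder (routes rows by the extra `op` column)
        //       -> FanoutPartitionWriterBuilder (partitioned tables only)
        //       -> MonitoredGeneralWriterBuilder (write QPS / latency metrics)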
850        let SinkWriterParam {
851            extra_partition_col_idx,
852            actor_id,
853            sink_id,
854            sink_name,
855            ..
856        } = writer_param;
857        let metrics_labels = [
858            &actor_id.to_string(),
859            &sink_id.to_string(),
860            sink_name.as_str(),
861        ];
862        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
863
864        // Metrics
865        let write_qps = GLOBAL_SINK_METRICS
866            .iceberg_write_qps
867            .with_guarded_label_values(&metrics_labels);
868        let write_latency = GLOBAL_SINK_METRICS
869            .iceberg_write_latency
870            .with_guarded_label_values(&metrics_labels);
871        // # TODO
872        // Currently unused. Wire up this metric later.
873        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
874            .iceberg_rolling_unflushed_data_file
875            .with_guarded_label_values(&metrics_labels);
876        let position_delete_cache_num = GLOBAL_SINK_METRICS
877            .iceberg_position_delete_cache_num
878            .with_guarded_label_values(&metrics_labels);
879        let write_bytes = GLOBAL_SINK_METRICS
880            .iceberg_write_bytes
881            .with_guarded_label_values(&metrics_labels);
882
883        // Use the table's current schema and default partition spec.
884        let schema = table.metadata().current_schema();
885        let partition_spec = table.metadata().default_partition_spec();
886
887        // To avoid duplicate file names, each sink writer generates a unique UUID at creation time and uses it as the file name suffix.
888        let unique_uuid_suffix = Uuid::now_v7();
889
890        let parquet_writer_properties = WriterProperties::builder()
891            .set_max_row_group_size(
892                writer_param
893                    .streaming_config
894                    .developer
895                    .iceberg_sink_write_parquet_max_row_group_rows,
896            )
897            .build();
898
899        let data_file_builder = {
900            let parquet_writer_builder = ParquetWriterBuilder::new(
901                parquet_writer_properties.clone(),
902                schema.clone(),
903                table.file_io().clone(),
904                DefaultLocationGenerator::new(table.metadata().clone())
905                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
906                DefaultFileNameGenerator::new(
907                    writer_param.actor_id.to_string(),
908                    Some(unique_uuid_suffix.to_string()),
909                    iceberg::spec::DataFileFormat::Parquet,
910                ),
911            );
912            DataFileWriterBuilder::new(
913                parquet_writer_builder.clone(),
914                None,
915                partition_spec.spec_id(),
916            )
917        };
918        let position_delete_builder = {
919            let parquet_writer_builder = ParquetWriterBuilder::new(
920                parquet_writer_properties.clone(),
921                POSITION_DELETE_SCHEMA.clone(),
922                table.file_io().clone(),
923                DefaultLocationGenerator::new(table.metadata().clone())
924                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
925                DefaultFileNameGenerator::new(
926                    writer_param.actor_id.to_string(),
927                    Some(format!("pos-del-{}", unique_uuid_suffix)),
928                    iceberg::spec::DataFileFormat::Parquet,
929                ),
930            );
931            MonitoredPositionDeleteWriterBuilder::new(
932                SortPositionDeleteWriterBuilder::new(
933                    parquet_writer_builder.clone(),
934                    writer_param
935                        .streaming_config
936                        .developer
937                        .iceberg_sink_positional_delete_cache_size,
938                    None,
939                    None,
940                ),
941                position_delete_cache_num,
942            )
943        };
944        let equality_delete_builder = {
945            let config = EqualityDeleteWriterConfig::new(
946                unique_column_ids.clone(),
947                table.metadata().current_schema().clone(),
948                None,
949                partition_spec.spec_id(),
950            )
951            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
952            let parquet_writer_builder = ParquetWriterBuilder::new(
953                parquet_writer_properties.clone(),
954                Arc::new(
955                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
956                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
957                ),
958                table.file_io().clone(),
959                DefaultLocationGenerator::new(table.metadata().clone())
960                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
961                DefaultFileNameGenerator::new(
962                    writer_param.actor_id.to_string(),
963                    Some(format!("eq-del-{}", unique_uuid_suffix)),
964                    iceberg::spec::DataFileFormat::Parquet,
965                ),
966            );
967
968            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
969        };
970        let delta_builder = EqualityDeltaWriterBuilder::new(
971            data_file_builder,
972            position_delete_builder,
973            equality_delete_builder,
974            unique_column_ids,
975        );
976        if partition_spec.fields().is_empty() {
977            let writer_builder = MonitoredGeneralWriterBuilder::new(
978                delta_builder,
979                write_qps.clone(),
980                write_latency.clone(),
981            );
982            let inner_writer = Some(Box::new(
983                writer_builder
984                    .clone()
985                    .build()
986                    .await
987                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
988            ) as Box<dyn IcebergWriter>);
989            let original_arrow_schema = Arc::new(
990                schema_to_arrow_schema(table.metadata().current_schema())
991                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
992            );
993            let schema_with_extra_op_column = {
994                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
995                new_fields.push(Arc::new(ArrowField::new(
996                    "op".to_owned(),
997                    ArrowDataType::Int32,
998                    false,
999                )));
1000                Arc::new(ArrowSchema::new(new_fields))
1001            };
1002            Ok(Self {
1003                arrow_schema: original_arrow_schema,
1004                metrics: IcebergWriterMetrics {
1005                    _write_qps: write_qps,
1006                    _write_latency: write_latency,
1007                    write_bytes,
1008                },
1009                table,
1010                writer: IcebergWriterDispatch::NonpartitionUpsert {
1011                    writer: inner_writer,
1012                    writer_builder,
1013                    arrow_schema_with_op_column: schema_with_extra_op_column,
1014                },
1015                project_idx_vec: {
1016                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1017                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1018                    } else {
1019                        ProjectIdxVec::None
1020                    }
1021                },
1022            })
1023        } else {
1024            let original_arrow_schema = Arc::new(
1025                schema_to_arrow_schema(table.metadata().current_schema())
1026                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1027            );
1028            let schema_with_extra_op_column = {
1029                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1030                new_fields.push(Arc::new(ArrowField::new(
1031                    "op".to_owned(),
1032                    ArrowDataType::Int32,
1033                    false,
1034                )));
1035                Arc::new(ArrowSchema::new(new_fields))
1036            };
1037            let partition_builder = MonitoredGeneralWriterBuilder::new(
1038                FanoutPartitionWriterBuilder::new_with_custom_schema(
1039                    delta_builder,
1040                    schema_with_extra_op_column.clone(),
1041                    partition_spec.clone(),
1042                    table.metadata().current_schema().clone(),
1043                ),
1044                write_qps.clone(),
1045                write_latency.clone(),
1046            );
1047            let inner_writer = Some(Box::new(
1048                partition_builder
1049                    .clone()
1050                    .build()
1051                    .await
1052                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1053            ) as Box<dyn IcebergWriter>);
1054            Ok(Self {
1055                arrow_schema: original_arrow_schema,
1056                metrics: IcebergWriterMetrics {
1057                    _write_qps: write_qps,
1058                    _write_latency: write_latency,
1059                    write_bytes,
1060                },
1061                table,
1062                writer: IcebergWriterDispatch::PartitionUpsert {
1063                    writer: inner_writer,
1064                    writer_builder: partition_builder,
1065                    arrow_schema_with_op_column: schema_with_extra_op_column,
1066                },
1067                project_idx_vec: {
1068                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1069                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1070                    } else {
1071                        ProjectIdxVec::None
1072                    }
1073                },
1074            })
1075        }
1076    }
1077}
1078
1079#[async_trait]
1080impl SinkWriter for IcebergSinkWriter {
1081    type CommitMetadata = Option<SinkMetadata>;
1082
1083    /// Begin a new epoch
1084    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1085        // Just skip it.
1086        Ok(())
1087    }
1088
1089    /// Write a stream chunk to sink
1090    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1091        // Try to build writer if it's None.
1092        match &mut self.writer {
1093            IcebergWriterDispatch::PartitionAppendOnly {
1094                writer,
1095                writer_builder,
1096            } => {
1097                if writer.is_none() {
1098                    *writer = Some(Box::new(
1099                        writer_builder
1100                            .clone()
1101                            .build()
1102                            .await
1103                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1104                    ));
1105                }
1106            }
1107            IcebergWriterDispatch::NonpartitionAppendOnly {
1108                writer,
1109                writer_builder,
1110            } => {
1111                if writer.is_none() {
1112                    *writer = Some(Box::new(
1113                        writer_builder
1114                            .clone()
1115                            .build()
1116                            .await
1117                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1118                    ));
1119                }
1120            }
1121            IcebergWriterDispatch::PartitionUpsert {
1122                writer,
1123                writer_builder,
1124                ..
1125            } => {
1126                if writer.is_none() {
1127                    *writer = Some(Box::new(
1128                        writer_builder
1129                            .clone()
1130                            .build()
1131                            .await
1132                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1133                    ));
1134                }
1135            }
1136            IcebergWriterDispatch::NonpartitionUpsert {
1137                writer,
1138                writer_builder,
1139                ..
1140            } => {
1141                if writer.is_none() {
1142                    *writer = Some(Box::new(
1143                        writer_builder
1144                            .clone()
1145                            .build()
1146                            .await
1147                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1148                    ));
1149                }
1150            }
1151        };
1152
1153        // Process the chunk.
1154        let (mut chunk, ops) = chunk.compact().into_parts();
1155        match &self.project_idx_vec {
1156            ProjectIdxVec::None => {}
1157            ProjectIdxVec::Prepare(idx) => {
1158                if *idx >= chunk.columns().len() {
1159                    return Err(SinkError::Iceberg(anyhow!(
1160                        "invalid extra partition column index {}",
1161                        idx
1162                    )));
1163                }
1164                let project_idx_vec = (0..*idx)
1165                    .chain(*idx + 1..chunk.columns().len())
1166                    .collect_vec();
1167                chunk = chunk.project(&project_idx_vec);
1168                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1169            }
1170            ProjectIdxVec::Done(idx_vec) => {
1171                chunk = chunk.project(idx_vec);
1172            }
1173        }
1174        if ops.is_empty() {
1175            return Ok(());
1176        }
1177        let write_batch_size = chunk.estimated_heap_size();
1178        let batch = match &self.writer {
1179            IcebergWriterDispatch::PartitionAppendOnly { .. }
1180            | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1181                // Keep only the insert rows; append-only writers ignore deletes.
1182                let filters =
1183                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1184                chunk.set_visibility(filters);
1185                IcebergArrowConvert
1186                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1187                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1188            }
1189            IcebergWriterDispatch::PartitionUpsert {
1190                arrow_schema_with_op_column,
1191                ..
1192            }
1193            | IcebergWriterDispatch::NonpartitionUpsert {
1194                arrow_schema_with_op_column,
1195                ..
1196            } => {
1197                let chunk = IcebergArrowConvert
1198                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1199                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1200                let ops = Arc::new(Int32Array::from(
1201                    ops.iter()
1202                        .map(|op| match op {
1203                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1204                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1205                        })
1206                        .collect_vec(),
1207                ));
1208                let mut columns = chunk.columns().to_vec();
1209                columns.push(ops);
1210                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1211                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1212            }
1213        };
1214
1215        let writer = self.writer.get_writer().unwrap();
1216        writer
1217            .write(batch)
1218            .instrument_await("iceberg_write")
1219            .await
1220            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1221        self.metrics.write_bytes.inc_by(write_batch_size as _);
1222        Ok(())
1223    }
1224
1225    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the sink
1226    /// writer should commit the current epoch.
1227    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1228        // Skip it if not checkpoint
1229        if !is_checkpoint {
1230            return Ok(None);
1231        }
1232
1233        let close_result = match &mut self.writer {
1234            IcebergWriterDispatch::PartitionAppendOnly {
1235                writer,
1236                writer_builder,
1237            } => {
1238                let close_result = match writer.take() {
1239                    Some(mut writer) => {
1240                        Some(writer.close().instrument_await("iceberg_close").await)
1241                    }
1242                    _ => None,
1243                };
1244                match writer_builder.clone().build().await {
1245                    Ok(new_writer) => {
1246                        *writer = Some(Box::new(new_writer));
1247                    }
1248                    _ => {
1249                        // In this case, the old writer has been closed but we failed to build a new one. We can't return the error
1250                        // here because the close itself may have succeeded, so we just log the build failure.
1251                        warn!("Failed to build new writer after close");
1252                    }
1253                }
1254                close_result
1255            }
1256            IcebergWriterDispatch::NonpartitionAppendOnly {
1257                writer,
1258                writer_builder,
1259            } => {
1260                let close_result = match writer.take() {
1261                    Some(mut writer) => {
1262                        Some(writer.close().instrument_await("iceberg_close").await)
1263                    }
1264                    _ => None,
1265                };
1266                match writer_builder.clone().build().await {
1267                    Ok(new_writer) => {
1268                        *writer = Some(Box::new(new_writer));
1269                    }
1270                    _ => {
1271                        // In this case, the old writer has been closed but we failed to build a new one. We can't return the error
1272                        // here because the close itself may have succeeded, so we just log the build failure.
1273                        warn!("Failed to build new writer after close");
1274                    }
1275                }
1276                close_result
1277            }
1278            IcebergWriterDispatch::PartitionUpsert {
1279                writer,
1280                writer_builder,
1281                ..
1282            } => {
1283                let close_result = match writer.take() {
1284                    Some(mut writer) => {
1285                        Some(writer.close().instrument_await("iceberg_close").await)
1286                    }
1287                    _ => None,
1288                };
1289                match writer_builder.clone().build().await {
1290                    Ok(new_writer) => {
1291                        *writer = Some(Box::new(new_writer));
1292                    }
1293                    _ => {
1294                        // In this case, the old writer has been closed but we failed to build a new one. We can't return the error
1295                        // here because the close itself may have succeeded, so we just log the build failure.
1296                        warn!("Failed to build new writer after close");
1297                    }
1298                }
1299                close_result
1300            }
1301            IcebergWriterDispatch::NonpartitionUpsert {
1302                writer,
1303                writer_builder,
1304                ..
1305            } => {
1306                let close_result = match writer.take() {
1307                    Some(mut writer) => {
1308                        Some(writer.close().instrument_await("iceberg_close").await)
1309                    }
1310                    _ => None,
1311                };
1312                match writer_builder.clone().build().await {
1313                    Ok(new_writer) => {
1314                        *writer = Some(Box::new(new_writer));
1315                    }
1316                    _ => {
1317                        // In this case, the old writer has been closed but we failed to build a new one. We can't return the error
1318                        // here because the close itself may have succeeded, so we just log the build failure.
1319                        warn!("Failed to build new writer after close");
1320                    }
1321                }
1322                close_result
1323            }
1324        };
1325
1326        match close_result {
1327            Some(Ok(result)) => {
1328                let version = self.table.metadata().format_version() as u8;
1329                let partition_type = self.table.metadata().default_partition_type();
1330                let data_files = result
1331                    .into_iter()
1332                    .map(|f| {
1333                        SerializedDataFile::try_from(f, partition_type, version == 1)
1334                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1335                    })
1336                    .collect::<Result<Vec<_>>>()?;
1337                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1338                    data_files,
1339                    schema_id: self.table.metadata().current_schema_id(),
1340                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1341                })?))
1342            }
1343            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1344            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1345        }
1346    }
1347
1348    /// Clean up on abort. Currently a no-op; see the TODO below.
1349    async fn abort(&mut self) -> Result<()> {
1350        // TODO: abort should clean up all the data written in this epoch.
1351        Ok(())
1352    }
1353}
1354
1355const SCHEMA_ID: &str = "schema_id";
1356const PARTITION_SPEC_ID: &str = "partition_spec_id";
1357const DATA_FILES: &str = "data_files";
1358
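/// Per-epoch commit payload produced by the sink writers and consumed by the commit
/// coordinator. It is serialized into [`SinkMetadata`] as a JSON object keyed by
/// [`SCHEMA_ID`], [`PARTITION_SPEC_ID`] and [`DATA_FILES`].
///
/// An illustrative (not normative) sketch of the serialized layout:
///
/// ```json
/// {
///   "schema_id": 0,
///   "partition_spec_id": 0,
///   "data_files": []
/// }
/// ```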
1359#[derive(Default, Clone)]
1360struct IcebergCommitResult {
1361    schema_id: i32,
1362    partition_spec_id: i32,
1363    data_files: Vec<SerializedDataFile>,
1364}
1365
1366impl IcebergCommitResult {
1367    fn try_from(value: &SinkMetadata) -> Result<Self> {
1368        if let Some(Serialized(v)) = &value.metadata {
1369            let mut values = if let serde_json::Value::Object(v) =
1370                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1371                    .context("Can't parse iceberg sink metadata")?
1372            {
1373                v
1374            } else {
1375                bail!("iceberg sink metadata should be an object");
1376            };
1377
1378            let schema_id;
1379            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1380                schema_id = value
1381                    .as_u64()
1382                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1383            } else {
1384                bail!("iceberg sink metadata should have schema_id");
1385            }
1386
1387            let partition_spec_id;
1388            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1389                partition_spec_id = value
1390                    .as_u64()
1391                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1392            } else {
1393                bail!("iceberg sink metadata should have partition_spec_id");
1394            }
1395
1396            let data_files: Vec<SerializedDataFile>;
1397            if let serde_json::Value::Array(values) = values
1398                .remove(DATA_FILES)
1399                .ok_or_else(|| anyhow!("iceberg sink metadata should have a data_files array"))?
1400            {
1401                data_files = values
1402                    .into_iter()
1403                    .map(from_value::<SerializedDataFile>)
1404                    .collect::<std::result::Result<_, _>>()
1405                    .context("Failed to deserialize data_files in iceberg sink metadata")?;
1406            } else {
1407                bail!("iceberg sink metadata should have a data_files array");
1408            }
1409
1410            Ok(Self {
1411                schema_id: schema_id as i32,
1412                partition_spec_id: partition_spec_id as i32,
1413                data_files,
1414            })
1415        } else {
1416            bail!("Can't create iceberg sink write result from empty data!")
1417        }
1418    }
1419
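    /// Like [`Self::try_from`], but parses the payload from the raw bytes previously
    /// persisted in the exactly-once system table.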
1420    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1421        let mut values = if let serde_json::Value::Object(value) =
1422            serde_json::from_slice::<serde_json::Value>(&value)
1423                .context("Can't parse iceberg sink metadata")?
1424        {
1425            value
1426        } else {
1427            bail!("iceberg sink metadata should be an object");
1428        };
1429
1430        let schema_id;
1431        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1432            schema_id = value
1433                .as_u64()
1434                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1435        } else {
1436            bail!("iceberg sink metadata should have schema_id");
1437        }
1438
1439        let partition_spec_id;
1440        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1441            partition_spec_id = value
1442                .as_u64()
1443                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1444        } else {
1445            bail!("iceberg sink metadata should have partition_spec_id");
1446        }
1447
1448        let data_files: Vec<SerializedDataFile>;
1449        if let serde_json::Value::Array(values) = values
1450            .remove(DATA_FILES)
1451            .ok_or_else(|| anyhow!("iceberg sink metadata should have a data_files array"))?
1452        {
1453            data_files = values
1454                .into_iter()
1455                .map(from_value::<SerializedDataFile>)
1456                .collect::<std::result::Result<_, _>>()
1457                .context("Failed to deserialize data_files in iceberg sink metadata")?;
1458        } else {
1459            bail!("iceberg sink metadata should have a data_files array");
1460        }
1461
1462        Ok(Self {
1463            schema_id: schema_id as i32,
1464            partition_spec_id: partition_spec_id as i32,
1465            data_files,
1466        })
1467    }
1468}
1469
1470impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1471    type Error = SinkError;
1472
1473    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1474        let json_data_files = serde_json::Value::Array(
1475            value
1476                .data_files
1477                .iter()
1478                .map(serde_json::to_value)
1479                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1480                .context("Can't serialize data files to json")?,
1481        );
1482        let json_value = serde_json::Value::Object(
1483            vec![
1484                (
1485                    SCHEMA_ID.to_owned(),
1486                    serde_json::Value::Number(value.schema_id.into()),
1487                ),
1488                (
1489                    PARTITION_SPEC_ID.to_owned(),
1490                    serde_json::Value::Number(value.partition_spec_id.into()),
1491                ),
1492                (DATA_FILES.to_owned(), json_data_files),
1493            ]
1494            .into_iter()
1495            .collect(),
1496        );
1497        Ok(SinkMetadata {
1498            metadata: Some(Serialized(SerializedMetadata {
1499                metadata: serde_json::to_vec(&json_value)
1500                    .context("Can't serialize iceberg sink metadata")?,
1501            })),
1502        })
1503    }
1504}
1505
1506impl TryFrom<IcebergCommitResult> for Vec<u8> {
1507    type Error = SinkError;
1508
1509    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1510        let json_data_files = serde_json::Value::Array(
1511            value
1512                .data_files
1513                .iter()
1514                .map(serde_json::to_value)
1515                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1516                .context("Can't serialize data files to json")?,
1517        );
1518        let json_value = serde_json::Value::Object(
1519            vec![
1520                (
1521                    SCHEMA_ID.to_owned(),
1522                    serde_json::Value::Number(value.schema_id.into()),
1523                ),
1524                (
1525                    PARTITION_SPEC_ID.to_owned(),
1526                    serde_json::Value::Number(value.partition_spec_id.into()),
1527                ),
1528                (DATA_FILES.to_owned(), json_data_files),
1529            ]
1530            .into_iter()
1531            .collect(),
1532        );
1533        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1534    }
1535}
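
/// Commit coordinator of the iceberg sink: it gathers the pre-committed data files
/// reported by the sink writers for each epoch and commits them to the iceberg table
/// as a single snapshot, optionally with exactly-once recovery via the system table.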
1536pub struct IcebergSinkCommitter {
1537    catalog: Arc<dyn Catalog>,
1538    table: Table,
1539    pub last_commit_epoch: u64,
1540    pub(crate) is_exactly_once: bool,
1541    pub(crate) sink_id: u32,
1542    pub(crate) config: IcebergConfig,
1543    pub(crate) param: SinkParam,
1544    pub(crate) db: DatabaseConnection,
1545    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1546    commit_retry_num: u32,
1547    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1548}
1549
1550impl IcebergSinkCommitter {
1551    // Reload the table and guarantee that its current schema_id and partition_spec_id match
1552    // the given `schema_id` and `partition_spec_id`.
1553    async fn reload_table(
1554        catalog: &dyn Catalog,
1555        table_ident: &TableIdent,
1556        schema_id: i32,
1557        partition_spec_id: i32,
1558    ) -> Result<Table> {
1559        let table = catalog
1560            .load_table(table_ident)
1561            .await
1562            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1563        if table.metadata().current_schema_id() != schema_id {
1564            return Err(SinkError::Iceberg(anyhow!(
1565                "Schema evolution not supported, expect schema id {}, but got {}",
1566                schema_id,
1567                table.metadata().current_schema_id()
1568            )));
1569        }
1570        if table.metadata().default_partition_spec_id() != partition_spec_id {
1571            return Err(SinkError::Iceberg(anyhow!(
1572                "Partition evolution not supported, expect partition spec id {}, but got {}",
1573                partition_spec_id,
1574                table.metadata().default_partition_spec_id()
1575            )));
1576        }
1577        Ok(table)
1578    }
1579}
1580
1581#[async_trait::async_trait]
1582impl SinkCommitCoordinator for IcebergSinkCommitter {
1583    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1584        if self.is_exactly_once {
1585            self.committed_epoch_subscriber = Some(subscriber);
1586            tracing::info!(
1587                "Sink id = {}: iceberg sink coordinator initializing.",
1588                self.param.sink_id.sink_id()
1589            );
1590            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1591                let ordered_metadata_list_by_end_epoch =
1592                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1593
1594                let mut last_recommit_epoch = 0;
1595                for (end_epoch, sealized_bytes, snapshot_id, committed) in
1596                    ordered_metadata_list_by_end_epoch
1597                {
1598                    let write_results_bytes = deserialize_metadata(sealized_bytes);
1599                    let mut write_results = vec![];
1600
1601                    for each in write_results_bytes {
1602                        let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1603                        write_results.push(write_result);
1604                    }
1605
1606                    match (
1607                        committed,
1608                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1609                            .await?,
1610                    ) {
1611                        (true, _) => {
1612                            tracing::info!(
1613                                "Sink id = {}: all data in the log store has been written to the external sink; nothing to do during recovery.",
1614                                self.param.sink_id.sink_id()
1615                            );
1616                        }
1617                        (false, true) => {
1618                            // Already committed on the iceberg side; just mark it as committed in the system table.
1619                            tracing::info!(
1620                                "Sink id = {}: all pre-commit files have already been committed to iceberg and do not need to be committed again; mark them as committed.",
1621                                self.param.sink_id.sink_id()
1622                            );
1623                            mark_row_is_committed_by_sink_id_and_end_epoch(
1624                                &self.db,
1625                                self.sink_id,
1626                                end_epoch,
1627                            )
1628                            .await?;
1629                        }
1630                        (false, false) => {
1631                            tracing::info!(
1632                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1633                                self.param.sink_id.sink_id()
1634                            );
1635                            self.re_commit(end_epoch, write_results, snapshot_id)
1636                                .await?;
1637                        }
1638                    }
1639
1640                    last_recommit_epoch = end_epoch;
1641                }
1642                tracing::info!(
1643                    "Sink id = {}: iceberg commit coordinator initialized.",
1644                    self.param.sink_id.sink_id()
1645                );
1646                return Ok(Some(last_recommit_epoch));
1647            } else {
1648                tracing::info!(
1649                    "Sink id = {}: iceberg commit coordinator initialized; the system table is empty.",
1650                    self.param.sink_id.sink_id()
1651                );
1652                return Ok(None);
1653            }
1654        }
1655
1656        tracing::info!("Iceberg commit coordinator initialized.");
1657        return Ok(None);
1658    }
1659
1660    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1661        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1662        let write_results: Vec<IcebergCommitResult> = metadata
1663            .iter()
1664            .map(IcebergCommitResult::try_from)
1665            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1666
1667        // Skip if no data to commit
1668        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1669            tracing::debug!(?epoch, "no data to commit");
1670            return Ok(());
1671        }
1672
1673        // Guarantee that all write results have the same schema_id and partition_spec_id.
1674        if write_results
1675            .iter()
1676            .any(|r| r.schema_id != write_results[0].schema_id)
1677            || write_results
1678                .iter()
1679                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1680        {
1681            return Err(SinkError::Iceberg(anyhow!(
1682                "schema_id and partition_spec_id should be the same in all write results"
1683            )));
1684        }
1685
1686        if self.is_exactly_once {
1687            assert!(self.committed_epoch_subscriber.is_some());
1688            match self.committed_epoch_subscriber.clone() {
1689                Some(committed_epoch_subscriber) => {
1690                    // Get the latest committed_epoch and the receiver
1691                    let (committed_epoch, mut rw_futures_utilrx) =
1692                        committed_epoch_subscriber(self.param.sink_id).await?;
1693                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1694                    if committed_epoch >= epoch {
1695                        self.commit_iceberg_inner(epoch, write_results, None)
1696                            .await?;
1697                    } else {
1698                        tracing::info!(
1699                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1700                            committed_epoch,
1701                            epoch
1702                        );
1703                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1704                            tracing::info!(
1705                                "Received next committed epoch: {}",
1706                                next_committed_epoch
1707                            );
1708                            // If the newly committed epoch meets the condition, execute the commit immediately.
1709                            if next_committed_epoch >= epoch {
1710                                self.commit_iceberg_inner(epoch, write_results, None)
1711                                    .await?;
1712                                break;
1713                            }
1714                        }
1715                    }
1716                }
1717                None => unreachable!(
1718                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1719                ),
1720            }
1721        } else {
1722            self.commit_iceberg_inner(epoch, write_results, None)
1723                .await?;
1724        }
1725
1726        Ok(())
1727    }
1728}
1729
1730/// Methods required to achieve exactly-once semantics.
1731impl IcebergSinkCommitter {
1732    async fn re_commit(
1733        &mut self,
1734        epoch: u64,
1735        write_results: Vec<IcebergCommitResult>,
1736        snapshot_id: i64,
1737    ) -> Result<()> {
1738        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1739
1740        // Skip if no data to commit
1741        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1742            tracing::debug!(?epoch, "no data to commit");
1743            return Ok(());
1744        }
1745        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1746            .await?;
1747        Ok(())
1748    }
1749
1750    async fn commit_iceberg_inner(
1751        &mut self,
1752        epoch: u64,
1753        write_results: Vec<IcebergCommitResult>,
1754        snapshot_id: Option<i64>,
1755    ) -> Result<()> {
1756        // If the provided `snapshot_id` is not `None`, this commit is a re-commit
1757        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
1758        // that was previously persisted in the system table to commit.
1759        let is_first_commit = snapshot_id.is_none();
1760        self.last_commit_epoch = epoch;
1761        let expect_schema_id = write_results[0].schema_id;
1762        let expect_partition_spec_id = write_results[0].partition_spec_id;
1763
1764        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1765        self.table = Self::reload_table(
1766            self.catalog.as_ref(),
1767            self.table.identifier(),
1768            expect_schema_id,
1769            expect_partition_spec_id,
1770        )
1771        .await?;
1772        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1773            return Err(SinkError::Iceberg(anyhow!(
1774                "Can't find schema by id {}",
1775                expect_schema_id
1776            )));
1777        };
1778        let Some(partition_spec) = self
1779            .table
1780            .metadata()
1781            .partition_spec_by_id(expect_partition_spec_id)
1782        else {
1783            return Err(SinkError::Iceberg(anyhow!(
1784                "Can't find partition spec by id {}",
1785                expect_partition_spec_id
1786            )));
1787        };
1788        let partition_type = partition_spec
1789            .as_ref()
1790            .clone()
1791            .partition_type(schema)
1792            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1793
1794        let txn = Transaction::new(&self.table);
1795        // Only generate a new snapshot id on the first commit.
1796        let snapshot_id = match snapshot_id {
1797            Some(previous_snapshot_id) => previous_snapshot_id,
1798            None => txn.generate_unique_snapshot_id(),
1799        };
1800        if self.is_exactly_once && is_first_commit {
1801            // Persist the pre-commit metadata and snapshot id in the system table.
1802            let mut pre_commit_metadata_bytes = Vec::new();
1803            for each_parallelism_write_result in write_results.clone() {
1804                let each_parallelism_write_result_bytes: Vec<u8> =
1805                    each_parallelism_write_result.try_into()?;
1806                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1807            }
1808
1809            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1810
1811            persist_pre_commit_metadata(
1812                self.sink_id,
1813                self.db.clone(),
1814                self.last_commit_epoch,
1815                epoch,
1816                pre_commit_metadata_bytes,
1817                snapshot_id,
1818            )
1819            .await?;
1820        }
1821
1822        let data_files = write_results
1823            .into_iter()
1824            .flat_map(|r| {
1825                r.data_files.into_iter().map(|f| {
1826                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1827                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1828                })
1829            })
1830            .collect::<Result<Vec<DataFile>>>()?;
1831        // TODO:
1832        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
1833        // because the retry logic involves reapplying the commit metadata.
1834        // For now, we just retry the commit operation.
1835        let retry_strategy = ExponentialBackoff::from_millis(10)
1836            .max_delay(Duration::from_secs(60))
1837            .map(jitter)
1838            .take(self.commit_retry_num as usize);
1839        let catalog = self.catalog.clone();
1840        let table_ident = self.table.identifier().clone();
1841        let table = Retry::spawn(retry_strategy, || async {
1842            let table = Self::reload_table(
1843                catalog.as_ref(),
1844                &table_ident,
1845                expect_schema_id,
1846                expect_partition_spec_id,
1847            )
1848            .await?;
1849            let txn = Transaction::new(&table);
1850            let mut append_action = txn
1851                .fast_append(Some(snapshot_id), None, vec![])
1852                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1853                .with_to_branch(commit_branch(
1854                    self.config.r#type.as_str(),
1855                    self.config.write_mode.as_str(),
1856                ));
1857            append_action
1858                .add_data_files(data_files.clone())
1859                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1860            let tx = append_action.apply().await.map_err(|err| {
1861                tracing::error!(error = %err.as_report(), "Failed to apply the iceberg table update");
1862                SinkError::Iceberg(anyhow!(err))
1863            })?;
1864            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1865                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1866                SinkError::Iceberg(anyhow!(err))
1867            })
1868        })
1869        .await?;
1870        self.table = table;
1871
1872        tracing::info!("Successfully committed to the iceberg table in epoch {epoch}.");
1873
1874        if self.is_exactly_once {
1875            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1876            tracing::info!(
1877                "Sink id = {}: successfully marked pre-commit metadata in epoch {} as committed.",
1878                self.sink_id,
1879                epoch
1880            );
1881
1882            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1883        }
1884        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1885            && self.config.enable_compaction
1886            && iceberg_compact_stat_sender
1887                .send(IcebergSinkCompactionUpdate {
1888                    sink_id: SinkId::new(self.sink_id),
1889                    compaction_interval: self.config.compaction_interval_sec(),
1890                })
1891                .is_err()
1892        {
1893            warn!("failed to send iceberg compaction stats");
1894        }
1895
1896        Ok(())
1897    }
1898
1899    /// During the pre-commit phase, we record the `snapshot_id` corresponding to each batch of files.
1900    /// Therefore, checking whether all files in a batch are present in Iceberg reduces to
1901    /// verifying whether their corresponding `snapshot_id` exists in Iceberg.
1902    async fn is_snapshot_id_in_iceberg(
1903        &self,
1904        iceberg_config: &IcebergConfig,
1905        snapshot_id: i64,
1906    ) -> Result<bool> {
1907        let iceberg_common = iceberg_config.common.clone();
1908        let table = iceberg_common
1909            .load_table(&iceberg_config.java_catalog_props)
1910            .await?;
1911        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1916    }
1917}
1918
1919const MAP_KEY: &str = "key";
1920const MAP_VALUE: &str = "value";
1921
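/// Collect the RisingWave child field types corresponding to the children of an Arrow
/// complex type into `schema_fields`, and return the Arrow child fields to check them
/// against (`None` if `data_type` is not a nested type handled here).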
1922fn get_fields<'a>(
1923    our_field_type: &'a risingwave_common::types::DataType,
1924    data_type: &ArrowDataType,
1925    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1926) -> Option<ArrowFields> {
1927    match data_type {
1928        ArrowDataType::Struct(fields) => {
1929            match our_field_type {
1930                risingwave_common::types::DataType::Struct(struct_fields) => {
1931                    struct_fields.iter().for_each(|(name, data_type)| {
1932                        let res = schema_fields.insert(name, data_type);
1933                        // This assert is to make sure there is no duplicate field name in the schema.
1934                        assert!(res.is_none())
1935                    });
1936                }
1937                risingwave_common::types::DataType::Map(map_fields) => {
1938                    schema_fields.insert(MAP_KEY, map_fields.key());
1939                    schema_fields.insert(MAP_VALUE, map_fields.value());
1940                }
1941                risingwave_common::types::DataType::List(list_field) => {
1942                    list_field.as_struct().iter().for_each(|(name, data_type)| {
1943                        let res = schema_fields.insert(name, data_type);
1944                        // This assert is to make sure there is no duplicate field name in the schema.
1945                        assert!(res.is_none())
1946                    });
1947                }
1948                _ => {}
1949            };
1950            Some(fields.clone())
1951        }
1952        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1953            get_fields(our_field_type, field.data_type(), schema_fields)
1954        }
1955        _ => None, // not a supported complex type and unlikely to show up
1956    }
1957}
1958
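/// Check that every iceberg (Arrow) field in `fields` has a counterpart with a
/// compatible type in `schema_fields` (keyed by field name), recursing into lists,
/// maps and structs.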
1959fn check_compatibility(
1960    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
1961    fields: &ArrowFields,
1962) -> anyhow::Result<bool> {
1963    for arrow_field in fields {
1964        let our_field_type = schema_fields
1965            .get(arrow_field.name().as_str())
1966            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
1967
1968        // Iceberg source should be able to read iceberg decimal type.
1969        let converted_arrow_data_type = IcebergArrowConvert
1970            .to_arrow_field("", our_field_type)
1971            .map_err(|e| anyhow!(e))?
1972            .data_type()
1973            .clone();
1974
1975        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
1976            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
1977            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
1978            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
1979            (ArrowDataType::List(_), ArrowDataType::List(field))
1980            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
1981                let mut schema_fields = HashMap::new();
1982                get_fields(our_field_type, field.data_type(), &mut schema_fields)
1983                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
1984            }
1985            // validate nested structs
1986            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
1987                let mut schema_fields = HashMap::new();
1988                our_field_type
1989                    .as_struct()
1990                    .iter()
1991                    .for_each(|(name, data_type)| {
1992                        let res = schema_fields.insert(name, data_type);
1993                        // This assert is to make sure there is no duplicate field name in the schema.
1994                        assert!(res.is_none())
1995                    });
1996                check_compatibility(schema_fields, fields)?
1997            }
1998            // cases where left != right (metadata, field name mismatch)
1999            //
2000            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2001            // {"PARQUET:field_id": ".."}
2002            //
2003            // map: The standard name in arrow is "entries", "key", "value".
2004            // in iceberg-rs, it's called "key_value"
2005            (left, right) => left.equals_datatype(right),
2006        };
2007        if !compatible {
2008            bail!(
2009                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2010                arrow_field.name(),
2011                converted_arrow_data_type,
2012                arrow_field.data_type()
2013            );
2014        }
2015    }
2016    Ok(true)
2017}
2018
2019/// Try to match our schema with iceberg schema.
2020pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2021    if rw_schema.fields.len() != arrow_schema.fields().len() {
2022        bail!(
2023            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2024            rw_schema.fields.len(),
2025            arrow_schema.fields.len()
2026        );
2027    }
2028
2029    let mut schema_fields = HashMap::new();
2030    rw_schema.fields.iter().for_each(|field| {
2031        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2032        // This assert is to make sure there is no duplicate field name in the schema.
2033        assert!(res.is_none())
2034    });
2035
2036    check_compatibility(schema_fields, &arrow_schema.fields)?;
2037    Ok(())
2038}
2039
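/// Serialize the per-writer pre-commit payloads into a single byte buffer
/// (a JSON-encoded array of byte arrays); `deserialize_metadata` is its inverse.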
2040pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2041    serde_json::to_vec(&metadata).unwrap()
2042}
2043
2044pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2045    serde_json::from_slice(&bytes).unwrap()
2046}
2047
2048pub fn parse_partition_by_exprs(
2049    expr: String,
2050) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2051    // captures column, transform(column), transform(n,column), transform(n, column)
2052    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2053    if !re.is_match(&expr) {
2054        bail!(
2055            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2056            expr
2057        );
2058    }
2059    let caps = re.captures_iter(&expr);
2060
2061    let mut partition_columns = vec![];
2062
2063    for mat in caps {
2064        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2065            (&mat["transform"], Transform::Identity)
2066        } else {
2067            let mut func = mat["transform"].to_owned();
2068            if func == "bucket" || func == "truncate" {
2069                let n = &mat
2070                    .name("n")
2071                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2072                    .as_str();
2073                func = format!("{func}[{n}]");
2074            }
2075            (
2076                &mat["field"],
2077                Transform::from_str(&func)
2078                    .with_context(|| format!("invalid transform function {}", func))?,
2079            )
2080        };
2081        partition_columns.push((column.to_owned(), transform));
2082    }
2083    Ok(partition_columns)
2084}
2085
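/// The branch to commit to: the dedicated copy-on-write branch when copy-on-write is
/// enabled for an upsert sink (see `should_enable_iceberg_cow`), otherwise the main branch.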
2086pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2087    if should_enable_iceberg_cow(sink_type, write_mode) {
2088        ICEBERG_COW_BRANCH.to_owned()
2089    } else {
2090        MAIN_BRANCH.to_owned()
2091    }
2092}
2093
2094pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2095    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2096}
2097
2098#[cfg(test)]
2099mod test {
2100    use std::collections::BTreeMap;
2101
2102    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2103    use risingwave_common::catalog::Field;
2104    use risingwave_common::types::{DataType, MapType, StructType};
2105
2106    use crate::connector_common::IcebergCommon;
2107    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2108    use crate::sink::iceberg::{ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig};
2109
2110    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2111
2112    #[test]
2113    fn test_compatible_arrow_schema() {
2114        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2115
2116        use super::*;
2117        let risingwave_schema = Schema::new(vec![
2118            Field::with_name(DataType::Int32, "a"),
2119            Field::with_name(DataType::Int32, "b"),
2120            Field::with_name(DataType::Int32, "c"),
2121        ]);
2122        let arrow_schema = ArrowSchema::new(vec![
2123            ArrowField::new("a", ArrowDataType::Int32, false),
2124            ArrowField::new("b", ArrowDataType::Int32, false),
2125            ArrowField::new("c", ArrowDataType::Int32, false),
2126        ]);
2127
2128        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2129
2130        let risingwave_schema = Schema::new(vec![
2131            Field::with_name(DataType::Int32, "d"),
2132            Field::with_name(DataType::Int32, "c"),
2133            Field::with_name(DataType::Int32, "a"),
2134            Field::with_name(DataType::Int32, "b"),
2135        ]);
2136        let arrow_schema = ArrowSchema::new(vec![
2137            ArrowField::new("a", ArrowDataType::Int32, false),
2138            ArrowField::new("b", ArrowDataType::Int32, false),
2139            ArrowField::new("d", ArrowDataType::Int32, false),
2140            ArrowField::new("c", ArrowDataType::Int32, false),
2141        ]);
2142        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2143
2144        let risingwave_schema = Schema::new(vec![
2145            Field::with_name(
2146                DataType::Struct(StructType::new(vec![
2147                    ("a1", DataType::Int32),
2148                    (
2149                        "a2",
2150                        DataType::Struct(StructType::new(vec![
2151                            ("a21", DataType::Bytea),
2152                            (
2153                                "a22",
2154                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2155                            ),
2156                        ])),
2157                    ),
2158                ])),
2159                "a",
2160            ),
2161            Field::with_name(
2162                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2163                    ("b1", DataType::Int32),
2164                    ("b2", DataType::Bytea),
2165                    (
2166                        "b3",
2167                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2168                    ),
2169                ])))),
2170                "b",
2171            ),
2172            Field::with_name(
2173                DataType::Map(MapType::from_kv(
2174                    DataType::Varchar,
2175                    DataType::List(Box::new(DataType::Struct(StructType::new([
2176                        ("c1", DataType::Int32),
2177                        ("c2", DataType::Bytea),
2178                        (
2179                            "c3",
2180                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2181                        ),
2182                    ])))),
2183                )),
2184                "c",
2185            ),
2186        ]);
2187        let arrow_schema = ArrowSchema::new(vec![
2188            ArrowField::new(
2189                "a",
2190                ArrowDataType::Struct(ArrowFields::from(vec![
2191                    ArrowField::new("a1", ArrowDataType::Int32, false),
2192                    ArrowField::new(
2193                        "a2",
2194                        ArrowDataType::Struct(ArrowFields::from(vec![
2195                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2196                            ArrowField::new_map(
2197                                "a22",
2198                                "entries",
2199                                ArrowFieldRef::new(ArrowField::new(
2200                                    "key",
2201                                    ArrowDataType::Utf8,
2202                                    false,
2203                                )),
2204                                ArrowFieldRef::new(ArrowField::new(
2205                                    "value",
2206                                    ArrowDataType::Utf8,
2207                                    false,
2208                                )),
2209                                false,
2210                                false,
2211                            ),
2212                        ])),
2213                        false,
2214                    ),
2215                ])),
2216                false,
2217            ),
2218            ArrowField::new(
2219                "b",
2220                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2221                    ArrowDataType::Struct(ArrowFields::from(vec![
2222                        ArrowField::new("b1", ArrowDataType::Int32, false),
2223                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2224                        ArrowField::new_map(
2225                            "b3",
2226                            "entries",
2227                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2228                            ArrowFieldRef::new(ArrowField::new(
2229                                "value",
2230                                ArrowDataType::Utf8,
2231                                false,
2232                            )),
2233                            false,
2234                            false,
2235                        ),
2236                    ])),
2237                    false,
2238                ))),
2239                false,
2240            ),
2241            ArrowField::new_map(
2242                "c",
2243                "entries",
2244                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2245                ArrowFieldRef::new(ArrowField::new(
2246                    "value",
2247                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2248                        ArrowDataType::Struct(ArrowFields::from(vec![
2249                            ArrowField::new("c1", ArrowDataType::Int32, false),
2250                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2251                            ArrowField::new_map(
2252                                "c3",
2253                                "entries",
2254                                ArrowFieldRef::new(ArrowField::new(
2255                                    "key",
2256                                    ArrowDataType::Utf8,
2257                                    false,
2258                                )),
2259                                ArrowFieldRef::new(ArrowField::new(
2260                                    "value",
2261                                    ArrowDataType::Utf8,
2262                                    false,
2263                                )),
2264                                false,
2265                                false,
2266                            ),
2267                        ])),
2268                        false,
2269                    ))),
2270                    false,
2271                )),
2272                false,
2273                false,
2274            ),
2275        ]);
2276        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2277    }
2278
2279    #[test]
2280    fn test_parse_iceberg_config() {
2281        let values = [
2282            ("connector", "iceberg"),
2283            ("type", "upsert"),
2284            ("primary_key", "v1"),
2285            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2286            ("warehouse.path", "s3://iceberg"),
2287            ("s3.endpoint", "http://127.0.0.1:9301"),
2288            ("s3.access.key", "hummockadmin"),
2289            ("s3.secret.key", "hummockadmin"),
2290            ("s3.path.style.access", "true"),
2291            ("s3.region", "us-east-1"),
2292            ("catalog.type", "jdbc"),
2293            ("catalog.name", "demo"),
2294            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2295            ("catalog.jdbc.user", "admin"),
2296            ("catalog.jdbc.password", "123456"),
2297            ("database.name", "demo_db"),
2298            ("table.name", "demo_table"),
2299            ("enable_compaction", "true"),
2300            ("compaction_interval_sec", "1800"),
2301            ("enable_snapshot_expiration", "true"),
2302        ]
2303        .into_iter()
2304        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2305        .collect();
2306
2307        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2308
2309        let expected_iceberg_config = IcebergConfig {
2310            common: IcebergCommon {
2311                warehouse_path: Some("s3://iceberg".to_owned()),
2312                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2313                region: Some("us-east-1".to_owned()),
2314                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2315                access_key: Some("hummockadmin".to_owned()),
2316                secret_key: Some("hummockadmin".to_owned()),
2317                gcs_credential: None,
2318                catalog_type: Some("jdbc".to_owned()),
2319                glue_id: None,
2320                catalog_name: Some("demo".to_owned()),
2321                database_name: Some("demo_db".to_owned()),
2322                table_name: "demo_table".to_owned(),
2323                path_style_access: Some(true),
2324                credential: None,
2325                oauth2_server_uri: None,
2326                scope: None,
2327                token: None,
2328                enable_config_load: None,
2329                rest_signing_name: None,
2330                rest_signing_region: None,
2331                rest_sigv4_enabled: None,
2332                hosted_catalog: None,
2333                azblob_account_name: None,
2334                azblob_account_key: None,
2335                azblob_endpoint_url: None,
2336                header: None,
2337            },
2338            r#type: "upsert".to_owned(),
2339            force_append_only: false,
2340            primary_key: Some(vec!["v1".to_owned()]),
2341            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2342            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2343                .into_iter()
2344                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2345                .collect(),
2346            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2347            create_table_if_not_exists: false,
2348            is_exactly_once: None,
2349            commit_retry_num: 8,
2350            enable_compaction: true,
2351            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2352            enable_snapshot_expiration: true,
2353            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2354        };
2355
2356        assert_eq!(iceberg_config, expected_iceberg_config);
2357
2358        assert_eq!(
2359            &iceberg_config.common.full_table_name().unwrap().to_string(),
2360            "demo_db.demo_table"
2361        );
2362    }
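
    // Illustrative example (not from the original source): a minimal sketch of how
    // `parse_partition_by_exprs` handles the supported formats -- column, transform(column),
    // transform(n,column) and transform(n, column) -- assuming the `iceberg::spec::Transform`
    // variants used below.
    #[test]
    fn test_parse_partition_by_exprs() {
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        let parsed = parse_partition_by_exprs(
            "v1, identity(v2), bucket(5,v3), truncate(4, v4), year(v5)".to_owned(),
        )
        .unwrap();
        assert_eq!(
            parsed,
            vec![
                ("v1".to_owned(), Transform::Identity),
                ("v2".to_owned(), Transform::Identity),
                ("v3".to_owned(), Transform::Bucket(5)),
                ("v4".to_owned(), Transform::Truncate(4)),
                ("v5".to_owned(), Transform::Year),
            ]
        );
    }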
2363
2364    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2365        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2366
2367        let table = iceberg_config.load_table().await.unwrap();
2368
2369        println!("{:?}", table.identifier());
2370    }
2371
2372    #[tokio::test]
2373    #[ignore]
2374    async fn test_storage_catalog() {
2375        let values = [
2376            ("connector", "iceberg"),
2377            ("type", "append-only"),
2378            ("force_append_only", "true"),
2379            ("s3.endpoint", "http://127.0.0.1:9301"),
2380            ("s3.access.key", "hummockadmin"),
2381            ("s3.secret.key", "hummockadmin"),
2382            ("s3.region", "us-east-1"),
2383            ("s3.path.style.access", "true"),
2384            ("catalog.name", "demo"),
2385            ("catalog.type", "storage"),
2386            ("warehouse.path", "s3://icebergdata/demo"),
2387            ("database.name", "s1"),
2388            ("table.name", "t1"),
2389        ]
2390        .into_iter()
2391        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2392        .collect();
2393
2394        test_create_catalog(values).await;
2395    }
2396
2397    #[tokio::test]
2398    #[ignore]
2399    async fn test_rest_catalog() {
2400        let values = [
2401            ("connector", "iceberg"),
2402            ("type", "append-only"),
2403            ("force_append_only", "true"),
2404            ("s3.endpoint", "http://127.0.0.1:9301"),
2405            ("s3.access.key", "hummockadmin"),
2406            ("s3.secret.key", "hummockadmin"),
2407            ("s3.region", "us-east-1"),
2408            ("s3.path.style.access", "true"),
2409            ("catalog.name", "demo"),
2410            ("catalog.type", "rest"),
2411            ("catalog.uri", "http://192.168.167.4:8181"),
2412            ("warehouse.path", "s3://icebergdata/demo"),
2413            ("database.name", "s1"),
2414            ("table.name", "t1"),
2415        ]
2416        .into_iter()
2417        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2418        .collect();
2419
2420        test_create_catalog(values).await;
2421    }
2422
2423    #[tokio::test]
2424    #[ignore]
2425    async fn test_jdbc_catalog() {
2426        let values = [
2427            ("connector", "iceberg"),
2428            ("type", "append-only"),
2429            ("force_append_only", "true"),
2430            ("s3.endpoint", "http://127.0.0.1:9301"),
2431            ("s3.access.key", "hummockadmin"),
2432            ("s3.secret.key", "hummockadmin"),
2433            ("s3.region", "us-east-1"),
2434            ("s3.path.style.access", "true"),
2435            ("catalog.name", "demo"),
2436            ("catalog.type", "jdbc"),
2437            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2438            ("catalog.jdbc.user", "admin"),
2439            ("catalog.jdbc.password", "123456"),
2440            ("warehouse.path", "s3://icebergdata/demo"),
2441            ("database.name", "s1"),
2442            ("table.name", "t1"),
2443        ]
2444        .into_iter()
2445        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2446        .collect();
2447
2448        test_create_catalog(values).await;
2449    }
2450
2451    #[tokio::test]
2452    #[ignore]
2453    async fn test_hive_catalog() {
2454        let values = [
2455            ("connector", "iceberg"),
2456            ("type", "append-only"),
2457            ("force_append_only", "true"),
2458            ("s3.endpoint", "http://127.0.0.1:9301"),
2459            ("s3.access.key", "hummockadmin"),
2460            ("s3.secret.key", "hummockadmin"),
2461            ("s3.region", "us-east-1"),
2462            ("s3.path.style.access", "true"),
2463            ("catalog.name", "demo"),
2464            ("catalog.type", "hive"),
2465            ("catalog.uri", "thrift://localhost:9083"),
2466            ("warehouse.path", "s3://icebergdata/demo"),
2467            ("database.name", "s1"),
2468            ("table.name", "t1"),
2469        ]
2470        .into_iter()
2471        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2472        .collect();
2473
2474        test_create_catalog(values).await;
2475    }
2476}