risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
30};
31use iceberg::table::Table;
32use iceberg::transaction::Transaction;
33use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
34use iceberg::writer::base_writer::equality_delete_writer::{
35    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
36};
37use iceberg::writer::base_writer::sort_position_delete_writer::{
38    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
39};
40use iceberg::writer::file_writer::ParquetWriterBuilder;
41use iceberg::writer::file_writer::location_generator::{
42    DefaultFileNameGenerator, DefaultLocationGenerator,
43};
44use iceberg::writer::function_writer::equality_delta_writer::{
45    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
46};
47use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
48use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
49use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
50use itertools::Itertools;
51use parquet::file::properties::WriterProperties;
52use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
53use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58    Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::Schema;
65use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
66use risingwave_common_estimate_size::EstimateSize;
67use risingwave_pb::connector_service::SinkMetadata;
68use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
69use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
70use sea_orm::DatabaseConnection;
71use serde_derive::Deserialize;
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::Retry;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
84use super::{
85    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::iceberg::exactly_once_util::*;
93use crate::sink::writer::SinkWriter;
94use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
95use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
96
97pub const ICEBERG_SINK: &str = "iceberg";
98
99fn default_commit_retry_num() -> u32 {
100    8
101}
102
103#[serde_as]
104#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
105pub struct IcebergConfig {
106    pub r#type: String, // accepts "append-only" or "upsert"
107
108    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
109    pub force_append_only: bool,
110
111    #[serde(flatten)]
112    common: IcebergCommon,
113
114    #[serde(
115        rename = "primary_key",
116        default,
117        deserialize_with = "deserialize_optional_string_seq_from_string"
118    )]
119    pub primary_key: Option<Vec<String>>,
120
121    // Properties passed through to the Java catalog.
122    #[serde(skip)]
123    pub java_catalog_props: HashMap<String, String>,
124
125    #[serde(default)]
126    pub partition_by: Option<String>,
127
128    /// Commit every n (> 0) checkpoints; the default is 10.
129    #[serde(default = "default_commit_checkpoint_interval")]
130    #[serde_as(as = "DisplayFromStr")]
131    #[with_option(allow_alter_on_fly)]
132    pub commit_checkpoint_interval: u64,
133
134    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
135    pub create_table_if_not_exists: bool,
136
137    /// Whether the sink runs in `exactly_once` mode; disabled by default.
138    #[serde(default)]
139    #[serde_as(as = "Option<DisplayFromStr>")]
140    pub is_exactly_once: Option<bool>,
141    // Number of times to retry a failed Iceberg commit; default is 8.
142    // # TODO
143    // The Iceberg table may store the commit retry count in its table metadata.
144    // We should try to find and use that value as the default first.
145    #[serde(default = "default_commit_retry_num")]
146    pub commit_retry_num: u32,
147
148    /// Whether to enable iceberg compaction.
149    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
150    #[with_option(allow_alter_on_fly)]
151    pub enable_compaction: bool,
152
153    /// The interval, in seconds, between Iceberg compaction runs.
154    #[serde(default)]
155    #[serde_as(as = "Option<DisplayFromStr>")]
156    #[with_option(allow_alter_on_fly)]
157    pub compaction_interval_sec: Option<u64>,
158
159    /// Whether to enable expiration of old Iceberg snapshots.
160    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
161    #[with_option(allow_alter_on_fly)]
162    pub enable_snapshot_expiration: bool,
163}
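
// Note: the fields annotated with `allow_alter_on_fly` above (`commit_checkpoint_interval`,
// `enable_compaction`, `compaction_interval_sec`, `enable_snapshot_expiration`) are the
// options intended to be adjustable on a running sink; see `validate_alter_config` below
// for the extra consistency checks applied to the compaction settings.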
164
165impl EnforceSecret for IcebergConfig {
166    fn enforce_secret<'a>(
167        prop_iter: impl Iterator<Item = &'a str>,
168    ) -> crate::error::ConnectorResult<()> {
169        for prop in prop_iter {
170            IcebergCommon::enforce_one(prop)?;
171        }
172        Ok(())
173    }
174
175    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
176        IcebergCommon::enforce_one(prop)
177    }
178}
179
180impl IcebergConfig {
181    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
182        let mut config =
183            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
184                .map_err(|e| SinkError::Config(anyhow!(e)))?;
185
186        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
187            return Err(SinkError::Config(anyhow!(
188                "`{}` must be {}, or {}",
189                SINK_TYPE_OPTION,
190                SINK_TYPE_APPEND_ONLY,
191                SINK_TYPE_UPSERT
192            )));
193        }
194
195        if config.r#type == SINK_TYPE_UPSERT {
196            if let Some(primary_key) = &config.primary_key {
197                if primary_key.is_empty() {
198                    return Err(SinkError::Config(anyhow!(
199                        "`primary_key` must not be empty in {}",
200                        SINK_TYPE_UPSERT
201                    )));
202                }
203            } else {
204                return Err(SinkError::Config(anyhow!(
205                    "Must set `primary_key` in {}",
206                    SINK_TYPE_UPSERT
207                )));
208            }
209        }
210
211        // All options starting with "catalog." are treated as Java catalog configs.
212        config.java_catalog_props = values
213            .iter()
214            .filter(|(k, _v)| {
215                k.starts_with("catalog.")
216                    && k != &"catalog.uri"
217                    && k != &"catalog.type"
218                    && k != &"catalog.name"
219            })
220            .map(|(k, v)| (k[8..].to_string(), v.clone()))
221            .collect();
222
223        if config.commit_checkpoint_interval == 0 {
224            return Err(SinkError::Config(anyhow!(
225                "`commit_checkpoint_interval` must be greater than 0"
226            )));
227        }
228
229        Ok(config)
230    }
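
    // Illustrative sketch of feeding options into `from_btreemap` (the `catalog.jdbc.user`
    // key is a hypothetical example, and the flattened `IcebergCommon` options required by
    // the chosen catalog are elided):
    //
    //     let mut props = BTreeMap::new();
    //     props.insert("type".to_owned(), "upsert".to_owned());
    //     props.insert("primary_key".to_owned(), "id".to_owned());
    //     props.insert("catalog.jdbc.user".to_owned(), "admin".to_owned());
    //     // ...plus the `IcebergCommon` options...
    //     let config = IcebergConfig::from_btreemap(props)?;
    //
    // Here "catalog.jdbc.user" has its "catalog." prefix stripped and lands in
    // `java_catalog_props` as "jdbc.user", while `catalog.uri`, `catalog.type` and
    // `catalog.name` are excluded from the Java catalog properties.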
231
232    pub fn catalog_type(&self) -> &str {
233        self.common.catalog_type()
234    }
235
236    pub async fn load_table(&self) -> Result<Table> {
237        self.common
238            .load_table(&self.java_catalog_props)
239            .await
240            .map_err(Into::into)
241    }
242
243    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
244        self.common
245            .create_catalog(&self.java_catalog_props)
246            .await
247            .map_err(Into::into)
248    }
249
250    pub fn full_table_name(&self) -> Result<TableIdent> {
251        self.common.full_table_name().map_err(Into::into)
252    }
253
254    pub fn catalog_name(&self) -> String {
255        self.common.catalog_name()
256    }
257
258    pub fn compaction_interval_sec(&self) -> u64 {
259        // defaults to 1 hour
260        self.compaction_interval_sec.unwrap_or(3600)
261    }
262}
263
264pub struct IcebergSink {
265    config: IcebergConfig,
266    param: SinkParam,
267    // In upsert mode, this is always `Some` and never empty.
268    unique_column_ids: Option<Vec<usize>>,
269}
270
271impl EnforceSecret for IcebergSink {
272    fn enforce_secret<'a>(
273        prop_iter: impl Iterator<Item = &'a str>,
274    ) -> crate::error::ConnectorResult<()> {
275        for prop in prop_iter {
276            IcebergConfig::enforce_one(prop)?;
277        }
278        Ok(())
279    }
280}
281
282impl TryFrom<SinkParam> for IcebergSink {
283    type Error = SinkError;
284
285    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
286        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
287        IcebergSink::new(config, param)
288    }
289}
290
291impl Debug for IcebergSink {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        f.debug_struct("IcebergSink")
294            .field("config", &self.config)
295            .finish()
296    }
297}
298
299impl IcebergSink {
300    async fn create_and_validate_table(&self) -> Result<Table> {
301        if self.config.create_table_if_not_exists {
302            self.create_table_if_not_exists().await?;
303        }
304
305        let table = self
306            .config
307            .load_table()
308            .await
309            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
310
311        let sink_schema = self.param.schema();
312        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
313            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
314
315        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
316            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
317
318        Ok(table)
319    }
320
321    async fn create_table_if_not_exists(&self) -> Result<()> {
322        let catalog = self.config.create_catalog().await?;
323        let namespace = if let Some(database_name) = &self.config.common.database_name {
324            let namespace = NamespaceIdent::new(database_name.clone());
325            if !catalog
326                .namespace_exists(&namespace)
327                .await
328                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
329            {
330                catalog
331                    .create_namespace(&namespace, HashMap::default())
332                    .await
333                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
334                    .context("failed to create iceberg namespace")?;
335            }
336            namespace
337        } else {
338            bail!("database name must be set if you want to create table")
339        };
340
341        let table_id = self
342            .config
343            .full_table_name()
344            .context("Unable to parse table name")?;
345        if !catalog
346            .table_exists(&table_id)
347            .await
348            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
349        {
350            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
351            // convert risingwave schema -> arrow schema -> iceberg schema
352            let arrow_fields = self
353                .param
354                .columns
355                .iter()
356                .map(|column| {
357                    Ok(iceberg_create_table_arrow_convert
358                        .to_arrow_field(&column.name, &column.data_type)
359                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
360                        .context(format!(
361                            "failed to convert {}: {} to arrow type",
362                            &column.name, &column.data_type
363                        ))?)
364                })
365                .collect::<Result<Vec<ArrowField>>>()?;
366            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
367            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
368                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
369                .context("failed to convert arrow schema to iceberg schema")?;
370
371            let location = {
372                let mut names = namespace.clone().inner();
373                names.push(self.config.common.table_name.clone());
374                match &self.config.common.warehouse_path {
375                    Some(warehouse_path) => {
376                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
377                        let url = Url::parse(warehouse_path);
378                        if url.is_err() || is_s3_tables {
379                            // For rest catalog, the warehouse_path could be a warehouse name.
380                            // In this case, we leave the location unspecified when creating the table.
381                            if self.config.common.catalog_type() == "rest"
382                                || self.config.common.catalog_type() == "rest_rust"
383                            {
384                                None
385                            } else {
386                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
387                            }
388                        } else if warehouse_path.ends_with('/') {
389                            Some(format!("{}{}", warehouse_path, names.join("/")))
390                        } else {
391                            Some(format!("{}/{}", warehouse_path, names.join("/")))
392                        }
393                    }
394                    None => None,
395                }
396            };
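
            // For example (illustrative values): with `warehouse_path` = "s3://bucket/warehouse",
            // database "db" and table "t", the location resolves to "s3://bucket/warehouse/db/t".
            // For a REST catalog whose warehouse is only a name (or an S3 Tables ARN), the
            // location stays `None` and the catalog decides where to place the table.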
397
398            let partition_spec = match &self.config.partition_by {
399                Some(partition_by) => {
400                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
401                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
402                        .into_iter()
403                        .enumerate()
404                    {
405                        match iceberg_schema.field_id_by_name(&column) {
406                            Some(id) => partition_fields.push(
407                                UnboundPartitionField::builder()
408                                    .source_id(id)
409                                    .transform(transform)
410                                    .name(format!("_p_{}", column))
411                                    .field_id(i as i32)
412                                    .build(),
413                            ),
414                            None => bail!(format!(
415                                "Partition source column does not exist in schema: {}",
416                                column
417                            )),
418                        };
419                    }
420                    Some(
421                        UnboundPartitionSpec::builder()
422                            .with_spec_id(0)
423                            .add_partition_fields(partition_fields)
424                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
425                            .context("failed to add partition columns")?
426                            .build(),
427                    )
428                }
429                None => None,
430            };
431
432            let table_creation_builder = TableCreation::builder()
433                .name(self.config.common.table_name.clone())
434                .schema(iceberg_schema);
435
436            let table_creation = match (location, partition_spec) {
437                (Some(location), Some(partition_spec)) => table_creation_builder
438                    .location(location)
439                    .partition_spec(partition_spec)
440                    .build(),
441                (Some(location), None) => table_creation_builder.location(location).build(),
442                (None, Some(partition_spec)) => table_creation_builder
443                    .partition_spec(partition_spec)
444                    .build(),
445                (None, None) => table_creation_builder.build(),
446            };
447
448            catalog
449                .create_table(&namespace, table_creation)
450                .await
451                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
452                .context("failed to create iceberg table")?;
453        }
454        Ok(())
455    }
456
457    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
458        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
459            if let Some(pk) = &config.primary_key {
460                let mut unique_column_ids = Vec::with_capacity(pk.len());
461                for col_name in pk {
462                    let id = param
463                        .columns
464                        .iter()
465                        .find(|col| col.name.as_str() == col_name)
466                        .ok_or_else(|| {
467                            SinkError::Config(anyhow!(
468                                "Primary key column {} not found in sink schema",
469                                col_name
470                            ))
471                        })?
472                        .column_id
473                        .get_id() as usize;
474                    unique_column_ids.push(id);
475                }
476                Some(unique_column_ids)
477            } else {
478                unreachable!()
479            }
480        } else {
481            None
482        };
483        Ok(Self {
484            config,
485            param,
486            unique_column_ids,
487        })
488    }
489}
490
491impl Sink for IcebergSink {
492    type Coordinator = IcebergSinkCommitter;
493    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
494
495    const SINK_NAME: &'static str = ICEBERG_SINK;
496
497    async fn validate(&self) -> Result<()> {
498        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
499            bail!("Snowflake catalog only supports iceberg sources");
500        }
501        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
502            risingwave_common::license::Feature::IcebergSinkWithGlue
503                .check_available()
504                .map_err(|e| anyhow::anyhow!(e))?;
505        }
506        if self.config.enable_compaction {
507            risingwave_common::license::Feature::IcebergCompaction
508                .check_available()
509                .map_err(|e| anyhow::anyhow!(e))?;
510        }
511
512        let _ = self.create_and_validate_table().await?;
513        Ok(())
514    }
515
516    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
517        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
518
519        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
520            if iceberg_config.enable_compaction {
521                if compaction_interval == 0 {
522                    bail!(
523                        "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
524                    );
525                }
526            } else {
527                bail!("`compaction_interval_sec` can only be set when `enable_compaction` is true");
528            }
529        }
530
531        Ok(())
532    }
533
534    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
535        let table = self.create_and_validate_table().await?;
536        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
537            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
538        } else {
539            IcebergSinkWriter::new_append_only(table, &writer_param).await?
540        };
541
542        let commit_checkpoint_interval =
543            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
544                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
545            );
546        let writer = CoordinatedLogSinker::new(
547            &writer_param,
548            self.param.clone(),
549            inner,
550            commit_checkpoint_interval,
551        )
552        .await?;
553
554        Ok(writer)
555    }
556
557    fn is_coordinated_sink(&self) -> bool {
558        true
559    }
560
561    async fn new_coordinator(
562        &self,
563        db: DatabaseConnection,
564        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
565    ) -> Result<Self::Coordinator> {
566        let catalog = self.config.create_catalog().await?;
567        let table = self.create_and_validate_table().await?;
568        Ok(IcebergSinkCommitter {
569            catalog,
570            table,
571            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
572            last_commit_epoch: 0,
573            sink_id: self.param.sink_id.sink_id(),
574            config: self.config.clone(),
575            param: self.param.clone(),
576            db,
577            commit_retry_num: self.config.commit_retry_num,
578            committed_epoch_subscriber: None,
579            iceberg_compact_stat_sender,
580        })
581    }
582}
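
// Commit flow in brief: each `IcebergSinkWriter` flushes its files at checkpoint barriers
// and returns them to the coordinator as `SinkMetadata`; the `IcebergSinkCommitter` then
// commits the collected files to the Iceberg table, once every
// `commit_checkpoint_interval` checkpoints (via `CoordinatedLogSinker`).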
583
584/// `None` means no projection is needed.
585/// `Prepare` holds the index of the extra partition column.
586/// `Done` holds the projection index vector.
587///
588/// `ProjectIdxVec` is evaluated lazily: the first time the `Prepare` state is encountered, the data chunk schema
589/// is used to build the projection index vector, which is then cached as `Done`.
590enum ProjectIdxVec {
591    None,
592    Prepare(usize),
593    Done(Vec<usize>),
594}
595
596pub struct IcebergSinkWriter {
597    writer: IcebergWriterDispatch,
598    arrow_schema: SchemaRef,
599    // See comments below
600    metrics: IcebergWriterMetrics,
601    // State of iceberg table for this writer
602    table: Table,
603    // For chunks with an extra partition column, that column must be removed before writing.
604    // The projection index vector is cached here so it is not recomputed for every chunk.
605    project_idx_vec: ProjectIdxVec,
606}
607
608#[allow(clippy::type_complexity)]
609enum IcebergWriterDispatch {
610    PartitionAppendOnly {
611        writer: Option<Box<dyn IcebergWriter>>,
612        writer_builder: MonitoredGeneralWriterBuilder<
613            FanoutPartitionWriterBuilder<
614                DataFileWriterBuilder<
615                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
616                >,
617            >,
618        >,
619    },
620    NonpartitionAppendOnly {
621        writer: Option<Box<dyn IcebergWriter>>,
622        writer_builder: MonitoredGeneralWriterBuilder<
623            DataFileWriterBuilder<
624                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
625            >,
626        >,
627    },
628    PartitionUpsert {
629        writer: Option<Box<dyn IcebergWriter>>,
630        writer_builder: MonitoredGeneralWriterBuilder<
631            FanoutPartitionWriterBuilder<
632                EqualityDeltaWriterBuilder<
633                    DataFileWriterBuilder<
634                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
635                    >,
636                    MonitoredPositionDeleteWriterBuilder<
637                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
638                    >,
639                    EqualityDeleteFileWriterBuilder<
640                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
641                    >,
642                >,
643            >,
644        >,
645        arrow_schema_with_op_column: SchemaRef,
646    },
647    NonpartitionUpsert {
648        writer: Option<Box<dyn IcebergWriter>>,
649        writer_builder: MonitoredGeneralWriterBuilder<
650            EqualityDeltaWriterBuilder<
651                DataFileWriterBuilder<
652                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
653                >,
654                MonitoredPositionDeleteWriterBuilder<
655                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
656                >,
657                EqualityDeleteFileWriterBuilder<
658                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
659                >,
660            >,
661        >,
662        arrow_schema_with_op_column: SchemaRef,
663    },
664}
665
666impl IcebergWriterDispatch {
667    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
668        match self {
669            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
670            | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
671            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
672            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
673        }
674    }
675}
676
677pub struct IcebergWriterMetrics {
678    // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management.
679    // They are actually used in `PrometheusWriterBuilder`:
680    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
681    // We keep them here so that the guards clean up the labels from the metrics registry when dropped.
682    _write_qps: LabelGuardedIntCounter,
683    _write_latency: LabelGuardedHistogram,
684    write_bytes: LabelGuardedIntCounter,
685}
686
687impl IcebergSinkWriter {
688    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
689        let SinkWriterParam {
690            extra_partition_col_idx,
691            actor_id,
692            sink_id,
693            sink_name,
694            ..
695        } = writer_param;
696        let metrics_labels = [
697            &actor_id.to_string(),
698            &sink_id.to_string(),
699            sink_name.as_str(),
700        ];
701
702        // Metrics
703        let write_qps = GLOBAL_SINK_METRICS
704            .iceberg_write_qps
705            .with_guarded_label_values(&metrics_labels);
706        let write_latency = GLOBAL_SINK_METRICS
707            .iceberg_write_latency
708            .with_guarded_label_values(&metrics_labels);
709        // # TODO
710        // Unused. Add this metric later.
711        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
712            .iceberg_rolling_unflushed_data_file
713            .with_guarded_label_values(&metrics_labels);
714        let write_bytes = GLOBAL_SINK_METRICS
715            .iceberg_write_bytes
716            .with_guarded_label_values(&metrics_labels);
717
718        let schema = table.metadata().current_schema();
719        let partition_spec = table.metadata().default_partition_spec();
720
721        // To avoid duplicate file names, each sink instance generates a unique UUID to use as the file name suffix.
722        let unique_uuid_suffix = Uuid::now_v7();
723
724        let parquet_writer_builder = ParquetWriterBuilder::new(
725            WriterProperties::new(),
726            schema.clone(),
727            table.file_io().clone(),
728            DefaultLocationGenerator::new(table.metadata().clone())
729                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
730            DefaultFileNameGenerator::new(
731                writer_param.actor_id.to_string(),
732                Some(unique_uuid_suffix.to_string()),
733                iceberg::spec::DataFileFormat::Parquet,
734            ),
735        );
736        let data_file_builder =
737            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
738        if partition_spec.fields().is_empty() {
739            let writer_builder = MonitoredGeneralWriterBuilder::new(
740                data_file_builder,
741                write_qps.clone(),
742                write_latency.clone(),
743            );
744            let inner_writer = Some(Box::new(
745                writer_builder
746                    .clone()
747                    .build()
748                    .await
749                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
750            ) as Box<dyn IcebergWriter>);
751            Ok(Self {
752                arrow_schema: Arc::new(
753                    schema_to_arrow_schema(table.metadata().current_schema())
754                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
755                ),
756                metrics: IcebergWriterMetrics {
757                    _write_qps: write_qps,
758                    _write_latency: write_latency,
759                    write_bytes,
760                },
761                writer: IcebergWriterDispatch::NonpartitionAppendOnly {
762                    writer: inner_writer,
763                    writer_builder,
764                },
765                table,
766                project_idx_vec: {
767                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
768                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
769                    } else {
770                        ProjectIdxVec::None
771                    }
772                },
773            })
774        } else {
775            let partition_builder = MonitoredGeneralWriterBuilder::new(
776                FanoutPartitionWriterBuilder::new(
777                    data_file_builder,
778                    partition_spec.clone(),
779                    schema.clone(),
780                )
781                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
782                write_qps.clone(),
783                write_latency.clone(),
784            );
785            let inner_writer = Some(Box::new(
786                partition_builder
787                    .clone()
788                    .build()
789                    .await
790                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
791            ) as Box<dyn IcebergWriter>);
792            Ok(Self {
793                arrow_schema: Arc::new(
794                    schema_to_arrow_schema(table.metadata().current_schema())
795                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
796                ),
797                metrics: IcebergWriterMetrics {
798                    _write_qps: write_qps,
799                    _write_latency: write_latency,
800                    write_bytes,
801                },
802                writer: IcebergWriterDispatch::PartitionAppendOnly {
803                    writer: inner_writer,
804                    writer_builder: partition_builder,
805                },
806                table,
807                project_idx_vec: {
808                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
809                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
810                    } else {
811                        ProjectIdxVec::None
812                    }
813                },
814            })
815        }
816    }
817
818    pub async fn new_upsert(
819        table: Table,
820        unique_column_ids: Vec<usize>,
821        writer_param: &SinkWriterParam,
822    ) -> Result<Self> {
823        let SinkWriterParam {
824            extra_partition_col_idx,
825            actor_id,
826            sink_id,
827            sink_name,
828            ..
829        } = writer_param;
830        let metrics_labels = [
831            &actor_id.to_string(),
832            &sink_id.to_string(),
833            sink_name.as_str(),
834        ];
835        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
836
837        // Metrics
838        let write_qps = GLOBAL_SINK_METRICS
839            .iceberg_write_qps
840            .with_guarded_label_values(&metrics_labels);
841        let write_latency = GLOBAL_SINK_METRICS
842            .iceberg_write_latency
843            .with_guarded_label_values(&metrics_labels);
844        // # TODO
845        // Unused. Add this metric later.
846        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
847            .iceberg_rolling_unflushed_data_file
848            .with_guarded_label_values(&metrics_labels);
849        let position_delete_cache_num = GLOBAL_SINK_METRICS
850            .iceberg_position_delete_cache_num
851            .with_guarded_label_values(&metrics_labels);
852        let write_bytes = GLOBAL_SINK_METRICS
853            .iceberg_write_bytes
854            .with_guarded_label_values(&metrics_labels);
855
856        // Determine the schema id and partition spec id
857        let schema = table.metadata().current_schema();
858        let partition_spec = table.metadata().default_partition_spec();
859
860        // To avoid duplicate file names, each sink instance generates a unique UUID to use as the file name suffix.
861        let unique_uuid_suffix = Uuid::now_v7();
862
863        let data_file_builder = {
864            let parquet_writer_builder = ParquetWriterBuilder::new(
865                WriterProperties::new(),
866                schema.clone(),
867                table.file_io().clone(),
868                DefaultLocationGenerator::new(table.metadata().clone())
869                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
870                DefaultFileNameGenerator::new(
871                    writer_param.actor_id.to_string(),
872                    Some(unique_uuid_suffix.to_string()),
873                    iceberg::spec::DataFileFormat::Parquet,
874                ),
875            );
876            DataFileWriterBuilder::new(
877                parquet_writer_builder.clone(),
878                None,
879                partition_spec.spec_id(),
880            )
881        };
882        let position_delete_builder = {
883            let parquet_writer_builder = ParquetWriterBuilder::new(
884                WriterProperties::new(),
885                POSITION_DELETE_SCHEMA.clone(),
886                table.file_io().clone(),
887                DefaultLocationGenerator::new(table.metadata().clone())
888                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
889                DefaultFileNameGenerator::new(
890                    writer_param.actor_id.to_string(),
891                    Some(format!("pos-del-{}", unique_uuid_suffix)),
892                    iceberg::spec::DataFileFormat::Parquet,
893                ),
894            );
895            MonitoredPositionDeleteWriterBuilder::new(
896                SortPositionDeleteWriterBuilder::new(
897                    parquet_writer_builder.clone(),
898                    writer_param
899                        .streaming_config
900                        .developer
901                        .iceberg_sink_positional_delete_cache_size,
902                    None,
903                    None,
904                ),
905                position_delete_cache_num,
906            )
907        };
908        let equality_delete_builder = {
909            let config = EqualityDeleteWriterConfig::new(
910                unique_column_ids.clone(),
911                table.metadata().current_schema().clone(),
912                None,
913                partition_spec.spec_id(),
914            )
915            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
916            let parquet_writer_builder = ParquetWriterBuilder::new(
917                WriterProperties::new(),
918                Arc::new(
919                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
920                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
921                ),
922                table.file_io().clone(),
923                DefaultLocationGenerator::new(table.metadata().clone())
924                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
925                DefaultFileNameGenerator::new(
926                    writer_param.actor_id.to_string(),
927                    Some(format!("eq-del-{}", unique_uuid_suffix)),
928                    iceberg::spec::DataFileFormat::Parquet,
929                ),
930            );
931
932            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
933        };
934        let delta_builder = EqualityDeltaWriterBuilder::new(
935            data_file_builder,
936            position_delete_builder,
937            equality_delete_builder,
938            unique_column_ids,
939        );
940        if partition_spec.fields().is_empty() {
941            let writer_builder = MonitoredGeneralWriterBuilder::new(
942                delta_builder,
943                write_qps.clone(),
944                write_latency.clone(),
945            );
946            let inner_writer = Some(Box::new(
947                writer_builder
948                    .clone()
949                    .build()
950                    .await
951                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
952            ) as Box<dyn IcebergWriter>);
953            let original_arrow_schema = Arc::new(
954                schema_to_arrow_schema(table.metadata().current_schema())
955                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
956            );
957            let schema_with_extra_op_column = {
958                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
959                new_fields.push(Arc::new(ArrowField::new(
960                    "op".to_owned(),
961                    ArrowDataType::Int32,
962                    false,
963                )));
964                Arc::new(ArrowSchema::new(new_fields))
965            };
966            Ok(Self {
967                arrow_schema: original_arrow_schema,
968                metrics: IcebergWriterMetrics {
969                    _write_qps: write_qps,
970                    _write_latency: write_latency,
971                    write_bytes,
972                },
973                table,
974                writer: IcebergWriterDispatch::NonpartitionUpsert {
975                    writer: inner_writer,
976                    writer_builder,
977                    arrow_schema_with_op_column: schema_with_extra_op_column,
978                },
979                project_idx_vec: {
980                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
981                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
982                    } else {
983                        ProjectIdxVec::None
984                    }
985                },
986            })
987        } else {
988            let original_arrow_schema = Arc::new(
989                schema_to_arrow_schema(table.metadata().current_schema())
990                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
991            );
992            let schema_with_extra_op_column = {
993                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
994                new_fields.push(Arc::new(ArrowField::new(
995                    "op".to_owned(),
996                    ArrowDataType::Int32,
997                    false,
998                )));
999                Arc::new(ArrowSchema::new(new_fields))
1000            };
1001            let partition_builder = MonitoredGeneralWriterBuilder::new(
1002                FanoutPartitionWriterBuilder::new_with_custom_schema(
1003                    delta_builder,
1004                    schema_with_extra_op_column.clone(),
1005                    partition_spec.clone(),
1006                    table.metadata().current_schema().clone(),
1007                ),
1008                write_qps.clone(),
1009                write_latency.clone(),
1010            );
1011            let inner_writer = Some(Box::new(
1012                partition_builder
1013                    .clone()
1014                    .build()
1015                    .await
1016                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1017            ) as Box<dyn IcebergWriter>);
1018            Ok(Self {
1019                arrow_schema: original_arrow_schema,
1020                metrics: IcebergWriterMetrics {
1021                    _write_qps: write_qps,
1022                    _write_latency: write_latency,
1023                    write_bytes,
1024                },
1025                table,
1026                writer: IcebergWriterDispatch::PartitionUpsert {
1027                    writer: inner_writer,
1028                    writer_builder: partition_builder,
1029                    arrow_schema_with_op_column: schema_with_extra_op_column,
1030                },
1031                project_idx_vec: {
1032                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1033                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1034                    } else {
1035                        ProjectIdxVec::None
1036                    }
1037                },
1038            })
1039        }
1040    }
1041}
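
// Writer stack assembled above (data flows top to bottom):
//
//   MonitoredGeneralWriterBuilder
//     -> FanoutPartitionWriterBuilder            (partitioned tables only)
//        -> EqualityDeltaWriterBuilder           (upsert only)
//           -> DataFileWriterBuilder                  (data files)
//           -> MonitoredPositionDeleteWriterBuilder   (position delete files)
//           -> EqualityDeleteFileWriterBuilder        (equality delete files)
//
// The append-only variants skip the delta layer and feed `DataFileWriterBuilder` directly;
// every leaf writer emits Parquet through `ParquetWriterBuilder`.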
1042
1043#[async_trait]
1044impl SinkWriter for IcebergSinkWriter {
1045    type CommitMetadata = Option<SinkMetadata>;
1046
1047    /// Begin a new epoch
1048    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1049        // Just skip it.
1050        Ok(())
1051    }
1052
1053    /// Write a stream chunk to sink
1054    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1055        // Try to build writer if it's None.
1056        match &mut self.writer {
1057            IcebergWriterDispatch::PartitionAppendOnly {
1058                writer,
1059                writer_builder,
1060            } => {
1061                if writer.is_none() {
1062                    *writer = Some(Box::new(
1063                        writer_builder
1064                            .clone()
1065                            .build()
1066                            .await
1067                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1068                    ));
1069                }
1070            }
1071            IcebergWriterDispatch::NonpartitionAppendOnly {
1072                writer,
1073                writer_builder,
1074            } => {
1075                if writer.is_none() {
1076                    *writer = Some(Box::new(
1077                        writer_builder
1078                            .clone()
1079                            .build()
1080                            .await
1081                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1082                    ));
1083                }
1084            }
1085            IcebergWriterDispatch::PartitionUpsert {
1086                writer,
1087                writer_builder,
1088                ..
1089            } => {
1090                if writer.is_none() {
1091                    *writer = Some(Box::new(
1092                        writer_builder
1093                            .clone()
1094                            .build()
1095                            .await
1096                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1097                    ));
1098                }
1099            }
1100            IcebergWriterDispatch::NonpartitionUpsert {
1101                writer,
1102                writer_builder,
1103                ..
1104            } => {
1105                if writer.is_none() {
1106                    *writer = Some(Box::new(
1107                        writer_builder
1108                            .clone()
1109                            .build()
1110                            .await
1111                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1112                    ));
1113                }
1114            }
1115        };
1116
1117        // Process the chunk.
1118        let (mut chunk, ops) = chunk.compact().into_parts();
1119        match &self.project_idx_vec {
1120            ProjectIdxVec::None => {}
1121            ProjectIdxVec::Prepare(idx) => {
1122                if *idx >= chunk.columns().len() {
1123                    return Err(SinkError::Iceberg(anyhow!(
1124                        "invalid extra partition column index {}",
1125                        idx
1126                    )));
1127                }
1128                let project_idx_vec = (0..*idx)
1129                    .chain(*idx + 1..chunk.columns().len())
1130                    .collect_vec();
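                // e.g. with 4 columns and `idx == 2`, this yields [0, 1, 3]: every column
                // except the extra partition column, cached below as `ProjectIdxVec::Done`.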
1131                chunk = chunk.project(&project_idx_vec);
1132                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1133            }
1134            ProjectIdxVec::Done(idx_vec) => {
1135                chunk = chunk.project(idx_vec);
1136            }
1137        }
1138        if ops.is_empty() {
1139            return Ok(());
1140        }
1141        let write_batch_size = chunk.estimated_heap_size();
1142        let batch = match &self.writer {
1143            IcebergWriterDispatch::PartitionAppendOnly { .. }
1144            | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1145                // separate out insert chunk
1146                let filters =
1147                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1148                chunk.set_visibility(filters);
1149                IcebergArrowConvert
1150                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1151                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1152            }
1153            IcebergWriterDispatch::PartitionUpsert {
1154                arrow_schema_with_op_column,
1155                ..
1156            }
1157            | IcebergWriterDispatch::NonpartitionUpsert {
1158                arrow_schema_with_op_column,
1159                ..
1160            } => {
1161                let chunk = IcebergArrowConvert
1162                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1163                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1164                let ops = Arc::new(Int32Array::from(
1165                    ops.iter()
1166                        .map(|op| match op {
1167                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1168                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1169                        })
1170                        .collect_vec(),
1171                ));
1172                let mut columns = chunk.columns().to_vec();
1173                columns.push(ops);
1174                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1175                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1176            }
1177        };
1178
1179        let writer = self.writer.get_writer().unwrap();
1180        writer
1181            .write(batch)
1182            .instrument_await("iceberg_write")
1183            .await
1184            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1185        self.metrics.write_bytes.inc_by(write_batch_size as _);
1186        Ok(())
1187    }
1188
1189    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
1190    /// writer should commit the current epoch.
1191    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1192        // Skip it if not checkpoint
1193        if !is_checkpoint {
1194            return Ok(None);
1195        }
1196
1197        let close_result = match &mut self.writer {
1198            IcebergWriterDispatch::PartitionAppendOnly {
1199                writer,
1200                writer_builder,
1201            } => {
1202                let close_result = match writer.take() {
1203                    Some(mut writer) => {
1204                        Some(writer.close().instrument_await("iceberg_close").await)
1205                    }
1206                    _ => None,
1207                };
1208                match writer_builder.clone().build().await {
1209                    Ok(new_writer) => {
1210                        *writer = Some(Box::new(new_writer));
1211                    }
1212                    _ => {
1213                        // In this case, the writer is closed and we can't build a new one. But we can't return the error
1214                        // here because the current writer may have closed successfully. So we just log the error.
1215                        warn!("Failed to build new writer after close");
1216                    }
1217                }
1218                close_result
1219            }
1220            IcebergWriterDispatch::NonpartitionAppendOnly {
1221                writer,
1222                writer_builder,
1223            } => {
1224                let close_result = match writer.take() {
1225                    Some(mut writer) => {
1226                        Some(writer.close().instrument_await("iceberg_close").await)
1227                    }
1228                    _ => None,
1229                };
1230                match writer_builder.clone().build().await {
1231                    Ok(new_writer) => {
1232                        *writer = Some(Box::new(new_writer));
1233                    }
1234                    _ => {
1235                        // In this case, the writer is closed and we can't build a new one. But we can't return the error
1236                        // here because the current writer may have closed successfully. So we just log the error.
1237                        warn!("Failed to build new writer after close");
1238                    }
1239                }
1240                close_result
1241            }
1242            IcebergWriterDispatch::PartitionUpsert {
1243                writer,
1244                writer_builder,
1245                ..
1246            } => {
1247                let close_result = match writer.take() {
1248                    Some(mut writer) => {
1249                        Some(writer.close().instrument_await("iceberg_close").await)
1250                    }
1251                    _ => None,
1252                };
1253                match writer_builder.clone().build().await {
1254                    Ok(new_writer) => {
1255                        *writer = Some(Box::new(new_writer));
1256                    }
1257                    _ => {
1258                        // In this case, the writer is closed and we can't build a new one. But we can't return the error
1259                        // here because the current writer may have closed successfully. So we just log the error.
1260                        warn!("Failed to build new writer after close");
1261                    }
1262                }
1263                close_result
1264            }
1265            IcebergWriterDispatch::NonpartitionUpsert {
1266                writer,
1267                writer_builder,
1268                ..
1269            } => {
1270                let close_result = match writer.take() {
1271                    Some(mut writer) => {
1272                        Some(writer.close().instrument_await("iceberg_close").await)
1273                    }
1274                    _ => None,
1275                };
1276                match writer_builder.clone().build().await {
1277                    Ok(new_writer) => {
1278                        *writer = Some(Box::new(new_writer));
1279                    }
1280                    _ => {
1281                        // In this case, the writer is closed and we can't build a new one. But we can't return the error
1282                        // here because the current writer may have closed successfully. So we just log the error.
1283                        warn!("Failed to build new writer after close");
1284                    }
1285                }
1286                close_result
1287            }
1288        };
1289
1290        match close_result {
1291            Some(Ok(result)) => {
1292                let version = self.table.metadata().format_version() as u8;
1293                let partition_type = self.table.metadata().default_partition_type();
1294                let data_files = result
1295                    .into_iter()
1296                    .map(|f| {
1297                        SerializedDataFile::try_from(f, partition_type, version == 1)
1298                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1299                    })
1300                    .collect::<Result<Vec<_>>>()?;
1301                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1302                    data_files,
1303                    schema_id: self.table.metadata().current_schema_id(),
1304                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1305                })?))
1306            }
1307            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1308            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1309        }
1310    }
1311
1312    /// Clean up
1313    async fn abort(&mut self) -> Result<()> {
1314        // TODO: abort should clean up all the data written in this epoch.
1315        Ok(())
1316    }
1317}
1318
1319const SCHEMA_ID: &str = "schema_id";
1320const PARTITION_SPEC_ID: &str = "partition_spec_id";
1321const DATA_FILES: &str = "data_files";
1322
1323#[derive(Default, Clone)]
1324struct IcebergCommitResult {
1325    schema_id: i32,
1326    partition_spec_id: i32,
1327    data_files: Vec<SerializedDataFile>,
1328}
1329
1330impl IcebergCommitResult {
1331    fn try_from(value: &SinkMetadata) -> Result<Self> {
1332        if let Some(Serialized(v)) = &value.metadata {
1333            let mut values = if let serde_json::Value::Object(v) =
1334                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1335                    .context("Can't parse iceberg sink metadata")?
1336            {
1337                v
1338            } else {
1339                bail!("iceberg sink metadata should be an object");
1340            };
1341
1342            let schema_id;
1343            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1344                schema_id = value
1345                    .as_u64()
1346                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1347            } else {
1348                bail!("iceberg sink metadata should have schema_id");
1349            }
1350
1351            let partition_spec_id;
1352            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1353                partition_spec_id = value
1354                    .as_u64()
1355                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1356            } else {
1357                bail!("iceberg sink metadata should have partition_spec_id");
1358            }
1359
1360            let data_files: Vec<SerializedDataFile>;
1361            if let serde_json::Value::Array(values) = values
1362                .remove(DATA_FILES)
1363                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1364            {
1365                data_files = values
1366                    .into_iter()
1367                    .map(from_value::<SerializedDataFile>)
1368                    .collect::<std::result::Result<_, _>>()
1369                    .context("Failed to deserialize data_files in iceberg sink metadata")?;
1370            } else {
1371                bail!("iceberg sink metadata should have data_files object");
1372            }
1373
1374            Ok(Self {
1375                schema_id: schema_id as i32,
1376                partition_spec_id: partition_spec_id as i32,
1377                data_files,
1378            })
1379        } else {
1380            bail!("Can't create iceberg sink write result from empty data!")
1381        }
1382    }
1383
1384    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1385        let mut values = if let serde_json::Value::Object(value) =
1386            serde_json::from_slice::<serde_json::Value>(&value)
1387                .context("Can't parse iceberg sink metadata")?
1388        {
1389            value
1390        } else {
1391            bail!("iceberg sink metadata should be an object");
1392        };
1393
1394        let schema_id;
1395        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1396            schema_id = value
1397                .as_u64()
1398                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1399        } else {
1400            bail!("iceberg sink metadata should have schema_id");
1401        }
1402
1403        let partition_spec_id;
1404        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1405            partition_spec_id = value
1406                .as_u64()
1407                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1408        } else {
1409            bail!("iceberg sink metadata should have partition_spec_id");
1410        }
1411
1412        let data_files: Vec<SerializedDataFile>;
1413        if let serde_json::Value::Array(values) = values
1414            .remove(DATA_FILES)
1415            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1416        {
1417            data_files = values
1418                .into_iter()
1419                .map(from_value::<SerializedDataFile>)
1420                .collect::<std::result::Result<_, _>>()
1421                .context("Failed to deserialize data_files in iceberg sink metadata")?;
1422        } else {
1423            bail!("iceberg sink metadata should have data_files object");
1424        }
1425
1426        Ok(Self {
1427            schema_id: schema_id as i32,
1428            partition_spec_id: partition_spec_id as i32,
1429            data_files,
1430        })
1431    }
1432}
1433
1434impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1435    type Error = SinkError;
1436
1437    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1438        let json_data_files = serde_json::Value::Array(
1439            value
1440                .data_files
1441                .iter()
1442                .map(serde_json::to_value)
1443                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1444                .context("Can't serialize data files to json")?,
1445        );
1446        let json_value = serde_json::Value::Object(
1447            vec![
1448                (
1449                    SCHEMA_ID.to_owned(),
1450                    serde_json::Value::Number(value.schema_id.into()),
1451                ),
1452                (
1453                    PARTITION_SPEC_ID.to_owned(),
1454                    serde_json::Value::Number(value.partition_spec_id.into()),
1455                ),
1456                (DATA_FILES.to_owned(), json_data_files),
1457            ]
1458            .into_iter()
1459            .collect(),
1460        );
1461        Ok(SinkMetadata {
1462            metadata: Some(Serialized(SerializedMetadata {
1463                metadata: serde_json::to_vec(&json_value)
1464                    .context("Can't serialize iceberg sink metadata")?,
1465            })),
1466        })
1467    }
1468}
1469
1470impl TryFrom<IcebergCommitResult> for Vec<u8> {
1471    type Error = SinkError;
1472
1473    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1474        let json_data_files = serde_json::Value::Array(
1475            value
1476                .data_files
1477                .iter()
1478                .map(serde_json::to_value)
1479                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1480                .context("Can't serialize data files to json")?,
1481        );
1482        let json_value = serde_json::Value::Object(
1483            vec![
1484                (
1485                    SCHEMA_ID.to_owned(),
1486                    serde_json::Value::Number(value.schema_id.into()),
1487                ),
1488                (
1489                    PARTITION_SPEC_ID.to_owned(),
1490                    serde_json::Value::Number(value.partition_spec_id.into()),
1491                ),
1492                (DATA_FILES.to_owned(), json_data_files),
1493            ]
1494            .into_iter()
1495            .collect(),
1496        );
1497        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1498    }
1499}
1500pub struct IcebergSinkCommitter {
1501    catalog: Arc<dyn Catalog>,
1502    table: Table,
1503    pub last_commit_epoch: u64,
1504    pub(crate) is_exactly_once: bool,
1505    pub(crate) sink_id: u32,
1506    pub(crate) config: IcebergConfig,
1507    pub(crate) param: SinkParam,
1508    pub(crate) db: DatabaseConnection,
1509    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1510    commit_retry_num: u32,
1511    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1512}
1513
1514impl IcebergSinkCommitter {
1515    /// Reload the table and guarantee that the current schema id and default partition spec id
1516    /// match the given `schema_id` and `partition_spec_id`.
1517    async fn reload_table(
1518        catalog: &dyn Catalog,
1519        table_ident: &TableIdent,
1520        schema_id: i32,
1521        partition_spec_id: i32,
1522    ) -> Result<Table> {
1523        let table = catalog
1524            .load_table(table_ident)
1525            .await
1526            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1527        if table.metadata().current_schema_id() != schema_id {
1528            return Err(SinkError::Iceberg(anyhow!(
1529                "Schema evolution not supported, expected schema id {}, but got {}",
1530                schema_id,
1531                table.metadata().current_schema_id()
1532            )));
1533        }
1534        if table.metadata().default_partition_spec_id() != partition_spec_id {
1535            return Err(SinkError::Iceberg(anyhow!(
1536                "Partition evolution not supported, expected partition spec id {}, but got {}",
1537                partition_spec_id,
1538                table.metadata().default_partition_spec_id()
1539            )));
1540        }
1541        Ok(table)
1542    }
1543}
1544
1545#[async_trait::async_trait]
1546impl SinkCommitCoordinator for IcebergSinkCommitter {
1547    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1548        if self.is_exactly_once {
1549            self.committed_epoch_subscriber = Some(subscriber);
1550            tracing::info!(
1551                "Sink id = {}: iceberg sink coordinator initializing.",
1552                self.param.sink_id.sink_id()
1553            );
1554            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1555                let ordered_metadata_list_by_end_epoch =
1556                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1557
1558                let mut last_recommit_epoch = 0;
1559                for (end_epoch, sealized_bytes, snapshot_id, committed) in
1560                    ordered_metadata_list_by_end_epoch
1561                {
1562                    let write_results_bytes = deserialize_metadata(sealized_bytes);
1563                    let mut write_results = vec![];
1564
1565                    for each in write_results_bytes {
1566                        let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1567                        write_results.push(write_result);
1568                    }
1569
1570                    match (
1571                        committed,
1572                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1573                            .await?,
1574                    ) {
1575                        (true, _) => {
1576                            tracing::info!(
1577                                "Sink id = {}: all data in the log store has been written into the external sink; do nothing during recovery.",
1578                                self.param.sink_id.sink_id()
1579                            );
1580                        }
1581                        (false, true) => {
1582                            // These files are already in Iceberg; just mark the row as committed.
1583                            tracing::info!(
1584                                "Sink id = {}: all pre-commit files have already been committed into iceberg and do not need to be committed again; mark them as committed.",
1585                                self.param.sink_id.sink_id()
1586                            );
1587                            mark_row_is_committed_by_sink_id_and_end_epoch(
1588                                &self.db,
1589                                self.sink_id,
1590                                end_epoch,
1591                            )
1592                            .await?;
1593                        }
1594                        (false, false) => {
1595                            tracing::info!(
1596                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1597                                self.param.sink_id.sink_id()
1598                            );
1599                            self.re_commit(end_epoch, write_results, snapshot_id)
1600                                .await?;
1601                        }
1602                    }
1603
1604                    last_recommit_epoch = end_epoch;
1605                }
1606                tracing::info!(
1607                    "Sink id = {}: iceberg commit coordinator initialized.",
1608                    self.param.sink_id.sink_id()
1609                );
1610                return Ok(Some(last_recommit_epoch));
1611            } else {
1612                tracing::info!(
1613                    "Sink id = {}: iceberg commit coordinator initialized; the system table is empty.",
1614                    self.param.sink_id.sink_id()
1615                );
1616                return Ok(None);
1617            }
1618        }
1619
1620        tracing::info!("Iceberg commit coordinator initialized.");
1621        return Ok(None);
1622    }
1623
1624    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1625        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1626        let write_results: Vec<IcebergCommitResult> = metadata
1627            .iter()
1628            .map(IcebergCommitResult::try_from)
1629            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1630
1631        // Skip if no data to commit
1632        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1633            tracing::debug!(?epoch, "no data to commit");
1634            return Ok(());
1635        }
1636
1637        // Guarantee that all write results have the same schema_id and partition_spec_id.
1638        if write_results
1639            .iter()
1640            .any(|r| r.schema_id != write_results[0].schema_id)
1641            || write_results
1642                .iter()
1643                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1644        {
1645            return Err(SinkError::Iceberg(anyhow!(
1646                "schema_id and partition_spec_id should be the same in all write results"
1647            )));
1648        }
1649
1650        if self.is_exactly_once {
1651            assert!(self.committed_epoch_subscriber.is_some());
1652            match self.committed_epoch_subscriber.clone() {
1653                Some(committed_epoch_subscriber) => {
1654                    // Get the latest committed_epoch and the receiver
1655                    let (committed_epoch, mut rw_futures_utilrx) =
1656                        committed_epoch_subscriber(self.param.sink_id).await?;
1657                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1658                    if committed_epoch >= epoch {
1659                        self.commit_iceberg_inner(epoch, write_results, None)
1660                            .await?;
1661                    } else {
1662                        tracing::info!(
1663                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1664                            committed_epoch,
1665                            epoch
1666                        );
1667                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1668                            tracing::info!(
1669                                "Received next committed epoch: {}",
1670                                next_committed_epoch
1671                            );
1672                            // If the next committed epoch meets the condition, execute the commit immediately.
1673                            if next_committed_epoch >= epoch {
1674                                self.commit_iceberg_inner(epoch, write_results, None)
1675                                    .await?;
1676                                break;
1677                            }
1678                        }
1679                    }
1680                }
1681                None => unreachable!(
1682                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1683                ),
1684            }
1685        } else {
1686            self.commit_iceberg_inner(epoch, write_results, None)
1687                .await?;
1688        }
1689
1690        Ok(())
1691    }
1692}
1693
1694/// Methods required to achieve exactly-once semantics.
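///
/// The flow for exactly-once sinks: before committing, the serialized write results of an epoch
/// are persisted in the system table together with a pre-generated snapshot id (see
/// `commit_iceberg_inner`); after a successful Iceberg commit, the row is marked as committed and
/// then deleted. During recovery, `init` above either marks such a row as committed (if its
/// snapshot id already exists in Iceberg, see `is_snapshot_id_in_iceberg`) or re-commits it via
/// `re_commit`.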
1695impl IcebergSinkCommitter {
1696    async fn re_commit(
1697        &mut self,
1698        epoch: u64,
1699        write_results: Vec<IcebergCommitResult>,
1700        snapshot_id: i64,
1701    ) -> Result<()> {
1702        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1703
1704        // Skip if no data to commit
1705        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1706            tracing::debug!(?epoch, "no data to commit");
1707            return Ok(());
1708        }
1709        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1710            .await?;
1711        Ok(())
1712    }
1713
1714    async fn commit_iceberg_inner(
1715        &mut self,
1716        epoch: u64,
1717        write_results: Vec<IcebergCommitResult>,
1718        snapshot_id: Option<i64>,
1719    ) -> Result<()> {
1720        // If the provided `snapshot_id` is not None, it indicates that this commit is a re-commit
1721        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
1722        // that was previously persisted in the system table to commit.
1723        let is_first_commit = snapshot_id.is_none();
1724        self.last_commit_epoch = epoch;
1725        let expect_schema_id = write_results[0].schema_id;
1726        let expect_partition_spec_id = write_results[0].partition_spec_id;
1727
1728        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1729        self.table = Self::reload_table(
1730            self.catalog.as_ref(),
1731            self.table.identifier(),
1732            expect_schema_id,
1733            expect_partition_spec_id,
1734        )
1735        .await?;
1736        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1737            return Err(SinkError::Iceberg(anyhow!(
1738                "Can't find schema by id {}",
1739                expect_schema_id
1740            )));
1741        };
1742        let Some(partition_spec) = self
1743            .table
1744            .metadata()
1745            .partition_spec_by_id(expect_partition_spec_id)
1746        else {
1747            return Err(SinkError::Iceberg(anyhow!(
1748                "Can't find partition spec by id {}",
1749                expect_partition_spec_id
1750            )));
1751        };
1752        let partition_type = partition_spec
1753            .as_ref()
1754            .clone()
1755            .partition_type(schema)
1756            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1757
1758        let txn = Transaction::new(&self.table);
1759        // Only generate a new snapshot id on the first commit.
1760        let snapshot_id = match snapshot_id {
1761            Some(previous_snapshot_id) => previous_snapshot_id,
1762            None => txn.generate_unique_snapshot_id(),
1763        };
1764        if self.is_exactly_once && is_first_commit {
1765            // Persist the pre-commit metadata and snapshot id in the system table.
1766            let mut pre_commit_metadata_bytes = Vec::new();
1767            for each_parallelism_write_result in write_results.clone() {
1768                let each_parallelism_write_result_bytes: Vec<u8> =
1769                    each_parallelism_write_result.try_into()?;
1770                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1771            }
1772
1773            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1774
1775            persist_pre_commit_metadata(
1776                self.sink_id,
1777                self.db.clone(),
1778                self.last_commit_epoch,
1779                epoch,
1780                pre_commit_metadata_bytes,
1781                snapshot_id,
1782            )
1783            .await?;
1784        }
1785
1786        let data_files = write_results
1787            .into_iter()
1788            .flat_map(|r| {
1789                r.data_files.into_iter().map(|f| {
1790                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1791                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1792                })
1793            })
1794            .collect::<Result<Vec<DataFile>>>()?;
1795        // TODO:
1796        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
1797        // because the retry logic involves reapplying the commit metadata.
1798        // For now, we just retry the whole commit operation.
1799        let retry_strategy = ExponentialBackoff::from_millis(10)
1800            .max_delay(Duration::from_secs(60))
1801            .map(jitter)
1802            .take(self.commit_retry_num as usize);
1803        let catalog = self.catalog.clone();
1804        let table_ident = self.table.identifier().clone();
1805        let table = Retry::spawn(retry_strategy, || async {
1806            let table = Self::reload_table(
1807                catalog.as_ref(),
1808                &table_ident,
1809                expect_schema_id,
1810                expect_partition_spec_id,
1811            )
1812            .await?;
1813            let txn = Transaction::new(&table);
1814            let mut append_action = txn
1815                .fast_append(Some(snapshot_id), None, vec![])
1816                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1817            append_action
1818                .add_data_files(data_files.clone())
1819                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1820            let tx = append_action.apply().await.map_err(|err| {
1821                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1822                SinkError::Iceberg(anyhow!(err))
1823            })?;
1824            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1825                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1826                SinkError::Iceberg(anyhow!(err))
1827            })
1828        })
1829        .await?;
1830        self.table = table;
1831
1832        tracing::info!("Succeeded in committing to iceberg table in epoch {epoch}.");
1833
1834        if self.is_exactly_once {
1835            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1836            tracing::info!(
1837                "Sink id = {}: succeeded in marking pre-commit metadata in epoch {} as committed.",
1838                self.sink_id,
1839                epoch
1840            );
1841
1842            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1843        }
1844        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1845            && self.config.enable_compaction
1846            && iceberg_compact_stat_sender
1847                .send(IcebergSinkCompactionUpdate {
1848                    sink_id: SinkId::new(self.sink_id),
1849                    compaction_interval: self.config.compaction_interval_sec(),
1850                })
1851                .is_err()
1852        {
1853            warn!("failed to send iceberg compaction stats");
1854        }
1855
1856        Ok(())
1857    }
1858
1859    /// When persisting the pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
1860    /// Therefore, checking whether all files in a batch are present in Iceberg reduces to
1861    /// verifying whether their corresponding `snapshot_id` exists in Iceberg.
1862    async fn is_snapshot_id_in_iceberg(
1863        &self,
1864        iceberg_config: &IcebergConfig,
1865        snapshot_id: i64,
1866    ) -> Result<bool> {
1867        let iceberg_common = iceberg_config.common.clone();
1868        let table = iceberg_common
1869            .load_table(&iceberg_config.java_catalog_props)
1870            .await?;
1871        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1876    }
1877}
1878
1879const MAP_KEY: &str = "key";
1880const MAP_VALUE: &str = "value";
1881
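/// Collect the nested RisingWave field types of `our_field_type` into `schema_fields`, keyed by
/// field name (or by `MAP_KEY`/`MAP_VALUE` for map types), and return the Arrow struct fields at
/// the same nesting level so that `check_compatibility` can compare the two sides recursively.
/// Returns `None` if `data_type` is not a nested (struct/list/map) Arrow type.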
1882fn get_fields<'a>(
1883    our_field_type: &'a risingwave_common::types::DataType,
1884    data_type: &ArrowDataType,
1885    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1886) -> Option<ArrowFields> {
1887    match data_type {
1888        ArrowDataType::Struct(fields) => {
1889            match our_field_type {
1890                risingwave_common::types::DataType::Struct(struct_fields) => {
1891                    struct_fields.iter().for_each(|(name, data_type)| {
1892                        let res = schema_fields.insert(name, data_type);
1893                        // This assert is to make sure there is no duplicate field name in the schema.
1894                        assert!(res.is_none())
1895                    });
1896                }
1897                risingwave_common::types::DataType::Map(map_fields) => {
1898                    schema_fields.insert(MAP_KEY, map_fields.key());
1899                    schema_fields.insert(MAP_VALUE, map_fields.value());
1900                }
1901                risingwave_common::types::DataType::List(list_field) => {
1902                    list_field.as_struct().iter().for_each(|(name, data_type)| {
1903                        let res = schema_fields.insert(name, data_type);
1904                        // This assert is to make sure there is no duplicate field name in the schema.
1905                        assert!(res.is_none())
1906                    });
1907                }
1908                _ => {}
1909            };
1910            Some(fields.clone())
1911        }
1912        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1913            get_fields(our_field_type, field.data_type(), schema_fields)
1914        }
1915        _ => None, // not a supported complex type and unlikely to show up
1916    }
1917}
1918
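/// Check that every Arrow (Iceberg) field in `fields` has a counterpart of a compatible type in
/// `schema_fields`; some representation differences (e.g. `Binary` vs. `LargeBinary`, decimal
/// precision and scale) are tolerated, and nested structs/lists/maps are checked recursively.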
1919fn check_compatibility(
1920    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
1921    fields: &ArrowFields,
1922) -> anyhow::Result<bool> {
1923    for arrow_field in fields {
1924        let our_field_type = schema_fields
1925            .get(arrow_field.name().as_str())
1926            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
1927
1928        // Iceberg source should be able to read iceberg decimal type.
1929        let converted_arrow_data_type = IcebergArrowConvert
1930            .to_arrow_field("", our_field_type)
1931            .map_err(|e| anyhow!(e))?
1932            .data_type()
1933            .clone();
1934
1935        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
1936            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
1937            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
1938            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
1939            (ArrowDataType::List(_), ArrowDataType::List(field))
1940            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
1941                let mut schema_fields = HashMap::new();
1942                get_fields(our_field_type, field.data_type(), &mut schema_fields)
1943                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
1944            }
1945            // validate nested structs
1946            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
1947                let mut schema_fields = HashMap::new();
1948                our_field_type
1949                    .as_struct()
1950                    .iter()
1951                    .for_each(|(name, data_type)| {
1952                        let res = schema_fields.insert(name, data_type);
1953                        // This assert is to make sure there is no duplicate field name in the schema.
1954                        assert!(res.is_none())
1955                    });
1956                check_compatibility(schema_fields, fields)?
1957            }
1958            // cases where left != right (metadata, field name mismatch)
1959            //
1960            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
1961            // {"PARQUET:field_id": ".."}
1962            //
1963            // map: The standard name in arrow is "entries", "key", "value".
1964            // in iceberg-rs, it's called "key_value"
1965            (left, right) => left.equals_datatype(right),
1966        };
1967        if !compatible {
1968            bail!(
1969                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
1970                arrow_field.name(),
1971                converted_arrow_data_type,
1972                arrow_field.data_type()
1973            );
1974        }
1975    }
1976    Ok(true)
1977}
1978
1979/// Try to match our schema with iceberg schema.
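/// Fields are matched by name rather than by position, so the two schemas may list their fields
/// in different orders.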
1980pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
1981    if rw_schema.fields.len() != arrow_schema.fields().len() {
1982        bail!(
1983            "Schema length mismatch, risingwave is {}, and iceberg is {}",
1984            rw_schema.fields.len(),
1985            arrow_schema.fields.len()
1986        );
1987    }
1988
1989    let mut schema_fields = HashMap::new();
1990    rw_schema.fields.iter().for_each(|field| {
1991        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
1992        // This assert is to make sure there is no duplicate field name in the schema.
1993        assert!(res.is_none())
1994    });
1995
1996    check_compatibility(schema_fields, &arrow_schema.fields)?;
1997    Ok(())
1998}
1999
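/// Combine the per-parallelism pre-commit payloads (one `Vec<u8>` per writer) into a single JSON
/// byte array so that they can be persisted together in the system table; `deserialize_metadata`
/// is the inverse.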
2000pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2001    serde_json::to_vec(&metadata).unwrap()
2002}
2003
2004pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2005    serde_json::from_slice(&bytes).unwrap()
2006}
2007
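/// Parse the `partition_by` option string into `(column, Transform)` pairs, e.g. a bare `v1` maps
/// to `("v1", Transform::Identity)`, `bucket(5,v1)` to `("v1", Transform::Bucket(5))`, and
/// `truncate(4,v2)` to `("v2", Transform::Truncate(4))`.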
2008pub fn parse_partition_by_exprs(
2009    expr: String,
2010) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2011    // captures column, transform(column), transform(n,column), transform(n, column)
2012    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2013    if !re.is_match(&expr) {
2014        bail!(format!(
2015            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2016            expr
2017        ))
2018    }
2019    let caps = re.captures_iter(&expr);
2020
2021    let mut partition_columns = vec![];
2022
2023    for mat in caps {
2024        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2025            (&mat["transform"], Transform::Identity)
2026        } else {
2027            let mut func = mat["transform"].to_owned();
2028            if func == "bucket" || func == "truncate" {
2029                let n = &mat
2030                    .name("n")
2031                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2032                    .as_str();
2033                func = format!("{func}[{n}]");
2034            }
2035            (
2036                &mat["field"],
2037                Transform::from_str(&func)
2038                    .with_context(|| format!("invalid transform function {}", func))?,
2039            )
2040        };
2041        partition_columns.push((column.to_owned(), transform));
2042    }
2043    Ok(partition_columns)
2044}
2045
2046#[cfg(test)]
2047mod test {
2048    use std::collections::BTreeMap;
2049
2050    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2051    use risingwave_common::catalog::Field;
2052    use risingwave_common::types::{DataType, MapType, StructType};
2053
2054    use crate::connector_common::IcebergCommon;
2055    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2056    use crate::sink::iceberg::IcebergConfig;
2057
2058    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2059
2060    #[test]
2061    fn test_compatible_arrow_schema() {
2062        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2063
2064        use super::*;
2065        let risingwave_schema = Schema::new(vec![
2066            Field::with_name(DataType::Int32, "a"),
2067            Field::with_name(DataType::Int32, "b"),
2068            Field::with_name(DataType::Int32, "c"),
2069        ]);
2070        let arrow_schema = ArrowSchema::new(vec![
2071            ArrowField::new("a", ArrowDataType::Int32, false),
2072            ArrowField::new("b", ArrowDataType::Int32, false),
2073            ArrowField::new("c", ArrowDataType::Int32, false),
2074        ]);
2075
2076        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2077
2078        let risingwave_schema = Schema::new(vec![
2079            Field::with_name(DataType::Int32, "d"),
2080            Field::with_name(DataType::Int32, "c"),
2081            Field::with_name(DataType::Int32, "a"),
2082            Field::with_name(DataType::Int32, "b"),
2083        ]);
2084        let arrow_schema = ArrowSchema::new(vec![
2085            ArrowField::new("a", ArrowDataType::Int32, false),
2086            ArrowField::new("b", ArrowDataType::Int32, false),
2087            ArrowField::new("d", ArrowDataType::Int32, false),
2088            ArrowField::new("c", ArrowDataType::Int32, false),
2089        ]);
2090        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2091
2092        let risingwave_schema = Schema::new(vec![
2093            Field::with_name(
2094                DataType::Struct(StructType::new(vec![
2095                    ("a1", DataType::Int32),
2096                    (
2097                        "a2",
2098                        DataType::Struct(StructType::new(vec![
2099                            ("a21", DataType::Bytea),
2100                            (
2101                                "a22",
2102                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2103                            ),
2104                        ])),
2105                    ),
2106                ])),
2107                "a",
2108            ),
2109            Field::with_name(
2110                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2111                    ("b1", DataType::Int32),
2112                    ("b2", DataType::Bytea),
2113                    (
2114                        "b3",
2115                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2116                    ),
2117                ])))),
2118                "b",
2119            ),
2120            Field::with_name(
2121                DataType::Map(MapType::from_kv(
2122                    DataType::Varchar,
2123                    DataType::List(Box::new(DataType::Struct(StructType::new([
2124                        ("c1", DataType::Int32),
2125                        ("c2", DataType::Bytea),
2126                        (
2127                            "c3",
2128                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2129                        ),
2130                    ])))),
2131                )),
2132                "c",
2133            ),
2134        ]);
2135        let arrow_schema = ArrowSchema::new(vec![
2136            ArrowField::new(
2137                "a",
2138                ArrowDataType::Struct(ArrowFields::from(vec![
2139                    ArrowField::new("a1", ArrowDataType::Int32, false),
2140                    ArrowField::new(
2141                        "a2",
2142                        ArrowDataType::Struct(ArrowFields::from(vec![
2143                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2144                            ArrowField::new_map(
2145                                "a22",
2146                                "entries",
2147                                ArrowFieldRef::new(ArrowField::new(
2148                                    "key",
2149                                    ArrowDataType::Utf8,
2150                                    false,
2151                                )),
2152                                ArrowFieldRef::new(ArrowField::new(
2153                                    "value",
2154                                    ArrowDataType::Utf8,
2155                                    false,
2156                                )),
2157                                false,
2158                                false,
2159                            ),
2160                        ])),
2161                        false,
2162                    ),
2163                ])),
2164                false,
2165            ),
2166            ArrowField::new(
2167                "b",
2168                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2169                    ArrowDataType::Struct(ArrowFields::from(vec![
2170                        ArrowField::new("b1", ArrowDataType::Int32, false),
2171                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2172                        ArrowField::new_map(
2173                            "b3",
2174                            "entries",
2175                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2176                            ArrowFieldRef::new(ArrowField::new(
2177                                "value",
2178                                ArrowDataType::Utf8,
2179                                false,
2180                            )),
2181                            false,
2182                            false,
2183                        ),
2184                    ])),
2185                    false,
2186                ))),
2187                false,
2188            ),
2189            ArrowField::new_map(
2190                "c",
2191                "entries",
2192                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2193                ArrowFieldRef::new(ArrowField::new(
2194                    "value",
2195                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2196                        ArrowDataType::Struct(ArrowFields::from(vec![
2197                            ArrowField::new("c1", ArrowDataType::Int32, false),
2198                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2199                            ArrowField::new_map(
2200                                "c3",
2201                                "entries",
2202                                ArrowFieldRef::new(ArrowField::new(
2203                                    "key",
2204                                    ArrowDataType::Utf8,
2205                                    false,
2206                                )),
2207                                ArrowFieldRef::new(ArrowField::new(
2208                                    "value",
2209                                    ArrowDataType::Utf8,
2210                                    false,
2211                                )),
2212                                false,
2213                                false,
2214                            ),
2215                        ])),
2216                        false,
2217                    ))),
2218                    false,
2219                )),
2220                false,
2221                false,
2222            ),
2223        ]);
2224        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2225    }
2226
2227    #[test]
2228    fn test_parse_iceberg_config() {
2229        let values = [
2230            ("connector", "iceberg"),
2231            ("type", "upsert"),
2232            ("primary_key", "v1"),
2233            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2234            ("warehouse.path", "s3://iceberg"),
2235            ("s3.endpoint", "http://127.0.0.1:9301"),
2236            ("s3.access.key", "hummockadmin"),
2237            ("s3.secret.key", "hummockadmin"),
2238            ("s3.path.style.access", "true"),
2239            ("s3.region", "us-east-1"),
2240            ("catalog.type", "jdbc"),
2241            ("catalog.name", "demo"),
2242            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2243            ("catalog.jdbc.user", "admin"),
2244            ("catalog.jdbc.password", "123456"),
2245            ("database.name", "demo_db"),
2246            ("table.name", "demo_table"),
2247            ("enable_compaction", "true"),
2248            ("compaction_interval_sec", "1800"),
2249            ("enable_snapshot_expiration", "true"),
2250        ]
2251        .into_iter()
2252        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2253        .collect();
2254
2255        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2256
2257        let expected_iceberg_config = IcebergConfig {
2258            common: IcebergCommon {
2259                warehouse_path: Some("s3://iceberg".to_owned()),
2260                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2261                region: Some("us-east-1".to_owned()),
2262                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2263                access_key: Some("hummockadmin".to_owned()),
2264                secret_key: Some("hummockadmin".to_owned()),
2265                gcs_credential: None,
2266                catalog_type: Some("jdbc".to_owned()),
2267                glue_id: None,
2268                catalog_name: Some("demo".to_owned()),
2269                database_name: Some("demo_db".to_owned()),
2270                table_name: "demo_table".to_owned(),
2271                path_style_access: Some(true),
2272                credential: None,
2273                oauth2_server_uri: None,
2274                scope: None,
2275                token: None,
2276                enable_config_load: None,
2277                rest_signing_name: None,
2278                rest_signing_region: None,
2279                rest_sigv4_enabled: None,
2280                hosted_catalog: None,
2281                azblob_account_name: None,
2282                azblob_account_key: None,
2283                azblob_endpoint_url: None,
2284            },
2285            r#type: "upsert".to_owned(),
2286            force_append_only: false,
2287            primary_key: Some(vec!["v1".to_owned()]),
2288            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2289            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2290                .into_iter()
2291                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2292                .collect(),
2293            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2294            create_table_if_not_exists: false,
2295            is_exactly_once: None,
2296            commit_retry_num: 8,
2297            enable_compaction: true,
2298            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2299            enable_snapshot_expiration: true,
2300        };
2301
2302        assert_eq!(iceberg_config, expected_iceberg_config);
2303
2304        assert_eq!(
2305            &iceberg_config.common.full_table_name().unwrap().to_string(),
2306            "demo_db.demo_table"
2307        );
2308    }
2309
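    // A small, self-contained sketch of how `parse_partition_by_exprs` maps the `partition_by`
    // option string to Iceberg transforms; the expected values assume the `iceberg::spec::Transform`
    // variants `Identity`, `Truncate(w)`, `Bucket(n)` and `Year`.
    #[test]
    fn test_parse_partition_by_exprs() {
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        let columns = parse_partition_by_exprs(
            "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3)".to_owned(),
        )
        .unwrap();
        assert_eq!(
            columns,
            vec![
                ("v1".to_owned(), Transform::Identity),
                ("v1".to_owned(), Transform::Identity),
                ("v2".to_owned(), Transform::Truncate(4)),
                ("v1".to_owned(), Transform::Bucket(5)),
                ("v3".to_owned(), Transform::Year),
            ]
        );
    }
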
2310    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2311        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2312
2313        let table = iceberg_config.load_table().await.unwrap();
2314
2315        println!("{:?}", table.identifier());
2316    }
2317
2318    #[tokio::test]
2319    #[ignore]
2320    async fn test_storage_catalog() {
2321        let values = [
2322            ("connector", "iceberg"),
2323            ("type", "append-only"),
2324            ("force_append_only", "true"),
2325            ("s3.endpoint", "http://127.0.0.1:9301"),
2326            ("s3.access.key", "hummockadmin"),
2327            ("s3.secret.key", "hummockadmin"),
2328            ("s3.region", "us-east-1"),
2329            ("s3.path.style.access", "true"),
2330            ("catalog.name", "demo"),
2331            ("catalog.type", "storage"),
2332            ("warehouse.path", "s3://icebergdata/demo"),
2333            ("database.name", "s1"),
2334            ("table.name", "t1"),
2335        ]
2336        .into_iter()
2337        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2338        .collect();
2339
2340        test_create_catalog(values).await;
2341    }
2342
2343    #[tokio::test]
2344    #[ignore]
2345    async fn test_rest_catalog() {
2346        let values = [
2347            ("connector", "iceberg"),
2348            ("type", "append-only"),
2349            ("force_append_only", "true"),
2350            ("s3.endpoint", "http://127.0.0.1:9301"),
2351            ("s3.access.key", "hummockadmin"),
2352            ("s3.secret.key", "hummockadmin"),
2353            ("s3.region", "us-east-1"),
2354            ("s3.path.style.access", "true"),
2355            ("catalog.name", "demo"),
2356            ("catalog.type", "rest"),
2357            ("catalog.uri", "http://192.168.167.4:8181"),
2358            ("warehouse.path", "s3://icebergdata/demo"),
2359            ("database.name", "s1"),
2360            ("table.name", "t1"),
2361        ]
2362        .into_iter()
2363        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2364        .collect();
2365
2366        test_create_catalog(values).await;
2367    }
2368
2369    #[tokio::test]
2370    #[ignore]
2371    async fn test_jdbc_catalog() {
2372        let values = [
2373            ("connector", "iceberg"),
2374            ("type", "append-only"),
2375            ("force_append_only", "true"),
2376            ("s3.endpoint", "http://127.0.0.1:9301"),
2377            ("s3.access.key", "hummockadmin"),
2378            ("s3.secret.key", "hummockadmin"),
2379            ("s3.region", "us-east-1"),
2380            ("s3.path.style.access", "true"),
2381            ("catalog.name", "demo"),
2382            ("catalog.type", "jdbc"),
2383            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2384            ("catalog.jdbc.user", "admin"),
2385            ("catalog.jdbc.password", "123456"),
2386            ("warehouse.path", "s3://icebergdata/demo"),
2387            ("database.name", "s1"),
2388            ("table.name", "t1"),
2389        ]
2390        .into_iter()
2391        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2392        .collect();
2393
2394        test_create_catalog(values).await;
2395    }
2396
2397    #[tokio::test]
2398    #[ignore]
2399    async fn test_hive_catalog() {
2400        let values = [
2401            ("connector", "iceberg"),
2402            ("type", "append-only"),
2403            ("force_append_only", "true"),
2404            ("s3.endpoint", "http://127.0.0.1:9301"),
2405            ("s3.access.key", "hummockadmin"),
2406            ("s3.secret.key", "hummockadmin"),
2407            ("s3.region", "us-east-1"),
2408            ("s3.path.style.access", "true"),
2409            ("catalog.name", "demo"),
2410            ("catalog.type", "hive"),
2411            ("catalog.uri", "thrift://localhost:9083"),
2412            ("warehouse.path", "s3://icebergdata/demo"),
2413            ("database.name", "s1"),
2414            ("table.name", "t1"),
2415        ]
2416        .into_iter()
2417        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2418        .collect();
2419
2420        test_create_catalog(values).await;
2421    }
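
    // A minimal round-trip sketch for the pre-commit metadata encoding: an `IcebergCommitResult`
    // with no data files (to keep the example self-contained) is serialized to JSON bytes via
    // `TryFrom<IcebergCommitResult> for Vec<u8>` and decoded back by `try_from_sealized_bytes`.
    #[test]
    fn test_commit_result_metadata_roundtrip() {
        use super::IcebergCommitResult;

        let result = IcebergCommitResult {
            schema_id: 1,
            partition_spec_id: 2,
            data_files: vec![],
        };
        let bytes: Vec<u8> = result.try_into().unwrap();
        let decoded = IcebergCommitResult::try_from_sealized_bytes(bytes).unwrap();
        assert_eq!(decoded.schema_id, 1);
        assert_eq!(decoded.partition_spec_id, 2);
        assert!(decoded.data_files.is_empty());
    }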
2422}