risingwave_connector/sink/iceberg/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction;
mod prometheus;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::sort_position_delete_writer::{
    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::function_writer::equality_delta_writer::{
    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
};
use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::{
    ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
    Set,
};
use serde_derive::Deserialize;
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tokio_retry::Retry;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
};
use crate::connector_common::IcebergCommon;
use crate::enforce_secret::EnforceSecret;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

fn default_commit_retry_num() -> u32 {
    8
}

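/// Sink options accepted by the Iceberg sink, typically supplied through the `WITH`
/// clause of `CREATE SINK` and deserialized here via serde. Options prefixed with
/// `catalog.` (other than `catalog.uri`, `catalog.type`, and `catalog.name`) are not
/// serde fields of this struct; `from_btreemap` collects them into `java_catalog_props`
/// and passes them through to the Java catalog.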
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String, // accepts "append-only" or "upsert"

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Properties passed through to the Java catalog.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit every n (> 0) checkpoints; the default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether to enable exactly-once delivery; disabled by default.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Number of times to retry a failed Iceberg commit; the default is 8.
    // TODO: the Iceberg table may store the commit retry count in its table metadata;
    // prefer that value as the default when it is available.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,
}

impl EnforceSecret for IcebergConfig {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergCommon::enforce_one(prop)?;
        }
        Ok(())
    }

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        IcebergCommon::enforce_one(prop)
    }
}

impl IcebergConfig {
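    /// Builds an [`IcebergConfig`] from the raw sink properties, validating the
    /// `type`/`primary_key` combination and splitting off `catalog.`-prefixed options.
    /// `primary_key` is parsed from a comma-separated list of column names.
    ///
    /// A minimal sketch of how it is driven; the keys shown are illustrative, and other
    /// required `IcebergCommon` options (warehouse path, table name, ...) are omitted:
    ///
    /// ```ignore
    /// let mut props = BTreeMap::new();
    /// props.insert("type".to_owned(), "upsert".to_owned());
    /// props.insert("primary_key".to_owned(), "id".to_owned());
    /// props.insert("catalog.type".to_owned(), "storage".to_owned());
    /// props.insert("catalog.custom-prop".to_owned(), "v".to_owned());
    /// let config = IcebergConfig::from_btreemap(props)?;
    /// // `catalog.custom-prop` is forwarded to the Java catalog as `custom-prop`;
    /// // `catalog.type` is excluded from the pass-through set.
    /// assert_eq!(config.java_catalog_props.get("custom-prop").map(String::as_str), Some("v"));
    /// ```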
    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
        let mut config =
            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }

        if config.r#type == SINK_TYPE_UPSERT {
            if let Some(primary_key) = &config.primary_key {
                if primary_key.is_empty() {
                    return Err(SinkError::Config(anyhow!(
                        "`primary_key` must not be empty in {}",
                        SINK_TYPE_UPSERT
                    )));
                }
            } else {
                return Err(SinkError::Config(anyhow!(
                    "Must set `primary_key` in {}",
                    SINK_TYPE_UPSERT
                )));
            }
        }

        // All options that start with "catalog." (except `catalog.uri`, `catalog.type`,
        // and `catalog.name`) are treated as Java catalog options.
        config.java_catalog_props = values
            .iter()
            .filter(|(k, _v)| {
                k.starts_with("catalog.")
                    && k != &"catalog.uri"
                    && k != &"catalog.type"
                    && k != &"catalog.name"
            })
            .map(|(k, v)| (k[8..].to_string(), v.to_string()))
            .collect();

        if config.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }

        Ok(config)
    }

    pub fn catalog_type(&self) -> &str {
        self.common.catalog_type()
    }

    pub async fn load_table(&self) -> Result<Table> {
        self.common
            .load_table(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
        self.common
            .create_catalog(&self.java_catalog_props)
            .await
            .map_err(Into::into)
    }

    pub fn full_table_name(&self) -> Result<TableIdent> {
        self.common.full_table_name().map_err(Into::into)
    }
}

pub struct IcebergSink {
    config: IcebergConfig,
    param: SinkParam,
    // In upsert mode, this is never None or empty.
    unique_column_ids: Option<Vec<usize>>,
}

impl EnforceSecret for IcebergSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            IcebergConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl TryFrom<SinkParam> for IcebergSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
        IcebergSink::new(config, param)
    }
}

impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}

impl IcebergSink {
    async fn create_and_validate_table(&self) -> Result<Table> {
        if self.config.create_table_if_not_exists {
            self.create_table_if_not_exists().await?;
        }

        let table = self
            .config
            .load_table()
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        let sink_schema = self.param.schema();
        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

        Ok(table)
    }

    async fn create_table_if_not_exists(&self) -> Result<()> {
        let catalog = self.config.create_catalog().await?;
        let namespace = if let Some(database_name) = &self.config.common.database_name {
            let namespace = NamespaceIdent::new(database_name.clone());
            if !catalog
                .namespace_exists(&namespace)
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
            {
                catalog
                    .create_namespace(&namespace, HashMap::default())
                    .await
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context("failed to create iceberg namespace")?;
            }
            namespace
        } else {
            bail!("database name must be set if you want to create table")
        };

        let table_id = self
            .config
            .full_table_name()
            .context("Unable to parse table name")?;
        if !catalog
            .table_exists(&table_id)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
            // Convert the RisingWave schema -> Arrow schema -> Iceberg schema.
            let arrow_fields = self
                .param
                .columns
                .iter()
                .map(|column| {
                    Ok(iceberg_create_table_arrow_convert
                        .to_arrow_field(&column.name, &column.data_type)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context(format!(
                            "failed to convert {}: {} to arrow type",
                            &column.name, &column.data_type
                        ))?)
                })
                .collect::<Result<Vec<ArrowField>>>()?;
            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to convert arrow schema to iceberg schema")?;

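            // Derive the table location from the warehouse path, e.g. a warehouse path of
            // `s3://bucket/warehouse` with namespace `db` and table `t` yields
            // `s3://bucket/warehouse/db/t`. REST catalogs may manage the location
            // themselves, in which case it is left unset.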
            let location = {
                let mut names = namespace.clone().inner();
                names.push(self.config.common.table_name.clone());
                match &self.config.common.warehouse_path {
                    Some(warehouse_path) => {
                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                        let url = Url::parse(warehouse_path);
                        if url.is_err() || is_s3_tables {
                            // For the REST catalog, the warehouse_path could be a warehouse name.
                            // In that case we should not specify the location when creating the table.
                            if self.config.common.catalog_type() == "rest"
                                || self.config.common.catalog_type() == "rest_rust"
                            {
                                None
                            } else {
                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
                            }
                        } else if warehouse_path.ends_with('/') {
                            Some(format!("{}{}", warehouse_path, names.join("/")))
                        } else {
                            Some(format!("{}/{}", warehouse_path, names.join("/")))
                        }
                    }
                    None => None,
                }
            };

            let partition_spec = match &self.config.partition_by {
                Some(partition_field) => {
                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
                    // Captures `column`, `transform(column)`, `transform(n,column)`, and `transform(n, column)`.
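                    // e.g. `partition_by = "category, bucket(16, id)"` yields an identity
                    // transform on `category` and a `bucket[16]` transform on `id`.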
                    let re = Regex::new(
                        r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?",
                    )
                    .unwrap();
                    if !re.is_match(partition_field) {
                        bail!(format!(
                            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
                            partition_field
                        ))
                    }
                    let caps = re.captures_iter(partition_field);
                    for (i, mat) in caps.enumerate() {
                        let (column, transform) =
                            if mat.name("n").is_none() && mat.name("field").is_none() {
                                (&mat["transform"], Transform::Identity)
                            } else {
                                let mut func = mat["transform"].to_owned();
                                if func == "bucket" || func == "truncate" {
                                    let n = &mat
                                        .name("n")
                                        .ok_or_else(|| {
                                            SinkError::Iceberg(anyhow!(
                                                "The `n` must be set with `bucket` and `truncate`"
                                            ))
                                        })?
                                        .as_str();
                                    func = format!("{func}[{n}]");
                                }
                                (
                                    &mat["field"],
                                    Transform::from_str(&func)
                                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?,
                                )
                            };

                        match iceberg_schema.field_id_by_name(column) {
                            Some(id) => partition_fields.push(
                                UnboundPartitionField::builder()
                                    .source_id(id)
                                    .transform(transform)
                                    .name(format!("_p_{}", column))
                                    .field_id(i as i32)
                                    .build(),
                            ),
                            None => bail!(format!(
                                "Partition source column does not exist in schema: {}",
                                column
                            )),
                        };
                    }
                    Some(
                        UnboundPartitionSpec::builder()
                            .with_spec_id(0)
                            .add_partition_fields(partition_fields)
                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                            .context("failed to add partition columns")?
                            .build(),
                    )
                }
                None => None,
            };

            let table_creation_builder = TableCreation::builder()
                .name(self.config.common.table_name.clone())
                .schema(iceberg_schema);

            let table_creation = match (location, partition_spec) {
                (Some(location), Some(partition_spec)) => table_creation_builder
                    .location(location)
                    .partition_spec(partition_spec)
                    .build(),
                (Some(location), None) => table_creation_builder.location(location).build(),
                (None, Some(partition_spec)) => table_creation_builder
                    .partition_spec(partition_spec)
                    .build(),
                (None, None) => table_creation_builder.build(),
            };

            catalog
                .create_table(&namespace, table_creation)
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg table")?;
        }
        Ok(())
    }

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
                let mut unique_column_ids = Vec::with_capacity(pk.len());
                for col_name in pk {
                    let id = param
                        .columns
                        .iter()
                        .find(|col| col.name.as_str() == col_name)
                        .ok_or_else(|| {
                            SinkError::Config(anyhow!(
                                "Primary key column {} not found in sink schema",
                                col_name
                            ))
                        })?
                        .column_id
                        .get_id() as usize;
                    unique_column_ids.push(id);
                }
                Some(unique_column_ids)
            } else {
                unreachable!()
            }
        } else {
            None
        };
        Ok(Self {
            config,
            param,
            unique_column_ids,
        })
    }
}

impl Sink for IcebergSink {
    type Coordinator = IcebergSinkCommitter;
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = ICEBERG_SINK;

    async fn validate(&self) -> Result<()> {
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }
        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        IcebergConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let table = self.create_and_validate_table().await?;
        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
        } else {
            IcebergSinkWriter::new_append_only(table, &writer_param).await?
        };

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(&self, db: DatabaseConnection) -> Result<Self::Coordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        // FIXME(Dylan): Disable EMR serverless compaction for now.
        Ok(IcebergSinkCommitter {
            catalog,
            table,
            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
            last_commit_epoch: 0,
            sink_id: self.param.sink_id.sink_id(),
            config: self.config.clone(),
            param: self.param.clone(),
            db,
            commit_notifier: None,
            _compact_task_guard: None,
            commit_retry_num: self.config.commit_retry_num,
            committed_epoch_subscriber: None,
        })
    }
}

pub struct IcebergSinkWriter {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    // See the comments on `IcebergWriterMetrics` below.
    metrics: IcebergWriterMetrics,
    // State of the Iceberg table for this writer.
    table: Table,
}

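/// The concrete writer stack used by `IcebergSinkWriter`, chosen once at creation time
/// along two axes: whether the table has a partition spec (fanout-partitioned vs. plain)
/// and whether the sink is append-only or upsert (plain data files vs. equality-delta
/// writes). The `writer` field is rebuilt from `writer_builder` after every checkpoint.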
#[allow(clippy::type_complexity)]
enum IcebergWriterDispatch {
    PartitionAppendOnly {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: MonitoredGeneralWriterBuilder<
            FanoutPartitionWriterBuilder<
                DataFileWriterBuilder<
                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                >,
            >,
        >,
    },
    NonpartitionAppendOnly {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: MonitoredGeneralWriterBuilder<
            DataFileWriterBuilder<
                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
            >,
        >,
    },
    PartitionUpsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: MonitoredGeneralWriterBuilder<
            FanoutPartitionWriterBuilder<
                EqualityDeltaWriterBuilder<
                    DataFileWriterBuilder<
                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                    >,
                    MonitoredPositionDeleteWriterBuilder<
                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                    >,
                    EqualityDeleteFileWriterBuilder<
                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                    >,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
    NonpartitionUpsert {
        writer: Option<Box<dyn IcebergWriter>>,
        writer_builder: MonitoredGeneralWriterBuilder<
            EqualityDeltaWriterBuilder<
                DataFileWriterBuilder<
                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                >,
                MonitoredPositionDeleteWriterBuilder<
                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                >,
                EqualityDeleteFileWriterBuilder<
                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
                >,
            >,
        >,
        arrow_schema_with_op_column: SchemaRef,
    },
}

impl IcebergWriterDispatch {
    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
        match self {
            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
            | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
        }
    }
}

pub struct IcebergWriterMetrics {
    // NOTE: These two metrics are not used directly by us; they are kept only for lifecycle
    // management. They are actually used in `PrometheusWriterBuilder`:
    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
    // Keeping them here lets the guards clean their labels out of the metrics registry on drop.
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}

impl IcebergSinkWriter {
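    /// Creates the writer for an append-only sink: a Parquet data-file writer, wrapped in
    /// a fanout-partition writer when the table has a partition spec, and always in a
    /// monitored writer that reports write QPS and latency.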
    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // TODO: currently unused; wire this metric up later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();

        // To avoid duplicate file names, each sink instance generates a unique UUID as the file-name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let parquet_writer_builder = ParquetWriterBuilder::new(
            WriterProperties::new(),
            schema.clone(),
            table.file_io().clone(),
            DefaultLocationGenerator::new(table.metadata().clone())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            DefaultFileNameGenerator::new(
                writer_param.actor_id.to_string(),
                Some(unique_uuid_suffix.to_string()),
                iceberg::spec::DataFileFormat::Parquet,
            ),
        );
        let data_file_builder =
            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
        if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
            Err(SinkError::Iceberg(anyhow!(
                "Extra partition column is not supported in append-only mode"
            )))
        } else if partition_spec.fields().is_empty() {
            let writer_builder = MonitoredGeneralWriterBuilder::new(
                data_file_builder,
                write_qps.clone(),
                write_latency.clone(),
            );
            let inner_writer = Some(Box::new(
                writer_builder
                    .clone()
                    .build()
                    .await
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ) as Box<dyn IcebergWriter>);
            Ok(Self {
                arrow_schema: Arc::new(
                    schema_to_arrow_schema(table.metadata().current_schema())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
                metrics: IcebergWriterMetrics {
                    _write_qps: write_qps,
                    _write_latency: write_latency,
                    write_bytes,
                },
                writer: IcebergWriterDispatch::NonpartitionAppendOnly {
                    writer: inner_writer,
                    writer_builder,
                },
                table,
            })
        } else {
            let partition_builder = MonitoredGeneralWriterBuilder::new(
                FanoutPartitionWriterBuilder::new(
                    data_file_builder,
                    partition_spec.clone(),
                    schema.clone(),
                )
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                write_qps.clone(),
                write_latency.clone(),
            );
            let inner_writer = Some(Box::new(
                partition_builder
                    .clone()
                    .build()
                    .await
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ) as Box<dyn IcebergWriter>);
            Ok(Self {
                arrow_schema: Arc::new(
                    schema_to_arrow_schema(table.metadata().current_schema())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
                metrics: IcebergWriterMetrics {
                    _write_qps: write_qps,
                    _write_latency: write_latency,
                    write_bytes,
                },
                writer: IcebergWriterDispatch::PartitionAppendOnly {
                    writer: inner_writer,
                    writer_builder: partition_builder,
                },
                table,
            })
        }
    }

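    /// Creates the writer for an upsert sink. Rows are routed through an equality-delta
    /// writer built from three parts: a Parquet data-file writer for inserts, a sorted
    /// position-delete writer, and an equality-delete writer keyed by `unique_column_ids`.
    /// As with the append-only path, the stack is wrapped in a fanout-partition writer
    /// when the table is partitioned, and always in a monitored writer.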
    pub async fn new_upsert(
        table: Table,
        unique_column_ids: Vec<usize>,
        writer_param: &SinkWriterParam,
    ) -> Result<Self> {
        let SinkWriterParam {
            extra_partition_col_idx,
            actor_id,
            sink_id,
            sink_name,
            ..
        } = writer_param;
        let metrics_labels = [
            &actor_id.to_string(),
            &sink_id.to_string(),
            sink_name.as_str(),
        ];
        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();

        // Metrics
        let write_qps = GLOBAL_SINK_METRICS
            .iceberg_write_qps
            .with_guarded_label_values(&metrics_labels);
        let write_latency = GLOBAL_SINK_METRICS
            .iceberg_write_latency
            .with_guarded_label_values(&metrics_labels);
        // TODO: currently unused; wire this metric up later.
        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
            .iceberg_rolling_unflushed_data_file
            .with_guarded_label_values(&metrics_labels);
        let position_delete_cache_num = GLOBAL_SINK_METRICS
            .iceberg_position_delete_cache_num
            .with_guarded_label_values(&metrics_labels);
        let write_bytes = GLOBAL_SINK_METRICS
            .iceberg_write_bytes
            .with_guarded_label_values(&metrics_labels);

        // Determine the current schema and the default partition spec.
        let schema = table.metadata().current_schema();
        let partition_spec = table.metadata().default_partition_spec();

        // To avoid duplicate file names, each sink instance generates a unique UUID as the file-name suffix.
        let unique_uuid_suffix = Uuid::now_v7();

        let data_file_builder = {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                WriterProperties::new(),
                schema.clone(),
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(unique_uuid_suffix.to_string()),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            DataFileWriterBuilder::new(
                parquet_writer_builder.clone(),
                None,
                partition_spec.spec_id(),
            )
        };
        let position_delete_builder = {
            let parquet_writer_builder = ParquetWriterBuilder::new(
                WriterProperties::new(),
                POSITION_DELETE_SCHEMA.clone(),
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("pos-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );
            MonitoredPositionDeleteWriterBuilder::new(
                SortPositionDeleteWriterBuilder::new(
                    parquet_writer_builder.clone(),
                    writer_param
                        .streaming_config
                        .developer
                        .iceberg_sink_positional_delete_cache_size,
                    None,
                    None,
                ),
                position_delete_cache_num,
            )
        };
        let equality_delete_builder = {
            let config = EqualityDeleteWriterConfig::new(
                unique_column_ids.clone(),
                table.metadata().current_schema().clone(),
                None,
                partition_spec.spec_id(),
            )
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
            let parquet_writer_builder = ParquetWriterBuilder::new(
                WriterProperties::new(),
                Arc::new(
                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                ),
                table.file_io().clone(),
                DefaultLocationGenerator::new(table.metadata().clone())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                DefaultFileNameGenerator::new(
                    writer_param.actor_id.to_string(),
                    Some(format!("eq-del-{}", unique_uuid_suffix)),
                    iceberg::spec::DataFileFormat::Parquet,
                ),
            );

            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
        };
        let delta_builder = EqualityDeltaWriterBuilder::new(
            data_file_builder,
            position_delete_builder,
            equality_delete_builder,
            unique_column_ids,
        );
        if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
            Err(SinkError::Iceberg(anyhow!(
                "Extra partition column is not supported in upsert mode"
            )))
        } else if partition_spec.fields().is_empty() {
            let writer_builder = MonitoredGeneralWriterBuilder::new(
                delta_builder,
                write_qps.clone(),
                write_latency.clone(),
            );
            let inner_writer = Some(Box::new(
                writer_builder
                    .clone()
                    .build()
                    .await
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ) as Box<dyn IcebergWriter>);
            let original_arrow_schema = Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            );
            let schema_with_extra_op_column = {
                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
                new_fields.push(Arc::new(ArrowField::new(
                    "op".to_owned(),
                    ArrowDataType::Int32,
                    false,
                )));
                Arc::new(ArrowSchema::new(new_fields))
            };
            Ok(Self {
                arrow_schema: original_arrow_schema,
                metrics: IcebergWriterMetrics {
                    _write_qps: write_qps,
                    _write_latency: write_latency,
                    write_bytes,
                },
                table,
                writer: IcebergWriterDispatch::NonpartitionUpsert {
                    writer: inner_writer,
                    writer_builder,
                    arrow_schema_with_op_column: schema_with_extra_op_column,
                },
            })
        } else {
            let original_arrow_schema = Arc::new(
                schema_to_arrow_schema(table.metadata().current_schema())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            );
            let schema_with_extra_op_column = {
                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
                new_fields.push(Arc::new(ArrowField::new(
                    "op".to_owned(),
                    ArrowDataType::Int32,
                    false,
                )));
                Arc::new(ArrowSchema::new(new_fields))
            };
            let partition_builder = MonitoredGeneralWriterBuilder::new(
                FanoutPartitionWriterBuilder::new_with_custom_schema(
                    delta_builder,
                    schema_with_extra_op_column.clone(),
                    partition_spec.clone(),
                    table.metadata().current_schema().clone(),
                ),
                write_qps.clone(),
                write_latency.clone(),
            );
            let inner_writer = Some(Box::new(
                partition_builder
                    .clone()
                    .build()
                    .await
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
            ) as Box<dyn IcebergWriter>);
            Ok(Self {
                arrow_schema: original_arrow_schema,
                metrics: IcebergWriterMetrics {
                    _write_qps: write_qps,
                    _write_latency: write_latency,
                    write_bytes,
                },
                table,
                writer: IcebergWriterDispatch::PartitionUpsert {
                    writer: inner_writer,
                    writer_builder: partition_builder,
                    arrow_schema_with_op_column: schema_with_extra_op_column,
                },
            })
        }
    }
}

#[async_trait]
impl SinkWriter for IcebergSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    /// Begin a new epoch.
    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        // Nothing to do here.
        Ok(())
    }

    /// Write a stream chunk to the sink.
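    ///
    /// For append-only writers, only `Insert` rows are kept. For upsert writers, an extra
    /// Int32 `op` column (`INSERT_OP` / `DELETE_OP`) is appended to the record batch so
    /// the equality-delta writer can distinguish inserts from deletes.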
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        // Lazily build the writer if it has not been created yet.
        match &mut self.writer {
            IcebergWriterDispatch::PartitionAppendOnly {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .await
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::NonpartitionAppendOnly {
                writer,
                writer_builder,
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .await
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::PartitionUpsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .await
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
            IcebergWriterDispatch::NonpartitionUpsert {
                writer,
                writer_builder,
                ..
            } => {
                if writer.is_none() {
                    *writer = Some(Box::new(
                        writer_builder
                            .clone()
                            .build()
                            .await
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
                    ));
                }
            }
        };

        // Process the chunk.
        let (mut chunk, ops) = chunk.compact().into_parts();
        if ops.is_empty() {
            return Ok(());
        }
        let write_batch_size = chunk.estimated_heap_size();
        let batch = match &self.writer {
            IcebergWriterDispatch::PartitionAppendOnly { .. }
            | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
                // Keep only the `Insert` rows in append-only mode.
                let filters =
                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
                chunk.set_visibility(filters);
                IcebergArrowConvert
                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
            IcebergWriterDispatch::PartitionUpsert {
                arrow_schema_with_op_column,
                ..
            }
            | IcebergWriterDispatch::NonpartitionUpsert {
                arrow_schema_with_op_column,
                ..
            } => {
                let chunk = IcebergArrowConvert
                    .to_record_batch(self.arrow_schema.clone(), &chunk)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
                let ops = Arc::new(Int32Array::from(
                    ops.iter()
                        .map(|op| match op {
                            Op::UpdateInsert | Op::Insert => INSERT_OP,
                            Op::UpdateDelete | Op::Delete => DELETE_OP,
                        })
                        .collect_vec(),
                ));
                let mut columns = chunk.columns().to_vec();
                columns.push(ops);
                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
            }
        };

        let writer = self.writer.get_writer().unwrap();
        writer
            .write(batch)
            .await
            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
        self.metrics.write_bytes.inc_by(write_batch_size as _);
        Ok(())
    }

    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true,
    /// the sink writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        // Nothing to flush on a non-checkpoint barrier.
        if !is_checkpoint {
            return Ok(None);
        }

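        // Close the current writer to flush its files for this checkpoint, then eagerly
        // rebuild a fresh writer from the builder for the next epoch.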
        let close_result = match &mut self.writer {
            IcebergWriterDispatch::PartitionAppendOnly {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => Some(writer.close().await),
                    _ => None,
                };
                match writer_builder.clone().build().await {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // The writer has already been closed and we failed to build a replacement.
                        // Don't return the error here, since the close itself may have succeeded; just log it.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::NonpartitionAppendOnly {
                writer,
                writer_builder,
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => Some(writer.close().await),
                    _ => None,
                };
                match writer_builder.clone().build().await {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // The writer has already been closed and we failed to build a replacement.
                        // Don't return the error here, since the close itself may have succeeded; just log it.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::PartitionUpsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => Some(writer.close().await),
                    _ => None,
                };
                match writer_builder.clone().build().await {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // The writer has already been closed and we failed to build a replacement.
                        // Don't return the error here, since the close itself may have succeeded; just log it.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
            IcebergWriterDispatch::NonpartitionUpsert {
                writer,
                writer_builder,
                ..
            } => {
                let close_result = match writer.take() {
                    Some(mut writer) => Some(writer.close().await),
                    _ => None,
                };
                match writer_builder.clone().build().await {
                    Ok(new_writer) => {
                        *writer = Some(Box::new(new_writer));
                    }
                    _ => {
                        // The writer has already been closed and we failed to build a replacement.
                        // Don't return the error here, since the close itself may have succeeded; just log it.
                        warn!("Failed to build new writer after close");
                    }
                }
                close_result
            }
        };

        match close_result {
            Some(Ok(result)) => {
                let version = self.table.metadata().format_version() as u8;
                let partition_type = self.table.metadata().default_partition_type();
                let data_files = result
                    .into_iter()
                    .map(|f| {
                        SerializedDataFile::try_from(f, partition_type, version == 1)
                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
                    data_files,
                    schema_id: self.table.metadata().current_schema_id(),
                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
                })?))
            }
            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
        }
    }

    /// Clean up.
    async fn abort(&mut self) -> Result<()> {
        // TODO: abort should clean up all the data written in this epoch.
        Ok(())
    }
}

const SCHEMA_ID: &str = "schema_id";
const PARTITION_SPEC_ID: &str = "partition_spec_id";
const DATA_FILES: &str = "data_files";

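/// The per-checkpoint commit payload produced by the sink writer and carried to the
/// commit coordinator inside `SinkMetadata`. On the wire it is a JSON object with the
/// keys `schema_id`, `partition_spec_id`, and a `data_files` array of serialized data
/// files.
///
/// A round-trip sketch, assuming `result` is an `IcebergCommitResult`:
///
/// ```ignore
/// let bytes: Vec<u8> = result.clone().try_into()?;
/// let restored = IcebergCommitResult::try_from_sealized_bytes(bytes)?;
/// assert_eq!(restored.schema_id, result.schema_id);
/// ```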
1248#[derive(Default, Clone)]
1249struct IcebergCommitResult {
1250    schema_id: i32,
1251    partition_spec_id: i32,
1252    data_files: Vec<SerializedDataFile>,
1253}
1254
1255impl IcebergCommitResult {
1256    fn try_from(value: &SinkMetadata) -> Result<Self> {
1257        if let Some(Serialized(v)) = &value.metadata {
1258            let mut values = if let serde_json::Value::Object(v) =
1259                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1260                    .context("Can't parse iceberg sink metadata")?
1261            {
1262                v
1263            } else {
1264                bail!("iceberg sink metadata should be an object");
1265            };
1266
1267            let schema_id;
1268            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1269                schema_id = value
1270                    .as_u64()
1271                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1272            } else {
1273                bail!("iceberg sink metadata should have schema_id");
1274            }
1275
1276            let partition_spec_id;
1277            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1278                partition_spec_id = value
1279                    .as_u64()
1280                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1281            } else {
1282                bail!("iceberg sink metadata should have partition_spec_id");
1283            }
1284
1285            let data_files: Vec<SerializedDataFile>;
1286            if let serde_json::Value::Array(values) = values
1287                .remove(DATA_FILES)
1288                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1289            {
1290                data_files = values
1291                    .into_iter()
1292                    .map(from_value::<SerializedDataFile>)
1293                    .collect::<std::result::Result<_, _>>()
1294                    .unwrap();
1295            } else {
1296                bail!("iceberg sink metadata should have data_files object");
1297            }
1298
1299            Ok(Self {
1300                schema_id: schema_id as i32,
1301                partition_spec_id: partition_spec_id as i32,
1302                data_files,
1303            })
1304        } else {
1305            bail!("Can't create iceberg sink write result from empty data!")
1306        }
1307    }
1308
1309    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1310        let mut values = if let serde_json::Value::Object(value) =
1311            serde_json::from_slice::<serde_json::Value>(&value)
1312                .context("Can't parse iceberg sink metadata")?
1313        {
1314            value
1315        } else {
1316            bail!("iceberg sink metadata should be an object");
1317        };
1318
1319        let schema_id;
1320        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1321            schema_id = value
1322                .as_u64()
1323                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1324        } else {
1325            bail!("iceberg sink metadata should have schema_id");
1326        }
1327
1328        let partition_spec_id;
1329        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1330            partition_spec_id = value
1331                .as_u64()
1332                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1333        } else {
1334            bail!("iceberg sink metadata should have partition_spec_id");
1335        }
1336
1337        let data_files: Vec<SerializedDataFile>;
1338        if let serde_json::Value::Array(values) = values
1339            .remove(DATA_FILES)
1340            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1341        {
1342            data_files = values
1343                .into_iter()
1344                .map(from_value::<SerializedDataFile>)
1345                .collect::<std::result::Result<_, _>>()
1346                .context("Failed to deserialize data files from iceberg sink metadata")?;
1347        } else {
1348            bail!("iceberg sink metadata should have data_files object");
1349        }
1350
1351        Ok(Self {
1352            schema_id: schema_id as i32,
1353            partition_spec_id: partition_spec_id as i32,
1354            data_files,
1355        })
1356    }
1357}
1358
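// The pre-commit metadata is encoded as a JSON object with three entries keyed by `SCHEMA_ID`,
// `PARTITION_SPEC_ID` and `DATA_FILES`: the first two are numbers, the last is an array of
// `SerializedDataFile`s. The `TryFrom` impls below produce this object (wrapped in `SinkMetadata`
// or as raw bytes); the constructors above parse it back.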
1359impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1360    type Error = SinkError;
1361
1362    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1363        let json_data_files = serde_json::Value::Array(
1364            value
1365                .data_files
1366                .iter()
1367                .map(serde_json::to_value)
1368                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1369                .context("Can't serialize data files to json")?,
1370        );
1371        let json_value = serde_json::Value::Object(
1372            vec![
1373                (
1374                    SCHEMA_ID.to_owned(),
1375                    serde_json::Value::Number(value.schema_id.into()),
1376                ),
1377                (
1378                    PARTITION_SPEC_ID.to_owned(),
1379                    serde_json::Value::Number(value.partition_spec_id.into()),
1380                ),
1381                (DATA_FILES.to_owned(), json_data_files),
1382            ]
1383            .into_iter()
1384            .collect(),
1385        );
1386        Ok(SinkMetadata {
1387            metadata: Some(Serialized(SerializedMetadata {
1388                metadata: serde_json::to_vec(&json_value)
1389                    .context("Can't serialize iceberg sink metadata")?,
1390            })),
1391        })
1392    }
1393}
1394
1395impl TryFrom<IcebergCommitResult> for Vec<u8> {
1396    type Error = SinkError;
1397
1398    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1399        let json_data_files = serde_json::Value::Array(
1400            value
1401                .data_files
1402                .iter()
1403                .map(serde_json::to_value)
1404                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1405                .context("Can't serialize data files to json")?,
1406        );
1407        let json_value = serde_json::Value::Object(
1408            vec![
1409                (
1410                    SCHEMA_ID.to_owned(),
1411                    serde_json::Value::Number(value.schema_id.into()),
1412                ),
1413                (
1414                    PARTITION_SPEC_ID.to_owned(),
1415                    serde_json::Value::Number(value.partition_spec_id.into()),
1416                ),
1417                (DATA_FILES.to_owned(), json_data_files),
1418            ]
1419            .into_iter()
1420            .collect(),
1421        );
1422        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1423    }
1424}
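
/// Commit coordinator of the iceberg sink. It collects the pre-commit metadata
/// (`IcebergCommitResult`) reported by the sink writers for an epoch and appends the
/// corresponding data files to the iceberg table in a single transaction. When
/// `is_exactly_once` is enabled, the pre-commit metadata is also persisted in the
/// `exactly_once_iceberg_sink` system table so that unfinished commits can be replayed
/// during recovery.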
1425pub struct IcebergSinkCommitter {
1426    catalog: Arc<dyn Catalog>,
1427    table: Table,
1428    pub last_commit_epoch: u64,
1429    pub(crate) is_exactly_once: bool,
1430    pub(crate) sink_id: u32,
1431    pub(crate) config: IcebergConfig,
1432    pub(crate) param: SinkParam,
1433    pub(crate) db: DatabaseConnection,
1434    commit_notifier: Option<mpsc::UnboundedSender<()>>,
1435    commit_retry_num: u32,
1436    _compact_task_guard: Option<oneshot::Sender<()>>,
1437    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1438}
1439
1440impl IcebergSinkCommitter {
1441    // Reload the table and guarantee that its current schema_id and partition_spec_id match
1442    // the given `schema_id` and `partition_spec_id`.
1443    async fn reload_table(
1444        catalog: &dyn Catalog,
1445        table_ident: &TableIdent,
1446        schema_id: i32,
1447        partition_spec_id: i32,
1448    ) -> Result<Table> {
1449        let table = catalog
1450            .load_table(table_ident)
1451            .await
1452            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1453        if table.metadata().current_schema_id() != schema_id {
1454            return Err(SinkError::Iceberg(anyhow!(
1455                "Schema evolution not supported, expected schema id {}, but got {}",
1456                schema_id,
1457                table.metadata().current_schema_id()
1458            )));
1459        }
1460        if table.metadata().default_partition_spec_id() != partition_spec_id {
1461            return Err(SinkError::Iceberg(anyhow!(
1462                "Partition evolution not supported, expected partition spec id {}, but got {}",
1463                partition_spec_id,
1464                table.metadata().default_partition_spec_id()
1465            )));
1466        }
1467        Ok(table)
1468    }
1469}
1470
1471#[async_trait::async_trait]
1472impl SinkCommitCoordinator for IcebergSinkCommitter {
1473    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1474        if self.is_exactly_once {
1475            self.committed_epoch_subscriber = Some(subscriber);
1476            tracing::info!(
1477                "Sink id = {}: initializing iceberg sink coordinator.",
1478                self.param.sink_id.sink_id()
1479            );
1480            if self
1481                .iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id())
1482                .await?
1483            {
1484                let ordered_metadata_list_by_end_epoch = self
1485                    .get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id())
1486                    .await?;
1487
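                // Recovery: walk the pre-committed batches in `end_epoch` order and decide for each
                // batch whether it still needs to be committed to iceberg, based on the `committed`
                // flag in the system table and on whether its snapshot id is already visible in the
                // iceberg table.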
1488                let mut last_recommit_epoch = 0;
1489                for (end_epoch, serialized_bytes, snapshot_id, committed) in
1490                    ordered_metadata_list_by_end_epoch
1491                {
1492                    let write_results_bytes = deserialize_metadata(serialized_bytes);
1493                    let mut write_results = vec![];
1494
1495                    for each in write_results_bytes {
1496                        let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1497                        write_results.push(write_result);
1498                    }
1499
1500                    match (
1501                        committed,
1502                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1503                            .await?,
1504                    ) {
1505                        (true, _) => {
1506                            tracing::info!(
1507                                "Sink id = {}: all data in the log store has been written into the external sink; nothing to do during recovery.",
1508                                self.param.sink_id.sink_id()
1509                            );
1510                        }
1511                        (false, true) => {
1512                            // Skip the re-commit and just mark the row as committed.
1513                            tracing::info!(
1514                                "Sink id = {}: all pre-commit files have already been committed into iceberg and do not need to be committed again; marking them as committed.",
1515                                self.param.sink_id.sink_id()
1516                            );
1517                            self.mark_row_is_committed_by_sink_id_and_end_epoch(
1518                                &self.db,
1519                                self.sink_id,
1520                                end_epoch,
1521                            )
1522                            .await?;
1523                        }
1524                        (false, false) => {
1525                            tracing::info!(
1526                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1527                                self.param.sink_id.sink_id()
1528                            );
1529                            self.re_commit(end_epoch, write_results, snapshot_id)
1530                                .await?;
1531                        }
1532                    }
1533
1534                    last_recommit_epoch = end_epoch;
1535                }
1536                tracing::info!(
1537                    "Sink id = {}: iceberg commit coordinator initialized.",
1538                    self.param.sink_id.sink_id()
1539                );
1540                return Ok(Some(last_recommit_epoch));
1541            } else {
1542                tracing::info!(
1543                    "Sink id = {}: initialized iceberg coordinator; the system table is empty.",
1544                    self.param.sink_id.sink_id()
1545                );
1546                return Ok(None);
1547            }
1548        }
1549
1550        tracing::info!("Iceberg commit coordinator initialized.");
1551        return Ok(None);
1552    }
1553
1554    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1555        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1556        let write_results: Vec<IcebergCommitResult> = metadata
1557            .iter()
1558            .map(IcebergCommitResult::try_from)
1559            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1560
1561        // Skip if no data to commit
1562        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1563            tracing::debug!(?epoch, "no data to commit");
1564            return Ok(());
1565        }
1566
1567        // Guarantee that all write results have the same schema_id and partition_spec_id.
1568        if write_results
1569            .iter()
1570            .any(|r| r.schema_id != write_results[0].schema_id)
1571            || write_results
1572                .iter()
1573                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1574        {
1575            return Err(SinkError::Iceberg(anyhow!(
1576                "schema_id and partition_spec_id should be the same in all write results"
1577            )));
1578        }
1579
1580        if self.is_exactly_once {
1581            assert!(self.committed_epoch_subscriber.is_some());
1582            match self.committed_epoch_subscriber.clone() {
1583                Some(committed_epoch_subscriber) => {
1584                    // Get the latest committed_epoch and the receiver
1585                    let (committed_epoch, mut rw_futures_utilrx) =
1586                        committed_epoch_subscriber(self.param.sink_id).await?;
1587                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1588                    if committed_epoch >= epoch {
1589                        self.commit_iceberg_inner(epoch, write_results, None)
1590                            .await?;
1591                    } else {
1592                        tracing::info!(
1593                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1594                            committed_epoch,
1595                            epoch
1596                        );
1597                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1598                            tracing::info!(
1599                                "Received next committed epoch: {}",
1600                                next_committed_epoch
1601                            );
1602                            // If the next committed epoch meets the condition, execute the commit immediately.
1603                            if next_committed_epoch >= epoch {
1604                                self.commit_iceberg_inner(epoch, write_results, None)
1605                                    .await?;
1606                                break;
1607                            }
1608                        }
1609                    }
1610                }
1611                None => unreachable!(
1612                    "Exactly-once sink must wait for the committed epoch before committing; committed_epoch_subscriber is not initialized."
1613                ),
1614            }
1615        } else {
1616            self.commit_iceberg_inner(epoch, write_results, None)
1617                .await?;
1618        }
1619
1620        Ok(())
1621    }
1622}
1623
1624/// Methods required to achieve exactly-once semantics.
1625impl IcebergSinkCommitter {
1626    async fn re_commit(
1627        &mut self,
1628        epoch: u64,
1629        write_results: Vec<IcebergCommitResult>,
1630        snapshot_id: i64,
1631    ) -> Result<()> {
1632        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1633
1634        // Skip if no data to commit
1635        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1636            tracing::debug!(?epoch, "no data to commit");
1637            return Ok(());
1638        }
1639        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1640            .await?;
1641        Ok(())
1642    }
1643
1644    async fn commit_iceberg_inner(
1645        &mut self,
1646        epoch: u64,
1647        write_results: Vec<IcebergCommitResult>,
1648        snapshot_id: Option<i64>,
1649    ) -> Result<()> {
1650        // If the provided `snapshot_id` is not None, it indicates that this commit is a re-commit
1651        // occurring during the recovery phase. In this case, we need to use the `snapshot_id`
1652        // that was previously persisted in the system table to commit.
1653        let is_first_commit = snapshot_id.is_none();
1654        if !is_first_commit {
1655            tracing::info!("Doing iceberg re-commit.");
1656        }
1657        self.last_commit_epoch = epoch;
1658        let expect_schema_id = write_results[0].schema_id;
1659        let expect_partition_spec_id = write_results[0].partition_spec_id;
1660
1661        // Reload the latest table to avoid concurrent modification, on a best-effort basis.
1662        self.table = Self::reload_table(
1663            self.catalog.as_ref(),
1664            self.table.identifier(),
1665            expect_schema_id,
1666            expect_partition_spec_id,
1667        )
1668        .await?;
1669        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1670            return Err(SinkError::Iceberg(anyhow!(
1671                "Can't find schema by id {}",
1672                expect_schema_id
1673            )));
1674        };
1675        let Some(partition_spec) = self
1676            .table
1677            .metadata()
1678            .partition_spec_by_id(expect_partition_spec_id)
1679        else {
1680            return Err(SinkError::Iceberg(anyhow!(
1681                "Can't find partition spec by id {}",
1682                expect_partition_spec_id
1683            )));
1684        };
1685        let partition_type = partition_spec
1686            .as_ref()
1687            .clone()
1688            .partition_type(schema)
1689            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1690
1691        let txn = Transaction::new(&self.table);
1692        // Only generate a new snapshot id on the first commit.
1693        let snapshot_id = match snapshot_id {
1694            Some(previous_snapshot_id) => previous_snapshot_id,
1695            None => txn.generate_unique_snapshot_id(),
1696        };
1697        if self.is_exactly_once && is_first_commit {
1698            // Persist the pre-commit metadata and snapshot id in the system table.
1699            let mut pre_commit_metadata_bytes = Vec::new();
1700            for each_parallelism_write_result in write_results.clone() {
1701                let each_parallelism_write_result_bytes: Vec<u8> =
1702                    each_parallelism_write_result.try_into()?;
1703                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1704            }
1705
1706            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1707
1708            self.persist_pre_commit_metadata(
1709                self.db.clone(),
1710                self.last_commit_epoch,
1711                epoch,
1712                pre_commit_metadata_bytes,
1713                snapshot_id,
1714            )
1715            .await?;
1716        }
1717
1718        let data_files = write_results
1719            .into_iter()
1720            .flat_map(|r| {
1721                r.data_files.into_iter().map(|f| {
1722                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1723                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1724                })
1725            })
1726            .collect::<Result<Vec<DataFile>>>()?;
1727        // # TODO:
1728        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry
1729        // (tracked in: https://github.com/apache/iceberg-rust/issues/964), because the retry logic
1730        // involves reapplying the commit metadata. For now, we just retry the whole commit operation.
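        // Note that `reload_table` runs inside the retry closure, so every attempt re-validates the
        // schema / partition spec ids and applies the append on top of the latest table metadata.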
1731        let retry_strategy = ExponentialBackoff::from_millis(10)
1732            .max_delay(Duration::from_secs(60))
1733            .map(jitter)
1734            .take(self.commit_retry_num as usize);
1735        let catalog = self.catalog.clone();
1736        let table_ident = self.table.identifier().clone();
1737        let table = Retry::spawn(retry_strategy, || async {
1738            let table = Self::reload_table(
1739                catalog.as_ref(),
1740                &table_ident,
1741                expect_schema_id,
1742                expect_partition_spec_id,
1743            )
1744            .await?;
1745            let txn = Transaction::new(&table);
1746            let mut append_action = txn
1747                .fast_append(Some(snapshot_id), None, vec![])
1748                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1749            append_action
1750                .add_data_files(data_files.clone())
1751                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1752            let tx = append_action.apply().await.map_err(|err| {
1753                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1754                SinkError::Iceberg(anyhow!(err))
1755            })?;
1756            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1757                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1758                SinkError::Iceberg(anyhow!(err))
1759            })
1760        })
1761        .await?;
1762        self.table = table;
1763
1764        if let Some(commit_notifier) = &mut self.commit_notifier {
1765            if commit_notifier.send(()).is_err() {
1766                warn!("failed to notify commit");
1767            }
1768        }
1769        tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
1770
1771        if self.is_exactly_once {
1772            self.mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1773                .await?;
1774            tracing::info!(
1775                "Sink id = {}: succeeded to mark pre-commit metadata in epoch {} as committed.",
1776                self.sink_id,
1777                epoch
1778            );
1779
1780            self.delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1781                .await?;
1782        }
1783        Ok(())
1784    }
1785
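    /// Insert a row into the `exactly_once_iceberg_sink` system table recording the pre-commit
    /// metadata and the snapshot id for this epoch range, with `committed` initialized to `false`.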
1786    async fn persist_pre_commit_metadata(
1787        &self,
1788        db: DatabaseConnection,
1789        start_epoch: u64,
1790        end_epoch: u64,
1791        pre_commit_metadata: Vec<u8>,
1792        snapshot_id: i64,
1793    ) -> Result<()> {
1794        let m = exactly_once_iceberg_sink::ActiveModel {
1795            sink_id: Set(self.sink_id as i32),
1796            end_epoch: Set(end_epoch.try_into().unwrap()),
1797            start_epoch: Set(start_epoch.try_into().unwrap()),
1798            metadata: Set(pre_commit_metadata),
1799            committed: Set(false),
1800            snapshot_id: Set(snapshot_id),
1801        };
1802        match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
1803            Ok(_) => Ok(()),
1804            Err(e) => {
1805                tracing::error!("Error inserting into system table: {:?}", e.as_report());
1806                Err(e.into())
1807            }
1808        }
1809    }
1810
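    /// Mark the pre-commit row identified by (`sink_id`, `end_epoch`) as committed in the
    /// `exactly_once_iceberg_sink` system table.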
1811    async fn mark_row_is_committed_by_sink_id_and_end_epoch(
1812        &self,
1813        db: &DatabaseConnection,
1814        sink_id: u32,
1815        end_epoch: u64,
1816    ) -> Result<()> {
1817        match Entity::update(exactly_once_iceberg_sink::ActiveModel {
1818            sink_id: Set(sink_id as i32),
1819            end_epoch: Set(end_epoch.try_into().unwrap()),
1820            committed: Set(true),
1821            ..Default::default()
1822        })
1823        .exec(db)
1824        .await
1825        {
1826            Ok(_) => {
1827                tracing::info!(
1828                    "Sink id = {}: marked written data as committed, end_epoch = {}.",
1829                    sink_id,
1830                    end_epoch
1831                );
1832                Ok(())
1833            }
1834            Err(e) => {
1835                tracing::error!(
1836                    "Error marking item as committed in the iceberg exactly-once system table: {:?}",
1837                    e.as_report()
1838                );
1839                Err(e.into())
1840            }
1841        }
1842    }
1843
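    /// Clean up rows of this sink in the `exactly_once_iceberg_sink` system table whose
    /// `end_epoch` is strictly less than the given one, i.e. metadata of older, already
    /// handled epochs.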
1844    async fn delete_row_by_sink_id_and_end_epoch(
1845        &self,
1846        db: &DatabaseConnection,
1847        sink_id: u32,
1848        end_epoch: u64,
1849    ) -> Result<()> {
1850        let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
1851        match Entity::delete_many()
1852            .filter(Column::SinkId.eq(sink_id))
1853            .filter(Column::EndEpoch.lt(end_epoch_i64))
1854            .exec(db)
1855            .await
1856        {
1857            Ok(result) => {
1858                let deleted_count = result.rows_affected;
1859
1860                if deleted_count == 0 {
1861                    tracing::info!(
1862                        "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
1863                        sink_id,
1864                        end_epoch
1865                    );
1866                } else {
1867                    tracing::info!(
1868                        "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
1869                        sink_id,
1870                        end_epoch
1871                    );
1872                }
1873                Ok(())
1874            }
1875            Err(e) => {
1876                tracing::error!(
1877                    "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
1878                    sink_id,
1879                    e.as_report()
1880                );
1881                Err(e.into())
1882            }
1883        }
1884    }
1885
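    /// Return whether the `exactly_once_iceberg_sink` system table contains any pre-commit
    /// rows for the given sink id.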
1886    async fn iceberg_sink_has_pre_commit_metadata(
1887        &self,
1888        db: &DatabaseConnection,
1889        sink_id: u32,
1890    ) -> Result<bool> {
1891        match exactly_once_iceberg_sink::Entity::find()
1892            .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32))
1893            .count(db)
1894            .await
1895        {
1896            Ok(count) => Ok(count > 0),
1897            Err(e) => {
1898                tracing::error!(
1899                    "Error querying pre-commit metadata from system table: {:?}",
1900                    e.as_report()
1901                );
1902                Err(e.into())
1903            }
1904        }
1905    }
1906
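    /// Fetch all pre-commit rows of the given sink id, ordered by `end_epoch` ascending,
    /// as `(end_epoch, metadata, snapshot_id, committed)` tuples.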
1907    async fn get_pre_commit_info_by_sink_id(
1908        &self,
1909        db: &DatabaseConnection,
1910        sink_id: u32,
1911    ) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
1912        let models: Vec<Model> = Entity::find()
1913            .filter(Column::SinkId.eq(sink_id as i32))
1914            .order_by(Column::EndEpoch, Order::Asc)
1915            .all(db)
1916            .await?;
1917
1918        let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
1919
1920        for model in models {
1921            result.push((
1922                model.end_epoch.try_into().unwrap(),
1923                model.metadata,
1924                model.snapshot_id,
1925                model.committed,
1926            ));
1927        }
1928
1929        Ok(result)
1930    }
1931
1932    /// When persisting the pre-commit metadata, we record the `snapshot_id` corresponding to each batch of files.
1933    /// Therefore, checking whether all files in a batch are already present in Iceberg reduces to
1934    /// verifying whether their corresponding `snapshot_id` exists in the Iceberg table metadata.
1935    async fn is_snapshot_id_in_iceberg(
1936        &self,
1937        iceberg_config: &IcebergConfig,
1938        snapshot_id: i64,
1939    ) -> Result<bool> {
1940        let iceberg_common = iceberg_config.common.clone();
1941        let table = iceberg_common
1942            .load_table(&iceberg_config.java_catalog_props)
1943            .await?;
1944        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1949    }
1950}
1951
1952const MAP_KEY: &str = "key";
1953const MAP_VALUE: &str = "value";
1954
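/// Descend through arrow `List`/`Map` wrappers until a `Struct` is reached, collect the
/// corresponding RisingWave child field types into `schema_fields` (keyed by field name, or by
/// `MAP_KEY`/`MAP_VALUE` for maps), and return the arrow struct's fields so that
/// `check_compatibility` can recurse into the nested type. Returns `None` if the arrow type is
/// not a (possibly nested) struct.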
1955fn get_fields<'a>(
1956    our_field_type: &'a risingwave_common::types::DataType,
1957    data_type: &ArrowDataType,
1958    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1959) -> Option<ArrowFields> {
1960    match data_type {
1961        ArrowDataType::Struct(fields) => {
1962            match our_field_type {
1963                risingwave_common::types::DataType::Struct(struct_fields) => {
1964                    struct_fields.iter().for_each(|(name, data_type)| {
1965                        let res = schema_fields.insert(name, data_type);
1966                        // This assert is to make sure there is no duplicate field name in the schema.
1967                        assert!(res.is_none())
1968                    });
1969                }
1970                risingwave_common::types::DataType::Map(map_fields) => {
1971                    schema_fields.insert(MAP_KEY, map_fields.key());
1972                    schema_fields.insert(MAP_VALUE, map_fields.value());
1973                }
1974                risingwave_common::types::DataType::List(list_field) => {
1975                    list_field.as_struct().iter().for_each(|(name, data_type)| {
1976                        let res = schema_fields.insert(name, data_type);
1977                        // This assert is to make sure there is no duplicate field name in the schema.
1978                        assert!(res.is_none())
1979                    });
1980                }
1981                _ => {}
1982            };
1983            Some(fields.clone())
1984        }
1985        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1986            get_fields(our_field_type, field.data_type(), schema_fields)
1987        }
1988        _ => None, // not a supported complex type and unlikely to show up
1989    }
1990}
1991
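/// Check that every field of the iceberg (arrow) schema has a compatible counterpart in the
/// RisingWave schema, tolerating a few known representation differences (e.g. differences in
/// decimal precision/scale, `Binary` vs `LargeBinary`, and metadata-only mismatches in nested types).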
1992fn check_compatibility(
1993    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
1994    fields: &ArrowFields,
1995) -> anyhow::Result<bool> {
1996    for arrow_field in fields {
1997        let our_field_type = schema_fields
1998            .get(arrow_field.name().as_str())
1999            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2000
2001        // Iceberg source should be able to read iceberg decimal type.
2002        let converted_arrow_data_type = IcebergArrowConvert
2003            .to_arrow_field("", our_field_type)
2004            .map_err(|e| anyhow!(e))?
2005            .data_type()
2006            .clone();
2007
2008        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2009            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2010            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2011            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2012            (ArrowDataType::List(_), ArrowDataType::List(field))
2013            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2014                let mut schema_fields = HashMap::new();
2015                get_fields(our_field_type, field.data_type(), &mut schema_fields)
2016                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2017            }
2018            // validate nested structs
2019            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2020                let mut schema_fields = HashMap::new();
2021                our_field_type
2022                    .as_struct()
2023                    .iter()
2024                    .for_each(|(name, data_type)| {
2025                        let res = schema_fields.insert(name, data_type);
2026                        // This assert is to make sure there is no duplicate field name in the schema.
2027                        assert!(res.is_none())
2028                    });
2029                check_compatibility(schema_fields, fields)?
2030            }
2031            // cases where left != right (metadata, field name mismatch)
2032            //
2033            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2034            // {"PARQUET:field_id": ".."}
2035            //
2036            // map: The standard name in arrow is "entries", "key", "value".
2037            // in iceberg-rs, it's called "key_value"
2038            (left, right) => left.equals_datatype(right),
2039        };
2040        if !compatible {
2041            bail!(
2042                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2043                arrow_field.name(),
2044                converted_arrow_data_type,
2045                arrow_field.data_type()
2046            );
2047        }
2048    }
2049    Ok(true)
2050}
2051
2052/// Try to match the RisingWave schema against the iceberg (arrow) schema.
2053pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2054    if rw_schema.fields.len() != arrow_schema.fields().len() {
2055        bail!(
2056            "Schema length mismatch, risingwave is {}, and iceberg is {}",
2057            rw_schema.fields.len(),
2058            arrow_schema.fields.len()
2059        );
2060    }
2061
2062    let mut schema_fields = HashMap::new();
2063    rw_schema.fields.iter().for_each(|field| {
2064        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2065        // This assert is to make sure there is no duplicate field name in the schema.
2066        assert!(res.is_none())
2067    });
2068
2069    check_compatibility(schema_fields, &arrow_schema.fields)?;
2070    Ok(())
2071}
2072
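/// Pack the per-parallelism pre-commit payloads into a single byte blob, suitable for storage
/// in the `exactly_once_iceberg_sink` system table.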
2073pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2074    serde_json::to_vec(&metadata).unwrap()
2075}
2076
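/// Inverse of [`serialize_metadata`]. Panics if the stored bytes are corrupted.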
2077pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2078    serde_json::from_slice(&bytes).unwrap()
2079}
2080
2081#[cfg(test)]
2082mod test {
2083    use std::collections::BTreeMap;
2084
2085    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2086    use risingwave_common::catalog::Field;
2087    use risingwave_common::types::{DataType, MapType, StructType};
2088
2089    use crate::connector_common::IcebergCommon;
2090    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2091    use crate::sink::iceberg::IcebergConfig;
2092
2093    #[test]
2094    fn test_compatible_arrow_schema() {
2095        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2096
2097        use super::*;
2098        let risingwave_schema = Schema::new(vec![
2099            Field::with_name(DataType::Int32, "a"),
2100            Field::with_name(DataType::Int32, "b"),
2101            Field::with_name(DataType::Int32, "c"),
2102        ]);
2103        let arrow_schema = ArrowSchema::new(vec![
2104            ArrowField::new("a", ArrowDataType::Int32, false),
2105            ArrowField::new("b", ArrowDataType::Int32, false),
2106            ArrowField::new("c", ArrowDataType::Int32, false),
2107        ]);
2108
2109        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2110
2111        let risingwave_schema = Schema::new(vec![
2112            Field::with_name(DataType::Int32, "d"),
2113            Field::with_name(DataType::Int32, "c"),
2114            Field::with_name(DataType::Int32, "a"),
2115            Field::with_name(DataType::Int32, "b"),
2116        ]);
2117        let arrow_schema = ArrowSchema::new(vec![
2118            ArrowField::new("a", ArrowDataType::Int32, false),
2119            ArrowField::new("b", ArrowDataType::Int32, false),
2120            ArrowField::new("d", ArrowDataType::Int32, false),
2121            ArrowField::new("c", ArrowDataType::Int32, false),
2122        ]);
2123        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2124
2125        let risingwave_schema = Schema::new(vec![
2126            Field::with_name(
2127                DataType::Struct(StructType::new(vec![
2128                    ("a1", DataType::Int32),
2129                    (
2130                        "a2",
2131                        DataType::Struct(StructType::new(vec![
2132                            ("a21", DataType::Bytea),
2133                            (
2134                                "a22",
2135                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2136                            ),
2137                        ])),
2138                    ),
2139                ])),
2140                "a",
2141            ),
2142            Field::with_name(
2143                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2144                    ("b1", DataType::Int32),
2145                    ("b2", DataType::Bytea),
2146                    (
2147                        "b3",
2148                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2149                    ),
2150                ])))),
2151                "b",
2152            ),
2153            Field::with_name(
2154                DataType::Map(MapType::from_kv(
2155                    DataType::Varchar,
2156                    DataType::List(Box::new(DataType::Struct(StructType::new([
2157                        ("c1", DataType::Int32),
2158                        ("c2", DataType::Bytea),
2159                        (
2160                            "c3",
2161                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2162                        ),
2163                    ])))),
2164                )),
2165                "c",
2166            ),
2167        ]);
2168        let arrow_schema = ArrowSchema::new(vec![
2169            ArrowField::new(
2170                "a",
2171                ArrowDataType::Struct(ArrowFields::from(vec![
2172                    ArrowField::new("a1", ArrowDataType::Int32, false),
2173                    ArrowField::new(
2174                        "a2",
2175                        ArrowDataType::Struct(ArrowFields::from(vec![
2176                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2177                            ArrowField::new_map(
2178                                "a22",
2179                                "entries",
2180                                ArrowFieldRef::new(ArrowField::new(
2181                                    "key",
2182                                    ArrowDataType::Utf8,
2183                                    false,
2184                                )),
2185                                ArrowFieldRef::new(ArrowField::new(
2186                                    "value",
2187                                    ArrowDataType::Utf8,
2188                                    false,
2189                                )),
2190                                false,
2191                                false,
2192                            ),
2193                        ])),
2194                        false,
2195                    ),
2196                ])),
2197                false,
2198            ),
2199            ArrowField::new(
2200                "b",
2201                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2202                    ArrowDataType::Struct(ArrowFields::from(vec![
2203                        ArrowField::new("b1", ArrowDataType::Int32, false),
2204                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2205                        ArrowField::new_map(
2206                            "b3",
2207                            "entries",
2208                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2209                            ArrowFieldRef::new(ArrowField::new(
2210                                "value",
2211                                ArrowDataType::Utf8,
2212                                false,
2213                            )),
2214                            false,
2215                            false,
2216                        ),
2217                    ])),
2218                    false,
2219                ))),
2220                false,
2221            ),
2222            ArrowField::new_map(
2223                "c",
2224                "entries",
2225                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2226                ArrowFieldRef::new(ArrowField::new(
2227                    "value",
2228                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2229                        ArrowDataType::Struct(ArrowFields::from(vec![
2230                            ArrowField::new("c1", ArrowDataType::Int32, false),
2231                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2232                            ArrowField::new_map(
2233                                "c3",
2234                                "entries",
2235                                ArrowFieldRef::new(ArrowField::new(
2236                                    "key",
2237                                    ArrowDataType::Utf8,
2238                                    false,
2239                                )),
2240                                ArrowFieldRef::new(ArrowField::new(
2241                                    "value",
2242                                    ArrowDataType::Utf8,
2243                                    false,
2244                                )),
2245                                false,
2246                                false,
2247                            ),
2248                        ])),
2249                        false,
2250                    ))),
2251                    false,
2252                )),
2253                false,
2254                false,
2255            ),
2256        ]);
2257        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2258    }
2259
2260    #[test]
2261    fn test_parse_iceberg_config() {
2262        let values = [
2263            ("connector", "iceberg"),
2264            ("type", "upsert"),
2265            ("primary_key", "v1"),
2266            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2267            ("warehouse.path", "s3://iceberg"),
2268            ("s3.endpoint", "http://127.0.0.1:9301"),
2269            ("s3.access.key", "hummockadmin"),
2270            ("s3.secret.key", "hummockadmin"),
2271            ("s3.path.style.access", "true"),
2272            ("s3.region", "us-east-1"),
2273            ("catalog.type", "jdbc"),
2274            ("catalog.name", "demo"),
2275            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2276            ("catalog.jdbc.user", "admin"),
2277            ("catalog.jdbc.password", "123456"),
2278            ("database.name", "demo_db"),
2279            ("table.name", "demo_table"),
2280        ]
2281        .into_iter()
2282        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2283        .collect();
2284
2285        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2286
2287        let expected_iceberg_config = IcebergConfig {
2288            common: IcebergCommon {
2289                warehouse_path: Some("s3://iceberg".to_owned()),
2290                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2291                region: Some("us-east-1".to_owned()),
2292                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2293                access_key: Some("hummockadmin".to_owned()),
2294                secret_key: Some("hummockadmin".to_owned()),
2295                gcs_credential: None,
2296                catalog_type: Some("jdbc".to_owned()),
2297                glue_id: None,
2298                catalog_name: Some("demo".to_owned()),
2299                database_name: Some("demo_db".to_owned()),
2300                table_name: "demo_table".to_owned(),
2301                path_style_access: Some(true),
2302                credential: None,
2303                oauth2_server_uri: None,
2304                scope: None,
2305                token: None,
2306                enable_config_load: None,
2307                rest_signing_name: None,
2308                rest_signing_region: None,
2309                rest_sigv4_enabled: None,
2310                hosted_catalog: None,
2311                azblob_account_name: None,
2312                azblob_account_key: None,
2313                azblob_endpoint_url: None,
2314            },
2315            r#type: "upsert".to_owned(),
2316            force_append_only: false,
2317            primary_key: Some(vec!["v1".to_owned()]),
2318            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2319            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2320                .into_iter()
2321                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2322                .collect(),
2323            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2324            create_table_if_not_exists: false,
2325            is_exactly_once: None,
2326            commit_retry_num: 8,
2327        };
2328
2329        assert_eq!(iceberg_config, expected_iceberg_config);
2330
2331        assert_eq!(
2332            &iceberg_config.common.full_table_name().unwrap().to_string(),
2333            "demo_db.demo_table"
2334        );
2335    }
2336
2337    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2338        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2339
2340        let table = iceberg_config.load_table().await.unwrap();
2341
2342        println!("{:?}", table.identifier());
2343    }
2344
2345    #[tokio::test]
2346    #[ignore]
2347    async fn test_storage_catalog() {
2348        let values = [
2349            ("connector", "iceberg"),
2350            ("type", "append-only"),
2351            ("force_append_only", "true"),
2352            ("s3.endpoint", "http://127.0.0.1:9301"),
2353            ("s3.access.key", "hummockadmin"),
2354            ("s3.secret.key", "hummockadmin"),
2355            ("s3.region", "us-east-1"),
2356            ("s3.path.style.access", "true"),
2357            ("catalog.name", "demo"),
2358            ("catalog.type", "storage"),
2359            ("warehouse.path", "s3://icebergdata/demo"),
2360            ("database.name", "s1"),
2361            ("table.name", "t1"),
2362        ]
2363        .into_iter()
2364        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2365        .collect();
2366
2367        test_create_catalog(values).await;
2368    }
2369
2370    #[tokio::test]
2371    #[ignore]
2372    async fn test_rest_catalog() {
2373        let values = [
2374            ("connector", "iceberg"),
2375            ("type", "append-only"),
2376            ("force_append_only", "true"),
2377            ("s3.endpoint", "http://127.0.0.1:9301"),
2378            ("s3.access.key", "hummockadmin"),
2379            ("s3.secret.key", "hummockadmin"),
2380            ("s3.region", "us-east-1"),
2381            ("s3.path.style.access", "true"),
2382            ("catalog.name", "demo"),
2383            ("catalog.type", "rest"),
2384            ("catalog.uri", "http://192.168.167.4:8181"),
2385            ("warehouse.path", "s3://icebergdata/demo"),
2386            ("database.name", "s1"),
2387            ("table.name", "t1"),
2388        ]
2389        .into_iter()
2390        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2391        .collect();
2392
2393        test_create_catalog(values).await;
2394    }
2395
2396    #[tokio::test]
2397    #[ignore]
2398    async fn test_jdbc_catalog() {
2399        let values = [
2400            ("connector", "iceberg"),
2401            ("type", "append-only"),
2402            ("force_append_only", "true"),
2403            ("s3.endpoint", "http://127.0.0.1:9301"),
2404            ("s3.access.key", "hummockadmin"),
2405            ("s3.secret.key", "hummockadmin"),
2406            ("s3.region", "us-east-1"),
2407            ("s3.path.style.access", "true"),
2408            ("catalog.name", "demo"),
2409            ("catalog.type", "jdbc"),
2410            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2411            ("catalog.jdbc.user", "admin"),
2412            ("catalog.jdbc.password", "123456"),
2413            ("warehouse.path", "s3://icebergdata/demo"),
2414            ("database.name", "s1"),
2415            ("table.name", "t1"),
2416        ]
2417        .into_iter()
2418        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2419        .collect();
2420
2421        test_create_catalog(values).await;
2422    }
2423
2424    #[tokio::test]
2425    #[ignore]
2426    async fn test_hive_catalog() {
2427        let values = [
2428            ("connector", "iceberg"),
2429            ("type", "append-only"),
2430            ("force_append_only", "true"),
2431            ("s3.endpoint", "http://127.0.0.1:9301"),
2432            ("s3.access.key", "hummockadmin"),
2433            ("s3.secret.key", "hummockadmin"),
2434            ("s3.region", "us-east-1"),
2435            ("s3.path.style.access", "true"),
2436            ("catalog.name", "demo"),
2437            ("catalog.type", "hive"),
2438            ("catalog.uri", "thrift://localhost:9083"),
2439            ("warehouse.path", "s3://icebergdata/demo"),
2440            ("database.name", "s1"),
2441            ("table.name", "t1"),
2442        ]
2443        .into_iter()
2444        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2445        .collect();
2446
2447        test_create_catalog(values).await;
2448    }
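
    // A minimal round-trip sanity check for the commit-metadata helpers in this module. It only
    // covers the empty `data_files` case, since constructing a `SerializedDataFile` by hand is
    // out of scope here.
    #[test]
    fn test_commit_metadata_roundtrip() {
        use super::{IcebergCommitResult, deserialize_metadata, serialize_metadata};

        // `serialize_metadata` / `deserialize_metadata` wrap the per-parallelism payloads and
        // should round-trip them losslessly.
        let payloads = vec![vec![1u8, 2, 3], vec![], vec![42u8]];
        let bytes = serialize_metadata(payloads.clone());
        assert_eq!(deserialize_metadata(bytes), payloads);

        // An `IcebergCommitResult` without data files should survive the byte-level round trip
        // used by the exactly-once recovery path.
        let original = IcebergCommitResult {
            schema_id: 1,
            partition_spec_id: 2,
            data_files: vec![],
        };
        let bytes: Vec<u8> = original.try_into().unwrap();
        let restored = IcebergCommitResult::try_from_serialized_bytes(bytes).unwrap();
        assert_eq!(restored.schema_id, 1);
        assert_eq!(restored.partition_spec_id, 2);
        assert!(restored.data_files.is_empty());
    }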
2449}