risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compaction;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
27use iceberg::spec::{
28    DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
29};
30use iceberg::table::Table;
31use iceberg::transaction::Transaction;
32use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
33use iceberg::writer::base_writer::equality_delete_writer::{
34    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
35};
36use iceberg::writer::base_writer::sort_position_delete_writer::{
37    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
38};
39use iceberg::writer::file_writer::ParquetWriterBuilder;
40use iceberg::writer::file_writer::location_generator::{
41    DefaultFileNameGenerator, DefaultLocationGenerator,
42};
43use iceberg::writer::function_writer::equality_delta_writer::{
44    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
45};
46use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
47use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
48use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
49use itertools::Itertools;
50use parquet::file::properties::WriterProperties;
51use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
52use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
53use regex::Regex;
54use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
55use risingwave_common::array::arrow::arrow_schema_iceberg::{
56    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
57    Schema as ArrowSchema, SchemaRef,
58};
59use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
60use risingwave_common::array::{Op, StreamChunk};
61use risingwave_common::bail;
62use risingwave_common::bitmap::Bitmap;
63use risingwave_common::catalog::Schema;
64use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
65use risingwave_common_estimate_size::EstimateSize;
66use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
67use risingwave_pb::connector_service::SinkMetadata;
68use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
69use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
70use sea_orm::{
71    ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
72    Set,
73};
74use serde_derive::Deserialize;
75use serde_json::from_value;
76use serde_with::{DisplayFromStr, serde_as};
77use thiserror_ext::AsReport;
78use tokio::sync::mpsc::{self};
79use tokio::sync::oneshot;
80use tokio_retry::Retry;
81use tokio_retry::strategy::{ExponentialBackoff, jitter};
82use tracing::warn;
83use url::Url;
84use uuid::Uuid;
85use with_options::WithOptions;
86
87use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
88use super::{
89    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
90    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
91};
92use crate::connector_common::IcebergCommon;
93use crate::sink::coordinate::CoordinatedLogSinker;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99
100fn default_commit_retry_num() -> u32 {
101    8
102}
103
104#[serde_as]
105#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
106pub struct IcebergConfig {
107    pub r#type: String, // accepts "append-only" or "upsert"
108
109    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
110    pub force_append_only: bool,
111
112    #[serde(flatten)]
113    common: IcebergCommon,
114
115    #[serde(
116        rename = "primary_key",
117        default,
118        deserialize_with = "deserialize_optional_string_seq_from_string"
119    )]
120    pub primary_key: Option<Vec<String>>,
121
122    // Properties forwarded to the Java catalog implementation.
123    #[serde(skip)]
124    pub java_catalog_props: HashMap<String, String>,
125
126    #[serde(default)]
127    pub partition_by: Option<String>,
128
129    /// Commit every n (n > 0) checkpoints; default is 10.
130    #[serde(default = "default_commit_checkpoint_interval")]
131    #[serde_as(as = "DisplayFromStr")]
132    pub commit_checkpoint_interval: u64,
133
134    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
135    pub create_table_if_not_exists: bool,
136
137    /// Whether to enable exactly-once delivery; disabled by default.
138    #[serde(default)]
139    #[serde_as(as = "Option<DisplayFromStr>")]
140    pub is_exactly_once: Option<bool>,
141    // Number of times to retry a failed Iceberg commit; default is 8.
142    // # TODO
143    // The Iceberg table may store a retry count in its table metadata;
144    // we should prefer that value as the default when it is available.
145    #[serde(default = "default_commit_retry_num")]
146    pub commit_retry_num: u32,
147}
148
149impl IcebergConfig {
150    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
151        let mut config =
152            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
153                .map_err(|e| SinkError::Config(anyhow!(e)))?;
154
155        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
156            return Err(SinkError::Config(anyhow!(
157                "`{}` must be {}, or {}",
158                SINK_TYPE_OPTION,
159                SINK_TYPE_APPEND_ONLY,
160                SINK_TYPE_UPSERT
161            )));
162        }
163
164        if config.r#type == SINK_TYPE_UPSERT {
165            if let Some(primary_key) = &config.primary_key {
166                if primary_key.is_empty() {
167                    return Err(SinkError::Config(anyhow!(
168                        "`primary_key` must not be empty in {}",
169                        SINK_TYPE_UPSERT
170                    )));
171                }
172            } else {
173                return Err(SinkError::Config(anyhow!(
174                    "Must set `primary_key` in {}",
175                    SINK_TYPE_UPSERT
176                )));
177            }
178        }
179
180        // All configs starting with "catalog." are treated as Java catalog configs.
181        config.java_catalog_props = values
182            .iter()
183            .filter(|(k, _v)| {
184                k.starts_with("catalog.")
185                    && k != &"catalog.uri"
186                    && k != &"catalog.type"
187                    && k != &"catalog.name"
188            })
189            .map(|(k, v)| (k[8..].to_string(), v.to_string()))
190            .collect();
191
192        if config.commit_checkpoint_interval == 0 {
193            return Err(SinkError::Config(anyhow!(
194                "`commit_checkpoint_interval` must be greater than 0"
195            )));
196        }
197
198        Ok(config)
199    }
200
201    pub fn catalog_type(&self) -> &str {
202        self.common.catalog_type()
203    }
204
205    pub async fn load_table(&self) -> Result<Table> {
206        self.common
207            .load_table(&self.java_catalog_props)
208            .await
209            .map_err(Into::into)
210    }
211
212    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
213        self.common
214            .create_catalog(&self.java_catalog_props)
215            .await
216            .map_err(Into::into)
217    }
218
219    pub fn full_table_name(&self) -> Result<TableIdent> {
220        self.common.full_table_name().map_err(Into::into)
221    }
222}
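
// A minimal sketch (test-only, not used by the sink itself) of the `catalog.*` option handling in
// `IcebergConfig::from_btreemap` above: every option whose key starts with `catalog.` (except
// `catalog.uri`, `catalog.type` and `catalog.name`) is forwarded to the Java catalog with the
// prefix stripped. The option names used below are illustrative only.
#[cfg(test)]
mod java_catalog_prop_filter_sketch {
    use std::collections::BTreeMap;

    #[test]
    fn strips_catalog_prefix_like_from_btreemap() {
        let values: BTreeMap<String, String> = BTreeMap::from([
            ("catalog.type".to_owned(), "hive".to_owned()),
            ("catalog.uri".to_owned(), "thrift://meta:9083".to_owned()),
            (
                "catalog.hive.metastore.uris".to_owned(),
                "thrift://meta:9083".to_owned(),
            ),
        ]);
        // Same filter as in `IcebergConfig::from_btreemap`.
        let java_props: BTreeMap<String, String> = values
            .iter()
            .filter(|(k, _)| {
                k.starts_with("catalog.")
                    && k.as_str() != "catalog.uri"
                    && k.as_str() != "catalog.type"
                    && k.as_str() != "catalog.name"
            })
            .map(|(k, v)| (k[8..].to_owned(), v.clone()))
            .collect();
        assert_eq!(
            java_props.get("hive.metastore.uris").map(String::as_str),
            Some("thrift://meta:9083")
        );
        assert!(!java_props.contains_key("type"));
    }
}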
223
224pub struct IcebergSink {
225    config: IcebergConfig,
226    param: SinkParam,
227    // In upsert mode, this is never `None` or empty.
228    unique_column_ids: Option<Vec<usize>>,
229}
230
231impl TryFrom<SinkParam> for IcebergSink {
232    type Error = SinkError;
233
234    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
235        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
236        IcebergSink::new(config, param)
237    }
238}
239
240impl Debug for IcebergSink {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        f.debug_struct("IcebergSink")
243            .field("config", &self.config)
244            .finish()
245    }
246}
247
248impl IcebergSink {
249    async fn create_and_validate_table(&self) -> Result<Table> {
250        if self.config.create_table_if_not_exists {
251            self.create_table_if_not_exists().await?;
252        }
253
254        let table = self
255            .config
256            .load_table()
257            .await
258            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
259
260        let sink_schema = self.param.schema();
261        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
262            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
263
264        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
265            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
266
267        Ok(table)
268    }
269
270    async fn create_table_if_not_exists(&self) -> Result<()> {
271        let catalog = self.config.create_catalog().await?;
272        let namespace = if let Some(database_name) = &self.config.common.database_name {
273            let namespace = NamespaceIdent::new(database_name.clone());
274            if !catalog
275                .namespace_exists(&namespace)
276                .await
277                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
278            {
279                catalog
280                    .create_namespace(&namespace, HashMap::default())
281                    .await
282                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
283                    .context("failed to create iceberg namespace")?;
284            }
285            namespace
286        } else {
287            bail!("database name must be set if you want to create table")
288        };
289
290        let table_id = self
291            .config
292            .full_table_name()
293            .context("Unable to parse table name")?;
294        if !catalog
295            .table_exists(&table_id)
296            .await
297            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
298        {
299            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
300            // convert risingwave schema -> arrow schema -> iceberg schema
301            let arrow_fields = self
302                .param
303                .columns
304                .iter()
305                .map(|column| {
306                    Ok(iceberg_create_table_arrow_convert
307                        .to_arrow_field(&column.name, &column.data_type)
308                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
309                        .context(format!(
310                            "failed to convert {}: {} to arrow type",
311                            &column.name, &column.data_type
312                        ))?)
313                })
314                .collect::<Result<Vec<ArrowField>>>()?;
315            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
316            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
317                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
318                .context("failed to convert arrow schema to iceberg schema")?;
319
320            let location = {
321                let mut names = namespace.clone().inner();
322                names.push(self.config.common.table_name.clone());
323                match &self.config.common.warehouse_path {
324                    Some(warehouse_path) => {
325                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
326                        let url = Url::parse(warehouse_path);
327                        if url.is_err() || is_s3_tables {
328                            // For the REST catalog, the warehouse_path may be a warehouse name
329                            // rather than a URL; leave the location unset and let the catalog decide it.
330                            if self.config.common.catalog_type() == "rest"
331                                || self.config.common.catalog_type() == "rest_rust"
332                            {
333                                None
334                            } else {
335                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
336                            }
337                        } else if warehouse_path.ends_with('/') {
338                            Some(format!("{}{}", warehouse_path, names.join("/")))
339                        } else {
340                            Some(format!("{}/{}", warehouse_path, names.join("/")))
341                        }
342                    }
343                    None => None,
344                }
345            };
346
347            let partition_spec = match &self.config.partition_by {
348                Some(partition_field) => {
349                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
350                    // captures column, transform(column), transform(n,column), transform(n, column)
351                    let re = Regex::new(
352                        r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?",
353                    )
354                    .unwrap();
355                    if !re.is_match(partition_field) {
356                        bail!(format!(
357                            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
358                            partition_field
359                        ))
360                    }
361                    let caps = re.captures_iter(partition_field);
362                    for (i, mat) in caps.enumerate() {
363                        let (column, transform) =
364                            if mat.name("n").is_none() && mat.name("field").is_none() {
365                                (&mat["transform"], Transform::Identity)
366                            } else {
367                                let mut func = mat["transform"].to_owned();
368                                if func == "bucket" || func == "truncate" {
369                                    let n = &mat
370                                        .name("n")
371                                        .ok_or_else(|| {
372                                            SinkError::Iceberg(anyhow!(
373                                                "The `n` must be set with `bucket` and `truncate`"
374                                            ))
375                                        })?
376                                        .as_str();
377                                    func = format!("{func}[{n}]");
378                                }
379                                (
380                                    &mat["field"],
381                                    Transform::from_str(&func)
382                                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?,
383                                )
384                            };
385
386                        match iceberg_schema.field_id_by_name(column) {
387                            Some(id) => partition_fields.push(
388                                UnboundPartitionField::builder()
389                                    .source_id(id)
390                                    .transform(transform)
391                                    .name(column.to_owned())
392                                    .field_id(i as i32)
393                                    .build(),
394                            ),
395                            None => bail!(format!(
396                                "Partition source column does not exist in schema: {}",
397                                column
398                            )),
399                        };
400                    }
401                    Some(
402                        UnboundPartitionSpec::builder()
403                            .with_spec_id(0)
404                            .add_partition_fields(partition_fields)
405                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
406                            .context("failed to add partition columns")?
407                            .build(),
408                    )
409                }
410                None => None,
411            };
412
413            let table_creation_builder = TableCreation::builder()
414                .name(self.config.common.table_name.clone())
415                .schema(iceberg_schema);
416
417            let table_creation = match (location, partition_spec) {
418                (Some(location), Some(partition_spec)) => table_creation_builder
419                    .location(location)
420                    .partition_spec(partition_spec)
421                    .build(),
422                (Some(location), None) => table_creation_builder.location(location).build(),
423                (None, Some(partition_spec)) => table_creation_builder
424                    .partition_spec(partition_spec)
425                    .build(),
426                (None, None) => table_creation_builder.build(),
427            };
428
429            catalog
430                .create_table(&namespace, table_creation)
431                .await
432                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
433                .context("failed to create iceberg table")?;
434        }
435        Ok(())
436    }
437
438    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
439        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
440            if let Some(pk) = &config.primary_key {
441                let mut unique_column_ids = Vec::with_capacity(pk.len());
442                for col_name in pk {
443                    let id = param
444                        .columns
445                        .iter()
446                        .find(|col| col.name.as_str() == col_name)
447                        .ok_or_else(|| {
448                            SinkError::Config(anyhow!(
449                                "Primary key column {} not found in sink schema",
450                                col_name
451                            ))
452                        })?
453                        .column_id
454                        .get_id() as usize;
455                    unique_column_ids.push(id);
456                }
457                Some(unique_column_ids)
458            } else {
459                unreachable!()
460            }
461        } else {
462            None
463        };
464        Ok(Self {
465            config,
466            param,
467            unique_column_ids,
468        })
469    }
470}
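
// A small sketch of the `partition_by` parsing in `create_table_if_not_exists` above: the regex
// decomposes entries such as `bucket(16,user_id)` into the transform name, an optional parameter
// `n`, and the source column. This only exercises the pattern itself; building the actual
// `UnboundPartitionSpec` additionally requires the table schema.
#[cfg(test)]
mod partition_by_regex_sketch {
    use regex::Regex;

    #[test]
    fn parses_transform_n_and_field() {
        // Same pattern as used for `partition_by` above.
        let re =
            Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();

        let caps = re.captures("bucket(16,user_id)").unwrap();
        assert_eq!(&caps["transform"], "bucket");
        assert_eq!(caps.name("n").map(|m| m.as_str()), Some("16"));
        assert_eq!(&caps["field"], "user_id");

        // A bare column name only fills the `transform` group; the sink then treats it as an
        // identity-partitioned column.
        let caps = re.captures("user_id").unwrap();
        assert_eq!(&caps["transform"], "user_id");
        assert!(caps.name("n").is_none() && caps.name("field").is_none());
    }
}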
471
472impl Sink for IcebergSink {
473    type Coordinator = IcebergSinkCommitter;
474    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
475
476    const SINK_NAME: &'static str = ICEBERG_SINK;
477
478    async fn validate(&self) -> Result<()> {
479        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
480            bail!("Snowflake catalog only supports iceberg sources");
481        }
482        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
483            risingwave_common::license::Feature::IcebergSinkWithGlue
484                .check_available()
485                .map_err(|e| anyhow::anyhow!(e))?;
486        }
487        let _ = self.create_and_validate_table().await?;
488        Ok(())
489    }
490
491    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
492        let table = self.create_and_validate_table().await?;
493        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
494            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
495        } else {
496            IcebergSinkWriter::new_append_only(table, &writer_param).await?
497        };
498
499        let commit_checkpoint_interval =
500            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
501                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
502            );
503        let writer = CoordinatedLogSinker::new(
504            &writer_param,
505            self.param.clone(),
506            inner,
507            commit_checkpoint_interval,
508        )
509        .await?;
510
511        Ok(writer)
512    }
513
514    fn is_coordinated_sink(&self) -> bool {
515        true
516    }
517
518    async fn new_coordinator(&self, db: DatabaseConnection) -> Result<Self::Coordinator> {
519        let catalog = self.config.create_catalog().await?;
520        let table = self.create_and_validate_table().await?;
521        // FIXME(Dylan): Disable EMR serverless compaction for now.
522        Ok(IcebergSinkCommitter {
523            catalog,
524            table,
525            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
526            last_commit_epoch: 0,
527            sink_id: self.param.sink_id.sink_id(),
528            config: self.config.clone(),
529            param: self.param.clone(),
530            db,
531            commit_notifier: None,
532            _compact_task_guard: None,
533            commit_retry_num: self.config.commit_retry_num,
534            committed_epoch_subscriber: None,
535        })
536    }
537}
538
539pub struct IcebergSinkWriter {
540    writer: IcebergWriterDispatch,
541    arrow_schema: SchemaRef,
542    // See the comments on `IcebergWriterMetrics` below.
543    metrics: IcebergWriterMetrics,
544    // State of iceberg table for this writer
545    table: Table,
546}
547
548#[allow(clippy::type_complexity)]
549enum IcebergWriterDispatch {
550    PartitionAppendOnly {
551        writer: Option<Box<dyn IcebergWriter>>,
552        writer_builder: MonitoredGeneralWriterBuilder<
553            FanoutPartitionWriterBuilder<
554                DataFileWriterBuilder<
555                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
556                >,
557            >,
558        >,
559    },
560    NonpartitionAppendOnly {
561        writer: Option<Box<dyn IcebergWriter>>,
562        writer_builder: MonitoredGeneralWriterBuilder<
563            DataFileWriterBuilder<
564                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
565            >,
566        >,
567    },
568    PartitionUpsert {
569        writer: Option<Box<dyn IcebergWriter>>,
570        writer_builder: MonitoredGeneralWriterBuilder<
571            FanoutPartitionWriterBuilder<
572                EqualityDeltaWriterBuilder<
573                    DataFileWriterBuilder<
574                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
575                    >,
576                    MonitoredPositionDeleteWriterBuilder<
577                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
578                    >,
579                    EqualityDeleteFileWriterBuilder<
580                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
581                    >,
582                >,
583            >,
584        >,
585        arrow_schema_with_op_column: SchemaRef,
586    },
587    NonpartitionUpsert {
588        writer: Option<Box<dyn IcebergWriter>>,
589        writer_builder: MonitoredGeneralWriterBuilder<
590            EqualityDeltaWriterBuilder<
591                DataFileWriterBuilder<
592                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
593                >,
594                MonitoredPositionDeleteWriterBuilder<
595                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
596                >,
597                EqualityDeleteFileWriterBuilder<
598                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
599                >,
600            >,
601        >,
602        arrow_schema_with_op_column: SchemaRef,
603    },
604}
605
606impl IcebergWriterDispatch {
607    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
608        match self {
609            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
610            | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
611            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
612            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
613        }
614    }
615}
616
617pub struct IcebergWriterMetrics {
618    // NOTE: These two metrics are not used directly here; they are kept only for lifecycle management.
619    // They are actually used in `PrometheusWriterBuilder`:
620    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
621    // Holding them lets the guard clean their labels out of the metrics registry when dropped.
622    _write_qps: LabelGuardedIntCounter<3>,
623    _write_latency: LabelGuardedHistogram<3>,
624    write_bytes: LabelGuardedIntCounter<3>,
625}
626
627impl IcebergSinkWriter {
628    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
629        let SinkWriterParam {
630            extra_partition_col_idx,
631            actor_id,
632            sink_id,
633            sink_name,
634            ..
635        } = writer_param;
636        let metrics_labels = [
637            &actor_id.to_string(),
638            &sink_id.to_string(),
639            sink_name.as_str(),
640        ];
641
642        // Metrics
643        let write_qps = GLOBAL_SINK_METRICS
644            .iceberg_write_qps
645            .with_guarded_label_values(&metrics_labels);
646        let write_latency = GLOBAL_SINK_METRICS
647            .iceberg_write_latency
648            .with_guarded_label_values(&metrics_labels);
649        // # TODO
650        // Currently unused; wire up this metric later.
651        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
652            .iceberg_rolling_unflushed_data_file
653            .with_guarded_label_values(&metrics_labels);
654        let write_bytes = GLOBAL_SINK_METRICS
655            .iceberg_write_bytes
656            .with_guarded_label_values(&metrics_labels);
657
658        let schema = table.metadata().current_schema();
659        let partition_spec = table.metadata().default_partition_spec();
660
661        // To avoid duplicate file names, each sink instance generates a unique UUID to use as the file name suffix.
662        let unique_uuid_suffix = Uuid::now_v7();
663
664        let parquet_writer_builder = ParquetWriterBuilder::new(
665            WriterProperties::new(),
666            schema.clone(),
667            table.file_io().clone(),
668            DefaultLocationGenerator::new(table.metadata().clone())
669                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
670            DefaultFileNameGenerator::new(
671                writer_param.actor_id.to_string(),
672                Some(unique_uuid_suffix.to_string()),
673                iceberg::spec::DataFileFormat::Parquet,
674            ),
675        );
676        let data_file_builder =
677            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
678        if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
679            Err(SinkError::Iceberg(anyhow!(
680                "Extra partition column is not supported in append-only mode"
681            )))
682        } else if partition_spec.fields().is_empty() {
683            let writer_builder = MonitoredGeneralWriterBuilder::new(
684                data_file_builder,
685                write_qps.clone(),
686                write_latency.clone(),
687            );
688            let inner_writer = Some(Box::new(
689                writer_builder
690                    .clone()
691                    .build()
692                    .await
693                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
694            ) as Box<dyn IcebergWriter>);
695            Ok(Self {
696                arrow_schema: Arc::new(
697                    schema_to_arrow_schema(table.metadata().current_schema())
698                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
699                ),
700                metrics: IcebergWriterMetrics {
701                    _write_qps: write_qps,
702                    _write_latency: write_latency,
703                    write_bytes,
704                },
705                writer: IcebergWriterDispatch::NonpartitionAppendOnly {
706                    writer: inner_writer,
707                    writer_builder,
708                },
709                table,
710            })
711        } else {
712            let partition_builder = MonitoredGeneralWriterBuilder::new(
713                FanoutPartitionWriterBuilder::new(
714                    data_file_builder,
715                    partition_spec.clone(),
716                    schema.clone(),
717                )
718                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
719                write_qps.clone(),
720                write_latency.clone(),
721            );
722            let inner_writer = Some(Box::new(
723                partition_builder
724                    .clone()
725                    .build()
726                    .await
727                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
728            ) as Box<dyn IcebergWriter>);
729            Ok(Self {
730                arrow_schema: Arc::new(
731                    schema_to_arrow_schema(table.metadata().current_schema())
732                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
733                ),
734                metrics: IcebergWriterMetrics {
735                    _write_qps: write_qps,
736                    _write_latency: write_latency,
737                    write_bytes,
738                },
739                writer: IcebergWriterDispatch::PartitionAppendOnly {
740                    writer: inner_writer,
741                    writer_builder: partition_builder,
742                },
743                table,
744            })
745        }
746    }
747
748    pub async fn new_upsert(
749        table: Table,
750        unique_column_ids: Vec<usize>,
751        writer_param: &SinkWriterParam,
752    ) -> Result<Self> {
753        let SinkWriterParam {
754            extra_partition_col_idx,
755            actor_id,
756            sink_id,
757            sink_name,
758            ..
759        } = writer_param;
760        let metrics_labels = [
761            &actor_id.to_string(),
762            &sink_id.to_string(),
763            sink_name.as_str(),
764        ];
765        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
766
767        // Metrics
768        let write_qps = GLOBAL_SINK_METRICS
769            .iceberg_write_qps
770            .with_guarded_label_values(&metrics_labels);
771        let write_latency = GLOBAL_SINK_METRICS
772            .iceberg_write_latency
773            .with_guarded_label_values(&metrics_labels);
774        // # TODO
775        // Currently unused; wire up this metric later.
776        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
777            .iceberg_rolling_unflushed_data_file
778            .with_guarded_label_values(&metrics_labels);
779        let position_delete_cache_num = GLOBAL_SINK_METRICS
780            .iceberg_position_delete_cache_num
781            .with_guarded_label_values(&metrics_labels);
782        let write_bytes = GLOBAL_SINK_METRICS
783            .iceberg_write_bytes
784            .with_guarded_label_values(&metrics_labels);
785
786        // Determine the schema id and partition spec id
787        let schema = table.metadata().current_schema();
788        let partition_spec = table.metadata().default_partition_spec();
789
790        // To avoid duplicate file names, each sink instance generates a unique UUID to use as the file name suffix.
791        let unique_uuid_suffix = Uuid::now_v7();
792
793        let data_file_builder = {
794            let parquet_writer_builder = ParquetWriterBuilder::new(
795                WriterProperties::new(),
796                schema.clone(),
797                table.file_io().clone(),
798                DefaultLocationGenerator::new(table.metadata().clone())
799                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
800                DefaultFileNameGenerator::new(
801                    writer_param.actor_id.to_string(),
802                    Some(unique_uuid_suffix.to_string()),
803                    iceberg::spec::DataFileFormat::Parquet,
804                ),
805            );
806            DataFileWriterBuilder::new(
807                parquet_writer_builder.clone(),
808                None,
809                partition_spec.spec_id(),
810            )
811        };
812        let position_delete_builder = {
813            let parquet_writer_builder = ParquetWriterBuilder::new(
814                WriterProperties::new(),
815                POSITION_DELETE_SCHEMA.clone(),
816                table.file_io().clone(),
817                DefaultLocationGenerator::new(table.metadata().clone())
818                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
819                DefaultFileNameGenerator::new(
820                    writer_param.actor_id.to_string(),
821                    Some(format!("pos-del-{}", unique_uuid_suffix)),
822                    iceberg::spec::DataFileFormat::Parquet,
823                ),
824            );
825            MonitoredPositionDeleteWriterBuilder::new(
826                SortPositionDeleteWriterBuilder::new(
827                    parquet_writer_builder.clone(),
828                    writer_param
829                        .streaming_config
830                        .developer
831                        .iceberg_sink_positional_delete_cache_size,
832                    None,
833                    None,
834                ),
835                position_delete_cache_num,
836            )
837        };
838        let equality_delete_builder = {
839            let config = EqualityDeleteWriterConfig::new(
840                unique_column_ids.clone(),
841                table.metadata().current_schema().clone(),
842                None,
843                partition_spec.spec_id(),
844            )
845            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
846            let parquet_writer_builder = ParquetWriterBuilder::new(
847                WriterProperties::new(),
848                Arc::new(
849                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
850                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
851                ),
852                table.file_io().clone(),
853                DefaultLocationGenerator::new(table.metadata().clone())
854                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
855                DefaultFileNameGenerator::new(
856                    writer_param.actor_id.to_string(),
857                    Some(format!("eq-del-{}", unique_uuid_suffix)),
858                    iceberg::spec::DataFileFormat::Parquet,
859                ),
860            );
861
862            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
863        };
864        let delta_builder = EqualityDeltaWriterBuilder::new(
865            data_file_builder,
866            position_delete_builder,
867            equality_delete_builder,
868            unique_column_ids,
869        );
870        if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
871            Err(SinkError::Iceberg(anyhow!(
872                "Extra partition column is not supported in upsert mode"
873            )))
874        } else if partition_spec.fields().is_empty() {
875            let writer_builder = MonitoredGeneralWriterBuilder::new(
876                delta_builder,
877                write_qps.clone(),
878                write_latency.clone(),
879            );
880            let inner_writer = Some(Box::new(
881                writer_builder
882                    .clone()
883                    .build()
884                    .await
885                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
886            ) as Box<dyn IcebergWriter>);
887            let original_arrow_schema = Arc::new(
888                schema_to_arrow_schema(table.metadata().current_schema())
889                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
890            );
891            let schema_with_extra_op_column = {
892                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
893                new_fields.push(Arc::new(ArrowField::new(
894                    "op".to_owned(),
895                    ArrowDataType::Int32,
896                    false,
897                )));
898                Arc::new(ArrowSchema::new(new_fields))
899            };
900            Ok(Self {
901                arrow_schema: original_arrow_schema,
902                metrics: IcebergWriterMetrics {
903                    _write_qps: write_qps,
904                    _write_latency: write_latency,
905                    write_bytes,
906                },
907                table,
908                writer: IcebergWriterDispatch::NonpartitionUpsert {
909                    writer: inner_writer,
910                    writer_builder,
911                    arrow_schema_with_op_column: schema_with_extra_op_column,
912                },
913            })
914        } else {
915            let original_arrow_schema = Arc::new(
916                schema_to_arrow_schema(table.metadata().current_schema())
917                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
918            );
919            let schema_with_extra_op_column = {
920                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
921                new_fields.push(Arc::new(ArrowField::new(
922                    "op".to_owned(),
923                    ArrowDataType::Int32,
924                    false,
925                )));
926                Arc::new(ArrowSchema::new(new_fields))
927            };
928            let partition_builder = MonitoredGeneralWriterBuilder::new(
929                FanoutPartitionWriterBuilder::new_with_custom_schema(
930                    delta_builder,
931                    schema_with_extra_op_column.clone(),
932                    partition_spec.clone(),
933                    table.metadata().current_schema().clone(),
934                ),
935                write_qps.clone(),
936                write_latency.clone(),
937            );
938            let inner_writer = Some(Box::new(
939                partition_builder
940                    .clone()
941                    .build()
942                    .await
943                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
944            ) as Box<dyn IcebergWriter>);
945            Ok(Self {
946                arrow_schema: original_arrow_schema,
947                metrics: IcebergWriterMetrics {
948                    _write_qps: write_qps,
949                    _write_latency: write_latency,
950                    write_bytes,
951                },
952                table,
953                writer: IcebergWriterDispatch::PartitionUpsert {
954                    writer: inner_writer,
955                    writer_builder: partition_builder,
956                    arrow_schema_with_op_column: schema_with_extra_op_column,
957                },
958            })
959        }
960    }
961}
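
// Sketch of the `op`-column schema extension performed in `new_upsert` above: the upsert writer's
// input schema is the table's Arrow schema plus a trailing non-nullable Int32 `op` column that
// carries the insert/delete code for the equality-delta writer. The field names below are arbitrary.
#[cfg(test)]
mod op_schema_sketch {
    use super::*;

    #[test]
    fn appends_trailing_int32_op_column() {
        let base = ArrowSchema::new(vec![Arc::new(ArrowField::new(
            "id".to_owned(),
            ArrowDataType::Int64,
            false,
        ))]);
        // Same construction as `schema_with_extra_op_column` in `new_upsert`.
        let mut new_fields = base.fields().iter().cloned().collect_vec();
        new_fields.push(Arc::new(ArrowField::new(
            "op".to_owned(),
            ArrowDataType::Int32,
            false,
        )));
        let with_op = ArrowSchema::new(new_fields);
        assert_eq!(with_op.fields().len(), 2);
        assert_eq!(with_op.field(1).name(), "op");
    }
}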
962
963#[async_trait]
964impl SinkWriter for IcebergSinkWriter {
965    type CommitMetadata = Option<SinkMetadata>;
966
967    /// Begin a new epoch
968    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
969        // Just skip it.
970        Ok(())
971    }
972
973    /// Write a stream chunk to the sink.
974    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
975        // Try to build writer if it's None.
976        match &mut self.writer {
977            IcebergWriterDispatch::PartitionAppendOnly {
978                writer,
979                writer_builder,
980            } => {
981                if writer.is_none() {
982                    *writer = Some(Box::new(
983                        writer_builder
984                            .clone()
985                            .build()
986                            .await
987                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
988                    ));
989                }
990            }
991            IcebergWriterDispatch::NonpartitionAppendOnly {
992                writer,
993                writer_builder,
994            } => {
995                if writer.is_none() {
996                    *writer = Some(Box::new(
997                        writer_builder
998                            .clone()
999                            .build()
1000                            .await
1001                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1002                    ));
1003                }
1004            }
1005            IcebergWriterDispatch::PartitionUpsert {
1006                writer,
1007                writer_builder,
1008                ..
1009            } => {
1010                if writer.is_none() {
1011                    *writer = Some(Box::new(
1012                        writer_builder
1013                            .clone()
1014                            .build()
1015                            .await
1016                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1017                    ));
1018                }
1019            }
1020            IcebergWriterDispatch::NonpartitionUpsert {
1021                writer,
1022                writer_builder,
1023                ..
1024            } => {
1025                if writer.is_none() {
1026                    *writer = Some(Box::new(
1027                        writer_builder
1028                            .clone()
1029                            .build()
1030                            .await
1031                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1032                    ));
1033                }
1034            }
1035        };
1036
1037        // Process the chunk.
1038        let (mut chunk, ops) = chunk.compact().into_parts();
1039        if ops.is_empty() {
1040            return Ok(());
1041        }
1042        let write_batch_size = chunk.estimated_heap_size();
1043        let batch = match &self.writer {
1044            IcebergWriterDispatch::PartitionAppendOnly { .. }
1045            | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1046                // separate out insert chunk
1047                let filters =
1048                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1049                chunk.set_visibility(filters);
1050                IcebergArrowConvert
1051                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1052                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1053            }
1054            IcebergWriterDispatch::PartitionUpsert {
1055                arrow_schema_with_op_column,
1056                ..
1057            }
1058            | IcebergWriterDispatch::NonpartitionUpsert {
1059                arrow_schema_with_op_column,
1060                ..
1061            } => {
1062                let chunk = IcebergArrowConvert
1063                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1064                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1065                let ops = Arc::new(Int32Array::from(
1066                    ops.iter()
1067                        .map(|op| match op {
1068                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1069                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1070                        })
1071                        .collect_vec(),
1072                ));
1073                let mut columns = chunk.columns().to_vec();
1074                columns.push(ops);
1075                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1076                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1077            }
1078        };
1079
1080        let writer = self.writer.get_writer().unwrap();
1081        writer
1082            .write(batch)
1083            .await
1084            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1085        self.metrics.write_bytes.inc_by(write_batch_size as _);
1086        Ok(())
1087    }
1088
1089    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the sink
1090    /// writer should commit the current epoch.
1091    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1092        // Skip if this is not a checkpoint barrier.
1093        if !is_checkpoint {
1094            return Ok(None);
1095        }
1096
1097        let close_result = match &mut self.writer {
1098            IcebergWriterDispatch::PartitionAppendOnly {
1099                writer,
1100                writer_builder,
1101            } => {
1102                let close_result = match writer.take() {
1103                    Some(mut writer) => Some(writer.close().await),
1104                    _ => None,
1105                };
1106                match writer_builder.clone().build().await {
1107                    Ok(new_writer) => {
1108                        *writer = Some(Box::new(new_writer));
1109                    }
1110                    _ => {
1111                        // The old writer was closed but building its replacement failed. We can't return the error
1112                        // here because the close itself may still have succeeded, so just log it.
1113                        warn!("Failed to build new writer after close");
1114                    }
1115                }
1116                close_result
1117            }
1118            IcebergWriterDispatch::NonpartitionAppendOnly {
1119                writer,
1120                writer_builder,
1121            } => {
1122                let close_result = match writer.take() {
1123                    Some(mut writer) => Some(writer.close().await),
1124                    _ => None,
1125                };
1126                match writer_builder.clone().build().await {
1127                    Ok(new_writer) => {
1128                        *writer = Some(Box::new(new_writer));
1129                    }
1130                    _ => {
1131                        // The old writer was closed but building its replacement failed. We can't return the error
1132                        // here because the close itself may still have succeeded, so just log it.
1133                        warn!("Failed to build new writer after close");
1134                    }
1135                }
1136                close_result
1137            }
1138            IcebergWriterDispatch::PartitionUpsert {
1139                writer,
1140                writer_builder,
1141                ..
1142            } => {
1143                let close_result = match writer.take() {
1144                    Some(mut writer) => Some(writer.close().await),
1145                    _ => None,
1146                };
1147                match writer_builder.clone().build().await {
1148                    Ok(new_writer) => {
1149                        *writer = Some(Box::new(new_writer));
1150                    }
1151                    _ => {
1152                        // The old writer was closed but building its replacement failed. We can't return the error
1153                        // here because the close itself may still have succeeded, so just log it.
1154                        warn!("Failed to build new writer after close");
1155                    }
1156                }
1157                close_result
1158            }
1159            IcebergWriterDispatch::NonpartitionUpsert {
1160                writer,
1161                writer_builder,
1162                ..
1163            } => {
1164                let close_result = match writer.take() {
1165                    Some(mut writer) => Some(writer.close().await),
1166                    _ => None,
1167                };
1168                match writer_builder.clone().build().await {
1169                    Ok(new_writer) => {
1170                        *writer = Some(Box::new(new_writer));
1171                    }
1172                    _ => {
1173                        // The old writer was closed but building its replacement failed. We can't return the error
1174                        // here because the close itself may still have succeeded, so just log it.
1175                        warn!("Failed to build new writer after close");
1176                    }
1177                }
1178                close_result
1179            }
1180        };
1181
1182        match close_result {
1183            Some(Ok(result)) => {
1184                let version = self.table.metadata().format_version() as u8;
1185                let partition_type = self.table.metadata().default_partition_type();
1186                let data_files = result
1187                    .into_iter()
1188                    .map(|f| {
1189                        SerializedDataFile::try_from(f, partition_type, version == 1)
1190                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1191                    })
1192                    .collect::<Result<Vec<_>>>()?;
1193                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1194                    data_files,
1195                    schema_id: self.table.metadata().current_schema_id(),
1196                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1197                })?))
1198            }
1199            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1200            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1201        }
1202    }
1203
1204    /// Clean up
1205    async fn abort(&mut self) -> Result<()> {
1206        // TODO: abort should clean up all the data written in this epoch.
1207        Ok(())
1208    }
1209}
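
// Sketch of how `write_batch` above encodes changelog ops for the equality-delta writer: inserts
// (and the insert half of updates) map to `INSERT_OP`, deletes to `DELETE_OP`, and the resulting
// `Int32Array` becomes the extra `op` column appended to the record batch.
#[cfg(test)]
mod op_column_encoding_sketch {
    use super::*;

    #[test]
    fn ops_map_to_delta_writer_codes() {
        let ops = [Op::Insert, Op::UpdateDelete, Op::UpdateInsert, Op::Delete];
        // Same mapping as in `write_batch`.
        let encoded = Int32Array::from(
            ops.iter()
                .map(|op| match op {
                    Op::UpdateInsert | Op::Insert => INSERT_OP,
                    Op::UpdateDelete | Op::Delete => DELETE_OP,
                })
                .collect_vec(),
        );
        assert_eq!(encoded.value(0), INSERT_OP);
        assert_eq!(encoded.value(1), DELETE_OP);
        assert_eq!(encoded.value(2), INSERT_OP);
        assert_eq!(encoded.value(3), DELETE_OP);
    }
}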
1210
1211const SCHEMA_ID: &str = "schema_id";
1212const PARTITION_SPEC_ID: &str = "partition_spec_id";
1213const DATA_FILES: &str = "data_files";
1214
1215#[derive(Default, Clone)]
1216struct IcebergCommitResult {
1217    schema_id: i32,
1218    partition_spec_id: i32,
1219    data_files: Vec<SerializedDataFile>,
1220}
1221
1222impl IcebergCommitResult {
1223    fn try_from(value: &SinkMetadata) -> Result<Self> {
1224        if let Some(Serialized(v)) = &value.metadata {
1225            let mut values = if let serde_json::Value::Object(v) =
1226                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1227                    .context("Can't parse iceberg sink metadata")?
1228            {
1229                v
1230            } else {
1231                bail!("iceberg sink metadata should be an object");
1232            };
1233
1234            let schema_id;
1235            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1236                schema_id = value
1237                    .as_u64()
1238                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1239            } else {
1240                bail!("iceberg sink metadata should have schema_id");
1241            }
1242
1243            let partition_spec_id;
1244            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1245                partition_spec_id = value
1246                    .as_u64()
1247                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1248            } else {
1249                bail!("iceberg sink metadata should have partition_spec_id");
1250            }
1251
1252            let data_files: Vec<SerializedDataFile>;
1253            if let serde_json::Value::Array(values) = values
1254                .remove(DATA_FILES)
1255                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1256            {
1257                data_files = values
1258                    .into_iter()
1259                    .map(from_value::<SerializedDataFile>)
1260                    .collect::<std::result::Result<_, _>>()
1261                    .unwrap();
1262            } else {
1263                bail!("iceberg sink metadata should have data_files object");
1264            }
1265
1266            Ok(Self {
1267                schema_id: schema_id as i32,
1268                partition_spec_id: partition_spec_id as i32,
1269                data_files,
1270            })
1271        } else {
1272            bail!("Can't create iceberg sink write result from empty data!")
1273        }
1274    }
1275
1276    fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1277        let mut values = if let serde_json::Value::Object(value) =
1278            serde_json::from_slice::<serde_json::Value>(&value)
1279                .context("Can't parse iceberg sink metadata")?
1280        {
1281            value
1282        } else {
1283            bail!("iceberg sink metadata should be an object");
1284        };
1285
1286        let schema_id;
1287        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1288            schema_id = value
1289                .as_u64()
1290                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1291        } else {
1292            bail!("iceberg sink metadata should have schema_id");
1293        }
1294
1295        let partition_spec_id;
1296        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1297            partition_spec_id = value
1298                .as_u64()
1299                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1300        } else {
1301            bail!("iceberg sink metadata should have partition_spec_id");
1302        }
1303
1304        let data_files: Vec<SerializedDataFile>;
1305        if let serde_json::Value::Array(values) = values
1306            .remove(DATA_FILES)
1307            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1308        {
1309            data_files = values
1310                .into_iter()
1311                .map(from_value::<SerializedDataFile>)
1312                .collect::<std::result::Result<_, _>>()
1313                .unwrap();
1314        } else {
1315            bail!("iceberg sink metadata should have data_files object");
1316        }
1317
1318        Ok(Self {
1319            schema_id: schema_id as i32,
1320            partition_spec_id: partition_spec_id as i32,
1321            data_files,
1322        })
1323    }
1324}
1325
1326impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1327    type Error = SinkError;
1328
1329    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1330        let json_data_files = serde_json::Value::Array(
1331            value
1332                .data_files
1333                .iter()
1334                .map(serde_json::to_value)
1335                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1336                .context("Can't serialize data files to json")?,
1337        );
1338        let json_value = serde_json::Value::Object(
1339            vec![
1340                (
1341                    SCHEMA_ID.to_owned(),
1342                    serde_json::Value::Number(value.schema_id.into()),
1343                ),
1344                (
1345                    PARTITION_SPEC_ID.to_owned(),
1346                    serde_json::Value::Number(value.partition_spec_id.into()),
1347                ),
1348                (DATA_FILES.to_owned(), json_data_files),
1349            ]
1350            .into_iter()
1351            .collect(),
1352        );
1353        Ok(SinkMetadata {
1354            metadata: Some(Serialized(SerializedMetadata {
1355                metadata: serde_json::to_vec(&json_value)
1356                    .context("Can't serialize iceberg sink metadata")?,
1357            })),
1358        })
1359    }
1360}
1361
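// Same JSON layout as the `SinkMetadata` conversion above; these raw bytes are what get
// persisted in the exactly-once system table as pre-commit metadata.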
1362impl TryFrom<IcebergCommitResult> for Vec<u8> {
1363    type Error = SinkError;
1364
1365    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1366        let json_data_files = serde_json::Value::Array(
1367            value
1368                .data_files
1369                .iter()
1370                .map(serde_json::to_value)
1371                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1372                .context("Can't serialize data files to json")?,
1373        );
1374        let json_value = serde_json::Value::Object(
1375            vec![
1376                (
1377                    SCHEMA_ID.to_owned(),
1378                    serde_json::Value::Number(value.schema_id.into()),
1379                ),
1380                (
1381                    PARTITION_SPEC_ID.to_owned(),
1382                    serde_json::Value::Number(value.partition_spec_id.into()),
1383                ),
1384                (DATA_FILES.to_owned(), json_data_files),
1385            ]
1386            .into_iter()
1387            .collect(),
1388        );
1389        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1390    }
1391}
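
/// Commit coordinator of the iceberg sink. It collects the data files written by the parallel
/// sink writers for each checkpoint and commits them to the iceberg table as a single
/// snapshot. When `is_exactly_once` is enabled, pre-commit metadata is persisted in a system
/// table so that an interrupted commit can be safely retried on recovery.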
1392pub struct IcebergSinkCommitter {
1393    catalog: Arc<dyn Catalog>,
1394    table: Table,
1395    pub last_commit_epoch: u64,
1396    pub(crate) is_exactly_once: bool,
1397    pub(crate) sink_id: u32,
1398    pub(crate) config: IcebergConfig,
1399    pub(crate) param: SinkParam,
1400    pub(crate) db: DatabaseConnection,
1401    commit_notifier: Option<mpsc::UnboundedSender<()>>,
1402    commit_retry_num: u32,
1403    _compact_task_guard: Option<oneshot::Sender<()>>,
1404    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1405}
1406
1407impl IcebergSinkCommitter {
1408    /// Reload the table and verify that its current schema id and default partition spec id
1409    /// match the given `schema_id` and `partition_spec_id`.
1410    async fn reload_table(
1411        catalog: &dyn Catalog,
1412        table_ident: &TableIdent,
1413        schema_id: i32,
1414        partition_spec_id: i32,
1415    ) -> Result<Table> {
1416        let table = catalog
1417            .load_table(table_ident)
1418            .await
1419            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1420        if table.metadata().current_schema_id() != schema_id {
1421            return Err(SinkError::Iceberg(anyhow!(
1422                "Schema evolution not supported, expect schema id {}, but got {}",
1423                schema_id,
1424                table.metadata().current_schema_id()
1425            )));
1426        }
1427        if table.metadata().default_partition_spec_id() != partition_spec_id {
1428            return Err(SinkError::Iceberg(anyhow!(
1429                "Partition evolution not supported, expect partition spec id {}, but got {}",
1430                partition_spec_id,
1431                table.metadata().default_partition_spec_id()
1432            )));
1433        }
1434        Ok(table)
1435    }
1436}
1437
1438#[async_trait::async_trait]
1439impl SinkCommitCoordinator for IcebergSinkCommitter {
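    /// For exactly-once sinks, recovery works by replaying the pre-commit metadata stored in
    /// the system table: rows already marked committed (or whose snapshot id is already
    /// visible in iceberg) are marked as done, while the remaining ones are re-committed with
    /// their original snapshot id.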
1440    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1441        if self.is_exactly_once {
1442            self.committed_epoch_subscriber = Some(subscriber);
1443            tracing::info!(
1444                "Sink id = {}: iceberg sink coordinator initializing.",
1445                self.param.sink_id.sink_id()
1446            );
1447            if self
1448                .iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id())
1449                .await?
1450            {
1451                let ordered_metadata_list_by_end_epoch = self
1452                    .get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id())
1453                    .await?;
1454
1455                let mut last_recommit_epoch = 0;
1456                for (end_epoch, serialized_bytes, snapshot_id, committed) in
1457                    ordered_metadata_list_by_end_epoch
1458                {
1459                    let write_results_bytes = deserialize_metadata(serialized_bytes);
1460                    let mut write_results = vec![];
1461
1462                    for each in write_results_bytes {
1463                        let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1464                        write_results.push(write_result);
1465                    }
1466
1467                    match (
1468                        committed,
1469                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1470                            .await?,
1471                    ) {
1472                        (true, _) => {
1473                            tracing::info!(
1474                                "Sink id = {}: all data in the log store has been written into the external sink; nothing to do during recovery.",
1475                                self.param.sink_id.sink_id()
1476                            );
1477                        }
1478                        (false, true) => {
1479                            // The snapshot already exists in iceberg; just mark the row as committed.
1480                            tracing::info!(
1481                                "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again; marking them as committed.",
1482                                self.param.sink_id.sink_id()
1483                            );
1484                            self.mark_row_is_committed_by_sink_id_and_end_epoch(
1485                                &self.db,
1486                                self.sink_id,
1487                                end_epoch,
1488                            )
1489                            .await?;
1490                        }
1491                        (false, false) => {
1492                            tracing::info!(
1493                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1494                                self.param.sink_id.sink_id()
1495                            );
1496                            self.re_commit(end_epoch, write_results, snapshot_id)
1497                                .await?;
1498                        }
1499                    }
1500
1501                    last_recommit_epoch = end_epoch;
1502                }
1503                tracing::info!(
1504                    "Sink id = {}: iceberg commit coordinator initialized.",
1505                    self.param.sink_id.sink_id()
1506                );
1507                return Ok(Some(last_recommit_epoch));
1508            } else {
1509                tracing::info!(
1510                    "Sink id = {}: iceberg coordinator initialized; the exactly-once system table is empty.",
1511                    self.param.sink_id.sink_id()
1512                );
1513                return Ok(None);
1514            }
1515        }
1516
1517        tracing::info!("Iceberg commit coordinator initialized.");
1518        return Ok(None);
1519    }
1520
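    /// Commits the data files collected from all parallel writers in this epoch. For
    /// exactly-once sinks the actual iceberg commit is deferred until the log store reports a
    /// committed epoch of at least `epoch`, so that the data can be replayed on failure.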
1521    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1522        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1523        let write_results: Vec<IcebergCommitResult> = metadata
1524            .iter()
1525            .map(IcebergCommitResult::try_from)
1526            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1527
1528        // Skip if no data to commit
1529        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1530            tracing::debug!(?epoch, "no data to commit");
1531            return Ok(());
1532        }
1533
1534        // Guarantee that all write results have the same schema_id and partition_spec_id.
1535        if write_results
1536            .iter()
1537            .any(|r| r.schema_id != write_results[0].schema_id)
1538            || write_results
1539                .iter()
1540                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1541        {
1542            return Err(SinkError::Iceberg(anyhow!(
1543                "schema_id and partition_spec_id should be the same in all write results"
1544            )));
1545        }
1546
1547        if self.is_exactly_once {
1548            assert!(self.committed_epoch_subscriber.is_some());
1549            match self.committed_epoch_subscriber.clone() {
1550                Some(committed_epoch_subscriber) => {
1551                    // Get the latest committed_epoch and the receiver
1552                    let (committed_epoch, mut rw_futures_utilrx) =
1553                        committed_epoch_subscriber(self.param.sink_id).await?;
1554                    // The exactly-once commit can only start after the data of the current epoch has been persisted in the log store.
1555                    if committed_epoch >= epoch {
1556                        self.commit_iceberg_inner(epoch, write_results, None)
1557                            .await?;
1558                    } else {
1559                        tracing::info!(
1560                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1561                            committed_epoch,
1562                            epoch
1563                        );
1564                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1565                            tracing::info!(
1566                                "Received next committed epoch: {}",
1567                                next_committed_epoch
1568                            );
1569                            // Commit as soon as the committed epoch catches up with the target epoch.
1570                            if next_committed_epoch >= epoch {
1571                                self.commit_iceberg_inner(epoch, write_results, None)
1572                                    .await?;
1573                                break;
1574                            }
1575                        }
1576                    }
1577                }
1578                None => unreachable!(
1579                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1580                ),
1581            }
1582        } else {
1583            self.commit_iceberg_inner(epoch, write_results, None)
1584                .await?;
1585        }
1586
1587        Ok(())
1588    }
1589}
1590
1591/// Methods required to achieve exactly-once semantics.
1592impl IcebergSinkCommitter {
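    /// Re-commit pre-committed files found during recovery, reusing the `snapshot_id` that was
    /// persisted together with them so the retried commit stays idempotent.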
1593    async fn re_commit(
1594        &mut self,
1595        epoch: u64,
1596        write_results: Vec<IcebergCommitResult>,
1597        snapshot_id: i64,
1598    ) -> Result<()> {
1599        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1600
1601        // Skip if no data to commit
1602        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1603            tracing::debug!(?epoch, "no data to commit");
1604            return Ok(());
1605        }
1606        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1607            .await?;
1608        Ok(())
1609    }
1610
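    /// The actual commit path: reload the table and validate that its schema id and partition
    /// spec id still match the write results, optionally persist pre-commit metadata (first
    /// commit of an exactly-once sink), convert the serialized data files, then fast-append
    /// them in a transaction, retrying with exponential backoff on failure.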
1611    async fn commit_iceberg_inner(
1612        &mut self,
1613        epoch: u64,
1614        write_results: Vec<IcebergCommitResult>,
1615        snapshot_id: Option<i64>,
1616    ) -> Result<()> {
1617        // If the provided `snapshot_id` is not `None`, this commit is a re-commit occurring
1618        // during the recovery phase. In this case, we need to use the `snapshot_id` that was
1619        // previously persisted in the system table to commit.
1620        let is_first_commit = snapshot_id.is_none();
1621        if !is_first_commit {
1622            tracing::info!("Doing iceberg re-commit.");
1623        }
1624        self.last_commit_epoch = epoch;
1625        let expect_schema_id = write_results[0].schema_id;
1626        let expect_partition_spec_id = write_results[0].partition_spec_id;
1627
1628        // Load the latest table to avoid concurrent modification, on a best-effort basis.
1629        self.table = Self::reload_table(
1630            self.catalog.as_ref(),
1631            self.table.identifier(),
1632            expect_schema_id,
1633            expect_partition_spec_id,
1634        )
1635        .await?;
1636        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1637            return Err(SinkError::Iceberg(anyhow!(
1638                "Can't find schema by id {}",
1639                expect_schema_id
1640            )));
1641        };
1642        let Some(partition_spec) = self
1643            .table
1644            .metadata()
1645            .partition_spec_by_id(expect_partition_spec_id)
1646        else {
1647            return Err(SinkError::Iceberg(anyhow!(
1648                "Can't find partition spec by id {}",
1649                expect_partition_spec_id
1650            )));
1651        };
1652        let partition_type = partition_spec
1653            .as_ref()
1654            .clone()
1655            .partition_type(schema)
1656            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1657
1658        let txn = Transaction::new(&self.table);
1659        // Only generate a new snapshot id on the first commit.
1660        let snapshot_id = match snapshot_id {
1661            Some(previous_snapshot_id) => previous_snapshot_id,
1662            None => txn.generate_unique_snapshot_id(),
1663        };
1664        if self.is_exactly_once && is_first_commit {
1665            // Persist the pre-commit metadata and snapshot id in the system table.
1666            let mut pre_commit_metadata_bytes = Vec::new();
1667            for each_parallelism_write_result in write_results.clone() {
1668                let each_parallelism_write_result_bytes: Vec<u8> =
1669                    each_parallelism_write_result.try_into()?;
1670                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1671            }
1672
1673            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1674
1675            self.persist_pre_commit_metadata(
1676                self.db.clone(),
1677                self.last_commit_epoch,
1678                epoch,
1679                pre_commit_metadata_bytes,
1680                snapshot_id,
1681            )
1682            .await?;
1683        }
1684
1685        let data_files = write_results
1686            .into_iter()
1687            .flat_map(|r| {
1688                r.data_files.into_iter().map(|f| {
1689                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1690                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1691                })
1692            })
1693            .collect::<Result<Vec<DataFile>>>()?;
1694        // TODO:
1695        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry (tracked in: https://github.com/apache/iceberg-rust/issues/964),
1696        // because the retry logic involves reapplying the commit metadata.
1697        // For now, we just retry the whole commit operation.
1698        let retry_strategy = ExponentialBackoff::from_millis(10)
1699            .max_delay(Duration::from_secs(60))
1700            .map(jitter)
1701            .take(self.commit_retry_num as usize);
1702        let catalog = self.catalog.clone();
1703        let table_ident = self.table.identifier().clone();
1704        let table = Retry::spawn(retry_strategy, || async {
1705            let table = Self::reload_table(
1706                catalog.as_ref(),
1707                &table_ident,
1708                expect_schema_id,
1709                expect_partition_spec_id,
1710            )
1711            .await?;
1712            let txn = Transaction::new(&table);
1713            let mut append_action = txn
1714                .fast_append(Some(snapshot_id), None, vec![])
1715                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1716            append_action
1717                .add_data_files(data_files.clone())
1718                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1719            let tx = append_action.apply().await.map_err(|err| {
1720                tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1721                SinkError::Iceberg(anyhow!(err))
1722            })?;
1723            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1724                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1725                SinkError::Iceberg(anyhow!(err))
1726            })
1727        })
1728        .await?;
1729        self.table = table;
1730
1731        if let Some(commit_notifier) = &mut self.commit_notifier {
1732            if commit_notifier.send(()).is_err() {
1733                warn!("failed to notify commit");
1734            }
1735        }
1736        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
1737
1738        if self.is_exactly_once {
1739            self.mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1740                .await?;
1741            tracing::info!(
1742                "Sink id = {}: succeeded in marking pre-commit metadata in epoch {} as committed.",
1743                self.sink_id,
1744                epoch
1745            );
1746
1747            self.delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1748                .await?;
1749        }
1750        Ok(())
1751    }
1752
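    /// Insert a row keyed by `(sink_id, end_epoch)` into the exactly-once system table,
    /// carrying the serialized pre-commit metadata and the snapshot id, with `committed`
    /// initialized to `false`.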
1753    async fn persist_pre_commit_metadata(
1754        &self,
1755        db: DatabaseConnection,
1756        start_epoch: u64,
1757        end_epoch: u64,
1758        pre_commit_metadata: Vec<u8>,
1759        snapshot_id: i64,
1760    ) -> Result<()> {
1761        let m = exactly_once_iceberg_sink::ActiveModel {
1762            sink_id: Set(self.sink_id as i32),
1763            end_epoch: Set(end_epoch.try_into().unwrap()),
1764            start_epoch: Set(start_epoch.try_into().unwrap()),
1765            metadata: Set(pre_commit_metadata),
1766            committed: Set(false),
1767            snapshot_id: Set(snapshot_id),
1768        };
1769        match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
1770            Ok(_) => Ok(()),
1771            Err(e) => {
1772                tracing::error!("Error inserting into system table: {:?}", e.as_report());
1773                Err(e.into())
1774            }
1775        }
1776    }
1777
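    /// Flip the `committed` flag to `true` for the `(sink_id, end_epoch)` row once the iceberg
    /// commit has succeeded (or was found to be already visible in iceberg).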
1778    async fn mark_row_is_committed_by_sink_id_and_end_epoch(
1779        &self,
1780        db: &DatabaseConnection,
1781        sink_id: u32,
1782        end_epoch: u64,
1783    ) -> Result<()> {
1784        match Entity::update(exactly_once_iceberg_sink::ActiveModel {
1785            sink_id: Set(sink_id as i32),
1786            end_epoch: Set(end_epoch.try_into().unwrap()),
1787            committed: Set(true),
1788            ..Default::default()
1789        })
1790        .exec(db)
1791        .await
1792        {
1793            Ok(_) => {
1794                tracing::info!(
1795                    "Sink id = {}: marked written data as committed, end_epoch = {}.",
1796                    sink_id,
1797                    end_epoch
1798                );
1799                Ok(())
1800            }
1801            Err(e) => {
1802                tracing::error!(
1803                    "Error marking row as committed in the iceberg exactly-once system table: {:?}",
1804                    e.as_report()
1805                );
1806                Err(e.into())
1807            }
1808        }
1809    }
1810
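    /// Garbage-collect rows of this sink whose `end_epoch` is strictly smaller than the given
    /// epoch; the row of the current epoch is kept as the committed watermark.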
1811    async fn delete_row_by_sink_id_and_end_epoch(
1812        &self,
1813        db: &DatabaseConnection,
1814        sink_id: u32,
1815        end_epoch: u64,
1816    ) -> Result<()> {
1817        let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
1818        match Entity::delete_many()
1819            .filter(Column::SinkId.eq(sink_id))
1820            .filter(Column::EndEpoch.lt(end_epoch_i64))
1821            .exec(db)
1822            .await
1823        {
1824            Ok(result) => {
1825                let deleted_count = result.rows_affected;
1826
1827                if deleted_count == 0 {
1828                    tracing::info!(
1829                        "Sink id = {}: no rows deleted from the iceberg exactly-once system table, end_epoch < {}.",
1830                        sink_id,
1831                        end_epoch
1832                    );
1833                } else {
1834                    tracing::info!(
1835                        "Sink id = {}: deleted rows from the iceberg exactly-once system table, end_epoch < {}.",
1836                        sink_id,
1837                        end_epoch
1838                    );
1839                }
1840                Ok(())
1841            }
1842            Err(e) => {
1843                tracing::error!(
1844                    "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
1845                    sink_id,
1846                    e.as_report()
1847                );
1848                Err(e.into())
1849            }
1850        }
1851    }
1852
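    /// Returns whether any pre-commit metadata rows exist for this sink id, i.e. whether
    /// recovery needs to inspect the system table at all.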
1853    async fn iceberg_sink_has_pre_commit_metadata(
1854        &self,
1855        db: &DatabaseConnection,
1856        sink_id: u32,
1857    ) -> Result<bool> {
1858        match exactly_once_iceberg_sink::Entity::find()
1859            .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32))
1860            .count(db)
1861            .await
1862        {
1863            Ok(count) => Ok(count > 0),
1864            Err(e) => {
1865                tracing::error!(
1866                    "Error querying pre-commit metadata from system table: {:?}",
1867                    e.as_report()
1868                );
1869                Err(e.into())
1870            }
1871        }
1872    }
1873
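    /// Fetch all pre-commit rows of this sink as `(end_epoch, metadata, snapshot_id, committed)`
    /// tuples, ordered by `end_epoch` ascending so recovery can replay them in order.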
1874    async fn get_pre_commit_info_by_sink_id(
1875        &self,
1876        db: &DatabaseConnection,
1877        sink_id: u32,
1878    ) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
1879        let models: Vec<Model> = Entity::find()
1880            .filter(Column::SinkId.eq(sink_id as i32))
1881            .order_by(Column::EndEpoch, Order::Asc)
1882            .all(db)
1883            .await?;
1884
1885        let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
1886
1887        for model in models {
1888            result.push((
1889                model.end_epoch.try_into().unwrap(),
1890                model.metadata,
1891                model.snapshot_id,
1892                model.committed,
1893            ));
1894        }
1895
1896        Ok(result)
1897    }
1898
1899    /// During the pre-commit phase, we record the `snapshot_id` corresponding to each batch of files.
1900    /// Therefore, the logic for checking whether all files in this batch are present in Iceberg
1901    /// has been changed to verifying if their corresponding `snapshot_id` exists in Iceberg.
1902    async fn is_snapshot_id_in_iceberg(
1903        &self,
1904        iceberg_config: &IcebergConfig,
1905        snapshot_id: i64,
1906    ) -> Result<bool> {
1907        let iceberg_common = iceberg_config.common.clone();
1908        let table = iceberg_common
1909            .load_table(&iceberg_config.java_catalog_props)
1910            .await?;
1911        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1916    }
1917}
1918
1919const MAP_KEY: &str = "key";
1920const MAP_VALUE: &str = "value";
1921
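/// Collect the child fields of a nested RisingWave type (struct, map or list of struct) into
/// `schema_fields`, keyed by field name, and return the corresponding arrow struct fields.
/// Returns `None` for non-nested arrow types, which the caller treats as compatible.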
1922fn get_fields<'a>(
1923    our_field_type: &'a risingwave_common::types::DataType,
1924    data_type: &ArrowDataType,
1925    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1926) -> Option<ArrowFields> {
1927    match data_type {
1928        ArrowDataType::Struct(fields) => {
1929            match our_field_type {
1930                risingwave_common::types::DataType::Struct(struct_fields) => {
1931                    struct_fields.iter().for_each(|(name, data_type)| {
1932                        let res = schema_fields.insert(name, data_type);
1933                        // This assert is to make sure there is no duplicate field name in the schema.
1934                        assert!(res.is_none())
1935                    });
1936                }
1937                risingwave_common::types::DataType::Map(map_fields) => {
1938                    schema_fields.insert(MAP_KEY, map_fields.key());
1939                    schema_fields.insert(MAP_VALUE, map_fields.value());
1940                }
1941                risingwave_common::types::DataType::List(list_field) => {
1942                    list_field.as_struct().iter().for_each(|(name, data_type)| {
1943                        let res = schema_fields.insert(name, data_type);
1944                        // This assert is to make sure there is no duplicate field name in the schema.
1945                        assert!(res.is_none())
1946                    });
1947                }
1948                _ => {}
1949            };
1950            Some(fields.clone())
1951        }
1952        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1953            get_fields(our_field_type, field.data_type(), schema_fields)
1954        }
1955        _ => None, // not a supported complex type and unlikely to show up
1956    }
1957}
1958
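/// Check that every iceberg (arrow) field has a compatible counterpart in the RisingWave
/// schema. Compatibility is intentionally looser than strict equality: decimals may differ in
/// precision/scale, `Binary` and `LargeBinary` are interchangeable, and nested lists, maps and
/// structs are compared recursively by field name rather than by arrow metadata.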
1959fn check_compatibility(
1960    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
1961    fields: &ArrowFields,
1962) -> anyhow::Result<bool> {
1963    for arrow_field in fields {
1964        let our_field_type = schema_fields
1965            .get(arrow_field.name().as_str())
1966            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
1967
1968        // Iceberg source should be able to read iceberg decimal type.
1969        let converted_arrow_data_type = IcebergArrowConvert
1970            .to_arrow_field("", our_field_type)
1971            .map_err(|e| anyhow!(e))?
1972            .data_type()
1973            .clone();
1974
1975        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
1976            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
1977            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
1978            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
1979            (ArrowDataType::List(_), ArrowDataType::List(field))
1980            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
1981                let mut schema_fields = HashMap::new();
1982                get_fields(our_field_type, field.data_type(), &mut schema_fields)
1983                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
1984            }
1985            // validate nested structs
1986            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
1987                let mut schema_fields = HashMap::new();
1988                our_field_type
1989                    .as_struct()
1990                    .iter()
1991                    .for_each(|(name, data_type)| {
1992                        let res = schema_fields.insert(name, data_type);
1993                        // This assert is to make sure there is no duplicate field name in the schema.
1994                        assert!(res.is_none())
1995                    });
1996                check_compatibility(schema_fields, fields)?
1997            }
1998            // cases where left != right (metadata, field name mismatch)
1999            //
2000            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
2001            // {"PARQUET:field_id": ".."}
2002            //
2003            // map: The standard name in arrow is "entries", "key", "value".
2004            // in iceberg-rs, it's called "key_value"
2005            (left, right) => left.equals_datatype(right),
2006        };
2007        if !compatible {
2008            bail!(
2009                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2010                arrow_field.name(),
2011                converted_arrow_data_type,
2012                arrow_field.data_type()
2013            );
2014        }
2015    }
2016    Ok(true)
2017}
2018
2019/// Try to match our schema with iceberg schema.
2020pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2021    if rw_schema.fields.len() != arrow_schema.fields().len() {
2022        bail!(
2023            "Schema field count mismatch: risingwave has {}, iceberg has {}",
2024            rw_schema.fields.len(),
2025            arrow_schema.fields.len()
2026        );
2027    }
2028
2029    let mut schema_fields = HashMap::new();
2030    rw_schema.fields.iter().for_each(|field| {
2031        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2032        // This assert is to make sure there is no duplicate field name in the schema.
2033        assert!(res.is_none())
2034    });
2035
2036    check_compatibility(schema_fields, &arrow_schema.fields)?;
2037    Ok(())
2038}
2039
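/// Serialize the per-writer pre-commit metadata blobs into a single byte vector (a JSON array
/// of byte arrays); `deserialize_metadata` below is its inverse.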
2040pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2041    serde_json::to_vec(&metadata).unwrap()
2042}
2043
2044pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2045    serde_json::from_slice(&bytes).unwrap()
2046}
2047
2048#[cfg(test)]
2049mod test {
2050    use std::collections::BTreeMap;
2051
2052    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2053    use risingwave_common::catalog::Field;
2054    use risingwave_common::types::{DataType, MapType, StructType};
2055
2056    use crate::connector_common::IcebergCommon;
2057    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2058    use crate::sink::iceberg::IcebergConfig;
2059
2060    #[test]
2061    fn test_compatible_arrow_schema() {
2062        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2063
2064        use super::*;
2065        let risingwave_schema = Schema::new(vec![
2066            Field::with_name(DataType::Int32, "a"),
2067            Field::with_name(DataType::Int32, "b"),
2068            Field::with_name(DataType::Int32, "c"),
2069        ]);
2070        let arrow_schema = ArrowSchema::new(vec![
2071            ArrowField::new("a", ArrowDataType::Int32, false),
2072            ArrowField::new("b", ArrowDataType::Int32, false),
2073            ArrowField::new("c", ArrowDataType::Int32, false),
2074        ]);
2075
2076        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2077
2078        let risingwave_schema = Schema::new(vec![
2079            Field::with_name(DataType::Int32, "d"),
2080            Field::with_name(DataType::Int32, "c"),
2081            Field::with_name(DataType::Int32, "a"),
2082            Field::with_name(DataType::Int32, "b"),
2083        ]);
2084        let arrow_schema = ArrowSchema::new(vec![
2085            ArrowField::new("a", ArrowDataType::Int32, false),
2086            ArrowField::new("b", ArrowDataType::Int32, false),
2087            ArrowField::new("d", ArrowDataType::Int32, false),
2088            ArrowField::new("c", ArrowDataType::Int32, false),
2089        ]);
2090        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2091
2092        let risingwave_schema = Schema::new(vec![
2093            Field::with_name(
2094                DataType::Struct(StructType::new(vec![
2095                    ("a1", DataType::Int32),
2096                    (
2097                        "a2",
2098                        DataType::Struct(StructType::new(vec![
2099                            ("a21", DataType::Bytea),
2100                            (
2101                                "a22",
2102                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2103                            ),
2104                        ])),
2105                    ),
2106                ])),
2107                "a",
2108            ),
2109            Field::with_name(
2110                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2111                    ("b1", DataType::Int32),
2112                    ("b2", DataType::Bytea),
2113                    (
2114                        "b3",
2115                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2116                    ),
2117                ])))),
2118                "b",
2119            ),
2120            Field::with_name(
2121                DataType::Map(MapType::from_kv(
2122                    DataType::Varchar,
2123                    DataType::List(Box::new(DataType::Struct(StructType::new([
2124                        ("c1", DataType::Int32),
2125                        ("c2", DataType::Bytea),
2126                        (
2127                            "c3",
2128                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2129                        ),
2130                    ])))),
2131                )),
2132                "c",
2133            ),
2134        ]);
2135        let arrow_schema = ArrowSchema::new(vec![
2136            ArrowField::new(
2137                "a",
2138                ArrowDataType::Struct(ArrowFields::from(vec![
2139                    ArrowField::new("a1", ArrowDataType::Int32, false),
2140                    ArrowField::new(
2141                        "a2",
2142                        ArrowDataType::Struct(ArrowFields::from(vec![
2143                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2144                            ArrowField::new_map(
2145                                "a22",
2146                                "entries",
2147                                ArrowFieldRef::new(ArrowField::new(
2148                                    "key",
2149                                    ArrowDataType::Utf8,
2150                                    false,
2151                                )),
2152                                ArrowFieldRef::new(ArrowField::new(
2153                                    "value",
2154                                    ArrowDataType::Utf8,
2155                                    false,
2156                                )),
2157                                false,
2158                                false,
2159                            ),
2160                        ])),
2161                        false,
2162                    ),
2163                ])),
2164                false,
2165            ),
2166            ArrowField::new(
2167                "b",
2168                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2169                    ArrowDataType::Struct(ArrowFields::from(vec![
2170                        ArrowField::new("b1", ArrowDataType::Int32, false),
2171                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2172                        ArrowField::new_map(
2173                            "b3",
2174                            "entries",
2175                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2176                            ArrowFieldRef::new(ArrowField::new(
2177                                "value",
2178                                ArrowDataType::Utf8,
2179                                false,
2180                            )),
2181                            false,
2182                            false,
2183                        ),
2184                    ])),
2185                    false,
2186                ))),
2187                false,
2188            ),
2189            ArrowField::new_map(
2190                "c",
2191                "entries",
2192                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2193                ArrowFieldRef::new(ArrowField::new(
2194                    "value",
2195                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2196                        ArrowDataType::Struct(ArrowFields::from(vec![
2197                            ArrowField::new("c1", ArrowDataType::Int32, false),
2198                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2199                            ArrowField::new_map(
2200                                "c3",
2201                                "entries",
2202                                ArrowFieldRef::new(ArrowField::new(
2203                                    "key",
2204                                    ArrowDataType::Utf8,
2205                                    false,
2206                                )),
2207                                ArrowFieldRef::new(ArrowField::new(
2208                                    "value",
2209                                    ArrowDataType::Utf8,
2210                                    false,
2211                                )),
2212                                false,
2213                                false,
2214                            ),
2215                        ])),
2216                        false,
2217                    ))),
2218                    false,
2219                )),
2220                false,
2221                false,
2222            ),
2223        ]);
2224        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2225    }
2226
2227    #[test]
2228    fn test_parse_iceberg_config() {
2229        let values = [
2230            ("connector", "iceberg"),
2231            ("type", "upsert"),
2232            ("primary_key", "v1"),
2233            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2234            ("warehouse.path", "s3://iceberg"),
2235            ("s3.endpoint", "http://127.0.0.1:9301"),
2236            ("s3.access.key", "hummockadmin"),
2237            ("s3.secret.key", "hummockadmin"),
2238            ("s3.path.style.access", "true"),
2239            ("s3.region", "us-east-1"),
2240            ("catalog.type", "jdbc"),
2241            ("catalog.name", "demo"),
2242            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2243            ("catalog.jdbc.user", "admin"),
2244            ("catalog.jdbc.password", "123456"),
2245            ("database.name", "demo_db"),
2246            ("table.name", "demo_table"),
2247        ]
2248        .into_iter()
2249        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2250        .collect();
2251
2252        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2253
2254        let expected_iceberg_config = IcebergConfig {
2255            common: IcebergCommon {
2256                warehouse_path: Some("s3://iceberg".to_owned()),
2257                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2258                region: Some("us-east-1".to_owned()),
2259                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2260                access_key: Some("hummockadmin".to_owned()),
2261                secret_key: Some("hummockadmin".to_owned()),
2262                gcs_credential: None,
2263                catalog_type: Some("jdbc".to_owned()),
2264                glue_id: None,
2265                catalog_name: Some("demo".to_owned()),
2266                database_name: Some("demo_db".to_owned()),
2267                table_name: "demo_table".to_owned(),
2268                path_style_access: Some(true),
2269                credential: None,
2270                oauth2_server_uri: None,
2271                scope: None,
2272                token: None,
2273                enable_config_load: None,
2274                rest_signing_name: None,
2275                rest_signing_region: None,
2276                rest_sigv4_enabled: None,
2277            },
2278            r#type: "upsert".to_owned(),
2279            force_append_only: false,
2280            primary_key: Some(vec!["v1".to_owned()]),
2281            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2282            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2283                .into_iter()
2284                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2285                .collect(),
2286            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2287            create_table_if_not_exists: false,
2288            is_exactly_once: None,
2289            commit_retry_num: 8,
2290        };
2291
2292        assert_eq!(iceberg_config, expected_iceberg_config);
2293
2294        assert_eq!(
2295            &iceberg_config.common.full_table_name().unwrap().to_string(),
2296            "demo_db.demo_table"
2297        );
2298    }
2299
2300    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2301        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2302
2303        let table = iceberg_config.load_table().await.unwrap();
2304
2305        println!("{:?}", table.identifier());
2306    }
2307
2308    #[tokio::test]
2309    #[ignore]
2310    async fn test_storage_catalog() {
2311        let values = [
2312            ("connector", "iceberg"),
2313            ("type", "append-only"),
2314            ("force_append_only", "true"),
2315            ("s3.endpoint", "http://127.0.0.1:9301"),
2316            ("s3.access.key", "hummockadmin"),
2317            ("s3.secret.key", "hummockadmin"),
2318            ("s3.region", "us-east-1"),
2319            ("s3.path.style.access", "true"),
2320            ("catalog.name", "demo"),
2321            ("catalog.type", "storage"),
2322            ("warehouse.path", "s3://icebergdata/demo"),
2323            ("database.name", "s1"),
2324            ("table.name", "t1"),
2325        ]
2326        .into_iter()
2327        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2328        .collect();
2329
2330        test_create_catalog(values).await;
2331    }
2332
2333    #[tokio::test]
2334    #[ignore]
2335    async fn test_rest_catalog() {
2336        let values = [
2337            ("connector", "iceberg"),
2338            ("type", "append-only"),
2339            ("force_append_only", "true"),
2340            ("s3.endpoint", "http://127.0.0.1:9301"),
2341            ("s3.access.key", "hummockadmin"),
2342            ("s3.secret.key", "hummockadmin"),
2343            ("s3.region", "us-east-1"),
2344            ("s3.path.style.access", "true"),
2345            ("catalog.name", "demo"),
2346            ("catalog.type", "rest"),
2347            ("catalog.uri", "http://192.168.167.4:8181"),
2348            ("warehouse.path", "s3://icebergdata/demo"),
2349            ("database.name", "s1"),
2350            ("table.name", "t1"),
2351        ]
2352        .into_iter()
2353        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2354        .collect();
2355
2356        test_create_catalog(values).await;
2357    }
2358
2359    #[tokio::test]
2360    #[ignore]
2361    async fn test_jdbc_catalog() {
2362        let values = [
2363            ("connector", "iceberg"),
2364            ("type", "append-only"),
2365            ("force_append_only", "true"),
2366            ("s3.endpoint", "http://127.0.0.1:9301"),
2367            ("s3.access.key", "hummockadmin"),
2368            ("s3.secret.key", "hummockadmin"),
2369            ("s3.region", "us-east-1"),
2370            ("s3.path.style.access", "true"),
2371            ("catalog.name", "demo"),
2372            ("catalog.type", "jdbc"),
2373            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2374            ("catalog.jdbc.user", "admin"),
2375            ("catalog.jdbc.password", "123456"),
2376            ("warehouse.path", "s3://icebergdata/demo"),
2377            ("database.name", "s1"),
2378            ("table.name", "t1"),
2379        ]
2380        .into_iter()
2381        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2382        .collect();
2383
2384        test_create_catalog(values).await;
2385    }
2386
2387    #[tokio::test]
2388    #[ignore]
2389    async fn test_hive_catalog() {
2390        let values = [
2391            ("connector", "iceberg"),
2392            ("type", "append-only"),
2393            ("force_append_only", "true"),
2394            ("s3.endpoint", "http://127.0.0.1:9301"),
2395            ("s3.access.key", "hummockadmin"),
2396            ("s3.secret.key", "hummockadmin"),
2397            ("s3.region", "us-east-1"),
2398            ("s3.path.style.access", "true"),
2399            ("catalog.name", "demo"),
2400            ("catalog.type", "hive"),
2401            ("catalog.uri", "thrift://localhost:9083"),
2402            ("warehouse.path", "s3://icebergdata/demo"),
2403            ("database.name", "s1"),
2404            ("table.name", "t1"),
2405        ]
2406        .into_iter()
2407        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2408        .collect();
2409
2410        test_create_catalog(values).await;
2411    }
2412}