risingwave_connector/sink/iceberg/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29    DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
30};
31use iceberg::table::Table;
32use iceberg::transaction::Transaction;
33use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
34use iceberg::writer::base_writer::equality_delete_writer::{
35    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
36};
37use iceberg::writer::base_writer::sort_position_delete_writer::{
38    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
39};
40use iceberg::writer::file_writer::ParquetWriterBuilder;
41use iceberg::writer::file_writer::location_generator::{
42    DefaultFileNameGenerator, DefaultLocationGenerator,
43};
44use iceberg::writer::function_writer::equality_delta_writer::{
45    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
46};
47use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
48use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
49use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
50use itertools::Itertools;
51use parquet::file::properties::WriterProperties;
52use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
53use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58    Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::Schema;
65use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
66use risingwave_common_estimate_size::EstimateSize;
67use risingwave_pb::connector_service::SinkMetadata;
68use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
69use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
70use sea_orm::DatabaseConnection;
71use serde_derive::Deserialize;
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::Retry;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
84use super::{
85    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::iceberg::exactly_once_util::*;
93use crate::sink::writer::SinkWriter;
94use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
95use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
96
97pub const ICEBERG_SINK: &str = "iceberg";
98
99fn default_commit_retry_num() -> u32 {
100    8
101}
102
103#[serde_as]
104#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
105pub struct IcebergConfig {
106    pub r#type: String, // accepts "append-only" or "upsert"
107
108    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
109    pub force_append_only: bool,
110
111    #[serde(flatten)]
112    common: IcebergCommon,
113
114    #[serde(
115        rename = "primary_key",
116        default,
117        deserialize_with = "deserialize_optional_string_seq_from_string"
118    )]
119    pub primary_key: Option<Vec<String>>,
120
121    // Properties forwarded to the Java catalog implementation.
122    #[serde(skip)]
123    pub java_catalog_props: HashMap<String, String>,
124
125    #[serde(default)]
126    pub partition_by: Option<String>,
127
128    /// Commit every n (> 0) checkpoints. The default is 10.
129    #[serde(default = "default_commit_checkpoint_interval")]
130    #[serde_as(as = "DisplayFromStr")]
131    pub commit_checkpoint_interval: u64,
132
133    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
134    pub create_table_if_not_exists: bool,
135
136    /// Whether to enable exactly-once delivery. Disabled by default.
137    #[serde(default)]
138    #[serde_as(as = "Option<DisplayFromStr>")]
139    pub is_exactly_once: Option<bool>,
140    // Number of retries when an Iceberg commit fails. The default is 8.
141    // TODO
142    // The Iceberg table may record a retry count in its table metadata.
143    // When available, prefer that value as the default instead of this constant.
144    #[serde(default = "default_commit_retry_num")]
145    pub commit_retry_num: u32,
146
147    /// Whether to enable iceberg compaction.
148    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
149    pub enable_compaction: bool,
150
151    /// The interval, in seconds, between Iceberg compaction runs.
152    #[serde(default)]
153    #[serde_as(as = "Option<DisplayFromStr>")]
154    pub compaction_interval_sec: Option<u64>,
155
156    /// Whether to enable expiration of old Iceberg snapshots.
157    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
158    pub enable_snapshot_expiration: bool,
159}
160
161impl EnforceSecret for IcebergConfig {
162    fn enforce_secret<'a>(
163        prop_iter: impl Iterator<Item = &'a str>,
164    ) -> crate::error::ConnectorResult<()> {
165        for prop in prop_iter {
166            IcebergCommon::enforce_one(prop)?;
167        }
168        Ok(())
169    }
170
171    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
172        IcebergCommon::enforce_one(prop)
173    }
174}
175
176impl IcebergConfig {
177    pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
178        let mut config =
179            serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
180                .map_err(|e| SinkError::Config(anyhow!(e)))?;
181
182        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
183            return Err(SinkError::Config(anyhow!(
184                "`{}` must be {}, or {}",
185                SINK_TYPE_OPTION,
186                SINK_TYPE_APPEND_ONLY,
187                SINK_TYPE_UPSERT
188            )));
189        }
190
191        if config.r#type == SINK_TYPE_UPSERT {
192            if let Some(primary_key) = &config.primary_key {
193                if primary_key.is_empty() {
194                    return Err(SinkError::Config(anyhow!(
195                        "`primary_key` must not be empty in {}",
196                        SINK_TYPE_UPSERT
197                    )));
198                }
199            } else {
200                return Err(SinkError::Config(anyhow!(
201                    "Must set `primary_key` in {}",
202                    SINK_TYPE_UPSERT
203                )));
204            }
205        }
206
207        // All config keys starting with "catalog." (except `catalog.uri`, `catalog.type` and `catalog.name`) are treated as Java catalog properties.
208        config.java_catalog_props = values
209            .iter()
210            .filter(|(k, _v)| {
211                k.starts_with("catalog.")
212                    && k != &"catalog.uri"
213                    && k != &"catalog.type"
214                    && k != &"catalog.name"
215            })
216            .map(|(k, v)| (k[8..].to_string(), v.to_string()))
217            .collect();
218
219        if config.commit_checkpoint_interval == 0 {
220            return Err(SinkError::Config(anyhow!(
221                "`commit_checkpoint_interval` must be greater than 0"
222            )));
223        }
224
225        Ok(config)
226    }
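    // A minimal sketch of feeding `from_btreemap` (illustrative only): the keys below are
    // example values, and a real option map must also include the catalog and table options
    // flattened from `IcebergCommon`.
    //
    //     let options = BTreeMap::from([
    //         ("type".to_owned(), "upsert".to_owned()),
    //         ("primary_key".to_owned(), "id".to_owned()),
    //         ("commit_checkpoint_interval".to_owned(), "10".to_owned()),
    //     ]);
    //     let config = IcebergConfig::from_btreemap(options)?;
    //     assert!(!config.force_append_only);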
227
228    pub fn catalog_type(&self) -> &str {
229        self.common.catalog_type()
230    }
231
232    pub async fn load_table(&self) -> Result<Table> {
233        self.common
234            .load_table(&self.java_catalog_props)
235            .await
236            .map_err(Into::into)
237    }
238
239    pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
240        self.common
241            .create_catalog(&self.java_catalog_props)
242            .await
243            .map_err(Into::into)
244    }
245
246    pub fn full_table_name(&self) -> Result<TableIdent> {
247        self.common.full_table_name().map_err(Into::into)
248    }
249
250    pub fn catalog_name(&self) -> String {
251        self.common.catalog_name()
252    }
253
254    pub fn compaction_interval_sec(&self) -> u64 {
255        // Defaults to 1 hour.
256        self.compaction_interval_sec.unwrap_or(3600)
257    }
258}
259
260pub struct IcebergSink {
261    config: IcebergConfig,
262    param: SinkParam,
263    // In upsert mode, this is always `Some` and non-empty.
264    unique_column_ids: Option<Vec<usize>>,
265}
266
267impl EnforceSecret for IcebergSink {
268    fn enforce_secret<'a>(
269        prop_iter: impl Iterator<Item = &'a str>,
270    ) -> crate::error::ConnectorResult<()> {
271        for prop in prop_iter {
272            IcebergConfig::enforce_one(prop)?;
273        }
274        Ok(())
275    }
276}
277
278impl TryFrom<SinkParam> for IcebergSink {
279    type Error = SinkError;
280
281    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
282        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
283        IcebergSink::new(config, param)
284    }
285}
286
287impl Debug for IcebergSink {
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        f.debug_struct("IcebergSink")
290            .field("config", &self.config)
291            .finish()
292    }
293}
294
295impl IcebergSink {
296    async fn create_and_validate_table(&self) -> Result<Table> {
297        if self.config.create_table_if_not_exists {
298            self.create_table_if_not_exists().await?;
299        }
300
301        let table = self
302            .config
303            .load_table()
304            .await
305            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
306
307        let sink_schema = self.param.schema();
308        let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
309            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
310
311        try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
312            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
313
314        Ok(table)
315    }
316
317    async fn create_table_if_not_exists(&self) -> Result<()> {
318        let catalog = self.config.create_catalog().await?;
319        let namespace = if let Some(database_name) = &self.config.common.database_name {
320            let namespace = NamespaceIdent::new(database_name.clone());
321            if !catalog
322                .namespace_exists(&namespace)
323                .await
324                .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
325            {
326                catalog
327                    .create_namespace(&namespace, HashMap::default())
328                    .await
329                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
330                    .context("failed to create iceberg namespace")?;
331            }
332            namespace
333        } else {
334            bail!("database name must be set if you want to create table")
335        };
336
337        let table_id = self
338            .config
339            .full_table_name()
340            .context("Unable to parse table name")?;
341        if !catalog
342            .table_exists(&table_id)
343            .await
344            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
345        {
346            let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
347            // convert risingwave schema -> arrow schema -> iceberg schema
348            let arrow_fields = self
349                .param
350                .columns
351                .iter()
352                .map(|column| {
353                    Ok(iceberg_create_table_arrow_convert
354                        .to_arrow_field(&column.name, &column.data_type)
355                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
356                        .context(format!(
357                            "failed to convert {}: {} to arrow type",
358                            &column.name, &column.data_type
359                        ))?)
360                })
361                .collect::<Result<Vec<ArrowField>>>()?;
362            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
363            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
364                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
365                .context("failed to convert arrow schema to iceberg schema")?;
366
367            let location = {
368                let mut names = namespace.clone().inner();
369                names.push(self.config.common.table_name.clone());
370                match &self.config.common.warehouse_path {
371                    Some(warehouse_path) => {
372                        let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
373                        let url = Url::parse(warehouse_path);
374                        if url.is_err() || is_s3_tables {
375                            // For REST catalogs, `warehouse_path` may be a warehouse name rather than a URL.
376                            // In that case, leave the location unset and let the catalog decide where to place the table.
377                            if self.config.common.catalog_type() == "rest"
378                                || self.config.common.catalog_type() == "rest_rust"
379                            {
380                                None
381                            } else {
382                                bail!(format!("Invalid warehouse path: {}", warehouse_path))
383                            }
384                        } else if warehouse_path.ends_with('/') {
385                            Some(format!("{}{}", warehouse_path, names.join("/")))
386                        } else {
387                            Some(format!("{}/{}", warehouse_path, names.join("/")))
388                        }
389                    }
390                    None => None,
391                }
392            };
393
394            let partition_spec = match &self.config.partition_by {
395                Some(partition_by) => {
396                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
397                    for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
398                        .into_iter()
399                        .enumerate()
400                    {
401                        match iceberg_schema.field_id_by_name(&column) {
402                            Some(id) => partition_fields.push(
403                                UnboundPartitionField::builder()
404                                    .source_id(id)
405                                    .transform(transform)
406                                    .name(format!("_p_{}", column))
407                                    .field_id(i as i32)
408                                    .build(),
409                            ),
410                            None => bail!(format!(
411                                "Partition source column does not exist in schema: {}",
412                                column
413                            )),
414                        };
415                    }
416                    Some(
417                        UnboundPartitionSpec::builder()
418                            .with_spec_id(0)
419                            .add_partition_fields(partition_fields)
420                            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
421                            .context("failed to add partition columns")?
422                            .build(),
423                    )
424                }
425                None => None,
426            };
427
428            let table_creation_builder = TableCreation::builder()
429                .name(self.config.common.table_name.clone())
430                .schema(iceberg_schema);
431
432            let table_creation = match (location, partition_spec) {
433                (Some(location), Some(partition_spec)) => table_creation_builder
434                    .location(location)
435                    .partition_spec(partition_spec)
436                    .build(),
437                (Some(location), None) => table_creation_builder.location(location).build(),
438                (None, Some(partition_spec)) => table_creation_builder
439                    .partition_spec(partition_spec)
440                    .build(),
441                (None, None) => table_creation_builder.build(),
442            };
443
444            catalog
445                .create_table(&namespace, table_creation)
446                .await
447                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
448                .context("failed to create iceberg table")?;
449        }
450        Ok(())
451    }
452
453    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
454        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
455            if let Some(pk) = &config.primary_key {
456                let mut unique_column_ids = Vec::with_capacity(pk.len());
457                for col_name in pk {
458                    let id = param
459                        .columns
460                        .iter()
461                        .find(|col| col.name.as_str() == col_name)
462                        .ok_or_else(|| {
463                            SinkError::Config(anyhow!(
464                                "Primary key column {} not found in sink schema",
465                                col_name
466                            ))
467                        })?
468                        .column_id
469                        .get_id() as usize;
470                    unique_column_ids.push(id);
471                }
472                Some(unique_column_ids)
473            } else {
474                unreachable!()
475            }
476        } else {
477            None
478        };
479        Ok(Self {
480            config,
481            param,
482            unique_column_ids,
483        })
484    }
485}
486
487impl Sink for IcebergSink {
488    type Coordinator = IcebergSinkCommitter;
489    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
490
491    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[
492        "commit_checkpoint_interval",
493        "enable_compaction",
494        "compaction_interval_sec",
495        "enable_snapshot_expiration",
496    ];
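    // Only the options listed above are intended to be alterable via
    // `ALTER SINK ... WITH (...)` after the sink is created.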
497    const SINK_NAME: &'static str = ICEBERG_SINK;
498
499    async fn validate(&self) -> Result<()> {
500        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
501            bail!("Snowflake catalog only supports iceberg sources");
502        }
503        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
504            risingwave_common::license::Feature::IcebergSinkWithGlue
505                .check_available()
506                .map_err(|e| anyhow::anyhow!(e))?;
507        }
508        if self.config.enable_compaction {
509            risingwave_common::license::Feature::IcebergCompaction
510                .check_available()
511                .map_err(|e| anyhow::anyhow!(e))?;
512        }
513
514        let _ = self.create_and_validate_table().await?;
515        Ok(())
516    }
517
518    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
519        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
520
521        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
522            if iceberg_config.enable_compaction {
523                if compaction_interval == 0 {
524                    bail!(
525                        "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
526                    );
527                }
528            } else {
529                bail!("`compaction_interval_sec` can only be set when `enable_compaction` is true");
530            }
531        }
532
533        Ok(())
534    }
535
536    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
537        let table = self.create_and_validate_table().await?;
538        let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
539            IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
540        } else {
541            IcebergSinkWriter::new_append_only(table, &writer_param).await?
542        };
543
544        let commit_checkpoint_interval =
545            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
546                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
547            );
548        let writer = CoordinatedLogSinker::new(
549            &writer_param,
550            self.param.clone(),
551            inner,
552            commit_checkpoint_interval,
553        )
554        .await?;
555
556        Ok(writer)
557    }
558
559    fn is_coordinated_sink(&self) -> bool {
560        true
561    }
562
563    async fn new_coordinator(
564        &self,
565        db: DatabaseConnection,
566        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
567    ) -> Result<Self::Coordinator> {
568        let catalog = self.config.create_catalog().await?;
569        let table = self.create_and_validate_table().await?;
570        Ok(IcebergSinkCommitter {
571            catalog,
572            table,
573            is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
574            last_commit_epoch: 0,
575            sink_id: self.param.sink_id.sink_id(),
576            config: self.config.clone(),
577            param: self.param.clone(),
578            db,
579            commit_retry_num: self.config.commit_retry_num,
580            committed_epoch_subscriber: None,
581            iceberg_compact_stat_sender,
582        })
583    }
584}
585
586/// `None` means no projection is needed.
587/// `Prepare` holds the index of the extra partition column.
588/// `Done` holds the final projection index vector.
589///
590/// `ProjectIdxVec` is evaluated lazily: the first time the `Prepare` state is encountered, the data
591/// chunk schema is used to build the projection index vector.
592enum ProjectIdxVec {
593    None,
594    Prepare(usize),
595    Done(Vec<usize>),
596}
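// Illustrative example: for a chunk with columns `[c0, c1, _extra_partition, c2]` and the
// extra partition column at index 2, `Prepare(2)` is resolved to `Done(vec![0, 1, 3])`,
// matching the `(0..idx).chain(idx + 1..len)` computation in `write_batch`.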
597
598pub struct IcebergSinkWriter {
599    writer: IcebergWriterDispatch,
600    arrow_schema: SchemaRef,
601    // See the comments on `IcebergWriterMetrics` below.
602    metrics: IcebergWriterMetrics,
603    // State of the Iceberg table this writer targets.
604    table: Table,
605    // For chunks carrying an extra partition column, that column must be removed before writing.
606    // The projection index vector is cached here so it is not rebuilt for every chunk.
607    project_idx_vec: ProjectIdxVec,
608}
609
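// Dispatches to one of four concrete writer stacks, chosen by whether the table has a
// partition spec and whether the sink runs in append-only or upsert mode. The `writer`
// field is taken and closed at every checkpoint barrier and then rebuilt from `writer_builder`.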
610#[allow(clippy::type_complexity)]
611enum IcebergWriterDispatch {
612    PartitionAppendOnly {
613        writer: Option<Box<dyn IcebergWriter>>,
614        writer_builder: MonitoredGeneralWriterBuilder<
615            FanoutPartitionWriterBuilder<
616                DataFileWriterBuilder<
617                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
618                >,
619            >,
620        >,
621    },
622    NonpartitionAppendOnly {
623        writer: Option<Box<dyn IcebergWriter>>,
624        writer_builder: MonitoredGeneralWriterBuilder<
625            DataFileWriterBuilder<
626                ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
627            >,
628        >,
629    },
630    PartitionUpsert {
631        writer: Option<Box<dyn IcebergWriter>>,
632        writer_builder: MonitoredGeneralWriterBuilder<
633            FanoutPartitionWriterBuilder<
634                EqualityDeltaWriterBuilder<
635                    DataFileWriterBuilder<
636                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
637                    >,
638                    MonitoredPositionDeleteWriterBuilder<
639                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
640                    >,
641                    EqualityDeleteFileWriterBuilder<
642                        ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
643                    >,
644                >,
645            >,
646        >,
647        arrow_schema_with_op_column: SchemaRef,
648    },
649    NonpartitionUpsert {
650        writer: Option<Box<dyn IcebergWriter>>,
651        writer_builder: MonitoredGeneralWriterBuilder<
652            EqualityDeltaWriterBuilder<
653                DataFileWriterBuilder<
654                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
655                >,
656                MonitoredPositionDeleteWriterBuilder<
657                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
658                >,
659                EqualityDeleteFileWriterBuilder<
660                    ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
661                >,
662            >,
663        >,
664        arrow_schema_with_op_column: SchemaRef,
665    },
666}
667
668impl IcebergWriterDispatch {
669    pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
670        match self {
671            IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
672            | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
673            | IcebergWriterDispatch::PartitionUpsert { writer, .. }
674            | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
675        }
676    }
677}
678
679pub struct IcebergWriterMetrics {
680    // NOTE: These two metrics are not used directly here; they are kept only for lifecycle management.
681    // They are actually used in `PrometheusWriterBuilder`:
682    //     WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone())
683    // Holding the guards lets them clean up their labels in the metrics registry when dropped.
684    _write_qps: LabelGuardedIntCounter,
685    _write_latency: LabelGuardedHistogram,
686    write_bytes: LabelGuardedIntCounter,
687}
688
689impl IcebergSinkWriter {
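    /// Build the append-only writer stack: a Parquet file writer wrapped in a
    /// `DataFileWriterBuilder`, fanned out per partition when the table has a partition
    /// spec, and wrapped in `MonitoredGeneralWriterBuilder` for QPS/latency metrics.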
690    pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
691        let SinkWriterParam {
692            extra_partition_col_idx,
693            actor_id,
694            sink_id,
695            sink_name,
696            ..
697        } = writer_param;
698        let metrics_labels = [
699            &actor_id.to_string(),
700            &sink_id.to_string(),
701            sink_name.as_str(),
702        ];
703
704        // Metrics
705        let write_qps = GLOBAL_SINK_METRICS
706            .iceberg_write_qps
707            .with_guarded_label_values(&metrics_labels);
708        let write_latency = GLOBAL_SINK_METRICS
709            .iceberg_write_latency
710            .with_guarded_label_values(&metrics_labels);
711        // TODO
712        // Currently unused; wire up this metric later.
713        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
714            .iceberg_rolling_unflushed_data_file
715            .with_guarded_label_values(&metrics_labels);
716        let write_bytes = GLOBAL_SINK_METRICS
717            .iceberg_write_bytes
718            .with_guarded_label_values(&metrics_labels);
719
720        let schema = table.metadata().current_schema();
721        let partition_spec = table.metadata().default_partition_spec();
722
723        // To avoid duplicate file names, each writer instance generates a unique UUID as a file name suffix.
724        let unique_uuid_suffix = Uuid::now_v7();
725
726        let parquet_writer_builder = ParquetWriterBuilder::new(
727            WriterProperties::new(),
728            schema.clone(),
729            table.file_io().clone(),
730            DefaultLocationGenerator::new(table.metadata().clone())
731                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
732            DefaultFileNameGenerator::new(
733                writer_param.actor_id.to_string(),
734                Some(unique_uuid_suffix.to_string()),
735                iceberg::spec::DataFileFormat::Parquet,
736            ),
737        );
738        let data_file_builder =
739            DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
740        if partition_spec.fields().is_empty() {
741            let writer_builder = MonitoredGeneralWriterBuilder::new(
742                data_file_builder,
743                write_qps.clone(),
744                write_latency.clone(),
745            );
746            let inner_writer = Some(Box::new(
747                writer_builder
748                    .clone()
749                    .build()
750                    .await
751                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
752            ) as Box<dyn IcebergWriter>);
753            Ok(Self {
754                arrow_schema: Arc::new(
755                    schema_to_arrow_schema(table.metadata().current_schema())
756                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
757                ),
758                metrics: IcebergWriterMetrics {
759                    _write_qps: write_qps,
760                    _write_latency: write_latency,
761                    write_bytes,
762                },
763                writer: IcebergWriterDispatch::NonpartitionAppendOnly {
764                    writer: inner_writer,
765                    writer_builder,
766                },
767                table,
768                project_idx_vec: {
769                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
770                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
771                    } else {
772                        ProjectIdxVec::None
773                    }
774                },
775            })
776        } else {
777            let partition_builder = MonitoredGeneralWriterBuilder::new(
778                FanoutPartitionWriterBuilder::new(
779                    data_file_builder,
780                    partition_spec.clone(),
781                    schema.clone(),
782                )
783                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
784                write_qps.clone(),
785                write_latency.clone(),
786            );
787            let inner_writer = Some(Box::new(
788                partition_builder
789                    .clone()
790                    .build()
791                    .await
792                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
793            ) as Box<dyn IcebergWriter>);
794            Ok(Self {
795                arrow_schema: Arc::new(
796                    schema_to_arrow_schema(table.metadata().current_schema())
797                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
798                ),
799                metrics: IcebergWriterMetrics {
800                    _write_qps: write_qps,
801                    _write_latency: write_latency,
802                    write_bytes,
803                },
804                writer: IcebergWriterDispatch::PartitionAppendOnly {
805                    writer: inner_writer,
806                    writer_builder: partition_builder,
807                },
808                table,
809                project_idx_vec: {
810                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
811                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
812                    } else {
813                        ProjectIdxVec::None
814                    }
815                },
816            })
817        }
818    }
819
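    /// Build the upsert writer stack: an equality-delta writer that combines a data-file
    /// writer, a sorted position-delete writer, and an equality-delete writer keyed by
    /// `unique_column_ids`. In upsert mode each record batch carries an extra `op` column;
    /// see `write_batch`.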
820    pub async fn new_upsert(
821        table: Table,
822        unique_column_ids: Vec<usize>,
823        writer_param: &SinkWriterParam,
824    ) -> Result<Self> {
825        let SinkWriterParam {
826            extra_partition_col_idx,
827            actor_id,
828            sink_id,
829            sink_name,
830            ..
831        } = writer_param;
832        let metrics_labels = [
833            &actor_id.to_string(),
834            &sink_id.to_string(),
835            sink_name.as_str(),
836        ];
837        let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
838
839        // Metrics
840        let write_qps = GLOBAL_SINK_METRICS
841            .iceberg_write_qps
842            .with_guarded_label_values(&metrics_labels);
843        let write_latency = GLOBAL_SINK_METRICS
844            .iceberg_write_latency
845            .with_guarded_label_values(&metrics_labels);
846        // TODO
847        // Currently unused; wire up this metric later.
848        let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
849            .iceberg_rolling_unflushed_data_file
850            .with_guarded_label_values(&metrics_labels);
851        let position_delete_cache_num = GLOBAL_SINK_METRICS
852            .iceberg_position_delete_cache_num
853            .with_guarded_label_values(&metrics_labels);
854        let write_bytes = GLOBAL_SINK_METRICS
855            .iceberg_write_bytes
856            .with_guarded_label_values(&metrics_labels);
857
858        // Determine the schema id and partition spec id
859        let schema = table.metadata().current_schema();
860        let partition_spec = table.metadata().default_partition_spec();
861
862        // To avoid duplicate file names, each writer instance generates a unique UUID as a file name suffix.
863        let unique_uuid_suffix = Uuid::now_v7();
864
865        let data_file_builder = {
866            let parquet_writer_builder = ParquetWriterBuilder::new(
867                WriterProperties::new(),
868                schema.clone(),
869                table.file_io().clone(),
870                DefaultLocationGenerator::new(table.metadata().clone())
871                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
872                DefaultFileNameGenerator::new(
873                    writer_param.actor_id.to_string(),
874                    Some(unique_uuid_suffix.to_string()),
875                    iceberg::spec::DataFileFormat::Parquet,
876                ),
877            );
878            DataFileWriterBuilder::new(
879                parquet_writer_builder.clone(),
880                None,
881                partition_spec.spec_id(),
882            )
883        };
884        let position_delete_builder = {
885            let parquet_writer_builder = ParquetWriterBuilder::new(
886                WriterProperties::new(),
887                POSITION_DELETE_SCHEMA.clone(),
888                table.file_io().clone(),
889                DefaultLocationGenerator::new(table.metadata().clone())
890                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
891                DefaultFileNameGenerator::new(
892                    writer_param.actor_id.to_string(),
893                    Some(format!("pos-del-{}", unique_uuid_suffix)),
894                    iceberg::spec::DataFileFormat::Parquet,
895                ),
896            );
897            MonitoredPositionDeleteWriterBuilder::new(
898                SortPositionDeleteWriterBuilder::new(
899                    parquet_writer_builder.clone(),
900                    writer_param
901                        .streaming_config
902                        .developer
903                        .iceberg_sink_positional_delete_cache_size,
904                    None,
905                    None,
906                ),
907                position_delete_cache_num,
908            )
909        };
910        let equality_delete_builder = {
911            let config = EqualityDeleteWriterConfig::new(
912                unique_column_ids.clone(),
913                table.metadata().current_schema().clone(),
914                None,
915                partition_spec.spec_id(),
916            )
917            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
918            let parquet_writer_builder = ParquetWriterBuilder::new(
919                WriterProperties::new(),
920                Arc::new(
921                    arrow_schema_to_schema(config.projected_arrow_schema_ref())
922                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
923                ),
924                table.file_io().clone(),
925                DefaultLocationGenerator::new(table.metadata().clone())
926                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
927                DefaultFileNameGenerator::new(
928                    writer_param.actor_id.to_string(),
929                    Some(format!("eq-del-{}", unique_uuid_suffix)),
930                    iceberg::spec::DataFileFormat::Parquet,
931                ),
932            );
933
934            EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
935        };
936        let delta_builder = EqualityDeltaWriterBuilder::new(
937            data_file_builder,
938            position_delete_builder,
939            equality_delete_builder,
940            unique_column_ids,
941        );
942        if partition_spec.fields().is_empty() {
943            let writer_builder = MonitoredGeneralWriterBuilder::new(
944                delta_builder,
945                write_qps.clone(),
946                write_latency.clone(),
947            );
948            let inner_writer = Some(Box::new(
949                writer_builder
950                    .clone()
951                    .build()
952                    .await
953                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
954            ) as Box<dyn IcebergWriter>);
955            let original_arrow_schema = Arc::new(
956                schema_to_arrow_schema(table.metadata().current_schema())
957                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
958            );
959            let schema_with_extra_op_column = {
960                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
961                new_fields.push(Arc::new(ArrowField::new(
962                    "op".to_owned(),
963                    ArrowDataType::Int32,
964                    false,
965                )));
966                Arc::new(ArrowSchema::new(new_fields))
967            };
968            Ok(Self {
969                arrow_schema: original_arrow_schema,
970                metrics: IcebergWriterMetrics {
971                    _write_qps: write_qps,
972                    _write_latency: write_latency,
973                    write_bytes,
974                },
975                table,
976                writer: IcebergWriterDispatch::NonpartitionUpsert {
977                    writer: inner_writer,
978                    writer_builder,
979                    arrow_schema_with_op_column: schema_with_extra_op_column,
980                },
981                project_idx_vec: {
982                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
983                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
984                    } else {
985                        ProjectIdxVec::None
986                    }
987                },
988            })
989        } else {
990            let original_arrow_schema = Arc::new(
991                schema_to_arrow_schema(table.metadata().current_schema())
992                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
993            );
994            let schema_with_extra_op_column = {
995                let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
996                new_fields.push(Arc::new(ArrowField::new(
997                    "op".to_owned(),
998                    ArrowDataType::Int32,
999                    false,
1000                )));
1001                Arc::new(ArrowSchema::new(new_fields))
1002            };
1003            let partition_builder = MonitoredGeneralWriterBuilder::new(
1004                FanoutPartitionWriterBuilder::new_with_custom_schema(
1005                    delta_builder,
1006                    schema_with_extra_op_column.clone(),
1007                    partition_spec.clone(),
1008                    table.metadata().current_schema().clone(),
1009                ),
1010                write_qps.clone(),
1011                write_latency.clone(),
1012            );
1013            let inner_writer = Some(Box::new(
1014                partition_builder
1015                    .clone()
1016                    .build()
1017                    .await
1018                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1019            ) as Box<dyn IcebergWriter>);
1020            Ok(Self {
1021                arrow_schema: original_arrow_schema,
1022                metrics: IcebergWriterMetrics {
1023                    _write_qps: write_qps,
1024                    _write_latency: write_latency,
1025                    write_bytes,
1026                },
1027                table,
1028                writer: IcebergWriterDispatch::PartitionUpsert {
1029                    writer: inner_writer,
1030                    writer_builder: partition_builder,
1031                    arrow_schema_with_op_column: schema_with_extra_op_column,
1032                },
1033                project_idx_vec: {
1034                    if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1035                        ProjectIdxVec::Prepare(*extra_partition_col_idx)
1036                    } else {
1037                        ProjectIdxVec::None
1038                    }
1039                },
1040            })
1041        }
1042    }
1043}
1044
1045#[async_trait]
1046impl SinkWriter for IcebergSinkWriter {
1047    type CommitMetadata = Option<SinkMetadata>;
1048
1049    /// Begin a new epoch
1050    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1051        // Just skip it.
1052        Ok(())
1053    }
1054
1055    /// Write a stream chunk to sink
1056    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1057        // Lazily build the writer if it has not been created yet.
1058        match &mut self.writer {
1059            IcebergWriterDispatch::PartitionAppendOnly {
1060                writer,
1061                writer_builder,
1062            } => {
1063                if writer.is_none() {
1064                    *writer = Some(Box::new(
1065                        writer_builder
1066                            .clone()
1067                            .build()
1068                            .await
1069                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1070                    ));
1071                }
1072            }
1073            IcebergWriterDispatch::NonpartitionAppendOnly {
1074                writer,
1075                writer_builder,
1076            } => {
1077                if writer.is_none() {
1078                    *writer = Some(Box::new(
1079                        writer_builder
1080                            .clone()
1081                            .build()
1082                            .await
1083                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1084                    ));
1085                }
1086            }
1087            IcebergWriterDispatch::PartitionUpsert {
1088                writer,
1089                writer_builder,
1090                ..
1091            } => {
1092                if writer.is_none() {
1093                    *writer = Some(Box::new(
1094                        writer_builder
1095                            .clone()
1096                            .build()
1097                            .await
1098                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1099                    ));
1100                }
1101            }
1102            IcebergWriterDispatch::NonpartitionUpsert {
1103                writer,
1104                writer_builder,
1105                ..
1106            } => {
1107                if writer.is_none() {
1108                    *writer = Some(Box::new(
1109                        writer_builder
1110                            .clone()
1111                            .build()
1112                            .await
1113                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1114                    ));
1115                }
1116            }
1117        };
1118
1119        // Process the chunk.
1120        let (mut chunk, ops) = chunk.compact().into_parts();
1121        match &self.project_idx_vec {
1122            ProjectIdxVec::None => {}
1123            ProjectIdxVec::Prepare(idx) => {
1124                if *idx >= chunk.columns().len() {
1125                    return Err(SinkError::Iceberg(anyhow!(
1126                        "invalid extra partition column index {}",
1127                        idx
1128                    )));
1129                }
1130                let project_idx_vec = (0..*idx)
1131                    .chain(*idx + 1..chunk.columns().len())
1132                    .collect_vec();
1133                chunk = chunk.project(&project_idx_vec);
1134                self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1135            }
1136            ProjectIdxVec::Done(idx_vec) => {
1137                chunk = chunk.project(idx_vec);
1138            }
1139        }
1140        if ops.is_empty() {
1141            return Ok(());
1142        }
1143        let write_batch_size = chunk.estimated_heap_size();
1144        let batch = match &self.writer {
1145            IcebergWriterDispatch::PartitionAppendOnly { .. }
1146            | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1147                // Keep only the insert rows; deletes are ignored in append-only mode.
1148                let filters =
1149                    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1150                chunk.set_visibility(filters);
1151                IcebergArrowConvert
1152                    .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1153                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1154            }
1155            IcebergWriterDispatch::PartitionUpsert {
1156                arrow_schema_with_op_column,
1157                ..
1158            }
1159            | IcebergWriterDispatch::NonpartitionUpsert {
1160                arrow_schema_with_op_column,
1161                ..
1162            } => {
1163                let chunk = IcebergArrowConvert
1164                    .to_record_batch(self.arrow_schema.clone(), &chunk)
1165                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1166                let ops = Arc::new(Int32Array::from(
1167                    ops.iter()
1168                        .map(|op| match op {
1169                            Op::UpdateInsert | Op::Insert => INSERT_OP,
1170                            Op::UpdateDelete | Op::Delete => DELETE_OP,
1171                        })
1172                        .collect_vec(),
1173                ));
1174                let mut columns = chunk.columns().to_vec();
1175                columns.push(ops);
1176                RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1177                    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1178            }
1179        };
1180
1181        let writer = self.writer.get_writer().unwrap();
1182        writer
1183            .write(batch)
1184            .instrument_await("iceberg_write")
1185            .await
1186            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1187        self.metrics.write_bytes.inc_by(write_batch_size as _);
1188        Ok(())
1189    }
1190
1191    /// Receive a barrier and mark the end of the current epoch. When `is_checkpoint` is true, the sink
1192    /// writer should commit the current epoch.
1193    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1194        // Skip it if not checkpoint
1195        if !is_checkpoint {
1196            return Ok(None);
1197        }
1198
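        // Close the current writer to flush its data files, then immediately rebuild it
        // from the corresponding builder so the next epoch can keep writing.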
1199        let close_result = match &mut self.writer {
1200            IcebergWriterDispatch::PartitionAppendOnly {
1201                writer,
1202                writer_builder,
1203            } => {
1204                let close_result = match writer.take() {
1205                    Some(mut writer) => {
1206                        Some(writer.close().instrument_await("iceberg_close").await)
1207                    }
1208                    _ => None,
1209                };
1210                match writer_builder.clone().build().await {
1211                    Ok(new_writer) => {
1212                        *writer = Some(Box::new(new_writer));
1213                    }
1214                    _ => {
1215                        // The writer is already closed and a new one could not be built. We don't return the
1216                        // error here because the close itself may have succeeded; just log the failure instead.
1217                        warn!("Failed to build new writer after close");
1218                    }
1219                }
1220                close_result
1221            }
1222            IcebergWriterDispatch::NonpartitionAppendOnly {
1223                writer,
1224                writer_builder,
1225            } => {
1226                let close_result = match writer.take() {
1227                    Some(mut writer) => {
1228                        Some(writer.close().instrument_await("iceberg_close").await)
1229                    }
1230                    _ => None,
1231                };
1232                match writer_builder.clone().build().await {
1233                    Ok(new_writer) => {
1234                        *writer = Some(Box::new(new_writer));
1235                    }
1236                    _ => {
1237                        // The writer is already closed and a new one could not be built. We don't return the
1238                        // error here because the close itself may have succeeded; just log the failure instead.
1239                        warn!("Failed to build new writer after close");
1240                    }
1241                }
1242                close_result
1243            }
1244            IcebergWriterDispatch::PartitionUpsert {
1245                writer,
1246                writer_builder,
1247                ..
1248            } => {
1249                let close_result = match writer.take() {
1250                    Some(mut writer) => {
1251                        Some(writer.close().instrument_await("iceberg_close").await)
1252                    }
1253                    _ => None,
1254                };
1255                match writer_builder.clone().build().await {
1256                    Ok(new_writer) => {
1257                        *writer = Some(Box::new(new_writer));
1258                    }
1259                    _ => {
1260                        // The writer is already closed and a new one could not be built. We don't return the
1261                        // error here because the close itself may have succeeded; just log the failure instead.
1262                        warn!("Failed to build new writer after close");
1263                    }
1264                }
1265                close_result
1266            }
1267            IcebergWriterDispatch::NonpartitionUpsert {
1268                writer,
1269                writer_builder,
1270                ..
1271            } => {
1272                let close_result = match writer.take() {
1273                    Some(mut writer) => {
1274                        Some(writer.close().instrument_await("iceberg_close").await)
1275                    }
1276                    _ => None,
1277                };
1278                match writer_builder.clone().build().await {
1279                    Ok(new_writer) => {
1280                        *writer = Some(Box::new(new_writer));
1281                    }
1282                    _ => {
1283                        // The writer is already closed and a new one could not be built. We don't return the
1284                        // error here because the close itself may have succeeded; just log the failure instead.
1285                        warn!("Failed to build new writer after close");
1286                    }
1287                }
1288                close_result
1289            }
1290        };
1291
1292        match close_result {
1293            Some(Ok(result)) => {
1294                let version = self.table.metadata().format_version() as u8;
1295                let partition_type = self.table.metadata().default_partition_type();
1296                let data_files = result
1297                    .into_iter()
1298                    .map(|f| {
1299                        SerializedDataFile::try_from(f, partition_type, version == 1)
1300                            .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1301                    })
1302                    .collect::<Result<Vec<_>>>()?;
1303                Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1304                    data_files,
1305                    schema_id: self.table.metadata().current_schema_id(),
1306                    partition_spec_id: self.table.metadata().default_partition_spec_id(),
1307                })?))
1308            }
1309            Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1310            None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1311        }
1312    }
1313
1314    /// Clean up
1315    async fn abort(&mut self) -> Result<()> {
1316        // TODO: abort should clean up all the data written in this epoch.
1317        Ok(())
1318    }
1319}
1320
1321const SCHEMA_ID: &str = "schema_id";
1322const PARTITION_SPEC_ID: &str = "partition_spec_id";
1323const DATA_FILES: &str = "data_files";
1324
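// The per-writer commit metadata is serialized as a JSON object of the shape
// `{"schema_id": <i32>, "partition_spec_id": <i32>, "data_files": [<SerializedDataFile>, ...]}`,
// which is what the `try_from` implementations below parse.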
1325#[derive(Default, Clone)]
1326struct IcebergCommitResult {
1327    schema_id: i32,
1328    partition_spec_id: i32,
1329    data_files: Vec<SerializedDataFile>,
1330}
1331
1332impl IcebergCommitResult {
1333    fn try_from(value: &SinkMetadata) -> Result<Self> {
1334        if let Some(Serialized(v)) = &value.metadata {
1335            let mut values = if let serde_json::Value::Object(v) =
1336                serde_json::from_slice::<serde_json::Value>(&v.metadata)
1337                    .context("Can't parse iceberg sink metadata")?
1338            {
1339                v
1340            } else {
1341                bail!("iceberg sink metadata should be an object");
1342            };
1343
1344            let schema_id;
1345            if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1346                schema_id = value
1347                    .as_u64()
1348                    .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1349            } else {
1350                bail!("iceberg sink metadata should have schema_id");
1351            }
1352
1353            let partition_spec_id;
1354            if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1355                partition_spec_id = value
1356                    .as_u64()
1357                    .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1358            } else {
1359                bail!("iceberg sink metadata should have partition_spec_id");
1360            }
1361
1362            let data_files: Vec<SerializedDataFile>;
1363            if let serde_json::Value::Array(values) = values
1364                .remove(DATA_FILES)
1365                .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1366            {
1367                data_files = values
1368                    .into_iter()
1369                    .map(from_value::<SerializedDataFile>)
1370                    .collect::<std::result::Result<_, _>>()
                    .context("Can't deserialize data files in iceberg sink metadata")?;
1372            } else {
1373                bail!("iceberg sink metadata should have data_files object");
1374            }
1375
1376            Ok(Self {
1377                schema_id: schema_id as i32,
1378                partition_spec_id: partition_spec_id as i32,
1379                data_files,
1380            })
1381        } else {
1382            bail!("Can't create iceberg sink write result from empty data!")
1383        }
1384    }
1385
    fn try_from_serialized_bytes(value: Vec<u8>) -> Result<Self> {
1387        let mut values = if let serde_json::Value::Object(value) =
1388            serde_json::from_slice::<serde_json::Value>(&value)
1389                .context("Can't parse iceberg sink metadata")?
1390        {
1391            value
1392        } else {
1393            bail!("iceberg sink metadata should be an object");
1394        };
1395
1396        let schema_id;
1397        if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1398            schema_id = value
1399                .as_u64()
1400                .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1401        } else {
1402            bail!("iceberg sink metadata should have schema_id");
1403        }
1404
1405        let partition_spec_id;
1406        if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1407            partition_spec_id = value
1408                .as_u64()
1409                .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1410        } else {
1411            bail!("iceberg sink metadata should have partition_spec_id");
1412        }
1413
1414        let data_files: Vec<SerializedDataFile>;
1415        if let serde_json::Value::Array(values) = values
1416            .remove(DATA_FILES)
1417            .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1418        {
1419            data_files = values
1420                .into_iter()
1421                .map(from_value::<SerializedDataFile>)
1422                .collect::<std::result::Result<_, _>>()
                .context("Can't deserialize data files in iceberg sink metadata")?;
1424        } else {
1425            bail!("iceberg sink metadata should have data_files object");
1426        }
1427
1428        Ok(Self {
1429            schema_id: schema_id as i32,
1430            partition_spec_id: partition_spec_id as i32,
1431            data_files,
1432        })
1433    }
1434}
1435
1436impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1437    type Error = SinkError;
1438
1439    fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1440        let json_data_files = serde_json::Value::Array(
1441            value
1442                .data_files
1443                .iter()
1444                .map(serde_json::to_value)
1445                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1446                .context("Can't serialize data files to json")?,
1447        );
1448        let json_value = serde_json::Value::Object(
1449            vec![
1450                (
1451                    SCHEMA_ID.to_owned(),
1452                    serde_json::Value::Number(value.schema_id.into()),
1453                ),
1454                (
1455                    PARTITION_SPEC_ID.to_owned(),
1456                    serde_json::Value::Number(value.partition_spec_id.into()),
1457                ),
1458                (DATA_FILES.to_owned(), json_data_files),
1459            ]
1460            .into_iter()
1461            .collect(),
1462        );
1463        Ok(SinkMetadata {
1464            metadata: Some(Serialized(SerializedMetadata {
1465                metadata: serde_json::to_vec(&json_value)
1466                    .context("Can't serialize iceberg sink metadata")?,
1467            })),
1468        })
1469    }
1470}
1471
1472impl TryFrom<IcebergCommitResult> for Vec<u8> {
1473    type Error = SinkError;
1474
1475    fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1476        let json_data_files = serde_json::Value::Array(
1477            value
1478                .data_files
1479                .iter()
1480                .map(serde_json::to_value)
1481                .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1482                .context("Can't serialize data files to json")?,
1483        );
1484        let json_value = serde_json::Value::Object(
1485            vec![
1486                (
1487                    SCHEMA_ID.to_owned(),
1488                    serde_json::Value::Number(value.schema_id.into()),
1489                ),
1490                (
1491                    PARTITION_SPEC_ID.to_owned(),
1492                    serde_json::Value::Number(value.partition_spec_id.into()),
1493                ),
1494                (DATA_FILES.to_owned(), json_data_files),
1495            ]
1496            .into_iter()
1497            .collect(),
1498        );
1499        Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1500    }
1501}
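
/// Commit coordinator of the iceberg sink. It collects the data files reported by all sink
/// writers and commits them to the iceberg table as a single snapshot for each committed epoch.
/// When exactly-once delivery is enabled, pre-commit metadata is persisted in a system table so
/// that unfinished commits can be safely replayed after recovery.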
1502pub struct IcebergSinkCommitter {
1503    catalog: Arc<dyn Catalog>,
1504    table: Table,
1505    pub last_commit_epoch: u64,
1506    pub(crate) is_exactly_once: bool,
1507    pub(crate) sink_id: u32,
1508    pub(crate) config: IcebergConfig,
1509    pub(crate) param: SinkParam,
1510    pub(crate) db: DatabaseConnection,
1511    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1512    commit_retry_num: u32,
1513    pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1514}
1515
1516impl IcebergSinkCommitter {
    // Reload the table and guarantee that its current schema_id and partition_spec_id match the
    // given `schema_id` and `partition_spec_id`.
1519    async fn reload_table(
1520        catalog: &dyn Catalog,
1521        table_ident: &TableIdent,
1522        schema_id: i32,
1523        partition_spec_id: i32,
1524    ) -> Result<Table> {
1525        let table = catalog
1526            .load_table(table_ident)
1527            .await
1528            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1529        if table.metadata().current_schema_id() != schema_id {
1530            return Err(SinkError::Iceberg(anyhow!(
1531                "Schema evolution not supported, expect schema id {}, but got {}",
1532                schema_id,
1533                table.metadata().current_schema_id()
1534            )));
1535        }
1536        if table.metadata().default_partition_spec_id() != partition_spec_id {
1537            return Err(SinkError::Iceberg(anyhow!(
1538                "Partition evolution not supported, expect partition spec id {}, but got {}",
1539                partition_spec_id,
1540                table.metadata().default_partition_spec_id()
1541            )));
1542        }
1543        Ok(table)
1544    }
1545}
1546
1547#[async_trait::async_trait]
1548impl SinkCommitCoordinator for IcebergSinkCommitter {
1549    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1550        if self.is_exactly_once {
1551            self.committed_epoch_subscriber = Some(subscriber);
1552            tracing::info!(
                "Sink id = {}: iceberg sink coordinator initializing.",
1554                self.param.sink_id.sink_id()
1555            );
1556            if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1557                let ordered_metadata_list_by_end_epoch =
1558                    get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1559
1560                let mut last_recommit_epoch = 0;
                for (end_epoch, serialized_bytes, snapshot_id, committed) in
1562                    ordered_metadata_list_by_end_epoch
1563                {
                    let write_results_bytes = deserialize_metadata(serialized_bytes);
1565                    let mut write_results = vec![];
1566
1567                    for each in write_results_bytes {
                        let write_result = IcebergCommitResult::try_from_serialized_bytes(each)?;
1569                        write_results.push(write_result);
1570                    }
1571
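                    // Decide how to handle this batch during recovery based on whether it is already
                    // marked as committed in the system table and whether its snapshot is already
                    // visible in iceberg.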
1572                    match (
1573                        committed,
1574                        self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1575                            .await?,
1576                    ) {
1577                        (true, _) => {
1578                            tracing::info!(
1579                                "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1580                                self.param.sink_id.sink_id()
1581                            );
1582                        }
1583                        (false, true) => {
                            // The snapshot already exists in iceberg, so these files don't need to be
                            // committed again; just mark them as committed in the system table.
                            tracing::info!(
                                "Sink id = {}: all pre-commit files have already been committed into iceberg; marking them as committed.",
                                self.param.sink_id.sink_id()
1588                            );
1589                            mark_row_is_committed_by_sink_id_and_end_epoch(
1590                                &self.db,
1591                                self.sink_id,
1592                                end_epoch,
1593                            )
1594                            .await?;
1595                        }
1596                        (false, false) => {
1597                            tracing::info!(
1598                                "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1599                                self.param.sink_id.sink_id()
1600                            );
1601                            self.re_commit(end_epoch, write_results, snapshot_id)
1602                                .await?;
1603                        }
1604                    }
1605
1606                    last_recommit_epoch = end_epoch;
1607                }
1608                tracing::info!(
1609                    "Sink id = {}: iceberg commit coordinator inited.",
1610                    self.param.sink_id.sink_id()
1611                );
1612                return Ok(Some(last_recommit_epoch));
1613            } else {
1614                tracing::info!(
                    "Sink id = {}: iceberg coordinator initialized, and the system table is empty.",
1616                    self.param.sink_id.sink_id()
1617                );
1618                return Ok(None);
1619            }
1620        }
1621
1622        tracing::info!("Iceberg commit coordinator inited.");
1623        return Ok(None);
1624    }
1625
1626    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1627        tracing::info!("Starting iceberg commit in epoch {epoch}.");
1628        let write_results: Vec<IcebergCommitResult> = metadata
1629            .iter()
1630            .map(IcebergCommitResult::try_from)
1631            .collect::<Result<Vec<IcebergCommitResult>>>()?;
1632
1633        // Skip if no data to commit
1634        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1635            tracing::debug!(?epoch, "no data to commit");
1636            return Ok(());
1637        }
1638
        // guarantee that all write results have the same schema_id and partition_spec_id
1640        if write_results
1641            .iter()
1642            .any(|r| r.schema_id != write_results[0].schema_id)
1643            || write_results
1644                .iter()
1645                .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1646        {
1647            return Err(SinkError::Iceberg(anyhow!(
1648                "schema_id and partition_spec_id should be the same in all write results"
1649            )));
1650        }
1651
1652        if self.is_exactly_once {
1653            assert!(self.committed_epoch_subscriber.is_some());
1654            match self.committed_epoch_subscriber.clone() {
1655                Some(committed_epoch_subscriber) => {
1656                    // Get the latest committed_epoch and the receiver
1657                    let (committed_epoch, mut rw_futures_utilrx) =
1658                        committed_epoch_subscriber(self.param.sink_id).await?;
1659                    // The exactly once commit process needs to start after the data corresponding to the current epoch is persisted in the log store.
1660                    if committed_epoch >= epoch {
1661                        self.commit_iceberg_inner(epoch, write_results, None)
1662                            .await?;
1663                    } else {
1664                        tracing::info!(
1665                            "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1666                            committed_epoch,
1667                            epoch
1668                        );
1669                        while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1670                            tracing::info!(
1671                                "Received next committed epoch: {}",
1672                                next_committed_epoch
1673                            );
1674                            // If next_epoch meets the condition, execute commit immediately
1675                            if next_committed_epoch >= epoch {
1676                                self.commit_iceberg_inner(epoch, write_results, None)
1677                                    .await?;
1678                                break;
1679                            }
1680                        }
1681                    }
1682                }
1683                None => unreachable!(
1684                    "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1685                ),
1686            }
1687        } else {
1688            self.commit_iceberg_inner(epoch, write_results, None)
1689                .await?;
1690        }
1691
1692        Ok(())
1693    }
1694}
1695
/// Methods required to achieve exactly-once semantics.
1697impl IcebergSinkCommitter {
1698    async fn re_commit(
1699        &mut self,
1700        epoch: u64,
1701        write_results: Vec<IcebergCommitResult>,
1702        snapshot_id: i64,
1703    ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1705
1706        // Skip if no data to commit
1707        if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1708            tracing::debug!(?epoch, "no data to commit");
1709            return Ok(());
1710        }
1711        self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1712            .await?;
1713        Ok(())
1714    }
1715
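    /// Commit the collected data files to iceberg as a single snapshot via a fast-append
    /// transaction. When `snapshot_id` is `Some`, this is a re-commit during recovery that reuses
    /// the previously persisted snapshot id.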
1716    async fn commit_iceberg_inner(
1717        &mut self,
1718        epoch: u64,
1719        write_results: Vec<IcebergCommitResult>,
1720        snapshot_id: Option<i64>,
1721    ) -> Result<()> {
        // If the provided `snapshot_id` is not `None`, this commit is a re-commit happening during
        // the recovery phase. In that case, we need to reuse the `snapshot_id` that was previously
        // persisted in the system table.
1725        let is_first_commit = snapshot_id.is_none();
1726        self.last_commit_epoch = epoch;
1727        let expect_schema_id = write_results[0].schema_id;
1728        let expect_partition_spec_id = write_results[0].partition_spec_id;
1729
        // Reload the latest table to avoid concurrent modification, on a best-effort basis.
1731        self.table = Self::reload_table(
1732            self.catalog.as_ref(),
1733            self.table.identifier(),
1734            expect_schema_id,
1735            expect_partition_spec_id,
1736        )
1737        .await?;
1738        let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1739            return Err(SinkError::Iceberg(anyhow!(
1740                "Can't find schema by id {}",
1741                expect_schema_id
1742            )));
1743        };
1744        let Some(partition_spec) = self
1745            .table
1746            .metadata()
1747            .partition_spec_by_id(expect_partition_spec_id)
1748        else {
1749            return Err(SinkError::Iceberg(anyhow!(
1750                "Can't find partition spec by id {}",
1751                expect_partition_spec_id
1752            )));
1753        };
1754        let partition_type = partition_spec
1755            .as_ref()
1756            .clone()
1757            .partition_type(schema)
1758            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1759
1760        let txn = Transaction::new(&self.table);
        // Only generate a new snapshot id on the first commit.
1762        let snapshot_id = match snapshot_id {
1763            Some(previous_snapshot_id) => previous_snapshot_id,
1764            None => txn.generate_unique_snapshot_id(),
1765        };
1766        if self.is_exactly_once && is_first_commit {
            // Persist the pre-commit metadata and snapshot id in the system table.
1768            let mut pre_commit_metadata_bytes = Vec::new();
1769            for each_parallelism_write_result in write_results.clone() {
1770                let each_parallelism_write_result_bytes: Vec<u8> =
1771                    each_parallelism_write_result.try_into()?;
1772                pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1773            }
1774
1775            let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1776
1777            persist_pre_commit_metadata(
1778                self.sink_id,
1779                self.db.clone(),
1780                self.last_commit_epoch,
1781                epoch,
1782                pre_commit_metadata_bytes,
1783                snapshot_id,
1784            )
1785            .await?;
1786        }
1787
1788        let data_files = write_results
1789            .into_iter()
1790            .flat_map(|r| {
1791                r.data_files.into_iter().map(|f| {
1792                    f.try_into(expect_partition_spec_id, &partition_type, schema)
1793                        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1794                })
1795            })
1796            .collect::<Result<Vec<DataFile>>>()?;
        // TODO:
        // This retry behavior should be reverted and handled in iceberg-rust once it supports retry
        // (tracked in: https://github.com/apache/iceberg-rust/issues/964), because proper retry
        // logic involves reapplying the commit metadata.
        // For now, we just retry the whole commit operation.
1801        let retry_strategy = ExponentialBackoff::from_millis(10)
1802            .max_delay(Duration::from_secs(60))
1803            .map(jitter)
1804            .take(self.commit_retry_num as usize);
1805        let catalog = self.catalog.clone();
1806        let table_ident = self.table.identifier().clone();
1807        let table = Retry::spawn(retry_strategy, || async {
1808            let table = Self::reload_table(
1809                catalog.as_ref(),
1810                &table_ident,
1811                expect_schema_id,
1812                expect_partition_spec_id,
1813            )
1814            .await?;
1815            let txn = Transaction::new(&table);
1816            let mut append_action = txn
1817                .fast_append(Some(snapshot_id), None, vec![])
1818                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1819            append_action
1820                .add_data_files(data_files.clone())
1821                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1822            let tx = append_action.apply().await.map_err(|err| {
                tracing::error!(error = %err.as_report(), "Failed to apply the append action to the iceberg transaction");
1824                SinkError::Iceberg(anyhow!(err))
1825            })?;
1826            tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1827                tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1828                SinkError::Iceberg(anyhow!(err))
1829            })
1830        })
1831        .await?;
1832        self.table = table;
1833
        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
1835
1836        if self.is_exactly_once {
1837            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1838            tracing::info!(
                "Sink id = {}: marked pre-commit metadata in epoch {} as committed.",
1840                self.sink_id,
1841                epoch
1842            );
1843
1844            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1845        }
1846        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1847            && self.config.enable_compaction
1848            && iceberg_compact_stat_sender
1849                .send(IcebergSinkCompactionUpdate {
1850                    sink_id: SinkId::new(self.sink_id),
1851                    compaction_interval: self.config.compaction_interval_sec(),
1852                })
1853                .is_err()
1854        {
1855            warn!("failed to send iceberg compaction stats");
1856        }
1857
1858        Ok(())
1859    }
1860
    /// When persisting pre-commit metadata, we record the `snapshot_id` corresponding to each batch
    /// of files. Therefore, checking whether all files of a batch are present in iceberg reduces to
    /// verifying that their corresponding `snapshot_id` exists in iceberg.
1864    async fn is_snapshot_id_in_iceberg(
1865        &self,
1866        iceberg_config: &IcebergConfig,
1867        snapshot_id: i64,
1868    ) -> Result<bool> {
1869        let iceberg_common = iceberg_config.common.clone();
1870        let table = iceberg_common
1871            .load_table(&iceberg_config.java_catalog_props)
1872            .await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1878    }
1879}
1880
1881const MAP_KEY: &str = "key";
1882const MAP_VALUE: &str = "value";
1883
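/// Collect the nested RisingWave field types of `our_field_type` into `schema_fields` and return
/// the arrow struct fields they should be checked against. Returns `None` if the arrow type has no
/// nested struct to compare (i.e. it is not a supported complex type).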
1884fn get_fields<'a>(
1885    our_field_type: &'a risingwave_common::types::DataType,
1886    data_type: &ArrowDataType,
1887    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1888) -> Option<ArrowFields> {
1889    match data_type {
1890        ArrowDataType::Struct(fields) => {
1891            match our_field_type {
1892                risingwave_common::types::DataType::Struct(struct_fields) => {
1893                    struct_fields.iter().for_each(|(name, data_type)| {
1894                        let res = schema_fields.insert(name, data_type);
1895                        // This assert is to make sure there is no duplicate field name in the schema.
1896                        assert!(res.is_none())
1897                    });
1898                }
1899                risingwave_common::types::DataType::Map(map_fields) => {
1900                    schema_fields.insert(MAP_KEY, map_fields.key());
1901                    schema_fields.insert(MAP_VALUE, map_fields.value());
1902                }
1903                risingwave_common::types::DataType::List(list_field) => {
1904                    list_field.as_struct().iter().for_each(|(name, data_type)| {
1905                        let res = schema_fields.insert(name, data_type);
1906                        // This assert is to make sure there is no duplicate field name in the schema.
1907                        assert!(res.is_none())
1908                    });
1909                }
1910                _ => {}
1911            };
1912            Some(fields.clone())
1913        }
1914        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1915            get_fields(our_field_type, field.data_type(), schema_fields)
1916        }
1917        _ => None, // not a supported complex type and unlikely to show up
1918    }
1919}
1920
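/// Check that every arrow field in `fields` has a compatible RisingWave type in `schema_fields`,
/// tolerating known representation differences (decimal precision, `Binary` vs. `LargeBinary`,
/// iceberg-specific field-id metadata and map field names). Errors if a field is missing or its
/// type is incompatible.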
1921fn check_compatibility(
1922    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
1923    fields: &ArrowFields,
1924) -> anyhow::Result<bool> {
1925    for arrow_field in fields {
1926        let our_field_type = schema_fields
1927            .get(arrow_field.name().as_str())
1928            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
1929
1930        // Iceberg source should be able to read iceberg decimal type.
1931        let converted_arrow_data_type = IcebergArrowConvert
1932            .to_arrow_field("", our_field_type)
1933            .map_err(|e| anyhow!(e))?
1934            .data_type()
1935            .clone();
1936
1937        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
1938            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
1939            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
1940            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
1941            (ArrowDataType::List(_), ArrowDataType::List(field))
1942            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
1943                let mut schema_fields = HashMap::new();
1944                get_fields(our_field_type, field.data_type(), &mut schema_fields)
1945                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
1946            }
1947            // validate nested structs
1948            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
1949                let mut schema_fields = HashMap::new();
1950                our_field_type
1951                    .as_struct()
1952                    .iter()
1953                    .for_each(|(name, data_type)| {
1954                        let res = schema_fields.insert(name, data_type);
1955                        // This assert is to make sure there is no duplicate field name in the schema.
1956                        assert!(res.is_none())
1957                    });
1958                check_compatibility(schema_fields, fields)?
1959            }
1960            // cases where left != right (metadata, field name mismatch)
1961            //
1962            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
1963            // {"PARQUET:field_id": ".."}
1964            //
1965            // map: The standard name in arrow is "entries", "key", "value".
1966            // in iceberg-rs, it's called "key_value"
1967            (left, right) => left.equals_datatype(right),
1968        };
1969        if !compatible {
1970            bail!(
1971                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
1972                arrow_field.name(),
1973                converted_arrow_data_type,
1974                arrow_field.data_type()
1975            );
1976        }
1977    }
1978    Ok(true)
1979}
1980
1981/// Try to match our schema with iceberg schema.
1982pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
1983    if rw_schema.fields.len() != arrow_schema.fields().len() {
1984        bail!(
1985            "Schema length mismatch, risingwave is {}, and iceberg is {}",
1986            rw_schema.fields.len(),
1987            arrow_schema.fields.len()
1988        );
1989    }
1990
1991    let mut schema_fields = HashMap::new();
1992    rw_schema.fields.iter().for_each(|field| {
1993        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
1994        // This assert is to make sure there is no duplicate field name in the schema.
1995        assert!(res.is_none())
1996    });
1997
1998    check_compatibility(schema_fields, &arrow_schema.fields)?;
1999    Ok(())
2000}
2001
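/// Serialize a batch of per-writer metadata blobs into a single JSON byte vector, as persisted in
/// the exactly-once system table.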
2002pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2003    serde_json::to_vec(&metadata).unwrap()
2004}
2005
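/// Inverse of [`serialize_metadata`]. Panics if the bytes were not produced by `serialize_metadata`.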
2006pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2007    serde_json::from_slice(&bytes).unwrap()
2008}
2009
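/// Parse a `partition_by` clause such as `"v1, bucket(5,v1), truncate(4,v2)"` into a list of
/// `(column, Transform)` pairs.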
2010pub fn parse_partition_by_exprs(
2011    expr: String,
2012) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2013    // captures column, transform(column), transform(n,column), transform(n, column)
2014    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2015    if !re.is_match(&expr) {
2016        bail!(format!(
2017            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2018            expr
2019        ))
2020    }
2021    let caps = re.captures_iter(&expr);
2022
2023    let mut partition_columns = vec![];
2024
2025    for mat in caps {
2026        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2027            (&mat["transform"], Transform::Identity)
2028        } else {
2029            let mut func = mat["transform"].to_owned();
2030            if func == "bucket" || func == "truncate" {
2031                let n = &mat
2032                    .name("n")
2033                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
2034                    .as_str();
2035                func = format!("{func}[{n}]");
2036            }
2037            (
2038                &mat["field"],
2039                Transform::from_str(&func)
2040                    .with_context(|| format!("invalid transform function {}", func))?,
2041            )
2042        };
2043        partition_columns.push((column.to_owned(), transform));
2044    }
2045    Ok(partition_columns)
2046}
2047
2048#[cfg(test)]
2049mod test {
2050    use std::collections::BTreeMap;
2051
2052    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2053    use risingwave_common::catalog::Field;
2054    use risingwave_common::types::{DataType, MapType, StructType};
2055
2056    use crate::connector_common::IcebergCommon;
2057    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2058    use crate::sink::iceberg::IcebergConfig;
2059
2060    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600; // 1 hour
2061
2062    #[test]
2063    fn test_compatible_arrow_schema() {
2064        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2065
2066        use super::*;
2067        let risingwave_schema = Schema::new(vec![
2068            Field::with_name(DataType::Int32, "a"),
2069            Field::with_name(DataType::Int32, "b"),
2070            Field::with_name(DataType::Int32, "c"),
2071        ]);
2072        let arrow_schema = ArrowSchema::new(vec![
2073            ArrowField::new("a", ArrowDataType::Int32, false),
2074            ArrowField::new("b", ArrowDataType::Int32, false),
2075            ArrowField::new("c", ArrowDataType::Int32, false),
2076        ]);
2077
2078        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2079
2080        let risingwave_schema = Schema::new(vec![
2081            Field::with_name(DataType::Int32, "d"),
2082            Field::with_name(DataType::Int32, "c"),
2083            Field::with_name(DataType::Int32, "a"),
2084            Field::with_name(DataType::Int32, "b"),
2085        ]);
2086        let arrow_schema = ArrowSchema::new(vec![
2087            ArrowField::new("a", ArrowDataType::Int32, false),
2088            ArrowField::new("b", ArrowDataType::Int32, false),
2089            ArrowField::new("d", ArrowDataType::Int32, false),
2090            ArrowField::new("c", ArrowDataType::Int32, false),
2091        ]);
2092        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2093
2094        let risingwave_schema = Schema::new(vec![
2095            Field::with_name(
2096                DataType::Struct(StructType::new(vec![
2097                    ("a1", DataType::Int32),
2098                    (
2099                        "a2",
2100                        DataType::Struct(StructType::new(vec![
2101                            ("a21", DataType::Bytea),
2102                            (
2103                                "a22",
2104                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2105                            ),
2106                        ])),
2107                    ),
2108                ])),
2109                "a",
2110            ),
2111            Field::with_name(
2112                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2113                    ("b1", DataType::Int32),
2114                    ("b2", DataType::Bytea),
2115                    (
2116                        "b3",
2117                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2118                    ),
2119                ])))),
2120                "b",
2121            ),
2122            Field::with_name(
2123                DataType::Map(MapType::from_kv(
2124                    DataType::Varchar,
2125                    DataType::List(Box::new(DataType::Struct(StructType::new([
2126                        ("c1", DataType::Int32),
2127                        ("c2", DataType::Bytea),
2128                        (
2129                            "c3",
2130                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2131                        ),
2132                    ])))),
2133                )),
2134                "c",
2135            ),
2136        ]);
2137        let arrow_schema = ArrowSchema::new(vec![
2138            ArrowField::new(
2139                "a",
2140                ArrowDataType::Struct(ArrowFields::from(vec![
2141                    ArrowField::new("a1", ArrowDataType::Int32, false),
2142                    ArrowField::new(
2143                        "a2",
2144                        ArrowDataType::Struct(ArrowFields::from(vec![
2145                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2146                            ArrowField::new_map(
2147                                "a22",
2148                                "entries",
2149                                ArrowFieldRef::new(ArrowField::new(
2150                                    "key",
2151                                    ArrowDataType::Utf8,
2152                                    false,
2153                                )),
2154                                ArrowFieldRef::new(ArrowField::new(
2155                                    "value",
2156                                    ArrowDataType::Utf8,
2157                                    false,
2158                                )),
2159                                false,
2160                                false,
2161                            ),
2162                        ])),
2163                        false,
2164                    ),
2165                ])),
2166                false,
2167            ),
2168            ArrowField::new(
2169                "b",
2170                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2171                    ArrowDataType::Struct(ArrowFields::from(vec![
2172                        ArrowField::new("b1", ArrowDataType::Int32, false),
2173                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2174                        ArrowField::new_map(
2175                            "b3",
2176                            "entries",
2177                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2178                            ArrowFieldRef::new(ArrowField::new(
2179                                "value",
2180                                ArrowDataType::Utf8,
2181                                false,
2182                            )),
2183                            false,
2184                            false,
2185                        ),
2186                    ])),
2187                    false,
2188                ))),
2189                false,
2190            ),
2191            ArrowField::new_map(
2192                "c",
2193                "entries",
2194                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2195                ArrowFieldRef::new(ArrowField::new(
2196                    "value",
2197                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2198                        ArrowDataType::Struct(ArrowFields::from(vec![
2199                            ArrowField::new("c1", ArrowDataType::Int32, false),
2200                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2201                            ArrowField::new_map(
2202                                "c3",
2203                                "entries",
2204                                ArrowFieldRef::new(ArrowField::new(
2205                                    "key",
2206                                    ArrowDataType::Utf8,
2207                                    false,
2208                                )),
2209                                ArrowFieldRef::new(ArrowField::new(
2210                                    "value",
2211                                    ArrowDataType::Utf8,
2212                                    false,
2213                                )),
2214                                false,
2215                                false,
2216                            ),
2217                        ])),
2218                        false,
2219                    ))),
2220                    false,
2221                )),
2222                false,
2223                false,
2224            ),
2225        ]);
2226        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2227    }
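
    // An added negative-case sketch: `try_matches_arrow_schema` should reject fields whose types
    // cannot be reconciled (here, a RisingWave `Int32` column against an arrow `Utf8` column).
    #[test]
    fn test_incompatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![Field::with_name(DataType::Int32, "a")]);
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("a", ArrowDataType::Utf8, false)]);
        assert!(try_matches_arrow_schema(&risingwave_schema, &arrow_schema).is_err());
    }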
2228
2229    #[test]
2230    fn test_parse_iceberg_config() {
2231        let values = [
2232            ("connector", "iceberg"),
2233            ("type", "upsert"),
2234            ("primary_key", "v1"),
2235            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2236            ("warehouse.path", "s3://iceberg"),
2237            ("s3.endpoint", "http://127.0.0.1:9301"),
2238            ("s3.access.key", "hummockadmin"),
2239            ("s3.secret.key", "hummockadmin"),
2240            ("s3.path.style.access", "true"),
2241            ("s3.region", "us-east-1"),
2242            ("catalog.type", "jdbc"),
2243            ("catalog.name", "demo"),
2244            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2245            ("catalog.jdbc.user", "admin"),
2246            ("catalog.jdbc.password", "123456"),
2247            ("database.name", "demo_db"),
2248            ("table.name", "demo_table"),
2249            ("enable_compaction", "true"),
2250            ("compaction_interval_sec", "1800"),
2251            ("enable_snapshot_expiration", "true"),
2252        ]
2253        .into_iter()
2254        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2255        .collect();
2256
2257        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2258
2259        let expected_iceberg_config = IcebergConfig {
2260            common: IcebergCommon {
2261                warehouse_path: Some("s3://iceberg".to_owned()),
2262                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2263                region: Some("us-east-1".to_owned()),
2264                endpoint: Some("http://127.0.0.1:9301".to_owned()),
2265                access_key: Some("hummockadmin".to_owned()),
2266                secret_key: Some("hummockadmin".to_owned()),
2267                gcs_credential: None,
2268                catalog_type: Some("jdbc".to_owned()),
2269                glue_id: None,
2270                catalog_name: Some("demo".to_owned()),
2271                database_name: Some("demo_db".to_owned()),
2272                table_name: "demo_table".to_owned(),
2273                path_style_access: Some(true),
2274                credential: None,
2275                oauth2_server_uri: None,
2276                scope: None,
2277                token: None,
2278                enable_config_load: None,
2279                rest_signing_name: None,
2280                rest_signing_region: None,
2281                rest_sigv4_enabled: None,
2282                hosted_catalog: None,
2283                azblob_account_name: None,
2284                azblob_account_key: None,
2285                azblob_endpoint_url: None,
2286            },
2287            r#type: "upsert".to_owned(),
2288            force_append_only: false,
2289            primary_key: Some(vec!["v1".to_owned()]),
2290            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2291            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2292                .into_iter()
2293                .map(|(k, v)| (k.to_owned(), v.to_owned()))
2294                .collect(),
2295            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2296            create_table_if_not_exists: false,
2297            is_exactly_once: None,
2298            commit_retry_num: 8,
2299            enable_compaction: true,
2300            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2301            enable_snapshot_expiration: true,
2302        };
2303
2304        assert_eq!(iceberg_config, expected_iceberg_config);
2305
2306        assert_eq!(
2307            &iceberg_config.common.full_table_name().unwrap().to_string(),
2308            "demo_db.demo_table"
2309        );
2310    }
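
    // A small added sketch of how `partition_by` clauses are parsed by `parse_partition_by_exprs`;
    // the column names and transform parameters below are arbitrary examples.
    #[test]
    fn test_parse_partition_by_exprs() {
        use super::*;

        let parsed =
            parse_partition_by_exprs("v1, bucket(5,v2), truncate(4,v3)".to_owned()).unwrap();
        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed[0].0, "v1");
        assert!(matches!(parsed[0].1, Transform::Identity));
        assert_eq!(parsed[1].0, "v2");
        assert!(matches!(parsed[1].1, Transform::Bucket(5)));
        assert_eq!(parsed[2].0, "v3");
        assert!(matches!(parsed[2].1, Transform::Truncate(4)));
    }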
2311
2312    async fn test_create_catalog(configs: BTreeMap<String, String>) {
2313        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2314
2315        let table = iceberg_config.load_table().await.unwrap();
2316
2317        println!("{:?}", table.identifier());
2318    }
2319
2320    #[tokio::test]
2321    #[ignore]
2322    async fn test_storage_catalog() {
2323        let values = [
2324            ("connector", "iceberg"),
2325            ("type", "append-only"),
2326            ("force_append_only", "true"),
2327            ("s3.endpoint", "http://127.0.0.1:9301"),
2328            ("s3.access.key", "hummockadmin"),
2329            ("s3.secret.key", "hummockadmin"),
2330            ("s3.region", "us-east-1"),
2331            ("s3.path.style.access", "true"),
2332            ("catalog.name", "demo"),
2333            ("catalog.type", "storage"),
2334            ("warehouse.path", "s3://icebergdata/demo"),
2335            ("database.name", "s1"),
2336            ("table.name", "t1"),
2337        ]
2338        .into_iter()
2339        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2340        .collect();
2341
2342        test_create_catalog(values).await;
2343    }
2344
2345    #[tokio::test]
2346    #[ignore]
2347    async fn test_rest_catalog() {
2348        let values = [
2349            ("connector", "iceberg"),
2350            ("type", "append-only"),
2351            ("force_append_only", "true"),
2352            ("s3.endpoint", "http://127.0.0.1:9301"),
2353            ("s3.access.key", "hummockadmin"),
2354            ("s3.secret.key", "hummockadmin"),
2355            ("s3.region", "us-east-1"),
2356            ("s3.path.style.access", "true"),
2357            ("catalog.name", "demo"),
2358            ("catalog.type", "rest"),
2359            ("catalog.uri", "http://192.168.167.4:8181"),
2360            ("warehouse.path", "s3://icebergdata/demo"),
2361            ("database.name", "s1"),
2362            ("table.name", "t1"),
2363        ]
2364        .into_iter()
2365        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2366        .collect();
2367
2368        test_create_catalog(values).await;
2369    }
2370
2371    #[tokio::test]
2372    #[ignore]
2373    async fn test_jdbc_catalog() {
2374        let values = [
2375            ("connector", "iceberg"),
2376            ("type", "append-only"),
2377            ("force_append_only", "true"),
2378            ("s3.endpoint", "http://127.0.0.1:9301"),
2379            ("s3.access.key", "hummockadmin"),
2380            ("s3.secret.key", "hummockadmin"),
2381            ("s3.region", "us-east-1"),
2382            ("s3.path.style.access", "true"),
2383            ("catalog.name", "demo"),
2384            ("catalog.type", "jdbc"),
2385            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2386            ("catalog.jdbc.user", "admin"),
2387            ("catalog.jdbc.password", "123456"),
2388            ("warehouse.path", "s3://icebergdata/demo"),
2389            ("database.name", "s1"),
2390            ("table.name", "t1"),
2391        ]
2392        .into_iter()
2393        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2394        .collect();
2395
2396        test_create_catalog(values).await;
2397    }
2398
2399    #[tokio::test]
2400    #[ignore]
2401    async fn test_hive_catalog() {
2402        let values = [
2403            ("connector", "iceberg"),
2404            ("type", "append-only"),
2405            ("force_append_only", "true"),
2406            ("s3.endpoint", "http://127.0.0.1:9301"),
2407            ("s3.access.key", "hummockadmin"),
2408            ("s3.secret.key", "hummockadmin"),
2409            ("s3.region", "us-east-1"),
2410            ("s3.path.style.access", "true"),
2411            ("catalog.name", "demo"),
2412            ("catalog.type", "hive"),
2413            ("catalog.uri", "thrift://localhost:9083"),
2414            ("warehouse.path", "s3://icebergdata/demo"),
2415            ("database.name", "s1"),
2416            ("table.name", "t1"),
2417        ]
2418        .into_iter()
2419        .map(|(k, v)| (k.to_owned(), v.to_owned()))
2420        .collect();
2421
2422        test_create_catalog(values).await;
2423    }
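
    // A minimal added sketch of the pre-commit metadata round trip used for exactly-once recovery:
    // an `IcebergCommitResult` is encoded to bytes and decoded back. The field values are arbitrary.
    #[test]
    fn test_commit_result_bytes_roundtrip() {
        use super::*;

        let result = IcebergCommitResult {
            schema_id: 1,
            partition_spec_id: 2,
            data_files: vec![],
        };
        let bytes: Vec<u8> = result.try_into().unwrap();
        let decoded = IcebergCommitResult::try_from_serialized_bytes(bytes).unwrap();
        assert_eq!(decoded.schema_id, 1);
        assert_eq!(decoded.partition_spec_id, 2);
        assert!(decoded.data_files.is_empty());
    }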
2424}