1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30 UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43 DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59 Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::iceberg_default_commit_checkpoint_interval;
85use super::{
86 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate, IcebergTableIdentifier};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
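/// Name of the table branch used by copy-on-write sinks; the actual commit target is chosen by
/// `commit_branch` in `commit_iceberg_inner` based on the sink type and write mode.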
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
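// Property keys for the Iceberg sink options declared on `IcebergConfig` below; they mirror the
// corresponding serde `rename` attributes.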
103pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112 "snapshot_expiration_clear_expired_meta_data";
113pub const MAX_SNAPSHOTS_NUM: &str = "max_snapshots_num_before_compaction";
114
115fn default_commit_retry_num() -> u32 {
116 8
117}
118
119fn default_iceberg_write_mode() -> String {
120 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
121}
122
123fn default_true() -> bool {
124 true
125}
126
127fn default_some_true() -> Option<bool> {
128 Some(true)
129}
130
131#[serde_as]
132#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
133pub struct IcebergConfig {
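    /// Sink type; must be `append-only` or `upsert` (validated in `from_btreemap`).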
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
137 pub force_append_only: bool,
138
139 #[serde(flatten)]
140 common: IcebergCommon,
141
142 #[serde(flatten)]
143 table: IcebergTableIdentifier,
144
145 #[serde(
146 rename = "primary_key",
147 default,
148 deserialize_with = "deserialize_optional_string_seq_from_string"
149 )]
150 pub primary_key: Option<Vec<String>>,
151
152 #[serde(skip)]
154 pub java_catalog_props: HashMap<String, String>,
155
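    /// Optional partition-by expression string, parsed by `parse_partition_by_exprs` into
    /// `(column, transform)` pairs when the sink creates the table.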
156 #[serde(default)]
157 pub partition_by: Option<String>,
158
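    /// Commit to Iceberg every `commit_checkpoint_interval` checkpoints; must be greater than 0.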
159 #[serde(default = "iceberg_default_commit_checkpoint_interval")]
161 #[serde_as(as = "DisplayFromStr")]
162 #[with_option(allow_alter_on_fly)]
163 pub commit_checkpoint_interval: u64,
164
165 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
166 pub create_table_if_not_exists: bool,
167
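    /// Enable exactly-once delivery by persisting pre-commit metadata and replaying any
    /// uncommitted files on recovery; defaults to `true`.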
168 #[serde(default = "default_some_true")]
170 #[serde_as(as = "Option<DisplayFromStr>")]
171 pub is_exactly_once: Option<bool>,
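    /// Maximum number of retries for a failed Iceberg commit (exponential backoff with jitter).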
172 #[serde(default = "default_commit_retry_num")]
177 pub commit_retry_num: u32,
178
179 #[serde(
181 rename = "enable_compaction",
182 default,
183 deserialize_with = "deserialize_bool_from_string"
184 )]
185 #[with_option(allow_alter_on_fly)]
186 pub enable_compaction: bool,
187
188 #[serde(rename = "compaction_interval_sec", default)]
190 #[serde_as(as = "Option<DisplayFromStr>")]
191 #[with_option(allow_alter_on_fly)]
192 pub compaction_interval_sec: Option<u64>,
193
194 #[serde(
196 rename = "enable_snapshot_expiration",
197 default,
198 deserialize_with = "deserialize_bool_from_string"
199 )]
200 #[with_option(allow_alter_on_fly)]
201 pub enable_snapshot_expiration: bool,
202
203 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
205 pub write_mode: String,
206
207 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
210 #[serde_as(as = "Option<DisplayFromStr>")]
211 #[with_option(allow_alter_on_fly)]
212 pub snapshot_expiration_max_age_millis: Option<i64>,
213
214 #[serde(rename = "snapshot_expiration_retain_last", default)]
216 #[serde_as(as = "Option<DisplayFromStr>")]
217 #[with_option(allow_alter_on_fly)]
218 pub snapshot_expiration_retain_last: Option<i32>,
219
220 #[serde(
221 rename = "snapshot_expiration_clear_expired_files",
222 default = "default_true",
223 deserialize_with = "deserialize_bool_from_string"
224 )]
225 #[with_option(allow_alter_on_fly)]
226 pub snapshot_expiration_clear_expired_files: bool,
227
228 #[serde(
229 rename = "snapshot_expiration_clear_expired_meta_data",
230 default = "default_true",
231 deserialize_with = "deserialize_bool_from_string"
232 )]
233 #[with_option(allow_alter_on_fly)]
234 pub snapshot_expiration_clear_expired_meta_data: bool,
235
236 #[serde(rename = "max_snapshots_num_before_compaction", default)]
239 #[serde_as(as = "Option<DisplayFromStr>")]
240 #[with_option(allow_alter_on_fly)]
241 pub max_snapshots_num_before_compaction: Option<usize>,
242}
243
244impl EnforceSecret for IcebergConfig {
245 fn enforce_secret<'a>(
246 prop_iter: impl Iterator<Item = &'a str>,
247 ) -> crate::error::ConnectorResult<()> {
248 for prop in prop_iter {
249 IcebergCommon::enforce_one(prop)?;
250 }
251 Ok(())
252 }
253
254 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
255 IcebergCommon::enforce_one(prop)
256 }
257}
258
259impl IcebergConfig {
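    /// Build an [`IcebergConfig`] from the raw `WITH (...)` options of a sink.
    ///
    /// A minimal sketch of the expected shape (the option keys shown here are illustrative;
    /// the exact set of required options depends on the catalog being used):
    ///
    /// ```ignore
    /// let mut options = BTreeMap::new();
    /// options.insert("type".to_owned(), "upsert".to_owned());
    /// options.insert("primary_key".to_owned(), "id".to_owned());
    /// options.insert("database.name".to_owned(), "demo_db".to_owned());
    /// options.insert("table.name".to_owned(), "demo_table".to_owned());
    /// let config = IcebergConfig::from_btreemap(options)?;
    /// ```
    ///
    /// Every `catalog.*` option except `catalog.uri`, `catalog.type`, `catalog.name` and
    /// `catalog.header` is forwarded to the Java catalog with the `catalog.` prefix stripped.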
260 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
261 let mut config =
262 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
263 .map_err(|e| SinkError::Config(anyhow!(e)))?;
264
265 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
266 return Err(SinkError::Config(anyhow!(
                "`{}` must be either {} or {}",
268 SINK_TYPE_OPTION,
269 SINK_TYPE_APPEND_ONLY,
270 SINK_TYPE_UPSERT
271 )));
272 }
273
274 if config.r#type == SINK_TYPE_UPSERT {
275 if let Some(primary_key) = &config.primary_key {
276 if primary_key.is_empty() {
277 return Err(SinkError::Config(anyhow!(
278 "`primary_key` must not be empty in {}",
279 SINK_TYPE_UPSERT
280 )));
281 }
282 } else {
283 return Err(SinkError::Config(anyhow!(
284 "Must set `primary_key` in {}",
285 SINK_TYPE_UPSERT
286 )));
287 }
288 }
289
290 config.java_catalog_props = values
292 .iter()
293 .filter(|(k, _v)| {
294 k.starts_with("catalog.")
295 && k != &"catalog.uri"
296 && k != &"catalog.type"
297 && k != &"catalog.name"
298 && k != &"catalog.header"
299 })
300 .map(|(k, v)| (k[8..].to_string(), v.clone()))
301 .collect();
302
303 if config.commit_checkpoint_interval == 0 {
304 return Err(SinkError::Config(anyhow!(
305 "`commit_checkpoint_interval` must be greater than 0"
306 )));
307 }
308
309 Ok(config)
310 }
311
312 pub fn catalog_type(&self) -> &str {
313 self.common.catalog_type()
314 }
315
316 pub async fn load_table(&self) -> Result<Table> {
317 self.common
318 .load_table(&self.table, &self.java_catalog_props)
319 .await
320 .map_err(Into::into)
321 }
322
323 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
324 self.common
325 .create_catalog(&self.java_catalog_props)
326 .await
327 .map_err(Into::into)
328 }
329
330 pub fn full_table_name(&self) -> Result<TableIdent> {
331 self.table.to_table_ident().map_err(Into::into)
332 }
333
334 pub fn catalog_name(&self) -> String {
335 self.common.catalog_name()
336 }
337
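    /// Compaction interval in seconds; falls back to 3600 (one hour) when not configured.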
338 pub fn compaction_interval_sec(&self) -> u64 {
339 self.compaction_interval_sec.unwrap_or(3600)
341 }
342
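    /// Compute the cutoff timestamp for snapshot expiration: snapshots older than
    /// `current_time_ms - snapshot_expiration_max_age_millis` become eligible for expiration.
    /// Returns `None` when no maximum age is configured. For example (illustrative numbers),
    /// with a max age of `86_400_000` ms (one day) and `current_time_ms = 1_700_000_000_000`,
    /// the cutoff is `1_699_913_600_000`.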
343 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
346 self.snapshot_expiration_max_age_millis
347 .map(|max_age_millis| current_time_ms - max_age_millis)
348 }
349}
350
351pub struct IcebergSink {
352 pub config: IcebergConfig,
353 param: SinkParam,
354 unique_column_ids: Option<Vec<usize>>,
356}
357
358impl EnforceSecret for IcebergSink {
359 fn enforce_secret<'a>(
360 prop_iter: impl Iterator<Item = &'a str>,
361 ) -> crate::error::ConnectorResult<()> {
362 for prop in prop_iter {
363 IcebergConfig::enforce_one(prop)?;
364 }
365 Ok(())
366 }
367}
368
369impl TryFrom<SinkParam> for IcebergSink {
370 type Error = SinkError;
371
372 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
373 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
374 IcebergSink::new(config, param)
375 }
376}
377
378impl Debug for IcebergSink {
379 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380 f.debug_struct("IcebergSink")
381 .field("config", &self.config)
382 .finish()
383 }
384}
385
386impl IcebergSink {
387 pub async fn create_and_validate_table(&self) -> Result<Table> {
388 if self.config.create_table_if_not_exists {
389 self.create_table_if_not_exists().await?;
390 }
391
392 let table = self
393 .config
394 .load_table()
395 .await
396 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
397
398 let sink_schema = self.param.schema();
399 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
400 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
401
402 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
403 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
404
405 Ok(table)
406 }
407
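    /// Create the Iceberg table if it does not exist yet: ensure the namespace exists, convert
    /// the sink schema to an Iceberg schema, derive the table location from the warehouse path
    /// (when present), build the partition spec from `partition_by`, and create the table
    /// through the catalog.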
408 pub async fn create_table_if_not_exists(&self) -> Result<()> {
409 let catalog = self.config.create_catalog().await?;
410 let namespace = if let Some(database_name) = self.config.table.database_name() {
411 let namespace = NamespaceIdent::new(database_name.to_owned());
412 if !catalog
413 .namespace_exists(&namespace)
414 .await
415 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
416 {
417 catalog
418 .create_namespace(&namespace, HashMap::default())
419 .await
420 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
421 .context("failed to create iceberg namespace")?;
422 }
423 namespace
424 } else {
            bail!("database name must be set if you want to create a table")
426 };
427
428 let table_id = self
429 .config
430 .full_table_name()
431 .context("Unable to parse table name")?;
432 if !catalog
433 .table_exists(&table_id)
434 .await
435 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
436 {
437 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
438 let arrow_fields = self
440 .param
441 .columns
442 .iter()
443 .map(|column| {
444 Ok(iceberg_create_table_arrow_convert
445 .to_arrow_field(&column.name, &column.data_type)
446 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
447 .context(format!(
448 "failed to convert {}: {} to arrow type",
449 &column.name, &column.data_type
450 ))?)
451 })
452 .collect::<Result<Vec<ArrowField>>>()?;
453 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
454 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
455 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
456 .context("failed to convert arrow schema to iceberg schema")?;
457
458 let location = {
459 let mut names = namespace.clone().inner();
460 names.push(self.config.table.table_name().to_owned());
461 match &self.config.common.warehouse_path {
462 Some(warehouse_path) => {
463 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
464 let url = Url::parse(warehouse_path);
465 if url.is_err() || is_s3_tables {
466 if self.config.common.catalog_type() == "rest"
469 || self.config.common.catalog_type() == "rest_rust"
470 {
471 None
472 } else {
473 bail!(format!("Invalid warehouse path: {}", warehouse_path))
474 }
475 } else if warehouse_path.ends_with('/') {
476 Some(format!("{}{}", warehouse_path, names.join("/")))
477 } else {
478 Some(format!("{}/{}", warehouse_path, names.join("/")))
479 }
480 }
481 None => None,
482 }
483 };
484
485 let partition_spec = match &self.config.partition_by {
486 Some(partition_by) => {
487 let mut partition_fields = Vec::<UnboundPartitionField>::new();
488 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
489 .into_iter()
490 .enumerate()
491 {
492 match iceberg_schema.field_id_by_name(&column) {
493 Some(id) => partition_fields.push(
494 UnboundPartitionField::builder()
495 .source_id(id)
496 .transform(transform)
497 .name(format!("_p_{}", column))
498 .field_id(i as i32)
499 .build(),
500 ),
501 None => bail!(format!(
502 "Partition source column does not exist in schema: {}",
503 column
504 )),
505 };
506 }
507 Some(
508 UnboundPartitionSpec::builder()
509 .with_spec_id(0)
510 .add_partition_fields(partition_fields)
511 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
512 .context("failed to add partition columns")?
513 .build(),
514 )
515 }
516 None => None,
517 };
518
519 let table_creation_builder = TableCreation::builder()
520 .name(self.config.table.table_name().to_owned())
521 .schema(iceberg_schema);
522
523 let table_creation = match (location, partition_spec) {
524 (Some(location), Some(partition_spec)) => table_creation_builder
525 .location(location)
526 .partition_spec(partition_spec)
527 .build(),
528 (Some(location), None) => table_creation_builder.location(location).build(),
529 (None, Some(partition_spec)) => table_creation_builder
530 .partition_spec(partition_spec)
531 .build(),
532 (None, None) => table_creation_builder.build(),
533 };
534
535 catalog
536 .create_table(&namespace, table_creation)
537 .await
538 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
539 .context("failed to create iceberg table")?;
540 }
541 Ok(())
542 }
543
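    /// For upsert sinks (unless `force_append_only` is set), resolve the configured primary-key
    /// column names to column ids, which are later used as equality-delete keys; append-only
    /// sinks keep `unique_column_ids` as `None`.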
544 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
545 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
546 if let Some(pk) = &config.primary_key {
547 let mut unique_column_ids = Vec::with_capacity(pk.len());
548 for col_name in pk {
549 let id = param
550 .columns
551 .iter()
552 .find(|col| col.name.as_str() == col_name)
553 .ok_or_else(|| {
554 SinkError::Config(anyhow!(
555 "Primary key column {} not found in sink schema",
556 col_name
557 ))
558 })?
559 .column_id
560 .get_id() as usize;
561 unique_column_ids.push(id);
562 }
563 Some(unique_column_ids)
564 } else {
565 unreachable!()
566 }
567 } else {
568 None
569 };
570 Ok(Self {
571 config,
572 param,
573 unique_column_ids,
574 })
575 }
576}
577
578impl Sink for IcebergSink {
579 type Coordinator = IcebergSinkCommitter;
580 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
581
582 const SINK_NAME: &'static str = ICEBERG_SINK;
583
584 async fn validate(&self) -> Result<()> {
585 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
586 bail!("Snowflake catalog only supports iceberg sources");
587 }
588 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
589 risingwave_common::license::Feature::IcebergSinkWithGlue
590 .check_available()
591 .map_err(|e| anyhow::anyhow!(e))?;
592 }
593
594 let _ = self.create_and_validate_table().await?;
595 Ok(())
596 }
597
598 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
599 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
600
601 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
602 if iceberg_config.enable_compaction && compaction_interval == 0 {
603 bail!(
604 "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
605 );
606 } else {
607 tracing::info!(
609 "Alter config compaction_interval set to {} seconds",
610 compaction_interval
611 );
612 }
613 }
614
615 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
616 && max_snapshots < 1
617 {
618 bail!(
619 "`max_snapshots_num_before_compaction` must be greater than 0, got: {}",
620 max_snapshots
621 );
622 }
623
624 Ok(())
625 }
626
627 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
628 let table = self.create_and_validate_table().await?;
629 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
630 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
631 } else {
632 IcebergSinkWriter::new_append_only(table, &writer_param).await?
633 };
634
635 let commit_checkpoint_interval =
636 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
637 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
638 );
639 let writer = CoordinatedLogSinker::new(
640 &writer_param,
641 self.param.clone(),
642 inner,
643 commit_checkpoint_interval,
644 )
645 .await?;
646
647 Ok(writer)
648 }
649
650 fn is_coordinated_sink(&self) -> bool {
651 true
652 }
653
654 async fn new_coordinator(
655 &self,
656 db: DatabaseConnection,
657 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
658 ) -> Result<Self::Coordinator> {
659 let catalog = self.config.create_catalog().await?;
660 let table = self.create_and_validate_table().await?;
661 Ok(IcebergSinkCommitter {
662 catalog,
663 table,
664 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
665 last_commit_epoch: 0,
666 sink_id: self.param.sink_id.sink_id(),
667 config: self.config.clone(),
668 param: self.param.clone(),
669 db,
670 commit_retry_num: self.config.commit_retry_num,
671 committed_epoch_subscriber: None,
672 iceberg_compact_stat_sender,
673 })
674 }
675}
676
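/// Tracks how the extra partition column (if any) is projected away from incoming chunks:
/// `None` means there is no extra column, `Prepare(idx)` holds the column index before the
/// projection indices are computed, and `Done(vec)` caches them for subsequent chunks.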
677enum ProjectIdxVec {
684 None,
685 Prepare(usize),
686 Done(Vec<usize>),
687}
688
689pub struct IcebergSinkWriter {
690 writer: IcebergWriterDispatch,
691 arrow_schema: SchemaRef,
692 metrics: IcebergWriterMetrics,
694 table: Table,
696 project_idx_vec: ProjectIdxVec,
699}
700
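/// The concrete writer pipeline, selected by sink type (append-only vs. upsert) and by whether
/// the table has a non-empty partition spec. The inner `writer` is taken and closed at each
/// checkpoint barrier and rebuilt lazily from `writer_builder` before the next write.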
701#[allow(clippy::type_complexity)]
702enum IcebergWriterDispatch {
703 PartitionAppendOnly {
704 writer: Option<Box<dyn IcebergWriter>>,
705 writer_builder: MonitoredGeneralWriterBuilder<
706 FanoutPartitionWriterBuilder<
707 DataFileWriterBuilder<
708 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
709 >,
710 >,
711 >,
712 },
713 NonPartitionAppendOnly {
714 writer: Option<Box<dyn IcebergWriter>>,
715 writer_builder: MonitoredGeneralWriterBuilder<
716 DataFileWriterBuilder<
717 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
718 >,
719 >,
720 },
721 PartitionUpsert {
722 writer: Option<Box<dyn IcebergWriter>>,
723 writer_builder: MonitoredGeneralWriterBuilder<
724 FanoutPartitionWriterBuilder<
725 EqualityDeltaWriterBuilder<
726 DataFileWriterBuilder<
727 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
728 >,
729 MonitoredPositionDeleteWriterBuilder<
730 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
731 >,
732 EqualityDeleteFileWriterBuilder<
733 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
734 >,
735 >,
736 >,
737 >,
738 arrow_schema_with_op_column: SchemaRef,
739 },
740 NonpartitionUpsert {
741 writer: Option<Box<dyn IcebergWriter>>,
742 writer_builder: MonitoredGeneralWriterBuilder<
743 EqualityDeltaWriterBuilder<
744 DataFileWriterBuilder<
745 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
746 >,
747 MonitoredPositionDeleteWriterBuilder<
748 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
749 >,
750 EqualityDeleteFileWriterBuilder<
751 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
752 >,
753 >,
754 >,
755 arrow_schema_with_op_column: SchemaRef,
756 },
757}
758
759impl IcebergWriterDispatch {
760 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
761 match self {
762 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
763 | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
764 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
765 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
766 }
767 }
768}
769
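/// Metrics held by the writer. `_write_qps` and `_write_latency` are also cloned into the
/// monitored writer builders; they are kept here to keep the label guards alive, while
/// `write_bytes` is incremented directly for every written batch.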
770pub struct IcebergWriterMetrics {
771 _write_qps: LabelGuardedIntCounter,
776 _write_latency: LabelGuardedHistogram,
777 write_bytes: LabelGuardedIntCounter,
778}
779
780impl IcebergSinkWriter {
781 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
782 let SinkWriterParam {
783 extra_partition_col_idx,
784 actor_id,
785 sink_id,
786 sink_name,
787 ..
788 } = writer_param;
789 let metrics_labels = [
790 &actor_id.to_string(),
791 &sink_id.to_string(),
792 sink_name.as_str(),
793 ];
794
795 let write_qps = GLOBAL_SINK_METRICS
797 .iceberg_write_qps
798 .with_guarded_label_values(&metrics_labels);
799 let write_latency = GLOBAL_SINK_METRICS
800 .iceberg_write_latency
801 .with_guarded_label_values(&metrics_labels);
802 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
805 .iceberg_rolling_unflushed_data_file
806 .with_guarded_label_values(&metrics_labels);
807 let write_bytes = GLOBAL_SINK_METRICS
808 .iceberg_write_bytes
809 .with_guarded_label_values(&metrics_labels);
810
811 let schema = table.metadata().current_schema();
812 let partition_spec = table.metadata().default_partition_spec();
813
814 let unique_uuid_suffix = Uuid::now_v7();
816
817 let parquet_writer_properties = WriterProperties::builder()
818 .set_max_row_group_size(
819 writer_param
820 .streaming_config
821 .developer
822 .iceberg_sink_write_parquet_max_row_group_rows,
823 )
824 .build();
825
826 let parquet_writer_builder = ParquetWriterBuilder::new(
827 parquet_writer_properties,
828 schema.clone(),
829 table.file_io().clone(),
830 DefaultLocationGenerator::new(table.metadata().clone())
831 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
832 DefaultFileNameGenerator::new(
833 writer_param.actor_id.to_string(),
834 Some(unique_uuid_suffix.to_string()),
835 iceberg::spec::DataFileFormat::Parquet,
836 ),
837 );
838 let data_file_builder =
839 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
840 if partition_spec.fields().is_empty() {
841 let writer_builder = MonitoredGeneralWriterBuilder::new(
842 data_file_builder,
843 write_qps.clone(),
844 write_latency.clone(),
845 );
846 let inner_writer = Some(Box::new(
847 writer_builder
848 .clone()
849 .build()
850 .await
851 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
852 ) as Box<dyn IcebergWriter>);
853 Ok(Self {
854 arrow_schema: Arc::new(
855 schema_to_arrow_schema(table.metadata().current_schema())
856 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
857 ),
858 metrics: IcebergWriterMetrics {
859 _write_qps: write_qps,
860 _write_latency: write_latency,
861 write_bytes,
862 },
863 writer: IcebergWriterDispatch::NonPartitionAppendOnly {
864 writer: inner_writer,
865 writer_builder,
866 },
867 table,
868 project_idx_vec: {
869 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
870 ProjectIdxVec::Prepare(*extra_partition_col_idx)
871 } else {
872 ProjectIdxVec::None
873 }
874 },
875 })
876 } else {
877 let partition_builder = MonitoredGeneralWriterBuilder::new(
878 FanoutPartitionWriterBuilder::new(
879 data_file_builder,
880 partition_spec.clone(),
881 schema.clone(),
882 )
883 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
884 write_qps.clone(),
885 write_latency.clone(),
886 );
887 let inner_writer = Some(Box::new(
888 partition_builder
889 .clone()
890 .build()
891 .await
892 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
893 ) as Box<dyn IcebergWriter>);
894 Ok(Self {
895 arrow_schema: Arc::new(
896 schema_to_arrow_schema(table.metadata().current_schema())
897 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
898 ),
899 metrics: IcebergWriterMetrics {
900 _write_qps: write_qps,
901 _write_latency: write_latency,
902 write_bytes,
903 },
904 writer: IcebergWriterDispatch::PartitionAppendOnly {
905 writer: inner_writer,
906 writer_builder: partition_builder,
907 },
908 table,
909 project_idx_vec: {
910 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
911 ProjectIdxVec::Prepare(*extra_partition_col_idx)
912 } else {
913 ProjectIdxVec::None
914 }
915 },
916 })
917 }
918 }
919
920 pub async fn new_upsert(
921 table: Table,
922 unique_column_ids: Vec<usize>,
923 writer_param: &SinkWriterParam,
924 ) -> Result<Self> {
925 let SinkWriterParam {
926 extra_partition_col_idx,
927 actor_id,
928 sink_id,
929 sink_name,
930 ..
931 } = writer_param;
932 let metrics_labels = [
933 &actor_id.to_string(),
934 &sink_id.to_string(),
935 sink_name.as_str(),
936 ];
937 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
938
939 let write_qps = GLOBAL_SINK_METRICS
941 .iceberg_write_qps
942 .with_guarded_label_values(&metrics_labels);
943 let write_latency = GLOBAL_SINK_METRICS
944 .iceberg_write_latency
945 .with_guarded_label_values(&metrics_labels);
946 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
949 .iceberg_rolling_unflushed_data_file
950 .with_guarded_label_values(&metrics_labels);
951 let position_delete_cache_num = GLOBAL_SINK_METRICS
952 .iceberg_position_delete_cache_num
953 .with_guarded_label_values(&metrics_labels);
954 let write_bytes = GLOBAL_SINK_METRICS
955 .iceberg_write_bytes
956 .with_guarded_label_values(&metrics_labels);
957
958 let schema = table.metadata().current_schema();
960 let partition_spec = table.metadata().default_partition_spec();
961
962 let unique_uuid_suffix = Uuid::now_v7();
964
965 let parquet_writer_properties = WriterProperties::builder()
966 .set_max_row_group_size(
967 writer_param
968 .streaming_config
969 .developer
970 .iceberg_sink_write_parquet_max_row_group_rows,
971 )
972 .build();
973
974 let data_file_builder = {
975 let parquet_writer_builder = ParquetWriterBuilder::new(
976 parquet_writer_properties.clone(),
977 schema.clone(),
978 table.file_io().clone(),
979 DefaultLocationGenerator::new(table.metadata().clone())
980 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
981 DefaultFileNameGenerator::new(
982 writer_param.actor_id.to_string(),
983 Some(unique_uuid_suffix.to_string()),
984 iceberg::spec::DataFileFormat::Parquet,
985 ),
986 );
987 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id())
988 };
989 let position_delete_builder = {
990 let parquet_writer_builder = ParquetWriterBuilder::new(
991 parquet_writer_properties.clone(),
992 POSITION_DELETE_SCHEMA.clone(),
993 table.file_io().clone(),
994 DefaultLocationGenerator::new(table.metadata().clone())
995 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
996 DefaultFileNameGenerator::new(
997 writer_param.actor_id.to_string(),
998 Some(format!("pos-del-{}", unique_uuid_suffix)),
999 iceberg::spec::DataFileFormat::Parquet,
1000 ),
1001 );
1002 MonitoredPositionDeleteWriterBuilder::new(
1003 SortPositionDeleteWriterBuilder::new(
1004 parquet_writer_builder,
1005 writer_param
1006 .streaming_config
1007 .developer
1008 .iceberg_sink_positional_delete_cache_size,
1009 None,
1010 None,
1011 ),
1012 position_delete_cache_num,
1013 )
1014 };
1015 let equality_delete_builder = {
1016 let config = EqualityDeleteWriterConfig::new(
1017 unique_column_ids.clone(),
1018 table.metadata().current_schema().clone(),
1019 None,
1020 partition_spec.spec_id(),
1021 )
1022 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1023 let parquet_writer_builder = ParquetWriterBuilder::new(
1024 parquet_writer_properties.clone(),
1025 Arc::new(
1026 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1027 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1028 ),
1029 table.file_io().clone(),
1030 DefaultLocationGenerator::new(table.metadata().clone())
1031 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1032 DefaultFileNameGenerator::new(
1033 writer_param.actor_id.to_string(),
1034 Some(format!("eq-del-{}", unique_uuid_suffix)),
1035 iceberg::spec::DataFileFormat::Parquet,
1036 ),
1037 );
1038
1039 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder, config)
1040 };
1041 let delta_builder = EqualityDeltaWriterBuilder::new(
1042 data_file_builder,
1043 position_delete_builder,
1044 equality_delete_builder,
1045 unique_column_ids,
1046 );
1047 if partition_spec.fields().is_empty() {
1048 let writer_builder = MonitoredGeneralWriterBuilder::new(
1049 delta_builder,
1050 write_qps.clone(),
1051 write_latency.clone(),
1052 );
1053 let inner_writer = Some(Box::new(
1054 writer_builder
1055 .clone()
1056 .build()
1057 .await
1058 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1059 ) as Box<dyn IcebergWriter>);
1060 let original_arrow_schema = Arc::new(
1061 schema_to_arrow_schema(table.metadata().current_schema())
1062 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1063 );
1064 let schema_with_extra_op_column = {
1065 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1066 new_fields.push(Arc::new(ArrowField::new(
1067 "op".to_owned(),
1068 ArrowDataType::Int32,
1069 false,
1070 )));
1071 Arc::new(ArrowSchema::new(new_fields))
1072 };
1073 Ok(Self {
1074 arrow_schema: original_arrow_schema,
1075 metrics: IcebergWriterMetrics {
1076 _write_qps: write_qps,
1077 _write_latency: write_latency,
1078 write_bytes,
1079 },
1080 table,
1081 writer: IcebergWriterDispatch::NonpartitionUpsert {
1082 writer: inner_writer,
1083 writer_builder,
1084 arrow_schema_with_op_column: schema_with_extra_op_column,
1085 },
1086 project_idx_vec: {
1087 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1088 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1089 } else {
1090 ProjectIdxVec::None
1091 }
1092 },
1093 })
1094 } else {
1095 let original_arrow_schema = Arc::new(
1096 schema_to_arrow_schema(table.metadata().current_schema())
1097 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1098 );
1099 let schema_with_extra_op_column = {
1100 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1101 new_fields.push(Arc::new(ArrowField::new(
1102 "op".to_owned(),
1103 ArrowDataType::Int32,
1104 false,
1105 )));
1106 Arc::new(ArrowSchema::new(new_fields))
1107 };
1108 let partition_builder = MonitoredGeneralWriterBuilder::new(
1109 FanoutPartitionWriterBuilder::new_with_custom_schema(
1110 delta_builder,
1111 schema_with_extra_op_column.clone(),
1112 partition_spec.clone(),
1113 table.metadata().current_schema().clone(),
1114 ),
1115 write_qps.clone(),
1116 write_latency.clone(),
1117 );
1118 let inner_writer = Some(Box::new(
1119 partition_builder
1120 .clone()
1121 .build()
1122 .await
1123 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1124 ) as Box<dyn IcebergWriter>);
1125 Ok(Self {
1126 arrow_schema: original_arrow_schema,
1127 metrics: IcebergWriterMetrics {
1128 _write_qps: write_qps,
1129 _write_latency: write_latency,
1130 write_bytes,
1131 },
1132 table,
1133 writer: IcebergWriterDispatch::PartitionUpsert {
1134 writer: inner_writer,
1135 writer_builder: partition_builder,
1136 arrow_schema_with_op_column: schema_with_extra_op_column,
1137 },
1138 project_idx_vec: {
1139 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1140 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1141 } else {
1142 ProjectIdxVec::None
1143 }
1144 },
1145 })
1146 }
1147 }
1148}
1149
1150#[async_trait]
1151impl SinkWriter for IcebergSinkWriter {
1152 type CommitMetadata = Option<SinkMetadata>;
1153
1154 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1156 Ok(())
1158 }
1159
1160 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1162 match &mut self.writer {
1164 IcebergWriterDispatch::PartitionAppendOnly {
1165 writer,
1166 writer_builder,
1167 } => {
1168 if writer.is_none() {
1169 *writer = Some(Box::new(
1170 writer_builder
1171 .clone()
1172 .build()
1173 .await
1174 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1175 ));
1176 }
1177 }
1178 IcebergWriterDispatch::NonPartitionAppendOnly {
1179 writer,
1180 writer_builder,
1181 } => {
1182 if writer.is_none() {
1183 *writer = Some(Box::new(
1184 writer_builder
1185 .clone()
1186 .build()
1187 .await
1188 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1189 ));
1190 }
1191 }
1192 IcebergWriterDispatch::PartitionUpsert {
1193 writer,
1194 writer_builder,
1195 ..
1196 } => {
1197 if writer.is_none() {
1198 *writer = Some(Box::new(
1199 writer_builder
1200 .clone()
1201 .build()
1202 .await
1203 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1204 ));
1205 }
1206 }
1207 IcebergWriterDispatch::NonpartitionUpsert {
1208 writer,
1209 writer_builder,
1210 ..
1211 } => {
1212 if writer.is_none() {
1213 *writer = Some(Box::new(
1214 writer_builder
1215 .clone()
1216 .build()
1217 .await
1218 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1219 ));
1220 }
1221 }
1222 };
1223
1224 let (mut chunk, ops) = chunk.compact_vis().into_parts();
1226 match &self.project_idx_vec {
1227 ProjectIdxVec::None => {}
1228 ProjectIdxVec::Prepare(idx) => {
1229 if *idx >= chunk.columns().len() {
1230 return Err(SinkError::Iceberg(anyhow!(
1231 "invalid extra partition column index {}",
1232 idx
1233 )));
1234 }
1235 let project_idx_vec = (0..*idx)
1236 .chain(*idx + 1..chunk.columns().len())
1237 .collect_vec();
1238 chunk = chunk.project(&project_idx_vec);
1239 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1240 }
1241 ProjectIdxVec::Done(idx_vec) => {
1242 chunk = chunk.project(idx_vec);
1243 }
1244 }
1245 if ops.is_empty() {
1246 return Ok(());
1247 }
1248 let write_batch_size = chunk.estimated_heap_size();
1249 let batch = match &self.writer {
1250 IcebergWriterDispatch::PartitionAppendOnly { .. }
1251 | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
1252 let filters =
1254 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1255 chunk.set_visibility(filters);
1256 IcebergArrowConvert
1257 .to_record_batch(self.arrow_schema.clone(), &chunk.compact_vis())
1258 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1259 }
1260 IcebergWriterDispatch::PartitionUpsert {
1261 arrow_schema_with_op_column,
1262 ..
1263 }
1264 | IcebergWriterDispatch::NonpartitionUpsert {
1265 arrow_schema_with_op_column,
1266 ..
1267 } => {
1268 let chunk = IcebergArrowConvert
1269 .to_record_batch(self.arrow_schema.clone(), &chunk)
1270 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1271 let ops = Arc::new(Int32Array::from(
1272 ops.iter()
1273 .map(|op| match op {
1274 Op::UpdateInsert | Op::Insert => INSERT_OP,
1275 Op::UpdateDelete | Op::Delete => DELETE_OP,
1276 })
1277 .collect_vec(),
1278 ));
1279 let mut columns = chunk.columns().to_vec();
1280 columns.push(ops);
1281 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1282 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1283 }
1284 };
1285
1286 let writer = self.writer.get_writer().unwrap();
1287 writer
1288 .write(batch)
1289 .instrument_await("iceberg_write")
1290 .await
1291 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1292 self.metrics.write_bytes.inc_by(write_batch_size as _);
1293 Ok(())
1294 }
1295
1296 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1299 if !is_checkpoint {
1301 return Ok(None);
1302 }
1303
1304 let close_result = match &mut self.writer {
1305 IcebergWriterDispatch::PartitionAppendOnly {
1306 writer,
1307 writer_builder,
1308 } => {
1309 let close_result = match writer.take() {
1310 Some(mut writer) => {
1311 Some(writer.close().instrument_await("iceberg_close").await)
1312 }
1313 _ => None,
1314 };
1315 match writer_builder.clone().build().await {
1316 Ok(new_writer) => {
1317 *writer = Some(Box::new(new_writer));
1318 }
1319 _ => {
1320 warn!("Failed to build new writer after close");
1323 }
1324 }
1325 close_result
1326 }
1327 IcebergWriterDispatch::NonPartitionAppendOnly {
1328 writer,
1329 writer_builder,
1330 } => {
1331 let close_result = match writer.take() {
1332 Some(mut writer) => {
1333 Some(writer.close().instrument_await("iceberg_close").await)
1334 }
1335 _ => None,
1336 };
1337 match writer_builder.clone().build().await {
1338 Ok(new_writer) => {
1339 *writer = Some(Box::new(new_writer));
1340 }
1341 _ => {
1342 warn!("Failed to build new writer after close");
1345 }
1346 }
1347 close_result
1348 }
1349 IcebergWriterDispatch::PartitionUpsert {
1350 writer,
1351 writer_builder,
1352 ..
1353 } => {
1354 let close_result = match writer.take() {
1355 Some(mut writer) => {
1356 Some(writer.close().instrument_await("iceberg_close").await)
1357 }
1358 _ => None,
1359 };
1360 match writer_builder.clone().build().await {
1361 Ok(new_writer) => {
1362 *writer = Some(Box::new(new_writer));
1363 }
1364 _ => {
1365 warn!("Failed to build new writer after close");
1368 }
1369 }
1370 close_result
1371 }
1372 IcebergWriterDispatch::NonpartitionUpsert {
1373 writer,
1374 writer_builder,
1375 ..
1376 } => {
1377 let close_result = match writer.take() {
1378 Some(mut writer) => {
1379 Some(writer.close().instrument_await("iceberg_close").await)
1380 }
1381 _ => None,
1382 };
1383 match writer_builder.clone().build().await {
1384 Ok(new_writer) => {
1385 *writer = Some(Box::new(new_writer));
1386 }
1387 _ => {
1388 warn!("Failed to build new writer after close");
1391 }
1392 }
1393 close_result
1394 }
1395 };
1396
1397 match close_result {
1398 Some(Ok(result)) => {
1399 let version = self.table.metadata().format_version() as u8;
1400 let partition_type = self.table.metadata().default_partition_type();
1401 let data_files = result
1402 .into_iter()
1403 .map(|f| {
1404 SerializedDataFile::try_from(f, partition_type, version == 1)
1405 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1406 })
1407 .collect::<Result<Vec<_>>>()?;
1408 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1409 data_files,
1410 schema_id: self.table.metadata().current_schema_id(),
1411 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1412 })?))
1413 }
1414 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1415 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1416 }
1417 }
1418
1419 async fn abort(&mut self) -> Result<()> {
1421 Ok(())
1423 }
1424}
1425
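// JSON keys used to serialize an `IcebergCommitResult` into the `SinkMetadata` blob passed from
// the sink writers to the commit coordinator. The serialized form looks roughly like
// `{"schema_id": 0, "partition_spec_id": 0, "data_files": [...]}` (values illustrative).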
1426const SCHEMA_ID: &str = "schema_id";
1427const PARTITION_SPEC_ID: &str = "partition_spec_id";
1428const DATA_FILES: &str = "data_files";
1429
1430#[derive(Default, Clone)]
1431struct IcebergCommitResult {
1432 schema_id: i32,
1433 partition_spec_id: i32,
1434 data_files: Vec<SerializedDataFile>,
1435}
1436
1437impl IcebergCommitResult {
1438 fn try_from(value: &SinkMetadata) -> Result<Self> {
1439 if let Some(Serialized(v)) = &value.metadata {
1440 let mut values = if let serde_json::Value::Object(v) =
1441 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1442 .context("Can't parse iceberg sink metadata")?
1443 {
1444 v
1445 } else {
1446 bail!("iceberg sink metadata should be an object");
1447 };
1448
1449 let schema_id;
1450 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1451 schema_id = value
1452 .as_u64()
1453 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1454 } else {
1455 bail!("iceberg sink metadata should have schema_id");
1456 }
1457
1458 let partition_spec_id;
1459 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1460 partition_spec_id = value
1461 .as_u64()
1462 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1463 } else {
1464 bail!("iceberg sink metadata should have partition_spec_id");
1465 }
1466
1467 let data_files: Vec<SerializedDataFile>;
1468 if let serde_json::Value::Array(values) = values
1469 .remove(DATA_FILES)
1470 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1471 {
1472 data_files = values
1473 .into_iter()
1474 .map(from_value::<SerializedDataFile>)
1475 .collect::<std::result::Result<_, _>>()
1476 .unwrap();
1477 } else {
1478 bail!("iceberg sink metadata should have data_files object");
1479 }
1480
1481 Ok(Self {
1482 schema_id: schema_id as i32,
1483 partition_spec_id: partition_spec_id as i32,
1484 data_files,
1485 })
1486 } else {
1487 bail!("Can't create iceberg sink write result from empty data!")
1488 }
1489 }
1490
1491 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1492 let mut values = if let serde_json::Value::Object(value) =
1493 serde_json::from_slice::<serde_json::Value>(&value)
1494 .context("Can't parse iceberg sink metadata")?
1495 {
1496 value
1497 } else {
1498 bail!("iceberg sink metadata should be an object");
1499 };
1500
1501 let schema_id;
1502 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1503 schema_id = value
1504 .as_u64()
1505 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1506 } else {
1507 bail!("iceberg sink metadata should have schema_id");
1508 }
1509
1510 let partition_spec_id;
1511 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1512 partition_spec_id = value
1513 .as_u64()
1514 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1515 } else {
1516 bail!("iceberg sink metadata should have partition_spec_id");
1517 }
1518
1519 let data_files: Vec<SerializedDataFile>;
1520 if let serde_json::Value::Array(values) = values
1521 .remove(DATA_FILES)
1522 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1523 {
1524 data_files = values
1525 .into_iter()
1526 .map(from_value::<SerializedDataFile>)
1527 .collect::<std::result::Result<_, _>>()
1528 .unwrap();
1529 } else {
1530 bail!("iceberg sink metadata should have data_files object");
1531 }
1532
1533 Ok(Self {
1534 schema_id: schema_id as i32,
1535 partition_spec_id: partition_spec_id as i32,
1536 data_files,
1537 })
1538 }
1539}
1540
1541impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1542 type Error = SinkError;
1543
1544 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1545 let json_data_files = serde_json::Value::Array(
1546 value
1547 .data_files
1548 .iter()
1549 .map(serde_json::to_value)
1550 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1551 .context("Can't serialize data files to json")?,
1552 );
1553 let json_value = serde_json::Value::Object(
1554 vec![
1555 (
1556 SCHEMA_ID.to_owned(),
1557 serde_json::Value::Number(value.schema_id.into()),
1558 ),
1559 (
1560 PARTITION_SPEC_ID.to_owned(),
1561 serde_json::Value::Number(value.partition_spec_id.into()),
1562 ),
1563 (DATA_FILES.to_owned(), json_data_files),
1564 ]
1565 .into_iter()
1566 .collect(),
1567 );
1568 Ok(SinkMetadata {
1569 metadata: Some(Serialized(SerializedMetadata {
1570 metadata: serde_json::to_vec(&json_value)
1571 .context("Can't serialize iceberg sink metadata")?,
1572 })),
1573 })
1574 }
1575}
1576
1577impl TryFrom<IcebergCommitResult> for Vec<u8> {
1578 type Error = SinkError;
1579
1580 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1581 let json_data_files = serde_json::Value::Array(
1582 value
1583 .data_files
1584 .iter()
1585 .map(serde_json::to_value)
1586 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1587 .context("Can't serialize data files to json")?,
1588 );
1589 let json_value = serde_json::Value::Object(
1590 vec![
1591 (
1592 SCHEMA_ID.to_owned(),
1593 serde_json::Value::Number(value.schema_id.into()),
1594 ),
1595 (
1596 PARTITION_SPEC_ID.to_owned(),
1597 serde_json::Value::Number(value.partition_spec_id.into()),
1598 ),
1599 (DATA_FILES.to_owned(), json_data_files),
1600 ]
1601 .into_iter()
1602 .collect(),
1603 );
1604 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1605 }
1606}
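
/// Commit coordinator for the Iceberg sink: it gathers the per-writer `SinkMetadata`, appends
/// the reported data files to the table in one transaction per epoch, and optionally provides
/// exactly-once semantics through a pre-commit metadata table.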
1607pub struct IcebergSinkCommitter {
1608 catalog: Arc<dyn Catalog>,
1609 table: Table,
1610 pub last_commit_epoch: u64,
1611 pub(crate) is_exactly_once: bool,
1612 pub(crate) sink_id: u32,
1613 pub(crate) config: IcebergConfig,
1614 pub(crate) param: SinkParam,
1615 pub(crate) db: DatabaseConnection,
1616 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1617 commit_retry_num: u32,
1618 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1619}
1620
1621impl IcebergSinkCommitter {
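    /// Reload the table from the catalog and verify that neither the current schema id nor the
    /// default partition spec id has changed since the data files were written; schema and
    /// partition evolution are rejected here.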
1622 async fn reload_table(
1625 catalog: &dyn Catalog,
1626 table_ident: &TableIdent,
1627 schema_id: i32,
1628 partition_spec_id: i32,
1629 ) -> Result<Table> {
1630 let table = catalog
1631 .load_table(table_ident)
1632 .await
1633 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1634 if table.metadata().current_schema_id() != schema_id {
1635 return Err(SinkError::Iceberg(anyhow!(
                "Schema evolution is not supported: expected schema id {}, but got {}",
1637 schema_id,
1638 table.metadata().current_schema_id()
1639 )));
1640 }
1641 if table.metadata().default_partition_spec_id() != partition_spec_id {
1642 return Err(SinkError::Iceberg(anyhow!(
                "Partition evolution is not supported: expected partition spec id {}, but got {}",
1644 partition_spec_id,
1645 table.metadata().default_partition_spec_id()
1646 )));
1647 }
1648 Ok(table)
1649 }
1650}
1651
1652#[async_trait::async_trait]
1653impl SinkCommitCoordinator for IcebergSinkCommitter {
1654 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1655 if self.is_exactly_once {
1656 self.committed_epoch_subscriber = Some(subscriber);
1657 tracing::info!(
                "Sink id = {}: iceberg sink coordinator initializing.",
1659 self.param.sink_id.sink_id()
1660 );
1661 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1662 let ordered_metadata_list_by_end_epoch =
1663 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1664
1665 let mut last_recommit_epoch = 0;
1666 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1667 ordered_metadata_list_by_end_epoch
1668 {
1669 let write_results_bytes = deserialize_metadata(sealized_bytes);
1670 let mut write_results = vec![];
1671
1672 for each in write_results_bytes {
1673 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1674 write_results.push(write_result);
1675 }
1676
1677 match (
1678 committed,
1679 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1680 .await?,
1681 ) {
1682 (true, _) => {
1683 tracing::info!(
                                "Sink id = {}: all data in the log store has already been written to the external sink; nothing to do during recovery.",
1685 self.param.sink_id.sink_id()
1686 );
1687 }
1688 (false, true) => {
1689 tracing::info!(
                                "Sink id = {}: all pre-committed files have already been committed to Iceberg and do not need to be committed again; marking them as committed.",
1692 self.param.sink_id.sink_id()
1693 );
1694 mark_row_is_committed_by_sink_id_and_end_epoch(
1695 &self.db,
1696 self.sink_id,
1697 end_epoch,
1698 )
1699 .await?;
1700 }
1701 (false, false) => {
1702 tracing::info!(
1703 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1704 self.param.sink_id.sink_id()
1705 );
1706 self.re_commit(end_epoch, write_results, snapshot_id)
1707 .await?;
1708 }
1709 }
1710
1711 last_recommit_epoch = end_epoch;
1712 }
1713 tracing::info!(
                    "Sink id = {}: iceberg commit coordinator initialized.",
1715 self.param.sink_id.sink_id()
1716 );
1717 return Ok(Some(last_recommit_epoch));
1718 } else {
1719 tracing::info!(
                    "Sink id = {}: initialized iceberg coordinator; the system table is empty.",
1721 self.param.sink_id.sink_id()
1722 );
1723 return Ok(None);
1724 }
1725 }
1726
        tracing::info!("Iceberg commit coordinator initialized.");
1728 return Ok(None);
1729 }
1730
1731 async fn commit(
1732 &mut self,
1733 epoch: u64,
1734 metadata: Vec<SinkMetadata>,
1735 add_columns: Option<Vec<Field>>,
1736 ) -> Result<()> {
1737 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1738 if let Some(add_columns) = add_columns {
1739 return Err(anyhow!(
                "Iceberg sink does not support adding columns, but got: {:?}",
1741 add_columns
1742 )
1743 .into());
1744 }
1745
1746 let write_results: Vec<IcebergCommitResult> = metadata
1747 .iter()
1748 .map(IcebergCommitResult::try_from)
1749 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1750
1751 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1753 tracing::debug!(?epoch, "no data to commit");
1754 return Ok(());
1755 }
1756
1757 if write_results
1759 .iter()
1760 .any(|r| r.schema_id != write_results[0].schema_id)
1761 || write_results
1762 .iter()
1763 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1764 {
1765 return Err(SinkError::Iceberg(anyhow!(
1766 "schema_id and partition_spec_id should be the same in all write results"
1767 )));
1768 }
1769
1770 self.wait_for_snapshot_limit().await?;
1772
1773 if self.is_exactly_once {
1774 assert!(self.committed_epoch_subscriber.is_some());
1775 match self.committed_epoch_subscriber.clone() {
1776 Some(committed_epoch_subscriber) => {
1777 let (committed_epoch, mut rw_futures_utilrx) =
1779 committed_epoch_subscriber(self.param.sink_id).await?;
1780 if committed_epoch >= epoch {
1782 self.commit_iceberg_inner(epoch, write_results, None)
1783 .await?;
1784 } else {
1785 tracing::info!(
1786 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1787 committed_epoch,
1788 epoch
1789 );
1790 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1791 tracing::info!(
1792 "Received next committed epoch: {}",
1793 next_committed_epoch
1794 );
1795 if next_committed_epoch >= epoch {
1797 self.commit_iceberg_inner(epoch, write_results, None)
1798 .await?;
1799 break;
1800 }
1801 }
1802 }
1803 }
1804 None => unreachable!(
                    "Exactly-once sink must wait for the committed epoch before committing; committed_epoch_subscriber is not initialized."
1806 ),
1807 }
1808 } else {
1809 self.commit_iceberg_inner(epoch, write_results, None)
1810 .await?;
1811 }
1812
1813 Ok(())
1814 }
1815}
1816
1817impl IcebergSinkCommitter {
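    /// Re-commit pre-committed files found in the exactly-once metadata table during recovery,
    /// reusing the snapshot id that was persisted before the failure.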
1819 async fn re_commit(
1820 &mut self,
1821 epoch: u64,
1822 write_results: Vec<IcebergCommitResult>,
1823 snapshot_id: i64,
1824 ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1826
1827 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1829 tracing::debug!(?epoch, "no data to commit");
1830 return Ok(());
1831 }
1832 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1833 .await?;
1834 Ok(())
1835 }
1836
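    /// Commit one epoch's data files to Iceberg: reload and validate the table, persist
    /// pre-commit metadata first when exactly-once is enabled (first commit only), convert the
    /// serialized data files back into `DataFile`s, and apply a fast-append transaction with
    /// retries to the branch selected by the sink type and write mode.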
1837 async fn commit_iceberg_inner(
1838 &mut self,
1839 epoch: u64,
1840 write_results: Vec<IcebergCommitResult>,
1841 snapshot_id: Option<i64>,
1842 ) -> Result<()> {
1843 let is_first_commit = snapshot_id.is_none();
1847 self.last_commit_epoch = epoch;
1848 let expect_schema_id = write_results[0].schema_id;
1849 let expect_partition_spec_id = write_results[0].partition_spec_id;
1850
1851 self.table = Self::reload_table(
1853 self.catalog.as_ref(),
1854 self.table.identifier(),
1855 expect_schema_id,
1856 expect_partition_spec_id,
1857 )
1858 .await?;
1859 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1860 return Err(SinkError::Iceberg(anyhow!(
1861 "Can't find schema by id {}",
1862 expect_schema_id
1863 )));
1864 };
1865 let Some(partition_spec) = self
1866 .table
1867 .metadata()
1868 .partition_spec_by_id(expect_partition_spec_id)
1869 else {
1870 return Err(SinkError::Iceberg(anyhow!(
1871 "Can't find partition spec by id {}",
1872 expect_partition_spec_id
1873 )));
1874 };
1875 let partition_type = partition_spec
1876 .as_ref()
1877 .clone()
1878 .partition_type(schema)
1879 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1880
1881 let txn = Transaction::new(&self.table);
1882 let snapshot_id = match snapshot_id {
1884 Some(previous_snapshot_id) => previous_snapshot_id,
1885 None => txn.generate_unique_snapshot_id(),
1886 };
1887 if self.is_exactly_once && is_first_commit {
1888 let mut pre_commit_metadata_bytes = Vec::new();
1890 for each_parallelism_write_result in write_results.clone() {
1891 let each_parallelism_write_result_bytes: Vec<u8> =
1892 each_parallelism_write_result.try_into()?;
1893 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1894 }
1895
1896 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1897
1898 persist_pre_commit_metadata(
1899 self.sink_id,
1900 self.db.clone(),
1901 self.last_commit_epoch,
1902 epoch,
1903 pre_commit_metadata_bytes,
1904 snapshot_id,
1905 )
1906 .await?;
1907 }
1908
1909 let data_files = write_results
1910 .into_iter()
1911 .flat_map(|r| {
1912 r.data_files.into_iter().map(|f| {
1913 f.try_into(expect_partition_spec_id, &partition_type, schema)
1914 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1915 })
1916 })
1917 .collect::<Result<Vec<DataFile>>>()?;
1918 let retry_strategy = ExponentialBackoff::from_millis(10)
1923 .max_delay(Duration::from_secs(60))
1924 .map(jitter)
1925 .take(self.commit_retry_num as usize);
1926 let catalog = self.catalog.clone();
1927 let table_ident = self.table.identifier().clone();
1928 let table = Retry::spawn(retry_strategy, || async {
1929 let table = Self::reload_table(
1930 catalog.as_ref(),
1931 &table_ident,
1932 expect_schema_id,
1933 expect_partition_spec_id,
1934 )
1935 .await?;
1936 let txn = Transaction::new(&table);
1937 let mut append_action = txn
1938 .fast_append(Some(snapshot_id), None, vec![])
1939 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1940 .with_to_branch(commit_branch(
1941 self.config.r#type.as_str(),
1942 self.config.write_mode.as_str(),
1943 ));
1944 append_action
1945 .add_data_files(data_files.clone())
1946 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1947 let tx = append_action.apply().await.map_err(|err| {
                tracing::error!(error = %err.as_report(), "Failed to apply append action to the iceberg transaction");
1949 SinkError::Iceberg(anyhow!(err))
1950 })?;
1951 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1952 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1953 SinkError::Iceberg(anyhow!(err))
1954 })
1955 })
1956 .await?;
1957 self.table = table;
1958
1959 let snapshot_num = self.table.metadata().snapshots().count();
1960 let catalog_name = self.config.common.catalog_name();
1961 let table_name = self.table.identifier().to_string();
1962 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
1963 GLOBAL_SINK_METRICS
1964 .iceberg_snapshot_num
1965 .with_guarded_label_values(&metrics_labels)
1966 .set(snapshot_num as i64);
1967
1968 tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
1969
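// Exactly-once cleanup: the persisted pre-commit row is first marked as
// committed and then deleted, so it cannot be replayed after recovery.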
1970 if self.is_exactly_once {
1971 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1972 tracing::info!(
1973 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
1974 self.sink_id,
1975 epoch
1976 );
1977
1978 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1979 }
1980
1981 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1982 && self.config.enable_compaction
1983 && iceberg_compact_stat_sender
1984 .send(IcebergSinkCompactionUpdate {
1985 sink_id: SinkId::new(self.sink_id),
1986 compaction_interval: self.config.compaction_interval_sec(),
1987 force_compaction: false,
1988 })
1989 .is_err()
1990 {
1991 warn!("failed to send iceberg compaction stats");
1992 }
1993
1994 Ok(())
1995 }
1996
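/// Returns whether `snapshot_id` is already present in the Iceberg table loaded
/// from `iceberg_config`, so the exactly-once path can decide whether persisted
/// pre-commit metadata still needs to be committed.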
1997 async fn is_snapshot_id_in_iceberg(
2001 &self,
2002 iceberg_config: &IcebergConfig,
2003 snapshot_id: i64,
2004 ) -> Result<bool> {
2005 let table = iceberg_config.load_table().await?;
2006 Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
2011 }
2012
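/// Counts the snapshots created since the most recent rewrite: snapshots are
/// scanned from newest to oldest and counting stops at the first snapshot whose
/// summary operation is `Replace` (i.e. produced by compaction/rewrite).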
2013 fn count_snapshots_since_rewrite(&self) -> usize {
2016 let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
2017 snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));
2018
2019 let mut count = 0;
2021 for snapshot in snapshots {
2022 let summary = snapshot.summary();
2024 match &summary.operation {
2025 Operation::Replace => {
2026 break;
2028 }
2029
2030 _ => {
2031 count += 1;
2033 }
2034 }
2035 }
2036
2037 count
2038 }
2039
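/// Back-pressure for compaction: when compaction is enabled and
/// `max_snapshots_num_before_compaction` is set, this loops until the number of
/// snapshots accumulated since the last rewrite drops below the limit, sending a
/// `force_compaction` hint over `iceberg_compact_stat_sender` and reloading the
/// table every 30 seconds while it waits.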
2040 async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
2042 if !self.config.enable_compaction {
2043 return Ok(());
2044 }
2045
2046 if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
2047 loop {
2048 let current_count = self.count_snapshots_since_rewrite();
2049
2050 if current_count < max_snapshots {
2051 tracing::info!(
2052 "Snapshot count check passed: {} < {}",
2053 current_count,
2054 max_snapshots
2055 );
2056 break;
2057 }
2058
2059 tracing::info!(
2060 "Snapshot count {} exceeds limit {}, waiting...",
2061 current_count,
2062 max_snapshots
2063 );
2064
2065 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
2066 && iceberg_compact_stat_sender
2067 .send(IcebergSinkCompactionUpdate {
2068 sink_id: SinkId::new(self.sink_id),
2069 compaction_interval: self.config.compaction_interval_sec(),
2070 force_compaction: true,
2071 })
2072 .is_err()
2073 {
2074 tracing::warn!("failed to send iceberg compaction stats");
2075 }
2076
2077 tokio::time::sleep(Duration::from_secs(30)).await;
2079
2080 self.table = self.config.load_table().await?;
2082 }
2083 }
2084 Ok(())
2085 }
2086}
2087
2088const MAP_KEY: &str = "key";
2089const MAP_VALUE: &str = "value";
2090
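/// Helper for `check_compatibility`: peels `List`/`Map` wrappers off the Arrow
/// type until a `Struct` is reached, registers the corresponding RisingWave
/// child types in `schema_fields` (struct fields by name, map entries under
/// `MAP_KEY`/`MAP_VALUE`, list-of-struct elements by name), and returns the
/// Arrow struct fields to recurse into. Returns `None` for non-nested types.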
2091fn get_fields<'a>(
2092 our_field_type: &'a risingwave_common::types::DataType,
2093 data_type: &ArrowDataType,
2094 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
2095) -> Option<ArrowFields> {
2096 match data_type {
2097 ArrowDataType::Struct(fields) => {
2098 match our_field_type {
2099 risingwave_common::types::DataType::Struct(struct_fields) => {
2100 struct_fields.iter().for_each(|(name, data_type)| {
2101 let res = schema_fields.insert(name, data_type);
2102 assert!(res.is_none())
2104 });
2105 }
2106 risingwave_common::types::DataType::Map(map_fields) => {
2107 schema_fields.insert(MAP_KEY, map_fields.key());
2108 schema_fields.insert(MAP_VALUE, map_fields.value());
2109 }
2110 risingwave_common::types::DataType::List(list) => {
2111 list.elem()
2112 .as_struct()
2113 .iter()
2114 .for_each(|(name, data_type)| {
2115 let res = schema_fields.insert(name, data_type);
2116 assert!(res.is_none())
2118 });
2119 }
2120 _ => {}
2121 };
2122 Some(fields.clone())
2123 }
2124 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2125 get_fields(our_field_type, field.data_type(), schema_fields)
2126 }
2127 _ => None,
}
2129}
2130
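/// Recursively verifies that every Arrow field has a same-named RisingWave
/// counterpart with a compatible type. A few representations are accepted as
/// interchangeable (any `Decimal128` precision/scale, `Binary` vs
/// `LargeBinary`); nested lists, maps and structs are compared field by field;
/// everything else must satisfy `equals_datatype`. Bails with a descriptive
/// error on the first incompatible field.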
2131fn check_compatibility(
2132 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2133 fields: &ArrowFields,
2134) -> anyhow::Result<bool> {
2135 for arrow_field in fields {
2136 let our_field_type = schema_fields
2137 .get(arrow_field.name().as_str())
2138 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2139
2140 let converted_arrow_data_type = IcebergArrowConvert
2142 .to_arrow_field("", our_field_type)
2143 .map_err(|e| anyhow!(e))?
2144 .data_type()
2145 .clone();
2146
2147 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2148 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2149 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2150 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2151 (ArrowDataType::List(_), ArrowDataType::List(field))
2152 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2153 let mut schema_fields = HashMap::new();
2154 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2155 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2156 }
2157 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2159 let mut schema_fields = HashMap::new();
2160 our_field_type
2161 .as_struct()
2162 .iter()
2163 .for_each(|(name, data_type)| {
2164 let res = schema_fields.insert(name, data_type);
2165 assert!(res.is_none())
2167 });
2168 check_compatibility(schema_fields, fields)?
2169 }
2170 (left, right) => left.equals_datatype(right),
2178 };
2179 if !compatible {
2180 bail!(
2181 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2182 arrow_field.name(),
2183 converted_arrow_data_type,
2184 arrow_field.data_type()
2185 );
2186 }
2187 }
2188 Ok(true)
2189}
2190
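/// Validates that a RisingWave schema can be written into an Iceberg table with
/// the given Arrow schema: the field counts must match and every Iceberg field
/// must have a compatible RisingWave field of the same name. Field order does
/// not matter because fields are matched by name.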
2191pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2193 if rw_schema.fields.len() != arrow_schema.fields().len() {
2194 bail!(
2195 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2196 rw_schema.fields.len(),
2197 arrow_schema.fields.len()
2198 );
2199 }
2200
2201 let mut schema_fields = HashMap::new();
2202 rw_schema.fields.iter().for_each(|field| {
2203 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2204 assert!(res.is_none())
2206 });
2207
2208 check_compatibility(schema_fields, &arrow_schema.fields)?;
2209 Ok(())
2210}
2211
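/// Pre-commit metadata for all parallel writers is persisted as a JSON-encoded
/// `Vec<Vec<u8>>`; `serialize_metadata` and `deserialize_metadata` are inverses.
/// A minimal round-trip sketch with made-up payloads:
///
/// ```ignore
/// let bytes = serialize_metadata(vec![vec![1, 2], vec![3]]);
/// assert_eq!(deserialize_metadata(bytes), vec![vec![1, 2], vec![3]]);
/// ```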
2212pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2213 serde_json::to_vec(&metadata).unwrap()
2214}
2215
2216pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2217 serde_json::from_slice(&bytes).unwrap()
2218}
2219
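/// Parses a comma-separated `partition_by` expression into `(column, Transform)`
/// pairs. Each item is either a bare column (mapped to `Transform::Identity`),
/// `transform(column)`, or `transform(n, column)`; `bucket` and `truncate`
/// require `n` and are parsed via `Transform::from_str` as `bucket[n]` /
/// `truncate[n]`. For example, `"v1, bucket(5,v1), truncate(4,v2)"` yields an
/// identity partition on `v1`, a 5-way bucket on `v1`, and a width-4 truncate
/// on `v2`.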
2220pub fn parse_partition_by_exprs(
2221 expr: String,
2222) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2223 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2225 if !re.is_match(&expr) {
2226 bail!(format!(
2227 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
2228 expr
2229 ))
2230 }
2231 let caps = re.captures_iter(&expr);
2232
2233 let mut partition_columns = vec![];
2234
2235 for mat in caps {
2236 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2237 (&mat["transform"], Transform::Identity)
2238 } else {
2239 let mut func = mat["transform"].to_owned();
2240 if func == "bucket" || func == "truncate" {
2241 let n = &mat
2242 .name("n")
2243 .ok_or_else(|| anyhow!("`n` must be specified for the `bucket` and `truncate` transforms"))?
2244 .as_str();
2245 func = format!("{func}[{n}]");
2246 }
2247 (
2248 &mat["field"],
2249 Transform::from_str(&func)
2250 .with_context(|| format!("invalid transform function {}", func))?,
2251 )
2252 };
2253 partition_columns.push((column.to_owned(), transform));
2254 }
2255 Ok(partition_columns)
2256}
2257
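/// Selects the branch that commits are written to: the dedicated copy-on-write
/// branch when copy-on-write applies (see `should_enable_iceberg_cow`), and the
/// main branch otherwise.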
2258pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2259 if should_enable_iceberg_cow(sink_type, write_mode) {
2260 ICEBERG_COW_BRANCH.to_owned()
2261 } else {
2262 MAIN_BRANCH.to_owned()
2263 }
2264}
2265
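/// Copy-on-write is only used for upsert sinks that are explicitly configured
/// with the copy-on-write write mode; append-only and merge-on-read sinks keep
/// committing to the main branch.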
2266pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2267 sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2268}
2269
2270#[cfg(test)]
2271mod test {
2272 use std::collections::BTreeMap;
2273
2274 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2275 use risingwave_common::types::{DataType, MapType, StructType};
2276
2277 use crate::connector_common::{IcebergCommon, IcebergTableIdentifier};
2278 use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
2279 use crate::sink::iceberg::{
2280 COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2281 ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, MAX_SNAPSHOTS_NUM,
2282 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2283 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2284 };
2285
2285 pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

 #[test]
2289 fn test_compatible_arrow_schema() {
2290 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2291
2292 use super::*;
2293 let risingwave_schema = Schema::new(vec![
2294 Field::with_name(DataType::Int32, "a"),
2295 Field::with_name(DataType::Int32, "b"),
2296 Field::with_name(DataType::Int32, "c"),
2297 ]);
2298 let arrow_schema = ArrowSchema::new(vec![
2299 ArrowField::new("a", ArrowDataType::Int32, false),
2300 ArrowField::new("b", ArrowDataType::Int32, false),
2301 ArrowField::new("c", ArrowDataType::Int32, false),
2302 ]);
2303
2304 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2305
2306 let risingwave_schema = Schema::new(vec![
2307 Field::with_name(DataType::Int32, "d"),
2308 Field::with_name(DataType::Int32, "c"),
2309 Field::with_name(DataType::Int32, "a"),
2310 Field::with_name(DataType::Int32, "b"),
2311 ]);
2312 let arrow_schema = ArrowSchema::new(vec![
2313 ArrowField::new("a", ArrowDataType::Int32, false),
2314 ArrowField::new("b", ArrowDataType::Int32, false),
2315 ArrowField::new("d", ArrowDataType::Int32, false),
2316 ArrowField::new("c", ArrowDataType::Int32, false),
2317 ]);
2318 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2319
2320 let risingwave_schema = Schema::new(vec![
2321 Field::with_name(
2322 DataType::Struct(StructType::new(vec![
2323 ("a1", DataType::Int32),
2324 (
2325 "a2",
2326 DataType::Struct(StructType::new(vec![
2327 ("a21", DataType::Bytea),
2328 (
2329 "a22",
2330 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2331 ),
2332 ])),
2333 ),
2334 ])),
2335 "a",
2336 ),
2337 Field::with_name(
2338 DataType::list(DataType::Struct(StructType::new(vec![
2339 ("b1", DataType::Int32),
2340 ("b2", DataType::Bytea),
2341 (
2342 "b3",
2343 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2344 ),
2345 ]))),
2346 "b",
2347 ),
2348 Field::with_name(
2349 DataType::Map(MapType::from_kv(
2350 DataType::Varchar,
2351 DataType::list(DataType::Struct(StructType::new([
2352 ("c1", DataType::Int32),
2353 ("c2", DataType::Bytea),
2354 (
2355 "c3",
2356 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2357 ),
2358 ]))),
2359 )),
2360 "c",
2361 ),
2362 ]);
2363 let arrow_schema = ArrowSchema::new(vec![
2364 ArrowField::new(
2365 "a",
2366 ArrowDataType::Struct(ArrowFields::from(vec![
2367 ArrowField::new("a1", ArrowDataType::Int32, false),
2368 ArrowField::new(
2369 "a2",
2370 ArrowDataType::Struct(ArrowFields::from(vec![
2371 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2372 ArrowField::new_map(
2373 "a22",
2374 "entries",
2375 ArrowFieldRef::new(ArrowField::new(
2376 "key",
2377 ArrowDataType::Utf8,
2378 false,
2379 )),
2380 ArrowFieldRef::new(ArrowField::new(
2381 "value",
2382 ArrowDataType::Utf8,
2383 false,
2384 )),
2385 false,
2386 false,
2387 ),
2388 ])),
2389 false,
2390 ),
2391 ])),
2392 false,
2393 ),
2394 ArrowField::new(
2395 "b",
2396 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2397 ArrowDataType::Struct(ArrowFields::from(vec![
2398 ArrowField::new("b1", ArrowDataType::Int32, false),
2399 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2400 ArrowField::new_map(
2401 "b3",
2402 "entries",
2403 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2404 ArrowFieldRef::new(ArrowField::new(
2405 "value",
2406 ArrowDataType::Utf8,
2407 false,
2408 )),
2409 false,
2410 false,
2411 ),
2412 ])),
2413 false,
2414 ))),
2415 false,
2416 ),
2417 ArrowField::new_map(
2418 "c",
2419 "entries",
2420 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2421 ArrowFieldRef::new(ArrowField::new(
2422 "value",
2423 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2424 ArrowDataType::Struct(ArrowFields::from(vec![
2425 ArrowField::new("c1", ArrowDataType::Int32, false),
2426 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2427 ArrowField::new_map(
2428 "c3",
2429 "entries",
2430 ArrowFieldRef::new(ArrowField::new(
2431 "key",
2432 ArrowDataType::Utf8,
2433 false,
2434 )),
2435 ArrowFieldRef::new(ArrowField::new(
2436 "value",
2437 ArrowDataType::Utf8,
2438 false,
2439 )),
2440 false,
2441 false,
2442 ),
2443 ])),
2444 false,
2445 ))),
2446 false,
2447 )),
2448 false,
2449 false,
2450 ),
2451 ]);
2452 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2453 }
2454
2455 #[test]
2456 fn test_parse_iceberg_config() {
2457 let values = [
2458 ("connector", "iceberg"),
2459 ("type", "upsert"),
2460 ("primary_key", "v1"),
2461 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2462 ("warehouse.path", "s3://iceberg"),
2463 ("s3.endpoint", "http://127.0.0.1:9301"),
2464 ("s3.access.key", "hummockadmin"),
2465 ("s3.secret.key", "hummockadmin"),
2466 ("s3.path.style.access", "true"),
2467 ("s3.region", "us-east-1"),
2468 ("catalog.type", "jdbc"),
2469 ("catalog.name", "demo"),
2470 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2471 ("catalog.jdbc.user", "admin"),
2472 ("catalog.jdbc.password", "123456"),
2473 ("database.name", "demo_db"),
2474 ("table.name", "demo_table"),
2475 ("enable_compaction", "true"),
2476 ("compaction_interval_sec", "1800"),
2477 ("enable_snapshot_expiration", "true"),
2478 ]
2479 .into_iter()
2480 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2481 .collect();
2482
2483 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2484
2485 let expected_iceberg_config = IcebergConfig {
2486 common: IcebergCommon {
2487 warehouse_path: Some("s3://iceberg".to_owned()),
2488 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2489 region: Some("us-east-1".to_owned()),
2490 endpoint: Some("http://127.0.0.1:9301".to_owned()),
2491 access_key: Some("hummockadmin".to_owned()),
2492 secret_key: Some("hummockadmin".to_owned()),
2493 gcs_credential: None,
2494 catalog_type: Some("jdbc".to_owned()),
2495 glue_id: None,
2496 catalog_name: Some("demo".to_owned()),
2497 path_style_access: Some(true),
2498 credential: None,
2499 oauth2_server_uri: None,
2500 scope: None,
2501 token: None,
2502 enable_config_load: None,
2503 rest_signing_name: None,
2504 rest_signing_region: None,
2505 rest_sigv4_enabled: None,
2506 hosted_catalog: None,
2507 azblob_account_name: None,
2508 azblob_account_key: None,
2509 azblob_endpoint_url: None,
2510 header: None,
2511 adlsgen2_account_name: None,
2512 adlsgen2_account_key: None,
2513 adlsgen2_endpoint: None,
2514 vended_credentials: None,
2515 },
2516 table: IcebergTableIdentifier {
2517 database_name: Some("demo_db".to_owned()),
2518 table_name: "demo_table".to_owned(),
2519 },
2520 r#type: "upsert".to_owned(),
2521 force_append_only: false,
2522 primary_key: Some(vec!["v1".to_owned()]),
2523 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2524 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2525 .into_iter()
2526 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2527 .collect(),
2528 commit_checkpoint_interval: ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
2529 create_table_if_not_exists: false,
2530 is_exactly_once: Some(true),
2531 commit_retry_num: 8,
2532 enable_compaction: true,
2533 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2534 enable_snapshot_expiration: true,
2535 write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2536 snapshot_expiration_max_age_millis: None,
2537 snapshot_expiration_retain_last: None,
2538 snapshot_expiration_clear_expired_files: true,
2539 snapshot_expiration_clear_expired_meta_data: true,
2540 max_snapshots_num_before_compaction: None,
2541 };
2542
2543 assert_eq!(iceberg_config, expected_iceberg_config);
2544
2545 assert_eq!(
2546 &iceberg_config.full_table_name().unwrap().to_string(),
2547 "demo_db.demo_table"
2548 );
2549 }
2550
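 // The catalog tests below are `#[ignore]`d because they need locally running
 // object storage and catalog services (see the endpoints in each config); run
 // them explicitly with `cargo test -- --ignored`.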
2551 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2552 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2553
2554 let table = iceberg_config.load_table().await.unwrap();
2555
2556 println!("{:?}", table.identifier());
2557 }
2558
2559 #[tokio::test]
2560 #[ignore]
2561 async fn test_storage_catalog() {
2562 let values = [
2563 ("connector", "iceberg"),
2564 ("type", "append-only"),
2565 ("force_append_only", "true"),
2566 ("s3.endpoint", "http://127.0.0.1:9301"),
2567 ("s3.access.key", "hummockadmin"),
2568 ("s3.secret.key", "hummockadmin"),
2569 ("s3.region", "us-east-1"),
2570 ("s3.path.style.access", "true"),
2571 ("catalog.name", "demo"),
2572 ("catalog.type", "storage"),
2573 ("warehouse.path", "s3://icebergdata/demo"),
2574 ("database.name", "s1"),
2575 ("table.name", "t1"),
2576 ]
2577 .into_iter()
2578 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2579 .collect();
2580
2581 test_create_catalog(values).await;
2582 }
2583
2584 #[tokio::test]
2585 #[ignore]
2586 async fn test_rest_catalog() {
2587 let values = [
2588 ("connector", "iceberg"),
2589 ("type", "append-only"),
2590 ("force_append_only", "true"),
2591 ("s3.endpoint", "http://127.0.0.1:9301"),
2592 ("s3.access.key", "hummockadmin"),
2593 ("s3.secret.key", "hummockadmin"),
2594 ("s3.region", "us-east-1"),
2595 ("s3.path.style.access", "true"),
2596 ("catalog.name", "demo"),
2597 ("catalog.type", "rest"),
2598 ("catalog.uri", "http://192.168.167.4:8181"),
2599 ("warehouse.path", "s3://icebergdata/demo"),
2600 ("database.name", "s1"),
2601 ("table.name", "t1"),
2602 ]
2603 .into_iter()
2604 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2605 .collect();
2606
2607 test_create_catalog(values).await;
2608 }
2609
2610 #[tokio::test]
2611 #[ignore]
2612 async fn test_jdbc_catalog() {
2613 let values = [
2614 ("connector", "iceberg"),
2615 ("type", "append-only"),
2616 ("force_append_only", "true"),
2617 ("s3.endpoint", "http://127.0.0.1:9301"),
2618 ("s3.access.key", "hummockadmin"),
2619 ("s3.secret.key", "hummockadmin"),
2620 ("s3.region", "us-east-1"),
2621 ("s3.path.style.access", "true"),
2622 ("catalog.name", "demo"),
2623 ("catalog.type", "jdbc"),
2624 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2625 ("catalog.jdbc.user", "admin"),
2626 ("catalog.jdbc.password", "123456"),
2627 ("warehouse.path", "s3://icebergdata/demo"),
2628 ("database.name", "s1"),
2629 ("table.name", "t1"),
2630 ]
2631 .into_iter()
2632 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2633 .collect();
2634
2635 test_create_catalog(values).await;
2636 }
2637
2638 #[tokio::test]
2639 #[ignore]
2640 async fn test_hive_catalog() {
2641 let values = [
2642 ("connector", "iceberg"),
2643 ("type", "append-only"),
2644 ("force_append_only", "true"),
2645 ("s3.endpoint", "http://127.0.0.1:9301"),
2646 ("s3.access.key", "hummockadmin"),
2647 ("s3.secret.key", "hummockadmin"),
2648 ("s3.region", "us-east-1"),
2649 ("s3.path.style.access", "true"),
2650 ("catalog.name", "demo"),
2651 ("catalog.type", "hive"),
2652 ("catalog.uri", "thrift://localhost:9083"),
2653 ("warehouse.path", "s3://icebergdata/demo"),
2654 ("database.name", "s1"),
2655 ("table.name", "t1"),
2656 ]
2657 .into_iter()
2658 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2659 .collect();
2660
2661 test_create_catalog(values).await;
2662 }
2663
2664 #[test]
2665 fn test_config_constants_consistency() {
2666 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2669 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2670 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2671 assert_eq!(WRITE_MODE, "write_mode");
2672 assert_eq!(
2673 SNAPSHOT_EXPIRATION_RETAIN_LAST,
2674 "snapshot_expiration_retain_last"
2675 );
2676 assert_eq!(
2677 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2678 "snapshot_expiration_max_age_millis"
2679 );
2680 assert_eq!(
2681 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2682 "snapshot_expiration_clear_expired_files"
2683 );
2684 assert_eq!(
2685 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2686 "snapshot_expiration_clear_expired_meta_data"
2687 );
2688 assert_eq!(MAX_SNAPSHOTS_NUM, "max_snapshots_num_before_compaction");
2689 }
2690}