1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, MAIN_BRANCH, Operation, SerializedDataFile, Transform, UnboundPartitionField,
30 UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43 DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59 Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::{Field, Schema};
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
85use super::{
86 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103pub const ENABLE_COMPACTION: &str = "enable_compaction";
105pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
106pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
107pub const WRITE_MODE: &str = "write_mode";
108pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
109pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
110pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
111pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
112 "snapshot_expiration_clear_expired_meta_data";
113pub const MAX_SNAPSHOTS_NUM: &str = "max_snapshots_num_before_compaction";
114
115fn default_commit_retry_num() -> u32 {
116 8
117}
118
119fn default_iceberg_write_mode() -> String {
120 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
121}
122
123fn default_true() -> bool {
124 true
125}
126
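/// Configuration of the Iceberg sink, deserialized from the user-supplied sink options
/// (see `from_btreemap`). Catalog and table connection settings live in the flattened
/// [`IcebergCommon`]; the remaining fields control sink behavior such as append-only vs.
/// upsert mode, checkpoint-decoupled commits, exactly-once delivery, compaction, and
/// snapshot expiration.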
127#[serde_as]
128#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
129pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,
134
135 #[serde(flatten)]
136 common: IcebergCommon,
137
138 #[serde(
139 rename = "primary_key",
140 default,
141 deserialize_with = "deserialize_optional_string_seq_from_string"
142 )]
143 pub primary_key: Option<Vec<String>>,
144
145 #[serde(skip)]
147 pub java_catalog_props: HashMap<String, String>,
148
149 #[serde(default)]
150 pub partition_by: Option<String>,
151
152 #[serde(default = "default_commit_checkpoint_interval")]
154 #[serde_as(as = "DisplayFromStr")]
155 #[with_option(allow_alter_on_fly)]
156 pub commit_checkpoint_interval: u64,
157
158 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
159 pub create_table_if_not_exists: bool,
160
161 #[serde(default)]
163 #[serde_as(as = "Option<DisplayFromStr>")]
164 pub is_exactly_once: Option<bool>,
165 #[serde(default = "default_commit_retry_num")]
170 pub commit_retry_num: u32,
171
172 #[serde(
174 rename = "enable_compaction",
175 default,
176 deserialize_with = "deserialize_bool_from_string"
177 )]
178 #[with_option(allow_alter_on_fly)]
179 pub enable_compaction: bool,
180
181 #[serde(rename = "compaction_interval_sec", default)]
183 #[serde_as(as = "Option<DisplayFromStr>")]
184 #[with_option(allow_alter_on_fly)]
185 pub compaction_interval_sec: Option<u64>,
186
187 #[serde(
189 rename = "enable_snapshot_expiration",
190 default,
191 deserialize_with = "deserialize_bool_from_string"
192 )]
193 #[with_option(allow_alter_on_fly)]
194 pub enable_snapshot_expiration: bool,
195
196 #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
198 pub write_mode: String,
199
200 #[serde(rename = "snapshot_expiration_max_age_millis", default)]
203 #[serde_as(as = "Option<DisplayFromStr>")]
204 #[with_option(allow_alter_on_fly)]
205 pub snapshot_expiration_max_age_millis: Option<i64>,
206
207 #[serde(rename = "snapshot_expiration_retain_last", default)]
209 #[serde_as(as = "Option<DisplayFromStr>")]
210 #[with_option(allow_alter_on_fly)]
211 pub snapshot_expiration_retain_last: Option<i32>,
212
213 #[serde(
214 rename = "snapshot_expiration_clear_expired_files",
215 default = "default_true",
216 deserialize_with = "deserialize_bool_from_string"
217 )]
218 #[with_option(allow_alter_on_fly)]
219 pub snapshot_expiration_clear_expired_files: bool,
220
221 #[serde(
222 rename = "snapshot_expiration_clear_expired_meta_data",
223 default = "default_true",
224 deserialize_with = "deserialize_bool_from_string"
225 )]
226 #[with_option(allow_alter_on_fly)]
227 pub snapshot_expiration_clear_expired_meta_data: bool,
228
229 #[serde(rename = "max_snapshots_num_before_compaction", default)]
232 #[serde_as(as = "Option<DisplayFromStr>")]
233 #[with_option(allow_alter_on_fly)]
234 pub max_snapshots_num_before_compaction: Option<usize>,
235}
236
237impl EnforceSecret for IcebergConfig {
238 fn enforce_secret<'a>(
239 prop_iter: impl Iterator<Item = &'a str>,
240 ) -> crate::error::ConnectorResult<()> {
241 for prop in prop_iter {
242 IcebergCommon::enforce_one(prop)?;
243 }
244 Ok(())
245 }
246
247 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
248 IcebergCommon::enforce_one(prop)
249 }
250}
251
252impl IcebergConfig {
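    /// Builds an `IcebergConfig` from the raw sink options and validates it: `type` must be
    /// `append-only` or `upsert`, `upsert` requires a non-empty `primary_key`, and
    /// `commit_checkpoint_interval` must be positive. All `catalog.*` options except
    /// `catalog.uri`, `catalog.type`, `catalog.name`, and `catalog.header` are forwarded to
    /// the Java catalog as `java_catalog_props`, with the `catalog.` prefix stripped.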
253 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
254 let mut config =
255 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
256 .map_err(|e| SinkError::Config(anyhow!(e)))?;
257
258 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
259 return Err(SinkError::Config(anyhow!(
260 "`{}` must be {}, or {}",
261 SINK_TYPE_OPTION,
262 SINK_TYPE_APPEND_ONLY,
263 SINK_TYPE_UPSERT
264 )));
265 }
266
267 if config.r#type == SINK_TYPE_UPSERT {
268 if let Some(primary_key) = &config.primary_key {
269 if primary_key.is_empty() {
270 return Err(SinkError::Config(anyhow!(
271 "`primary_key` must not be empty in {}",
272 SINK_TYPE_UPSERT
273 )));
274 }
275 } else {
276 return Err(SinkError::Config(anyhow!(
277 "Must set `primary_key` in {}",
278 SINK_TYPE_UPSERT
279 )));
280 }
281 }
282
283 config.java_catalog_props = values
285 .iter()
286 .filter(|(k, _v)| {
287 k.starts_with("catalog.")
288 && k != &"catalog.uri"
289 && k != &"catalog.type"
290 && k != &"catalog.name"
291 && k != &"catalog.header"
292 })
293 .map(|(k, v)| (k[8..].to_string(), v.clone()))
294 .collect();
295
296 if config.commit_checkpoint_interval == 0 {
297 return Err(SinkError::Config(anyhow!(
298 "`commit_checkpoint_interval` must be greater than 0"
299 )));
300 }
301
302 Ok(config)
303 }
304
305 pub fn catalog_type(&self) -> &str {
306 self.common.catalog_type()
307 }
308
309 pub async fn load_table(&self) -> Result<Table> {
310 self.common
311 .load_table(&self.java_catalog_props)
312 .await
313 .map_err(Into::into)
314 }
315
316 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
317 self.common
318 .create_catalog(&self.java_catalog_props)
319 .await
320 .map_err(Into::into)
321 }
322
323 pub fn full_table_name(&self) -> Result<TableIdent> {
324 self.common.full_table_name().map_err(Into::into)
325 }
326
327 pub fn catalog_name(&self) -> String {
328 self.common.catalog_name()
329 }
330
331 pub fn compaction_interval_sec(&self) -> u64 {
332 self.compaction_interval_sec.unwrap_or(3600)
334 }
335
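    /// Returns the cutoff timestamp (in ms) for snapshot expiration: snapshots older than
    /// `current_time_ms - snapshot_expiration_max_age_millis` are eligible for expiration.
    /// Returns `None` when no maximum age is configured. For example, with
    /// `current_time_ms = 1_700_000_000_000` and a max age of `86_400_000` (one day),
    /// the cutoff is `1_699_913_600_000`.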
336 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
339 self.snapshot_expiration_max_age_millis
340 .map(|max_age_millis| current_time_ms - max_age_millis)
341 }
342}
343
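/// Iceberg sink entry point: holds the parsed [`IcebergConfig`] and the sink parameters,
/// plus, for upsert sinks, the column ids of the primary key used for equality deletes.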
344pub struct IcebergSink {
345 config: IcebergConfig,
346 param: SinkParam,
347 unique_column_ids: Option<Vec<usize>>,
349}
350
351impl EnforceSecret for IcebergSink {
352 fn enforce_secret<'a>(
353 prop_iter: impl Iterator<Item = &'a str>,
354 ) -> crate::error::ConnectorResult<()> {
355 for prop in prop_iter {
356 IcebergConfig::enforce_one(prop)?;
357 }
358 Ok(())
359 }
360}
361
362impl TryFrom<SinkParam> for IcebergSink {
363 type Error = SinkError;
364
365 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
366 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
367 IcebergSink::new(config, param)
368 }
369}
370
371impl Debug for IcebergSink {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 f.debug_struct("IcebergSink")
374 .field("config", &self.config)
375 .finish()
376 }
377}
378
379impl IcebergSink {
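    /// Loads the target table (creating it first when `create_table_if_not_exists` is set)
    /// and verifies that the sink schema matches the table's current Iceberg schema.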
380 async fn create_and_validate_table(&self) -> Result<Table> {
381 if self.config.create_table_if_not_exists {
382 self.create_table_if_not_exists().await?;
383 }
384
385 let table = self
386 .config
387 .load_table()
388 .await
389 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
390
391 let sink_schema = self.param.schema();
392 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
393 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
394
395 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
396 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
397
398 Ok(table)
399 }
400
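    /// Creates the namespace and the table in the catalog if they are missing, deriving the
    /// Iceberg schema from the sink columns and honoring the optional `partition_by` spec
    /// and the `warehouse_path`-based table location.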
401 async fn create_table_if_not_exists(&self) -> Result<()> {
402 let catalog = self.config.create_catalog().await?;
403 let namespace = if let Some(database_name) = &self.config.common.database_name {
404 let namespace = NamespaceIdent::new(database_name.clone());
405 if !catalog
406 .namespace_exists(&namespace)
407 .await
408 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
409 {
410 catalog
411 .create_namespace(&namespace, HashMap::default())
412 .await
413 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
414 .context("failed to create iceberg namespace")?;
415 }
416 namespace
417 } else {
            bail!("database name must be set if you want to create the table")
419 };
420
421 let table_id = self
422 .config
423 .full_table_name()
424 .context("Unable to parse table name")?;
425 if !catalog
426 .table_exists(&table_id)
427 .await
428 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
429 {
430 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
431 let arrow_fields = self
433 .param
434 .columns
435 .iter()
436 .map(|column| {
437 Ok(iceberg_create_table_arrow_convert
438 .to_arrow_field(&column.name, &column.data_type)
439 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
440 .context(format!(
441 "failed to convert {}: {} to arrow type",
442 &column.name, &column.data_type
443 ))?)
444 })
445 .collect::<Result<Vec<ArrowField>>>()?;
446 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
447 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
448 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
449 .context("failed to convert arrow schema to iceberg schema")?;
450
451 let location = {
452 let mut names = namespace.clone().inner();
453 names.push(self.config.common.table_name.clone());
454 match &self.config.common.warehouse_path {
455 Some(warehouse_path) => {
456 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
457 let url = Url::parse(warehouse_path);
458 if url.is_err() || is_s3_tables {
459 if self.config.common.catalog_type() == "rest"
462 || self.config.common.catalog_type() == "rest_rust"
463 {
464 None
465 } else {
466 bail!(format!("Invalid warehouse path: {}", warehouse_path))
467 }
468 } else if warehouse_path.ends_with('/') {
469 Some(format!("{}{}", warehouse_path, names.join("/")))
470 } else {
471 Some(format!("{}/{}", warehouse_path, names.join("/")))
472 }
473 }
474 None => None,
475 }
476 };
477
478 let partition_spec = match &self.config.partition_by {
479 Some(partition_by) => {
480 let mut partition_fields = Vec::<UnboundPartitionField>::new();
481 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
482 .into_iter()
483 .enumerate()
484 {
485 match iceberg_schema.field_id_by_name(&column) {
486 Some(id) => partition_fields.push(
487 UnboundPartitionField::builder()
488 .source_id(id)
489 .transform(transform)
490 .name(format!("_p_{}", column))
491 .field_id(i as i32)
492 .build(),
493 ),
494 None => bail!(format!(
495 "Partition source column does not exist in schema: {}",
496 column
497 )),
498 };
499 }
500 Some(
501 UnboundPartitionSpec::builder()
502 .with_spec_id(0)
503 .add_partition_fields(partition_fields)
504 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
505 .context("failed to add partition columns")?
506 .build(),
507 )
508 }
509 None => None,
510 };
511
512 let table_creation_builder = TableCreation::builder()
513 .name(self.config.common.table_name.clone())
514 .schema(iceberg_schema);
515
516 let table_creation = match (location, partition_spec) {
517 (Some(location), Some(partition_spec)) => table_creation_builder
518 .location(location)
519 .partition_spec(partition_spec)
520 .build(),
521 (Some(location), None) => table_creation_builder.location(location).build(),
522 (None, Some(partition_spec)) => table_creation_builder
523 .partition_spec(partition_spec)
524 .build(),
525 (None, None) => table_creation_builder.build(),
526 };
527
528 catalog
529 .create_table(&namespace, table_creation)
530 .await
531 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
532 .context("failed to create iceberg table")?;
533 }
534 Ok(())
535 }
536
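    /// Creates the sink. For upsert sinks (unless `force_append_only` is set) the
    /// `primary_key` column names are resolved to column ids, which later key the equality
    /// deletes; `from_btreemap` already guarantees the primary key is present, hence the
    /// `unreachable!()` branch.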
537 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
538 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
539 if let Some(pk) = &config.primary_key {
540 let mut unique_column_ids = Vec::with_capacity(pk.len());
541 for col_name in pk {
542 let id = param
543 .columns
544 .iter()
545 .find(|col| col.name.as_str() == col_name)
546 .ok_or_else(|| {
547 SinkError::Config(anyhow!(
548 "Primary key column {} not found in sink schema",
549 col_name
550 ))
551 })?
552 .column_id
553 .get_id() as usize;
554 unique_column_ids.push(id);
555 }
556 Some(unique_column_ids)
557 } else {
558 unreachable!()
559 }
560 } else {
561 None
562 };
563 Ok(Self {
564 config,
565 param,
566 unique_column_ids,
567 })
568 }
569}
570
571impl Sink for IcebergSink {
572 type Coordinator = IcebergSinkCommitter;
573 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
574
575 const SINK_NAME: &'static str = ICEBERG_SINK;
576
577 async fn validate(&self) -> Result<()> {
578 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
579 bail!("Snowflake catalog only supports iceberg sources");
580 }
581 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
582 risingwave_common::license::Feature::IcebergSinkWithGlue
583 .check_available()
584 .map_err(|e| anyhow::anyhow!(e))?;
585 }
586
587 let _ = self.create_and_validate_table().await?;
588 Ok(())
589 }
590
591 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
592 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
593
594 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
595 if iceberg_config.enable_compaction && compaction_interval == 0 {
596 bail!(
597 "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
598 );
599 } else {
600 tracing::info!(
602 "Alter config compaction_interval set to {} seconds",
603 compaction_interval
604 );
605 }
606 }
607
608 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
609 && max_snapshots < 1
610 {
611 bail!(
612 "`max_snapshots_num_before_compaction` must be greater than 0, got: {}",
613 max_snapshots
614 );
615 }
616
617 Ok(())
618 }
619
620 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
621 let table = self.create_and_validate_table().await?;
622 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
623 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
624 } else {
625 IcebergSinkWriter::new_append_only(table, &writer_param).await?
626 };
627
628 let commit_checkpoint_interval =
629 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
630 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
631 );
632 let writer = CoordinatedLogSinker::new(
633 &writer_param,
634 self.param.clone(),
635 inner,
636 commit_checkpoint_interval,
637 )
638 .await?;
639
640 Ok(writer)
641 }
642
643 fn is_coordinated_sink(&self) -> bool {
644 true
645 }
646
647 async fn new_coordinator(
648 &self,
649 db: DatabaseConnection,
650 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
651 ) -> Result<Self::Coordinator> {
652 let catalog = self.config.create_catalog().await?;
653 let table = self.create_and_validate_table().await?;
654 Ok(IcebergSinkCommitter {
655 catalog,
656 table,
657 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
658 last_commit_epoch: 0,
659 sink_id: self.param.sink_id.sink_id(),
660 config: self.config.clone(),
661 param: self.param.clone(),
662 db,
663 commit_retry_num: self.config.commit_retry_num,
664 committed_epoch_subscriber: None,
665 iceberg_compact_stat_sender,
666 })
667 }
668}
669
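/// Tracks the projection that removes the extra partition column from incoming chunks:
/// `None` when no extra partition column is configured, `Prepare(idx)` before the first
/// chunk (only the column index is known), and `Done(indices)` once the projection has
/// been computed and can be reused.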
670enum ProjectIdxVec {
677 None,
678 Prepare(usize),
679 Done(Vec<usize>),
680}
681
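/// Per-parallelism sink writer: converts stream chunks into Arrow record batches and
/// writes them through the dispatched Iceberg writer, reporting write metrics.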
682pub struct IcebergSinkWriter {
683 writer: IcebergWriterDispatch,
684 arrow_schema: SchemaRef,
685 metrics: IcebergWriterMetrics,
687 table: Table,
689 project_idx_vec: ProjectIdxVec,
692}
693
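/// Dispatch over the four writer shapes: the (non-)partitioned append-only writers emit
/// plain data files, while the (non-)partitioned upsert writers use an equality-delta
/// writer that also produces position/equality delete files and consumes an extra `op`
/// column.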
694#[allow(clippy::type_complexity)]
695enum IcebergWriterDispatch {
696 PartitionAppendOnly {
697 writer: Option<Box<dyn IcebergWriter>>,
698 writer_builder: MonitoredGeneralWriterBuilder<
699 FanoutPartitionWriterBuilder<
700 DataFileWriterBuilder<
701 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
702 >,
703 >,
704 >,
705 },
706 NonPartitionAppendOnly {
707 writer: Option<Box<dyn IcebergWriter>>,
708 writer_builder: MonitoredGeneralWriterBuilder<
709 DataFileWriterBuilder<
710 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
711 >,
712 >,
713 },
714 PartitionUpsert {
715 writer: Option<Box<dyn IcebergWriter>>,
716 writer_builder: MonitoredGeneralWriterBuilder<
717 FanoutPartitionWriterBuilder<
718 EqualityDeltaWriterBuilder<
719 DataFileWriterBuilder<
720 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
721 >,
722 MonitoredPositionDeleteWriterBuilder<
723 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
724 >,
725 EqualityDeleteFileWriterBuilder<
726 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
727 >,
728 >,
729 >,
730 >,
731 arrow_schema_with_op_column: SchemaRef,
732 },
733 NonpartitionUpsert {
734 writer: Option<Box<dyn IcebergWriter>>,
735 writer_builder: MonitoredGeneralWriterBuilder<
736 EqualityDeltaWriterBuilder<
737 DataFileWriterBuilder<
738 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
739 >,
740 MonitoredPositionDeleteWriterBuilder<
741 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
742 >,
743 EqualityDeleteFileWriterBuilder<
744 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
745 >,
746 >,
747 >,
748 arrow_schema_with_op_column: SchemaRef,
749 },
750}
751
752impl IcebergWriterDispatch {
753 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
754 match self {
755 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
756 | IcebergWriterDispatch::NonPartitionAppendOnly { writer, .. }
757 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
758 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
759 }
760 }
761}
762
763pub struct IcebergWriterMetrics {
764 _write_qps: LabelGuardedIntCounter,
769 _write_latency: LabelGuardedHistogram,
770 write_bytes: LabelGuardedIntCounter,
771}
772
773impl IcebergSinkWriter {
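    /// Builds an append-only writer: Parquet data files only, fanned out per partition when
    /// the table has a non-empty partition spec.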
774 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
775 let SinkWriterParam {
776 extra_partition_col_idx,
777 actor_id,
778 sink_id,
779 sink_name,
780 ..
781 } = writer_param;
782 let metrics_labels = [
783 &actor_id.to_string(),
784 &sink_id.to_string(),
785 sink_name.as_str(),
786 ];
787
788 let write_qps = GLOBAL_SINK_METRICS
790 .iceberg_write_qps
791 .with_guarded_label_values(&metrics_labels);
792 let write_latency = GLOBAL_SINK_METRICS
793 .iceberg_write_latency
794 .with_guarded_label_values(&metrics_labels);
795 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
798 .iceberg_rolling_unflushed_data_file
799 .with_guarded_label_values(&metrics_labels);
800 let write_bytes = GLOBAL_SINK_METRICS
801 .iceberg_write_bytes
802 .with_guarded_label_values(&metrics_labels);
803
804 let schema = table.metadata().current_schema();
805 let partition_spec = table.metadata().default_partition_spec();
806
807 let unique_uuid_suffix = Uuid::now_v7();
809
810 let parquet_writer_properties = WriterProperties::builder()
811 .set_max_row_group_size(
812 writer_param
813 .streaming_config
814 .developer
815 .iceberg_sink_write_parquet_max_row_group_rows,
816 )
817 .build();
818
819 let parquet_writer_builder = ParquetWriterBuilder::new(
820 parquet_writer_properties,
821 schema.clone(),
822 table.file_io().clone(),
823 DefaultLocationGenerator::new(table.metadata().clone())
824 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
825 DefaultFileNameGenerator::new(
826 writer_param.actor_id.to_string(),
827 Some(unique_uuid_suffix.to_string()),
828 iceberg::spec::DataFileFormat::Parquet,
829 ),
830 );
831 let data_file_builder =
832 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
833 if partition_spec.fields().is_empty() {
834 let writer_builder = MonitoredGeneralWriterBuilder::new(
835 data_file_builder,
836 write_qps.clone(),
837 write_latency.clone(),
838 );
839 let inner_writer = Some(Box::new(
840 writer_builder
841 .clone()
842 .build()
843 .await
844 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
845 ) as Box<dyn IcebergWriter>);
846 Ok(Self {
847 arrow_schema: Arc::new(
848 schema_to_arrow_schema(table.metadata().current_schema())
849 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
850 ),
851 metrics: IcebergWriterMetrics {
852 _write_qps: write_qps,
853 _write_latency: write_latency,
854 write_bytes,
855 },
856 writer: IcebergWriterDispatch::NonPartitionAppendOnly {
857 writer: inner_writer,
858 writer_builder,
859 },
860 table,
861 project_idx_vec: {
862 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
863 ProjectIdxVec::Prepare(*extra_partition_col_idx)
864 } else {
865 ProjectIdxVec::None
866 }
867 },
868 })
869 } else {
870 let partition_builder = MonitoredGeneralWriterBuilder::new(
871 FanoutPartitionWriterBuilder::new(
872 data_file_builder,
873 partition_spec.clone(),
874 schema.clone(),
875 )
876 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
877 write_qps.clone(),
878 write_latency.clone(),
879 );
880 let inner_writer = Some(Box::new(
881 partition_builder
882 .clone()
883 .build()
884 .await
885 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
886 ) as Box<dyn IcebergWriter>);
887 Ok(Self {
888 arrow_schema: Arc::new(
889 schema_to_arrow_schema(table.metadata().current_schema())
890 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
891 ),
892 metrics: IcebergWriterMetrics {
893 _write_qps: write_qps,
894 _write_latency: write_latency,
895 write_bytes,
896 },
897 writer: IcebergWriterDispatch::PartitionAppendOnly {
898 writer: inner_writer,
899 writer_builder: partition_builder,
900 },
901 table,
902 project_idx_vec: {
903 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
904 ProjectIdxVec::Prepare(*extra_partition_col_idx)
905 } else {
906 ProjectIdxVec::None
907 }
908 },
909 })
910 }
911 }
912
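    /// Builds an upsert writer: an equality-delta writer producing data files plus
    /// position/equality delete files keyed by `unique_column_ids`, writing record batches
    /// that carry an extra `op` (Int32) column to distinguish inserts from deletes.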
913 pub async fn new_upsert(
914 table: Table,
915 unique_column_ids: Vec<usize>,
916 writer_param: &SinkWriterParam,
917 ) -> Result<Self> {
918 let SinkWriterParam {
919 extra_partition_col_idx,
920 actor_id,
921 sink_id,
922 sink_name,
923 ..
924 } = writer_param;
925 let metrics_labels = [
926 &actor_id.to_string(),
927 &sink_id.to_string(),
928 sink_name.as_str(),
929 ];
930 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
931
932 let write_qps = GLOBAL_SINK_METRICS
934 .iceberg_write_qps
935 .with_guarded_label_values(&metrics_labels);
936 let write_latency = GLOBAL_SINK_METRICS
937 .iceberg_write_latency
938 .with_guarded_label_values(&metrics_labels);
939 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
942 .iceberg_rolling_unflushed_data_file
943 .with_guarded_label_values(&metrics_labels);
944 let position_delete_cache_num = GLOBAL_SINK_METRICS
945 .iceberg_position_delete_cache_num
946 .with_guarded_label_values(&metrics_labels);
947 let write_bytes = GLOBAL_SINK_METRICS
948 .iceberg_write_bytes
949 .with_guarded_label_values(&metrics_labels);
950
951 let schema = table.metadata().current_schema();
953 let partition_spec = table.metadata().default_partition_spec();
954
955 let unique_uuid_suffix = Uuid::now_v7();
957
958 let parquet_writer_properties = WriterProperties::builder()
959 .set_max_row_group_size(
960 writer_param
961 .streaming_config
962 .developer
963 .iceberg_sink_write_parquet_max_row_group_rows,
964 )
965 .build();
966
967 let data_file_builder = {
968 let parquet_writer_builder = ParquetWriterBuilder::new(
969 parquet_writer_properties.clone(),
970 schema.clone(),
971 table.file_io().clone(),
972 DefaultLocationGenerator::new(table.metadata().clone())
973 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
974 DefaultFileNameGenerator::new(
975 writer_param.actor_id.to_string(),
976 Some(unique_uuid_suffix.to_string()),
977 iceberg::spec::DataFileFormat::Parquet,
978 ),
979 );
980 DataFileWriterBuilder::new(
981 parquet_writer_builder.clone(),
982 None,
983 partition_spec.spec_id(),
984 )
985 };
986 let position_delete_builder = {
987 let parquet_writer_builder = ParquetWriterBuilder::new(
988 parquet_writer_properties.clone(),
989 POSITION_DELETE_SCHEMA.clone(),
990 table.file_io().clone(),
991 DefaultLocationGenerator::new(table.metadata().clone())
992 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
993 DefaultFileNameGenerator::new(
994 writer_param.actor_id.to_string(),
995 Some(format!("pos-del-{}", unique_uuid_suffix)),
996 iceberg::spec::DataFileFormat::Parquet,
997 ),
998 );
999 MonitoredPositionDeleteWriterBuilder::new(
1000 SortPositionDeleteWriterBuilder::new(
1001 parquet_writer_builder.clone(),
1002 writer_param
1003 .streaming_config
1004 .developer
1005 .iceberg_sink_positional_delete_cache_size,
1006 None,
1007 None,
1008 ),
1009 position_delete_cache_num,
1010 )
1011 };
1012 let equality_delete_builder = {
1013 let config = EqualityDeleteWriterConfig::new(
1014 unique_column_ids.clone(),
1015 table.metadata().current_schema().clone(),
1016 None,
1017 partition_spec.spec_id(),
1018 )
1019 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1020 let parquet_writer_builder = ParquetWriterBuilder::new(
1021 parquet_writer_properties.clone(),
1022 Arc::new(
1023 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1024 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1025 ),
1026 table.file_io().clone(),
1027 DefaultLocationGenerator::new(table.metadata().clone())
1028 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1029 DefaultFileNameGenerator::new(
1030 writer_param.actor_id.to_string(),
1031 Some(format!("eq-del-{}", unique_uuid_suffix)),
1032 iceberg::spec::DataFileFormat::Parquet,
1033 ),
1034 );
1035
1036 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
1037 };
1038 let delta_builder = EqualityDeltaWriterBuilder::new(
1039 data_file_builder,
1040 position_delete_builder,
1041 equality_delete_builder,
1042 unique_column_ids,
1043 );
1044 if partition_spec.fields().is_empty() {
1045 let writer_builder = MonitoredGeneralWriterBuilder::new(
1046 delta_builder,
1047 write_qps.clone(),
1048 write_latency.clone(),
1049 );
1050 let inner_writer = Some(Box::new(
1051 writer_builder
1052 .clone()
1053 .build()
1054 .await
1055 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1056 ) as Box<dyn IcebergWriter>);
1057 let original_arrow_schema = Arc::new(
1058 schema_to_arrow_schema(table.metadata().current_schema())
1059 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1060 );
1061 let schema_with_extra_op_column = {
1062 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1063 new_fields.push(Arc::new(ArrowField::new(
1064 "op".to_owned(),
1065 ArrowDataType::Int32,
1066 false,
1067 )));
1068 Arc::new(ArrowSchema::new(new_fields))
1069 };
1070 Ok(Self {
1071 arrow_schema: original_arrow_schema,
1072 metrics: IcebergWriterMetrics {
1073 _write_qps: write_qps,
1074 _write_latency: write_latency,
1075 write_bytes,
1076 },
1077 table,
1078 writer: IcebergWriterDispatch::NonpartitionUpsert {
1079 writer: inner_writer,
1080 writer_builder,
1081 arrow_schema_with_op_column: schema_with_extra_op_column,
1082 },
1083 project_idx_vec: {
1084 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1085 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1086 } else {
1087 ProjectIdxVec::None
1088 }
1089 },
1090 })
1091 } else {
1092 let original_arrow_schema = Arc::new(
1093 schema_to_arrow_schema(table.metadata().current_schema())
1094 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1095 );
1096 let schema_with_extra_op_column = {
1097 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1098 new_fields.push(Arc::new(ArrowField::new(
1099 "op".to_owned(),
1100 ArrowDataType::Int32,
1101 false,
1102 )));
1103 Arc::new(ArrowSchema::new(new_fields))
1104 };
1105 let partition_builder = MonitoredGeneralWriterBuilder::new(
1106 FanoutPartitionWriterBuilder::new_with_custom_schema(
1107 delta_builder,
1108 schema_with_extra_op_column.clone(),
1109 partition_spec.clone(),
1110 table.metadata().current_schema().clone(),
1111 ),
1112 write_qps.clone(),
1113 write_latency.clone(),
1114 );
1115 let inner_writer = Some(Box::new(
1116 partition_builder
1117 .clone()
1118 .build()
1119 .await
1120 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1121 ) as Box<dyn IcebergWriter>);
1122 Ok(Self {
1123 arrow_schema: original_arrow_schema,
1124 metrics: IcebergWriterMetrics {
1125 _write_qps: write_qps,
1126 _write_latency: write_latency,
1127 write_bytes,
1128 },
1129 table,
1130 writer: IcebergWriterDispatch::PartitionUpsert {
1131 writer: inner_writer,
1132 writer_builder: partition_builder,
1133 arrow_schema_with_op_column: schema_with_extra_op_column,
1134 },
1135 project_idx_vec: {
1136 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1137 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1138 } else {
1139 ProjectIdxVec::None
1140 }
1141 },
1142 })
1143 }
1144 }
1145}
1146
1147#[async_trait]
1148impl SinkWriter for IcebergSinkWriter {
1149 type CommitMetadata = Option<SinkMetadata>;
1150
1151 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1153 Ok(())
1155 }
1156
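    /// Rebuilds the underlying writer if it was consumed by a previous checkpoint, projects
    /// out the extra partition column if configured, converts the chunk into an Arrow
    /// `RecordBatch` (filtering to inserts for append-only mode, or appending the `op`
    /// column for upsert mode), and writes it.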
1157 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1159 match &mut self.writer {
1161 IcebergWriterDispatch::PartitionAppendOnly {
1162 writer,
1163 writer_builder,
1164 } => {
1165 if writer.is_none() {
1166 *writer = Some(Box::new(
1167 writer_builder
1168 .clone()
1169 .build()
1170 .await
1171 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1172 ));
1173 }
1174 }
1175 IcebergWriterDispatch::NonPartitionAppendOnly {
1176 writer,
1177 writer_builder,
1178 } => {
1179 if writer.is_none() {
1180 *writer = Some(Box::new(
1181 writer_builder
1182 .clone()
1183 .build()
1184 .await
1185 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1186 ));
1187 }
1188 }
1189 IcebergWriterDispatch::PartitionUpsert {
1190 writer,
1191 writer_builder,
1192 ..
1193 } => {
1194 if writer.is_none() {
1195 *writer = Some(Box::new(
1196 writer_builder
1197 .clone()
1198 .build()
1199 .await
1200 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1201 ));
1202 }
1203 }
1204 IcebergWriterDispatch::NonpartitionUpsert {
1205 writer,
1206 writer_builder,
1207 ..
1208 } => {
1209 if writer.is_none() {
1210 *writer = Some(Box::new(
1211 writer_builder
1212 .clone()
1213 .build()
1214 .await
1215 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1216 ));
1217 }
1218 }
1219 };
1220
1221 let (mut chunk, ops) = chunk.compact().into_parts();
1223 match &self.project_idx_vec {
1224 ProjectIdxVec::None => {}
1225 ProjectIdxVec::Prepare(idx) => {
1226 if *idx >= chunk.columns().len() {
1227 return Err(SinkError::Iceberg(anyhow!(
1228 "invalid extra partition column index {}",
1229 idx
1230 )));
1231 }
1232 let project_idx_vec = (0..*idx)
1233 .chain(*idx + 1..chunk.columns().len())
1234 .collect_vec();
1235 chunk = chunk.project(&project_idx_vec);
1236 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1237 }
1238 ProjectIdxVec::Done(idx_vec) => {
1239 chunk = chunk.project(idx_vec);
1240 }
1241 }
1242 if ops.is_empty() {
1243 return Ok(());
1244 }
1245 let write_batch_size = chunk.estimated_heap_size();
1246 let batch = match &self.writer {
1247 IcebergWriterDispatch::PartitionAppendOnly { .. }
1248 | IcebergWriterDispatch::NonPartitionAppendOnly { .. } => {
1249 let filters =
1251 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1252 chunk.set_visibility(filters);
1253 IcebergArrowConvert
1254 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1255 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1256 }
1257 IcebergWriterDispatch::PartitionUpsert {
1258 arrow_schema_with_op_column,
1259 ..
1260 }
1261 | IcebergWriterDispatch::NonpartitionUpsert {
1262 arrow_schema_with_op_column,
1263 ..
1264 } => {
1265 let chunk = IcebergArrowConvert
1266 .to_record_batch(self.arrow_schema.clone(), &chunk)
1267 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1268 let ops = Arc::new(Int32Array::from(
1269 ops.iter()
1270 .map(|op| match op {
1271 Op::UpdateInsert | Op::Insert => INSERT_OP,
1272 Op::UpdateDelete | Op::Delete => DELETE_OP,
1273 })
1274 .collect_vec(),
1275 ));
1276 let mut columns = chunk.columns().to_vec();
1277 columns.push(ops);
1278 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1279 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1280 }
1281 };
1282
1283 let writer = self.writer.get_writer().unwrap();
1284 writer
1285 .write(batch)
1286 .instrument_await("iceberg_write")
1287 .await
1288 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1289 self.metrics.write_bytes.inc_by(write_batch_size as _);
1290 Ok(())
1291 }
1292
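    /// On a checkpoint barrier, closes the current writer to flush its files, rebuilds a
    /// fresh writer for the next epoch, and returns the flushed data files serialized as
    /// `SinkMetadata` for the commit coordinator. Non-checkpoint barriers are no-ops.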
1293 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1296 if !is_checkpoint {
1298 return Ok(None);
1299 }
1300
1301 let close_result = match &mut self.writer {
1302 IcebergWriterDispatch::PartitionAppendOnly {
1303 writer,
1304 writer_builder,
1305 } => {
1306 let close_result = match writer.take() {
1307 Some(mut writer) => {
1308 Some(writer.close().instrument_await("iceberg_close").await)
1309 }
1310 _ => None,
1311 };
1312 match writer_builder.clone().build().await {
1313 Ok(new_writer) => {
1314 *writer = Some(Box::new(new_writer));
1315 }
1316 _ => {
1317 warn!("Failed to build new writer after close");
1320 }
1321 }
1322 close_result
1323 }
1324 IcebergWriterDispatch::NonPartitionAppendOnly {
1325 writer,
1326 writer_builder,
1327 } => {
1328 let close_result = match writer.take() {
1329 Some(mut writer) => {
1330 Some(writer.close().instrument_await("iceberg_close").await)
1331 }
1332 _ => None,
1333 };
1334 match writer_builder.clone().build().await {
1335 Ok(new_writer) => {
1336 *writer = Some(Box::new(new_writer));
1337 }
1338 _ => {
1339 warn!("Failed to build new writer after close");
1342 }
1343 }
1344 close_result
1345 }
1346 IcebergWriterDispatch::PartitionUpsert {
1347 writer,
1348 writer_builder,
1349 ..
1350 } => {
1351 let close_result = match writer.take() {
1352 Some(mut writer) => {
1353 Some(writer.close().instrument_await("iceberg_close").await)
1354 }
1355 _ => None,
1356 };
1357 match writer_builder.clone().build().await {
1358 Ok(new_writer) => {
1359 *writer = Some(Box::new(new_writer));
1360 }
1361 _ => {
1362 warn!("Failed to build new writer after close");
1365 }
1366 }
1367 close_result
1368 }
1369 IcebergWriterDispatch::NonpartitionUpsert {
1370 writer,
1371 writer_builder,
1372 ..
1373 } => {
1374 let close_result = match writer.take() {
1375 Some(mut writer) => {
1376 Some(writer.close().instrument_await("iceberg_close").await)
1377 }
1378 _ => None,
1379 };
1380 match writer_builder.clone().build().await {
1381 Ok(new_writer) => {
1382 *writer = Some(Box::new(new_writer));
1383 }
1384 _ => {
1385 warn!("Failed to build new writer after close");
1388 }
1389 }
1390 close_result
1391 }
1392 };
1393
1394 match close_result {
1395 Some(Ok(result)) => {
1396 let version = self.table.metadata().format_version() as u8;
1397 let partition_type = self.table.metadata().default_partition_type();
1398 let data_files = result
1399 .into_iter()
1400 .map(|f| {
1401 SerializedDataFile::try_from(f, partition_type, version == 1)
1402 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1403 })
1404 .collect::<Result<Vec<_>>>()?;
1405 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1406 data_files,
1407 schema_id: self.table.metadata().current_schema_id(),
1408 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1409 })?))
1410 }
1411 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1412 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1413 }
1414 }
1415
1416 async fn abort(&mut self) -> Result<()> {
1418 Ok(())
1420 }
1421}
1422
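// JSON keys used to (de)serialize the commit metadata exchanged with the coordinator.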
1423const SCHEMA_ID: &str = "schema_id";
1424const PARTITION_SPEC_ID: &str = "partition_spec_id";
1425const DATA_FILES: &str = "data_files";
1426
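/// Commit payload produced by one writer for one checkpoint: the schema id, partition spec
/// id, and serialized data files to append.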
1427#[derive(Default, Clone)]
1428struct IcebergCommitResult {
1429 schema_id: i32,
1430 partition_spec_id: i32,
1431 data_files: Vec<SerializedDataFile>,
1432}
1433
1434impl IcebergCommitResult {
1435 fn try_from(value: &SinkMetadata) -> Result<Self> {
1436 if let Some(Serialized(v)) = &value.metadata {
1437 let mut values = if let serde_json::Value::Object(v) =
1438 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1439 .context("Can't parse iceberg sink metadata")?
1440 {
1441 v
1442 } else {
1443 bail!("iceberg sink metadata should be an object");
1444 };
1445
1446 let schema_id;
1447 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1448 schema_id = value
1449 .as_u64()
1450 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1451 } else {
1452 bail!("iceberg sink metadata should have schema_id");
1453 }
1454
1455 let partition_spec_id;
1456 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1457 partition_spec_id = value
1458 .as_u64()
1459 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1460 } else {
1461 bail!("iceberg sink metadata should have partition_spec_id");
1462 }
1463
1464 let data_files: Vec<SerializedDataFile>;
1465 if let serde_json::Value::Array(values) = values
1466 .remove(DATA_FILES)
1467 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1468 {
            data_files = values
                .into_iter()
                .map(from_value::<SerializedDataFile>)
                .collect::<std::result::Result<_, _>>()
                .context("Can't deserialize data files in iceberg sink metadata")?;
1474 } else {
1475 bail!("iceberg sink metadata should have data_files object");
1476 }
1477
1478 Ok(Self {
1479 schema_id: schema_id as i32,
1480 partition_spec_id: partition_spec_id as i32,
1481 data_files,
1482 })
1483 } else {
1484 bail!("Can't create iceberg sink write result from empty data!")
1485 }
1486 }
1487
1488 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1489 let mut values = if let serde_json::Value::Object(value) =
1490 serde_json::from_slice::<serde_json::Value>(&value)
1491 .context("Can't parse iceberg sink metadata")?
1492 {
1493 value
1494 } else {
1495 bail!("iceberg sink metadata should be an object");
1496 };
1497
1498 let schema_id;
1499 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1500 schema_id = value
1501 .as_u64()
1502 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1503 } else {
1504 bail!("iceberg sink metadata should have schema_id");
1505 }
1506
1507 let partition_spec_id;
1508 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1509 partition_spec_id = value
1510 .as_u64()
1511 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1512 } else {
1513 bail!("iceberg sink metadata should have partition_spec_id");
1514 }
1515
1516 let data_files: Vec<SerializedDataFile>;
1517 if let serde_json::Value::Array(values) = values
1518 .remove(DATA_FILES)
1519 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1520 {
        data_files = values
            .into_iter()
            .map(from_value::<SerializedDataFile>)
            .collect::<std::result::Result<_, _>>()
            .context("Can't deserialize data files in iceberg sink metadata")?;
1526 } else {
1527 bail!("iceberg sink metadata should have data_files object");
1528 }
1529
1530 Ok(Self {
1531 schema_id: schema_id as i32,
1532 partition_spec_id: partition_spec_id as i32,
1533 data_files,
1534 })
1535 }
1536}
1537
1538impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1539 type Error = SinkError;
1540
1541 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1542 let json_data_files = serde_json::Value::Array(
1543 value
1544 .data_files
1545 .iter()
1546 .map(serde_json::to_value)
1547 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1548 .context("Can't serialize data files to json")?,
1549 );
1550 let json_value = serde_json::Value::Object(
1551 vec![
1552 (
1553 SCHEMA_ID.to_owned(),
1554 serde_json::Value::Number(value.schema_id.into()),
1555 ),
1556 (
1557 PARTITION_SPEC_ID.to_owned(),
1558 serde_json::Value::Number(value.partition_spec_id.into()),
1559 ),
1560 (DATA_FILES.to_owned(), json_data_files),
1561 ]
1562 .into_iter()
1563 .collect(),
1564 );
1565 Ok(SinkMetadata {
1566 metadata: Some(Serialized(SerializedMetadata {
1567 metadata: serde_json::to_vec(&json_value)
1568 .context("Can't serialize iceberg sink metadata")?,
1569 })),
1570 })
1571 }
1572}
1573
1574impl TryFrom<IcebergCommitResult> for Vec<u8> {
1575 type Error = SinkError;
1576
1577 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1578 let json_data_files = serde_json::Value::Array(
1579 value
1580 .data_files
1581 .iter()
1582 .map(serde_json::to_value)
1583 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1584 .context("Can't serialize data files to json")?,
1585 );
1586 let json_value = serde_json::Value::Object(
1587 vec![
1588 (
1589 SCHEMA_ID.to_owned(),
1590 serde_json::Value::Number(value.schema_id.into()),
1591 ),
1592 (
1593 PARTITION_SPEC_ID.to_owned(),
1594 serde_json::Value::Number(value.partition_spec_id.into()),
1595 ),
1596 (DATA_FILES.to_owned(), json_data_files),
1597 ]
1598 .into_iter()
1599 .collect(),
1600 );
1601 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1602 }
1603}
1604pub struct IcebergSinkCommitter {
1605 catalog: Arc<dyn Catalog>,
1606 table: Table,
1607 pub last_commit_epoch: u64,
1608 pub(crate) is_exactly_once: bool,
1609 pub(crate) sink_id: u32,
1610 pub(crate) config: IcebergConfig,
1611 pub(crate) param: SinkParam,
1612 pub(crate) db: DatabaseConnection,
1613 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1614 commit_retry_num: u32,
1615 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1616}
1617
1618impl IcebergSinkCommitter {
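    /// Reloads the table from the catalog and rejects the commit if the schema or default
    /// partition spec has changed since the data files were written (schema and partition
    /// evolution are not supported).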
1619 async fn reload_table(
1622 catalog: &dyn Catalog,
1623 table_ident: &TableIdent,
1624 schema_id: i32,
1625 partition_spec_id: i32,
1626 ) -> Result<Table> {
1627 let table = catalog
1628 .load_table(table_ident)
1629 .await
1630 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1631 if table.metadata().current_schema_id() != schema_id {
1632 return Err(SinkError::Iceberg(anyhow!(
1633 "Schema evolution not supported, expect schema id {}, but got {}",
1634 schema_id,
1635 table.metadata().current_schema_id()
1636 )));
1637 }
1638 if table.metadata().default_partition_spec_id() != partition_spec_id {
1639 return Err(SinkError::Iceberg(anyhow!(
1640 "Partition evolution not supported, expect partition spec id {}, but got {}",
1641 partition_spec_id,
1642 table.metadata().default_partition_spec_id()
1643 )));
1644 }
1645 Ok(table)
1646 }
1647}
1648
1649#[async_trait::async_trait]
1650impl SinkCommitCoordinator for IcebergSinkCommitter {
1651 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1652 if self.is_exactly_once {
1653 self.committed_epoch_subscriber = Some(subscriber);
1654 tracing::info!(
1655 "Sink id = {}: iceberg sink coordinator initing.",
1656 self.param.sink_id.sink_id()
1657 );
1658 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1659 let ordered_metadata_list_by_end_epoch =
1660 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1661
1662 let mut last_recommit_epoch = 0;
1663 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1664 ordered_metadata_list_by_end_epoch
1665 {
1666 let write_results_bytes = deserialize_metadata(sealized_bytes);
1667 let mut write_results = vec![];
1668
1669 for each in write_results_bytes {
1670 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1671 write_results.push(write_result);
1672 }
1673
1674 match (
1675 committed,
1676 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1677 .await?,
1678 ) {
1679 (true, _) => {
1680 tracing::info!(
1681 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1682 self.param.sink_id.sink_id()
1683 );
1684 }
1685 (false, true) => {
1686 tracing::info!(
1688 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1689 self.param.sink_id.sink_id()
1690 );
1691 mark_row_is_committed_by_sink_id_and_end_epoch(
1692 &self.db,
1693 self.sink_id,
1694 end_epoch,
1695 )
1696 .await?;
1697 }
1698 (false, false) => {
1699 tracing::info!(
1700 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1701 self.param.sink_id.sink_id()
1702 );
1703 self.re_commit(end_epoch, write_results, snapshot_id)
1704 .await?;
1705 }
1706 }
1707
1708 last_recommit_epoch = end_epoch;
1709 }
1710 tracing::info!(
1711 "Sink id = {}: iceberg commit coordinator inited.",
1712 self.param.sink_id.sink_id()
1713 );
1714 return Ok(Some(last_recommit_epoch));
1715 } else {
1716 tracing::info!(
1717 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1718 self.param.sink_id.sink_id()
1719 );
1720 return Ok(None);
1721 }
1722 }
1723
1724 tracing::info!("Iceberg commit coordinator inited.");
1725 return Ok(None);
1726 }
1727
1728 async fn commit(
1729 &mut self,
1730 epoch: u64,
1731 metadata: Vec<SinkMetadata>,
1732 add_columns: Option<Vec<Field>>,
1733 ) -> Result<()> {
1734 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1735 if let Some(add_columns) = add_columns {
1736 return Err(anyhow!(
1737 "Iceberg sink not support add columns, but got: {:?}",
1738 add_columns
1739 )
1740 .into());
1741 }
1742
1743 let write_results: Vec<IcebergCommitResult> = metadata
1744 .iter()
1745 .map(IcebergCommitResult::try_from)
1746 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1747
1748 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1750 tracing::debug!(?epoch, "no data to commit");
1751 return Ok(());
1752 }
1753
1754 if write_results
1756 .iter()
1757 .any(|r| r.schema_id != write_results[0].schema_id)
1758 || write_results
1759 .iter()
1760 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1761 {
1762 return Err(SinkError::Iceberg(anyhow!(
1763 "schema_id and partition_spec_id should be the same in all write results"
1764 )));
1765 }
1766
1767 self.wait_for_snapshot_limit().await?;
1769
1770 if self.is_exactly_once {
1771 assert!(self.committed_epoch_subscriber.is_some());
1772 match self.committed_epoch_subscriber.clone() {
1773 Some(committed_epoch_subscriber) => {
1774 let (committed_epoch, mut rw_futures_utilrx) =
1776 committed_epoch_subscriber(self.param.sink_id).await?;
1777 if committed_epoch >= epoch {
1779 self.commit_iceberg_inner(epoch, write_results, None)
1780 .await?;
1781 } else {
1782 tracing::info!(
1783 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1784 committed_epoch,
1785 epoch
1786 );
1787 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1788 tracing::info!(
1789 "Received next committed epoch: {}",
1790 next_committed_epoch
1791 );
1792 if next_committed_epoch >= epoch {
1794 self.commit_iceberg_inner(epoch, write_results, None)
1795 .await?;
1796 break;
1797 }
1798 }
1799 }
1800 }
1801 None => unreachable!(
1802 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1803 ),
1804 }
1805 } else {
1806 self.commit_iceberg_inner(epoch, write_results, None)
1807 .await?;
1808 }
1809
1810 Ok(())
1811 }
1812}
1813
1814impl IcebergSinkCommitter {
1816 async fn re_commit(
1817 &mut self,
1818 epoch: u64,
1819 write_results: Vec<IcebergCommitResult>,
1820 snapshot_id: i64,
1821 ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1823
1824 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1826 tracing::debug!(?epoch, "no data to commit");
1827 return Ok(());
1828 }
1829 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1830 .await?;
1831 Ok(())
1832 }
1833
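    /// Commits one checkpoint to Iceberg: for exactly-once sinks the serialized write
    /// results are persisted as pre-commit metadata first (on the initial attempt), the
    /// serialized data files are converted back into `DataFile`s against the expected
    /// schema and partition type, and a fast-append transaction with a fixed snapshot id
    /// (so re-commits stay idempotent) is retried with exponential backoff up to
    /// `commit_retry_num` times.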
1834 async fn commit_iceberg_inner(
1835 &mut self,
1836 epoch: u64,
1837 write_results: Vec<IcebergCommitResult>,
1838 snapshot_id: Option<i64>,
1839 ) -> Result<()> {
1840 let is_first_commit = snapshot_id.is_none();
1844 self.last_commit_epoch = epoch;
1845 let expect_schema_id = write_results[0].schema_id;
1846 let expect_partition_spec_id = write_results[0].partition_spec_id;
1847
1848 self.table = Self::reload_table(
1850 self.catalog.as_ref(),
1851 self.table.identifier(),
1852 expect_schema_id,
1853 expect_partition_spec_id,
1854 )
1855 .await?;
1856 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1857 return Err(SinkError::Iceberg(anyhow!(
1858 "Can't find schema by id {}",
1859 expect_schema_id
1860 )));
1861 };
1862 let Some(partition_spec) = self
1863 .table
1864 .metadata()
1865 .partition_spec_by_id(expect_partition_spec_id)
1866 else {
1867 return Err(SinkError::Iceberg(anyhow!(
1868 "Can't find partition spec by id {}",
1869 expect_partition_spec_id
1870 )));
1871 };
1872 let partition_type = partition_spec
1873 .as_ref()
1874 .clone()
1875 .partition_type(schema)
1876 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1877
1878 let txn = Transaction::new(&self.table);
1879 let snapshot_id = match snapshot_id {
1881 Some(previous_snapshot_id) => previous_snapshot_id,
1882 None => txn.generate_unique_snapshot_id(),
1883 };
1884 if self.is_exactly_once && is_first_commit {
1885 let mut pre_commit_metadata_bytes = Vec::new();
1887 for each_parallelism_write_result in write_results.clone() {
1888 let each_parallelism_write_result_bytes: Vec<u8> =
1889 each_parallelism_write_result.try_into()?;
1890 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1891 }
1892
1893 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1894
1895 persist_pre_commit_metadata(
1896 self.sink_id,
1897 self.db.clone(),
1898 self.last_commit_epoch,
1899 epoch,
1900 pre_commit_metadata_bytes,
1901 snapshot_id,
1902 )
1903 .await?;
1904 }
1905
1906 let data_files = write_results
1907 .into_iter()
1908 .flat_map(|r| {
1909 r.data_files.into_iter().map(|f| {
1910 f.try_into(expect_partition_spec_id, &partition_type, schema)
1911 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1912 })
1913 })
1914 .collect::<Result<Vec<DataFile>>>()?;
1915 let retry_strategy = ExponentialBackoff::from_millis(10)
1920 .max_delay(Duration::from_secs(60))
1921 .map(jitter)
1922 .take(self.commit_retry_num as usize);
1923 let catalog = self.catalog.clone();
1924 let table_ident = self.table.identifier().clone();
1925 let table = Retry::spawn(retry_strategy, || async {
1926 let table = Self::reload_table(
1927 catalog.as_ref(),
1928 &table_ident,
1929 expect_schema_id,
1930 expect_partition_spec_id,
1931 )
1932 .await?;
1933 let txn = Transaction::new(&table);
1934 let mut append_action = txn
1935 .fast_append(Some(snapshot_id), None, vec![])
1936 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1937 .with_to_branch(commit_branch(
1938 self.config.r#type.as_str(),
1939 self.config.write_mode.as_str(),
1940 ));
1941 append_action
1942 .add_data_files(data_files.clone())
1943 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1944 let tx = append_action.apply().await.map_err(|err| {
                tracing::error!(error = %err.as_report(), "Failed to apply append action to iceberg transaction");
1946 SinkError::Iceberg(anyhow!(err))
1947 })?;
1948 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1949 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1950 SinkError::Iceberg(anyhow!(err))
1951 })
1952 })
1953 .await?;
1954 self.table = table;
1955
        let snapshot_num = self.table.metadata().snapshots().count();
        let catalog_name = self.config.common.catalog_name();
        let table_name = self.table.identifier().to_string();
        let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
        GLOBAL_SINK_METRICS
            .iceberg_snapshot_num
            .with_guarded_label_values(&metrics_labels)
            .set(snapshot_num as i64);

        tracing::info!("Successfully committed to the iceberg table in epoch {epoch}.");

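        // After a successful commit, mark the persisted pre-commit metadata for this epoch as
        // committed, then clean up rows that are no longer needed.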
        if self.is_exactly_once {
            mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
            tracing::info!(
                "Sink id = {}: successfully marked pre-commit metadata in epoch {} as committed.",
                self.sink_id,
                epoch
            );

            delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
        }

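        // Notify the compaction scheduler that new data has been committed; a failed send is
        // non-fatal and only logged.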
        if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
            && self.config.enable_compaction
            && iceberg_compact_stat_sender
                .send(IcebergSinkCompactionUpdate {
                    sink_id: SinkId::new(self.sink_id),
                    compaction_interval: self.config.compaction_interval_sec(),
                    force_compaction: false,
                })
                .is_err()
        {
            warn!("failed to send iceberg compaction stats");
        }

        Ok(())
    }

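    /// Loads the table from the catalog and checks whether a snapshot with the given id exists,
    /// so that a recovering committer can tell whether a previously pre-committed epoch has
    /// already reached the iceberg table.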
    async fn is_snapshot_id_in_iceberg(
        &self,
        iceberg_config: &IcebergConfig,
        snapshot_id: i64,
    ) -> Result<bool> {
        let iceberg_common = iceberg_config.common.clone();
        let table = iceberg_common
            .load_table(&iceberg_config.java_catalog_props)
            .await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
    }

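    /// Counts the snapshots that are newer than the most recent rewrite (`Replace`) snapshot,
    /// i.e. the snapshots that have not been covered by a compaction yet.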
    fn count_snapshots_since_rewrite(&self) -> usize {
        let mut snapshots: Vec<_> = self.table.metadata().snapshots().collect();
        snapshots.sort_by_key(|b| std::cmp::Reverse(b.timestamp_ms()));

        let mut count = 0;
        for snapshot in snapshots {
            let summary = snapshot.summary();
            match &summary.operation {
                Operation::Replace => {
                    break;
                }
                _ => {
                    count += 1;
                }
            }
        }

        count
    }

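    /// When compaction is enabled and `max_snapshots_num_before_compaction` is configured,
    /// blocks further commits until the number of snapshots accumulated since the last rewrite
    /// drops below the limit, asking the compaction scheduler to force a compaction while
    /// waiting.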
    async fn wait_for_snapshot_limit(&mut self) -> Result<()> {
        if !self.config.enable_compaction {
            return Ok(());
        }

        if let Some(max_snapshots) = self.config.max_snapshots_num_before_compaction {
            loop {
                let current_count = self.count_snapshots_since_rewrite();

                if current_count < max_snapshots {
                    tracing::info!(
                        "Snapshot count check passed: {} < {}",
                        current_count,
                        max_snapshots
                    );
                    break;
                }

                tracing::info!(
                    "Snapshot count {} has reached the limit {}, waiting...",
                    current_count,
                    max_snapshots
                );

                if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
                    && iceberg_compact_stat_sender
                        .send(IcebergSinkCompactionUpdate {
                            sink_id: SinkId::new(self.sink_id),
                            compaction_interval: self.config.compaction_interval_sec(),
                            force_compaction: true,
                        })
                        .is_err()
                {
                    tracing::warn!("failed to send iceberg compaction stats");
                }

                tokio::time::sleep(Duration::from_secs(30)).await;

                self.table = self.config.load_table().await?;
            }
        }
        Ok(())
    }
}

const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";

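/// Descends through arrow lists and maps to the underlying struct fields: records the
/// RisingWave-side nested field types in `schema_fields` (keyed by field name) and returns the
/// arrow struct's fields so the nested level can be compared by `check_compatibility`. Returns
/// `None` when the arrow type has no nested struct.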
fn get_fields<'a>(
    our_field_type: &'a risingwave_common::types::DataType,
    data_type: &ArrowDataType,
    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
) -> Option<ArrowFields> {
    match data_type {
        ArrowDataType::Struct(fields) => {
            match our_field_type {
                risingwave_common::types::DataType::Struct(struct_fields) => {
                    struct_fields.iter().for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                }
                risingwave_common::types::DataType::Map(map_fields) => {
                    schema_fields.insert(MAP_KEY, map_fields.key());
                    schema_fields.insert(MAP_VALUE, map_fields.value());
                }
                risingwave_common::types::DataType::List(list) => {
                    list.elem()
                        .as_struct()
                        .iter()
                        .for_each(|(name, data_type)| {
                            let res = schema_fields.insert(name, data_type);
                            assert!(res.is_none())
                        });
                }
                _ => {}
            };
            Some(fields.clone())
        }
        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
            get_fields(our_field_type, field.data_type(), schema_fields)
        }
        _ => None,
    }
}

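/// Checks that every arrow field in `fields` has a counterpart of a compatible type in
/// `schema_fields` (the RisingWave side), recursing into structs, lists, and maps. A few
/// representations are treated as equivalent, e.g. `Binary` vs. `LargeBinary` and decimals with
/// different precision/scale.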
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

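/// Verifies that `rw_schema` and the iceberg table's `arrow_schema` have the same number of
/// fields and that every field's type is compatible; fields are matched by name, so the order
/// does not matter.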
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

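/// Serializes the per-parallelism pre-commit metadata blobs into a single JSON byte vector so
/// they can be persisted as one value.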
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

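/// Inverse of [`serialize_metadata`]: decodes the JSON byte vector back into the
/// per-parallelism metadata blobs.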
pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

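/// Parses the `partition_by` sink option into `(column, transform)` pairs.
///
/// Supported formats are `column`, `transform(column)`, and `transform(n, column)` for the
/// parameterized transforms, e.g. `"v1, bucket(5,v1), truncate(4,v2), year(v3)"` yields the
/// identity, bucket[5], truncate[4], and year transforms respectively.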
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        );
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("`n` must be set for `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

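/// Returns the branch that commits should target: the dedicated copy-on-write branch
/// (`ICEBERG_COW_BRANCH`) when copy-on-write is in effect, otherwise the main branch.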
pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
    if should_enable_iceberg_cow(sink_type, write_mode) {
        ICEBERG_COW_BRANCH.to_owned()
    } else {
        MAIN_BRANCH.to_owned()
    }
}

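/// Copy-on-write is only enabled for upsert sinks configured with the copy-on-write write mode.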
pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::IcebergCommon;
    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
    use crate::sink::iceberg::{
        COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
        ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, MAX_SNAPSHOTS_NUM,
        SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
        SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    };

    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::list(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ]))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ]))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                region: Some("us-east-1".to_owned()),
                endpoint: Some("http://127.0.0.1:9301".to_owned()),
                access_key: Some("hummockadmin".to_owned()),
                secret_key: Some("hummockadmin".to_owned()),
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                catalog_name: Some("demo".to_owned()),
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
                path_style_access: Some(true),
                credential: None,
                oauth2_server_uri: None,
                scope: None,
                token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                header: None,
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
            create_table_if_not_exists: false,
            is_exactly_once: None,
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
            snapshot_expiration_max_age_millis: None,
            snapshot_expiration_retain_last: None,
            snapshot_expiration_clear_expired_files: true,
            snapshot_expiration_clear_expired_meta_data: true,
            max_snapshots_num_before_compaction: None,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.common.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let table = iceberg_config.load_table().await.unwrap();

        println!("{:?}", table.identifier());
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[test]
    fn test_config_constants_consistency() {
        assert_eq!(ENABLE_COMPACTION, "enable_compaction");
        assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
        assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
        assert_eq!(WRITE_MODE, "write_mode");
        assert_eq!(
            SNAPSHOT_EXPIRATION_RETAIN_LAST,
            "snapshot_expiration_retain_last"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
            "snapshot_expiration_max_age_millis"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
            "snapshot_expiration_clear_expired_files"
        );
        assert_eq!(
            SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
            "snapshot_expiration_clear_expired_meta_data"
        );
        assert_eq!(MAX_SNAPSHOTS_NUM, "max_snapshots_num_before_compaction");
    }
}