pub mod exactly_once_util;
mod prometheus;

use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, MAIN_BRANCH, SerializedDataFile, Transform, UnboundPartitionField,
    UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::sort_position_delete_writer::{
    POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::function_writer::equality_delta_writer::{
    DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
};
use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use regex::Regex;
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
use risingwave_common::array::arrow::arrow_schema_iceberg::{
    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
    Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde::Deserialize;
use serde_json::from_value;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio_retry::Retry;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tracing::warn;
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use super::{
    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
    SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
};
use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkId;
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::iceberg::exactly_once_util::*;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";
pub const ICEBERG_COW_BRANCH: &str = "ingestion";
pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";

pub const ENABLE_COMPACTION: &str = "enable_compaction";
pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
pub const WRITE_MODE: &str = "write_mode";
pub const SNAPSHOT_EXPIRATION_RETAIN_LAST: &str = "snapshot_expiration_retain_last";
pub const SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS: &str = "snapshot_expiration_max_age_millis";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES: &str = "snapshot_expiration_clear_expired_files";
pub const SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA: &str =
    "snapshot_expiration_clear_expired_meta_data";

fn default_commit_retry_num() -> u32 {
    8
}

fn default_iceberg_write_mode() -> String {
    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
}

fn default_true() -> bool {
    true
}

/// Configuration of the Iceberg sink, deserialized from the `WITH` options of `CREATE SINK`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    /// Sink type: `append-only` or `upsert`.
    pub r#type: String,

    /// Force the sink to write in append-only mode even when `type = 'upsert'`.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    /// Properties passed through to the Java catalog: every `catalog.*` option except
    /// `catalog.uri`, `catalog.type`, `catalog.name` and `catalog.header`.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    /// Commit to Iceberg every n (> 0) checkpoints.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    /// Whether to enable exactly-once delivery (pre-commit metadata is persisted and replayed
    /// on recovery).
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    /// Maximum number of retries for a failed Iceberg commit.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(
        rename = "enable_compaction",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(rename = "compaction_interval_sec", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(
        rename = "enable_snapshot_expiration",
        default,
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,

    /// Write mode: `merge-on-read` (default) or `copy-on-write`.
    #[serde(rename = "write_mode", default = "default_iceberg_write_mode")]
    pub write_mode: String,

    /// Maximum snapshot age in milliseconds; snapshots older than `now - max_age` are expired.
    #[serde(rename = "snapshot_expiration_max_age_millis", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_max_age_millis: Option<i64>,

    /// Number of most recent snapshots to keep when expiring snapshots.
    #[serde(rename = "snapshot_expiration_retain_last", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_retain_last: Option<i32>,

    #[serde(
        rename = "snapshot_expiration_clear_expired_files",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_files: bool,

    #[serde(
        rename = "snapshot_expiration_clear_expired_meta_data",
        default = "default_true",
        deserialize_with = "deserialize_bool_from_string"
    )]
    #[with_option(allow_alter_on_fly)]
    pub snapshot_expiration_clear_expired_meta_data: bool,
}
228
229impl EnforceSecret for IcebergConfig {
230 fn enforce_secret<'a>(
231 prop_iter: impl Iterator<Item = &'a str>,
232 ) -> crate::error::ConnectorResult<()> {
233 for prop in prop_iter {
234 IcebergCommon::enforce_one(prop)?;
235 }
236 Ok(())
237 }
238
239 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
240 IcebergCommon::enforce_one(prop)
241 }
242}
243
244impl IcebergConfig {
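    /// Build an `IcebergConfig` from the raw `WITH` options, validating `type` and
    /// `primary_key`, collecting `catalog.*` passthrough properties, and rejecting a zero
    /// `commit_checkpoint_interval`.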
245 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
246 let mut config =
247 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
248 .map_err(|e| SinkError::Config(anyhow!(e)))?;
249
250 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
251 return Err(SinkError::Config(anyhow!(
252 "`{}` must be {}, or {}",
253 SINK_TYPE_OPTION,
254 SINK_TYPE_APPEND_ONLY,
255 SINK_TYPE_UPSERT
256 )));
257 }
258
259 if config.r#type == SINK_TYPE_UPSERT {
260 if let Some(primary_key) = &config.primary_key {
261 if primary_key.is_empty() {
262 return Err(SinkError::Config(anyhow!(
263 "`primary_key` must not be empty in {}",
264 SINK_TYPE_UPSERT
265 )));
266 }
267 } else {
268 return Err(SinkError::Config(anyhow!(
269 "Must set `primary_key` in {}",
270 SINK_TYPE_UPSERT
271 )));
272 }
273 }
274
275 config.java_catalog_props = values
277 .iter()
278 .filter(|(k, _v)| {
279 k.starts_with("catalog.")
280 && k != &"catalog.uri"
281 && k != &"catalog.type"
282 && k != &"catalog.name"
283 && k != &"catalog.header"
284 })
285 .map(|(k, v)| (k[8..].to_string(), v.clone()))
286 .collect();
287
288 if config.commit_checkpoint_interval == 0 {
289 return Err(SinkError::Config(anyhow!(
290 "`commit_checkpoint_interval` must be greater than 0"
291 )));
292 }
293
294 Ok(config)
295 }
296
297 pub fn catalog_type(&self) -> &str {
298 self.common.catalog_type()
299 }
300
301 pub async fn load_table(&self) -> Result<Table> {
302 self.common
303 .load_table(&self.java_catalog_props)
304 .await
305 .map_err(Into::into)
306 }
307
308 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
309 self.common
310 .create_catalog(&self.java_catalog_props)
311 .await
312 .map_err(Into::into)
313 }
314
315 pub fn full_table_name(&self) -> Result<TableIdent> {
316 self.common.full_table_name().map_err(Into::into)
317 }
318
319 pub fn catalog_name(&self) -> String {
320 self.common.catalog_name()
321 }
322
323 pub fn compaction_interval_sec(&self) -> u64 {
324 self.compaction_interval_sec.unwrap_or(3600)
326 }
327
328 pub fn snapshot_expiration_timestamp_ms(&self, current_time_ms: i64) -> Option<i64> {
331 self.snapshot_expiration_max_age_millis
332 .map(|max_age_millis| current_time_ms - max_age_millis)
333 }
334}
335
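/// The Iceberg sink. It validates (and optionally creates) the target table and produces a
/// coordinated log sinker plus a commit coordinator.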
336pub struct IcebergSink {
337 config: IcebergConfig,
338 param: SinkParam,
339 unique_column_ids: Option<Vec<usize>>,
341}
342
343impl EnforceSecret for IcebergSink {
344 fn enforce_secret<'a>(
345 prop_iter: impl Iterator<Item = &'a str>,
346 ) -> crate::error::ConnectorResult<()> {
347 for prop in prop_iter {
348 IcebergConfig::enforce_one(prop)?;
349 }
350 Ok(())
351 }
352}
353
354impl TryFrom<SinkParam> for IcebergSink {
355 type Error = SinkError;
356
357 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
358 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
359 IcebergSink::new(config, param)
360 }
361}
362
363impl Debug for IcebergSink {
364 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365 f.debug_struct("IcebergSink")
366 .field("config", &self.config)
367 .finish()
368 }
369}
370
371impl IcebergSink {
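    /// Create the table if `create_table_if_not_exists` is set, then load it and check that
    /// the sink schema is compatible with the table's current Iceberg schema.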
372 async fn create_and_validate_table(&self) -> Result<Table> {
373 if self.config.create_table_if_not_exists {
374 self.create_table_if_not_exists().await?;
375 }
376
377 let table = self
378 .config
379 .load_table()
380 .await
381 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
382
383 let sink_schema = self.param.schema();
384 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
385 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
386
387 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
388 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
389
390 Ok(table)
391 }
392
393 async fn create_table_if_not_exists(&self) -> Result<()> {
394 let catalog = self.config.create_catalog().await?;
395 let namespace = if let Some(database_name) = &self.config.common.database_name {
396 let namespace = NamespaceIdent::new(database_name.clone());
397 if !catalog
398 .namespace_exists(&namespace)
399 .await
400 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
401 {
402 catalog
403 .create_namespace(&namespace, HashMap::default())
404 .await
405 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
406 .context("failed to create iceberg namespace")?;
407 }
408 namespace
409 } else {
410 bail!("database name must be set if you want to create table")
411 };
412
413 let table_id = self
414 .config
415 .full_table_name()
416 .context("Unable to parse table name")?;
417 if !catalog
418 .table_exists(&table_id)
419 .await
420 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
421 {
422 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
423 let arrow_fields = self
425 .param
426 .columns
427 .iter()
428 .map(|column| {
429 Ok(iceberg_create_table_arrow_convert
430 .to_arrow_field(&column.name, &column.data_type)
431 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
432 .context(format!(
433 "failed to convert {}: {} to arrow type",
434 &column.name, &column.data_type
435 ))?)
436 })
437 .collect::<Result<Vec<ArrowField>>>()?;
438 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
439 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
440 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
441 .context("failed to convert arrow schema to iceberg schema")?;
442
443 let location = {
444 let mut names = namespace.clone().inner();
445 names.push(self.config.common.table_name.clone());
446 match &self.config.common.warehouse_path {
447 Some(warehouse_path) => {
448 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
449 let url = Url::parse(warehouse_path);
450 if url.is_err() || is_s3_tables {
451 if self.config.common.catalog_type() == "rest"
454 || self.config.common.catalog_type() == "rest_rust"
455 {
456 None
457 } else {
458 bail!(format!("Invalid warehouse path: {}", warehouse_path))
459 }
460 } else if warehouse_path.ends_with('/') {
461 Some(format!("{}{}", warehouse_path, names.join("/")))
462 } else {
463 Some(format!("{}/{}", warehouse_path, names.join("/")))
464 }
465 }
466 None => None,
467 }
468 };
469
470 let partition_spec = match &self.config.partition_by {
471 Some(partition_by) => {
472 let mut partition_fields = Vec::<UnboundPartitionField>::new();
473 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
474 .into_iter()
475 .enumerate()
476 {
477 match iceberg_schema.field_id_by_name(&column) {
478 Some(id) => partition_fields.push(
479 UnboundPartitionField::builder()
480 .source_id(id)
481 .transform(transform)
482 .name(format!("_p_{}", column))
483 .field_id(i as i32)
484 .build(),
485 ),
486 None => bail!(format!(
487 "Partition source column does not exist in schema: {}",
488 column
489 )),
490 };
491 }
492 Some(
493 UnboundPartitionSpec::builder()
494 .with_spec_id(0)
495 .add_partition_fields(partition_fields)
496 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
497 .context("failed to add partition columns")?
498 .build(),
499 )
500 }
501 None => None,
502 };
503
504 let table_creation_builder = TableCreation::builder()
505 .name(self.config.common.table_name.clone())
506 .schema(iceberg_schema);
507
508 let table_creation = match (location, partition_spec) {
509 (Some(location), Some(partition_spec)) => table_creation_builder
510 .location(location)
511 .partition_spec(partition_spec)
512 .build(),
513 (Some(location), None) => table_creation_builder.location(location).build(),
514 (None, Some(partition_spec)) => table_creation_builder
515 .partition_spec(partition_spec)
516 .build(),
517 (None, None) => table_creation_builder.build(),
518 };
519
520 catalog
521 .create_table(&namespace, table_creation)
522 .await
523 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
524 .context("failed to create iceberg table")?;
525 }
526 Ok(())
527 }
528
529 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
530 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
531 if let Some(pk) = &config.primary_key {
532 let mut unique_column_ids = Vec::with_capacity(pk.len());
533 for col_name in pk {
534 let id = param
535 .columns
536 .iter()
537 .find(|col| col.name.as_str() == col_name)
538 .ok_or_else(|| {
539 SinkError::Config(anyhow!(
540 "Primary key column {} not found in sink schema",
541 col_name
542 ))
543 })?
544 .column_id
545 .get_id() as usize;
546 unique_column_ids.push(id);
547 }
548 Some(unique_column_ids)
549 } else {
550 unreachable!()
551 }
552 } else {
553 None
554 };
555 Ok(Self {
556 config,
557 param,
558 unique_column_ids,
559 })
560 }
561}
562
563impl Sink for IcebergSink {
564 type Coordinator = IcebergSinkCommitter;
565 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
566
567 const SINK_NAME: &'static str = ICEBERG_SINK;
568
569 async fn validate(&self) -> Result<()> {
570 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
571 bail!("Snowflake catalog only supports iceberg sources");
572 }
573 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
574 risingwave_common::license::Feature::IcebergSinkWithGlue
575 .check_available()
576 .map_err(|e| anyhow::anyhow!(e))?;
577 }
578
579 let _ = self.create_and_validate_table().await?;
580 Ok(())
581 }
582
583 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
584 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
585
586 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
587 if iceberg_config.enable_compaction && compaction_interval == 0 {
588 bail!(
589 "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
590 );
591 } else {
592 tracing::info!(
594 "Alter config compaction_interval set to {} seconds",
595 compaction_interval
596 );
597 }
598 }
599
600 Ok(())
601 }
602
603 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
604 let table = self.create_and_validate_table().await?;
605 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
606 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
607 } else {
608 IcebergSinkWriter::new_append_only(table, &writer_param).await?
609 };
610
611 let commit_checkpoint_interval =
612 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
613 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
614 );
615 let writer = CoordinatedLogSinker::new(
616 &writer_param,
617 self.param.clone(),
618 inner,
619 commit_checkpoint_interval,
620 )
621 .await?;
622
623 Ok(writer)
624 }
625
626 fn is_coordinated_sink(&self) -> bool {
627 true
628 }
629
630 async fn new_coordinator(
631 &self,
632 db: DatabaseConnection,
633 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
634 ) -> Result<Self::Coordinator> {
635 let catalog = self.config.create_catalog().await?;
636 let table = self.create_and_validate_table().await?;
637 Ok(IcebergSinkCommitter {
638 catalog,
639 table,
640 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
641 last_commit_epoch: 0,
642 sink_id: self.param.sink_id.sink_id(),
643 config: self.config.clone(),
644 param: self.param.clone(),
645 db,
646 commit_retry_num: self.config.commit_retry_num,
647 committed_epoch_subscriber: None,
648 iceberg_compact_stat_sender,
649 })
650 }
651}
652
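/// Projection state used to drop the extra partition column (if any) from incoming chunks:
/// `None` means no projection is needed, `Prepare(idx)` holds the extra column's index before
/// the projection has been computed, and `Done(vec)` caches the computed column indices.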
653enum ProjectIdxVec {
660 None,
661 Prepare(usize),
662 Done(Vec<usize>),
663}
664
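/// Streaming half of the sink: converts each `StreamChunk` into an Arrow `RecordBatch` and
/// feeds it to the underlying Iceberg writer; closed data files are handed to the commit
/// coordinator at every checkpoint barrier.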
665pub struct IcebergSinkWriter {
666 writer: IcebergWriterDispatch,
667 arrow_schema: SchemaRef,
668 metrics: IcebergWriterMetrics,
670 table: Table,
672 project_idx_vec: ProjectIdxVec,
675}
676
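/// The concrete writer stack, selected by whether the table is partitioned and whether the
/// sink is append-only or upsert. Each variant also keeps its builder so a fresh writer can
/// be constructed after the current one is closed at a checkpoint.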
677#[allow(clippy::type_complexity)]
678enum IcebergWriterDispatch {
679 PartitionAppendOnly {
680 writer: Option<Box<dyn IcebergWriter>>,
681 writer_builder: MonitoredGeneralWriterBuilder<
682 FanoutPartitionWriterBuilder<
683 DataFileWriterBuilder<
684 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
685 >,
686 >,
687 >,
688 },
689 NonpartitionAppendOnly {
690 writer: Option<Box<dyn IcebergWriter>>,
691 writer_builder: MonitoredGeneralWriterBuilder<
692 DataFileWriterBuilder<
693 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
694 >,
695 >,
696 },
697 PartitionUpsert {
698 writer: Option<Box<dyn IcebergWriter>>,
699 writer_builder: MonitoredGeneralWriterBuilder<
700 FanoutPartitionWriterBuilder<
701 EqualityDeltaWriterBuilder<
702 DataFileWriterBuilder<
703 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
704 >,
705 MonitoredPositionDeleteWriterBuilder<
706 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
707 >,
708 EqualityDeleteFileWriterBuilder<
709 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
710 >,
711 >,
712 >,
713 >,
714 arrow_schema_with_op_column: SchemaRef,
715 },
716 NonpartitionUpsert {
717 writer: Option<Box<dyn IcebergWriter>>,
718 writer_builder: MonitoredGeneralWriterBuilder<
719 EqualityDeltaWriterBuilder<
720 DataFileWriterBuilder<
721 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
722 >,
723 MonitoredPositionDeleteWriterBuilder<
724 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
725 >,
726 EqualityDeleteFileWriterBuilder<
727 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
728 >,
729 >,
730 >,
731 arrow_schema_with_op_column: SchemaRef,
732 },
733}
734
735impl IcebergWriterDispatch {
736 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
737 match self {
738 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
739 | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
740 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
741 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
742 }
743 }
744}
745
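/// Metric handles owned by the writer. The QPS and latency counters are updated inside the
/// monitored writer builders; they are kept here only so the label guards stay alive.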
746pub struct IcebergWriterMetrics {
747 _write_qps: LabelGuardedIntCounter,
752 _write_latency: LabelGuardedHistogram,
753 write_bytes: LabelGuardedIntCounter,
754}
755
756impl IcebergSinkWriter {
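    /// Build an append-only writer: a Parquet data-file writer, wrapped in a fanout partition
    /// writer when the table has a non-empty partition spec.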
757 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
758 let SinkWriterParam {
759 extra_partition_col_idx,
760 actor_id,
761 sink_id,
762 sink_name,
763 ..
764 } = writer_param;
765 let metrics_labels = [
766 &actor_id.to_string(),
767 &sink_id.to_string(),
768 sink_name.as_str(),
769 ];
770
771 let write_qps = GLOBAL_SINK_METRICS
773 .iceberg_write_qps
774 .with_guarded_label_values(&metrics_labels);
775 let write_latency = GLOBAL_SINK_METRICS
776 .iceberg_write_latency
777 .with_guarded_label_values(&metrics_labels);
778 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
781 .iceberg_rolling_unflushed_data_file
782 .with_guarded_label_values(&metrics_labels);
783 let write_bytes = GLOBAL_SINK_METRICS
784 .iceberg_write_bytes
785 .with_guarded_label_values(&metrics_labels);
786
787 let schema = table.metadata().current_schema();
788 let partition_spec = table.metadata().default_partition_spec();
789
790 let unique_uuid_suffix = Uuid::now_v7();
792
793 let parquet_writer_properties = WriterProperties::builder()
794 .set_max_row_group_size(
795 writer_param
796 .streaming_config
797 .developer
798 .iceberg_sink_write_parquet_max_row_group_rows,
799 )
800 .build();
801
802 let parquet_writer_builder = ParquetWriterBuilder::new(
803 parquet_writer_properties,
804 schema.clone(),
805 table.file_io().clone(),
806 DefaultLocationGenerator::new(table.metadata().clone())
807 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
808 DefaultFileNameGenerator::new(
809 writer_param.actor_id.to_string(),
810 Some(unique_uuid_suffix.to_string()),
811 iceberg::spec::DataFileFormat::Parquet,
812 ),
813 );
814 let data_file_builder =
815 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
816 if partition_spec.fields().is_empty() {
817 let writer_builder = MonitoredGeneralWriterBuilder::new(
818 data_file_builder,
819 write_qps.clone(),
820 write_latency.clone(),
821 );
822 let inner_writer = Some(Box::new(
823 writer_builder
824 .clone()
825 .build()
826 .await
827 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
828 ) as Box<dyn IcebergWriter>);
829 Ok(Self {
830 arrow_schema: Arc::new(
831 schema_to_arrow_schema(table.metadata().current_schema())
832 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
833 ),
834 metrics: IcebergWriterMetrics {
835 _write_qps: write_qps,
836 _write_latency: write_latency,
837 write_bytes,
838 },
839 writer: IcebergWriterDispatch::NonpartitionAppendOnly {
840 writer: inner_writer,
841 writer_builder,
842 },
843 table,
844 project_idx_vec: {
845 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
846 ProjectIdxVec::Prepare(*extra_partition_col_idx)
847 } else {
848 ProjectIdxVec::None
849 }
850 },
851 })
852 } else {
853 let partition_builder = MonitoredGeneralWriterBuilder::new(
854 FanoutPartitionWriterBuilder::new(
855 data_file_builder,
856 partition_spec.clone(),
857 schema.clone(),
858 )
859 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
860 write_qps.clone(),
861 write_latency.clone(),
862 );
863 let inner_writer = Some(Box::new(
864 partition_builder
865 .clone()
866 .build()
867 .await
868 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
869 ) as Box<dyn IcebergWriter>);
870 Ok(Self {
871 arrow_schema: Arc::new(
872 schema_to_arrow_schema(table.metadata().current_schema())
873 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
874 ),
875 metrics: IcebergWriterMetrics {
876 _write_qps: write_qps,
877 _write_latency: write_latency,
878 write_bytes,
879 },
880 writer: IcebergWriterDispatch::PartitionAppendOnly {
881 writer: inner_writer,
882 writer_builder: partition_builder,
883 },
884 table,
885 project_idx_vec: {
886 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
887 ProjectIdxVec::Prepare(*extra_partition_col_idx)
888 } else {
889 ProjectIdxVec::None
890 }
891 },
892 })
893 }
894 }
895
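    /// Build an upsert writer: an equality-delta writer combining data files, sorted
    /// positional deletes and equality deletes keyed by `unique_column_ids`, wrapped in a
    /// fanout partition writer when the table is partitioned.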
896 pub async fn new_upsert(
897 table: Table,
898 unique_column_ids: Vec<usize>,
899 writer_param: &SinkWriterParam,
900 ) -> Result<Self> {
901 let SinkWriterParam {
902 extra_partition_col_idx,
903 actor_id,
904 sink_id,
905 sink_name,
906 ..
907 } = writer_param;
908 let metrics_labels = [
909 &actor_id.to_string(),
910 &sink_id.to_string(),
911 sink_name.as_str(),
912 ];
913 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
914
915 let write_qps = GLOBAL_SINK_METRICS
917 .iceberg_write_qps
918 .with_guarded_label_values(&metrics_labels);
919 let write_latency = GLOBAL_SINK_METRICS
920 .iceberg_write_latency
921 .with_guarded_label_values(&metrics_labels);
922 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
925 .iceberg_rolling_unflushed_data_file
926 .with_guarded_label_values(&metrics_labels);
927 let position_delete_cache_num = GLOBAL_SINK_METRICS
928 .iceberg_position_delete_cache_num
929 .with_guarded_label_values(&metrics_labels);
930 let write_bytes = GLOBAL_SINK_METRICS
931 .iceberg_write_bytes
932 .with_guarded_label_values(&metrics_labels);
933
934 let schema = table.metadata().current_schema();
936 let partition_spec = table.metadata().default_partition_spec();
937
938 let unique_uuid_suffix = Uuid::now_v7();
940
941 let parquet_writer_properties = WriterProperties::builder()
942 .set_max_row_group_size(
943 writer_param
944 .streaming_config
945 .developer
946 .iceberg_sink_write_parquet_max_row_group_rows,
947 )
948 .build();
949
950 let data_file_builder = {
951 let parquet_writer_builder = ParquetWriterBuilder::new(
952 parquet_writer_properties.clone(),
953 schema.clone(),
954 table.file_io().clone(),
955 DefaultLocationGenerator::new(table.metadata().clone())
956 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
957 DefaultFileNameGenerator::new(
958 writer_param.actor_id.to_string(),
959 Some(unique_uuid_suffix.to_string()),
960 iceberg::spec::DataFileFormat::Parquet,
961 ),
962 );
963 DataFileWriterBuilder::new(
964 parquet_writer_builder.clone(),
965 None,
966 partition_spec.spec_id(),
967 )
968 };
969 let position_delete_builder = {
970 let parquet_writer_builder = ParquetWriterBuilder::new(
971 parquet_writer_properties.clone(),
972 POSITION_DELETE_SCHEMA.clone(),
973 table.file_io().clone(),
974 DefaultLocationGenerator::new(table.metadata().clone())
975 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
976 DefaultFileNameGenerator::new(
977 writer_param.actor_id.to_string(),
978 Some(format!("pos-del-{}", unique_uuid_suffix)),
979 iceberg::spec::DataFileFormat::Parquet,
980 ),
981 );
982 MonitoredPositionDeleteWriterBuilder::new(
983 SortPositionDeleteWriterBuilder::new(
984 parquet_writer_builder.clone(),
985 writer_param
986 .streaming_config
987 .developer
988 .iceberg_sink_positional_delete_cache_size,
989 None,
990 None,
991 ),
992 position_delete_cache_num,
993 )
994 };
995 let equality_delete_builder = {
996 let config = EqualityDeleteWriterConfig::new(
997 unique_column_ids.clone(),
998 table.metadata().current_schema().clone(),
999 None,
1000 partition_spec.spec_id(),
1001 )
1002 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1003 let parquet_writer_builder = ParquetWriterBuilder::new(
1004 parquet_writer_properties.clone(),
1005 Arc::new(
1006 arrow_schema_to_schema(config.projected_arrow_schema_ref())
1007 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1008 ),
1009 table.file_io().clone(),
1010 DefaultLocationGenerator::new(table.metadata().clone())
1011 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1012 DefaultFileNameGenerator::new(
1013 writer_param.actor_id.to_string(),
1014 Some(format!("eq-del-{}", unique_uuid_suffix)),
1015 iceberg::spec::DataFileFormat::Parquet,
1016 ),
1017 );
1018
1019 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
1020 };
1021 let delta_builder = EqualityDeltaWriterBuilder::new(
1022 data_file_builder,
1023 position_delete_builder,
1024 equality_delete_builder,
1025 unique_column_ids,
1026 );
1027 if partition_spec.fields().is_empty() {
1028 let writer_builder = MonitoredGeneralWriterBuilder::new(
1029 delta_builder,
1030 write_qps.clone(),
1031 write_latency.clone(),
1032 );
1033 let inner_writer = Some(Box::new(
1034 writer_builder
1035 .clone()
1036 .build()
1037 .await
1038 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1039 ) as Box<dyn IcebergWriter>);
1040 let original_arrow_schema = Arc::new(
1041 schema_to_arrow_schema(table.metadata().current_schema())
1042 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1043 );
1044 let schema_with_extra_op_column = {
1045 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1046 new_fields.push(Arc::new(ArrowField::new(
1047 "op".to_owned(),
1048 ArrowDataType::Int32,
1049 false,
1050 )));
1051 Arc::new(ArrowSchema::new(new_fields))
1052 };
1053 Ok(Self {
1054 arrow_schema: original_arrow_schema,
1055 metrics: IcebergWriterMetrics {
1056 _write_qps: write_qps,
1057 _write_latency: write_latency,
1058 write_bytes,
1059 },
1060 table,
1061 writer: IcebergWriterDispatch::NonpartitionUpsert {
1062 writer: inner_writer,
1063 writer_builder,
1064 arrow_schema_with_op_column: schema_with_extra_op_column,
1065 },
1066 project_idx_vec: {
1067 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1068 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1069 } else {
1070 ProjectIdxVec::None
1071 }
1072 },
1073 })
1074 } else {
1075 let original_arrow_schema = Arc::new(
1076 schema_to_arrow_schema(table.metadata().current_schema())
1077 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1078 );
1079 let schema_with_extra_op_column = {
1080 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1081 new_fields.push(Arc::new(ArrowField::new(
1082 "op".to_owned(),
1083 ArrowDataType::Int32,
1084 false,
1085 )));
1086 Arc::new(ArrowSchema::new(new_fields))
1087 };
1088 let partition_builder = MonitoredGeneralWriterBuilder::new(
1089 FanoutPartitionWriterBuilder::new_with_custom_schema(
1090 delta_builder,
1091 schema_with_extra_op_column.clone(),
1092 partition_spec.clone(),
1093 table.metadata().current_schema().clone(),
1094 ),
1095 write_qps.clone(),
1096 write_latency.clone(),
1097 );
1098 let inner_writer = Some(Box::new(
1099 partition_builder
1100 .clone()
1101 .build()
1102 .await
1103 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1104 ) as Box<dyn IcebergWriter>);
1105 Ok(Self {
1106 arrow_schema: original_arrow_schema,
1107 metrics: IcebergWriterMetrics {
1108 _write_qps: write_qps,
1109 _write_latency: write_latency,
1110 write_bytes,
1111 },
1112 table,
1113 writer: IcebergWriterDispatch::PartitionUpsert {
1114 writer: inner_writer,
1115 writer_builder: partition_builder,
1116 arrow_schema_with_op_column: schema_with_extra_op_column,
1117 },
1118 project_idx_vec: {
1119 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1120 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1121 } else {
1122 ProjectIdxVec::None
1123 }
1124 },
1125 })
1126 }
1127 }
1128}
1129
1130#[async_trait]
1131impl SinkWriter for IcebergSinkWriter {
1132 type CommitMetadata = Option<SinkMetadata>;
1133
1134 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1136 Ok(())
1138 }
1139
1140 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1142 match &mut self.writer {
1144 IcebergWriterDispatch::PartitionAppendOnly {
1145 writer,
1146 writer_builder,
1147 } => {
1148 if writer.is_none() {
1149 *writer = Some(Box::new(
1150 writer_builder
1151 .clone()
1152 .build()
1153 .await
1154 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1155 ));
1156 }
1157 }
1158 IcebergWriterDispatch::NonpartitionAppendOnly {
1159 writer,
1160 writer_builder,
1161 } => {
1162 if writer.is_none() {
1163 *writer = Some(Box::new(
1164 writer_builder
1165 .clone()
1166 .build()
1167 .await
1168 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1169 ));
1170 }
1171 }
1172 IcebergWriterDispatch::PartitionUpsert {
1173 writer,
1174 writer_builder,
1175 ..
1176 } => {
1177 if writer.is_none() {
1178 *writer = Some(Box::new(
1179 writer_builder
1180 .clone()
1181 .build()
1182 .await
1183 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1184 ));
1185 }
1186 }
1187 IcebergWriterDispatch::NonpartitionUpsert {
1188 writer,
1189 writer_builder,
1190 ..
1191 } => {
1192 if writer.is_none() {
1193 *writer = Some(Box::new(
1194 writer_builder
1195 .clone()
1196 .build()
1197 .await
1198 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1199 ));
1200 }
1201 }
1202 };
1203
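        // Compact the chunk, then drop the extra partition column (if one was configured)
        // using the cached projection.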
1204 let (mut chunk, ops) = chunk.compact().into_parts();
1206 match &self.project_idx_vec {
1207 ProjectIdxVec::None => {}
1208 ProjectIdxVec::Prepare(idx) => {
1209 if *idx >= chunk.columns().len() {
1210 return Err(SinkError::Iceberg(anyhow!(
1211 "invalid extra partition column index {}",
1212 idx
1213 )));
1214 }
1215 let project_idx_vec = (0..*idx)
1216 .chain(*idx + 1..chunk.columns().len())
1217 .collect_vec();
1218 chunk = chunk.project(&project_idx_vec);
1219 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1220 }
1221 ProjectIdxVec::Done(idx_vec) => {
1222 chunk = chunk.project(idx_vec);
1223 }
1224 }
1225 if ops.is_empty() {
1226 return Ok(());
1227 }
1228 let write_batch_size = chunk.estimated_heap_size();
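        // Append-only writers keep only `Insert` rows; upsert writers append an extra `op`
        // column (INSERT_OP / DELETE_OP) that the equality-delta writer consumes.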
1229 let batch = match &self.writer {
1230 IcebergWriterDispatch::PartitionAppendOnly { .. }
1231 | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1232 let filters =
1234 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1235 chunk.set_visibility(filters);
1236 IcebergArrowConvert
1237 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1238 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1239 }
1240 IcebergWriterDispatch::PartitionUpsert {
1241 arrow_schema_with_op_column,
1242 ..
1243 }
1244 | IcebergWriterDispatch::NonpartitionUpsert {
1245 arrow_schema_with_op_column,
1246 ..
1247 } => {
1248 let chunk = IcebergArrowConvert
1249 .to_record_batch(self.arrow_schema.clone(), &chunk)
1250 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1251 let ops = Arc::new(Int32Array::from(
1252 ops.iter()
1253 .map(|op| match op {
1254 Op::UpdateInsert | Op::Insert => INSERT_OP,
1255 Op::UpdateDelete | Op::Delete => DELETE_OP,
1256 })
1257 .collect_vec(),
1258 ));
1259 let mut columns = chunk.columns().to_vec();
1260 columns.push(ops);
1261 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1262 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1263 }
1264 };
1265
1266 let writer = self.writer.get_writer().unwrap();
1267 writer
1268 .write(batch)
1269 .instrument_await("iceberg_write")
1270 .await
1271 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1272 self.metrics.write_bytes.inc_by(write_batch_size as _);
1273 Ok(())
1274 }
1275
1276 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1279 if !is_checkpoint {
1281 return Ok(None);
1282 }
1283
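        // On a checkpoint barrier, close the current writer to flush and collect its data
        // files, then immediately rebuild a fresh writer for the next epoch.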
1284 let close_result = match &mut self.writer {
1285 IcebergWriterDispatch::PartitionAppendOnly {
1286 writer,
1287 writer_builder,
1288 } => {
1289 let close_result = match writer.take() {
1290 Some(mut writer) => {
1291 Some(writer.close().instrument_await("iceberg_close").await)
1292 }
1293 _ => None,
1294 };
1295 match writer_builder.clone().build().await {
1296 Ok(new_writer) => {
1297 *writer = Some(Box::new(new_writer));
1298 }
1299 _ => {
1300 warn!("Failed to build new writer after close");
1303 }
1304 }
1305 close_result
1306 }
1307 IcebergWriterDispatch::NonpartitionAppendOnly {
1308 writer,
1309 writer_builder,
1310 } => {
1311 let close_result = match writer.take() {
1312 Some(mut writer) => {
1313 Some(writer.close().instrument_await("iceberg_close").await)
1314 }
1315 _ => None,
1316 };
1317 match writer_builder.clone().build().await {
1318 Ok(new_writer) => {
1319 *writer = Some(Box::new(new_writer));
1320 }
1321 _ => {
1322 warn!("Failed to build new writer after close");
1325 }
1326 }
1327 close_result
1328 }
1329 IcebergWriterDispatch::PartitionUpsert {
1330 writer,
1331 writer_builder,
1332 ..
1333 } => {
1334 let close_result = match writer.take() {
1335 Some(mut writer) => {
1336 Some(writer.close().instrument_await("iceberg_close").await)
1337 }
1338 _ => None,
1339 };
1340 match writer_builder.clone().build().await {
1341 Ok(new_writer) => {
1342 *writer = Some(Box::new(new_writer));
1343 }
1344 _ => {
1345 warn!("Failed to build new writer after close");
1348 }
1349 }
1350 close_result
1351 }
1352 IcebergWriterDispatch::NonpartitionUpsert {
1353 writer,
1354 writer_builder,
1355 ..
1356 } => {
1357 let close_result = match writer.take() {
1358 Some(mut writer) => {
1359 Some(writer.close().instrument_await("iceberg_close").await)
1360 }
1361 _ => None,
1362 };
1363 match writer_builder.clone().build().await {
1364 Ok(new_writer) => {
1365 *writer = Some(Box::new(new_writer));
1366 }
1367 _ => {
1368 warn!("Failed to build new writer after close");
1371 }
1372 }
1373 close_result
1374 }
1375 };
1376
1377 match close_result {
1378 Some(Ok(result)) => {
1379 let version = self.table.metadata().format_version() as u8;
1380 let partition_type = self.table.metadata().default_partition_type();
1381 let data_files = result
1382 .into_iter()
1383 .map(|f| {
1384 SerializedDataFile::try_from(f, partition_type, version == 1)
1385 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1386 })
1387 .collect::<Result<Vec<_>>>()?;
1388 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1389 data_files,
1390 schema_id: self.table.metadata().current_schema_id(),
1391 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1392 })?))
1393 }
1394 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1395 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1396 }
1397 }
1398
1399 async fn abort(&mut self) -> Result<()> {
1401 Ok(())
1403 }
1404}
1405
1406const SCHEMA_ID: &str = "schema_id";
1407const PARTITION_SPEC_ID: &str = "partition_spec_id";
1408const DATA_FILES: &str = "data_files";
1409
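/// Commit payload produced by each writer for an epoch: the serialized data files plus the
/// schema id and partition spec id they were written against.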
1410#[derive(Default, Clone)]
1411struct IcebergCommitResult {
1412 schema_id: i32,
1413 partition_spec_id: i32,
1414 data_files: Vec<SerializedDataFile>,
1415}
1416
1417impl IcebergCommitResult {
1418 fn try_from(value: &SinkMetadata) -> Result<Self> {
1419 if let Some(Serialized(v)) = &value.metadata {
1420 let mut values = if let serde_json::Value::Object(v) =
1421 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1422 .context("Can't parse iceberg sink metadata")?
1423 {
1424 v
1425 } else {
1426 bail!("iceberg sink metadata should be an object");
1427 };
1428
1429 let schema_id;
1430 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1431 schema_id = value
1432 .as_u64()
1433 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1434 } else {
1435 bail!("iceberg sink metadata should have schema_id");
1436 }
1437
1438 let partition_spec_id;
1439 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1440 partition_spec_id = value
1441 .as_u64()
1442 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1443 } else {
1444 bail!("iceberg sink metadata should have partition_spec_id");
1445 }
1446
1447 let data_files: Vec<SerializedDataFile>;
1448 if let serde_json::Value::Array(values) = values
1449 .remove(DATA_FILES)
1450 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1451 {
1452 data_files = values
1453 .into_iter()
1454 .map(from_value::<SerializedDataFile>)
1455 .collect::<std::result::Result<_, _>>()
1456 .unwrap();
1457 } else {
1458 bail!("iceberg sink metadata should have data_files object");
1459 }
1460
1461 Ok(Self {
1462 schema_id: schema_id as i32,
1463 partition_spec_id: partition_spec_id as i32,
1464 data_files,
1465 })
1466 } else {
1467 bail!("Can't create iceberg sink write result from empty data!")
1468 }
1469 }
1470
1471 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1472 let mut values = if let serde_json::Value::Object(value) =
1473 serde_json::from_slice::<serde_json::Value>(&value)
1474 .context("Can't parse iceberg sink metadata")?
1475 {
1476 value
1477 } else {
1478 bail!("iceberg sink metadata should be an object");
1479 };
1480
1481 let schema_id;
1482 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1483 schema_id = value
1484 .as_u64()
1485 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1486 } else {
1487 bail!("iceberg sink metadata should have schema_id");
1488 }
1489
1490 let partition_spec_id;
1491 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1492 partition_spec_id = value
1493 .as_u64()
1494 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1495 } else {
1496 bail!("iceberg sink metadata should have partition_spec_id");
1497 }
1498
1499 let data_files: Vec<SerializedDataFile>;
1500 if let serde_json::Value::Array(values) = values
1501 .remove(DATA_FILES)
1502 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1503 {
1504 data_files = values
1505 .into_iter()
1506 .map(from_value::<SerializedDataFile>)
1507 .collect::<std::result::Result<_, _>>()
1508 .unwrap();
1509 } else {
1510 bail!("iceberg sink metadata should have data_files object");
1511 }
1512
1513 Ok(Self {
1514 schema_id: schema_id as i32,
1515 partition_spec_id: partition_spec_id as i32,
1516 data_files,
1517 })
1518 }
1519}
1520
1521impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1522 type Error = SinkError;
1523
1524 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1525 let json_data_files = serde_json::Value::Array(
1526 value
1527 .data_files
1528 .iter()
1529 .map(serde_json::to_value)
1530 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1531 .context("Can't serialize data files to json")?,
1532 );
1533 let json_value = serde_json::Value::Object(
1534 vec![
1535 (
1536 SCHEMA_ID.to_owned(),
1537 serde_json::Value::Number(value.schema_id.into()),
1538 ),
1539 (
1540 PARTITION_SPEC_ID.to_owned(),
1541 serde_json::Value::Number(value.partition_spec_id.into()),
1542 ),
1543 (DATA_FILES.to_owned(), json_data_files),
1544 ]
1545 .into_iter()
1546 .collect(),
1547 );
1548 Ok(SinkMetadata {
1549 metadata: Some(Serialized(SerializedMetadata {
1550 metadata: serde_json::to_vec(&json_value)
1551 .context("Can't serialize iceberg sink metadata")?,
1552 })),
1553 })
1554 }
1555}
1556
1557impl TryFrom<IcebergCommitResult> for Vec<u8> {
1558 type Error = SinkError;
1559
1560 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1561 let json_data_files = serde_json::Value::Array(
1562 value
1563 .data_files
1564 .iter()
1565 .map(serde_json::to_value)
1566 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1567 .context("Can't serialize data files to json")?,
1568 );
1569 let json_value = serde_json::Value::Object(
1570 vec![
1571 (
1572 SCHEMA_ID.to_owned(),
1573 serde_json::Value::Number(value.schema_id.into()),
1574 ),
1575 (
1576 PARTITION_SPEC_ID.to_owned(),
1577 serde_json::Value::Number(value.partition_spec_id.into()),
1578 ),
1579 (DATA_FILES.to_owned(), json_data_files),
1580 ]
1581 .into_iter()
1582 .collect(),
1583 );
1584 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1585 }
1586}
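
/// Commit coordinator of the Iceberg sink: gathers the data files reported by all writers for
/// an epoch and commits them to the table in a single transaction, optionally with
/// exactly-once semantics backed by a pre-commit metadata table.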
1587pub struct IcebergSinkCommitter {
1588 catalog: Arc<dyn Catalog>,
1589 table: Table,
1590 pub last_commit_epoch: u64,
1591 pub(crate) is_exactly_once: bool,
1592 pub(crate) sink_id: u32,
1593 pub(crate) config: IcebergConfig,
1594 pub(crate) param: SinkParam,
1595 pub(crate) db: DatabaseConnection,
1596 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1597 commit_retry_num: u32,
1598 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1599}
1600
1601impl IcebergSinkCommitter {
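    /// Reload the table from the catalog and verify that neither the current schema id nor the
    /// default partition spec id has changed since the data files were written.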
1602 async fn reload_table(
1605 catalog: &dyn Catalog,
1606 table_ident: &TableIdent,
1607 schema_id: i32,
1608 partition_spec_id: i32,
1609 ) -> Result<Table> {
1610 let table = catalog
1611 .load_table(table_ident)
1612 .await
1613 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1614 if table.metadata().current_schema_id() != schema_id {
1615 return Err(SinkError::Iceberg(anyhow!(
1616 "Schema evolution not supported, expect schema id {}, but got {}",
1617 schema_id,
1618 table.metadata().current_schema_id()
1619 )));
1620 }
1621 if table.metadata().default_partition_spec_id() != partition_spec_id {
1622 return Err(SinkError::Iceberg(anyhow!(
1623 "Partition evolution not supported, expect partition spec id {}, but got {}",
1624 partition_spec_id,
1625 table.metadata().default_partition_spec_id()
1626 )));
1627 }
1628 Ok(table)
1629 }
1630}
1631
1632#[async_trait::async_trait]
1633impl SinkCommitCoordinator for IcebergSinkCommitter {
1634 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1635 if self.is_exactly_once {
1636 self.committed_epoch_subscriber = Some(subscriber);
1637 tracing::info!(
1638 "Sink id = {}: iceberg sink coordinator initing.",
1639 self.param.sink_id.sink_id()
1640 );
1641 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1642 let ordered_metadata_list_by_end_epoch =
1643 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1644
1645 let mut last_recommit_epoch = 0;
1646 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1647 ordered_metadata_list_by_end_epoch
1648 {
1649 let write_results_bytes = deserialize_metadata(sealized_bytes);
1650 let mut write_results = vec![];
1651
1652 for each in write_results_bytes {
1653 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1654 write_results.push(write_result);
1655 }
1656
1657 match (
1658 committed,
1659 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1660 .await?,
1661 ) {
1662 (true, _) => {
1663 tracing::info!(
1664 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1665 self.param.sink_id.sink_id()
1666 );
1667 }
1668 (false, true) => {
1669 tracing::info!(
1671 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1672 self.param.sink_id.sink_id()
1673 );
1674 mark_row_is_committed_by_sink_id_and_end_epoch(
1675 &self.db,
1676 self.sink_id,
1677 end_epoch,
1678 )
1679 .await?;
1680 }
1681 (false, false) => {
1682 tracing::info!(
1683 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1684 self.param.sink_id.sink_id()
1685 );
1686 self.re_commit(end_epoch, write_results, snapshot_id)
1687 .await?;
1688 }
1689 }
1690
1691 last_recommit_epoch = end_epoch;
1692 }
1693 tracing::info!(
1694 "Sink id = {}: iceberg commit coordinator inited.",
1695 self.param.sink_id.sink_id()
1696 );
1697 return Ok(Some(last_recommit_epoch));
1698 } else {
1699 tracing::info!(
1700 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1701 self.param.sink_id.sink_id()
1702 );
1703 return Ok(None);
1704 }
1705 }
1706
        tracing::info!("Iceberg commit coordinator initialized.");
1708 return Ok(None);
1709 }
1710
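    /// Called once per epoch with the metadata from every writer. All results must share one
    /// schema id and partition spec id; for exactly-once sinks the commit is deferred until
    /// RisingWave reports the epoch as durably committed.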
1711 async fn commit(
1712 &mut self,
1713 epoch: u64,
1714 metadata: Vec<SinkMetadata>,
1715 add_columns: Option<Vec<Field>>,
1716 ) -> Result<()> {
1717 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1718 if let Some(add_columns) = add_columns {
1719 return Err(anyhow!(
1720 "Iceberg sink not support add columns, but got: {:?}",
1721 add_columns
1722 )
1723 .into());
1724 }
1725
1726 let write_results: Vec<IcebergCommitResult> = metadata
1727 .iter()
1728 .map(IcebergCommitResult::try_from)
1729 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1730
1731 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1733 tracing::debug!(?epoch, "no data to commit");
1734 return Ok(());
1735 }
1736
1737 if write_results
1739 .iter()
1740 .any(|r| r.schema_id != write_results[0].schema_id)
1741 || write_results
1742 .iter()
1743 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1744 {
1745 return Err(SinkError::Iceberg(anyhow!(
1746 "schema_id and partition_spec_id should be the same in all write results"
1747 )));
1748 }
1749
1750 if self.is_exactly_once {
1751 assert!(self.committed_epoch_subscriber.is_some());
1752 match self.committed_epoch_subscriber.clone() {
1753 Some(committed_epoch_subscriber) => {
1754 let (committed_epoch, mut rw_futures_utilrx) =
1756 committed_epoch_subscriber(self.param.sink_id).await?;
1757 if committed_epoch >= epoch {
1759 self.commit_iceberg_inner(epoch, write_results, None)
1760 .await?;
1761 } else {
1762 tracing::info!(
1763 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1764 committed_epoch,
1765 epoch
1766 );
1767 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1768 tracing::info!(
1769 "Received next committed epoch: {}",
1770 next_committed_epoch
1771 );
1772 if next_committed_epoch >= epoch {
1774 self.commit_iceberg_inner(epoch, write_results, None)
1775 .await?;
1776 break;
1777 }
1778 }
1779 }
1780 }
1781 None => unreachable!(
1782 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1783 ),
1784 }
1785 } else {
1786 self.commit_iceberg_inner(epoch, write_results, None)
1787 .await?;
1788 }
1789
1790 Ok(())
1791 }
1792}
1793
1794impl IcebergSinkCommitter {
1796 async fn re_commit(
1797 &mut self,
1798 epoch: u64,
1799 write_results: Vec<IcebergCommitResult>,
1800 snapshot_id: i64,
1801 ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1803
1804 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1806 tracing::debug!(?epoch, "no data to commit");
1807 return Ok(());
1808 }
1809 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1810 .await?;
1811 Ok(())
1812 }
1813
1814 async fn commit_iceberg_inner(
1815 &mut self,
1816 epoch: u64,
1817 write_results: Vec<IcebergCommitResult>,
1818 snapshot_id: Option<i64>,
1819 ) -> Result<()> {
1820 let is_first_commit = snapshot_id.is_none();
1824 self.last_commit_epoch = epoch;
1825 let expect_schema_id = write_results[0].schema_id;
1826 let expect_partition_spec_id = write_results[0].partition_spec_id;
1827
1828 self.table = Self::reload_table(
1830 self.catalog.as_ref(),
1831 self.table.identifier(),
1832 expect_schema_id,
1833 expect_partition_spec_id,
1834 )
1835 .await?;
1836 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1837 return Err(SinkError::Iceberg(anyhow!(
1838 "Can't find schema by id {}",
1839 expect_schema_id
1840 )));
1841 };
1842 let Some(partition_spec) = self
1843 .table
1844 .metadata()
1845 .partition_spec_by_id(expect_partition_spec_id)
1846 else {
1847 return Err(SinkError::Iceberg(anyhow!(
1848 "Can't find partition spec by id {}",
1849 expect_partition_spec_id
1850 )));
1851 };
1852 let partition_type = partition_spec
1853 .as_ref()
1854 .clone()
1855 .partition_type(schema)
1856 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1857
1858 let txn = Transaction::new(&self.table);
1859 let snapshot_id = match snapshot_id {
1861 Some(previous_snapshot_id) => previous_snapshot_id,
1862 None => txn.generate_unique_snapshot_id(),
1863 };
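        // For exactly-once delivery, persist the serialized write results together with the
        // chosen snapshot id before committing, so that recovery can tell whether this
        // snapshot already made it into the Iceberg table.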
1864 if self.is_exactly_once && is_first_commit {
1865 let mut pre_commit_metadata_bytes = Vec::new();
1867 for each_parallelism_write_result in write_results.clone() {
1868 let each_parallelism_write_result_bytes: Vec<u8> =
1869 each_parallelism_write_result.try_into()?;
1870 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1871 }
1872
1873 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1874
1875 persist_pre_commit_metadata(
1876 self.sink_id,
1877 self.db.clone(),
1878 self.last_commit_epoch,
1879 epoch,
1880 pre_commit_metadata_bytes,
1881 snapshot_id,
1882 )
1883 .await?;
1884 }
1885
1886 let data_files = write_results
1887 .into_iter()
1888 .flat_map(|r| {
1889 r.data_files.into_iter().map(|f| {
1890 f.try_into(expect_partition_spec_id, &partition_type, schema)
1891 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1892 })
1893 })
1894 .collect::<Result<Vec<DataFile>>>()?;
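        // Commit with jittered exponential backoff, reloading the table on every attempt so
        // the append is applied on top of the latest table metadata.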
1895 let retry_strategy = ExponentialBackoff::from_millis(10)
1900 .max_delay(Duration::from_secs(60))
1901 .map(jitter)
1902 .take(self.commit_retry_num as usize);
1903 let catalog = self.catalog.clone();
1904 let table_ident = self.table.identifier().clone();
1905 let table = Retry::spawn(retry_strategy, || async {
1906 let table = Self::reload_table(
1907 catalog.as_ref(),
1908 &table_ident,
1909 expect_schema_id,
1910 expect_partition_spec_id,
1911 )
1912 .await?;
1913 let txn = Transaction::new(&table);
1914 let mut append_action = txn
1915 .fast_append(Some(snapshot_id), None, vec![])
1916 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1917 .with_to_branch(commit_branch(
1918 self.config.r#type.as_str(),
1919 self.config.write_mode.as_str(),
1920 ));
1921 append_action
1922 .add_data_files(data_files.clone())
1923 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1924 let tx = append_action.apply().await.map_err(|err| {
1925 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1926 SinkError::Iceberg(anyhow!(err))
1927 })?;
1928 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1929 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1930 SinkError::Iceberg(anyhow!(err))
1931 })
1932 })
1933 .await?;
1934 self.table = table;
1935
1936 let snapshot_num = self.table.metadata().snapshots().count();
1937 let catalog_name = self.config.common.catalog_name();
1938 let table_name = self.table.identifier().to_string();
1939 let metrics_labels = [&self.param.sink_name, &catalog_name, &table_name];
1940 GLOBAL_SINK_METRICS
1941 .iceberg_snapshot_num
1942 .with_guarded_label_values(&metrics_labels)
1943 .set(snapshot_num as i64);
1944
        tracing::info!("Successfully committed to iceberg table in epoch {epoch}.");
1946
1947 if self.is_exactly_once {
1948 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1949 tracing::info!(
1950 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
1951 self.sink_id,
1952 epoch
1953 );
1954
1955 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1956 }
1957 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1958 && self.config.enable_compaction
1959 && iceberg_compact_stat_sender
1960 .send(IcebergSinkCompactionUpdate {
1961 sink_id: SinkId::new(self.sink_id),
1962 compaction_interval: self.config.compaction_interval_sec(),
1963 })
1964 .is_err()
1965 {
1966 warn!("failed to send iceberg compaction stats");
1967 }
1968
1969 Ok(())
1970 }
1971
    /// Returns whether the iceberg table already contains a snapshot with the given id, so that
    /// a previously persisted pre-commit can be recognized as already committed during recovery.
    async fn is_snapshot_id_in_iceberg(
        &self,
        iceberg_config: &IcebergConfig,
        snapshot_id: i64,
    ) -> Result<bool> {
        let iceberg_common = iceberg_config.common.clone();
        let table = iceberg_common
            .load_table(&iceberg_config.java_catalog_props)
            .await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
    }
1990}
1991
1992const MAP_KEY: &str = "key";
1993const MAP_VALUE: &str = "value";
1994
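/// Collects the named child types of `our_field_type` into `schema_fields` and returns the
/// corresponding arrow struct fields, recursing through list and map wrappers. Returns `None`
/// when the arrow type contains no nested struct to compare field by field.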
1995fn get_fields<'a>(
1996 our_field_type: &'a risingwave_common::types::DataType,
1997 data_type: &ArrowDataType,
1998 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1999) -> Option<ArrowFields> {
2000 match data_type {
2001 ArrowDataType::Struct(fields) => {
2002 match our_field_type {
2003 risingwave_common::types::DataType::Struct(struct_fields) => {
2004 struct_fields.iter().for_each(|(name, data_type)| {
2005 let res = schema_fields.insert(name, data_type);
2006 assert!(res.is_none())
2008 });
2009 }
2010 risingwave_common::types::DataType::Map(map_fields) => {
2011 schema_fields.insert(MAP_KEY, map_fields.key());
2012 schema_fields.insert(MAP_VALUE, map_fields.value());
2013 }
2014 risingwave_common::types::DataType::List(list_field) => {
2015 list_field.as_struct().iter().for_each(|(name, data_type)| {
2016 let res = schema_fields.insert(name, data_type);
2017 assert!(res.is_none())
2019 });
2020 }
2021 _ => {}
2022 };
2023 Some(fields.clone())
2024 }
2025 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
2026 get_fields(our_field_type, field.data_type(), schema_fields)
2027 }
        _ => None,
    }
2030}
2031
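/// Checks that every iceberg (arrow) field has a same-named RisingWave field whose converted
/// arrow type is compatible, recursing into nested struct, list and map types. Decimal
/// precision/scale and Binary vs. LargeBinary differences are tolerated.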
2032fn check_compatibility(
2033 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
2034 fields: &ArrowFields,
2035) -> anyhow::Result<bool> {
2036 for arrow_field in fields {
2037 let our_field_type = schema_fields
2038 .get(arrow_field.name().as_str())
2039 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
2040
2041 let converted_arrow_data_type = IcebergArrowConvert
2043 .to_arrow_field("", our_field_type)
2044 .map_err(|e| anyhow!(e))?
2045 .data_type()
2046 .clone();
2047
2048 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
2049 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
2050 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
2051 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
2052 (ArrowDataType::List(_), ArrowDataType::List(field))
2053 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
2054 let mut schema_fields = HashMap::new();
2055 get_fields(our_field_type, field.data_type(), &mut schema_fields)
2056 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
2057 }
2058 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
2060 let mut schema_fields = HashMap::new();
2061 our_field_type
2062 .as_struct()
2063 .iter()
2064 .for_each(|(name, data_type)| {
2065 let res = schema_fields.insert(name, data_type);
2066 assert!(res.is_none())
2068 });
2069 check_compatibility(schema_fields, fields)?
2070 }
2071 (left, right) => left.equals_datatype(right),
2079 };
2080 if !compatible {
2081 bail!(
2082 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2083 arrow_field.name(),
2084 converted_arrow_data_type,
2085 arrow_field.data_type()
2086 );
2087 }
2088 }
2089 Ok(true)
2090}
2091
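/// Checks that the RisingWave schema and the iceberg table's arrow schema have the same number
/// of fields and that every iceberg field has a compatible RisingWave counterpart; field order
/// does not matter.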
2092pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2094 if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema field count mismatch: RisingWave has {} fields, iceberg has {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
2100 }
2101
2102 let mut schema_fields = HashMap::new();
2103 rw_schema.fields.iter().for_each(|field| {
2104 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2105 assert!(res.is_none())
2107 });
2108
2109 check_compatibility(schema_fields, &arrow_schema.fields)?;
2110 Ok(())
2111}
2112
2113pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2114 serde_json::to_vec(&metadata).unwrap()
2115}
2116
2117pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2118 serde_json::from_slice(&bytes).unwrap()
2119}
2120
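/// Parses a `partition_by` option string such as `"v1, bucket(5,v1), truncate(4,v2), year(v3)"`
/// into `(column, Transform)` pairs. A bare column name maps to the identity transform, while
/// `bucket` and `truncate` additionally require the parameter `n`.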
2121pub fn parse_partition_by_exprs(
2122 expr: String,
2123) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
2124 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
2126 if !re.is_match(&expr) {
        bail!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        )
2131 }
2132 let caps = re.captures_iter(&expr);
2133
2134 let mut partition_columns = vec![];
2135
2136 for mat in caps {
2137 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
2138 (&mat["transform"], Transform::Identity)
2139 } else {
2140 let mut func = mat["transform"].to_owned();
2141 if func == "bucket" || func == "truncate" {
            let n = &mat
                .name("n")
                .ok_or_else(|| anyhow!("`n` must be specified for `bucket` and `truncate`"))?
                .as_str();
2146 func = format!("{func}[{n}]");
2147 }
2148 (
2149 &mat["field"],
2150 Transform::from_str(&func)
2151 .with_context(|| format!("invalid transform function {}", func))?,
2152 )
2153 };
2154 partition_columns.push((column.to_owned(), transform));
2155 }
2156 Ok(partition_columns)
2157}
2158
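/// Returns the branch that commits should target: `ICEBERG_COW_BRANCH` for upsert sinks in
/// copy-on-write mode, and the iceberg `MAIN_BRANCH` otherwise.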
2159pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
2160 if should_enable_iceberg_cow(sink_type, write_mode) {
2161 ICEBERG_COW_BRANCH.to_owned()
2162 } else {
2163 MAIN_BRANCH.to_owned()
2164 }
2165}
2166
2167pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
2168 sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
2169}
2170
2171#[cfg(test)]
2172mod test {
2173 use std::collections::BTreeMap;
2174
2175 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2176 use risingwave_common::types::{DataType, MapType, StructType};
2177
2178 use crate::connector_common::IcebergCommon;
2179 use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2180 use crate::sink::iceberg::{
2181 COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
2182 ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2183 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2184 SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
2185 };
2186
    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
2190 fn test_compatible_arrow_schema() {
2191 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2192
2193 use super::*;
2194 let risingwave_schema = Schema::new(vec![
2195 Field::with_name(DataType::Int32, "a"),
2196 Field::with_name(DataType::Int32, "b"),
2197 Field::with_name(DataType::Int32, "c"),
2198 ]);
2199 let arrow_schema = ArrowSchema::new(vec![
2200 ArrowField::new("a", ArrowDataType::Int32, false),
2201 ArrowField::new("b", ArrowDataType::Int32, false),
2202 ArrowField::new("c", ArrowDataType::Int32, false),
2203 ]);
2204
2205 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2206
2207 let risingwave_schema = Schema::new(vec![
2208 Field::with_name(DataType::Int32, "d"),
2209 Field::with_name(DataType::Int32, "c"),
2210 Field::with_name(DataType::Int32, "a"),
2211 Field::with_name(DataType::Int32, "b"),
2212 ]);
2213 let arrow_schema = ArrowSchema::new(vec![
2214 ArrowField::new("a", ArrowDataType::Int32, false),
2215 ArrowField::new("b", ArrowDataType::Int32, false),
2216 ArrowField::new("d", ArrowDataType::Int32, false),
2217 ArrowField::new("c", ArrowDataType::Int32, false),
2218 ]);
2219 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2220
2221 let risingwave_schema = Schema::new(vec![
2222 Field::with_name(
2223 DataType::Struct(StructType::new(vec![
2224 ("a1", DataType::Int32),
2225 (
2226 "a2",
2227 DataType::Struct(StructType::new(vec![
2228 ("a21", DataType::Bytea),
2229 (
2230 "a22",
2231 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2232 ),
2233 ])),
2234 ),
2235 ])),
2236 "a",
2237 ),
2238 Field::with_name(
2239 DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2240 ("b1", DataType::Int32),
2241 ("b2", DataType::Bytea),
2242 (
2243 "b3",
2244 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2245 ),
2246 ])))),
2247 "b",
2248 ),
2249 Field::with_name(
2250 DataType::Map(MapType::from_kv(
2251 DataType::Varchar,
2252 DataType::List(Box::new(DataType::Struct(StructType::new([
2253 ("c1", DataType::Int32),
2254 ("c2", DataType::Bytea),
2255 (
2256 "c3",
2257 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2258 ),
2259 ])))),
2260 )),
2261 "c",
2262 ),
2263 ]);
2264 let arrow_schema = ArrowSchema::new(vec![
2265 ArrowField::new(
2266 "a",
2267 ArrowDataType::Struct(ArrowFields::from(vec![
2268 ArrowField::new("a1", ArrowDataType::Int32, false),
2269 ArrowField::new(
2270 "a2",
2271 ArrowDataType::Struct(ArrowFields::from(vec![
2272 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2273 ArrowField::new_map(
2274 "a22",
2275 "entries",
2276 ArrowFieldRef::new(ArrowField::new(
2277 "key",
2278 ArrowDataType::Utf8,
2279 false,
2280 )),
2281 ArrowFieldRef::new(ArrowField::new(
2282 "value",
2283 ArrowDataType::Utf8,
2284 false,
2285 )),
2286 false,
2287 false,
2288 ),
2289 ])),
2290 false,
2291 ),
2292 ])),
2293 false,
2294 ),
2295 ArrowField::new(
2296 "b",
2297 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2298 ArrowDataType::Struct(ArrowFields::from(vec![
2299 ArrowField::new("b1", ArrowDataType::Int32, false),
2300 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2301 ArrowField::new_map(
2302 "b3",
2303 "entries",
2304 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2305 ArrowFieldRef::new(ArrowField::new(
2306 "value",
2307 ArrowDataType::Utf8,
2308 false,
2309 )),
2310 false,
2311 false,
2312 ),
2313 ])),
2314 false,
2315 ))),
2316 false,
2317 ),
2318 ArrowField::new_map(
2319 "c",
2320 "entries",
2321 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2322 ArrowFieldRef::new(ArrowField::new(
2323 "value",
2324 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2325 ArrowDataType::Struct(ArrowFields::from(vec![
2326 ArrowField::new("c1", ArrowDataType::Int32, false),
2327 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2328 ArrowField::new_map(
2329 "c3",
2330 "entries",
2331 ArrowFieldRef::new(ArrowField::new(
2332 "key",
2333 ArrowDataType::Utf8,
2334 false,
2335 )),
2336 ArrowFieldRef::new(ArrowField::new(
2337 "value",
2338 ArrowDataType::Utf8,
2339 false,
2340 )),
2341 false,
2342 false,
2343 ),
2344 ])),
2345 false,
2346 ))),
2347 false,
2348 )),
2349 false,
2350 false,
2351 ),
2352 ]);
2353 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2354 }
2355
2356 #[test]
2357 fn test_parse_iceberg_config() {
2358 let values = [
2359 ("connector", "iceberg"),
2360 ("type", "upsert"),
2361 ("primary_key", "v1"),
2362 ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
2363 ("warehouse.path", "s3://iceberg"),
2364 ("s3.endpoint", "http://127.0.0.1:9301"),
2365 ("s3.access.key", "hummockadmin"),
2366 ("s3.secret.key", "hummockadmin"),
2367 ("s3.path.style.access", "true"),
2368 ("s3.region", "us-east-1"),
2369 ("catalog.type", "jdbc"),
2370 ("catalog.name", "demo"),
2371 ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
2372 ("catalog.jdbc.user", "admin"),
2373 ("catalog.jdbc.password", "123456"),
2374 ("database.name", "demo_db"),
2375 ("table.name", "demo_table"),
2376 ("enable_compaction", "true"),
2377 ("compaction_interval_sec", "1800"),
2378 ("enable_snapshot_expiration", "true"),
2379 ]
2380 .into_iter()
2381 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2382 .collect();
2383
2384 let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();
2385
2386 let expected_iceberg_config = IcebergConfig {
2387 common: IcebergCommon {
2388 warehouse_path: Some("s3://iceberg".to_owned()),
2389 catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
2390 region: Some("us-east-1".to_owned()),
2391 endpoint: Some("http://127.0.0.1:9301".to_owned()),
2392 access_key: Some("hummockadmin".to_owned()),
2393 secret_key: Some("hummockadmin".to_owned()),
2394 gcs_credential: None,
2395 catalog_type: Some("jdbc".to_owned()),
2396 glue_id: None,
2397 catalog_name: Some("demo".to_owned()),
2398 database_name: Some("demo_db".to_owned()),
2399 table_name: "demo_table".to_owned(),
2400 path_style_access: Some(true),
2401 credential: None,
2402 oauth2_server_uri: None,
2403 scope: None,
2404 token: None,
2405 enable_config_load: None,
2406 rest_signing_name: None,
2407 rest_signing_region: None,
2408 rest_sigv4_enabled: None,
2409 hosted_catalog: None,
2410 azblob_account_name: None,
2411 azblob_account_key: None,
2412 azblob_endpoint_url: None,
2413 header: None,
2414 },
2415 r#type: "upsert".to_owned(),
2416 force_append_only: false,
2417 primary_key: Some(vec!["v1".to_owned()]),
2418 partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
2419 java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
2420 .into_iter()
2421 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2422 .collect(),
2423 commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
2424 create_table_if_not_exists: false,
2425 is_exactly_once: None,
2426 commit_retry_num: 8,
2427 enable_compaction: true,
2428 compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
2429 enable_snapshot_expiration: true,
2430 write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2431 snapshot_expiration_max_age_millis: None,
2432 snapshot_expiration_retain_last: None,
2433 snapshot_expiration_clear_expired_files: true,
2434 snapshot_expiration_clear_expired_meta_data: true
2435 };
2436
2437 assert_eq!(iceberg_config, expected_iceberg_config);
2438
2439 assert_eq!(
2440 &iceberg_config.common.full_table_name().unwrap().to_string(),
2441 "demo_db.demo_table"
2442 );
2443 }
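
    // A minimal sketch exercising `parse_partition_by_exprs` with the same transform formats as
    // the `partition_by` value in `test_parse_iceberg_config` above; the expected variants assume
    // `iceberg::spec::Transform`'s `bucket[n]` / `truncate[n]` string parsing.
    #[test]
    fn test_parse_partition_by_exprs() {
        use super::*;

        let parsed =
            parse_partition_by_exprs("v1, identity(v1), truncate(4,v2), bucket(5,v1)".to_owned())
                .unwrap();
        assert_eq!(
            parsed,
            vec![
                ("v1".to_owned(), Transform::Identity),
                ("v1".to_owned(), Transform::Identity),
                ("v2".to_owned(), Transform::Truncate(4)),
                ("v1".to_owned(), Transform::Bucket(5)),
            ]
        );

        // `bucket` and `truncate` are rejected when `n` is missing.
        assert!(parse_partition_by_exprs("bucket(v1)".to_owned()).is_err());
    }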
2444
2445 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2446 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2447
2448 let table = iceberg_config.load_table().await.unwrap();
2449
2450 println!("{:?}", table.identifier());
2451 }
2452
2453 #[tokio::test]
2454 #[ignore]
2455 async fn test_storage_catalog() {
2456 let values = [
2457 ("connector", "iceberg"),
2458 ("type", "append-only"),
2459 ("force_append_only", "true"),
2460 ("s3.endpoint", "http://127.0.0.1:9301"),
2461 ("s3.access.key", "hummockadmin"),
2462 ("s3.secret.key", "hummockadmin"),
2463 ("s3.region", "us-east-1"),
2464 ("s3.path.style.access", "true"),
2465 ("catalog.name", "demo"),
2466 ("catalog.type", "storage"),
2467 ("warehouse.path", "s3://icebergdata/demo"),
2468 ("database.name", "s1"),
2469 ("table.name", "t1"),
2470 ]
2471 .into_iter()
2472 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2473 .collect();
2474
2475 test_create_catalog(values).await;
2476 }
2477
2478 #[tokio::test]
2479 #[ignore]
2480 async fn test_rest_catalog() {
2481 let values = [
2482 ("connector", "iceberg"),
2483 ("type", "append-only"),
2484 ("force_append_only", "true"),
2485 ("s3.endpoint", "http://127.0.0.1:9301"),
2486 ("s3.access.key", "hummockadmin"),
2487 ("s3.secret.key", "hummockadmin"),
2488 ("s3.region", "us-east-1"),
2489 ("s3.path.style.access", "true"),
2490 ("catalog.name", "demo"),
2491 ("catalog.type", "rest"),
2492 ("catalog.uri", "http://192.168.167.4:8181"),
2493 ("warehouse.path", "s3://icebergdata/demo"),
2494 ("database.name", "s1"),
2495 ("table.name", "t1"),
2496 ]
2497 .into_iter()
2498 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2499 .collect();
2500
2501 test_create_catalog(values).await;
2502 }
2503
2504 #[tokio::test]
2505 #[ignore]
2506 async fn test_jdbc_catalog() {
2507 let values = [
2508 ("connector", "iceberg"),
2509 ("type", "append-only"),
2510 ("force_append_only", "true"),
2511 ("s3.endpoint", "http://127.0.0.1:9301"),
2512 ("s3.access.key", "hummockadmin"),
2513 ("s3.secret.key", "hummockadmin"),
2514 ("s3.region", "us-east-1"),
2515 ("s3.path.style.access", "true"),
2516 ("catalog.name", "demo"),
2517 ("catalog.type", "jdbc"),
2518 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2519 ("catalog.jdbc.user", "admin"),
2520 ("catalog.jdbc.password", "123456"),
2521 ("warehouse.path", "s3://icebergdata/demo"),
2522 ("database.name", "s1"),
2523 ("table.name", "t1"),
2524 ]
2525 .into_iter()
2526 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2527 .collect();
2528
2529 test_create_catalog(values).await;
2530 }
2531
2532 #[tokio::test]
2533 #[ignore]
2534 async fn test_hive_catalog() {
2535 let values = [
2536 ("connector", "iceberg"),
2537 ("type", "append-only"),
2538 ("force_append_only", "true"),
2539 ("s3.endpoint", "http://127.0.0.1:9301"),
2540 ("s3.access.key", "hummockadmin"),
2541 ("s3.secret.key", "hummockadmin"),
2542 ("s3.region", "us-east-1"),
2543 ("s3.path.style.access", "true"),
2544 ("catalog.name", "demo"),
2545 ("catalog.type", "hive"),
2546 ("catalog.uri", "thrift://localhost:9083"),
2547 ("warehouse.path", "s3://icebergdata/demo"),
2548 ("database.name", "s1"),
2549 ("table.name", "t1"),
2550 ]
2551 .into_iter()
2552 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2553 .collect();
2554
2555 test_create_catalog(values).await;
2556 }
2557
2558 #[test]
2559 fn test_config_constants_consistency() {
2560 assert_eq!(ENABLE_COMPACTION, "enable_compaction");
2563 assert_eq!(COMPACTION_INTERVAL_SEC, "compaction_interval_sec");
2564 assert_eq!(ENABLE_SNAPSHOT_EXPIRATION, "enable_snapshot_expiration");
2565 assert_eq!(WRITE_MODE, "write_mode");
2566 assert_eq!(
2567 SNAPSHOT_EXPIRATION_RETAIN_LAST,
2568 "snapshot_expiration_retain_last"
2569 );
2570 assert_eq!(
2571 SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
2572 "snapshot_expiration_max_age_millis"
2573 );
2574 assert_eq!(
2575 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
2576 "snapshot_expiration_clear_expired_files"
2577 );
2578 assert_eq!(
2579 SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
2580 "snapshot_expiration_clear_expired_meta_data"
2581 );
2582 }
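
    // Minimal sketches for the pure helper functions above. They assume the write-mode and
    // branch constants referenced in this module (e.g. `ICEBERG_WRITE_MODE_COPY_ON_WRITE`,
    // `ICEBERG_COW_BRANCH`) are reachable via `use super::*`.
    #[test]
    fn test_commit_branch_selection() {
        use super::*;

        // Copy-on-write is only enabled for upsert sinks in copy-on-write mode.
        assert!(should_enable_iceberg_cow(
            SINK_TYPE_UPSERT,
            ICEBERG_WRITE_MODE_COPY_ON_WRITE
        ));
        assert!(!should_enable_iceberg_cow(
            SINK_TYPE_APPEND_ONLY,
            ICEBERG_WRITE_MODE_COPY_ON_WRITE
        ));
        assert!(!should_enable_iceberg_cow(
            SINK_TYPE_UPSERT,
            ICEBERG_WRITE_MODE_MERGE_ON_READ
        ));

        // The commit branch follows the same rule.
        assert_eq!(
            commit_branch(SINK_TYPE_UPSERT, ICEBERG_WRITE_MODE_COPY_ON_WRITE),
            ICEBERG_COW_BRANCH
        );
        assert_eq!(
            commit_branch(SINK_TYPE_APPEND_ONLY, ICEBERG_WRITE_MODE_MERGE_ON_READ),
            MAIN_BRANCH
        );
    }

    // A sketch of the pre-commit metadata round-trip used by the exactly-once path: the
    // serialization is plain JSON, so serializing and deserializing should be lossless.
    #[test]
    fn test_metadata_serialization_roundtrip() {
        use super::*;

        let metadata = vec![vec![1u8, 2, 3], vec![], vec![255u8]];
        let bytes = serialize_metadata(metadata.clone());
        assert_eq!(deserialize_metadata(bytes), metadata);
    }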
2583}