1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, MAIN_BRANCH, SerializedDataFile, Transform, UnboundPartitionField,
30 UnboundPartitionSpec,
31};
32use iceberg::table::Table;
33use iceberg::transaction::Transaction;
34use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
35use iceberg::writer::base_writer::equality_delete_writer::{
36 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
37};
38use iceberg::writer::base_writer::sort_position_delete_writer::{
39 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
40};
41use iceberg::writer::file_writer::ParquetWriterBuilder;
42use iceberg::writer::file_writer::location_generator::{
43 DefaultFileNameGenerator, DefaultLocationGenerator,
44};
45use iceberg::writer::function_writer::equality_delta_writer::{
46 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
47};
48use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
49use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
50use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
51use itertools::Itertools;
52use parquet::file::properties::WriterProperties;
53use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
54use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
55use regex::Regex;
56use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
57use risingwave_common::array::arrow::arrow_schema_iceberg::{
58 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
59 Schema as ArrowSchema, SchemaRef,
60};
61use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
62use risingwave_common::array::{Op, StreamChunk};
63use risingwave_common::bail;
64use risingwave_common::bitmap::Bitmap;
65use risingwave_common::catalog::Schema;
66use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
67use risingwave_common_estimate_size::EstimateSize;
68use risingwave_pb::connector_service::SinkMetadata;
69use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
70use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71use sea_orm::DatabaseConnection;
72use serde_derive::Deserialize;
73use serde_json::from_value;
74use serde_with::{DisplayFromStr, serde_as};
75use thiserror_ext::AsReport;
76use tokio::sync::mpsc::UnboundedSender;
77use tokio_retry::Retry;
78use tokio_retry::strategy::{ExponentialBackoff, jitter};
79use tracing::warn;
80use url::Url;
81use uuid::Uuid;
82use with_options::WithOptions;
83
84use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
85use super::{
86 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
87 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
88};
89use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
90use crate::enforce_secret::EnforceSecret;
91use crate::sink::catalog::SinkId;
92use crate::sink::coordinate::CoordinatedLogSinker;
93use crate::sink::iceberg::exactly_once_util::*;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
98pub const ICEBERG_SINK: &str = "iceberg";
99pub const ICEBERG_COW_BRANCH: &str = "ingestion";
100pub const ICEBERG_WRITE_MODE_MERGE_ON_READ: &str = "merge-on-read";
101pub const ICEBERG_WRITE_MODE_COPY_ON_WRITE: &str = "copy-on-write";
102
103pub const ENABLE_COMPACTION: &str = "enable_compaction";
104pub const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
105pub const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
106pub const WRITE_MODE: &str = "write_mode";
107
108fn default_commit_retry_num() -> u32 {
109 8
110}
111
112fn default_iceberg_write_mode() -> String {
113 ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned()
114}
115
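/// User-facing configuration of the iceberg sink, deserialized from the sink's `WITH`
/// options. Connection and catalog settings live in the flattened `IcebergCommon`, while the
/// remaining fields control sink behavior (append-only vs. upsert, commit cadence,
/// compaction, snapshot expiration, and the write mode). Editor's summary; see the field
/// attributes for the authoritative option names and defaults.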
116#[serde_as]
117#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
118pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,
123
124 #[serde(flatten)]
125 common: IcebergCommon,
126
127 #[serde(
128 rename = "primary_key",
129 default,
130 deserialize_with = "deserialize_optional_string_seq_from_string"
131 )]
132 pub primary_key: Option<Vec<String>>,
133
134 #[serde(skip)]
136 pub java_catalog_props: HashMap<String, String>,
137
138 #[serde(default)]
139 pub partition_by: Option<String>,
140
141 #[serde(default = "default_commit_checkpoint_interval")]
143 #[serde_as(as = "DisplayFromStr")]
144 #[with_option(allow_alter_on_fly)]
145 pub commit_checkpoint_interval: u64,
146
147 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
148 pub create_table_if_not_exists: bool,
149
150 #[serde(default)]
152 #[serde_as(as = "Option<DisplayFromStr>")]
153 pub is_exactly_once: Option<bool>,
154 #[serde(default = "default_commit_retry_num")]
159 pub commit_retry_num: u32,
160
161 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
163 #[with_option(allow_alter_on_fly)]
164 pub enable_compaction: bool,
165
166 #[serde(default)]
168 #[serde_as(as = "Option<DisplayFromStr>")]
169 #[with_option(allow_alter_on_fly)]
170 pub compaction_interval_sec: Option<u64>,
171
172 #[serde(default, deserialize_with = "deserialize_bool_from_string")]
174 #[with_option(allow_alter_on_fly)]
175 pub enable_snapshot_expiration: bool,
176
177 #[serde(default = "default_iceberg_write_mode")]
179 pub write_mode: String,
180}
181
182impl EnforceSecret for IcebergConfig {
183 fn enforce_secret<'a>(
184 prop_iter: impl Iterator<Item = &'a str>,
185 ) -> crate::error::ConnectorResult<()> {
186 for prop in prop_iter {
187 IcebergCommon::enforce_one(prop)?;
188 }
189 Ok(())
190 }
191
192 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
193 IcebergCommon::enforce_one(prop)
194 }
195}
196
197impl IcebergConfig {
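    /// Builds an `IcebergConfig` from the raw `WITH`-option map of the sink. Besides
    /// deserialization, this validates `type`/`primary_key` and forwards every `catalog.*`
    /// option (except the ones handled natively below) into `java_catalog_props`.
    ///
    /// Illustrative sketch only (not compiled): the exact option names come from
    /// `IcebergCommon`, so any dotted keys beyond the `catalog.*` handling visible in this
    /// function are assumptions.
    ///
    /// ```ignore
    /// let mut props = BTreeMap::new();
    /// props.insert("type".to_owned(), "upsert".to_owned());
    /// props.insert("primary_key".to_owned(), "id".to_owned());
    /// props.insert("catalog.type".to_owned(), "rest".to_owned());
    /// // ... plus the catalog/warehouse/table options required by `IcebergCommon` ...
    /// // Unrecognized `catalog.*` keys are forwarded to the Java catalog with the prefix stripped:
    /// props.insert("catalog.credential".to_owned(), "...".to_owned());
    /// let config = IcebergConfig::from_btreemap(props)?;
    /// assert!(config.java_catalog_props.contains_key("credential"));
    /// ```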
198 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
199 let mut config =
200 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
201 .map_err(|e| SinkError::Config(anyhow!(e)))?;
202
203 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
204 return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
206 SINK_TYPE_OPTION,
207 SINK_TYPE_APPEND_ONLY,
208 SINK_TYPE_UPSERT
209 )));
210 }
211
212 if config.r#type == SINK_TYPE_UPSERT {
213 if let Some(primary_key) = &config.primary_key {
214 if primary_key.is_empty() {
215 return Err(SinkError::Config(anyhow!(
216 "`primary_key` must not be empty in {}",
217 SINK_TYPE_UPSERT
218 )));
219 }
220 } else {
221 return Err(SinkError::Config(anyhow!(
222 "Must set `primary_key` in {}",
223 SINK_TYPE_UPSERT
224 )));
225 }
226 }
227
228 config.java_catalog_props = values
230 .iter()
231 .filter(|(k, _v)| {
232 k.starts_with("catalog.")
233 && k != &"catalog.uri"
234 && k != &"catalog.type"
235 && k != &"catalog.name"
236 && k != &"catalog.header"
237 })
238 .map(|(k, v)| (k[8..].to_string(), v.clone()))
239 .collect();
240
241 if config.commit_checkpoint_interval == 0 {
242 return Err(SinkError::Config(anyhow!(
243 "`commit_checkpoint_interval` must be greater than 0"
244 )));
245 }
246
247 Ok(config)
248 }
249
250 pub fn catalog_type(&self) -> &str {
251 self.common.catalog_type()
252 }
253
254 pub async fn load_table(&self) -> Result<Table> {
255 self.common
256 .load_table(&self.java_catalog_props)
257 .await
258 .map_err(Into::into)
259 }
260
261 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
262 self.common
263 .create_catalog(&self.java_catalog_props)
264 .await
265 .map_err(Into::into)
266 }
267
268 pub fn full_table_name(&self) -> Result<TableIdent> {
269 self.common.full_table_name().map_err(Into::into)
270 }
271
272 pub fn catalog_name(&self) -> String {
273 self.common.catalog_name()
274 }
275
    pub fn compaction_interval_sec(&self) -> u64 {
        // Default to one hour when the user does not specify an interval.
        self.compaction_interval_sec.unwrap_or(3600)
    }
280}
281
pub struct IcebergSink {
    config: IcebergConfig,
    param: SinkParam,
    /// Column ids of the primary key when running in upsert mode; `None` for append-only.
    unique_column_ids: Option<Vec<usize>>,
}
288
289impl EnforceSecret for IcebergSink {
290 fn enforce_secret<'a>(
291 prop_iter: impl Iterator<Item = &'a str>,
292 ) -> crate::error::ConnectorResult<()> {
293 for prop in prop_iter {
294 IcebergConfig::enforce_one(prop)?;
295 }
296 Ok(())
297 }
298}
299
300impl TryFrom<SinkParam> for IcebergSink {
301 type Error = SinkError;
302
303 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
304 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
305 IcebergSink::new(config, param)
306 }
307}
308
309impl Debug for IcebergSink {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 f.debug_struct("IcebergSink")
312 .field("config", &self.config)
313 .finish()
314 }
315}
316
317impl IcebergSink {
318 async fn create_and_validate_table(&self) -> Result<Table> {
319 if self.config.create_table_if_not_exists {
320 self.create_table_if_not_exists().await?;
321 }
322
323 let table = self
324 .config
325 .load_table()
326 .await
327 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
328
329 let sink_schema = self.param.schema();
330 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
331 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
332
333 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
334 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
335
336 Ok(table)
337 }
338
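    /// Creates the iceberg table when `create_table_if_not_exists` is set and the table is
    /// missing: the sink schema is converted to an iceberg schema, the table location is
    /// derived from the configured warehouse path (REST catalogs may leave it to the
    /// catalog), and an optional partition spec is built from `partition_by`.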
339 async fn create_table_if_not_exists(&self) -> Result<()> {
340 let catalog = self.config.create_catalog().await?;
341 let namespace = if let Some(database_name) = &self.config.common.database_name {
342 let namespace = NamespaceIdent::new(database_name.clone());
343 if !catalog
344 .namespace_exists(&namespace)
345 .await
346 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
347 {
348 catalog
349 .create_namespace(&namespace, HashMap::default())
350 .await
351 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
352 .context("failed to create iceberg namespace")?;
353 }
354 namespace
355 } else {
356 bail!("database name must be set if you want to create table")
357 };
358
359 let table_id = self
360 .config
361 .full_table_name()
362 .context("Unable to parse table name")?;
363 if !catalog
364 .table_exists(&table_id)
365 .await
366 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
367 {
368 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
369 let arrow_fields = self
371 .param
372 .columns
373 .iter()
374 .map(|column| {
375 Ok(iceberg_create_table_arrow_convert
376 .to_arrow_field(&column.name, &column.data_type)
377 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
378 .context(format!(
379 "failed to convert {}: {} to arrow type",
380 &column.name, &column.data_type
381 ))?)
382 })
383 .collect::<Result<Vec<ArrowField>>>()?;
384 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
385 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
386 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
387 .context("failed to convert arrow schema to iceberg schema")?;
388
389 let location = {
390 let mut names = namespace.clone().inner();
391 names.push(self.config.common.table_name.clone());
392 match &self.config.common.warehouse_path {
393 Some(warehouse_path) => {
394 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
395 let url = Url::parse(warehouse_path);
396 if url.is_err() || is_s3_tables {
397 if self.config.common.catalog_type() == "rest"
400 || self.config.common.catalog_type() == "rest_rust"
401 {
402 None
403 } else {
404 bail!(format!("Invalid warehouse path: {}", warehouse_path))
405 }
406 } else if warehouse_path.ends_with('/') {
407 Some(format!("{}{}", warehouse_path, names.join("/")))
408 } else {
409 Some(format!("{}/{}", warehouse_path, names.join("/")))
410 }
411 }
412 None => None,
413 }
414 };
415
416 let partition_spec = match &self.config.partition_by {
417 Some(partition_by) => {
418 let mut partition_fields = Vec::<UnboundPartitionField>::new();
419 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
420 .into_iter()
421 .enumerate()
422 {
423 match iceberg_schema.field_id_by_name(&column) {
424 Some(id) => partition_fields.push(
425 UnboundPartitionField::builder()
426 .source_id(id)
427 .transform(transform)
428 .name(format!("_p_{}", column))
429 .field_id(i as i32)
430 .build(),
431 ),
432 None => bail!(format!(
433 "Partition source column does not exist in schema: {}",
434 column
435 )),
436 };
437 }
438 Some(
439 UnboundPartitionSpec::builder()
440 .with_spec_id(0)
441 .add_partition_fields(partition_fields)
442 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
443 .context("failed to add partition columns")?
444 .build(),
445 )
446 }
447 None => None,
448 };
449
450 let table_creation_builder = TableCreation::builder()
451 .name(self.config.common.table_name.clone())
452 .schema(iceberg_schema);
453
454 let table_creation = match (location, partition_spec) {
455 (Some(location), Some(partition_spec)) => table_creation_builder
456 .location(location)
457 .partition_spec(partition_spec)
458 .build(),
459 (Some(location), None) => table_creation_builder.location(location).build(),
460 (None, Some(partition_spec)) => table_creation_builder
461 .partition_spec(partition_spec)
462 .build(),
463 (None, None) => table_creation_builder.build(),
464 };
465
466 catalog
467 .create_table(&namespace, table_creation)
468 .await
469 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
470 .context("failed to create iceberg table")?;
471 }
472 Ok(())
473 }
474
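    /// Builds the sink, resolving the configured `primary_key` names into column ids of the
    /// sink schema for upsert mode. Append-only sinks (or `force_append_only`) carry no
    /// unique column ids.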
475 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
476 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
477 if let Some(pk) = &config.primary_key {
478 let mut unique_column_ids = Vec::with_capacity(pk.len());
479 for col_name in pk {
480 let id = param
481 .columns
482 .iter()
483 .find(|col| col.name.as_str() == col_name)
484 .ok_or_else(|| {
485 SinkError::Config(anyhow!(
486 "Primary key column {} not found in sink schema",
487 col_name
488 ))
489 })?
490 .column_id
491 .get_id() as usize;
492 unique_column_ids.push(id);
493 }
494 Some(unique_column_ids)
495 } else {
496 unreachable!()
497 }
498 } else {
499 None
500 };
501 Ok(Self {
502 config,
503 param,
504 unique_column_ids,
505 })
506 }
507}
508
509impl Sink for IcebergSink {
510 type Coordinator = IcebergSinkCommitter;
511 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
512
513 const SINK_NAME: &'static str = ICEBERG_SINK;
514
515 async fn validate(&self) -> Result<()> {
516 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
517 bail!("Snowflake catalog only supports iceberg sources");
518 }
519 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
520 risingwave_common::license::Feature::IcebergSinkWithGlue
521 .check_available()
522 .map_err(|e| anyhow::anyhow!(e))?;
523 }
524 if self.config.enable_compaction {
525 risingwave_common::license::Feature::IcebergCompaction
526 .check_available()
527 .map_err(|e| anyhow::anyhow!(e))?;
528 }
529
530 let _ = self.create_and_validate_table().await?;
531 Ok(())
532 }
533
534 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
535 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
536
537 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
538 if iceberg_config.enable_compaction {
539 if compaction_interval == 0 {
540 bail!(
541 "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
542 );
543 }
544 } else {
545 bail!("`compaction_interval_sec` can only be set when `enable_compaction` is true");
546 }
547 }
548
549 Ok(())
550 }
551
552 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
553 let table = self.create_and_validate_table().await?;
554 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
555 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
556 } else {
557 IcebergSinkWriter::new_append_only(table, &writer_param).await?
558 };
559
560 let commit_checkpoint_interval =
561 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
562 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
563 );
564 let writer = CoordinatedLogSinker::new(
565 &writer_param,
566 self.param.clone(),
567 inner,
568 commit_checkpoint_interval,
569 )
570 .await?;
571
572 Ok(writer)
573 }
574
575 fn is_coordinated_sink(&self) -> bool {
576 true
577 }
578
579 async fn new_coordinator(
580 &self,
581 db: DatabaseConnection,
582 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
583 ) -> Result<Self::Coordinator> {
584 let catalog = self.config.create_catalog().await?;
585 let table = self.create_and_validate_table().await?;
586 Ok(IcebergSinkCommitter {
587 catalog,
588 table,
589 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
590 last_commit_epoch: 0,
591 sink_id: self.param.sink_id.sink_id(),
592 config: self.config.clone(),
593 param: self.param.clone(),
594 db,
595 commit_retry_num: self.config.commit_retry_num,
596 committed_epoch_subscriber: None,
597 iceberg_compact_stat_sender,
598 })
599 }
600}
601
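/// Tracks the projection used to drop the extra partition column (if any) from incoming
/// chunks before they reach the iceberg writer. Editor's summary: `None` means no extra
/// partition column, `Prepare(idx)` holds the column index until the first chunk arrives,
/// and `Done(vec)` caches the computed projection for subsequent chunks.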
602enum ProjectIdxVec {
609 None,
610 Prepare(usize),
611 Done(Vec<usize>),
612}
613
pub struct IcebergSinkWriter {
    /// The concrete writer, chosen by partitioning and append-only/upsert mode.
    writer: IcebergWriterDispatch,
    /// Arrow schema of the iceberg table (without the extra `op` column used by upsert).
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    /// The iceberg table; used to derive schema and partition-spec ids for commit metadata.
    table: Table,
    /// Cached projection that removes the extra partition column, if one is configured.
    project_idx_vec: ProjectIdxVec,
}
625
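/// Dispatch over the four writer shapes: {partitioned, non-partitioned} x {append-only,
/// upsert}. The `writer` is taken out and closed on every checkpoint barrier and rebuilt
/// from `writer_builder` for the next epoch; the upsert variants also keep the arrow schema
/// extended with the `op` column that the equality-delta writer expects.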
626#[allow(clippy::type_complexity)]
627enum IcebergWriterDispatch {
628 PartitionAppendOnly {
629 writer: Option<Box<dyn IcebergWriter>>,
630 writer_builder: MonitoredGeneralWriterBuilder<
631 FanoutPartitionWriterBuilder<
632 DataFileWriterBuilder<
633 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
634 >,
635 >,
636 >,
637 },
638 NonpartitionAppendOnly {
639 writer: Option<Box<dyn IcebergWriter>>,
640 writer_builder: MonitoredGeneralWriterBuilder<
641 DataFileWriterBuilder<
642 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
643 >,
644 >,
645 },
646 PartitionUpsert {
647 writer: Option<Box<dyn IcebergWriter>>,
648 writer_builder: MonitoredGeneralWriterBuilder<
649 FanoutPartitionWriterBuilder<
650 EqualityDeltaWriterBuilder<
651 DataFileWriterBuilder<
652 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
653 >,
654 MonitoredPositionDeleteWriterBuilder<
655 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
656 >,
657 EqualityDeleteFileWriterBuilder<
658 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
659 >,
660 >,
661 >,
662 >,
663 arrow_schema_with_op_column: SchemaRef,
664 },
665 NonpartitionUpsert {
666 writer: Option<Box<dyn IcebergWriter>>,
667 writer_builder: MonitoredGeneralWriterBuilder<
668 EqualityDeltaWriterBuilder<
669 DataFileWriterBuilder<
670 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
671 >,
672 MonitoredPositionDeleteWriterBuilder<
673 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
674 >,
675 EqualityDeleteFileWriterBuilder<
676 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
677 >,
678 >,
679 >,
680 arrow_schema_with_op_column: SchemaRef,
681 },
682}
683
684impl IcebergWriterDispatch {
685 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
686 match self {
687 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
688 | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
689 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
690 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
691 }
692 }
693}
694
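/// Metrics owned by the writer. The underscore-prefixed counters are not updated here
/// directly; as far as we can tell they are held so the guarded labels stay registered while
/// clones are updated inside the monitored writer builders.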
695pub struct IcebergWriterMetrics {
696 _write_qps: LabelGuardedIntCounter,
701 _write_latency: LabelGuardedHistogram,
702 write_bytes: LabelGuardedIntCounter,
703}
704
705impl IcebergSinkWriter {
706 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
707 let SinkWriterParam {
708 extra_partition_col_idx,
709 actor_id,
710 sink_id,
711 sink_name,
712 ..
713 } = writer_param;
714 let metrics_labels = [
715 &actor_id.to_string(),
716 &sink_id.to_string(),
717 sink_name.as_str(),
718 ];
719
720 let write_qps = GLOBAL_SINK_METRICS
722 .iceberg_write_qps
723 .with_guarded_label_values(&metrics_labels);
724 let write_latency = GLOBAL_SINK_METRICS
725 .iceberg_write_latency
726 .with_guarded_label_values(&metrics_labels);
727 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
730 .iceberg_rolling_unflushed_data_file
731 .with_guarded_label_values(&metrics_labels);
732 let write_bytes = GLOBAL_SINK_METRICS
733 .iceberg_write_bytes
734 .with_guarded_label_values(&metrics_labels);
735
736 let schema = table.metadata().current_schema();
737 let partition_spec = table.metadata().default_partition_spec();
738
739 let unique_uuid_suffix = Uuid::now_v7();
741
742 let parquet_writer_properties = WriterProperties::builder()
743 .set_max_row_group_size(
744 writer_param
745 .streaming_config
746 .developer
747 .iceberg_sink_write_parquet_max_row_group_rows,
748 )
749 .build();
750
751 let parquet_writer_builder = ParquetWriterBuilder::new(
752 parquet_writer_properties,
753 schema.clone(),
754 table.file_io().clone(),
755 DefaultLocationGenerator::new(table.metadata().clone())
756 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
757 DefaultFileNameGenerator::new(
758 writer_param.actor_id.to_string(),
759 Some(unique_uuid_suffix.to_string()),
760 iceberg::spec::DataFileFormat::Parquet,
761 ),
762 );
763 let data_file_builder =
764 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
765 if partition_spec.fields().is_empty() {
766 let writer_builder = MonitoredGeneralWriterBuilder::new(
767 data_file_builder,
768 write_qps.clone(),
769 write_latency.clone(),
770 );
771 let inner_writer = Some(Box::new(
772 writer_builder
773 .clone()
774 .build()
775 .await
776 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
777 ) as Box<dyn IcebergWriter>);
778 Ok(Self {
779 arrow_schema: Arc::new(
780 schema_to_arrow_schema(table.metadata().current_schema())
781 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
782 ),
783 metrics: IcebergWriterMetrics {
784 _write_qps: write_qps,
785 _write_latency: write_latency,
786 write_bytes,
787 },
788 writer: IcebergWriterDispatch::NonpartitionAppendOnly {
789 writer: inner_writer,
790 writer_builder,
791 },
792 table,
793 project_idx_vec: {
794 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
795 ProjectIdxVec::Prepare(*extra_partition_col_idx)
796 } else {
797 ProjectIdxVec::None
798 }
799 },
800 })
801 } else {
802 let partition_builder = MonitoredGeneralWriterBuilder::new(
803 FanoutPartitionWriterBuilder::new(
804 data_file_builder,
805 partition_spec.clone(),
806 schema.clone(),
807 )
808 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
809 write_qps.clone(),
810 write_latency.clone(),
811 );
812 let inner_writer = Some(Box::new(
813 partition_builder
814 .clone()
815 .build()
816 .await
817 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
818 ) as Box<dyn IcebergWriter>);
819 Ok(Self {
820 arrow_schema: Arc::new(
821 schema_to_arrow_schema(table.metadata().current_schema())
822 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
823 ),
824 metrics: IcebergWriterMetrics {
825 _write_qps: write_qps,
826 _write_latency: write_latency,
827 write_bytes,
828 },
829 writer: IcebergWriterDispatch::PartitionAppendOnly {
830 writer: inner_writer,
831 writer_builder: partition_builder,
832 },
833 table,
834 project_idx_vec: {
835 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
836 ProjectIdxVec::Prepare(*extra_partition_col_idx)
837 } else {
838 ProjectIdxVec::None
839 }
840 },
841 })
842 }
843 }
844
845 pub async fn new_upsert(
846 table: Table,
847 unique_column_ids: Vec<usize>,
848 writer_param: &SinkWriterParam,
849 ) -> Result<Self> {
850 let SinkWriterParam {
851 extra_partition_col_idx,
852 actor_id,
853 sink_id,
854 sink_name,
855 ..
856 } = writer_param;
857 let metrics_labels = [
858 &actor_id.to_string(),
859 &sink_id.to_string(),
860 sink_name.as_str(),
861 ];
862 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
863
864 let write_qps = GLOBAL_SINK_METRICS
866 .iceberg_write_qps
867 .with_guarded_label_values(&metrics_labels);
868 let write_latency = GLOBAL_SINK_METRICS
869 .iceberg_write_latency
870 .with_guarded_label_values(&metrics_labels);
871 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
874 .iceberg_rolling_unflushed_data_file
875 .with_guarded_label_values(&metrics_labels);
876 let position_delete_cache_num = GLOBAL_SINK_METRICS
877 .iceberg_position_delete_cache_num
878 .with_guarded_label_values(&metrics_labels);
879 let write_bytes = GLOBAL_SINK_METRICS
880 .iceberg_write_bytes
881 .with_guarded_label_values(&metrics_labels);
882
883 let schema = table.metadata().current_schema();
885 let partition_spec = table.metadata().default_partition_spec();
886
887 let unique_uuid_suffix = Uuid::now_v7();
889
890 let parquet_writer_properties = WriterProperties::builder()
891 .set_max_row_group_size(
892 writer_param
893 .streaming_config
894 .developer
895 .iceberg_sink_write_parquet_max_row_group_rows,
896 )
897 .build();
898
899 let data_file_builder = {
900 let parquet_writer_builder = ParquetWriterBuilder::new(
901 parquet_writer_properties.clone(),
902 schema.clone(),
903 table.file_io().clone(),
904 DefaultLocationGenerator::new(table.metadata().clone())
905 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
906 DefaultFileNameGenerator::new(
907 writer_param.actor_id.to_string(),
908 Some(unique_uuid_suffix.to_string()),
909 iceberg::spec::DataFileFormat::Parquet,
910 ),
911 );
912 DataFileWriterBuilder::new(
913 parquet_writer_builder.clone(),
914 None,
915 partition_spec.spec_id(),
916 )
917 };
918 let position_delete_builder = {
919 let parquet_writer_builder = ParquetWriterBuilder::new(
920 parquet_writer_properties.clone(),
921 POSITION_DELETE_SCHEMA.clone(),
922 table.file_io().clone(),
923 DefaultLocationGenerator::new(table.metadata().clone())
924 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
925 DefaultFileNameGenerator::new(
926 writer_param.actor_id.to_string(),
927 Some(format!("pos-del-{}", unique_uuid_suffix)),
928 iceberg::spec::DataFileFormat::Parquet,
929 ),
930 );
931 MonitoredPositionDeleteWriterBuilder::new(
932 SortPositionDeleteWriterBuilder::new(
933 parquet_writer_builder.clone(),
934 writer_param
935 .streaming_config
936 .developer
937 .iceberg_sink_positional_delete_cache_size,
938 None,
939 None,
940 ),
941 position_delete_cache_num,
942 )
943 };
944 let equality_delete_builder = {
945 let config = EqualityDeleteWriterConfig::new(
946 unique_column_ids.clone(),
947 table.metadata().current_schema().clone(),
948 None,
949 partition_spec.spec_id(),
950 )
951 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
952 let parquet_writer_builder = ParquetWriterBuilder::new(
953 parquet_writer_properties.clone(),
954 Arc::new(
955 arrow_schema_to_schema(config.projected_arrow_schema_ref())
956 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
957 ),
958 table.file_io().clone(),
959 DefaultLocationGenerator::new(table.metadata().clone())
960 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
961 DefaultFileNameGenerator::new(
962 writer_param.actor_id.to_string(),
963 Some(format!("eq-del-{}", unique_uuid_suffix)),
964 iceberg::spec::DataFileFormat::Parquet,
965 ),
966 );
967
968 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
969 };
970 let delta_builder = EqualityDeltaWriterBuilder::new(
971 data_file_builder,
972 position_delete_builder,
973 equality_delete_builder,
974 unique_column_ids,
975 );
976 if partition_spec.fields().is_empty() {
977 let writer_builder = MonitoredGeneralWriterBuilder::new(
978 delta_builder,
979 write_qps.clone(),
980 write_latency.clone(),
981 );
982 let inner_writer = Some(Box::new(
983 writer_builder
984 .clone()
985 .build()
986 .await
987 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
988 ) as Box<dyn IcebergWriter>);
989 let original_arrow_schema = Arc::new(
990 schema_to_arrow_schema(table.metadata().current_schema())
991 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
992 );
993 let schema_with_extra_op_column = {
994 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
995 new_fields.push(Arc::new(ArrowField::new(
996 "op".to_owned(),
997 ArrowDataType::Int32,
998 false,
999 )));
1000 Arc::new(ArrowSchema::new(new_fields))
1001 };
1002 Ok(Self {
1003 arrow_schema: original_arrow_schema,
1004 metrics: IcebergWriterMetrics {
1005 _write_qps: write_qps,
1006 _write_latency: write_latency,
1007 write_bytes,
1008 },
1009 table,
1010 writer: IcebergWriterDispatch::NonpartitionUpsert {
1011 writer: inner_writer,
1012 writer_builder,
1013 arrow_schema_with_op_column: schema_with_extra_op_column,
1014 },
1015 project_idx_vec: {
1016 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1017 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1018 } else {
1019 ProjectIdxVec::None
1020 }
1021 },
1022 })
1023 } else {
1024 let original_arrow_schema = Arc::new(
1025 schema_to_arrow_schema(table.metadata().current_schema())
1026 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1027 );
1028 let schema_with_extra_op_column = {
1029 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
1030 new_fields.push(Arc::new(ArrowField::new(
1031 "op".to_owned(),
1032 ArrowDataType::Int32,
1033 false,
1034 )));
1035 Arc::new(ArrowSchema::new(new_fields))
1036 };
1037 let partition_builder = MonitoredGeneralWriterBuilder::new(
1038 FanoutPartitionWriterBuilder::new_with_custom_schema(
1039 delta_builder,
1040 schema_with_extra_op_column.clone(),
1041 partition_spec.clone(),
1042 table.metadata().current_schema().clone(),
1043 ),
1044 write_qps.clone(),
1045 write_latency.clone(),
1046 );
1047 let inner_writer = Some(Box::new(
1048 partition_builder
1049 .clone()
1050 .build()
1051 .await
1052 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1053 ) as Box<dyn IcebergWriter>);
1054 Ok(Self {
1055 arrow_schema: original_arrow_schema,
1056 metrics: IcebergWriterMetrics {
1057 _write_qps: write_qps,
1058 _write_latency: write_latency,
1059 write_bytes,
1060 },
1061 table,
1062 writer: IcebergWriterDispatch::PartitionUpsert {
1063 writer: inner_writer,
1064 writer_builder: partition_builder,
1065 arrow_schema_with_op_column: schema_with_extra_op_column,
1066 },
1067 project_idx_vec: {
1068 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1069 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1070 } else {
1071 ProjectIdxVec::None
1072 }
1073 },
1074 })
1075 }
1076 }
1077}
1078
1079#[async_trait]
1080impl SinkWriter for IcebergSinkWriter {
1081 type CommitMetadata = Option<SinkMetadata>;
1082
1083 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1085 Ok(())
1087 }
1088
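    /// Writes one chunk: lazily (re)builds the inner writer if a previous barrier closed it,
    /// drops the extra partition column via the cached projection, filters to inserts for
    /// append-only mode or appends an `op` column (insert/delete) for upsert mode, and then
    /// hands the resulting arrow `RecordBatch` to the iceberg writer.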
1089 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1091 match &mut self.writer {
1093 IcebergWriterDispatch::PartitionAppendOnly {
1094 writer,
1095 writer_builder,
1096 } => {
1097 if writer.is_none() {
1098 *writer = Some(Box::new(
1099 writer_builder
1100 .clone()
1101 .build()
1102 .await
1103 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1104 ));
1105 }
1106 }
1107 IcebergWriterDispatch::NonpartitionAppendOnly {
1108 writer,
1109 writer_builder,
1110 } => {
1111 if writer.is_none() {
1112 *writer = Some(Box::new(
1113 writer_builder
1114 .clone()
1115 .build()
1116 .await
1117 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1118 ));
1119 }
1120 }
1121 IcebergWriterDispatch::PartitionUpsert {
1122 writer,
1123 writer_builder,
1124 ..
1125 } => {
1126 if writer.is_none() {
1127 *writer = Some(Box::new(
1128 writer_builder
1129 .clone()
1130 .build()
1131 .await
1132 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1133 ));
1134 }
1135 }
1136 IcebergWriterDispatch::NonpartitionUpsert {
1137 writer,
1138 writer_builder,
1139 ..
1140 } => {
1141 if writer.is_none() {
1142 *writer = Some(Box::new(
1143 writer_builder
1144 .clone()
1145 .build()
1146 .await
1147 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1148 ));
1149 }
1150 }
1151 };
1152
1153 let (mut chunk, ops) = chunk.compact().into_parts();
1155 match &self.project_idx_vec {
1156 ProjectIdxVec::None => {}
1157 ProjectIdxVec::Prepare(idx) => {
1158 if *idx >= chunk.columns().len() {
1159 return Err(SinkError::Iceberg(anyhow!(
1160 "invalid extra partition column index {}",
1161 idx
1162 )));
1163 }
1164 let project_idx_vec = (0..*idx)
1165 .chain(*idx + 1..chunk.columns().len())
1166 .collect_vec();
1167 chunk = chunk.project(&project_idx_vec);
1168 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1169 }
1170 ProjectIdxVec::Done(idx_vec) => {
1171 chunk = chunk.project(idx_vec);
1172 }
1173 }
1174 if ops.is_empty() {
1175 return Ok(());
1176 }
1177 let write_batch_size = chunk.estimated_heap_size();
1178 let batch = match &self.writer {
1179 IcebergWriterDispatch::PartitionAppendOnly { .. }
1180 | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1181 let filters =
1183 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1184 chunk.set_visibility(filters);
1185 IcebergArrowConvert
1186 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1187 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1188 }
1189 IcebergWriterDispatch::PartitionUpsert {
1190 arrow_schema_with_op_column,
1191 ..
1192 }
1193 | IcebergWriterDispatch::NonpartitionUpsert {
1194 arrow_schema_with_op_column,
1195 ..
1196 } => {
1197 let chunk = IcebergArrowConvert
1198 .to_record_batch(self.arrow_schema.clone(), &chunk)
1199 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1200 let ops = Arc::new(Int32Array::from(
1201 ops.iter()
1202 .map(|op| match op {
1203 Op::UpdateInsert | Op::Insert => INSERT_OP,
1204 Op::UpdateDelete | Op::Delete => DELETE_OP,
1205 })
1206 .collect_vec(),
1207 ));
1208 let mut columns = chunk.columns().to_vec();
1209 columns.push(ops);
1210 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1211 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1212 }
1213 };
1214
1215 let writer = self.writer.get_writer().unwrap();
1216 writer
1217 .write(batch)
1218 .instrument_await("iceberg_write")
1219 .await
1220 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1221 self.metrics.write_bytes.inc_by(write_batch_size as _);
1222 Ok(())
1223 }
1224
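    /// On a checkpoint barrier, closes the current writer to flush its files and returns the
    /// resulting data files as serialized `SinkMetadata` for the commit coordinator; a fresh
    /// writer is then built for the next epoch. Non-checkpoint barriers are no-ops.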
1225 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1228 if !is_checkpoint {
1230 return Ok(None);
1231 }
1232
1233 let close_result = match &mut self.writer {
1234 IcebergWriterDispatch::PartitionAppendOnly {
1235 writer,
1236 writer_builder,
1237 } => {
1238 let close_result = match writer.take() {
1239 Some(mut writer) => {
1240 Some(writer.close().instrument_await("iceberg_close").await)
1241 }
1242 _ => None,
1243 };
1244 match writer_builder.clone().build().await {
1245 Ok(new_writer) => {
1246 *writer = Some(Box::new(new_writer));
1247 }
1248 _ => {
1249 warn!("Failed to build new writer after close");
1252 }
1253 }
1254 close_result
1255 }
1256 IcebergWriterDispatch::NonpartitionAppendOnly {
1257 writer,
1258 writer_builder,
1259 } => {
1260 let close_result = match writer.take() {
1261 Some(mut writer) => {
1262 Some(writer.close().instrument_await("iceberg_close").await)
1263 }
1264 _ => None,
1265 };
1266 match writer_builder.clone().build().await {
1267 Ok(new_writer) => {
1268 *writer = Some(Box::new(new_writer));
1269 }
1270 _ => {
1271 warn!("Failed to build new writer after close");
1274 }
1275 }
1276 close_result
1277 }
1278 IcebergWriterDispatch::PartitionUpsert {
1279 writer,
1280 writer_builder,
1281 ..
1282 } => {
1283 let close_result = match writer.take() {
1284 Some(mut writer) => {
1285 Some(writer.close().instrument_await("iceberg_close").await)
1286 }
1287 _ => None,
1288 };
1289 match writer_builder.clone().build().await {
1290 Ok(new_writer) => {
1291 *writer = Some(Box::new(new_writer));
1292 }
1293 _ => {
1294 warn!("Failed to build new writer after close");
1297 }
1298 }
1299 close_result
1300 }
1301 IcebergWriterDispatch::NonpartitionUpsert {
1302 writer,
1303 writer_builder,
1304 ..
1305 } => {
1306 let close_result = match writer.take() {
1307 Some(mut writer) => {
1308 Some(writer.close().instrument_await("iceberg_close").await)
1309 }
1310 _ => None,
1311 };
1312 match writer_builder.clone().build().await {
1313 Ok(new_writer) => {
1314 *writer = Some(Box::new(new_writer));
1315 }
1316 _ => {
1317 warn!("Failed to build new writer after close");
1320 }
1321 }
1322 close_result
1323 }
1324 };
1325
1326 match close_result {
1327 Some(Ok(result)) => {
1328 let version = self.table.metadata().format_version() as u8;
1329 let partition_type = self.table.metadata().default_partition_type();
1330 let data_files = result
1331 .into_iter()
1332 .map(|f| {
1333 SerializedDataFile::try_from(f, partition_type, version == 1)
1334 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1335 })
1336 .collect::<Result<Vec<_>>>()?;
1337 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1338 data_files,
1339 schema_id: self.table.metadata().current_schema_id(),
1340 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1341 })?))
1342 }
1343 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1344 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1345 }
1346 }
1347
1348 async fn abort(&mut self) -> Result<()> {
1350 Ok(())
1352 }
1353}
1354
1355const SCHEMA_ID: &str = "schema_id";
1356const PARTITION_SPEC_ID: &str = "partition_spec_id";
1357const DATA_FILES: &str = "data_files";
1358
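/// Per-writer result of one checkpoint, exchanged with the commit coordinator through
/// `SinkMetadata`. The `TryFrom` impls below serialize it as a JSON object keyed by the
/// constants above: `schema_id` and `partition_spec_id` as numbers and `data_files` as an
/// array of `SerializedDataFile` values.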
1359#[derive(Default, Clone)]
1360struct IcebergCommitResult {
1361 schema_id: i32,
1362 partition_spec_id: i32,
1363 data_files: Vec<SerializedDataFile>,
1364}
1365
1366impl IcebergCommitResult {
1367 fn try_from(value: &SinkMetadata) -> Result<Self> {
1368 if let Some(Serialized(v)) = &value.metadata {
1369 let mut values = if let serde_json::Value::Object(v) =
1370 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1371 .context("Can't parse iceberg sink metadata")?
1372 {
1373 v
1374 } else {
1375 bail!("iceberg sink metadata should be an object");
1376 };
1377
1378 let schema_id;
1379 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1380 schema_id = value
1381 .as_u64()
1382 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1383 } else {
1384 bail!("iceberg sink metadata should have schema_id");
1385 }
1386
1387 let partition_spec_id;
1388 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1389 partition_spec_id = value
1390 .as_u64()
1391 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1392 } else {
1393 bail!("iceberg sink metadata should have partition_spec_id");
1394 }
1395
1396 let data_files: Vec<SerializedDataFile>;
1397 if let serde_json::Value::Array(values) = values
1398 .remove(DATA_FILES)
1399 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1400 {
1401 data_files = values
1402 .into_iter()
1403 .map(from_value::<SerializedDataFile>)
1404 .collect::<std::result::Result<_, _>>()
1405 .unwrap();
1406 } else {
1407 bail!("iceberg sink metadata should have data_files object");
1408 }
1409
1410 Ok(Self {
1411 schema_id: schema_id as i32,
1412 partition_spec_id: partition_spec_id as i32,
1413 data_files,
1414 })
1415 } else {
1416 bail!("Can't create iceberg sink write result from empty data!")
1417 }
1418 }
1419
1420 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1421 let mut values = if let serde_json::Value::Object(value) =
1422 serde_json::from_slice::<serde_json::Value>(&value)
1423 .context("Can't parse iceberg sink metadata")?
1424 {
1425 value
1426 } else {
1427 bail!("iceberg sink metadata should be an object");
1428 };
1429
1430 let schema_id;
1431 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1432 schema_id = value
1433 .as_u64()
1434 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1435 } else {
1436 bail!("iceberg sink metadata should have schema_id");
1437 }
1438
1439 let partition_spec_id;
1440 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1441 partition_spec_id = value
1442 .as_u64()
1443 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1444 } else {
1445 bail!("iceberg sink metadata should have partition_spec_id");
1446 }
1447
1448 let data_files: Vec<SerializedDataFile>;
1449 if let serde_json::Value::Array(values) = values
1450 .remove(DATA_FILES)
1451 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1452 {
1453 data_files = values
1454 .into_iter()
1455 .map(from_value::<SerializedDataFile>)
1456 .collect::<std::result::Result<_, _>>()
1457 .unwrap();
1458 } else {
1459 bail!("iceberg sink metadata should have data_files object");
1460 }
1461
1462 Ok(Self {
1463 schema_id: schema_id as i32,
1464 partition_spec_id: partition_spec_id as i32,
1465 data_files,
1466 })
1467 }
1468}
1469
1470impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1471 type Error = SinkError;
1472
1473 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1474 let json_data_files = serde_json::Value::Array(
1475 value
1476 .data_files
1477 .iter()
1478 .map(serde_json::to_value)
1479 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1480 .context("Can't serialize data files to json")?,
1481 );
1482 let json_value = serde_json::Value::Object(
1483 vec![
1484 (
1485 SCHEMA_ID.to_owned(),
1486 serde_json::Value::Number(value.schema_id.into()),
1487 ),
1488 (
1489 PARTITION_SPEC_ID.to_owned(),
1490 serde_json::Value::Number(value.partition_spec_id.into()),
1491 ),
1492 (DATA_FILES.to_owned(), json_data_files),
1493 ]
1494 .into_iter()
1495 .collect(),
1496 );
1497 Ok(SinkMetadata {
1498 metadata: Some(Serialized(SerializedMetadata {
1499 metadata: serde_json::to_vec(&json_value)
1500 .context("Can't serialize iceberg sink metadata")?,
1501 })),
1502 })
1503 }
1504}
1505
1506impl TryFrom<IcebergCommitResult> for Vec<u8> {
1507 type Error = SinkError;
1508
1509 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1510 let json_data_files = serde_json::Value::Array(
1511 value
1512 .data_files
1513 .iter()
1514 .map(serde_json::to_value)
1515 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1516 .context("Can't serialize data files to json")?,
1517 );
1518 let json_value = serde_json::Value::Object(
1519 vec![
1520 (
1521 SCHEMA_ID.to_owned(),
1522 serde_json::Value::Number(value.schema_id.into()),
1523 ),
1524 (
1525 PARTITION_SPEC_ID.to_owned(),
1526 serde_json::Value::Number(value.partition_spec_id.into()),
1527 ),
1528 (DATA_FILES.to_owned(), json_data_files),
1529 ]
1530 .into_iter()
1531 .collect(),
1532 );
1533 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1534 }
1535}
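/// Commit coordinator for the iceberg sink. It collects the per-writer commit metadata of a
/// checkpoint, turns it into data files, and appends them to the table as a single iceberg
/// snapshot. With `is_exactly_once`, pre-commit metadata is persisted through the meta store
/// connection first, so a recovered coordinator can decide whether a snapshot still needs to
/// be committed.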
1536pub struct IcebergSinkCommitter {
1537 catalog: Arc<dyn Catalog>,
1538 table: Table,
1539 pub last_commit_epoch: u64,
1540 pub(crate) is_exactly_once: bool,
1541 pub(crate) sink_id: u32,
1542 pub(crate) config: IcebergConfig,
1543 pub(crate) param: SinkParam,
1544 pub(crate) db: DatabaseConnection,
1545 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1546 commit_retry_num: u32,
1547 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1548}
1549
1550impl IcebergSinkCommitter {
1551 async fn reload_table(
1554 catalog: &dyn Catalog,
1555 table_ident: &TableIdent,
1556 schema_id: i32,
1557 partition_spec_id: i32,
1558 ) -> Result<Table> {
1559 let table = catalog
1560 .load_table(table_ident)
1561 .await
1562 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1563 if table.metadata().current_schema_id() != schema_id {
1564 return Err(SinkError::Iceberg(anyhow!(
                "Schema evolution not supported, expected schema id {}, but got {}",
1566 schema_id,
1567 table.metadata().current_schema_id()
1568 )));
1569 }
1570 if table.metadata().default_partition_spec_id() != partition_spec_id {
1571 return Err(SinkError::Iceberg(anyhow!(
                "Partition evolution not supported, expected partition spec id {}, but got {}",
1573 partition_spec_id,
1574 table.metadata().default_partition_spec_id()
1575 )));
1576 }
1577 Ok(table)
1578 }
1579}
1580
1581#[async_trait::async_trait]
1582impl SinkCommitCoordinator for IcebergSinkCommitter {
1583 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1584 if self.is_exactly_once {
1585 self.committed_epoch_subscriber = Some(subscriber);
1586 tracing::info!(
                "Sink id = {}: initializing iceberg sink coordinator.",
1588 self.param.sink_id.sink_id()
1589 );
1590 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1591 let ordered_metadata_list_by_end_epoch =
1592 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1593
1594 let mut last_recommit_epoch = 0;
1595 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1596 ordered_metadata_list_by_end_epoch
1597 {
1598 let write_results_bytes = deserialize_metadata(sealized_bytes);
1599 let mut write_results = vec![];
1600
1601 for each in write_results_bytes {
1602 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1603 write_results.push(write_result);
1604 }
1605
1606 match (
1607 committed,
1608 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1609 .await?,
1610 ) {
1611 (true, _) => {
1612 tracing::info!(
                            "Sink id = {}: all data in the log store has been written to the external sink; nothing to re-commit during recovery.",
1614 self.param.sink_id.sink_id()
1615 );
1616 }
1617 (false, true) => {
1618 tracing::info!(
                            "Sink id = {}: all pre-commit files have already been committed into iceberg and do not need to be committed again; marking them as committed.",
1621 self.param.sink_id.sink_id()
1622 );
1623 mark_row_is_committed_by_sink_id_and_end_epoch(
1624 &self.db,
1625 self.sink_id,
1626 end_epoch,
1627 )
1628 .await?;
1629 }
1630 (false, false) => {
1631 tracing::info!(
1632 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1633 self.param.sink_id.sink_id()
1634 );
1635 self.re_commit(end_epoch, write_results, snapshot_id)
1636 .await?;
1637 }
1638 }
1639
1640 last_recommit_epoch = end_epoch;
1641 }
1642 tracing::info!(
1643 "Sink id = {}: iceberg commit coordinator inited.",
1644 self.param.sink_id.sink_id()
1645 );
1646 return Ok(Some(last_recommit_epoch));
1647 } else {
1648 tracing::info!(
                    "Sink id = {}: iceberg coordinator initialized; the system table is empty.",
1650 self.param.sink_id.sink_id()
1651 );
1652 return Ok(None);
1653 }
1654 }
1655
1656 tracing::info!("Iceberg commit coordinator inited.");
1657 return Ok(None);
1658 }
1659
1660 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1661 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1662 let write_results: Vec<IcebergCommitResult> = metadata
1663 .iter()
1664 .map(IcebergCommitResult::try_from)
1665 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1666
1667 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1669 tracing::debug!(?epoch, "no data to commit");
1670 return Ok(());
1671 }
1672
1673 if write_results
1675 .iter()
1676 .any(|r| r.schema_id != write_results[0].schema_id)
1677 || write_results
1678 .iter()
1679 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1680 {
1681 return Err(SinkError::Iceberg(anyhow!(
1682 "schema_id and partition_spec_id should be the same in all write results"
1683 )));
1684 }
1685
1686 if self.is_exactly_once {
1687 assert!(self.committed_epoch_subscriber.is_some());
1688 match self.committed_epoch_subscriber.clone() {
1689 Some(committed_epoch_subscriber) => {
1690 let (committed_epoch, mut rw_futures_utilrx) =
1692 committed_epoch_subscriber(self.param.sink_id).await?;
1693 if committed_epoch >= epoch {
1695 self.commit_iceberg_inner(epoch, write_results, None)
1696 .await?;
1697 } else {
1698 tracing::info!(
1699 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1700 committed_epoch,
1701 epoch
1702 );
1703 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1704 tracing::info!(
1705 "Received next committed epoch: {}",
1706 next_committed_epoch
1707 );
1708 if next_committed_epoch >= epoch {
1710 self.commit_iceberg_inner(epoch, write_results, None)
1711 .await?;
1712 break;
1713 }
1714 }
1715 }
1716 }
1717 None => unreachable!(
                    "An exactly-once sink must wait for the committed epoch before committing; `committed_epoch_subscriber` is not initialized."
1719 ),
1720 }
1721 } else {
1722 self.commit_iceberg_inner(epoch, write_results, None)
1723 .await?;
1724 }
1725
1726 Ok(())
1727 }
1728}
1729
1730impl IcebergSinkCommitter {
1732 async fn re_commit(
1733 &mut self,
1734 epoch: u64,
1735 write_results: Vec<IcebergCommitResult>,
1736 snapshot_id: i64,
1737 ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1739
1740 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1742 tracing::debug!(?epoch, "no data to commit");
1743 return Ok(());
1744 }
1745 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1746 .await?;
1747 Ok(())
1748 }
1749
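    /// Commits the collected data files to the iceberg table. `snapshot_id` is `None` for a
    /// first-time commit (a fresh id is generated and, in exactly-once mode, the pre-commit
    /// metadata is persisted under it before committing) and `Some` when re-committing
    /// persisted metadata during recovery, so the same snapshot id is reused.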
1750 async fn commit_iceberg_inner(
1751 &mut self,
1752 epoch: u64,
1753 write_results: Vec<IcebergCommitResult>,
1754 snapshot_id: Option<i64>,
1755 ) -> Result<()> {
1756 let is_first_commit = snapshot_id.is_none();
1760 self.last_commit_epoch = epoch;
1761 let expect_schema_id = write_results[0].schema_id;
1762 let expect_partition_spec_id = write_results[0].partition_spec_id;
1763
1764 self.table = Self::reload_table(
1766 self.catalog.as_ref(),
1767 self.table.identifier(),
1768 expect_schema_id,
1769 expect_partition_spec_id,
1770 )
1771 .await?;
1772 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1773 return Err(SinkError::Iceberg(anyhow!(
1774 "Can't find schema by id {}",
1775 expect_schema_id
1776 )));
1777 };
1778 let Some(partition_spec) = self
1779 .table
1780 .metadata()
1781 .partition_spec_by_id(expect_partition_spec_id)
1782 else {
1783 return Err(SinkError::Iceberg(anyhow!(
1784 "Can't find partition spec by id {}",
1785 expect_partition_spec_id
1786 )));
1787 };
1788 let partition_type = partition_spec
1789 .as_ref()
1790 .clone()
1791 .partition_type(schema)
1792 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1793
1794 let txn = Transaction::new(&self.table);
1795 let snapshot_id = match snapshot_id {
1797 Some(previous_snapshot_id) => previous_snapshot_id,
1798 None => txn.generate_unique_snapshot_id(),
1799 };
1800 if self.is_exactly_once && is_first_commit {
1801 let mut pre_commit_metadata_bytes = Vec::new();
1803 for each_parallelism_write_result in write_results.clone() {
1804 let each_parallelism_write_result_bytes: Vec<u8> =
1805 each_parallelism_write_result.try_into()?;
1806 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1807 }
1808
1809 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1810
1811 persist_pre_commit_metadata(
1812 self.sink_id,
1813 self.db.clone(),
1814 self.last_commit_epoch,
1815 epoch,
1816 pre_commit_metadata_bytes,
1817 snapshot_id,
1818 )
1819 .await?;
1820 }
1821
1822 let data_files = write_results
1823 .into_iter()
1824 .flat_map(|r| {
1825 r.data_files.into_iter().map(|f| {
1826 f.try_into(expect_partition_spec_id, &partition_type, schema)
1827 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1828 })
1829 })
1830 .collect::<Result<Vec<DataFile>>>()?;
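        // Commit with jittered exponential backoff, up to `commit_retry_num` attempts. The
        // table is reloaded before every attempt so each retry appends on top of the latest
        // table metadata.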
1831 let retry_strategy = ExponentialBackoff::from_millis(10)
1836 .max_delay(Duration::from_secs(60))
1837 .map(jitter)
1838 .take(self.commit_retry_num as usize);
1839 let catalog = self.catalog.clone();
1840 let table_ident = self.table.identifier().clone();
1841 let table = Retry::spawn(retry_strategy, || async {
1842 let table = Self::reload_table(
1843 catalog.as_ref(),
1844 &table_ident,
1845 expect_schema_id,
1846 expect_partition_spec_id,
1847 )
1848 .await?;
1849 let txn = Transaction::new(&table);
1850 let mut append_action = txn
1851 .fast_append(Some(snapshot_id), None, vec![])
1852 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1853 .with_to_branch(commit_branch(
1854 self.config.r#type.as_str(),
1855 self.config.write_mode.as_str(),
1856 ));
1857 append_action
1858 .add_data_files(data_files.clone())
1859 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1860 let tx = append_action.apply().await.map_err(|err| {
                tracing::error!(error = %err.as_report(), "Failed to apply iceberg append action");
1862 SinkError::Iceberg(anyhow!(err))
1863 })?;
1864 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1865 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1866 SinkError::Iceberg(anyhow!(err))
1867 })
1868 })
1869 .await?;
1870 self.table = table;
1871
        tracing::info!("Successfully committed to the iceberg table in epoch {epoch}.");
1873
1874 if self.is_exactly_once {
1875 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1876 tracing::info!(
                "Sink id = {}: succeeded to mark pre-commit metadata in epoch {} as committed.",
1878 self.sink_id,
1879 epoch
1880 );
1881
1882 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1883 }
1884 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1885 && self.config.enable_compaction
1886 && iceberg_compact_stat_sender
1887 .send(IcebergSinkCompactionUpdate {
1888 sink_id: SinkId::new(self.sink_id),
1889 compaction_interval: self.config.compaction_interval_sec(),
1890 })
1891 .is_err()
1892 {
1893 warn!("failed to send iceberg compaction stats");
1894 }
1895
1896 Ok(())
1897 }
1898
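    /// Checks whether the given snapshot id already exists in the iceberg table metadata.
    /// Used during recovery to decide whether persisted pre-commit metadata still needs to be
    /// re-committed or was already committed before the failure.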
1899 async fn is_snapshot_id_in_iceberg(
1903 &self,
1904 iceberg_config: &IcebergConfig,
1905 snapshot_id: i64,
1906 ) -> Result<bool> {
1907 let iceberg_common = iceberg_config.common.clone();
1908 let table = iceberg_common
1909 .load_table(&iceberg_config.java_catalog_props)
1910 .await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1916 }
1917}
1918
1919const MAP_KEY: &str = "key";
1920const MAP_VALUE: &str = "value";
1921
1922fn get_fields<'a>(
1923 our_field_type: &'a risingwave_common::types::DataType,
1924 data_type: &ArrowDataType,
1925 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1926) -> Option<ArrowFields> {
1927 match data_type {
1928 ArrowDataType::Struct(fields) => {
1929 match our_field_type {
1930 risingwave_common::types::DataType::Struct(struct_fields) => {
1931 struct_fields.iter().for_each(|(name, data_type)| {
1932 let res = schema_fields.insert(name, data_type);
1933 assert!(res.is_none())
1935 });
1936 }
1937 risingwave_common::types::DataType::Map(map_fields) => {
1938 schema_fields.insert(MAP_KEY, map_fields.key());
1939 schema_fields.insert(MAP_VALUE, map_fields.value());
1940 }
1941 risingwave_common::types::DataType::List(list_field) => {
1942 list_field.as_struct().iter().for_each(|(name, data_type)| {
1943 let res = schema_fields.insert(name, data_type);
1944 assert!(res.is_none())
1946 });
1947 }
1948 _ => {}
1949 };
1950 Some(fields.clone())
1951 }
1952 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1953 get_fields(our_field_type, field.data_type(), schema_fields)
1954 }
        _ => None,
    }
1957}
1958
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

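/// Checks whether the RisingWave schema is compatible with the Arrow schema derived from the
/// iceberg table, matching fields by name (order-insensitive) and failing on any length or
/// type mismatch.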
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

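/// Serializes a batch of opaque metadata payloads as JSON; the inverse of
/// [`deserialize_metadata`].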
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

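/// Parses the `partition_by` sink option into `(column, Transform)` pairs.
///
/// Supported forms are `column`, `transform(column)`, and `transform(n, column)`; for example,
/// `"v1, bucket(5,v1), truncate(4,v2), year(v3)"` yields the identity, bucket[5], truncate[4],
/// and year transforms on the respective columns.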
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(format!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        ))
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

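/// Returns the branch that commits should target: the dedicated copy-on-write branch when
/// copy-on-write is enabled, and the main branch otherwise.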
pub fn commit_branch(sink_type: &str, write_mode: &str) -> String {
    if should_enable_iceberg_cow(sink_type, write_mode) {
        ICEBERG_COW_BRANCH.to_owned()
    } else {
        MAIN_BRANCH.to_owned()
    }
}

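/// Copy-on-write is only enabled for upsert sinks configured with the copy-on-write write mode.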
pub fn should_enable_iceberg_cow(sink_type: &str, write_mode: &str) -> bool {
    sink_type == SINK_TYPE_UPSERT && write_mode == ICEBERG_WRITE_MODE_COPY_ON_WRITE
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::IcebergCommon;
    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
    use crate::sink::iceberg::{ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergConfig};

    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ])))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ])))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                region: Some("us-east-1".to_owned()),
                endpoint: Some("http://127.0.0.1:9301".to_owned()),
                access_key: Some("hummockadmin".to_owned()),
                secret_key: Some("hummockadmin".to_owned()),
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                catalog_name: Some("demo".to_owned()),
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
                path_style_access: Some(true),
                credential: None,
                oauth2_server_uri: None,
                scope: None,
                token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
                header: None,
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
            create_table_if_not_exists: false,
            is_exactly_once: None,
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
            write_mode: ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.common.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

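    // Helper for the catalog smoke tests below; those tests are `#[ignore]`d by default since
    // they expect an object store and catalog service at the hard-coded local endpoints.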
    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let table = iceberg_config.load_table().await.unwrap();

        println!("{:?}", table.identifier());
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }
}