1pub mod exactly_once_util;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use await_tree::InstrumentAwait;
27use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
28use iceberg::spec::{
29 DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
30};
31use iceberg::table::Table;
32use iceberg::transaction::Transaction;
33use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
34use iceberg::writer::base_writer::equality_delete_writer::{
35 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
36};
37use iceberg::writer::base_writer::sort_position_delete_writer::{
38 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
39};
40use iceberg::writer::file_writer::ParquetWriterBuilder;
41use iceberg::writer::file_writer::location_generator::{
42 DefaultFileNameGenerator, DefaultLocationGenerator,
43};
44use iceberg::writer::function_writer::equality_delta_writer::{
45 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
46};
47use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
48use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
49use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
50use itertools::Itertools;
51use parquet::file::properties::WriterProperties;
52use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
53use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
54use regex::Regex;
55use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
56use risingwave_common::array::arrow::arrow_schema_iceberg::{
57 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
58 Schema as ArrowSchema, SchemaRef,
59};
60use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
61use risingwave_common::array::{Op, StreamChunk};
62use risingwave_common::bail;
63use risingwave_common::bitmap::Bitmap;
64use risingwave_common::catalog::Schema;
65use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
66use risingwave_common_estimate_size::EstimateSize;
67use risingwave_pb::connector_service::SinkMetadata;
68use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
69use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
70use sea_orm::DatabaseConnection;
71use serde_derive::Deserialize;
72use serde_json::from_value;
73use serde_with::{DisplayFromStr, serde_as};
74use thiserror_ext::AsReport;
75use tokio::sync::mpsc::UnboundedSender;
76use tokio_retry::Retry;
77use tokio_retry::strategy::{ExponentialBackoff, jitter};
78use tracing::warn;
79use url::Url;
80use uuid::Uuid;
81use with_options::WithOptions;
82
83use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
84use super::{
85 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
86 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
87};
88use crate::connector_common::{IcebergCommon, IcebergSinkCompactionUpdate};
89use crate::enforce_secret::EnforceSecret;
90use crate::sink::catalog::SinkId;
91use crate::sink::coordinate::CoordinatedLogSinker;
92use crate::sink::iceberg::exactly_once_util::*;
93use crate::sink::writer::SinkWriter;
94use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
95use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
96
97pub const ICEBERG_SINK: &str = "iceberg";
98
99fn default_commit_retry_num() -> u32 {
100 8
101}
102
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    pub r#type: String,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    #[serde(flatten)]
    common: IcebergCommon,

    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    #[serde(default)]
    pub partition_by: Option<String>,

    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,

    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    #[with_option(allow_alter_on_fly)]
    pub enable_compaction: bool,

    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub compaction_interval_sec: Option<u64>,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    #[with_option(allow_alter_on_fly)]
    pub enable_snapshot_expiration: bool,
}
164
165impl EnforceSecret for IcebergConfig {
166 fn enforce_secret<'a>(
167 prop_iter: impl Iterator<Item = &'a str>,
168 ) -> crate::error::ConnectorResult<()> {
169 for prop in prop_iter {
170 IcebergCommon::enforce_one(prop)?;
171 }
172 Ok(())
173 }
174
175 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
176 IcebergCommon::enforce_one(prop)
177 }
178}
179
180impl IcebergConfig {
181 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
182 let mut config =
183 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
184 .map_err(|e| SinkError::Config(anyhow!(e)))?;
185
186 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
187 return Err(SinkError::Config(anyhow!(
188 "`{}` must be {}, or {}",
189 SINK_TYPE_OPTION,
190 SINK_TYPE_APPEND_ONLY,
191 SINK_TYPE_UPSERT
192 )));
193 }
194
195 if config.r#type == SINK_TYPE_UPSERT {
196 if let Some(primary_key) = &config.primary_key {
197 if primary_key.is_empty() {
198 return Err(SinkError::Config(anyhow!(
199 "`primary_key` must not be empty in {}",
200 SINK_TYPE_UPSERT
201 )));
202 }
203 } else {
204 return Err(SinkError::Config(anyhow!(
205 "Must set `primary_key` in {}",
206 SINK_TYPE_UPSERT
207 )));
208 }
209 }
210
211 config.java_catalog_props = values
213 .iter()
214 .filter(|(k, _v)| {
215 k.starts_with("catalog.")
216 && k != &"catalog.uri"
217 && k != &"catalog.type"
218 && k != &"catalog.name"
219 })
220 .map(|(k, v)| (k[8..].to_string(), v.clone()))
221 .collect();
222
223 if config.commit_checkpoint_interval == 0 {
224 return Err(SinkError::Config(anyhow!(
225 "`commit_checkpoint_interval` must be greater than 0"
226 )));
227 }
228
229 Ok(config)
230 }
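    // Illustrative only: a sketch of the option map `from_btreemap` expects. The dotted
    // catalog/table keys are assumed to match the flattened `IcebergCommon` options (they
    // are not defined in this file); `catalog.*` keys other than `catalog.uri`,
    // `catalog.type`, and `catalog.name` end up in `java_catalog_props` with the
    // `catalog.` prefix stripped.
    //
    //     let props = BTreeMap::from([
    //         ("type".to_owned(), "upsert".to_owned()),
    //         ("primary_key".to_owned(), "id".to_owned()),
    //         ("catalog.type".to_owned(), "rest".to_owned()),
    //         ("catalog.uri".to_owned(), "http://localhost:8181".to_owned()),
    //         ("warehouse.path".to_owned(), "s3://bucket/warehouse".to_owned()),
    //         ("database.name".to_owned(), "demo_db".to_owned()),
    //         ("table.name".to_owned(), "demo_table".to_owned()),
    //     ]);
    //     let config = IcebergConfig::from_btreemap(props)?;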
231
232 pub fn catalog_type(&self) -> &str {
233 self.common.catalog_type()
234 }
235
236 pub async fn load_table(&self) -> Result<Table> {
237 self.common
238 .load_table(&self.java_catalog_props)
239 .await
240 .map_err(Into::into)
241 }
242
243 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
244 self.common
245 .create_catalog(&self.java_catalog_props)
246 .await
247 .map_err(Into::into)
248 }
249
250 pub fn full_table_name(&self) -> Result<TableIdent> {
251 self.common.full_table_name().map_err(Into::into)
252 }
253
254 pub fn catalog_name(&self) -> String {
255 self.common.catalog_name()
256 }
257
258 pub fn compaction_interval_sec(&self) -> u64 {
259 self.compaction_interval_sec.unwrap_or(3600)
261 }
262}
263
264pub struct IcebergSink {
265 config: IcebergConfig,
266 param: SinkParam,
267 unique_column_ids: Option<Vec<usize>>,
269}
270
271impl EnforceSecret for IcebergSink {
272 fn enforce_secret<'a>(
273 prop_iter: impl Iterator<Item = &'a str>,
274 ) -> crate::error::ConnectorResult<()> {
275 for prop in prop_iter {
276 IcebergConfig::enforce_one(prop)?;
277 }
278 Ok(())
279 }
280}
281
282impl TryFrom<SinkParam> for IcebergSink {
283 type Error = SinkError;
284
285 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
286 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
287 IcebergSink::new(config, param)
288 }
289}
290
291impl Debug for IcebergSink {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 f.debug_struct("IcebergSink")
294 .field("config", &self.config)
295 .finish()
296 }
297}
298
299impl IcebergSink {
300 async fn create_and_validate_table(&self) -> Result<Table> {
301 if self.config.create_table_if_not_exists {
302 self.create_table_if_not_exists().await?;
303 }
304
305 let table = self
306 .config
307 .load_table()
308 .await
309 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
310
311 let sink_schema = self.param.schema();
312 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
313 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
314
315 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
316 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
317
318 Ok(table)
319 }
320
321 async fn create_table_if_not_exists(&self) -> Result<()> {
322 let catalog = self.config.create_catalog().await?;
323 let namespace = if let Some(database_name) = &self.config.common.database_name {
324 let namespace = NamespaceIdent::new(database_name.clone());
325 if !catalog
326 .namespace_exists(&namespace)
327 .await
328 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
329 {
330 catalog
331 .create_namespace(&namespace, HashMap::default())
332 .await
333 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
334 .context("failed to create iceberg namespace")?;
335 }
336 namespace
337 } else {
338 bail!("database name must be set if you want to create table")
339 };
340
341 let table_id = self
342 .config
343 .full_table_name()
344 .context("Unable to parse table name")?;
345 if !catalog
346 .table_exists(&table_id)
347 .await
348 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
349 {
350 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
351 let arrow_fields = self
353 .param
354 .columns
355 .iter()
356 .map(|column| {
357 Ok(iceberg_create_table_arrow_convert
358 .to_arrow_field(&column.name, &column.data_type)
359 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
360 .context(format!(
361 "failed to convert {}: {} to arrow type",
362 &column.name, &column.data_type
363 ))?)
364 })
365 .collect::<Result<Vec<ArrowField>>>()?;
366 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
367 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
368 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
369 .context("failed to convert arrow schema to iceberg schema")?;
370
371 let location = {
372 let mut names = namespace.clone().inner();
373 names.push(self.config.common.table_name.clone());
374 match &self.config.common.warehouse_path {
375 Some(warehouse_path) => {
376 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
377 let url = Url::parse(warehouse_path);
378 if url.is_err() || is_s3_tables {
379 if self.config.common.catalog_type() == "rest"
382 || self.config.common.catalog_type() == "rest_rust"
383 {
384 None
385 } else {
386 bail!(format!("Invalid warehouse path: {}", warehouse_path))
387 }
388 } else if warehouse_path.ends_with('/') {
389 Some(format!("{}{}", warehouse_path, names.join("/")))
390 } else {
391 Some(format!("{}/{}", warehouse_path, names.join("/")))
392 }
393 }
394 None => None,
395 }
396 };
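            // For example (assuming an object-store warehouse): warehouse_path
            // "s3://bucket/wh" with namespace "db" and table "t" resolves to
            // Some("s3://bucket/wh/db/t"); with no warehouse path, or an S3 Tables ARN /
            // unparsable path on a REST catalog, the location is left for the catalog to
            // decide (None).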
397
398 let partition_spec = match &self.config.partition_by {
399 Some(partition_by) => {
400 let mut partition_fields = Vec::<UnboundPartitionField>::new();
401 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
402 .into_iter()
403 .enumerate()
404 {
405 match iceberg_schema.field_id_by_name(&column) {
406 Some(id) => partition_fields.push(
407 UnboundPartitionField::builder()
408 .source_id(id)
409 .transform(transform)
410 .name(format!("_p_{}", column))
411 .field_id(i as i32)
412 .build(),
413 ),
414 None => bail!(format!(
415 "Partition source column does not exist in schema: {}",
416 column
417 )),
418 };
419 }
420 Some(
421 UnboundPartitionSpec::builder()
422 .with_spec_id(0)
423 .add_partition_fields(partition_fields)
424 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
425 .context("failed to add partition columns")?
426 .build(),
427 )
428 }
429 None => None,
430 };
431
432 let table_creation_builder = TableCreation::builder()
433 .name(self.config.common.table_name.clone())
434 .schema(iceberg_schema);
435
436 let table_creation = match (location, partition_spec) {
437 (Some(location), Some(partition_spec)) => table_creation_builder
438 .location(location)
439 .partition_spec(partition_spec)
440 .build(),
441 (Some(location), None) => table_creation_builder.location(location).build(),
442 (None, Some(partition_spec)) => table_creation_builder
443 .partition_spec(partition_spec)
444 .build(),
445 (None, None) => table_creation_builder.build(),
446 };
447
448 catalog
449 .create_table(&namespace, table_creation)
450 .await
451 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
452 .context("failed to create iceberg table")?;
453 }
454 Ok(())
455 }
456
457 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
458 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
459 if let Some(pk) = &config.primary_key {
460 let mut unique_column_ids = Vec::with_capacity(pk.len());
461 for col_name in pk {
462 let id = param
463 .columns
464 .iter()
465 .find(|col| col.name.as_str() == col_name)
466 .ok_or_else(|| {
467 SinkError::Config(anyhow!(
468 "Primary key column {} not found in sink schema",
469 col_name
470 ))
471 })?
472 .column_id
473 .get_id() as usize;
474 unique_column_ids.push(id);
475 }
476 Some(unique_column_ids)
477 } else {
478 unreachable!()
479 }
480 } else {
481 None
482 };
483 Ok(Self {
484 config,
485 param,
486 unique_column_ids,
487 })
488 }
489}
490
491impl Sink for IcebergSink {
492 type Coordinator = IcebergSinkCommitter;
493 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
494
495 const SINK_NAME: &'static str = ICEBERG_SINK;
496
497 async fn validate(&self) -> Result<()> {
498 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
499 bail!("Snowflake catalog only supports iceberg sources");
500 }
501 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
502 risingwave_common::license::Feature::IcebergSinkWithGlue
503 .check_available()
504 .map_err(|e| anyhow::anyhow!(e))?;
505 }
506 if self.config.enable_compaction {
507 risingwave_common::license::Feature::IcebergCompaction
508 .check_available()
509 .map_err(|e| anyhow::anyhow!(e))?;
510 }
511
512 let _ = self.create_and_validate_table().await?;
513 Ok(())
514 }
515
516 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
517 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
518
519 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
520 if iceberg_config.enable_compaction {
521 if compaction_interval == 0 {
522 bail!(
523 "`compaction_interval_sec` must be greater than 0 when `enable_compaction` is true"
524 );
525 }
526 } else {
527 bail!("`compaction_interval_sec` can only be set when `enable_compaction` is true");
528 }
529 }
530
531 Ok(())
532 }
533
534 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
535 let table = self.create_and_validate_table().await?;
536 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
537 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
538 } else {
539 IcebergSinkWriter::new_append_only(table, &writer_param).await?
540 };
541
542 let commit_checkpoint_interval =
543 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
544 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
545 );
546 let writer = CoordinatedLogSinker::new(
547 &writer_param,
548 self.param.clone(),
549 inner,
550 commit_checkpoint_interval,
551 )
552 .await?;
553
554 Ok(writer)
555 }
556
557 fn is_coordinated_sink(&self) -> bool {
558 true
559 }
560
561 async fn new_coordinator(
562 &self,
563 db: DatabaseConnection,
564 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
565 ) -> Result<Self::Coordinator> {
566 let catalog = self.config.create_catalog().await?;
567 let table = self.create_and_validate_table().await?;
568 Ok(IcebergSinkCommitter {
569 catalog,
570 table,
571 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
572 last_commit_epoch: 0,
573 sink_id: self.param.sink_id.sink_id(),
574 config: self.config.clone(),
575 param: self.param.clone(),
576 db,
577 commit_retry_num: self.config.commit_retry_num,
578 committed_epoch_subscriber: None,
579 iceberg_compact_stat_sender,
580 })
581 }
582}
583
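/// Tracks how the optional extra partition column is projected out of incoming chunks:
/// `None` if there is no extra column, `Prepare(idx)` before the first chunk is seen, and
/// `Done(idx_vec)` once the projection (all column indices except `idx`, e.g. `[0, 1, 3]`
/// for `idx == 2` with 4 columns) has been computed and cached.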
enum ProjectIdxVec {
    None,
    Prepare(usize),
    Done(Vec<usize>),
}
595
pub struct IcebergSinkWriter {
    writer: IcebergWriterDispatch,
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    table: Table,
    project_idx_vec: ProjectIdxVec,
}
607
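// One writer stack per combination of {partitioned, non-partitioned} x {append-only, upsert}.
// Append-only stacks emit data files only; upsert stacks wrap an equality-delta writer that
// additionally produces position-delete and equality-delete files, driven by the extra "op"
// column appended in `write_batch`.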
608#[allow(clippy::type_complexity)]
609enum IcebergWriterDispatch {
610 PartitionAppendOnly {
611 writer: Option<Box<dyn IcebergWriter>>,
612 writer_builder: MonitoredGeneralWriterBuilder<
613 FanoutPartitionWriterBuilder<
614 DataFileWriterBuilder<
615 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
616 >,
617 >,
618 >,
619 },
620 NonpartitionAppendOnly {
621 writer: Option<Box<dyn IcebergWriter>>,
622 writer_builder: MonitoredGeneralWriterBuilder<
623 DataFileWriterBuilder<
624 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
625 >,
626 >,
627 },
628 PartitionUpsert {
629 writer: Option<Box<dyn IcebergWriter>>,
630 writer_builder: MonitoredGeneralWriterBuilder<
631 FanoutPartitionWriterBuilder<
632 EqualityDeltaWriterBuilder<
633 DataFileWriterBuilder<
634 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
635 >,
636 MonitoredPositionDeleteWriterBuilder<
637 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
638 >,
639 EqualityDeleteFileWriterBuilder<
640 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
641 >,
642 >,
643 >,
644 >,
645 arrow_schema_with_op_column: SchemaRef,
646 },
647 NonpartitionUpsert {
648 writer: Option<Box<dyn IcebergWriter>>,
649 writer_builder: MonitoredGeneralWriterBuilder<
650 EqualityDeltaWriterBuilder<
651 DataFileWriterBuilder<
652 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
653 >,
654 MonitoredPositionDeleteWriterBuilder<
655 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
656 >,
657 EqualityDeleteFileWriterBuilder<
658 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
659 >,
660 >,
661 >,
662 arrow_schema_with_op_column: SchemaRef,
663 },
664}
665
666impl IcebergWriterDispatch {
667 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
668 match self {
669 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
670 | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
671 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
672 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
673 }
674 }
675}
676
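// `_write_qps` and `_write_latency` are cloned into the monitored writer builders; the
// underscore-prefixed copies here appear to be kept only so the label-guarded metrics stay
// registered for the writer's lifetime, while `write_bytes` is updated directly in
// `write_batch`.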
pub struct IcebergWriterMetrics {
    _write_qps: LabelGuardedIntCounter,
    _write_latency: LabelGuardedHistogram,
    write_bytes: LabelGuardedIntCounter,
}
686
687impl IcebergSinkWriter {
688 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
689 let SinkWriterParam {
690 extra_partition_col_idx,
691 actor_id,
692 sink_id,
693 sink_name,
694 ..
695 } = writer_param;
696 let metrics_labels = [
697 &actor_id.to_string(),
698 &sink_id.to_string(),
699 sink_name.as_str(),
700 ];
701
702 let write_qps = GLOBAL_SINK_METRICS
704 .iceberg_write_qps
705 .with_guarded_label_values(&metrics_labels);
706 let write_latency = GLOBAL_SINK_METRICS
707 .iceberg_write_latency
708 .with_guarded_label_values(&metrics_labels);
709 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
712 .iceberg_rolling_unflushed_data_file
713 .with_guarded_label_values(&metrics_labels);
714 let write_bytes = GLOBAL_SINK_METRICS
715 .iceberg_write_bytes
716 .with_guarded_label_values(&metrics_labels);
717
718 let schema = table.metadata().current_schema();
719 let partition_spec = table.metadata().default_partition_spec();
720
721 let unique_uuid_suffix = Uuid::now_v7();
723
724 let parquet_writer_builder = ParquetWriterBuilder::new(
725 WriterProperties::new(),
726 schema.clone(),
727 table.file_io().clone(),
728 DefaultLocationGenerator::new(table.metadata().clone())
729 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
730 DefaultFileNameGenerator::new(
731 writer_param.actor_id.to_string(),
732 Some(unique_uuid_suffix.to_string()),
733 iceberg::spec::DataFileFormat::Parquet,
734 ),
735 );
736 let data_file_builder =
737 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
738 if partition_spec.fields().is_empty() {
739 let writer_builder = MonitoredGeneralWriterBuilder::new(
740 data_file_builder,
741 write_qps.clone(),
742 write_latency.clone(),
743 );
744 let inner_writer = Some(Box::new(
745 writer_builder
746 .clone()
747 .build()
748 .await
749 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
750 ) as Box<dyn IcebergWriter>);
751 Ok(Self {
752 arrow_schema: Arc::new(
753 schema_to_arrow_schema(table.metadata().current_schema())
754 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
755 ),
756 metrics: IcebergWriterMetrics {
757 _write_qps: write_qps,
758 _write_latency: write_latency,
759 write_bytes,
760 },
761 writer: IcebergWriterDispatch::NonpartitionAppendOnly {
762 writer: inner_writer,
763 writer_builder,
764 },
765 table,
766 project_idx_vec: {
767 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
768 ProjectIdxVec::Prepare(*extra_partition_col_idx)
769 } else {
770 ProjectIdxVec::None
771 }
772 },
773 })
774 } else {
775 let partition_builder = MonitoredGeneralWriterBuilder::new(
776 FanoutPartitionWriterBuilder::new(
777 data_file_builder,
778 partition_spec.clone(),
779 schema.clone(),
780 )
781 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
782 write_qps.clone(),
783 write_latency.clone(),
784 );
785 let inner_writer = Some(Box::new(
786 partition_builder
787 .clone()
788 .build()
789 .await
790 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
791 ) as Box<dyn IcebergWriter>);
792 Ok(Self {
793 arrow_schema: Arc::new(
794 schema_to_arrow_schema(table.metadata().current_schema())
795 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
796 ),
797 metrics: IcebergWriterMetrics {
798 _write_qps: write_qps,
799 _write_latency: write_latency,
800 write_bytes,
801 },
802 writer: IcebergWriterDispatch::PartitionAppendOnly {
803 writer: inner_writer,
804 writer_builder: partition_builder,
805 },
806 table,
807 project_idx_vec: {
808 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
809 ProjectIdxVec::Prepare(*extra_partition_col_idx)
810 } else {
811 ProjectIdxVec::None
812 }
813 },
814 })
815 }
816 }
817
818 pub async fn new_upsert(
819 table: Table,
820 unique_column_ids: Vec<usize>,
821 writer_param: &SinkWriterParam,
822 ) -> Result<Self> {
823 let SinkWriterParam {
824 extra_partition_col_idx,
825 actor_id,
826 sink_id,
827 sink_name,
828 ..
829 } = writer_param;
830 let metrics_labels = [
831 &actor_id.to_string(),
832 &sink_id.to_string(),
833 sink_name.as_str(),
834 ];
835 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
836
837 let write_qps = GLOBAL_SINK_METRICS
839 .iceberg_write_qps
840 .with_guarded_label_values(&metrics_labels);
841 let write_latency = GLOBAL_SINK_METRICS
842 .iceberg_write_latency
843 .with_guarded_label_values(&metrics_labels);
844 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
847 .iceberg_rolling_unflushed_data_file
848 .with_guarded_label_values(&metrics_labels);
849 let position_delete_cache_num = GLOBAL_SINK_METRICS
850 .iceberg_position_delete_cache_num
851 .with_guarded_label_values(&metrics_labels);
852 let write_bytes = GLOBAL_SINK_METRICS
853 .iceberg_write_bytes
854 .with_guarded_label_values(&metrics_labels);
855
856 let schema = table.metadata().current_schema();
858 let partition_spec = table.metadata().default_partition_spec();
859
860 let unique_uuid_suffix = Uuid::now_v7();
862
863 let data_file_builder = {
864 let parquet_writer_builder = ParquetWriterBuilder::new(
865 WriterProperties::new(),
866 schema.clone(),
867 table.file_io().clone(),
868 DefaultLocationGenerator::new(table.metadata().clone())
869 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
870 DefaultFileNameGenerator::new(
871 writer_param.actor_id.to_string(),
872 Some(unique_uuid_suffix.to_string()),
873 iceberg::spec::DataFileFormat::Parquet,
874 ),
875 );
876 DataFileWriterBuilder::new(
877 parquet_writer_builder.clone(),
878 None,
879 partition_spec.spec_id(),
880 )
881 };
882 let position_delete_builder = {
883 let parquet_writer_builder = ParquetWriterBuilder::new(
884 WriterProperties::new(),
885 POSITION_DELETE_SCHEMA.clone(),
886 table.file_io().clone(),
887 DefaultLocationGenerator::new(table.metadata().clone())
888 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
889 DefaultFileNameGenerator::new(
890 writer_param.actor_id.to_string(),
891 Some(format!("pos-del-{}", unique_uuid_suffix)),
892 iceberg::spec::DataFileFormat::Parquet,
893 ),
894 );
895 MonitoredPositionDeleteWriterBuilder::new(
896 SortPositionDeleteWriterBuilder::new(
897 parquet_writer_builder.clone(),
898 writer_param
899 .streaming_config
900 .developer
901 .iceberg_sink_positional_delete_cache_size,
902 None,
903 None,
904 ),
905 position_delete_cache_num,
906 )
907 };
908 let equality_delete_builder = {
909 let config = EqualityDeleteWriterConfig::new(
910 unique_column_ids.clone(),
911 table.metadata().current_schema().clone(),
912 None,
913 partition_spec.spec_id(),
914 )
915 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
916 let parquet_writer_builder = ParquetWriterBuilder::new(
917 WriterProperties::new(),
918 Arc::new(
919 arrow_schema_to_schema(config.projected_arrow_schema_ref())
920 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
921 ),
922 table.file_io().clone(),
923 DefaultLocationGenerator::new(table.metadata().clone())
924 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
925 DefaultFileNameGenerator::new(
926 writer_param.actor_id.to_string(),
927 Some(format!("eq-del-{}", unique_uuid_suffix)),
928 iceberg::spec::DataFileFormat::Parquet,
929 ),
930 );
931
932 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
933 };
934 let delta_builder = EqualityDeltaWriterBuilder::new(
935 data_file_builder,
936 position_delete_builder,
937 equality_delete_builder,
938 unique_column_ids,
939 );
940 if partition_spec.fields().is_empty() {
941 let writer_builder = MonitoredGeneralWriterBuilder::new(
942 delta_builder,
943 write_qps.clone(),
944 write_latency.clone(),
945 );
946 let inner_writer = Some(Box::new(
947 writer_builder
948 .clone()
949 .build()
950 .await
951 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
952 ) as Box<dyn IcebergWriter>);
953 let original_arrow_schema = Arc::new(
954 schema_to_arrow_schema(table.metadata().current_schema())
955 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
956 );
957 let schema_with_extra_op_column = {
958 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
959 new_fields.push(Arc::new(ArrowField::new(
960 "op".to_owned(),
961 ArrowDataType::Int32,
962 false,
963 )));
964 Arc::new(ArrowSchema::new(new_fields))
965 };
966 Ok(Self {
967 arrow_schema: original_arrow_schema,
968 metrics: IcebergWriterMetrics {
969 _write_qps: write_qps,
970 _write_latency: write_latency,
971 write_bytes,
972 },
973 table,
974 writer: IcebergWriterDispatch::NonpartitionUpsert {
975 writer: inner_writer,
976 writer_builder,
977 arrow_schema_with_op_column: schema_with_extra_op_column,
978 },
979 project_idx_vec: {
980 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
981 ProjectIdxVec::Prepare(*extra_partition_col_idx)
982 } else {
983 ProjectIdxVec::None
984 }
985 },
986 })
987 } else {
988 let original_arrow_schema = Arc::new(
989 schema_to_arrow_schema(table.metadata().current_schema())
990 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
991 );
992 let schema_with_extra_op_column = {
993 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
994 new_fields.push(Arc::new(ArrowField::new(
995 "op".to_owned(),
996 ArrowDataType::Int32,
997 false,
998 )));
999 Arc::new(ArrowSchema::new(new_fields))
1000 };
1001 let partition_builder = MonitoredGeneralWriterBuilder::new(
1002 FanoutPartitionWriterBuilder::new_with_custom_schema(
1003 delta_builder,
1004 schema_with_extra_op_column.clone(),
1005 partition_spec.clone(),
1006 table.metadata().current_schema().clone(),
1007 ),
1008 write_qps.clone(),
1009 write_latency.clone(),
1010 );
1011 let inner_writer = Some(Box::new(
1012 partition_builder
1013 .clone()
1014 .build()
1015 .await
1016 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1017 ) as Box<dyn IcebergWriter>);
1018 Ok(Self {
1019 arrow_schema: original_arrow_schema,
1020 metrics: IcebergWriterMetrics {
1021 _write_qps: write_qps,
1022 _write_latency: write_latency,
1023 write_bytes,
1024 },
1025 table,
1026 writer: IcebergWriterDispatch::PartitionUpsert {
1027 writer: inner_writer,
1028 writer_builder: partition_builder,
1029 arrow_schema_with_op_column: schema_with_extra_op_column,
1030 },
1031 project_idx_vec: {
1032 if let Some(extra_partition_col_idx) = extra_partition_col_idx {
1033 ProjectIdxVec::Prepare(*extra_partition_col_idx)
1034 } else {
1035 ProjectIdxVec::None
1036 }
1037 },
1038 })
1039 }
1040 }
1041}
1042
1043#[async_trait]
1044impl SinkWriter for IcebergSinkWriter {
1045 type CommitMetadata = Option<SinkMetadata>;
1046
1047 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
1049 Ok(())
1051 }
1052
1053 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
1055 match &mut self.writer {
1057 IcebergWriterDispatch::PartitionAppendOnly {
1058 writer,
1059 writer_builder,
1060 } => {
1061 if writer.is_none() {
1062 *writer = Some(Box::new(
1063 writer_builder
1064 .clone()
1065 .build()
1066 .await
1067 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1068 ));
1069 }
1070 }
1071 IcebergWriterDispatch::NonpartitionAppendOnly {
1072 writer,
1073 writer_builder,
1074 } => {
1075 if writer.is_none() {
1076 *writer = Some(Box::new(
1077 writer_builder
1078 .clone()
1079 .build()
1080 .await
1081 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1082 ));
1083 }
1084 }
1085 IcebergWriterDispatch::PartitionUpsert {
1086 writer,
1087 writer_builder,
1088 ..
1089 } => {
1090 if writer.is_none() {
1091 *writer = Some(Box::new(
1092 writer_builder
1093 .clone()
1094 .build()
1095 .await
1096 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1097 ));
1098 }
1099 }
1100 IcebergWriterDispatch::NonpartitionUpsert {
1101 writer,
1102 writer_builder,
1103 ..
1104 } => {
1105 if writer.is_none() {
1106 *writer = Some(Box::new(
1107 writer_builder
1108 .clone()
1109 .build()
1110 .await
1111 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1112 ));
1113 }
1114 }
1115 };
1116
1117 let (mut chunk, ops) = chunk.compact().into_parts();
1119 match &self.project_idx_vec {
1120 ProjectIdxVec::None => {}
1121 ProjectIdxVec::Prepare(idx) => {
1122 if *idx >= chunk.columns().len() {
1123 return Err(SinkError::Iceberg(anyhow!(
1124 "invalid extra partition column index {}",
1125 idx
1126 )));
1127 }
1128 let project_idx_vec = (0..*idx)
1129 .chain(*idx + 1..chunk.columns().len())
1130 .collect_vec();
1131 chunk = chunk.project(&project_idx_vec);
1132 self.project_idx_vec = ProjectIdxVec::Done(project_idx_vec);
1133 }
1134 ProjectIdxVec::Done(idx_vec) => {
1135 chunk = chunk.project(idx_vec);
1136 }
1137 }
1138 if ops.is_empty() {
1139 return Ok(());
1140 }
1141 let write_batch_size = chunk.estimated_heap_size();
1142 let batch = match &self.writer {
1143 IcebergWriterDispatch::PartitionAppendOnly { .. }
1144 | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1145 let filters =
1147 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1148 chunk.set_visibility(filters);
1149 IcebergArrowConvert
1150 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1151 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1152 }
1153 IcebergWriterDispatch::PartitionUpsert {
1154 arrow_schema_with_op_column,
1155 ..
1156 }
1157 | IcebergWriterDispatch::NonpartitionUpsert {
1158 arrow_schema_with_op_column,
1159 ..
1160 } => {
1161 let chunk = IcebergArrowConvert
1162 .to_record_batch(self.arrow_schema.clone(), &chunk)
1163 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
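                // Append the per-row operation flag as an extra Int32 "op" column; the
                // equality-delta writer uses it to route each row as an insert or a delete.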
1164 let ops = Arc::new(Int32Array::from(
1165 ops.iter()
1166 .map(|op| match op {
1167 Op::UpdateInsert | Op::Insert => INSERT_OP,
1168 Op::UpdateDelete | Op::Delete => DELETE_OP,
1169 })
1170 .collect_vec(),
1171 ));
1172 let mut columns = chunk.columns().to_vec();
1173 columns.push(ops);
1174 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1175 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1176 }
1177 };
1178
1179 let writer = self.writer.get_writer().unwrap();
1180 writer
1181 .write(batch)
1182 .instrument_await("iceberg_write")
1183 .await
1184 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1185 self.metrics.write_bytes.inc_by(write_batch_size as _);
1186 Ok(())
1187 }
1188
1189 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1192 if !is_checkpoint {
1194 return Ok(None);
1195 }
1196
1197 let close_result = match &mut self.writer {
1198 IcebergWriterDispatch::PartitionAppendOnly {
1199 writer,
1200 writer_builder,
1201 } => {
1202 let close_result = match writer.take() {
1203 Some(mut writer) => {
1204 Some(writer.close().instrument_await("iceberg_close").await)
1205 }
1206 _ => None,
1207 };
1208 match writer_builder.clone().build().await {
1209 Ok(new_writer) => {
1210 *writer = Some(Box::new(new_writer));
1211 }
1212 _ => {
1213 warn!("Failed to build new writer after close");
1216 }
1217 }
1218 close_result
1219 }
1220 IcebergWriterDispatch::NonpartitionAppendOnly {
1221 writer,
1222 writer_builder,
1223 } => {
1224 let close_result = match writer.take() {
1225 Some(mut writer) => {
1226 Some(writer.close().instrument_await("iceberg_close").await)
1227 }
1228 _ => None,
1229 };
1230 match writer_builder.clone().build().await {
1231 Ok(new_writer) => {
1232 *writer = Some(Box::new(new_writer));
1233 }
1234 _ => {
1235 warn!("Failed to build new writer after close");
1238 }
1239 }
1240 close_result
1241 }
1242 IcebergWriterDispatch::PartitionUpsert {
1243 writer,
1244 writer_builder,
1245 ..
1246 } => {
1247 let close_result = match writer.take() {
1248 Some(mut writer) => {
1249 Some(writer.close().instrument_await("iceberg_close").await)
1250 }
1251 _ => None,
1252 };
1253 match writer_builder.clone().build().await {
1254 Ok(new_writer) => {
1255 *writer = Some(Box::new(new_writer));
1256 }
1257 _ => {
1258 warn!("Failed to build new writer after close");
1261 }
1262 }
1263 close_result
1264 }
1265 IcebergWriterDispatch::NonpartitionUpsert {
1266 writer,
1267 writer_builder,
1268 ..
1269 } => {
1270 let close_result = match writer.take() {
1271 Some(mut writer) => {
1272 Some(writer.close().instrument_await("iceberg_close").await)
1273 }
1274 _ => None,
1275 };
1276 match writer_builder.clone().build().await {
1277 Ok(new_writer) => {
1278 *writer = Some(Box::new(new_writer));
1279 }
1280 _ => {
1281 warn!("Failed to build new writer after close");
1284 }
1285 }
1286 close_result
1287 }
1288 };
1289
1290 match close_result {
1291 Some(Ok(result)) => {
1292 let version = self.table.metadata().format_version() as u8;
1293 let partition_type = self.table.metadata().default_partition_type();
1294 let data_files = result
1295 .into_iter()
1296 .map(|f| {
1297 SerializedDataFile::try_from(f, partition_type, version == 1)
1298 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1299 })
1300 .collect::<Result<Vec<_>>>()?;
1301 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1302 data_files,
1303 schema_id: self.table.metadata().current_schema_id(),
1304 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1305 })?))
1306 }
1307 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1308 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1309 }
1310 }
1311
1312 async fn abort(&mut self) -> Result<()> {
1314 Ok(())
1316 }
1317}
1318
1319const SCHEMA_ID: &str = "schema_id";
1320const PARTITION_SPEC_ID: &str = "partition_spec_id";
1321const DATA_FILES: &str = "data_files";
1322
1323#[derive(Default, Clone)]
1324struct IcebergCommitResult {
1325 schema_id: i32,
1326 partition_spec_id: i32,
1327 data_files: Vec<SerializedDataFile>,
1328}
1329
1330impl IcebergCommitResult {
1331 fn try_from(value: &SinkMetadata) -> Result<Self> {
1332 if let Some(Serialized(v)) = &value.metadata {
1333 let mut values = if let serde_json::Value::Object(v) =
1334 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1335 .context("Can't parse iceberg sink metadata")?
1336 {
1337 v
1338 } else {
1339 bail!("iceberg sink metadata should be an object");
1340 };
1341
1342 let schema_id;
1343 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1344 schema_id = value
1345 .as_u64()
1346 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1347 } else {
1348 bail!("iceberg sink metadata should have schema_id");
1349 }
1350
1351 let partition_spec_id;
1352 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1353 partition_spec_id = value
1354 .as_u64()
1355 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1356 } else {
1357 bail!("iceberg sink metadata should have partition_spec_id");
1358 }
1359
1360 let data_files: Vec<SerializedDataFile>;
1361 if let serde_json::Value::Array(values) = values
1362 .remove(DATA_FILES)
1363 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1364 {
1365 data_files = values
1366 .into_iter()
1367 .map(from_value::<SerializedDataFile>)
1368 .collect::<std::result::Result<_, _>>()
1369 .unwrap();
1370 } else {
1371 bail!("iceberg sink metadata should have data_files object");
1372 }
1373
1374 Ok(Self {
1375 schema_id: schema_id as i32,
1376 partition_spec_id: partition_spec_id as i32,
1377 data_files,
1378 })
1379 } else {
1380 bail!("Can't create iceberg sink write result from empty data!")
1381 }
1382 }
1383
1384 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1385 let mut values = if let serde_json::Value::Object(value) =
1386 serde_json::from_slice::<serde_json::Value>(&value)
1387 .context("Can't parse iceberg sink metadata")?
1388 {
1389 value
1390 } else {
1391 bail!("iceberg sink metadata should be an object");
1392 };
1393
1394 let schema_id;
1395 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1396 schema_id = value
1397 .as_u64()
1398 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1399 } else {
1400 bail!("iceberg sink metadata should have schema_id");
1401 }
1402
1403 let partition_spec_id;
1404 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1405 partition_spec_id = value
1406 .as_u64()
1407 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1408 } else {
1409 bail!("iceberg sink metadata should have partition_spec_id");
1410 }
1411
1412 let data_files: Vec<SerializedDataFile>;
1413 if let serde_json::Value::Array(values) = values
1414 .remove(DATA_FILES)
1415 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1416 {
1417 data_files = values
1418 .into_iter()
1419 .map(from_value::<SerializedDataFile>)
1420 .collect::<std::result::Result<_, _>>()
1421 .unwrap();
1422 } else {
1423 bail!("iceberg sink metadata should have data_files object");
1424 }
1425
1426 Ok(Self {
1427 schema_id: schema_id as i32,
1428 partition_spec_id: partition_spec_id as i32,
1429 data_files,
1430 })
1431 }
1432}
1433
1434impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1435 type Error = SinkError;
1436
1437 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1438 let json_data_files = serde_json::Value::Array(
1439 value
1440 .data_files
1441 .iter()
1442 .map(serde_json::to_value)
1443 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1444 .context("Can't serialize data files to json")?,
1445 );
1446 let json_value = serde_json::Value::Object(
1447 vec![
1448 (
1449 SCHEMA_ID.to_owned(),
1450 serde_json::Value::Number(value.schema_id.into()),
1451 ),
1452 (
1453 PARTITION_SPEC_ID.to_owned(),
1454 serde_json::Value::Number(value.partition_spec_id.into()),
1455 ),
1456 (DATA_FILES.to_owned(), json_data_files),
1457 ]
1458 .into_iter()
1459 .collect(),
1460 );
1461 Ok(SinkMetadata {
1462 metadata: Some(Serialized(SerializedMetadata {
1463 metadata: serde_json::to_vec(&json_value)
1464 .context("Can't serialize iceberg sink metadata")?,
1465 })),
1466 })
1467 }
1468}
1469
1470impl TryFrom<IcebergCommitResult> for Vec<u8> {
1471 type Error = SinkError;
1472
1473 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1474 let json_data_files = serde_json::Value::Array(
1475 value
1476 .data_files
1477 .iter()
1478 .map(serde_json::to_value)
1479 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1480 .context("Can't serialize data files to json")?,
1481 );
1482 let json_value = serde_json::Value::Object(
1483 vec![
1484 (
1485 SCHEMA_ID.to_owned(),
1486 serde_json::Value::Number(value.schema_id.into()),
1487 ),
1488 (
1489 PARTITION_SPEC_ID.to_owned(),
1490 serde_json::Value::Number(value.partition_spec_id.into()),
1491 ),
1492 (DATA_FILES.to_owned(), json_data_files),
1493 ]
1494 .into_iter()
1495 .collect(),
1496 );
1497 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1498 }
1499}
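// Both serializers above produce the same JSON object, e.g.
// `{"schema_id": 1, "partition_spec_id": 0, "data_files": [ ... ]}`,
// which `IcebergCommitResult::try_from` / `try_from_sealized_bytes` parse back.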
1500pub struct IcebergSinkCommitter {
1501 catalog: Arc<dyn Catalog>,
1502 table: Table,
1503 pub last_commit_epoch: u64,
1504 pub(crate) is_exactly_once: bool,
1505 pub(crate) sink_id: u32,
1506 pub(crate) config: IcebergConfig,
1507 pub(crate) param: SinkParam,
1508 pub(crate) db: DatabaseConnection,
1509 pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
1510 commit_retry_num: u32,
1511 pub(crate) iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
1512}
1513
1514impl IcebergSinkCommitter {
1515 async fn reload_table(
1518 catalog: &dyn Catalog,
1519 table_ident: &TableIdent,
1520 schema_id: i32,
1521 partition_spec_id: i32,
1522 ) -> Result<Table> {
1523 let table = catalog
1524 .load_table(table_ident)
1525 .await
1526 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1527 if table.metadata().current_schema_id() != schema_id {
1528 return Err(SinkError::Iceberg(anyhow!(
1529 "Schema evolution not supported, expect schema id {}, but got {}",
1530 schema_id,
1531 table.metadata().current_schema_id()
1532 )));
1533 }
1534 if table.metadata().default_partition_spec_id() != partition_spec_id {
1535 return Err(SinkError::Iceberg(anyhow!(
1536 "Partition evolution not supported, expect partition spec id {}, but got {}",
1537 partition_spec_id,
1538 table.metadata().default_partition_spec_id()
1539 )));
1540 }
1541 Ok(table)
1542 }
1543}
1544
1545#[async_trait::async_trait]
1546impl SinkCommitCoordinator for IcebergSinkCommitter {
1547 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1548 if self.is_exactly_once {
1549 self.committed_epoch_subscriber = Some(subscriber);
1550 tracing::info!(
1551 "Sink id = {}: iceberg sink coordinator initing.",
1552 self.param.sink_id.sink_id()
1553 );
1554 if iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id()).await? {
1555 let ordered_metadata_list_by_end_epoch =
1556 get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id()).await?;
1557
1558 let mut last_recommit_epoch = 0;
1559 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1560 ordered_metadata_list_by_end_epoch
1561 {
1562 let write_results_bytes = deserialize_metadata(sealized_bytes);
1563 let mut write_results = vec![];
1564
1565 for each in write_results_bytes {
1566 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1567 write_results.push(write_result);
1568 }
1569
1570 match (
1571 committed,
1572 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1573 .await?,
1574 ) {
1575 (true, _) => {
1576 tracing::info!(
1577 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1578 self.param.sink_id.sink_id()
1579 );
1580 }
1581 (false, true) => {
1582 tracing::info!(
1584 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1585 self.param.sink_id.sink_id()
1586 );
1587 mark_row_is_committed_by_sink_id_and_end_epoch(
1588 &self.db,
1589 self.sink_id,
1590 end_epoch,
1591 )
1592 .await?;
1593 }
1594 (false, false) => {
1595 tracing::info!(
1596 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1597 self.param.sink_id.sink_id()
1598 );
1599 self.re_commit(end_epoch, write_results, snapshot_id)
1600 .await?;
1601 }
1602 }
1603
1604 last_recommit_epoch = end_epoch;
1605 }
1606 tracing::info!(
1607 "Sink id = {}: iceberg commit coordinator inited.",
1608 self.param.sink_id.sink_id()
1609 );
1610 return Ok(Some(last_recommit_epoch));
1611 } else {
1612 tracing::info!(
1613 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1614 self.param.sink_id.sink_id()
1615 );
1616 return Ok(None);
1617 }
1618 }
1619
1620 tracing::info!("Iceberg commit coordinator inited.");
1621 return Ok(None);
1622 }
1623
1624 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1625 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1626 let write_results: Vec<IcebergCommitResult> = metadata
1627 .iter()
1628 .map(IcebergCommitResult::try_from)
1629 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1630
1631 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1633 tracing::debug!(?epoch, "no data to commit");
1634 return Ok(());
1635 }
1636
1637 if write_results
1639 .iter()
1640 .any(|r| r.schema_id != write_results[0].schema_id)
1641 || write_results
1642 .iter()
1643 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1644 {
1645 return Err(SinkError::Iceberg(anyhow!(
1646 "schema_id and partition_spec_id should be the same in all write results"
1647 )));
1648 }
1649
1650 if self.is_exactly_once {
1651 assert!(self.committed_epoch_subscriber.is_some());
1652 match self.committed_epoch_subscriber.clone() {
1653 Some(committed_epoch_subscriber) => {
1654 let (committed_epoch, mut rw_futures_utilrx) =
1656 committed_epoch_subscriber(self.param.sink_id).await?;
1657 if committed_epoch >= epoch {
1659 self.commit_iceberg_inner(epoch, write_results, None)
1660 .await?;
1661 } else {
1662 tracing::info!(
1663 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1664 committed_epoch,
1665 epoch
1666 );
1667 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1668 tracing::info!(
1669 "Received next committed epoch: {}",
1670 next_committed_epoch
1671 );
1672 if next_committed_epoch >= epoch {
1674 self.commit_iceberg_inner(epoch, write_results, None)
1675 .await?;
1676 break;
1677 }
1678 }
1679 }
1680 }
1681 None => unreachable!(
1682 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1683 ),
1684 }
1685 } else {
1686 self.commit_iceberg_inner(epoch, write_results, None)
1687 .await?;
1688 }
1689
1690 Ok(())
1691 }
1692}
1693
1694impl IcebergSinkCommitter {
1696 async fn re_commit(
1697 &mut self,
1698 epoch: u64,
1699 write_results: Vec<IcebergCommitResult>,
1700 snapshot_id: i64,
1701 ) -> Result<()> {
        tracing::info!("Starting iceberg re-commit in epoch {epoch}.");
1703
1704 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1706 tracing::debug!(?epoch, "no data to commit");
1707 return Ok(());
1708 }
1709 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1710 .await?;
1711 Ok(())
1712 }
1713
1714 async fn commit_iceberg_inner(
1715 &mut self,
1716 epoch: u64,
1717 write_results: Vec<IcebergCommitResult>,
1718 snapshot_id: Option<i64>,
1719 ) -> Result<()> {
1720 let is_first_commit = snapshot_id.is_none();
1724 self.last_commit_epoch = epoch;
1725 let expect_schema_id = write_results[0].schema_id;
1726 let expect_partition_spec_id = write_results[0].partition_spec_id;
1727
1728 self.table = Self::reload_table(
1730 self.catalog.as_ref(),
1731 self.table.identifier(),
1732 expect_schema_id,
1733 expect_partition_spec_id,
1734 )
1735 .await?;
1736 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1737 return Err(SinkError::Iceberg(anyhow!(
1738 "Can't find schema by id {}",
1739 expect_schema_id
1740 )));
1741 };
1742 let Some(partition_spec) = self
1743 .table
1744 .metadata()
1745 .partition_spec_by_id(expect_partition_spec_id)
1746 else {
1747 return Err(SinkError::Iceberg(anyhow!(
1748 "Can't find partition spec by id {}",
1749 expect_partition_spec_id
1750 )));
1751 };
1752 let partition_type = partition_spec
1753 .as_ref()
1754 .clone()
1755 .partition_type(schema)
1756 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1757
1758 let txn = Transaction::new(&self.table);
1759 let snapshot_id = match snapshot_id {
1761 Some(previous_snapshot_id) => previous_snapshot_id,
1762 None => txn.generate_unique_snapshot_id(),
1763 };
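        // Exactly-once path: on the first commit attempt, persist the serialized write results
        // together with the pre-generated snapshot id before touching the catalog. On recovery,
        // `init` either re-commits these rows or, if the snapshot id is already visible in the
        // table, just marks them as committed.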
1764 if self.is_exactly_once && is_first_commit {
1765 let mut pre_commit_metadata_bytes = Vec::new();
1767 for each_parallelism_write_result in write_results.clone() {
1768 let each_parallelism_write_result_bytes: Vec<u8> =
1769 each_parallelism_write_result.try_into()?;
1770 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1771 }
1772
1773 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1774
1775 persist_pre_commit_metadata(
1776 self.sink_id,
1777 self.db.clone(),
1778 self.last_commit_epoch,
1779 epoch,
1780 pre_commit_metadata_bytes,
1781 snapshot_id,
1782 )
1783 .await?;
1784 }
1785
1786 let data_files = write_results
1787 .into_iter()
1788 .flat_map(|r| {
1789 r.data_files.into_iter().map(|f| {
1790 f.try_into(expect_partition_spec_id, &partition_type, schema)
1791 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1792 })
1793 })
1794 .collect::<Result<Vec<DataFile>>>()?;
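        // Commit with retries: with the default `commit_retry_num` of 8, the reload-append-commit
        // cycle below is retried with jittered exponential backoff starting at 10 ms and capped
        // at a 60 s delay per attempt.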
1795 let retry_strategy = ExponentialBackoff::from_millis(10)
1800 .max_delay(Duration::from_secs(60))
1801 .map(jitter)
1802 .take(self.commit_retry_num as usize);
1803 let catalog = self.catalog.clone();
1804 let table_ident = self.table.identifier().clone();
1805 let table = Retry::spawn(retry_strategy, || async {
1806 let table = Self::reload_table(
1807 catalog.as_ref(),
1808 &table_ident,
1809 expect_schema_id,
1810 expect_partition_spec_id,
1811 )
1812 .await?;
1813 let txn = Transaction::new(&table);
1814 let mut append_action = txn
1815 .fast_append(Some(snapshot_id), None, vec![])
1816 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1817 append_action
1818 .add_data_files(data_files.clone())
1819 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1820 let tx = append_action.apply().await.map_err(|err| {
1821 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1822 SinkError::Iceberg(anyhow!(err))
1823 })?;
1824 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1825 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1826 SinkError::Iceberg(anyhow!(err))
1827 })
1828 })
1829 .await?;
1830 self.table = table;
1831
1832 tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
1833
1834 if self.is_exactly_once {
1835 mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1836 tracing::info!(
1837 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
1838 self.sink_id,
1839 epoch
1840 );
1841
1842 delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch).await?;
1843 }
1844 if let Some(iceberg_compact_stat_sender) = &self.iceberg_compact_stat_sender
1845 && self.config.enable_compaction
1846 && iceberg_compact_stat_sender
1847 .send(IcebergSinkCompactionUpdate {
1848 sink_id: SinkId::new(self.sink_id),
1849 compaction_interval: self.config.compaction_interval_sec(),
1850 })
1851 .is_err()
1852 {
1853 warn!("failed to send iceberg compaction stats");
1854 }
1855
1856 Ok(())
1857 }
1858
1859 async fn is_snapshot_id_in_iceberg(
1863 &self,
1864 iceberg_config: &IcebergConfig,
1865 snapshot_id: i64,
1866 ) -> Result<bool> {
1867 let iceberg_common = iceberg_config.common.clone();
1868 let table = iceberg_common
1869 .load_table(&iceberg_config.java_catalog_props)
1870 .await?;
        Ok(table.metadata().snapshot_by_id(snapshot_id).is_some())
1876 }
1877}
1878
1879const MAP_KEY: &str = "key";
1880const MAP_VALUE: &str = "value";
1881
1882fn get_fields<'a>(
1883 our_field_type: &'a risingwave_common::types::DataType,
1884 data_type: &ArrowDataType,
1885 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1886) -> Option<ArrowFields> {
1887 match data_type {
1888 ArrowDataType::Struct(fields) => {
1889 match our_field_type {
1890 risingwave_common::types::DataType::Struct(struct_fields) => {
1891 struct_fields.iter().for_each(|(name, data_type)| {
1892 let res = schema_fields.insert(name, data_type);
1893 assert!(res.is_none())
1895 });
1896 }
1897 risingwave_common::types::DataType::Map(map_fields) => {
1898 schema_fields.insert(MAP_KEY, map_fields.key());
1899 schema_fields.insert(MAP_VALUE, map_fields.value());
1900 }
1901 risingwave_common::types::DataType::List(list_field) => {
1902 list_field.as_struct().iter().for_each(|(name, data_type)| {
1903 let res = schema_fields.insert(name, data_type);
1904 assert!(res.is_none())
1906 });
1907 }
1908 _ => {}
1909 };
1910 Some(fields.clone())
1911 }
1912 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1913 get_fields(our_field_type, field.data_type(), schema_fields)
1914 }
        _ => None,
    }
1917}
1918
fn check_compatibility(
    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
    fields: &ArrowFields,
) -> anyhow::Result<bool> {
    for arrow_field in fields {
        let our_field_type = schema_fields
            .get(arrow_field.name().as_str())
            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

        let converted_arrow_data_type = IcebergArrowConvert
            .to_arrow_field("", our_field_type)
            .map_err(|e| anyhow!(e))?
            .data_type()
            .clone();

        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
            (ArrowDataType::List(_), ArrowDataType::List(field))
            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
                let mut schema_fields = HashMap::new();
                get_fields(our_field_type, field.data_type(), &mut schema_fields)
                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
            }
            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
                let mut schema_fields = HashMap::new();
                our_field_type
                    .as_struct()
                    .iter()
                    .for_each(|(name, data_type)| {
                        let res = schema_fields.insert(name, data_type);
                        assert!(res.is_none())
                    });
                check_compatibility(schema_fields, fields)?
            }
            (left, right) => left.equals_datatype(right),
        };
        if !compatible {
            bail!(
                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
                arrow_field.name(),
                converted_arrow_data_type,
                arrow_field.data_type()
            );
        }
    }
    Ok(true)
}

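/// Checks whether the RisingWave schema can be written into the Iceberg table's Arrow
/// schema: the field counts must match and every field must map to a compatible Arrow
/// type. Fields are matched by name, so the column order may differ.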
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
    if rw_schema.fields.len() != arrow_schema.fields().len() {
        bail!(
            "Schema length mismatch, risingwave is {}, and iceberg is {}",
            rw_schema.fields.len(),
            arrow_schema.fields.len()
        );
    }

    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
        assert!(res.is_none())
    });

    check_compatibility(schema_fields, &arrow_schema.fields)?;
    Ok(())
}

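/// Serializes a list of opaque metadata blobs into a single JSON byte buffer.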
pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
    serde_json::to_vec(&metadata).unwrap()
}

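/// Inverse of [`serialize_metadata`]: decodes the JSON byte buffer back into the list
/// of metadata blobs.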
pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
    serde_json::from_slice(&bytes).unwrap()
}

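/// Parses a comma-separated `partition_by` expression such as
/// `"v1, bucket(16, v2), truncate(4, v3)"` into `(column, Transform)` pairs. A bare
/// column name maps to `Transform::Identity`; `bucket` and `truncate` require the
/// numeric parameter `n`.
///
/// A minimal usage sketch (column names here are illustrative only):
///
/// ```ignore
/// let cols = parse_partition_by_exprs("v1, bucket(16, v2)".to_owned())?;
/// assert_eq!(cols[0], ("v1".to_owned(), Transform::Identity));
/// assert_eq!(cols[1], ("v2".to_owned(), Transform::Bucket(16)));
/// ```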
pub fn parse_partition_by_exprs(
    expr: String,
) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
    if !re.is_match(&expr) {
        bail!(
            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
            expr
        )
    }
    let caps = re.captures_iter(&expr);

    let mut partition_columns = vec![];

    for mat in caps {
        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
            (&mat["transform"], Transform::Identity)
        } else {
            let mut func = mat["transform"].to_owned();
            if func == "bucket" || func == "truncate" {
                let n = &mat
                    .name("n")
                    .ok_or_else(|| anyhow!("`n` must be set for `bucket` and `truncate`"))?
                    .as_str();
                func = format!("{func}[{n}]");
            }
            (
                &mat["field"],
                Transform::from_str(&func)
                    .with_context(|| format!("invalid transform function {}", func))?,
            )
        };
        partition_columns.push((column.to_owned(), transform));
    }
    Ok(partition_columns)
}

#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::{DataType, MapType, StructType};

    use crate::connector_common::IcebergCommon;
    use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
    use crate::sink::iceberg::IcebergConfig;

    pub const DEFAULT_ICEBERG_COMPACTION_INTERVAL: u64 = 3600;

    #[test]
    fn test_compatible_arrow_schema() {
        use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};

        use super::*;
        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
            Field::with_name(DataType::Int32, "c"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);

        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "d"),
            Field::with_name(DataType::Int32, "c"),
            Field::with_name(DataType::Int32, "a"),
            Field::with_name(DataType::Int32, "b"),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", ArrowDataType::Int32, false),
            ArrowField::new("b", ArrowDataType::Int32, false),
            ArrowField::new("d", ArrowDataType::Int32, false),
            ArrowField::new("c", ArrowDataType::Int32, false),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();

        let risingwave_schema = Schema::new(vec![
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a1", DataType::Int32),
                    (
                        "a2",
                        DataType::Struct(StructType::new(vec![
                            ("a21", DataType::Bytea),
                            (
                                "a22",
                                DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                            ),
                        ])),
                    ),
                ])),
                "a",
            ),
            Field::with_name(
                DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                    ("b1", DataType::Int32),
                    ("b2", DataType::Bytea),
                    (
                        "b3",
                        DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                    ),
                ])))),
                "b",
            ),
            Field::with_name(
                DataType::Map(MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(DataType::Struct(StructType::new([
                        ("c1", DataType::Int32),
                        ("c2", DataType::Bytea),
                        (
                            "c3",
                            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
                        ),
                    ])))),
                )),
                "c",
            ),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "a",
                ArrowDataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("a1", ArrowDataType::Int32, false),
                    ArrowField::new(
                        "a2",
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("a21", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "a22",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ),
                ])),
                false,
            ),
            ArrowField::new(
                "b",
                ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                    ArrowDataType::Struct(ArrowFields::from(vec![
                        ArrowField::new("b1", ArrowDataType::Int32, false),
                        ArrowField::new("b2", ArrowDataType::LargeBinary, false),
                        ArrowField::new_map(
                            "b3",
                            "entries",
                            ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                            ArrowFieldRef::new(ArrowField::new(
                                "value",
                                ArrowDataType::Utf8,
                                false,
                            )),
                            false,
                            false,
                        ),
                    ])),
                    false,
                ))),
                false,
            ),
            ArrowField::new_map(
                "c",
                "entries",
                ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
                ArrowFieldRef::new(ArrowField::new(
                    "value",
                    ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
                        ArrowDataType::Struct(ArrowFields::from(vec![
                            ArrowField::new("c1", ArrowDataType::Int32, false),
                            ArrowField::new("c2", ArrowDataType::LargeBinary, false),
                            ArrowField::new_map(
                                "c3",
                                "entries",
                                ArrowFieldRef::new(ArrowField::new(
                                    "key",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                ArrowFieldRef::new(ArrowField::new(
                                    "value",
                                    ArrowDataType::Utf8,
                                    false,
                                )),
                                false,
                                false,
                            ),
                        ])),
                        false,
                    ))),
                    false,
                )),
                false,
                false,
            ),
        ]);
        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
    }

    #[test]
    fn test_parse_iceberg_config() {
        let values = [
            ("connector", "iceberg"),
            ("type", "upsert"),
            ("primary_key", "v1"),
            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
            ("warehouse.path", "s3://iceberg"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.path.style.access", "true"),
            ("s3.region", "us-east-1"),
            ("catalog.type", "jdbc"),
            ("catalog.name", "demo"),
            ("catalog.uri", "jdbc://postgresql://postgres:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("enable_compaction", "true"),
            ("compaction_interval_sec", "1800"),
            ("enable_snapshot_expiration", "true"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

        let expected_iceberg_config = IcebergConfig {
            common: IcebergCommon {
                warehouse_path: Some("s3://iceberg".to_owned()),
                catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_owned()),
                region: Some("us-east-1".to_owned()),
                endpoint: Some("http://127.0.0.1:9301".to_owned()),
                access_key: Some("hummockadmin".to_owned()),
                secret_key: Some("hummockadmin".to_owned()),
                gcs_credential: None,
                catalog_type: Some("jdbc".to_owned()),
                glue_id: None,
                catalog_name: Some("demo".to_owned()),
                database_name: Some("demo_db".to_owned()),
                table_name: "demo_table".to_owned(),
                path_style_access: Some(true),
                credential: None,
                oauth2_server_uri: None,
                scope: None,
                token: None,
                enable_config_load: None,
                rest_signing_name: None,
                rest_signing_region: None,
                rest_sigv4_enabled: None,
                hosted_catalog: None,
                azblob_account_name: None,
                azblob_account_key: None,
                azblob_endpoint_url: None,
            },
            r#type: "upsert".to_owned(),
            force_append_only: false,
            primary_key: Some(vec!["v1".to_owned()]),
            partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
            java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect(),
            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
            create_table_if_not_exists: false,
            is_exactly_once: None,
            commit_retry_num: 8,
            enable_compaction: true,
            compaction_interval_sec: Some(DEFAULT_ICEBERG_COMPACTION_INTERVAL / 2),
            enable_snapshot_expiration: true,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);

        assert_eq!(
            &iceberg_config.common.full_table_name().unwrap().to_string(),
            "demo_db.demo_table"
        );
    }

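    /// A minimal sketch exercising `parse_partition_by_exprs` on the same kinds of
    /// expressions used in `test_parse_iceberg_config` above. The asserted `Transform`
    /// variants come from the `iceberg` crate; the column names are illustrative.
    #[test]
    fn test_parse_partition_by_exprs() {
        use iceberg::spec::Transform;

        use super::parse_partition_by_exprs;

        let parsed =
            parse_partition_by_exprs("v1, identity(v1), truncate(4,v2), bucket(5, v1)".to_owned())
                .unwrap();
        assert_eq!(
            parsed,
            vec![
                ("v1".to_owned(), Transform::Identity),
                ("v1".to_owned(), Transform::Identity),
                ("v2".to_owned(), Transform::Truncate(4)),
                ("v1".to_owned(), Transform::Bucket(5)),
            ]
        );

        // `bucket`/`truncate` without an explicit width should be rejected.
        assert!(parse_partition_by_exprs("bucket(v1)".to_owned()).is_err());
    }
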
    async fn test_create_catalog(configs: BTreeMap<String, String>) {
        let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

        let table = iceberg_config.load_table().await.unwrap();

        println!("{:?}", table.identifier());
    }

    #[tokio::test]
    #[ignore]
    async fn test_storage_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_rest_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "rest"),
            ("catalog.uri", "http://192.168.167.4:8181"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_jdbc_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "jdbc"),
            ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
            ("catalog.jdbc.user", "admin"),
            ("catalog.jdbc.password", "123456"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }

    #[tokio::test]
    #[ignore]
    async fn test_hive_catalog() {
        let values = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "hummockadmin"),
            ("s3.secret.key", "hummockadmin"),
            ("s3.region", "us-east-1"),
            ("s3.path.style.access", "true"),
            ("catalog.name", "demo"),
            ("catalog.type", "hive"),
            ("catalog.uri", "thrift://localhost:9083"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("database.name", "s1"),
            ("table.name", "t1"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        test_create_catalog(values).await;
    }
}