1mod compaction;
16mod prometheus;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Debug;
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::{Context, anyhow};
25use async_trait::async_trait;
26use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
27use iceberg::spec::{
28 DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
29};
30use iceberg::table::Table;
31use iceberg::transaction::Transaction;
32use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
33use iceberg::writer::base_writer::equality_delete_writer::{
34 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
35};
36use iceberg::writer::base_writer::sort_position_delete_writer::{
37 POSITION_DELETE_SCHEMA, SortPositionDeleteWriterBuilder,
38};
39use iceberg::writer::file_writer::ParquetWriterBuilder;
40use iceberg::writer::file_writer::location_generator::{
41 DefaultFileNameGenerator, DefaultLocationGenerator,
42};
43use iceberg::writer::function_writer::equality_delta_writer::{
44 DELETE_OP, EqualityDeltaWriterBuilder, INSERT_OP,
45};
46use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
47use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
48use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
49use itertools::Itertools;
50use parquet::file::properties::WriterProperties;
51use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
52use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
53use regex::Regex;
54use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
55use risingwave_common::array::arrow::arrow_schema_iceberg::{
56 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
57 Schema as ArrowSchema, SchemaRef,
58};
59use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
60use risingwave_common::array::{Op, StreamChunk};
61use risingwave_common::bail;
62use risingwave_common::bitmap::Bitmap;
63use risingwave_common::catalog::Schema;
64use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
65use risingwave_common_estimate_size::EstimateSize;
66use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
67use risingwave_pb::connector_service::SinkMetadata;
68use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
69use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
70use sea_orm::{
71 ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
72 Set,
73};
74use serde_derive::Deserialize;
75use serde_json::from_value;
76use serde_with::{DisplayFromStr, serde_as};
77use thiserror_ext::AsReport;
78use tokio::sync::mpsc::{self};
79use tokio::sync::oneshot;
80use tokio_retry::Retry;
81use tokio_retry::strategy::{ExponentialBackoff, jitter};
82use tracing::warn;
83use url::Url;
84use uuid::Uuid;
85use with_options::WithOptions;
86
87use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
88use super::{
89 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
90 SinkCommittedEpochSubscriber, SinkError, SinkWriterParam,
91};
92use crate::connector_common::IcebergCommon;
93use crate::sink::coordinate::CoordinatedLogSinker;
94use crate::sink::writer::SinkWriter;
95use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
96use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};
97
/// Connector name of the Iceberg sink; exposed as `Sink::SINK_NAME` for [`IcebergSink`].
pub const ICEBERG_SINK: &str = "iceberg";

/// Serde default for `IcebergConfig::commit_retry_num` when the option is not given.
fn default_commit_retry_num() -> u32 {
    const DEFAULT_COMMIT_RETRY_NUM: u32 = 8;
    DEFAULT_COMMIT_RETRY_NUM
}
103
/// Options of the Iceberg sink, deserialized from the sink's property map.
///
/// Construct it with `IcebergConfig::from_btreemap`, which also validates the options and
/// fills `java_catalog_props`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConfig {
    // Sink type; `from_btreemap` only accepts `SINK_TYPE_APPEND_ONLY` or `SINK_TYPE_UPSERT`.
    pub r#type: String,
    // When true, an upsert sink is still written through the append-only writer
    // (no primary-key based deletes); non-insert rows are dropped by that writer.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub force_append_only: bool,

    // Common Iceberg catalog/table options (`IcebergCommon`), read from the same flat
    // property map.
    #[serde(flatten)]
    common: IcebergCommon,

    // Primary-key column names; `from_btreemap` requires a non-empty list for upsert sinks.
    #[serde(
        rename = "primary_key",
        default,
        deserialize_with = "deserialize_optional_string_seq_from_string"
    )]
    pub primary_key: Option<Vec<String>>,

    // Not deserialized: `from_btreemap` fills it with every `catalog.*` property except
    // `catalog.uri`, `catalog.type` and `catalog.name`, with the `catalog.` prefix removed.
    // It is passed along when loading the table / creating the catalog.
    #[serde(skip)]
    pub java_catalog_props: HashMap<String, String>,

    // Partition spec used only when this sink creates the table, in one of the forms
    // `column`, `transform(column)`, `transform(n,column)` or `transform(n, column)`.
    #[serde(default)]
    pub partition_by: Option<String>,

    // Handed to `CoordinatedLogSinker` as the commit checkpoint interval; must be greater
    // than 0 (checked in `from_btreemap`).
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,

    // Create the namespace and table before loading them when they do not exist yet.
    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,

    // Exactly-once mode for the commit coordinator; an unset value is treated as `false`.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
    // Commit retry count handed to the committer; defaults to `default_commit_retry_num()`.
    #[serde(default = "default_commit_retry_num")]
    pub commit_retry_num: u32,
}
148
149impl IcebergConfig {
150 pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
151 let mut config =
152 serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
153 .map_err(|e| SinkError::Config(anyhow!(e)))?;
154
155 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
156 return Err(SinkError::Config(anyhow!(
157 "`{}` must be {}, or {}",
158 SINK_TYPE_OPTION,
159 SINK_TYPE_APPEND_ONLY,
160 SINK_TYPE_UPSERT
161 )));
162 }
163
164 if config.r#type == SINK_TYPE_UPSERT {
165 if let Some(primary_key) = &config.primary_key {
166 if primary_key.is_empty() {
167 return Err(SinkError::Config(anyhow!(
168 "`primary_key` must not be empty in {}",
169 SINK_TYPE_UPSERT
170 )));
171 }
172 } else {
173 return Err(SinkError::Config(anyhow!(
174 "Must set `primary_key` in {}",
175 SINK_TYPE_UPSERT
176 )));
177 }
178 }
179
180 config.java_catalog_props = values
182 .iter()
183 .filter(|(k, _v)| {
184 k.starts_with("catalog.")
185 && k != &"catalog.uri"
186 && k != &"catalog.type"
187 && k != &"catalog.name"
188 })
189 .map(|(k, v)| (k[8..].to_string(), v.to_string()))
190 .collect();
191
192 if config.commit_checkpoint_interval == 0 {
193 return Err(SinkError::Config(anyhow!(
194 "`commit_checkpoint_interval` must be greater than 0"
195 )));
196 }
197
198 Ok(config)
199 }
200
201 pub fn catalog_type(&self) -> &str {
202 self.common.catalog_type()
203 }
204
205 pub async fn load_table(&self) -> Result<Table> {
206 self.common
207 .load_table(&self.java_catalog_props)
208 .await
209 .map_err(Into::into)
210 }
211
212 pub async fn create_catalog(&self) -> Result<Arc<dyn Catalog>> {
213 self.common
214 .create_catalog(&self.java_catalog_props)
215 .await
216 .map_err(Into::into)
217 }
218
219 pub fn full_table_name(&self) -> Result<TableIdent> {
220 self.common.full_table_name().map_err(Into::into)
221 }
222}
223
/// Iceberg sink: the parsed [`IcebergConfig`] together with the sink parameters it was
/// created from.
pub struct IcebergSink {
    config: IcebergConfig,
    param: SinkParam,
    // Column ids of the primary-key columns when the sink runs in upsert mode
    // (type is upsert and `force_append_only` is off); `None` selects the append-only writer.
    unique_column_ids: Option<Vec<usize>>,
}
230
231impl TryFrom<SinkParam> for IcebergSink {
232 type Error = SinkError;
233
234 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
235 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
236 IcebergSink::new(config, param)
237 }
238}
239
240impl Debug for IcebergSink {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 f.debug_struct("IcebergSink")
243 .field("config", &self.config)
244 .finish()
245 }
246}
247
248impl IcebergSink {
249 async fn create_and_validate_table(&self) -> Result<Table> {
250 if self.config.create_table_if_not_exists {
251 self.create_table_if_not_exists().await?;
252 }
253
254 let table = self
255 .config
256 .load_table()
257 .await
258 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
259
260 let sink_schema = self.param.schema();
261 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
262 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
263
264 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
265 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
266
267 Ok(table)
268 }
269
270 async fn create_table_if_not_exists(&self) -> Result<()> {
271 let catalog = self.config.create_catalog().await?;
272 let namespace = if let Some(database_name) = &self.config.common.database_name {
273 let namespace = NamespaceIdent::new(database_name.clone());
274 if !catalog
275 .namespace_exists(&namespace)
276 .await
277 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
278 {
279 catalog
280 .create_namespace(&namespace, HashMap::default())
281 .await
282 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
283 .context("failed to create iceberg namespace")?;
284 }
285 namespace
286 } else {
287 bail!("database name must be set if you want to create table")
288 };
289
290 let table_id = self
291 .config
292 .full_table_name()
293 .context("Unable to parse table name")?;
294 if !catalog
295 .table_exists(&table_id)
296 .await
297 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
298 {
299 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
300 let arrow_fields = self
302 .param
303 .columns
304 .iter()
305 .map(|column| {
306 Ok(iceberg_create_table_arrow_convert
307 .to_arrow_field(&column.name, &column.data_type)
308 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
309 .context(format!(
310 "failed to convert {}: {} to arrow type",
311 &column.name, &column.data_type
312 ))?)
313 })
314 .collect::<Result<Vec<ArrowField>>>()?;
315 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
316 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
317 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
318 .context("failed to convert arrow schema to iceberg schema")?;
319
320 let location = {
321 let mut names = namespace.clone().inner();
322 names.push(self.config.common.table_name.clone());
323 match &self.config.common.warehouse_path {
324 Some(warehouse_path) => {
325 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
326 let url = Url::parse(warehouse_path);
327 if url.is_err() || is_s3_tables {
328 if self.config.common.catalog_type() == "rest"
331 || self.config.common.catalog_type() == "rest_rust"
332 {
333 None
334 } else {
335 bail!(format!("Invalid warehouse path: {}", warehouse_path))
336 }
337 } else if warehouse_path.ends_with('/') {
338 Some(format!("{}{}", warehouse_path, names.join("/")))
339 } else {
340 Some(format!("{}/{}", warehouse_path, names.join("/")))
341 }
342 }
343 None => None,
344 }
345 };
346
347 let partition_spec = match &self.config.partition_by {
348 Some(partition_field) => {
349 let mut partition_fields = Vec::<UnboundPartitionField>::new();
350 let re = Regex::new(
352 r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?",
353 )
354 .unwrap();
355 if !re.is_match(partition_field) {
356 bail!(format!(
357 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
358 partition_field
359 ))
360 }
361 let caps = re.captures_iter(partition_field);
362 for (i, mat) in caps.enumerate() {
363 let (column, transform) =
364 if mat.name("n").is_none() && mat.name("field").is_none() {
365 (&mat["transform"], Transform::Identity)
366 } else {
367 let mut func = mat["transform"].to_owned();
368 if func == "bucket" || func == "truncate" {
369 let n = &mat
370 .name("n")
371 .ok_or_else(|| {
372 SinkError::Iceberg(anyhow!(
373 "The `n` must be set with `bucket` and `truncate`"
374 ))
375 })?
376 .as_str();
377 func = format!("{func}[{n}]");
378 }
379 (
380 &mat["field"],
381 Transform::from_str(&func)
382 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?,
383 )
384 };
385
386 match iceberg_schema.field_id_by_name(column) {
387 Some(id) => partition_fields.push(
388 UnboundPartitionField::builder()
389 .source_id(id)
390 .transform(transform)
391 .name(column.to_owned())
392 .field_id(i as i32)
393 .build(),
394 ),
395 None => bail!(format!(
396 "Partition source column does not exist in schema: {}",
397 column
398 )),
399 };
400 }
401 Some(
402 UnboundPartitionSpec::builder()
403 .with_spec_id(0)
404 .add_partition_fields(partition_fields)
405 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
406 .context("failed to add partition columns")?
407 .build(),
408 )
409 }
410 None => None,
411 };
412
413 let table_creation_builder = TableCreation::builder()
414 .name(self.config.common.table_name.clone())
415 .schema(iceberg_schema);
416
417 let table_creation = match (location, partition_spec) {
418 (Some(location), Some(partition_spec)) => table_creation_builder
419 .location(location)
420 .partition_spec(partition_spec)
421 .build(),
422 (Some(location), None) => table_creation_builder.location(location).build(),
423 (None, Some(partition_spec)) => table_creation_builder
424 .partition_spec(partition_spec)
425 .build(),
426 (None, None) => table_creation_builder.build(),
427 };
428
429 catalog
430 .create_table(&namespace, table_creation)
431 .await
432 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
433 .context("failed to create iceberg table")?;
434 }
435 Ok(())
436 }
437
438 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
439 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
440 if let Some(pk) = &config.primary_key {
441 let mut unique_column_ids = Vec::with_capacity(pk.len());
442 for col_name in pk {
443 let id = param
444 .columns
445 .iter()
446 .find(|col| col.name.as_str() == col_name)
447 .ok_or_else(|| {
448 SinkError::Config(anyhow!(
449 "Primary key column {} not found in sink schema",
450 col_name
451 ))
452 })?
453 .column_id
454 .get_id() as usize;
455 unique_column_ids.push(id);
456 }
457 Some(unique_column_ids)
458 } else {
459 unreachable!()
460 }
461 } else {
462 None
463 };
464 Ok(Self {
465 config,
466 param,
467 unique_column_ids,
468 })
469 }
470}
471
472impl Sink for IcebergSink {
473 type Coordinator = IcebergSinkCommitter;
474 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
475
476 const SINK_NAME: &'static str = ICEBERG_SINK;
477
478 async fn validate(&self) -> Result<()> {
479 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
480 bail!("Snowflake catalog only supports iceberg sources");
481 }
482 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
483 risingwave_common::license::Feature::IcebergSinkWithGlue
484 .check_available()
485 .map_err(|e| anyhow::anyhow!(e))?;
486 }
487 let _ = self.create_and_validate_table().await?;
488 Ok(())
489 }
490
491 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
492 let table = self.create_and_validate_table().await?;
493 let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
494 IcebergSinkWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
495 } else {
496 IcebergSinkWriter::new_append_only(table, &writer_param).await?
497 };
498
499 let commit_checkpoint_interval =
500 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
501 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
502 );
503 let writer = CoordinatedLogSinker::new(
504 &writer_param,
505 self.param.clone(),
506 inner,
507 commit_checkpoint_interval,
508 )
509 .await?;
510
511 Ok(writer)
512 }
513
514 fn is_coordinated_sink(&self) -> bool {
515 true
516 }
517
518 async fn new_coordinator(&self, db: DatabaseConnection) -> Result<Self::Coordinator> {
519 let catalog = self.config.create_catalog().await?;
520 let table = self.create_and_validate_table().await?;
521 Ok(IcebergSinkCommitter {
523 catalog,
524 table,
525 is_exactly_once: self.config.is_exactly_once.unwrap_or_default(),
526 last_commit_epoch: 0,
527 sink_id: self.param.sink_id.sink_id(),
528 config: self.config.clone(),
529 param: self.param.clone(),
530 db,
531 commit_notifier: None,
532 _compact_task_guard: None,
533 commit_retry_num: self.config.commit_retry_num,
534 committed_epoch_subscriber: None,
535 })
536 }
537}
538
/// Sink writer that converts stream chunks into Arrow record batches and writes them through
/// the Iceberg writer selected in [`IcebergWriterDispatch`].
pub struct IcebergSinkWriter {
    // Mode/partitioning-specific writer plus the builder used to recreate it.
    writer: IcebergWriterDispatch,
    // Arrow schema derived from the table's current Iceberg schema; used to convert chunks.
    arrow_schema: SchemaRef,
    metrics: IcebergWriterMetrics,
    // Loaded table; its metadata is read when serializing data files at checkpoints.
    table: Table,
}
547
548#[allow(clippy::type_complexity)]
549enum IcebergWriterDispatch {
550 PartitionAppendOnly {
551 writer: Option<Box<dyn IcebergWriter>>,
552 writer_builder: MonitoredGeneralWriterBuilder<
553 FanoutPartitionWriterBuilder<
554 DataFileWriterBuilder<
555 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
556 >,
557 >,
558 >,
559 },
560 NonpartitionAppendOnly {
561 writer: Option<Box<dyn IcebergWriter>>,
562 writer_builder: MonitoredGeneralWriterBuilder<
563 DataFileWriterBuilder<
564 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
565 >,
566 >,
567 },
568 PartitionUpsert {
569 writer: Option<Box<dyn IcebergWriter>>,
570 writer_builder: MonitoredGeneralWriterBuilder<
571 FanoutPartitionWriterBuilder<
572 EqualityDeltaWriterBuilder<
573 DataFileWriterBuilder<
574 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
575 >,
576 MonitoredPositionDeleteWriterBuilder<
577 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
578 >,
579 EqualityDeleteFileWriterBuilder<
580 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
581 >,
582 >,
583 >,
584 >,
585 arrow_schema_with_op_column: SchemaRef,
586 },
587 NonpartitionUpsert {
588 writer: Option<Box<dyn IcebergWriter>>,
589 writer_builder: MonitoredGeneralWriterBuilder<
590 EqualityDeltaWriterBuilder<
591 DataFileWriterBuilder<
592 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
593 >,
594 MonitoredPositionDeleteWriterBuilder<
595 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
596 >,
597 EqualityDeleteFileWriterBuilder<
598 ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
599 >,
600 >,
601 >,
602 arrow_schema_with_op_column: SchemaRef,
603 },
604}
605
606impl IcebergWriterDispatch {
607 pub fn get_writer(&mut self) -> Option<&mut Box<dyn IcebergWriter>> {
608 match self {
609 IcebergWriterDispatch::PartitionAppendOnly { writer, .. }
610 | IcebergWriterDispatch::NonpartitionAppendOnly { writer, .. }
611 | IcebergWriterDispatch::PartitionUpsert { writer, .. }
612 | IcebergWriterDispatch::NonpartitionUpsert { writer, .. } => writer.as_mut(),
613 }
614 }
615}
616
/// Label-guarded metrics owned by an [`IcebergSinkWriter`].
pub struct IcebergWriterMetrics {
    // Not read directly: clones are handed to the monitored writer builders. Presumably the
    // fields exist to keep these label guards alive as long as the writer — TODO confirm.
    _write_qps: LabelGuardedIntCounter<3>,
    _write_latency: LabelGuardedHistogram<3>,
    // Incremented by the estimated heap size of each written chunk.
    write_bytes: LabelGuardedIntCounter<3>,
}
626
627impl IcebergSinkWriter {
628 pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
629 let SinkWriterParam {
630 extra_partition_col_idx,
631 actor_id,
632 sink_id,
633 sink_name,
634 ..
635 } = writer_param;
636 let metrics_labels = [
637 &actor_id.to_string(),
638 &sink_id.to_string(),
639 sink_name.as_str(),
640 ];
641
642 let write_qps = GLOBAL_SINK_METRICS
644 .iceberg_write_qps
645 .with_guarded_label_values(&metrics_labels);
646 let write_latency = GLOBAL_SINK_METRICS
647 .iceberg_write_latency
648 .with_guarded_label_values(&metrics_labels);
649 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
652 .iceberg_rolling_unflushed_data_file
653 .with_guarded_label_values(&metrics_labels);
654 let write_bytes = GLOBAL_SINK_METRICS
655 .iceberg_write_bytes
656 .with_guarded_label_values(&metrics_labels);
657
658 let schema = table.metadata().current_schema();
659 let partition_spec = table.metadata().default_partition_spec();
660
661 let unique_uuid_suffix = Uuid::now_v7();
663
664 let parquet_writer_builder = ParquetWriterBuilder::new(
665 WriterProperties::new(),
666 schema.clone(),
667 table.file_io().clone(),
668 DefaultLocationGenerator::new(table.metadata().clone())
669 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
670 DefaultFileNameGenerator::new(
671 writer_param.actor_id.to_string(),
672 Some(unique_uuid_suffix.to_string()),
673 iceberg::spec::DataFileFormat::Parquet,
674 ),
675 );
676 let data_file_builder =
677 DataFileWriterBuilder::new(parquet_writer_builder, None, partition_spec.spec_id());
678 if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
679 Err(SinkError::Iceberg(anyhow!(
680 "Extra partition column is not supported in append-only mode"
681 )))
682 } else if partition_spec.fields().is_empty() {
683 let writer_builder = MonitoredGeneralWriterBuilder::new(
684 data_file_builder,
685 write_qps.clone(),
686 write_latency.clone(),
687 );
688 let inner_writer = Some(Box::new(
689 writer_builder
690 .clone()
691 .build()
692 .await
693 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
694 ) as Box<dyn IcebergWriter>);
695 Ok(Self {
696 arrow_schema: Arc::new(
697 schema_to_arrow_schema(table.metadata().current_schema())
698 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
699 ),
700 metrics: IcebergWriterMetrics {
701 _write_qps: write_qps,
702 _write_latency: write_latency,
703 write_bytes,
704 },
705 writer: IcebergWriterDispatch::NonpartitionAppendOnly {
706 writer: inner_writer,
707 writer_builder,
708 },
709 table,
710 })
711 } else {
712 let partition_builder = MonitoredGeneralWriterBuilder::new(
713 FanoutPartitionWriterBuilder::new(
714 data_file_builder,
715 partition_spec.clone(),
716 schema.clone(),
717 )
718 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
719 write_qps.clone(),
720 write_latency.clone(),
721 );
722 let inner_writer = Some(Box::new(
723 partition_builder
724 .clone()
725 .build()
726 .await
727 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
728 ) as Box<dyn IcebergWriter>);
729 Ok(Self {
730 arrow_schema: Arc::new(
731 schema_to_arrow_schema(table.metadata().current_schema())
732 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
733 ),
734 metrics: IcebergWriterMetrics {
735 _write_qps: write_qps,
736 _write_latency: write_latency,
737 write_bytes,
738 },
739 writer: IcebergWriterDispatch::PartitionAppendOnly {
740 writer: inner_writer,
741 writer_builder: partition_builder,
742 },
743 table,
744 })
745 }
746 }
747
748 pub async fn new_upsert(
749 table: Table,
750 unique_column_ids: Vec<usize>,
751 writer_param: &SinkWriterParam,
752 ) -> Result<Self> {
753 let SinkWriterParam {
754 extra_partition_col_idx,
755 actor_id,
756 sink_id,
757 sink_name,
758 ..
759 } = writer_param;
760 let metrics_labels = [
761 &actor_id.to_string(),
762 &sink_id.to_string(),
763 sink_name.as_str(),
764 ];
765 let unique_column_ids: Vec<_> = unique_column_ids.into_iter().map(|id| id as i32).collect();
766
767 let write_qps = GLOBAL_SINK_METRICS
769 .iceberg_write_qps
770 .with_guarded_label_values(&metrics_labels);
771 let write_latency = GLOBAL_SINK_METRICS
772 .iceberg_write_latency
773 .with_guarded_label_values(&metrics_labels);
774 let _rolling_unflushed_data_file = GLOBAL_SINK_METRICS
777 .iceberg_rolling_unflushed_data_file
778 .with_guarded_label_values(&metrics_labels);
779 let position_delete_cache_num = GLOBAL_SINK_METRICS
780 .iceberg_position_delete_cache_num
781 .with_guarded_label_values(&metrics_labels);
782 let write_bytes = GLOBAL_SINK_METRICS
783 .iceberg_write_bytes
784 .with_guarded_label_values(&metrics_labels);
785
786 let schema = table.metadata().current_schema();
788 let partition_spec = table.metadata().default_partition_spec();
789
790 let unique_uuid_suffix = Uuid::now_v7();
792
793 let data_file_builder = {
794 let parquet_writer_builder = ParquetWriterBuilder::new(
795 WriterProperties::new(),
796 schema.clone(),
797 table.file_io().clone(),
798 DefaultLocationGenerator::new(table.metadata().clone())
799 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
800 DefaultFileNameGenerator::new(
801 writer_param.actor_id.to_string(),
802 Some(unique_uuid_suffix.to_string()),
803 iceberg::spec::DataFileFormat::Parquet,
804 ),
805 );
806 DataFileWriterBuilder::new(
807 parquet_writer_builder.clone(),
808 None,
809 partition_spec.spec_id(),
810 )
811 };
812 let position_delete_builder = {
813 let parquet_writer_builder = ParquetWriterBuilder::new(
814 WriterProperties::new(),
815 POSITION_DELETE_SCHEMA.clone(),
816 table.file_io().clone(),
817 DefaultLocationGenerator::new(table.metadata().clone())
818 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
819 DefaultFileNameGenerator::new(
820 writer_param.actor_id.to_string(),
821 Some(format!("pos-del-{}", unique_uuid_suffix)),
822 iceberg::spec::DataFileFormat::Parquet,
823 ),
824 );
825 MonitoredPositionDeleteWriterBuilder::new(
826 SortPositionDeleteWriterBuilder::new(
827 parquet_writer_builder.clone(),
828 writer_param
829 .streaming_config
830 .developer
831 .iceberg_sink_positional_delete_cache_size,
832 None,
833 None,
834 ),
835 position_delete_cache_num,
836 )
837 };
838 let equality_delete_builder = {
839 let config = EqualityDeleteWriterConfig::new(
840 unique_column_ids.clone(),
841 table.metadata().current_schema().clone(),
842 None,
843 partition_spec.spec_id(),
844 )
845 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
846 let parquet_writer_builder = ParquetWriterBuilder::new(
847 WriterProperties::new(),
848 Arc::new(
849 arrow_schema_to_schema(config.projected_arrow_schema_ref())
850 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
851 ),
852 table.file_io().clone(),
853 DefaultLocationGenerator::new(table.metadata().clone())
854 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
855 DefaultFileNameGenerator::new(
856 writer_param.actor_id.to_string(),
857 Some(format!("eq-del-{}", unique_uuid_suffix)),
858 iceberg::spec::DataFileFormat::Parquet,
859 ),
860 );
861
862 EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
863 };
864 let delta_builder = EqualityDeltaWriterBuilder::new(
865 data_file_builder,
866 position_delete_builder,
867 equality_delete_builder,
868 unique_column_ids,
869 );
870 if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
871 Err(SinkError::Iceberg(anyhow!(
872 "Extra partition column is not supported in upsert mode"
873 )))
874 } else if partition_spec.fields().is_empty() {
875 let writer_builder = MonitoredGeneralWriterBuilder::new(
876 delta_builder,
877 write_qps.clone(),
878 write_latency.clone(),
879 );
880 let inner_writer = Some(Box::new(
881 writer_builder
882 .clone()
883 .build()
884 .await
885 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
886 ) as Box<dyn IcebergWriter>);
887 let original_arrow_schema = Arc::new(
888 schema_to_arrow_schema(table.metadata().current_schema())
889 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
890 );
891 let schema_with_extra_op_column = {
892 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
893 new_fields.push(Arc::new(ArrowField::new(
894 "op".to_owned(),
895 ArrowDataType::Int32,
896 false,
897 )));
898 Arc::new(ArrowSchema::new(new_fields))
899 };
900 Ok(Self {
901 arrow_schema: original_arrow_schema,
902 metrics: IcebergWriterMetrics {
903 _write_qps: write_qps,
904 _write_latency: write_latency,
905 write_bytes,
906 },
907 table,
908 writer: IcebergWriterDispatch::NonpartitionUpsert {
909 writer: inner_writer,
910 writer_builder,
911 arrow_schema_with_op_column: schema_with_extra_op_column,
912 },
913 })
914 } else {
915 let original_arrow_schema = Arc::new(
916 schema_to_arrow_schema(table.metadata().current_schema())
917 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
918 );
919 let schema_with_extra_op_column = {
920 let mut new_fields = original_arrow_schema.fields().iter().cloned().collect_vec();
921 new_fields.push(Arc::new(ArrowField::new(
922 "op".to_owned(),
923 ArrowDataType::Int32,
924 false,
925 )));
926 Arc::new(ArrowSchema::new(new_fields))
927 };
928 let partition_builder = MonitoredGeneralWriterBuilder::new(
929 FanoutPartitionWriterBuilder::new_with_custom_schema(
930 delta_builder,
931 schema_with_extra_op_column.clone(),
932 partition_spec.clone(),
933 table.metadata().current_schema().clone(),
934 ),
935 write_qps.clone(),
936 write_latency.clone(),
937 );
938 let inner_writer = Some(Box::new(
939 partition_builder
940 .clone()
941 .build()
942 .await
943 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
944 ) as Box<dyn IcebergWriter>);
945 Ok(Self {
946 arrow_schema: original_arrow_schema,
947 metrics: IcebergWriterMetrics {
948 _write_qps: write_qps,
949 _write_latency: write_latency,
950 write_bytes,
951 },
952 table,
953 writer: IcebergWriterDispatch::PartitionUpsert {
954 writer: inner_writer,
955 writer_builder: partition_builder,
956 arrow_schema_with_op_column: schema_with_extra_op_column,
957 },
958 })
959 }
960 }
961}
962
963#[async_trait]
964impl SinkWriter for IcebergSinkWriter {
965 type CommitMetadata = Option<SinkMetadata>;
966
967 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
969 Ok(())
971 }
972
973 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
975 match &mut self.writer {
977 IcebergWriterDispatch::PartitionAppendOnly {
978 writer,
979 writer_builder,
980 } => {
981 if writer.is_none() {
982 *writer = Some(Box::new(
983 writer_builder
984 .clone()
985 .build()
986 .await
987 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
988 ));
989 }
990 }
991 IcebergWriterDispatch::NonpartitionAppendOnly {
992 writer,
993 writer_builder,
994 } => {
995 if writer.is_none() {
996 *writer = Some(Box::new(
997 writer_builder
998 .clone()
999 .build()
1000 .await
1001 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1002 ));
1003 }
1004 }
1005 IcebergWriterDispatch::PartitionUpsert {
1006 writer,
1007 writer_builder,
1008 ..
1009 } => {
1010 if writer.is_none() {
1011 *writer = Some(Box::new(
1012 writer_builder
1013 .clone()
1014 .build()
1015 .await
1016 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1017 ));
1018 }
1019 }
1020 IcebergWriterDispatch::NonpartitionUpsert {
1021 writer,
1022 writer_builder,
1023 ..
1024 } => {
1025 if writer.is_none() {
1026 *writer = Some(Box::new(
1027 writer_builder
1028 .clone()
1029 .build()
1030 .await
1031 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
1032 ));
1033 }
1034 }
1035 };
1036
1037 let (mut chunk, ops) = chunk.compact().into_parts();
1039 if ops.is_empty() {
1040 return Ok(());
1041 }
1042 let write_batch_size = chunk.estimated_heap_size();
1043 let batch = match &self.writer {
1044 IcebergWriterDispatch::PartitionAppendOnly { .. }
1045 | IcebergWriterDispatch::NonpartitionAppendOnly { .. } => {
1046 let filters =
1048 chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
1049 chunk.set_visibility(filters);
1050 IcebergArrowConvert
1051 .to_record_batch(self.arrow_schema.clone(), &chunk.compact())
1052 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1053 }
1054 IcebergWriterDispatch::PartitionUpsert {
1055 arrow_schema_with_op_column,
1056 ..
1057 }
1058 | IcebergWriterDispatch::NonpartitionUpsert {
1059 arrow_schema_with_op_column,
1060 ..
1061 } => {
1062 let chunk = IcebergArrowConvert
1063 .to_record_batch(self.arrow_schema.clone(), &chunk)
1064 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1065 let ops = Arc::new(Int32Array::from(
1066 ops.iter()
1067 .map(|op| match op {
1068 Op::UpdateInsert | Op::Insert => INSERT_OP,
1069 Op::UpdateDelete | Op::Delete => DELETE_OP,
1070 })
1071 .collect_vec(),
1072 ));
1073 let mut columns = chunk.columns().to_vec();
1074 columns.push(ops);
1075 RecordBatch::try_new(arrow_schema_with_op_column.clone(), columns)
1076 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?
1077 }
1078 };
1079
1080 let writer = self.writer.get_writer().unwrap();
1081 writer
1082 .write(batch)
1083 .await
1084 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1085 self.metrics.write_bytes.inc_by(write_batch_size as _);
1086 Ok(())
1087 }
1088
1089 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
1092 if !is_checkpoint {
1094 return Ok(None);
1095 }
1096
1097 let close_result = match &mut self.writer {
1098 IcebergWriterDispatch::PartitionAppendOnly {
1099 writer,
1100 writer_builder,
1101 } => {
1102 let close_result = match writer.take() {
1103 Some(mut writer) => Some(writer.close().await),
1104 _ => None,
1105 };
1106 match writer_builder.clone().build().await {
1107 Ok(new_writer) => {
1108 *writer = Some(Box::new(new_writer));
1109 }
1110 _ => {
1111 warn!("Failed to build new writer after close");
1114 }
1115 }
1116 close_result
1117 }
1118 IcebergWriterDispatch::NonpartitionAppendOnly {
1119 writer,
1120 writer_builder,
1121 } => {
1122 let close_result = match writer.take() {
1123 Some(mut writer) => Some(writer.close().await),
1124 _ => None,
1125 };
1126 match writer_builder.clone().build().await {
1127 Ok(new_writer) => {
1128 *writer = Some(Box::new(new_writer));
1129 }
1130 _ => {
1131 warn!("Failed to build new writer after close");
1134 }
1135 }
1136 close_result
1137 }
1138 IcebergWriterDispatch::PartitionUpsert {
1139 writer,
1140 writer_builder,
1141 ..
1142 } => {
1143 let close_result = match writer.take() {
1144 Some(mut writer) => Some(writer.close().await),
1145 _ => None,
1146 };
1147 match writer_builder.clone().build().await {
1148 Ok(new_writer) => {
1149 *writer = Some(Box::new(new_writer));
1150 }
1151 _ => {
1152 warn!("Failed to build new writer after close");
1155 }
1156 }
1157 close_result
1158 }
1159 IcebergWriterDispatch::NonpartitionUpsert {
1160 writer,
1161 writer_builder,
1162 ..
1163 } => {
1164 let close_result = match writer.take() {
1165 Some(mut writer) => Some(writer.close().await),
1166 _ => None,
1167 };
1168 match writer_builder.clone().build().await {
1169 Ok(new_writer) => {
1170 *writer = Some(Box::new(new_writer));
1171 }
1172 _ => {
1173 warn!("Failed to build new writer after close");
1176 }
1177 }
1178 close_result
1179 }
1180 };
1181
1182 match close_result {
1183 Some(Ok(result)) => {
1184 let version = self.table.metadata().format_version() as u8;
1185 let partition_type = self.table.metadata().default_partition_type();
1186 let data_files = result
1187 .into_iter()
1188 .map(|f| {
1189 SerializedDataFile::try_from(f, partition_type, version == 1)
1190 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1191 })
1192 .collect::<Result<Vec<_>>>()?;
1193 Ok(Some(SinkMetadata::try_from(&IcebergCommitResult {
1194 data_files,
1195 schema_id: self.table.metadata().current_schema_id(),
1196 partition_spec_id: self.table.metadata().default_partition_spec_id(),
1197 })?))
1198 }
1199 Some(Err(err)) => Err(SinkError::Iceberg(anyhow!(err))),
1200 None => Err(SinkError::Iceberg(anyhow!("No writer to close"))),
1201 }
1202 }
1203
    /// Aborts the current epoch.
    ///
    /// This implementation does nothing and always succeeds; the writer state held in
    /// `self` is left untouched.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
1209}
1210
/// JSON key under which the schema id is stored in serialized iceberg sink metadata.
const SCHEMA_ID: &str = "schema_id";
/// JSON key under which the partition spec id is stored in serialized iceberg sink metadata.
const PARTITION_SPEC_ID: &str = "partition_spec_id";
/// JSON key under which the array of serialized data files is stored.
const DATA_FILES: &str = "data_files";
1214
/// Output of one sink writer at a checkpoint barrier: the data files it produced, plus the
/// schema id and partition spec id of the table at the time they were produced. The
/// coordinator requires these ids to agree across writers and with the table on commit.
#[derive(Default, Clone)]
struct IcebergCommitResult {
    /// Table schema id the data files were written against.
    schema_id: i32,
    /// Default partition spec id the data files were written against.
    partition_spec_id: i32,
    /// Data files produced by the writer, in serializable form.
    data_files: Vec<SerializedDataFile>,
}
1221
1222impl IcebergCommitResult {
1223 fn try_from(value: &SinkMetadata) -> Result<Self> {
1224 if let Some(Serialized(v)) = &value.metadata {
1225 let mut values = if let serde_json::Value::Object(v) =
1226 serde_json::from_slice::<serde_json::Value>(&v.metadata)
1227 .context("Can't parse iceberg sink metadata")?
1228 {
1229 v
1230 } else {
1231 bail!("iceberg sink metadata should be an object");
1232 };
1233
1234 let schema_id;
1235 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1236 schema_id = value
1237 .as_u64()
1238 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1239 } else {
1240 bail!("iceberg sink metadata should have schema_id");
1241 }
1242
1243 let partition_spec_id;
1244 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1245 partition_spec_id = value
1246 .as_u64()
1247 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1248 } else {
1249 bail!("iceberg sink metadata should have partition_spec_id");
1250 }
1251
1252 let data_files: Vec<SerializedDataFile>;
1253 if let serde_json::Value::Array(values) = values
1254 .remove(DATA_FILES)
1255 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1256 {
1257 data_files = values
1258 .into_iter()
1259 .map(from_value::<SerializedDataFile>)
1260 .collect::<std::result::Result<_, _>>()
1261 .unwrap();
1262 } else {
1263 bail!("iceberg sink metadata should have data_files object");
1264 }
1265
1266 Ok(Self {
1267 schema_id: schema_id as i32,
1268 partition_spec_id: partition_spec_id as i32,
1269 data_files,
1270 })
1271 } else {
1272 bail!("Can't create iceberg sink write result from empty data!")
1273 }
1274 }
1275
1276 fn try_from_sealized_bytes(value: Vec<u8>) -> Result<Self> {
1277 let mut values = if let serde_json::Value::Object(value) =
1278 serde_json::from_slice::<serde_json::Value>(&value)
1279 .context("Can't parse iceberg sink metadata")?
1280 {
1281 value
1282 } else {
1283 bail!("iceberg sink metadata should be an object");
1284 };
1285
1286 let schema_id;
1287 if let Some(serde_json::Value::Number(value)) = values.remove(SCHEMA_ID) {
1288 schema_id = value
1289 .as_u64()
1290 .ok_or_else(|| anyhow!("schema_id should be a u64"))?;
1291 } else {
1292 bail!("iceberg sink metadata should have schema_id");
1293 }
1294
1295 let partition_spec_id;
1296 if let Some(serde_json::Value::Number(value)) = values.remove(PARTITION_SPEC_ID) {
1297 partition_spec_id = value
1298 .as_u64()
1299 .ok_or_else(|| anyhow!("partition_spec_id should be a u64"))?;
1300 } else {
1301 bail!("iceberg sink metadata should have partition_spec_id");
1302 }
1303
1304 let data_files: Vec<SerializedDataFile>;
1305 if let serde_json::Value::Array(values) = values
1306 .remove(DATA_FILES)
1307 .ok_or_else(|| anyhow!("iceberg sink metadata should have data_files object"))?
1308 {
1309 data_files = values
1310 .into_iter()
1311 .map(from_value::<SerializedDataFile>)
1312 .collect::<std::result::Result<_, _>>()
1313 .unwrap();
1314 } else {
1315 bail!("iceberg sink metadata should have data_files object");
1316 }
1317
1318 Ok(Self {
1319 schema_id: schema_id as i32,
1320 partition_spec_id: partition_spec_id as i32,
1321 data_files,
1322 })
1323 }
1324}
1325
1326impl<'a> TryFrom<&'a IcebergCommitResult> for SinkMetadata {
1327 type Error = SinkError;
1328
1329 fn try_from(value: &'a IcebergCommitResult) -> std::result::Result<SinkMetadata, Self::Error> {
1330 let json_data_files = serde_json::Value::Array(
1331 value
1332 .data_files
1333 .iter()
1334 .map(serde_json::to_value)
1335 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1336 .context("Can't serialize data files to json")?,
1337 );
1338 let json_value = serde_json::Value::Object(
1339 vec![
1340 (
1341 SCHEMA_ID.to_owned(),
1342 serde_json::Value::Number(value.schema_id.into()),
1343 ),
1344 (
1345 PARTITION_SPEC_ID.to_owned(),
1346 serde_json::Value::Number(value.partition_spec_id.into()),
1347 ),
1348 (DATA_FILES.to_owned(), json_data_files),
1349 ]
1350 .into_iter()
1351 .collect(),
1352 );
1353 Ok(SinkMetadata {
1354 metadata: Some(Serialized(SerializedMetadata {
1355 metadata: serde_json::to_vec(&json_value)
1356 .context("Can't serialize iceberg sink metadata")?,
1357 })),
1358 })
1359 }
1360}
1361
1362impl TryFrom<IcebergCommitResult> for Vec<u8> {
1363 type Error = SinkError;
1364
1365 fn try_from(value: IcebergCommitResult) -> std::result::Result<Vec<u8>, Self::Error> {
1366 let json_data_files = serde_json::Value::Array(
1367 value
1368 .data_files
1369 .iter()
1370 .map(serde_json::to_value)
1371 .collect::<std::result::Result<Vec<serde_json::Value>, _>>()
1372 .context("Can't serialize data files to json")?,
1373 );
1374 let json_value = serde_json::Value::Object(
1375 vec![
1376 (
1377 SCHEMA_ID.to_owned(),
1378 serde_json::Value::Number(value.schema_id.into()),
1379 ),
1380 (
1381 PARTITION_SPEC_ID.to_owned(),
1382 serde_json::Value::Number(value.partition_spec_id.into()),
1383 ),
1384 (DATA_FILES.to_owned(), json_data_files),
1385 ]
1386 .into_iter()
1387 .collect(),
1388 );
1389 Ok(serde_json::to_vec(&json_value).context("Can't serialize iceberg sink metadata")?)
1390 }
1391}
/// Commit coordinator of the Iceberg sink: gathers the `IcebergCommitResult`s of all writers
/// for an epoch and appends their data files to the Iceberg table. In exactly-once mode it also
/// records pre-commit metadata in a system table (via `db`) so commits can be replayed on
/// recovery.
pub struct IcebergSinkCommitter {
    catalog: Arc<dyn Catalog>,
    /// Cached table handle; refreshed from the catalog before and after every commit.
    table: Table,
    /// Epoch of the latest commit attempt.
    pub last_commit_epoch: u64,
    /// Whether commits wait for the upstream committed epoch and are tracked in the system table.
    pub(crate) is_exactly_once: bool,
    pub(crate) sink_id: u32,
    pub(crate) config: IcebergConfig,
    pub(crate) param: SinkParam,
    /// Connection used for the exactly-once system table.
    pub(crate) db: DatabaseConnection,
    /// If set, receives `()` after each successful Iceberg commit.
    commit_notifier: Option<mpsc::UnboundedSender<()>>,
    /// Bounds the number of backoff delays (retries) used when committing to the catalog.
    commit_retry_num: u32,
    // NOTE(review): held only for its drop side effect; presumably dropping it stops a
    // background compaction task — confirm against the constructor.
    _compact_task_guard: Option<oneshot::Sender<()>>,
    /// Set by `init` in exactly-once mode; used to wait for the committed epoch.
    pub(crate) committed_epoch_subscriber: Option<SinkCommittedEpochSubscriber>,
}
1406
1407impl IcebergSinkCommitter {
1408 async fn reload_table(
1411 catalog: &dyn Catalog,
1412 table_ident: &TableIdent,
1413 schema_id: i32,
1414 partition_spec_id: i32,
1415 ) -> Result<Table> {
1416 let table = catalog
1417 .load_table(table_ident)
1418 .await
1419 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1420 if table.metadata().current_schema_id() != schema_id {
1421 return Err(SinkError::Iceberg(anyhow!(
1422 "Schema evolution not supported, expect schema id {}, but got {}",
1423 schema_id,
1424 table.metadata().current_schema_id()
1425 )));
1426 }
1427 if table.metadata().default_partition_spec_id() != partition_spec_id {
1428 return Err(SinkError::Iceberg(anyhow!(
1429 "Partition evolution not supported, expect partition spec id {}, but got {}",
1430 partition_spec_id,
1431 table.metadata().default_partition_spec_id()
1432 )));
1433 }
1434 Ok(table)
1435 }
1436}
1437
1438#[async_trait::async_trait]
1439impl SinkCommitCoordinator for IcebergSinkCommitter {
1440 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
1441 if self.is_exactly_once {
1442 self.committed_epoch_subscriber = Some(subscriber);
1443 tracing::info!(
1444 "Sink id = {}: iceberg sink coordinator initing.",
1445 self.param.sink_id.sink_id()
1446 );
1447 if self
1448 .iceberg_sink_has_pre_commit_metadata(&self.db, self.param.sink_id.sink_id())
1449 .await?
1450 {
1451 let ordered_metadata_list_by_end_epoch = self
1452 .get_pre_commit_info_by_sink_id(&self.db, self.param.sink_id.sink_id())
1453 .await?;
1454
1455 let mut last_recommit_epoch = 0;
1456 for (end_epoch, sealized_bytes, snapshot_id, committed) in
1457 ordered_metadata_list_by_end_epoch
1458 {
1459 let write_results_bytes = deserialize_metadata(sealized_bytes);
1460 let mut write_results = vec![];
1461
1462 for each in write_results_bytes {
1463 let write_result = IcebergCommitResult::try_from_sealized_bytes(each)?;
1464 write_results.push(write_result);
1465 }
1466
1467 match (
1468 committed,
1469 self.is_snapshot_id_in_iceberg(&self.config, snapshot_id)
1470 .await?,
1471 ) {
1472 (true, _) => {
1473 tracing::info!(
1474 "Sink id = {}: all data in log store has been written into external sink, do nothing when recovery.",
1475 self.param.sink_id.sink_id()
1476 );
1477 }
1478 (false, true) => {
1479 tracing::info!(
1481 "Sink id = {}: all pre-commit files have been successfully committed into iceberg and do not need to be committed again, mark it as committed.",
1482 self.param.sink_id.sink_id()
1483 );
1484 self.mark_row_is_committed_by_sink_id_and_end_epoch(
1485 &self.db,
1486 self.sink_id,
1487 end_epoch,
1488 )
1489 .await?;
1490 }
1491 (false, false) => {
1492 tracing::info!(
1493 "Sink id = {}: there are files that were not successfully committed; re-commit these files.",
1494 self.param.sink_id.sink_id()
1495 );
1496 self.re_commit(end_epoch, write_results, snapshot_id)
1497 .await?;
1498 }
1499 }
1500
1501 last_recommit_epoch = end_epoch;
1502 }
1503 tracing::info!(
1504 "Sink id = {}: iceberg commit coordinator inited.",
1505 self.param.sink_id.sink_id()
1506 );
1507 return Ok(Some(last_recommit_epoch));
1508 } else {
1509 tracing::info!(
1510 "Sink id = {}: init iceberg coodinator, and system table is empty.",
1511 self.param.sink_id.sink_id()
1512 );
1513 return Ok(None);
1514 }
1515 }
1516
1517 tracing::info!("Iceberg commit coordinator inited.");
1518 return Ok(None);
1519 }
1520
1521 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
1522 tracing::info!("Starting iceberg commit in epoch {epoch}.");
1523 let write_results: Vec<IcebergCommitResult> = metadata
1524 .iter()
1525 .map(IcebergCommitResult::try_from)
1526 .collect::<Result<Vec<IcebergCommitResult>>>()?;
1527
1528 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1530 tracing::debug!(?epoch, "no data to commit");
1531 return Ok(());
1532 }
1533
1534 if write_results
1536 .iter()
1537 .any(|r| r.schema_id != write_results[0].schema_id)
1538 || write_results
1539 .iter()
1540 .any(|r| r.partition_spec_id != write_results[0].partition_spec_id)
1541 {
1542 return Err(SinkError::Iceberg(anyhow!(
1543 "schema_id and partition_spec_id should be the same in all write results"
1544 )));
1545 }
1546
1547 if self.is_exactly_once {
1548 assert!(self.committed_epoch_subscriber.is_some());
1549 match self.committed_epoch_subscriber.clone() {
1550 Some(committed_epoch_subscriber) => {
1551 let (committed_epoch, mut rw_futures_utilrx) =
1553 committed_epoch_subscriber(self.param.sink_id).await?;
1554 if committed_epoch >= epoch {
1556 self.commit_iceberg_inner(epoch, write_results, None)
1557 .await?;
1558 } else {
1559 tracing::info!(
1560 "Waiting for the committed epoch to rise. Current: {}, Waiting for: {}",
1561 committed_epoch,
1562 epoch
1563 );
1564 while let Some(next_committed_epoch) = rw_futures_utilrx.recv().await {
1565 tracing::info!(
1566 "Received next committed epoch: {}",
1567 next_committed_epoch
1568 );
1569 if next_committed_epoch >= epoch {
1571 self.commit_iceberg_inner(epoch, write_results, None)
1572 .await?;
1573 break;
1574 }
1575 }
1576 }
1577 }
1578 None => unreachable!(
1579 "Exactly once sink must wait epoch before committing, committed_epoch_subscriber is not initialized."
1580 ),
1581 }
1582 } else {
1583 self.commit_iceberg_inner(epoch, write_results, None)
1584 .await?;
1585 }
1586
1587 Ok(())
1588 }
1589}
1590
1591impl IcebergSinkCommitter {
1593 async fn re_commit(
1594 &mut self,
1595 epoch: u64,
1596 write_results: Vec<IcebergCommitResult>,
1597 snapshot_id: i64,
1598 ) -> Result<()> {
1599 tracing::info!("Starting iceberg re commit in epoch {epoch}.");
1600
1601 if write_results.is_empty() || write_results.iter().all(|r| r.data_files.is_empty()) {
1603 tracing::debug!(?epoch, "no data to commit");
1604 return Ok(());
1605 }
1606 self.commit_iceberg_inner(epoch, write_results, Some(snapshot_id))
1607 .await?;
1608 Ok(())
1609 }
1610
1611 async fn commit_iceberg_inner(
1612 &mut self,
1613 epoch: u64,
1614 write_results: Vec<IcebergCommitResult>,
1615 snapshot_id: Option<i64>,
1616 ) -> Result<()> {
1617 let is_first_commit = snapshot_id.is_none();
1621 if !is_first_commit {
1622 tracing::info!("Doing iceberg re commit.");
1623 }
1624 self.last_commit_epoch = epoch;
1625 let expect_schema_id = write_results[0].schema_id;
1626 let expect_partition_spec_id = write_results[0].partition_spec_id;
1627
1628 self.table = Self::reload_table(
1630 self.catalog.as_ref(),
1631 self.table.identifier(),
1632 expect_schema_id,
1633 expect_partition_spec_id,
1634 )
1635 .await?;
1636 let Some(schema) = self.table.metadata().schema_by_id(expect_schema_id) else {
1637 return Err(SinkError::Iceberg(anyhow!(
1638 "Can't find schema by id {}",
1639 expect_schema_id
1640 )));
1641 };
1642 let Some(partition_spec) = self
1643 .table
1644 .metadata()
1645 .partition_spec_by_id(expect_partition_spec_id)
1646 else {
1647 return Err(SinkError::Iceberg(anyhow!(
1648 "Can't find partition spec by id {}",
1649 expect_partition_spec_id
1650 )));
1651 };
1652 let partition_type = partition_spec
1653 .as_ref()
1654 .clone()
1655 .partition_type(schema)
1656 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1657
1658 let txn = Transaction::new(&self.table);
1659 let snapshot_id = match snapshot_id {
1661 Some(previous_snapshot_id) => previous_snapshot_id,
1662 None => txn.generate_unique_snapshot_id(),
1663 };
1664 if self.is_exactly_once && is_first_commit {
1665 let mut pre_commit_metadata_bytes = Vec::new();
1667 for each_parallelism_write_result in write_results.clone() {
1668 let each_parallelism_write_result_bytes: Vec<u8> =
1669 each_parallelism_write_result.try_into()?;
1670 pre_commit_metadata_bytes.push(each_parallelism_write_result_bytes);
1671 }
1672
1673 let pre_commit_metadata_bytes: Vec<u8> = serialize_metadata(pre_commit_metadata_bytes);
1674
1675 self.persist_pre_commit_metadata(
1676 self.db.clone(),
1677 self.last_commit_epoch,
1678 epoch,
1679 pre_commit_metadata_bytes,
1680 snapshot_id,
1681 )
1682 .await?;
1683 }
1684
1685 let data_files = write_results
1686 .into_iter()
1687 .flat_map(|r| {
1688 r.data_files.into_iter().map(|f| {
1689 f.try_into(expect_partition_spec_id, &partition_type, schema)
1690 .map_err(|err| SinkError::Iceberg(anyhow!(err)))
1691 })
1692 })
1693 .collect::<Result<Vec<DataFile>>>()?;
1694 let retry_strategy = ExponentialBackoff::from_millis(10)
1699 .max_delay(Duration::from_secs(60))
1700 .map(jitter)
1701 .take(self.commit_retry_num as usize);
1702 let catalog = self.catalog.clone();
1703 let table_ident = self.table.identifier().clone();
1704 let table = Retry::spawn(retry_strategy, || async {
1705 let table = Self::reload_table(
1706 catalog.as_ref(),
1707 &table_ident,
1708 expect_schema_id,
1709 expect_partition_spec_id,
1710 )
1711 .await?;
1712 let txn = Transaction::new(&table);
1713 let mut append_action = txn
1714 .fast_append(Some(snapshot_id), None, vec![])
1715 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1716 append_action
1717 .add_data_files(data_files.clone())
1718 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
1719 let tx = append_action.apply().await.map_err(|err| {
1720 tracing::error!(error = %err.as_report(), "Failed to apply iceberg table");
1721 SinkError::Iceberg(anyhow!(err))
1722 })?;
1723 tx.commit(self.catalog.as_ref()).await.map_err(|err| {
1724 tracing::error!(error = %err.as_report(), "Failed to commit iceberg table");
1725 SinkError::Iceberg(anyhow!(err))
1726 })
1727 })
1728 .await?;
1729 self.table = table;
1730
1731 if let Some(commit_notifier) = &mut self.commit_notifier {
1732 if commit_notifier.send(()).is_err() {
1733 warn!("failed to notify commit");
1734 }
1735 }
1736 tracing::info!("Succeeded to commit to iceberg table in epoch {epoch}.");
1737
1738 if self.is_exactly_once {
1739 self.mark_row_is_committed_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1740 .await?;
1741 tracing::info!(
1742 "Sink id = {}: succeeded mark pre commit metadata in epoch {} to deleted.",
1743 self.sink_id,
1744 epoch
1745 );
1746
1747 self.delete_row_by_sink_id_and_end_epoch(&self.db, self.sink_id, epoch)
1748 .await?;
1749 }
1750 Ok(())
1751 }
1752
1753 async fn persist_pre_commit_metadata(
1754 &self,
1755 db: DatabaseConnection,
1756 start_epoch: u64,
1757 end_epoch: u64,
1758 pre_commit_metadata: Vec<u8>,
1759 snapshot_id: i64,
1760 ) -> Result<()> {
1761 let m = exactly_once_iceberg_sink::ActiveModel {
1762 sink_id: Set(self.sink_id as i32),
1763 end_epoch: Set(end_epoch.try_into().unwrap()),
1764 start_epoch: Set(start_epoch.try_into().unwrap()),
1765 metadata: Set(pre_commit_metadata),
1766 committed: Set(false),
1767 snapshot_id: Set(snapshot_id),
1768 };
1769 match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
1770 Ok(_) => Ok(()),
1771 Err(e) => {
1772 tracing::error!("Error inserting into system table: {:?}", e.as_report());
1773 Err(e.into())
1774 }
1775 }
1776 }
1777
1778 async fn mark_row_is_committed_by_sink_id_and_end_epoch(
1779 &self,
1780 db: &DatabaseConnection,
1781 sink_id: u32,
1782 end_epoch: u64,
1783 ) -> Result<()> {
1784 match Entity::update(exactly_once_iceberg_sink::ActiveModel {
1785 sink_id: Set(sink_id as i32),
1786 end_epoch: Set(end_epoch.try_into().unwrap()),
1787 committed: Set(true),
1788 ..Default::default()
1789 })
1790 .exec(db)
1791 .await
1792 {
1793 Ok(_) => {
1794 tracing::info!(
1795 "Sink id = {}: mark written data status to committed, end_epoch = {}.",
1796 sink_id,
1797 end_epoch
1798 );
1799 Ok(())
1800 }
1801 Err(e) => {
1802 tracing::error!(
1803 "Error marking item to committed from iceberg exactly once system table: {:?}",
1804 e.as_report()
1805 );
1806 Err(e.into())
1807 }
1808 }
1809 }
1810
1811 async fn delete_row_by_sink_id_and_end_epoch(
1812 &self,
1813 db: &DatabaseConnection,
1814 sink_id: u32,
1815 end_epoch: u64,
1816 ) -> Result<()> {
1817 let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
1818 match Entity::delete_many()
1819 .filter(Column::SinkId.eq(sink_id))
1820 .filter(Column::EndEpoch.lt(end_epoch_i64))
1821 .exec(db)
1822 .await
1823 {
1824 Ok(result) => {
1825 let deleted_count = result.rows_affected;
1826
1827 if deleted_count == 0 {
1828 tracing::info!(
1829 "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
1830 sink_id,
1831 end_epoch
1832 );
1833 } else {
1834 tracing::info!(
1835 "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
1836 sink_id,
1837 end_epoch
1838 );
1839 }
1840 Ok(())
1841 }
1842 Err(e) => {
1843 tracing::error!(
1844 "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
1845 sink_id,
1846 e.as_report()
1847 );
1848 Err(e.into())
1849 }
1850 }
1851 }
1852
1853 async fn iceberg_sink_has_pre_commit_metadata(
1854 &self,
1855 db: &DatabaseConnection,
1856 sink_id: u32,
1857 ) -> Result<bool> {
1858 match exactly_once_iceberg_sink::Entity::find()
1859 .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32))
1860 .count(db)
1861 .await
1862 {
1863 Ok(count) => Ok(count > 0),
1864 Err(e) => {
1865 tracing::error!(
1866 "Error querying pre-commit metadata from system table: {:?}",
1867 e.as_report()
1868 );
1869 Err(e.into())
1870 }
1871 }
1872 }
1873
1874 async fn get_pre_commit_info_by_sink_id(
1875 &self,
1876 db: &DatabaseConnection,
1877 sink_id: u32,
1878 ) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
1879 let models: Vec<Model> = Entity::find()
1880 .filter(Column::SinkId.eq(sink_id as i32))
1881 .order_by(Column::EndEpoch, Order::Asc)
1882 .all(db)
1883 .await?;
1884
1885 let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
1886
1887 for model in models {
1888 result.push((
1889 model.end_epoch.try_into().unwrap(),
1890 model.metadata,
1891 model.snapshot_id,
1892 model.committed,
1893 ));
1894 }
1895
1896 Ok(result)
1897 }
1898
1899 async fn is_snapshot_id_in_iceberg(
1903 &self,
1904 iceberg_config: &IcebergConfig,
1905 snapshot_id: i64,
1906 ) -> Result<bool> {
1907 let iceberg_common = iceberg_config.common.clone();
1908 let table = iceberg_common
1909 .load_table(&iceberg_config.java_catalog_props)
1910 .await?;
1911 if table.metadata().snapshot_by_id(snapshot_id).is_some() {
1912 Ok(true)
1913 } else {
1914 Ok(false)
1915 }
1916 }
1917}
1918
/// Field name of a map entry's key. `get_fields` registers a RisingWave map's key type under
/// this name so it can be matched against the Arrow map entry struct.
const MAP_KEY: &str = "key";
/// Field name of a map entry's value, used the same way as [`MAP_KEY`].
const MAP_VALUE: &str = "value";
1921
1922fn get_fields<'a>(
1923 our_field_type: &'a risingwave_common::types::DataType,
1924 data_type: &ArrowDataType,
1925 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
1926) -> Option<ArrowFields> {
1927 match data_type {
1928 ArrowDataType::Struct(fields) => {
1929 match our_field_type {
1930 risingwave_common::types::DataType::Struct(struct_fields) => {
1931 struct_fields.iter().for_each(|(name, data_type)| {
1932 let res = schema_fields.insert(name, data_type);
1933 assert!(res.is_none())
1935 });
1936 }
1937 risingwave_common::types::DataType::Map(map_fields) => {
1938 schema_fields.insert(MAP_KEY, map_fields.key());
1939 schema_fields.insert(MAP_VALUE, map_fields.value());
1940 }
1941 risingwave_common::types::DataType::List(list_field) => {
1942 list_field.as_struct().iter().for_each(|(name, data_type)| {
1943 let res = schema_fields.insert(name, data_type);
1944 assert!(res.is_none())
1946 });
1947 }
1948 _ => {}
1949 };
1950 Some(fields.clone())
1951 }
1952 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
1953 get_fields(our_field_type, field.data_type(), schema_fields)
1954 }
1955 _ => None, }
1957}
1958
1959fn check_compatibility(
1960 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
1961 fields: &ArrowFields,
1962) -> anyhow::Result<bool> {
1963 for arrow_field in fields {
1964 let our_field_type = schema_fields
1965 .get(arrow_field.name().as_str())
1966 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
1967
1968 let converted_arrow_data_type = IcebergArrowConvert
1970 .to_arrow_field("", our_field_type)
1971 .map_err(|e| anyhow!(e))?
1972 .data_type()
1973 .clone();
1974
1975 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
1976 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
1977 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
1978 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
1979 (ArrowDataType::List(_), ArrowDataType::List(field))
1980 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
1981 let mut schema_fields = HashMap::new();
1982 get_fields(our_field_type, field.data_type(), &mut schema_fields)
1983 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
1984 }
1985 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
1987 let mut schema_fields = HashMap::new();
1988 our_field_type
1989 .as_struct()
1990 .iter()
1991 .for_each(|(name, data_type)| {
1992 let res = schema_fields.insert(name, data_type);
1993 assert!(res.is_none())
1995 });
1996 check_compatibility(schema_fields, fields)?
1997 }
1998 (left, right) => left.equals_datatype(right),
2006 };
2007 if !compatible {
2008 bail!(
2009 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
2010 arrow_field.name(),
2011 converted_arrow_data_type,
2012 arrow_field.data_type()
2013 );
2014 }
2015 }
2016 Ok(true)
2017}
2018
2019pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
2021 if rw_schema.fields.len() != arrow_schema.fields().len() {
2022 bail!(
2023 "Schema length mismatch, risingwave is {}, and iceberg is {}",
2024 rw_schema.fields.len(),
2025 arrow_schema.fields.len()
2026 );
2027 }
2028
2029 let mut schema_fields = HashMap::new();
2030 rw_schema.fields.iter().for_each(|field| {
2031 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
2032 assert!(res.is_none())
2034 });
2035
2036 check_compatibility(schema_fields, &arrow_schema.fields)?;
2037 Ok(())
2038}
2039
2040pub fn serialize_metadata(metadata: Vec<Vec<u8>>) -> Vec<u8> {
2041 serde_json::to_vec(&metadata).unwrap()
2042}
2043
2044pub fn deserialize_metadata(bytes: Vec<u8>) -> Vec<Vec<u8>> {
2045 serde_json::from_slice(&bytes).unwrap()
2046}
2047
2048#[cfg(test)]
2049mod test {
2050 use std::collections::BTreeMap;
2051
2052 use risingwave_common::array::arrow::arrow_schema_iceberg::FieldRef as ArrowFieldRef;
2053 use risingwave_common::catalog::Field;
2054 use risingwave_common::types::{DataType, MapType, StructType};
2055
2056 use crate::connector_common::IcebergCommon;
2057 use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
2058 use crate::sink::iceberg::IcebergConfig;
2059
2060 #[test]
2061 fn test_compatible_arrow_schema() {
2062 use arrow_schema_iceberg::{DataType as ArrowDataType, Field as ArrowField};
2063
2064 use super::*;
2065 let risingwave_schema = Schema::new(vec![
2066 Field::with_name(DataType::Int32, "a"),
2067 Field::with_name(DataType::Int32, "b"),
2068 Field::with_name(DataType::Int32, "c"),
2069 ]);
2070 let arrow_schema = ArrowSchema::new(vec![
2071 ArrowField::new("a", ArrowDataType::Int32, false),
2072 ArrowField::new("b", ArrowDataType::Int32, false),
2073 ArrowField::new("c", ArrowDataType::Int32, false),
2074 ]);
2075
2076 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2077
2078 let risingwave_schema = Schema::new(vec![
2079 Field::with_name(DataType::Int32, "d"),
2080 Field::with_name(DataType::Int32, "c"),
2081 Field::with_name(DataType::Int32, "a"),
2082 Field::with_name(DataType::Int32, "b"),
2083 ]);
2084 let arrow_schema = ArrowSchema::new(vec![
2085 ArrowField::new("a", ArrowDataType::Int32, false),
2086 ArrowField::new("b", ArrowDataType::Int32, false),
2087 ArrowField::new("d", ArrowDataType::Int32, false),
2088 ArrowField::new("c", ArrowDataType::Int32, false),
2089 ]);
2090 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2091
2092 let risingwave_schema = Schema::new(vec![
2093 Field::with_name(
2094 DataType::Struct(StructType::new(vec![
2095 ("a1", DataType::Int32),
2096 (
2097 "a2",
2098 DataType::Struct(StructType::new(vec![
2099 ("a21", DataType::Bytea),
2100 (
2101 "a22",
2102 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2103 ),
2104 ])),
2105 ),
2106 ])),
2107 "a",
2108 ),
2109 Field::with_name(
2110 DataType::List(Box::new(DataType::Struct(StructType::new(vec![
2111 ("b1", DataType::Int32),
2112 ("b2", DataType::Bytea),
2113 (
2114 "b3",
2115 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2116 ),
2117 ])))),
2118 "b",
2119 ),
2120 Field::with_name(
2121 DataType::Map(MapType::from_kv(
2122 DataType::Varchar,
2123 DataType::List(Box::new(DataType::Struct(StructType::new([
2124 ("c1", DataType::Int32),
2125 ("c2", DataType::Bytea),
2126 (
2127 "c3",
2128 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Jsonb)),
2129 ),
2130 ])))),
2131 )),
2132 "c",
2133 ),
2134 ]);
2135 let arrow_schema = ArrowSchema::new(vec![
2136 ArrowField::new(
2137 "a",
2138 ArrowDataType::Struct(ArrowFields::from(vec![
2139 ArrowField::new("a1", ArrowDataType::Int32, false),
2140 ArrowField::new(
2141 "a2",
2142 ArrowDataType::Struct(ArrowFields::from(vec![
2143 ArrowField::new("a21", ArrowDataType::LargeBinary, false),
2144 ArrowField::new_map(
2145 "a22",
2146 "entries",
2147 ArrowFieldRef::new(ArrowField::new(
2148 "key",
2149 ArrowDataType::Utf8,
2150 false,
2151 )),
2152 ArrowFieldRef::new(ArrowField::new(
2153 "value",
2154 ArrowDataType::Utf8,
2155 false,
2156 )),
2157 false,
2158 false,
2159 ),
2160 ])),
2161 false,
2162 ),
2163 ])),
2164 false,
2165 ),
2166 ArrowField::new(
2167 "b",
2168 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2169 ArrowDataType::Struct(ArrowFields::from(vec![
2170 ArrowField::new("b1", ArrowDataType::Int32, false),
2171 ArrowField::new("b2", ArrowDataType::LargeBinary, false),
2172 ArrowField::new_map(
2173 "b3",
2174 "entries",
2175 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2176 ArrowFieldRef::new(ArrowField::new(
2177 "value",
2178 ArrowDataType::Utf8,
2179 false,
2180 )),
2181 false,
2182 false,
2183 ),
2184 ])),
2185 false,
2186 ))),
2187 false,
2188 ),
2189 ArrowField::new_map(
2190 "c",
2191 "entries",
2192 ArrowFieldRef::new(ArrowField::new("key", ArrowDataType::Utf8, false)),
2193 ArrowFieldRef::new(ArrowField::new(
2194 "value",
2195 ArrowDataType::List(ArrowFieldRef::new(ArrowField::new_list_field(
2196 ArrowDataType::Struct(ArrowFields::from(vec![
2197 ArrowField::new("c1", ArrowDataType::Int32, false),
2198 ArrowField::new("c2", ArrowDataType::LargeBinary, false),
2199 ArrowField::new_map(
2200 "c3",
2201 "entries",
2202 ArrowFieldRef::new(ArrowField::new(
2203 "key",
2204 ArrowDataType::Utf8,
2205 false,
2206 )),
2207 ArrowFieldRef::new(ArrowField::new(
2208 "value",
2209 ArrowDataType::Utf8,
2210 false,
2211 )),
2212 false,
2213 false,
2214 ),
2215 ])),
2216 false,
2217 ))),
2218 false,
2219 )),
2220 false,
2221 false,
2222 ),
2223 ]);
2224 try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
2225 }
2226
/// Parses a complete set of JDBC-catalog sink options and checks that the resulting
/// `IcebergConfig` matches field-for-field.
///
/// What the expected value pins down:
/// - `catalog.jdbc.*` options end up in `java_catalog_props` under `jdbc.*` keys;
/// - `partition_by` is kept verbatim;
/// - `primary_key` is split into a list;
/// - options not supplied here take the values spelled out in the expected struct.
///
/// It also checks the fully-qualified table name rendered from database + table.
#[test]
fn test_parse_iceberg_config() {
    // Canonical JDBC URL shape is `jdbc:<subprotocol>://host:port/db` (a single colon
    // after `jdbc`). Shared between input and expectation so the two cannot drift.
    const CATALOG_URI: &str = "jdbc:postgresql://postgres:5432/iceberg";
    const PARTITION_BY: &str = "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)";

    let values = [
        ("connector", "iceberg"),
        ("type", "upsert"),
        ("primary_key", "v1"),
        ("partition_by", PARTITION_BY),
        ("warehouse.path", "s3://iceberg"),
        ("s3.endpoint", "http://127.0.0.1:9301"),
        ("s3.access.key", "hummockadmin"),
        ("s3.secret.key", "hummockadmin"),
        ("s3.path.style.access", "true"),
        ("s3.region", "us-east-1"),
        ("catalog.type", "jdbc"),
        ("catalog.name", "demo"),
        ("catalog.uri", CATALOG_URI),
        ("catalog.jdbc.user", "admin"),
        ("catalog.jdbc.password", "123456"),
        ("database.name", "demo_db"),
        ("table.name", "demo_table"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_owned(), v.to_owned()))
    .collect();

    let iceberg_config = IcebergConfig::from_btreemap(values).unwrap();

    let expected_iceberg_config = IcebergConfig {
        common: IcebergCommon {
            warehouse_path: Some("s3://iceberg".to_owned()),
            catalog_uri: Some(CATALOG_URI.to_owned()),
            region: Some("us-east-1".to_owned()),
            endpoint: Some("http://127.0.0.1:9301".to_owned()),
            access_key: Some("hummockadmin".to_owned()),
            secret_key: Some("hummockadmin".to_owned()),
            gcs_credential: None,
            catalog_type: Some("jdbc".to_owned()),
            glue_id: None,
            catalog_name: Some("demo".to_owned()),
            database_name: Some("demo_db".to_owned()),
            table_name: "demo_table".to_owned(),
            path_style_access: Some(true),
            credential: None,
            oauth2_server_uri: None,
            scope: None,
            token: None,
            enable_config_load: None,
            rest_signing_name: None,
            rest_signing_region: None,
            rest_sigv4_enabled: None,
        },
        r#type: "upsert".to_owned(),
        force_append_only: false,
        primary_key: Some(vec!["v1".to_owned()]),
        partition_by: Some(PARTITION_BY.to_owned()),
        java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect(),
        commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
        create_table_if_not_exists: false,
        is_exactly_once: None,
        commit_retry_num: 8,
    };

    assert_eq!(iceberg_config, expected_iceberg_config);

    assert_eq!(
        iceberg_config.common.full_table_name().unwrap().to_string(),
        "demo_db.demo_table"
    );
}
2299
2300 async fn test_create_catalog(configs: BTreeMap<String, String>) {
2301 let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();
2302
2303 let table = iceberg_config.load_table().await.unwrap();
2304
2305 println!("{:?}", table.identifier());
2306 }
2307
2308 #[tokio::test]
2309 #[ignore]
2310 async fn test_storage_catalog() {
2311 let values = [
2312 ("connector", "iceberg"),
2313 ("type", "append-only"),
2314 ("force_append_only", "true"),
2315 ("s3.endpoint", "http://127.0.0.1:9301"),
2316 ("s3.access.key", "hummockadmin"),
2317 ("s3.secret.key", "hummockadmin"),
2318 ("s3.region", "us-east-1"),
2319 ("s3.path.style.access", "true"),
2320 ("catalog.name", "demo"),
2321 ("catalog.type", "storage"),
2322 ("warehouse.path", "s3://icebergdata/demo"),
2323 ("database.name", "s1"),
2324 ("table.name", "t1"),
2325 ]
2326 .into_iter()
2327 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2328 .collect();
2329
2330 test_create_catalog(values).await;
2331 }
2332
2333 #[tokio::test]
2334 #[ignore]
2335 async fn test_rest_catalog() {
2336 let values = [
2337 ("connector", "iceberg"),
2338 ("type", "append-only"),
2339 ("force_append_only", "true"),
2340 ("s3.endpoint", "http://127.0.0.1:9301"),
2341 ("s3.access.key", "hummockadmin"),
2342 ("s3.secret.key", "hummockadmin"),
2343 ("s3.region", "us-east-1"),
2344 ("s3.path.style.access", "true"),
2345 ("catalog.name", "demo"),
2346 ("catalog.type", "rest"),
2347 ("catalog.uri", "http://192.168.167.4:8181"),
2348 ("warehouse.path", "s3://icebergdata/demo"),
2349 ("database.name", "s1"),
2350 ("table.name", "t1"),
2351 ]
2352 .into_iter()
2353 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2354 .collect();
2355
2356 test_create_catalog(values).await;
2357 }
2358
2359 #[tokio::test]
2360 #[ignore]
2361 async fn test_jdbc_catalog() {
2362 let values = [
2363 ("connector", "iceberg"),
2364 ("type", "append-only"),
2365 ("force_append_only", "true"),
2366 ("s3.endpoint", "http://127.0.0.1:9301"),
2367 ("s3.access.key", "hummockadmin"),
2368 ("s3.secret.key", "hummockadmin"),
2369 ("s3.region", "us-east-1"),
2370 ("s3.path.style.access", "true"),
2371 ("catalog.name", "demo"),
2372 ("catalog.type", "jdbc"),
2373 ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"),
2374 ("catalog.jdbc.user", "admin"),
2375 ("catalog.jdbc.password", "123456"),
2376 ("warehouse.path", "s3://icebergdata/demo"),
2377 ("database.name", "s1"),
2378 ("table.name", "t1"),
2379 ]
2380 .into_iter()
2381 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2382 .collect();
2383
2384 test_create_catalog(values).await;
2385 }
2386
2387 #[tokio::test]
2388 #[ignore]
2389 async fn test_hive_catalog() {
2390 let values = [
2391 ("connector", "iceberg"),
2392 ("type", "append-only"),
2393 ("force_append_only", "true"),
2394 ("s3.endpoint", "http://127.0.0.1:9301"),
2395 ("s3.access.key", "hummockadmin"),
2396 ("s3.secret.key", "hummockadmin"),
2397 ("s3.region", "us-east-1"),
2398 ("s3.path.style.access", "true"),
2399 ("catalog.name", "demo"),
2400 ("catalog.type", "hive"),
2401 ("catalog.uri", "thrift://localhost:9083"),
2402 ("warehouse.path", "s3://icebergdata/demo"),
2403 ("database.name", "s1"),
2404 ("table.name", "t1"),
2405 ]
2406 .into_iter()
2407 .map(|(k, v)| (k.to_owned(), v.to_owned()))
2408 .collect();
2409
2410 test_create_catalog(values).await;
2411 }
2412}