risingwave_connector/sink/iceberg/
mod.rs1#[cfg(test)]
16mod test;
17
18mod commit;
19pub mod commit_retry;
20mod config;
21mod create_table;
22#[cfg(any(test, madsim))]
23pub mod mock_v3_catalog_registry;
24mod prometheus;
25mod writer;
26
27use std::collections::BTreeMap;
28use std::fmt::Debug;
29use std::num::NonZeroU64;
30
31use anyhow::{Context, anyhow};
32pub use commit::*;
33pub use config::*;
34pub use create_table::*;
35use iceberg::table::Table;
36use risingwave_common::bail;
37use tokio::sync::mpsc::UnboundedSender;
38pub use writer::*;
39
40use super::{
41 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
42 SinkError, SinkWriterParam,
43};
44use crate::connector_common::IcebergSinkCompactionUpdate;
45use crate::enforce_secret::EnforceSecret;
46use crate::sink::coordinate::CoordinatedLogSinker;
47use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
48
/// Identifier of the Iceberg sink connector; used as the `Sink::SINK_NAME` of `IcebergSink`.
pub const ICEBERG_SINK: &str = "iceberg";
50
/// Iceberg sink: an `IcebergConfig` paired with the `SinkParam` the sink was created from.
pub struct IcebergSink {
    /// Iceberg sink configuration.
    pub config: IcebergConfig,
    /// Sink parameters (columns, downstream primary key, properties, sink id).
    param: SinkParam,
    /// Names of the primary-key columns, resolved in `IcebergSink::new` when the sink type is
    /// upsert and `force_append_only` is not set; `None` otherwise.
    upsert_primary_key_column_names: Option<Vec<String>>,
}
57
58impl EnforceSecret for IcebergSink {
59 fn enforce_secret<'a>(
60 prop_iter: impl Iterator<Item = &'a str>,
61 ) -> crate::error::ConnectorResult<()> {
62 for prop in prop_iter {
63 IcebergConfig::enforce_one(prop)?;
64 }
65 Ok(())
66 }
67}
68
69impl TryFrom<SinkParam> for IcebergSink {
70 type Error = SinkError;
71
72 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
73 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
74 IcebergSink::new(config, param)
75 }
76}
77
78impl Debug for IcebergSink {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("IcebergSink")
81 .field("config", &self.config)
82 .finish()
83 }
84}
85
86impl IcebergSink {
87 pub async fn create_and_validate_table(&self) -> Result<Table> {
88 create_and_validate_table_impl(&self.config, &self.param).await
89 }
90
91 pub async fn create_table_if_not_exists(&self) -> Result<()> {
92 create_table_if_not_exists_impl(&self.config, &self.param).await
93 }
94
95 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
96 if let Some(order_key) = &config.order_key {
97 validate_order_key_columns(
98 order_key,
99 param.columns.iter().map(|column| column.name.as_str()),
100 )
101 .context("invalid order_key")
102 .map_err(SinkError::Config)?;
103 }
104
105 let upsert_primary_key_column_names =
106 if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
107 let pk_indices = param
108 .downstream_pk
109 .as_ref()
110 .filter(|pk| !pk.is_empty())
111 .ok_or_else(|| {
112 SinkError::Config(anyhow!(
113 "primary key must be specified for upsert iceberg sink"
114 ))
115 })?;
116 Some(
117 pk_indices
118 .iter()
119 .map(|&idx| {
120 param
121 .columns
122 .get(idx)
123 .map(|column| column.name.clone())
124 .ok_or_else(|| {
125 SinkError::Config(anyhow!(
126 "primary key column index {} out of range in sink schema",
127 idx
128 ))
129 })
130 })
131 .collect::<Result<Vec<_>>>()?,
132 )
133 } else {
134 None
135 };
136 Ok(Self {
137 config,
138 param,
139 upsert_primary_key_column_names,
140 })
141 }
142}
143
144impl Sink for IcebergSink {
145 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
146
147 const SINK_NAME: &'static str = ICEBERG_SINK;
148
149 async fn validate(&self) -> Result<()> {
150 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
151 bail!("Snowflake catalog only supports iceberg sources");
152 }
153
154 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
155 risingwave_common::license::Feature::IcebergSinkWithGlue
156 .check_available()
157 .map_err(|e| anyhow::anyhow!(e))?;
158 }
159
160 IcebergConfig::validate_append_only_write_mode(
162 &self.config.r#type,
163 self.config.write_mode,
164 )?;
165
166 let compaction_type = self.config.compaction_type();
168
169 if self.config.write_mode == IcebergWriteMode::CopyOnWrite
172 && compaction_type != CompactionType::Full
173 {
174 bail!(
175 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
176 compaction_type
177 );
178 }
179
180 match compaction_type {
181 CompactionType::SmallFiles => {
182 risingwave_common::license::Feature::IcebergCompaction
184 .check_available()
185 .map_err(|e| anyhow::anyhow!(e))?;
186
187 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
189 bail!(
190 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
191 self.config.write_mode
192 );
193 }
194
195 if self.config.delete_files_count_threshold.is_some() {
197 bail!(
198 "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
199 );
200 }
201 }
202 CompactionType::FilesWithDelete => {
203 risingwave_common::license::Feature::IcebergCompaction
205 .check_available()
206 .map_err(|e| anyhow::anyhow!(e))?;
207
208 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
210 bail!(
211 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
212 self.config.write_mode
213 );
214 }
215
216 if self.config.small_files_threshold_mb.is_some() {
218 bail!(
219 "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
220 );
221 }
222 }
223 CompactionType::Full => {
224 }
226 }
227
228 let _ = self.create_and_validate_table().await?;
229 Ok(())
230 }
231
232 fn support_schema_change() -> bool {
233 true
234 }
235
236 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
237 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
238
239 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
241 if iceberg_config.enable_compaction && compaction_interval == 0 {
242 bail!(
243 "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
244 );
245 }
246
247 tracing::info!(
248 "Alter config compaction_interval set to {} seconds",
249 compaction_interval
250 );
251 }
252
253 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
255 && max_snapshots < 1
256 {
257 bail!(
258 "`compaction.max_snapshots_num` must be greater than 0, got: {}",
259 max_snapshots
260 );
261 }
262
263 if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
265 && target_file_size_mb == 0
266 {
267 bail!("`compaction.target_file_size_mb` must be greater than 0");
268 }
269
270 if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
272 && max_row_group_rows == 0
273 {
274 bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
275 }
276
277 if let Some(max_row_group_bytes) = iceberg_config.write_parquet_max_row_group_bytes
279 && max_row_group_bytes == 0
280 {
281 bail!("`compaction.write_parquet_max_row_group_bytes` must be greater than 0");
282 }
283
284 if let Some(ref compression) = iceberg_config.write_parquet_compression {
286 let valid_codecs = [
287 "uncompressed",
288 "snappy",
289 "gzip",
290 "lzo",
291 "brotli",
292 "lz4",
293 "zstd",
294 ];
295 if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
296 bail!(
297 "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
298 valid_codecs,
299 compression
300 );
301 }
302 }
303
304 Ok(())
305 }
306
307 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
308 let writer = IcebergSinkWriter::new(
309 self.config.clone(),
310 self.param.clone(),
311 writer_param.clone(),
312 self.upsert_primary_key_column_names.clone(),
313 );
314
315 let commit_checkpoint_interval =
316 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
317 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
318 );
319 let log_sinker = CoordinatedLogSinker::new(
320 &writer_param,
321 self.param.clone(),
322 writer,
323 commit_checkpoint_interval,
324 )
325 .await?;
326
327 Ok(log_sinker)
328 }
329
330 fn is_coordinated_sink(&self) -> bool {
331 true
332 }
333
334 async fn new_coordinator(
335 &self,
336 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
337 ) -> Result<SinkCommitCoordinator> {
338 let catalog = self.config.create_catalog().await?;
339 let table = self.create_and_validate_table().await?;
340 let coordinator = IcebergSinkCommitter {
341 catalog,
342 table,
343 last_commit_epoch: 0,
344 sink_id: self.param.sink_id,
345 config: self.config.clone(),
346 param: self.param.clone(),
347 commit_retry_num: self.config.commit_retry_num,
348 iceberg_compact_stat_sender,
349 };
350 if self.config.is_exactly_once.unwrap_or_default() {
351 Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
352 } else {
353 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
354 }
355 }
356}