risingwave_connector/sink/iceberg/
mod.rs1#[cfg(test)]
16mod test;
17
18mod commit;
19mod config;
20mod create_table;
21mod prometheus;
22mod writer;
23
24use std::collections::BTreeMap;
25use std::fmt::Debug;
26use std::num::NonZeroU64;
27
28use anyhow::{Context, anyhow};
29pub use commit::*;
30pub use config::*;
31pub use create_table::*;
32use iceberg::table::Table;
33use risingwave_common::bail;
34use tokio::sync::mpsc::UnboundedSender;
35pub use writer::*;
36
37use super::{
38 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
39 SinkError, SinkWriterParam,
40};
41use crate::connector_common::IcebergSinkCompactionUpdate;
42use crate::enforce_secret::EnforceSecret;
43use crate::sink::coordinate::CoordinatedLogSinker;
44use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
45
46pub const ICEBERG_SINK: &str = "iceberg";
47
48pub struct IcebergSink {
49 pub config: IcebergConfig,
50 param: SinkParam,
51 upsert_primary_key_column_names: Option<Vec<String>>,
53}
54
55impl EnforceSecret for IcebergSink {
56 fn enforce_secret<'a>(
57 prop_iter: impl Iterator<Item = &'a str>,
58 ) -> crate::error::ConnectorResult<()> {
59 for prop in prop_iter {
60 IcebergConfig::enforce_one(prop)?;
61 }
62 Ok(())
63 }
64}
65
66impl TryFrom<SinkParam> for IcebergSink {
67 type Error = SinkError;
68
69 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
70 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
71 IcebergSink::new(config, param)
72 }
73}
74
75impl Debug for IcebergSink {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("IcebergSink")
78 .field("config", &self.config)
79 .finish()
80 }
81}
82
83impl IcebergSink {
84 pub async fn create_and_validate_table(&self) -> Result<Table> {
85 create_and_validate_table_impl(&self.config, &self.param).await
86 }
87
88 pub async fn create_table_if_not_exists(&self) -> Result<()> {
89 create_table_if_not_exists_impl(&self.config, &self.param).await
90 }
91
92 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
93 if let Some(order_key) = &config.order_key {
94 validate_order_key_columns(
95 order_key,
96 param.columns.iter().map(|column| column.name.as_str()),
97 )
98 .context("invalid order_key")
99 .map_err(SinkError::Config)?;
100 }
101
102 let upsert_primary_key_column_names =
103 if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
104 let pk_indices = param
105 .downstream_pk
106 .as_ref()
107 .filter(|pk| !pk.is_empty())
108 .ok_or_else(|| {
109 SinkError::Config(anyhow!(
110 "primary key must be specified for upsert iceberg sink"
111 ))
112 })?;
113 Some(
114 pk_indices
115 .iter()
116 .map(|&idx| {
117 param
118 .columns
119 .get(idx)
120 .map(|column| column.name.clone())
121 .ok_or_else(|| {
122 SinkError::Config(anyhow!(
123 "primary key column index {} out of range in sink schema",
124 idx
125 ))
126 })
127 })
128 .collect::<Result<Vec<_>>>()?,
129 )
130 } else {
131 None
132 };
133 Ok(Self {
134 config,
135 param,
136 upsert_primary_key_column_names,
137 })
138 }
139}
140
141impl Sink for IcebergSink {
142 type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
143
144 const SINK_NAME: &'static str = ICEBERG_SINK;
145
146 async fn validate(&self) -> Result<()> {
147 if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
148 bail!("Snowflake catalog only supports iceberg sources");
149 }
150
151 if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
152 risingwave_common::license::Feature::IcebergSinkWithGlue
153 .check_available()
154 .map_err(|e| anyhow::anyhow!(e))?;
155 }
156
157 IcebergConfig::validate_append_only_write_mode(
159 &self.config.r#type,
160 self.config.write_mode,
161 )?;
162
163 let compaction_type = self.config.compaction_type();
165
166 if self.config.write_mode == IcebergWriteMode::CopyOnWrite
169 && compaction_type != CompactionType::Full
170 {
171 bail!(
172 "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
173 compaction_type
174 );
175 }
176
177 match compaction_type {
178 CompactionType::SmallFiles => {
179 risingwave_common::license::Feature::IcebergCompaction
181 .check_available()
182 .map_err(|e| anyhow::anyhow!(e))?;
183
184 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
186 bail!(
187 "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
188 self.config.write_mode
189 );
190 }
191
192 if self.config.delete_files_count_threshold.is_some() {
194 bail!(
195 "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
196 );
197 }
198 }
199 CompactionType::FilesWithDelete => {
200 risingwave_common::license::Feature::IcebergCompaction
202 .check_available()
203 .map_err(|e| anyhow::anyhow!(e))?;
204
205 if self.config.write_mode != IcebergWriteMode::MergeOnRead {
207 bail!(
208 "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
209 self.config.write_mode
210 );
211 }
212
213 if self.config.small_files_threshold_mb.is_some() {
215 bail!(
216 "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
217 );
218 }
219 }
220 CompactionType::Full => {
221 }
223 }
224
225 let _ = self.create_and_validate_table().await?;
226 Ok(())
227 }
228
229 fn support_schema_change() -> bool {
230 true
231 }
232
233 fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
234 let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
235
236 if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
238 if iceberg_config.enable_compaction && compaction_interval == 0 {
239 bail!(
240 "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
241 );
242 }
243
244 tracing::info!(
245 "Alter config compaction_interval set to {} seconds",
246 compaction_interval
247 );
248 }
249
250 if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
252 && max_snapshots < 1
253 {
254 bail!(
255 "`compaction.max-snapshots-num` must be greater than 0, got: {}",
256 max_snapshots
257 );
258 }
259
260 if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
262 && target_file_size_mb == 0
263 {
264 bail!("`compaction.target_file_size_mb` must be greater than 0");
265 }
266
267 if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
269 && max_row_group_rows == 0
270 {
271 bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
272 }
273
274 if let Some(max_row_group_bytes) = iceberg_config.write_parquet_max_row_group_bytes
276 && max_row_group_bytes == 0
277 {
278 bail!("`compaction.write_parquet_max_row_group_bytes` must be greater than 0");
279 }
280
281 if let Some(ref compression) = iceberg_config.write_parquet_compression {
283 let valid_codecs = [
284 "uncompressed",
285 "snappy",
286 "gzip",
287 "lzo",
288 "brotli",
289 "lz4",
290 "zstd",
291 ];
292 if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
293 bail!(
294 "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
295 valid_codecs,
296 compression
297 );
298 }
299 }
300
301 Ok(())
302 }
303
304 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
305 let writer = IcebergSinkWriter::new(
306 self.config.clone(),
307 self.param.clone(),
308 writer_param.clone(),
309 self.upsert_primary_key_column_names.clone(),
310 );
311
312 let commit_checkpoint_interval =
313 NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
314 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
315 );
316 let log_sinker = CoordinatedLogSinker::new(
317 &writer_param,
318 self.param.clone(),
319 writer,
320 commit_checkpoint_interval,
321 )
322 .await?;
323
324 Ok(log_sinker)
325 }
326
327 fn is_coordinated_sink(&self) -> bool {
328 true
329 }
330
331 async fn new_coordinator(
332 &self,
333 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
334 ) -> Result<SinkCommitCoordinator> {
335 let catalog = self.config.create_catalog().await?;
336 let table = self.create_and_validate_table().await?;
337 let coordinator = IcebergSinkCommitter {
338 catalog,
339 table,
340 last_commit_epoch: 0,
341 sink_id: self.param.sink_id,
342 config: self.config.clone(),
343 param: self.param.clone(),
344 commit_retry_num: self.config.commit_retry_num,
345 iceberg_compact_stat_sender,
346 };
347 if self.config.is_exactly_once.unwrap_or_default() {
348 Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
349 } else {
350 Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
351 }
352 }
353}