// risingwave_connector/sink/iceberg/mod.rs
#[cfg(test)]
16mod test;
17
18mod commit;
19mod config;
20mod create_table;
21mod prometheus;
22mod writer;
23
24use std::collections::BTreeMap;
25use std::fmt::Debug;
26use std::num::NonZeroU64;
27
28use anyhow::anyhow;
29pub use commit::*;
30pub use config::*;
31pub use create_table::*;
32use iceberg::table::Table;
33use risingwave_common::bail;
34use tokio::sync::mpsc::UnboundedSender;
35pub use writer::*;
36
37use super::{
38 GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
39 SinkError, SinkWriterParam,
40};
41use crate::connector_common::IcebergSinkCompactionUpdate;
42use crate::enforce_secret::EnforceSecret;
43use crate::sink::coordinate::CoordinatedLogSinker;
44use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
45
/// Connector name under which this sink is registered (`connector = 'iceberg'`).
pub const ICEBERG_SINK: &str = "iceberg";
47
/// Sink that writes RisingWave changelogs into an Apache Iceberg table.
pub struct IcebergSink {
    pub config: IcebergConfig,
    // Full sink parameters (schema, sink id, properties) kept for table
    // creation/validation and for building writers/coordinators.
    param: SinkParam,
    // Column ids of the configured primary-key columns, resolved against the
    // sink schema; `None` for append-only sinks (see `IcebergSink::new`).
    unique_column_ids: Option<Vec<usize>>,
}
54
55impl EnforceSecret for IcebergSink {
56 fn enforce_secret<'a>(
57 prop_iter: impl Iterator<Item = &'a str>,
58 ) -> crate::error::ConnectorResult<()> {
59 for prop in prop_iter {
60 IcebergConfig::enforce_one(prop)?;
61 }
62 Ok(())
63 }
64}
65
66impl TryFrom<SinkParam> for IcebergSink {
67 type Error = SinkError;
68
69 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
70 let config = IcebergConfig::from_btreemap(param.properties.clone())?;
71 IcebergSink::new(config, param)
72 }
73}
74
// Manual `Debug` impl: only `config` is rendered; `param` and
// `unique_column_ids` are intentionally omitted from the debug output.
impl Debug for IcebergSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergSink")
            .field("config", &self.config)
            .finish()
    }
}
82
83impl IcebergSink {
84 pub async fn create_and_validate_table(&self) -> Result<Table> {
85 create_and_validate_table_impl(&self.config, &self.param).await
86 }
87
88 pub async fn create_table_if_not_exists(&self) -> Result<()> {
89 create_table_if_not_exists_impl(&self.config, &self.param).await
90 }
91
92 pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
93 let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
94 if let Some(pk) = &config.primary_key {
95 let mut unique_column_ids = Vec::with_capacity(pk.len());
96 for col_name in pk {
97 let id = param
98 .columns
99 .iter()
100 .find(|col| col.name.as_str() == col_name)
101 .ok_or_else(|| {
102 SinkError::Config(anyhow!(
103 "Primary key column {} not found in sink schema",
104 col_name
105 ))
106 })?
107 .column_id
108 .get_id() as usize;
109 unique_column_ids.push(id);
110 }
111 Some(unique_column_ids)
112 } else {
113 unreachable!()
114 }
115 } else {
116 None
117 };
118 Ok(Self {
119 config,
120 param,
121 unique_column_ids,
122 })
123 }
124}
125
impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    /// Validates catalog type, license gating, and write-mode/compaction-type
    /// compatibility, and finally checks that the target table exists with a
    /// matching schema.
    async fn validate(&self) -> Result<()> {
        // Snowflake-managed catalogs cannot be written through this sink.
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        // Writing through an AWS Glue catalog is a licensed feature.
        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        let compaction_type = self.config.compaction_type();

        // Copy-on-write only pairs with full compaction; the incremental
        // compaction types below require merge-on-read.
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        match compaction_type {
            CompactionType::SmallFiles => {
                // Incremental compaction is a licensed feature.
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // Delete-file thresholds do not apply to small-files compaction.
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // Incremental compaction is a licensed feature.
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // Small-files thresholds do not apply to files-with-delete compaction.
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no extra config constraints to check.
            }
        }

        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    fn support_schema_change() -> bool {
        true
    }

    /// Re-validates sink properties on `ALTER SINK`, rejecting value
    /// combinations that would misconfigure compaction or the parquet writer.
    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            // A zero interval with compaction enabled would mean "never run".
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            // Codec names accepted by the parquet writer (case-insensitive).
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    /// Builds the per-parallelism log sinker that feeds the coordinated
    /// commit protocol.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        // Invariant upheld by config validation, hence `expect` (a zero
        // interval would be rejected before this point).
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    /// Builds the commit coordinator; exactly-once sinks get the two-phase
    /// variant, otherwise single-phase commit is used.
    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        // `is_exactly_once` is optional; absence defaults to single-phase.
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}