risingwave_connector/sink/iceberg/
mod.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15#[cfg(test)]
16mod test;
17
18mod commit;
19mod config;
20mod create_table;
21mod prometheus;
22mod writer;
23
24use std::collections::BTreeMap;
25use std::fmt::Debug;
26use std::num::NonZeroU64;
27
28use anyhow::anyhow;
29pub use commit::*;
30pub use config::*;
31pub use create_table::*;
32use iceberg::table::Table;
33use risingwave_common::bail;
34use tokio::sync::mpsc::UnboundedSender;
35pub use writer::*;
36
37use super::{
38    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
39    SinkError, SinkWriterParam,
40};
41use crate::connector_common::IcebergSinkCompactionUpdate;
42use crate::enforce_secret::EnforceSecret;
43use crate::sink::coordinate::CoordinatedLogSinker;
44use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
45
/// Connector name under which this sink is registered (the `connector` property value).
pub const ICEBERG_SINK: &str = "iceberg";
47
/// Sink that writes changes into an Apache Iceberg table.
pub struct IcebergSink {
    /// Parsed connector configuration (catalog, table, write mode, compaction, ...).
    pub config: IcebergConfig,
    /// Generic sink parameters (schema columns, sink id, properties) from the frontend.
    param: SinkParam,
    // Column ids of the primary-key columns, used for upsert writes.
    // In upsert mode this is never `None` and never empty; in append-only mode it is `None`.
    unique_column_ids: Option<Vec<usize>>,
}
54
55impl EnforceSecret for IcebergSink {
56    fn enforce_secret<'a>(
57        prop_iter: impl Iterator<Item = &'a str>,
58    ) -> crate::error::ConnectorResult<()> {
59        for prop in prop_iter {
60            IcebergConfig::enforce_one(prop)?;
61        }
62        Ok(())
63    }
64}
65
66impl TryFrom<SinkParam> for IcebergSink {
67    type Error = SinkError;
68
69    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
70        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
71        IcebergSink::new(config, param)
72    }
73}
74
75impl Debug for IcebergSink {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("IcebergSink")
78            .field("config", &self.config)
79            .finish()
80    }
81}
82
83impl IcebergSink {
84    pub async fn create_and_validate_table(&self) -> Result<Table> {
85        create_and_validate_table_impl(&self.config, &self.param).await
86    }
87
88    pub async fn create_table_if_not_exists(&self) -> Result<()> {
89        create_table_if_not_exists_impl(&self.config, &self.param).await
90    }
91
92    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
93        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
94            if let Some(pk) = &config.primary_key {
95                let mut unique_column_ids = Vec::with_capacity(pk.len());
96                for col_name in pk {
97                    let id = param
98                        .columns
99                        .iter()
100                        .find(|col| col.name.as_str() == col_name)
101                        .ok_or_else(|| {
102                            SinkError::Config(anyhow!(
103                                "Primary key column {} not found in sink schema",
104                                col_name
105                            ))
106                        })?
107                        .column_id
108                        .get_id() as usize;
109                    unique_column_ids.push(id);
110                }
111                Some(unique_column_ids)
112            } else {
113                unreachable!()
114            }
115        } else {
116            None
117        };
118        Ok(Self {
119            config,
120            param,
121            unique_column_ids,
122        })
123    }
124}
125
impl Sink for IcebergSink {
    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;

    const SINK_NAME: &'static str = ICEBERG_SINK;

    /// Validate the sink configuration before it starts: catalog restrictions,
    /// license checks, write-mode vs. compaction-type constraints, and finally
    /// that the target table exists and matches the sink schema.
    async fn validate(&self) -> Result<()> {
        // Snowflake-managed catalogs are not writable from here.
        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
            bail!("Snowflake catalog only supports iceberg sources");
        }

        // Sinking through a Glue catalog is a licensed feature.
        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
            risingwave_common::license::Feature::IcebergSinkWithGlue
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Enforce merge-on-read for append-only tables
        IcebergConfig::validate_append_only_write_mode(
            &self.config.r#type,
            self.config.write_mode,
        )?;

        // Validate compaction type configuration
        let compaction_type = self.config.compaction_type();

        // Check COW mode constraints
        // COW mode only supports 'full' compaction type
        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
            && compaction_type != CompactionType::Full
        {
            bail!(
                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
                compaction_type
            );
        }

        // Per-compaction-type constraints. 'small-files' and 'files-with-delete'
        // are licensed, require merge-on-read, and each rejects the threshold
        // parameter that belongs to the other type.
        match compaction_type {
            CompactionType::SmallFiles => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.delete_files_count_threshold.is_some() {
                    bail!(
                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
                    );
                }
            }
            CompactionType::FilesWithDelete => {
                // 1. check license
                risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .map_err(|e| anyhow::anyhow!(e))?;

                // 2. check write mode
                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
                    bail!(
                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
                        self.config.write_mode
                    );
                }

                // 3. check conflicting parameters
                if self.config.small_files_threshold_mb.is_some() {
                    bail!(
                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
                    );
                }
            }
            CompactionType::Full => {
                // Full compaction has no special requirements
            }
        }

        // Loading the table doubles as a schema/compatibility check.
        let _ = self.create_and_validate_table().await?;
        Ok(())
    }

    /// Iceberg sinks can follow upstream schema changes.
    fn support_schema_change() -> bool {
        true
    }

    /// Validate a proposed `ALTER SINK` property map without applying it.
    /// Only checks value ranges/enumerations; key presence is handled by
    /// `IcebergConfig::from_btreemap`.
    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;

        // Validate compaction interval
        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
            if iceberg_config.enable_compaction && compaction_interval == 0 {
                bail!(
                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
                );
            }

            // log compaction_interval
            tracing::info!(
                "Alter config compaction_interval set to {} seconds",
                compaction_interval
            );
        }

        // Validate max snapshots
        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
            && max_snapshots < 1
        {
            bail!(
                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
                max_snapshots
            );
        }

        // Validate target file size
        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
            && target_file_size_mb == 0
        {
            bail!("`compaction.target_file_size_mb` must be greater than 0");
        }

        // Validate parquet max row group rows
        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
            && max_row_group_rows == 0
        {
            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
        }

        // Validate parquet compression codec (case-insensitive match against
        // the codecs parquet supports).
        if let Some(ref compression) = iceberg_config.write_parquet_compression {
            let valid_codecs = [
                "uncompressed",
                "snappy",
                "gzip",
                "lzo",
                "brotli",
                "lz4",
                "zstd",
            ];
            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
                bail!(
                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
                    valid_codecs,
                    compression
                );
            }
        }

        Ok(())
    }

    /// Build the log sinker that feeds this sink's writer, committing once per
    /// `commit_checkpoint_interval` checkpoints through the coordinator.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = IcebergSinkWriter::new(
            self.config.clone(),
            self.param.clone(),
            writer_param.clone(),
            self.unique_column_ids.clone(),
        );

        // Zero is rejected at config-validation time, so this cannot panic in practice.
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );
        let log_sinker = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            writer,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(log_sinker)
    }

    /// Iceberg commits are centralized: all writers report to one coordinator.
    fn is_coordinated_sink(&self) -> bool {
        true
    }

    /// Create the commit coordinator, choosing two-phase commit when
    /// `is_exactly_once` is set (defaults to single-phase otherwise).
    async fn new_coordinator(
        &self,
        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let catalog = self.config.create_catalog().await?;
        let table = self.create_and_validate_table().await?;
        let coordinator = IcebergSinkCommitter {
            catalog,
            table,
            last_commit_epoch: 0,
            sink_id: self.param.sink_id,
            config: self.config.clone(),
            param: self.param.clone(),
            commit_retry_num: self.config.commit_retry_num,
            iceberg_compact_stat_sender,
        };
        if self.config.is_exactly_once.unwrap_or_default() {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}