risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod test;
17
18mod commit;
19mod config;
20mod create_table;
21mod prometheus;
22mod writer;
23
24use std::collections::BTreeMap;
25use std::fmt::Debug;
26use std::num::NonZeroU64;
27
28use anyhow::{Context, anyhow};
29pub use commit::*;
30pub use config::*;
31pub use create_table::*;
32use iceberg::table::Table;
33use risingwave_common::bail;
34use tokio::sync::mpsc::UnboundedSender;
35pub use writer::*;
36
37use super::{
38    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
39    SinkError, SinkWriterParam,
40};
41use crate::connector_common::IcebergSinkCompactionUpdate;
42use crate::enforce_secret::EnforceSecret;
43use crate::sink::coordinate::CoordinatedLogSinker;
44use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
45
/// Connector name of the Iceberg sink; exposed as `Sink::SINK_NAME` for [`IcebergSink`].
pub const ICEBERG_SINK: &str = "iceberg";
47
/// Iceberg sink connector, built from a parsed [`IcebergConfig`] and the sink's [`SinkParam`].
pub struct IcebergSink {
    /// Parsed sink configuration (see `IcebergConfig::from_btreemap`).
    pub config: IcebergConfig,
    /// Sink parameters supplied at construction (columns, properties, downstream_pk, sink_id, ...).
    param: SinkParam,
    // Names of the downstream primary-key columns, resolved in `IcebergSink::new` when the
    // sink `type` is upsert and `force_append_only` is not set; `None` otherwise.
    // In upsert mode, it is never None or empty.
    upsert_primary_key_column_names: Option<Vec<String>>,
}
54
55impl EnforceSecret for IcebergSink {
56    fn enforce_secret<'a>(
57        prop_iter: impl Iterator<Item = &'a str>,
58    ) -> crate::error::ConnectorResult<()> {
59        for prop in prop_iter {
60            IcebergConfig::enforce_one(prop)?;
61        }
62        Ok(())
63    }
64}
65
66impl TryFrom<SinkParam> for IcebergSink {
67    type Error = SinkError;
68
69    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
70        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
71        IcebergSink::new(config, param)
72    }
73}
74
75impl Debug for IcebergSink {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("IcebergSink")
78            .field("config", &self.config)
79            .finish()
80    }
81}
82
83impl IcebergSink {
84    pub async fn create_and_validate_table(&self) -> Result<Table> {
85        create_and_validate_table_impl(&self.config, &self.param).await
86    }
87
88    pub async fn create_table_if_not_exists(&self) -> Result<()> {
89        create_table_if_not_exists_impl(&self.config, &self.param).await
90    }
91
92    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
93        if let Some(order_key) = &config.order_key {
94            validate_order_key_columns(
95                order_key,
96                param.columns.iter().map(|column| column.name.as_str()),
97            )
98            .context("invalid order_key")
99            .map_err(SinkError::Config)?;
100        }
101
102        let upsert_primary_key_column_names =
103            if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
104                let pk_indices = param
105                    .downstream_pk
106                    .as_ref()
107                    .filter(|pk| !pk.is_empty())
108                    .ok_or_else(|| {
109                        SinkError::Config(anyhow!(
110                            "primary key must be specified for upsert iceberg sink"
111                        ))
112                    })?;
113                Some(
114                    pk_indices
115                        .iter()
116                        .map(|&idx| {
117                            param
118                                .columns
119                                .get(idx)
120                                .map(|column| column.name.clone())
121                                .ok_or_else(|| {
122                                    SinkError::Config(anyhow!(
123                                        "primary key column index {} out of range in sink schema",
124                                        idx
125                                    ))
126                                })
127                        })
128                        .collect::<Result<Vec<_>>>()?,
129                )
130            } else {
131                None
132            };
133        Ok(Self {
134            config,
135            param,
136            upsert_primary_key_column_names,
137        })
138    }
139}
140
141impl Sink for IcebergSink {
142    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
143
144    const SINK_NAME: &'static str = ICEBERG_SINK;
145
146    async fn validate(&self) -> Result<()> {
147        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
148            bail!("Snowflake catalog only supports iceberg sources");
149        }
150
151        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
152            risingwave_common::license::Feature::IcebergSinkWithGlue
153                .check_available()
154                .map_err(|e| anyhow::anyhow!(e))?;
155        }
156
157        // Enforce merge-on-read for append-only tables
158        IcebergConfig::validate_append_only_write_mode(
159            &self.config.r#type,
160            self.config.write_mode,
161        )?;
162
163        // Validate compaction type configuration
164        let compaction_type = self.config.compaction_type();
165
166        // Check COW mode constraints
167        // COW mode only supports 'full' compaction type
168        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
169            && compaction_type != CompactionType::Full
170        {
171            bail!(
172                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
173                compaction_type
174            );
175        }
176
177        match compaction_type {
178            CompactionType::SmallFiles => {
179                // 1. check license
180                risingwave_common::license::Feature::IcebergCompaction
181                    .check_available()
182                    .map_err(|e| anyhow::anyhow!(e))?;
183
184                // 2. check write mode
185                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
186                    bail!(
187                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
188                        self.config.write_mode
189                    );
190                }
191
192                // 3. check conflicting parameters
193                if self.config.delete_files_count_threshold.is_some() {
194                    bail!(
195                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
196                    );
197                }
198            }
199            CompactionType::FilesWithDelete => {
200                // 1. check license
201                risingwave_common::license::Feature::IcebergCompaction
202                    .check_available()
203                    .map_err(|e| anyhow::anyhow!(e))?;
204
205                // 2. check write mode
206                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
207                    bail!(
208                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
209                        self.config.write_mode
210                    );
211                }
212
213                // 3. check conflicting parameters
214                if self.config.small_files_threshold_mb.is_some() {
215                    bail!(
216                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
217                    );
218                }
219            }
220            CompactionType::Full => {
221                // Full compaction has no special requirements
222            }
223        }
224
225        let _ = self.create_and_validate_table().await?;
226        Ok(())
227    }
228
229    fn support_schema_change() -> bool {
230        true
231    }
232
233    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
234        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
235
236        // Validate compaction interval
237        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
238            if iceberg_config.enable_compaction && compaction_interval == 0 {
239                bail!(
240                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
241                );
242            }
243
244            tracing::info!(
245                "Alter config compaction_interval set to {} seconds",
246                compaction_interval
247            );
248        }
249
250        // Validate max snapshots
251        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
252            && max_snapshots < 1
253        {
254            bail!(
255                "`compaction.max-snapshots-num` must be greater than 0, got: {}",
256                max_snapshots
257            );
258        }
259
260        // Validate target file size
261        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
262            && target_file_size_mb == 0
263        {
264            bail!("`compaction.target_file_size_mb` must be greater than 0");
265        }
266
267        // Validate parquet max row group rows
268        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
269            && max_row_group_rows == 0
270        {
271            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
272        }
273
274        // Validate parquet max row group bytes
275        if let Some(max_row_group_bytes) = iceberg_config.write_parquet_max_row_group_bytes
276            && max_row_group_bytes == 0
277        {
278            bail!("`compaction.write_parquet_max_row_group_bytes` must be greater than 0");
279        }
280
281        // Validate parquet compression codec
282        if let Some(ref compression) = iceberg_config.write_parquet_compression {
283            let valid_codecs = [
284                "uncompressed",
285                "snappy",
286                "gzip",
287                "lzo",
288                "brotli",
289                "lz4",
290                "zstd",
291            ];
292            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
293                bail!(
294                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
295                    valid_codecs,
296                    compression
297                );
298            }
299        }
300
301        Ok(())
302    }
303
304    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
305        let writer = IcebergSinkWriter::new(
306            self.config.clone(),
307            self.param.clone(),
308            writer_param.clone(),
309            self.upsert_primary_key_column_names.clone(),
310        );
311
312        let commit_checkpoint_interval =
313            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
314                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
315            );
316        let log_sinker = CoordinatedLogSinker::new(
317            &writer_param,
318            self.param.clone(),
319            writer,
320            commit_checkpoint_interval,
321        )
322        .await?;
323
324        Ok(log_sinker)
325    }
326
327    fn is_coordinated_sink(&self) -> bool {
328        true
329    }
330
331    async fn new_coordinator(
332        &self,
333        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
334    ) -> Result<SinkCommitCoordinator> {
335        let catalog = self.config.create_catalog().await?;
336        let table = self.create_and_validate_table().await?;
337        let coordinator = IcebergSinkCommitter {
338            catalog,
339            table,
340            last_commit_epoch: 0,
341            sink_id: self.param.sink_id,
342            config: self.config.clone(),
343            param: self.param.clone(),
344            commit_retry_num: self.config.commit_retry_num,
345            iceberg_compact_stat_sender,
346        };
347        if self.config.is_exactly_once.unwrap_or_default() {
348            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
349        } else {
350            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
351        }
352    }
353}