Skip to main content

risingwave_connector/sink/iceberg/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod test;
17
18mod commit;
19pub mod commit_retry;
20mod config;
21mod create_table;
22#[cfg(any(test, madsim))]
23pub mod mock_v3_catalog_registry;
24mod prometheus;
25mod writer;
26
27use std::collections::BTreeMap;
28use std::fmt::Debug;
29use std::num::NonZeroU64;
30
31use anyhow::{Context, anyhow};
32pub use commit::*;
33pub use config::*;
34pub use create_table::*;
35use iceberg::table::Table;
36use risingwave_common::bail;
37use tokio::sync::mpsc::UnboundedSender;
38pub use writer::*;
39
40use super::{
41    GLOBAL_SINK_METRICS, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink,
42    SinkError, SinkWriterParam,
43};
44use crate::connector_common::IcebergSinkCompactionUpdate;
45use crate::enforce_secret::EnforceSecret;
46use crate::sink::coordinate::CoordinatedLogSinker;
47use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
48
/// Connector name of the Iceberg sink; used as `SINK_NAME` in the `Sink` impl below.
pub const ICEBERG_SINK: &str = "iceberg";
50
/// Iceberg sink: the parsed connector config together with the sink parameters it was
/// created from.
pub struct IcebergSink {
    /// Iceberg options parsed from the sink properties (see `IcebergConfig::from_btreemap`).
    pub config: IcebergConfig,
    /// Parameters of the sink (columns, properties, downstream primary key, ...).
    param: SinkParam,
    /// Names of the downstream primary-key columns, resolved in `IcebergSink::new`.
    /// In upsert mode, it is never None or empty; otherwise it is `None`.
    upsert_primary_key_column_names: Option<Vec<String>>,
}
57
58impl EnforceSecret for IcebergSink {
59    fn enforce_secret<'a>(
60        prop_iter: impl Iterator<Item = &'a str>,
61    ) -> crate::error::ConnectorResult<()> {
62        for prop in prop_iter {
63            IcebergConfig::enforce_one(prop)?;
64        }
65        Ok(())
66    }
67}
68
69impl TryFrom<SinkParam> for IcebergSink {
70    type Error = SinkError;
71
72    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
73        let config = IcebergConfig::from_btreemap(param.properties.clone())?;
74        IcebergSink::new(config, param)
75    }
76}
77
78impl Debug for IcebergSink {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("IcebergSink")
81            .field("config", &self.config)
82            .finish()
83    }
84}
85
86impl IcebergSink {
87    pub async fn create_and_validate_table(&self) -> Result<Table> {
88        create_and_validate_table_impl(&self.config, &self.param).await
89    }
90
91    pub async fn create_table_if_not_exists(&self) -> Result<()> {
92        create_table_if_not_exists_impl(&self.config, &self.param).await
93    }
94
95    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
96        if let Some(order_key) = &config.order_key {
97            validate_order_key_columns(
98                order_key,
99                param.columns.iter().map(|column| column.name.as_str()),
100            )
101            .context("invalid order_key")
102            .map_err(SinkError::Config)?;
103        }
104
105        let upsert_primary_key_column_names =
106            if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
107                let pk_indices = param
108                    .downstream_pk
109                    .as_ref()
110                    .filter(|pk| !pk.is_empty())
111                    .ok_or_else(|| {
112                        SinkError::Config(anyhow!(
113                            "primary key must be specified for upsert iceberg sink"
114                        ))
115                    })?;
116                Some(
117                    pk_indices
118                        .iter()
119                        .map(|&idx| {
120                            param
121                                .columns
122                                .get(idx)
123                                .map(|column| column.name.clone())
124                                .ok_or_else(|| {
125                                    SinkError::Config(anyhow!(
126                                        "primary key column index {} out of range in sink schema",
127                                        idx
128                                    ))
129                                })
130                        })
131                        .collect::<Result<Vec<_>>>()?,
132                )
133            } else {
134                None
135            };
136        Ok(Self {
137            config,
138            param,
139            upsert_primary_key_column_names,
140        })
141    }
142}
143
144impl Sink for IcebergSink {
145    type LogSinker = CoordinatedLogSinker<IcebergSinkWriter>;
146
147    const SINK_NAME: &'static str = ICEBERG_SINK;
148
149    async fn validate(&self) -> Result<()> {
150        if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
151            bail!("Snowflake catalog only supports iceberg sources");
152        }
153
154        if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
155            risingwave_common::license::Feature::IcebergSinkWithGlue
156                .check_available()
157                .map_err(|e| anyhow::anyhow!(e))?;
158        }
159
160        // Enforce merge-on-read for append-only tables
161        IcebergConfig::validate_append_only_write_mode(
162            &self.config.r#type,
163            self.config.write_mode,
164        )?;
165
166        // Validate compaction type configuration
167        let compaction_type = self.config.compaction_type();
168
169        // Check COW mode constraints
170        // COW mode only supports 'full' compaction type
171        if self.config.write_mode == IcebergWriteMode::CopyOnWrite
172            && compaction_type != CompactionType::Full
173        {
174            bail!(
175                "'copy-on-write' mode only supports 'full' compaction type, got: '{}'",
176                compaction_type
177            );
178        }
179
180        match compaction_type {
181            CompactionType::SmallFiles => {
182                // 1. check license
183                risingwave_common::license::Feature::IcebergCompaction
184                    .check_available()
185                    .map_err(|e| anyhow::anyhow!(e))?;
186
187                // 2. check write mode
188                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
189                    bail!(
190                        "'small-files' compaction type only supports 'merge-on-read' write mode, got: '{}'",
191                        self.config.write_mode
192                    );
193                }
194
195                // 3. check conflicting parameters
196                if self.config.delete_files_count_threshold.is_some() {
197                    bail!(
198                        "`compaction.delete-files-count-threshold` is not supported for 'small-files' compaction type"
199                    );
200                }
201            }
202            CompactionType::FilesWithDelete => {
203                // 1. check license
204                risingwave_common::license::Feature::IcebergCompaction
205                    .check_available()
206                    .map_err(|e| anyhow::anyhow!(e))?;
207
208                // 2. check write mode
209                if self.config.write_mode != IcebergWriteMode::MergeOnRead {
210                    bail!(
211                        "'files-with-delete' compaction type only supports 'merge-on-read' write mode, got: '{}'",
212                        self.config.write_mode
213                    );
214                }
215
216                // 3. check conflicting parameters
217                if self.config.small_files_threshold_mb.is_some() {
218                    bail!(
219                        "`compaction.small-files-threshold-mb` must not be set for 'files-with-delete' compaction type"
220                    );
221                }
222            }
223            CompactionType::Full => {
224                // Full compaction has no special requirements
225            }
226        }
227
228        let _ = self.create_and_validate_table().await?;
229        Ok(())
230    }
231
232    fn support_schema_change() -> bool {
233        true
234    }
235
236    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
237        let iceberg_config = IcebergConfig::from_btreemap(config.clone())?;
238
239        // Validate compaction interval
240        if let Some(compaction_interval) = iceberg_config.compaction_interval_sec {
241            if iceberg_config.enable_compaction && compaction_interval == 0 {
242                bail!(
243                    "`compaction-interval-sec` must be greater than 0 when `enable-compaction` is true"
244                );
245            }
246
247            tracing::info!(
248                "Alter config compaction_interval set to {} seconds",
249                compaction_interval
250            );
251        }
252
253        // Validate max snapshots
254        if let Some(max_snapshots) = iceberg_config.max_snapshots_num_before_compaction
255            && max_snapshots < 1
256        {
257            bail!(
258                "`compaction.max_snapshots_num` must be greater than 0, got: {}",
259                max_snapshots
260            );
261        }
262
263        // Validate target file size
264        if let Some(target_file_size_mb) = iceberg_config.target_file_size_mb
265            && target_file_size_mb == 0
266        {
267            bail!("`compaction.target_file_size_mb` must be greater than 0");
268        }
269
270        // Validate parquet max row group rows
271        if let Some(max_row_group_rows) = iceberg_config.write_parquet_max_row_group_rows
272            && max_row_group_rows == 0
273        {
274            bail!("`compaction.write_parquet_max_row_group_rows` must be greater than 0");
275        }
276
277        // Validate parquet max row group bytes
278        if let Some(max_row_group_bytes) = iceberg_config.write_parquet_max_row_group_bytes
279            && max_row_group_bytes == 0
280        {
281            bail!("`compaction.write_parquet_max_row_group_bytes` must be greater than 0");
282        }
283
284        // Validate parquet compression codec
285        if let Some(ref compression) = iceberg_config.write_parquet_compression {
286            let valid_codecs = [
287                "uncompressed",
288                "snappy",
289                "gzip",
290                "lzo",
291                "brotli",
292                "lz4",
293                "zstd",
294            ];
295            if !valid_codecs.contains(&compression.to_lowercase().as_str()) {
296                bail!(
297                    "`compaction.write_parquet_compression` must be one of {:?}, got: {}",
298                    valid_codecs,
299                    compression
300                );
301            }
302        }
303
304        Ok(())
305    }
306
307    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
308        let writer = IcebergSinkWriter::new(
309            self.config.clone(),
310            self.param.clone(),
311            writer_param.clone(),
312            self.upsert_primary_key_column_names.clone(),
313        );
314
315        let commit_checkpoint_interval =
316            NonZeroU64::new(self.config.commit_checkpoint_interval).expect(
317                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
318            );
319        let log_sinker = CoordinatedLogSinker::new(
320            &writer_param,
321            self.param.clone(),
322            writer,
323            commit_checkpoint_interval,
324        )
325        .await?;
326
327        Ok(log_sinker)
328    }
329
330    fn is_coordinated_sink(&self) -> bool {
331        true
332    }
333
334    async fn new_coordinator(
335        &self,
336        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
337    ) -> Result<SinkCommitCoordinator> {
338        let catalog = self.config.create_catalog().await?;
339        let table = self.create_and_validate_table().await?;
340        let coordinator = IcebergSinkCommitter {
341            catalog,
342            table,
343            last_commit_epoch: 0,
344            sink_id: self.param.sink_id,
345            config: self.config.clone(),
346            param: self.param.clone(),
347            commit_retry_num: self.config.commit_retry_num,
348            iceberg_compact_stat_sender,
349        };
350        if self.config.is_exactly_once.unwrap_or_default() {
351            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
352        } else {
353            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
354        }
355    }
356}