risingwave_meta/manager/iceberg_v3_sink/mod.rs
1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Manager for the Iceberg V3 sink path. Owns per-sink commit coordinators that
16//! drive iceberg `commit_epoch` ahead of hummock `commit_epoch` and persist
17//! exactly-once state via `pending_sink_state`.
18//!
19//! This is intentionally separate from [`crate::manager::sink_coordination`]
20//! (which serves V1/V2 sinks via gRPC). Future V3 responsibilities such as
21//! per-sink compaction will live alongside the per-sink commit coordinator here.
22
23pub mod backfill;
24mod coordinator;
25mod manager;
26
27use std::collections::BTreeMap;
28
29use anyhow::anyhow;
30pub use manager::IcebergV3SinkManager;
31use risingwave_common::secret::LocalSecretManager;
32use risingwave_connector::sink::iceberg::{ENABLE_PK_INDEX, IcebergConfig};
33use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
34use risingwave_pb::catalog::PbSink;
35
36/// Returns true if the given sink properties identify a Iceberg V3 sink
37/// (i.e. an iceberg sink with `enable_pk_index = 'true'`).
38pub fn is_iceberg_v3_sink(properties: &BTreeMap<String, String>) -> bool {
39 let connector_match = properties
40 .get(UPSTREAM_SOURCE_KEY)
41 .map(|v| v.eq_ignore_ascii_case("iceberg"))
42 .unwrap_or(false);
43 let pk_index_enabled = properties
44 .get(ENABLE_PK_INDEX)
45 .map(|v| v.eq_ignore_ascii_case("true"))
46 .unwrap_or(false);
47 connector_match && pk_index_enabled
48}
49
50/// Build an [`IcebergConfig`] from a [`PbSink`], filling secret refs along the
51/// way. Used at CREATE SINK time and during recovery to (re-)register the V3
52/// commit coordinator.
53pub fn build_iceberg_config(pb_sink: &PbSink) -> anyhow::Result<IcebergConfig> {
54 let properties: BTreeMap<String, String> = pb_sink.properties.clone().into_iter().collect();
55 let secret_refs: BTreeMap<_, _> = pb_sink.secret_refs.clone().into_iter().collect();
56 let with_secrets = LocalSecretManager::global()
57 .fill_secrets(properties, secret_refs)
58 .map_err(|e| anyhow!(e).context("fill secrets for iceberg"))?;
59 IcebergConfig::from_btreemap(with_secrets)
60 .map_err(|e| anyhow!(e).context("parse iceberg config"))
61}