Skip to main content

risingwave_meta/manager/iceberg_v3_sink/
manager.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::RwLock;
20use risingwave_connector::sink::catalog::SinkId;
21use risingwave_connector::sink::iceberg::IcebergConfig;
22use risingwave_pb::stream_service::barrier_complete_response::IcebergV3SinkMetadata as PbIcebergV3SinkMetadata;
23use sea_orm::DatabaseConnection;
24use tokio::sync::Mutex;
25use tracing::warn;
26
27use super::coordinator::IcebergV3Coordinator;
28
29type CoordinatorRef = Arc<Mutex<IcebergV3Coordinator>>;
30
31/// Manager for the Iceberg V3 sink path, cheap to clone.
32#[derive(Clone)]
33pub struct IcebergV3SinkManager {
34    inner: Arc<ManagerInner>,
35}
36
37struct ManagerInner {
38    db: DatabaseConnection,
39    /// Read on every pre-commit/commit (only to clone out the `CoordinatorRef`, never held across an await);
40    /// written only by register/unregister/reset, which are rare control-plane events.
41    coordinators: RwLock<HashMap<SinkId, CoordinatorRef>>,
42}
43
44impl IcebergV3SinkManager {
45    pub fn new(db: DatabaseConnection) -> Self {
46        IcebergV3SinkManager {
47            inner: Arc::new(ManagerInner {
48                db,
49                coordinators: RwLock::new(HashMap::new()),
50            }),
51        }
52    }
53
54    /// Register an Iceberg V3 sink so its commit coordinator is ready to receive epoch reports. Builds and
55    /// fully initializes the coordinator (loading the iceberg table and draining any recovered pending
56    /// commits) BEFORE inserting it, so a successful return means the sink is ready to serve. Idempotent:
57    /// registering the same `sink_id` replaces the existing coordinator.
58    pub async fn register_v3_sink(
59        &self,
60        sink_id: SinkId,
61        iceberg_config: IcebergConfig,
62    ) -> anyhow::Result<()> {
63        // Initialize (load + recover + drain) outside the map lock; this is the slow, fallible part.
64        let coordinator =
65            IcebergV3Coordinator::init(sink_id, iceberg_config, self.inner.db.clone()).await?;
66
67        let prev = self
68            .inner
69            .coordinators
70            .write()
71            .insert(sink_id, Arc::new(Mutex::new(coordinator)));
72        if prev.is_some() {
73            // Replacing an existing coordinator. Any in-flight commit on the old one keeps it alive via its
74            // own `Arc` until it finishes; the snapshot_id idempotency check guards against double-commit.
75            warn!(%sink_id, "iceberg v3 coordinator re-registered; replacing previous instance");
76        }
77        Ok(())
78    }
79
80    /// Pre-commit phase for one epoch: persist the merged report under `pending_sink_state` (no iceberg IO).
81    /// The barrier-complete path awaits this BEFORE issuing hummock `commit_epoch`.
82    pub async fn pre_commit_v3_epoch(
83        &self,
84        sink_id: SinkId,
85        prev_epoch: u64,
86        reports: Vec<PbIcebergV3SinkMetadata>,
87    ) -> anyhow::Result<()> {
88        let coordinator = self.coordinator(sink_id)?;
89        coordinator
90            .lock()
91            .await
92            .pre_commit(prev_epoch, reports)
93            .await
94    }
95
96    /// Commit phase for one epoch: run an iceberg `overwrite_files` transaction and mark its pending row
97    /// committed. The barrier-complete path awaits this AFTER hummock `commit_epoch`.
98    pub async fn commit_v3_epoch(&self, sink_id: SinkId) -> anyhow::Result<()> {
99        let coordinator = self.coordinator(sink_id)?;
100        coordinator.lock().await.commit().await
101    }
102
103    /// Unregister the given `sink_id`(s)' coordinator(s) (e.g. at DROP SINK time). Unregistering an unknown
104    /// `sink_id` is a no-op.
105    pub fn unregister_v3_sinks(&self, sink_ids: Vec<SinkId>) {
106        let mut coordinators = self.inner.coordinators.write();
107        for sink_id in sink_ids {
108            coordinators.remove(&sink_id);
109        }
110    }
111
112    /// Drop every coordinator. Used at recovery time.
113    pub fn reset(&self) {
114        self.inner.coordinators.write().clear();
115    }
116
117    fn coordinator(&self, sink_id: SinkId) -> anyhow::Result<CoordinatorRef> {
118        self.inner
119            .coordinators
120            .read()
121            .get(&sink_id)
122            .cloned()
123            .ok_or_else(|| {
124                anyhow!(
125                    "iceberg v3 coordinator for sink {} is not registered",
126                    sink_id
127                )
128            })
129    }
130}