risingwave_meta/manager/iceberg_v3_sink/manager.rs
// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::RwLock;
20use risingwave_connector::sink::catalog::SinkId;
21use risingwave_connector::sink::iceberg::IcebergConfig;
22use risingwave_pb::stream_service::barrier_complete_response::IcebergV3SinkMetadata as PbIcebergV3SinkMetadata;
23use sea_orm::DatabaseConnection;
24use tokio::sync::Mutex;
25use tracing::warn;
26
27use super::coordinator::IcebergV3Coordinator;
28
29type CoordinatorRef = Arc<Mutex<IcebergV3Coordinator>>;
30
31/// Manager for the Iceberg V3 sink path, cheap to clone.
32#[derive(Clone)]
33pub struct IcebergV3SinkManager {
34 inner: Arc<ManagerInner>,
35}
36
37struct ManagerInner {
38 db: DatabaseConnection,
39 /// Read on every pre-commit/commit (only to clone out the `CoordinatorRef`, never held across an await);
40 /// written only by register/unregister/reset, which are rare control-plane events.
41 coordinators: RwLock<HashMap<SinkId, CoordinatorRef>>,
42}
43
44impl IcebergV3SinkManager {
45 pub fn new(db: DatabaseConnection) -> Self {
46 IcebergV3SinkManager {
47 inner: Arc::new(ManagerInner {
48 db,
49 coordinators: RwLock::new(HashMap::new()),
50 }),
51 }
52 }
53
54 /// Register an Iceberg V3 sink so its commit coordinator is ready to receive epoch reports. Builds and
55 /// fully initializes the coordinator (loading the iceberg table and draining any recovered pending
56 /// commits) BEFORE inserting it, so a successful return means the sink is ready to serve. Idempotent:
57 /// registering the same `sink_id` replaces the existing coordinator.
58 pub async fn register_v3_sink(
59 &self,
60 sink_id: SinkId,
61 iceberg_config: IcebergConfig,
62 ) -> anyhow::Result<()> {
63 // Initialize (load + recover + drain) outside the map lock; this is the slow, fallible part.
64 let coordinator =
65 IcebergV3Coordinator::init(sink_id, iceberg_config, self.inner.db.clone()).await?;
66
67 let prev = self
68 .inner
69 .coordinators
70 .write()
71 .insert(sink_id, Arc::new(Mutex::new(coordinator)));
72 if prev.is_some() {
73 // Replacing an existing coordinator. Any in-flight commit on the old one keeps it alive via its
74 // own `Arc` until it finishes; the snapshot_id idempotency check guards against double-commit.
75 warn!(%sink_id, "iceberg v3 coordinator re-registered; replacing previous instance");
76 }
77 Ok(())
78 }
79
80 /// Pre-commit phase for one epoch: persist the merged report under `pending_sink_state` (no iceberg IO).
81 /// The barrier-complete path awaits this BEFORE issuing hummock `commit_epoch`.
82 pub async fn pre_commit_v3_epoch(
83 &self,
84 sink_id: SinkId,
85 prev_epoch: u64,
86 reports: Vec<PbIcebergV3SinkMetadata>,
87 ) -> anyhow::Result<()> {
88 let coordinator = self.coordinator(sink_id)?;
89 coordinator
90 .lock()
91 .await
92 .pre_commit(prev_epoch, reports)
93 .await
94 }
95
96 /// Commit phase for one epoch: run an iceberg `overwrite_files` transaction and mark its pending row
97 /// committed. The barrier-complete path awaits this AFTER hummock `commit_epoch`.
98 pub async fn commit_v3_epoch(&self, sink_id: SinkId) -> anyhow::Result<()> {
99 let coordinator = self.coordinator(sink_id)?;
100 coordinator.lock().await.commit().await
101 }
102
103 /// Unregister the given `sink_id`(s)' coordinator(s) (e.g. at DROP SINK time). Unregistering an unknown
104 /// `sink_id` is a no-op.
105 pub fn unregister_v3_sinks(&self, sink_ids: Vec<SinkId>) {
106 let mut coordinators = self.inner.coordinators.write();
107 for sink_id in sink_ids {
108 coordinators.remove(&sink_id);
109 }
110 }
111
112 /// Drop every coordinator. Used at recovery time.
113 pub fn reset(&self) {
114 self.inner.coordinators.write().clear();
115 }
116
117 fn coordinator(&self, sink_id: SinkId) -> anyhow::Result<CoordinatorRef> {
118 self.inner
119 .coordinators
120 .read()
121 .get(&sink_id)
122 .cloned()
123 .ok_or_else(|| {
124 anyhow!(
125 "iceberg v3 coordinator for sink {} is not registered",
126 sink_id
127 )
128 })
129 }
130}