risingwave_meta/manager/iceberg_compaction/
mod.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod gc;
16mod manual;
17mod schedule;
18mod stream;
19
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::anyhow;
25use parking_lot::RwLock;
26use risingwave_common::id::WorkerId;
27use risingwave_connector::sink::SinkParam;
28use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
29use risingwave_connector::sink::iceberg::IcebergConfig;
30use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
31use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
32use tonic::Streaming;
33
34use super::MetaSrvEnv;
35use crate::MetaResult;
36use crate::hummock::IcebergCompactorManagerRef;
37use crate::manager::MetadataManager;
38use crate::rpc::metrics::MetaMetrics;
39
40pub type IcebergCompactionManagerRef = Arc<IcebergCompactionManager>;
41
42pub(crate) type CompactorChangeTx =
43    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
44
45pub(crate) type CompactorChangeRx =
46    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
47
48type ManualCompactionWaiter = tokio::sync::oneshot::Sender<MetaResult<u64>>;
49
50use schedule::CompactionTrack;
51pub use schedule::IcebergCompactionScheduleStatus;
52
53pub struct IcebergCompactionManager {
54    pub env: MetaSrvEnv,
55    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
56
57    metadata_manager: MetadataManager,
58    pub iceberg_compactor_manager: IcebergCompactorManagerRef,
59
60    compactor_streams_change_tx: CompactorChangeTx,
61
62    pub metrics: Arc<MetaMetrics>,
63}
64
65struct IcebergCompactionManagerInner {
66    sink_schedules: HashMap<SinkId, CompactionTrack>,
67    snapshot_expiration_sink_ids: HashSet<SinkId>,
68    manual_compaction_waiters: HashMap<SinkId, ManualCompactionWaiter>,
69}
70
71impl IcebergCompactionManager {
72    fn report_timeout(&self) -> Duration {
73        Duration::from_secs(self.env.opts.iceberg_compaction_report_timeout_sec)
74    }
75
76    fn config_refresh_interval(&self) -> Duration {
77        Duration::from_secs(self.env.opts.iceberg_compaction_config_refresh_interval_sec)
78    }
79
80    pub fn build(
81        env: MetaSrvEnv,
82        metadata_manager: MetadataManager,
83        iceberg_compactor_manager: IcebergCompactorManagerRef,
84        metrics: Arc<MetaMetrics>,
85    ) -> (Arc<Self>, CompactorChangeRx) {
86        let (compactor_streams_change_tx, compactor_streams_change_rx) =
87            tokio::sync::mpsc::unbounded_channel();
88        (
89            Arc::new(Self {
90                env,
91                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
92                    sink_schedules: HashMap::default(),
93                    snapshot_expiration_sink_ids: HashSet::default(),
94                    manual_compaction_waiters: HashMap::default(),
95                })),
96                metadata_manager,
97                iceberg_compactor_manager,
98                compactor_streams_change_tx,
99                metrics,
100            }),
101            compactor_streams_change_rx,
102        )
103    }
104
105    async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
106        let prost_sink_catalog = self
107            .metadata_manager
108            .catalog_controller
109            .get_sink_by_id(sink_id)
110            .await?
111            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
112        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
113        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
114        Ok(param)
115    }
116
117    async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
118        let sink_param = self.get_sink_param(sink_id).await?;
119        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
120        Ok(iceberg_config)
121    }
122}