risingwave_meta/manager/iceberg_compaction/
mod.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod gc;
16mod manual;
17mod schedule;
18mod stream;
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::anyhow;
25use parking_lot::RwLock;
26use risingwave_common::id::WorkerId;
27use risingwave_connector::sink::SinkParam;
28use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
29use risingwave_connector::sink::iceberg::IcebergConfig;
30use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
31use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
32use tonic::Streaming;
33
34use super::MetaSrvEnv;
35use crate::MetaResult;
36use crate::hummock::IcebergCompactorManagerRef;
37use crate::manager::MetadataManager;
38use crate::rpc::metrics::MetaMetrics;
39
40pub type IcebergCompactionManagerRef = Arc<IcebergCompactionManager>;
41
42pub(crate) type CompactorChangeTx =
43    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
44
45pub(crate) type CompactorChangeRx =
46    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
47
48type ManualTaskWaiter = tokio::sync::oneshot::Sender<MetaResult<()>>;
49
50use schedule::CompactionTrack;
51pub use schedule::IcebergCompactionScheduleStatus;
52
53pub struct IcebergCompactionManager {
54    pub env: MetaSrvEnv,
55    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
56
57    metadata_manager: MetadataManager,
58    pub iceberg_compactor_manager: IcebergCompactorManagerRef,
59
60    compactor_streams_change_tx: CompactorChangeTx,
61
62    pub metrics: Arc<MetaMetrics>,
63}
64
65struct IcebergCompactionManagerInner {
66    sink_schedules: HashMap<SinkId, CompactionTrack>,
67    manual_task_waiters: HashMap<u64, ManualTaskWaiter>,
68}
69
70impl IcebergCompactionManager {
71    fn report_timeout(&self) -> Duration {
72        Duration::from_secs(self.env.opts.iceberg_compaction_report_timeout_sec)
73    }
74
75    fn config_refresh_interval(&self) -> Duration {
76        Duration::from_secs(self.env.opts.iceberg_compaction_config_refresh_interval_sec)
77    }
78
79    pub fn build(
80        env: MetaSrvEnv,
81        metadata_manager: MetadataManager,
82        iceberg_compactor_manager: IcebergCompactorManagerRef,
83        metrics: Arc<MetaMetrics>,
84    ) -> (Arc<Self>, CompactorChangeRx) {
85        let (compactor_streams_change_tx, compactor_streams_change_rx) =
86            tokio::sync::mpsc::unbounded_channel();
87        (
88            Arc::new(Self {
89                env,
90                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
91                    sink_schedules: HashMap::default(),
92                    manual_task_waiters: HashMap::default(),
93                })),
94                metadata_manager,
95                iceberg_compactor_manager,
96                compactor_streams_change_tx,
97                metrics,
98            }),
99            compactor_streams_change_rx,
100        )
101    }
102
103    async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
104        let prost_sink_catalog = self
105            .metadata_manager
106            .catalog_controller
107            .get_sink_by_id(sink_id)
108            .await?
109            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
110        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
111        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
112        Ok(param)
113    }
114
115    async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
116        let sink_param = self.get_sink_param(sink_id).await?;
117        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
118        Ok(iceberg_config)
119    }
120}