risingwave_meta/manager/
iceberg_compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use iceberg::table::Table;
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
23use risingwave_connector::sink::SinkParam;
24use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
25use risingwave_connector::sink::iceberg::IcebergConfig;
26use risingwave_pb::catalog::PbSink;
27use risingwave_pb::iceberg_compaction::{
28    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
29};
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31use tokio::sync::oneshot::Sender;
32use tokio::task::JoinHandle;
33use tonic::Streaming;
34
35use super::MetaSrvEnv;
36use crate::MetaResult;
37use crate::hummock::{
38    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
39    IcebergCompactor, IcebergCompactorManagerRef,
40};
41use crate::manager::MetadataManager;
42use crate::rpc::metrics::MetaMetrics;
43
44pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
45
46type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
47
48type CompactorChangeRx =
49    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
50
/// Per-sink scheduling state used to decide when a sink is due for compaction.
#[derive(Debug, Clone)]
struct CommitInfo {
    /// Counter bumped by [`CommitInfo::increase_count`] and reset to zero by
    /// [`CommitInfo::set_processing`] and [`CommitInfo::initialize`].
    count: usize,
    /// Earliest instant at which the next compaction may be scheduled.
    ///
    /// `None` means "not schedulable right now": either the entry has been marked as
    /// processing, or `compaction_interval` is so large that the deadline cannot be
    /// represented as an [`Instant`].
    next_compaction_time: Option<Instant>,
    /// Interval between compactions, in seconds.
    compaction_interval: u64,
}

impl CommitInfo {
    /// Returns the instant `interval_secs` seconds from now.
    ///
    /// Uses checked arithmetic: `Instant + Duration` panics on overflow, which a very large
    /// interval would trigger. An unrepresentable deadline is reported as `None`.
    fn deadline_after(interval_secs: u64) -> Option<Instant> {
        Instant::now().checked_add(std::time::Duration::from_secs(interval_secs))
    }

    /// Marks the entry as being processed: resets the counter and clears the deadline.
    fn set_processing(&mut self) {
        self.count = 0;
        // A `None` deadline means the entry is being processed.
        self.next_compaction_time = None;
    }

    /// Resets the counter and schedules the next deadline one interval from now.
    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time = Self::deadline_after(self.compaction_interval);
    }

    /// Overwrites every field with the values from `commit_info`.
    fn replace(&mut self, commit_info: CommitInfo) {
        *self = commit_info;
    }

    /// Increments the counter by one.
    fn increase_count(&mut self) {
        self.count += 1;
    }

    /// Stores the new interval (in seconds) and restarts the deadline from now.
    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        // Reset the next compaction time so the new interval takes effect immediately.
        self.next_compaction_time = Self::deadline_after(compaction_interval);
    }
}
89
90pub struct IcebergCompactionHandle {
91    sink_id: SinkId,
92    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
93    metadata_manager: MetadataManager,
94    handle_success: bool,
95
96    /// The commit info of the iceberg compaction handle for recovery.
97    commit_info: CommitInfo,
98}
99
100impl IcebergCompactionHandle {
101    fn new(
102        sink_id: SinkId,
103        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
104        metadata_manager: MetadataManager,
105        commit_info: CommitInfo,
106    ) -> Self {
107        Self {
108            sink_id,
109            inner,
110            metadata_manager,
111            handle_success: false,
112            commit_info,
113        }
114    }
115
116    pub async fn send_compact_task(
117        mut self,
118        compactor: Arc<IcebergCompactor>,
119        task_id: u64,
120    ) -> MetaResult<()> {
121        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
122        let prost_sink_catalog: PbSink = self
123            .metadata_manager
124            .catalog_controller
125            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
126            .await?
127            .remove(0);
128        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
129        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
130        let result =
131            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
132                // Todo! Use iceberg's compaction task ID
133                task_id,
134                props: param.properties,
135            }));
136
137        if result.is_ok() {
138            self.handle_success = true;
139        }
140
141        result
142    }
143
144    pub fn sink_id(&self) -> SinkId {
145        self.sink_id
146    }
147}
148
149impl Drop for IcebergCompactionHandle {
150    fn drop(&mut self) {
151        if self.handle_success {
152            let mut guard = self.inner.write();
153            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
154                commit_info.initialize();
155            }
156        } else {
157            // If the handle is not successful, we need to reset the commit info
158            // to the original state.
159            // This is to avoid the case where the handle is dropped before the
160            // compaction task is sent.
161            let mut guard = self.inner.write();
162            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
163                commit_info.replace(self.commit_info.clone());
164            }
165        }
166    }
167}
168
169struct IcebergCompactionManagerInner {
170    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
171}
172
173pub struct IcebergCompactionManager {
174    pub env: MetaSrvEnv,
175    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
176
177    metadata_manager: MetadataManager,
178    pub iceberg_compactor_manager: IcebergCompactorManagerRef,
179
180    compactor_streams_change_tx: CompactorChangeTx,
181
182    pub metrics: Arc<MetaMetrics>,
183}
184
185impl IcebergCompactionManager {
186    pub fn build(
187        env: MetaSrvEnv,
188        metadata_manager: MetadataManager,
189        iceberg_compactor_manager: IcebergCompactorManagerRef,
190        metrics: Arc<MetaMetrics>,
191    ) -> (Arc<Self>, CompactorChangeRx) {
192        let (compactor_streams_change_tx, compactor_streams_change_rx) =
193            tokio::sync::mpsc::unbounded_channel();
194        (
195            Arc::new(Self {
196                env,
197                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
198                    iceberg_commits: HashMap::default(),
199                })),
200                metadata_manager,
201                iceberg_compactor_manager,
202                compactor_streams_change_tx,
203                metrics,
204            }),
205            compactor_streams_change_rx,
206        )
207    }
208
209    pub fn compaction_stat_loop(
210        manager: Arc<Self>,
211        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
212    ) -> (JoinHandle<()>, Sender<()>) {
213        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
214        let join_handle = tokio::spawn(async move {
215            loop {
216                tokio::select! {
217                    Some(stat) = rx.recv() => {
218                        manager.update_iceberg_commit_info(stat);
219                    },
220                    _ = &mut shutdown_rx => {
221                        tracing::info!("Iceberg compaction manager is stopped");
222                        return;
223                    }
224                }
225            }
226        });
227
228        (join_handle, shutdown_tx)
229    }
230
231    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
232        let mut guard = self.inner.write();
233
234        let IcebergSinkCompactionUpdate {
235            sink_id,
236            compaction_interval,
237        } = msg;
238
239        // if the compaction interval is changed, we need to reset the commit info when the compaction task is sent of initialized
240        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
241            count: 0,
242            next_compaction_time: Some(
243                Instant::now() + std::time::Duration::from_secs(compaction_interval),
244            ),
245            compaction_interval,
246        });
247
248        commit_info.increase_count();
249        if commit_info.compaction_interval != compaction_interval {
250            commit_info.update_compaction_interval(compaction_interval);
251        }
252    }
253
254    /// Get the top N iceberg commit sink ids
255    /// Sorted by commit count and next compaction time
256    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
257        let now = Instant::now();
258        let mut guard = self.inner.write();
259        guard
260            .iceberg_commits
261            .iter_mut()
262            .filter(|(_, commit_info)| {
263                commit_info.count > 0
264                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
265                        next_compaction_time <= now
266                    } else {
267                        false
268                    }
269            })
270            .sorted_by(|a, b| {
271                b.1.count
272                    .cmp(&a.1.count)
273                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
274            })
275            .take(n)
276            .map(|(sink_id, commit_info)| {
277                // reset the commit count and next compaction time and avoid double call
278                let handle = IcebergCompactionHandle::new(
279                    *sink_id,
280                    self.inner.clone(),
281                    self.metadata_manager.clone(),
282                    commit_info.clone(),
283                );
284
285                commit_info.set_processing();
286
287                handle
288            })
289            .collect::<Vec<_>>()
290    }
291
292    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
293        let mut guard = self.inner.write();
294        guard.iceberg_commits.remove(&sink_id);
295    }
296
297    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
298        let prost_sink_catalog: PbSink = self
299            .metadata_manager
300            .catalog_controller
301            .get_sink_by_ids(vec![sink_id.sink_id as i32])
302            .await?
303            .remove(0);
304        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
305        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
306        Ok(param)
307    }
308
309    #[allow(dead_code)]
310    pub async fn load_iceberg_table(&self, sink_id: &SinkId) -> MetaResult<Table> {
311        let sink_param = self.get_sink_param(sink_id).await?;
312        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
313        let table = iceberg_config.load_table().await?;
314        Ok(table)
315    }
316
317    pub fn add_compactor_stream(
318        &self,
319        context_id: u32,
320        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
321    ) {
322        self.compactor_streams_change_tx
323            .send((context_id, req_stream))
324            .unwrap();
325    }
326
327    pub fn iceberg_compaction_event_loop(
328        iceberg_compaction_manager: Arc<Self>,
329        compactor_streams_change_rx: UnboundedReceiver<(
330            u32,
331            Streaming<SubscribeIcebergCompactionEventRequest>,
332        )>,
333    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
334        let mut join_handle_vec = Vec::default();
335
336        let iceberg_compaction_event_handler =
337            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
338
339        let iceberg_compaction_event_dispatcher =
340            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
341
342        let event_loop = IcebergCompactionEventLoop::new(
343            iceberg_compaction_event_dispatcher,
344            iceberg_compaction_manager.metrics.clone(),
345            compactor_streams_change_rx,
346        );
347
348        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
349        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
350
351        join_handle_vec
352    }
353}