risingwave_meta/manager/iceberg_compaction/
stream.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::id::WorkerId;
18use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
19use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
20use tokio::sync::mpsc::UnboundedReceiver;
21use tokio::sync::oneshot::Sender;
22use tokio::task::JoinHandle;
23use tonic::Streaming;
24
25use super::*;
26use crate::hummock::{
27    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
28};
29
30impl IcebergCompactionManager {
31    pub fn compaction_stat_loop(
32        manager: Arc<Self>,
33        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
34    ) -> (JoinHandle<()>, Sender<()>) {
35        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
36        let join_handle = tokio::spawn(async move {
37            loop {
38                tokio::select! {
39                    Some(stat) = rx.recv() => {
40                        manager.update_iceberg_commit_info(stat).await;
41                    },
42                    _ = &mut shutdown_rx => {
43                        tracing::info!("Iceberg compaction manager is stopped");
44                        return;
45                    }
46                }
47            }
48        });
49
50        (join_handle, shutdown_tx)
51    }
52
53    pub fn add_compactor_stream(
54        &self,
55        context_id: WorkerId,
56        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
57    ) {
58        if self
59            .compactor_streams_change_tx
60            .send((context_id, req_stream))
61            .is_err()
62        {
63            tracing::warn!(context_id = %context_id, "Failed to enqueue iceberg compactor stream");
64        }
65    }
66
67    pub fn iceberg_compaction_event_loop(
68        iceberg_compaction_manager: Arc<Self>,
69        compactor_streams_change_rx: CompactorChangeRx,
70    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
71        let mut join_handle_vec = Vec::default();
72
73        let iceberg_compaction_event_handler =
74            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
75
76        let iceberg_compaction_event_dispatcher =
77            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
78
79        let event_loop = IcebergCompactionEventLoop::new(
80            iceberg_compaction_event_dispatcher,
81            iceberg_compaction_manager.metrics.clone(),
82            compactor_streams_change_rx,
83        );
84
85        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
86        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
87
88        join_handle_vec
89    }
90}