risingwave_meta/manager/iceberg_compaction/
stream.rs1use std::sync::Arc;
16
17use risingwave_common::id::WorkerId;
18use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
19use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
20use tokio::sync::mpsc::UnboundedReceiver;
21use tokio::sync::oneshot::Sender;
22use tokio::task::JoinHandle;
23use tonic::Streaming;
24
25use super::*;
26use crate::hummock::{
27 IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
28};
29
30impl IcebergCompactionManager {
31 pub fn compaction_stat_loop(
32 manager: Arc<Self>,
33 mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
34 ) -> (JoinHandle<()>, Sender<()>) {
35 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
36 let join_handle = tokio::spawn(async move {
37 loop {
38 tokio::select! {
39 Some(stat) = rx.recv() => {
40 manager.update_iceberg_commit_info(stat).await;
41 },
42 _ = &mut shutdown_rx => {
43 tracing::info!("Iceberg compaction manager is stopped");
44 return;
45 }
46 }
47 }
48 });
49
50 (join_handle, shutdown_tx)
51 }
52
53 pub fn add_compactor_stream(
54 &self,
55 context_id: WorkerId,
56 req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
57 ) {
58 if self
59 .compactor_streams_change_tx
60 .send((context_id, req_stream))
61 .is_err()
62 {
63 tracing::warn!(context_id = %context_id, "Failed to enqueue iceberg compactor stream");
64 }
65 }
66
67 pub fn iceberg_compaction_event_loop(
68 iceberg_compaction_manager: Arc<Self>,
69 compactor_streams_change_rx: CompactorChangeRx,
70 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
71 let mut join_handle_vec = Vec::default();
72
73 let iceberg_compaction_event_handler =
74 IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
75
76 let iceberg_compaction_event_dispatcher =
77 IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
78
79 let event_loop = IcebergCompactionEventLoop::new(
80 iceberg_compaction_event_dispatcher,
81 iceberg_compaction_manager.metrics.clone(),
82 compactor_streams_change_rx,
83 );
84
85 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
86 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
87
88 join_handle_vec
89 }
90}