risingwave_meta/manager/iceberg_compaction/
mod.rs1mod gc;
16mod manual;
17mod schedule;
18mod stream;
19
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::anyhow;
25use parking_lot::RwLock;
26use risingwave_common::id::WorkerId;
27use risingwave_connector::sink::SinkParam;
28use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
29use risingwave_connector::sink::iceberg::IcebergConfig;
30use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
31use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
32use tonic::Streaming;
33
34use super::MetaSrvEnv;
35use crate::MetaResult;
36use crate::hummock::IcebergCompactorManagerRef;
37use crate::manager::MetadataManager;
38use crate::rpc::metrics::MetaMetrics;
39
40pub type IcebergCompactionManagerRef = Arc<IcebergCompactionManager>;
41
42pub(crate) type CompactorChangeTx =
43 UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
44
45pub(crate) type CompactorChangeRx =
46 UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
47
48type ManualCompactionWaiter = tokio::sync::oneshot::Sender<MetaResult<u64>>;
49
50use schedule::CompactionTrack;
51pub use schedule::IcebergCompactionScheduleStatus;
52
53pub struct IcebergCompactionManager {
54 pub env: MetaSrvEnv,
55 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
56
57 metadata_manager: MetadataManager,
58 pub iceberg_compactor_manager: IcebergCompactorManagerRef,
59
60 compactor_streams_change_tx: CompactorChangeTx,
61
62 pub metrics: Arc<MetaMetrics>,
63}
64
65struct IcebergCompactionManagerInner {
66 sink_schedules: HashMap<SinkId, CompactionTrack>,
67 snapshot_expiration_sink_ids: HashSet<SinkId>,
68 manual_compaction_waiters: HashMap<SinkId, ManualCompactionWaiter>,
69}
70
71impl IcebergCompactionManager {
72 fn report_timeout(&self) -> Duration {
73 Duration::from_secs(self.env.opts.iceberg_compaction_report_timeout_sec)
74 }
75
76 fn config_refresh_interval(&self) -> Duration {
77 Duration::from_secs(self.env.opts.iceberg_compaction_config_refresh_interval_sec)
78 }
79
80 pub fn build(
81 env: MetaSrvEnv,
82 metadata_manager: MetadataManager,
83 iceberg_compactor_manager: IcebergCompactorManagerRef,
84 metrics: Arc<MetaMetrics>,
85 ) -> (Arc<Self>, CompactorChangeRx) {
86 let (compactor_streams_change_tx, compactor_streams_change_rx) =
87 tokio::sync::mpsc::unbounded_channel();
88 (
89 Arc::new(Self {
90 env,
91 inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
92 sink_schedules: HashMap::default(),
93 snapshot_expiration_sink_ids: HashSet::default(),
94 manual_compaction_waiters: HashMap::default(),
95 })),
96 metadata_manager,
97 iceberg_compactor_manager,
98 compactor_streams_change_tx,
99 metrics,
100 }),
101 compactor_streams_change_rx,
102 )
103 }
104
105 async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
106 let prost_sink_catalog = self
107 .metadata_manager
108 .catalog_controller
109 .get_sink_by_id(sink_id)
110 .await?
111 .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
112 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
113 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
114 Ok(param)
115 }
116
117 async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
118 let sink_param = self.get_sink_param(sink_id).await?;
119 let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
120 Ok(iceberg_config)
121 }
122}