1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use iceberg::table::Table;
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
23use risingwave_connector::sink::SinkParam;
24use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
25use risingwave_connector::sink::iceberg::IcebergConfig;
26use risingwave_pb::catalog::PbSink;
27use risingwave_pb::iceberg_compaction::{
28 IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
29};
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31use tokio::sync::oneshot::Sender;
32use tokio::task::JoinHandle;
33use tonic::Streaming;
34
35use super::MetaSrvEnv;
36use crate::MetaResult;
37use crate::hummock::{
38 IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
39 IcebergCompactor, IcebergCompactorManagerRef,
40};
41use crate::manager::MetadataManager;
42use crate::rpc::metrics::MetaMetrics;
43
44pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
45
46type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
47
48type CompactorChangeRx =
49 UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
50
/// Per-sink bookkeeping used to decide when a sink becomes eligible for compaction.
#[derive(Debug, Clone)]
struct CommitInfo {
    /// Counter bumped by [`CommitInfo::increase_count`] and reset to zero by
    /// [`CommitInfo::set_processing`] and [`CommitInfo::initialize`].
    count: usize,
    /// Earliest instant at which the sink may be selected. `None` means the sink is
    /// currently not schedulable: either it has been marked as processing, or the
    /// configured interval is so large that the deadline cannot be represented.
    next_compaction_time: Option<Instant>,
    /// Delay in seconds between (re)arming the timer and the next eligible instant.
    compaction_interval: u64,
}

impl CommitInfo {
    /// Computes `now + interval_secs`.
    ///
    /// Returns `None` instead of panicking when the sum overflows `Instant`, which a
    /// plain `Instant + Duration` would do for very large intervals.
    fn deadline_after(interval_secs: u64) -> Option<Instant> {
        Instant::now().checked_add(std::time::Duration::from_secs(interval_secs))
    }

    /// Marks the entry as being processed: resets the counter and clears the deadline.
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time = None;
    }

    /// Resets the counter and re-arms the deadline using the stored interval.
    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time = Self::deadline_after(self.compaction_interval);
    }

    /// Overwrites every field with the values of `commit_info`.
    fn replace(&mut self, commit_info: CommitInfo) {
        *self = commit_info;
    }

    /// Adds one to the counter.
    fn increase_count(&mut self) {
        self.count += 1;
    }

    /// Stores a new interval and re-arms the deadline relative to the current time.
    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;
        self.next_compaction_time = Self::deadline_after(compaction_interval);
    }
}
89
90pub struct IcebergCompactionHandle {
91 sink_id: SinkId,
92 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
93 metadata_manager: MetadataManager,
94 handle_success: bool,
95
96 commit_info: CommitInfo,
98}
99
100impl IcebergCompactionHandle {
101 fn new(
102 sink_id: SinkId,
103 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
104 metadata_manager: MetadataManager,
105 commit_info: CommitInfo,
106 ) -> Self {
107 Self {
108 sink_id,
109 inner,
110 metadata_manager,
111 handle_success: false,
112 commit_info,
113 }
114 }
115
116 pub async fn send_compact_task(
117 mut self,
118 compactor: Arc<IcebergCompactor>,
119 task_id: u64,
120 ) -> MetaResult<()> {
121 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
122 let prost_sink_catalog: PbSink = self
123 .metadata_manager
124 .catalog_controller
125 .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
126 .await?
127 .remove(0);
128 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
129 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
130 let result =
131 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
132 task_id,
134 props: param.properties,
135 }));
136
137 if result.is_ok() {
138 self.handle_success = true;
139 }
140
141 result
142 }
143
144 pub fn sink_id(&self) -> SinkId {
145 self.sink_id
146 }
147}
148
149impl Drop for IcebergCompactionHandle {
150 fn drop(&mut self) {
151 if self.handle_success {
152 let mut guard = self.inner.write();
153 if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
154 commit_info.initialize();
155 }
156 } else {
157 let mut guard = self.inner.write();
162 if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
163 commit_info.replace(self.commit_info.clone());
164 }
165 }
166 }
167}
168
169struct IcebergCompactionManagerInner {
170 pub iceberg_commits: HashMap<SinkId, CommitInfo>,
171}
172
173pub struct IcebergCompactionManager {
174 pub env: MetaSrvEnv,
175 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
176
177 metadata_manager: MetadataManager,
178 pub iceberg_compactor_manager: IcebergCompactorManagerRef,
179
180 compactor_streams_change_tx: CompactorChangeTx,
181
182 pub metrics: Arc<MetaMetrics>,
183}
184
185impl IcebergCompactionManager {
186 pub fn build(
187 env: MetaSrvEnv,
188 metadata_manager: MetadataManager,
189 iceberg_compactor_manager: IcebergCompactorManagerRef,
190 metrics: Arc<MetaMetrics>,
191 ) -> (Arc<Self>, CompactorChangeRx) {
192 let (compactor_streams_change_tx, compactor_streams_change_rx) =
193 tokio::sync::mpsc::unbounded_channel();
194 (
195 Arc::new(Self {
196 env,
197 inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
198 iceberg_commits: HashMap::default(),
199 })),
200 metadata_manager,
201 iceberg_compactor_manager,
202 compactor_streams_change_tx,
203 metrics,
204 }),
205 compactor_streams_change_rx,
206 )
207 }
208
209 pub fn compaction_stat_loop(
210 manager: Arc<Self>,
211 mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
212 ) -> (JoinHandle<()>, Sender<()>) {
213 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
214 let join_handle = tokio::spawn(async move {
215 loop {
216 tokio::select! {
217 Some(stat) = rx.recv() => {
218 manager.update_iceberg_commit_info(stat);
219 },
220 _ = &mut shutdown_rx => {
221 tracing::info!("Iceberg compaction manager is stopped");
222 return;
223 }
224 }
225 }
226 });
227
228 (join_handle, shutdown_tx)
229 }
230
231 pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
232 let mut guard = self.inner.write();
233
234 let IcebergSinkCompactionUpdate {
235 sink_id,
236 compaction_interval,
237 } = msg;
238
239 let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
241 count: 0,
242 next_compaction_time: Some(
243 Instant::now() + std::time::Duration::from_secs(compaction_interval),
244 ),
245 compaction_interval,
246 });
247
248 commit_info.increase_count();
249 if commit_info.compaction_interval != compaction_interval {
250 commit_info.update_compaction_interval(compaction_interval);
251 }
252 }
253
254 pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
257 let now = Instant::now();
258 let mut guard = self.inner.write();
259 guard
260 .iceberg_commits
261 .iter_mut()
262 .filter(|(_, commit_info)| {
263 commit_info.count > 0
264 && if let Some(next_compaction_time) = commit_info.next_compaction_time {
265 next_compaction_time <= now
266 } else {
267 false
268 }
269 })
270 .sorted_by(|a, b| {
271 b.1.count
272 .cmp(&a.1.count)
273 .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
274 })
275 .take(n)
276 .map(|(sink_id, commit_info)| {
277 let handle = IcebergCompactionHandle::new(
279 *sink_id,
280 self.inner.clone(),
281 self.metadata_manager.clone(),
282 commit_info.clone(),
283 );
284
285 commit_info.set_processing();
286
287 handle
288 })
289 .collect::<Vec<_>>()
290 }
291
292 pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
293 let mut guard = self.inner.write();
294 guard.iceberg_commits.remove(&sink_id);
295 }
296
297 pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
298 let prost_sink_catalog: PbSink = self
299 .metadata_manager
300 .catalog_controller
301 .get_sink_by_ids(vec![sink_id.sink_id as i32])
302 .await?
303 .remove(0);
304 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
305 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
306 Ok(param)
307 }
308
309 #[allow(dead_code)]
310 pub async fn load_iceberg_table(&self, sink_id: &SinkId) -> MetaResult<Table> {
311 let sink_param = self.get_sink_param(sink_id).await?;
312 let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
313 let table = iceberg_config.load_table().await?;
314 Ok(table)
315 }
316
317 pub fn add_compactor_stream(
318 &self,
319 context_id: u32,
320 req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
321 ) {
322 self.compactor_streams_change_tx
323 .send((context_id, req_stream))
324 .unwrap();
325 }
326
327 pub fn iceberg_compaction_event_loop(
328 iceberg_compaction_manager: Arc<Self>,
329 compactor_streams_change_rx: UnboundedReceiver<(
330 u32,
331 Streaming<SubscribeIcebergCompactionEventRequest>,
332 )>,
333 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
334 let mut join_handle_vec = Vec::default();
335
336 let iceberg_compaction_event_handler =
337 IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
338
339 let iceberg_compaction_event_dispatcher =
340 IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
341
342 let event_loop = IcebergCompactionEventLoop::new(
343 iceberg_compaction_event_dispatcher,
344 iceberg_compaction_manager.metrics.clone(),
345 compactor_streams_change_rx,
346 );
347
348 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
349 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
350
351 join_handle_vec
352 }
353}