Skip to main content

risingwave_meta/hummock/
compactor_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26    CancelCompactTask, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
29use risingwave_pb::iceberg_compaction::{
30    CancelCompactTask as IcebergCancelCompactTask, SubscribeIcebergCompactionEventResponse,
31};
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
33
34use crate::MetaResult;
35use crate::hummock::model::ext::compaction_task_model_to_assignment;
36use crate::manager::MetaSrvEnv;
37use crate::model::MetadataModelError;
38
39pub type CompactorManagerRef = Arc<CompactorManager>;
40pub type IcebergCompactorManagerRef = Arc<IcebergCompactorManager>;
41
42pub const TASK_RUN_TOO_LONG: &str = "running too long";
43pub const TASK_NOT_FOUND: &str = "task not found";
44pub const TASK_NORMAL: &str = "task is normal, please wait some time";
45
46type CompactorSubscribeStreamSender = UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>;
47type CompactorSubscribeStreamReceiver =
48    UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>>;
49
50type IcebergCompactorSubscribeStreamSender =
51    UnboundedSender<MetaResult<SubscribeIcebergCompactionEventResponse>>;
52type IcebergCompactorSubscribeStreamReceiver =
53    UnboundedReceiver<MetaResult<SubscribeIcebergCompactionEventResponse>>;
54
55type CompactorSubscribeResponseEvent = ResponseEvent;
56
57type IcebergCompactorSubscribeResponseEvent = IcebergResponseEvent;
58
59/// Wraps the stream between meta node and compactor node.
60/// Compactor node will re-establish the stream when the previous one fails.
61pub struct Compactor {
62    context_id: HummockContextId,
63    sender: CompactorSubscribeStreamSender,
64}
65
66pub struct IcebergCompactor {
67    context_id: HummockContextId,
68    sender: IcebergCompactorSubscribeStreamSender,
69}
70
71struct TaskHeartbeat {
72    task: CompactTask,
73    num_ssts_sealed: u32,
74    num_ssts_uploaded: u32,
75    num_progress_key: u64,
76    num_pending_read_io: u64,
77    num_pending_write_io: u64,
78    create_time: Instant,
79    expire_at: u64,
80
81    update_at: u64,
82}
83
84impl Compactor {
85    pub fn new(context_id: HummockContextId, sender: CompactorSubscribeStreamSender) -> Self {
86        Self { context_id, sender }
87    }
88
89    pub fn send_event(&self, event: CompactorSubscribeResponseEvent) -> MetaResult<()> {
90        fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
91            "compaction_send_task_fail"
92        )
93        .into()));
94
95        self.sender
96            .send(Ok(SubscribeCompactionEventResponse {
97                create_at: SystemTime::now()
98                    .duration_since(SystemTime::UNIX_EPOCH)
99                    .expect("Clock may have gone backwards")
100                    .as_millis() as u64,
101                event: Some(event),
102            }))
103            .map_err(|e| anyhow::anyhow!(e))?;
104
105        Ok(())
106    }
107
108    pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
109        self.send_event(ResponseEvent::CancelCompactTask(CancelCompactTask {
110            context_id: self.context_id,
111            task_id,
112        }))
113    }
114
115    pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
116        for task_id in task_ids {
117            self.cancel_task(*task_id)?;
118        }
119        Ok(())
120    }
121
122    pub fn context_id(&self) -> HummockContextId {
123        self.context_id
124    }
125}
126
127pub trait CompactorManagerTrait {
128    fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver;
129    fn remove_compactor(&self, context_id: HummockContextId);
130    fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>>;
131    fn next_compactor(&self) -> Option<Arc<Compactor>>;
132    fn compactor_num(&self) -> usize;
133}
134
135/// `CompactorManagerInner` maintains compactors which can process compact task.
136/// A compact task is tracked in `HummockManager::Compaction` via both `CompactStatus` and
137/// `CompactTaskAssignment`.
138///
139/// A compact task can be in one of these states:
140/// 1. Success: an assigned task is reported as success via `CompactStatus::report_compact_task`.
141///    It's the final state.
142/// 2. Failed: an Failed task is reported as success via `CompactStatus::report_compact_task`.
143///    It's the final state.
144/// 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
145///    the final state.
146pub struct CompactorManagerInner {
147    pub task_expired_seconds: u64,
148    pub heartbeat_expired_seconds: u64,
149    task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,
150
151    /// The outer lock is a `RwLock`, so we should still be able to modify each compactor
152    pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
153}
154
155impl CompactorManagerInner {
156    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
157        use risingwave_meta_model::compaction_task;
158        use sea_orm::EntityTrait;
159        // Retrieve the existing task assignments from metastore.
160        let task_assignment: Vec<_> = compaction_task::Entity::find()
161            .all(&env.meta_store_ref().conn)
162            .await
163            .map_err(MetadataModelError::from)?
164            .into_iter()
165            .map(compaction_task_model_to_assignment)
166            .collect();
167        let mut manager = Self {
168            task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
169            heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
170            task_heartbeats: Default::default(),
171            compactor_map: Default::default(),
172        };
173        // Initialize heartbeat for existing tasks.
174        task_assignment.into_iter().for_each(|assignment| {
175            manager.initiate_task_heartbeat(assignment.compact_task);
176        });
177        Ok(manager)
178    }
179
180    /// Only used for unit test.
181    pub fn for_test() -> Self {
182        Self {
183            task_expired_seconds: 1,
184            heartbeat_expired_seconds: 1,
185            task_heartbeats: Default::default(),
186            compactor_map: Default::default(),
187        }
188    }
189
190    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
191        use rand::Rng;
192        if self.compactor_map.is_empty() {
193            return None;
194        }
195
196        let rand_index = rand::rng().random_range(0..self.compactor_map.len());
197        let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
198
199        Some(compactor)
200    }
201
202    /// Retrieve a receiver of tasks for the compactor identified by `context_id`. The sender should
203    /// be obtained by calling one of the compactor getters.
204    ///
205    ///  If `add_compactor` is called with the same `context_id` more than once, the only cause
206    /// would be compactor re-subscription, as `context_id` is a monotonically increasing
207    /// sequence.
208    pub fn add_compactor(
209        &mut self,
210        context_id: HummockContextId,
211    ) -> CompactorSubscribeStreamReceiver {
212        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
213
214        self.compactor_map
215            .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
216
217        tracing::info!(context_id = %context_id, "Added compactor session");
218
219        rx
220    }
221
222    /// Used when meta exiting to support graceful shutdown.
223    pub fn abort_all_compactors(&mut self) {
224        while let Some(compactor) = self.next_compactor() {
225            self.remove_compactor(compactor.context_id);
226        }
227    }
228
229    pub fn remove_compactor(&mut self, context_id: HummockContextId) {
230        if self.compactor_map.remove(&context_id).is_some() {
231            tracing::info!(context_id = %context_id, "Removed compactor session")
232        };
233    }
234
235    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
236        self.compactor_map.get(&context_id).cloned()
237    }
238
239    pub fn check_tasks_status(
240        &self,
241        tasks: &[HummockCompactionTaskId],
242        slow_task_duration: Duration,
243    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
244        let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
245        let mut ret = HashMap::default();
246        for TaskHeartbeat {
247            task, create_time, ..
248        } in self.task_heartbeats.values()
249        {
250            if !tasks_ids.contains(&task.task_id) {
251                continue;
252            }
253            let pending_time = create_time.elapsed();
254            if pending_time > slow_task_duration {
255                ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
256            } else {
257                ret.insert(task.task_id, (pending_time, TASK_NORMAL));
258            }
259        }
260
261        for task_id in tasks {
262            if !ret.contains_key(task_id) {
263                ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
264            }
265        }
266        ret
267    }
268
269    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
270        let heartbeat_expired_ts: u64 = SystemTime::now()
271            .duration_since(SystemTime::UNIX_EPOCH)
272            .expect("Clock may have gone backwards")
273            .as_secs()
274            - self.heartbeat_expired_seconds;
275        Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
276    }
277
278    fn get_heartbeat_expired_tasks_impl(
279        task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
280        heartbeat_expired_ts: u64,
281    ) -> Vec<CompactTask> {
282        let mut cancellable_tasks = vec![];
283        const MAX_TASK_DURATION_SEC: u64 = 2700;
284
285        for TaskHeartbeat {
286            expire_at,
287            task,
288            create_time,
289            num_ssts_sealed,
290            num_ssts_uploaded,
291            num_progress_key,
292            num_pending_read_io,
293            num_pending_write_io,
294            update_at,
295        } in task_heartbeats.values()
296        {
297            if *update_at < heartbeat_expired_ts {
298                cancellable_tasks.push(task.clone());
299            }
300
301            let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
302            if task_duration_too_long {
303                let compact_task_statistics = statistics_compact_task(task);
304                tracing::info!(
305                    "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
306                        pending_read_io_count {} pending_write_io_count {} target_level {} \
307                        base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
308                    task.compaction_group_id,
309                    task.task_id,
310                    create_time,
311                    expire_at,
312                    num_ssts_sealed,
313                    num_ssts_uploaded,
314                    num_progress_key,
315                    num_pending_read_io,
316                    num_pending_write_io,
317                    task.target_level,
318                    task.base_level,
319                    task.target_sub_level_id,
320                    task.task_type.as_str_name(),
321                    compact_task_statistics
322                );
323            }
324        }
325        cancellable_tasks
326    }
327
328    pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
329        let now = SystemTime::now()
330            .duration_since(SystemTime::UNIX_EPOCH)
331            .expect("Clock may have gone backwards")
332            .as_secs();
333        self.task_heartbeats.insert(
334            task.task_id,
335            TaskHeartbeat {
336                task,
337                num_ssts_sealed: 0,
338                num_ssts_uploaded: 0,
339                num_progress_key: 0,
340                num_pending_read_io: 0,
341                num_pending_write_io: 0,
342                create_time: Instant::now(),
343                expire_at: now + self.task_expired_seconds,
344                update_at: now,
345            },
346        );
347    }
348
349    pub fn remove_task_heartbeat(&mut self, task_id: u64) {
350        self.task_heartbeats.remove(&task_id).unwrap();
351    }
352
353    pub fn update_task_heartbeats(
354        &mut self,
355        progress_list: &Vec<CompactTaskProgress>,
356    ) -> Vec<CompactTask> {
357        let now = SystemTime::now()
358            .duration_since(SystemTime::UNIX_EPOCH)
359            .expect("Clock may have gone backwards")
360            .as_secs();
361        let mut cancel_tasks = vec![];
362        for progress in progress_list {
363            if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
364                task_ref.update_at = now;
365
366                if task_ref.num_ssts_sealed < progress.num_ssts_sealed
367                    || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
368                    || task_ref.num_progress_key < progress.num_progress_key
369                {
370                    // Refresh the expired of the task as it is showing progress.
371                    task_ref.expire_at = now + self.task_expired_seconds;
372                    task_ref.num_ssts_sealed = progress.num_ssts_sealed;
373                    task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
374                    task_ref.num_progress_key = progress.num_progress_key;
375                }
376                task_ref.num_pending_read_io = progress.num_pending_read_io;
377                task_ref.num_pending_write_io = progress.num_pending_write_io;
378
379                // timeout check
380                if task_ref.expire_at < now {
381                    // cancel
382                    cancel_tasks.push(task_ref.task.clone())
383                }
384            }
385        }
386
387        cancel_tasks
388    }
389
390    pub fn compactor_num(&self) -> usize {
391        self.compactor_map.len()
392    }
393
394    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
395        self.task_heartbeats
396            .values()
397            .map(|hb| CompactTaskProgress {
398                task_id: hb.task.task_id,
399                num_ssts_sealed: hb.num_ssts_sealed,
400                num_ssts_uploaded: hb.num_ssts_uploaded,
401                num_progress_key: hb.num_progress_key,
402                num_pending_read_io: hb.num_pending_read_io,
403                num_pending_write_io: hb.num_pending_write_io,
404                compaction_group_id: Some(hb.task.compaction_group_id),
405            })
406            .collect()
407    }
408}
409
410pub struct CompactorManager {
411    inner: Arc<RwLock<CompactorManagerInner>>,
412}
413
414impl CompactorManager {
415    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
416        let inner = CompactorManagerInner::with_meta(env).await?;
417
418        Ok(Self {
419            inner: Arc::new(RwLock::new(inner)),
420        })
421    }
422
423    /// Only used for unit test.
424    pub fn for_test() -> Self {
425        let inner = CompactorManagerInner::for_test();
426        Self {
427            inner: Arc::new(RwLock::new(inner)),
428        }
429    }
430
431    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
432        self.inner.read().next_compactor()
433    }
434
435    pub fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver {
436        self.inner.write().add_compactor(context_id)
437    }
438
439    pub fn abort_all_compactors(&self) {
440        self.inner.write().abort_all_compactors();
441    }
442
443    pub fn remove_compactor(&self, context_id: HummockContextId) {
444        self.inner.write().remove_compactor(context_id)
445    }
446
447    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
448        self.inner.read().get_compactor(context_id)
449    }
450
451    pub fn check_tasks_status(
452        &self,
453        tasks: &[HummockCompactionTaskId],
454        slow_task_duration: Duration,
455    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
456        self.inner
457            .read()
458            .check_tasks_status(tasks, slow_task_duration)
459    }
460
461    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
462        self.inner.read().get_heartbeat_expired_tasks()
463    }
464
465    pub fn initiate_task_heartbeat(&self, task: CompactTask) {
466        self.inner.write().initiate_task_heartbeat(task);
467    }
468
469    pub fn remove_task_heartbeat(&self, task_id: u64) {
470        self.inner.write().remove_task_heartbeat(task_id);
471    }
472
473    pub fn update_task_heartbeats(
474        &self,
475        progress_list: &Vec<CompactTaskProgress>,
476    ) -> Vec<CompactTask> {
477        self.inner.write().update_task_heartbeats(progress_list)
478    }
479
480    pub fn compactor_num(&self) -> usize {
481        self.inner.read().compactor_num()
482    }
483
484    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
485        self.inner.read().get_progress()
486    }
487}
488
489impl IcebergCompactor {
490    pub fn new(
491        context_id: HummockContextId,
492        sender: IcebergCompactorSubscribeStreamSender,
493    ) -> Self {
494        Self { context_id, sender }
495    }
496
497    pub fn context_id(&self) -> HummockContextId {
498        self.context_id
499    }
500
501    pub fn send_event(&self, event: IcebergCompactorSubscribeResponseEvent) -> MetaResult<()> {
502        fail_point!("iceberg_compaction_send_task_fail", |_| Err(
503            anyhow::anyhow!("iceberg_compaction_send_task_fail").into()
504        ));
505
506        self.sender
507            .send(Ok(SubscribeIcebergCompactionEventResponse {
508                create_at: SystemTime::now()
509                    .duration_since(SystemTime::UNIX_EPOCH)
510                    .expect("Clock may have gone backwards")
511                    .as_millis() as u64,
512                event: Some(event),
513            }))
514            .map_err(|e| anyhow::anyhow!(e))?;
515
516        Ok(())
517    }
518
519    pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
520        self.send_event(IcebergResponseEvent::CancelCompactTask(
521            IcebergCancelCompactTask { task_id },
522        ))
523    }
524}
525
526pub struct IcebergCompactorManagerInner {
527    pub compactor_map: HashMap<HummockContextId, Arc<IcebergCompactor>>,
528}
529
530pub struct IcebergCompactorManager {
531    inner: Arc<RwLock<IcebergCompactorManagerInner>>,
532}
533
534impl IcebergCompactorManager {
535    pub fn new() -> Self {
536        Self {
537            inner: Arc::new(RwLock::new(IcebergCompactorManagerInner {
538                compactor_map: HashMap::new(),
539            })),
540        }
541    }
542
543    pub fn add_compactor(
544        &self,
545        context_id: HummockContextId,
546    ) -> IcebergCompactorSubscribeStreamReceiver {
547        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
548        self.inner
549            .write()
550            .compactor_map
551            .insert(context_id, Arc::new(IcebergCompactor::new(context_id, tx)));
552        tracing::info!(context_id = %context_id, "Added iceberg compactor session");
553        rx
554    }
555
556    pub fn remove_compactor(&self, context_id: HummockContextId) {
557        if self
558            .inner
559            .write()
560            .compactor_map
561            .remove(&context_id)
562            .is_some()
563        {
564            tracing::info!(context_id = %context_id, "Removed iceberg compactor session");
565        }
566    }
567
568    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<IcebergCompactor>> {
569        self.inner.read().compactor_map.get(&context_id).cloned()
570    }
571
572    pub fn next_compactor(&self) -> Option<Arc<IcebergCompactor>> {
573        use rand::Rng;
574        let compactor_map = &self.inner.read().compactor_map;
575        if compactor_map.is_empty() {
576            return None;
577        }
578        let rand_index = rand::rng().random_range(0..compactor_map.len());
579        compactor_map.values().nth(rand_index).cloned()
580    }
581
582    pub fn compactor_num(&self) -> usize {
583        self.inner.read().compactor_map.len()
584    }
585}
586
587#[cfg(test)]
588mod tests {
589    use std::sync::Arc;
590    use std::time::Duration;
591
592    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
593    use risingwave_pb::hummock::CompactTaskProgress;
594    use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
595    use risingwave_rpc_client::HummockMetaClient;
596
597    use crate::hummock::compaction::selector::default_compaction_selector;
598    use crate::hummock::test_utils::{
599        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
600    };
601    use crate::hummock::{CompactorManager, IcebergCompactorManager, MockHummockMetaClient};
602
603    #[tokio::test]
604    async fn test_compactor_manager() {
605        // Initialize metastore with task assignment.
606        let (env, context_id) = {
607            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
608            let context_id = worker_id as _;
609            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
610                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
611            );
612            let compactor_manager = hummock_manager.compactor_manager.clone();
613            register_table_ids_to_compaction_group(
614                hummock_manager.as_ref(),
615                &[1],
616                StaticCompactionGroupId::StateDefault,
617            )
618            .await;
619            let _sst_infos =
620                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
621            let _receiver = compactor_manager.add_compactor(context_id);
622            hummock_manager
623                .get_compact_task(
624                    StaticCompactionGroupId::StateDefault,
625                    &mut *default_compaction_selector(),
626                )
627                .await
628                .unwrap()
629                .unwrap();
630            (env, context_id)
631        };
632
633        // Restart. Set task_expired_seconds to 0 only to speed up test.
634        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
635        // Because task assignment exists.
636        // Because compactor gRPC is not established yet.
637        assert_eq!(compactor_manager.compactor_num(), 0);
638        assert!(compactor_manager.get_compactor(context_id).is_none());
639
640        // Ensure task is expired.
641        tokio::time::sleep(Duration::from_secs(2)).await;
642        let expired = compactor_manager.get_heartbeat_expired_tasks();
643        assert_eq!(expired.len(), 1);
644
645        // Mimic no-op compaction heartbeat
646        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
647
648        // Mimic compaction heartbeat with invalid task id
649        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
650            task_id: expired[0].task_id + 1,
651            num_ssts_sealed: 1,
652            num_ssts_uploaded: 1,
653            num_progress_key: 100,
654            ..Default::default()
655        }]);
656        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
657
658        // Mimic effective compaction heartbeat
659        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
660            task_id: expired[0].task_id,
661            num_ssts_sealed: 1,
662            num_ssts_uploaded: 1,
663            num_progress_key: 100,
664            ..Default::default()
665        }]);
666        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);
667
668        // Test add
669        assert_eq!(compactor_manager.compactor_num(), 0);
670        assert!(compactor_manager.get_compactor(context_id).is_none());
671        compactor_manager.add_compactor(context_id);
672        assert_eq!(compactor_manager.compactor_num(), 1);
673        assert_eq!(
674            compactor_manager
675                .get_compactor(context_id)
676                .unwrap()
677                .context_id(),
678            context_id
679        );
680        // Test remove
681        compactor_manager.remove_compactor(context_id);
682        assert_eq!(compactor_manager.compactor_num(), 0);
683        assert!(compactor_manager.get_compactor(context_id).is_none());
684    }
685
686    #[test]
687    fn test_iceberg_compactor_manager() {
688        // Test Add and Remove Iceberg Compactor
689        let iceberg_context_id = 1000.into();
690        let iceberg_compactor_manager = IcebergCompactorManager::new();
691        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
692        assert!(
693            iceberg_compactor_manager
694                .get_compactor(iceberg_context_id)
695                .is_none()
696        );
697        let mut receiver = iceberg_compactor_manager.add_compactor(iceberg_context_id);
698        assert_eq!(iceberg_compactor_manager.compactor_num(), 1);
699        let compactor = iceberg_compactor_manager
700            .get_compactor(iceberg_context_id)
701            .unwrap();
702        assert_eq!(compactor.context_id(), iceberg_context_id);
703
704        compactor.cancel_task(42).unwrap();
705        let event = receiver.try_recv().unwrap().unwrap().event.unwrap();
706        match event {
707            IcebergResponseEvent::CancelCompactTask(cancel_task) => {
708                assert_eq!(cancel_task.task_id, 42);
709            }
710            other => panic!("unexpected iceberg compactor event: {other:?}"),
711        }
712
713        // Test remove
714        iceberg_compactor_manager.remove_compactor(iceberg_context_id);
715        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
716        assert!(
717            iceberg_compactor_manager
718                .get_compactor(iceberg_context_id)
719                .is_none()
720        );
721    }
722}