risingwave_meta/hummock/
compactor_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26    CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
29use risingwave_pb::iceberg_compaction::{
30    CancelCompactTask as IcebergCancelCompactTask, SubscribeIcebergCompactionEventResponse,
31};
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
33
34use crate::MetaResult;
35use crate::manager::MetaSrvEnv;
36use crate::model::MetadataModelError;
37
/// Shared, reference-counted handle to a [`CompactorManager`].
pub type CompactorManagerRef = Arc<CompactorManager>;
/// Shared, reference-counted handle to an [`IcebergCompactorManager`].
pub type IcebergCompactorManagerRef = Arc<IcebergCompactorManager>;

/// Status label used by `check_tasks_status` for a tracked task whose elapsed time exceeds the
/// caller-supplied slow-task threshold.
pub const TASK_RUN_TOO_LONG: &str = "running too long";
/// Status label used by `check_tasks_status` for a requested task id that has no heartbeat entry.
pub const TASK_NOT_FOUND: &str = "task not found";
/// Status label used by `check_tasks_status` for a tracked task that is still within the
/// slow-task threshold.
pub const TASK_NORMAL: &str = "task is normal, please wait some time";

/// Sending half of the unbounded channel carrying compaction events for one compactor.
type CompactorSubscribeStreamSender = UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>;
/// Receiving half of the unbounded channel carrying compaction events for one compactor.
type CompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>>;

/// Sending half of the unbounded channel carrying iceberg compaction events for one compactor.
type IcebergCompactorSubscribeStreamSender =
    UnboundedSender<MetaResult<SubscribeIcebergCompactionEventResponse>>;
/// Receiving half of the unbounded channel carrying iceberg compaction events for one compactor.
type IcebergCompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeIcebergCompactionEventResponse>>;

/// Event payload accepted by [`Compactor::send_event`].
type CompactorSubscribeResponseEvent = ResponseEvent;

/// Event payload accepted by [`IcebergCompactor::send_event`].
type IcebergCompactorSubscribeResponseEvent = IcebergResponseEvent;
57
/// Wraps the stream between meta node and compactor node.
/// Compactor node will re-establish the stream when the previous one fails.
pub struct Compactor {
    /// Identifier of the compactor this handle belongs to. It is returned by `context_id()` and
    /// embedded in the cancel events built by `cancel_task`.
    context_id: HummockContextId,
    /// Sending half of the unbounded channel that `send_event` pushes response events into.
    sender: CompactorSubscribeStreamSender,
}
64
/// Handle used to push iceberg compaction events to one subscribed compactor.
pub struct IcebergCompactor {
    /// Identifier of the compactor this handle belongs to; returned by `context_id()`.
    context_id: HummockContextId,
    /// Sending half of the unbounded channel that `send_event` pushes response events into.
    sender: IcebergCompactorSubscribeStreamSender,
}
69
/// Bookkeeping kept for every compaction task that is being tracked for liveness and progress.
struct TaskHeartbeat {
    /// The tracked compaction task; cloned into the result when the task is reported as expired
    /// or to be cancelled.
    task: CompactTask,
    /// Progress counter recorded from heartbeats. `num_ssts_sealed`, `num_ssts_uploaded` and
    /// `num_progress_key` are overwritten together whenever a heartbeat reports an increase in
    /// at least one of them.
    num_ssts_sealed: u32,
    /// See `num_ssts_sealed`.
    num_ssts_uploaded: u32,
    /// See `num_ssts_sealed`.
    num_progress_key: u64,
    /// Pending read I/O count, overwritten by every heartbeat for this task.
    num_pending_read_io: u64,
    /// Pending write I/O count, overwritten by every heartbeat for this task.
    num_pending_write_io: u64,
    /// Monotonic instant at which this heartbeat entry was created; used to compute how long the
    /// task has been tracked.
    create_time: Instant,
    /// Unix timestamp (seconds) after which the task is cancelled for lack of progress. It is
    /// pushed forward only when a heartbeat reports progress.
    expire_at: u64,

    /// Unix timestamp (seconds) of the most recent heartbeat received for this task (or of the
    /// entry's creation if none has arrived yet).
    update_at: u64,
}
82
83impl Compactor {
84    pub fn new(context_id: HummockContextId, sender: CompactorSubscribeStreamSender) -> Self {
85        Self { context_id, sender }
86    }
87
88    pub fn send_event(&self, event: CompactorSubscribeResponseEvent) -> MetaResult<()> {
89        fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
90            "compaction_send_task_fail"
91        )
92        .into()));
93
94        self.sender
95            .send(Ok(SubscribeCompactionEventResponse {
96                create_at: SystemTime::now()
97                    .duration_since(SystemTime::UNIX_EPOCH)
98                    .expect("Clock may have gone backwards")
99                    .as_millis() as u64,
100                event: Some(event),
101            }))
102            .map_err(|e| anyhow::anyhow!(e))?;
103
104        Ok(())
105    }
106
107    pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
108        self.send_event(ResponseEvent::CancelCompactTask(CancelCompactTask {
109            context_id: self.context_id,
110            task_id,
111        }))
112    }
113
114    pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
115        for task_id in task_ids {
116            self.cancel_task(*task_id)?;
117        }
118        Ok(())
119    }
120
121    pub fn context_id(&self) -> HummockContextId {
122        self.context_id
123    }
124}
125
/// Operations for managing the set of compactors, keyed by [`HummockContextId`].
///
/// NOTE(review): no implementation of this trait is visible in this file; the method docs below
/// describe the signatures only — confirm semantics against the implementors.
pub trait CompactorManagerTrait {
    /// Registers a compactor under `context_id` and returns the receiving half of its event
    /// stream.
    fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver;
    /// Unregisters the compactor identified by `context_id`.
    fn remove_compactor(&self, context_id: HummockContextId);
    /// Looks up the compactor identified by `context_id`, if any.
    fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>>;
    /// Picks a compactor, or returns `None` when there is none to pick.
    fn next_compactor(&self) -> Option<Arc<Compactor>>;
    /// Returns the number of compactors.
    fn compactor_num(&self) -> usize;
}
133
/// `CompactorManagerInner` maintains the compactors that can process compact tasks, together
/// with a heartbeat entry for every tracked compact task.
/// A compact task is tracked in `HummockManager::Compaction` via both `CompactStatus` and
/// `CompactTaskAssignment`.
///
/// A compact task can be in one of these states:
/// 1. Success: an assigned task is reported as success via `CompactStatus::report_compact_task`.
///    It's the final state.
/// 2. Failed: a task is reported as failed via `CompactStatus::report_compact_task`.
///    It's the final state.
/// 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
///    the final state.
pub struct CompactorManagerInner {
    /// Number of seconds a task may go without reporting progress before
    /// `update_task_heartbeats` returns it for cancellation.
    pub task_expired_seconds: u64,
    /// Number of seconds a task may go without any heartbeat before
    /// `get_heartbeat_expired_tasks` returns it.
    pub heartbeat_expired_seconds: u64,
    /// Heartbeat bookkeeping for every tracked task, keyed by task id.
    task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

    /// The outer lock is a `RwLock`, so we should still be able to modify each compactor
    pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
}
153
154impl CompactorManagerInner {
155    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
156        use risingwave_meta_model::compaction_task;
157        use sea_orm::EntityTrait;
158        // Retrieve the existing task assignments from metastore.
159        let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
160            .all(&env.meta_store_ref().conn)
161            .await
162            .map_err(MetadataModelError::from)?
163            .into_iter()
164            .map(Into::into)
165            .collect();
166        let mut manager = Self {
167            task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
168            heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
169            task_heartbeats: Default::default(),
170            compactor_map: Default::default(),
171        };
172        // Initialize heartbeat for existing tasks.
173        task_assignment.into_iter().for_each(|assignment| {
174            manager.initiate_task_heartbeat(CompactTask::from(assignment.compact_task.unwrap()));
175        });
176        Ok(manager)
177    }
178
179    /// Only used for unit test.
180    pub fn for_test() -> Self {
181        Self {
182            task_expired_seconds: 1,
183            heartbeat_expired_seconds: 1,
184            task_heartbeats: Default::default(),
185            compactor_map: Default::default(),
186        }
187    }
188
189    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
190        use rand::Rng;
191        if self.compactor_map.is_empty() {
192            return None;
193        }
194
195        let rand_index = rand::rng().random_range(0..self.compactor_map.len());
196        let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
197
198        Some(compactor)
199    }
200
201    /// Retrieve a receiver of tasks for the compactor identified by `context_id`. The sender should
202    /// be obtained by calling one of the compactor getters.
203    ///
204    ///  If `add_compactor` is called with the same `context_id` more than once, the only cause
205    /// would be compactor re-subscription, as `context_id` is a monotonically increasing
206    /// sequence.
207    pub fn add_compactor(
208        &mut self,
209        context_id: HummockContextId,
210    ) -> CompactorSubscribeStreamReceiver {
211        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
212
213        self.compactor_map
214            .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
215
216        tracing::info!(context_id = %context_id, "Added compactor session");
217
218        rx
219    }
220
221    /// Used when meta exiting to support graceful shutdown.
222    pub fn abort_all_compactors(&mut self) {
223        while let Some(compactor) = self.next_compactor() {
224            self.remove_compactor(compactor.context_id);
225        }
226    }
227
228    pub fn remove_compactor(&mut self, context_id: HummockContextId) {
229        if self.compactor_map.remove(&context_id).is_some() {
230            tracing::info!(context_id = %context_id, "Removed compactor session")
231        };
232    }
233
234    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
235        self.compactor_map.get(&context_id).cloned()
236    }
237
238    pub fn check_tasks_status(
239        &self,
240        tasks: &[HummockCompactionTaskId],
241        slow_task_duration: Duration,
242    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
243        let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
244        let mut ret = HashMap::default();
245        for TaskHeartbeat {
246            task, create_time, ..
247        } in self.task_heartbeats.values()
248        {
249            if !tasks_ids.contains(&task.task_id) {
250                continue;
251            }
252            let pending_time = create_time.elapsed();
253            if pending_time > slow_task_duration {
254                ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
255            } else {
256                ret.insert(task.task_id, (pending_time, TASK_NORMAL));
257            }
258        }
259
260        for task_id in tasks {
261            if !ret.contains_key(task_id) {
262                ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
263            }
264        }
265        ret
266    }
267
268    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
269        let heartbeat_expired_ts: u64 = SystemTime::now()
270            .duration_since(SystemTime::UNIX_EPOCH)
271            .expect("Clock may have gone backwards")
272            .as_secs()
273            - self.heartbeat_expired_seconds;
274        Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
275    }
276
277    fn get_heartbeat_expired_tasks_impl(
278        task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
279        heartbeat_expired_ts: u64,
280    ) -> Vec<CompactTask> {
281        let mut cancellable_tasks = vec![];
282        const MAX_TASK_DURATION_SEC: u64 = 2700;
283
284        for TaskHeartbeat {
285            expire_at,
286            task,
287            create_time,
288            num_ssts_sealed,
289            num_ssts_uploaded,
290            num_progress_key,
291            num_pending_read_io,
292            num_pending_write_io,
293            update_at,
294        } in task_heartbeats.values()
295        {
296            if *update_at < heartbeat_expired_ts {
297                cancellable_tasks.push(task.clone());
298            }
299
300            let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
301            if task_duration_too_long {
302                let compact_task_statistics = statistics_compact_task(task);
303                tracing::info!(
304                    "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
305                        pending_read_io_count {} pending_write_io_count {} target_level {} \
306                        base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
307                    task.compaction_group_id,
308                    task.task_id,
309                    create_time,
310                    expire_at,
311                    num_ssts_sealed,
312                    num_ssts_uploaded,
313                    num_progress_key,
314                    num_pending_read_io,
315                    num_pending_write_io,
316                    task.target_level,
317                    task.base_level,
318                    task.target_sub_level_id,
319                    task.task_type.as_str_name(),
320                    compact_task_statistics
321                );
322            }
323        }
324        cancellable_tasks
325    }
326
327    pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
328        let now = SystemTime::now()
329            .duration_since(SystemTime::UNIX_EPOCH)
330            .expect("Clock may have gone backwards")
331            .as_secs();
332        self.task_heartbeats.insert(
333            task.task_id,
334            TaskHeartbeat {
335                task,
336                num_ssts_sealed: 0,
337                num_ssts_uploaded: 0,
338                num_progress_key: 0,
339                num_pending_read_io: 0,
340                num_pending_write_io: 0,
341                create_time: Instant::now(),
342                expire_at: now + self.task_expired_seconds,
343                update_at: now,
344            },
345        );
346    }
347
348    pub fn remove_task_heartbeat(&mut self, task_id: u64) {
349        self.task_heartbeats.remove(&task_id).unwrap();
350    }
351
352    pub fn update_task_heartbeats(
353        &mut self,
354        progress_list: &Vec<CompactTaskProgress>,
355    ) -> Vec<CompactTask> {
356        let now = SystemTime::now()
357            .duration_since(SystemTime::UNIX_EPOCH)
358            .expect("Clock may have gone backwards")
359            .as_secs();
360        let mut cancel_tasks = vec![];
361        for progress in progress_list {
362            if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
363                task_ref.update_at = now;
364
365                if task_ref.num_ssts_sealed < progress.num_ssts_sealed
366                    || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
367                    || task_ref.num_progress_key < progress.num_progress_key
368                {
369                    // Refresh the expired of the task as it is showing progress.
370                    task_ref.expire_at = now + self.task_expired_seconds;
371                    task_ref.num_ssts_sealed = progress.num_ssts_sealed;
372                    task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
373                    task_ref.num_progress_key = progress.num_progress_key;
374                }
375                task_ref.num_pending_read_io = progress.num_pending_read_io;
376                task_ref.num_pending_write_io = progress.num_pending_write_io;
377
378                // timeout check
379                if task_ref.expire_at < now {
380                    // cancel
381                    cancel_tasks.push(task_ref.task.clone())
382                }
383            }
384        }
385
386        cancel_tasks
387    }
388
389    pub fn compactor_num(&self) -> usize {
390        self.compactor_map.len()
391    }
392
393    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
394        self.task_heartbeats
395            .values()
396            .map(|hb| CompactTaskProgress {
397                task_id: hb.task.task_id,
398                num_ssts_sealed: hb.num_ssts_sealed,
399                num_ssts_uploaded: hb.num_ssts_uploaded,
400                num_progress_key: hb.num_progress_key,
401                num_pending_read_io: hb.num_pending_read_io,
402                num_pending_write_io: hb.num_pending_write_io,
403                compaction_group_id: Some(hb.task.compaction_group_id),
404            })
405            .collect()
406    }
407}
408
/// Thread-safe facade over [`CompactorManagerInner`]: every method takes the read or write side
/// of the lock and delegates to the inner method of the same name.
pub struct CompactorManager {
    /// The shared inner state, guarded by a `RwLock`.
    inner: Arc<RwLock<CompactorManagerInner>>,
}
412
413impl CompactorManager {
414    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
415        let inner = CompactorManagerInner::with_meta(env).await?;
416
417        Ok(Self {
418            inner: Arc::new(RwLock::new(inner)),
419        })
420    }
421
422    /// Only used for unit test.
423    pub fn for_test() -> Self {
424        let inner = CompactorManagerInner::for_test();
425        Self {
426            inner: Arc::new(RwLock::new(inner)),
427        }
428    }
429
430    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
431        self.inner.read().next_compactor()
432    }
433
434    pub fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver {
435        self.inner.write().add_compactor(context_id)
436    }
437
438    pub fn abort_all_compactors(&self) {
439        self.inner.write().abort_all_compactors();
440    }
441
442    pub fn remove_compactor(&self, context_id: HummockContextId) {
443        self.inner.write().remove_compactor(context_id)
444    }
445
446    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
447        self.inner.read().get_compactor(context_id)
448    }
449
450    pub fn check_tasks_status(
451        &self,
452        tasks: &[HummockCompactionTaskId],
453        slow_task_duration: Duration,
454    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
455        self.inner
456            .read()
457            .check_tasks_status(tasks, slow_task_duration)
458    }
459
460    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
461        self.inner.read().get_heartbeat_expired_tasks()
462    }
463
464    pub fn initiate_task_heartbeat(&self, task: CompactTask) {
465        self.inner.write().initiate_task_heartbeat(task);
466    }
467
468    pub fn remove_task_heartbeat(&self, task_id: u64) {
469        self.inner.write().remove_task_heartbeat(task_id);
470    }
471
472    pub fn update_task_heartbeats(
473        &self,
474        progress_list: &Vec<CompactTaskProgress>,
475    ) -> Vec<CompactTask> {
476        self.inner.write().update_task_heartbeats(progress_list)
477    }
478
479    pub fn compactor_num(&self) -> usize {
480        self.inner.read().compactor_num()
481    }
482
483    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
484        self.inner.read().get_progress()
485    }
486}
487
488impl IcebergCompactor {
489    pub fn new(
490        context_id: HummockContextId,
491        sender: IcebergCompactorSubscribeStreamSender,
492    ) -> Self {
493        Self { context_id, sender }
494    }
495
496    pub fn context_id(&self) -> HummockContextId {
497        self.context_id
498    }
499
500    pub fn send_event(&self, event: IcebergCompactorSubscribeResponseEvent) -> MetaResult<()> {
501        fail_point!("iceberg_compaction_send_task_fail", |_| Err(
502            anyhow::anyhow!("iceberg_compaction_send_task_fail").into()
503        ));
504
505        self.sender
506            .send(Ok(SubscribeIcebergCompactionEventResponse {
507                create_at: SystemTime::now()
508                    .duration_since(SystemTime::UNIX_EPOCH)
509                    .expect("Clock may have gone backwards")
510                    .as_millis() as u64,
511                event: Some(event),
512            }))
513            .map_err(|e| anyhow::anyhow!(e))?;
514
515        Ok(())
516    }
517
518    pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
519        self.send_event(IcebergResponseEvent::CancelCompactTask(
520            IcebergCancelCompactTask { task_id },
521        ))
522    }
523}
524
/// State guarded by the lock inside [`IcebergCompactorManager`].
pub struct IcebergCompactorManagerInner {
    /// Registered iceberg compactors, keyed by their context id.
    pub compactor_map: HashMap<HummockContextId, Arc<IcebergCompactor>>,
}
528
/// Thread-safe registry of iceberg compactors.
pub struct IcebergCompactorManager {
    /// The shared inner state, guarded by a `RwLock`.
    inner: Arc<RwLock<IcebergCompactorManagerInner>>,
}
532
533impl IcebergCompactorManager {
534    pub fn new() -> Self {
535        Self {
536            inner: Arc::new(RwLock::new(IcebergCompactorManagerInner {
537                compactor_map: HashMap::new(),
538            })),
539        }
540    }
541
542    pub fn add_compactor(
543        &self,
544        context_id: HummockContextId,
545    ) -> IcebergCompactorSubscribeStreamReceiver {
546        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
547        self.inner
548            .write()
549            .compactor_map
550            .insert(context_id, Arc::new(IcebergCompactor::new(context_id, tx)));
551        tracing::info!(context_id = %context_id, "Added iceberg compactor session");
552        rx
553    }
554
555    pub fn remove_compactor(&self, context_id: HummockContextId) {
556        if self
557            .inner
558            .write()
559            .compactor_map
560            .remove(&context_id)
561            .is_some()
562        {
563            tracing::info!(context_id = %context_id, "Removed iceberg compactor session");
564        }
565    }
566
567    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<IcebergCompactor>> {
568        self.inner.read().compactor_map.get(&context_id).cloned()
569    }
570
571    pub fn next_compactor(&self) -> Option<Arc<IcebergCompactor>> {
572        use rand::Rng;
573        let compactor_map = &self.inner.read().compactor_map;
574        if compactor_map.is_empty() {
575            return None;
576        }
577        let rand_index = rand::rng().random_range(0..compactor_map.len());
578        compactor_map.values().nth(rand_index).cloned()
579    }
580
581    pub fn compactor_num(&self) -> usize {
582        self.inner.read().compactor_map.len()
583    }
584}
585
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, IcebergCompactorManager, MockHummockMetaClient};

    /// Rebuilds a `CompactorManager` from an env that already holds a task assignment, then
    /// checks heartbeat expiry and compactor registration/removal.
    #[tokio::test]
    async fn test_compactor_manager() {
        // Initialize metastore with task assignment.
        let (env, context_id) = {
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager.clone();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault,
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            let _receiver = compactor_manager.add_compactor(context_id);
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault,
                    &mut *default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // Simulate a restart by rebuilding the manager from the same env.
        // NOTE(review): an earlier comment said task_expired_seconds is set to 0 to speed up the
        // test, but nothing here overrides the timeouts taken from `env.opts` — confirm the
        // test env's values make the 2s sleep below sufficient.
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        // No compactor is registered on the rebuilt manager, because no compactor has
        // subscribed to it yet.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // Ensure task is expired.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // No heartbeat has been reported in between, so the task is still expired.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Mimic compaction heartbeat with invalid task id
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Mimic effective compaction heartbeat
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Test add
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        // Test remove
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }

    /// Checks registration, cancel-event delivery and removal on `IcebergCompactorManager`.
    #[test]
    fn test_iceberg_compactor_manager() {
        // Test Add and Remove Iceberg Compactor
        let iceberg_context_id = 1000.into();
        let iceberg_compactor_manager = IcebergCompactorManager::new();
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
        let mut receiver = iceberg_compactor_manager.add_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 1);
        let compactor = iceberg_compactor_manager
            .get_compactor(iceberg_context_id)
            .unwrap();
        assert_eq!(compactor.context_id(), iceberg_context_id);

        // A cancel request must arrive on the receiver as a `CancelCompactTask` event carrying
        // the same task id.
        compactor.cancel_task(42).unwrap();
        let event = receiver.try_recv().unwrap().unwrap().event.unwrap();
        match event {
            IcebergResponseEvent::CancelCompactTask(cancel_task) => {
                assert_eq!(cancel_task.task_id, 42);
            }
            other => panic!("unexpected iceberg compactor event: {other:?}"),
        }

        // Test remove
        iceberg_compactor_manager.remove_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
    }
}