risingwave_meta/hummock/compactor_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use fail::fail_point;
use parking_lot::RwLock;
use risingwave_hummock_sdk::compact::statistics_compact_task;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
};
use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventResponse;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use crate::MetaResult;
use crate::manager::MetaSrvEnv;
use crate::model::MetadataModelError;

pub type CompactorManagerRef = Arc<CompactorManager>;
pub type IcebergCompactorManagerRef = Arc<IcebergCompactorManager>;

pub const TASK_RUN_TOO_LONG: &str = "running too long";
pub const TASK_NOT_FOUND: &str = "task not found";
pub const TASK_NORMAL: &str = "task is normal, please wait some time";

type CompactorSubscribeStreamSender = UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>;
type CompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>>;

type IcebergCompactorSubscribeStreamSender =
    UnboundedSender<MetaResult<SubscribeIcebergCompactionEventResponse>>;
type IcebergCompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeIcebergCompactionEventResponse>>;

type CompactorSubscribeResponseEvent = ResponseEvent;

type IcebergCompactorSubscribeResponseEvent = IcebergResponseEvent;

/// Wraps the stream between the meta node and a compactor node.
/// The compactor node re-establishes the stream when the previous one fails.
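///
/// A minimal sketch of driving a `Compactor` by hand (hypothetical setup: in
/// the meta node the channel is created by `CompactorManagerInner::add_compactor`,
/// and `context_id` identifies the subscribed compactor node):
///
/// ```ignore
/// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
/// let compactor = Compactor::new(context_id, tx);
/// // Ask the compactor node to cancel task 42; the message travels through
/// // the subscription stream.
/// compactor.cancel_task(42)?;
/// // The receiver end is normally consumed by the gRPC response stream.
/// let _response = rx.recv().await;
/// ```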
pub struct Compactor {
    context_id: HummockContextId,
    sender: CompactorSubscribeStreamSender,
}

pub struct IcebergCompactor {
    context_id: HummockContextId,
    sender: IcebergCompactorSubscribeStreamSender,
}

struct TaskHeartbeat {
    task: CompactTask,
    num_ssts_sealed: u32,
    num_ssts_uploaded: u32,
    num_progress_key: u64,
    num_pending_read_io: u64,
    num_pending_write_io: u64,
    create_time: Instant,
    expire_at: u64,

    update_at: u64,
}

impl Compactor {
    pub fn new(context_id: HummockContextId, sender: CompactorSubscribeStreamSender) -> Self {
        Self { context_id, sender }
    }

    pub fn send_event(&self, event: CompactorSubscribeResponseEvent) -> MetaResult<()> {
        fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
            "compaction_send_task_fail"
        )
        .into()));

        self.sender
            .send(Ok(SubscribeCompactionEventResponse {
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
                event: Some(event),
            }))
            .map_err(|e| anyhow::anyhow!(e))?;

        Ok(())
    }

    pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
        self.send_event(ResponseEvent::CancelCompactTask(CancelCompactTask {
            context_id: self.context_id,
            task_id,
        }))
    }

    pub fn cancel_tasks(&self, task_ids: &[u64]) -> MetaResult<()> {
        for task_id in task_ids {
            self.cancel_task(*task_id)?;
        }
        Ok(())
    }

    pub fn context_id(&self) -> HummockContextId {
        self.context_id
    }
}

pub trait CompactorManagerTrait {
    fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver;
    fn remove_compactor(&self, context_id: HummockContextId);
    fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>>;
    fn next_compactor(&self) -> Option<Arc<Compactor>>;
    fn compactor_num(&self) -> usize;
}

/// `CompactorManagerInner` maintains compactors which can process compact tasks.
/// A compact task is tracked in `HummockManager::Compaction` via both `CompactStatus` and
/// `CompactTaskAssignment`.
///
/// A compact task can be in one of these states:
/// 1. Success: an assigned task is reported as succeeded via `CompactStatus::report_compact_task`.
///    It's the final state.
/// 2. Failed: an assigned task is reported as failed via `CompactStatus::report_compact_task`.
///    It's the final state.
/// 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
///    the final state.
pub struct CompactorManagerInner {
    pub task_expired_seconds: u64,
    pub heartbeat_expired_seconds: u64,
    task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

    /// The outer lock is a `RwLock`, so we should still be able to modify each compactor
    pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
}

impl CompactorManagerInner {
    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
        use risingwave_meta_model::compaction_task;
        use sea_orm::EntityTrait;
        // Retrieve the existing task assignments from the metastore.
        let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
            .all(&env.meta_store_ref().conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(Into::into)
            .collect();
        let mut manager = Self {
            task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
            heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
            task_heartbeats: Default::default(),
            compactor_map: Default::default(),
        };
        // Initialize heartbeats for the existing tasks.
        task_assignment.into_iter().for_each(|assignment| {
            manager.initiate_task_heartbeat(CompactTask::from(assignment.compact_task.unwrap()));
        });
        Ok(manager)
    }

    /// Only used for unit tests.
    pub fn for_test() -> Self {
        Self {
            task_expired_seconds: 1,
            heartbeat_expired_seconds: 1,
            task_heartbeats: Default::default(),
            compactor_map: Default::default(),
        }
    }

    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
        use rand::Rng;
        if self.compactor_map.is_empty() {
            return None;
        }

        let rand_index = rand::rng().random_range(0..self.compactor_map.len());
        let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();

        Some(compactor)
    }

    /// Retrieves a receiver of tasks for the compactor identified by `context_id`. The sender
    /// can be obtained by calling one of the compactor getters.
    ///
    /// If `add_compactor` is called with the same `context_id` more than once, the only cause
    /// would be compactor re-subscription, as `context_id` is a monotonically increasing
    /// sequence.
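    ///
    /// A hedged sketch of the subscription loop on the meta side (the gRPC
    /// service handler that owns the returned receiver is assumed):
    ///
    /// ```ignore
    /// let mut rx = inner.add_compactor(context_id);
    /// while let Some(event) = rx.recv().await {
    ///     // Forward each `MetaResult<SubscribeCompactionEventResponse>`
    ///     // to the subscribed compactor node's response stream.
    /// }
    /// ```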
    pub fn add_compactor(
        &mut self,
        context_id: HummockContextId,
    ) -> CompactorSubscribeStreamReceiver {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        self.compactor_map
            .insert(context_id, Arc::new(Compactor::new(context_id, tx)));

        tracing::info!(context_id = context_id, "Added compactor session");

        rx
    }

    /// Used when the meta node exits, to support graceful shutdown.
    pub fn abort_all_compactors(&mut self) {
        while let Some(compactor) = self.next_compactor() {
            self.remove_compactor(compactor.context_id);
        }
    }

    pub fn remove_compactor(&mut self, context_id: HummockContextId) {
        if self.compactor_map.remove(&context_id).is_some() {
            tracing::info!(context_id = context_id, "Removed compactor session")
        };
    }

    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
        self.compactor_map.get(&context_id).cloned()
    }

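    /// Classifies each task in `tasks` by how long it has been running: a
    /// task running longer than `slow_task_duration` maps to
    /// `TASK_RUN_TOO_LONG`, a tracked task within the budget to `TASK_NORMAL`,
    /// and an id without a heartbeat entry to `TASK_NOT_FOUND`.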
    pub fn check_tasks_status(
        &self,
        tasks: &[HummockCompactionTaskId],
        slow_task_duration: Duration,
    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
        let task_ids: HashSet<u64> = tasks.iter().copied().collect();
        let mut ret = HashMap::default();
        for TaskHeartbeat {
            task, create_time, ..
        } in self.task_heartbeats.values()
        {
            if !task_ids.contains(&task.task_id) {
                continue;
            }
            let pending_time = create_time.elapsed();
            if pending_time > slow_task_duration {
                ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
            } else {
                ret.insert(task.task_id, (pending_time, TASK_NORMAL));
            }
        }

        for task_id in tasks {
            if !ret.contains_key(task_id) {
                ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
            }
        }
        ret
    }

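    /// Returns the tasks whose last heartbeat is older than
    /// `heartbeat_expired_seconds`, i.e. whose `update_at` falls before
    /// `now - heartbeat_expired_seconds`.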
    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
        // `saturating_sub` guards against underflow in case the configured
        // expiry exceeds the current timestamp.
        let heartbeat_expired_ts: u64 = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs()
            .saturating_sub(self.heartbeat_expired_seconds);
        Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
    }

    fn get_heartbeat_expired_tasks_impl(
        task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
        heartbeat_expired_ts: u64,
    ) -> Vec<CompactTask> {
        let mut cancellable_tasks = vec![];
        const MAX_TASK_DURATION_SEC: u64 = 2700;

        for TaskHeartbeat {
            expire_at,
            task,
            create_time,
            num_ssts_sealed,
            num_ssts_uploaded,
            num_progress_key,
            num_pending_read_io,
            num_pending_write_io,
            update_at,
        } in task_heartbeats.values()
        {
            if *update_at < heartbeat_expired_ts {
                cancellable_tasks.push(task.clone());
            }

            let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
            if task_duration_too_long {
                let compact_task_statistics = statistics_compact_task(task);
                tracing::info!(
                    "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
                        pending_read_io_count {} pending_write_io_count {} target_level {} \
                        base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
                    task.compaction_group_id,
                    task.task_id,
                    create_time,
                    expire_at,
                    num_ssts_sealed,
                    num_ssts_uploaded,
                    num_progress_key,
                    num_pending_read_io,
                    num_pending_write_io,
                    task.target_level,
                    task.base_level,
                    task.target_sub_level_id,
                    task.task_type.as_str_name(),
                    compact_task_statistics
                );
            }
        }
        cancellable_tasks
    }

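    /// Starts tracking `task`: records its creation time and assigns an
    /// initial expiry of `now + task_expired_seconds`, which
    /// `update_task_heartbeats` pushes forward whenever the task reports
    /// forward progress.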
    pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        self.task_heartbeats.insert(
            task.task_id,
            TaskHeartbeat {
                task,
                num_ssts_sealed: 0,
                num_ssts_uploaded: 0,
                num_progress_key: 0,
                num_pending_read_io: 0,
                num_pending_write_io: 0,
                create_time: Instant::now(),
                expire_at: now + self.task_expired_seconds,
                update_at: now,
            },
        );
    }

    pub fn remove_task_heartbeat(&mut self, task_id: u64) {
        // Panics if the task is not tracked; callers must only remove tasks
        // they previously initiated.
        self.task_heartbeats.remove(&task_id).unwrap();
    }

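    /// Applies a batch of progress reports. A task's expiry is only refreshed
    /// when a report shows forward progress (more SSTs sealed or uploaded, or
    /// more keys processed); tasks whose expiry has already passed are
    /// returned so the caller can cancel them.
    ///
    /// A hedged sketch of the caller side (a periodic heartbeat handler in
    /// the meta node is assumed):
    ///
    /// ```ignore
    /// let cancel_tasks = inner.update_task_heartbeats(&progress_list);
    /// for task in cancel_tasks {
    ///     // The task stopped making progress before its expiry; ask the
    ///     // assigned compactor to cancel it.
    ///     compactor.cancel_task(task.task_id)?;
    /// }
    /// ```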
    pub fn update_task_heartbeats(
        &mut self,
        progress_list: &[CompactTaskProgress],
    ) -> Vec<CompactTask> {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        let mut cancel_tasks = vec![];
        for progress in progress_list {
            if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
                task_ref.update_at = now;

                if task_ref.num_ssts_sealed < progress.num_ssts_sealed
                    || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
                    || task_ref.num_progress_key < progress.num_progress_key
                {
                    // Refresh the task's expiry, as it is showing progress.
                    task_ref.expire_at = now + self.task_expired_seconds;
                    task_ref.num_ssts_sealed = progress.num_ssts_sealed;
                    task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
                    task_ref.num_progress_key = progress.num_progress_key;
                }
                task_ref.num_pending_read_io = progress.num_pending_read_io;
                task_ref.num_pending_write_io = progress.num_pending_write_io;

                // Timeout check: collect the task for cancellation if its
                // expiry has already passed.
                if task_ref.expire_at < now {
                    cancel_tasks.push(task_ref.task.clone())
                }
            }
        }

        cancel_tasks
    }

    pub fn compactor_num(&self) -> usize {
        self.compactor_map.len()
    }

    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
        self.task_heartbeats
            .values()
            .map(|hb| CompactTaskProgress {
                task_id: hb.task.task_id,
                num_ssts_sealed: hb.num_ssts_sealed,
                num_ssts_uploaded: hb.num_ssts_uploaded,
                num_progress_key: hb.num_progress_key,
                num_pending_read_io: hb.num_pending_read_io,
                num_pending_write_io: hb.num_pending_write_io,
                compaction_group_id: Some(hb.task.compaction_group_id),
            })
            .collect()
    }
}

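/// Thread-safe facade over `CompactorManagerInner`: every method takes
/// `&self` and performs the `RwLock` locking internally, so a
/// `CompactorManagerRef` can be shared freely across meta-node tasks.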
pub struct CompactorManager {
    inner: Arc<RwLock<CompactorManagerInner>>,
}

impl CompactorManager {
    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
        let inner = CompactorManagerInner::with_meta(env).await?;

        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
        })
    }

    /// Only used for unit tests.
    pub fn for_test() -> Self {
        let inner = CompactorManagerInner::for_test();
        Self {
            inner: Arc::new(RwLock::new(inner)),
        }
    }

    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
        self.inner.read().next_compactor()
    }

    pub fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver {
        self.inner.write().add_compactor(context_id)
    }

    pub fn abort_all_compactors(&self) {
        self.inner.write().abort_all_compactors();
    }

    pub fn remove_compactor(&self, context_id: HummockContextId) {
        self.inner.write().remove_compactor(context_id)
    }

    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
        self.inner.read().get_compactor(context_id)
    }

    pub fn check_tasks_status(
        &self,
        tasks: &[HummockCompactionTaskId],
        slow_task_duration: Duration,
    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
        self.inner
            .read()
            .check_tasks_status(tasks, slow_task_duration)
    }

    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
        self.inner.read().get_heartbeat_expired_tasks()
    }

    pub fn initiate_task_heartbeat(&self, task: CompactTask) {
        self.inner.write().initiate_task_heartbeat(task);
    }

    pub fn remove_task_heartbeat(&self, task_id: u64) {
        self.inner.write().remove_task_heartbeat(task_id);
    }

    pub fn update_task_heartbeats(
        &self,
        progress_list: &[CompactTaskProgress],
    ) -> Vec<CompactTask> {
        self.inner.write().update_task_heartbeats(progress_list)
    }

    pub fn compactor_num(&self) -> usize {
        self.inner.read().compactor_num()
    }

    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
        self.inner.read().get_progress()
    }
}

impl IcebergCompactor {
    pub fn new(
        context_id: HummockContextId,
        sender: IcebergCompactorSubscribeStreamSender,
    ) -> Self {
        Self { context_id, sender }
    }

    pub fn context_id(&self) -> HummockContextId {
        self.context_id
    }

    pub fn send_event(&self, event: IcebergCompactorSubscribeResponseEvent) -> MetaResult<()> {
        fail_point!("iceberg_compaction_send_task_fail", |_| Err(
            anyhow::anyhow!("iceberg_compaction_send_task_fail").into()
        ));

        self.sender
            .send(Ok(SubscribeIcebergCompactionEventResponse {
                create_at: SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
                event: Some(event),
            }))
            .map_err(|e| anyhow::anyhow!(e))?;

        Ok(())
    }
}

pub struct IcebergCompactorManagerInner {
    pub compactor_map: HashMap<HummockContextId, Arc<IcebergCompactor>>,
}

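/// Manages iceberg compactor sessions. Unlike `CompactorManager`, it keeps no
/// task heartbeats; it only tracks which iceberg compactors are currently
/// subscribed.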
pub struct IcebergCompactorManager {
    inner: Arc<RwLock<IcebergCompactorManagerInner>>,
}

impl IcebergCompactorManager {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(RwLock::new(IcebergCompactorManagerInner {
                compactor_map: HashMap::new(),
            })),
        }
    }

    pub fn add_compactor(
        &self,
        context_id: HummockContextId,
    ) -> IcebergCompactorSubscribeStreamReceiver {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        self.inner
            .write()
            .compactor_map
            .insert(context_id, Arc::new(IcebergCompactor::new(context_id, tx)));
        tracing::info!(context_id = context_id, "Added iceberg compactor session");
        rx
    }

    pub fn remove_compactor(&self, context_id: HummockContextId) {
        if self
            .inner
            .write()
            .compactor_map
            .remove(&context_id)
            .is_some()
        {
            tracing::info!(context_id = context_id, "Removed iceberg compactor session");
        }
    }

    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<IcebergCompactor>> {
        self.inner.read().compactor_map.get(&context_id).cloned()
    }

    pub fn next_compactor(&self) -> Option<Arc<IcebergCompactor>> {
        use rand::Rng;
        let compactor_map = &self.inner.read().compactor_map;
        if compactor_map.is_empty() {
            return None;
        }
        let rand_index = rand::rng().random_range(0..compactor_map.len());
        compactor_map.values().nth(rand_index).cloned()
    }

    pub fn compactor_num(&self) -> usize {
        self.inner.read().compactor_map.len()
    }
}


#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, IcebergCompactorManager, MockHummockMetaClient};

    #[tokio::test]
    async fn test_compactor_manager() {
        // Initialize the metastore with a task assignment.
        let (env, context_id) = {
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager.clone();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault.into(),
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            let _receiver = compactor_manager.add_compactor(context_id);
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault.into(),
                    &mut default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // Restart the compactor manager, reloading task assignments from the metastore.
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        // The task assignment is restored, but no compactor session exists yet
        // because the compactor gRPC stream has not been re-established.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // Ensure the task is expired.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // With no heartbeat received, the task stays expired.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Mimic a compaction heartbeat with an invalid task id.
        compactor_manager.update_task_heartbeats(&[CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Mimic an effective compaction heartbeat.
        compactor_manager.update_task_heartbeats(&[CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Test add.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        // Test remove.
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }

    #[test]
    fn test_iceberg_compactor_manager() {
        // Test adding and removing an iceberg compactor.
        let iceberg_context_id = 1000;
        let iceberg_compactor_manager = IcebergCompactorManager::new();
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
        iceberg_compactor_manager.add_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 1);
        assert_eq!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .unwrap()
                .context_id(),
            iceberg_context_id
        );
        // Test remove.
        iceberg_compactor_manager.remove_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
    }
}