risingwave_meta/hummock/
compactor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26    CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
29
30use crate::MetaResult;
31use crate::manager::MetaSrvEnv;
32use crate::model::MetadataModelError;
33
/// Shared, reference-counted handle to a [`CompactorManager`].
pub type CompactorManagerRef = Arc<CompactorManager>;

/// Status string returned by `check_tasks_status` for a tracked task whose heartbeat entry is
/// older than the caller-supplied slow-task threshold.
pub const TASK_RUN_TOO_LONG: &str = "running too long";
/// Status string returned by `check_tasks_status` for a requested task id that has no heartbeat
/// entry.
pub const TASK_NOT_FOUND: &str = "task not found";
/// Status string returned by `check_tasks_status` for a tracked task that is still within the
/// slow-task threshold.
pub const TASK_NORMAL: &str = "task is normal, please wait some time";
39
/// Wraps the stream between meta node and compactor node.
/// Compactor node will re-establish the stream when the previous one fails.
#[derive(Debug)]
pub struct Compactor {
    /// Identifier of the compactor this stream belongs to; also stamped into cancel requests.
    context_id: HummockContextId,
    /// Sending half of the unbounded channel on which events for this compactor are queued.
    sender: UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>,
}
47
/// Bookkeeping kept per tracked compact task: the task itself, the latest progress counters
/// reported for it, and the timestamps used for expiry checks.
struct TaskHeartbeat {
    /// The compact task being tracked.
    task: CompactTask,
    /// Highest `num_ssts_sealed` value accepted from a progress report so far.
    num_ssts_sealed: u32,
    /// Highest `num_ssts_uploaded` value accepted from a progress report so far.
    num_ssts_uploaded: u32,
    /// Highest `num_progress_key` value accepted from a progress report so far.
    num_progress_key: u64,
    /// `num_pending_read_io` from the most recent progress report.
    num_pending_read_io: u64,
    /// `num_pending_write_io` from the most recent progress report.
    num_pending_write_io: u64,
    /// Monotonic instant at which this heartbeat entry was created.
    create_time: Instant,
    /// Unix timestamp (seconds) after which the task is considered stalled; pushed forward
    /// whenever a progress report shows an increase in one of the progress counters.
    expire_at: u64,

    /// Unix timestamp (seconds) of the most recent progress report received for this task.
    update_at: u64,
}
60
61impl Compactor {
62    pub fn new(
63        context_id: HummockContextId,
64        sender: UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>,
65    ) -> Self {
66        Self { context_id, sender }
67    }
68
69    pub fn send_event(&self, event: ResponseEvent) -> MetaResult<()> {
70        fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
71            "compaction_send_task_fail"
72        )
73        .into()));
74
75        self.sender
76            .send(Ok(SubscribeCompactionEventResponse {
77                event: Some(event),
78                create_at: SystemTime::now()
79                    .duration_since(std::time::UNIX_EPOCH)
80                    .expect("Clock may have gone backwards")
81                    .as_millis() as u64,
82            }))
83            .map_err(|e| anyhow::anyhow!(e))?;
84
85        Ok(())
86    }
87
88    pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
89        self.sender
90            .send(Ok(SubscribeCompactionEventResponse {
91                event: Some(ResponseEvent::CancelCompactTask(CancelCompactTask {
92                    context_id: self.context_id,
93                    task_id,
94                })),
95                create_at: SystemTime::now()
96                    .duration_since(std::time::UNIX_EPOCH)
97                    .expect("Clock may have gone backwards")
98                    .as_millis() as u64,
99            }))
100            .map_err(|e| anyhow::anyhow!(e))?;
101        Ok(())
102    }
103
104    pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
105        for task_id in task_ids {
106            self.cancel_task(*task_id)?;
107        }
108        Ok(())
109    }
110
111    pub fn context_id(&self) -> HummockContextId {
112        self.context_id
113    }
114}
115
/// `CompactorManagerInner` maintains compactors which can process compact task.
/// A compact task is tracked in `HummockManager::Compaction` via both `CompactStatus` and
/// `CompactTaskAssignment`.
///
/// A compact task can be in one of these states:
/// 1. Success: an assigned task is reported as success via `CompactStatus::report_compact_task`.
///    It's the final state.
/// 2. Failed: a task is reported as failed via `CompactStatus::report_compact_task`.
///    It's the final state.
/// 3. Cancelled: a task is reported as cancelled via `CompactStatus::report_compact_task`. It's
///    the final state.
pub struct CompactorManagerInner {
    /// Seconds a task may go without any increase in its progress counters before
    /// `update_task_heartbeats` returns it for cancellation.
    pub task_expired_seconds: u64,
    /// Seconds a task may go without any progress report before `get_heartbeat_expired_tasks`
    /// returns it.
    pub heartbeat_expired_seconds: u64,
    /// Heartbeat bookkeeping for every tracked task, keyed by task id.
    task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

    /// Registered compactor streams, keyed by context id.
    /// The outer lock is a `RwLock`, so we should still be able to modify each compactor
    pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
}
135
136impl CompactorManagerInner {
137    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
138        use risingwave_meta_model::compaction_task;
139        use sea_orm::EntityTrait;
140        // Retrieve the existing task assignments from metastore.
141        let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
142            .all(&env.meta_store_ref().conn)
143            .await
144            .map_err(MetadataModelError::from)?
145            .into_iter()
146            .map(Into::into)
147            .collect();
148        let mut manager = Self {
149            task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
150            heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
151            task_heartbeats: Default::default(),
152            compactor_map: Default::default(),
153        };
154        // Initialize heartbeat for existing tasks.
155        task_assignment.into_iter().for_each(|assignment| {
156            manager.initiate_task_heartbeat(CompactTask::from(assignment.compact_task.unwrap()));
157        });
158        Ok(manager)
159    }
160
161    /// Only used for unit test.
162    pub fn for_test() -> Self {
163        Self {
164            task_expired_seconds: 1,
165            heartbeat_expired_seconds: 1,
166            task_heartbeats: Default::default(),
167            compactor_map: Default::default(),
168        }
169    }
170
171    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
172        use rand::Rng;
173
174        if self.compactor_map.is_empty() {
175            return None;
176        }
177
178        let rand_index = rand::rng().random_range(0..self.compactor_map.len());
179        let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
180
181        Some(compactor)
182    }
183
184    /// Retrieve a receiver of tasks for the compactor identified by `context_id`. The sender should
185    /// be obtained by calling one of the compactor getters.
186    ///
187    ///  If `add_compactor` is called with the same `context_id` more than once, the only cause
188    /// would be compactor re-subscription, as `context_id` is a monotonically increasing
189    /// sequence.
190    pub fn add_compactor(
191        &mut self,
192        context_id: HummockContextId,
193    ) -> UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>> {
194        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
195        self.compactor_map
196            .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
197
198        tracing::info!("Added compactor session {}", context_id);
199        rx
200    }
201
202    /// Used when meta exiting to support graceful shutdown.
203    pub fn abort_all_compactors(&mut self) {
204        while let Some(compactor) = self.next_compactor() {
205            self.remove_compactor(compactor.context_id);
206        }
207    }
208
209    pub fn remove_compactor(&mut self, context_id: HummockContextId) {
210        self.compactor_map.remove(&context_id);
211
212        // To remove the heartbeats, they need to be forcefully purged,
213        // which is only safe when the context has been completely removed from meta.
214        tracing::info!("Removed compactor session {}", context_id);
215    }
216
217    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
218        self.compactor_map.get(&context_id).cloned()
219    }
220
221    pub fn check_tasks_status(
222        &self,
223        tasks: &[HummockCompactionTaskId],
224        slow_task_duration: Duration,
225    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
226        let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
227        let mut ret = HashMap::default();
228        for TaskHeartbeat {
229            task, create_time, ..
230        } in self.task_heartbeats.values()
231        {
232            if !tasks_ids.contains(&task.task_id) {
233                continue;
234            }
235            let pending_time = create_time.elapsed();
236            if pending_time > slow_task_duration {
237                ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
238            } else {
239                ret.insert(task.task_id, (pending_time, TASK_NORMAL));
240            }
241        }
242
243        for task_id in tasks {
244            if !ret.contains_key(task_id) {
245                ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
246            }
247        }
248        ret
249    }
250
251    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
252        let heartbeat_expired_ts: u64 = SystemTime::now()
253            .duration_since(SystemTime::UNIX_EPOCH)
254            .expect("Clock may have gone backwards")
255            .as_secs()
256            - self.heartbeat_expired_seconds;
257        Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
258    }
259
260    fn get_heartbeat_expired_tasks_impl(
261        task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
262        heartbeat_expired_ts: u64,
263    ) -> Vec<CompactTask> {
264        let mut cancellable_tasks = vec![];
265        const MAX_TASK_DURATION_SEC: u64 = 2700;
266
267        for TaskHeartbeat {
268            expire_at,
269            task,
270            create_time,
271            num_ssts_sealed,
272            num_ssts_uploaded,
273            num_progress_key,
274            num_pending_read_io,
275            num_pending_write_io,
276            update_at,
277        } in task_heartbeats.values()
278        {
279            if *update_at < heartbeat_expired_ts {
280                cancellable_tasks.push(task.clone());
281            }
282
283            let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
284            if task_duration_too_long {
285                let compact_task_statistics = statistics_compact_task(task);
286                tracing::info!(
287                    "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
288                        pending_read_io_count {} pending_write_io_count {} target_level {} \
289                        base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
290                    task.compaction_group_id,
291                    task.task_id,
292                    create_time,
293                    expire_at,
294                    num_ssts_sealed,
295                    num_ssts_uploaded,
296                    num_progress_key,
297                    num_pending_read_io,
298                    num_pending_write_io,
299                    task.target_level,
300                    task.base_level,
301                    task.target_sub_level_id,
302                    task.task_type.as_str_name(),
303                    compact_task_statistics
304                );
305            }
306        }
307        cancellable_tasks
308    }
309
310    pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
311        let now = SystemTime::now()
312            .duration_since(SystemTime::UNIX_EPOCH)
313            .expect("Clock may have gone backwards")
314            .as_secs();
315        self.task_heartbeats.insert(
316            task.task_id,
317            TaskHeartbeat {
318                task,
319                num_ssts_sealed: 0,
320                num_ssts_uploaded: 0,
321                num_progress_key: 0,
322                num_pending_read_io: 0,
323                num_pending_write_io: 0,
324                create_time: Instant::now(),
325                expire_at: now + self.task_expired_seconds,
326                update_at: now,
327            },
328        );
329    }
330
331    pub fn remove_task_heartbeat(&mut self, task_id: u64) {
332        self.task_heartbeats.remove(&task_id).unwrap();
333    }
334
335    pub fn update_task_heartbeats(
336        &mut self,
337        progress_list: &Vec<CompactTaskProgress>,
338    ) -> Vec<CompactTask> {
339        let now = SystemTime::now()
340            .duration_since(SystemTime::UNIX_EPOCH)
341            .expect("Clock may have gone backwards")
342            .as_secs();
343        let mut cancel_tasks = vec![];
344        for progress in progress_list {
345            if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
346                task_ref.update_at = now;
347
348                if task_ref.num_ssts_sealed < progress.num_ssts_sealed
349                    || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
350                    || task_ref.num_progress_key < progress.num_progress_key
351                {
352                    // Refresh the expired of the task as it is showing progress.
353                    task_ref.expire_at = now + self.task_expired_seconds;
354                    task_ref.num_ssts_sealed = progress.num_ssts_sealed;
355                    task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
356                    task_ref.num_progress_key = progress.num_progress_key;
357                }
358                task_ref.num_pending_read_io = progress.num_pending_read_io;
359                task_ref.num_pending_write_io = progress.num_pending_write_io;
360
361                // timeout check
362                if task_ref.expire_at < now {
363                    // cancel
364                    cancel_tasks.push(task_ref.task.clone())
365                }
366            }
367        }
368
369        cancel_tasks
370    }
371
372    pub fn compactor_num(&self) -> usize {
373        self.compactor_map.len()
374    }
375
376    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
377        self.task_heartbeats
378            .values()
379            .map(|hb| CompactTaskProgress {
380                task_id: hb.task.task_id,
381                num_ssts_sealed: hb.num_ssts_sealed,
382                num_ssts_uploaded: hb.num_ssts_uploaded,
383                num_progress_key: hb.num_progress_key,
384                num_pending_read_io: hb.num_pending_read_io,
385                num_pending_write_io: hb.num_pending_write_io,
386                compaction_group_id: Some(hb.task.compaction_group_id),
387            })
388            .collect()
389    }
390}
391
/// Handle around a [`CompactorManagerInner`] kept behind an `Arc<RwLock<_>>`. Its methods take
/// `&self` and acquire the lock internally before delegating to the inner manager.
pub struct CompactorManager {
    /// The lock-protected state that every method delegates to.
    inner: Arc<RwLock<CompactorManagerInner>>,
}
395
396impl CompactorManager {
397    pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
398        let inner = CompactorManagerInner::with_meta(env).await?;
399
400        Ok(Self {
401            inner: Arc::new(RwLock::new(inner)),
402        })
403    }
404
405    /// Only used for unit test.
406    pub fn for_test() -> Self {
407        let inner = CompactorManagerInner::for_test();
408        Self {
409            inner: Arc::new(RwLock::new(inner)),
410        }
411    }
412
413    pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
414        self.inner.read().next_compactor()
415    }
416
417    pub fn add_compactor(
418        &self,
419        context_id: HummockContextId,
420    ) -> UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>> {
421        self.inner.write().add_compactor(context_id)
422    }
423
424    pub fn abort_all_compactors(&self) {
425        self.inner.write().abort_all_compactors();
426    }
427
428    pub fn remove_compactor(&self, context_id: HummockContextId) {
429        self.inner.write().remove_compactor(context_id)
430    }
431
432    pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
433        self.inner.read().get_compactor(context_id)
434    }
435
436    pub fn check_tasks_status(
437        &self,
438        tasks: &[HummockCompactionTaskId],
439        slow_task_duration: Duration,
440    ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
441        self.inner
442            .read()
443            .check_tasks_status(tasks, slow_task_duration)
444    }
445
446    pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
447        self.inner.read().get_heartbeat_expired_tasks()
448    }
449
450    pub fn initiate_task_heartbeat(&self, task: CompactTask) {
451        self.inner.write().initiate_task_heartbeat(task);
452    }
453
454    pub fn remove_task_heartbeat(&self, task_id: u64) {
455        self.inner.write().remove_task_heartbeat(task_id);
456    }
457
458    pub fn update_task_heartbeats(
459        &self,
460        progress_list: &Vec<CompactTaskProgress>,
461    ) -> Vec<CompactTask> {
462        self.inner.write().update_task_heartbeats(progress_list)
463    }
464
465    pub fn compactor_num(&self) -> usize {
466        self.inner.read().compactor_num()
467    }
468
469    pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
470        self.inner.read().get_progress()
471    }
472}
473
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, MockHummockMetaClient};

    // End-to-end check of `CompactorManager`: heartbeat restoration from the metastore,
    // heartbeat expiry and refresh, and compactor registration/removal.
    #[tokio::test]
    async fn test_compactor_manager() {
        // Initialize metastore with task assignment.
        let (env, context_id) = {
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager_ref_for_test();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault.into(),
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            // Keep the receiver alive for the rest of this scope.
            let _receiver = compactor_manager.add_compactor(context_id);
            // Obtain a compact task; presumably this is what persists the task assignment that
            // the restarted manager loads below — TODO confirm against `get_compact_task`.
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault.into(),
                    &mut default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // Restart: build a fresh manager from the same env, which re-creates heartbeats for the
        // task assignments found in the metastore.
        // NOTE(review): an earlier comment here mentioned setting `task_expired_seconds` to 0,
        // but nothing in this test overrides it; the intervals come from `env.opts`.
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        // No compactor is registered on the new manager,
        // because compactor gRPC is not established yet.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // Ensure task is expired.
        // NOTE(review): assumes the heartbeat interval configured by the test env is shorter
        // than this 2s sleep — confirm against `setup_compute_env`.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // No heartbeat has been sent in between, so the task is still reported as expired.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Mimic compaction heartbeat with invalid task id: it is ignored, so the tracked task
        // stays expired.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Mimic effective compaction heartbeat: it refreshes the task's last-update time, so
        // the task is no longer reported as expired.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Test add
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        // Test remove
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }
}