1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26 CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
29use risingwave_pb::iceberg_compaction::{
30 CancelCompactTask as IcebergCancelCompactTask, SubscribeIcebergCompactionEventResponse,
31};
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
33
34use crate::MetaResult;
35use crate::manager::MetaSrvEnv;
36use crate::model::MetadataModelError;
37
38pub type CompactorManagerRef = Arc<CompactorManager>;
39pub type IcebergCompactorManagerRef = Arc<IcebergCompactorManager>;
40
41pub const TASK_RUN_TOO_LONG: &str = "running too long";
42pub const TASK_NOT_FOUND: &str = "task not found";
43pub const TASK_NORMAL: &str = "task is normal, please wait some time";
44
45type CompactorSubscribeStreamSender = UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>;
46type CompactorSubscribeStreamReceiver =
47 UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>>;
48
49type IcebergCompactorSubscribeStreamSender =
50 UnboundedSender<MetaResult<SubscribeIcebergCompactionEventResponse>>;
51type IcebergCompactorSubscribeStreamReceiver =
52 UnboundedReceiver<MetaResult<SubscribeIcebergCompactionEventResponse>>;
53
54type CompactorSubscribeResponseEvent = ResponseEvent;
55
56type IcebergCompactorSubscribeResponseEvent = IcebergResponseEvent;
57
58pub struct Compactor {
61 context_id: HummockContextId,
62 sender: CompactorSubscribeStreamSender,
63}
64
65pub struct IcebergCompactor {
66 context_id: HummockContextId,
67 sender: IcebergCompactorSubscribeStreamSender,
68}
69
70struct TaskHeartbeat {
71 task: CompactTask,
72 num_ssts_sealed: u32,
73 num_ssts_uploaded: u32,
74 num_progress_key: u64,
75 num_pending_read_io: u64,
76 num_pending_write_io: u64,
77 create_time: Instant,
78 expire_at: u64,
79
80 update_at: u64,
81}
82
83impl Compactor {
84 pub fn new(context_id: HummockContextId, sender: CompactorSubscribeStreamSender) -> Self {
85 Self { context_id, sender }
86 }
87
88 pub fn send_event(&self, event: CompactorSubscribeResponseEvent) -> MetaResult<()> {
89 fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
90 "compaction_send_task_fail"
91 )
92 .into()));
93
94 self.sender
95 .send(Ok(SubscribeCompactionEventResponse {
96 create_at: SystemTime::now()
97 .duration_since(SystemTime::UNIX_EPOCH)
98 .expect("Clock may have gone backwards")
99 .as_millis() as u64,
100 event: Some(event),
101 }))
102 .map_err(|e| anyhow::anyhow!(e))?;
103
104 Ok(())
105 }
106
107 pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
108 self.send_event(ResponseEvent::CancelCompactTask(CancelCompactTask {
109 context_id: self.context_id,
110 task_id,
111 }))
112 }
113
114 pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
115 for task_id in task_ids {
116 self.cancel_task(*task_id)?;
117 }
118 Ok(())
119 }
120
121 pub fn context_id(&self) -> HummockContextId {
122 self.context_id
123 }
124}
125
126pub trait CompactorManagerTrait {
127 fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver;
128 fn remove_compactor(&self, context_id: HummockContextId);
129 fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>>;
130 fn next_compactor(&self) -> Option<Arc<Compactor>>;
131 fn compactor_num(&self) -> usize;
132}
133
134pub struct CompactorManagerInner {
146 pub task_expired_seconds: u64,
147 pub heartbeat_expired_seconds: u64,
148 task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,
149
150 pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
152}
153
154impl CompactorManagerInner {
155 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
156 use risingwave_meta_model::compaction_task;
157 use sea_orm::EntityTrait;
158 let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
160 .all(&env.meta_store_ref().conn)
161 .await
162 .map_err(MetadataModelError::from)?
163 .into_iter()
164 .map(Into::into)
165 .collect();
166 let mut manager = Self {
167 task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
168 heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
169 task_heartbeats: Default::default(),
170 compactor_map: Default::default(),
171 };
172 task_assignment.into_iter().for_each(|assignment| {
174 manager.initiate_task_heartbeat(CompactTask::from(assignment.compact_task.unwrap()));
175 });
176 Ok(manager)
177 }
178
179 pub fn for_test() -> Self {
181 Self {
182 task_expired_seconds: 1,
183 heartbeat_expired_seconds: 1,
184 task_heartbeats: Default::default(),
185 compactor_map: Default::default(),
186 }
187 }
188
189 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
190 use rand::Rng;
191 if self.compactor_map.is_empty() {
192 return None;
193 }
194
195 let rand_index = rand::rng().random_range(0..self.compactor_map.len());
196 let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
197
198 Some(compactor)
199 }
200
201 pub fn add_compactor(
208 &mut self,
209 context_id: HummockContextId,
210 ) -> CompactorSubscribeStreamReceiver {
211 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
212
213 self.compactor_map
214 .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
215
216 tracing::info!(context_id = %context_id, "Added compactor session");
217
218 rx
219 }
220
221 pub fn abort_all_compactors(&mut self) {
223 while let Some(compactor) = self.next_compactor() {
224 self.remove_compactor(compactor.context_id);
225 }
226 }
227
228 pub fn remove_compactor(&mut self, context_id: HummockContextId) {
229 if self.compactor_map.remove(&context_id).is_some() {
230 tracing::info!(context_id = %context_id, "Removed compactor session")
231 };
232 }
233
234 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
235 self.compactor_map.get(&context_id).cloned()
236 }
237
238 pub fn check_tasks_status(
239 &self,
240 tasks: &[HummockCompactionTaskId],
241 slow_task_duration: Duration,
242 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
243 let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
244 let mut ret = HashMap::default();
245 for TaskHeartbeat {
246 task, create_time, ..
247 } in self.task_heartbeats.values()
248 {
249 if !tasks_ids.contains(&task.task_id) {
250 continue;
251 }
252 let pending_time = create_time.elapsed();
253 if pending_time > slow_task_duration {
254 ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
255 } else {
256 ret.insert(task.task_id, (pending_time, TASK_NORMAL));
257 }
258 }
259
260 for task_id in tasks {
261 if !ret.contains_key(task_id) {
262 ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
263 }
264 }
265 ret
266 }
267
268 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
269 let heartbeat_expired_ts: u64 = SystemTime::now()
270 .duration_since(SystemTime::UNIX_EPOCH)
271 .expect("Clock may have gone backwards")
272 .as_secs()
273 - self.heartbeat_expired_seconds;
274 Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
275 }
276
277 fn get_heartbeat_expired_tasks_impl(
278 task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
279 heartbeat_expired_ts: u64,
280 ) -> Vec<CompactTask> {
281 let mut cancellable_tasks = vec![];
282 const MAX_TASK_DURATION_SEC: u64 = 2700;
283
284 for TaskHeartbeat {
285 expire_at,
286 task,
287 create_time,
288 num_ssts_sealed,
289 num_ssts_uploaded,
290 num_progress_key,
291 num_pending_read_io,
292 num_pending_write_io,
293 update_at,
294 } in task_heartbeats.values()
295 {
296 if *update_at < heartbeat_expired_ts {
297 cancellable_tasks.push(task.clone());
298 }
299
300 let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
301 if task_duration_too_long {
302 let compact_task_statistics = statistics_compact_task(task);
303 tracing::info!(
304 "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
305 pending_read_io_count {} pending_write_io_count {} target_level {} \
306 base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
307 task.compaction_group_id,
308 task.task_id,
309 create_time,
310 expire_at,
311 num_ssts_sealed,
312 num_ssts_uploaded,
313 num_progress_key,
314 num_pending_read_io,
315 num_pending_write_io,
316 task.target_level,
317 task.base_level,
318 task.target_sub_level_id,
319 task.task_type.as_str_name(),
320 compact_task_statistics
321 );
322 }
323 }
324 cancellable_tasks
325 }
326
327 pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
328 let now = SystemTime::now()
329 .duration_since(SystemTime::UNIX_EPOCH)
330 .expect("Clock may have gone backwards")
331 .as_secs();
332 self.task_heartbeats.insert(
333 task.task_id,
334 TaskHeartbeat {
335 task,
336 num_ssts_sealed: 0,
337 num_ssts_uploaded: 0,
338 num_progress_key: 0,
339 num_pending_read_io: 0,
340 num_pending_write_io: 0,
341 create_time: Instant::now(),
342 expire_at: now + self.task_expired_seconds,
343 update_at: now,
344 },
345 );
346 }
347
348 pub fn remove_task_heartbeat(&mut self, task_id: u64) {
349 self.task_heartbeats.remove(&task_id).unwrap();
350 }
351
352 pub fn update_task_heartbeats(
353 &mut self,
354 progress_list: &Vec<CompactTaskProgress>,
355 ) -> Vec<CompactTask> {
356 let now = SystemTime::now()
357 .duration_since(SystemTime::UNIX_EPOCH)
358 .expect("Clock may have gone backwards")
359 .as_secs();
360 let mut cancel_tasks = vec![];
361 for progress in progress_list {
362 if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
363 task_ref.update_at = now;
364
365 if task_ref.num_ssts_sealed < progress.num_ssts_sealed
366 || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
367 || task_ref.num_progress_key < progress.num_progress_key
368 {
369 task_ref.expire_at = now + self.task_expired_seconds;
371 task_ref.num_ssts_sealed = progress.num_ssts_sealed;
372 task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
373 task_ref.num_progress_key = progress.num_progress_key;
374 }
375 task_ref.num_pending_read_io = progress.num_pending_read_io;
376 task_ref.num_pending_write_io = progress.num_pending_write_io;
377
378 if task_ref.expire_at < now {
380 cancel_tasks.push(task_ref.task.clone())
382 }
383 }
384 }
385
386 cancel_tasks
387 }
388
389 pub fn compactor_num(&self) -> usize {
390 self.compactor_map.len()
391 }
392
393 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
394 self.task_heartbeats
395 .values()
396 .map(|hb| CompactTaskProgress {
397 task_id: hb.task.task_id,
398 num_ssts_sealed: hb.num_ssts_sealed,
399 num_ssts_uploaded: hb.num_ssts_uploaded,
400 num_progress_key: hb.num_progress_key,
401 num_pending_read_io: hb.num_pending_read_io,
402 num_pending_write_io: hb.num_pending_write_io,
403 compaction_group_id: Some(hb.task.compaction_group_id),
404 })
405 .collect()
406 }
407}
408
409pub struct CompactorManager {
410 inner: Arc<RwLock<CompactorManagerInner>>,
411}
412
413impl CompactorManager {
414 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
415 let inner = CompactorManagerInner::with_meta(env).await?;
416
417 Ok(Self {
418 inner: Arc::new(RwLock::new(inner)),
419 })
420 }
421
422 pub fn for_test() -> Self {
424 let inner = CompactorManagerInner::for_test();
425 Self {
426 inner: Arc::new(RwLock::new(inner)),
427 }
428 }
429
430 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
431 self.inner.read().next_compactor()
432 }
433
434 pub fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver {
435 self.inner.write().add_compactor(context_id)
436 }
437
438 pub fn abort_all_compactors(&self) {
439 self.inner.write().abort_all_compactors();
440 }
441
442 pub fn remove_compactor(&self, context_id: HummockContextId) {
443 self.inner.write().remove_compactor(context_id)
444 }
445
446 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
447 self.inner.read().get_compactor(context_id)
448 }
449
450 pub fn check_tasks_status(
451 &self,
452 tasks: &[HummockCompactionTaskId],
453 slow_task_duration: Duration,
454 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
455 self.inner
456 .read()
457 .check_tasks_status(tasks, slow_task_duration)
458 }
459
460 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
461 self.inner.read().get_heartbeat_expired_tasks()
462 }
463
464 pub fn initiate_task_heartbeat(&self, task: CompactTask) {
465 self.inner.write().initiate_task_heartbeat(task);
466 }
467
468 pub fn remove_task_heartbeat(&self, task_id: u64) {
469 self.inner.write().remove_task_heartbeat(task_id);
470 }
471
472 pub fn update_task_heartbeats(
473 &self,
474 progress_list: &Vec<CompactTaskProgress>,
475 ) -> Vec<CompactTask> {
476 self.inner.write().update_task_heartbeats(progress_list)
477 }
478
479 pub fn compactor_num(&self) -> usize {
480 self.inner.read().compactor_num()
481 }
482
483 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
484 self.inner.read().get_progress()
485 }
486}
487
488impl IcebergCompactor {
489 pub fn new(
490 context_id: HummockContextId,
491 sender: IcebergCompactorSubscribeStreamSender,
492 ) -> Self {
493 Self { context_id, sender }
494 }
495
496 pub fn context_id(&self) -> HummockContextId {
497 self.context_id
498 }
499
500 pub fn send_event(&self, event: IcebergCompactorSubscribeResponseEvent) -> MetaResult<()> {
501 fail_point!("iceberg_compaction_send_task_fail", |_| Err(
502 anyhow::anyhow!("iceberg_compaction_send_task_fail").into()
503 ));
504
505 self.sender
506 .send(Ok(SubscribeIcebergCompactionEventResponse {
507 create_at: SystemTime::now()
508 .duration_since(SystemTime::UNIX_EPOCH)
509 .expect("Clock may have gone backwards")
510 .as_millis() as u64,
511 event: Some(event),
512 }))
513 .map_err(|e| anyhow::anyhow!(e))?;
514
515 Ok(())
516 }
517
518 pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
519 self.send_event(IcebergResponseEvent::CancelCompactTask(
520 IcebergCancelCompactTask { task_id },
521 ))
522 }
523}
524
525pub struct IcebergCompactorManagerInner {
526 pub compactor_map: HashMap<HummockContextId, Arc<IcebergCompactor>>,
527}
528
529pub struct IcebergCompactorManager {
530 inner: Arc<RwLock<IcebergCompactorManagerInner>>,
531}
532
533impl IcebergCompactorManager {
534 pub fn new() -> Self {
535 Self {
536 inner: Arc::new(RwLock::new(IcebergCompactorManagerInner {
537 compactor_map: HashMap::new(),
538 })),
539 }
540 }
541
542 pub fn add_compactor(
543 &self,
544 context_id: HummockContextId,
545 ) -> IcebergCompactorSubscribeStreamReceiver {
546 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
547 self.inner
548 .write()
549 .compactor_map
550 .insert(context_id, Arc::new(IcebergCompactor::new(context_id, tx)));
551 tracing::info!(context_id = %context_id, "Added iceberg compactor session");
552 rx
553 }
554
555 pub fn remove_compactor(&self, context_id: HummockContextId) {
556 if self
557 .inner
558 .write()
559 .compactor_map
560 .remove(&context_id)
561 .is_some()
562 {
563 tracing::info!(context_id = %context_id, "Removed iceberg compactor session");
564 }
565 }
566
567 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<IcebergCompactor>> {
568 self.inner.read().compactor_map.get(&context_id).cloned()
569 }
570
571 pub fn next_compactor(&self) -> Option<Arc<IcebergCompactor>> {
572 use rand::Rng;
573 let compactor_map = &self.inner.read().compactor_map;
574 if compactor_map.is_empty() {
575 return None;
576 }
577 let rand_index = rand::rng().random_range(0..compactor_map.len());
578 compactor_map.values().nth(rand_index).cloned()
579 }
580
581 pub fn compactor_num(&self) -> usize {
582 self.inner.read().compactor_map.len()
583 }
584}
585
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, IcebergCompactorManager, MockHummockMetaClient};

    /// Covers restoring task heartbeats from the meta store, heartbeat expiry and refresh,
    /// and compactor session registration and removal.
    #[tokio::test]
    async fn test_compactor_manager() {
        // Create one compaction task through a real Hummock manager so that a task
        // assignment is recorded in the meta store, then keep only `env` and `context_id`.
        let (env, context_id) = {
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager.clone();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault,
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            let _receiver = compactor_manager.add_compactor(context_id);
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault,
                    &mut *default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // A manager rebuilt from the meta store starts with no compactor sessions.
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // NOTE(review): assumes the test env's heartbeat expiry interval is shorter than this
        // sleep, so the restored task is reported as heartbeat-expired afterwards.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // Querying expired tasks does not remove them from tracking.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // A progress report for an unknown task id leaves the tracked task expired.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // A progress report for the tracked task refreshes its heartbeat.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Register a compactor session, look it up, then remove it again.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }

    /// Covers Iceberg compactor registration, cancel-event delivery and removal.
    #[test]
    fn test_iceberg_compactor_manager() {
        let iceberg_context_id = 1000.into();
        // A new manager has no sessions.
        let iceberg_compactor_manager = IcebergCompactorManager::new();
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
        let mut receiver = iceberg_compactor_manager.add_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 1);
        let compactor = iceberg_compactor_manager
            .get_compactor(iceberg_context_id)
            .unwrap();
        assert_eq!(compactor.context_id(), iceberg_context_id);

        // A cancel request arrives on the session's receiver with the same task id.
        compactor.cancel_task(42).unwrap();
        let event = receiver.try_recv().unwrap().unwrap().event.unwrap();
        match event {
            IcebergResponseEvent::CancelCompactTask(cancel_task) => {
                assert_eq!(cancel_task.task_id, 42);
            }
            other => panic!("unexpected iceberg compactor event: {other:?}"),
        }

        // Removing the session makes it unreachable.
        iceberg_compactor_manager.remove_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
    }
}