1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26 CancelCompactTask, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
29use risingwave_pb::iceberg_compaction::{
30 CancelCompactTask as IcebergCancelCompactTask, SubscribeIcebergCompactionEventResponse,
31};
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
33
34use crate::MetaResult;
35use crate::hummock::model::ext::compaction_task_model_to_assignment;
36use crate::manager::MetaSrvEnv;
37use crate::model::MetadataModelError;
38
/// Shared handle to a [`CompactorManager`].
pub type CompactorManagerRef = Arc<CompactorManager>;
/// Shared handle to an [`IcebergCompactorManager`].
pub type IcebergCompactorManagerRef = Arc<IcebergCompactorManager>;

/// Status reported by `check_tasks_status` for a tracked task whose pending time exceeds the
/// caller-supplied slow-task duration.
pub const TASK_RUN_TOO_LONG: &str = "running too long";
/// Status reported by `check_tasks_status` for a task that has no heartbeat entry.
pub const TASK_NOT_FOUND: &str = "task not found";
/// Status reported by `check_tasks_status` for a tracked task still within the slow-task duration.
pub const TASK_NORMAL: &str = "task is normal, please wait some time";

// Sending / receiving halves of the (unbounded) Hummock compaction-event subscription stream.
type CompactorSubscribeStreamSender = UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>;
type CompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>>;

// Sending / receiving halves of the (unbounded) Iceberg compaction-event subscription stream.
type IcebergCompactorSubscribeStreamSender =
    UnboundedSender<MetaResult<SubscribeIcebergCompactionEventResponse>>;
type IcebergCompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeIcebergCompactionEventResponse>>;

// Event payload carried inside a `SubscribeCompactionEventResponse`.
type CompactorSubscribeResponseEvent = ResponseEvent;

// Event payload carried inside a `SubscribeIcebergCompactionEventResponse`.
type IcebergCompactorSubscribeResponseEvent = IcebergResponseEvent;
58
59pub struct Compactor {
62 context_id: HummockContextId,
63 sender: CompactorSubscribeStreamSender,
64}
65
66pub struct IcebergCompactor {
67 context_id: HummockContextId,
68 sender: IcebergCompactorSubscribeStreamSender,
69}
70
71struct TaskHeartbeat {
72 task: CompactTask,
73 num_ssts_sealed: u32,
74 num_ssts_uploaded: u32,
75 num_progress_key: u64,
76 num_pending_read_io: u64,
77 num_pending_write_io: u64,
78 create_time: Instant,
79 expire_at: u64,
80
81 update_at: u64,
82}
83
84impl Compactor {
85 pub fn new(context_id: HummockContextId, sender: CompactorSubscribeStreamSender) -> Self {
86 Self { context_id, sender }
87 }
88
89 pub fn send_event(&self, event: CompactorSubscribeResponseEvent) -> MetaResult<()> {
90 fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
91 "compaction_send_task_fail"
92 )
93 .into()));
94
95 self.sender
96 .send(Ok(SubscribeCompactionEventResponse {
97 create_at: SystemTime::now()
98 .duration_since(SystemTime::UNIX_EPOCH)
99 .expect("Clock may have gone backwards")
100 .as_millis() as u64,
101 event: Some(event),
102 }))
103 .map_err(|e| anyhow::anyhow!(e))?;
104
105 Ok(())
106 }
107
108 pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
109 self.send_event(ResponseEvent::CancelCompactTask(CancelCompactTask {
110 context_id: self.context_id,
111 task_id,
112 }))
113 }
114
115 pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
116 for task_id in task_ids {
117 self.cancel_task(*task_id)?;
118 }
119 Ok(())
120 }
121
122 pub fn context_id(&self) -> HummockContextId {
123 self.context_id
124 }
125}
126
127pub trait CompactorManagerTrait {
128 fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver;
129 fn remove_compactor(&self, context_id: HummockContextId);
130 fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>>;
131 fn next_compactor(&self) -> Option<Arc<Compactor>>;
132 fn compactor_num(&self) -> usize;
133}
134
135pub struct CompactorManagerInner {
147 pub task_expired_seconds: u64,
148 pub heartbeat_expired_seconds: u64,
149 task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,
150
151 pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
153}
154
155impl CompactorManagerInner {
156 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
157 use risingwave_meta_model::compaction_task;
158 use sea_orm::EntityTrait;
159 let task_assignment: Vec<_> = compaction_task::Entity::find()
161 .all(&env.meta_store_ref().conn)
162 .await
163 .map_err(MetadataModelError::from)?
164 .into_iter()
165 .map(compaction_task_model_to_assignment)
166 .collect();
167 let mut manager = Self {
168 task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
169 heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
170 task_heartbeats: Default::default(),
171 compactor_map: Default::default(),
172 };
173 task_assignment.into_iter().for_each(|assignment| {
175 manager.initiate_task_heartbeat(assignment.compact_task);
176 });
177 Ok(manager)
178 }
179
180 pub fn for_test() -> Self {
182 Self {
183 task_expired_seconds: 1,
184 heartbeat_expired_seconds: 1,
185 task_heartbeats: Default::default(),
186 compactor_map: Default::default(),
187 }
188 }
189
190 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
191 use rand::Rng;
192 if self.compactor_map.is_empty() {
193 return None;
194 }
195
196 let rand_index = rand::rng().random_range(0..self.compactor_map.len());
197 let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
198
199 Some(compactor)
200 }
201
202 pub fn add_compactor(
209 &mut self,
210 context_id: HummockContextId,
211 ) -> CompactorSubscribeStreamReceiver {
212 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
213
214 self.compactor_map
215 .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
216
217 tracing::info!(context_id = %context_id, "Added compactor session");
218
219 rx
220 }
221
222 pub fn abort_all_compactors(&mut self) {
224 while let Some(compactor) = self.next_compactor() {
225 self.remove_compactor(compactor.context_id);
226 }
227 }
228
229 pub fn remove_compactor(&mut self, context_id: HummockContextId) {
230 if self.compactor_map.remove(&context_id).is_some() {
231 tracing::info!(context_id = %context_id, "Removed compactor session")
232 };
233 }
234
235 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
236 self.compactor_map.get(&context_id).cloned()
237 }
238
239 pub fn check_tasks_status(
240 &self,
241 tasks: &[HummockCompactionTaskId],
242 slow_task_duration: Duration,
243 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
244 let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
245 let mut ret = HashMap::default();
246 for TaskHeartbeat {
247 task, create_time, ..
248 } in self.task_heartbeats.values()
249 {
250 if !tasks_ids.contains(&task.task_id) {
251 continue;
252 }
253 let pending_time = create_time.elapsed();
254 if pending_time > slow_task_duration {
255 ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
256 } else {
257 ret.insert(task.task_id, (pending_time, TASK_NORMAL));
258 }
259 }
260
261 for task_id in tasks {
262 if !ret.contains_key(task_id) {
263 ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
264 }
265 }
266 ret
267 }
268
269 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
270 let heartbeat_expired_ts: u64 = SystemTime::now()
271 .duration_since(SystemTime::UNIX_EPOCH)
272 .expect("Clock may have gone backwards")
273 .as_secs()
274 - self.heartbeat_expired_seconds;
275 Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
276 }
277
278 fn get_heartbeat_expired_tasks_impl(
279 task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
280 heartbeat_expired_ts: u64,
281 ) -> Vec<CompactTask> {
282 let mut cancellable_tasks = vec![];
283 const MAX_TASK_DURATION_SEC: u64 = 2700;
284
285 for TaskHeartbeat {
286 expire_at,
287 task,
288 create_time,
289 num_ssts_sealed,
290 num_ssts_uploaded,
291 num_progress_key,
292 num_pending_read_io,
293 num_pending_write_io,
294 update_at,
295 } in task_heartbeats.values()
296 {
297 if *update_at < heartbeat_expired_ts {
298 cancellable_tasks.push(task.clone());
299 }
300
301 let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
302 if task_duration_too_long {
303 let compact_task_statistics = statistics_compact_task(task);
304 tracing::info!(
305 "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
306 pending_read_io_count {} pending_write_io_count {} target_level {} \
307 base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
308 task.compaction_group_id,
309 task.task_id,
310 create_time,
311 expire_at,
312 num_ssts_sealed,
313 num_ssts_uploaded,
314 num_progress_key,
315 num_pending_read_io,
316 num_pending_write_io,
317 task.target_level,
318 task.base_level,
319 task.target_sub_level_id,
320 task.task_type.as_str_name(),
321 compact_task_statistics
322 );
323 }
324 }
325 cancellable_tasks
326 }
327
328 pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
329 let now = SystemTime::now()
330 .duration_since(SystemTime::UNIX_EPOCH)
331 .expect("Clock may have gone backwards")
332 .as_secs();
333 self.task_heartbeats.insert(
334 task.task_id,
335 TaskHeartbeat {
336 task,
337 num_ssts_sealed: 0,
338 num_ssts_uploaded: 0,
339 num_progress_key: 0,
340 num_pending_read_io: 0,
341 num_pending_write_io: 0,
342 create_time: Instant::now(),
343 expire_at: now + self.task_expired_seconds,
344 update_at: now,
345 },
346 );
347 }
348
349 pub fn remove_task_heartbeat(&mut self, task_id: u64) {
350 self.task_heartbeats.remove(&task_id).unwrap();
351 }
352
353 pub fn update_task_heartbeats(
354 &mut self,
355 progress_list: &Vec<CompactTaskProgress>,
356 ) -> Vec<CompactTask> {
357 let now = SystemTime::now()
358 .duration_since(SystemTime::UNIX_EPOCH)
359 .expect("Clock may have gone backwards")
360 .as_secs();
361 let mut cancel_tasks = vec![];
362 for progress in progress_list {
363 if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
364 task_ref.update_at = now;
365
366 if task_ref.num_ssts_sealed < progress.num_ssts_sealed
367 || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
368 || task_ref.num_progress_key < progress.num_progress_key
369 {
370 task_ref.expire_at = now + self.task_expired_seconds;
372 task_ref.num_ssts_sealed = progress.num_ssts_sealed;
373 task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
374 task_ref.num_progress_key = progress.num_progress_key;
375 }
376 task_ref.num_pending_read_io = progress.num_pending_read_io;
377 task_ref.num_pending_write_io = progress.num_pending_write_io;
378
379 if task_ref.expire_at < now {
381 cancel_tasks.push(task_ref.task.clone())
383 }
384 }
385 }
386
387 cancel_tasks
388 }
389
390 pub fn compactor_num(&self) -> usize {
391 self.compactor_map.len()
392 }
393
394 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
395 self.task_heartbeats
396 .values()
397 .map(|hb| CompactTaskProgress {
398 task_id: hb.task.task_id,
399 num_ssts_sealed: hb.num_ssts_sealed,
400 num_ssts_uploaded: hb.num_ssts_uploaded,
401 num_progress_key: hb.num_progress_key,
402 num_pending_read_io: hb.num_pending_read_io,
403 num_pending_write_io: hb.num_pending_write_io,
404 compaction_group_id: Some(hb.task.compaction_group_id),
405 })
406 .collect()
407 }
408}
409
410pub struct CompactorManager {
411 inner: Arc<RwLock<CompactorManagerInner>>,
412}
413
414impl CompactorManager {
415 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
416 let inner = CompactorManagerInner::with_meta(env).await?;
417
418 Ok(Self {
419 inner: Arc::new(RwLock::new(inner)),
420 })
421 }
422
423 pub fn for_test() -> Self {
425 let inner = CompactorManagerInner::for_test();
426 Self {
427 inner: Arc::new(RwLock::new(inner)),
428 }
429 }
430
431 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
432 self.inner.read().next_compactor()
433 }
434
435 pub fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver {
436 self.inner.write().add_compactor(context_id)
437 }
438
439 pub fn abort_all_compactors(&self) {
440 self.inner.write().abort_all_compactors();
441 }
442
443 pub fn remove_compactor(&self, context_id: HummockContextId) {
444 self.inner.write().remove_compactor(context_id)
445 }
446
447 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
448 self.inner.read().get_compactor(context_id)
449 }
450
451 pub fn check_tasks_status(
452 &self,
453 tasks: &[HummockCompactionTaskId],
454 slow_task_duration: Duration,
455 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
456 self.inner
457 .read()
458 .check_tasks_status(tasks, slow_task_duration)
459 }
460
461 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
462 self.inner.read().get_heartbeat_expired_tasks()
463 }
464
465 pub fn initiate_task_heartbeat(&self, task: CompactTask) {
466 self.inner.write().initiate_task_heartbeat(task);
467 }
468
469 pub fn remove_task_heartbeat(&self, task_id: u64) {
470 self.inner.write().remove_task_heartbeat(task_id);
471 }
472
473 pub fn update_task_heartbeats(
474 &self,
475 progress_list: &Vec<CompactTaskProgress>,
476 ) -> Vec<CompactTask> {
477 self.inner.write().update_task_heartbeats(progress_list)
478 }
479
480 pub fn compactor_num(&self) -> usize {
481 self.inner.read().compactor_num()
482 }
483
484 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
485 self.inner.read().get_progress()
486 }
487}
488
489impl IcebergCompactor {
490 pub fn new(
491 context_id: HummockContextId,
492 sender: IcebergCompactorSubscribeStreamSender,
493 ) -> Self {
494 Self { context_id, sender }
495 }
496
497 pub fn context_id(&self) -> HummockContextId {
498 self.context_id
499 }
500
501 pub fn send_event(&self, event: IcebergCompactorSubscribeResponseEvent) -> MetaResult<()> {
502 fail_point!("iceberg_compaction_send_task_fail", |_| Err(
503 anyhow::anyhow!("iceberg_compaction_send_task_fail").into()
504 ));
505
506 self.sender
507 .send(Ok(SubscribeIcebergCompactionEventResponse {
508 create_at: SystemTime::now()
509 .duration_since(SystemTime::UNIX_EPOCH)
510 .expect("Clock may have gone backwards")
511 .as_millis() as u64,
512 event: Some(event),
513 }))
514 .map_err(|e| anyhow::anyhow!(e))?;
515
516 Ok(())
517 }
518
519 pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
520 self.send_event(IcebergResponseEvent::CancelCompactTask(
521 IcebergCancelCompactTask { task_id },
522 ))
523 }
524}
525
526pub struct IcebergCompactorManagerInner {
527 pub compactor_map: HashMap<HummockContextId, Arc<IcebergCompactor>>,
528}
529
530pub struct IcebergCompactorManager {
531 inner: Arc<RwLock<IcebergCompactorManagerInner>>,
532}
533
534impl IcebergCompactorManager {
535 pub fn new() -> Self {
536 Self {
537 inner: Arc::new(RwLock::new(IcebergCompactorManagerInner {
538 compactor_map: HashMap::new(),
539 })),
540 }
541 }
542
543 pub fn add_compactor(
544 &self,
545 context_id: HummockContextId,
546 ) -> IcebergCompactorSubscribeStreamReceiver {
547 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
548 self.inner
549 .write()
550 .compactor_map
551 .insert(context_id, Arc::new(IcebergCompactor::new(context_id, tx)));
552 tracing::info!(context_id = %context_id, "Added iceberg compactor session");
553 rx
554 }
555
556 pub fn remove_compactor(&self, context_id: HummockContextId) {
557 if self
558 .inner
559 .write()
560 .compactor_map
561 .remove(&context_id)
562 .is_some()
563 {
564 tracing::info!(context_id = %context_id, "Removed iceberg compactor session");
565 }
566 }
567
568 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<IcebergCompactor>> {
569 self.inner.read().compactor_map.get(&context_id).cloned()
570 }
571
572 pub fn next_compactor(&self) -> Option<Arc<IcebergCompactor>> {
573 use rand::Rng;
574 let compactor_map = &self.inner.read().compactor_map;
575 if compactor_map.is_empty() {
576 return None;
577 }
578 let rand_index = rand::rng().random_range(0..compactor_map.len());
579 compactor_map.values().nth(rand_index).cloned()
580 }
581
582 pub fn compactor_num(&self) -> usize {
583 self.inner.read().compactor_map.len()
584 }
585}
586
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, IcebergCompactorManager, MockHummockMetaClient};

    /// Rebuilds a `CompactorManager` from the meta store after a compaction task has been
    /// assigned, then exercises heartbeat expiry and compactor registration.
    #[tokio::test]
    async fn test_compactor_manager() {
        // Obtain one compaction task via the hummock manager. Everything except `env` and
        // `context_id` (hummock manager, compactor receiver, ...) goes out of scope when this
        // block ends.
        let (env, context_id) = {
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager.clone();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault,
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            // A compactor must be registered for `get_compact_task` to hand out a task.
            let _receiver = compactor_manager.add_compactor(context_id);
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault,
                    &mut *default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // A manager rebuilt from the meta store starts with no compactor sessions...
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // ...but does re-track the persisted task assignment. After sleeping past the heartbeat
        // interval the task is reported as expired. NOTE(review): this assumes the test env sets
        // `compaction_task_max_heartbeat_interval_secs` below 2s — confirm in setup_compute_env.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // Querying expiry does not mutate state: the same task is still expired.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // A progress report for an unknown task id is ignored.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // A progress report for the tracked task refreshes its heartbeat.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Register and unregister a compactor session.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }

    /// Registers an Iceberg compactor, checks that a cancel request reaches its stream, then
    /// unregisters it.
    #[test]
    fn test_iceberg_compactor_manager() {
        let iceberg_context_id = 1000.into();
        let iceberg_compactor_manager = IcebergCompactorManager::new();
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
        let mut receiver = iceberg_compactor_manager.add_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 1);
        let compactor = iceberg_compactor_manager
            .get_compactor(iceberg_context_id)
            .unwrap();
        assert_eq!(compactor.context_id(), iceberg_context_id);

        // The channel is unbounded, so the event is available immediately via `try_recv`.
        compactor.cancel_task(42).unwrap();
        let event = receiver.try_recv().unwrap().unwrap().event.unwrap();
        match event {
            IcebergResponseEvent::CancelCompactTask(cancel_task) => {
                assert_eq!(cancel_task.task_id, 42);
            }
            other => panic!("unexpected iceberg compactor event: {other:?}"),
        }

        iceberg_compactor_manager.remove_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
    }
}