1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26 CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventResponse;
29use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31
32use crate::MetaResult;
33use crate::manager::MetaSrvEnv;
34use crate::model::MetadataModelError;
35
/// Shared handle to a [`CompactorManager`].
pub type CompactorManagerRef = Arc<CompactorManager>;
/// Shared handle to an [`IcebergCompactorManager`].
pub type IcebergCompactorManagerRef = Arc<IcebergCompactorManager>;

/// Status label from `check_tasks_status` for a tracked task whose time since heartbeat
/// initiation exceeds the caller-supplied `slow_task_duration`.
pub const TASK_RUN_TOO_LONG: &str = "running too long";
/// Status label from `check_tasks_status` for a task id that has no tracked heartbeat.
pub const TASK_NOT_FOUND: &str = "task not found";
/// Status label from `check_tasks_status` for a tracked task that is not (yet) slow.
pub const TASK_NORMAL: &str = "task is normal, please wait some time";

// Sending half of the unbounded event stream pushed to one hummock compactor session.
type CompactorSubscribeStreamSender = UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>;
// Receiving half of that stream; returned to the caller of `add_compactor`.
type CompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>>;

// Sending half of the unbounded event stream pushed to one iceberg compactor session.
type IcebergCompactorSubscribeStreamSender =
    UnboundedSender<MetaResult<SubscribeIcebergCompactionEventResponse>>;
// Receiving half of that stream; returned to the caller of the iceberg `add_compactor`.
type IcebergCompactorSubscribeStreamReceiver =
    UnboundedReceiver<MetaResult<SubscribeIcebergCompactionEventResponse>>;

// Payload type placed in `SubscribeCompactionEventResponse::event`.
type CompactorSubscribeResponseEvent = ResponseEvent;

// Payload type placed in `SubscribeIcebergCompactionEventResponse::event`.
type IcebergCompactorSubscribeResponseEvent = IcebergResponseEvent;
55
/// Meta-side handle for one connected hummock compactor session.
///
/// Events destined for the compactor are pushed into `sender` by `send_event`.
pub struct Compactor {
    /// Context id under which this session was registered.
    context_id: HummockContextId,
    /// Unbounded channel carrying responses to the session.
    sender: CompactorSubscribeStreamSender,
}
62
/// Meta-side handle for one connected iceberg compactor session.
///
/// Events destined for the compactor are pushed into `sender` by `send_event`.
pub struct IcebergCompactor {
    /// Context id under which this session was registered.
    context_id: HummockContextId,
    /// Unbounded channel carrying responses to the session.
    sender: IcebergCompactorSubscribeStreamSender,
}
67
/// Liveness and progress bookkeeping for one compaction task tracked by
/// `CompactorManagerInner`.
///
/// `expire_at` and `update_at` are wall-clock seconds since the Unix epoch.
struct TaskHeartbeat {
    /// The task being tracked.
    task: CompactTask,
    /// Sealed-SST count from the latest report that advanced progress.
    num_ssts_sealed: u32,
    /// Uploaded-SST count from the latest report that advanced progress.
    num_ssts_uploaded: u32,
    /// Progressed-key count from the latest report that advanced progress.
    num_progress_key: u64,
    /// Pending read I/O count from the most recent report (overwritten on every report).
    num_pending_read_io: u64,
    /// Pending write I/O count from the most recent report (overwritten on every report).
    num_pending_write_io: u64,
    /// Monotonic instant at which tracking began; used to measure how long the task has run.
    create_time: Instant,
    /// Progress deadline: pushed forward only when one of the three progress counters grows,
    /// and compared against the current time when progress reports arrive.
    expire_at: u64,

    /// Time of the most recent heartbeat (tracking start or any progress report).
    update_at: u64,
}
80
81impl Compactor {
82 pub fn new(context_id: HummockContextId, sender: CompactorSubscribeStreamSender) -> Self {
83 Self { context_id, sender }
84 }
85
86 pub fn send_event(&self, event: CompactorSubscribeResponseEvent) -> MetaResult<()> {
87 fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
88 "compaction_send_task_fail"
89 )
90 .into()));
91
92 self.sender
93 .send(Ok(SubscribeCompactionEventResponse {
94 create_at: SystemTime::now()
95 .duration_since(SystemTime::UNIX_EPOCH)
96 .expect("Clock may have gone backwards")
97 .as_millis() as u64,
98 event: Some(event),
99 }))
100 .map_err(|e| anyhow::anyhow!(e))?;
101
102 Ok(())
103 }
104
105 pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
106 self.send_event(ResponseEvent::CancelCompactTask(CancelCompactTask {
107 context_id: self.context_id,
108 task_id,
109 }))
110 }
111
112 pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
113 for task_id in task_ids {
114 self.cancel_task(*task_id)?;
115 }
116 Ok(())
117 }
118
119 pub fn context_id(&self) -> HummockContextId {
120 self.context_id
121 }
122}
123
/// Compactor-session operations, mirroring the same-named inherent methods of
/// `CompactorManager`.
///
/// NOTE(review): no implementation of this trait is visible in this file — confirm it is still
/// used elsewhere.
pub trait CompactorManagerTrait {
    /// Counterpart of `CompactorManager::add_compactor`: register a session, return its receiver.
    fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver;
    /// Counterpart of `CompactorManager::remove_compactor`.
    fn remove_compactor(&self, context_id: HummockContextId);
    /// Counterpart of `CompactorManager::get_compactor`.
    fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>>;
    /// Counterpart of `CompactorManager::next_compactor`.
    fn next_compactor(&self) -> Option<Arc<Compactor>>;
    /// Counterpart of `CompactorManager::compactor_num`.
    fn compactor_num(&self) -> usize;
}
131
/// Unsynchronized state of the compactor manager: registered compactor sessions plus
/// heartbeat tracking for in-flight compaction tasks. `CompactorManager` wraps it in a
/// `RwLock`.
pub struct CompactorManagerInner {
    /// Seconds added to "now" to form a task's progress deadline (`TaskHeartbeat::expire_at`).
    pub task_expired_seconds: u64,
    /// A task whose last heartbeat is older than this many seconds is reported by
    /// `get_heartbeat_expired_tasks`.
    pub heartbeat_expired_seconds: u64,
    /// Heartbeat state per tracked task, keyed by task id.
    task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

    /// Registered compactor sessions, keyed by context id.
    pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
}
151
152impl CompactorManagerInner {
153 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
154 use risingwave_meta_model::compaction_task;
155 use sea_orm::EntityTrait;
156 let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
158 .all(&env.meta_store_ref().conn)
159 .await
160 .map_err(MetadataModelError::from)?
161 .into_iter()
162 .map(Into::into)
163 .collect();
164 let mut manager = Self {
165 task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
166 heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
167 task_heartbeats: Default::default(),
168 compactor_map: Default::default(),
169 };
170 task_assignment.into_iter().for_each(|assignment| {
172 manager.initiate_task_heartbeat(CompactTask::from(assignment.compact_task.unwrap()));
173 });
174 Ok(manager)
175 }
176
177 pub fn for_test() -> Self {
179 Self {
180 task_expired_seconds: 1,
181 heartbeat_expired_seconds: 1,
182 task_heartbeats: Default::default(),
183 compactor_map: Default::default(),
184 }
185 }
186
187 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
188 use rand::Rng;
189 if self.compactor_map.is_empty() {
190 return None;
191 }
192
193 let rand_index = rand::rng().random_range(0..self.compactor_map.len());
194 let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
195
196 Some(compactor)
197 }
198
199 pub fn add_compactor(
206 &mut self,
207 context_id: HummockContextId,
208 ) -> CompactorSubscribeStreamReceiver {
209 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
210
211 self.compactor_map
212 .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
213
214 tracing::info!(context_id = context_id, "Added compactor session");
215
216 rx
217 }
218
219 pub fn abort_all_compactors(&mut self) {
221 while let Some(compactor) = self.next_compactor() {
222 self.remove_compactor(compactor.context_id);
223 }
224 }
225
226 pub fn remove_compactor(&mut self, context_id: HummockContextId) {
227 if self.compactor_map.remove(&context_id).is_some() {
228 tracing::info!(context_id = context_id, "Removed compactor session")
229 };
230 }
231
232 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
233 self.compactor_map.get(&context_id).cloned()
234 }
235
236 pub fn check_tasks_status(
237 &self,
238 tasks: &[HummockCompactionTaskId],
239 slow_task_duration: Duration,
240 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
241 let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
242 let mut ret = HashMap::default();
243 for TaskHeartbeat {
244 task, create_time, ..
245 } in self.task_heartbeats.values()
246 {
247 if !tasks_ids.contains(&task.task_id) {
248 continue;
249 }
250 let pending_time = create_time.elapsed();
251 if pending_time > slow_task_duration {
252 ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
253 } else {
254 ret.insert(task.task_id, (pending_time, TASK_NORMAL));
255 }
256 }
257
258 for task_id in tasks {
259 if !ret.contains_key(task_id) {
260 ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
261 }
262 }
263 ret
264 }
265
266 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
267 let heartbeat_expired_ts: u64 = SystemTime::now()
268 .duration_since(SystemTime::UNIX_EPOCH)
269 .expect("Clock may have gone backwards")
270 .as_secs()
271 - self.heartbeat_expired_seconds;
272 Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
273 }
274
275 fn get_heartbeat_expired_tasks_impl(
276 task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
277 heartbeat_expired_ts: u64,
278 ) -> Vec<CompactTask> {
279 let mut cancellable_tasks = vec![];
280 const MAX_TASK_DURATION_SEC: u64 = 2700;
281
282 for TaskHeartbeat {
283 expire_at,
284 task,
285 create_time,
286 num_ssts_sealed,
287 num_ssts_uploaded,
288 num_progress_key,
289 num_pending_read_io,
290 num_pending_write_io,
291 update_at,
292 } in task_heartbeats.values()
293 {
294 if *update_at < heartbeat_expired_ts {
295 cancellable_tasks.push(task.clone());
296 }
297
298 let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
299 if task_duration_too_long {
300 let compact_task_statistics = statistics_compact_task(task);
301 tracing::info!(
302 "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
303 pending_read_io_count {} pending_write_io_count {} target_level {} \
304 base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
305 task.compaction_group_id,
306 task.task_id,
307 create_time,
308 expire_at,
309 num_ssts_sealed,
310 num_ssts_uploaded,
311 num_progress_key,
312 num_pending_read_io,
313 num_pending_write_io,
314 task.target_level,
315 task.base_level,
316 task.target_sub_level_id,
317 task.task_type.as_str_name(),
318 compact_task_statistics
319 );
320 }
321 }
322 cancellable_tasks
323 }
324
325 pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
326 let now = SystemTime::now()
327 .duration_since(SystemTime::UNIX_EPOCH)
328 .expect("Clock may have gone backwards")
329 .as_secs();
330 self.task_heartbeats.insert(
331 task.task_id,
332 TaskHeartbeat {
333 task,
334 num_ssts_sealed: 0,
335 num_ssts_uploaded: 0,
336 num_progress_key: 0,
337 num_pending_read_io: 0,
338 num_pending_write_io: 0,
339 create_time: Instant::now(),
340 expire_at: now + self.task_expired_seconds,
341 update_at: now,
342 },
343 );
344 }
345
346 pub fn remove_task_heartbeat(&mut self, task_id: u64) {
347 self.task_heartbeats.remove(&task_id).unwrap();
348 }
349
350 pub fn update_task_heartbeats(
351 &mut self,
352 progress_list: &Vec<CompactTaskProgress>,
353 ) -> Vec<CompactTask> {
354 let now = SystemTime::now()
355 .duration_since(SystemTime::UNIX_EPOCH)
356 .expect("Clock may have gone backwards")
357 .as_secs();
358 let mut cancel_tasks = vec![];
359 for progress in progress_list {
360 if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
361 task_ref.update_at = now;
362
363 if task_ref.num_ssts_sealed < progress.num_ssts_sealed
364 || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
365 || task_ref.num_progress_key < progress.num_progress_key
366 {
367 task_ref.expire_at = now + self.task_expired_seconds;
369 task_ref.num_ssts_sealed = progress.num_ssts_sealed;
370 task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
371 task_ref.num_progress_key = progress.num_progress_key;
372 }
373 task_ref.num_pending_read_io = progress.num_pending_read_io;
374 task_ref.num_pending_write_io = progress.num_pending_write_io;
375
376 if task_ref.expire_at < now {
378 cancel_tasks.push(task_ref.task.clone())
380 }
381 }
382 }
383
384 cancel_tasks
385 }
386
387 pub fn compactor_num(&self) -> usize {
388 self.compactor_map.len()
389 }
390
391 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
392 self.task_heartbeats
393 .values()
394 .map(|hb| CompactTaskProgress {
395 task_id: hb.task.task_id,
396 num_ssts_sealed: hb.num_ssts_sealed,
397 num_ssts_uploaded: hb.num_ssts_uploaded,
398 num_progress_key: hb.num_progress_key,
399 num_pending_read_io: hb.num_pending_read_io,
400 num_pending_write_io: hb.num_pending_write_io,
401 compaction_group_id: Some(hb.task.compaction_group_id),
402 })
403 .collect()
404 }
405}
406
/// Manager of hummock compactor sessions and compaction-task heartbeats.
///
/// All state lives in a [`CompactorManagerInner`] behind a `parking_lot` read-write lock
/// shared through an `Arc`.
pub struct CompactorManager {
    inner: Arc<RwLock<CompactorManagerInner>>,
}
410
411impl CompactorManager {
412 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
413 let inner = CompactorManagerInner::with_meta(env).await?;
414
415 Ok(Self {
416 inner: Arc::new(RwLock::new(inner)),
417 })
418 }
419
420 pub fn for_test() -> Self {
422 let inner = CompactorManagerInner::for_test();
423 Self {
424 inner: Arc::new(RwLock::new(inner)),
425 }
426 }
427
428 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
429 self.inner.read().next_compactor()
430 }
431
432 pub fn add_compactor(&self, context_id: HummockContextId) -> CompactorSubscribeStreamReceiver {
433 self.inner.write().add_compactor(context_id)
434 }
435
436 pub fn abort_all_compactors(&self) {
437 self.inner.write().abort_all_compactors();
438 }
439
440 pub fn remove_compactor(&self, context_id: HummockContextId) {
441 self.inner.write().remove_compactor(context_id)
442 }
443
444 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
445 self.inner.read().get_compactor(context_id)
446 }
447
448 pub fn check_tasks_status(
449 &self,
450 tasks: &[HummockCompactionTaskId],
451 slow_task_duration: Duration,
452 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
453 self.inner
454 .read()
455 .check_tasks_status(tasks, slow_task_duration)
456 }
457
458 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
459 self.inner.read().get_heartbeat_expired_tasks()
460 }
461
462 pub fn initiate_task_heartbeat(&self, task: CompactTask) {
463 self.inner.write().initiate_task_heartbeat(task);
464 }
465
466 pub fn remove_task_heartbeat(&self, task_id: u64) {
467 self.inner.write().remove_task_heartbeat(task_id);
468 }
469
470 pub fn update_task_heartbeats(
471 &self,
472 progress_list: &Vec<CompactTaskProgress>,
473 ) -> Vec<CompactTask> {
474 self.inner.write().update_task_heartbeats(progress_list)
475 }
476
477 pub fn compactor_num(&self) -> usize {
478 self.inner.read().compactor_num()
479 }
480
481 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
482 self.inner.read().get_progress()
483 }
484}
485
486impl IcebergCompactor {
487 pub fn new(
488 context_id: HummockContextId,
489 sender: IcebergCompactorSubscribeStreamSender,
490 ) -> Self {
491 Self { context_id, sender }
492 }
493
494 pub fn context_id(&self) -> HummockContextId {
495 self.context_id
496 }
497
498 pub fn send_event(&self, event: IcebergCompactorSubscribeResponseEvent) -> MetaResult<()> {
499 fail_point!("iceberg_compaction_send_task_fail", |_| Err(
500 anyhow::anyhow!("iceberg_compaction_send_task_fail").into()
501 ));
502
503 self.sender
504 .send(Ok(SubscribeIcebergCompactionEventResponse {
505 create_at: SystemTime::now()
506 .duration_since(SystemTime::UNIX_EPOCH)
507 .expect("Clock may have gone backwards")
508 .as_millis() as u64,
509 event: Some(event),
510 }))
511 .map_err(|e| anyhow::anyhow!(e))?;
512
513 Ok(())
514 }
515}
516
/// Unsynchronized state of the iceberg compactor manager.
pub struct IcebergCompactorManagerInner {
    /// Registered iceberg compactor sessions, keyed by context id.
    pub compactor_map: HashMap<HummockContextId, Arc<IcebergCompactor>>,
}
520
/// Manager of iceberg compactor sessions; state lives behind a `parking_lot` read-write lock
/// shared through an `Arc`.
pub struct IcebergCompactorManager {
    inner: Arc<RwLock<IcebergCompactorManagerInner>>,
}
524
525impl IcebergCompactorManager {
526 pub fn new() -> Self {
527 Self {
528 inner: Arc::new(RwLock::new(IcebergCompactorManagerInner {
529 compactor_map: HashMap::new(),
530 })),
531 }
532 }
533
534 pub fn add_compactor(
535 &self,
536 context_id: HummockContextId,
537 ) -> IcebergCompactorSubscribeStreamReceiver {
538 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
539 self.inner
540 .write()
541 .compactor_map
542 .insert(context_id, Arc::new(IcebergCompactor::new(context_id, tx)));
543 tracing::info!(context_id = context_id, "Added iceberg compactor session");
544 rx
545 }
546
547 pub fn remove_compactor(&self, context_id: HummockContextId) {
548 if self
549 .inner
550 .write()
551 .compactor_map
552 .remove(&context_id)
553 .is_some()
554 {
555 tracing::info!(context_id = context_id, "Removed iceberg compactor session");
556 }
557 }
558
559 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<IcebergCompactor>> {
560 self.inner.read().compactor_map.get(&context_id).cloned()
561 }
562
563 pub fn next_compactor(&self) -> Option<Arc<IcebergCompactor>> {
564 use rand::Rng;
565 let compactor_map = &self.inner.read().compactor_map;
566 if compactor_map.is_empty() {
567 return None;
568 }
569 let rand_index = rand::rng().random_range(0..compactor_map.len());
570 compactor_map.values().nth(rand_index).cloned()
571 }
572
573 pub fn compactor_num(&self) -> usize {
574 self.inner.read().compactor_map.len()
575 }
576}
577
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, IcebergCompactorManager, MockHummockMetaClient};

    /// Covers heartbeat restoration via `CompactorManager::with_meta`, heartbeat expiry and
    /// refresh, and compactor session add/lookup/remove.
    #[tokio::test]
    async fn test_compactor_manager() {
        let (env, context_id) = {
            // Build a meta env, register a compactor and have the hummock manager hand out one
            // compaction task. NOTE(review): the rest of the test assumes this persists exactly
            // one task assignment to the meta store.
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager.clone();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault.into(),
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            let _receiver = compactor_manager.add_compactor(context_id);
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault.into(),
                    &mut default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // A manager rebuilt from the meta store restores task heartbeats but no sessions.
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // Let the restored task go without heartbeats long enough to count as expired.
        // NOTE(review): assumes the test env's heartbeat interval is shorter than 2s — confirm.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // Querying expired tasks is read-only, so the task is still reported.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Progress for an untracked task id is ignored and refreshes nothing.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Progress for the tracked task refreshes its heartbeat, so it is no longer expired.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Session bookkeeping: add, look up and remove one compactor.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }

    /// Adds and removes one iceberg compactor session and checks the bookkeeping.
    #[test]
    fn test_iceberg_compactor_manager() {
        let iceberg_context_id = 1000;
        // A fresh manager has no sessions.
        let iceberg_compactor_manager = IcebergCompactorManager::new();
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
        iceberg_compactor_manager.add_compactor(iceberg_context_id);
        assert_eq!(iceberg_compactor_manager.compactor_num(), 1);
        assert_eq!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .unwrap()
                .context_id(),
            iceberg_context_id
        );
        iceberg_compactor_manager.remove_compactor(iceberg_context_id);
        // Removal leaves the manager empty again.
        assert_eq!(iceberg_compactor_manager.compactor_num(), 0);
        assert!(
            iceberg_compactor_manager
                .get_compactor(iceberg_context_id)
                .is_none()
        );
    }
}