1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant, SystemTime};
18
19use fail::fail_point;
20use parking_lot::RwLock;
21use risingwave_hummock_sdk::compact::statistics_compact_task;
22use risingwave_hummock_sdk::compact_task::CompactTask;
23use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
24use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
25use risingwave_pb::hummock::{
26 CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
27};
28use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
29
30use crate::MetaResult;
31use crate::manager::MetaSrvEnv;
32use crate::model::MetadataModelError;
33
/// Shared, reference-counted handle to a [`CompactorManager`].
pub type CompactorManagerRef = Arc<CompactorManager>;
35
/// Status reported by `check_tasks_status` for a tracked task whose age exceeds the caller's
/// slow-task threshold.
pub const TASK_RUN_TOO_LONG: &str = "running too long";
/// Status reported by `check_tasks_status` for a task id with no registered heartbeat.
pub const TASK_NOT_FOUND: &str = "task not found";
/// Status reported by `check_tasks_status` for a tracked task still within the slow-task threshold.
pub const TASK_NORMAL: &str = "task is normal, please wait some time";
39
/// Handle to one compactor session subscribed to compaction events.
///
/// Events are delivered by pushing `Ok(SubscribeCompactionEventResponse)` messages into
/// `sender`.
#[derive(Debug)]
pub struct Compactor {
    /// Context id of this compactor session; also stamped into cancel requests sent to it.
    context_id: HummockContextId,
    /// Sending half of the unbounded channel whose receiving half is handed out by
    /// `CompactorManagerInner::add_compactor`.
    sender: UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>,
}
47
/// Liveness and progress bookkeeping for one compaction task tracked by the manager.
struct TaskHeartbeat {
    /// The task being tracked.
    task: CompactTask,
    /// Progress counters (`num_ssts_sealed`, `num_ssts_uploaded`, `num_progress_key`) taken from
    /// the most recent report in which at least one of the three advanced.
    num_ssts_sealed: u32,
    num_ssts_uploaded: u32,
    num_progress_key: u64,
    /// Pending I/O counts; overwritten by every progress report.
    num_pending_read_io: u64,
    num_pending_write_io: u64,
    /// Monotonic time at which tracking of this task began.
    create_time: Instant,
    /// Unix-epoch seconds after which the task counts as stalled; pushed forward whenever one of
    /// the progress counters advances.
    expire_at: u64,

    /// Unix-epoch seconds of the most recent progress report (or of registration if none yet).
    update_at: u64,
}
60
61impl Compactor {
62 pub fn new(
63 context_id: HummockContextId,
64 sender: UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>,
65 ) -> Self {
66 Self { context_id, sender }
67 }
68
69 pub fn send_event(&self, event: ResponseEvent) -> MetaResult<()> {
70 fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
71 "compaction_send_task_fail"
72 )
73 .into()));
74
75 self.sender
76 .send(Ok(SubscribeCompactionEventResponse {
77 event: Some(event),
78 create_at: SystemTime::now()
79 .duration_since(std::time::UNIX_EPOCH)
80 .expect("Clock may have gone backwards")
81 .as_millis() as u64,
82 }))
83 .map_err(|e| anyhow::anyhow!(e))?;
84
85 Ok(())
86 }
87
88 pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
89 self.sender
90 .send(Ok(SubscribeCompactionEventResponse {
91 event: Some(ResponseEvent::CancelCompactTask(CancelCompactTask {
92 context_id: self.context_id,
93 task_id,
94 })),
95 create_at: SystemTime::now()
96 .duration_since(std::time::UNIX_EPOCH)
97 .expect("Clock may have gone backwards")
98 .as_millis() as u64,
99 }))
100 .map_err(|e| anyhow::anyhow!(e))?;
101 Ok(())
102 }
103
104 pub fn cancel_tasks(&self, task_ids: &Vec<u64>) -> MetaResult<()> {
105 for task_id in task_ids {
106 self.cancel_task(*task_id)?;
107 }
108 Ok(())
109 }
110
111 pub fn context_id(&self) -> HummockContextId {
112 self.context_id
113 }
114}
115
/// Lock-free state behind [`CompactorManager`], which wraps it in an `RwLock`: the connected
/// compactor sessions plus heartbeat bookkeeping for every tracked compaction task.
pub struct CompactorManagerInner {
    /// Seconds a task may go without any of its progress counters advancing before
    /// `update_task_heartbeats` returns it for cancellation.
    pub task_expired_seconds: u64,
    /// Seconds a task may go without any progress report before `get_heartbeat_expired_tasks`
    /// returns it.
    pub heartbeat_expired_seconds: u64,
    /// Heartbeat state per tracked compaction task, keyed by task id.
    task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,

    /// Connected compactor sessions, keyed by context id.
    pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
}
135
136impl CompactorManagerInner {
137 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
138 use risingwave_meta_model::compaction_task;
139 use sea_orm::EntityTrait;
140 let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
142 .all(&env.meta_store_ref().conn)
143 .await
144 .map_err(MetadataModelError::from)?
145 .into_iter()
146 .map(Into::into)
147 .collect();
148 let mut manager = Self {
149 task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
150 heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
151 task_heartbeats: Default::default(),
152 compactor_map: Default::default(),
153 };
154 task_assignment.into_iter().for_each(|assignment| {
156 manager.initiate_task_heartbeat(CompactTask::from(assignment.compact_task.unwrap()));
157 });
158 Ok(manager)
159 }
160
161 pub fn for_test() -> Self {
163 Self {
164 task_expired_seconds: 1,
165 heartbeat_expired_seconds: 1,
166 task_heartbeats: Default::default(),
167 compactor_map: Default::default(),
168 }
169 }
170
171 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
172 use rand::Rng;
173
174 if self.compactor_map.is_empty() {
175 return None;
176 }
177
178 let rand_index = rand::rng().random_range(0..self.compactor_map.len());
179 let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
180
181 Some(compactor)
182 }
183
184 pub fn add_compactor(
191 &mut self,
192 context_id: HummockContextId,
193 ) -> UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>> {
194 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
195 self.compactor_map
196 .insert(context_id, Arc::new(Compactor::new(context_id, tx)));
197
198 tracing::info!("Added compactor session {}", context_id);
199 rx
200 }
201
202 pub fn abort_all_compactors(&mut self) {
204 while let Some(compactor) = self.next_compactor() {
205 self.remove_compactor(compactor.context_id);
206 }
207 }
208
209 pub fn remove_compactor(&mut self, context_id: HummockContextId) {
210 self.compactor_map.remove(&context_id);
211
212 tracing::info!("Removed compactor session {}", context_id);
215 }
216
217 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
218 self.compactor_map.get(&context_id).cloned()
219 }
220
221 pub fn check_tasks_status(
222 &self,
223 tasks: &[HummockCompactionTaskId],
224 slow_task_duration: Duration,
225 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
226 let tasks_ids: HashSet<u64> = HashSet::from_iter(tasks.to_vec());
227 let mut ret = HashMap::default();
228 for TaskHeartbeat {
229 task, create_time, ..
230 } in self.task_heartbeats.values()
231 {
232 if !tasks_ids.contains(&task.task_id) {
233 continue;
234 }
235 let pending_time = create_time.elapsed();
236 if pending_time > slow_task_duration {
237 ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
238 } else {
239 ret.insert(task.task_id, (pending_time, TASK_NORMAL));
240 }
241 }
242
243 for task_id in tasks {
244 if !ret.contains_key(task_id) {
245 ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
246 }
247 }
248 ret
249 }
250
251 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
252 let heartbeat_expired_ts: u64 = SystemTime::now()
253 .duration_since(SystemTime::UNIX_EPOCH)
254 .expect("Clock may have gone backwards")
255 .as_secs()
256 - self.heartbeat_expired_seconds;
257 Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
258 }
259
260 fn get_heartbeat_expired_tasks_impl(
261 task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
262 heartbeat_expired_ts: u64,
263 ) -> Vec<CompactTask> {
264 let mut cancellable_tasks = vec![];
265 const MAX_TASK_DURATION_SEC: u64 = 2700;
266
267 for TaskHeartbeat {
268 expire_at,
269 task,
270 create_time,
271 num_ssts_sealed,
272 num_ssts_uploaded,
273 num_progress_key,
274 num_pending_read_io,
275 num_pending_write_io,
276 update_at,
277 } in task_heartbeats.values()
278 {
279 if *update_at < heartbeat_expired_ts {
280 cancellable_tasks.push(task.clone());
281 }
282
283 let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
284 if task_duration_too_long {
285 let compact_task_statistics = statistics_compact_task(task);
286 tracing::info!(
287 "CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
288 pending_read_io_count {} pending_write_io_count {} target_level {} \
289 base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
290 task.compaction_group_id,
291 task.task_id,
292 create_time,
293 expire_at,
294 num_ssts_sealed,
295 num_ssts_uploaded,
296 num_progress_key,
297 num_pending_read_io,
298 num_pending_write_io,
299 task.target_level,
300 task.base_level,
301 task.target_sub_level_id,
302 task.task_type.as_str_name(),
303 compact_task_statistics
304 );
305 }
306 }
307 cancellable_tasks
308 }
309
310 pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
311 let now = SystemTime::now()
312 .duration_since(SystemTime::UNIX_EPOCH)
313 .expect("Clock may have gone backwards")
314 .as_secs();
315 self.task_heartbeats.insert(
316 task.task_id,
317 TaskHeartbeat {
318 task,
319 num_ssts_sealed: 0,
320 num_ssts_uploaded: 0,
321 num_progress_key: 0,
322 num_pending_read_io: 0,
323 num_pending_write_io: 0,
324 create_time: Instant::now(),
325 expire_at: now + self.task_expired_seconds,
326 update_at: now,
327 },
328 );
329 }
330
331 pub fn remove_task_heartbeat(&mut self, task_id: u64) {
332 self.task_heartbeats.remove(&task_id).unwrap();
333 }
334
335 pub fn update_task_heartbeats(
336 &mut self,
337 progress_list: &Vec<CompactTaskProgress>,
338 ) -> Vec<CompactTask> {
339 let now = SystemTime::now()
340 .duration_since(SystemTime::UNIX_EPOCH)
341 .expect("Clock may have gone backwards")
342 .as_secs();
343 let mut cancel_tasks = vec![];
344 for progress in progress_list {
345 if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
346 task_ref.update_at = now;
347
348 if task_ref.num_ssts_sealed < progress.num_ssts_sealed
349 || task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
350 || task_ref.num_progress_key < progress.num_progress_key
351 {
352 task_ref.expire_at = now + self.task_expired_seconds;
354 task_ref.num_ssts_sealed = progress.num_ssts_sealed;
355 task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
356 task_ref.num_progress_key = progress.num_progress_key;
357 }
358 task_ref.num_pending_read_io = progress.num_pending_read_io;
359 task_ref.num_pending_write_io = progress.num_pending_write_io;
360
361 if task_ref.expire_at < now {
363 cancel_tasks.push(task_ref.task.clone())
365 }
366 }
367 }
368
369 cancel_tasks
370 }
371
372 pub fn compactor_num(&self) -> usize {
373 self.compactor_map.len()
374 }
375
376 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
377 self.task_heartbeats
378 .values()
379 .map(|hb| CompactTaskProgress {
380 task_id: hb.task.task_id,
381 num_ssts_sealed: hb.num_ssts_sealed,
382 num_ssts_uploaded: hb.num_ssts_uploaded,
383 num_progress_key: hb.num_progress_key,
384 num_pending_read_io: hb.num_pending_read_io,
385 num_pending_write_io: hb.num_pending_write_io,
386 compaction_group_id: Some(hb.task.compaction_group_id),
387 })
388 .collect()
389 }
390}
391
/// Thread-safe facade over [`CompactorManagerInner`].
///
/// Each public method takes the read or write side of the `parking_lot` lock for the duration
/// of a single inner call.
pub struct CompactorManager {
    inner: Arc<RwLock<CompactorManagerInner>>,
}
395
396impl CompactorManager {
397 pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
398 let inner = CompactorManagerInner::with_meta(env).await?;
399
400 Ok(Self {
401 inner: Arc::new(RwLock::new(inner)),
402 })
403 }
404
405 pub fn for_test() -> Self {
407 let inner = CompactorManagerInner::for_test();
408 Self {
409 inner: Arc::new(RwLock::new(inner)),
410 }
411 }
412
413 pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
414 self.inner.read().next_compactor()
415 }
416
417 pub fn add_compactor(
418 &self,
419 context_id: HummockContextId,
420 ) -> UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>> {
421 self.inner.write().add_compactor(context_id)
422 }
423
424 pub fn abort_all_compactors(&self) {
425 self.inner.write().abort_all_compactors();
426 }
427
428 pub fn remove_compactor(&self, context_id: HummockContextId) {
429 self.inner.write().remove_compactor(context_id)
430 }
431
432 pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
433 self.inner.read().get_compactor(context_id)
434 }
435
436 pub fn check_tasks_status(
437 &self,
438 tasks: &[HummockCompactionTaskId],
439 slow_task_duration: Duration,
440 ) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
441 self.inner
442 .read()
443 .check_tasks_status(tasks, slow_task_duration)
444 }
445
446 pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
447 self.inner.read().get_heartbeat_expired_tasks()
448 }
449
450 pub fn initiate_task_heartbeat(&self, task: CompactTask) {
451 self.inner.write().initiate_task_heartbeat(task);
452 }
453
454 pub fn remove_task_heartbeat(&self, task_id: u64) {
455 self.inner.write().remove_task_heartbeat(task_id);
456 }
457
458 pub fn update_task_heartbeats(
459 &self,
460 progress_list: &Vec<CompactTaskProgress>,
461 ) -> Vec<CompactTask> {
462 self.inner.write().update_task_heartbeats(progress_list)
463 }
464
465 pub fn compactor_num(&self) -> usize {
466 self.inner.read().compactor_num()
467 }
468
469 pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
470 self.inner.read().get_progress()
471 }
472}
473
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_pb::hummock::CompactTaskProgress;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::compaction::selector::default_compaction_selector;
    use crate::hummock::test_utils::{
        add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
    };
    use crate::hummock::{CompactorManager, MockHummockMetaClient};

    /// Exercises restoring task heartbeats from the meta store, heartbeat expiry, progress
    /// updates, and compactor session registration/removal.
    #[tokio::test]
    async fn test_compactor_manager() {
        // Produce one assigned compaction task, then keep only `env` and `context_id` so the
        // manager below sees nothing but persisted state.
        let (env, context_id) = {
            let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
            let context_id = worker_id as _;
            let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
                MockHummockMetaClient::new(hummock_manager.clone(), context_id),
            );
            let compactor_manager = hummock_manager.compactor_manager_ref_for_test();
            register_table_ids_to_compaction_group(
                hummock_manager.as_ref(),
                &[1],
                StaticCompactionGroupId::StateDefault.into(),
            )
            .await;
            let _sst_infos =
                add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
            let _receiver = compactor_manager.add_compactor(context_id);
            hummock_manager
                .get_compact_task(
                    StaticCompactionGroupId::StateDefault.into(),
                    &mut default_compaction_selector(),
                )
                .await
                .unwrap()
                .unwrap();
            (env, context_id)
        };

        // A manager rebuilt from the meta store tracks the persisted task but has no sessions.
        let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());

        // NOTE(review): assumes the env's heartbeat expiry is shorter than this 2s sleep —
        // presumably set by setup_compute_env; confirm.
        tokio::time::sleep(Duration::from_secs(2)).await;
        let expired = compactor_manager.get_heartbeat_expired_tasks();
        assert_eq!(expired.len(), 1);

        // Querying expiry does not remove the task.
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Progress for an unknown task id is ignored, so the task stays expired.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id + 1,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);

        // Progress for the tracked task refreshes its heartbeat.
        compactor_manager.update_task_heartbeats(&vec![CompactTaskProgress {
            task_id: expired[0].task_id,
            num_ssts_sealed: 1,
            num_ssts_uploaded: 1,
            num_progress_key: 100,
            ..Default::default()
        }]);
        assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);

        // Session registration and removal round-trip.
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
        compactor_manager.add_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 1);
        assert_eq!(
            compactor_manager
                .get_compactor(context_id)
                .unwrap()
                .context_id(),
            context_id
        );
        compactor_manager.remove_compactor(context_id);
        assert_eq!(compactor_manager.compactor_num(), 0);
        assert!(compactor_manager.get_compactor(context_id).is_none());
    }
}