use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Instant, SystemTime};

use anyhow::Context;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use risingwave_hummock_sdk::compact_task::ReportTask;
use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::{
    Event as ResponseEvent, PullTaskAck,
};
use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
    Event as IcebergRequestEvent, PullTask as IcebergPullTask,
};
use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
    Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
};
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use super::init_selectors;
use crate::hummock::HummockManager;
use crate::hummock::compaction::CompactionSelector;
use crate::hummock::compactor_manager::Compactor;
use crate::hummock::error::{Error, Result};
use crate::hummock::sequence::next_compaction_task_id;
use crate::manager::MetaOpts;
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::rpc::metrics::MetaMetrics;

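// Batching knobs for the dedicated event loop below: while report events are buffered, at most
// `MAX_SKIP_TIMES` further `PullTask` events are handled before the buffer is flushed, and the
// buffer is flushed as soon as it reaches `MAX_REPORT_COUNT` entries.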
const MAX_SKIP_TIMES: usize = 8;
const MAX_REPORT_COUNT: usize = 16;

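/// Dispatches events received from a compactor's subscription stream.
///
/// An implementation decides, per event, whether to handle it inline on the event loop
/// (`on_event_locally`) or to forward it to another worker (`on_event_remotely`) when
/// `should_forward` returns `true`. The `bool` returned by `on_event_locally` indicates whether
/// the compactor is still considered alive; returning `false` makes the event loop drop the
/// stream and call `remove_compactor`.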
#[async_trait::async_trait]
pub trait CompactionEventDispatcher: Send + Sync + 'static {
    type EventType: Send + Sync + 'static;

    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;

    async fn on_event_remotely(
        &self,
        context_id: HummockContextId,
        event: Self::EventType,
    ) -> Result<()>;

    fn should_forward(&self, event: &Self::EventType) -> bool;

    fn remove_compactor(&self, context_id: HummockContextId);
}

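/// A request message received from a compactor subscription stream, carrying its payload event
/// and the client-side creation timestamp (in milliseconds) used for consumed-latency metrics.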
pub trait CompactorStreamEvent: Send + Sync + 'static {
    type EventType: Send + Sync + 'static;
    fn take_event(self) -> Self::EventType;
    fn create_at(&self) -> u64;
}

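/// Generic event loop that polls all registered compactor streams, records per-iteration and
/// per-event latency metrics, and hands each incoming event to its dispatcher `D`.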
pub struct CompactionEventLoop<
    D: CompactionEventDispatcher<EventType = E::EventType>,
    E: CompactorStreamEvent,
> {
    hummock_compactor_dispatcher: D,
    metrics: Arc<MetaMetrics>,
    compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
}

pub type HummockCompactionEventLoop =
    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;

pub type IcebergCompactionEventLoop =
    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;

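/// Dispatcher for Hummock compaction events. When `tx` is set, `PullTask` and `ReportTask`
/// events are forwarded to the dedicated event loop (see `HummockCompactorDedicatedEventLoop`);
/// otherwise all events are handled inline.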
pub struct HummockCompactionEventDispatcher {
    meta_opts: Arc<MetaOpts>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
}

#[async_trait::async_trait]
impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
    type EventType = RequestEvent;

    fn should_forward(&self, event: &Self::EventType) -> bool {
        if self.tx.is_none() {
            return false;
        }

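        // Only `PullTask` and `ReportTask` are offloaded to the dedicated event loop; heartbeats
        // are always handled inline.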
        matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
    }

    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
        let mut compactor_alive = true;
        match event {
            RequestEvent::HeartBeat(HeartBeat { progress }) => {
                compactor_alive = self
                    .hummock_compaction_event_handler
                    .handle_heartbeat(context_id, progress)
                    .await;
            }

            RequestEvent::Register(_event) => {
                unreachable!()
            }

            RequestEvent::PullTask(pull_task) => {
                compactor_alive = self
                    .hummock_compaction_event_handler
                    .handle_pull_task_event(
                        context_id,
                        pull_task.pull_task_count as usize,
                        &mut init_selectors(),
                        self.meta_opts.max_get_task_probe_times,
                    )
                    .await;
            }

            RequestEvent::ReportTask(report_event) => {
                if let Err(e) = self
                    .hummock_compaction_event_handler
                    .handle_report_task_event(vec![report_event.into()])
                    .await
                {
                    tracing::error!(error = %e.as_report(), "report compact_task fail")
                }
            }
        }

        compactor_alive
    }

    async fn on_event_remotely(
        &self,
        context_id: HummockContextId,
        event: Self::EventType,
    ) -> Result<()> {
        if let Some(tx) = &self.tx {
            tx.send((context_id, event))
                .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
        } else {
            unreachable!();
        }
        Ok(())
    }

    fn remove_compactor(&self, context_id: HummockContextId) {
        self.hummock_compaction_event_handler
            .hummock_manager
            .compactor_manager
            .remove_compactor(context_id);
    }
}

impl HummockCompactionEventDispatcher {
    pub fn new(
        meta_opts: Arc<MetaOpts>,
        hummock_compaction_event_handler: HummockCompactionEventHandler,
        tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
    ) -> Self {
        Self {
            meta_opts,
            hummock_compaction_event_handler,
            tx,
        }
    }
}

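/// Shared handler implementing the Hummock-side logic behind compactor events (heartbeats, task
/// pulling, and task reporting), used by both the inline dispatcher and the dedicated event loop.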
#[derive(Clone)]
pub struct HummockCompactionEventHandler {
    pub hummock_manager: Arc<HummockManager>,
}

impl HummockCompactionEventHandler {
    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
        Self { hummock_manager }
    }

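    /// Updates task heartbeats and cancels tasks that have made no visible progress. Returns
    /// `false` if the reporting compactor is no longer registered.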
    async fn handle_heartbeat(
        &self,
        context_id: HummockContextId,
        progress: Vec<CompactTaskProgress>,
    ) -> bool {
        let mut compactor_alive = true;
        let compactor_manager = self.hummock_manager.compactor_manager.clone();
        let cancel_tasks = compactor_manager
            .update_task_heartbeats(&progress)
            .into_iter()
            .map(|task| task.task_id)
            .collect::<Vec<_>>();
        if !cancel_tasks.is_empty() {
            tracing::info!(
                ?cancel_tasks,
                %context_id,
                "Tasks have expired due to lack of visible progress and will be cancelled",
            );

            if let Err(e) = self
                .hummock_manager
                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
                    until we can successfully report its status."
                );
            }
        }

        match compactor_manager.get_compactor(context_id) {
            Some(compactor) => {
                if !cancel_tasks.is_empty() {
                    let _ = compactor.cancel_tasks(&cancel_tasks);
                    tracing::info!(
                        ?cancel_tasks,
                        %context_id,
                        "CancelTask operation has been sent to compactor node",
                    );
                }
            }
            _ => {
                compactor_alive = false;
            }
        }

        compactor_alive
    }

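    /// Tries to dispatch up to `pull_task_count` compaction tasks to the requesting compactor and
    /// always replies with a `PullTaskAck`. Returns `false` if the compactor is unknown or the
    /// ack cannot be delivered.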
    async fn handle_pull_task_event(
        &self,
        context_id: HummockContextId,
        pull_task_count: usize,
        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
        max_get_task_probe_times: usize,
    ) -> bool {
        assert_ne!(0, pull_task_count);
        let Some(compactor) = self
            .hummock_manager
            .compactor_manager
            .get_compactor(context_id)
        else {
            return false;
        };

        self.try_dispatch_tasks(
            &compactor,
            pull_task_count,
            compaction_selectors,
            max_get_task_probe_times,
        )
        .await;

        if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to send ack to {}",
                context_id,
            );
            return false;
        }

        true
    }

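    /// Picks a compaction group set and task type from the current compaction-state snapshot,
    /// then repeatedly asks the manager for tasks (bounded by `max_get_task_probe_times`) and
    /// sends them to the compactor. Groups without tasks are unscheduled, and tasks that fail to
    /// send are cancelled with `SendFailCanceled`.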
    async fn try_dispatch_tasks(
        &self,
        compactor: &Arc<Compactor>,
        pull_task_count: usize,
        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
        max_get_task_probe_times: usize,
    ) {
        let snapshot = self.hummock_manager.compaction_state.snapshot();
        let Some((groups, task_type)) = snapshot.pick_compaction_groups_and_type() else {
            return;
        };

        if let TaskType::Ttl = task_type {
            match self
                .hummock_manager
                .metadata_manager
                .get_all_table_options()
                .await
                .map_err(|err| Error::MetaStore(err.into()))
            {
                Ok(table_options) => {
                    self.hummock_manager
                        .update_table_id_to_table_option(table_options);
                }
                Err(err) => {
                    warn!(error = %err.as_report(), "Failed to get table options");
                }
            }
        }

        let selector: &mut Box<dyn CompactionSelector> =
            compaction_selectors.get_mut(&task_type).unwrap();

        let mut generated_task_count = 0;
        let mut existed_groups = groups.clone();
        let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
        let mut failed_tasks = vec![];
        let mut loop_times = 0;

        while generated_task_count < pull_task_count
            && failed_tasks.is_empty()
            && loop_times < max_get_task_probe_times
        {
            loop_times += 1;
            let compact_ret = self
                .hummock_manager
                .get_compact_tasks(
                    existed_groups.clone(),
                    pull_task_count - generated_task_count,
                    selector,
                )
                .await;

            match compact_ret {
                Ok((compact_tasks, unschedule_groups)) => {
                    no_task_groups.extend(unschedule_groups);
                    if compact_tasks.is_empty() {
                        break;
                    }
                    generated_task_count += compact_tasks.len();
                    for task in compact_tasks {
                        let task_id = task.task_id;
                        if let Err(e) =
                            compactor.send_event(ResponseEvent::CompactTask(task.into()))
                        {
                            tracing::warn!(
                                error = %e.as_report(),
                                "Failed to send task {} to {}",
                                task_id,
                                compactor.context_id(),
                            );
                            failed_tasks.push(task_id);
                        }
                    }
                    if !failed_tasks.is_empty() {
                        self.hummock_manager
                            .compactor_manager
                            .remove_compactor(compactor.context_id());
                    }
                    existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
                }
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
                    break;
                }
            };
        }
        for group in no_task_groups {
            self.hummock_manager.compaction_state.unschedule(
                group,
                task_type,
                snapshot.snapshot_time(),
            );
        }
        if let Err(err) = self
            .hummock_manager
            .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
            .await
        {
            tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
        }
    }

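    /// Reports finished tasks back to the Hummock manager. Failures are only logged here, so the
    /// returned `Result` is currently always `Ok`.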
    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
        if let Err(e) = self
            .hummock_manager
            .report_compact_tasks(report_events)
            .await
        {
            tracing::error!(error = %e.as_report(), "report compact_task fail")
        }
        Ok(())
    }
}

impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
    CompactionEventLoop<D, E>
{
    pub fn new(
        hummock_compactor_dispatcher: D,
        metrics: Arc<MetaMetrics>,
        compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
    ) -> Self {
        Self {
            hummock_compactor_dispatcher,
            metrics,
            compactor_streams_change_rx,
        }
    }

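    /// Spawns the event loop onto the Tokio runtime and returns its join handle together with a
    /// oneshot sender that shuts the loop down when triggered (or dropped).
    ///
    /// Illustrative usage (a sketch only; `dispatcher`, `metrics` and `streams_change_rx` are
    /// placeholders for values constructed elsewhere):
    ///
    /// ```ignore
    /// let event_loop = CompactionEventLoop::new(dispatcher, metrics, streams_change_rx);
    /// let (join_handle, shutdown_tx) = event_loop.run();
    /// // ... later, on meta node shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```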
    pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
        let mut compactor_request_streams = FuturesUnordered::new();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let shutdown_rx_shared = shutdown_rx.shared();

        let join_handle = tokio::spawn(async move {
            let push_stream =
                |context_id: HummockContextId,
                 stream: Streaming<E>,
                 compactor_request_streams: &mut FuturesUnordered<_>| {
                    let future = StreamExt::into_future(stream)
                        .map(move |stream_future| (context_id, stream_future));

                    compactor_request_streams.push(future);
                };

            let mut event_loop_iteration_now = Instant::now();

            loop {
                let shutdown_rx_shared = shutdown_rx_shared.clone();
                self.metrics
                    .compaction_event_loop_iteration_latency
                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                event_loop_iteration_now = Instant::now();

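                // Wait for either a shutdown signal, a newly registered compactor stream, or the
                // next request from any existing stream.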
                tokio::select! {
                    _ = shutdown_rx_shared => { return; },

                    compactor_stream = self.compactor_streams_change_rx.recv() => {
                        if let Some((context_id, stream)) = compactor_stream {
                            tracing::info!("compactor {} enters the cluster", context_id);
                            push_stream(context_id, stream, &mut compactor_request_streams);
                        }
                    },

                    result = pending_on_none(compactor_request_streams.next()) => {
                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
                        let (event, create_at, stream) = match compactor_stream_req {
                            (Some(Ok(req)), stream) => {
                                let create_at = req.create_at();
                                let event = req.take_event();
                                (event, create_at, stream)
                            }

                            (Some(Err(err)), _stream) => {
                                tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
                                continue
                            }

                            _ => {
                                tracing::warn!(%context_id, "compactor stream has been closed, recv stream may be destroyed");
                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
                                continue
                            },
                        };

                        {
                            let consumed_latency_ms = SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis()
                                as u64
                                - create_at;
                            self.metrics
                                .compaction_event_consumed_latency
                                .observe(consumed_latency_ms as _);
                        }

                        let mut compactor_alive = true;
                        if self
                            .hummock_compactor_dispatcher
                            .should_forward(&event)
                        {
                            if let Err(e) = self
                                .hummock_compactor_dispatcher
                                .on_event_remotely(context_id, event)
                                .await
                            {
                                tracing::warn!(error = %e.as_report(), "Failed to forward event");
                            }
                        } else {
                            compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
                                context_id,
                                event,
                            ).await;
                        }

                        if compactor_alive {
                            push_stream(context_id, stream, &mut compactor_request_streams);
                        } else {
                            tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
                            self
                                .hummock_compactor_dispatcher
                                .remove_compactor(context_id);
                        }
                    },
                }
            }
        });

        (join_handle, shutdown_tx)
    }
}

impl CompactorStreamEvent for SubscribeCompactionEventRequest {
    type EventType = RequestEvent;

    fn take_event(self) -> Self::EventType {
        self.event.unwrap()
    }

    fn create_at(&self) -> u64 {
        self.create_at
    }
}

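/// Dedicated event loop that takes the `PullTask` and `ReportTask` events forwarded by
/// `HummockCompactionEventDispatcher` off the main stream loop, so that task generation and
/// report handling do not block stream polling.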
pub struct HummockCompactorDedicatedEventLoop {
    hummock_manager: Arc<HummockManager>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
}

impl HummockCompactorDedicatedEventLoop {
    pub fn new(
        hummock_manager: Arc<HummockManager>,
        hummock_compaction_event_handler: HummockCompactionEventHandler,
    ) -> Self {
        Self {
            hummock_manager,
            hummock_compaction_event_handler,
        }
    }

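    /// Drains forwarded events from `rx` until shutdown. `ReportTask` events are buffered and
    /// flushed in batches: a batch is reported once `MAX_REPORT_COUNT` events have accumulated,
    /// after more than `MAX_SKIP_TIMES` intervening `PullTask` events, or when the channel has no
    /// more readily available events.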
    async fn compact_task_dedicated_event_handler(
        &self,
        mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
        shutdown_rx: OneShotReceiver<()>,
    ) {
        let mut compaction_selectors = init_selectors();

        tokio::select! {
            _ = shutdown_rx => {}

            _ = async {
                while let Some((context_id, event)) = rx.recv().await {
                    let mut report_events = vec![];
                    let mut skip_times = 0;
                    match event {
                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
                            self.hummock_compaction_event_handler
                                .handle_pull_task_event(
                                    context_id,
                                    pull_task_count as usize,
                                    &mut compaction_selectors,
                                    self.hummock_manager.env.opts.max_get_task_probe_times,
                                )
                                .await;
                        }

                        RequestEvent::ReportTask(task) => {
                            report_events.push(task.into());
                        }

                        _ => unreachable!(),
                    }
                    while let Ok((context_id, event)) = rx.try_recv() {
                        match event {
                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
                                self.hummock_compaction_event_handler
                                    .handle_pull_task_event(
                                        context_id,
                                        pull_task_count as usize,
                                        &mut compaction_selectors,
                                        self.hummock_manager.env.opts.max_get_task_probe_times,
                                    )
                                    .await;
                                if !report_events.is_empty() {
                                    if skip_times > MAX_SKIP_TIMES {
                                        break;
                                    }
                                    skip_times += 1;
                                }
                            }

                            RequestEvent::ReportTask(task) => {
                                report_events.push(task.into());
                                if report_events.len() >= MAX_REPORT_COUNT {
                                    break;
                                }
                            }
                            _ => unreachable!(),
                        }
                    }
                    if !report_events.is_empty()
                        && let Err(e) = self
                            .hummock_compaction_event_handler
                            .handle_report_task_event(report_events)
                            .await
                    {
                        tracing::error!(error = %e.as_report(), "report compact_task fail")
                    }
                }
            } => {}
        }
    }

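    /// Spawns the dedicated handler and returns its join handle, the sender used to forward
    /// events to it, and a oneshot shutdown sender.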
    pub fn run(
        self,
    ) -> (
        JoinHandle<()>,
        UnboundedSender<(HummockContextId, RequestEvent)>,
        Sender<()>,
    ) {
        let (tx, rx) = unbounded_channel();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.compact_task_dedicated_event_handler(rx, shutdown_rx)
                .await;
        });
        (join_handle, tx, shutdown_tx)
    }
}

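/// Handles Iceberg compaction events by dispatching compaction tasks for the top-N Iceberg
/// commit sinks tracked by the compaction manager.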
pub struct IcebergCompactionEventHandler {
    compaction_manager: IcebergCompactionManagerRef,
}

impl IcebergCompactionEventHandler {
    pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
        Self { compaction_manager }
    }

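    /// Sends up to `pull_task_count` Iceberg compaction tasks to the requesting compactor,
    /// followed by a `PullTaskAck`. Returns `false` if the compactor is unknown or any send
    /// fails.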
    async fn handle_pull_task_event(
        &self,
        context_id: HummockContextId,
        pull_task_count: usize,
    ) -> bool {
        assert_ne!(0, pull_task_count);
        if let Some(compactor) = self
            .compaction_manager
            .iceberg_compactor_manager
            .get_compactor(context_id)
        {
            let mut compactor_alive = true;

            let iceberg_compaction_handles = self
                .compaction_manager
                .get_top_n_iceberg_commit_sink_ids(pull_task_count)
                .await;

            for handle in iceberg_compaction_handles {
                let compactor = compactor.clone();
                if let Err(e) = async move {
                    handle
                        .send_compact_task(
                            compactor,
                            next_compaction_task_id(&self.compaction_manager.env).await?,
                        )
                        .await
                }
                .await
                {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Failed to send iceberg commit task to {}",
                        context_id,
                    );
                    compactor_alive = false;
                }
            }

            if let Err(e) =
                compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
            {
                tracing::warn!(
                    error = %e.as_report(),
                    "Failed to send ack to {}",
                    context_id,
                );
                compactor_alive = false;
            }

            return compactor_alive;
        }

        false
    }
}

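/// Dispatcher for Iceberg compaction events. Everything is handled locally; no events are
/// forwarded, so `on_event_remotely` is unreachable.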
pub struct IcebergCompactionEventDispatcher {
    compaction_event_handler: IcebergCompactionEventHandler,
}

#[async_trait::async_trait]
impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
    type EventType = IcebergRequestEvent;

    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
        match event {
            IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
                return self
                    .compaction_event_handler
                    .handle_pull_task_event(context_id, pull_task_count as usize)
                    .await;
            }
            _ => unreachable!(),
        }
    }

    async fn on_event_remotely(
        &self,
        _context_id: HummockContextId,
        _event: Self::EventType,
    ) -> Result<()> {
        unreachable!()
    }

    fn should_forward(&self, _event: &Self::EventType) -> bool {
        false
    }

    fn remove_compactor(&self, context_id: HummockContextId) {
        self.compaction_event_handler
            .compaction_manager
            .iceberg_compactor_manager
            .remove_compactor(context_id);
    }
}

impl IcebergCompactionEventDispatcher {
    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
        Self {
            compaction_event_handler,
        }
    }
}

impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
    type EventType = IcebergRequestEvent;

    fn take_event(self) -> Self::EventType {
        self.event.unwrap()
    }

    fn create_at(&self) -> u64 {
        self.create_at
    }
}