1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Instant, SystemTime};
18
19use anyhow::Context;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use risingwave_hummock_sdk::compact_task::ReportTask;
23use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
24use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
25use risingwave_pb::hummock::subscribe_compaction_event_request::{
26 Event as RequestEvent, HeartBeat, PullTask,
27};
28use risingwave_pb::hummock::subscribe_compaction_event_response::{
29 Event as ResponseEvent, PullTaskAck,
30};
31use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
32use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
34 Event as IcebergRequestEvent, PullTask as IcebergPullTask, ReportTask as IcebergReportTask,
35};
36use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
37 Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
38};
39use rw_futures_util::pending_on_none;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
43use tokio::task::JoinHandle;
44use tonic::Streaming;
45use tracing::warn;
46
47use super::init_selectors;
48use crate::hummock::HummockManager;
49use crate::hummock::compaction::CompactionSelector;
50use crate::hummock::compactor_manager::Compactor;
51use crate::hummock::error::{Error, Result};
52use crate::hummock::sequence::next_compaction_task_id;
53use crate::manager::MetaOpts;
54use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
55use crate::rpc::metrics::MetaMetrics;
56
/// Bounds how many `PullTask` requests the dedicated event loop keeps handling while
/// `ReportTask` events are buffered: once its per-batch skip counter exceeds this value
/// it stops draining the channel and flushes the buffered reports.
const MAX_SKIP_TIMES: usize = 8;
/// Number of buffered `ReportTask` events at which the dedicated event loop stops
/// draining its channel and reports the whole batch at once.
const MAX_REPORT_COUNT: usize = 16;
59
60#[async_trait::async_trait]
61pub trait CompactionEventDispatcher: Send + Sync + 'static {
62 type EventType: Send + Sync + 'static;
63
64 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;
65
66 async fn on_event_remotely(
67 &self,
68 context_id: HummockContextId,
69 event: Self::EventType,
70 ) -> Result<()>;
71
72 fn should_forward(&self, event: &Self::EventType) -> bool;
73
74 fn remove_compactor(&self, context_id: HummockContextId);
75}
76
/// A single request read from a compactor's request stream.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Payload type carried by the request.
    type EventType: Send + Sync + 'static;

    /// Creation timestamp of the request. The event loop subtracts it from the local
    /// wall clock in milliseconds since the Unix epoch, so it is presumably in the same
    /// unit (TODO confirm with the request producer).
    fn create_at(&self) -> u64;

    /// Consumes the request and returns its payload.
    fn take_event(self) -> Self::EventType;
}
82
83pub struct CompactionEventLoop<
84 D: CompactionEventDispatcher<EventType = E::EventType>,
85 E: CompactorStreamEvent,
86> {
87 hummock_compactor_dispatcher: D,
88 metrics: Arc<MetaMetrics>,
89 compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
90}
91
92pub type HummockCompactionEventLoop =
93 CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;
94
95pub type IcebergCompactionEventLoop =
96 CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
97
/// Dispatches Hummock compactor events.
///
/// When `tx` is `Some`, `PullTask` and `ReportTask` events are sent through it; every
/// other event (and every event when `tx` is `None`) is handled by
/// `hummock_compaction_event_handler` on the calling event loop.
pub struct HummockCompactionEventDispatcher {
    /// Supplies `max_get_task_probe_times` for locally handled pull requests.
    meta_opts: Arc<MetaOpts>,
    /// Performs the actual heartbeat / pull / report handling.
    hummock_compaction_event_handler: HummockCompactionEventHandler,
    /// Optional forwarding channel for `PullTask` / `ReportTask` events.
    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
}
103
104#[async_trait::async_trait]
105impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
106 type EventType = RequestEvent;
107
108 fn should_forward(&self, event: &Self::EventType) -> bool {
109 if self.tx.is_none() {
110 return false;
111 }
112
113 matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
114 }
115
116 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
117 let mut compactor_alive = true;
118 match event {
119 RequestEvent::HeartBeat(HeartBeat { progress }) => {
120 compactor_alive = self
121 .hummock_compaction_event_handler
122 .handle_heartbeat(context_id, progress)
123 .await;
124 }
125
126 RequestEvent::Register(_event) => {
127 unreachable!()
128 }
129
130 RequestEvent::PullTask(pull_task) => {
131 compactor_alive = self
132 .hummock_compaction_event_handler
133 .handle_pull_task_event(
134 context_id,
135 pull_task.pull_task_count as usize,
136 &mut init_selectors(),
137 self.meta_opts.max_get_task_probe_times,
138 )
139 .await;
140 }
141
142 RequestEvent::ReportTask(report_event) => {
143 if let Err(e) = self
144 .hummock_compaction_event_handler
145 .handle_report_task_event(vec![report_event.into()])
146 .await
147 {
148 tracing::error!(error = %e.as_report(), "report compact_tack fail")
149 }
150 }
151 }
152
153 compactor_alive
154 }
155
156 async fn on_event_remotely(
157 &self,
158 context_id: HummockContextId,
159 event: Self::EventType,
160 ) -> Result<()> {
161 if let Some(tx) = &self.tx {
162 tx.send((context_id, event))
163 .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
164 } else {
165 unreachable!();
166 }
167 Ok(())
168 }
169
170 fn remove_compactor(&self, context_id: HummockContextId) {
171 self.hummock_compaction_event_handler
172 .hummock_manager
173 .compactor_manager
174 .remove_compactor(context_id);
175 }
176}
177
178impl HummockCompactionEventDispatcher {
179 pub fn new(
180 meta_opts: Arc<MetaOpts>,
181 hummock_compaction_event_handler: HummockCompactionEventHandler,
182 tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
183 ) -> Self {
184 Self {
185 meta_opts,
186 hummock_compaction_event_handler,
187 tx,
188 }
189 }
190}
191
/// Handles Hummock compactor events (heartbeats, task pulls and task reports)
/// against the shared [`HummockManager`].
#[derive(Clone)]
pub struct HummockCompactionEventHandler {
    pub hummock_manager: Arc<HummockManager>,
}
196
197impl HummockCompactionEventHandler {
198 pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
199 Self { hummock_manager }
200 }
201
202 async fn handle_heartbeat(
203 &self,
204 context_id: HummockContextId,
205 progress: Vec<CompactTaskProgress>,
206 ) -> bool {
207 let mut compactor_alive = true;
208 let compactor_manager = self.hummock_manager.compactor_manager.clone();
209 let cancel_tasks = compactor_manager
210 .update_task_heartbeats(&progress)
211 .into_iter()
212 .map(|task| task.task_id)
213 .collect::<Vec<_>>();
214 if !cancel_tasks.is_empty() {
215 tracing::info!(
216 ?cancel_tasks,
217 %context_id,
218 "Tasks cancel has expired due to lack of visible progress",
219 );
220
221 if let Err(e) = self
222 .hummock_manager
223 .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
224 .await
225 {
226 tracing::error!(
227 error = %e.as_report(),
228 "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
229 until we can successfully report its status."
230 );
231 }
232 }
233
234 match compactor_manager.get_compactor(context_id) {
235 Some(compactor) => {
236 if !cancel_tasks.is_empty() {
240 let _ = compactor.cancel_tasks(&cancel_tasks);
241 tracing::info!(
242 ?cancel_tasks,
243 %context_id,
244 "CancelTask operation has been sent to compactor node",
245 );
246 }
247 }
248 _ => {
249 compactor_alive = false;
252 }
253 }
254
255 compactor_alive
256 }
257
258 async fn handle_pull_task_event(
259 &self,
260 context_id: HummockContextId,
261 pull_task_count: usize,
262 compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
263 max_get_task_probe_times: usize,
264 ) -> bool {
265 assert_ne!(0, pull_task_count);
266 let Some(compactor) = self
267 .hummock_manager
268 .compactor_manager
269 .get_compactor(context_id)
270 else {
271 return false;
272 };
273
274 self.try_dispatch_tasks(
276 &compactor,
277 pull_task_count,
278 compaction_selectors,
279 max_get_task_probe_times,
280 )
281 .await;
282
283 if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
285 tracing::warn!(
286 error = %e.as_report(),
287 "Failed to send ack to {}",
288 context_id,
289 );
290 return false;
291 }
292
293 true
294 }
295
296 async fn try_dispatch_tasks(
301 &self,
302 compactor: &Arc<Compactor>,
303 pull_task_count: usize,
304 compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
305 max_get_task_probe_times: usize,
306 ) {
307 let snapshot = self.hummock_manager.compaction_state.snapshot();
308 let Some((groups, task_type)) = snapshot.pick_compaction_groups_and_type() else {
309 return;
310 };
311
312 if let TaskType::Ttl = task_type {
313 match self
314 .hummock_manager
315 .metadata_manager
316 .get_all_table_options()
317 .await
318 .map_err(|err| Error::MetaStore(err.into()))
319 {
320 Ok(table_options) => {
321 self.hummock_manager
322 .update_table_id_to_table_option(table_options);
323 }
324 Err(err) => {
325 warn!(error = %err.as_report(), "Failed to get table options");
326 }
327 }
328 }
329
330 let selector: &mut Box<dyn CompactionSelector> =
331 compaction_selectors.get_mut(&task_type).unwrap();
332
333 let mut generated_task_count = 0;
334 let mut existed_groups = groups.clone();
335 let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
336 let mut failed_tasks = vec![];
337 let mut loop_times = 0;
338
339 while generated_task_count < pull_task_count
340 && failed_tasks.is_empty()
341 && loop_times < max_get_task_probe_times
342 {
343 loop_times += 1;
344 let compact_ret = self
345 .hummock_manager
346 .get_compact_tasks(
347 existed_groups.clone(),
348 pull_task_count - generated_task_count,
349 &mut **selector,
350 )
351 .await;
352
353 match compact_ret {
354 Ok((compact_tasks, unschedule_groups)) => {
355 no_task_groups.extend(unschedule_groups);
356 if compact_tasks.is_empty() {
357 break;
358 }
359 generated_task_count += compact_tasks.len();
360 for task in compact_tasks {
361 let task_id = task.task_id;
362 if let Err(e) =
363 compactor.send_event(ResponseEvent::CompactTask(task.into()))
364 {
365 tracing::warn!(
366 error = %e.as_report(),
367 "Failed to send task {} to {}",
368 task_id,
369 compactor.context_id(),
370 );
371 failed_tasks.push(task_id);
372 }
373 }
374 if !failed_tasks.is_empty() {
375 self.hummock_manager
376 .compactor_manager
377 .remove_compactor(compactor.context_id());
378 }
379 existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
380 }
381 Err(err) => {
382 tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
383 break;
384 }
385 };
386 }
387 for group in no_task_groups {
388 self.hummock_manager.compaction_state.unschedule(
389 group,
390 task_type,
391 snapshot.snapshot_time(),
392 );
393 }
394 if let Err(err) = self
395 .hummock_manager
396 .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
397 .await
398 {
399 tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
400 }
401 }
402
403 async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
404 if let Err(e) = self
405 .hummock_manager
406 .report_compact_tasks(report_events)
407 .await
408 {
409 tracing::error!(error = %e.as_report(), "report compact_tack fail")
410 }
411 Ok(())
412 }
413}
414
415impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
416 CompactionEventLoop<D, E>
417{
418 pub fn new(
419 hummock_compactor_dispatcher: D,
420 metrics: Arc<MetaMetrics>,
421 compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
422 ) -> Self {
423 Self {
424 hummock_compactor_dispatcher,
425 metrics,
426 compactor_streams_change_rx,
427 }
428 }
429
430 pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
431 let mut compactor_request_streams = FuturesUnordered::new();
432 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
433 let shutdown_rx_shared = shutdown_rx.shared();
434
435 let join_handle = tokio::spawn(async move {
436 let mut stream_generations = HashMap::<HummockContextId, u64>::new();
440 let push_stream =
441 |context_id: HummockContextId,
442 stream_generation: u64,
443 stream: Streaming<E>,
444 compactor_request_streams: &mut FuturesUnordered<_>| {
445 let future = StreamExt::into_future(stream)
446 .map(move |stream_future| (context_id, stream_generation, stream_future));
447
448 compactor_request_streams.push(future);
449 };
450
451 let mut event_loop_iteration_now = Instant::now();
452
453 loop {
454 let shutdown_rx_shared = shutdown_rx_shared.clone();
455 self.metrics
456 .compaction_event_loop_iteration_latency
457 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
458 event_loop_iteration_now = Instant::now();
459
460 tokio::select! {
461 _ = shutdown_rx_shared => { return; },
462
463 compactor_stream = self.compactor_streams_change_rx.recv() => {
464 if let Some((context_id, stream)) = compactor_stream {
465 tracing::info!("compactor {} enters the cluster", context_id);
466 let stream_generation = stream_generations
467 .entry(context_id)
468 .and_modify(|generation| *generation += 1)
469 .or_insert(1);
470 push_stream(
471 context_id,
472 *stream_generation,
473 stream,
474 &mut compactor_request_streams,
475 );
476 }
477 },
478
479 result = pending_on_none(compactor_request_streams.next()) => {
480 let (context_id, stream_generation, compactor_stream_req): (_, _, (std::option::Option<std::result::Result<E, _>>, _)) = result;
481 let Some(current_generation) = stream_generations.get(&context_id).copied() else {
482 continue;
483 };
484 if current_generation != stream_generation {
485 continue;
489 }
490 let (event, create_at, stream) = match compactor_stream_req {
491 (Some(Ok(req)), stream) => {
492 let create_at = req.create_at();
493 let event = req.take_event();
494 (event, create_at, stream)
495 }
496
497 (Some(Err(err)), _stream) => {
498 tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
499 self.hummock_compactor_dispatcher.remove_compactor(context_id);
500 continue
501 }
502
503 _ => {
504 tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
506 self.hummock_compactor_dispatcher.remove_compactor(context_id);
507 continue
508 },
509 };
510
511 {
512 let consumed_latency_ms = SystemTime::now()
513 .duration_since(std::time::UNIX_EPOCH)
514 .expect("Clock may have gone backwards")
515 .as_millis()
516 as u64
517 - create_at;
518 self.metrics
519 .compaction_event_consumed_latency
520 .observe(consumed_latency_ms as _);
521 }
522
523 let mut compactor_alive = true;
524 if self
525 .hummock_compactor_dispatcher
526 .should_forward(&event)
527 {
528 if let Err(e) = self
529 .hummock_compactor_dispatcher
530 .on_event_remotely(context_id, event)
531 .await
532 {
533 tracing::warn!(error = %e.as_report(), "Failed to forward event");
534 }
535 } else {
536 compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
537 context_id,
538 event,
539 ).await;
540 }
541
542 if compactor_alive {
543 push_stream(
544 context_id,
545 stream_generation,
546 stream,
547 &mut compactor_request_streams,
548 );
549 } else {
550 tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
551 self
552 .hummock_compactor_dispatcher
553 .remove_compactor(context_id);
554 }
555 },
556 }
557 }
558 });
559
560 (join_handle, shutdown_tx)
561 }
562}
563
564impl CompactorStreamEvent for SubscribeCompactionEventRequest {
565 type EventType = RequestEvent;
566
567 fn take_event(self) -> Self::EventType {
568 self.event.unwrap()
569 }
570
571 fn create_at(&self) -> u64 {
572 self.create_at
573 }
574}
575
/// Separate event loop that handles Hummock `PullTask` / `ReportTask` events received
/// over the channel returned by its `run` method, batching consecutive reports.
pub struct HummockCompactorDedicatedEventLoop {
    hummock_manager: Arc<HummockManager>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
}
580
581impl HummockCompactorDedicatedEventLoop {
582 pub fn new(
583 hummock_manager: Arc<HummockManager>,
584 hummock_compaction_event_handler: HummockCompactionEventHandler,
585 ) -> Self {
586 Self {
587 hummock_manager,
588 hummock_compaction_event_handler,
589 }
590 }
591
592 async fn compact_task_dedicated_event_handler(
594 &self,
595 mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
596 shutdown_rx: OneShotReceiver<()>,
597 ) {
598 let mut compaction_selectors = init_selectors();
599
600 tokio::select! {
601 _ = shutdown_rx => {}
602
603 _ = async {
604 while let Some((context_id, event)) = rx.recv().await {
605 let mut report_events = vec![];
606 let mut skip_times = 0;
607 match event {
608 RequestEvent::PullTask(PullTask { pull_task_count }) => {
609 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
610 }
611
612 RequestEvent::ReportTask(task) => {
613 report_events.push(task.into());
614 }
615
616 _ => unreachable!(),
617 }
618 while let Ok((context_id, event)) = rx.try_recv() {
619 match event {
620 RequestEvent::PullTask(PullTask { pull_task_count }) => {
621 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
622 if !report_events.is_empty() {
623 if skip_times > MAX_SKIP_TIMES {
624 break;
625 }
626 skip_times += 1;
627 }
628 }
629
630 RequestEvent::ReportTask(task) => {
631 report_events.push(task.into());
632 if report_events.len() >= MAX_REPORT_COUNT {
633 break;
634 }
635 }
636 _ => unreachable!(),
637 }
638 }
639 if !report_events.is_empty()
640 && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
641 {
642 tracing::error!(error = %e.as_report(), "report compact_tack fail")
643 }
644 }
645 } => {}
646 }
647 }
648
649 pub fn run(
650 self,
651 ) -> (
652 JoinHandle<()>,
653 UnboundedSender<(HummockContextId, RequestEvent)>,
654 Sender<()>,
655 ) {
656 let (tx, rx) = unbounded_channel();
657 let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
658 let join_handler = tokio::spawn(async move {
659 self.compact_task_dedicated_event_handler(rx, shutdown_rx)
660 .await;
661 });
662 (join_handler, tx, shutdon_tx)
663 }
664}
665
/// Handles Iceberg compactor events (task pulls and task reports) against the
/// Iceberg compaction manager.
pub struct IcebergCompactionEventHandler {
    compaction_manager: IcebergCompactionManagerRef,
}
669
670impl IcebergCompactionEventHandler {
671 pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
672 Self { compaction_manager }
673 }
674
675 async fn handle_pull_task_event(
676 &self,
677 context_id: HummockContextId,
678 pull_task_count: usize,
679 ) -> bool {
680 assert_ne!(0, pull_task_count);
681 if let Some(compactor) = self
682 .compaction_manager
683 .iceberg_compactor_manager
684 .get_compactor(context_id)
685 {
686 let mut compactor_alive = true;
687
688 let iceberg_compaction_handles = self
689 .compaction_manager
690 .get_top_n_iceberg_commit_sink_ids(pull_task_count);
691
692 for handle in iceberg_compaction_handles {
693 let compactor = compactor.clone();
694 if let Err(e) = async move {
696 handle
697 .send_compact_task(
698 compactor,
699 next_compaction_task_id(&self.compaction_manager.env).await?,
700 )
701 .await
702 }
703 .await
704 {
705 tracing::warn!(
706 error = %e.as_report(),
707 "Failed to send iceberg commit task to {}",
708 context_id,
709 );
710 compactor_alive = false;
711 }
712 }
713
714 if let Err(e) =
715 compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
716 {
717 tracing::warn!(
718 error = %e.as_report(),
719 "Failed to send ask to {}",
720 context_id,
721 );
722 compactor_alive = false;
723 }
724
725 return compactor_alive;
726 }
727
728 false
729 }
730
731 fn apply_report_task_event(&self, report: IcebergReportTask) {
732 self.compaction_manager.handle_report_task(report);
733 }
734}
735
/// Dispatcher for Iceberg compactor events; nothing is forwarded, every event is
/// handled inline by `compaction_event_handler`.
pub struct IcebergCompactionEventDispatcher {
    compaction_event_handler: IcebergCompactionEventHandler,
}
739
740#[async_trait::async_trait]
741impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
742 type EventType = IcebergRequestEvent;
743
744 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
745 match event {
746 IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
747 return self
748 .compaction_event_handler
749 .handle_pull_task_event(context_id, pull_task_count as usize)
750 .await;
751 }
752 IcebergRequestEvent::ReportTask(report) => {
753 self.compaction_event_handler
754 .apply_report_task_event(report);
755 return true;
756 }
757 _ => unreachable!(),
758 }
759 }
760
761 async fn on_event_remotely(
762 &self,
763 _context_id: HummockContextId,
764 _event: Self::EventType,
765 ) -> Result<()> {
766 unreachable!()
767 }
768
769 fn should_forward(&self, _event: &Self::EventType) -> bool {
770 false
771 }
772
773 fn remove_compactor(&self, context_id: HummockContextId) {
774 self.compaction_event_handler
775 .compaction_manager
776 .iceberg_compactor_manager
777 .remove_compactor(context_id);
778 }
779}
780
impl IcebergCompactionEventDispatcher {
    /// Wraps `compaction_event_handler` in a dispatcher that handles every event inline.
    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
        Self {
            compaction_event_handler,
        }
    }
}
788
789impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
790 type EventType = IcebergRequestEvent;
791
792 fn take_event(self) -> Self::EventType {
793 self.event.unwrap()
794 }
795
796 fn create_at(&self) -> u64 {
797 self.create_at
798 }
799}