1use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::time::{Instant, SystemTime};
32
33use anyhow::Context;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, StreamExt};
36use risingwave_hummock_sdk::compact_task::ReportTask;
37use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
38use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
39use risingwave_pb::hummock::subscribe_compaction_event_request::{
40 Event as RequestEvent, HeartBeat, PullTask,
41};
42use risingwave_pb::hummock::subscribe_compaction_event_response::{
43 Event as ResponseEvent, PullTaskAck,
44};
45use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
46use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
47use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
48 Event as IcebergRequestEvent, PullTask as IcebergPullTask,
49};
50use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
51 Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
52};
53use rw_futures_util::pending_on_none;
54use thiserror_ext::AsReport;
55use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
56use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use super::init_selectors;
62use crate::hummock::HummockManager;
63use crate::hummock::compaction::CompactionSelector;
64use crate::hummock::error::{Error, Result};
65use crate::hummock::sequence::next_compaction_task_id;
66use crate::manager::MetaOpts;
67use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
68use crate::rpc::metrics::MetaMetrics;
69
/// Threshold for the number of `PullTask` events the dedicated event loop keeps serving while
/// report events are already buffered; once the counter exceeds it, draining stops so the
/// buffered reports get flushed.
const MAX_SKIP_TIMES: usize = 8;
/// Number of buffered `ReportTask` events at which the dedicated event loop stops draining
/// the channel and reports the batch.
const MAX_REPORT_COUNT: usize = 16;
72
/// Strategy used by [`CompactionEventLoop`] to process events received from a compactor's
/// request stream.
///
/// For every event the loop first asks [`should_forward`](Self::should_forward); forwarded
/// events go through [`on_event_remotely`](Self::on_event_remotely), all others through
/// [`on_event_locally`](Self::on_event_locally).
#[async_trait::async_trait]
pub trait CompactionEventDispatcher: Send + Sync + 'static {
    /// Event payload handled by this dispatcher.
    type EventType: Send + Sync + 'static;

    /// Handles `event` inline for the compactor identified by `context_id`.
    ///
    /// Returns whether the compactor is still considered alive. On `false` the event loop
    /// stops polling the compactor's stream and calls
    /// [`remove_compactor`](Self::remove_compactor).
    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;

    /// Hands `event` over to be processed elsewhere instead of handling it inline.
    ///
    /// Only called by the event loop when [`should_forward`](Self::should_forward) returned
    /// `true` for the event. An error is logged by the loop; the stream keeps being polled.
    async fn on_event_remotely(
        &self,
        context_id: HummockContextId,
        event: Self::EventType,
    ) -> Result<()>;

    /// Decides whether `event` goes through [`on_event_remotely`](Self::on_event_remotely)
    /// (`true`) or [`on_event_locally`](Self::on_event_locally) (`false`).
    fn should_forward(&self, event: &Self::EventType) -> bool;

    /// Unregisters the compactor identified by `context_id`.
    fn remove_compactor(&self, context_id: HummockContextId);
}
89
/// A request message received on a compactor stream that wraps an event payload.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Event payload carried by the request.
    type EventType: Send + Sync + 'static;
    /// Consumes the request and returns the event it carries.
    fn take_event(self) -> Self::EventType;
    /// Creation timestamp of the request. The event loop subtracts it from the current
    /// Unix time in milliseconds, so it is presumably milliseconds since the Unix epoch —
    /// TODO confirm against the sender.
    fn create_at(&self) -> u64;
}
95
/// Event loop that polls the request streams of compactors and hands every received event
/// to a [`CompactionEventDispatcher`].
pub struct CompactionEventLoop<
    D: CompactionEventDispatcher<EventType = E::EventType>,
    E: CompactorStreamEvent,
> {
    /// Dispatcher that handles or forwards each received event.
    hummock_compactor_dispatcher: D,
    /// Metrics sink for the loop-iteration and event-consumption latencies.
    metrics: Arc<MetaMetrics>,
    /// Receives `(context_id, stream)` pairs for compactors whose request stream should
    /// start being polled.
    compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
}
104
/// Event loop specialised for Hummock compaction requests.
pub type HummockCompactionEventLoop =
    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;

/// Event loop specialised for Iceberg compaction requests.
pub type IcebergCompactionEventLoop =
    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
110
/// [`CompactionEventDispatcher`] for Hummock compaction events.
pub struct HummockCompactionEventDispatcher {
    /// Meta options; supplies `max_get_task_probe_times` for pull requests handled locally.
    meta_opts: Arc<MetaOpts>,
    /// Handler that executes the events processed locally.
    hummock_compaction_event_handler: HummockCompactionEventHandler,
    /// Channel to which `PullTask` / `ReportTask` events are forwarded. When `None`, no
    /// event is forwarded and everything is handled locally.
    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
}
116
117#[async_trait::async_trait]
118impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
119 type EventType = RequestEvent;
120
121 fn should_forward(&self, event: &Self::EventType) -> bool {
122 if self.tx.is_none() {
123 return false;
124 }
125
126 matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
127 }
128
129 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
130 let mut compactor_alive = true;
131 match event {
132 RequestEvent::HeartBeat(HeartBeat { progress }) => {
133 compactor_alive = self
134 .hummock_compaction_event_handler
135 .handle_heartbeat(context_id, progress)
136 .await;
137 }
138
139 RequestEvent::Register(_event) => {
140 unreachable!()
141 }
142
143 RequestEvent::PullTask(pull_task) => {
144 compactor_alive = self
145 .hummock_compaction_event_handler
146 .handle_pull_task_event(
147 context_id,
148 pull_task.pull_task_count as usize,
149 &mut init_selectors(),
150 self.meta_opts.max_get_task_probe_times,
151 )
152 .await;
153 }
154
155 RequestEvent::ReportTask(report_event) => {
156 if let Err(e) = self
157 .hummock_compaction_event_handler
158 .handle_report_task_event(vec![report_event.into()])
159 .await
160 {
161 tracing::error!(error = %e.as_report(), "report compact_tack fail")
162 }
163 }
164 }
165
166 compactor_alive
167 }
168
169 async fn on_event_remotely(
170 &self,
171 context_id: HummockContextId,
172 event: Self::EventType,
173 ) -> Result<()> {
174 if let Some(tx) = &self.tx {
175 tx.send((context_id, event))
176 .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
177 } else {
178 unreachable!();
179 }
180 Ok(())
181 }
182
183 fn remove_compactor(&self, context_id: HummockContextId) {
184 self.hummock_compaction_event_handler
185 .hummock_manager
186 .compactor_manager
187 .remove_compactor(context_id);
188 }
189}
190
impl HummockCompactionEventDispatcher {
    /// Creates a dispatcher.
    ///
    /// `tx` is the optional channel used to forward `PullTask` / `ReportTask` events; pass
    /// `None` to have every event handled locally.
    pub fn new(
        meta_opts: Arc<MetaOpts>,
        hummock_compaction_event_handler: HummockCompactionEventHandler,
        tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
    ) -> Self {
        Self {
            meta_opts,
            hummock_compaction_event_handler,
            tx,
        }
    }
}
204
/// Executes Hummock compaction events (heartbeat, pull task, report task) against a
/// [`HummockManager`].
#[derive(Clone)]
pub struct HummockCompactionEventHandler {
    /// Manager through which tasks are picked, cancelled and reported.
    pub hummock_manager: Arc<HummockManager>,
}
209
210impl HummockCompactionEventHandler {
211 pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
212 Self { hummock_manager }
213 }
214
215 async fn handle_heartbeat(
216 &self,
217 context_id: HummockContextId,
218 progress: Vec<CompactTaskProgress>,
219 ) -> bool {
220 let mut compactor_alive = true;
221 let compactor_manager = self.hummock_manager.compactor_manager.clone();
222 let cancel_tasks = compactor_manager
223 .update_task_heartbeats(&progress)
224 .into_iter()
225 .map(|task| task.task_id)
226 .collect::<Vec<_>>();
227 if !cancel_tasks.is_empty() {
228 tracing::info!(
229 ?cancel_tasks,
230 %context_id,
231 "Tasks cancel has expired due to lack of visible progress",
232 );
233
234 if let Err(e) = self
235 .hummock_manager
236 .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
237 .await
238 {
239 tracing::error!(
240 error = %e.as_report(),
241 "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
242 until we can successfully report its status."
243 );
244 }
245 }
246
247 match compactor_manager.get_compactor(context_id) {
248 Some(compactor) => {
249 if !cancel_tasks.is_empty() {
253 let _ = compactor.cancel_tasks(&cancel_tasks);
254 tracing::info!(
255 ?cancel_tasks,
256 %context_id,
257 "CancelTask operation has been sent to compactor node",
258 );
259 }
260 }
261 _ => {
262 compactor_alive = false;
265 }
266 }
267
268 compactor_alive
269 }
270
271 async fn handle_pull_task_event(
272 &self,
273 context_id: HummockContextId,
274 pull_task_count: usize,
275 compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
276 max_get_task_probe_times: usize,
277 ) -> bool {
278 assert_ne!(0, pull_task_count);
279 if let Some(compactor) = self
280 .hummock_manager
281 .compactor_manager
282 .get_compactor(context_id)
283 {
284 let mut compactor_alive = true;
285 let (groups, task_type) = self
286 .hummock_manager
287 .auto_pick_compaction_groups_and_type()
288 .await;
289 if let TaskType::Ttl = task_type {
290 match self
291 .hummock_manager
292 .metadata_manager
293 .get_all_table_options()
294 .await
295 .map_err(|err| Error::MetaStore(err.into()))
296 {
297 Ok(table_options) => {
298 self.hummock_manager
299 .update_table_id_to_table_option(table_options);
300 }
301 Err(err) => {
302 warn!(error = %err.as_report(), "Failed to get table options");
303 }
304 }
305 }
306
307 if !groups.is_empty() {
308 let selector: &mut Box<dyn CompactionSelector> =
309 compaction_selectors.get_mut(&task_type).unwrap();
310
311 let mut generated_task_count = 0;
312 let mut existed_groups = groups.clone();
313 let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
314 let mut failed_tasks = vec![];
315 let mut loop_times = 0;
316
317 while generated_task_count < pull_task_count
318 && failed_tasks.is_empty()
319 && loop_times < max_get_task_probe_times
320 {
321 loop_times += 1;
322 let compact_ret = self
323 .hummock_manager
324 .get_compact_tasks(
325 existed_groups.clone(),
326 pull_task_count - generated_task_count,
327 selector,
328 )
329 .await;
330
331 match compact_ret {
332 Ok((compact_tasks, unschedule_groups)) => {
333 no_task_groups.extend(unschedule_groups);
334 if compact_tasks.is_empty() {
335 break;
336 }
337 generated_task_count += compact_tasks.len();
338 for task in compact_tasks {
339 let task_id = task.task_id;
340 if let Err(e) =
341 compactor.send_event(ResponseEvent::CompactTask(task.into()))
342 {
343 tracing::warn!(
344 error = %e.as_report(),
345 "Failed to send task {} to {}",
346 task_id,
347 compactor.context_id(),
348 );
349 failed_tasks.push(task_id);
350 }
351 }
352 if !failed_tasks.is_empty() {
353 self.hummock_manager
354 .compactor_manager
355 .remove_compactor(context_id);
356 }
357 existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
358 }
359 Err(err) => {
360 tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
361 break;
362 }
363 };
364 }
365 for group in no_task_groups {
366 self.hummock_manager
367 .compaction_state
368 .unschedule(group, task_type);
369 }
370 if let Err(err) = self
371 .hummock_manager
372 .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
373 .await
374 {
375 tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
376 }
377 }
378
379 if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
381 tracing::warn!(
382 error = %e.as_report(),
383 "Failed to send ask to {}",
384 context_id,
385 );
386 compactor_alive = false;
387 }
388
389 return compactor_alive;
390 }
391
392 false
393 }
394
395 async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
396 if let Err(e) = self
397 .hummock_manager
398 .report_compact_tasks(report_events)
399 .await
400 {
401 tracing::error!(error = %e.as_report(), "report compact_tack fail")
402 }
403 Ok(())
404 }
405}
406
407impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
408 CompactionEventLoop<D, E>
409{
410 pub fn new(
411 hummock_compactor_dispatcher: D,
412 metrics: Arc<MetaMetrics>,
413 compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
414 ) -> Self {
415 Self {
416 hummock_compactor_dispatcher,
417 metrics,
418 compactor_streams_change_rx,
419 }
420 }
421
422 pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
423 let mut compactor_request_streams = FuturesUnordered::new();
424 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
425 let shutdown_rx_shared = shutdown_rx.shared();
426
427 let join_handle = tokio::spawn(async move {
428 let push_stream =
429 |context_id: HummockContextId,
430 stream: Streaming<E>,
431 compactor_request_streams: &mut FuturesUnordered<_>| {
432 let future = StreamExt::into_future(stream)
433 .map(move |stream_future| (context_id, stream_future));
434
435 compactor_request_streams.push(future);
436 };
437
438 let mut event_loop_iteration_now = Instant::now();
439
440 loop {
441 let shutdown_rx_shared = shutdown_rx_shared.clone();
442 self.metrics
443 .compaction_event_loop_iteration_latency
444 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
445 event_loop_iteration_now = Instant::now();
446
447 tokio::select! {
448 _ = shutdown_rx_shared => { return; },
449
450 compactor_stream = self.compactor_streams_change_rx.recv() => {
451 if let Some((context_id, stream)) = compactor_stream {
452 tracing::info!("compactor {} enters the cluster", context_id);
453 push_stream(context_id, stream, &mut compactor_request_streams);
454 }
455 },
456
457 result = pending_on_none(compactor_request_streams.next()) => {
458 let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
459 let (event, create_at, stream) = match compactor_stream_req {
460 (Some(Ok(req)), stream) => {
461 let create_at = req.create_at();
462 let event = req.take_event();
463 (event, create_at, stream)
464 }
465
466 (Some(Err(err)), _stream) => {
467 tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
468 continue
469 }
470
471 _ => {
472 tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
474 self.hummock_compactor_dispatcher.remove_compactor(context_id);
475 continue
476 },
477 };
478
479 {
480 let consumed_latency_ms = SystemTime::now()
481 .duration_since(std::time::UNIX_EPOCH)
482 .expect("Clock may have gone backwards")
483 .as_millis()
484 as u64
485 - create_at;
486 self.metrics
487 .compaction_event_consumed_latency
488 .observe(consumed_latency_ms as _);
489 }
490
491 let mut compactor_alive = true;
492 if self
493 .hummock_compactor_dispatcher
494 .should_forward(&event)
495 {
496 if let Err(e) = self
497 .hummock_compactor_dispatcher
498 .on_event_remotely(context_id, event)
499 .await
500 {
501 tracing::warn!(error = %e.as_report(), "Failed to forward event");
502 }
503 } else {
504 compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
505 context_id,
506 event,
507 ).await;
508 }
509
510 if compactor_alive {
511 push_stream(context_id, stream, &mut compactor_request_streams);
512 } else {
513 tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
514 self
515 .hummock_compactor_dispatcher
516 .remove_compactor(context_id);
517 }
518 },
519 }
520 }
521 });
522
523 (join_handle, shutdown_tx)
524 }
525}
526
527impl CompactorStreamEvent for SubscribeCompactionEventRequest {
528 type EventType = RequestEvent;
529
530 fn take_event(self) -> Self::EventType {
531 self.event.unwrap()
532 }
533
534 fn create_at(&self) -> u64 {
535 self.create_at
536 }
537}
538
/// Event loop that serves `PullTask` and `ReportTask` events received over a channel on its
/// own spawned task.
pub struct HummockCompactorDedicatedEventLoop {
    /// Supplies `max_get_task_probe_times` through its environment options.
    hummock_manager: Arc<HummockManager>,
    /// Handler that executes the received events.
    hummock_compaction_event_handler: HummockCompactionEventHandler,
}
543
544impl HummockCompactorDedicatedEventLoop {
545 pub fn new(
546 hummock_manager: Arc<HummockManager>,
547 hummock_compaction_event_handler: HummockCompactionEventHandler,
548 ) -> Self {
549 Self {
550 hummock_manager,
551 hummock_compaction_event_handler,
552 }
553 }
554
555 async fn compact_task_dedicated_event_handler(
557 &self,
558 mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
559 shutdown_rx: OneShotReceiver<()>,
560 ) {
561 let mut compaction_selectors = init_selectors();
562
563 tokio::select! {
564 _ = shutdown_rx => {}
565
566 _ = async {
567 while let Some((context_id, event)) = rx.recv().await {
568 let mut report_events = vec![];
569 let mut skip_times = 0;
570 match event {
571 RequestEvent::PullTask(PullTask { pull_task_count }) => {
572 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
573 }
574
575 RequestEvent::ReportTask(task) => {
576 report_events.push(task.into());
577 }
578
579 _ => unreachable!(),
580 }
581 while let Ok((context_id, event)) = rx.try_recv() {
582 match event {
583 RequestEvent::PullTask(PullTask { pull_task_count }) => {
584 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
585 if !report_events.is_empty() {
586 if skip_times > MAX_SKIP_TIMES {
587 break;
588 }
589 skip_times += 1;
590 }
591 }
592
593 RequestEvent::ReportTask(task) => {
594 report_events.push(task.into());
595 if report_events.len() >= MAX_REPORT_COUNT {
596 break;
597 }
598 }
599 _ => unreachable!(),
600 }
601 }
602 if !report_events.is_empty()
603 && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
604 {
605 tracing::error!(error = %e.as_report(), "report compact_tack fail")
606 }
607 }
608 } => {}
609 }
610 }
611
612 pub fn run(
613 self,
614 ) -> (
615 JoinHandle<()>,
616 UnboundedSender<(HummockContextId, RequestEvent)>,
617 Sender<()>,
618 ) {
619 let (tx, rx) = unbounded_channel();
620 let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
621 let join_handler = tokio::spawn(async move {
622 self.compact_task_dedicated_event_handler(rx, shutdown_rx)
623 .await;
624 });
625 (join_handler, tx, shutdon_tx)
626 }
627}
628
/// Executes Iceberg compaction pull requests against the Iceberg compaction manager.
pub struct IcebergCompactionEventHandler {
    /// Manager that tracks Iceberg compactors and selects the sinks to compact.
    compaction_manager: IcebergCompactionManagerRef,
}
632
633impl IcebergCompactionEventHandler {
634 pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
635 Self { compaction_manager }
636 }
637
638 async fn handle_pull_task_event(
639 &self,
640 context_id: HummockContextId,
641 pull_task_count: usize,
642 ) -> bool {
643 assert_ne!(0, pull_task_count);
644 if let Some(compactor) = self
645 .compaction_manager
646 .iceberg_compactor_manager
647 .get_compactor(context_id)
648 {
649 let mut compactor_alive = true;
650
651 let iceberg_compaction_handles = self
652 .compaction_manager
653 .get_top_n_iceberg_commit_sink_ids(pull_task_count)
654 .await;
655
656 for handle in iceberg_compaction_handles {
657 let compactor = compactor.clone();
658 if let Err(e) = async move {
660 handle
661 .send_compact_task(
662 compactor,
663 next_compaction_task_id(&self.compaction_manager.env).await?,
664 )
665 .await
666 }
667 .await
668 {
669 tracing::warn!(
670 error = %e.as_report(),
671 "Failed to send iceberg commit task to {}",
672 context_id,
673 );
674 compactor_alive = false;
675 }
676 }
677
678 if let Err(e) =
679 compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
680 {
681 tracing::warn!(
682 error = %e.as_report(),
683 "Failed to send ask to {}",
684 context_id,
685 );
686 compactor_alive = false;
687 }
688
689 return compactor_alive;
690 }
691
692 false
693 }
694}
695
/// [`CompactionEventDispatcher`] for Iceberg compaction events; it handles every event
/// locally and never forwards.
pub struct IcebergCompactionEventDispatcher {
    /// Handler that executes the pull-task events.
    compaction_event_handler: IcebergCompactionEventHandler,
}
699
700#[async_trait::async_trait]
701impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
702 type EventType = IcebergRequestEvent;
703
704 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
705 match event {
706 IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
707 return self
708 .compaction_event_handler
709 .handle_pull_task_event(context_id, pull_task_count as usize)
710 .await;
711 }
712 _ => unreachable!(),
713 }
714 }
715
716 async fn on_event_remotely(
717 &self,
718 _context_id: HummockContextId,
719 _event: Self::EventType,
720 ) -> Result<()> {
721 unreachable!()
722 }
723
724 fn should_forward(&self, _event: &Self::EventType) -> bool {
725 false
726 }
727
728 fn remove_compactor(&self, context_id: HummockContextId) {
729 self.compaction_event_handler
730 .compaction_manager
731 .iceberg_compactor_manager
732 .remove_compactor(context_id);
733 }
734}
735
impl IcebergCompactionEventDispatcher {
    /// Creates a dispatcher that serves events through `compaction_event_handler`.
    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
        Self {
            compaction_event_handler,
        }
    }
}
743
744impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
745 type EventType = IcebergRequestEvent;
746
747 fn take_event(self) -> Self::EventType {
748 self.event.unwrap()
749 }
750
751 fn create_at(&self) -> u64 {
752 self.create_at
753 }
754}