1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Instant, SystemTime};
18
19use anyhow::Context;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use risingwave_hummock_sdk::compact_task::ReportTask;
23use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
24use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
25use risingwave_pb::hummock::subscribe_compaction_event_request::{
26 Event as RequestEvent, HeartBeat, PullTask,
27};
28use risingwave_pb::hummock::subscribe_compaction_event_response::{
29 Event as ResponseEvent, PullTaskAck,
30};
31use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
32use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
34 Event as IcebergRequestEvent, PullTask as IcebergPullTask,
35};
36use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
37 Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
38};
39use rw_futures_util::pending_on_none;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
43use tokio::task::JoinHandle;
44use tonic::Streaming;
45use tracing::warn;
46
47use super::init_selectors;
48use crate::hummock::HummockManager;
49use crate::hummock::compaction::CompactionSelector;
50use crate::hummock::error::{Error, Result};
51use crate::hummock::sequence::next_compaction_task_id;
52use crate::manager::MetaOpts;
53use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
54use crate::rpc::metrics::MetaMetrics;
55
/// Bounds how many pull-task requests the dedicated event loop serves while report events are
/// already batched and waiting, before it stops draining its channel and flushes the reports.
const MAX_SKIP_TIMES: usize = 8;

/// Maximum number of report events the dedicated event loop batches into one report call.
const MAX_REPORT_COUNT: usize = 16;
58
59#[async_trait::async_trait]
60pub trait CompactionEventDispatcher: Send + Sync + 'static {
61 type EventType: Send + Sync + 'static;
62
63 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;
64
65 async fn on_event_remotely(
66 &self,
67 context_id: HummockContextId,
68 event: Self::EventType,
69 ) -> Result<()>;
70
71 fn should_forward(&self, event: &Self::EventType) -> bool;
72
73 fn remove_compactor(&self, context_id: HummockContextId);
74}
75
/// A request message received on a compactor's event stream.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Event payload extracted from the request.
    type EventType: Send + Sync + 'static;

    /// Creation timestamp carried by the request. The event loop subtracts it from the current
    /// wall clock in milliseconds since the UNIX epoch to measure consumption latency.
    fn create_at(&self) -> u64;

    /// Consumes the request and yields its event payload.
    fn take_event(self) -> Self::EventType;
}
81
82pub struct CompactionEventLoop<
83 D: CompactionEventDispatcher<EventType = E::EventType>,
84 E: CompactorStreamEvent,
85> {
86 hummock_compactor_dispatcher: D,
87 metrics: Arc<MetaMetrics>,
88 compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
89}
90
91pub type HummockCompactionEventLoop =
92 CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;
93
94pub type IcebergCompactionEventLoop =
95 CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
96
97pub struct HummockCompactionEventDispatcher {
98 meta_opts: Arc<MetaOpts>,
99 hummock_compaction_event_handler: HummockCompactionEventHandler,
100 tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
101}
102
103#[async_trait::async_trait]
104impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
105 type EventType = RequestEvent;
106
107 fn should_forward(&self, event: &Self::EventType) -> bool {
108 if self.tx.is_none() {
109 return false;
110 }
111
112 matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
113 }
114
115 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
116 let mut compactor_alive = true;
117 match event {
118 RequestEvent::HeartBeat(HeartBeat { progress }) => {
119 compactor_alive = self
120 .hummock_compaction_event_handler
121 .handle_heartbeat(context_id, progress)
122 .await;
123 }
124
125 RequestEvent::Register(_event) => {
126 unreachable!()
127 }
128
129 RequestEvent::PullTask(pull_task) => {
130 compactor_alive = self
131 .hummock_compaction_event_handler
132 .handle_pull_task_event(
133 context_id,
134 pull_task.pull_task_count as usize,
135 &mut init_selectors(),
136 self.meta_opts.max_get_task_probe_times,
137 )
138 .await;
139 }
140
141 RequestEvent::ReportTask(report_event) => {
142 if let Err(e) = self
143 .hummock_compaction_event_handler
144 .handle_report_task_event(vec![report_event.into()])
145 .await
146 {
147 tracing::error!(error = %e.as_report(), "report compact_tack fail")
148 }
149 }
150 }
151
152 compactor_alive
153 }
154
155 async fn on_event_remotely(
156 &self,
157 context_id: HummockContextId,
158 event: Self::EventType,
159 ) -> Result<()> {
160 if let Some(tx) = &self.tx {
161 tx.send((context_id, event))
162 .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
163 } else {
164 unreachable!();
165 }
166 Ok(())
167 }
168
169 fn remove_compactor(&self, context_id: HummockContextId) {
170 self.hummock_compaction_event_handler
171 .hummock_manager
172 .compactor_manager
173 .remove_compactor(context_id);
174 }
175}
176
177impl HummockCompactionEventDispatcher {
178 pub fn new(
179 meta_opts: Arc<MetaOpts>,
180 hummock_compaction_event_handler: HummockCompactionEventHandler,
181 tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
182 ) -> Self {
183 Self {
184 meta_opts,
185 hummock_compaction_event_handler,
186 tx,
187 }
188 }
189}
190
191#[derive(Clone)]
192pub struct HummockCompactionEventHandler {
193 pub hummock_manager: Arc<HummockManager>,
194}
195
196impl HummockCompactionEventHandler {
197 pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
198 Self { hummock_manager }
199 }
200
201 async fn handle_heartbeat(
202 &self,
203 context_id: HummockContextId,
204 progress: Vec<CompactTaskProgress>,
205 ) -> bool {
206 let mut compactor_alive = true;
207 let compactor_manager = self.hummock_manager.compactor_manager.clone();
208 let cancel_tasks = compactor_manager
209 .update_task_heartbeats(&progress)
210 .into_iter()
211 .map(|task| task.task_id)
212 .collect::<Vec<_>>();
213 if !cancel_tasks.is_empty() {
214 tracing::info!(
215 ?cancel_tasks,
216 %context_id,
217 "Tasks cancel has expired due to lack of visible progress",
218 );
219
220 if let Err(e) = self
221 .hummock_manager
222 .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
223 .await
224 {
225 tracing::error!(
226 error = %e.as_report(),
227 "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
228 until we can successfully report its status."
229 );
230 }
231 }
232
233 match compactor_manager.get_compactor(context_id) {
234 Some(compactor) => {
235 if !cancel_tasks.is_empty() {
239 let _ = compactor.cancel_tasks(&cancel_tasks);
240 tracing::info!(
241 ?cancel_tasks,
242 %context_id,
243 "CancelTask operation has been sent to compactor node",
244 );
245 }
246 }
247 _ => {
248 compactor_alive = false;
251 }
252 }
253
254 compactor_alive
255 }
256
257 async fn handle_pull_task_event(
258 &self,
259 context_id: HummockContextId,
260 pull_task_count: usize,
261 compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
262 max_get_task_probe_times: usize,
263 ) -> bool {
264 assert_ne!(0, pull_task_count);
265 if let Some(compactor) = self
266 .hummock_manager
267 .compactor_manager
268 .get_compactor(context_id)
269 {
270 let mut compactor_alive = true;
271 let (groups, task_type) = self
272 .hummock_manager
273 .auto_pick_compaction_groups_and_type()
274 .await;
275 if let TaskType::Ttl = task_type {
276 match self
277 .hummock_manager
278 .metadata_manager
279 .get_all_table_options()
280 .await
281 .map_err(|err| Error::MetaStore(err.into()))
282 {
283 Ok(table_options) => {
284 self.hummock_manager
285 .update_table_id_to_table_option(table_options);
286 }
287 Err(err) => {
288 warn!(error = %err.as_report(), "Failed to get table options");
289 }
290 }
291 }
292
293 if !groups.is_empty() {
294 let selector: &mut Box<dyn CompactionSelector> =
295 compaction_selectors.get_mut(&task_type).unwrap();
296
297 let mut generated_task_count = 0;
298 let mut existed_groups = groups.clone();
299 let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
300 let mut failed_tasks = vec![];
301 let mut loop_times = 0;
302
303 while generated_task_count < pull_task_count
304 && failed_tasks.is_empty()
305 && loop_times < max_get_task_probe_times
306 {
307 loop_times += 1;
308 let compact_ret = self
309 .hummock_manager
310 .get_compact_tasks(
311 existed_groups.clone(),
312 pull_task_count - generated_task_count,
313 selector,
314 )
315 .await;
316
317 match compact_ret {
318 Ok((compact_tasks, unschedule_groups)) => {
319 no_task_groups.extend(unschedule_groups);
320 if compact_tasks.is_empty() {
321 break;
322 }
323 generated_task_count += compact_tasks.len();
324 for task in compact_tasks {
325 let task_id = task.task_id;
326 if let Err(e) =
327 compactor.send_event(ResponseEvent::CompactTask(task.into()))
328 {
329 tracing::warn!(
330 error = %e.as_report(),
331 "Failed to send task {} to {}",
332 task_id,
333 compactor.context_id(),
334 );
335 failed_tasks.push(task_id);
336 }
337 }
338 if !failed_tasks.is_empty() {
339 self.hummock_manager
340 .compactor_manager
341 .remove_compactor(context_id);
342 }
343 existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
344 }
345 Err(err) => {
346 tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
347 break;
348 }
349 };
350 }
351 for group in no_task_groups {
352 self.hummock_manager
353 .compaction_state
354 .unschedule(group, task_type);
355 }
356 if let Err(err) = self
357 .hummock_manager
358 .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
359 .await
360 {
361 tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
362 }
363 }
364
365 if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
367 tracing::warn!(
368 error = %e.as_report(),
369 "Failed to send ask to {}",
370 context_id,
371 );
372 compactor_alive = false;
373 }
374
375 return compactor_alive;
376 }
377
378 false
379 }
380
381 async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
382 if let Err(e) = self
383 .hummock_manager
384 .report_compact_tasks(report_events)
385 .await
386 {
387 tracing::error!(error = %e.as_report(), "report compact_tack fail")
388 }
389 Ok(())
390 }
391}
392
393impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
394 CompactionEventLoop<D, E>
395{
396 pub fn new(
397 hummock_compactor_dispatcher: D,
398 metrics: Arc<MetaMetrics>,
399 compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
400 ) -> Self {
401 Self {
402 hummock_compactor_dispatcher,
403 metrics,
404 compactor_streams_change_rx,
405 }
406 }
407
408 pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
409 let mut compactor_request_streams = FuturesUnordered::new();
410 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
411 let shutdown_rx_shared = shutdown_rx.shared();
412
413 let join_handle = tokio::spawn(async move {
414 let push_stream =
415 |context_id: HummockContextId,
416 stream: Streaming<E>,
417 compactor_request_streams: &mut FuturesUnordered<_>| {
418 let future = StreamExt::into_future(stream)
419 .map(move |stream_future| (context_id, stream_future));
420
421 compactor_request_streams.push(future);
422 };
423
424 let mut event_loop_iteration_now = Instant::now();
425
426 loop {
427 let shutdown_rx_shared = shutdown_rx_shared.clone();
428 self.metrics
429 .compaction_event_loop_iteration_latency
430 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
431 event_loop_iteration_now = Instant::now();
432
433 tokio::select! {
434 _ = shutdown_rx_shared => { return; },
435
436 compactor_stream = self.compactor_streams_change_rx.recv() => {
437 if let Some((context_id, stream)) = compactor_stream {
438 tracing::info!("compactor {} enters the cluster", context_id);
439 push_stream(context_id, stream, &mut compactor_request_streams);
440 }
441 },
442
443 result = pending_on_none(compactor_request_streams.next()) => {
444 let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
445 let (event, create_at, stream) = match compactor_stream_req {
446 (Some(Ok(req)), stream) => {
447 let create_at = req.create_at();
448 let event = req.take_event();
449 (event, create_at, stream)
450 }
451
452 (Some(Err(err)), _stream) => {
453 tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
454 continue
455 }
456
457 _ => {
458 tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
460 self.hummock_compactor_dispatcher.remove_compactor(context_id);
461 continue
462 },
463 };
464
465 {
466 let consumed_latency_ms = SystemTime::now()
467 .duration_since(std::time::UNIX_EPOCH)
468 .expect("Clock may have gone backwards")
469 .as_millis()
470 as u64
471 - create_at;
472 self.metrics
473 .compaction_event_consumed_latency
474 .observe(consumed_latency_ms as _);
475 }
476
477 let mut compactor_alive = true;
478 if self
479 .hummock_compactor_dispatcher
480 .should_forward(&event)
481 {
482 if let Err(e) = self
483 .hummock_compactor_dispatcher
484 .on_event_remotely(context_id, event)
485 .await
486 {
487 tracing::warn!(error = %e.as_report(), "Failed to forward event");
488 }
489 } else {
490 compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
491 context_id,
492 event,
493 ).await;
494 }
495
496 if compactor_alive {
497 push_stream(context_id, stream, &mut compactor_request_streams);
498 } else {
499 tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
500 self
501 .hummock_compactor_dispatcher
502 .remove_compactor(context_id);
503 }
504 },
505 }
506 }
507 });
508
509 (join_handle, shutdown_tx)
510 }
511}
512
513impl CompactorStreamEvent for SubscribeCompactionEventRequest {
514 type EventType = RequestEvent;
515
516 fn take_event(self) -> Self::EventType {
517 self.event.unwrap()
518 }
519
520 fn create_at(&self) -> u64 {
521 self.create_at
522 }
523}
524
525pub struct HummockCompactorDedicatedEventLoop {
526 hummock_manager: Arc<HummockManager>,
527 hummock_compaction_event_handler: HummockCompactionEventHandler,
528}
529
530impl HummockCompactorDedicatedEventLoop {
531 pub fn new(
532 hummock_manager: Arc<HummockManager>,
533 hummock_compaction_event_handler: HummockCompactionEventHandler,
534 ) -> Self {
535 Self {
536 hummock_manager,
537 hummock_compaction_event_handler,
538 }
539 }
540
541 async fn compact_task_dedicated_event_handler(
543 &self,
544 mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
545 shutdown_rx: OneShotReceiver<()>,
546 ) {
547 let mut compaction_selectors = init_selectors();
548
549 tokio::select! {
550 _ = shutdown_rx => {}
551
552 _ = async {
553 while let Some((context_id, event)) = rx.recv().await {
554 let mut report_events = vec![];
555 let mut skip_times = 0;
556 match event {
557 RequestEvent::PullTask(PullTask { pull_task_count }) => {
558 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
559 }
560
561 RequestEvent::ReportTask(task) => {
562 report_events.push(task.into());
563 }
564
565 _ => unreachable!(),
566 }
567 while let Ok((context_id, event)) = rx.try_recv() {
568 match event {
569 RequestEvent::PullTask(PullTask { pull_task_count }) => {
570 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
571 if !report_events.is_empty() {
572 if skip_times > MAX_SKIP_TIMES {
573 break;
574 }
575 skip_times += 1;
576 }
577 }
578
579 RequestEvent::ReportTask(task) => {
580 report_events.push(task.into());
581 if report_events.len() >= MAX_REPORT_COUNT {
582 break;
583 }
584 }
585 _ => unreachable!(),
586 }
587 }
588 if !report_events.is_empty()
589 && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
590 {
591 tracing::error!(error = %e.as_report(), "report compact_tack fail")
592 }
593 }
594 } => {}
595 }
596 }
597
598 pub fn run(
599 self,
600 ) -> (
601 JoinHandle<()>,
602 UnboundedSender<(HummockContextId, RequestEvent)>,
603 Sender<()>,
604 ) {
605 let (tx, rx) = unbounded_channel();
606 let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
607 let join_handler = tokio::spawn(async move {
608 self.compact_task_dedicated_event_handler(rx, shutdown_rx)
609 .await;
610 });
611 (join_handler, tx, shutdon_tx)
612 }
613}
614
615pub struct IcebergCompactionEventHandler {
616 compaction_manager: IcebergCompactionManagerRef,
617}
618
619impl IcebergCompactionEventHandler {
620 pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
621 Self { compaction_manager }
622 }
623
624 async fn handle_pull_task_event(
625 &self,
626 context_id: HummockContextId,
627 pull_task_count: usize,
628 ) -> bool {
629 assert_ne!(0, pull_task_count);
630 if let Some(compactor) = self
631 .compaction_manager
632 .iceberg_compactor_manager
633 .get_compactor(context_id)
634 {
635 let mut compactor_alive = true;
636
637 let iceberg_compaction_handles = self
638 .compaction_manager
639 .get_top_n_iceberg_commit_sink_ids(pull_task_count)
640 .await;
641
642 for handle in iceberg_compaction_handles {
643 let compactor = compactor.clone();
644 if let Err(e) = async move {
646 handle
647 .send_compact_task(
648 compactor,
649 next_compaction_task_id(&self.compaction_manager.env).await?,
650 )
651 .await
652 }
653 .await
654 {
655 tracing::warn!(
656 error = %e.as_report(),
657 "Failed to send iceberg commit task to {}",
658 context_id,
659 );
660 compactor_alive = false;
661 }
662 }
663
664 if let Err(e) =
665 compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
666 {
667 tracing::warn!(
668 error = %e.as_report(),
669 "Failed to send ask to {}",
670 context_id,
671 );
672 compactor_alive = false;
673 }
674
675 return compactor_alive;
676 }
677
678 false
679 }
680}
681
682pub struct IcebergCompactionEventDispatcher {
683 compaction_event_handler: IcebergCompactionEventHandler,
684}
685
686#[async_trait::async_trait]
687impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
688 type EventType = IcebergRequestEvent;
689
690 async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
691 match event {
692 IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
693 return self
694 .compaction_event_handler
695 .handle_pull_task_event(context_id, pull_task_count as usize)
696 .await;
697 }
698 _ => unreachable!(),
699 }
700 }
701
702 async fn on_event_remotely(
703 &self,
704 _context_id: HummockContextId,
705 _event: Self::EventType,
706 ) -> Result<()> {
707 unreachable!()
708 }
709
710 fn should_forward(&self, _event: &Self::EventType) -> bool {
711 false
712 }
713
714 fn remove_compactor(&self, context_id: HummockContextId) {
715 self.compaction_event_handler
716 .compaction_manager
717 .iceberg_compactor_manager
718 .remove_compactor(context_id);
719 }
720}
721
722impl IcebergCompactionEventDispatcher {
723 pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
724 Self {
725 compaction_event_handler,
726 }
727 }
728}
729
730impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
731 type EventType = IcebergRequestEvent;
732
733 fn take_event(self) -> Self::EventType {
734 self.event.unwrap()
735 }
736
737 fn create_at(&self) -> u64 {
738 self.create_at
739 }
740}