1use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::time::{Instant, SystemTime};
32
33use anyhow::Context;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, StreamExt};
36use risingwave_hummock_sdk::CompactionGroupId;
37use risingwave_hummock_sdk::compact_task::ReportTask;
38use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
39use risingwave_pb::hummock::subscribe_compaction_event_request::{
40 Event as RequestEvent, HeartBeat, PullTask,
41};
42use risingwave_pb::hummock::subscribe_compaction_event_response::{
43 Event as ResponseEvent, PullTaskAck,
44};
45use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
46use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
47use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
48 Event as IcebergRequestEvent, PullTask as IcebergPullTask,
49};
50use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
51 Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
52};
53use rw_futures_util::pending_on_none;
54use thiserror_ext::AsReport;
55use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
56use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use super::init_selectors;
62use crate::hummock::HummockManager;
63use crate::hummock::compaction::CompactionSelector;
64use crate::hummock::error::{Error, Result};
65use crate::hummock::sequence::next_compaction_task_id;
66use crate::manager::MetaOpts;
67use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
68use crate::rpc::metrics::MetaMetrics;
69
/// Used by the dedicated event loop while draining queued events: once report events are
/// buffered, a `PullTask` handled while the skip counter already exceeds this value ends the
/// current drain so that the buffered reports get flushed.
const MAX_SKIP_TIMES: usize = 8;
/// Maximum number of `ReportTask` events the dedicated event loop buffers before flushing them
/// in a single `handle_report_task_event` call.
const MAX_REPORT_COUNT: usize = 16;
72
73#[async_trait::async_trait]
74pub trait CompactionEventDispatcher: Send + Sync + 'static {
75 type EventType: Send + Sync + 'static;
76
77 async fn on_event_locally(&self, context_id: u32, event: Self::EventType) -> bool;
78
79 async fn on_event_remotely(&self, context_id: u32, event: Self::EventType) -> Result<()>;
80
81 fn should_forward(&self, event: &Self::EventType) -> bool;
82
83 fn remove_compactor(&self, context_id: u32);
84}
85
/// A request message read off a compactor's gRPC stream.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Payload produced by [`take_event`](Self::take_event).
    type EventType: Send + Sync + 'static;

    /// Timestamp carried by the request. The event loop subtracts it from the current
    /// milliseconds-since-Unix-epoch when recording consumption latency.
    fn create_at(&self) -> u64;

    /// Consumes the request and yields its event payload.
    fn take_event(self) -> Self::EventType;
}
91
92pub struct CompactionEventLoop<
93 D: CompactionEventDispatcher<EventType = E::EventType>,
94 E: CompactorStreamEvent,
95> {
96 hummock_compactor_dispatcher: D,
97 metrics: Arc<MetaMetrics>,
98 compactor_streams_change_rx: UnboundedReceiver<(u32, Streaming<E>)>,
99}
100
/// Event loop serving Hummock compactor request streams.
pub type HummockCompactionEventLoop =
    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;

/// Event loop serving Iceberg compactor request streams.
pub type IcebergCompactionEventLoop =
    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
106
/// Dispatcher for Hummock compactor events.
///
/// When `tx` is `Some`, `PullTask` and `ReportTask` events are forwarded through it; otherwise
/// every event is handled inline.
pub struct HummockCompactionEventDispatcher {
    meta_opts: Arc<MetaOpts>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
    // Optional channel to a separate consumer of forwarded events (see `should_forward`).
    tx: Option<UnboundedSender<(u32, RequestEvent)>>,
}
112
113#[async_trait::async_trait]
114impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
115 type EventType = RequestEvent;
116
117 fn should_forward(&self, event: &Self::EventType) -> bool {
118 if self.tx.is_none() {
119 return false;
120 }
121
122 matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
123 }
124
125 async fn on_event_locally(&self, context_id: u32, event: Self::EventType) -> bool {
126 let mut compactor_alive = true;
127 match event {
128 RequestEvent::HeartBeat(HeartBeat { progress }) => {
129 compactor_alive = self
130 .hummock_compaction_event_handler
131 .handle_heartbeat(context_id, progress)
132 .await;
133 }
134
135 RequestEvent::Register(_event) => {
136 unreachable!()
137 }
138
139 RequestEvent::PullTask(pull_task) => {
140 compactor_alive = self
141 .hummock_compaction_event_handler
142 .handle_pull_task_event(
143 context_id,
144 pull_task.pull_task_count as usize,
145 &mut init_selectors(),
146 self.meta_opts.max_get_task_probe_times,
147 )
148 .await;
149 }
150
151 RequestEvent::ReportTask(report_event) => {
152 if let Err(e) = self
153 .hummock_compaction_event_handler
154 .handle_report_task_event(vec![report_event.into()])
155 .await
156 {
157 tracing::error!(error = %e.as_report(), "report compact_tack fail")
158 }
159 }
160 }
161
162 compactor_alive
163 }
164
165 async fn on_event_remotely(&self, context_id: u32, event: Self::EventType) -> Result<()> {
166 if let Some(tx) = &self.tx {
167 tx.send((context_id, event))
168 .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
169 } else {
170 unreachable!();
171 }
172 Ok(())
173 }
174
175 fn remove_compactor(&self, context_id: u32) {
176 self.hummock_compaction_event_handler
177 .hummock_manager
178 .compactor_manager
179 .remove_compactor(context_id);
180 }
181}
182
183impl HummockCompactionEventDispatcher {
184 pub fn new(
185 meta_opts: Arc<MetaOpts>,
186 hummock_compaction_event_handler: HummockCompactionEventHandler,
187 tx: Option<UnboundedSender<(u32, RequestEvent)>>,
188 ) -> Self {
189 Self {
190 meta_opts,
191 hummock_compaction_event_handler,
192 tx,
193 }
194 }
195}
196
/// Hummock-side handling of compactor heartbeat, pull-task and report-task events, backed by a
/// shared [`HummockManager`].
#[derive(Clone)]
pub struct HummockCompactionEventHandler {
    pub hummock_manager: Arc<HummockManager>,
}
201
impl HummockCompactionEventHandler {
    /// Creates a handler backed by `hummock_manager`.
    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
        Self { hummock_manager }
    }

    /// Processes a heartbeat from compactor `context_id`.
    ///
    /// Feeds `progress` to the compactor manager. Any tasks returned by
    /// `update_task_heartbeats` are cancelled with `TaskStatus::HeartbeatProgressCanceled`,
    /// and if the compactor is still registered, it is told to cancel them too.
    ///
    /// Returns `false` when `context_id` is no longer registered with the compactor manager.
    async fn handle_heartbeat(&self, context_id: u32, progress: Vec<CompactTaskProgress>) -> bool {
        let mut compactor_alive = true;
        let compactor_manager = self.hummock_manager.compactor_manager.clone();
        // Task ids returned by `update_task_heartbeats` are cancelled below.
        let cancel_tasks = compactor_manager
            .update_task_heartbeats(&progress)
            .into_iter()
            .map(|task| task.task_id)
            .collect::<Vec<_>>();
        if !cancel_tasks.is_empty() {
            tracing::info!(
                ?cancel_tasks,
                context_id,
                "Tasks cancel has expired due to lack of visible progress",
            );

            if let Err(e) = self
                .hummock_manager
                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
                    until we can successfully report its status."
                );
            }
        }

        match compactor_manager.get_compactor(context_id) {
            Some(compactor) => {
                if !cancel_tasks.is_empty() {
                    // Best effort: the result of notifying the compactor is ignored.
                    let _ = compactor.cancel_tasks(&cancel_tasks);
                    tracing::info!(
                        ?cancel_tasks,
                        context_id,
                        "CancelTask operation has been sent to compactor node",
                    );
                }
            }
            _ => {
                // The compactor is no longer registered; report it as dead to the caller.
                compactor_alive = false;
            }
        }

        compactor_alive
    }

    /// Serves a `PullTask` request from compactor `context_id`.
    ///
    /// Steps:
    /// 1. Picks compaction groups and a task type.
    /// 2. Repeatedly asks the manager for up to `pull_task_count` tasks, for at most
    ///    `max_get_task_probe_times` rounds, and sends each task to the compactor.
    /// 3. Unschedules groups reported as unschedulable for the picked task type.
    /// 4. Cancels tasks that could not be sent, using `TaskStatus::SendFailCanceled`.
    /// 5. Sends a `PullTaskAck`.
    ///
    /// If any task send fails, the compactor is removed from the compactor manager and probing
    /// stops.
    ///
    /// Returns `false` if the compactor is not registered or the ack cannot be sent.
    ///
    /// # Panics
    /// Panics if `pull_task_count` is 0. Also panics if groups were picked but
    /// `compaction_selectors` has no selector for the picked task type.
    async fn handle_pull_task_event(
        &self,
        context_id: u32,
        pull_task_count: usize,
        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
        max_get_task_probe_times: usize,
    ) -> bool {
        assert_ne!(0, pull_task_count);
        if let Some(compactor) = self
            .hummock_manager
            .compactor_manager
            .get_compactor(context_id)
        {
            let mut compactor_alive = true;
            let (groups, task_type) = self
                .hummock_manager
                .auto_pick_compaction_groups_and_type()
                .await;
            // For TTL compaction, refresh the cached table options first (best effort:
            // a failure is only logged).
            if let TaskType::Ttl = task_type {
                match self
                    .hummock_manager
                    .metadata_manager
                    .get_all_table_options()
                    .await
                    .map_err(|err| Error::MetaStore(err.into()))
                {
                    Ok(table_options) => {
                        self.hummock_manager
                            .update_table_id_to_table_option(table_options);
                    }
                    Err(err) => {
                        warn!(error = %err.as_report(), "Failed to get table options");
                    }
                }
            }

            if !groups.is_empty() {
                let selector: &mut Box<dyn CompactionSelector> =
                    compaction_selectors.get_mut(&task_type).unwrap();

                let mut generated_task_count = 0;
                // Groups still worth probing; groups reported as unschedulable are removed.
                let mut existed_groups = groups.clone();
                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
                let mut failed_tasks = vec![];
                let mut loop_times = 0;

                // Stop once enough tasks were produced, a send failed, or the probe budget
                // is exhausted.
                while generated_task_count < pull_task_count
                    && failed_tasks.is_empty()
                    && loop_times < max_get_task_probe_times
                {
                    loop_times += 1;
                    let compact_ret = self
                        .hummock_manager
                        .get_compact_tasks(
                            existed_groups.clone(),
                            pull_task_count - generated_task_count,
                            selector,
                        )
                        .await;

                    match compact_ret {
                        Ok((compact_tasks, unschedule_groups)) => {
                            no_task_groups.extend(unschedule_groups);
                            if compact_tasks.is_empty() {
                                break;
                            }
                            generated_task_count += compact_tasks.len();
                            for task in compact_tasks {
                                let task_id = task.task_id;
                                if let Err(e) =
                                    compactor.send_event(ResponseEvent::CompactTask(task.into()))
                                {
                                    tracing::warn!(
                                        error = %e.as_report(),
                                        "Failed to send task {} to {}",
                                        task_id,
                                        compactor.context_id(),
                                    );
                                    failed_tasks.push(task_id);
                                }
                            }
                            // Stop tracking this compactor once any send to it has failed.
                            if !failed_tasks.is_empty() {
                                self.hummock_manager
                                    .compactor_manager
                                    .remove_compactor(context_id);
                            }
                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
                            break;
                        }
                    };
                }
                for group in no_task_groups {
                    self.hummock_manager
                        .compaction_state
                        .unschedule(group, task_type);
                }
                // Called even when `failed_tasks` is empty; presumably a no-op then
                // (NOTE(review): confirm in `cancel_compact_tasks`).
                if let Err(err) = self
                    .hummock_manager
                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
                    .await
                {
                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
                }
            }

            // Always acknowledge the pull, even when no task was produced.
            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
                tracing::warn!(
                    error = %e.as_report(),
                    "Failed to send ask to {}",
                    context_id,
                );
                compactor_alive = false;
            }

            return compactor_alive;
        }

        false
    }

    /// Reports completed compaction tasks to the Hummock manager.
    ///
    /// Errors from `report_compact_tasks` are logged here and not propagated, so this always
    /// returns `Ok(())`.
    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
        if let Err(e) = self
            .hummock_manager
            .report_compact_tasks(report_events)
            .await
        {
            tracing::error!(error = %e.as_report(), "report compact_tack fail")
        }
        Ok(())
    }
}
394
395impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
396 CompactionEventLoop<D, E>
397{
398 pub fn new(
399 hummock_compactor_dispatcher: D,
400 metrics: Arc<MetaMetrics>,
401 compactor_streams_change_rx: UnboundedReceiver<(u32, Streaming<E>)>,
402 ) -> Self {
403 Self {
404 hummock_compactor_dispatcher,
405 metrics,
406 compactor_streams_change_rx,
407 }
408 }
409
410 pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
411 let mut compactor_request_streams = FuturesUnordered::new();
412 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
413 let shutdown_rx_shared = shutdown_rx.shared();
414
415 let join_handle = tokio::spawn(async move {
416 let push_stream =
417 |context_id: u32,
418 stream: Streaming<E>,
419 compactor_request_streams: &mut FuturesUnordered<_>| {
420 let future = StreamExt::into_future(stream)
421 .map(move |stream_future| (context_id, stream_future));
422
423 compactor_request_streams.push(future);
424 };
425
426 let mut event_loop_iteration_now = Instant::now();
427
428 loop {
429 let shutdown_rx_shared = shutdown_rx_shared.clone();
430 self.metrics
431 .compaction_event_loop_iteration_latency
432 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
433 event_loop_iteration_now = Instant::now();
434
435 tokio::select! {
436 _ = shutdown_rx_shared => { return; },
437
438 compactor_stream = self.compactor_streams_change_rx.recv() => {
439 if let Some((context_id, stream)) = compactor_stream {
440 tracing::info!("compactor {} enters the cluster", context_id);
441 push_stream(context_id, stream, &mut compactor_request_streams);
442 }
443 },
444
445 result = pending_on_none(compactor_request_streams.next()) => {
446 let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
447 let (event, create_at, stream) = match compactor_stream_req {
448 (Some(Ok(req)), stream) => {
449 let create_at = req.create_at();
450 let event = req.take_event();
451 (event, create_at, stream)
452 }
453
454 (Some(Err(err)), _stream) => {
455 tracing::warn!(error = %err.as_report(), context_id, "compactor stream poll with err, recv stream may be destroyed");
456 continue
457 }
458
459 _ => {
460 tracing::warn!(context_id, "compactor stream poll err, recv stream may be destroyed");
462 self.hummock_compactor_dispatcher.remove_compactor(context_id);
463 continue
464 },
465 };
466
467 {
468 let consumed_latency_ms = SystemTime::now()
469 .duration_since(std::time::UNIX_EPOCH)
470 .expect("Clock may have gone backwards")
471 .as_millis()
472 as u64
473 - create_at;
474 self.metrics
475 .compaction_event_consumed_latency
476 .observe(consumed_latency_ms as _);
477 }
478
479 let mut compactor_alive = true;
480 if self
481 .hummock_compactor_dispatcher
482 .should_forward(&event)
483 {
484 if let Err(e) = self
485 .hummock_compactor_dispatcher
486 .on_event_remotely(context_id, event)
487 .await
488 {
489 tracing::warn!(error = %e.as_report(), "Failed to forward event");
490 }
491 } else {
492 compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
493 context_id,
494 event,
495 ).await;
496 }
497
498 if compactor_alive {
499 push_stream(context_id, stream, &mut compactor_request_streams);
500 } else {
501 tracing::warn!(context_id, "compactor stream error, send stream may be destroyed");
502 self
503 .hummock_compactor_dispatcher
504 .remove_compactor(context_id);
505 }
506 },
507 }
508 }
509 });
510
511 (join_handle, shutdown_tx)
512 }
513}
514
515impl CompactorStreamEvent for SubscribeCompactionEventRequest {
516 type EventType = RequestEvent;
517
518 fn take_event(self) -> Self::EventType {
519 self.event.unwrap()
520 }
521
522 fn create_at(&self) -> u64 {
523 self.create_at
524 }
525}
526
/// Consumes Hummock `PullTask` / `ReportTask` events delivered over a channel on its own
/// spawned task, batching task reports.
pub struct HummockCompactorDedicatedEventLoop {
    hummock_manager: Arc<HummockManager>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
}
531
532impl HummockCompactorDedicatedEventLoop {
533 pub fn new(
534 hummock_manager: Arc<HummockManager>,
535 hummock_compaction_event_handler: HummockCompactionEventHandler,
536 ) -> Self {
537 Self {
538 hummock_manager,
539 hummock_compaction_event_handler,
540 }
541 }
542
543 async fn compact_task_dedicated_event_handler(
545 &self,
546 mut rx: UnboundedReceiver<(u32, RequestEvent)>,
547 shutdown_rx: OneShotReceiver<()>,
548 ) {
549 let mut compaction_selectors = init_selectors();
550
551 tokio::select! {
552 _ = shutdown_rx => {}
553
554 _ = async {
555 while let Some((context_id, event)) = rx.recv().await {
556 let mut report_events = vec![];
557 let mut skip_times = 0;
558 match event {
559 RequestEvent::PullTask(PullTask { pull_task_count }) => {
560 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
561 }
562
563 RequestEvent::ReportTask(task) => {
564 report_events.push(task.into());
565 }
566
567 _ => unreachable!(),
568 }
569 while let Ok((context_id, event)) = rx.try_recv() {
570 match event {
571 RequestEvent::PullTask(PullTask { pull_task_count }) => {
572 self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
573 if !report_events.is_empty() {
574 if skip_times > MAX_SKIP_TIMES {
575 break;
576 }
577 skip_times += 1;
578 }
579 }
580
581 RequestEvent::ReportTask(task) => {
582 report_events.push(task.into());
583 if report_events.len() >= MAX_REPORT_COUNT {
584 break;
585 }
586 }
587 _ => unreachable!(),
588 }
589 }
590 if !report_events.is_empty() {
591 if let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
592 {
593 tracing::error!(error = %e.as_report(), "report compact_tack fail")
594 }
595 }
596 }
597 } => {}
598 }
599 }
600
601 pub fn run(
602 self,
603 ) -> (
604 JoinHandle<()>,
605 UnboundedSender<(u32, RequestEvent)>,
606 Sender<()>,
607 ) {
608 let (tx, rx) = unbounded_channel();
609 let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
610 let join_handler = tokio::spawn(async move {
611 self.compact_task_dedicated_event_handler(rx, shutdown_rx)
612 .await;
613 });
614 (join_handler, tx, shutdon_tx)
615 }
616}
617
/// Serves pull requests from Iceberg compactors on top of an Iceberg compaction manager.
pub struct IcebergCompactionEventHandler {
    compaction_manager: IcebergCompactionManagerRef,
}
621
622impl IcebergCompactionEventHandler {
623 pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
624 Self { compaction_manager }
625 }
626
627 async fn handle_pull_task_event(&self, context_id: u32, pull_task_count: usize) -> bool {
628 assert_ne!(0, pull_task_count);
629 if let Some(compactor) = self
630 .compaction_manager
631 .iceberg_compactor_manager
632 .get_compactor(context_id)
633 {
634 let mut compactor_alive = true;
635
636 let iceberg_compaction_handles = self
637 .compaction_manager
638 .get_top_n_iceberg_commit_sink_ids(pull_task_count);
639
640 for handle in iceberg_compaction_handles {
641 let compactor = compactor.clone();
642 if let Err(e) = async move {
644 handle
645 .send_compact_task(
646 compactor,
647 next_compaction_task_id(&self.compaction_manager.env).await?,
648 )
649 .await
650 }
651 .await
652 {
653 tracing::warn!(
654 error = %e.as_report(),
655 "Failed to send iceberg commit task to {}",
656 context_id,
657 );
658 compactor_alive = false;
659 }
660 }
661
662 if let Err(e) =
663 compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
664 {
665 tracing::warn!(
666 error = %e.as_report(),
667 "Failed to send ask to {}",
668 context_id,
669 );
670 compactor_alive = false;
671 }
672
673 return compactor_alive;
674 }
675
676 false
677 }
678}
679
/// Dispatcher for Iceberg compactor events. Every event is handled inline; nothing is forwarded.
pub struct IcebergCompactionEventDispatcher {
    compaction_event_handler: IcebergCompactionEventHandler,
}
683
684#[async_trait::async_trait]
685impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
686 type EventType = IcebergRequestEvent;
687
688 async fn on_event_locally(&self, context_id: u32, event: Self::EventType) -> bool {
689 match event {
690 IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
691 return self
692 .compaction_event_handler
693 .handle_pull_task_event(context_id, pull_task_count as usize)
694 .await;
695 }
696 _ => unreachable!(),
697 }
698 }
699
700 async fn on_event_remotely(&self, _context_id: u32, _event: Self::EventType) -> Result<()> {
701 unreachable!()
702 }
703
704 fn should_forward(&self, _event: &Self::EventType) -> bool {
705 false
706 }
707
708 fn remove_compactor(&self, context_id: u32) {
709 self.compaction_event_handler
710 .compaction_manager
711 .iceberg_compactor_manager
712 .remove_compactor(context_id);
713 }
714}
715
716impl IcebergCompactionEventDispatcher {
717 pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
718 Self {
719 compaction_event_handler,
720 }
721 }
722}
723
724impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
725 type EventType = IcebergRequestEvent;
726
727 fn take_event(self) -> Self::EventType {
728 self.event.unwrap()
729 }
730
731 fn create_at(&self) -> u64 {
732 self.create_at
733 }
734}