risingwave_meta/hummock/manager/compaction/compaction_event_loop.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Instant, SystemTime};
18
19use anyhow::Context;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use risingwave_hummock_sdk::compact_task::ReportTask;
23use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
24use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
25use risingwave_pb::hummock::subscribe_compaction_event_request::{
26    Event as RequestEvent, HeartBeat, PullTask,
27};
28use risingwave_pb::hummock::subscribe_compaction_event_response::{
29    Event as ResponseEvent, PullTaskAck,
30};
31use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
32use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
34    Event as IcebergRequestEvent, PullTask as IcebergPullTask,
35};
36use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
37    Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
38};
39use rw_futures_util::pending_on_none;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
43use tokio::task::JoinHandle;
44use tonic::Streaming;
45use tracing::warn;
46
47use super::init_selectors;
48use crate::hummock::HummockManager;
49use crate::hummock::compaction::CompactionSelector;
50use crate::hummock::error::{Error, Result};
51use crate::hummock::sequence::next_compaction_task_id;
52use crate::manager::MetaOpts;
53use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
54use crate::rpc::metrics::MetaMetrics;
55
/// Budget of `PullTask` events the dedicated event loop may handle while report
/// events are already buffered; once the counter exceeds this value the loop stops
/// draining the channel and flushes the buffered reports.
const MAX_SKIP_TIMES: usize = 8;

/// Maximum number of `ReportTask` events the dedicated event loop batches into a
/// single report call.
const MAX_REPORT_COUNT: usize = 16;
58
59#[async_trait::async_trait]
60pub trait CompactionEventDispatcher: Send + Sync + 'static {
61    type EventType: Send + Sync + 'static;
62
63    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;
64
65    async fn on_event_remotely(
66        &self,
67        context_id: HummockContextId,
68        event: Self::EventType,
69    ) -> Result<()>;
70
71    fn should_forward(&self, event: &Self::EventType) -> bool;
72
73    fn remove_compactor(&self, context_id: HummockContextId);
74}
75
/// A message read from a compactor's request stream.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Event payload extracted from the message.
    type EventType: Send + Sync + 'static;

    /// Consumes the message and returns its event payload.
    fn take_event(self) -> Self::EventType;

    /// Creation timestamp of the message. The event loop compares it against the
    /// current time in milliseconds since the Unix epoch to measure how long the
    /// event waited before being consumed.
    fn create_at(&self) -> u64;
}
81
82pub struct CompactionEventLoop<
83    D: CompactionEventDispatcher<EventType = E::EventType>,
84    E: CompactorStreamEvent,
85> {
86    hummock_compactor_dispatcher: D,
87    metrics: Arc<MetaMetrics>,
88    compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
89}
90
91pub type HummockCompactionEventLoop =
92    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;
93
94pub type IcebergCompactionEventLoop =
95    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
96
97pub struct HummockCompactionEventDispatcher {
98    meta_opts: Arc<MetaOpts>,
99    hummock_compaction_event_handler: HummockCompactionEventHandler,
100    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
101}
102
103#[async_trait::async_trait]
104impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
105    type EventType = RequestEvent;
106
107    fn should_forward(&self, event: &Self::EventType) -> bool {
108        if self.tx.is_none() {
109            return false;
110        }
111
112        matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
113    }
114
115    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
116        let mut compactor_alive = true;
117        match event {
118            RequestEvent::HeartBeat(HeartBeat { progress }) => {
119                compactor_alive = self
120                    .hummock_compaction_event_handler
121                    .handle_heartbeat(context_id, progress)
122                    .await;
123            }
124
125            RequestEvent::Register(_event) => {
126                unreachable!()
127            }
128
129            RequestEvent::PullTask(pull_task) => {
130                compactor_alive = self
131                    .hummock_compaction_event_handler
132                    .handle_pull_task_event(
133                        context_id,
134                        pull_task.pull_task_count as usize,
135                        &mut init_selectors(),
136                        self.meta_opts.max_get_task_probe_times,
137                    )
138                    .await;
139            }
140
141            RequestEvent::ReportTask(report_event) => {
142                if let Err(e) = self
143                    .hummock_compaction_event_handler
144                    .handle_report_task_event(vec![report_event.into()])
145                    .await
146                {
147                    tracing::error!(error = %e.as_report(), "report compact_tack fail")
148                }
149            }
150        }
151
152        compactor_alive
153    }
154
155    async fn on_event_remotely(
156        &self,
157        context_id: HummockContextId,
158        event: Self::EventType,
159    ) -> Result<()> {
160        if let Some(tx) = &self.tx {
161            tx.send((context_id, event))
162                .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
163        } else {
164            unreachable!();
165        }
166        Ok(())
167    }
168
169    fn remove_compactor(&self, context_id: HummockContextId) {
170        self.hummock_compaction_event_handler
171            .hummock_manager
172            .compactor_manager
173            .remove_compactor(context_id);
174    }
175}
176
177impl HummockCompactionEventDispatcher {
178    pub fn new(
179        meta_opts: Arc<MetaOpts>,
180        hummock_compaction_event_handler: HummockCompactionEventHandler,
181        tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
182    ) -> Self {
183        Self {
184            meta_opts,
185            hummock_compaction_event_handler,
186            tx,
187        }
188    }
189}
190
191#[derive(Clone)]
192pub struct HummockCompactionEventHandler {
193    pub hummock_manager: Arc<HummockManager>,
194}
195
196impl HummockCompactionEventHandler {
197    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
198        Self { hummock_manager }
199    }
200
201    async fn handle_heartbeat(
202        &self,
203        context_id: HummockContextId,
204        progress: Vec<CompactTaskProgress>,
205    ) -> bool {
206        let mut compactor_alive = true;
207        let compactor_manager = self.hummock_manager.compactor_manager.clone();
208        let cancel_tasks = compactor_manager
209            .update_task_heartbeats(&progress)
210            .into_iter()
211            .map(|task| task.task_id)
212            .collect::<Vec<_>>();
213        if !cancel_tasks.is_empty() {
214            tracing::info!(
215                ?cancel_tasks,
216                %context_id,
217                "Tasks cancel has expired due to lack of visible progress",
218            );
219
220            if let Err(e) = self
221                .hummock_manager
222                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
223                .await
224            {
225                tracing::error!(
226                    error = %e.as_report(),
227                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
228                    until we can successfully report its status."
229                );
230            }
231        }
232
233        match compactor_manager.get_compactor(context_id) {
234            Some(compactor) => {
235                // Forcefully cancel the task so that it terminates
236                // early on the compactor
237                // node.
238                if !cancel_tasks.is_empty() {
239                    let _ = compactor.cancel_tasks(&cancel_tasks);
240                    tracing::info!(
241                        ?cancel_tasks,
242                        %context_id,
243                        "CancelTask operation has been sent to compactor node",
244                    );
245                }
246            }
247            _ => {
248                // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
249                // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
250                compactor_alive = false;
251            }
252        }
253
254        compactor_alive
255    }
256
257    async fn handle_pull_task_event(
258        &self,
259        context_id: HummockContextId,
260        pull_task_count: usize,
261        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
262        max_get_task_probe_times: usize,
263    ) -> bool {
264        assert_ne!(0, pull_task_count);
265        if let Some(compactor) = self
266            .hummock_manager
267            .compactor_manager
268            .get_compactor(context_id)
269        {
270            let mut compactor_alive = true;
271            let (groups, task_type) = self
272                .hummock_manager
273                .auto_pick_compaction_groups_and_type()
274                .await;
275            if let TaskType::Ttl = task_type {
276                match self
277                    .hummock_manager
278                    .metadata_manager
279                    .get_all_table_options()
280                    .await
281                    .map_err(|err| Error::MetaStore(err.into()))
282                {
283                    Ok(table_options) => {
284                        self.hummock_manager
285                            .update_table_id_to_table_option(table_options);
286                    }
287                    Err(err) => {
288                        warn!(error = %err.as_report(), "Failed to get table options");
289                    }
290                }
291            }
292
293            if !groups.is_empty() {
294                let selector: &mut Box<dyn CompactionSelector> =
295                    compaction_selectors.get_mut(&task_type).unwrap();
296
297                let mut generated_task_count = 0;
298                let mut existed_groups = groups.clone();
299                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
300                let mut failed_tasks = vec![];
301                let mut loop_times = 0;
302
303                while generated_task_count < pull_task_count
304                    && failed_tasks.is_empty()
305                    && loop_times < max_get_task_probe_times
306                {
307                    loop_times += 1;
308                    let compact_ret = self
309                        .hummock_manager
310                        .get_compact_tasks(
311                            existed_groups.clone(),
312                            pull_task_count - generated_task_count,
313                            selector,
314                        )
315                        .await;
316
317                    match compact_ret {
318                        Ok((compact_tasks, unschedule_groups)) => {
319                            no_task_groups.extend(unschedule_groups);
320                            if compact_tasks.is_empty() {
321                                break;
322                            }
323                            generated_task_count += compact_tasks.len();
324                            for task in compact_tasks {
325                                let task_id = task.task_id;
326                                if let Err(e) =
327                                    compactor.send_event(ResponseEvent::CompactTask(task.into()))
328                                {
329                                    tracing::warn!(
330                                        error = %e.as_report(),
331                                        "Failed to send task {} to {}",
332                                        task_id,
333                                        compactor.context_id(),
334                                    );
335                                    failed_tasks.push(task_id);
336                                }
337                            }
338                            if !failed_tasks.is_empty() {
339                                self.hummock_manager
340                                    .compactor_manager
341                                    .remove_compactor(context_id);
342                            }
343                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
344                        }
345                        Err(err) => {
346                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
347                            break;
348                        }
349                    };
350                }
351                for group in no_task_groups {
352                    self.hummock_manager
353                        .compaction_state
354                        .unschedule(group, task_type);
355                }
356                if let Err(err) = self
357                    .hummock_manager
358                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
359                    .await
360                {
361                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
362                }
363            }
364
365            // ack to compactor
366            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
367                tracing::warn!(
368                    error = %e.as_report(),
369                    "Failed to send ask to {}",
370                    context_id,
371                );
372                compactor_alive = false;
373            }
374
375            return compactor_alive;
376        }
377
378        false
379    }
380
381    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
382        if let Err(e) = self
383            .hummock_manager
384            .report_compact_tasks(report_events)
385            .await
386        {
387            tracing::error!(error = %e.as_report(), "report compact_tack fail")
388        }
389        Ok(())
390    }
391}
392
393impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
394    CompactionEventLoop<D, E>
395{
396    pub fn new(
397        hummock_compactor_dispatcher: D,
398        metrics: Arc<MetaMetrics>,
399        compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
400    ) -> Self {
401        Self {
402            hummock_compactor_dispatcher,
403            metrics,
404            compactor_streams_change_rx,
405        }
406    }
407
408    pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
409        let mut compactor_request_streams = FuturesUnordered::new();
410        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
411        let shutdown_rx_shared = shutdown_rx.shared();
412
413        let join_handle = tokio::spawn(async move {
414            let push_stream =
415                |context_id: HummockContextId,
416                 stream: Streaming<E>,
417                 compactor_request_streams: &mut FuturesUnordered<_>| {
418                    let future = StreamExt::into_future(stream)
419                        .map(move |stream_future| (context_id, stream_future));
420
421                    compactor_request_streams.push(future);
422                };
423
424            let mut event_loop_iteration_now = Instant::now();
425
426            loop {
427                let shutdown_rx_shared = shutdown_rx_shared.clone();
428                self.metrics
429                    .compaction_event_loop_iteration_latency
430                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
431                event_loop_iteration_now = Instant::now();
432
433                tokio::select! {
434                    _ = shutdown_rx_shared => { return; },
435
436                    compactor_stream = self.compactor_streams_change_rx.recv() => {
437                        if let Some((context_id, stream)) = compactor_stream {
438                            tracing::info!("compactor {} enters the cluster", context_id);
439                            push_stream(context_id, stream, &mut compactor_request_streams);
440                        }
441                    },
442
443                    result = pending_on_none(compactor_request_streams.next()) => {
444                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
445                        let (event, create_at, stream) = match compactor_stream_req {
446                            (Some(Ok(req)), stream) => {
447                                let create_at = req.create_at();
448                                let event  = req.take_event();
449                                (event, create_at, stream)
450                            }
451
452                            (Some(Err(err)), _stream) => {
453                                tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
454                                continue
455                            }
456
457                            _ => {
458                                // remove compactor from compactor manager
459                                tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
460                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
461                                continue
462                            },
463                        };
464
465                        {
466                            let consumed_latency_ms = SystemTime::now()
467                                .duration_since(std::time::UNIX_EPOCH)
468                                .expect("Clock may have gone backwards")
469                                .as_millis()
470                                as u64
471                            - create_at;
472                            self.metrics
473                                .compaction_event_consumed_latency
474                                .observe(consumed_latency_ms as _);
475                        }
476
477                        let mut compactor_alive = true;
478                        if self
479                            .hummock_compactor_dispatcher
480                            .should_forward(&event)
481                        {
482                            if let Err(e) = self
483                                .hummock_compactor_dispatcher
484                                .on_event_remotely(context_id, event)
485                                .await
486                            {
487                                tracing::warn!(error = %e.as_report(), "Failed to forward event");
488                            }
489                        } else {
490                            compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
491                                context_id,
492                                event,
493                            ).await;
494                        }
495
496                        if compactor_alive {
497                            push_stream(context_id, stream, &mut compactor_request_streams);
498                        } else {
499                            tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
500                            self
501                            .hummock_compactor_dispatcher
502                            .remove_compactor(context_id);
503                        }
504                    },
505                }
506            }
507        });
508
509        (join_handle, shutdown_tx)
510    }
511}
512
513impl CompactorStreamEvent for SubscribeCompactionEventRequest {
514    type EventType = RequestEvent;
515
516    fn take_event(self) -> Self::EventType {
517        self.event.unwrap()
518    }
519
520    fn create_at(&self) -> u64 {
521        self.create_at
522    }
523}
524
525pub struct HummockCompactorDedicatedEventLoop {
526    hummock_manager: Arc<HummockManager>,
527    hummock_compaction_event_handler: HummockCompactionEventHandler,
528}
529
530impl HummockCompactorDedicatedEventLoop {
531    pub fn new(
532        hummock_manager: Arc<HummockManager>,
533        hummock_compaction_event_handler: HummockCompactionEventHandler,
534    ) -> Self {
535        Self {
536            hummock_manager,
537            hummock_compaction_event_handler,
538        }
539    }
540
541    /// dedicated event runtime for CPU/IO bound event
542    async fn compact_task_dedicated_event_handler(
543        &self,
544        mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
545        shutdown_rx: OneShotReceiver<()>,
546    ) {
547        let mut compaction_selectors = init_selectors();
548
549        tokio::select! {
550            _ = shutdown_rx => {}
551
552            _ = async {
553                while let Some((context_id, event)) = rx.recv().await {
554                    let mut report_events = vec![];
555                    let mut skip_times = 0;
556                    match event {
557                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
558                            self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
559                        }
560
561                        RequestEvent::ReportTask(task) => {
562                           report_events.push(task.into());
563                        }
564
565                        _ => unreachable!(),
566                    }
567                    while let Ok((context_id, event)) = rx.try_recv() {
568                        match event {
569                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
570                                self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
571                                if !report_events.is_empty() {
572                                    if skip_times > MAX_SKIP_TIMES {
573                                        break;
574                                    }
575                                    skip_times += 1;
576                                }
577                            }
578
579                            RequestEvent::ReportTask(task) => {
580                                report_events.push(task.into());
581                                if report_events.len() >= MAX_REPORT_COUNT {
582                                    break;
583                                }
584                            }
585                        _ => unreachable!(),
586                        }
587                    }
588                    if !report_events.is_empty()
589                        && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
590                        {
591                            tracing::error!(error = %e.as_report(), "report compact_tack fail")
592                        }
593                }
594            } => {}
595        }
596    }
597
598    pub fn run(
599        self,
600    ) -> (
601        JoinHandle<()>,
602        UnboundedSender<(HummockContextId, RequestEvent)>,
603        Sender<()>,
604    ) {
605        let (tx, rx) = unbounded_channel();
606        let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
607        let join_handler = tokio::spawn(async move {
608            self.compact_task_dedicated_event_handler(rx, shutdown_rx)
609                .await;
610        });
611        (join_handler, tx, shutdon_tx)
612    }
613}
614
615pub struct IcebergCompactionEventHandler {
616    compaction_manager: IcebergCompactionManagerRef,
617}
618
619impl IcebergCompactionEventHandler {
620    pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
621        Self { compaction_manager }
622    }
623
624    async fn handle_pull_task_event(
625        &self,
626        context_id: HummockContextId,
627        pull_task_count: usize,
628    ) -> bool {
629        assert_ne!(0, pull_task_count);
630        if let Some(compactor) = self
631            .compaction_manager
632            .iceberg_compactor_manager
633            .get_compactor(context_id)
634        {
635            let mut compactor_alive = true;
636
637            let iceberg_compaction_handles = self
638                .compaction_manager
639                .get_top_n_iceberg_commit_sink_ids(pull_task_count)
640                .await;
641
642            for handle in iceberg_compaction_handles {
643                let compactor = compactor.clone();
644                // send iceberg commit task to compactor
645                if let Err(e) = async move {
646                    handle
647                        .send_compact_task(
648                            compactor,
649                            next_compaction_task_id(&self.compaction_manager.env).await?,
650                        )
651                        .await
652                }
653                .await
654                {
655                    tracing::warn!(
656                        error = %e.as_report(),
657                        "Failed to send iceberg commit task to {}",
658                        context_id,
659                    );
660                    compactor_alive = false;
661                }
662            }
663
664            if let Err(e) =
665                compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
666            {
667                tracing::warn!(
668                    error = %e.as_report(),
669                    "Failed to send ask to {}",
670                    context_id,
671                );
672                compactor_alive = false;
673            }
674
675            return compactor_alive;
676        }
677
678        false
679    }
680}
681
682pub struct IcebergCompactionEventDispatcher {
683    compaction_event_handler: IcebergCompactionEventHandler,
684}
685
686#[async_trait::async_trait]
687impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
688    type EventType = IcebergRequestEvent;
689
690    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
691        match event {
692            IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
693                return self
694                    .compaction_event_handler
695                    .handle_pull_task_event(context_id, pull_task_count as usize)
696                    .await;
697            }
698            _ => unreachable!(),
699        }
700    }
701
702    async fn on_event_remotely(
703        &self,
704        _context_id: HummockContextId,
705        _event: Self::EventType,
706    ) -> Result<()> {
707        unreachable!()
708    }
709
710    fn should_forward(&self, _event: &Self::EventType) -> bool {
711        false
712    }
713
714    fn remove_compactor(&self, context_id: HummockContextId) {
715        self.compaction_event_handler
716            .compaction_manager
717            .iceberg_compactor_manager
718            .remove_compactor(context_id);
719    }
720}
721
722impl IcebergCompactionEventDispatcher {
723    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
724        Self {
725            compaction_event_handler,
726        }
727    }
728}
729
730impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
731    type EventType = IcebergRequestEvent;
732
733    fn take_event(self) -> Self::EventType {
734        self.event.unwrap()
735    }
736
737    fn create_at(&self) -> u64 {
738        self.create_at
739    }
740}