risingwave_meta/hummock/manager/compaction/
compaction_event_loop.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2025 RisingWave Labs
16//
17// Licensed under the Apache License, Version 2.0 (the "License");
18// you may not use this file except in compliance with the License.
19// You may obtain a copy of the License at
20//
21//     http://www.apache.org/licenses/LICENSE-2.0
22//
23// Unless required by applicable law or agreed to in writing, software
24// distributed under the License is distributed on an "AS IS" BASIS,
25// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26// See the License for the specific language governing permissions and
27// limitations under the License.
28
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::time::{Instant, SystemTime};
32
33use anyhow::Context;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, StreamExt};
36use risingwave_hummock_sdk::CompactionGroupId;
37use risingwave_hummock_sdk::compact_task::ReportTask;
38use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
39use risingwave_pb::hummock::subscribe_compaction_event_request::{
40    Event as RequestEvent, HeartBeat, PullTask,
41};
42use risingwave_pb::hummock::subscribe_compaction_event_response::{
43    Event as ResponseEvent, PullTaskAck,
44};
45use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
46use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
47use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
48    Event as IcebergRequestEvent, PullTask as IcebergPullTask,
49};
50use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
51    Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
52};
53use rw_futures_util::pending_on_none;
54use thiserror_ext::AsReport;
55use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
56use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use super::init_selectors;
62use crate::hummock::HummockManager;
63use crate::hummock::compaction::CompactionSelector;
64use crate::hummock::error::{Error, Result};
65use crate::hummock::sequence::next_compaction_task_id;
66use crate::manager::MetaOpts;
67use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
68use crate::rpc::metrics::MetaMetrics;
69
/// Bound on how many pull-task events the dedicated handler keeps serving while report events
/// are already buffered; once the counter exceeds this value the handler stops draining the
/// queue and flushes the buffered reports.
const MAX_SKIP_TIMES: usize = 8;
/// Number of buffered report events at which the dedicated handler stops draining the queue and
/// reports the whole batch.
const MAX_REPORT_COUNT: usize = 16;
72
73#[async_trait::async_trait]
74pub trait CompactionEventDispatcher: Send + Sync + 'static {
75    type EventType: Send + Sync + 'static;
76
77    async fn on_event_locally(&self, context_id: u32, event: Self::EventType) -> bool;
78
79    async fn on_event_remotely(&self, context_id: u32, event: Self::EventType) -> Result<()>;
80
81    fn should_forward(&self, event: &Self::EventType) -> bool;
82
83    fn remove_compactor(&self, context_id: u32);
84}
85
/// A request message received from a compactor stream, wrapping the actual event together with
/// its creation timestamp.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Event payload carried by the request.
    type EventType: Send + Sync + 'static;

    /// Consumes the request and yields the event it carries.
    fn take_event(self) -> Self::EventType;

    /// Creation timestamp of the request. The event loop compares it against the current time
    /// in milliseconds since the UNIX epoch.
    fn create_at(&self) -> u64;
}
91
92pub struct CompactionEventLoop<
93    D: CompactionEventDispatcher<EventType = E::EventType>,
94    E: CompactorStreamEvent,
95> {
96    hummock_compactor_dispatcher: D,
97    metrics: Arc<MetaMetrics>,
98    compactor_streams_change_rx: UnboundedReceiver<(u32, Streaming<E>)>,
99}
100
101pub type HummockCompactionEventLoop =
102    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;
103
104pub type IcebergCompactionEventLoop =
105    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
106
107pub struct HummockCompactionEventDispatcher {
108    meta_opts: Arc<MetaOpts>,
109    hummock_compaction_event_handler: HummockCompactionEventHandler,
110    tx: Option<UnboundedSender<(u32, RequestEvent)>>,
111}
112
113#[async_trait::async_trait]
114impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
115    type EventType = RequestEvent;
116
117    fn should_forward(&self, event: &Self::EventType) -> bool {
118        if self.tx.is_none() {
119            return false;
120        }
121
122        matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
123    }
124
125    async fn on_event_locally(&self, context_id: u32, event: Self::EventType) -> bool {
126        let mut compactor_alive = true;
127        match event {
128            RequestEvent::HeartBeat(HeartBeat { progress }) => {
129                compactor_alive = self
130                    .hummock_compaction_event_handler
131                    .handle_heartbeat(context_id, progress)
132                    .await;
133            }
134
135            RequestEvent::Register(_event) => {
136                unreachable!()
137            }
138
139            RequestEvent::PullTask(pull_task) => {
140                compactor_alive = self
141                    .hummock_compaction_event_handler
142                    .handle_pull_task_event(
143                        context_id,
144                        pull_task.pull_task_count as usize,
145                        &mut init_selectors(),
146                        self.meta_opts.max_get_task_probe_times,
147                    )
148                    .await;
149            }
150
151            RequestEvent::ReportTask(report_event) => {
152                if let Err(e) = self
153                    .hummock_compaction_event_handler
154                    .handle_report_task_event(vec![report_event.into()])
155                    .await
156                {
157                    tracing::error!(error = %e.as_report(), "report compact_tack fail")
158                }
159            }
160        }
161
162        compactor_alive
163    }
164
165    async fn on_event_remotely(&self, context_id: u32, event: Self::EventType) -> Result<()> {
166        if let Some(tx) = &self.tx {
167            tx.send((context_id, event))
168                .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
169        } else {
170            unreachable!();
171        }
172        Ok(())
173    }
174
175    fn remove_compactor(&self, context_id: u32) {
176        self.hummock_compaction_event_handler
177            .hummock_manager
178            .compactor_manager
179            .remove_compactor(context_id);
180    }
181}
182
183impl HummockCompactionEventDispatcher {
184    pub fn new(
185        meta_opts: Arc<MetaOpts>,
186        hummock_compaction_event_handler: HummockCompactionEventHandler,
187        tx: Option<UnboundedSender<(u32, RequestEvent)>>,
188    ) -> Self {
189        Self {
190            meta_opts,
191            hummock_compaction_event_handler,
192            tx,
193        }
194    }
195}
196
197#[derive(Clone)]
198pub struct HummockCompactionEventHandler {
199    pub hummock_manager: Arc<HummockManager>,
200}
201
202impl HummockCompactionEventHandler {
203    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
204        Self { hummock_manager }
205    }
206
207    async fn handle_heartbeat(&self, context_id: u32, progress: Vec<CompactTaskProgress>) -> bool {
208        let mut compactor_alive = true;
209        let compactor_manager = self.hummock_manager.compactor_manager.clone();
210        let cancel_tasks = compactor_manager
211            .update_task_heartbeats(&progress)
212            .into_iter()
213            .map(|task| task.task_id)
214            .collect::<Vec<_>>();
215        if !cancel_tasks.is_empty() {
216            tracing::info!(
217                ?cancel_tasks,
218                context_id,
219                "Tasks cancel has expired due to lack of visible progress",
220            );
221
222            if let Err(e) = self
223                .hummock_manager
224                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
225                .await
226            {
227                tracing::error!(
228                    error = %e.as_report(),
229                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
230                    until we can successfully report its status."
231                );
232            }
233        }
234
235        match compactor_manager.get_compactor(context_id) {
236            Some(compactor) => {
237                // Forcefully cancel the task so that it terminates
238                // early on the compactor
239                // node.
240                if !cancel_tasks.is_empty() {
241                    let _ = compactor.cancel_tasks(&cancel_tasks);
242                    tracing::info!(
243                        ?cancel_tasks,
244                        context_id,
245                        "CancelTask operation has been sent to compactor node",
246                    );
247                }
248            }
249            _ => {
250                // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
251                // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
252                compactor_alive = false;
253            }
254        }
255
256        compactor_alive
257    }
258
259    async fn handle_pull_task_event(
260        &self,
261        context_id: u32,
262        pull_task_count: usize,
263        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
264        max_get_task_probe_times: usize,
265    ) -> bool {
266        assert_ne!(0, pull_task_count);
267        if let Some(compactor) = self
268            .hummock_manager
269            .compactor_manager
270            .get_compactor(context_id)
271        {
272            let mut compactor_alive = true;
273            let (groups, task_type) = self
274                .hummock_manager
275                .auto_pick_compaction_groups_and_type()
276                .await;
277            if let TaskType::Ttl = task_type {
278                match self
279                    .hummock_manager
280                    .metadata_manager
281                    .get_all_table_options()
282                    .await
283                    .map_err(|err| Error::MetaStore(err.into()))
284                {
285                    Ok(table_options) => {
286                        self.hummock_manager
287                            .update_table_id_to_table_option(table_options);
288                    }
289                    Err(err) => {
290                        warn!(error = %err.as_report(), "Failed to get table options");
291                    }
292                }
293            }
294
295            if !groups.is_empty() {
296                let selector: &mut Box<dyn CompactionSelector> =
297                    compaction_selectors.get_mut(&task_type).unwrap();
298
299                let mut generated_task_count = 0;
300                let mut existed_groups = groups.clone();
301                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
302                let mut failed_tasks = vec![];
303                let mut loop_times = 0;
304
305                while generated_task_count < pull_task_count
306                    && failed_tasks.is_empty()
307                    && loop_times < max_get_task_probe_times
308                {
309                    loop_times += 1;
310                    let compact_ret = self
311                        .hummock_manager
312                        .get_compact_tasks(
313                            existed_groups.clone(),
314                            pull_task_count - generated_task_count,
315                            selector,
316                        )
317                        .await;
318
319                    match compact_ret {
320                        Ok((compact_tasks, unschedule_groups)) => {
321                            no_task_groups.extend(unschedule_groups);
322                            if compact_tasks.is_empty() {
323                                break;
324                            }
325                            generated_task_count += compact_tasks.len();
326                            for task in compact_tasks {
327                                let task_id = task.task_id;
328                                if let Err(e) =
329                                    compactor.send_event(ResponseEvent::CompactTask(task.into()))
330                                {
331                                    tracing::warn!(
332                                        error = %e.as_report(),
333                                        "Failed to send task {} to {}",
334                                        task_id,
335                                        compactor.context_id(),
336                                    );
337                                    failed_tasks.push(task_id);
338                                }
339                            }
340                            if !failed_tasks.is_empty() {
341                                self.hummock_manager
342                                    .compactor_manager
343                                    .remove_compactor(context_id);
344                            }
345                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
346                        }
347                        Err(err) => {
348                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
349                            break;
350                        }
351                    };
352                }
353                for group in no_task_groups {
354                    self.hummock_manager
355                        .compaction_state
356                        .unschedule(group, task_type);
357                }
358                if let Err(err) = self
359                    .hummock_manager
360                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
361                    .await
362                {
363                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
364                }
365            }
366
367            // ack to compactor
368            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
369                tracing::warn!(
370                    error = %e.as_report(),
371                    "Failed to send ask to {}",
372                    context_id,
373                );
374                compactor_alive = false;
375            }
376
377            return compactor_alive;
378        }
379
380        false
381    }
382
383    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
384        if let Err(e) = self
385            .hummock_manager
386            .report_compact_tasks(report_events)
387            .await
388        {
389            tracing::error!(error = %e.as_report(), "report compact_tack fail")
390        }
391        Ok(())
392    }
393}
394
395impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
396    CompactionEventLoop<D, E>
397{
398    pub fn new(
399        hummock_compactor_dispatcher: D,
400        metrics: Arc<MetaMetrics>,
401        compactor_streams_change_rx: UnboundedReceiver<(u32, Streaming<E>)>,
402    ) -> Self {
403        Self {
404            hummock_compactor_dispatcher,
405            metrics,
406            compactor_streams_change_rx,
407        }
408    }
409
410    pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
411        let mut compactor_request_streams = FuturesUnordered::new();
412        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
413        let shutdown_rx_shared = shutdown_rx.shared();
414
415        let join_handle = tokio::spawn(async move {
416            let push_stream =
417                |context_id: u32,
418                 stream: Streaming<E>,
419                 compactor_request_streams: &mut FuturesUnordered<_>| {
420                    let future = StreamExt::into_future(stream)
421                        .map(move |stream_future| (context_id, stream_future));
422
423                    compactor_request_streams.push(future);
424                };
425
426            let mut event_loop_iteration_now = Instant::now();
427
428            loop {
429                let shutdown_rx_shared = shutdown_rx_shared.clone();
430                self.metrics
431                    .compaction_event_loop_iteration_latency
432                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
433                event_loop_iteration_now = Instant::now();
434
435                tokio::select! {
436                    _ = shutdown_rx_shared => { return; },
437
438                    compactor_stream = self.compactor_streams_change_rx.recv() => {
439                        if let Some((context_id, stream)) = compactor_stream {
440                            tracing::info!("compactor {} enters the cluster", context_id);
441                            push_stream(context_id, stream, &mut compactor_request_streams);
442                        }
443                    },
444
445                    result = pending_on_none(compactor_request_streams.next()) => {
446                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
447                        let (event, create_at, stream) = match compactor_stream_req {
448                            (Some(Ok(req)), stream) => {
449                                let create_at = req.create_at();
450                                let event  = req.take_event();
451                                (event, create_at, stream)
452                            }
453
454                            (Some(Err(err)), _stream) => {
455                                tracing::warn!(error = %err.as_report(), context_id, "compactor stream poll with err, recv stream may be destroyed");
456                                continue
457                            }
458
459                            _ => {
460                                // remove compactor from compactor manager
461                                tracing::warn!(context_id, "compactor stream poll err, recv stream may be destroyed");
462                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
463                                continue
464                            },
465                        };
466
467                        {
468                            let consumed_latency_ms = SystemTime::now()
469                                .duration_since(std::time::UNIX_EPOCH)
470                                .expect("Clock may have gone backwards")
471                                .as_millis()
472                                as u64
473                            - create_at;
474                            self.metrics
475                                .compaction_event_consumed_latency
476                                .observe(consumed_latency_ms as _);
477                        }
478
479                        let mut compactor_alive = true;
480                        if self
481                            .hummock_compactor_dispatcher
482                            .should_forward(&event)
483                        {
484                            if let Err(e) = self
485                                .hummock_compactor_dispatcher
486                                .on_event_remotely(context_id, event)
487                                .await
488                            {
489                                tracing::warn!(error = %e.as_report(), "Failed to forward event");
490                            }
491                        } else {
492                            compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
493                                context_id,
494                                event,
495                            ).await;
496                        }
497
498                        if compactor_alive {
499                            push_stream(context_id, stream, &mut compactor_request_streams);
500                        } else {
501                            tracing::warn!(context_id, "compactor stream error, send stream may be destroyed");
502                            self
503                            .hummock_compactor_dispatcher
504                            .remove_compactor(context_id);
505                        }
506                    },
507                }
508            }
509        });
510
511        (join_handle, shutdown_tx)
512    }
513}
514
515impl CompactorStreamEvent for SubscribeCompactionEventRequest {
516    type EventType = RequestEvent;
517
518    fn take_event(self) -> Self::EventType {
519        self.event.unwrap()
520    }
521
522    fn create_at(&self) -> u64 {
523        self.create_at
524    }
525}
526
527pub struct HummockCompactorDedicatedEventLoop {
528    hummock_manager: Arc<HummockManager>,
529    hummock_compaction_event_handler: HummockCompactionEventHandler,
530}
531
532impl HummockCompactorDedicatedEventLoop {
533    pub fn new(
534        hummock_manager: Arc<HummockManager>,
535        hummock_compaction_event_handler: HummockCompactionEventHandler,
536    ) -> Self {
537        Self {
538            hummock_manager,
539            hummock_compaction_event_handler,
540        }
541    }
542
543    /// dedicated event runtime for CPU/IO bound event
544    async fn compact_task_dedicated_event_handler(
545        &self,
546        mut rx: UnboundedReceiver<(u32, RequestEvent)>,
547        shutdown_rx: OneShotReceiver<()>,
548    ) {
549        let mut compaction_selectors = init_selectors();
550
551        tokio::select! {
552            _ = shutdown_rx => {}
553
554            _ = async {
555                while let Some((context_id, event)) = rx.recv().await {
556                    let mut report_events = vec![];
557                    let mut skip_times = 0;
558                    match event {
559                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
560                            self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
561                        }
562
563                        RequestEvent::ReportTask(task) => {
564                           report_events.push(task.into());
565                        }
566
567                        _ => unreachable!(),
568                    }
569                    while let Ok((context_id, event)) = rx.try_recv() {
570                        match event {
571                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
572                                self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
573                                if !report_events.is_empty() {
574                                    if skip_times > MAX_SKIP_TIMES {
575                                        break;
576                                    }
577                                    skip_times += 1;
578                                }
579                            }
580
581                            RequestEvent::ReportTask(task) => {
582                                report_events.push(task.into());
583                                if report_events.len() >= MAX_REPORT_COUNT {
584                                    break;
585                                }
586                            }
587                        _ => unreachable!(),
588                        }
589                    }
590                    if !report_events.is_empty() {
591                        if let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
592                        {
593                            tracing::error!(error = %e.as_report(), "report compact_tack fail")
594                        }
595                    }
596                }
597            } => {}
598        }
599    }
600
601    pub fn run(
602        self,
603    ) -> (
604        JoinHandle<()>,
605        UnboundedSender<(u32, RequestEvent)>,
606        Sender<()>,
607    ) {
608        let (tx, rx) = unbounded_channel();
609        let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
610        let join_handler = tokio::spawn(async move {
611            self.compact_task_dedicated_event_handler(rx, shutdown_rx)
612                .await;
613        });
614        (join_handler, tx, shutdon_tx)
615    }
616}
617
618pub struct IcebergCompactionEventHandler {
619    compaction_manager: IcebergCompactionManagerRef,
620}
621
622impl IcebergCompactionEventHandler {
623    pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
624        Self { compaction_manager }
625    }
626
627    async fn handle_pull_task_event(&self, context_id: u32, pull_task_count: usize) -> bool {
628        assert_ne!(0, pull_task_count);
629        if let Some(compactor) = self
630            .compaction_manager
631            .iceberg_compactor_manager
632            .get_compactor(context_id)
633        {
634            let mut compactor_alive = true;
635
636            let iceberg_compaction_handles = self
637                .compaction_manager
638                .get_top_n_iceberg_commit_sink_ids(pull_task_count);
639
640            for handle in iceberg_compaction_handles {
641                let compactor = compactor.clone();
642                // send iceberg commit task to compactor
643                if let Err(e) = async move {
644                    handle
645                        .send_compact_task(
646                            compactor,
647                            next_compaction_task_id(&self.compaction_manager.env).await?,
648                        )
649                        .await
650                }
651                .await
652                {
653                    tracing::warn!(
654                        error = %e.as_report(),
655                        "Failed to send iceberg commit task to {}",
656                        context_id,
657                    );
658                    compactor_alive = false;
659                }
660            }
661
662            if let Err(e) =
663                compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
664            {
665                tracing::warn!(
666                    error = %e.as_report(),
667                    "Failed to send ask to {}",
668                    context_id,
669                );
670                compactor_alive = false;
671            }
672
673            return compactor_alive;
674        }
675
676        false
677    }
678}
679
680pub struct IcebergCompactionEventDispatcher {
681    compaction_event_handler: IcebergCompactionEventHandler,
682}
683
684#[async_trait::async_trait]
685impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
686    type EventType = IcebergRequestEvent;
687
688    async fn on_event_locally(&self, context_id: u32, event: Self::EventType) -> bool {
689        match event {
690            IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
691                return self
692                    .compaction_event_handler
693                    .handle_pull_task_event(context_id, pull_task_count as usize)
694                    .await;
695            }
696            _ => unreachable!(),
697        }
698    }
699
700    async fn on_event_remotely(&self, _context_id: u32, _event: Self::EventType) -> Result<()> {
701        unreachable!()
702    }
703
704    fn should_forward(&self, _event: &Self::EventType) -> bool {
705        false
706    }
707
708    fn remove_compactor(&self, context_id: u32) {
709        self.compaction_event_handler
710            .compaction_manager
711            .iceberg_compactor_manager
712            .remove_compactor(context_id);
713    }
714}
715
716impl IcebergCompactionEventDispatcher {
717    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
718        Self {
719            compaction_event_handler,
720        }
721    }
722}
723
724impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
725    type EventType = IcebergRequestEvent;
726
727    fn take_event(self) -> Self::EventType {
728        self.event.unwrap()
729    }
730
731    fn create_at(&self) -> u64 {
732        self.create_at
733    }
734}