//! risingwave_meta/hummock/manager/compaction/compaction_event_loop.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Instant, SystemTime};
18
19use anyhow::Context;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use risingwave_hummock_sdk::compact_task::ReportTask;
23use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
24use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
25use risingwave_pb::hummock::subscribe_compaction_event_request::{
26    Event as RequestEvent, HeartBeat, PullTask,
27};
28use risingwave_pb::hummock::subscribe_compaction_event_response::{
29    Event as ResponseEvent, PullTaskAck,
30};
31use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
32use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
34    Event as IcebergRequestEvent, PullTask as IcebergPullTask, ReportTask as IcebergReportTask,
35};
36use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
37    Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
38};
39use rw_futures_util::pending_on_none;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
43use tokio::task::JoinHandle;
44use tonic::Streaming;
45use tracing::warn;
46
47use super::init_selectors;
48use crate::hummock::HummockManager;
49use crate::hummock::compaction::CompactionSelector;
50use crate::hummock::compactor_manager::Compactor;
51use crate::hummock::error::{Error, Result};
52use crate::hummock::sequence::next_compaction_task_id;
53use crate::manager::MetaOpts;
54use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
55use crate::rpc::metrics::MetaMetrics;
56
/// Max number of `PullTask` events the dedicated loop may process while
/// `ReportTask` events are pending, before flushing the report batch.
const MAX_SKIP_TIMES: usize = 8;
/// Max number of `ReportTask` events batched into a single report call.
const MAX_REPORT_COUNT: usize = 16;
59
/// Routes events received from a compactor's subscription stream, either
/// handling them on the current task or forwarding them to a separate
/// handler (e.g. a dedicated event loop), depending on `should_forward`.
#[async_trait::async_trait]
pub trait CompactionEventDispatcher: Send + Sync + 'static {
    type EventType: Send + Sync + 'static;

    /// Handles `event` inline. Returns `false` when the compactor identified
    /// by `context_id` should be considered dead; the event loop then stops
    /// polling its stream and removes it via `remove_compactor`.
    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;

    /// Forwards `event` to a remote handler. Only invoked for events for
    /// which `should_forward` returned `true`.
    async fn on_event_remotely(
        &self,
        context_id: HummockContextId,
        event: Self::EventType,
    ) -> Result<()>;

    /// Whether `event` must be processed via `on_event_remotely` instead of
    /// `on_event_locally`.
    fn should_forward(&self, event: &Self::EventType) -> bool;

    /// Drops the bookkeeping for compactor `context_id` from the owning
    /// compactor manager.
    fn remove_compactor(&self, context_id: HummockContextId);
}
76
/// A request message received from a compactor's subscription stream.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    type EventType: Send + Sync + 'static;
    /// Consumes the request and extracts its inner event payload.
    fn take_event(self) -> Self::EventType;
    /// Creation timestamp of the event in milliseconds since the Unix epoch;
    /// used to compute the consumed-latency metric against the wall clock.
    fn create_at(&self) -> u64;
}
82
/// Generic event loop that multiplexes the request streams of all connected
/// compactors and dispatches each received event through `D`.
pub struct CompactionEventLoop<
    D: CompactionEventDispatcher<EventType = E::EventType>,
    E: CompactorStreamEvent,
> {
    hummock_compactor_dispatcher: D,
    metrics: Arc<MetaMetrics>,
    // Receives `(context_id, stream)` whenever a compactor (re)subscribes.
    compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
}
91
/// Event loop instantiation for hummock (LSM storage) compaction events.
pub type HummockCompactionEventLoop =
    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;

/// Event loop instantiation for iceberg compaction events.
pub type IcebergCompactionEventLoop =
    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
97
/// Dispatcher for hummock compaction events: heartbeats are handled inline,
/// while `PullTask`/`ReportTask` may be forwarded to a dedicated event loop
/// when `tx` is present.
pub struct HummockCompactionEventDispatcher {
    meta_opts: Arc<MetaOpts>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
    // Channel into the dedicated event loop; `None` disables forwarding.
    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
}
103
104#[async_trait::async_trait]
105impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
106    type EventType = RequestEvent;
107
108    fn should_forward(&self, event: &Self::EventType) -> bool {
109        if self.tx.is_none() {
110            return false;
111        }
112
113        matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
114    }
115
116    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
117        let mut compactor_alive = true;
118        match event {
119            RequestEvent::HeartBeat(HeartBeat { progress }) => {
120                compactor_alive = self
121                    .hummock_compaction_event_handler
122                    .handle_heartbeat(context_id, progress)
123                    .await;
124            }
125
126            RequestEvent::Register(_event) => {
127                unreachable!()
128            }
129
130            RequestEvent::PullTask(pull_task) => {
131                compactor_alive = self
132                    .hummock_compaction_event_handler
133                    .handle_pull_task_event(
134                        context_id,
135                        pull_task.pull_task_count as usize,
136                        &mut init_selectors(),
137                        self.meta_opts.max_get_task_probe_times,
138                    )
139                    .await;
140            }
141
142            RequestEvent::ReportTask(report_event) => {
143                if let Err(e) = self
144                    .hummock_compaction_event_handler
145                    .handle_report_task_event(vec![report_event.into()])
146                    .await
147                {
148                    tracing::error!(error = %e.as_report(), "report compact_tack fail")
149                }
150            }
151        }
152
153        compactor_alive
154    }
155
156    async fn on_event_remotely(
157        &self,
158        context_id: HummockContextId,
159        event: Self::EventType,
160    ) -> Result<()> {
161        if let Some(tx) = &self.tx {
162            tx.send((context_id, event))
163                .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
164        } else {
165            unreachable!();
166        }
167        Ok(())
168    }
169
170    fn remove_compactor(&self, context_id: HummockContextId) {
171        self.hummock_compaction_event_handler
172            .hummock_manager
173            .compactor_manager
174            .remove_compactor(context_id);
175    }
176}
177
impl HummockCompactionEventDispatcher {
    /// Creates a dispatcher. Pass `tx` to forward `PullTask`/`ReportTask`
    /// events to a dedicated event loop; with `None` everything is handled
    /// inline by `on_event_locally`.
    pub fn new(
        meta_opts: Arc<MetaOpts>,
        hummock_compaction_event_handler: HummockCompactionEventHandler,
        tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
    ) -> Self {
        Self {
            meta_opts,
            hummock_compaction_event_handler,
            tx,
        }
    }
}
191
/// Cloneable wrapper around [`HummockManager`] implementing the concrete
/// compaction event handling: heartbeats, task pulls and task reports.
#[derive(Clone)]
pub struct HummockCompactionEventHandler {
    pub hummock_manager: Arc<HummockManager>,
}
196
197impl HummockCompactionEventHandler {
198    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
199        Self { hummock_manager }
200    }
201
202    async fn handle_heartbeat(
203        &self,
204        context_id: HummockContextId,
205        progress: Vec<CompactTaskProgress>,
206    ) -> bool {
207        let mut compactor_alive = true;
208        let compactor_manager = self.hummock_manager.compactor_manager.clone();
209        let cancel_tasks = compactor_manager
210            .update_task_heartbeats(&progress)
211            .into_iter()
212            .map(|task| task.task_id)
213            .collect::<Vec<_>>();
214        if !cancel_tasks.is_empty() {
215            tracing::info!(
216                ?cancel_tasks,
217                %context_id,
218                "Tasks cancel has expired due to lack of visible progress",
219            );
220
221            if let Err(e) = self
222                .hummock_manager
223                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
224                .await
225            {
226                tracing::error!(
227                    error = %e.as_report(),
228                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
229                    until we can successfully report its status."
230                );
231            }
232        }
233
234        match compactor_manager.get_compactor(context_id) {
235            Some(compactor) => {
236                // Forcefully cancel the task so that it terminates
237                // early on the compactor
238                // node.
239                if !cancel_tasks.is_empty() {
240                    let _ = compactor.cancel_tasks(&cancel_tasks);
241                    tracing::info!(
242                        ?cancel_tasks,
243                        %context_id,
244                        "CancelTask operation has been sent to compactor node",
245                    );
246                }
247            }
248            _ => {
249                // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
250                // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
251                compactor_alive = false;
252            }
253        }
254
255        compactor_alive
256    }
257
258    async fn handle_pull_task_event(
259        &self,
260        context_id: HummockContextId,
261        pull_task_count: usize,
262        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
263        max_get_task_probe_times: usize,
264    ) -> bool {
265        assert_ne!(0, pull_task_count);
266        let Some(compactor) = self
267            .hummock_manager
268            .compactor_manager
269            .get_compactor(context_id)
270        else {
271            return false;
272        };
273
274        // Task selection + dispatch (any early return inside is safe — PullTaskAck below is unreachable only if we return here)
275        self.try_dispatch_tasks(
276            &compactor,
277            pull_task_count,
278            compaction_selectors,
279            max_get_task_probe_times,
280        )
281        .await;
282
283        // PullTaskAck: structurally guaranteed to execute after try_dispatch_tasks returns
284        if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
285            tracing::warn!(
286                error = %e.as_report(),
287                "Failed to send ack to {}",
288                context_id,
289            );
290            return false;
291        }
292
293        true
294    }
295
296    /// Selects and dispatches compaction tasks to a compactor.
297    ///
298    /// Separated from `handle_pull_task_event` so that `PullTaskAck` cannot be
299    /// accidentally skipped by early returns.
300    async fn try_dispatch_tasks(
301        &self,
302        compactor: &Arc<Compactor>,
303        pull_task_count: usize,
304        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
305        max_get_task_probe_times: usize,
306    ) {
307        let snapshot = self.hummock_manager.compaction_state.snapshot();
308        let Some((groups, task_type)) = snapshot.pick_compaction_groups_and_type() else {
309            return;
310        };
311
312        if let TaskType::Ttl = task_type {
313            match self
314                .hummock_manager
315                .metadata_manager
316                .get_all_table_options()
317                .await
318                .map_err(|err| Error::MetaStore(err.into()))
319            {
320                Ok(table_options) => {
321                    self.hummock_manager
322                        .update_table_id_to_table_option(table_options);
323                }
324                Err(err) => {
325                    warn!(error = %err.as_report(), "Failed to get table options");
326                }
327            }
328        }
329
330        let selector: &mut Box<dyn CompactionSelector> =
331            compaction_selectors.get_mut(&task_type).unwrap();
332
333        let mut generated_task_count = 0;
334        let mut existed_groups = groups.clone();
335        let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
336        let mut failed_tasks = vec![];
337        let mut loop_times = 0;
338
339        while generated_task_count < pull_task_count
340            && failed_tasks.is_empty()
341            && loop_times < max_get_task_probe_times
342        {
343            loop_times += 1;
344            let compact_ret = self
345                .hummock_manager
346                .get_compact_tasks(
347                    existed_groups.clone(),
348                    pull_task_count - generated_task_count,
349                    &mut **selector,
350                )
351                .await;
352
353            match compact_ret {
354                Ok((compact_tasks, unschedule_groups)) => {
355                    no_task_groups.extend(unschedule_groups);
356                    if compact_tasks.is_empty() {
357                        break;
358                    }
359                    generated_task_count += compact_tasks.len();
360                    for task in compact_tasks {
361                        let task_id = task.task_id;
362                        if let Err(e) =
363                            compactor.send_event(ResponseEvent::CompactTask(task.into()))
364                        {
365                            tracing::warn!(
366                                error = %e.as_report(),
367                                "Failed to send task {} to {}",
368                                task_id,
369                                compactor.context_id(),
370                            );
371                            failed_tasks.push(task_id);
372                        }
373                    }
374                    if !failed_tasks.is_empty() {
375                        self.hummock_manager
376                            .compactor_manager
377                            .remove_compactor(compactor.context_id());
378                    }
379                    existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
380                }
381                Err(err) => {
382                    tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
383                    break;
384                }
385            };
386        }
387        for group in no_task_groups {
388            self.hummock_manager.compaction_state.unschedule(
389                group,
390                task_type,
391                snapshot.snapshot_time(),
392            );
393        }
394        if let Err(err) = self
395            .hummock_manager
396            .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
397            .await
398        {
399            tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
400        }
401    }
402
403    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
404        if let Err(e) = self
405            .hummock_manager
406            .report_compact_tasks(report_events)
407            .await
408        {
409            tracing::error!(error = %e.as_report(), "report compact_tack fail")
410        }
411        Ok(())
412    }
413}
414
impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
    CompactionEventLoop<D, E>
{
    /// Creates an event loop over the given dispatcher, metrics registry and
    /// the channel on which newly subscribed compactor streams arrive.
    pub fn new(
        hummock_compactor_dispatcher: D,
        metrics: Arc<MetaMetrics>,
        compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
    ) -> Self {
        Self {
            hummock_compactor_dispatcher,
            metrics,
            compactor_streams_change_rx,
        }
    }

    /// Spawns the event loop onto the tokio runtime; returns its join handle
    /// and a oneshot sender that terminates the loop when triggered (or when
    /// the sender is dropped, since the shared receiver resolves either way).
    pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
        let mut compactor_request_streams = FuturesUnordered::new();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let shutdown_rx_shared = shutdown_rx.shared();

        let join_handle = tokio::spawn(async move {
            // A compactor reconnect reuses the same `context_id`, so an older stream may still
            // resolve after a newer stream has already been registered. Track a local generation
            // per context to fence off stale stream results from the previous session.
            let mut stream_generations = HashMap::<HummockContextId, u64>::new();
            // Re-arms a stream in the `FuturesUnordered` set, tagging its next
            // result with the owning context id and generation.
            let push_stream =
                |context_id: HummockContextId,
                 stream_generation: u64,
                 stream: Streaming<E>,
                 compactor_request_streams: &mut FuturesUnordered<_>| {
                    let future = StreamExt::into_future(stream)
                        .map(move |stream_future| (context_id, stream_generation, stream_future));

                    compactor_request_streams.push(future);
                };

            let mut event_loop_iteration_now = Instant::now();

            loop {
                let shutdown_rx_shared = shutdown_rx_shared.clone();
                // Record how long the previous loop iteration took.
                self.metrics
                    .compaction_event_loop_iteration_latency
                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                event_loop_iteration_now = Instant::now();

                tokio::select! {
                    _ = shutdown_rx_shared => { return; },

                    // A compactor (re)subscribed: bump its generation and arm its stream.
                    compactor_stream = self.compactor_streams_change_rx.recv() => {
                        if let Some((context_id, stream)) = compactor_stream {
                            tracing::info!("compactor {} enters the cluster", context_id);
                            let stream_generation = stream_generations
                                .entry(context_id)
                                .and_modify(|generation| *generation += 1)
                                .or_insert(1);
                            push_stream(
                                context_id,
                                *stream_generation,
                                stream,
                                &mut compactor_request_streams,
                            );
                        }
                    },

                    // One of the armed streams yielded its next item (or errored/ended).
                    result = pending_on_none(compactor_request_streams.next()) => {
                        let (context_id, stream_generation, compactor_stream_req): (_, _, (std::option::Option<std::result::Result<E, _>>, _)) = result;
                        let Some(current_generation) = stream_generations.get(&context_id).copied() else {
                            continue;
                        };
                        if current_generation != stream_generation {
                            // Ignore events, EOF and poll errors from superseded streams. Without
                            // this fence, a late error from an old stream could remove the current
                            // compactor session for the same `context_id`.
                            continue;
                        }
                        let (event, create_at, stream) = match compactor_stream_req {
                            (Some(Ok(req)), stream) => {
                                let create_at = req.create_at();
                                let event  = req.take_event();
                                (event, create_at, stream)
                            }

                            (Some(Err(err)), _stream) => {
                                tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
                                continue
                            }

                            _ => {
                                // remove compactor from compactor manager
                                tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
                                continue
                            },
                        };

                        // Observe end-to-end delivery latency of the event
                        // (wall clock now minus the sender-side `create_at`).
                        {
                            let consumed_latency_ms = SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis()
                                as u64
                            - create_at;
                            self.metrics
                                .compaction_event_consumed_latency
                                .observe(consumed_latency_ms as _);
                        }

                        // Forwardable events go to the remote handler; everything
                        // else is handled inline, which also reports liveness.
                        let mut compactor_alive = true;
                        if self
                            .hummock_compactor_dispatcher
                            .should_forward(&event)
                        {
                            if let Err(e) = self
                                .hummock_compactor_dispatcher
                                .on_event_remotely(context_id, event)
                                .await
                            {
                                tracing::warn!(error = %e.as_report(), "Failed to forward event");
                            }
                        } else {
                            compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
                                context_id,
                                event,
                            ).await;
                        }

                        // Re-arm the stream only while the compactor is alive;
                        // otherwise drop it and purge the compactor.
                        if compactor_alive {
                            push_stream(
                                context_id,
                                stream_generation,
                                stream,
                                &mut compactor_request_streams,
                            );
                        } else {
                            tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
                            self
                            .hummock_compactor_dispatcher
                            .remove_compactor(context_id);
                        }
                    },
                }
            }
        });

        (join_handle, shutdown_tx)
    }
}
563
impl CompactorStreamEvent for SubscribeCompactionEventRequest {
    type EventType = RequestEvent;

    fn take_event(self) -> Self::EventType {
        // NOTE(review): assumes `event` is always set by the compactor client;
        // panics on a malformed request — confirm against the sender.
        self.event.unwrap()
    }

    fn create_at(&self) -> u64 {
        self.create_at
    }
}
575
/// Owns the dedicated runtime that processes CPU/IO bound hummock compaction
/// events (`PullTask` / `ReportTask`) forwarded off the main event loop.
pub struct HummockCompactorDedicatedEventLoop {
    hummock_manager: Arc<HummockManager>,
    hummock_compaction_event_handler: HummockCompactionEventHandler,
}
580
581impl HummockCompactorDedicatedEventLoop {
582    pub fn new(
583        hummock_manager: Arc<HummockManager>,
584        hummock_compaction_event_handler: HummockCompactionEventHandler,
585    ) -> Self {
586        Self {
587            hummock_manager,
588            hummock_compaction_event_handler,
589        }
590    }
591
592    /// dedicated event runtime for CPU/IO bound event
593    async fn compact_task_dedicated_event_handler(
594        &self,
595        mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
596        shutdown_rx: OneShotReceiver<()>,
597    ) {
598        let mut compaction_selectors = init_selectors();
599
600        tokio::select! {
601            _ = shutdown_rx => {}
602
603            _ = async {
604                while let Some((context_id, event)) = rx.recv().await {
605                    let mut report_events = vec![];
606                    let mut skip_times = 0;
607                    match event {
608                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
609                            self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
610                        }
611
612                        RequestEvent::ReportTask(task) => {
613                           report_events.push(task.into());
614                        }
615
616                        _ => unreachable!(),
617                    }
618                    while let Ok((context_id, event)) = rx.try_recv() {
619                        match event {
620                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
621                                self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
622                                if !report_events.is_empty() {
623                                    if skip_times > MAX_SKIP_TIMES {
624                                        break;
625                                    }
626                                    skip_times += 1;
627                                }
628                            }
629
630                            RequestEvent::ReportTask(task) => {
631                                report_events.push(task.into());
632                                if report_events.len() >= MAX_REPORT_COUNT {
633                                    break;
634                                }
635                            }
636                        _ => unreachable!(),
637                        }
638                    }
639                    if !report_events.is_empty()
640                        && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
641                        {
642                            tracing::error!(error = %e.as_report(), "report compact_tack fail")
643                        }
644                }
645            } => {}
646        }
647    }
648
649    pub fn run(
650        self,
651    ) -> (
652        JoinHandle<()>,
653        UnboundedSender<(HummockContextId, RequestEvent)>,
654        Sender<()>,
655    ) {
656        let (tx, rx) = unbounded_channel();
657        let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
658        let join_handler = tokio::spawn(async move {
659            self.compact_task_dedicated_event_handler(rx, shutdown_rx)
660                .await;
661        });
662        (join_handler, tx, shutdon_tx)
663    }
664}
665
/// Handles iceberg compaction events by delegating to the iceberg
/// compaction manager.
pub struct IcebergCompactionEventHandler {
    compaction_manager: IcebergCompactionManagerRef,
}
669
670impl IcebergCompactionEventHandler {
671    pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
672        Self { compaction_manager }
673    }
674
675    async fn handle_pull_task_event(
676        &self,
677        context_id: HummockContextId,
678        pull_task_count: usize,
679    ) -> bool {
680        assert_ne!(0, pull_task_count);
681        if let Some(compactor) = self
682            .compaction_manager
683            .iceberg_compactor_manager
684            .get_compactor(context_id)
685        {
686            let mut compactor_alive = true;
687
688            let iceberg_compaction_handles = self
689                .compaction_manager
690                .get_top_n_iceberg_commit_sink_ids(pull_task_count);
691
692            for handle in iceberg_compaction_handles {
693                let compactor = compactor.clone();
694                // send iceberg commit task to compactor
695                if let Err(e) = async move {
696                    handle
697                        .send_compact_task(
698                            compactor,
699                            next_compaction_task_id(&self.compaction_manager.env).await?,
700                        )
701                        .await
702                }
703                .await
704                {
705                    tracing::warn!(
706                        error = %e.as_report(),
707                        "Failed to send iceberg commit task to {}",
708                        context_id,
709                    );
710                    compactor_alive = false;
711                }
712            }
713
714            if let Err(e) =
715                compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
716            {
717                tracing::warn!(
718                    error = %e.as_report(),
719                    "Failed to send ask to {}",
720                    context_id,
721                );
722                compactor_alive = false;
723            }
724
725            return compactor_alive;
726        }
727
728        false
729    }
730
731    fn apply_report_task_event(&self, report: IcebergReportTask) {
732        self.compaction_manager.handle_report_task(report);
733    }
734}
735
/// Dispatcher for iceberg compaction events; all events are handled locally
/// (`should_forward` is always `false`, there is no dedicated loop).
pub struct IcebergCompactionEventDispatcher {
    compaction_event_handler: IcebergCompactionEventHandler,
}
739
740#[async_trait::async_trait]
741impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
742    type EventType = IcebergRequestEvent;
743
744    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
745        match event {
746            IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
747                return self
748                    .compaction_event_handler
749                    .handle_pull_task_event(context_id, pull_task_count as usize)
750                    .await;
751            }
752            IcebergRequestEvent::ReportTask(report) => {
753                self.compaction_event_handler
754                    .apply_report_task_event(report);
755                return true;
756            }
757            _ => unreachable!(),
758        }
759    }
760
761    async fn on_event_remotely(
762        &self,
763        _context_id: HummockContextId,
764        _event: Self::EventType,
765    ) -> Result<()> {
766        unreachable!()
767    }
768
769    fn should_forward(&self, _event: &Self::EventType) -> bool {
770        false
771    }
772
773    fn remove_compactor(&self, context_id: HummockContextId) {
774        self.compaction_event_handler
775            .compaction_manager
776            .iceberg_compactor_manager
777            .remove_compactor(context_id);
778    }
779}
780
impl IcebergCompactionEventDispatcher {
    /// Creates a dispatcher around the given iceberg event handler.
    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
        Self {
            compaction_event_handler,
        }
    }
}
788
impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
    type EventType = IcebergRequestEvent;

    fn take_event(self) -> Self::EventType {
        // NOTE(review): assumes `event` is always set by the compactor client;
        // panics on a malformed request — confirm against the sender.
        self.event.unwrap()
    }

    fn create_at(&self) -> u64 {
        self.create_at
    }
}