risingwave_meta/hummock/manager/compaction/
compaction_event_loop.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Instant, SystemTime};
18
19use anyhow::Context;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use risingwave_hummock_sdk::compact_task::ReportTask;
23use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
24use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
25use risingwave_pb::hummock::subscribe_compaction_event_request::{
26    Event as RequestEvent, HeartBeat, PullTask,
27};
28use risingwave_pb::hummock::subscribe_compaction_event_response::{
29    Event as ResponseEvent, PullTaskAck,
30};
31use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
32use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
34    Event as IcebergRequestEvent, PullTask as IcebergPullTask,
35};
36use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
37    Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
38};
39use rw_futures_util::pending_on_none;
40use thiserror_ext::AsReport;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
43use tokio::task::JoinHandle;
44use tonic::Streaming;
45use tracing::warn;
46
47use super::init_selectors;
48use crate::hummock::HummockManager;
49use crate::hummock::compaction::CompactionSelector;
50use crate::hummock::compactor_manager::Compactor;
51use crate::hummock::error::{Error, Result};
52use crate::hummock::sequence::next_compaction_task_id;
53use crate::manager::MetaOpts;
54use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
55use crate::rpc::metrics::MetaMetrics;
56
/// Limit for the skip counter of the dedicated event handler: the counter is
/// bumped for every queued `PullTask` handled while report events are already
/// buffered, and draining stops once the counter exceeds this value.
const MAX_SKIP_TIMES: usize = 8;
/// Number of buffered report events at which the dedicated event handler stops
/// draining its queue and reports the batch.
const MAX_REPORT_COUNT: usize = 16;
59
60#[async_trait::async_trait]
61pub trait CompactionEventDispatcher: Send + Sync + 'static {
62    type EventType: Send + Sync + 'static;
63
64    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;
65
66    async fn on_event_remotely(
67        &self,
68        context_id: HummockContextId,
69        event: Self::EventType,
70    ) -> Result<()>;
71
72    fn should_forward(&self, event: &Self::EventType) -> bool;
73
74    fn remove_compactor(&self, context_id: HummockContextId);
75}
76
/// A request message read from a compactor stream.
pub trait CompactorStreamEvent: Send + Sync + 'static {
    /// Payload carried by the request.
    type EventType: Send + Sync + 'static;

    /// Consumes the request and yields its payload.
    fn take_event(self) -> Self::EventType;

    /// Creation timestamp of the request. The event loop subtracts it from the
    /// current Unix time in milliseconds to compute the consume latency.
    fn create_at(&self) -> u64;
}
82
83pub struct CompactionEventLoop<
84    D: CompactionEventDispatcher<EventType = E::EventType>,
85    E: CompactorStreamEvent,
86> {
87    hummock_compactor_dispatcher: D,
88    metrics: Arc<MetaMetrics>,
89    compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
90}
91
92pub type HummockCompactionEventLoop =
93    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;
94
95pub type IcebergCompactionEventLoop =
96    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
97
98pub struct HummockCompactionEventDispatcher {
99    meta_opts: Arc<MetaOpts>,
100    hummock_compaction_event_handler: HummockCompactionEventHandler,
101    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
102}
103
104#[async_trait::async_trait]
105impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
106    type EventType = RequestEvent;
107
108    fn should_forward(&self, event: &Self::EventType) -> bool {
109        if self.tx.is_none() {
110            return false;
111        }
112
113        matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
114    }
115
116    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
117        let mut compactor_alive = true;
118        match event {
119            RequestEvent::HeartBeat(HeartBeat { progress }) => {
120                compactor_alive = self
121                    .hummock_compaction_event_handler
122                    .handle_heartbeat(context_id, progress)
123                    .await;
124            }
125
126            RequestEvent::Register(_event) => {
127                unreachable!()
128            }
129
130            RequestEvent::PullTask(pull_task) => {
131                compactor_alive = self
132                    .hummock_compaction_event_handler
133                    .handle_pull_task_event(
134                        context_id,
135                        pull_task.pull_task_count as usize,
136                        &mut init_selectors(),
137                        self.meta_opts.max_get_task_probe_times,
138                    )
139                    .await;
140            }
141
142            RequestEvent::ReportTask(report_event) => {
143                if let Err(e) = self
144                    .hummock_compaction_event_handler
145                    .handle_report_task_event(vec![report_event.into()])
146                    .await
147                {
148                    tracing::error!(error = %e.as_report(), "report compact_tack fail")
149                }
150            }
151        }
152
153        compactor_alive
154    }
155
156    async fn on_event_remotely(
157        &self,
158        context_id: HummockContextId,
159        event: Self::EventType,
160    ) -> Result<()> {
161        if let Some(tx) = &self.tx {
162            tx.send((context_id, event))
163                .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
164        } else {
165            unreachable!();
166        }
167        Ok(())
168    }
169
170    fn remove_compactor(&self, context_id: HummockContextId) {
171        self.hummock_compaction_event_handler
172            .hummock_manager
173            .compactor_manager
174            .remove_compactor(context_id);
175    }
176}
177
178impl HummockCompactionEventDispatcher {
179    pub fn new(
180        meta_opts: Arc<MetaOpts>,
181        hummock_compaction_event_handler: HummockCompactionEventHandler,
182        tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
183    ) -> Self {
184        Self {
185            meta_opts,
186            hummock_compaction_event_handler,
187            tx,
188        }
189    }
190}
191
192#[derive(Clone)]
193pub struct HummockCompactionEventHandler {
194    pub hummock_manager: Arc<HummockManager>,
195}
196
197impl HummockCompactionEventHandler {
198    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
199        Self { hummock_manager }
200    }
201
202    async fn handle_heartbeat(
203        &self,
204        context_id: HummockContextId,
205        progress: Vec<CompactTaskProgress>,
206    ) -> bool {
207        let mut compactor_alive = true;
208        let compactor_manager = self.hummock_manager.compactor_manager.clone();
209        let cancel_tasks = compactor_manager
210            .update_task_heartbeats(&progress)
211            .into_iter()
212            .map(|task| task.task_id)
213            .collect::<Vec<_>>();
214        if !cancel_tasks.is_empty() {
215            tracing::info!(
216                ?cancel_tasks,
217                %context_id,
218                "Tasks cancel has expired due to lack of visible progress",
219            );
220
221            if let Err(e) = self
222                .hummock_manager
223                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
224                .await
225            {
226                tracing::error!(
227                    error = %e.as_report(),
228                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
229                    until we can successfully report its status."
230                );
231            }
232        }
233
234        match compactor_manager.get_compactor(context_id) {
235            Some(compactor) => {
236                // Forcefully cancel the task so that it terminates
237                // early on the compactor
238                // node.
239                if !cancel_tasks.is_empty() {
240                    let _ = compactor.cancel_tasks(&cancel_tasks);
241                    tracing::info!(
242                        ?cancel_tasks,
243                        %context_id,
244                        "CancelTask operation has been sent to compactor node",
245                    );
246                }
247            }
248            _ => {
249                // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
250                // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
251                compactor_alive = false;
252            }
253        }
254
255        compactor_alive
256    }
257
258    async fn handle_pull_task_event(
259        &self,
260        context_id: HummockContextId,
261        pull_task_count: usize,
262        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
263        max_get_task_probe_times: usize,
264    ) -> bool {
265        assert_ne!(0, pull_task_count);
266        let Some(compactor) = self
267            .hummock_manager
268            .compactor_manager
269            .get_compactor(context_id)
270        else {
271            return false;
272        };
273
274        // Task selection + dispatch (any early return inside is safe — PullTaskAck below is unreachable only if we return here)
275        self.try_dispatch_tasks(
276            &compactor,
277            pull_task_count,
278            compaction_selectors,
279            max_get_task_probe_times,
280        )
281        .await;
282
283        // PullTaskAck: structurally guaranteed to execute after try_dispatch_tasks returns
284        if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
285            tracing::warn!(
286                error = %e.as_report(),
287                "Failed to send ack to {}",
288                context_id,
289            );
290            return false;
291        }
292
293        true
294    }
295
296    /// Selects and dispatches compaction tasks to a compactor.
297    ///
298    /// Separated from `handle_pull_task_event` so that `PullTaskAck` cannot be
299    /// accidentally skipped by early returns.
300    async fn try_dispatch_tasks(
301        &self,
302        compactor: &Arc<Compactor>,
303        pull_task_count: usize,
304        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
305        max_get_task_probe_times: usize,
306    ) {
307        let snapshot = self.hummock_manager.compaction_state.snapshot();
308        let Some((groups, task_type)) = snapshot.pick_compaction_groups_and_type() else {
309            return;
310        };
311
312        if let TaskType::Ttl = task_type {
313            match self
314                .hummock_manager
315                .metadata_manager
316                .get_all_table_options()
317                .await
318                .map_err(|err| Error::MetaStore(err.into()))
319            {
320                Ok(table_options) => {
321                    self.hummock_manager
322                        .update_table_id_to_table_option(table_options);
323                }
324                Err(err) => {
325                    warn!(error = %err.as_report(), "Failed to get table options");
326                }
327            }
328        }
329
330        let selector: &mut Box<dyn CompactionSelector> =
331            compaction_selectors.get_mut(&task_type).unwrap();
332
333        let mut generated_task_count = 0;
334        let mut existed_groups = groups.clone();
335        let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
336        let mut failed_tasks = vec![];
337        let mut loop_times = 0;
338
339        while generated_task_count < pull_task_count
340            && failed_tasks.is_empty()
341            && loop_times < max_get_task_probe_times
342        {
343            loop_times += 1;
344            let compact_ret = self
345                .hummock_manager
346                .get_compact_tasks(
347                    existed_groups.clone(),
348                    pull_task_count - generated_task_count,
349                    selector,
350                )
351                .await;
352
353            match compact_ret {
354                Ok((compact_tasks, unschedule_groups)) => {
355                    no_task_groups.extend(unschedule_groups);
356                    if compact_tasks.is_empty() {
357                        break;
358                    }
359                    generated_task_count += compact_tasks.len();
360                    for task in compact_tasks {
361                        let task_id = task.task_id;
362                        if let Err(e) =
363                            compactor.send_event(ResponseEvent::CompactTask(task.into()))
364                        {
365                            tracing::warn!(
366                                error = %e.as_report(),
367                                "Failed to send task {} to {}",
368                                task_id,
369                                compactor.context_id(),
370                            );
371                            failed_tasks.push(task_id);
372                        }
373                    }
374                    if !failed_tasks.is_empty() {
375                        self.hummock_manager
376                            .compactor_manager
377                            .remove_compactor(compactor.context_id());
378                    }
379                    existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
380                }
381                Err(err) => {
382                    tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
383                    break;
384                }
385            };
386        }
387        for group in no_task_groups {
388            self.hummock_manager.compaction_state.unschedule(
389                group,
390                task_type,
391                snapshot.snapshot_time(),
392            );
393        }
394        if let Err(err) = self
395            .hummock_manager
396            .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
397            .await
398        {
399            tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
400        }
401    }
402
403    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
404        if let Err(e) = self
405            .hummock_manager
406            .report_compact_tasks(report_events)
407            .await
408        {
409            tracing::error!(error = %e.as_report(), "report compact_tack fail")
410        }
411        Ok(())
412    }
413}
414
415impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
416    CompactionEventLoop<D, E>
417{
418    pub fn new(
419        hummock_compactor_dispatcher: D,
420        metrics: Arc<MetaMetrics>,
421        compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
422    ) -> Self {
423        Self {
424            hummock_compactor_dispatcher,
425            metrics,
426            compactor_streams_change_rx,
427        }
428    }
429
430    pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
431        let mut compactor_request_streams = FuturesUnordered::new();
432        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
433        let shutdown_rx_shared = shutdown_rx.shared();
434
435        let join_handle = tokio::spawn(async move {
436            let push_stream =
437                |context_id: HummockContextId,
438                 stream: Streaming<E>,
439                 compactor_request_streams: &mut FuturesUnordered<_>| {
440                    let future = StreamExt::into_future(stream)
441                        .map(move |stream_future| (context_id, stream_future));
442
443                    compactor_request_streams.push(future);
444                };
445
446            let mut event_loop_iteration_now = Instant::now();
447
448            loop {
449                let shutdown_rx_shared = shutdown_rx_shared.clone();
450                self.metrics
451                    .compaction_event_loop_iteration_latency
452                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
453                event_loop_iteration_now = Instant::now();
454
455                tokio::select! {
456                    _ = shutdown_rx_shared => { return; },
457
458                    compactor_stream = self.compactor_streams_change_rx.recv() => {
459                        if let Some((context_id, stream)) = compactor_stream {
460                            tracing::info!("compactor {} enters the cluster", context_id);
461                            push_stream(context_id, stream, &mut compactor_request_streams);
462                        }
463                    },
464
465                    result = pending_on_none(compactor_request_streams.next()) => {
466                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
467                        let (event, create_at, stream) = match compactor_stream_req {
468                            (Some(Ok(req)), stream) => {
469                                let create_at = req.create_at();
470                                let event  = req.take_event();
471                                (event, create_at, stream)
472                            }
473
474                            (Some(Err(err)), _stream) => {
475                                tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
476                                continue
477                            }
478
479                            _ => {
480                                // remove compactor from compactor manager
481                                tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
482                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
483                                continue
484                            },
485                        };
486
487                        {
488                            let consumed_latency_ms = SystemTime::now()
489                                .duration_since(std::time::UNIX_EPOCH)
490                                .expect("Clock may have gone backwards")
491                                .as_millis()
492                                as u64
493                            - create_at;
494                            self.metrics
495                                .compaction_event_consumed_latency
496                                .observe(consumed_latency_ms as _);
497                        }
498
499                        let mut compactor_alive = true;
500                        if self
501                            .hummock_compactor_dispatcher
502                            .should_forward(&event)
503                        {
504                            if let Err(e) = self
505                                .hummock_compactor_dispatcher
506                                .on_event_remotely(context_id, event)
507                                .await
508                            {
509                                tracing::warn!(error = %e.as_report(), "Failed to forward event");
510                            }
511                        } else {
512                            compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
513                                context_id,
514                                event,
515                            ).await;
516                        }
517
518                        if compactor_alive {
519                            push_stream(context_id, stream, &mut compactor_request_streams);
520                        } else {
521                            tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
522                            self
523                            .hummock_compactor_dispatcher
524                            .remove_compactor(context_id);
525                        }
526                    },
527                }
528            }
529        });
530
531        (join_handle, shutdown_tx)
532    }
533}
534
535impl CompactorStreamEvent for SubscribeCompactionEventRequest {
536    type EventType = RequestEvent;
537
538    fn take_event(self) -> Self::EventType {
539        self.event.unwrap()
540    }
541
542    fn create_at(&self) -> u64 {
543        self.create_at
544    }
545}
546
547pub struct HummockCompactorDedicatedEventLoop {
548    hummock_manager: Arc<HummockManager>,
549    hummock_compaction_event_handler: HummockCompactionEventHandler,
550}
551
552impl HummockCompactorDedicatedEventLoop {
553    pub fn new(
554        hummock_manager: Arc<HummockManager>,
555        hummock_compaction_event_handler: HummockCompactionEventHandler,
556    ) -> Self {
557        Self {
558            hummock_manager,
559            hummock_compaction_event_handler,
560        }
561    }
562
563    /// dedicated event runtime for CPU/IO bound event
564    async fn compact_task_dedicated_event_handler(
565        &self,
566        mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
567        shutdown_rx: OneShotReceiver<()>,
568    ) {
569        let mut compaction_selectors = init_selectors();
570
571        tokio::select! {
572            _ = shutdown_rx => {}
573
574            _ = async {
575                while let Some((context_id, event)) = rx.recv().await {
576                    let mut report_events = vec![];
577                    let mut skip_times = 0;
578                    match event {
579                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
580                            self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
581                        }
582
583                        RequestEvent::ReportTask(task) => {
584                           report_events.push(task.into());
585                        }
586
587                        _ => unreachable!(),
588                    }
589                    while let Ok((context_id, event)) = rx.try_recv() {
590                        match event {
591                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
592                                self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
593                                if !report_events.is_empty() {
594                                    if skip_times > MAX_SKIP_TIMES {
595                                        break;
596                                    }
597                                    skip_times += 1;
598                                }
599                            }
600
601                            RequestEvent::ReportTask(task) => {
602                                report_events.push(task.into());
603                                if report_events.len() >= MAX_REPORT_COUNT {
604                                    break;
605                                }
606                            }
607                        _ => unreachable!(),
608                        }
609                    }
610                    if !report_events.is_empty()
611                        && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
612                        {
613                            tracing::error!(error = %e.as_report(), "report compact_tack fail")
614                        }
615                }
616            } => {}
617        }
618    }
619
620    pub fn run(
621        self,
622    ) -> (
623        JoinHandle<()>,
624        UnboundedSender<(HummockContextId, RequestEvent)>,
625        Sender<()>,
626    ) {
627        let (tx, rx) = unbounded_channel();
628        let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
629        let join_handler = tokio::spawn(async move {
630            self.compact_task_dedicated_event_handler(rx, shutdown_rx)
631                .await;
632        });
633        (join_handler, tx, shutdon_tx)
634    }
635}
636
637pub struct IcebergCompactionEventHandler {
638    compaction_manager: IcebergCompactionManagerRef,
639}
640
641impl IcebergCompactionEventHandler {
642    pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
643        Self { compaction_manager }
644    }
645
646    async fn handle_pull_task_event(
647        &self,
648        context_id: HummockContextId,
649        pull_task_count: usize,
650    ) -> bool {
651        assert_ne!(0, pull_task_count);
652        if let Some(compactor) = self
653            .compaction_manager
654            .iceberg_compactor_manager
655            .get_compactor(context_id)
656        {
657            let mut compactor_alive = true;
658
659            let iceberg_compaction_handles = self
660                .compaction_manager
661                .get_top_n_iceberg_commit_sink_ids(pull_task_count)
662                .await;
663
664            for handle in iceberg_compaction_handles {
665                let compactor = compactor.clone();
666                // send iceberg commit task to compactor
667                if let Err(e) = async move {
668                    handle
669                        .send_compact_task(
670                            compactor,
671                            next_compaction_task_id(&self.compaction_manager.env).await?,
672                        )
673                        .await
674                }
675                .await
676                {
677                    tracing::warn!(
678                        error = %e.as_report(),
679                        "Failed to send iceberg commit task to {}",
680                        context_id,
681                    );
682                    compactor_alive = false;
683                }
684            }
685
686            if let Err(e) =
687                compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
688            {
689                tracing::warn!(
690                    error = %e.as_report(),
691                    "Failed to send ask to {}",
692                    context_id,
693                );
694                compactor_alive = false;
695            }
696
697            return compactor_alive;
698        }
699
700        false
701    }
702}
703
704pub struct IcebergCompactionEventDispatcher {
705    compaction_event_handler: IcebergCompactionEventHandler,
706}
707
708#[async_trait::async_trait]
709impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
710    type EventType = IcebergRequestEvent;
711
712    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
713        match event {
714            IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
715                return self
716                    .compaction_event_handler
717                    .handle_pull_task_event(context_id, pull_task_count as usize)
718                    .await;
719            }
720            _ => unreachable!(),
721        }
722    }
723
724    async fn on_event_remotely(
725        &self,
726        _context_id: HummockContextId,
727        _event: Self::EventType,
728    ) -> Result<()> {
729        unreachable!()
730    }
731
732    fn should_forward(&self, _event: &Self::EventType) -> bool {
733        false
734    }
735
736    fn remove_compactor(&self, context_id: HummockContextId) {
737        self.compaction_event_handler
738            .compaction_manager
739            .iceberg_compactor_manager
740            .remove_compactor(context_id);
741    }
742}
743
744impl IcebergCompactionEventDispatcher {
745    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
746        Self {
747            compaction_event_handler,
748        }
749    }
750}
751
752impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
753    type EventType = IcebergRequestEvent;
754
755    fn take_event(self) -> Self::EventType {
756        self.event.unwrap()
757    }
758
759    fn create_at(&self) -> u64 {
760        self.create_at
761    }
762}