risingwave_meta/hummock/manager/compaction/
compaction_event_loop.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2025 RisingWave Labs
16//
17// Licensed under the Apache License, Version 2.0 (the "License");
18// you may not use this file except in compliance with the License.
19// You may obtain a copy of the License at
20//
21//     http://www.apache.org/licenses/LICENSE-2.0
22//
23// Unless required by applicable law or agreed to in writing, software
24// distributed under the License is distributed on an "AS IS" BASIS,
25// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26// See the License for the specific language governing permissions and
27// limitations under the License.
28
29use std::collections::{HashMap, HashSet};
30use std::sync::Arc;
31use std::time::{Instant, SystemTime};
32
33use anyhow::Context;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, StreamExt};
36use risingwave_hummock_sdk::compact_task::ReportTask;
37use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
38use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
39use risingwave_pb::hummock::subscribe_compaction_event_request::{
40    Event as RequestEvent, HeartBeat, PullTask,
41};
42use risingwave_pb::hummock::subscribe_compaction_event_response::{
43    Event as ResponseEvent, PullTaskAck,
44};
45use risingwave_pb::hummock::{CompactTaskProgress, SubscribeCompactionEventRequest};
46use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
47use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::{
48    Event as IcebergRequestEvent, PullTask as IcebergPullTask,
49};
50use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::{
51    Event as IcebergResponseEvent, PullTaskAck as IcebergPullTaskAck,
52};
53use rw_futures_util::pending_on_none;
54use thiserror_ext::AsReport;
55use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
56use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use super::init_selectors;
62use crate::hummock::HummockManager;
63use crate::hummock::compaction::CompactionSelector;
64use crate::hummock::error::{Error, Result};
65use crate::hummock::sequence::next_compaction_task_id;
66use crate::manager::MetaOpts;
67use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
68use crate::rpc::metrics::MetaMetrics;
69
70const MAX_SKIP_TIMES: usize = 8;
71const MAX_REPORT_COUNT: usize = 16;
72
73#[async_trait::async_trait]
74pub trait CompactionEventDispatcher: Send + Sync + 'static {
75    type EventType: Send + Sync + 'static;
76
77    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool;
78
79    async fn on_event_remotely(
80        &self,
81        context_id: HummockContextId,
82        event: Self::EventType,
83    ) -> Result<()>;
84
85    fn should_forward(&self, event: &Self::EventType) -> bool;
86
87    fn remove_compactor(&self, context_id: HummockContextId);
88}
89
90pub trait CompactorStreamEvent: Send + Sync + 'static {
91    type EventType: Send + Sync + 'static;
92    fn take_event(self) -> Self::EventType;
93    fn create_at(&self) -> u64;
94}
95
96pub struct CompactionEventLoop<
97    D: CompactionEventDispatcher<EventType = E::EventType>,
98    E: CompactorStreamEvent,
99> {
100    hummock_compactor_dispatcher: D,
101    metrics: Arc<MetaMetrics>,
102    compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
103}
104
105pub type HummockCompactionEventLoop =
106    CompactionEventLoop<HummockCompactionEventDispatcher, SubscribeCompactionEventRequest>;
107
108pub type IcebergCompactionEventLoop =
109    CompactionEventLoop<IcebergCompactionEventDispatcher, SubscribeIcebergCompactionEventRequest>;
110
111pub struct HummockCompactionEventDispatcher {
112    meta_opts: Arc<MetaOpts>,
113    hummock_compaction_event_handler: HummockCompactionEventHandler,
114    tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
115}
116
117#[async_trait::async_trait]
118impl CompactionEventDispatcher for HummockCompactionEventDispatcher {
119    type EventType = RequestEvent;
120
121    fn should_forward(&self, event: &Self::EventType) -> bool {
122        if self.tx.is_none() {
123            return false;
124        }
125
126        matches!(event, RequestEvent::PullTask(_)) || matches!(event, RequestEvent::ReportTask(_))
127    }
128
129    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
130        let mut compactor_alive = true;
131        match event {
132            RequestEvent::HeartBeat(HeartBeat { progress }) => {
133                compactor_alive = self
134                    .hummock_compaction_event_handler
135                    .handle_heartbeat(context_id, progress)
136                    .await;
137            }
138
139            RequestEvent::Register(_event) => {
140                unreachable!()
141            }
142
143            RequestEvent::PullTask(pull_task) => {
144                compactor_alive = self
145                    .hummock_compaction_event_handler
146                    .handle_pull_task_event(
147                        context_id,
148                        pull_task.pull_task_count as usize,
149                        &mut init_selectors(),
150                        self.meta_opts.max_get_task_probe_times,
151                    )
152                    .await;
153            }
154
155            RequestEvent::ReportTask(report_event) => {
156                if let Err(e) = self
157                    .hummock_compaction_event_handler
158                    .handle_report_task_event(vec![report_event.into()])
159                    .await
160                {
161                    tracing::error!(error = %e.as_report(), "report compact_tack fail")
162                }
163            }
164        }
165
166        compactor_alive
167    }
168
169    async fn on_event_remotely(
170        &self,
171        context_id: HummockContextId,
172        event: Self::EventType,
173    ) -> Result<()> {
174        if let Some(tx) = &self.tx {
175            tx.send((context_id, event))
176                .with_context(|| format!("Failed to send event to compactor {context_id}"))?;
177        } else {
178            unreachable!();
179        }
180        Ok(())
181    }
182
183    fn remove_compactor(&self, context_id: HummockContextId) {
184        self.hummock_compaction_event_handler
185            .hummock_manager
186            .compactor_manager
187            .remove_compactor(context_id);
188    }
189}
190
191impl HummockCompactionEventDispatcher {
192    pub fn new(
193        meta_opts: Arc<MetaOpts>,
194        hummock_compaction_event_handler: HummockCompactionEventHandler,
195        tx: Option<UnboundedSender<(HummockContextId, RequestEvent)>>,
196    ) -> Self {
197        Self {
198            meta_opts,
199            hummock_compaction_event_handler,
200            tx,
201        }
202    }
203}
204
205#[derive(Clone)]
206pub struct HummockCompactionEventHandler {
207    pub hummock_manager: Arc<HummockManager>,
208}
209
210impl HummockCompactionEventHandler {
211    pub fn new(hummock_manager: Arc<HummockManager>) -> Self {
212        Self { hummock_manager }
213    }
214
215    async fn handle_heartbeat(
216        &self,
217        context_id: HummockContextId,
218        progress: Vec<CompactTaskProgress>,
219    ) -> bool {
220        let mut compactor_alive = true;
221        let compactor_manager = self.hummock_manager.compactor_manager.clone();
222        let cancel_tasks = compactor_manager
223            .update_task_heartbeats(&progress)
224            .into_iter()
225            .map(|task| task.task_id)
226            .collect::<Vec<_>>();
227        if !cancel_tasks.is_empty() {
228            tracing::info!(
229                ?cancel_tasks,
230                %context_id,
231                "Tasks cancel has expired due to lack of visible progress",
232            );
233
234            if let Err(e) = self
235                .hummock_manager
236                .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
237                .await
238            {
239                tracing::error!(
240                    error = %e.as_report(),
241                    "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
242                    until we can successfully report its status."
243                );
244            }
245        }
246
247        match compactor_manager.get_compactor(context_id) {
248            Some(compactor) => {
249                // Forcefully cancel the task so that it terminates
250                // early on the compactor
251                // node.
252                if !cancel_tasks.is_empty() {
253                    let _ = compactor.cancel_tasks(&cancel_tasks);
254                    tracing::info!(
255                        ?cancel_tasks,
256                        %context_id,
257                        "CancelTask operation has been sent to compactor node",
258                    );
259                }
260            }
261            _ => {
262                // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
263                // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
264                compactor_alive = false;
265            }
266        }
267
268        compactor_alive
269    }
270
271    async fn handle_pull_task_event(
272        &self,
273        context_id: HummockContextId,
274        pull_task_count: usize,
275        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
276        max_get_task_probe_times: usize,
277    ) -> bool {
278        assert_ne!(0, pull_task_count);
279        if let Some(compactor) = self
280            .hummock_manager
281            .compactor_manager
282            .get_compactor(context_id)
283        {
284            let mut compactor_alive = true;
285            let (groups, task_type) = self
286                .hummock_manager
287                .auto_pick_compaction_groups_and_type()
288                .await;
289            if let TaskType::Ttl = task_type {
290                match self
291                    .hummock_manager
292                    .metadata_manager
293                    .get_all_table_options()
294                    .await
295                    .map_err(|err| Error::MetaStore(err.into()))
296                {
297                    Ok(table_options) => {
298                        self.hummock_manager
299                            .update_table_id_to_table_option(table_options);
300                    }
301                    Err(err) => {
302                        warn!(error = %err.as_report(), "Failed to get table options");
303                    }
304                }
305            }
306
307            if !groups.is_empty() {
308                let selector: &mut Box<dyn CompactionSelector> =
309                    compaction_selectors.get_mut(&task_type).unwrap();
310
311                let mut generated_task_count = 0;
312                let mut existed_groups = groups.clone();
313                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
314                let mut failed_tasks = vec![];
315                let mut loop_times = 0;
316
317                while generated_task_count < pull_task_count
318                    && failed_tasks.is_empty()
319                    && loop_times < max_get_task_probe_times
320                {
321                    loop_times += 1;
322                    let compact_ret = self
323                        .hummock_manager
324                        .get_compact_tasks(
325                            existed_groups.clone(),
326                            pull_task_count - generated_task_count,
327                            selector,
328                        )
329                        .await;
330
331                    match compact_ret {
332                        Ok((compact_tasks, unschedule_groups)) => {
333                            no_task_groups.extend(unschedule_groups);
334                            if compact_tasks.is_empty() {
335                                break;
336                            }
337                            generated_task_count += compact_tasks.len();
338                            for task in compact_tasks {
339                                let task_id = task.task_id;
340                                if let Err(e) =
341                                    compactor.send_event(ResponseEvent::CompactTask(task.into()))
342                                {
343                                    tracing::warn!(
344                                        error = %e.as_report(),
345                                        "Failed to send task {} to {}",
346                                        task_id,
347                                        compactor.context_id(),
348                                    );
349                                    failed_tasks.push(task_id);
350                                }
351                            }
352                            if !failed_tasks.is_empty() {
353                                self.hummock_manager
354                                    .compactor_manager
355                                    .remove_compactor(context_id);
356                            }
357                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
358                        }
359                        Err(err) => {
360                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
361                            break;
362                        }
363                    };
364                }
365                for group in no_task_groups {
366                    self.hummock_manager
367                        .compaction_state
368                        .unschedule(group, task_type);
369                }
370                if let Err(err) = self
371                    .hummock_manager
372                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
373                    .await
374                {
375                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
376                }
377            }
378
379            // ack to compactor
380            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
381                tracing::warn!(
382                    error = %e.as_report(),
383                    "Failed to send ask to {}",
384                    context_id,
385                );
386                compactor_alive = false;
387            }
388
389            return compactor_alive;
390        }
391
392        false
393    }
394
395    async fn handle_report_task_event(&self, report_events: Vec<ReportTask>) -> Result<()> {
396        if let Err(e) = self
397            .hummock_manager
398            .report_compact_tasks(report_events)
399            .await
400        {
401            tracing::error!(error = %e.as_report(), "report compact_tack fail")
402        }
403        Ok(())
404    }
405}
406
407impl<D: CompactionEventDispatcher<EventType = E::EventType>, E: CompactorStreamEvent>
408    CompactionEventLoop<D, E>
409{
410    pub fn new(
411        hummock_compactor_dispatcher: D,
412        metrics: Arc<MetaMetrics>,
413        compactor_streams_change_rx: UnboundedReceiver<(HummockContextId, Streaming<E>)>,
414    ) -> Self {
415        Self {
416            hummock_compactor_dispatcher,
417            metrics,
418            compactor_streams_change_rx,
419        }
420    }
421
422    pub fn run(mut self) -> (JoinHandle<()>, Sender<()>) {
423        let mut compactor_request_streams = FuturesUnordered::new();
424        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
425        let shutdown_rx_shared = shutdown_rx.shared();
426
427        let join_handle = tokio::spawn(async move {
428            let push_stream =
429                |context_id: HummockContextId,
430                 stream: Streaming<E>,
431                 compactor_request_streams: &mut FuturesUnordered<_>| {
432                    let future = StreamExt::into_future(stream)
433                        .map(move |stream_future| (context_id, stream_future));
434
435                    compactor_request_streams.push(future);
436                };
437
438            let mut event_loop_iteration_now = Instant::now();
439
440            loop {
441                let shutdown_rx_shared = shutdown_rx_shared.clone();
442                self.metrics
443                    .compaction_event_loop_iteration_latency
444                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
445                event_loop_iteration_now = Instant::now();
446
447                tokio::select! {
448                    _ = shutdown_rx_shared => { return; },
449
450                    compactor_stream = self.compactor_streams_change_rx.recv() => {
451                        if let Some((context_id, stream)) = compactor_stream {
452                            tracing::info!("compactor {} enters the cluster", context_id);
453                            push_stream(context_id, stream, &mut compactor_request_streams);
454                        }
455                    },
456
457                    result = pending_on_none(compactor_request_streams.next()) => {
458                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<E, _>>, _)) = result;
459                        let (event, create_at, stream) = match compactor_stream_req {
460                            (Some(Ok(req)), stream) => {
461                                let create_at = req.create_at();
462                                let event  = req.take_event();
463                                (event, create_at, stream)
464                            }
465
466                            (Some(Err(err)), _stream) => {
467                                tracing::warn!(error = %err.as_report(), %context_id, "compactor stream poll with err, recv stream may be destroyed");
468                                continue
469                            }
470
471                            _ => {
472                                // remove compactor from compactor manager
473                                tracing::warn!(%context_id, "compactor stream poll err, recv stream may be destroyed");
474                                self.hummock_compactor_dispatcher.remove_compactor(context_id);
475                                continue
476                            },
477                        };
478
479                        {
480                            let consumed_latency_ms = SystemTime::now()
481                                .duration_since(std::time::UNIX_EPOCH)
482                                .expect("Clock may have gone backwards")
483                                .as_millis()
484                                as u64
485                            - create_at;
486                            self.metrics
487                                .compaction_event_consumed_latency
488                                .observe(consumed_latency_ms as _);
489                        }
490
491                        let mut compactor_alive = true;
492                        if self
493                            .hummock_compactor_dispatcher
494                            .should_forward(&event)
495                        {
496                            if let Err(e) = self
497                                .hummock_compactor_dispatcher
498                                .on_event_remotely(context_id, event)
499                                .await
500                            {
501                                tracing::warn!(error = %e.as_report(), "Failed to forward event");
502                            }
503                        } else {
504                            compactor_alive = self.hummock_compactor_dispatcher.on_event_locally(
505                                context_id,
506                                event,
507                            ).await;
508                        }
509
510                        if compactor_alive {
511                            push_stream(context_id, stream, &mut compactor_request_streams);
512                        } else {
513                            tracing::warn!(%context_id, "compactor stream error, send stream may be destroyed");
514                            self
515                            .hummock_compactor_dispatcher
516                            .remove_compactor(context_id);
517                        }
518                    },
519                }
520            }
521        });
522
523        (join_handle, shutdown_tx)
524    }
525}
526
527impl CompactorStreamEvent for SubscribeCompactionEventRequest {
528    type EventType = RequestEvent;
529
530    fn take_event(self) -> Self::EventType {
531        self.event.unwrap()
532    }
533
534    fn create_at(&self) -> u64 {
535        self.create_at
536    }
537}
538
539pub struct HummockCompactorDedicatedEventLoop {
540    hummock_manager: Arc<HummockManager>,
541    hummock_compaction_event_handler: HummockCompactionEventHandler,
542}
543
544impl HummockCompactorDedicatedEventLoop {
545    pub fn new(
546        hummock_manager: Arc<HummockManager>,
547        hummock_compaction_event_handler: HummockCompactionEventHandler,
548    ) -> Self {
549        Self {
550            hummock_manager,
551            hummock_compaction_event_handler,
552        }
553    }
554
555    /// dedicated event runtime for CPU/IO bound event
556    async fn compact_task_dedicated_event_handler(
557        &self,
558        mut rx: UnboundedReceiver<(HummockContextId, RequestEvent)>,
559        shutdown_rx: OneShotReceiver<()>,
560    ) {
561        let mut compaction_selectors = init_selectors();
562
563        tokio::select! {
564            _ = shutdown_rx => {}
565
566            _ = async {
567                while let Some((context_id, event)) = rx.recv().await {
568                    let mut report_events = vec![];
569                    let mut skip_times = 0;
570                    match event {
571                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
572                            self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
573                        }
574
575                        RequestEvent::ReportTask(task) => {
576                           report_events.push(task.into());
577                        }
578
579                        _ => unreachable!(),
580                    }
581                    while let Ok((context_id, event)) = rx.try_recv() {
582                        match event {
583                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
584                                self.hummock_compaction_event_handler.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, self.hummock_manager.env.opts.max_get_task_probe_times).await;
585                                if !report_events.is_empty() {
586                                    if skip_times > MAX_SKIP_TIMES {
587                                        break;
588                                    }
589                                    skip_times += 1;
590                                }
591                            }
592
593                            RequestEvent::ReportTask(task) => {
594                                report_events.push(task.into());
595                                if report_events.len() >= MAX_REPORT_COUNT {
596                                    break;
597                                }
598                            }
599                        _ => unreachable!(),
600                        }
601                    }
602                    if !report_events.is_empty()
603                        && let Err(e) = self.hummock_compaction_event_handler.handle_report_task_event(report_events).await
604                        {
605                            tracing::error!(error = %e.as_report(), "report compact_tack fail")
606                        }
607                }
608            } => {}
609        }
610    }
611
612    pub fn run(
613        self,
614    ) -> (
615        JoinHandle<()>,
616        UnboundedSender<(HummockContextId, RequestEvent)>,
617        Sender<()>,
618    ) {
619        let (tx, rx) = unbounded_channel();
620        let (shutdon_tx, shutdown_rx) = tokio::sync::oneshot::channel();
621        let join_handler = tokio::spawn(async move {
622            self.compact_task_dedicated_event_handler(rx, shutdown_rx)
623                .await;
624        });
625        (join_handler, tx, shutdon_tx)
626    }
627}
628
629pub struct IcebergCompactionEventHandler {
630    compaction_manager: IcebergCompactionManagerRef,
631}
632
633impl IcebergCompactionEventHandler {
634    pub fn new(compaction_manager: IcebergCompactionManagerRef) -> Self {
635        Self { compaction_manager }
636    }
637
638    async fn handle_pull_task_event(
639        &self,
640        context_id: HummockContextId,
641        pull_task_count: usize,
642    ) -> bool {
643        assert_ne!(0, pull_task_count);
644        if let Some(compactor) = self
645            .compaction_manager
646            .iceberg_compactor_manager
647            .get_compactor(context_id)
648        {
649            let mut compactor_alive = true;
650
651            let iceberg_compaction_handles = self
652                .compaction_manager
653                .get_top_n_iceberg_commit_sink_ids(pull_task_count)
654                .await;
655
656            for handle in iceberg_compaction_handles {
657                let compactor = compactor.clone();
658                // send iceberg commit task to compactor
659                if let Err(e) = async move {
660                    handle
661                        .send_compact_task(
662                            compactor,
663                            next_compaction_task_id(&self.compaction_manager.env).await?,
664                        )
665                        .await
666                }
667                .await
668                {
669                    tracing::warn!(
670                        error = %e.as_report(),
671                        "Failed to send iceberg commit task to {}",
672                        context_id,
673                    );
674                    compactor_alive = false;
675                }
676            }
677
678            if let Err(e) =
679                compactor.send_event(IcebergResponseEvent::PullTaskAck(IcebergPullTaskAck {}))
680            {
681                tracing::warn!(
682                    error = %e.as_report(),
683                    "Failed to send ask to {}",
684                    context_id,
685                );
686                compactor_alive = false;
687            }
688
689            return compactor_alive;
690        }
691
692        false
693    }
694}
695
696pub struct IcebergCompactionEventDispatcher {
697    compaction_event_handler: IcebergCompactionEventHandler,
698}
699
700#[async_trait::async_trait]
701impl CompactionEventDispatcher for IcebergCompactionEventDispatcher {
702    type EventType = IcebergRequestEvent;
703
704    async fn on_event_locally(&self, context_id: HummockContextId, event: Self::EventType) -> bool {
705        match event {
706            IcebergRequestEvent::PullTask(IcebergPullTask { pull_task_count }) => {
707                return self
708                    .compaction_event_handler
709                    .handle_pull_task_event(context_id, pull_task_count as usize)
710                    .await;
711            }
712            _ => unreachable!(),
713        }
714    }
715
716    async fn on_event_remotely(
717        &self,
718        _context_id: HummockContextId,
719        _event: Self::EventType,
720    ) -> Result<()> {
721        unreachable!()
722    }
723
724    fn should_forward(&self, _event: &Self::EventType) -> bool {
725        false
726    }
727
728    fn remove_compactor(&self, context_id: HummockContextId) {
729        self.compaction_event_handler
730            .compaction_manager
731            .iceberg_compactor_manager
732            .remove_compactor(context_id);
733    }
734}
735
736impl IcebergCompactionEventDispatcher {
737    pub fn new(compaction_event_handler: IcebergCompactionEventHandler) -> Self {
738        Self {
739            compaction_event_handler,
740        }
741    }
742}
743
744impl CompactorStreamEvent for SubscribeIcebergCompactionEventRequest {
745    type EventType = IcebergRequestEvent;
746
747    fn take_event(self) -> Self::EventType {
748        self.event.unwrap()
749    }
750
751    fn create_at(&self) -> u64 {
752        self.create_at
753    }
754}