risingwave_storage/hummock/event_handler/
hummock_event_handler.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::pin::pin;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20use std::time::Duration;
21
22use arc_swap::ArcSwap;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::FutureExt;
25use itertools::Itertools;
26use parking_lot::RwLock;
27use prometheus::{Histogram, IntGauge};
28use risingwave_common::catalog::TableId;
29use risingwave_common::metrics::UintGauge;
30use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta};
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46    HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49    HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50    ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
55use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
56use crate::mem_table::ImmutableMemtable;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::opts::StorageOpts;
59
60#[derive(Clone)]
61pub struct BufferTracker {
62    flush_threshold: usize,
63    min_batch_flush_size: usize,
64    global_buffer: Arc<MemoryLimiter>,
65    global_upload_task_size: UintGauge,
66}
67
68impl BufferTracker {
69    pub fn from_storage_opts(config: &StorageOpts, global_upload_task_size: UintGauge) -> Self {
70        let capacity = config.shared_buffer_capacity_mb * (1 << 20);
71        let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
72        let shared_buffer_min_batch_flush_size =
73            config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
74        assert!(
75            flush_threshold < capacity,
76            "flush_threshold {} should be less or equal to capacity {}",
77            flush_threshold,
78            capacity
79        );
80        Self::new(
81            capacity,
82            flush_threshold,
83            global_upload_task_size,
84            shared_buffer_min_batch_flush_size,
85        )
86    }
87
88    #[cfg(test)]
89    fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
90        Self::new(
91            usize::MAX,
92            flush_threshold,
93            UintGauge::new("test", "test").unwrap(),
94            min_batch_flush_size,
95        )
96    }
97
98    fn new(
99        capacity: usize,
100        flush_threshold: usize,
101        global_upload_task_size: UintGauge,
102        min_batch_flush_size: usize,
103    ) -> Self {
104        assert!(capacity >= flush_threshold);
105        Self {
106            flush_threshold,
107            global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
108            global_upload_task_size,
109            min_batch_flush_size,
110        }
111    }
112
113    pub fn for_test() -> Self {
114        Self::from_storage_opts(
115            &StorageOpts::default(),
116            UintGauge::new("test", "test").unwrap(),
117        )
118    }
119
120    pub fn get_buffer_size(&self) -> usize {
121        self.global_buffer.get_memory_usage() as usize
122    }
123
124    pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
125        &self.global_buffer
126    }
127
128    pub fn global_upload_task_size(&self) -> &UintGauge {
129        &self.global_upload_task_size
130    }
131
132    /// Return true when the buffer size minus current upload task size is still greater than the
133    /// flush threshold.
134    pub fn need_flush(&self) -> bool {
135        self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize
136    }
137
138    pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
139        curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
140    }
141
142    #[cfg(test)]
143    pub(crate) fn flush_threshold(&self) -> usize {
144        self.flush_threshold
145    }
146}
147
148#[derive(Clone)]
149pub struct HummockEventSender {
150    inner: UnboundedSender<HummockEvent>,
151    event_count: IntGauge,
152}
153
154pub fn event_channel(event_count: IntGauge) -> (HummockEventSender, HummockEventReceiver) {
155    let (tx, rx) = unbounded_channel();
156    (
157        HummockEventSender {
158            inner: tx,
159            event_count: event_count.clone(),
160        },
161        HummockEventReceiver {
162            inner: rx,
163            event_count,
164        },
165    )
166}
167
168impl HummockEventSender {
169    pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
170        self.inner.send(event)?;
171        self.event_count.inc();
172        Ok(())
173    }
174}
175
176pub struct HummockEventReceiver {
177    inner: UnboundedReceiver<HummockEvent>,
178    event_count: IntGauge,
179}
180
181impl HummockEventReceiver {
182    async fn recv(&mut self) -> Option<HummockEvent> {
183        let event = self.inner.recv().await?;
184        self.event_count.dec();
185        Some(event)
186    }
187}
188
189struct HummockEventHandlerMetrics {
190    event_handler_on_upload_finish_latency: Histogram,
191    event_handler_on_apply_version_update: Histogram,
192    event_handler_on_recv_version_update: Histogram,
193}
194
195pub struct HummockEventHandler {
196    hummock_event_tx: HummockEventSender,
197    hummock_event_rx: HummockEventReceiver,
198    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
199    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
200    /// A copy of `read_version_mapping` but owned by event handler
201    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,
202
203    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
204    recent_versions: Arc<ArcSwap<RecentVersions>>,
205
206    uploader: HummockUploader,
207    refiller: CacheRefiller,
208
209    last_instance_id: LocalInstanceId,
210
211    metrics: HummockEventHandlerMetrics,
212}
213
214async fn flush_imms(
215    payload: Vec<ImmutableMemtable>,
216    compactor_context: CompactorContext,
217    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
218    object_id_manager: Arc<ObjectIdManager>,
219) -> HummockResult<UploadTaskOutput> {
220    compact(
221        compactor_context,
222        object_id_manager,
223        payload,
224        compaction_catalog_manager_ref,
225    )
226    .instrument_await("shared_buffer_compact".verbose())
227    .await
228}
229
230impl HummockEventHandler {
231    pub fn new(
232        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
233        pinned_version: PinnedVersion,
234        compactor_context: CompactorContext,
235        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
236        object_id_manager: Arc<ObjectIdManager>,
237        state_store_metrics: Arc<HummockStateStoreMetrics>,
238    ) -> Self {
239        let upload_compactor_context = compactor_context.clone();
240        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
241        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
242        let recent_versions = RecentVersions::new(
243            pinned_version,
244            compactor_context
245                .storage_opts
246                .max_cached_recent_versions_number,
247            state_store_metrics.clone(),
248        );
249        let buffer_tracker = BufferTracker::from_storage_opts(
250            &compactor_context.storage_opts,
251            state_store_metrics.uploader_uploading_task_size.clone(),
252        );
253        Self::new_inner(
254            version_update_rx,
255            compactor_context.sstable_store.clone(),
256            state_store_metrics,
257            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
258            recent_versions,
259            buffer_tracker,
260            Arc::new(move |payload, task_info| {
261                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
262                    LazyLock::new(|| AtomicUsize::new(0));
263                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
264                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
265                    reg.register(
266                        await_tree_key::SpawnUploadTask { id: upload_task_id },
267                        format!("Spawn Upload Task: {}", task_info),
268                    )
269                });
270                let upload_task_latency = upload_task_latency.clone();
271                let wait_poll_latency = wait_poll_latency.clone();
272                let upload_compactor_context = upload_compactor_context.clone();
273                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
274                let object_id_manager = object_id_manager.clone();
275                spawn({
276                    let future = async move {
277                        let _timer = upload_task_latency.start_timer();
278                        let mut output = flush_imms(
279                            payload
280                                .into_values()
281                                .flat_map(|imms| imms.into_iter())
282                                .collect(),
283                            upload_compactor_context.clone(),
284                            compaction_catalog_manager_ref.clone(),
285                            object_id_manager.clone(),
286                        )
287                        .await?;
288                        assert!(
289                            output
290                                .wait_poll_timer
291                                .replace(wait_poll_latency.start_timer())
292                                .is_none(),
293                            "should not set timer before"
294                        );
295                        Ok(output)
296                    };
297                    if let Some(tree_root) = tree_root {
298                        tree_root.instrument(future).left_future()
299                    } else {
300                        future.right_future()
301                    }
302                })
303            }),
304            CacheRefiller::default_spawn_refill_task(),
305        )
306    }
307
308    fn new_inner(
309        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
310        sstable_store: SstableStoreRef,
311        state_store_metrics: Arc<HummockStateStoreMetrics>,
312        refill_config: CacheRefillConfig,
313        recent_versions: RecentVersions,
314        buffer_tracker: BufferTracker,
315        spawn_upload_task: SpawnUploadTask,
316        spawn_refill_task: SpawnRefillTask,
317    ) -> Self {
318        let (hummock_event_tx, hummock_event_rx) =
319            event_channel(state_store_metrics.event_handler_pending_event.clone());
320        let (version_update_notifier_tx, _) =
321            tokio::sync::watch::channel(recent_versions.latest_version().clone());
322        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
323        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
324
325        let metrics = HummockEventHandlerMetrics {
326            event_handler_on_upload_finish_latency: state_store_metrics
327                .event_handler_latency
328                .with_label_values(&["on_upload_finish"]),
329            event_handler_on_apply_version_update: state_store_metrics
330                .event_handler_latency
331                .with_label_values(&["apply_version"]),
332            event_handler_on_recv_version_update: state_store_metrics
333                .event_handler_latency
334                .with_label_values(&["recv_version_update"]),
335        };
336
337        let uploader = HummockUploader::new(
338            state_store_metrics.clone(),
339            recent_versions.latest_version().clone(),
340            spawn_upload_task,
341            buffer_tracker,
342        );
343        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);
344
345        Self {
346            hummock_event_tx,
347            hummock_event_rx,
348            version_update_rx,
349            version_update_notifier_tx,
350            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
351            read_version_mapping,
352            local_read_version_mapping: Default::default(),
353            uploader,
354            refiller,
355            last_instance_id: 0,
356            metrics,
357        }
358    }
359
360    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
361        self.version_update_notifier_tx.clone()
362    }
363
364    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
365        self.recent_versions.clone()
366    }
367
368    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
369        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
370    }
371
372    pub fn event_sender(&self) -> HummockEventSender {
373        self.hummock_event_tx.clone()
374    }
375
376    pub fn buffer_tracker(&self) -> &BufferTracker {
377        self.uploader.buffer_tracker()
378    }
379}
380
381// Handler for different events
382impl HummockEventHandler {
383    /// This function will be performed under the protection of the `read_version_mapping` read
384    /// lock, and add write lock on each `read_version` operation
385    fn for_each_read_version(
386        &self,
387        instances: impl IntoIterator<Item = LocalInstanceId>,
388        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
389    ) {
390        let instances = {
391            #[cfg(debug_assertions)]
392            {
393                // check duplication on debug_mode
394                let mut id_set = std::collections::HashSet::new();
395                for instance in instances {
396                    assert!(id_set.insert(instance));
397                }
398                id_set
399            }
400            #[cfg(not(debug_assertions))]
401            {
402                instances
403            }
404        };
405        let mut pending = VecDeque::new();
406        let mut total_count = 0;
407        for instance_id in instances {
408            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
409                continue;
410            };
411            total_count += 1;
412            match read_version.try_write() {
413                Some(mut write_guard) => {
414                    f(instance_id, &mut write_guard);
415                }
416                _ => {
417                    pending.push_back(instance_id);
418                }
419            }
420        }
421        if !pending.is_empty() {
422            if pending.len() * 10 > total_count {
423                // Only print warn log when failed to acquire more than 10%
424                warn!(
425                    pending_count = pending.len(),
426                    total_count, "cannot acquire lock for all read version"
427                );
428            } else {
429                debug!(
430                    pending_count = pending.len(),
431                    total_count, "cannot acquire lock for all read version"
432                );
433            }
434        }
435
436        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);
437
438        while let Some(instance_id) = pending.pop_front() {
439            let (_, read_version) = self
440                .local_read_version_mapping
441                .get(&instance_id)
442                .expect("have checked exist before");
443            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
444                Some(mut write_guard) => {
445                    f(instance_id, &mut write_guard);
446                }
447                _ => {
448                    warn!(instance_id, "failed to get lock again for instance");
449                    pending.push_back(instance_id);
450                }
451            }
452        }
453    }
454
455    fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc<StagingSstableInfo>) {
456        trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
457        self.for_each_read_version(
458            staging_sstable_info.imm_ids().keys().cloned(),
459            |_, read_version| read_version.update(VersionUpdate::Sst(staging_sstable_info.clone())),
460        )
461    }
462
463    fn handle_sync_epoch(
464        &mut self,
465        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
466        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
467    ) {
468        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
469        self.uploader
470            .start_sync_epoch(sync_result_sender, sync_table_epochs);
471    }
472
473    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
474        info!(
475            current_version_id = ?self.uploader.hummock_version().id(),
476            "handle clear event"
477        );
478
479        self.uploader.clear(table_ids.clone());
480
481        if table_ids.is_none() {
482            assert!(
483                self.local_read_version_mapping.is_empty(),
484                "read version mapping not empty when clear. remaining tables: {:?}",
485                self.local_read_version_mapping
486                    .values()
487                    .map(|(_, read_version)| read_version.read().table_id())
488                    .collect_vec()
489            );
490        }
491
492        // Notify completion of the Clear event.
493        let _ = notifier.send(()).inspect_err(|e| {
494            error!("failed to notify completion of clear event: {:?}", e);
495        });
496
497        info!("clear finished");
498    }
499
500    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
501        let _timer = self
502            .metrics
503            .event_handler_on_recv_version_update
504            .start_timer();
505        let pinned_version = self
506            .refiller
507            .last_new_pinned_version()
508            .cloned()
509            .unwrap_or_else(|| self.uploader.hummock_version().clone());
510
511        let mut sst_delta_infos = vec![];
512        if let Some(new_pinned_version) = Self::resolve_version_update_info(
513            &pinned_version,
514            version_payload,
515            Some(&mut sst_delta_infos),
516        ) {
517            self.refiller
518                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
519        }
520    }
521
522    fn resolve_version_update_info(
523        pinned_version: &PinnedVersion,
524        version_payload: HummockVersionUpdate,
525        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
526    ) -> Option<PinnedVersion> {
527        match version_payload {
528            HummockVersionUpdate::VersionDeltas(version_deltas) => {
529                let mut version_to_apply = (**pinned_version).clone();
530                {
531                    let mut table_change_log_to_apply_guard =
532                        pinned_version.table_change_log_write_lock();
533                    for version_delta in version_deltas {
534                        assert_eq!(version_to_apply.id, version_delta.prev_id);
535
536                        // apply change-log-delta
537                        {
538                            let mut state_table_info = version_to_apply.state_table_info.clone();
539                            let (changed_table_info, _is_commit_epoch) = state_table_info
540                                .apply_delta(
541                                    &version_delta.state_table_info_delta,
542                                    &version_delta.removed_table_ids,
543                                );
544
545                            HummockVersionCommon::<SstableInfo>::apply_change_log_delta(
546                                &mut *table_change_log_to_apply_guard,
547                                &version_delta.change_log_delta,
548                                &version_delta.removed_table_ids,
549                                &version_delta.state_table_info_delta,
550                                &changed_table_info,
551                            );
552                        }
553
554                        let local_hummock_version_delta =
555                            LocalHummockVersionDelta::from(version_delta);
556                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
557                            sst_delta_infos.extend(
558                                version_to_apply
559                                    .build_sst_delta_infos(&local_hummock_version_delta)
560                                    .into_iter(),
561                            );
562                        }
563
564                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
565                    }
566                }
567
568                pinned_version.new_with_local_version(version_to_apply)
569            }
570            HummockVersionUpdate::PinnedVersion(version) => {
571                pinned_version.new_pin_version(*version)
572            }
573        }
574    }
575
576    fn apply_version_update(
577        &mut self,
578        pinned_version: PinnedVersion,
579        new_pinned_version: PinnedVersion,
580    ) {
581        let _timer = self
582            .metrics
583            .event_handler_on_apply_version_update
584            .start_timer();
585        self.recent_versions.rcu(|prev_recent_versions| {
586            prev_recent_versions.with_new_version(new_pinned_version.clone())
587        });
588
589        {
590            self.for_each_read_version(
591                self.local_read_version_mapping.keys().cloned(),
592                |_, read_version| {
593                    read_version
594                        .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
595                },
596            );
597        }
598
599        self.version_update_notifier_tx.send_if_modified(|state| {
600            assert_eq!(pinned_version.id(), state.id());
601            if state.id() == new_pinned_version.id() {
602                return false;
603            }
604            assert!(new_pinned_version.id() > state.id());
605            *state = new_pinned_version.clone();
606            true
607        });
608
609        debug!("update to hummock version: {}", new_pinned_version.id(),);
610
611        self.uploader.update_pinned_version(new_pinned_version);
612    }
613}
614
615impl HummockEventHandler {
616    pub async fn start_hummock_event_handler_worker(mut self) {
617        loop {
618            tokio::select! {
619                sst = self.uploader.next_uploaded_sst() => {
620                    self.handle_uploaded_sst(sst);
621                }
622                event = self.refiller.next_event() => {
623                    let CacheRefillerEvent {pinned_version, new_pinned_version } = event;
624                    self.apply_version_update(pinned_version, new_pinned_version);
625                }
626                event = pin!(self.hummock_event_rx.recv()) => {
627                    let Some(event) = event else { break };
628                    match event {
629                        HummockEvent::Shutdown => {
630                            info!("event handler shutdown");
631                            return;
632                        },
633                        event => {
634                            self.handle_hummock_event(event);
635                        }
636                    }
637                }
638                version_update = pin!(self.version_update_rx.recv()) => {
639                    let Some(version_update) = version_update else {
640                        warn!("version update stream ends. event handle shutdown");
641                        return;
642                    };
643                    self.handle_version_update(version_update);
644                }
645            }
646        }
647    }
648
649    fn handle_uploaded_sst(&mut self, sst: Arc<StagingSstableInfo>) {
650        let _timer = self
651            .metrics
652            .event_handler_on_upload_finish_latency
653            .start_timer();
654        self.handle_uploaded_sst_inner(sst);
655    }
656
657    /// Gracefully shutdown if returns `true`.
658    fn handle_hummock_event(&mut self, event: HummockEvent) {
659        match event {
660            HummockEvent::BufferMayFlush => {
661                self.uploader.may_flush();
662            }
663            HummockEvent::SyncEpoch {
664                sync_result_sender,
665                sync_table_epochs,
666            } => {
667                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
668            }
669            HummockEvent::Clear(notifier, table_ids) => {
670                self.handle_clear(notifier, table_ids);
671            }
672            HummockEvent::Shutdown => {
673                unreachable!("shutdown is handled specially")
674            }
675            HummockEvent::StartEpoch { epoch, table_ids } => {
676                self.uploader.start_epoch(epoch, table_ids);
677            }
678            HummockEvent::InitEpoch {
679                instance_id,
680                init_epoch,
681            } => {
682                let table_id = self
683                    .local_read_version_mapping
684                    .get(&instance_id)
685                    .expect("should exist")
686                    .0;
687                self.uploader
688                    .init_instance(instance_id, table_id, init_epoch);
689            }
690            HummockEvent::ImmToUploader { instance_id, imms } => {
691                assert!(
692                    self.local_read_version_mapping.contains_key(&instance_id),
693                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
694                    instance_id,
695                    imms.first().map(|imm| imm.table_id),
696                );
697                self.uploader.add_imms(instance_id, imms);
698                self.uploader.may_flush();
699            }
700
701            HummockEvent::LocalSealEpoch {
702                next_epoch,
703                opts,
704                instance_id,
705            } => {
706                self.uploader
707                    .local_seal_epoch(instance_id, next_epoch, opts);
708            }
709
710            #[cfg(any(test, feature = "test"))]
711            HummockEvent::FlushEvent(sender) => {
712                let _ = sender.send(()).inspect_err(|e| {
713                    error!("unable to send flush result: {:?}", e);
714                });
715            }
716
717            HummockEvent::RegisterReadVersion {
718                table_id,
719                new_read_version_sender,
720                is_replicated,
721                vnodes,
722            } => {
723                let pinned_version = self.recent_versions.load().latest_version().clone();
724                let instance_id = self.generate_instance_id();
725                let basic_read_version = Arc::new(RwLock::new(
726                    HummockReadVersion::new_with_replication_option(
727                        table_id,
728                        instance_id,
729                        pinned_version,
730                        is_replicated,
731                        vnodes,
732                    ),
733                ));
734
735                debug!(
736                    "new read version registered: table_id: {}, instance_id: {}",
737                    table_id, instance_id
738                );
739
740                {
741                    self.local_read_version_mapping
742                        .insert(instance_id, (table_id, basic_read_version.clone()));
743                    let mut read_version_mapping_guard = self.read_version_mapping.write();
744
745                    read_version_mapping_guard
746                        .entry(table_id)
747                        .or_default()
748                        .insert(instance_id, basic_read_version.clone());
749                }
750
751                match new_read_version_sender.send((
752                    basic_read_version,
753                    LocalInstanceGuard {
754                        table_id,
755                        instance_id,
756                        event_sender: Some(self.hummock_event_tx.clone()),
757                    },
758                )) {
759                    Ok(_) => {}
760                    Err((_, mut guard)) => {
761                        warn!(
762                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
763                            table_id, instance_id
764                        );
765                        guard.event_sender.take().expect("sender is just set");
766                        self.destroy_read_version(instance_id);
767                    }
768                }
769            }
770
771            HummockEvent::DestroyReadVersion { instance_id } => {
772                self.uploader.may_destroy_instance(instance_id);
773                self.destroy_read_version(instance_id);
774            }
775            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
776                let _ = result_tx
777                    .send(self.uploader.min_uncommitted_object_id())
778                    .inspect_err(|e| {
779                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
780                    });
781            }
782            HummockEvent::RegisterVectorWriter {
783                table_id,
784                init_epoch,
785            } => self.uploader.register_vector_writer(table_id, init_epoch),
786            HummockEvent::VectorWriterSealEpoch {
787                table_id,
788                next_epoch,
789                add,
790            } => {
791                self.uploader
792                    .vector_writer_seal_epoch(table_id, next_epoch, add);
793            }
794            HummockEvent::DropVectorWriter { table_id } => {
795                self.uploader.drop_vector_writer(table_id);
796            }
797        }
798    }
799
800    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
801        {
802            {
803                debug!("read version deregister: instance_id: {}", instance_id);
804                let (table_id, _) = self
805                    .local_read_version_mapping
806                    .remove(&instance_id)
807                    .unwrap_or_else(|| {
808                        panic!(
809                            "DestroyHummockInstance inexist instance instance_id {}",
810                            instance_id
811                        )
812                    });
813                let mut read_version_mapping_guard = self.read_version_mapping.write();
814                let entry = read_version_mapping_guard
815                    .get_mut(&table_id)
816                    .unwrap_or_else(|| {
817                        panic!(
818                            "DestroyHummockInstance table_id {} instance_id {} fail",
819                            table_id, instance_id
820                        )
821                    });
822                entry.remove(&instance_id).unwrap_or_else(|| {
823                    panic!(
824                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
825                        table_id, instance_id
826                    )
827                });
828                if entry.is_empty() {
829                    read_version_mapping_guard.remove(&table_id);
830                }
831            }
832        }
833    }
834
835    fn generate_instance_id(&mut self) -> LocalInstanceId {
836        self.last_instance_id += 1;
837        self.last_instance_id
838    }
839}
840
841pub(super) fn send_sync_result(
842    sender: oneshot::Sender<HummockResult<SyncedData>>,
843    result: HummockResult<SyncedData>,
844) {
845    let _ = sender.send(result).inspect_err(|e| {
846        error!("unable to send sync result. Err: {:?}", e);
847    });
848}
849
850impl SyncedData {
851    pub fn into_sync_result(self) -> SyncResult {
852        {
853            let SyncedData {
854                uploaded_ssts,
855                table_watermarks,
856                vector_index_adds,
857            } = self;
858            let mut sync_size = 0;
859            let mut uncommitted_ssts = Vec::new();
860            let mut old_value_ssts = Vec::new();
861            // The newly uploaded `sstable_infos` contains newer data. Therefore,
862            // `newly_upload_ssts` at the front
863            for sst in uploaded_ssts {
864                sync_size += sst.imm_size();
865                uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
866                old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
867            }
868            SyncResult {
869                sync_size,
870                uncommitted_ssts,
871                table_watermarks: table_watermarks.clone(),
872                old_value_ssts,
873                vector_index_adds,
874            }
875        }
876    }
877}
878
879#[cfg(test)]
880mod tests {
881    use std::collections::{HashMap, HashSet};
882    use std::future::poll_fn;
883    use std::sync::Arc;
884    use std::task::Poll;
885
886    use futures::FutureExt;
887    use parking_lot::Mutex;
888    use risingwave_common::bitmap::Bitmap;
889    use risingwave_common::catalog::TableId;
890    use risingwave_common::hash::VirtualNode;
891    use risingwave_common::util::epoch::{EpochExt, test_epoch};
892    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
893    use risingwave_hummock_sdk::version::HummockVersion;
894    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
895    use tokio::spawn;
896    use tokio::sync::mpsc::unbounded_channel;
897    use tokio::sync::oneshot;
898
899    use crate::hummock::HummockError;
900    use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
901    use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
902    use crate::hummock::event_handler::uploader::UploadTaskOutput;
903    use crate::hummock::event_handler::uploader::test_utils::{
904        TEST_TABLE_ID, gen_imm, gen_imm_inner, prepare_uploader_order_test_spawn_task_fn,
905    };
906    use crate::hummock::event_handler::{
907        HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
908    };
909    use crate::hummock::iterator::test_utils::mock_sstable_store;
910    use crate::hummock::local_version::pinned_version::PinnedVersion;
911    use crate::hummock::local_version::recent_versions::RecentVersions;
912    use crate::hummock::test_utils::default_opts_for_test;
913    use crate::mem_table::ImmutableMemtable;
914    use crate::monitor::HummockStateStoreMetrics;
915    use crate::store::SealCurrentEpochOptions;
916
917    #[tokio::test]
918    async fn test_old_epoch_sync_fail() {
919        let epoch0 = test_epoch(233);
920
921        let initial_version = PinnedVersion::new(
922            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
923                id: 1,
924                state_table_info: HashMap::from_iter([(
925                    TEST_TABLE_ID.table_id,
926                    StateTableInfo {
927                        committed_epoch: epoch0,
928                        compaction_group_id: StaticCompactionGroupId::StateDefault as _,
929                    },
930                )]),
931                ..Default::default()
932            }),
933            unbounded_channel().0,
934        );
935
936        let (_version_update_tx, version_update_rx) = unbounded_channel();
937
938        let epoch1 = epoch0.next_epoch();
939        let epoch2 = epoch1.next_epoch();
940        let (tx, rx) = oneshot::channel();
941        let rx = Arc::new(Mutex::new(Some(rx)));
942
943        let storage_opt = default_opts_for_test();
944        let metrics = Arc::new(HummockStateStoreMetrics::unused());
945
946        let event_handler = HummockEventHandler::new_inner(
947            version_update_rx,
948            mock_sstable_store().await,
949            metrics.clone(),
950            CacheRefillConfig::from_storage_opts(&storage_opt),
951            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
952            BufferTracker::from_storage_opts(
953                &storage_opt,
954                metrics.uploader_uploading_task_size.clone(),
955            ),
956            Arc::new(move |_, info| {
957                assert_eq!(info.epochs.len(), 1);
958                let epoch = info.epochs[0];
959                match epoch {
960                    epoch if epoch == epoch1 => {
961                        let rx = rx.lock().take().unwrap();
962                        spawn(async move {
963                            rx.await.unwrap();
964                            Err(HummockError::other("fail"))
965                        })
966                    }
967                    epoch if epoch == epoch2 => spawn(async move {
968                        Ok(UploadTaskOutput {
969                            new_value_ssts: vec![],
970                            old_value_ssts: vec![],
971                            wait_poll_timer: None,
972                        })
973                    }),
974                    _ => unreachable!(),
975                }
976            }),
977            CacheRefiller::default_spawn_refill_task(),
978        );
979
980        let event_tx = event_handler.event_sender();
981
982        let send_event = |event| event_tx.send(event).unwrap();
983
984        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
985
986        let (read_version, guard) = {
987            let (tx, rx) = oneshot::channel();
988            send_event(HummockEvent::RegisterReadVersion {
989                table_id: TEST_TABLE_ID,
990                new_read_version_sender: tx,
991                is_replicated: false,
992                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
993            });
994            rx.await.unwrap()
995        };
996
997        send_event(HummockEvent::StartEpoch {
998            epoch: epoch1,
999            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1000        });
1001
1002        send_event(HummockEvent::InitEpoch {
1003            instance_id: guard.instance_id,
1004            init_epoch: epoch1,
1005        });
1006
1007        let imm1 = gen_imm(epoch1).await;
1008        read_version.write().add_imm(imm1.clone());
1009
1010        send_event(HummockEvent::ImmToUploader {
1011            instance_id: guard.instance_id,
1012            imms: read_version.write().start_upload_pending_imms(),
1013        });
1014
1015        send_event(HummockEvent::StartEpoch {
1016            epoch: epoch2,
1017            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1018        });
1019
1020        send_event(HummockEvent::LocalSealEpoch {
1021            instance_id: guard.instance_id,
1022            next_epoch: epoch2,
1023            opts: SealCurrentEpochOptions::for_test(),
1024        });
1025
1026        {
1027            let imm2 = gen_imm(epoch2).await;
1028            let mut read_version = read_version.write();
1029            read_version.add_imm(imm2);
1030
1031            send_event(HummockEvent::ImmToUploader {
1032                instance_id: guard.instance_id,
1033                imms: read_version.start_upload_pending_imms(),
1034            });
1035        }
1036
1037        let epoch3 = epoch2.next_epoch();
1038        send_event(HummockEvent::StartEpoch {
1039            epoch: epoch3,
1040            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1041        });
1042        send_event(HummockEvent::LocalSealEpoch {
1043            instance_id: guard.instance_id,
1044            next_epoch: epoch3,
1045            opts: SealCurrentEpochOptions::for_test(),
1046        });
1047
1048        let (tx1, mut rx1) = oneshot::channel();
1049        send_event(HummockEvent::SyncEpoch {
1050            sync_result_sender: tx1,
1051            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
1052        });
1053        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
1054        let (tx2, mut rx2) = oneshot::channel();
1055        send_event(HummockEvent::SyncEpoch {
1056            sync_result_sender: tx2,
1057            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
1058        });
1059        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);
1060
1061        tx.send(()).unwrap();
1062        rx1.await.unwrap().unwrap_err();
1063        rx2.await.unwrap().unwrap_err();
1064
1065        send_event(HummockEvent::Shutdown);
1066        join_handle.await.unwrap();
1067    }
1068
1069    #[tokio::test]
1070    async fn test_clear_tables() {
1071        let table_id1 = TableId::new(1);
1072        let table_id2 = TableId::new(2);
1073        let epoch0 = test_epoch(233);
1074
1075        let initial_version = PinnedVersion::new(
1076            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1077                id: 1,
1078                state_table_info: HashMap::from_iter([
1079                    (
1080                        table_id1.table_id,
1081                        StateTableInfo {
1082                            committed_epoch: epoch0,
1083                            compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1084                        },
1085                    ),
1086                    (
1087                        table_id2.table_id,
1088                        StateTableInfo {
1089                            committed_epoch: epoch0,
1090                            compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1091                        },
1092                    ),
1093                ]),
1094                ..Default::default()
1095            }),
1096            unbounded_channel().0,
1097        );
1098
1099        let (_version_update_tx, version_update_rx) = unbounded_channel();
1100
1101        let epoch1 = epoch0.next_epoch();
1102        let epoch2 = epoch1.next_epoch();
1103        let epoch3 = epoch2.next_epoch();
1104
1105        let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0, None).await.size();
1106
1107        // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained.
1108        let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1109        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1110
1111        let gen_imm = |table_id, epoch, spill_offset| {
1112            let imm = gen_imm_inner(table_id, epoch, spill_offset, Some(&*memory_limiter))
1113                .now_or_never()
1114                .unwrap();
1115            assert_eq!(imm.size(), imm_size);
1116            imm
1117        };
1118        let imm1_1 = gen_imm(table_id1, epoch1, 0);
1119        let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1120
1121        let storage_opt = default_opts_for_test();
1122        let metrics = Arc::new(HummockStateStoreMetrics::unused());
1123
1124        let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1125
1126        let event_handler = HummockEventHandler::new_inner(
1127            version_update_rx,
1128            mock_sstable_store().await,
1129            metrics.clone(),
1130            CacheRefillConfig::from_storage_opts(&storage_opt),
1131            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1132            buffer_tracker,
1133            spawn_task,
1134            CacheRefiller::default_spawn_refill_task(),
1135        );
1136
1137        let event_tx = event_handler.event_sender();
1138
1139        let send_event = |event| event_tx.send(event).unwrap();
1140        let flush_event = || async {
1141            let (tx, rx) = oneshot::channel();
1142            send_event(HummockEvent::FlushEvent(tx));
1143            rx.await.unwrap();
1144        };
1145        let start_epoch = |table_id, epoch| {
1146            send_event(HummockEvent::StartEpoch {
1147                epoch,
1148                table_ids: HashSet::from_iter([table_id]),
1149            })
1150        };
1151        let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1152            send_event(HummockEvent::InitEpoch {
1153                instance_id: instance.instance_id,
1154                init_epoch,
1155            })
1156        };
1157        let write_imm = |read_version: &HummockReadVersionRef,
1158                         instance: &LocalInstanceGuard,
1159                         imm: &ImmutableMemtable| {
1160            let mut read_version = read_version.write();
1161            read_version.add_imm(imm.clone());
1162
1163            send_event(HummockEvent::ImmToUploader {
1164                instance_id: instance.instance_id,
1165                imms: read_version.start_upload_pending_imms(),
1166            });
1167        };
1168        let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1169            send_event(HummockEvent::LocalSealEpoch {
1170                instance_id: instance.instance_id,
1171                next_epoch,
1172                opts: SealCurrentEpochOptions::for_test(),
1173            })
1174        };
1175        let sync_epoch = |table_id, new_sync_epoch| {
1176            let (tx, rx) = oneshot::channel();
1177            send_event(HummockEvent::SyncEpoch {
1178                sync_result_sender: tx,
1179                sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1180            });
1181            rx
1182        };
1183
1184        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1185
1186        let (read_version1, guard1) = {
1187            let (tx, rx) = oneshot::channel();
1188            send_event(HummockEvent::RegisterReadVersion {
1189                table_id: table_id1,
1190                new_read_version_sender: tx,
1191                is_replicated: false,
1192                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1193            });
1194            rx.await.unwrap()
1195        };
1196
1197        let (read_version2, guard2) = {
1198            let (tx, rx) = oneshot::channel();
1199            send_event(HummockEvent::RegisterReadVersion {
1200                table_id: table_id2,
1201                new_read_version_sender: tx,
1202                is_replicated: false,
1203                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1204            });
1205            rx.await.unwrap()
1206        };
1207
1208        // prepare data of table1
1209        let (task1_1_finish_tx, task1_1_rx) = {
1210            start_epoch(table_id1, epoch1);
1211
1212            init_epoch(&guard1, epoch1);
1213
1214            write_imm(&read_version1, &guard1, &imm1_1);
1215
1216            start_epoch(table_id1, epoch2);
1217
1218            seal_epoch(&guard1, epoch2);
1219
1220            let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1221                guard1.instance_id,
1222                vec![imm1_1.batch_id()],
1223            )]));
1224
1225            let mut rx = sync_epoch(table_id1, epoch1);
1226            wait_task_start.await;
1227            assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1228
1229            write_imm(&read_version1, &guard1, &imm1_2_1);
1230            flush_event().await;
1231
1232            (task_finish_tx, rx)
1233        };
1234        // by now, the state in uploader of table_id1
1235        // unsync:  epoch2 -> [imm1_2]
1236        // syncing: epoch1 -> [imm1_1]
1237
1238        let (task1_2_finish_tx, _finish_txs) = {
1239            let mut finish_txs = vec![];
1240            let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1241            start_epoch(table_id2, epoch1);
1242            init_epoch(&guard2, epoch1);
1243            let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1244                guard1.instance_id,
1245                vec![imm1_2_1.batch_id()],
1246            )]));
1247            write_imm(&read_version2, &guard2, &imm2_1_1);
1248            wait_task_start.await;
1249
1250            let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1251            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1252                guard2.instance_id,
1253                vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1254            )]));
1255            finish_txs.push(finish_tx);
1256            write_imm(&read_version2, &guard2, &imm2_1_2);
1257            wait_task_start.await;
1258
1259            let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1260            write_imm(&read_version2, &guard2, &imm2_1_3);
1261            start_epoch(table_id2, epoch2);
1262            seal_epoch(&guard2, epoch2);
1263            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1264                guard2.instance_id,
1265                vec![imm2_1_3.batch_id()],
1266            )]));
1267            finish_txs.push(finish_tx);
1268            let _sync_rx = sync_epoch(table_id2, epoch1);
1269            wait_task_start.await;
1270
1271            let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1272            write_imm(&read_version2, &guard2, &imm2_2_1);
1273            flush_event().await;
1274            let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1275            write_imm(&read_version2, &guard2, &imm2_2_2);
1276            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1277                guard2.instance_id,
1278                vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1279            )]));
1280            finish_txs.push(finish_tx);
1281            wait_task_start.await;
1282
1283            let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1284            write_imm(&read_version2, &guard2, &imm2_2_3);
1285
1286            // by now, the state in uploader of table_id2
1287            // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3]
1288            // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3]
1289            // the state in uploader of table_id1
1290            // unsync:  epoch2 -> spilling [imm1_2]
1291            // syncing: epoch1 -> [imm1_1]
1292
1293            drop(guard2);
1294            let (clear_tx, clear_rx) = oneshot::channel();
1295            send_event(HummockEvent::Clear(
1296                clear_tx,
1297                Some(HashSet::from_iter([table_id2])),
1298            ));
1299            clear_rx.await.unwrap();
1300            (task1_2_finish_tx, finish_txs)
1301        };
1302
1303        let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1304        write_imm(&read_version1, &guard1, &imm1_2_2);
1305        start_epoch(table_id1, epoch3);
1306        seal_epoch(&guard1, epoch3);
1307
1308        let (tx2, mut sync_rx2) = oneshot::channel();
1309        let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1310            guard1.instance_id,
1311            vec![imm1_2_2.batch_id()],
1312        )]));
1313        send_event(HummockEvent::SyncEpoch {
1314            sync_result_sender: tx2,
1315            sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1316        });
1317        wait_task_start.await;
1318        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1319
1320        task1_1_finish_tx.send(()).unwrap();
1321        let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1322        sync_data1
1323            .uploaded_ssts
1324            .iter()
1325            .all(|sst| sst.epochs() == &vec![epoch1]);
1326        task1_2_finish_tx.send(()).unwrap();
1327        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1328        task1_2_2_finish_tx.send(()).unwrap();
1329        let sync_data2 = sync_rx2.await.unwrap().unwrap();
1330        sync_data2
1331            .uploaded_ssts
1332            .iter()
1333            .all(|sst| sst.epochs() == &vec![epoch2]);
1334
1335        send_event(HummockEvent::Shutdown);
1336        join_handle.await.unwrap();
1337    }
1338}