risingwave_storage/hummock/event_handler/
hummock_event_handler.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::pin::pin;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering::Relaxed;
20use std::sync::{Arc, LazyLock};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use await_tree::{InstrumentAwait, SpanExt};
25use futures::FutureExt;
26use itertools::Itertools;
27use parking_lot::RwLock;
28use prometheus::{Histogram, IntGauge, IntGaugeVec};
29use risingwave_common::catalog::TableId;
30use risingwave_common::metrics::UintGauge;
31use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
32use risingwave_hummock_sdk::version::LocalHummockVersionDelta;
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46    HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49    HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50    ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
55use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
56use crate::mem_table::ImmutableMemtable;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::opts::StorageOpts;
59
/// Tracks shared-buffer memory usage to decide when imms should be spilled
/// (flushed) to object store. Cloneable: all clones share the same limiter
/// and gauges.
#[derive(Clone)]
pub(crate) struct BufferTracker {
    // Spill when imm size exceeds uploading size by more than this many bytes.
    flush_threshold: usize,
    // Lower bound (bytes) of a single spill batch; see `need_more_flush`.
    min_batch_flush_size: usize,
    // Caps total memory used by in-flight uploads across the process.
    global_uploading_memory_limiter: Arc<MemoryLimiter>,
    // Current total size of unflushed immutable memtables.
    uploader_imm_size: UintGauge,
    // Current total size of data in ongoing upload tasks.
    uploader_uploading_task_size: UintGauge,
}
68
69impl BufferTracker {
70    pub fn from_storage_opts(config: &StorageOpts, metrics: &HummockStateStoreMetrics) -> Self {
71        let capacity = config.shared_buffer_capacity_mb * (1 << 20);
72        let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
73        let min_batch_flush_size = config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
74        assert!(
75            flush_threshold < capacity,
76            "flush_threshold {} should be less or equal to capacity {}",
77            flush_threshold,
78            capacity
79        );
80        Self {
81            flush_threshold,
82            min_batch_flush_size,
83            global_uploading_memory_limiter: Arc::new(MemoryLimiter::new(capacity as u64)),
84            uploader_imm_size: metrics.uploader_imm_size.clone(),
85            uploader_uploading_task_size: metrics.uploader_uploading_task_size.clone(),
86        }
87    }
88
89    #[cfg(test)]
90    fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
91        Self {
92            flush_threshold,
93            min_batch_flush_size,
94            ..Self::for_test()
95        }
96    }
97
98    #[cfg(test)]
99    pub fn for_test() -> Self {
100        Self::from_storage_opts(&StorageOpts::default(), &HummockStateStoreMetrics::unused())
101    }
102
103    pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
104        &self.global_uploading_memory_limiter
105    }
106
107    pub fn global_upload_task_size(&self) -> &UintGauge {
108        &self.uploader_uploading_task_size
109    }
110
111    /// Return true when the buffer size minus current upload task size is still greater than the
112    /// flush threshold.
113    pub fn need_flush(&self) -> bool {
114        self.uploader_imm_size.get()
115            > self.flush_threshold as u64 + self.uploader_uploading_task_size.get()
116    }
117
118    pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
119        curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
120    }
121
122    #[cfg(test)]
123    pub(crate) fn flush_threshold(&self) -> usize {
124        self.flush_threshold
125    }
126}
127
/// Wrapper around an `UnboundedSender<HummockEvent>` that additionally tracks
/// the number of in-flight events (sent but not yet received) per event type.
#[derive(Clone)]
pub struct HummockEventSender {
    inner: UnboundedSender<HummockEvent>,
    // Gauge vector labelled by event name; incremented on send, decremented on recv.
    event_count: IntGaugeVec,
}
133
134pub fn event_channel(event_count: IntGaugeVec) -> (HummockEventSender, HummockEventReceiver) {
135    let (tx, rx) = unbounded_channel();
136    (
137        HummockEventSender {
138            inner: tx,
139            event_count: event_count.clone(),
140        },
141        HummockEventReceiver {
142            inner: rx,
143            event_count,
144        },
145    )
146}
147
148impl HummockEventSender {
149    pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
150        let event_type = event.event_name();
151        self.inner.send(event)?;
152        get_event_pending_gauge(&self.event_count, event_type).inc();
153        Ok(())
154    }
155}
156
/// Receiving half of the hummock event channel; decrements the pending-event
/// gauge for each event it takes off the channel.
pub struct HummockEventReceiver {
    inner: UnboundedReceiver<HummockEvent>,
    // Same gauge vector as the paired `HummockEventSender`.
    event_count: IntGaugeVec,
}
161
162impl HummockEventReceiver {
163    async fn recv(&mut self) -> Option<HummockEvent> {
164        let event = self.inner.recv().await?;
165        let event_type = event.event_name();
166        get_event_pending_gauge(&self.event_count, event_type).dec();
167        Some(event)
168    }
169}
170
thread_local! {
    // Per-thread cache of label-resolved gauges so hot send/recv paths avoid
    // repeated `with_label_values` lookups on the shared `IntGaugeVec`.
    static EVENT_PENDING_GAUGE_CACHE: RefCell<HashMap<&'static str, IntGauge>> = RefCell::new(HashMap::new());
}
174
175fn get_event_pending_gauge(event_count: &IntGaugeVec, event_type: &'static str) -> IntGauge {
176    EVENT_PENDING_GAUGE_CACHE.with(|cache| {
177        cache
178            .borrow_mut()
179            .entry(event_type)
180            .or_insert_with(|| event_count.with_label_values(&[event_type]))
181            .clone()
182    })
183}
184
/// Latency histograms for the event handler's hot paths, one per labelled
/// stage of `event_handler_latency` (see `new_inner` for label wiring).
struct HummockEventHandlerMetrics {
    event_handler_on_upload_finish_latency: Histogram,
    event_handler_on_apply_version_update: Histogram,
    event_handler_on_recv_version_update: Histogram,
    event_handler_on_spiller: Histogram,
}
191
/// Central event loop state for hummock storage: receives hummock events and
/// version updates, drives the uploader and cache refiller, and maintains the
/// read-version mappings shared with local state stores.
pub(crate) struct HummockEventHandler {
    hummock_event_tx: HummockEventSender,
    hummock_event_rx: HummockEventReceiver,
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    // Shared with readers behind a lock; see `read_version_mapping()`.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    /// A copy of `read_version_mapping` but owned by event handler
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    // Watch channel notifying waiters when a newer pinned version is applied.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    // Monotonic counter used to assign ids to newly registered read versions.
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
210
/// Compact a batch of immutable memtables into SSTs via shared-buffer
/// compaction, instrumented for await-tree.
///
/// Returns the upload task output from `compact`, or an error if compaction
/// fails.
async fn flush_imms(
    payload: Vec<ImmutableMemtable>,
    compactor_context: CompactorContext,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    object_id_manager: Arc<ObjectIdManager>,
) -> HummockResult<UploadTaskOutput> {
    compact(
        compactor_context,
        object_id_manager,
        payload,
        compaction_catalog_manager_ref,
    )
    .instrument_await("shared_buffer_compact".verbose())
    .await
}
226
impl HummockEventHandler {
    /// Create a production event handler: spilled imm batches are compacted
    /// via `flush_imms` on spawned tokio tasks, and cache refill uses the
    /// default refill task spawner.
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        object_id_manager: Arc<ObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker =
            BufferTracker::from_storage_opts(&compactor_context.storage_opts, &state_store_metrics);
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            // Upload task spawner: flattens per-instance imm lists into one
            // payload and compacts them into SSTs on a spawned tokio task.
            Arc::new(move |payload, task_info| {
                // Monotonic id, used only to key the await-tree registration.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                // Clone captured handles so the async move block owns them.
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let object_id_manager = object_id_manager.clone();
                spawn({
                    let future = async move {
                        let _timer = upload_task_latency.start_timer();
                        let mut output = flush_imms(
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            object_id_manager.clone(),
                        )
                        .await?;
                        // Start the wait-poll timer once compaction finishes;
                        // it measures how long the finished task waits before
                        // being polled by the uploader.
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    // Instrument with await-tree only when a registry exists.
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }

    /// Shared constructor for production (`new`) and tests: wires up the
    /// event channel, version-update watch channel, latency metrics, uploader
    /// and cache refiller.
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        // One histogram per labelled event-handler stage.
        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
            event_handler_on_spiller: state_store_metrics
                .event_handler_latency
                .with_label_values(&["spiller"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics,
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }

    /// Watch-channel sender used to notify waiters of newly applied versions.
    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
        self.version_update_notifier_tx.clone()
    }

    /// Shared handle to the recently cached hummock versions.
    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
        self.recent_versions.clone()
    }

    /// Read-only view over the shared read-version mapping.
    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
    }

    /// A cloneable sender for submitting events to this handler.
    pub fn event_sender(&self) -> HummockEventSender {
        self.hummock_event_tx.clone()
    }

    /// The uploader's buffer tracker (shared spill/flush accounting).
    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
}
378
379// Handler for different events
impl HummockEventHandler {
    /// This function will be performed under the protection of the `read_version_mapping` read
    /// lock, and add write lock on each `read_version` operation
    ///
    /// Applies `f` to each listed read version, first with non-blocking
    /// `try_write`, then retrying stragglers with a short timed lock so one
    /// busy instance cannot block the others.
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            #[cfg(debug_assertions)]
            {
                // check duplication on debug_mode
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        // First pass: opportunistic non-blocking lock; defer contended ones.
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            // Instances may have been destroyed concurrently; skip them.
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            if pending.len() * 10 > total_count {
                // Only print warn log when failed to acquire more than 10%
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Second pass: retry with a timed lock, re-queueing on timeout until
        // every pending instance has been visited.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }

    /// Apply newly uploaded staging SSTs to the read versions of the
    /// instances whose imms they contain. The single-SST case avoids building
    /// the affected-instance set needed for batches.
    fn handle_uploaded_ssts_inner(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
        match ssts.as_slice() {
            [] => {
                // Empty batches indicate a caller bug; only panic in debug builds.
                if cfg!(debug_assertions) {
                    panic!("empty ssts")
                }
            }
            [staging_sstable_info] => {
                trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
                self.for_each_read_version(
                    staging_sstable_info.imm_ids().keys().cloned(),
                    |_, read_version| {
                        read_version.update(VersionUpdate::Sst(staging_sstable_info.clone()))
                    },
                )
            }
            ssts => {
                warn!(
                    batch_size = ssts.len(),
                    "handle multiple uploaded ssts in batch"
                );
                // Union of all instances touched by any SST in the batch.
                let affected_instances: HashSet<_> = ssts
                    .iter()
                    .flat_map(|sst| {
                        trace!("data_flushed. SST size {}", sst.imm_size());
                        sst.imm_ids().keys()
                    })
                    .copied()
                    .collect();
                self.for_each_read_version(affected_instances, |instance_id, read_version| {
                    for sst in ssts {
                        if sst.imm_ids().contains_key(&instance_id) {
                            read_version.update(VersionUpdate::Sst(sst.clone()));
                        }
                    }
                })
            }
        }
    }

    /// Kick off a sync of the given table epochs; the result is delivered to
    /// `sync_result_sender` when the uploader finishes.
    fn handle_sync_epoch(
        &mut self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
    ) {
        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
        self.uploader
            .start_sync_epoch(sync_result_sender, sync_table_epochs);
    }

    /// Clear uploader state for the given tables (all tables when
    /// `table_ids` is `None`) and acknowledge via `notifier`.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            ?table_ids,
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            // A full clear requires all read-version instances to have been
            // destroyed beforehand.
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // Notify completion of the Clear event.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }

    /// Resolve an incoming version update against the newest known pinned
    /// version and, if it produces a newer version, hand it to the cache
    /// refiller together with the SST delta infos for refill.
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        // Prefer the refiller's last pending version over the uploader's so
        // deltas chain onto updates that are still being refilled.
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }

    /// Apply a version payload on top of `pinned_version`.
    ///
    /// For delta payloads each delta is asserted to chain onto the previous
    /// version id, and the SST delta infos are collected when requested.
    /// Returns `None` when the payload does not yield a newer pinned version
    /// (as decided by `new_with_local_version` / `new_pin_version`).
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    for version_delta in version_deltas {
                        assert_eq!(version_to_apply.id, version_delta.prev_id);
                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        // Collect delta infos before applying, since they are
                        // computed against the pre-apply version.
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                pinned_version.new_pin_version(*version)
            }
        }
    }

    /// Apply a batch of refilled version updates in order: update the recent
    /// version cache, each read version, the watch channel, and finally the
    /// uploader's pinned version.
    fn apply_version_updates(&mut self, events: Vec<CacheRefillerEvent>) {
        let Some(CacheRefillerEvent {
            new_pinned_version: latest_pinned_version,
            ..
        }) = events.last()
        else {
            // Empty batches indicate a caller bug; only panic in debug builds.
            if cfg!(debug_assertions) {
                panic!("empty events")
            }
            return;
        };
        if events.len() > 1 {
            warn!(
                count = events.len(),
                "handle multiple version updates in batch"
            );
        }
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        // Fold every new version into the recent-versions cache atomically.
        self.recent_versions.rcu(|prev_recent_versions| {
            let mut recent_versions = None;
            for event in &events {
                let CacheRefillerEvent {
                    new_pinned_version, ..
                } = event;
                recent_versions = Some(
                    recent_versions
                        .as_ref()
                        .unwrap_or(prev_recent_versions.as_ref())
                        .with_new_version(new_pinned_version.clone()),
                );
            }
            recent_versions.expect("non-empty events")
        });

        {
            // Push each committed snapshot to every locally tracked read version.
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    for CacheRefillerEvent {
                        new_pinned_version, ..
                    } in &events
                    {
                        read_version
                            .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                    }
                },
            );
        }

        // Advance the watch channel through the batch, asserting each event
        // chains onto the current state and version ids only move forward.
        self.version_update_notifier_tx.send_if_modified(|state| {
            let mut modified = false;
            for CacheRefillerEvent {
                pinned_version,
                new_pinned_version,
            } in &events
            {
                assert_eq!(pinned_version.id(), state.id());
                if state.id() == new_pinned_version.id() {
                    continue;
                }
                assert!(new_pinned_version.id() > state.id());
                *state = new_pinned_version.clone();
                modified = true;
            }
            modified
        });

        debug!("update to hummock version: {}", latest_pinned_version.id(),);

        self.uploader
            .update_pinned_version(latest_pinned_version.clone());
    }
}
662
663impl HummockEventHandler {
    /// Main event loop: multiplexes uploaded SSTs, refiller events, hummock
    /// events and version updates until a `Shutdown` event arrives or both
    /// upstream channels close.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                ssts = self.uploader.next_uploaded_ssts() => {
                    self.handle_uploaded_ssts(ssts);
                }
                events = self.refiller.next_events() => {
                    self.apply_version_updates(events);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    // Channel closed: all senders dropped, stop the loop.
                    let Some(event) = event else { break };
                    match event {
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }
695
    /// Timed wrapper around `handle_uploaded_ssts_inner`, recording the
    /// on-upload-finish latency histogram.
    fn handle_uploaded_ssts(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
        let _timer = self
            .metrics
            .event_handler_on_upload_finish_latency
            .start_timer();
        self.handle_uploaded_ssts_inner(ssts);
    }
703
    /// Handle a single hummock event. `Shutdown` is not handled here but in the caller's event loop.
705    fn handle_hummock_event(&mut self, event: HummockEvent) {
706        match event {
707            HummockEvent::BufferMayFlush => {
708                self.uploader
709                    .may_flush(&self.metrics.event_handler_on_spiller);
710            }
711            HummockEvent::SyncEpoch {
712                sync_result_sender,
713                sync_table_epochs,
714            } => {
715                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
716            }
717            HummockEvent::Clear(notifier, table_ids) => {
718                self.handle_clear(notifier, table_ids);
719            }
720            HummockEvent::Shutdown => {
721                unreachable!("shutdown is handled specially")
722            }
723            HummockEvent::StartEpoch { epoch, table_ids } => {
724                self.uploader.start_epoch(epoch, table_ids);
725            }
726            HummockEvent::InitEpoch {
727                instance_id,
728                init_epoch,
729            } => {
730                let table_id = self
731                    .local_read_version_mapping
732                    .get(&instance_id)
733                    .expect("should exist")
734                    .0;
735                self.uploader
736                    .init_instance(instance_id, table_id, init_epoch);
737            }
738            HummockEvent::ImmToUploader { instance_id, imms } => {
739                assert!(
740                    self.local_read_version_mapping.contains_key(&instance_id),
741                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
742                    instance_id,
743                    imms.first().map(|(imm, _)| imm.table_id),
744                );
745                self.uploader.add_imms(instance_id, imms);
746                self.uploader
747                    .may_flush(&self.metrics.event_handler_on_spiller);
748            }
749
750            HummockEvent::LocalSealEpoch {
751                next_epoch,
752                opts,
753                instance_id,
754            } => {
755                self.uploader
756                    .local_seal_epoch(instance_id, next_epoch, opts);
757            }
758
759            #[cfg(any(test, feature = "test"))]
760            HummockEvent::FlushEvent(sender) => {
761                let _ = sender.send(()).inspect_err(|e| {
762                    error!("unable to send flush result: {:?}", e);
763                });
764            }
765
766            HummockEvent::RegisterReadVersion {
767                table_id,
768                new_read_version_sender,
769                is_replicated,
770                vnodes,
771            } => {
772                let pinned_version = self.recent_versions.load().latest_version().clone();
773                let instance_id = self.generate_instance_id();
774                let basic_read_version = Arc::new(RwLock::new(
775                    HummockReadVersion::new_with_replication_option(
776                        table_id,
777                        instance_id,
778                        pinned_version,
779                        is_replicated,
780                        vnodes,
781                    ),
782                ));
783
784                debug!(
785                    "new read version registered: table_id: {}, instance_id: {}",
786                    table_id, instance_id
787                );
788
789                {
790                    self.local_read_version_mapping
791                        .insert(instance_id, (table_id, basic_read_version.clone()));
792                    let mut read_version_mapping_guard = self.read_version_mapping.write();
793
794                    read_version_mapping_guard
795                        .entry(table_id)
796                        .or_default()
797                        .insert(instance_id, basic_read_version.clone());
798                }
799
800                match new_read_version_sender.send((
801                    basic_read_version,
802                    LocalInstanceGuard {
803                        table_id,
804                        instance_id,
805                        event_sender: Some(self.hummock_event_tx.clone()),
806                    },
807                )) {
808                    Ok(_) => {}
809                    Err((_, mut guard)) => {
810                        warn!(
811                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
812                            table_id, instance_id
813                        );
814                        guard.event_sender.take().expect("sender is just set");
815                        self.destroy_read_version(instance_id);
816                    }
817                }
818            }
819
820            HummockEvent::DestroyReadVersion { instance_id } => {
821                self.uploader.may_destroy_instance(instance_id);
822                self.destroy_read_version(instance_id);
823            }
824            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
825                let _ = result_tx
826                    .send(self.uploader.min_uncommitted_object_id())
827                    .inspect_err(|e| {
828                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
829                    });
830            }
831            HummockEvent::RegisterVectorWriter {
832                table_id,
833                init_epoch,
834            } => self.uploader.register_vector_writer(table_id, init_epoch),
835            HummockEvent::VectorWriterSealEpoch {
836                table_id,
837                next_epoch,
838                add,
839            } => {
840                self.uploader
841                    .vector_writer_seal_epoch(table_id, next_epoch, add);
842            }
843            HummockEvent::DropVectorWriter { table_id } => {
844                self.uploader.drop_vector_writer(table_id);
845            }
846        }
847    }
848
849    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
850        {
851            {
852                debug!("read version deregister: instance_id: {}", instance_id);
853                let (table_id, _) = self
854                    .local_read_version_mapping
855                    .remove(&instance_id)
856                    .unwrap_or_else(|| {
857                        panic!(
858                            "DestroyHummockInstance inexist instance instance_id {}",
859                            instance_id
860                        )
861                    });
862                let mut read_version_mapping_guard = self.read_version_mapping.write();
863                let entry = read_version_mapping_guard
864                    .get_mut(&table_id)
865                    .unwrap_or_else(|| {
866                        panic!(
867                            "DestroyHummockInstance table_id {} instance_id {} fail",
868                            table_id, instance_id
869                        )
870                    });
871                entry.remove(&instance_id).unwrap_or_else(|| {
872                    panic!(
873                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
874                        table_id, instance_id
875                    )
876                });
877                if entry.is_empty() {
878                    read_version_mapping_guard.remove(&table_id);
879                }
880            }
881        }
882    }
883
884    fn generate_instance_id(&mut self) -> LocalInstanceId {
885        self.last_instance_id += 1;
886        self.last_instance_id
887    }
888}
889
890pub(super) fn send_sync_result(
891    sender: oneshot::Sender<HummockResult<SyncedData>>,
892    result: HummockResult<SyncedData>,
893) {
894    let _ = sender.send(result).inspect_err(|e| {
895        error!("unable to send sync result. Err: {:?}", e);
896    });
897}
898
899impl SyncedData {
900    pub fn into_sync_result(self) -> SyncResult {
901        {
902            let SyncedData {
903                uploaded_ssts,
904                table_watermarks,
905                vector_index_adds,
906            } = self;
907            let mut sync_size = 0;
908            let mut uncommitted_ssts = Vec::new();
909            let mut old_value_ssts = Vec::new();
910            // The newly uploaded `sstable_infos` contains newer data. Therefore,
911            // `newly_upload_ssts` at the front
912            for sst in uploaded_ssts {
913                sync_size += sst.imm_size();
914                uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
915                old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
916            }
917            SyncResult {
918                sync_size,
919                uncommitted_ssts,
920                table_watermarks,
921                old_value_ssts,
922                vector_index_adds,
923            }
924        }
925    }
926}
927
928#[cfg(test)]
929mod tests {
930    use std::collections::{HashMap, HashSet};
931    use std::future::poll_fn;
932    use std::sync::Arc;
933    use std::task::Poll;
934
935    use futures::FutureExt;
936    use parking_lot::Mutex;
937    use risingwave_common::bitmap::Bitmap;
938    use risingwave_common::catalog::TableId;
939    use risingwave_common::hash::VirtualNode;
940    use risingwave_common::util::epoch::{EpochExt, test_epoch};
941    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
942    use risingwave_hummock_sdk::version::HummockVersion;
943    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
944    use tokio::spawn;
945    use tokio::sync::mpsc::unbounded_channel;
946    use tokio::sync::oneshot;
947
948    use crate::hummock::HummockError;
949    use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
950    use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
951    use crate::hummock::event_handler::uploader::UploadTaskOutput;
952    use crate::hummock::event_handler::uploader::test_utils::{
953        TEST_TABLE_ID, gen_imm_inner, gen_imm_with_unlimit,
954        prepare_uploader_order_test_spawn_task_fn,
955    };
956    use crate::hummock::event_handler::{
957        HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
958    };
959    use crate::hummock::iterator::test_utils::mock_sstable_store;
960    use crate::hummock::local_version::pinned_version::PinnedVersion;
961    use crate::hummock::local_version::recent_versions::RecentVersions;
962    use crate::hummock::test_utils::default_opts_for_test;
963    use crate::mem_table::ImmutableMemtable;
964    use crate::monitor::HummockStateStoreMetrics;
965    use crate::store::SealCurrentEpochOptions;
966
    /// Regression test: a failure when syncing an old epoch must also fail
    /// pending syncs of newer epochs issued afterwards.
    ///
    /// The stubbed upload task for `epoch1` blocks on a oneshot channel and
    /// then returns an error, while the task for `epoch2` succeeds
    /// immediately. Both `SyncEpoch` requests must stay pending while the
    /// epoch1 task is blocked, and BOTH must resolve to an error once it is
    /// released and fails.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        // Initial pinned version with TEST_TABLE_ID committed at epoch0.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1.into(),
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `tx`/`rx` gate the epoch1 upload task: it only completes (with an
        // error) after `tx.send(())` near the end of the test.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(&storage_opt, &metrics),
            // Stub spawn-upload-task fn: epoch1 -> blocked-then-fail,
            // epoch2 -> immediate success; any other epoch is unexpected.
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register a local read-version instance for TEST_TABLE_ID.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        // Write one imm in epoch1 and hand it to the uploader.
        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        read_version.write().init();

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        let (imm1, tracker1) = gen_imm_with_unlimit(epoch1);
        read_version.write().add_pending_imm(imm1.clone(), tracker1);

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imms: read_version.write().start_upload_pending_imms(),
        });

        // Seal epoch1 and write one imm in epoch2.
        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        {
            let (imm2, tracker2) = gen_imm_with_unlimit(epoch2);
            let mut read_version = read_version.write();
            read_version.add_pending_imm(imm2, tracker2);

            send_event(HummockEvent::ImmToUploader {
                instance_id: guard.instance_id,
                imms: read_version.start_upload_pending_imms(),
            });
        }

        // Seal epoch2 (by starting/sealing into epoch3) so that both epoch1
        // and epoch2 become syncable.
        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Issue syncs for epoch1 and epoch2; both must still be pending while
        // the epoch1 upload task is blocked on the oneshot channel.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Release the epoch1 task; it fails, and the failure must surface on
        // BOTH sync receivers — the newer epoch2 sync included.
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1117
1118    #[tokio::test]
1119    async fn test_clear_tables() {
1120        let table_id1 = TableId::new(1);
1121        let table_id2 = TableId::new(2);
1122        let epoch0 = test_epoch(233);
1123
1124        let initial_version = PinnedVersion::new(
1125            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1126                id: 1.into(),
1127                state_table_info: HashMap::from_iter([
1128                    (
1129                        table_id1,
1130                        StateTableInfo {
1131                            committed_epoch: epoch0,
1132                            compaction_group_id: StaticCompactionGroupId::StateDefault,
1133                        },
1134                    ),
1135                    (
1136                        table_id2,
1137                        StateTableInfo {
1138                            committed_epoch: epoch0,
1139                            compaction_group_id: StaticCompactionGroupId::StateDefault,
1140                        },
1141                    ),
1142                ]),
1143                ..Default::default()
1144            }),
1145            unbounded_channel().0,
1146        );
1147
1148        let (_version_update_tx, version_update_rx) = unbounded_channel();
1149
1150        let epoch1 = epoch0.next_epoch();
1151        let epoch2 = epoch1.next_epoch();
1152        let epoch3 = epoch2.next_epoch();
1153
1154        let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0).size();
1155
1156        // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained.
1157        let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1158        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1159
1160        let gen_imm = |table_id, epoch, spill_offset| {
1161            let imm = gen_imm_inner(table_id, epoch, spill_offset);
1162            assert_eq!(imm.size(), imm_size);
1163            imm
1164        };
1165        let imm1_1 = gen_imm(table_id1, epoch1, 0);
1166        let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1167
1168        let storage_opt = default_opts_for_test();
1169        let metrics = Arc::new(HummockStateStoreMetrics::unused());
1170
1171        let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1172
1173        let event_handler = HummockEventHandler::new_inner(
1174            version_update_rx,
1175            mock_sstable_store().await,
1176            metrics.clone(),
1177            CacheRefillConfig::from_storage_opts(&storage_opt),
1178            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1179            buffer_tracker,
1180            spawn_task,
1181            CacheRefiller::default_spawn_refill_task(),
1182        );
1183
1184        let event_tx = event_handler.event_sender();
1185
1186        let send_event = |event| event_tx.send(event).unwrap();
1187        let flush_event = || async {
1188            let (tx, rx) = oneshot::channel();
1189            send_event(HummockEvent::FlushEvent(tx));
1190            rx.await.unwrap();
1191        };
1192        let start_epoch = |table_id, epoch| {
1193            send_event(HummockEvent::StartEpoch {
1194                epoch,
1195                table_ids: HashSet::from_iter([table_id]),
1196            })
1197        };
1198        let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1199            send_event(HummockEvent::InitEpoch {
1200                instance_id: instance.instance_id,
1201                init_epoch,
1202            })
1203        };
1204        let event_tx_clone = event_tx.clone();
1205        let write_imm = {
1206            let memory_limiter = memory_limiter.clone();
1207            move |read_version: &HummockReadVersionRef,
1208                  instance: &LocalInstanceGuard,
1209                  imm: &ImmutableMemtable| {
1210                let memory_limiter = memory_limiter.clone();
1211                let event_tx = event_tx_clone.clone();
1212                let read_version = read_version.clone();
1213                let imm = imm.clone();
1214                let instance_id = instance.instance_id;
1215                async move {
1216                    let tracker = memory_limiter.require_memory(imm.size() as _).await;
1217                    let mut read_version = read_version.write();
1218                    read_version.add_pending_imm(imm.clone(), tracker);
1219
1220                    event_tx
1221                        .send(HummockEvent::ImmToUploader {
1222                            instance_id,
1223                            imms: read_version.start_upload_pending_imms(),
1224                        })
1225                        .unwrap();
1226                }
1227            }
1228        };
1229        let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1230            send_event(HummockEvent::LocalSealEpoch {
1231                instance_id: instance.instance_id,
1232                next_epoch,
1233                opts: SealCurrentEpochOptions::for_test(),
1234            })
1235        };
1236        let sync_epoch = |table_id, new_sync_epoch| {
1237            let (tx, rx) = oneshot::channel();
1238            send_event(HummockEvent::SyncEpoch {
1239                sync_result_sender: tx,
1240                sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1241            });
1242            rx
1243        };
1244
1245        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1246
1247        let (read_version1, guard1) = {
1248            let (tx, rx) = oneshot::channel();
1249            send_event(HummockEvent::RegisterReadVersion {
1250                table_id: table_id1,
1251                new_read_version_sender: tx,
1252                is_replicated: false,
1253                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1254            });
1255            rx.await.unwrap()
1256        };
1257
1258        let (read_version2, guard2) = {
1259            let (tx, rx) = oneshot::channel();
1260            send_event(HummockEvent::RegisterReadVersion {
1261                table_id: table_id2,
1262                new_read_version_sender: tx,
1263                is_replicated: false,
1264                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1265            });
1266            rx.await.unwrap()
1267        };
1268
1269        // prepare data of table1
1270        let (task1_1_finish_tx, task1_1_rx) = {
1271            start_epoch(table_id1, epoch1);
1272
1273            read_version1.write().init();
1274            init_epoch(&guard1, epoch1);
1275
1276            write_imm(&read_version1, &guard1, &imm1_1).await;
1277
1278            start_epoch(table_id1, epoch2);
1279
1280            seal_epoch(&guard1, epoch2);
1281
1282            let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1283                guard1.instance_id,
1284                vec![imm1_1.batch_id()],
1285            )]));
1286
1287            let mut rx = sync_epoch(table_id1, epoch1);
1288            wait_task_start.await;
1289            assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1290
1291            write_imm(&read_version1, &guard1, &imm1_2_1).await;
1292            flush_event().await;
1293
1294            (task_finish_tx, rx)
1295        };
1296        // by now, the state in uploader of table_id1
1297        // unsync:  epoch2 -> [imm1_2]
1298        // syncing: epoch1 -> [imm1_1]
1299
1300        let (task1_2_finish_tx, _finish_txs) = {
1301            let mut finish_txs = vec![];
1302            let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1303            start_epoch(table_id2, epoch1);
1304            read_version2.write().init();
1305            init_epoch(&guard2, epoch1);
1306            let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1307                guard1.instance_id,
1308                vec![imm1_2_1.batch_id()],
1309            )]));
1310            write_imm(&read_version2, &guard2, &imm2_1_1).await;
1311            wait_task_start.await;
1312
1313            let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1314            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1315                guard2.instance_id,
1316                vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1317            )]));
1318            finish_txs.push(finish_tx);
1319            write_imm(&read_version2, &guard2, &imm2_1_2).await;
1320            wait_task_start.await;
1321
1322            let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1323            write_imm(&read_version2, &guard2, &imm2_1_3).await;
1324            start_epoch(table_id2, epoch2);
1325            seal_epoch(&guard2, epoch2);
1326            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1327                guard2.instance_id,
1328                vec![imm2_1_3.batch_id()],
1329            )]));
1330            finish_txs.push(finish_tx);
1331            let _sync_rx = sync_epoch(table_id2, epoch1);
1332            wait_task_start.await;
1333
1334            let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1335            write_imm(&read_version2, &guard2, &imm2_2_1).await;
1336            flush_event().await;
1337            let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1338            write_imm(&read_version2, &guard2, &imm2_2_2).await;
1339            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1340                guard2.instance_id,
1341                vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1342            )]));
1343            finish_txs.push(finish_tx);
1344            wait_task_start.await;
1345
1346            let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1347            write_imm(&read_version2, &guard2, &imm2_2_3).await;
1348
1349            // by now, the state in uploader of table_id2
1350            // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3]
1351            // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3]
1352            // the state in uploader of table_id1
1353            // unsync:  epoch2 -> spilling [imm1_2]
1354            // syncing: epoch1 -> [imm1_1]
1355
1356            drop(guard2);
1357            let (clear_tx, clear_rx) = oneshot::channel();
1358            send_event(HummockEvent::Clear(
1359                clear_tx,
1360                Some(HashSet::from_iter([table_id2])),
1361            ));
1362            clear_rx.await.unwrap();
1363            (task1_2_finish_tx, finish_txs)
1364        };
1365
1366        let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1367        write_imm(&read_version1, &guard1, &imm1_2_2).await;
1368        start_epoch(table_id1, epoch3);
1369        seal_epoch(&guard1, epoch3);
1370
1371        let (tx2, mut sync_rx2) = oneshot::channel();
1372        let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1373            guard1.instance_id,
1374            vec![imm1_2_2.batch_id()],
1375        )]));
1376        send_event(HummockEvent::SyncEpoch {
1377            sync_result_sender: tx2,
1378            sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1379        });
1380        wait_task_start.await;
1381        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1382
1383        task1_1_finish_tx.send(()).unwrap();
1384        let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1385        sync_data1
1386            .uploaded_ssts
1387            .iter()
1388            .all(|sst| sst.epochs() == &vec![epoch1]);
1389        task1_2_finish_tx.send(()).unwrap();
1390        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1391        task1_2_2_finish_tx.send(()).unwrap();
1392        let sync_data2 = sync_rx2.await.unwrap().unwrap();
1393        sync_data2
1394            .uploaded_ssts
1395            .iter()
1396            .all(|sst| sst.epochs() == &vec![epoch2]);
1397
1398        send_event(HummockEvent::Shutdown);
1399        join_handle.await.unwrap();
1400    }
1401}