// Source file: risingwave_storage/hummock/event_handler/hummock_event_handler.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::pin::pin;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering::Relaxed;
20use std::sync::{Arc, LazyLock};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use await_tree::{InstrumentAwait, SpanExt};
25use futures::FutureExt;
26use itertools::Itertools;
27use parking_lot::RwLock;
28use prometheus::{Histogram, IntGauge, IntGaugeVec};
29use risingwave_common::catalog::TableId;
30use risingwave_common::metrics::UintGauge;
31use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
32use risingwave_hummock_sdk::version::LocalHummockVersionDelta;
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46    HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49    HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50    ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
55use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
56use crate::mem_table::ImmutableMemtable;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::opts::StorageOpts;
59
60#[derive(Clone)]
61pub(crate) struct BufferTracker {
62    flush_threshold: usize,
63    min_batch_flush_size: usize,
64    global_uploading_memory_limiter: Arc<MemoryLimiter>,
65    uploader_imm_size: UintGauge,
66    uploader_uploading_task_size: UintGauge,
67}
68
69impl BufferTracker {
70    pub fn from_storage_opts(config: &StorageOpts, metrics: &HummockStateStoreMetrics) -> Self {
71        let capacity = config.shared_buffer_capacity_mb * (1 << 20);
72        let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
73        let min_batch_flush_size = config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
74        assert!(
75            flush_threshold < capacity,
76            "flush_threshold {} should be less or equal to capacity {}",
77            flush_threshold,
78            capacity
79        );
80        Self {
81            flush_threshold,
82            min_batch_flush_size,
83            global_uploading_memory_limiter: Arc::new(MemoryLimiter::new(capacity as u64)),
84            uploader_imm_size: metrics.uploader_imm_size.clone(),
85            uploader_uploading_task_size: metrics.uploader_uploading_task_size.clone(),
86        }
87    }
88
89    #[cfg(test)]
90    fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
91        Self {
92            flush_threshold,
93            min_batch_flush_size,
94            ..Self::for_test()
95        }
96    }
97
98    #[cfg(test)]
99    pub fn for_test() -> Self {
100        Self::from_storage_opts(&StorageOpts::default(), &HummockStateStoreMetrics::unused())
101    }
102
103    pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
104        &self.global_uploading_memory_limiter
105    }
106
107    pub fn global_upload_task_size(&self) -> &UintGauge {
108        &self.uploader_uploading_task_size
109    }
110
111    /// Return true when the buffer size minus current upload task size is still greater than the
112    /// flush threshold.
113    pub fn need_flush(&self) -> bool {
114        self.uploader_imm_size.get()
115            > self.flush_threshold as u64 + self.uploader_uploading_task_size.get()
116    }
117
118    pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
119        curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
120    }
121
122    #[cfg(test)]
123    pub(crate) fn flush_threshold(&self) -> usize {
124        self.flush_threshold
125    }
126}
127
128#[derive(Clone)]
129pub struct HummockEventSender {
130    inner: UnboundedSender<HummockEvent>,
131    event_count: IntGaugeVec,
132}
133
134pub fn event_channel(event_count: IntGaugeVec) -> (HummockEventSender, HummockEventReceiver) {
135    let (tx, rx) = unbounded_channel();
136    (
137        HummockEventSender {
138            inner: tx,
139            event_count: event_count.clone(),
140        },
141        HummockEventReceiver {
142            inner: rx,
143            event_count,
144        },
145    )
146}
147
148impl HummockEventSender {
149    pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
150        let event_type = event.event_name();
151        self.inner.send(event)?;
152        get_event_pending_gauge(&self.event_count, event_type).inc();
153        Ok(())
154    }
155}
156
157pub struct HummockEventReceiver {
158    inner: UnboundedReceiver<HummockEvent>,
159    event_count: IntGaugeVec,
160}
161
162impl HummockEventReceiver {
163    async fn recv(&mut self) -> Option<HummockEvent> {
164        let event = self.inner.recv().await?;
165        let event_type = event.event_name();
166        get_event_pending_gauge(&self.event_count, event_type).dec();
167        Some(event)
168    }
169}
170
171thread_local! {
172    static EVENT_PENDING_GAUGE_CACHE: RefCell<HashMap<&'static str, IntGauge>> = RefCell::new(HashMap::new());
173}
174
175fn get_event_pending_gauge(event_count: &IntGaugeVec, event_type: &'static str) -> IntGauge {
176    EVENT_PENDING_GAUGE_CACHE.with(|cache| {
177        cache
178            .borrow_mut()
179            .entry(event_type)
180            .or_insert_with(|| event_count.with_label_values(&[event_type]))
181            .clone()
182    })
183}
184
185struct HummockEventHandlerMetrics {
186    event_handler_on_upload_finish_latency: Histogram,
187    event_handler_on_apply_version_update: Histogram,
188    event_handler_on_recv_version_update: Histogram,
189    event_handler_on_spiller: Histogram,
190}
191
192pub(crate) struct HummockEventHandler {
193    hummock_event_tx: HummockEventSender,
194    hummock_event_rx: HummockEventReceiver,
195    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
196    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
197    /// A copy of `read_version_mapping` but owned by event handler
198    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,
199
200    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
201    recent_versions: Arc<ArcSwap<RecentVersions>>,
202
203    uploader: HummockUploader,
204    refiller: CacheRefiller,
205
206    last_instance_id: LocalInstanceId,
207
208    metrics: HummockEventHandlerMetrics,
209}
210
211async fn flush_imms(
212    payload: Vec<ImmutableMemtable>,
213    compactor_context: CompactorContext,
214    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
215    object_id_manager: Arc<ObjectIdManager>,
216) -> HummockResult<UploadTaskOutput> {
217    compact(
218        compactor_context,
219        object_id_manager,
220        payload,
221        compaction_catalog_manager_ref,
222    )
223    .instrument_await("shared_buffer_compact".verbose())
224    .await
225}
226
227impl HummockEventHandler {
228    pub fn new(
229        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
230        pinned_version: PinnedVersion,
231        compactor_context: CompactorContext,
232        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
233        object_id_manager: Arc<ObjectIdManager>,
234        state_store_metrics: Arc<HummockStateStoreMetrics>,
235    ) -> Self {
236        let upload_compactor_context = compactor_context.clone();
237        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
238        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
239        let recent_versions = RecentVersions::new(
240            pinned_version,
241            compactor_context
242                .storage_opts
243                .max_cached_recent_versions_number,
244            state_store_metrics.clone(),
245        );
246        let buffer_tracker =
247            BufferTracker::from_storage_opts(&compactor_context.storage_opts, &state_store_metrics);
248        Self::new_inner(
249            version_update_rx,
250            compactor_context.sstable_store.clone(),
251            state_store_metrics,
252            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
253            recent_versions,
254            buffer_tracker,
255            Arc::new(move |payload, task_info| {
256                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
257                    LazyLock::new(|| AtomicUsize::new(0));
258                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
259                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
260                    reg.register(
261                        await_tree_key::SpawnUploadTask { id: upload_task_id },
262                        format!("Spawn Upload Task: {}", task_info),
263                    )
264                });
265                let upload_task_latency = upload_task_latency.clone();
266                let wait_poll_latency = wait_poll_latency.clone();
267                let upload_compactor_context = upload_compactor_context.clone();
268                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
269                let object_id_manager = object_id_manager.clone();
270                spawn({
271                    let future = async move {
272                        let _timer = upload_task_latency.start_timer();
273                        let mut output = flush_imms(
274                            payload
275                                .into_values()
276                                .flat_map(|imms| imms.into_iter())
277                                .collect(),
278                            upload_compactor_context.clone(),
279                            compaction_catalog_manager_ref.clone(),
280                            object_id_manager.clone(),
281                        )
282                        .await?;
283                        assert!(
284                            output
285                                .wait_poll_timer
286                                .replace(wait_poll_latency.start_timer())
287                                .is_none(),
288                            "should not set timer before"
289                        );
290                        Ok(output)
291                    };
292                    if let Some(tree_root) = tree_root {
293                        tree_root.instrument(future).left_future()
294                    } else {
295                        future.right_future()
296                    }
297                })
298            }),
299            CacheRefiller::default_spawn_refill_task(),
300        )
301    }
302
303    fn new_inner(
304        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
305        sstable_store: SstableStoreRef,
306        state_store_metrics: Arc<HummockStateStoreMetrics>,
307        refill_config: CacheRefillConfig,
308        recent_versions: RecentVersions,
309        buffer_tracker: BufferTracker,
310        spawn_upload_task: SpawnUploadTask,
311        spawn_refill_task: SpawnRefillTask,
312    ) -> Self {
313        let (hummock_event_tx, hummock_event_rx) =
314            event_channel(state_store_metrics.event_handler_pending_event.clone());
315        let (version_update_notifier_tx, _) =
316            tokio::sync::watch::channel(recent_versions.latest_version().clone());
317        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
318        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
319
320        let metrics = HummockEventHandlerMetrics {
321            event_handler_on_upload_finish_latency: state_store_metrics
322                .event_handler_latency
323                .with_label_values(&["on_upload_finish"]),
324            event_handler_on_apply_version_update: state_store_metrics
325                .event_handler_latency
326                .with_label_values(&["apply_version"]),
327            event_handler_on_recv_version_update: state_store_metrics
328                .event_handler_latency
329                .with_label_values(&["recv_version_update"]),
330            event_handler_on_spiller: state_store_metrics
331                .event_handler_latency
332                .with_label_values(&["spiller"]),
333        };
334
335        let uploader = HummockUploader::new(
336            state_store_metrics,
337            recent_versions.latest_version().clone(),
338            spawn_upload_task,
339            buffer_tracker,
340        );
341        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);
342
343        Self {
344            hummock_event_tx,
345            hummock_event_rx,
346            version_update_rx,
347            version_update_notifier_tx,
348            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
349            read_version_mapping,
350            local_read_version_mapping: Default::default(),
351            uploader,
352            refiller,
353            last_instance_id: 0,
354            metrics,
355        }
356    }
357
358    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
359        self.version_update_notifier_tx.clone()
360    }
361
362    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
363        self.recent_versions.clone()
364    }
365
366    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
367        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
368    }
369
370    pub fn event_sender(&self) -> HummockEventSender {
371        self.hummock_event_tx.clone()
372    }
373
374    pub fn buffer_tracker(&self) -> &BufferTracker {
375        self.uploader.buffer_tracker()
376    }
377}
378
379// Handler for different events
380impl HummockEventHandler {
381    /// This function will be performed under the protection of the `read_version_mapping` read
382    /// lock, and add write lock on each `read_version` operation
383    fn for_each_read_version(
384        &self,
385        instances: impl IntoIterator<Item = LocalInstanceId>,
386        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
387    ) {
388        let instances = {
389            #[cfg(debug_assertions)]
390            {
391                // check duplication on debug_mode
392                let mut id_set = std::collections::HashSet::new();
393                for instance in instances {
394                    assert!(id_set.insert(instance));
395                }
396                id_set
397            }
398            #[cfg(not(debug_assertions))]
399            {
400                instances
401            }
402        };
403        let mut pending = VecDeque::new();
404        let mut total_count = 0;
405        for instance_id in instances {
406            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
407                continue;
408            };
409            total_count += 1;
410            match read_version.try_write() {
411                Some(mut write_guard) => {
412                    f(instance_id, &mut write_guard);
413                }
414                _ => {
415                    pending.push_back(instance_id);
416                }
417            }
418        }
419        if !pending.is_empty() {
420            if pending.len() * 10 > total_count {
421                // Only print warn log when failed to acquire more than 10%
422                warn!(
423                    pending_count = pending.len(),
424                    total_count, "cannot acquire lock for all read version"
425                );
426            } else {
427                debug!(
428                    pending_count = pending.len(),
429                    total_count, "cannot acquire lock for all read version"
430                );
431            }
432        }
433
434        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);
435
436        while let Some(instance_id) = pending.pop_front() {
437            let (_, read_version) = self
438                .local_read_version_mapping
439                .get(&instance_id)
440                .expect("have checked exist before");
441            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
442                Some(mut write_guard) => {
443                    f(instance_id, &mut write_guard);
444                }
445                _ => {
446                    warn!(instance_id, "failed to get lock again for instance");
447                    pending.push_back(instance_id);
448                }
449            }
450        }
451    }
452
453    fn handle_uploaded_ssts_inner(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
454        match ssts.as_slice() {
455            [] => {
456                if cfg!(debug_assertions) {
457                    panic!("empty ssts")
458                }
459            }
460            [staging_sstable_info] => {
461                trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
462                self.for_each_read_version(
463                    staging_sstable_info.imm_ids().keys().cloned(),
464                    |_, read_version| {
465                        read_version.update(VersionUpdate::Sst(staging_sstable_info.clone()))
466                    },
467                )
468            }
469            ssts => {
470                warn!(
471                    batch_size = ssts.len(),
472                    "handle multiple uploaded ssts in batch"
473                );
474                let affected_instances: HashSet<_> = ssts
475                    .iter()
476                    .flat_map(|sst| {
477                        trace!("data_flushed. SST size {}", sst.imm_size());
478                        sst.imm_ids().keys()
479                    })
480                    .copied()
481                    .collect();
482                self.for_each_read_version(affected_instances, |instance_id, read_version| {
483                    for sst in ssts {
484                        if sst.imm_ids().contains_key(&instance_id) {
485                            read_version.update(VersionUpdate::Sst(sst.clone()));
486                        }
487                    }
488                })
489            }
490        }
491    }
492
493    fn handle_sync_epoch(
494        &mut self,
495        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
496        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
497    ) {
498        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
499        self.uploader
500            .start_sync_epoch(sync_result_sender, sync_table_epochs);
501    }
502
503    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
504        info!(
505            current_version_id = ?self.uploader.hummock_version().id(),
506            ?table_ids,
507            "handle clear event"
508        );
509
510        self.uploader.clear(table_ids.clone());
511
512        if table_ids.is_none() {
513            assert!(
514                self.local_read_version_mapping.is_empty(),
515                "read version mapping not empty when clear. remaining tables: {:?}",
516                self.local_read_version_mapping
517                    .values()
518                    .map(|(_, read_version)| read_version.read().table_id())
519                    .collect_vec()
520            );
521        }
522
523        // Notify completion of the Clear event.
524        let _ = notifier.send(()).inspect_err(|e| {
525            error!("failed to notify completion of clear event: {:?}", e);
526        });
527
528        info!("clear finished");
529    }
530
531    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
532        let _timer = self
533            .metrics
534            .event_handler_on_recv_version_update
535            .start_timer();
536        let pinned_version = self
537            .refiller
538            .last_new_pinned_version()
539            .cloned()
540            .unwrap_or_else(|| self.uploader.hummock_version().clone());
541
542        let mut sst_delta_infos = vec![];
543        if let Some(new_pinned_version) = Self::resolve_version_update_info(
544            &pinned_version,
545            version_payload,
546            Some(&mut sst_delta_infos),
547        ) {
548            self.refiller
549                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
550        }
551    }
552
553    fn resolve_version_update_info(
554        pinned_version: &PinnedVersion,
555        version_payload: HummockVersionUpdate,
556        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
557    ) -> Option<PinnedVersion> {
558        match version_payload {
559            HummockVersionUpdate::VersionDeltas(version_deltas) => {
560                let mut version_to_apply = (**pinned_version).clone();
561                {
562                    for version_delta in version_deltas {
563                        assert_eq!(version_to_apply.id, version_delta.prev_id);
564                        let local_hummock_version_delta =
565                            LocalHummockVersionDelta::from(version_delta);
566                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
567                            sst_delta_infos.extend(
568                                version_to_apply
569                                    .build_sst_delta_infos(&local_hummock_version_delta),
570                            );
571                        }
572
573                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
574                    }
575                }
576
577                pinned_version.new_with_local_version(version_to_apply)
578            }
579            HummockVersionUpdate::PinnedVersion(version) => {
580                pinned_version.new_pin_version(*version)
581            }
582        }
583    }
584
585    fn apply_version_updates(&mut self, events: Vec<CacheRefillerEvent>) {
586        let Some(CacheRefillerEvent {
587            new_pinned_version: latest_pinned_version,
588            ..
589        }) = events.last()
590        else {
591            if cfg!(debug_assertions) {
592                panic!("empty events")
593            }
594            return;
595        };
596        if events.len() > 1 {
597            warn!(
598                count = events.len(),
599                "handle multiple version updates in batch"
600            );
601        }
602        let _timer = self
603            .metrics
604            .event_handler_on_apply_version_update
605            .start_timer();
606        self.recent_versions.rcu(|prev_recent_versions| {
607            let mut recent_versions = None;
608            for event in &events {
609                let CacheRefillerEvent {
610                    new_pinned_version, ..
611                } = event;
612                recent_versions = Some(
613                    recent_versions
614                        .as_ref()
615                        .unwrap_or(prev_recent_versions.as_ref())
616                        .with_new_version(new_pinned_version.clone()),
617                );
618            }
619            recent_versions.expect("non-empty events")
620        });
621
622        {
623            self.for_each_read_version(
624                self.local_read_version_mapping.keys().cloned(),
625                |_, read_version| {
626                    for CacheRefillerEvent {
627                        new_pinned_version, ..
628                    } in &events
629                    {
630                        read_version
631                            .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
632                    }
633                },
634            );
635        }
636
637        self.version_update_notifier_tx.send_if_modified(|state| {
638            let mut modified = false;
639            for CacheRefillerEvent {
640                pinned_version,
641                new_pinned_version,
642            } in &events
643            {
644                assert_eq!(pinned_version.id(), state.id());
645                if state.id() == new_pinned_version.id() {
646                    continue;
647                }
648                assert!(new_pinned_version.id() > state.id());
649                *state = new_pinned_version.clone();
650                modified = true;
651            }
652            modified
653        });
654
655        debug!("update to hummock version: {}", latest_pinned_version.id(),);
656
657        self.uploader
658            .update_pinned_version(latest_pinned_version.clone());
659    }
660}
661
662impl HummockEventHandler {
663    pub async fn start_hummock_event_handler_worker(mut self) {
664        loop {
665            tokio::select! {
666                ssts = self.uploader.next_uploaded_ssts() => {
667                    self.handle_uploaded_ssts(ssts);
668                }
669                events = self.refiller.next_events() => {
670                    self.apply_version_updates(events);
671                }
672                event = pin!(self.hummock_event_rx.recv()) => {
673                    let Some(event) = event else { break };
674                    match event {
675                        HummockEvent::Shutdown => {
676                            info!("event handler shutdown");
677                            return;
678                        },
679                        event => {
680                            self.handle_hummock_event(event);
681                        }
682                    }
683                }
684                version_update = pin!(self.version_update_rx.recv()) => {
685                    let Some(version_update) = version_update else {
686                        warn!("version update stream ends. event handle shutdown");
687                        return;
688                    };
689                    self.handle_version_update(version_update);
690                }
691            }
692        }
693    }
694
695    fn handle_uploaded_ssts(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
696        let _timer = self
697            .metrics
698            .event_handler_on_upload_finish_latency
699            .start_timer();
700        self.handle_uploaded_ssts_inner(ssts);
701    }
702
703    /// Gracefully shutdown if returns `true`.
704    fn handle_hummock_event(&mut self, event: HummockEvent) {
705        match event {
706            HummockEvent::BufferMayFlush => {
707                self.uploader
708                    .may_flush(&self.metrics.event_handler_on_spiller);
709            }
710            HummockEvent::SyncEpoch {
711                sync_result_sender,
712                sync_table_epochs,
713            } => {
714                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
715            }
716            HummockEvent::Clear(notifier, table_ids) => {
717                self.handle_clear(notifier, table_ids);
718            }
719            HummockEvent::Shutdown => {
720                unreachable!("shutdown is handled specially")
721            }
722            HummockEvent::StartEpoch { epoch, table_ids } => {
723                self.uploader.start_epoch(epoch, table_ids);
724            }
725            HummockEvent::InitEpoch {
726                instance_id,
727                init_epoch,
728            } => {
729                let table_id = self
730                    .local_read_version_mapping
731                    .get(&instance_id)
732                    .expect("should exist")
733                    .0;
734                self.uploader
735                    .init_instance(instance_id, table_id, init_epoch);
736            }
737            HummockEvent::ImmToUploader { instance_id, imms } => {
738                assert!(
739                    self.local_read_version_mapping.contains_key(&instance_id),
740                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
741                    instance_id,
742                    imms.first().map(|(imm, _)| imm.table_id),
743                );
744                self.uploader.add_imms(instance_id, imms);
745                self.uploader
746                    .may_flush(&self.metrics.event_handler_on_spiller);
747            }
748
749            HummockEvent::LocalSealEpoch {
750                next_epoch,
751                opts,
752                instance_id,
753            } => {
754                self.uploader
755                    .local_seal_epoch(instance_id, next_epoch, opts);
756            }
757
758            #[cfg(any(test, feature = "test"))]
759            HummockEvent::FlushEvent(sender) => {
760                let _ = sender.send(()).inspect_err(|e| {
761                    error!("unable to send flush result: {:?}", e);
762                });
763            }
764
765            HummockEvent::RegisterReadVersion {
766                table_id,
767                new_read_version_sender,
768                is_replicated,
769                vnodes,
770            } => {
771                let pinned_version = self.recent_versions.load().latest_version().clone();
772                let instance_id = self.generate_instance_id();
773                let basic_read_version = Arc::new(RwLock::new(
774                    HummockReadVersion::new_with_replication_option(
775                        table_id,
776                        instance_id,
777                        pinned_version,
778                        is_replicated,
779                        vnodes,
780                    ),
781                ));
782
783                debug!(
784                    "new read version registered: table_id: {}, instance_id: {}",
785                    table_id, instance_id
786                );
787
788                {
789                    self.local_read_version_mapping
790                        .insert(instance_id, (table_id, basic_read_version.clone()));
791                    let mut read_version_mapping_guard = self.read_version_mapping.write();
792
793                    read_version_mapping_guard
794                        .entry(table_id)
795                        .or_default()
796                        .insert(instance_id, basic_read_version.clone());
797                }
798
799                match new_read_version_sender.send((
800                    basic_read_version,
801                    LocalInstanceGuard {
802                        table_id,
803                        instance_id,
804                        event_sender: Some(self.hummock_event_tx.clone()),
805                    },
806                )) {
807                    Ok(_) => {}
808                    Err((_, mut guard)) => {
809                        warn!(
810                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
811                            table_id, instance_id
812                        );
813                        guard.event_sender.take().expect("sender is just set");
814                        self.destroy_read_version(instance_id);
815                    }
816                }
817            }
818
819            HummockEvent::DestroyReadVersion { instance_id } => {
820                self.uploader.may_destroy_instance(instance_id);
821                self.destroy_read_version(instance_id);
822            }
823            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
824                let _ = result_tx
825                    .send(self.uploader.min_uncommitted_object_id())
826                    .inspect_err(|e| {
827                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
828                    });
829            }
830            HummockEvent::RegisterVectorWriter {
831                table_id,
832                init_epoch,
833            } => self.uploader.register_vector_writer(table_id, init_epoch),
834            HummockEvent::VectorWriterSealEpoch {
835                table_id,
836                next_epoch,
837                add,
838            } => {
839                self.uploader
840                    .vector_writer_seal_epoch(table_id, next_epoch, add);
841            }
842            HummockEvent::DropVectorWriter { table_id } => {
843                self.uploader.drop_vector_writer(table_id);
844            }
845        }
846    }
847
848    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
849        {
850            {
851                debug!("read version deregister: instance_id: {}", instance_id);
852                let (table_id, _) = self
853                    .local_read_version_mapping
854                    .remove(&instance_id)
855                    .unwrap_or_else(|| {
856                        panic!(
857                            "DestroyHummockInstance inexist instance instance_id {}",
858                            instance_id
859                        )
860                    });
861                let mut read_version_mapping_guard = self.read_version_mapping.write();
862                let entry = read_version_mapping_guard
863                    .get_mut(&table_id)
864                    .unwrap_or_else(|| {
865                        panic!(
866                            "DestroyHummockInstance table_id {} instance_id {} fail",
867                            table_id, instance_id
868                        )
869                    });
870                entry.remove(&instance_id).unwrap_or_else(|| {
871                    panic!(
872                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
873                        table_id, instance_id
874                    )
875                });
876                if entry.is_empty() {
877                    read_version_mapping_guard.remove(&table_id);
878                }
879            }
880        }
881    }
882
883    fn generate_instance_id(&mut self) -> LocalInstanceId {
884        self.last_instance_id += 1;
885        self.last_instance_id
886    }
887}
888
889pub(super) fn send_sync_result(
890    sender: oneshot::Sender<HummockResult<SyncedData>>,
891    result: HummockResult<SyncedData>,
892) {
893    let _ = sender.send(result).inspect_err(|e| {
894        error!("unable to send sync result. Err: {:?}", e);
895    });
896}
897
898impl SyncedData {
899    pub fn into_sync_result(self) -> SyncResult {
900        {
901            let SyncedData {
902                uploaded_ssts,
903                table_watermarks,
904                vector_index_adds,
905            } = self;
906            let mut sync_size = 0;
907            let mut uncommitted_ssts = Vec::new();
908            let mut old_value_ssts = Vec::new();
909            // The newly uploaded `sstable_infos` contains newer data. Therefore,
910            // `newly_upload_ssts` at the front
911            for sst in uploaded_ssts {
912                sync_size += sst.imm_size();
913                uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
914                old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
915            }
916            SyncResult {
917                sync_size,
918                uncommitted_ssts,
919                table_watermarks,
920                old_value_ssts,
921                vector_index_adds,
922            }
923        }
924    }
925}
926
927#[cfg(test)]
928mod tests {
929    use std::collections::{HashMap, HashSet};
930    use std::future::poll_fn;
931    use std::sync::Arc;
932    use std::task::Poll;
933
934    use futures::FutureExt;
935    use parking_lot::Mutex;
936    use risingwave_common::bitmap::Bitmap;
937    use risingwave_common::catalog::TableId;
938    use risingwave_common::hash::VirtualNode;
939    use risingwave_common::util::epoch::{EpochExt, test_epoch};
940    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
941    use risingwave_hummock_sdk::version::HummockVersion;
942    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
943    use tokio::spawn;
944    use tokio::sync::mpsc::unbounded_channel;
945    use tokio::sync::oneshot;
946
947    use crate::hummock::HummockError;
948    use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
949    use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
950    use crate::hummock::event_handler::uploader::UploadTaskOutput;
951    use crate::hummock::event_handler::uploader::test_utils::{
952        TEST_TABLE_ID, gen_imm_inner, gen_imm_with_unlimit,
953        prepare_uploader_order_test_spawn_task_fn,
954    };
955    use crate::hummock::event_handler::{
956        HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
957    };
958    use crate::hummock::iterator::test_utils::mock_sstable_store;
959    use crate::hummock::local_version::pinned_version::PinnedVersion;
960    use crate::hummock::local_version::recent_versions::RecentVersions;
961    use crate::hummock::test_utils::default_opts_for_test;
962    use crate::mem_table::ImmutableMemtable;
963    use crate::monitor::HummockStateStoreMetrics;
964    use crate::store::SealCurrentEpochOptions;
965
    /// Checks that a failing upload task for an older epoch also fails a later sync.
    ///
    /// The upload task for `epoch1` is held until the test releases it and then
    /// returns an error. The task for `epoch2` succeeds immediately. Both
    /// `SyncEpoch(epoch1)` and `SyncEpoch(epoch2)` are expected to stay pending
    /// until the `epoch1` task is released, and then both report an error.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        // Initial version: TEST_TABLE_ID committed at epoch0.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1.into(),
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        // The sender is bound (not `_`) so it is not dropped while the handler runs.
        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `rx` gates the epoch1 upload task; the test releases it with `tx` below.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(&storage_opt, &metrics),
            // Custom upload spawn fn: every task must cover exactly one epoch.
            // The epoch1 task waits on `rx` and then fails; the epoch2 task
            // succeeds with empty output. Any other epoch is unexpected.
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register a single local instance for TEST_TABLE_ID.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        read_version.write().init();

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        // Write one imm in epoch1 and hand it to the uploader.
        let (imm1, tracker1) = gen_imm_with_unlimit(epoch1);
        read_version.write().add_pending_imm(imm1.clone(), tracker1);

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imms: read_version.write().start_upload_pending_imms(),
        });

        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Write one imm in epoch2 and hand it to the uploader.
        {
            let (imm2, tracker2) = gen_imm_with_unlimit(epoch2);
            let mut read_version = read_version.write();
            read_version.add_pending_imm(imm2, tracker2);

            send_event(HummockEvent::ImmToUploader {
                instance_id: guard.instance_id,
                imms: read_version.start_upload_pending_imms(),
            });
        }

        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Sync epoch1: it must not complete while the epoch1 task is gated.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        // Sync epoch2: it must also still be pending at this point.
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Release the epoch1 task, which fails. Both syncs are expected to
        // report an error, including epoch2, whose own task succeeded.
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1116
1117    #[tokio::test]
1118    async fn test_clear_tables() {
1119        let table_id1 = TableId::new(1);
1120        let table_id2 = TableId::new(2);
1121        let epoch0 = test_epoch(233);
1122
1123        let initial_version = PinnedVersion::new(
1124            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1125                id: 1.into(),
1126                state_table_info: HashMap::from_iter([
1127                    (
1128                        table_id1,
1129                        StateTableInfo {
1130                            committed_epoch: epoch0,
1131                            compaction_group_id: StaticCompactionGroupId::StateDefault,
1132                        },
1133                    ),
1134                    (
1135                        table_id2,
1136                        StateTableInfo {
1137                            committed_epoch: epoch0,
1138                            compaction_group_id: StaticCompactionGroupId::StateDefault,
1139                        },
1140                    ),
1141                ]),
1142                ..Default::default()
1143            }),
1144            unbounded_channel().0,
1145        );
1146
1147        let (_version_update_tx, version_update_rx) = unbounded_channel();
1148
1149        let epoch1 = epoch0.next_epoch();
1150        let epoch2 = epoch1.next_epoch();
1151        let epoch3 = epoch2.next_epoch();
1152
1153        let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0).size();
1154
1155        // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained.
1156        let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1157        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1158
1159        let gen_imm = |table_id, epoch, spill_offset| {
1160            let imm = gen_imm_inner(table_id, epoch, spill_offset);
1161            assert_eq!(imm.size(), imm_size);
1162            imm
1163        };
1164        let imm1_1 = gen_imm(table_id1, epoch1, 0);
1165        let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1166
1167        let storage_opt = default_opts_for_test();
1168        let metrics = Arc::new(HummockStateStoreMetrics::unused());
1169
1170        let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1171
1172        let event_handler = HummockEventHandler::new_inner(
1173            version_update_rx,
1174            mock_sstable_store().await,
1175            metrics.clone(),
1176            CacheRefillConfig::from_storage_opts(&storage_opt),
1177            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1178            buffer_tracker,
1179            spawn_task,
1180            CacheRefiller::default_spawn_refill_task(),
1181        );
1182
1183        let event_tx = event_handler.event_sender();
1184
1185        let send_event = |event| event_tx.send(event).unwrap();
1186        let flush_event = || async {
1187            let (tx, rx) = oneshot::channel();
1188            send_event(HummockEvent::FlushEvent(tx));
1189            rx.await.unwrap();
1190        };
1191        let start_epoch = |table_id, epoch| {
1192            send_event(HummockEvent::StartEpoch {
1193                epoch,
1194                table_ids: HashSet::from_iter([table_id]),
1195            })
1196        };
1197        let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1198            send_event(HummockEvent::InitEpoch {
1199                instance_id: instance.instance_id,
1200                init_epoch,
1201            })
1202        };
1203        let event_tx_clone = event_tx.clone();
1204        let write_imm = {
1205            let memory_limiter = memory_limiter.clone();
1206            move |read_version: &HummockReadVersionRef,
1207                  instance: &LocalInstanceGuard,
1208                  imm: &ImmutableMemtable| {
1209                let memory_limiter = memory_limiter.clone();
1210                let event_tx = event_tx_clone.clone();
1211                let read_version = read_version.clone();
1212                let imm = imm.clone();
1213                let instance_id = instance.instance_id;
1214                async move {
1215                    let tracker = memory_limiter.require_memory(imm.size() as _).await;
1216                    let mut read_version = read_version.write();
1217                    read_version.add_pending_imm(imm.clone(), tracker);
1218
1219                    event_tx
1220                        .send(HummockEvent::ImmToUploader {
1221                            instance_id,
1222                            imms: read_version.start_upload_pending_imms(),
1223                        })
1224                        .unwrap();
1225                }
1226            }
1227        };
1228        let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1229            send_event(HummockEvent::LocalSealEpoch {
1230                instance_id: instance.instance_id,
1231                next_epoch,
1232                opts: SealCurrentEpochOptions::for_test(),
1233            })
1234        };
1235        let sync_epoch = |table_id, new_sync_epoch| {
1236            let (tx, rx) = oneshot::channel();
1237            send_event(HummockEvent::SyncEpoch {
1238                sync_result_sender: tx,
1239                sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1240            });
1241            rx
1242        };
1243
1244        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1245
1246        let (read_version1, guard1) = {
1247            let (tx, rx) = oneshot::channel();
1248            send_event(HummockEvent::RegisterReadVersion {
1249                table_id: table_id1,
1250                new_read_version_sender: tx,
1251                is_replicated: false,
1252                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1253            });
1254            rx.await.unwrap()
1255        };
1256
1257        let (read_version2, guard2) = {
1258            let (tx, rx) = oneshot::channel();
1259            send_event(HummockEvent::RegisterReadVersion {
1260                table_id: table_id2,
1261                new_read_version_sender: tx,
1262                is_replicated: false,
1263                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1264            });
1265            rx.await.unwrap()
1266        };
1267
1268        // prepare data of table1
1269        let (task1_1_finish_tx, task1_1_rx) = {
1270            start_epoch(table_id1, epoch1);
1271
1272            read_version1.write().init();
1273            init_epoch(&guard1, epoch1);
1274
1275            write_imm(&read_version1, &guard1, &imm1_1).await;
1276
1277            start_epoch(table_id1, epoch2);
1278
1279            seal_epoch(&guard1, epoch2);
1280
1281            let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1282                guard1.instance_id,
1283                vec![imm1_1.batch_id()],
1284            )]));
1285
1286            let mut rx = sync_epoch(table_id1, epoch1);
1287            wait_task_start.await;
1288            assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1289
1290            write_imm(&read_version1, &guard1, &imm1_2_1).await;
1291            flush_event().await;
1292
1293            (task_finish_tx, rx)
1294        };
1295        // by now, the state in uploader of table_id1
1296        // unsync:  epoch2 -> [imm1_2]
1297        // syncing: epoch1 -> [imm1_1]
1298
1299        let (task1_2_finish_tx, _finish_txs) = {
1300            let mut finish_txs = vec![];
1301            let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1302            start_epoch(table_id2, epoch1);
1303            read_version2.write().init();
1304            init_epoch(&guard2, epoch1);
1305            let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1306                guard1.instance_id,
1307                vec![imm1_2_1.batch_id()],
1308            )]));
1309            write_imm(&read_version2, &guard2, &imm2_1_1).await;
1310            wait_task_start.await;
1311
1312            let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1313            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1314                guard2.instance_id,
1315                vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1316            )]));
1317            finish_txs.push(finish_tx);
1318            write_imm(&read_version2, &guard2, &imm2_1_2).await;
1319            wait_task_start.await;
1320
1321            let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1322            write_imm(&read_version2, &guard2, &imm2_1_3).await;
1323            start_epoch(table_id2, epoch2);
1324            seal_epoch(&guard2, epoch2);
1325            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1326                guard2.instance_id,
1327                vec![imm2_1_3.batch_id()],
1328            )]));
1329            finish_txs.push(finish_tx);
1330            let _sync_rx = sync_epoch(table_id2, epoch1);
1331            wait_task_start.await;
1332
1333            let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1334            write_imm(&read_version2, &guard2, &imm2_2_1).await;
1335            flush_event().await;
1336            let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1337            write_imm(&read_version2, &guard2, &imm2_2_2).await;
1338            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1339                guard2.instance_id,
1340                vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1341            )]));
1342            finish_txs.push(finish_tx);
1343            wait_task_start.await;
1344
1345            let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1346            write_imm(&read_version2, &guard2, &imm2_2_3).await;
1347
1348            // by now, the state in uploader of table_id2
1349            // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3]
1350            // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3]
1351            // the state in uploader of table_id1
1352            // unsync:  epoch2 -> spilling [imm1_2]
1353            // syncing: epoch1 -> [imm1_1]
1354
1355            drop(guard2);
1356            let (clear_tx, clear_rx) = oneshot::channel();
1357            send_event(HummockEvent::Clear(
1358                clear_tx,
1359                Some(HashSet::from_iter([table_id2])),
1360            ));
1361            clear_rx.await.unwrap();
1362            (task1_2_finish_tx, finish_txs)
1363        };
1364
1365        let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1366        write_imm(&read_version1, &guard1, &imm1_2_2).await;
1367        start_epoch(table_id1, epoch3);
1368        seal_epoch(&guard1, epoch3);
1369
1370        let (tx2, mut sync_rx2) = oneshot::channel();
1371        let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1372            guard1.instance_id,
1373            vec![imm1_2_2.batch_id()],
1374        )]));
1375        send_event(HummockEvent::SyncEpoch {
1376            sync_result_sender: tx2,
1377            sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1378        });
1379        wait_task_start.await;
1380        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1381
1382        task1_1_finish_tx.send(()).unwrap();
1383        let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1384        sync_data1
1385            .uploaded_ssts
1386            .iter()
1387            .all(|sst| sst.epochs() == &vec![epoch1]);
1388        task1_2_finish_tx.send(()).unwrap();
1389        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1390        task1_2_2_finish_tx.send(()).unwrap();
1391        let sync_data2 = sync_rx2.await.unwrap().unwrap();
1392        sync_data2
1393            .uploaded_ssts
1394            .iter()
1395            .all(|sst| sst.epochs() == &vec![epoch2]);
1396
1397        send_event(HummockEvent::Shutdown);
1398        join_handle.await.unwrap();
1399    }
1400}