risingwave_storage/hummock/event_handler/
hummock_event_handler.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::pin::pin;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering::Relaxed;
20use std::sync::{Arc, LazyLock};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use await_tree::{InstrumentAwait, SpanExt};
25use futures::FutureExt;
26use itertools::Itertools;
27use parking_lot::RwLock;
28use prometheus::{Histogram, IntGauge, IntGaugeVec};
29use risingwave_common::catalog::TableId;
30use risingwave_common::metrics::UintGauge;
31use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
32use risingwave_hummock_sdk::sstable_info::SstableInfo;
33use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta};
34use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
35use tokio::spawn;
36use tokio::sync::mpsc::error::SendError;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
38use tokio::sync::oneshot;
39use tracing::{debug, error, info, trace, warn};
40
41use super::refiller::{CacheRefillConfig, CacheRefiller};
42use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
43use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
44use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
45use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
46use crate::hummock::event_handler::uploader::{
47    HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
48};
49use crate::hummock::event_handler::{
50    HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
51    ReadOnlyRwLockRef,
52};
53use crate::hummock::local_version::pinned_version::PinnedVersion;
54use crate::hummock::local_version::recent_versions::RecentVersions;
55use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
56use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
57use crate::mem_table::ImmutableMemtable;
58use crate::monitor::HummockStateStoreMetrics;
59use crate::opts::StorageOpts;
60
#[derive(Clone)]
pub(crate) struct BufferTracker {
    // Threshold (bytes) compared against pending imm size minus uploading size
    // in `need_flush`; derived from capacity * `shared_buffer_flush_ratio`.
    flush_threshold: usize,
    // Minimum bytes a single spill batch should reach; see `need_more_flush`.
    min_batch_flush_size: usize,
    // Limiter sized to the shared buffer capacity; exposed via `get_memory_limiter`.
    global_uploading_memory_limiter: Arc<MemoryLimiter>,
    // Gauge tracking total bytes of imms held by the uploader.
    uploader_imm_size: UintGauge,
    // Gauge tracking total bytes of in-flight upload tasks.
    uploader_uploading_task_size: UintGauge,
}
69
70impl BufferTracker {
71    pub fn from_storage_opts(config: &StorageOpts, metrics: &HummockStateStoreMetrics) -> Self {
72        let capacity = config.shared_buffer_capacity_mb * (1 << 20);
73        let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
74        let min_batch_flush_size = config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
75        assert!(
76            flush_threshold < capacity,
77            "flush_threshold {} should be less or equal to capacity {}",
78            flush_threshold,
79            capacity
80        );
81        Self {
82            flush_threshold,
83            min_batch_flush_size,
84            global_uploading_memory_limiter: Arc::new(MemoryLimiter::new(capacity as u64)),
85            uploader_imm_size: metrics.uploader_imm_size.clone(),
86            uploader_uploading_task_size: metrics.uploader_uploading_task_size.clone(),
87        }
88    }
89
90    #[cfg(test)]
91    fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
92        Self {
93            flush_threshold,
94            min_batch_flush_size,
95            ..Self::for_test()
96        }
97    }
98
99    #[cfg(test)]
100    pub fn for_test() -> Self {
101        Self::from_storage_opts(&StorageOpts::default(), &HummockStateStoreMetrics::unused())
102    }
103
104    pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
105        &self.global_uploading_memory_limiter
106    }
107
108    pub fn global_upload_task_size(&self) -> &UintGauge {
109        &self.uploader_uploading_task_size
110    }
111
112    /// Return true when the buffer size minus current upload task size is still greater than the
113    /// flush threshold.
114    pub fn need_flush(&self) -> bool {
115        self.uploader_imm_size.get()
116            > self.flush_threshold as u64 + self.uploader_uploading_task_size.get()
117    }
118
119    pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
120        curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
121    }
122
123    #[cfg(test)]
124    pub(crate) fn flush_threshold(&self) -> usize {
125        self.flush_threshold
126    }
127}
128
/// Sending half of the hummock event channel; bumps a per-event-type pending
/// gauge on each successful `send` (the receiver decrements it on `recv`).
#[derive(Clone)]
pub struct HummockEventSender {
    // Underlying unbounded channel endpoint.
    inner: UnboundedSender<HummockEvent>,
    // Gauge vec labeled by event type; see `get_event_pending_gauge`.
    event_count: IntGaugeVec,
}
134
135pub fn event_channel(event_count: IntGaugeVec) -> (HummockEventSender, HummockEventReceiver) {
136    let (tx, rx) = unbounded_channel();
137    (
138        HummockEventSender {
139            inner: tx,
140            event_count: event_count.clone(),
141        },
142        HummockEventReceiver {
143            inner: rx,
144            event_count,
145        },
146    )
147}
148
149impl HummockEventSender {
150    pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
151        let event_type = event.event_name();
152        self.inner.send(event)?;
153        get_event_pending_gauge(&self.event_count, event_type).inc();
154        Ok(())
155    }
156}
157
/// Receiving half of the hummock event channel; decrements the per-event-type
/// pending gauge as events are consumed.
pub struct HummockEventReceiver {
    // Underlying unbounded channel endpoint.
    inner: UnboundedReceiver<HummockEvent>,
    // Gauge vec labeled by event type; see `get_event_pending_gauge`.
    event_count: IntGaugeVec,
}
162
163impl HummockEventReceiver {
164    async fn recv(&mut self) -> Option<HummockEvent> {
165        let event = self.inner.recv().await?;
166        let event_type = event.event_name();
167        get_event_pending_gauge(&self.event_count, event_type).dec();
168        Some(event)
169    }
170}
171
thread_local! {
    // Per-thread cache of gauge handles keyed by event type, so the hot
    // send/recv path avoids a `with_label_values` lookup on every event.
    static EVENT_PENDING_GAUGE_CACHE: RefCell<HashMap<&'static str, IntGauge>> = RefCell::new(HashMap::new());
}
175
176fn get_event_pending_gauge(event_count: &IntGaugeVec, event_type: &'static str) -> IntGauge {
177    EVENT_PENDING_GAUGE_CACHE.with(|cache| {
178        cache
179            .borrow_mut()
180            .entry(event_type)
181            .or_insert_with(|| event_count.with_label_values(&[event_type]))
182            .clone()
183    })
184}
185
/// Pre-resolved label slices of the `event_handler_latency` histogram vec,
/// one per major activity of the event handler (resolved in `new_inner`).
struct HummockEventHandlerMetrics {
    event_handler_on_upload_finish_latency: Histogram,
    event_handler_on_apply_version_update: Histogram,
    event_handler_on_recv_version_update: Histogram,
    event_handler_on_spiller: Histogram,
}
192
/// Central event loop state for the hummock state store: owns the uploader,
/// the cache refiller, the read-version registry, and the version-update
/// plumbing. Driven by `start_hummock_event_handler_worker`.
pub(crate) struct HummockEventHandler {
    // Sender handed out to read-version instances and other producers.
    hummock_event_tx: HummockEventSender,
    // Receiving end drained by the worker loop.
    hummock_event_rx: HummockEventReceiver,
    // Version updates pushed from the meta-facing side.
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    // Shared table_id -> (instance_id -> read version) mapping, readable by others.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    /// A copy of `read_version_mapping` but owned by event handler
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    // Watch channel used to notify waiters when the pinned version advances.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    // Recently pinned versions, swapped atomically in `apply_version_updates`.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    // Monotonic counter used to mint new `LocalInstanceId`s.
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
211
212async fn flush_imms(
213    payload: Vec<ImmutableMemtable>,
214    compactor_context: CompactorContext,
215    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
216    object_id_manager: Arc<ObjectIdManager>,
217) -> HummockResult<UploadTaskOutput> {
218    compact(
219        compactor_context,
220        object_id_manager,
221        payload,
222        compaction_catalog_manager_ref,
223    )
224    .instrument_await("shared_buffer_compact".verbose())
225    .await
226}
227
impl HummockEventHandler {
    /// Construct the handler with the production upload-task spawner: each
    /// spawned task compacts the flushed imms into SSTs (`flush_imms`) on a
    /// tokio task, optionally instrumented with an await-tree root.
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        object_id_manager: Arc<ObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        // Clones captured by the upload-task closure below.
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker =
            BufferTracker::from_storage_opts(&compactor_context.storage_opts, &state_store_metrics);
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            Arc::new(move |payload, task_info| {
                // Process-wide id so each upload task gets a distinct await-tree key.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                // Register an await-tree root only when the registry is configured.
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let object_id_manager = object_id_manager.clone();
                spawn({
                    let future = async move {
                        // Times the whole compaction; dropped (observed) when the task ends.
                        let _timer = upload_task_latency.start_timer();
                        let mut output = flush_imms(
                            // Flatten the per-instance imm lists into a single payload.
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            object_id_manager.clone(),
                        )
                        .await?;
                        // Start the wait-poll timer here; the output must not
                        // already carry one (invariant checked in debug and release).
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    // Either variant resolves to the same output type via Either.
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }

    /// Wire up channels, metrics, uploader and refiller. Kept separate from
    /// `new` so tests can inject custom spawn functions.
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        // Watch channel seeded with the latest version; receivers subscribe later.
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        // Resolve each labeled histogram once up front.
        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
            event_handler_on_spiller: state_store_metrics
                .event_handler_latency
                .with_label_values(&["spiller"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics,
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }

    /// Watch sender used to notify waiters when the pinned version advances.
    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
        self.version_update_notifier_tx.clone()
    }

    /// Shared handle to the recently pinned versions.
    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
        self.recent_versions.clone()
    }

    /// Read-only view of the table -> read-version mapping.
    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
    }

    /// Clonable sender for submitting events to this handler.
    pub fn event_sender(&self) -> HummockEventSender {
        self.hummock_event_tx.clone()
    }

    /// Buffer tracker owned by the uploader.
    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
}
379
380// Handler for different events
impl HummockEventHandler {
    /// Apply `f` to every listed read version from the handler-owned
    /// `local_read_version_mapping`, taking each read version's write lock.
    /// Instances whose lock cannot be acquired immediately are deferred and
    /// retried with a short timeout until all have been visited.
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            #[cfg(debug_assertions)]
            {
                // check duplication on debug_mode
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            // Unknown instances (e.g. already unregistered) are skipped silently.
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            // First pass: non-blocking attempt; contended locks go to `pending`.
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            if pending.len() * 10 > total_count {
                // Only print warn log when failed to acquire more than 10%
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Second pass: retry each deferred instance with a bounded wait,
        // re-queueing (and warning) on every further failure until done.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }

    /// Push newly uploaded staging SSTs into every read version whose imms
    /// contributed to them. The single-SST case avoids building the
    /// instance-id set needed for batched updates.
    fn handle_uploaded_ssts_inner(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
        match ssts.as_slice() {
            [] => {
                // Empty batches indicate a caller bug; surface it in debug builds only.
                if cfg!(debug_assertions) {
                    panic!("empty ssts")
                }
            }
            [staging_sstable_info] => {
                trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
                self.for_each_read_version(
                    staging_sstable_info.imm_ids().keys().cloned(),
                    |_, read_version| {
                        read_version.update(VersionUpdate::Sst(staging_sstable_info.clone()))
                    },
                )
            }
            ssts => {
                warn!(
                    batch_size = ssts.len(),
                    "handle multiple uploaded ssts in batch"
                );
                // Union of all instances touched by any SST in the batch.
                let affected_instances: HashSet<_> = ssts
                    .iter()
                    .flat_map(|sst| {
                        trace!("data_flushed. SST size {}", sst.imm_size());
                        sst.imm_ids().keys()
                    })
                    .copied()
                    .collect();
                self.for_each_read_version(affected_instances, |instance_id, read_version| {
                    // Each read version only receives the SSTs it contributed to.
                    for sst in ssts {
                        if sst.imm_ids().contains_key(&instance_id) {
                            read_version.update(VersionUpdate::Sst(sst.clone()));
                        }
                    }
                })
            }
        }
    }

    /// Forward a sync request to the uploader; the result is delivered
    /// asynchronously through `sync_result_sender`.
    fn handle_sync_epoch(
        &mut self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
    ) {
        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
        self.uploader
            .start_sync_epoch(sync_result_sender, sync_table_epochs);
    }

    /// Clear uploader state for the given tables (all tables when `None`) and
    /// acknowledge via `notifier`. A full clear requires every read version to
    /// have been unregistered beforehand.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            ?table_ids,
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            // A full clear must not leave dangling read-version instances.
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // Notify completion of the Clear event.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }

    /// Resolve an incoming version payload against the newest known pinned
    /// version and, if it advances the version, kick off a cache refill.
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        // Base on the refiller's last pending version if any, otherwise the
        // uploader's current version.
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }

    /// Apply `version_payload` on top of `pinned_version`, returning the new
    /// pinned version if one results. When `sst_delta_infos` is provided, it
    /// collects per-delta SST changes for cache refill.
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    // Hold the table change log lock across all deltas so the
                    // change log is updated atomically with the version walk.
                    let mut table_change_log_to_apply_guard =
                        pinned_version.table_change_log_write_lock();
                    for version_delta in version_deltas {
                        // Deltas must chain: each one applies to the id it declares.
                        assert_eq!(version_to_apply.id, version_delta.prev_id);

                        // apply change-log-delta
                        {
                            // Apply the state-table delta on a scratch clone only to
                            // learn `changed_table_info`; `version_to_apply` itself is
                            // updated below via `apply_version_delta`.
                            let mut state_table_info = version_to_apply.state_table_info.clone();
                            let (changed_table_info, _is_commit_epoch) = state_table_info
                                .apply_delta(
                                    &version_delta.state_table_info_delta,
                                    &version_delta.removed_table_ids,
                                );

                            HummockVersionCommon::<SstableInfo>::apply_change_log_delta(
                                &mut *table_change_log_to_apply_guard,
                                &version_delta.change_log_delta,
                                &version_delta.removed_table_ids,
                                &version_delta.state_table_info_delta,
                                &changed_table_info,
                            );
                        }

                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            // Record SST changes *before* applying the delta, since
                            // they are computed relative to the pre-delta version.
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                pinned_version.new_pin_version(*version)
            }
        }
    }

    /// Apply a batch of refiller-completed version updates in order:
    /// update `recent_versions`, push committed snapshots into every read
    /// version, notify version watchers, then advance the uploader.
    fn apply_version_updates(&mut self, events: Vec<CacheRefillerEvent>) {
        let Some(CacheRefillerEvent {
            new_pinned_version: latest_pinned_version,
            ..
        }) = events.last()
        else {
            // Empty batches indicate a caller bug; surface it in debug builds only.
            if cfg!(debug_assertions) {
                panic!("empty events")
            }
            return;
        };
        if events.len() > 1 {
            warn!(
                count = events.len(),
                "handle multiple version updates in batch"
            );
        }
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        // RCU: fold every new version into the previous RecentVersions; the
        // closure may run more than once if the swap races, so it rebuilds
        // from `prev_recent_versions` each time.
        self.recent_versions.rcu(|prev_recent_versions| {
            let mut recent_versions = None;
            for event in &events {
                let CacheRefillerEvent {
                    new_pinned_version, ..
                } = event;
                recent_versions = Some(
                    recent_versions
                        .as_ref()
                        .unwrap_or(prev_recent_versions.as_ref())
                        .with_new_version(new_pinned_version.clone()),
                );
            }
            recent_versions.expect("non-empty events")
        });

        {
            // Deliver each new committed snapshot, in order, to every registered
            // read version.
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    for CacheRefillerEvent {
                        new_pinned_version, ..
                    } in &events
                    {
                        read_version
                            .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                    }
                },
            );
        }

        // Advance the watch channel; notify subscribers only if the version
        // id actually moved forward.
        self.version_update_notifier_tx.send_if_modified(|state| {
            let mut modified = false;
            for CacheRefillerEvent {
                pinned_version,
                new_pinned_version,
            } in &events
            {
                // Each event's base version must match the watch state it extends.
                assert_eq!(pinned_version.id(), state.id());
                if state.id() == new_pinned_version.id() {
                    continue;
                }
                assert!(new_pinned_version.id() > state.id());
                *state = new_pinned_version.clone();
                modified = true;
            }
            modified
        });

        debug!("update to hummock version: {}", latest_pinned_version.id(),);

        self.uploader
            .update_pinned_version(latest_pinned_version.clone());
    }
}
684
685impl HummockEventHandler {
686    pub async fn start_hummock_event_handler_worker(mut self) {
687        loop {
688            tokio::select! {
689                ssts = self.uploader.next_uploaded_ssts() => {
690                    self.handle_uploaded_ssts(ssts);
691                }
692                events = self.refiller.next_events() => {
693                    self.apply_version_updates(events);
694                }
695                event = pin!(self.hummock_event_rx.recv()) => {
696                    let Some(event) = event else { break };
697                    match event {
698                        HummockEvent::Shutdown => {
699                            info!("event handler shutdown");
700                            return;
701                        },
702                        event => {
703                            self.handle_hummock_event(event);
704                        }
705                    }
706                }
707                version_update = pin!(self.version_update_rx.recv()) => {
708                    let Some(version_update) = version_update else {
709                        warn!("version update stream ends. event handle shutdown");
710                        return;
711                    };
712                    self.handle_version_update(version_update);
713                }
714            }
715        }
716    }
717
718    fn handle_uploaded_ssts(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
719        let _timer = self
720            .metrics
721            .event_handler_on_upload_finish_latency
722            .start_timer();
723        self.handle_uploaded_ssts_inner(ssts);
724    }
725
726    /// Gracefully shutdown if returns `true`.
727    fn handle_hummock_event(&mut self, event: HummockEvent) {
728        match event {
729            HummockEvent::BufferMayFlush => {
730                self.uploader
731                    .may_flush(&self.metrics.event_handler_on_spiller);
732            }
733            HummockEvent::SyncEpoch {
734                sync_result_sender,
735                sync_table_epochs,
736            } => {
737                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
738            }
739            HummockEvent::Clear(notifier, table_ids) => {
740                self.handle_clear(notifier, table_ids);
741            }
742            HummockEvent::Shutdown => {
743                unreachable!("shutdown is handled specially")
744            }
745            HummockEvent::StartEpoch { epoch, table_ids } => {
746                self.uploader.start_epoch(epoch, table_ids);
747            }
748            HummockEvent::InitEpoch {
749                instance_id,
750                init_epoch,
751            } => {
752                let table_id = self
753                    .local_read_version_mapping
754                    .get(&instance_id)
755                    .expect("should exist")
756                    .0;
757                self.uploader
758                    .init_instance(instance_id, table_id, init_epoch);
759            }
760            HummockEvent::ImmToUploader { instance_id, imms } => {
761                assert!(
762                    self.local_read_version_mapping.contains_key(&instance_id),
763                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
764                    instance_id,
765                    imms.first().map(|(imm, _)| imm.table_id),
766                );
767                self.uploader.add_imms(instance_id, imms);
768                self.uploader
769                    .may_flush(&self.metrics.event_handler_on_spiller);
770            }
771
772            HummockEvent::LocalSealEpoch {
773                next_epoch,
774                opts,
775                instance_id,
776            } => {
777                self.uploader
778                    .local_seal_epoch(instance_id, next_epoch, opts);
779            }
780
781            #[cfg(any(test, feature = "test"))]
782            HummockEvent::FlushEvent(sender) => {
783                let _ = sender.send(()).inspect_err(|e| {
784                    error!("unable to send flush result: {:?}", e);
785                });
786            }
787
788            HummockEvent::RegisterReadVersion {
789                table_id,
790                new_read_version_sender,
791                is_replicated,
792                vnodes,
793            } => {
794                let pinned_version = self.recent_versions.load().latest_version().clone();
795                let instance_id = self.generate_instance_id();
796                let basic_read_version = Arc::new(RwLock::new(
797                    HummockReadVersion::new_with_replication_option(
798                        table_id,
799                        instance_id,
800                        pinned_version,
801                        is_replicated,
802                        vnodes,
803                    ),
804                ));
805
806                debug!(
807                    "new read version registered: table_id: {}, instance_id: {}",
808                    table_id, instance_id
809                );
810
811                {
812                    self.local_read_version_mapping
813                        .insert(instance_id, (table_id, basic_read_version.clone()));
814                    let mut read_version_mapping_guard = self.read_version_mapping.write();
815
816                    read_version_mapping_guard
817                        .entry(table_id)
818                        .or_default()
819                        .insert(instance_id, basic_read_version.clone());
820                }
821
822                match new_read_version_sender.send((
823                    basic_read_version,
824                    LocalInstanceGuard {
825                        table_id,
826                        instance_id,
827                        event_sender: Some(self.hummock_event_tx.clone()),
828                    },
829                )) {
830                    Ok(_) => {}
831                    Err((_, mut guard)) => {
832                        warn!(
833                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
834                            table_id, instance_id
835                        );
836                        guard.event_sender.take().expect("sender is just set");
837                        self.destroy_read_version(instance_id);
838                    }
839                }
840            }
841
842            HummockEvent::DestroyReadVersion { instance_id } => {
843                self.uploader.may_destroy_instance(instance_id);
844                self.destroy_read_version(instance_id);
845            }
846            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
847                let _ = result_tx
848                    .send(self.uploader.min_uncommitted_object_id())
849                    .inspect_err(|e| {
850                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
851                    });
852            }
853            HummockEvent::RegisterVectorWriter {
854                table_id,
855                init_epoch,
856            } => self.uploader.register_vector_writer(table_id, init_epoch),
857            HummockEvent::VectorWriterSealEpoch {
858                table_id,
859                next_epoch,
860                add,
861            } => {
862                self.uploader
863                    .vector_writer_seal_epoch(table_id, next_epoch, add);
864            }
865            HummockEvent::DropVectorWriter { table_id } => {
866                self.uploader.drop_vector_writer(table_id);
867            }
868        }
869    }
870
871    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
872        {
873            {
874                debug!("read version deregister: instance_id: {}", instance_id);
875                let (table_id, _) = self
876                    .local_read_version_mapping
877                    .remove(&instance_id)
878                    .unwrap_or_else(|| {
879                        panic!(
880                            "DestroyHummockInstance inexist instance instance_id {}",
881                            instance_id
882                        )
883                    });
884                let mut read_version_mapping_guard = self.read_version_mapping.write();
885                let entry = read_version_mapping_guard
886                    .get_mut(&table_id)
887                    .unwrap_or_else(|| {
888                        panic!(
889                            "DestroyHummockInstance table_id {} instance_id {} fail",
890                            table_id, instance_id
891                        )
892                    });
893                entry.remove(&instance_id).unwrap_or_else(|| {
894                    panic!(
895                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
896                        table_id, instance_id
897                    )
898                });
899                if entry.is_empty() {
900                    read_version_mapping_guard.remove(&table_id);
901                }
902            }
903        }
904    }
905
906    fn generate_instance_id(&mut self) -> LocalInstanceId {
907        self.last_instance_id += 1;
908        self.last_instance_id
909    }
910}
911
912pub(super) fn send_sync_result(
913    sender: oneshot::Sender<HummockResult<SyncedData>>,
914    result: HummockResult<SyncedData>,
915) {
916    let _ = sender.send(result).inspect_err(|e| {
917        error!("unable to send sync result. Err: {:?}", e);
918    });
919}
920
921impl SyncedData {
922    pub fn into_sync_result(self) -> SyncResult {
923        {
924            let SyncedData {
925                uploaded_ssts,
926                table_watermarks,
927                vector_index_adds,
928            } = self;
929            let mut sync_size = 0;
930            let mut uncommitted_ssts = Vec::new();
931            let mut old_value_ssts = Vec::new();
932            // The newly uploaded `sstable_infos` contains newer data. Therefore,
933            // `newly_upload_ssts` at the front
934            for sst in uploaded_ssts {
935                sync_size += sst.imm_size();
936                uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
937                old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
938            }
939            SyncResult {
940                sync_size,
941                uncommitted_ssts,
942                table_watermarks,
943                old_value_ssts,
944                vector_index_adds,
945            }
946        }
947    }
948}
949
950#[cfg(test)]
951mod tests {
952    use std::collections::{HashMap, HashSet};
953    use std::future::poll_fn;
954    use std::sync::Arc;
955    use std::task::Poll;
956
957    use futures::FutureExt;
958    use parking_lot::Mutex;
959    use risingwave_common::bitmap::Bitmap;
960    use risingwave_common::catalog::TableId;
961    use risingwave_common::hash::VirtualNode;
962    use risingwave_common::util::epoch::{EpochExt, test_epoch};
963    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
964    use risingwave_hummock_sdk::version::HummockVersion;
965    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
966    use tokio::spawn;
967    use tokio::sync::mpsc::unbounded_channel;
968    use tokio::sync::oneshot;
969
970    use crate::hummock::HummockError;
971    use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
972    use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
973    use crate::hummock::event_handler::uploader::UploadTaskOutput;
974    use crate::hummock::event_handler::uploader::test_utils::{
975        TEST_TABLE_ID, gen_imm_inner, gen_imm_with_unlimit,
976        prepare_uploader_order_test_spawn_task_fn,
977    };
978    use crate::hummock::event_handler::{
979        HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
980    };
981    use crate::hummock::iterator::test_utils::mock_sstable_store;
982    use crate::hummock::local_version::pinned_version::PinnedVersion;
983    use crate::hummock::local_version::recent_versions::RecentVersions;
984    use crate::hummock::test_utils::default_opts_for_test;
985    use crate::mem_table::ImmutableMemtable;
986    use crate::monitor::HummockStateStoreMetrics;
987    use crate::store::SealCurrentEpochOptions;
988
    /// Verifies that when the upload task for an older epoch fails, the
    /// failure is propagated to the sync of that epoch AND to the sync of a
    /// newer epoch that depends on it: both `SyncEpoch` receivers below end
    /// with `unwrap_err()`.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        // Initial pinned version with a single table committed at epoch0.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1,
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault as _,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `tx`/`rx` gate the epoch1 upload task: it only completes (with an
        // error) after the test sends on `tx` near the end.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(&storage_opt, &metrics),
            // Fake upload-task spawner: the epoch1 task blocks on `rx` and
            // then fails; the epoch2 task succeeds immediately with empty
            // output. Any other epoch is a test bug.
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register a read version instance for the test table.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        // Write one imm into epoch1 ...
        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        read_version.write().init();

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        let (imm1, tracker1) = gen_imm_with_unlimit(epoch1);
        read_version.write().add_pending_imm(imm1.clone(), tracker1);

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imms: read_version.write().start_upload_pending_imms(),
        });

        // ... then seal epoch1 and write one imm into epoch2.
        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        {
            let (imm2, tracker2) = gen_imm_with_unlimit(epoch2);
            let mut read_version = read_version.write();
            read_version.add_pending_imm(imm2, tracker2);

            send_event(HummockEvent::ImmToUploader {
                instance_id: guard.instance_id,
                imms: read_version.start_upload_pending_imms(),
            });
        }

        // Seal epoch2 so that it becomes syncable.
        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Request syncs for epoch1 and epoch2. Both must still be pending,
        // because the epoch1 upload task is blocked on `rx`.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Unblock the epoch1 task so it fails; both syncs must then resolve
        // to an error (the epoch2 sync cannot succeed past a failed epoch1).
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1139
1140    #[tokio::test]
1141    async fn test_clear_tables() {
1142        let table_id1 = TableId::new(1);
1143        let table_id2 = TableId::new(2);
1144        let epoch0 = test_epoch(233);
1145
1146        let initial_version = PinnedVersion::new(
1147            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1148                id: 1,
1149                state_table_info: HashMap::from_iter([
1150                    (
1151                        table_id1,
1152                        StateTableInfo {
1153                            committed_epoch: epoch0,
1154                            compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1155                        },
1156                    ),
1157                    (
1158                        table_id2,
1159                        StateTableInfo {
1160                            committed_epoch: epoch0,
1161                            compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1162                        },
1163                    ),
1164                ]),
1165                ..Default::default()
1166            }),
1167            unbounded_channel().0,
1168        );
1169
1170        let (_version_update_tx, version_update_rx) = unbounded_channel();
1171
1172        let epoch1 = epoch0.next_epoch();
1173        let epoch2 = epoch1.next_epoch();
1174        let epoch3 = epoch2.next_epoch();
1175
1176        let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0).size();
1177
1178        // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained.
1179        let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1180        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1181
1182        let gen_imm = |table_id, epoch, spill_offset| {
1183            let imm = gen_imm_inner(table_id, epoch, spill_offset);
1184            assert_eq!(imm.size(), imm_size);
1185            imm
1186        };
1187        let imm1_1 = gen_imm(table_id1, epoch1, 0);
1188        let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1189
1190        let storage_opt = default_opts_for_test();
1191        let metrics = Arc::new(HummockStateStoreMetrics::unused());
1192
1193        let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1194
1195        let event_handler = HummockEventHandler::new_inner(
1196            version_update_rx,
1197            mock_sstable_store().await,
1198            metrics.clone(),
1199            CacheRefillConfig::from_storage_opts(&storage_opt),
1200            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1201            buffer_tracker,
1202            spawn_task,
1203            CacheRefiller::default_spawn_refill_task(),
1204        );
1205
1206        let event_tx = event_handler.event_sender();
1207
1208        let send_event = |event| event_tx.send(event).unwrap();
1209        let flush_event = || async {
1210            let (tx, rx) = oneshot::channel();
1211            send_event(HummockEvent::FlushEvent(tx));
1212            rx.await.unwrap();
1213        };
1214        let start_epoch = |table_id, epoch| {
1215            send_event(HummockEvent::StartEpoch {
1216                epoch,
1217                table_ids: HashSet::from_iter([table_id]),
1218            })
1219        };
1220        let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1221            send_event(HummockEvent::InitEpoch {
1222                instance_id: instance.instance_id,
1223                init_epoch,
1224            })
1225        };
1226        let event_tx_clone = event_tx.clone();
1227        let write_imm = {
1228            let memory_limiter = memory_limiter.clone();
1229            move |read_version: &HummockReadVersionRef,
1230                  instance: &LocalInstanceGuard,
1231                  imm: &ImmutableMemtable| {
1232                let memory_limiter = memory_limiter.clone();
1233                let event_tx = event_tx_clone.clone();
1234                let read_version = read_version.clone();
1235                let imm = imm.clone();
1236                let instance_id = instance.instance_id;
1237                async move {
1238                    let tracker = memory_limiter.require_memory(imm.size() as _).await;
1239                    let mut read_version = read_version.write();
1240                    read_version.add_pending_imm(imm.clone(), tracker);
1241
1242                    event_tx
1243                        .send(HummockEvent::ImmToUploader {
1244                            instance_id,
1245                            imms: read_version.start_upload_pending_imms(),
1246                        })
1247                        .unwrap();
1248                }
1249            }
1250        };
1251        let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1252            send_event(HummockEvent::LocalSealEpoch {
1253                instance_id: instance.instance_id,
1254                next_epoch,
1255                opts: SealCurrentEpochOptions::for_test(),
1256            })
1257        };
1258        let sync_epoch = |table_id, new_sync_epoch| {
1259            let (tx, rx) = oneshot::channel();
1260            send_event(HummockEvent::SyncEpoch {
1261                sync_result_sender: tx,
1262                sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1263            });
1264            rx
1265        };
1266
1267        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1268
1269        let (read_version1, guard1) = {
1270            let (tx, rx) = oneshot::channel();
1271            send_event(HummockEvent::RegisterReadVersion {
1272                table_id: table_id1,
1273                new_read_version_sender: tx,
1274                is_replicated: false,
1275                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1276            });
1277            rx.await.unwrap()
1278        };
1279
1280        let (read_version2, guard2) = {
1281            let (tx, rx) = oneshot::channel();
1282            send_event(HummockEvent::RegisterReadVersion {
1283                table_id: table_id2,
1284                new_read_version_sender: tx,
1285                is_replicated: false,
1286                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1287            });
1288            rx.await.unwrap()
1289        };
1290
1291        // prepare data of table1
1292        let (task1_1_finish_tx, task1_1_rx) = {
1293            start_epoch(table_id1, epoch1);
1294
1295            read_version1.write().init();
1296            init_epoch(&guard1, epoch1);
1297
1298            write_imm(&read_version1, &guard1, &imm1_1).await;
1299
1300            start_epoch(table_id1, epoch2);
1301
1302            seal_epoch(&guard1, epoch2);
1303
1304            let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1305                guard1.instance_id,
1306                vec![imm1_1.batch_id()],
1307            )]));
1308
1309            let mut rx = sync_epoch(table_id1, epoch1);
1310            wait_task_start.await;
1311            assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1312
1313            write_imm(&read_version1, &guard1, &imm1_2_1).await;
1314            flush_event().await;
1315
1316            (task_finish_tx, rx)
1317        };
1318        // by now, the state in uploader of table_id1
1319        // unsync:  epoch2 -> [imm1_2]
1320        // syncing: epoch1 -> [imm1_1]
1321
1322        let (task1_2_finish_tx, _finish_txs) = {
1323            let mut finish_txs = vec![];
1324            let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1325            start_epoch(table_id2, epoch1);
1326            read_version2.write().init();
1327            init_epoch(&guard2, epoch1);
1328            let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1329                guard1.instance_id,
1330                vec![imm1_2_1.batch_id()],
1331            )]));
1332            write_imm(&read_version2, &guard2, &imm2_1_1).await;
1333            wait_task_start.await;
1334
1335            let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1336            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1337                guard2.instance_id,
1338                vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1339            )]));
1340            finish_txs.push(finish_tx);
1341            write_imm(&read_version2, &guard2, &imm2_1_2).await;
1342            wait_task_start.await;
1343
1344            let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1345            write_imm(&read_version2, &guard2, &imm2_1_3).await;
1346            start_epoch(table_id2, epoch2);
1347            seal_epoch(&guard2, epoch2);
1348            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1349                guard2.instance_id,
1350                vec![imm2_1_3.batch_id()],
1351            )]));
1352            finish_txs.push(finish_tx);
1353            let _sync_rx = sync_epoch(table_id2, epoch1);
1354            wait_task_start.await;
1355
1356            let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1357            write_imm(&read_version2, &guard2, &imm2_2_1).await;
1358            flush_event().await;
1359            let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1360            write_imm(&read_version2, &guard2, &imm2_2_2).await;
1361            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1362                guard2.instance_id,
1363                vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1364            )]));
1365            finish_txs.push(finish_tx);
1366            wait_task_start.await;
1367
1368            let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1369            write_imm(&read_version2, &guard2, &imm2_2_3).await;
1370
1371            // by now, the state in uploader of table_id2
1372            // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3]
1373            // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3]
1374            // the state in uploader of table_id1
1375            // unsync:  epoch2 -> spilling [imm1_2]
1376            // syncing: epoch1 -> [imm1_1]
1377
1378            drop(guard2);
1379            let (clear_tx, clear_rx) = oneshot::channel();
1380            send_event(HummockEvent::Clear(
1381                clear_tx,
1382                Some(HashSet::from_iter([table_id2])),
1383            ));
1384            clear_rx.await.unwrap();
1385            (task1_2_finish_tx, finish_txs)
1386        };
1387
1388        let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1389        write_imm(&read_version1, &guard1, &imm1_2_2).await;
1390        start_epoch(table_id1, epoch3);
1391        seal_epoch(&guard1, epoch3);
1392
1393        let (tx2, mut sync_rx2) = oneshot::channel();
1394        let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1395            guard1.instance_id,
1396            vec![imm1_2_2.batch_id()],
1397        )]));
1398        send_event(HummockEvent::SyncEpoch {
1399            sync_result_sender: tx2,
1400            sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1401        });
1402        wait_task_start.await;
1403        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1404
1405        task1_1_finish_tx.send(()).unwrap();
1406        let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1407        sync_data1
1408            .uploaded_ssts
1409            .iter()
1410            .all(|sst| sst.epochs() == &vec![epoch1]);
1411        task1_2_finish_tx.send(()).unwrap();
1412        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1413        task1_2_2_finish_tx.send(()).unwrap();
1414        let sync_data2 = sync_rx2.await.unwrap().unwrap();
1415        sync_data2
1416            .uploaded_ssts
1417            .iter()
1418            .all(|sst| sst.epochs() == &vec![epoch2]);
1419
1420        send_event(HummockEvent::Shutdown);
1421        join_handle.await.unwrap();
1422    }
1423}