1use std::cell::RefCell;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::pin::pin;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering::Relaxed;
20use std::sync::{Arc, LazyLock};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use await_tree::{InstrumentAwait, SpanExt};
25use futures::FutureExt;
26use itertools::Itertools;
27use parking_lot::RwLock;
28use prometheus::{Histogram, IntGauge, IntGaugeVec};
29use risingwave_common::catalog::TableId;
30use risingwave_common::metrics::UintGauge;
31use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
32use risingwave_hummock_sdk::sstable_info::SstableInfo;
33use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta};
34use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
35use tokio::spawn;
36use tokio::sync::mpsc::error::SendError;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
38use tokio::sync::oneshot;
39use tracing::{debug, error, info, trace, warn};
40
41use super::refiller::{CacheRefillConfig, CacheRefiller};
42use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
43use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
44use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
45use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
46use crate::hummock::event_handler::uploader::{
47 HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
48};
49use crate::hummock::event_handler::{
50 HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
51 ReadOnlyRwLockRef,
52};
53use crate::hummock::local_version::pinned_version::PinnedVersion;
54use crate::hummock::local_version::recent_versions::RecentVersions;
55use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
56use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
57use crate::mem_table::ImmutableMemtable;
58use crate::monitor::HummockStateStoreMetrics;
59use crate::opts::StorageOpts;
60
#[derive(Clone)]
pub(crate) struct BufferTracker {
    // Flush is triggered once un-uploaded imm size exceeds this many bytes
    // (see `need_flush`).
    flush_threshold: usize,
    // A flush batch keeps growing until it reaches this many bytes
    // (see `need_more_flush`).
    min_batch_flush_size: usize,
    // Global limiter for uploading memory, sized to the shared buffer capacity.
    global_uploading_memory_limiter: Arc<MemoryLimiter>,
    // Gauge of total imm size currently buffered.
    uploader_imm_size: UintGauge,
    // Gauge of imm size currently being uploaded.
    uploader_uploading_task_size: UintGauge,
}
69
70impl BufferTracker {
71 pub fn from_storage_opts(config: &StorageOpts, metrics: &HummockStateStoreMetrics) -> Self {
72 let capacity = config.shared_buffer_capacity_mb * (1 << 20);
73 let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
74 let min_batch_flush_size = config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
75 assert!(
76 flush_threshold < capacity,
77 "flush_threshold {} should be less or equal to capacity {}",
78 flush_threshold,
79 capacity
80 );
81 Self {
82 flush_threshold,
83 min_batch_flush_size,
84 global_uploading_memory_limiter: Arc::new(MemoryLimiter::new(capacity as u64)),
85 uploader_imm_size: metrics.uploader_imm_size.clone(),
86 uploader_uploading_task_size: metrics.uploader_uploading_task_size.clone(),
87 }
88 }
89
90 #[cfg(test)]
91 fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
92 Self {
93 flush_threshold,
94 min_batch_flush_size,
95 ..Self::for_test()
96 }
97 }
98
99 #[cfg(test)]
100 pub fn for_test() -> Self {
101 Self::from_storage_opts(&StorageOpts::default(), &HummockStateStoreMetrics::unused())
102 }
103
104 pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
105 &self.global_uploading_memory_limiter
106 }
107
108 pub fn global_upload_task_size(&self) -> &UintGauge {
109 &self.uploader_uploading_task_size
110 }
111
112 pub fn need_flush(&self) -> bool {
115 self.uploader_imm_size.get()
116 > self.flush_threshold as u64 + self.uploader_uploading_task_size.get()
117 }
118
119 pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
120 curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
121 }
122
123 #[cfg(test)]
124 pub(crate) fn flush_threshold(&self) -> usize {
125 self.flush_threshold
126 }
127}
128
/// An `UnboundedSender<HummockEvent>` wrapper that additionally tracks the
/// number of pending events per event type in a gauge vector (incremented on
/// send, decremented on receive).
#[derive(Clone)]
pub struct HummockEventSender {
    inner: UnboundedSender<HummockEvent>,
    event_count: IntGaugeVec,
}
134
135pub fn event_channel(event_count: IntGaugeVec) -> (HummockEventSender, HummockEventReceiver) {
136 let (tx, rx) = unbounded_channel();
137 (
138 HummockEventSender {
139 inner: tx,
140 event_count: event_count.clone(),
141 },
142 HummockEventReceiver {
143 inner: rx,
144 event_count,
145 },
146 )
147}
148
149impl HummockEventSender {
150 pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
151 let event_type = event.event_name();
152 self.inner.send(event)?;
153 get_event_pending_gauge(&self.event_count, event_type).inc();
154 Ok(())
155 }
156}
157
/// Receiver half created by [`event_channel`]; decrements the per-type
/// pending-event gauge as events are consumed.
pub struct HummockEventReceiver {
    inner: UnboundedReceiver<HummockEvent>,
    event_count: IntGaugeVec,
}
162
163impl HummockEventReceiver {
164 async fn recv(&mut self) -> Option<HummockEvent> {
165 let event = self.inner.recv().await?;
166 let event_type = event.event_name();
167 get_event_pending_gauge(&self.event_count, event_type).dec();
168 Some(event)
169 }
170}
171
thread_local! {
    // Per-thread memoization of event-type -> gauge lookups so that the
    // `with_label_values` resolution is not repeated on every send/recv.
    static EVENT_PENDING_GAUGE_CACHE: RefCell<HashMap<&'static str, IntGauge>> = RefCell::new(HashMap::new());
}
175
176fn get_event_pending_gauge(event_count: &IntGaugeVec, event_type: &'static str) -> IntGauge {
177 EVENT_PENDING_GAUGE_CACHE.with(|cache| {
178 cache
179 .borrow_mut()
180 .entry(event_type)
181 .or_insert_with(|| event_count.with_label_values(&[event_type]))
182 .clone()
183 })
184}
185
/// Pre-resolved latency histograms for the event handler's hot paths —
/// label values of `event_handler_latency`, resolved once in `new_inner`
/// instead of on every event.
struct HummockEventHandlerMetrics {
    event_handler_on_upload_finish_latency: Histogram,
    event_handler_on_apply_version_update: Histogram,
    event_handler_on_recv_version_update: Histogram,
    event_handler_on_spiller: Histogram,
}
192
/// Single-threaded event loop that drives imm uploads, version updates and
/// read-version bookkeeping; run via `start_hummock_event_handler_worker`.
pub(crate) struct HummockEventHandler {
    /// Kept so `event_sender` can hand out clones.
    hummock_event_tx: HummockEventSender,
    hummock_event_rx: HummockEventReceiver,
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    /// Shared table-id -> (instance-id -> read version) mapping, exposed
    /// read-only via `read_version_mapping`.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    /// Handler-local instance-id -> (table id, read version) view; the event
    /// loop reads this without taking the shared `read_version_mapping` lock.
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    /// Watch channel notified whenever a newer version is applied.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    /// Lock-free cache of recently pinned versions, swapped via RCU.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    /// Last allocated read-version instance id (see `generate_instance_id`).
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
211
/// Compact a batch of immutable memtables into SSTs through the shared-buffer
/// compactor, instrumented for await-tree. Used as the default upload task
/// body spawned by [`HummockEventHandler::new`].
async fn flush_imms(
    payload: Vec<ImmutableMemtable>,
    compactor_context: CompactorContext,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    object_id_manager: Arc<ObjectIdManager>,
) -> HummockResult<UploadTaskOutput> {
    compact(
        compactor_context,
        object_id_manager,
        payload,
        compaction_catalog_manager_ref,
    )
    .instrument_await("shared_buffer_compact".verbose())
    .await
}
227
228impl HummockEventHandler {
    /// Create an event handler wired to the real upload path: flushed imms are
    /// compacted into SSTs via [`flush_imms`] on a spawned tokio task.
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        object_id_manager: Arc<ObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker =
            BufferTracker::from_storage_opts(&compactor_context.storage_opts, &state_store_metrics);
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            // The upload task spawner: called by the uploader with the imm
            // payload and task info, returns a spawned JoinHandle.
            Arc::new(move |payload, task_info| {
                // Monotonic id used only to key the await-tree registration.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let object_id_manager = object_id_manager.clone();
                spawn({
                    let future = async move {
                        let _timer = upload_task_latency.start_timer();
                        // Flatten the per-instance imm lists into one payload.
                        let mut output = flush_imms(
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            object_id_manager.clone(),
                        )
                        .await?;
                        // Start the wait-poll timer (time between task finish
                        // and the uploader polling the result); it must not
                        // have been set by anyone else.
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    // Instrument with await-tree only when a registry exists.
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }
303
    /// Assemble the handler from its constituent parts. Called by `new`, and
    /// directly by tests to inject custom upload/refill task spawners.
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        // Watch channel seeded with the current latest version; it is updated
        // in `apply_version_updates`.
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        // Resolve the per-stage latency histograms once, not per event.
        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
            event_handler_on_spiller: state_store_metrics
                .event_handler_latency
                .with_label_values(&["spiller"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics,
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }
358
359 pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
360 self.version_update_notifier_tx.clone()
361 }
362
363 pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
364 self.recent_versions.clone()
365 }
366
367 pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
368 ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
369 }
370
371 pub fn event_sender(&self) -> HummockEventSender {
372 self.hummock_event_tx.clone()
373 }
374
375 pub fn buffer_tracker(&self) -> &BufferTracker {
376 self.uploader.buffer_tracker()
377 }
378}
379
380impl HummockEventHandler {
    /// Run `f` on the read version of every instance in `instances` that is
    /// still registered in `local_read_version_mapping`.
    ///
    /// Instances whose write lock cannot be taken immediately are queued and
    /// retried with a short timeout until all have been visited, so `f` is
    /// called exactly once per live instance. In debug builds, duplicate ids
    /// in `instances` trigger an assertion.
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            #[cfg(debug_assertions)]
            {
                // Deduplicate into a set and assert there were no duplicates.
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            // Instances may have been destroyed since the caller collected ids.
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            // First pass: non-blocking attempt; contended instances go to
            // the pending queue.
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            // Warn when more than 10% of the instances were contended,
            // otherwise just debug-log.
            if pending.len() * 10 > total_count {
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Second pass: retry each contended instance with a bounded wait,
        // re-queueing on timeout until every one has been processed.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }
453
    /// Apply uploaded staging SSTs to every read-version instance that
    /// contributed imms to them.
    fn handle_uploaded_ssts_inner(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
        match ssts.as_slice() {
            [] => {
                // Should not happen; only hard-fail in debug builds.
                if cfg!(debug_assertions) {
                    panic!("empty ssts")
                }
            }
            [staging_sstable_info] => {
                // Common case: a single SST; update exactly the instances
                // whose imms it contains.
                trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
                self.for_each_read_version(
                    staging_sstable_info.imm_ids().keys().cloned(),
                    |_, read_version| {
                        read_version.update(VersionUpdate::Sst(staging_sstable_info.clone()))
                    },
                )
            }
            ssts => {
                warn!(
                    batch_size = ssts.len(),
                    "handle multiple uploaded ssts in batch"
                );
                // Dedup the union of affected instances so each read version
                // is locked and visited only once for the whole batch.
                let affected_instances: HashSet<_> = ssts
                    .iter()
                    .flat_map(|sst| {
                        trace!("data_flushed. SST size {}", sst.imm_size());
                        sst.imm_ids().keys()
                    })
                    .copied()
                    .collect();
                self.for_each_read_version(affected_instances, |instance_id, read_version| {
                    // Apply every SST of the batch that involves this instance,
                    // preserving batch order.
                    for sst in ssts {
                        if sst.imm_ids().contains_key(&instance_id) {
                            read_version.update(VersionUpdate::Sst(sst.clone()));
                        }
                    }
                })
            }
        }
    }
493
494 fn handle_sync_epoch(
495 &mut self,
496 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
497 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
498 ) {
499 debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
500 self.uploader
501 .start_sync_epoch(sync_result_sender, sync_table_epochs);
502 }
503
    /// Handle a `Clear` event: drop uploader state for the given tables (all
    /// tables when `table_ids` is `None`) and acknowledge through `notifier`.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            ?table_ids,
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            // A full clear expects every read-version instance to have been
            // destroyed beforehand.
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // Best-effort ack: the requester may already have dropped its receiver.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }
531
    /// Handle a version update from upstream: resolve it against the most
    /// recent pinned version and hand the result to the cache refiller, which
    /// later reports completion via `apply_version_updates`.
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        // Base on the refiller's last in-flight pinned version if one exists so
        // consecutive updates chain correctly; otherwise use the uploader's.
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }
553
    /// Compute the pinned version resulting from `version_payload`.
    ///
    /// For incremental deltas, the deltas are applied in order on a clone of
    /// the current version while the table change log is updated under its
    /// write lock; when `sst_delta_infos` is provided, it is filled with the
    /// SST-level delta info used for cache refill. Returns `None` when no new
    /// version should be pinned (as decided by `new_with_local_version` /
    /// `new_pin_version`, defined elsewhere).
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    let mut table_change_log_to_apply_guard =
                        pinned_version.table_change_log_write_lock();
                    for version_delta in version_deltas {
                        // Deltas must form a contiguous chain on version ids.
                        assert_eq!(version_to_apply.id, version_delta.prev_id);

                        {
                            // Apply the state-table-info delta on a copy to
                            // learn which tables changed, then propagate that
                            // into the change log.
                            let mut state_table_info = version_to_apply.state_table_info.clone();
                            let (changed_table_info, _is_commit_epoch) = state_table_info
                                .apply_delta(
                                    &version_delta.state_table_info_delta,
                                    &version_delta.removed_table_ids,
                                );

                            HummockVersionCommon::<SstableInfo>::apply_change_log_delta(
                                &mut *table_change_log_to_apply_guard,
                                &version_delta.change_log_delta,
                                &version_delta.removed_table_ids,
                                &version_delta.state_table_info_delta,
                                &changed_table_info,
                            );
                        }

                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        // Collect SST delta info BEFORE applying the delta:
                        // it is built against the pre-delta version.
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                // A full version snapshot: pin it directly.
                pinned_version.new_pin_version(*version)
            }
        }
    }
607
    /// Apply a batch of refill-completion events: advance `recent_versions`
    /// via RCU, push the committed snapshots into every read version, wake
    /// watch subscribers, and finally update the uploader's pinned version.
    fn apply_version_updates(&mut self, events: Vec<CacheRefillerEvent>) {
        let Some(CacheRefillerEvent {
            new_pinned_version: latest_pinned_version,
            ..
        }) = events.last()
        else {
            // Empty batches are unexpected; hard-fail only in debug builds.
            if cfg!(debug_assertions) {
                panic!("empty events")
            }
            return;
        };
        if events.len() > 1 {
            warn!(
                count = events.len(),
                "handle multiple version updates in batch"
            );
        }
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        // RCU: fold every event's version into a fresh `RecentVersions`.
        // The closure may run multiple times on contention, so it starts from
        // `prev_recent_versions` each attempt.
        self.recent_versions.rcu(|prev_recent_versions| {
            let mut recent_versions = None;
            for event in &events {
                let CacheRefillerEvent {
                    new_pinned_version, ..
                } = event;
                recent_versions = Some(
                    recent_versions
                        .as_ref()
                        .unwrap_or(prev_recent_versions.as_ref())
                        .with_new_version(new_pinned_version.clone()),
                );
            }
            recent_versions.expect("non-empty events")
        });

        {
            // Feed every committed snapshot, in order, to all registered
            // read-version instances.
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    for CacheRefillerEvent {
                        new_pinned_version, ..
                    } in &events
                    {
                        read_version
                            .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                    }
                },
            );
        }

        // Wake watch subscribers only when the version id actually advances;
        // each event's old version must match the current state, and ids are
        // asserted to be monotonically increasing.
        self.version_update_notifier_tx.send_if_modified(|state| {
            let mut modified = false;
            for CacheRefillerEvent {
                pinned_version,
                new_pinned_version,
            } in &events
            {
                assert_eq!(pinned_version.id(), state.id());
                if state.id() == new_pinned_version.id() {
                    continue;
                }
                assert!(new_pinned_version.id() > state.id());
                *state = new_pinned_version.clone();
                modified = true;
            }
            modified
        });

        debug!("update to hummock version: {}", latest_pinned_version.id(),);

        self.uploader
            .update_pinned_version(latest_pinned_version.clone());
    }
683}
684
685impl HummockEventHandler {
    /// Main loop: multiplex uploaded-SST completions, cache-refill
    /// completions, hummock events and version updates until shutdown.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                ssts = self.uploader.next_uploaded_ssts() => {
                    self.handle_uploaded_ssts(ssts);
                }
                events = self.refiller.next_events() => {
                    self.apply_version_updates(events);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    // `None` means every sender was dropped: stop the loop.
                    let Some(event) = event else { break };
                    match event {
                        // Shutdown is intercepted here; all other events go
                        // through `handle_hummock_event`.
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }
717
718 fn handle_uploaded_ssts(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
719 let _timer = self
720 .metrics
721 .event_handler_on_upload_finish_latency
722 .start_timer();
723 self.handle_uploaded_ssts_inner(ssts);
724 }
725
    /// Dispatch a single event. `Shutdown` never reaches here: the worker loop
    /// intercepts it before calling this method.
    fn handle_hummock_event(&mut self, event: HummockEvent) {
        match event {
            HummockEvent::BufferMayFlush => {
                self.uploader
                    .may_flush(&self.metrics.event_handler_on_spiller);
            }
            HummockEvent::SyncEpoch {
                sync_result_sender,
                sync_table_epochs,
            } => {
                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
            }
            HummockEvent::Clear(notifier, table_ids) => {
                self.handle_clear(notifier, table_ids);
            }
            HummockEvent::Shutdown => {
                unreachable!("shutdown is handled specially")
            }
            HummockEvent::StartEpoch { epoch, table_ids } => {
                self.uploader.start_epoch(epoch, table_ids);
            }
            HummockEvent::InitEpoch {
                instance_id,
                init_epoch,
            } => {
                // The instance must have been registered via RegisterReadVersion.
                let table_id = self
                    .local_read_version_mapping
                    .get(&instance_id)
                    .expect("should exist")
                    .0;
                self.uploader
                    .init_instance(instance_id, table_id, init_epoch);
            }
            HummockEvent::ImmToUploader { instance_id, imms } => {
                assert!(
                    self.local_read_version_mapping.contains_key(&instance_id),
                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
                    instance_id,
                    imms.first().map(|(imm, _)| imm.table_id),
                );
                self.uploader.add_imms(instance_id, imms);
                // Adding imms may push buffered size over the flush threshold.
                self.uploader
                    .may_flush(&self.metrics.event_handler_on_spiller);
            }

            HummockEvent::LocalSealEpoch {
                next_epoch,
                opts,
                instance_id,
            } => {
                self.uploader
                    .local_seal_epoch(instance_id, next_epoch, opts);
            }

            #[cfg(any(test, feature = "test"))]
            HummockEvent::FlushEvent(sender) => {
                // Test-only barrier: acknowledging proves all prior events
                // were processed. The ack is best-effort.
                let _ = sender.send(()).inspect_err(|e| {
                    error!("unable to send flush result: {:?}", e);
                });
            }

            HummockEvent::RegisterReadVersion {
                table_id,
                new_read_version_sender,
                is_replicated,
                vnodes,
            } => {
                let pinned_version = self.recent_versions.load().latest_version().clone();
                let instance_id = self.generate_instance_id();
                let basic_read_version = Arc::new(RwLock::new(
                    HummockReadVersion::new_with_replication_option(
                        table_id,
                        instance_id,
                        pinned_version,
                        is_replicated,
                        vnodes,
                    ),
                ));

                debug!(
                    "new read version registered: table_id: {}, instance_id: {}",
                    table_id, instance_id
                );

                {
                    // Track the instance in both the handler-local map and the
                    // shared table-id -> instances map.
                    self.local_read_version_mapping
                        .insert(instance_id, (table_id, basic_read_version.clone()));
                    let mut read_version_mapping_guard = self.read_version_mapping.write();

                    read_version_mapping_guard
                        .entry(table_id)
                        .or_default()
                        .insert(instance_id, basic_read_version.clone());
                }

                match new_read_version_sender.send((
                    basic_read_version,
                    LocalInstanceGuard {
                        table_id,
                        instance_id,
                        event_sender: Some(self.hummock_event_tx.clone()),
                    },
                )) {
                    Ok(_) => {}
                    Err((_, mut guard)) => {
                        // Receiver is gone: roll back the registration. The
                        // sender is taken out of the guard before it is
                        // dropped — presumably so the guard's drop path cannot
                        // emit another event; TODO confirm against
                        // LocalInstanceGuard's Drop impl.
                        warn!(
                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
                            table_id, instance_id
                        );
                        guard.event_sender.take().expect("sender is just set");
                        self.destroy_read_version(instance_id);
                    }
                }
            }

            HummockEvent::DestroyReadVersion { instance_id } => {
                self.uploader.may_destroy_instance(instance_id);
                self.destroy_read_version(instance_id);
            }
            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
                // Best-effort reply; requester may have dropped its receiver.
                let _ = result_tx
                    .send(self.uploader.min_uncommitted_object_id())
                    .inspect_err(|e| {
                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
                    });
            }
            HummockEvent::RegisterVectorWriter {
                table_id,
                init_epoch,
            } => self.uploader.register_vector_writer(table_id, init_epoch),
            HummockEvent::VectorWriterSealEpoch {
                table_id,
                next_epoch,
                add,
            } => {
                self.uploader
                    .vector_writer_seal_epoch(table_id, next_epoch, add);
            }
            HummockEvent::DropVectorWriter { table_id } => {
                self.uploader.drop_vector_writer(table_id);
            }
        }
    }
870
871 fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
872 {
873 {
874 debug!("read version deregister: instance_id: {}", instance_id);
875 let (table_id, _) = self
876 .local_read_version_mapping
877 .remove(&instance_id)
878 .unwrap_or_else(|| {
879 panic!(
880 "DestroyHummockInstance inexist instance instance_id {}",
881 instance_id
882 )
883 });
884 let mut read_version_mapping_guard = self.read_version_mapping.write();
885 let entry = read_version_mapping_guard
886 .get_mut(&table_id)
887 .unwrap_or_else(|| {
888 panic!(
889 "DestroyHummockInstance table_id {} instance_id {} fail",
890 table_id, instance_id
891 )
892 });
893 entry.remove(&instance_id).unwrap_or_else(|| {
894 panic!(
895 "DestroyHummockInstance inexist instance table_id {} instance_id {}",
896 table_id, instance_id
897 )
898 });
899 if entry.is_empty() {
900 read_version_mapping_guard.remove(&table_id);
901 }
902 }
903 }
904 }
905
906 fn generate_instance_id(&mut self) -> LocalInstanceId {
907 self.last_instance_id += 1;
908 self.last_instance_id
909 }
910}
911
912pub(super) fn send_sync_result(
913 sender: oneshot::Sender<HummockResult<SyncedData>>,
914 result: HummockResult<SyncedData>,
915) {
916 let _ = sender.send(result).inspect_err(|e| {
917 error!("unable to send sync result. Err: {:?}", e);
918 });
919}
920
921impl SyncedData {
922 pub fn into_sync_result(self) -> SyncResult {
923 {
924 let SyncedData {
925 uploaded_ssts,
926 table_watermarks,
927 vector_index_adds,
928 } = self;
929 let mut sync_size = 0;
930 let mut uncommitted_ssts = Vec::new();
931 let mut old_value_ssts = Vec::new();
932 for sst in uploaded_ssts {
935 sync_size += sst.imm_size();
936 uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
937 old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
938 }
939 SyncResult {
940 sync_size,
941 uncommitted_ssts,
942 table_watermarks,
943 old_value_ssts,
944 vector_index_adds,
945 }
946 }
947 }
948}
949
950#[cfg(test)]
951mod tests {
952 use std::collections::{HashMap, HashSet};
953 use std::future::poll_fn;
954 use std::sync::Arc;
955 use std::task::Poll;
956
957 use futures::FutureExt;
958 use parking_lot::Mutex;
959 use risingwave_common::bitmap::Bitmap;
960 use risingwave_common::catalog::TableId;
961 use risingwave_common::hash::VirtualNode;
962 use risingwave_common::util::epoch::{EpochExt, test_epoch};
963 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
964 use risingwave_hummock_sdk::version::HummockVersion;
965 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
966 use tokio::spawn;
967 use tokio::sync::mpsc::unbounded_channel;
968 use tokio::sync::oneshot;
969
970 use crate::hummock::HummockError;
971 use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
972 use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
973 use crate::hummock::event_handler::uploader::UploadTaskOutput;
974 use crate::hummock::event_handler::uploader::test_utils::{
975 TEST_TABLE_ID, gen_imm_inner, gen_imm_with_unlimit,
976 prepare_uploader_order_test_spawn_task_fn,
977 };
978 use crate::hummock::event_handler::{
979 HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
980 };
981 use crate::hummock::iterator::test_utils::mock_sstable_store;
982 use crate::hummock::local_version::pinned_version::PinnedVersion;
983 use crate::hummock::local_version::recent_versions::RecentVersions;
984 use crate::hummock::test_utils::default_opts_for_test;
985 use crate::mem_table::ImmutableMemtable;
986 use crate::monitor::HummockStateStoreMetrics;
987 use crate::store::SealCurrentEpochOptions;
988
    /// When the upload task of an older epoch fails, both its sync and the
    /// sync of every later epoch must resolve to an error.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        // Initial pinned version with a single table committed at epoch0.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1,
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault as _,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `rx` gates the epoch1 upload task: it stays blocked until `tx` fires.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        // Handler whose spawn-upload hook fails the epoch1 task (once the gate
        // fires) and succeeds trivially for epoch2.
        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(&storage_opt, &metrics),
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register a read-version instance for the test table.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        read_version.write().init();

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        // Write one imm into epoch1 and hand it to the uploader.
        let (imm1, tracker1) = gen_imm_with_unlimit(epoch1);
        read_version.write().add_pending_imm(imm1.clone(), tracker1);

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imms: read_version.write().start_upload_pending_imms(),
        });

        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Same for epoch2.
        {
            let (imm2, tracker2) = gen_imm_with_unlimit(epoch2);
            let mut read_version = read_version.write();
            read_version.add_pending_imm(imm2, tracker2);

            send_event(HummockEvent::ImmToUploader {
                instance_id: guard.instance_id,
                imms: read_version.start_upload_pending_imms(),
            });
        }

        // Seal epoch2 by starting epoch3.
        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Both syncs must stay pending while the epoch1 upload is gated.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Release the gate: the epoch1 task fails, and BOTH syncs observe an
        // error (the failure propagates to the later epoch's sync too).
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1139
1140 #[tokio::test]
1141 async fn test_clear_tables() {
1142 let table_id1 = TableId::new(1);
1143 let table_id2 = TableId::new(2);
1144 let epoch0 = test_epoch(233);
1145
1146 let initial_version = PinnedVersion::new(
1147 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1148 id: 1,
1149 state_table_info: HashMap::from_iter([
1150 (
1151 table_id1,
1152 StateTableInfo {
1153 committed_epoch: epoch0,
1154 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1155 },
1156 ),
1157 (
1158 table_id2,
1159 StateTableInfo {
1160 committed_epoch: epoch0,
1161 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1162 },
1163 ),
1164 ]),
1165 ..Default::default()
1166 }),
1167 unbounded_channel().0,
1168 );
1169
1170 let (_version_update_tx, version_update_rx) = unbounded_channel();
1171
1172 let epoch1 = epoch0.next_epoch();
1173 let epoch2 = epoch1.next_epoch();
1174 let epoch3 = epoch2.next_epoch();
1175
1176 let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0).size();
1177
1178 let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1180 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1181
1182 let gen_imm = |table_id, epoch, spill_offset| {
1183 let imm = gen_imm_inner(table_id, epoch, spill_offset);
1184 assert_eq!(imm.size(), imm_size);
1185 imm
1186 };
1187 let imm1_1 = gen_imm(table_id1, epoch1, 0);
1188 let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1189
1190 let storage_opt = default_opts_for_test();
1191 let metrics = Arc::new(HummockStateStoreMetrics::unused());
1192
1193 let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1194
1195 let event_handler = HummockEventHandler::new_inner(
1196 version_update_rx,
1197 mock_sstable_store().await,
1198 metrics.clone(),
1199 CacheRefillConfig::from_storage_opts(&storage_opt),
1200 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1201 buffer_tracker,
1202 spawn_task,
1203 CacheRefiller::default_spawn_refill_task(),
1204 );
1205
1206 let event_tx = event_handler.event_sender();
1207
1208 let send_event = |event| event_tx.send(event).unwrap();
1209 let flush_event = || async {
1210 let (tx, rx) = oneshot::channel();
1211 send_event(HummockEvent::FlushEvent(tx));
1212 rx.await.unwrap();
1213 };
1214 let start_epoch = |table_id, epoch| {
1215 send_event(HummockEvent::StartEpoch {
1216 epoch,
1217 table_ids: HashSet::from_iter([table_id]),
1218 })
1219 };
1220 let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1221 send_event(HummockEvent::InitEpoch {
1222 instance_id: instance.instance_id,
1223 init_epoch,
1224 })
1225 };
1226 let event_tx_clone = event_tx.clone();
1227 let write_imm = {
1228 let memory_limiter = memory_limiter.clone();
1229 move |read_version: &HummockReadVersionRef,
1230 instance: &LocalInstanceGuard,
1231 imm: &ImmutableMemtable| {
1232 let memory_limiter = memory_limiter.clone();
1233 let event_tx = event_tx_clone.clone();
1234 let read_version = read_version.clone();
1235 let imm = imm.clone();
1236 let instance_id = instance.instance_id;
1237 async move {
1238 let tracker = memory_limiter.require_memory(imm.size() as _).await;
1239 let mut read_version = read_version.write();
1240 read_version.add_pending_imm(imm.clone(), tracker);
1241
1242 event_tx
1243 .send(HummockEvent::ImmToUploader {
1244 instance_id,
1245 imms: read_version.start_upload_pending_imms(),
1246 })
1247 .unwrap();
1248 }
1249 }
1250 };
1251 let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1252 send_event(HummockEvent::LocalSealEpoch {
1253 instance_id: instance.instance_id,
1254 next_epoch,
1255 opts: SealCurrentEpochOptions::for_test(),
1256 })
1257 };
1258 let sync_epoch = |table_id, new_sync_epoch| {
1259 let (tx, rx) = oneshot::channel();
1260 send_event(HummockEvent::SyncEpoch {
1261 sync_result_sender: tx,
1262 sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1263 });
1264 rx
1265 };
1266
1267 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1268
1269 let (read_version1, guard1) = {
1270 let (tx, rx) = oneshot::channel();
1271 send_event(HummockEvent::RegisterReadVersion {
1272 table_id: table_id1,
1273 new_read_version_sender: tx,
1274 is_replicated: false,
1275 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1276 });
1277 rx.await.unwrap()
1278 };
1279
1280 let (read_version2, guard2) = {
1281 let (tx, rx) = oneshot::channel();
1282 send_event(HummockEvent::RegisterReadVersion {
1283 table_id: table_id2,
1284 new_read_version_sender: tx,
1285 is_replicated: false,
1286 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1287 });
1288 rx.await.unwrap()
1289 };
1290
1291 let (task1_1_finish_tx, task1_1_rx) = {
1293 start_epoch(table_id1, epoch1);
1294
1295 read_version1.write().init();
1296 init_epoch(&guard1, epoch1);
1297
1298 write_imm(&read_version1, &guard1, &imm1_1).await;
1299
1300 start_epoch(table_id1, epoch2);
1301
1302 seal_epoch(&guard1, epoch2);
1303
1304 let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1305 guard1.instance_id,
1306 vec![imm1_1.batch_id()],
1307 )]));
1308
1309 let mut rx = sync_epoch(table_id1, epoch1);
1310 wait_task_start.await;
1311 assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1312
1313 write_imm(&read_version1, &guard1, &imm1_2_1).await;
1314 flush_event().await;
1315
1316 (task_finish_tx, rx)
1317 };
1318 let (task1_2_finish_tx, _finish_txs) = {
1323 let mut finish_txs = vec![];
1324 let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1325 start_epoch(table_id2, epoch1);
1326 read_version2.write().init();
1327 init_epoch(&guard2, epoch1);
1328 let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1329 guard1.instance_id,
1330 vec![imm1_2_1.batch_id()],
1331 )]));
1332 write_imm(&read_version2, &guard2, &imm2_1_1).await;
1333 wait_task_start.await;
1334
1335 let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1336 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1337 guard2.instance_id,
1338 vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1339 )]));
1340 finish_txs.push(finish_tx);
1341 write_imm(&read_version2, &guard2, &imm2_1_2).await;
1342 wait_task_start.await;
1343
1344 let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1345 write_imm(&read_version2, &guard2, &imm2_1_3).await;
1346 start_epoch(table_id2, epoch2);
1347 seal_epoch(&guard2, epoch2);
1348 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1349 guard2.instance_id,
1350 vec![imm2_1_3.batch_id()],
1351 )]));
1352 finish_txs.push(finish_tx);
1353 let _sync_rx = sync_epoch(table_id2, epoch1);
1354 wait_task_start.await;
1355
1356 let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1357 write_imm(&read_version2, &guard2, &imm2_2_1).await;
1358 flush_event().await;
1359 let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1360 write_imm(&read_version2, &guard2, &imm2_2_2).await;
1361 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1362 guard2.instance_id,
1363 vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1364 )]));
1365 finish_txs.push(finish_tx);
1366 wait_task_start.await;
1367
1368 let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1369 write_imm(&read_version2, &guard2, &imm2_2_3).await;
1370
1371 drop(guard2);
1379 let (clear_tx, clear_rx) = oneshot::channel();
1380 send_event(HummockEvent::Clear(
1381 clear_tx,
1382 Some(HashSet::from_iter([table_id2])),
1383 ));
1384 clear_rx.await.unwrap();
1385 (task1_2_finish_tx, finish_txs)
1386 };
1387
1388 let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1389 write_imm(&read_version1, &guard1, &imm1_2_2).await;
1390 start_epoch(table_id1, epoch3);
1391 seal_epoch(&guard1, epoch3);
1392
1393 let (tx2, mut sync_rx2) = oneshot::channel();
1394 let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1395 guard1.instance_id,
1396 vec![imm1_2_2.batch_id()],
1397 )]));
1398 send_event(HummockEvent::SyncEpoch {
1399 sync_result_sender: tx2,
1400 sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1401 });
1402 wait_task_start.await;
1403 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1404
1405 task1_1_finish_tx.send(()).unwrap();
1406 let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1407 sync_data1
1408 .uploaded_ssts
1409 .iter()
1410 .all(|sst| sst.epochs() == &vec![epoch1]);
1411 task1_2_finish_tx.send(()).unwrap();
1412 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1413 task1_2_2_finish_tx.send(()).unwrap();
1414 let sync_data2 = sync_rx2.await.unwrap().unwrap();
1415 sync_data2
1416 .uploaded_ssts
1417 .iter()
1418 .all(|sst| sst.epochs() == &vec![epoch2]);
1419
1420 send_event(HummockEvent::Shutdown);
1421 join_handle.await.unwrap();
1422 }
1423}