1use std::collections::HashSet;
16use std::future::Future;
17use std::ops::Bound;
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::TableId;
24use risingwave_common::util::epoch::is_max_epoch;
25use risingwave_common_service::{NotificationClient, ObserverManager};
26use risingwave_hummock_sdk::key::{
27 TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
28};
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
31use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
32use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_rpc_client::HummockMetaClient;
34use thiserror_ext::AsReport;
35use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
36use tokio::sync::oneshot;
37
38use super::local_hummock_storage::LocalHummockStorage;
39use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
40use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
41#[cfg(any(test, feature = "test"))]
42use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
43use crate::error::StorageResult;
44use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
45use crate::hummock::compactor::{
46 CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
47};
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
49use crate::hummock::event_handler::{
50 HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
51};
52use crate::hummock::iterator::change_log::ChangeLogIterator;
53use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
54use crate::hummock::local_version::recent_versions::RecentVersions;
55use crate::hummock::observer_manager::HummockObserverNode;
56use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
57use crate::hummock::utils::{wait_for_epoch, wait_for_update};
58use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
59use crate::hummock::{
60 HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
61 MemoryLimiter, SstableObjectIdManager, SstableObjectIdManagerRef, SstableStoreRef,
62};
63use crate::mem_table::ImmutableMemtable;
64use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
65use crate::opts::StorageOpts;
66use crate::store::*;
67
/// Drop guard that tells the Hummock event handler to shut down.
///
/// The guard does nothing while alive. When it is dropped, its `Drop`
/// implementation sends [`HummockEvent::Shutdown`] through `shutdown_sender`.
struct HummockStorageShutdownGuard {
    /// Event channel on which the shutdown event is sent when the guard is dropped.
    shutdown_sender: HummockEventSender,
}
71
72impl Drop for HummockStorageShutdownGuard {
73 fn drop(&mut self) {
74 let _ = self
75 .shutdown_sender
76 .send(HummockEvent::Shutdown)
77 .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
78 }
79}
80
/// Cloneable handle to the Hummock state store.
///
/// Every field is either a channel sender or a shared handle, so clones of
/// this struct talk to the same event handler worker that `new` spawns.
#[derive(Clone)]
pub struct HummockStorage {
    /// Sender for [`HummockEvent`]s consumed by the event handler worker spawned in `new`.
    hummock_event_sender: HummockEventSender,
    /// Sender half of the version update channel created in `new`.
    /// The test-only helper `update_version_and_wait` uses it to inject a version.
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    /// Compactor context built by `CompactorContext::new_local_compact_context`;
    /// also the source of the sstable store and the storage options.
    context: CompactorContext,

    /// Shared compaction catalog manager, returned by `compaction_catalog_manager_ref()`.
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    /// Shared sstable object id manager, created in `new` from the meta client.
    sstable_object_id_manager: SstableObjectIdManagerRef,

    /// Buffer tracker taken from the event handler; provides the memory limiter
    /// and (in tests) the shared buffer size.
    buffer_tracker: BufferTracker,

    /// Watch-channel sender taken from the event handler; passed to the
    /// `wait_for_epoch` / `wait_for_update` helpers and to new local storages.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    /// Recent pinned versions shared with the event handler;
    /// `latest_version()` is what `get_pinned_version()` returns.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    /// Reader that executes gets, iterations and change-log reads against a chosen version.
    hummock_version_reader: HummockVersionReader,

    /// Shutdown guard shared between all clones through the `Arc`; its `Drop`
    /// runs once the last holder of the `Arc` goes away.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    /// Read-only view of the registered read versions, taken from the event
    /// handler and looked up by table id.
    read_version_mapping: ReadOnlyReadVersionMapping,

    /// Reader used to serve reads from a backup version.
    backup_reader: BackupReaderRef,

    /// Write limiter shared with the observer node and with every local storage.
    write_limiter: WriteLimiterRef,

    /// Await-tree registry for compaction; `Some` only when an await-tree
    /// config was passed to `new`.
    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    /// Client for the Hummock meta service.
    hummock_meta_client: Arc<dyn HummockMetaClient>,

    /// Cache of versions fetched for time-travel reads, created with
    /// `time_travel_version_cache_capacity` from the storage options.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
120
/// Inputs handed to the version reader for one read: the immutable memtables,
/// the sstable infos, and the committed version to read from, in that order.
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
122
123pub fn get_committed_read_version_tuple(
124 version: PinnedVersion,
125 table_id: TableId,
126 mut key_range: TableKeyRange,
127 epoch: HummockEpoch,
128) -> (TableKeyRange, ReadVersionTuple) {
129 if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
130 TableWatermarksIndex::new_committed(
131 table_watermarks.clone(),
132 version
133 .state_table_info
134 .info()
135 .get(&table_id)
136 .expect("should exist when having table watermark")
137 .committed_epoch,
138 )
139 .rewrite_range_with_table_watermark(epoch, &mut key_range)
140 }
141 (key_range, (vec![], vec![], version))
142}
143
144impl HummockStorage {
145 #[allow(clippy::too_many_arguments)]
147 pub async fn new(
148 options: Arc<StorageOpts>,
149 sstable_store: SstableStoreRef,
150 hummock_meta_client: Arc<dyn HummockMetaClient>,
151 notification_client: impl NotificationClient,
152 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
153 state_store_metrics: Arc<HummockStateStoreMetrics>,
154 compactor_metrics: Arc<CompactorMetrics>,
155 await_tree_config: Option<await_tree::Config>,
156 ) -> HummockResult<Self> {
157 let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
158 hummock_meta_client.clone(),
159 options.sstable_id_remote_fetch_number,
160 ));
161 let backup_reader = BackupReader::new(
162 &options.backup_storage_url,
163 &options.backup_storage_directory,
164 &options.object_store_config,
165 )
166 .await
167 .map_err(HummockError::read_backup_error)?;
168 let write_limiter = Arc::new(WriteLimiter::default());
169 let (version_update_tx, mut version_update_rx) = unbounded_channel();
170
171 let observer_manager = ObserverManager::new(
172 notification_client,
173 HummockObserverNode::new(
174 compaction_catalog_manager_ref.clone(),
175 backup_reader.clone(),
176 version_update_tx.clone(),
177 write_limiter.clone(),
178 ),
179 )
180 .await;
181 observer_manager.start().await;
182
183 let hummock_version = match version_update_rx.recv().await {
184 Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
185 _ => unreachable!(
186 "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
187 ),
188 };
189
190 let (pin_version_tx, pin_version_rx) = unbounded_channel();
191 let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
192 tokio::spawn(start_pinned_version_worker(
193 pin_version_rx,
194 hummock_meta_client.clone(),
195 options.max_version_pinning_duration_sec,
196 ));
197
198 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
199
200 let compactor_context = CompactorContext::new_local_compact_context(
201 options.clone(),
202 sstable_store.clone(),
203 compactor_metrics.clone(),
204 await_tree_reg.clone(),
205 );
206
207 let hummock_event_handler = HummockEventHandler::new(
208 version_update_rx,
209 pinned_version,
210 compactor_context.clone(),
211 compaction_catalog_manager_ref.clone(),
212 sstable_object_id_manager.clone(),
213 state_store_metrics.clone(),
214 );
215
216 let event_tx = hummock_event_handler.event_sender();
217
218 let instance = Self {
219 context: compactor_context,
220 compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
221 sstable_object_id_manager,
222 buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
223 version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
224 hummock_event_sender: event_tx.clone(),
225 _version_update_sender: version_update_tx,
226 recent_versions: hummock_event_handler.recent_versions(),
227 hummock_version_reader: HummockVersionReader::new(
228 sstable_store,
229 state_store_metrics.clone(),
230 options.max_preload_io_retry_times,
231 ),
232 _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
233 shutdown_sender: event_tx,
234 }),
235 read_version_mapping: hummock_event_handler.read_version_mapping(),
236 backup_reader,
237 write_limiter,
238 compact_await_tree_reg: await_tree_reg,
239 hummock_meta_client,
240 simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
241 options.time_travel_version_cache_capacity,
242 )),
243 };
244
245 tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());
246
247 Ok(instance)
248 }
249}
250
251impl HummockStorageReadSnapshot {
252 async fn get_inner(
260 &self,
261 key: TableKey<Bytes>,
262 read_options: ReadOptions,
263 ) -> StorageResult<Option<StateStoreKeyedRow>> {
264 let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));
265
266 let (key_range, read_version_tuple) = self
267 .build_read_version_tuple(self.raw_epoch, key_range, &read_options)
268 .await?;
269
270 if is_empty_key_range(&key_range) {
271 return Ok(None);
272 }
273
274 self.hummock_version_reader
275 .get(key, self.raw_epoch, read_options, read_version_tuple)
276 .await
277 }
278
279 async fn iter_inner(
280 &self,
281 key_range: TableKeyRange,
282 read_options: ReadOptions,
283 ) -> StorageResult<HummockStorageIterator> {
284 let (key_range, read_version_tuple) = self
285 .build_read_version_tuple(self.raw_epoch, key_range, &read_options)
286 .await?;
287
288 self.hummock_version_reader
289 .iter(key_range, self.raw_epoch, read_options, read_version_tuple)
290 .await
291 }
292
293 async fn rev_iter_inner(
294 &self,
295 key_range: TableKeyRange,
296 read_options: ReadOptions,
297 ) -> StorageResult<HummockStorageRevIterator> {
298 let (key_range, read_version_tuple) = self
299 .build_read_version_tuple(self.raw_epoch, key_range, &read_options)
300 .await?;
301
302 self.hummock_version_reader
303 .rev_iter(
304 key_range,
305 self.raw_epoch,
306 read_options,
307 read_version_tuple,
308 None,
309 )
310 .await
311 }
312
313 async fn get_time_travel_version(
314 &self,
315 epoch: u64,
316 table_id: TableId,
317 ) -> StorageResult<PinnedVersion> {
318 let meta_client = self.hummock_meta_client.clone();
319 let fetch = async move {
320 let pb_version = meta_client
321 .get_version_by_epoch(epoch, table_id.table_id())
322 .await
323 .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
324 .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
325 let version = HummockVersion::from_rpc_protobuf(&pb_version);
326 let (tx, _rx) = unbounded_channel();
327 Ok(PinnedVersion::new(version, tx))
328 };
329 let version = self
330 .simple_time_travel_version_cache
331 .get_or_insert(table_id.table_id, epoch, fetch)
332 .await?;
333 Ok(version)
334 }
335
336 async fn build_read_version_tuple(
337 &self,
338 epoch: u64,
339 key_range: TableKeyRange,
340 read_options: &ReadOptions,
341 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
342 if read_options.read_version_from_backup {
343 self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
344 .await
345 } else if read_options.read_committed {
346 self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
347 .await
348 } else {
349 self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
350 }
351 }
352
353 async fn build_read_version_tuple_from_backup(
354 &self,
355 epoch: u64,
356 table_id: TableId,
357 key_range: TableKeyRange,
358 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
359 match self
360 .backup_reader
361 .try_get_hummock_version(table_id, epoch)
362 .await
363 {
364 Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
365 backup_version,
366 table_id,
367 key_range,
368 epoch,
369 )),
370 Ok(None) => Err(HummockError::read_backup_error(format!(
371 "backup include epoch {} not found",
372 epoch
373 ))
374 .into()),
375 Err(e) => Err(e),
376 }
377 }
378
379 async fn build_read_version_tuple_from_committed(
380 &self,
381 epoch: u64,
382 table_id: TableId,
383 key_range: TableKeyRange,
384 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
385 let version = match self
386 .recent_versions
387 .load()
388 .get_safe_version(table_id, epoch)
389 {
390 Some(version) => version,
391 None => self.get_time_travel_version(epoch, table_id).await?,
392 };
393 Ok(get_committed_read_version_tuple(
394 version, table_id, key_range, epoch,
395 ))
396 }
397
398 fn build_read_version_tuple_from_all(
399 &self,
400 epoch: u64,
401 table_id: TableId,
402 key_range: TableKeyRange,
403 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
404 let pinned_version = self.recent_versions.load().latest_version().clone();
405 let info = pinned_version.state_table_info.info().get(&table_id);
406
407 let ret = if let Some(info) = info
409 && epoch <= info.committed_epoch
410 {
411 if epoch < info.committed_epoch {
412 return Err(
413 HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
414 );
415 }
416 get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
418 } else {
419 let vnode = vnode(&key_range);
420 let mut matched_replicated_read_version_cnt = 0;
421 let read_version_vec = {
422 let read_guard = self.read_version_mapping.read();
423 read_guard
424 .get(&table_id)
425 .map(|v| {
426 v.values()
427 .filter(|v| {
428 let read_version = v.read();
429 if read_version.contains(vnode) {
430 if read_version.is_replicated() {
431 matched_replicated_read_version_cnt += 1;
432 false
433 } else {
434 true
436 }
437 } else {
438 false
439 }
440 })
441 .cloned()
442 .collect_vec()
443 })
444 .unwrap_or_default()
445 };
446
447 if read_version_vec.is_empty() {
450 let table_committed_epoch = info.map(|info| info.committed_epoch);
451 if matched_replicated_read_version_cnt > 0 {
452 tracing::warn!(
453 "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
454 table_id,
455 vnode.to_index(),
456 epoch,
457 matched_replicated_read_version_cnt,
458 table_committed_epoch,
459 );
460 } else {
461 tracing::debug!(
462 "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
463 table_id,
464 vnode.to_index(),
465 epoch,
466 table_committed_epoch
467 );
468 }
469 get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
470 } else {
471 if read_version_vec.len() != 1 {
472 let read_version_vnodes = read_version_vec
473 .into_iter()
474 .map(|v| {
475 let v = v.read();
476 v.vnodes().iter_ones().collect_vec()
477 })
478 .collect_vec();
479 return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
480 }
481 read_filter_for_version(
482 epoch,
483 table_id,
484 key_range,
485 read_version_vec.first().unwrap(),
486 )?
487 }
488 };
489
490 Ok(ret)
491 }
492}
493
494impl HummockStorage {
495 async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
496 let (tx, rx) = tokio::sync::oneshot::channel();
497 self.hummock_event_sender
498 .send(HummockEvent::RegisterReadVersion {
499 table_id: option.table_id,
500 new_read_version_sender: tx,
501 is_replicated: option.is_replicated,
502 vnodes: option.vnodes.clone(),
503 })
504 .unwrap();
505
506 let (basic_read_version, instance_guard) = rx.await.unwrap();
507 let version_update_notifier_tx = self.version_update_notifier_tx.clone();
508 LocalHummockStorage::new(
509 instance_guard,
510 basic_read_version,
511 self.hummock_version_reader.clone(),
512 self.hummock_event_sender.clone(),
513 self.buffer_tracker.get_memory_limiter().clone(),
514 self.write_limiter.clone(),
515 option,
516 version_update_notifier_tx,
517 self.context.storage_opts.mem_table_spill_threshold,
518 )
519 }
520
521 pub async fn clear_shared_buffer(&self) {
522 let (tx, rx) = oneshot::channel();
523 self.hummock_event_sender
524 .send(HummockEvent::Clear(tx, None))
525 .expect("should send success");
526 rx.await.expect("should wait success");
527 }
528
529 pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
530 if !table_ids.is_empty() {
531 let (tx, rx) = oneshot::channel();
532 self.hummock_event_sender
533 .send(HummockEvent::Clear(tx, Some(table_ids)))
534 .expect("should send success");
535 rx.await.expect("should wait success");
536 }
537 }
538
539 pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
543 let _ = self
544 .hummock_event_sender
545 .send(HummockEvent::StartEpoch { epoch, table_ids });
546 }
547
548 pub fn sstable_store(&self) -> SstableStoreRef {
549 self.context.sstable_store.clone()
550 }
551
552 pub fn sstable_object_id_manager(&self) -> &SstableObjectIdManagerRef {
553 &self.sstable_object_id_manager
554 }
555
556 pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
557 self.compaction_catalog_manager_ref.clone()
558 }
559
560 pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
561 self.buffer_tracker.get_memory_limiter().clone()
562 }
563
564 pub fn get_pinned_version(&self) -> PinnedVersion {
565 self.recent_versions.load().latest_version().clone()
566 }
567
568 pub fn backup_reader(&self) -> BackupReaderRef {
569 self.backup_reader.clone()
570 }
571
572 pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
573 self.compact_await_tree_reg.as_ref()
574 }
575
576 pub async fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
577 let (tx, rx) = oneshot::channel();
578 self.hummock_event_sender
579 .send(HummockEvent::GetMinUncommittedSstId { result_tx: tx })
580 .expect("should send success");
581 rx.await.expect("should await success")
582 }
583
584 pub async fn sync(
585 &self,
586 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
587 ) -> StorageResult<SyncResult> {
588 let (tx, rx) = oneshot::channel();
589 let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
590 sync_result_sender: tx,
591 sync_table_epochs,
592 });
593 let synced_data = rx
594 .await
595 .map_err(|_| HummockError::other("failed to receive sync result"))??;
596 Ok(synced_data.into_sync_result())
597 }
598}
599
/// Read handle bound to one table and one epoch.
///
/// Created by `HummockStorage::new_read_snapshot`, which copies the shared
/// handles below out of the storage.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    /// Epoch every read of this snapshot is issued at.
    raw_epoch: u64,
    /// Table every read of this snapshot targets.
    table_id: TableId,
    /// Recent pinned versions, used to pick the version to read from.
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    /// Reader that executes gets and iterations against the chosen version.
    hummock_version_reader: HummockVersionReader,
    /// Read-only view of registered read versions, looked up by table id.
    read_version_mapping: ReadOnlyReadVersionMapping,
    /// Reader used when a read asks for a backup version.
    backup_reader: BackupReaderRef,
    /// Meta client used to fetch a version by epoch for time-travel reads.
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    /// Cache in front of the time-travel version fetch.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
611
612impl StateStoreRead for HummockStorageReadSnapshot {
613 type Iter = HummockStorageIterator;
614 type RevIter = HummockStorageRevIterator;
615
616 fn get_keyed_row(
617 &self,
618 key: TableKey<Bytes>,
619 read_options: ReadOptions,
620 ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
621 self.get_inner(key, read_options)
622 }
623
624 fn iter(
625 &self,
626 key_range: TableKeyRange,
627 read_options: ReadOptions,
628 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
629 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
630 assert_eq!(
631 r_vnode_exclusive - l_vnode_inclusive,
632 1,
633 "read range {:?} for table {} iter contains more than one vnode",
634 key_range,
635 self.table_id
636 );
637 self.iter_inner(key_range, read_options)
638 }
639
640 fn rev_iter(
641 &self,
642 key_range: TableKeyRange,
643 read_options: ReadOptions,
644 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
645 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
646 assert_eq!(
647 r_vnode_exclusive - l_vnode_inclusive,
648 1,
649 "read range {:?} for table {} iter contains more than one vnode",
650 key_range,
651 self.table_id
652 );
653 self.rev_iter_inner(key_range, read_options)
654 }
655}
656
657impl StateStoreReadLog for HummockStorage {
658 type ChangeLogIter = ChangeLogIterator;
659
660 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
661 fn next_epoch(
662 version: &LocalHummockVersion,
663 epoch: u64,
664 table_id: TableId,
665 ) -> HummockResult<Option<u64>> {
666 let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
667 HummockError::next_epoch(format!("table {} has been dropped", table_id))
668 })?;
669 table_change_log.next_epoch(epoch).map_err(|_| {
670 HummockError::next_epoch(format!(
671 "invalid epoch {}, change log epoch: {:?}",
672 epoch,
673 table_change_log.epochs().collect_vec()
674 ))
675 })
676 }
677 {
678 let recent_versions = self.recent_versions.load();
680 if let Some(next_epoch) =
681 next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
682 {
683 return Ok(next_epoch);
684 }
685 }
686 let mut next_epoch_ret = None;
687 wait_for_update(
688 &self.version_update_notifier_tx,
689 |version| {
690 if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
691 next_epoch_ret = Some(next_epoch);
692 Ok(true)
693 } else {
694 Ok(false)
695 }
696 },
697 || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
698 )
699 .await?;
700 Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
701 }
702
703 async fn iter_log(
704 &self,
705 epoch_range: (u64, u64),
706 key_range: TableKeyRange,
707 options: ReadLogOptions,
708 ) -> StorageResult<Self::ChangeLogIter> {
709 let version = self.recent_versions.load().latest_version().clone();
710 let iter = self
711 .hummock_version_reader
712 .iter_log(version, epoch_range, key_range, options)
713 .await?;
714 Ok(iter)
715 }
716}
717
718impl HummockStorage {
719 async fn try_wait_epoch_impl(
722 &self,
723 wait_epoch: HummockReadEpoch,
724 table_id: TableId,
725 ) -> StorageResult<()> {
726 match wait_epoch {
727 HummockReadEpoch::Committed(wait_epoch) => {
728 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
729 wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
730 }
731 HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
732 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
733 {
735 let recent_versions = self.recent_versions.load();
736 let latest_version = recent_versions.latest_version();
737 if latest_version.id >= wait_version_id
738 && let Some(committed_epoch) =
739 latest_version.table_committed_epoch(table_id)
740 && committed_epoch >= wait_epoch
741 {
742 return Ok(());
743 }
744 }
745 wait_for_update(
746 &self.version_update_notifier_tx,
747 |version| {
748 if wait_version_id > version.id() {
749 return Ok(false);
750 }
751 let committed_epoch =
752 version.table_committed_epoch(table_id).ok_or_else(|| {
753 HummockError::wait_epoch(format!(
757 "table id {} has been dropped",
758 table_id
759 ))
760 })?;
761 Ok(committed_epoch >= wait_epoch)
762 },
763 || {
764 format!(
765 "try_wait_epoch: epoch: {}, version_id: {:?}",
766 wait_epoch, wait_version_id
767 )
768 },
769 )
770 .await?;
771 }
772 _ => {}
773 };
774 Ok(())
775 }
776}
777
778impl StateStore for HummockStorage {
779 type Local = LocalHummockStorage;
780 type ReadSnapshot = HummockStorageReadSnapshot;
781
782 async fn try_wait_epoch(
785 &self,
786 wait_epoch: HummockReadEpoch,
787 options: TryWaitEpochOptions,
788 ) -> StorageResult<()> {
789 self.try_wait_epoch_impl(wait_epoch, options.table_id).await
790 }
791
792 fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
793 self.new_local_inner(option)
794 }
795
796 async fn new_read_snapshot(
797 &self,
798 epoch: HummockReadEpoch,
799 options: NewReadSnapshotOptions,
800 ) -> StorageResult<Self::ReadSnapshot> {
801 self.try_wait_epoch_impl(epoch, options.table_id).await?;
802 Ok(HummockStorageReadSnapshot {
803 raw_epoch: epoch.get_epoch(),
804 table_id: options.table_id,
805 recent_versions: self.recent_versions.clone(),
806 hummock_version_reader: self.hummock_version_reader.clone(),
807 read_version_mapping: self.read_version_mapping.clone(),
808 backup_reader: self.backup_reader.clone(),
809 hummock_meta_client: self.hummock_meta_client.clone(),
810 simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
811 })
812 }
813}
814
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Syncs a single `epoch` for `table_ids` by calling `sync` with a one-element list.
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        let sync_table_epochs = vec![(epoch, table_ids)];
        self.sync(sync_table_epochs).await
    }

    /// Pushes `version` onto the version update channel, then yields to the
    /// runtime until the latest pinned version has an id of at least `version.id`.
    ///
    /// # Panics
    /// Panics if the version update cannot be sent.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        let target_id = version.id;
        let update = HummockVersionUpdate::PinnedVersion(Box::new(version));
        self._version_update_sender.send(update).unwrap();
        loop {
            let reached = self.recent_versions.load().latest_version().id() >= target_id;
            if reached {
                return;
            }
            tokio::task::yield_now().await
        }
    }

    /// Yields to the runtime until the latest pinned version has an id of at
    /// least `version.id`.
    pub async fn wait_version(&self, version: HummockVersion) {
        loop {
            let reached = self.recent_versions.load().latest_version().id() >= version.id;
            if reached {
                return;
            }
            tokio::task::yield_now().await
        }
    }

    /// Returns the buffer size reported by the buffer tracker.
    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Builds a storage for tests: a compaction catalog manager backed by
    /// `FakeRemoteTableAccessor`, unused metrics, and no await-tree config.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let fake_accessor = Box::new(FakeRemoteTableAccessor {});
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(fake_accessor));
        let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
        let compactor_metrics = Arc::new(CompactorMetrics::unused());

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            state_store_metrics,
            compactor_metrics,
            None,
        )
        .await
    }

    /// Returns the storage options held by the compactor context.
    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    /// Returns the version reader used by this storage.
    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Yields to the runtime until the latest pinned version id is greater
    /// than `old_id`, then returns that newer id.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        loop {
            let current_id = self.recent_versions.load().latest_version().id();
            if current_id > old_id {
                break current_id;
            }
            tokio::task::yield_now().await;
        }
    }
}