use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::panic_store::PanicStateStore;
use crate::store::*;

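/// Sends [`HummockEvent::Shutdown`] to the event handler when dropped, so that the event
/// handler worker exits together with the last `HummockStorage` clone holding this guard.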
struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

#[derive(Clone)]
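/// `HummockStorage` is the entry point of the Hummock state store. It is a cheaply cloneable
/// handle: all clones share the event sender, version caches and metrics, and the background
/// workers spawned in [`HummockStorage::new`] are shut down when the last clone is dropped.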
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

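/// Builds a [`ReadVersionTuple`] that reads only committed data from `version`, rewriting
/// `key_range` with the committed table watermark of `table_id` (if one exists).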
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
        TableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    #[allow(clippy::too_many_arguments)]
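    /// Creates a `HummockStorage`: it waits for the initial Hummock version from the meta
    /// observer, then spawns the version-pinning worker and the event handler worker.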
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
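    /// Point-get for `key` at this snapshot's epoch. The single-key range is first narrowed by
    /// the table watermark; if it becomes empty, `Ok(None)` is returned without touching storage.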
    async fn get_inner<O>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

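    /// Dispatches on the read-epoch variant: `Backup` epochs are served from backup storage,
    /// `Committed`, `BatchQueryCommitted` and `TimeTravel` epochs from a committed version, and
    /// `NoWait` epochs may also read uncommitted data from local read versions.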
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => version,
            None => self.get_time_travel_version(epoch, table_id).await?,
        };
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

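    /// Serves a `NoWait` read: when `epoch` is not newer than the table's committed epoch, the
    /// committed version is used; otherwise the (non-replicated) local read version covering the
    /// requested vnode is consulted, falling back to the committed version when none is registered.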
    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read versions ({} found). Falling back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Falling back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
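        // The send result is deliberately ignored: the event handler may already have shut
        // down, in which case there is nothing left to notify.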
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

#[derive(Clone)]
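/// A read-only view of `HummockStorage` pinned to a specific `HummockReadEpoch` and table,
/// created via `StateStore::new_read_snapshot`. Point-gets and iterators are served through
/// the `StateStoreGet` / `StateStoreRead` implementations below.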
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<O: Send + 'static>(
        &self,
        _vec: Vector,
        _options: VectorNearestOptions,
        _on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        unimplemented!()
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            let recent_versions = self.recent_versions.load();
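            // Fast path: the next epoch may already be visible in the locally cached latest version.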
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
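    /// Waits until the locally pinned version covers `wait_epoch` for `table_id`.
    /// `Committed` epochs wait for the table's committed epoch to advance; `BatchQueryCommitted`
    /// additionally requires the version id carried by the batch query to be reached; all other
    /// variants return immediately.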
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                {
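                    // Fast path: the already-loaded latest version may satisfy the request
                    // without waiting for a version update.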
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = PanicStateStore;

    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, _options: NewVectorWriterOptions) -> Self::VectorWriter {
        unimplemented!()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}