1use std::collections::HashSet;
16use std::future::Future;
17use std::ops::Bound;
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::TableId;
24use risingwave_common::util::epoch::is_max_epoch;
25use risingwave_common_service::{NotificationClient, ObserverManager};
26use risingwave_hummock_sdk::key::{
27 TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
28};
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_watermark::{PkPrefixTableWatermarksIndex, WatermarkSerdeType};
31use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
32use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
33use risingwave_rpc_client::HummockMetaClient;
34use thiserror_ext::AsReport;
35use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
36use tokio::sync::oneshot;
37
38use super::local_hummock_storage::LocalHummockStorage;
39use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
40use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
41#[cfg(any(test, feature = "test"))]
42use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
43use crate::error::StorageResult;
44use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
45use crate::hummock::compactor::{
46 CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
47};
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
49use crate::hummock::event_handler::{
50 HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
51};
52use crate::hummock::iterator::change_log::ChangeLogIterator;
53use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
54use crate::hummock::local_version::recent_versions::RecentVersions;
55use crate::hummock::observer_manager::HummockObserverNode;
56use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
57use crate::hummock::utils::{wait_for_epoch, wait_for_update};
58use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
59use crate::hummock::{
60 HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
61 MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
62};
63use crate::mem_table::ImmutableMemtable;
64use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
65use crate::opts::StorageOpts;
66use crate::panic_store::PanicStateStore;
67use crate::store::*;
68
/// Guard that sends [`HummockEvent::Shutdown`] through `shutdown_sender` when it is dropped.
///
/// `HummockStorage` stores it as `Arc<HummockStorageShutdownGuard>`, so the shutdown event is
/// only sent once every clone holding that `Arc` has been dropped.
struct HummockStorageShutdownGuard {
    /// Event channel the shutdown event is sent on (the hummock event handler's sender when
    /// built by `HummockStorage::new`).
    shutdown_sender: HummockEventSender,
}
72
73impl Drop for HummockStorageShutdownGuard {
74 fn drop(&mut self) {
75 let _ = self
76 .shutdown_sender
77 .send(HummockEvent::Shutdown)
78 .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
79 }
80}
81
/// Hummock-backed implementation of [`StateStore`].
///
/// Clones share the event channel, version state and other handles below. The
/// `HummockEvent::Shutdown` event is sent by `_shutdown_guard` only when the last clone is
/// dropped, because the guard is held behind an `Arc`.
#[derive(Clone)]
pub struct HummockStorage {
    /// Sends events (read-version registration, clear, start/sync epoch, ...) to the hummock
    /// event handler.
    hummock_event_sender: HummockEventSender,
    /// Sender half of the version-update channel drained by the event handler; the test helper
    /// `update_version_and_wait` injects versions through it.
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    /// Local compactor context; also the source of `storage_opts` and the `sstable_store`.
    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    /// Shared-buffer tracker taken from the event handler; provides the memory limiter handed to
    /// local storages.
    buffer_tracker: BufferTracker,

    /// Watch channel publishing new pinned versions; waited on by `wait_for_epoch` and
    /// `wait_for_update`.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    /// Recently pinned versions shared with the event handler; `latest_version()` serves reads
    /// against the newest version.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    /// Sends `HummockEvent::Shutdown` when the last clone of this storage is dropped.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    /// Read-only view of the per-table local read versions, used by `NoWait` reads.
    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    /// Shared with the observer node and with every `LocalHummockStorage` created here.
    write_limiter: WriteLimiterRef,

    /// Present only when an await-tree config was passed to `new`.
    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    /// Meta client; read snapshots use it to fetch time-travel versions.
    hummock_meta_client: Arc<dyn HummockMetaClient>,

    /// Cache of time-travel versions fetched from meta, shared with read snapshots.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
121
/// The data sources of a single read: immutable memtables, staging SSTs and a committed version.
///
/// Reads served purely from a committed version use empty vectors for the first two components
/// (see `get_committed_read_version_tuple`).
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
123
124pub fn get_committed_read_version_tuple(
125 version: PinnedVersion,
126 table_id: TableId,
127 mut key_range: TableKeyRange,
128 epoch: HummockEpoch,
129) -> (TableKeyRange, ReadVersionTuple) {
130 if let Some(table_watermarks) = version.table_watermarks.get(&table_id)
131 && let WatermarkSerdeType::PkPrefix = table_watermarks.watermark_type
132 {
133 PkPrefixTableWatermarksIndex::new_committed(
134 table_watermarks.clone(),
135 version
136 .state_table_info
137 .info()
138 .get(&table_id)
139 .expect("should exist when having table watermark")
140 .committed_epoch,
141 )
142 .rewrite_range_with_table_watermark(epoch, &mut key_range)
143 }
144 (key_range, (vec![], vec![], version))
145}
146
147impl HummockStorage {
148 #[allow(clippy::too_many_arguments)]
150 pub async fn new(
151 options: Arc<StorageOpts>,
152 sstable_store: SstableStoreRef,
153 hummock_meta_client: Arc<dyn HummockMetaClient>,
154 notification_client: impl NotificationClient,
155 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
156 state_store_metrics: Arc<HummockStateStoreMetrics>,
157 compactor_metrics: Arc<CompactorMetrics>,
158 await_tree_config: Option<await_tree::Config>,
159 ) -> HummockResult<Self> {
160 let object_id_manager = Arc::new(ObjectIdManager::new(
161 hummock_meta_client.clone(),
162 options.sstable_id_remote_fetch_number,
163 ));
164 let backup_reader = BackupReader::new(
165 &options.backup_storage_url,
166 &options.backup_storage_directory,
167 &options.object_store_config,
168 )
169 .await
170 .map_err(HummockError::read_backup_error)?;
171 let write_limiter = Arc::new(WriteLimiter::default());
172 let (version_update_tx, mut version_update_rx) = unbounded_channel();
173
174 let observer_manager = ObserverManager::new(
175 notification_client,
176 HummockObserverNode::new(
177 compaction_catalog_manager_ref.clone(),
178 backup_reader.clone(),
179 version_update_tx.clone(),
180 write_limiter.clone(),
181 ),
182 )
183 .await;
184 observer_manager.start().await;
185
186 let hummock_version = match version_update_rx.recv().await {
187 Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
188 _ => unreachable!(
189 "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
190 ),
191 };
192
193 let (pin_version_tx, pin_version_rx) = unbounded_channel();
194 let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
195 tokio::spawn(start_pinned_version_worker(
196 pin_version_rx,
197 hummock_meta_client.clone(),
198 options.max_version_pinning_duration_sec,
199 ));
200
201 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
202
203 let compactor_context = CompactorContext::new_local_compact_context(
204 options.clone(),
205 sstable_store.clone(),
206 compactor_metrics.clone(),
207 await_tree_reg.clone(),
208 );
209
210 let hummock_event_handler = HummockEventHandler::new(
211 version_update_rx,
212 pinned_version,
213 compactor_context.clone(),
214 compaction_catalog_manager_ref.clone(),
215 object_id_manager.clone(),
216 state_store_metrics.clone(),
217 );
218
219 let event_tx = hummock_event_handler.event_sender();
220
221 let instance = Self {
222 context: compactor_context,
223 compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
224 object_id_manager,
225 buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
226 version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
227 hummock_event_sender: event_tx.clone(),
228 _version_update_sender: version_update_tx,
229 recent_versions: hummock_event_handler.recent_versions(),
230 hummock_version_reader: HummockVersionReader::new(
231 sstable_store,
232 state_store_metrics.clone(),
233 options.max_preload_io_retry_times,
234 ),
235 _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
236 shutdown_sender: event_tx,
237 }),
238 read_version_mapping: hummock_event_handler.read_version_mapping(),
239 backup_reader,
240 write_limiter,
241 compact_await_tree_reg: await_tree_reg,
242 hummock_meta_client,
243 simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
244 options.time_travel_version_cache_capacity,
245 )),
246 };
247
248 tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());
249
250 Ok(instance)
251 }
252}
253
impl HummockStorageReadSnapshot {
    /// Point lookup of `key` at this snapshot's epoch.
    ///
    /// The single-key range is first resolved by `build_read_version_tuple`, which may narrow it
    /// (committed paths apply PK-prefix table watermarks via `get_committed_read_version_tuple`).
    /// If the resolved range is empty, `Ok(None)` is returned without consulting the version
    /// reader. Otherwise the lookup is delegated to `HummockVersionReader::get` together with
    /// `on_key_value_fn`.
    async fn get_inner<O>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        // A point lookup is the closed range [key, key].
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        // The resolved range can be empty, e.g. after watermark rewriting; nothing to read then.
        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    /// Forward iterator over `key_range` at this snapshot's epoch.
    ///
    /// Unlike `get_inner`, an empty resolved range is not short-circuited here; it is passed to
    /// the version reader as-is.
    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    /// Reverse iterator over `key_range` at this snapshot's epoch.
    ///
    /// The trailing `None` argument is passed through to `HummockVersionReader::rev_iter`
    /// unchanged; its meaning is defined by the reader.
    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    /// Returns the version to use for reading `table_id` at `epoch` when no suitable recent
    /// version is held locally.
    ///
    /// The version is obtained through `simple_time_travel_version_cache.get_or_insert`, keyed
    /// by `(table_id, epoch)`. The `fetch` future (meta `get_version_by_epoch`) is passed to the
    /// cache, presumably to be awaited only on a miss (TODO confirm against
    /// `SimpleTimeTravelVersionCache`). Meta errors are logged and converted into
    /// `HummockError::meta_error`.
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            // The receiver half is dropped when this future completes, so this version is not
            // connected to the pinned-version worker.
            // NOTE(review): presumably time-travel versions need no unpin bookkeeping; confirm.
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    /// Chooses the data source for a read at `epoch` and resolves `key_range` against it.
    ///
    /// Dispatch by read-epoch kind:
    /// * `Backup` reads from a backup version.
    /// * `Committed`, `BatchQueryCommitted` and `TimeTravel` read from a committed version (a
    ///   recent local one, or a time-travel version fetched from meta).
    /// * `NoWait` may also include local uncommitted data.
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    /// Resolves a read against the backup version that covers `epoch` for `table_id`.
    ///
    /// # Errors
    /// Returns a backup-read error when no backup includes `epoch`, and propagates any error
    /// from the backup reader.
    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup include epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    /// Resolves a committed read at `epoch`.
    ///
    /// Prefers a version from `recent_versions` via `get_safe_version(table_id, epoch)` and
    /// otherwise falls back to a time-travel version from meta.
    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => version,
            None => self.get_time_travel_version(epoch, table_id).await?,
        };
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    /// Resolves a `NoWait` read at `epoch`, which may include local uncommitted data.
    ///
    /// Works against the latest pinned version:
    /// * If `epoch` is older than the table's committed epoch, the read is rejected as expired.
    /// * If `epoch` equals the committed epoch, the read is served from the committed version.
    /// * Otherwise, the single non-replicated local read version owning the vnode of `key_range`
    ///   is used. If there is none, the read falls back to the committed version (with a warning
    ///   when only replicated read versions matched).
    ///
    /// # Errors
    /// * `HummockError::expired_epoch` for epochs older than the committed epoch.
    /// * An "other" error when more than one non-replicated read version owns the vnode.
    /// * Any error from `read_filter_for_version`.
    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Case 1: `epoch` is not newer than the table's committed epoch in the latest version.
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            // Strictly older than the committed epoch: reject as expired.
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            // Exactly the committed epoch: read the committed version directly.
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            // Case 2: `epoch` is newer than the committed epoch (or the table has no committed
            // info). Look for the local read version(s) that own the vnode of `key_range`.
            let vnode = vnode(&key_range);
            // Counts replicated read versions that own the vnode; used only for the warning.
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                // The mapping's read guard is confined to this block, so it is released before
                // the fallback / filtering below.
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    // Replicated read versions are never chosen here; they are
                                    // only counted.
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            if read_version_vec.is_empty() {
                // No non-replicated read version owns this vnode: fall back to the committed
                // data of the latest pinned version.
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                // Exactly one owner is expected; several owners of one vnode is an error and is
                // reported with the vnodes each of them covers.
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
                }
                // `unwrap` is safe: the length was checked to be exactly 1 above.
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}
513
514impl HummockStorage {
515 async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
516 let (tx, rx) = tokio::sync::oneshot::channel();
517 self.hummock_event_sender
518 .send(HummockEvent::RegisterReadVersion {
519 table_id: option.table_id,
520 new_read_version_sender: tx,
521 is_replicated: option.is_replicated,
522 vnodes: option.vnodes.clone(),
523 })
524 .unwrap();
525
526 let (basic_read_version, instance_guard) = rx.await.unwrap();
527 let version_update_notifier_tx = self.version_update_notifier_tx.clone();
528 LocalHummockStorage::new(
529 instance_guard,
530 basic_read_version,
531 self.hummock_version_reader.clone(),
532 self.hummock_event_sender.clone(),
533 self.buffer_tracker.get_memory_limiter().clone(),
534 self.write_limiter.clone(),
535 option,
536 version_update_notifier_tx,
537 self.context.storage_opts.mem_table_spill_threshold,
538 )
539 }
540
541 pub async fn clear_shared_buffer(&self) {
542 let (tx, rx) = oneshot::channel();
543 self.hummock_event_sender
544 .send(HummockEvent::Clear(tx, None))
545 .expect("should send success");
546 rx.await.expect("should wait success");
547 }
548
549 pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
550 if !table_ids.is_empty() {
551 let (tx, rx) = oneshot::channel();
552 self.hummock_event_sender
553 .send(HummockEvent::Clear(tx, Some(table_ids)))
554 .expect("should send success");
555 rx.await.expect("should wait success");
556 }
557 }
558
559 pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
563 let _ = self
564 .hummock_event_sender
565 .send(HummockEvent::StartEpoch { epoch, table_ids });
566 }
567
568 pub fn sstable_store(&self) -> SstableStoreRef {
569 self.context.sstable_store.clone()
570 }
571
572 pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
573 &self.object_id_manager
574 }
575
576 pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
577 self.compaction_catalog_manager_ref.clone()
578 }
579
580 pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
581 self.buffer_tracker.get_memory_limiter().clone()
582 }
583
584 pub fn get_pinned_version(&self) -> PinnedVersion {
585 self.recent_versions.load().latest_version().clone()
586 }
587
588 pub fn backup_reader(&self) -> BackupReaderRef {
589 self.backup_reader.clone()
590 }
591
592 pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
593 self.compact_await_tree_reg.as_ref()
594 }
595
596 pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
597 let (tx, rx) = oneshot::channel();
598 self.hummock_event_sender
599 .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
600 .expect("should send success");
601 rx.await.expect("should await success")
602 }
603
604 pub async fn sync(
605 &self,
606 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
607 ) -> StorageResult<SyncResult> {
608 let (tx, rx) = oneshot::channel();
609 let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
610 sync_result_sender: tx,
611 sync_table_epochs,
612 });
613 let synced_data = rx
614 .await
615 .map_err(|_| HummockError::other("failed to receive sync result"))??;
616 Ok(synced_data.into_sync_result())
617 }
618}
619
/// Read-only view of a single table at a fixed [`HummockReadEpoch`].
///
/// `HummockStorage::new_read_snapshot` waits for the epoch before building one. The snapshot
/// shares that storage's version state, version reader, read-version mapping, backup reader,
/// meta client and time-travel cache. Reads dispatch on `epoch` to pick their data source:
/// backup, committed/time-travel, or local read versions.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    /// Epoch every read is served at; its variant also selects the data source.
    epoch: HummockReadEpoch,
    /// The table this snapshot reads.
    table_id: TableId,
    /// Recently pinned versions shared with the storage.
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    /// Local read versions per table, used by `NoWait` reads.
    read_version_mapping: ReadOnlyReadVersionMapping,
    /// Used by `Backup` reads.
    backup_reader: BackupReaderRef,
    /// Used to fetch time-travel versions that are not held locally.
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    /// Cache for time-travel versions fetched through `hummock_meta_client`.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
631
632impl StateStoreGet for HummockStorageReadSnapshot {
633 fn on_key_value<O: Send + 'static>(
634 &self,
635 key: TableKey<Bytes>,
636 read_options: ReadOptions,
637 on_key_value_fn: impl KeyValueFn<O>,
638 ) -> impl StorageFuture<'_, Option<O>> {
639 self.get_inner(key, read_options, on_key_value_fn)
640 }
641}
642
643impl StateStoreRead for HummockStorageReadSnapshot {
644 type Iter = HummockStorageIterator;
645 type RevIter = HummockStorageRevIterator;
646
647 fn iter(
648 &self,
649 key_range: TableKeyRange,
650 read_options: ReadOptions,
651 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
652 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
653 assert_eq!(
654 r_vnode_exclusive - l_vnode_inclusive,
655 1,
656 "read range {:?} for table {} iter contains more than one vnode",
657 key_range,
658 self.table_id
659 );
660 self.iter_inner(key_range, read_options)
661 }
662
663 fn rev_iter(
664 &self,
665 key_range: TableKeyRange,
666 read_options: ReadOptions,
667 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
668 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
669 assert_eq!(
670 r_vnode_exclusive - l_vnode_inclusive,
671 1,
672 "read range {:?} for table {} iter contains more than one vnode",
673 key_range,
674 self.table_id
675 );
676 self.rev_iter_inner(key_range, read_options)
677 }
678}
679
680impl StateStoreReadVector for HummockStorageReadSnapshot {
681 async fn nearest<O: Send + 'static>(
682 &self,
683 _vec: Vector,
684 _options: VectorNearestOptions,
685 _on_nearest_item_fn: impl OnNearestItemFn<O>,
686 ) -> StorageResult<Vec<O>> {
687 unimplemented!()
688 }
689}
690
691impl StateStoreReadLog for HummockStorage {
692 type ChangeLogIter = ChangeLogIterator;
693
694 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
695 fn next_epoch(
696 version: &LocalHummockVersion,
697 epoch: u64,
698 table_id: TableId,
699 ) -> HummockResult<Option<u64>> {
700 let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
701 HummockError::next_epoch(format!("table {} has been dropped", table_id))
702 })?;
703 table_change_log.next_epoch(epoch).map_err(|_| {
704 HummockError::next_epoch(format!(
705 "invalid epoch {}, change log epoch: {:?}",
706 epoch,
707 table_change_log.epochs().collect_vec()
708 ))
709 })
710 }
711 {
712 let recent_versions = self.recent_versions.load();
714 if let Some(next_epoch) =
715 next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
716 {
717 return Ok(next_epoch);
718 }
719 }
720 let mut next_epoch_ret = None;
721 wait_for_update(
722 &self.version_update_notifier_tx,
723 |version| {
724 if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
725 next_epoch_ret = Some(next_epoch);
726 Ok(true)
727 } else {
728 Ok(false)
729 }
730 },
731 || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
732 )
733 .await?;
734 Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
735 }
736
737 async fn iter_log(
738 &self,
739 epoch_range: (u64, u64),
740 key_range: TableKeyRange,
741 options: ReadLogOptions,
742 ) -> StorageResult<Self::ChangeLogIter> {
743 let version = self.recent_versions.load().latest_version().clone();
744 let iter = self
745 .hummock_version_reader
746 .iter_log(version, epoch_range, key_range, options)
747 .await?;
748 Ok(iter)
749 }
750}
751
752impl HummockStorage {
753 async fn try_wait_epoch_impl(
756 &self,
757 wait_epoch: HummockReadEpoch,
758 table_id: TableId,
759 ) -> StorageResult<()> {
760 match wait_epoch {
761 HummockReadEpoch::Committed(wait_epoch) => {
762 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
763 wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
764 }
765 HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
766 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
767 {
769 let recent_versions = self.recent_versions.load();
770 let latest_version = recent_versions.latest_version();
771 if latest_version.id >= wait_version_id
772 && let Some(committed_epoch) =
773 latest_version.table_committed_epoch(table_id)
774 && committed_epoch >= wait_epoch
775 {
776 return Ok(());
777 }
778 }
779 wait_for_update(
780 &self.version_update_notifier_tx,
781 |version| {
782 if wait_version_id > version.id() {
783 return Ok(false);
784 }
785 let committed_epoch =
786 version.table_committed_epoch(table_id).ok_or_else(|| {
787 HummockError::wait_epoch(format!(
791 "table id {} has been dropped",
792 table_id
793 ))
794 })?;
795 Ok(committed_epoch >= wait_epoch)
796 },
797 || {
798 format!(
799 "try_wait_epoch: epoch: {}, version_id: {:?}",
800 wait_epoch, wait_version_id
801 )
802 },
803 )
804 .await?;
805 }
806 _ => {}
807 };
808 Ok(())
809 }
810}
811
812impl StateStore for HummockStorage {
813 type Local = LocalHummockStorage;
814 type ReadSnapshot = HummockStorageReadSnapshot;
815 type VectorWriter = PanicStateStore;
816
817 async fn try_wait_epoch(
820 &self,
821 wait_epoch: HummockReadEpoch,
822 options: TryWaitEpochOptions,
823 ) -> StorageResult<()> {
824 self.try_wait_epoch_impl(wait_epoch, options.table_id).await
825 }
826
827 fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
828 self.new_local_inner(option)
829 }
830
831 async fn new_read_snapshot(
832 &self,
833 epoch: HummockReadEpoch,
834 options: NewReadSnapshotOptions,
835 ) -> StorageResult<Self::ReadSnapshot> {
836 self.try_wait_epoch_impl(epoch, options.table_id).await?;
837 Ok(HummockStorageReadSnapshot {
838 epoch,
839 table_id: options.table_id,
840 recent_versions: self.recent_versions.clone(),
841 hummock_version_reader: self.hummock_version_reader.clone(),
842 read_version_mapping: self.read_version_mapping.clone(),
843 backup_reader: self.backup_reader.clone(),
844 hummock_meta_client: self.hummock_meta_client.clone(),
845 simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
846 })
847 }
848
849 async fn new_vector_writer(&self, _options: NewVectorWriterOptions) -> Self::VectorWriter {
850 unimplemented!()
851 }
852}
853
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Syncs a single `epoch` for `table_ids`; thin wrapper around `sync`.
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        let sync_table_epochs = vec![(epoch, table_ids)];
        self.sync(sync_table_epochs).await
    }

    /// Yields to the runtime until the latest local version id is at least `target_id`.
    async fn yield_until_version_at_least(
        &self,
        target_id: risingwave_hummock_sdk::HummockVersionId,
    ) {
        while self.recent_versions.load().latest_version().id() < target_id {
            tokio::task::yield_now().await;
        }
    }

    /// Pushes `version` through the version-update channel and waits until it (or a newer
    /// version) is the latest local version.
    ///
    /// # Panics
    /// Panics if the version update cannot be sent.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        let target_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        self.yield_until_version_at_least(target_id).await;
    }

    /// Waits until the latest local version id reaches `version.id`.
    pub async fn wait_version(&self, version: HummockVersion) {
        self.yield_until_version_at_least(version.id).await;
    }

    /// Current size reported by the shared buffer tracker.
    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Builds a storage for tests: a fake remote table accessor, unused metrics and no
    /// await-tree registry.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));
        let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
        let compactor_metrics = Arc::new(CompactorMetrics::unused());

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            state_store_metrics,
            compactor_metrics,
            None,
        )
        .await
    }

    /// Borrows the storage options held by the compactor context.
    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    /// Borrows the version reader.
    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Waits until the latest local version id is strictly greater than `old_id`, and returns
    /// that new id.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        let mut current_id = self.recent_versions.load().latest_version().id();
        while current_id <= old_id {
            tokio::task::yield_now().await;
            current_id = self.recent_versions.load().latest_version().id();
        }
        current_id
    }
}
940}