1use std::collections::HashSet;
16use std::future::Future;
17use std::ops::Bound;
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::array::VectorRef;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::dispatch_distance_measurement;
26use risingwave_common::util::epoch::is_max_epoch;
27use risingwave_common_service::{NotificationClient, ObserverManager};
28use risingwave_hummock_sdk::key::{
29 TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
30};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
33use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
34use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
35use risingwave_rpc_client::HummockMetaClient;
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
38use tokio::sync::oneshot;
39
40use super::local_hummock_storage::LocalHummockStorage;
41use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43#[cfg(any(test, feature = "test"))]
44use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
45use crate::error::StorageResult;
46use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
47use crate::hummock::compactor::{
48 CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
49};
50use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
51use crate::hummock::event_handler::{
52 HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
53};
54use crate::hummock::iterator::change_log::ChangeLogIterator;
55use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
56use crate::hummock::local_version::recent_versions::RecentVersions;
57use crate::hummock::observer_manager::HummockObserverNode;
58use crate::hummock::store::vector_writer::HummockVectorWriter;
59use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
60use crate::hummock::utils::{wait_for_epoch, wait_for_update};
61use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
62use crate::hummock::{
63 HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
64 MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
65};
66use crate::mem_table::ImmutableMemtable;
67use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
68use crate::opts::StorageOpts;
69use crate::store::*;
70
/// Guard that sends `HummockEvent::Shutdown` to the hummock event handler when dropped.
///
/// `HummockStorage` stores it behind an `Arc` and derives `Clone`, so the shutdown event is
/// only sent once the last clone of the storage releases the guard.
struct HummockStorageShutdownGuard {
    /// Event channel on which `HummockEvent::Shutdown` is delivered.
    shutdown_sender: HummockEventSender,
}
74
75impl Drop for HummockStorageShutdownGuard {
76 fn drop(&mut self) {
77 let _ = self
78 .shutdown_sender
79 .send(HummockEvent::Shutdown)
80 .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
81 }
82}
83
/// Handle to the hummock state store.
///
/// The struct derives `Clone`; clones share the same event-handler channel, version state and
/// caches (most fields are channels or `Arc`s). When the last clone drops `_shutdown_guard`,
/// the event handler is sent `HummockEvent::Shutdown`.
#[derive(Clone)]
pub struct HummockStorage {
    /// Sends events (read-version registration, sync, clear, ...) to the event handler worker.
    hummock_event_sender: HummockEventSender,
    /// Version-update sender. Holding it keeps the version-update channel open; test helpers
    /// also use it to inject versions directly.
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    /// Local compactor context; the storage options and sstable store are read from it.
    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    /// Shared-buffer tracker taken from the event handler; provides the memory limiter.
    buffer_tracker: BufferTracker,

    /// Watch channel publishing the latest pinned version; awaited when waiting for an epoch
    /// or for change-log progress.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    /// Latest version plus recently seen versions, shared with the event handler.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    /// Sends `HummockEvent::Shutdown` once the last clone of the storage is dropped.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    /// Read-only view of the per-table local read versions maintained by the event handler.
    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    /// Await-tree registry for compaction; `None` when no await-tree config was supplied.
    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    /// Meta client; in this module it is used to fetch time-travel versions.
    hummock_meta_client: Arc<dyn HummockMetaClient>,

    /// Cache of time-travel versions fetched from meta, looked up by (table id, epoch).
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
123
/// The pieces a read is served from: immutable memtables, additional SSTs, and the committed
/// version. `get_committed_read_version_tuple` leaves the first two empty, so such reads see
/// committed data only.
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
125
126pub fn get_committed_read_version_tuple(
127 version: PinnedVersion,
128 table_id: TableId,
129 mut key_range: TableKeyRange,
130 epoch: HummockEpoch,
131) -> (TableKeyRange, ReadVersionTuple) {
132 if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
133 TableWatermarksIndex::new_committed(
134 table_watermarks.clone(),
135 version
136 .state_table_info
137 .info()
138 .get(&table_id)
139 .expect("should exist when having table watermark")
140 .committed_epoch,
141 table_watermarks.watermark_type,
142 )
143 .rewrite_range_with_table_watermark(epoch, &mut key_range)
144 }
145 (key_range, (vec![], vec![], version))
146}
147
148impl HummockStorage {
149 #[allow(clippy::too_many_arguments)]
151 pub async fn new(
152 options: Arc<StorageOpts>,
153 sstable_store: SstableStoreRef,
154 hummock_meta_client: Arc<dyn HummockMetaClient>,
155 notification_client: impl NotificationClient,
156 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
157 state_store_metrics: Arc<HummockStateStoreMetrics>,
158 compactor_metrics: Arc<CompactorMetrics>,
159 await_tree_config: Option<await_tree::Config>,
160 ) -> HummockResult<Self> {
161 let object_id_manager = Arc::new(ObjectIdManager::new(
162 hummock_meta_client.clone(),
163 options.sstable_id_remote_fetch_number,
164 ));
165 let backup_reader = BackupReader::new(
166 &options.backup_storage_url,
167 &options.backup_storage_directory,
168 &options.object_store_config,
169 )
170 .await
171 .map_err(HummockError::read_backup_error)?;
172 let write_limiter = Arc::new(WriteLimiter::default());
173 let (version_update_tx, mut version_update_rx) = unbounded_channel();
174
175 let observer_manager = ObserverManager::new(
176 notification_client,
177 HummockObserverNode::new(
178 compaction_catalog_manager_ref.clone(),
179 backup_reader.clone(),
180 version_update_tx.clone(),
181 write_limiter.clone(),
182 ),
183 )
184 .await;
185 observer_manager.start().await;
186
187 let hummock_version = match version_update_rx.recv().await {
188 Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
189 _ => unreachable!(
190 "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
191 ),
192 };
193
194 let (pin_version_tx, pin_version_rx) = unbounded_channel();
195 let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
196 tokio::spawn(start_pinned_version_worker(
197 pin_version_rx,
198 hummock_meta_client.clone(),
199 options.max_version_pinning_duration_sec,
200 ));
201
202 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
203
204 let compactor_context = CompactorContext::new_local_compact_context(
205 options.clone(),
206 sstable_store.clone(),
207 compactor_metrics.clone(),
208 await_tree_reg.clone(),
209 );
210
211 let hummock_event_handler = HummockEventHandler::new(
212 version_update_rx,
213 pinned_version,
214 compactor_context.clone(),
215 compaction_catalog_manager_ref.clone(),
216 object_id_manager.clone(),
217 state_store_metrics.clone(),
218 );
219
220 let event_tx = hummock_event_handler.event_sender();
221
222 let instance = Self {
223 context: compactor_context,
224 compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
225 object_id_manager,
226 buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
227 version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
228 hummock_event_sender: event_tx.clone(),
229 _version_update_sender: version_update_tx,
230 recent_versions: hummock_event_handler.recent_versions(),
231 hummock_version_reader: HummockVersionReader::new(
232 sstable_store,
233 state_store_metrics.clone(),
234 options.max_preload_io_retry_times,
235 ),
236 _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
237 shutdown_sender: event_tx,
238 }),
239 read_version_mapping: hummock_event_handler.read_version_mapping(),
240 backup_reader,
241 write_limiter,
242 compact_await_tree_reg: await_tree_reg,
243 hummock_meta_client,
244 simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
245 options.time_travel_version_cache_capacity,
246 )),
247 };
248
249 tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());
250
251 Ok(instance)
252 }
253}
254
255impl HummockStorageReadSnapshot {
256 async fn get_inner<'a, O>(
264 &'a self,
265 key: TableKey<Bytes>,
266 read_options: ReadOptions,
267 on_key_value_fn: impl KeyValueFn<'a, O>,
268 ) -> StorageResult<Option<O>> {
269 let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));
270
271 let (key_range, read_version_tuple) =
272 self.build_read_version_tuple(self.epoch, key_range).await?;
273
274 if is_empty_key_range(&key_range) {
275 return Ok(None);
276 }
277
278 self.hummock_version_reader
279 .get(
280 key,
281 self.epoch.get_epoch(),
282 self.table_id,
283 self.table_option,
284 read_options,
285 read_version_tuple,
286 on_key_value_fn,
287 )
288 .await
289 }
290
291 async fn iter_inner(
292 &self,
293 key_range: TableKeyRange,
294 read_options: ReadOptions,
295 ) -> StorageResult<HummockStorageIterator> {
296 let (key_range, read_version_tuple) =
297 self.build_read_version_tuple(self.epoch, key_range).await?;
298
299 self.hummock_version_reader
300 .iter(
301 key_range,
302 self.epoch.get_epoch(),
303 self.table_id,
304 self.table_option,
305 read_options,
306 read_version_tuple,
307 )
308 .await
309 }
310
311 async fn rev_iter_inner(
312 &self,
313 key_range: TableKeyRange,
314 read_options: ReadOptions,
315 ) -> StorageResult<HummockStorageRevIterator> {
316 let (key_range, read_version_tuple) =
317 self.build_read_version_tuple(self.epoch, key_range).await?;
318
319 self.hummock_version_reader
320 .rev_iter(
321 key_range,
322 self.epoch.get_epoch(),
323 self.table_id,
324 self.table_option,
325 read_options,
326 read_version_tuple,
327 None,
328 )
329 .await
330 }
331
332 async fn get_time_travel_version(
333 &self,
334 epoch: u64,
335 table_id: TableId,
336 ) -> StorageResult<PinnedVersion> {
337 let meta_client = self.hummock_meta_client.clone();
338 let fetch = async move {
339 let pb_version = meta_client
340 .get_version_by_epoch(epoch, table_id)
341 .await
342 .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
343 .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
344 let version = HummockVersion::from_rpc_protobuf(&pb_version);
345 let (tx, _rx) = unbounded_channel();
346 Ok(PinnedVersion::new(version, tx))
347 };
348 let version = self
349 .simple_time_travel_version_cache
350 .get_or_insert(table_id, epoch, fetch)
351 .await?;
352 Ok(version)
353 }
354
355 async fn build_read_version_tuple(
356 &self,
357 epoch: HummockReadEpoch,
358 key_range: TableKeyRange,
359 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
360 match epoch {
361 HummockReadEpoch::Backup(epoch) => {
362 self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
363 .await
364 }
365 HummockReadEpoch::Committed(epoch) => {
366 let tuple = self
367 .build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
368 .await?;
369 let (_, (_, _, version)) = &tuple;
370 let Some(committed_epoch) = version.table_committed_epoch(self.table_id) else {
371 return Err(HummockError::other(format!(
372 "table {} not found in version",
373 self.table_id
374 ))
375 .into());
376 };
377 if committed_epoch != epoch {
378 return Err(HummockError::wait_epoch(format!(
379 "mismatch table {} committed_epoch {} for read committed_epoch {}",
380 self.table_id, committed_epoch, epoch
381 ))
382 .into());
383 }
384 Ok(tuple)
385 }
386 HummockReadEpoch::BatchQueryCommitted(epoch, _)
387 | HummockReadEpoch::TimeTravel(epoch) => {
388 self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
389 .await
390 }
391 HummockReadEpoch::NoWait(epoch) => {
392 self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
393 .await
394 }
395 }
396 }
397
398 async fn build_read_version_tuple_from_backup(
399 &self,
400 epoch: u64,
401 table_id: TableId,
402 key_range: TableKeyRange,
403 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
404 match self
405 .backup_reader
406 .try_get_hummock_version(table_id, epoch)
407 .await
408 {
409 Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
410 backup_version,
411 table_id,
412 key_range,
413 epoch,
414 )),
415 Ok(None) => Err(HummockError::read_backup_error(format!(
416 "backup include epoch {} not found",
417 epoch
418 ))
419 .into()),
420 Err(e) => Err(e),
421 }
422 }
423
424 async fn get_epoch_hummock_version(
425 &self,
426 epoch: u64,
427 table_id: TableId,
428 ) -> StorageResult<PinnedVersion> {
429 match self
430 .recent_versions
431 .load()
432 .get_safe_version(table_id, epoch)
433 {
434 Some(version) => Ok(version),
435 None => self.get_time_travel_version(epoch, table_id).await,
436 }
437 }
438
439 async fn build_read_version_tuple_from_committed(
440 &self,
441 epoch: u64,
442 table_id: TableId,
443 key_range: TableKeyRange,
444 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
445 let version = self.get_epoch_hummock_version(epoch, table_id).await?;
446 Ok(get_committed_read_version_tuple(
447 version, table_id, key_range, epoch,
448 ))
449 }
450
451 async fn build_read_version_tuple_from_all(
452 &self,
453 epoch: u64,
454 table_id: TableId,
455 key_range: TableKeyRange,
456 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
457 let pinned_version = self.recent_versions.load().latest_version().clone();
458 let info = pinned_version.state_table_info.info().get(&table_id);
459
460 let ret = if let Some(info) = info
462 && epoch <= info.committed_epoch
463 {
464 let pinned_version = if epoch < info.committed_epoch {
465 pinned_version
466 } else {
467 self.get_epoch_hummock_version(epoch, table_id).await?
468 };
469 get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
471 } else {
472 let vnode = vnode(&key_range);
473 let mut matched_replicated_read_version_cnt = 0;
474 let read_version_vec = {
475 let read_guard = self.read_version_mapping.read();
476 read_guard
477 .get(&table_id)
478 .map(|v| {
479 v.values()
480 .filter(|v| {
481 let read_version = v.read();
482 if read_version.is_initialized() && read_version.contains(vnode) {
483 if read_version.is_replicated() {
484 matched_replicated_read_version_cnt += 1;
485 false
486 } else {
487 true
489 }
490 } else {
491 false
492 }
493 })
494 .cloned()
495 .collect_vec()
496 })
497 .unwrap_or_default()
498 };
499
500 if read_version_vec.is_empty() {
503 let table_committed_epoch = info.map(|info| info.committed_epoch);
504 if matched_replicated_read_version_cnt > 0 {
505 tracing::warn!(
506 "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
507 table_id,
508 vnode.to_index(),
509 epoch,
510 matched_replicated_read_version_cnt,
511 table_committed_epoch,
512 );
513 } else {
514 tracing::debug!(
515 "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
516 table_id,
517 vnode.to_index(),
518 epoch,
519 table_committed_epoch
520 );
521 }
522 get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
523 } else {
524 if read_version_vec.len() != 1 {
525 let read_version_vnodes = read_version_vec
526 .into_iter()
527 .map(|v| {
528 let v = v.read();
529 v.vnodes().iter_ones().collect_vec()
530 })
531 .collect_vec();
532 return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
533 }
534 read_filter_for_version(
535 epoch,
536 table_id,
537 key_range,
538 read_version_vec.first().unwrap(),
539 )?
540 }
541 };
542
543 Ok(ret)
544 }
545}
546
547impl HummockStorage {
548 async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
549 let (tx, rx) = tokio::sync::oneshot::channel();
550 self.hummock_event_sender
551 .send(HummockEvent::RegisterReadVersion {
552 table_id: option.table_id,
553 new_read_version_sender: tx,
554 is_replicated: option.is_replicated,
555 vnodes: option.vnodes.clone(),
556 })
557 .unwrap();
558
559 let (basic_read_version, instance_guard) = rx.await.unwrap();
560 let version_update_notifier_tx = self.version_update_notifier_tx.clone();
561 LocalHummockStorage::new(
562 instance_guard,
563 basic_read_version,
564 self.hummock_version_reader.clone(),
565 self.hummock_event_sender.clone(),
566 self.buffer_tracker.get_memory_limiter().clone(),
567 self.write_limiter.clone(),
568 option,
569 version_update_notifier_tx,
570 self.context.storage_opts.mem_table_spill_threshold,
571 )
572 }
573
574 pub async fn clear_shared_buffer(&self) {
575 let (tx, rx) = oneshot::channel();
576 self.hummock_event_sender
577 .send(HummockEvent::Clear(tx, None))
578 .expect("should send success");
579 rx.await.expect("should wait success");
580 }
581
582 pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
583 if !table_ids.is_empty() {
584 let (tx, rx) = oneshot::channel();
585 self.hummock_event_sender
586 .send(HummockEvent::Clear(tx, Some(table_ids)))
587 .expect("should send success");
588 rx.await.expect("should wait success");
589 }
590 }
591
592 pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
596 let _ = self
597 .hummock_event_sender
598 .send(HummockEvent::StartEpoch { epoch, table_ids });
599 }
600
601 pub fn sstable_store(&self) -> SstableStoreRef {
602 self.context.sstable_store.clone()
603 }
604
605 pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
606 &self.object_id_manager
607 }
608
609 pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
610 self.compaction_catalog_manager_ref.clone()
611 }
612
613 pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
614 self.buffer_tracker.get_memory_limiter().clone()
615 }
616
617 pub fn get_pinned_version(&self) -> PinnedVersion {
618 self.recent_versions.load().latest_version().clone()
619 }
620
621 pub fn backup_reader(&self) -> BackupReaderRef {
622 self.backup_reader.clone()
623 }
624
625 pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
626 self.compact_await_tree_reg.as_ref()
627 }
628
629 pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
630 let (tx, rx) = oneshot::channel();
631 self.hummock_event_sender
632 .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
633 .expect("should send success");
634 rx.await.expect("should await success")
635 }
636
637 pub async fn sync(
638 &self,
639 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
640 ) -> StorageResult<SyncResult> {
641 let (tx, rx) = oneshot::channel();
642 let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
643 sync_result_sender: tx,
644 sync_table_epochs,
645 });
646 let synced_data = rx
647 .await
648 .map_err(|_| HummockError::other("failed to receive sync result"))??;
649 Ok(synced_data.into_sync_result())
650 }
651}
652
/// Read-only view of one table at a fixed [`HummockReadEpoch`].
///
/// `HummockStorage::new_read_snapshot` builds it after waiting for the epoch. It holds clones
/// of the storage's shared read state (versions, version reader, read-version mapping, backup
/// reader, meta client and time-travel cache), from which all of its reads are served.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    /// Epoch, and read mode, used by every read from this snapshot.
    epoch: HummockReadEpoch,
    table_id: TableId,
    table_option: TableOption,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
665
666impl StateStoreGet for HummockStorageReadSnapshot {
667 fn on_key_value<'a, O: Send + 'a>(
668 &'a self,
669 key: TableKey<Bytes>,
670 read_options: ReadOptions,
671 on_key_value_fn: impl KeyValueFn<'a, O>,
672 ) -> impl StorageFuture<'a, Option<O>> {
673 self.get_inner(key, read_options, on_key_value_fn)
674 }
675}
676
677impl StateStoreRead for HummockStorageReadSnapshot {
678 type Iter = HummockStorageIterator;
679 type RevIter = HummockStorageRevIterator;
680
681 fn iter(
682 &self,
683 key_range: TableKeyRange,
684 read_options: ReadOptions,
685 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
686 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
687 assert_eq!(
688 r_vnode_exclusive - l_vnode_inclusive,
689 1,
690 "read range {:?} for table {} iter contains more than one vnode",
691 key_range,
692 self.table_id
693 );
694 self.iter_inner(key_range, read_options)
695 }
696
697 fn rev_iter(
698 &self,
699 key_range: TableKeyRange,
700 read_options: ReadOptions,
701 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
702 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
703 assert_eq!(
704 r_vnode_exclusive - l_vnode_inclusive,
705 1,
706 "read range {:?} for table {} iter contains more than one vnode",
707 key_range,
708 self.table_id
709 );
710 self.rev_iter_inner(key_range, read_options)
711 }
712}
713
714impl StateStoreReadVector for HummockStorageReadSnapshot {
715 async fn nearest<'a, O: Send + 'a>(
716 &'a self,
717 vec: VectorRef<'a>,
718 options: VectorNearestOptions,
719 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
720 ) -> StorageResult<Vec<O>> {
721 let version = match self.epoch {
722 HummockReadEpoch::Committed(epoch)
723 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
724 | HummockReadEpoch::TimeTravel(epoch) => {
725 self.get_epoch_hummock_version(epoch, self.table_id).await?
726 }
727 HummockReadEpoch::Backup(epoch) => self
728 .backup_reader
729 .try_get_hummock_version(self.table_id, epoch)
730 .await?
731 .ok_or_else(|| {
732 HummockError::read_backup_error(format!(
733 "backup include epoch {} not found",
734 epoch
735 ))
736 })?,
737 HummockReadEpoch::NoWait(_) => {
738 return Err(
739 HummockError::other("nearest query does not support NoWait epoch").into(),
740 );
741 }
742 };
743 dispatch_distance_measurement!(options.measure, MeasurementType, {
744 Ok(self
745 .hummock_version_reader
746 .nearest::<MeasurementType, O>(
747 version,
748 self.table_id,
749 vec,
750 options,
751 on_nearest_item_fn,
752 )
753 .await?)
754 })
755 }
756}
757
758impl StateStoreReadLog for HummockStorage {
759 type ChangeLogIter = ChangeLogIterator;
760
761 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
762 fn next_epoch(
763 version: &LocalHummockVersion,
764 epoch: u64,
765 table_id: TableId,
766 ) -> HummockResult<Option<u64>> {
767 let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
768 HummockError::next_epoch(format!("table {} has been dropped", table_id))
769 })?;
770 table_change_log.next_epoch(epoch).map_err(|_| {
771 HummockError::next_epoch(format!(
772 "invalid epoch {}, change log epoch: {:?}",
773 epoch,
774 table_change_log.epochs().collect_vec()
775 ))
776 })
777 }
778 {
779 let recent_versions = self.recent_versions.load();
781 if let Some(next_epoch) =
782 next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
783 {
784 return Ok(next_epoch);
785 }
786 }
787 let mut next_epoch_ret = None;
788 wait_for_update(
789 &self.version_update_notifier_tx,
790 |version| {
791 if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
792 next_epoch_ret = Some(next_epoch);
793 Ok(true)
794 } else {
795 Ok(false)
796 }
797 },
798 || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
799 )
800 .await?;
801 Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
802 }
803
804 async fn iter_log(
805 &self,
806 epoch_range: (u64, u64),
807 key_range: TableKeyRange,
808 options: ReadLogOptions,
809 ) -> StorageResult<Self::ChangeLogIter> {
810 let version = self.recent_versions.load().latest_version().clone();
811 let iter = self
812 .hummock_version_reader
813 .iter_log(version, epoch_range, key_range, options)
814 .await?;
815 Ok(iter)
816 }
817}
818
819impl HummockStorage {
820 async fn try_wait_epoch_impl(
823 &self,
824 wait_epoch: HummockReadEpoch,
825 table_id: TableId,
826 ) -> StorageResult<()> {
827 tracing::debug!(
828 "try_wait_epoch: epoch: {:?}, table_id: {}",
829 wait_epoch,
830 table_id
831 );
832 match wait_epoch {
833 HummockReadEpoch::Committed(wait_epoch) => {
834 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
835 wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
836 }
837 HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
838 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
839 {
841 let recent_versions = self.recent_versions.load();
842 let latest_version = recent_versions.latest_version();
843 if latest_version.id >= wait_version_id
844 && let Some(committed_epoch) =
845 latest_version.table_committed_epoch(table_id)
846 && committed_epoch >= wait_epoch
847 {
848 return Ok(());
849 }
850 }
851 wait_for_update(
852 &self.version_update_notifier_tx,
853 |version| {
854 if wait_version_id > version.id() {
855 return Ok(false);
856 }
857 let committed_epoch =
858 version.table_committed_epoch(table_id).ok_or_else(|| {
859 HummockError::wait_epoch(format!(
863 "table id {} has been dropped",
864 table_id
865 ))
866 })?;
867 Ok(committed_epoch >= wait_epoch)
868 },
869 || {
870 format!(
871 "try_wait_epoch: epoch: {}, version_id: {:?}",
872 wait_epoch, wait_version_id
873 )
874 },
875 )
876 .await?;
877 }
878 _ => {}
879 };
880 Ok(())
881 }
882}
883
884impl StateStore for HummockStorage {
885 type Local = LocalHummockStorage;
886 type ReadSnapshot = HummockStorageReadSnapshot;
887 type VectorWriter = HummockVectorWriter;
888
889 async fn try_wait_epoch(
892 &self,
893 wait_epoch: HummockReadEpoch,
894 options: TryWaitEpochOptions,
895 ) -> StorageResult<()> {
896 self.try_wait_epoch_impl(wait_epoch, options.table_id).await
897 }
898
899 fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
900 self.new_local_inner(option)
901 }
902
903 async fn new_read_snapshot(
904 &self,
905 epoch: HummockReadEpoch,
906 options: NewReadSnapshotOptions,
907 ) -> StorageResult<Self::ReadSnapshot> {
908 self.try_wait_epoch_impl(epoch, options.table_id).await?;
909 Ok(HummockStorageReadSnapshot {
910 epoch,
911 table_id: options.table_id,
912 table_option: options.table_option,
913 recent_versions: self.recent_versions.clone(),
914 hummock_version_reader: self.hummock_version_reader.clone(),
915 read_version_mapping: self.read_version_mapping.clone(),
916 backup_reader: self.backup_reader.clone(),
917 hummock_meta_client: self.hummock_meta_client.clone(),
918 simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
919 })
920 }
921
922 async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
923 HummockVectorWriter::new(
924 options.table_id,
925 self.version_update_notifier_tx.clone(),
926 self.context.sstable_store.clone(),
927 self.object_id_manager.clone(),
928 self.hummock_event_sender.clone(),
929 self.hummock_version_reader.stats().clone(),
930 self.context.storage_opts.clone(),
931 )
932 }
933}
934
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Syncs a single `epoch` for `table_ids`; a thin wrapper around [`Self::sync`].
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        let sync_table_epochs = vec![(epoch, table_ids)];
        self.sync(sync_table_epochs).await
    }

    /// Injects `version` through the version-update channel and yields until the latest
    /// local version id has caught up with it.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        let target_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        while self.recent_versions.load().latest_version().id() < target_id {
            tokio::task::yield_now().await;
        }
    }

    /// Yields until the latest local version id is at least `version.id`.
    pub async fn wait_version(&self, version: HummockVersion) {
        while self.recent_versions.load().latest_version().id() < version.id {
            tokio::task::yield_now().await;
        }
    }

    /// Builds a storage for tests with a fake table accessor, unused metrics and no
    /// await-tree registry.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let fake_accessor = Box::new(FakeRemoteTableAccessor {});
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(fake_accessor));
        let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
        let compactor_metrics = Arc::new(CompactorMetrics::unused());

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            state_store_metrics,
            compactor_metrics,
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Yields until the latest local version id is strictly greater than `old_id`, then
    /// returns that new id.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        loop {
            let current_id = self.recent_versions.load().latest_version().id();
            if current_id > old_id {
                break current_id;
            }
            tokio::task::yield_now().await;
        }
    }

    /// Sends a flush marker through the event channel and waits until it is processed.
    #[cfg(any(test, feature = "test"))]
    pub async fn flush_events_for_test(&self) {
        let (flushed_tx, flushed_rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::FlushEvent(flushed_tx))
            .expect("flush event should succeed");
        flushed_rx.await.expect("flush event receiver dropped");
    }
}