1use std::collections::HashSet;
16use std::future::Future;
17use std::ops::{Bound, Deref};
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::array::VectorRef;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::dispatch_distance_measurement;
26use risingwave_common::util::epoch::is_max_epoch;
27use risingwave_common_service::{NotificationClient, ObserverManager};
28use risingwave_hummock_sdk::change_log::TableChangeLogs;
29use risingwave_hummock_sdk::key::{
30 TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
31};
32use risingwave_hummock_sdk::sstable_info::SstableInfo;
33use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
34use risingwave_hummock_sdk::version::HummockVersion;
35use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
36use risingwave_rpc_client::HummockMetaClient;
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
39use tokio::sync::oneshot;
40
41use super::local_hummock_storage::LocalHummockStorage;
42use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
43use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
44#[cfg(any(test, feature = "test"))]
45use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
46use crate::error::StorageResult;
47use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
48use crate::hummock::compactor::{
49 CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
50};
51use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
52use crate::hummock::event_handler::{
53 HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
54};
55use crate::hummock::iterator::change_log::ChangeLogIterator;
56use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
57use crate::hummock::local_version::recent_versions::RecentVersions;
58use crate::hummock::observer_manager::HummockObserverNode;
59use crate::hummock::store::vector_writer::HummockVectorWriter;
60use crate::hummock::table_change_log_manager::TableChangeLogManager;
61use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
62use crate::hummock::utils::{wait_for_epoch, wait_for_update};
63use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
64use crate::hummock::{
65 HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
66 MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
67};
68use crate::mem_table::ImmutableMemtable;
69use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
70use crate::opts::StorageOpts;
71use crate::store::*;
72
/// Guard held (behind an `Arc`) by every `HummockStorage` clone; when the last
/// clone is dropped, `Drop` sends `HummockEvent::Shutdown` to the event-handler
/// worker so it can terminate.
struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}
76
77impl Drop for HummockStorageShutdownGuard {
78 fn drop(&mut self) {
79 let _ = self
80 .shutdown_sender
81 .send(HummockEvent::Shutdown)
82 .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
83 }
84}
85
/// Entry point of the Hummock state store.
///
/// Cheaply cloneable: every field is a handle (channel sender, `Arc`, or
/// cloneable reader) shared with the background workers spawned in
/// [`HummockStorage::new`].
#[derive(Clone)]
pub struct HummockStorage {
    // Sends storage events (register read version, sync, clear, ...) to the
    // `HummockEventHandler` worker.
    hummock_event_sender: HummockEventSender,
    // Kept alive so the version-update channel stays open; only used directly
    // by test helpers (see `update_version_and_wait`).
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    // Local compaction context; also carries `storage_opts` and the sstable store.
    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    // Tracks shared-buffer memory usage; exposes the memory limiter.
    buffer_tracker: BufferTracker,

    // Watch channel used to wait until a version containing some epoch arrives.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    // Lock-free snapshot of recently pinned versions; `load()` for reads.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    // See `HummockStorageShutdownGuard`: last clone dropped => shutdown event.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    // Read-only view of per-table registered local read versions.
    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    // Caches versions fetched from meta for time-travel reads.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,

    table_change_log_manager: Arc<TableChangeLogManager>,
}
127
/// Inputs for serving a read: immutable memtables, SSTs, and the committed version.
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
129
130pub fn get_committed_read_version_tuple(
131 version: PinnedVersion,
132 table_id: TableId,
133 mut key_range: TableKeyRange,
134 epoch: HummockEpoch,
135) -> (TableKeyRange, ReadVersionTuple) {
136 if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
137 TableWatermarksIndex::new_committed(
138 table_watermarks.clone(),
139 version
140 .state_table_info
141 .info()
142 .get(&table_id)
143 .expect("should exist when having table watermark")
144 .committed_epoch,
145 table_watermarks.watermark_type,
146 )
147 .rewrite_range_with_table_watermark(epoch, &mut key_range)
148 }
149 (key_range, (vec![], vec![], version))
150}
151
impl HummockStorage {
    /// Create a `HummockStorage` and spawn its background workers.
    ///
    /// Startup sequence (the order matters):
    /// 1. Start the observer manager; it owns `version_update_tx` and is
    ///    expected to deliver the initial full pinned version first.
    /// 2. Receive that initial version, then spawn the pinned-version worker.
    /// 3. Build the `HummockEventHandler` (taking over `version_update_rx`)
    ///    and spawn its worker loop last, after `Self` is assembled.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        // The first message on the channel must be the full pinned version
        // sent by the observer node just started above.
        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        // Worker that reports pinned/unpinned versions back to the meta node.
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();
        let table_change_log_manager = Arc::new(TableChangeLogManager::new(
            options.table_change_log_cache_capacity,
            hummock_meta_client.clone(),
        ));
        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
            table_change_log_manager,
        };

        // Spawn the event-handler worker only after `instance` holds all the
        // handles it shares with the handler.
        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}
262
impl HummockStorageReadSnapshot {
    /// Point get: looks up `key` via a single-key inclusive range.
    ///
    /// Returns `Ok(None)` early if watermark rewriting empties the range.
    async fn get_inner<'a, O>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        // The range can become empty after table-watermark rewriting.
        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    /// Forward iteration over `key_range` at this snapshot's epoch.
    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
            )
            .await
    }

    /// Reverse iteration over `key_range` at this snapshot's epoch.
    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    /// Fetch the version pinned at `epoch` for `table_id` from the meta node,
    /// going through the time-travel version cache to deduplicate fetches.
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id)
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            // The receiver is dropped: this version does not participate in
            // the pin/unpin worker protocol.
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    /// Dispatch on the read-epoch kind to build the version/imm/sst tuple
    /// (possibly rewriting `key_range` against table watermarks).
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch) => {
                let tuple = self
                    .build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await?;
                // For `Committed`, additionally verify the resolved version's
                // committed epoch matches the requested one exactly.
                let (_, (_, _, version)) = &tuple;
                let Some(committed_epoch) = version.table_committed_epoch(self.table_id) else {
                    return Err(HummockError::other(format!(
                        "table {} not found in version",
                        self.table_id
                    ))
                    .into());
                };
                if committed_epoch != epoch {
                    return Err(HummockError::wait_epoch(format!(
                        "mismatch table {} committed_epoch {} for read committed_epoch {}",
                        self.table_id, committed_epoch, epoch
                    ))
                    .into());
                }
                Ok(tuple)
            }
            HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
                    .await
            }
        }
    }

    /// Serve the read from a backup version that includes `epoch`.
    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup include epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    /// Resolve a version safe for reading `table_id` at `epoch`: first from
    /// the in-memory recent versions, falling back to a time-travel fetch.
    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    /// Committed-only read path: resolve a version for `epoch` and build a
    /// tuple with no uncommitted data.
    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    /// `NoWait` read path: may serve from committed data or from a local
    /// (uncommitted) read version, depending on what is available.
    async fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // If the requested epoch is already committed, serve committed data;
        // which version is used depends on how `epoch` compares to the
        // table's committed epoch.
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            let pinned_version = if epoch < info.committed_epoch {
                pinned_version
            } else {
                self.get_epoch_hummock_version(epoch, table_id).await?
            };
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            // Otherwise look for a local read version covering this vnode.
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.is_initialized() && read_version.contains(vnode) {
                                    // Replicated read versions are excluded,
                                    // but counted for the warning below.
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            if read_version_vec.is_empty() {
                // No usable local read version: fall back to committed data.
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                // Exactly one local read version may own a vnode.
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}
554
impl HummockStorage {
    /// Register a new local read version with the event handler and build a
    /// `LocalHummockStorage` on top of it.
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        // The handler replies with the base read version plus an instance
        // guard that deregisters it on drop.
        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

    /// Ask the event handler to clear the shared buffer for all tables and
    /// wait for completion.
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    /// Clear shared-buffer state for the given tables only; no-op when
    /// `table_ids` is empty.
    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Notify the event handler that `epoch` starts for `table_ids`.
    /// Fire-and-forget: a send failure (handler gone) is ignored.
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    /// Latest locally-known pinned version.
    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

    /// Query the event handler for the minimum uncommitted object id, if any.
    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

    /// Sync the given (epoch, tables) pairs and wait for the sync result.
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        // A dropped sender (handler gone) surfaces as an explicit error
        // rather than a panic.
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}
660
/// A read snapshot of `HummockStorage`, fixed to one table and one read epoch.
///
/// Created via `StateStore::new_read_snapshot`; holds clones of the shared
/// handles it needs to resolve versions and serve reads.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    table_option: TableOption,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
673
impl StateStoreGet for HummockStorageReadSnapshot {
    /// Point get; delegates to [`HummockStorageReadSnapshot::get_inner`].
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}
684
impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    /// Forward iteration. Panics if `key_range` spans more than one vnode.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    /// Reverse iteration. Panics if `key_range` spans more than one vnode.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}
721
impl StateStoreReadVector for HummockStorageReadSnapshot {
    /// Nearest-neighbor vector query at this snapshot's epoch.
    ///
    /// Resolves a committed (or backup) version first; `NoWait` epochs are
    /// rejected. The distance measurement is dispatched statically via macro.
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup include epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}
765
impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    /// Resolve the epoch that follows `epoch` in the table's change log.
    ///
    /// Fast path: if the locally-known committed epoch already exceeds
    /// `epoch`, fetch the change log and answer directly. Otherwise wait for
    /// a version update whose committed epoch passes `epoch`, then retry.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        // Shared lookup helper: `Ok(None)` is only possible when the change
        // log exists but does not (yet) contain a successor epoch.
        fn next_epoch(
            table_change_log: &TableChangeLogs,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // Fast path against the latest locally-known version.
            if let Some(max_epoch) = self
                .recent_versions
                .load()
                .latest_version()
                .deref()
                .state_table_info
                .info()
                .get(&options.table_id)
                .map(|i| i.committed_epoch)
                && max_epoch > epoch
            {
                let table_change_log = self
                    .table_change_log_manager
                    .fetch_table_change_logs(options.table_id, (epoch, max_epoch), true, Some(2))
                    .await?;
                if let Some(next_epoch) = next_epoch(&table_change_log, epoch, options.table_id)? {
                    return Ok(next_epoch);
                }
            }
        }
        // Slow path: wait until the table's committed epoch advances past
        // `epoch`, recording the committed epoch observed on the way.
        let mut max_epoch = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                let Some(mce) = version
                    .state_table_info
                    .info()
                    .get(&options.table_id)
                    .map(|i| i.committed_epoch)
                else {
                    return Ok(false);
                };
                max_epoch = Some(mce);
                Ok(mce > epoch)
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        let table_change_log = self
            .table_change_log_manager
            .fetch_table_change_logs(
                options.table_id,
                (epoch, max_epoch.unwrap_or(u64::MAX)),
                true,
                Some(2),
            )
            .await?;
        let next_epoch_ret = next_epoch(&table_change_log, epoch, options.table_id)?;
        // After waiting for the epoch to advance, a missing successor is an error.
        next_epoch_ret.ok_or_else(|| {
            HummockError::next_epoch(format!(
                "next_epoch for {} {} should be valid",
                options.table_id, epoch
            ))
            .into()
        })
    }

    /// Iterate the change log of a table over `epoch_range` and `key_range`.
    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let iter = self
            .hummock_version_reader
            .iter_log(
                epoch_range,
                key_range,
                options,
                self.table_change_log_manager.clone(),
            )
            .await?;
        Ok(iter)
    }
}
865
impl HummockStorage {
    /// Wait until `table_id`'s data is readable at `wait_epoch`.
    ///
    /// - `Committed`: wait for the committed epoch to reach `wait_epoch`.
    /// - `BatchQueryCommitted`: additionally require the version id to reach
    ///   the requested one; a fast path checks the latest local version first.
    /// - Other epoch kinds require no waiting.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        tracing::debug!(
            "try_wait_epoch: epoch: {:?}, table_id: {}",
            wait_epoch,
            table_id
        );
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                {
                    // Fast path: the latest local version may already satisfy
                    // both the version id and the committed epoch.
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        // Once the version id is reached, a missing table
                        // means it was dropped while we waited.
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}
930
impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = HummockVectorWriter;

    /// See [`HummockStorage::try_wait_epoch_impl`].
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    /// Wait for `epoch` to be readable, then capture a read snapshot bound to
    /// the table in `options`.
    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            table_option: options.table_option,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    /// Build a vector writer for the table in `options`.
    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        HummockVectorWriter::new(
            options.table_id,
            self.version_update_notifier_tx.clone(),
            self.context.sstable_store.clone(),
            self.object_id_manager.clone(),
            self.hummock_event_sender.clone(),
            self.hummock_version_reader.stats().clone(),
            self.context.storage_opts.clone(),
        )
    }
}
981
/// Test-only helpers. The whole block is gated on `test`/`feature = "test"`,
/// so the methods inside need no additional `#[cfg]` attributes.
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Sync a single epoch for `table_ids`; thin wrapper over [`HummockStorage::sync`].
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Inject a full version update and yield until the event handler has
    /// applied it (latest version id catches up to the injected one).
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    /// Yield until the latest locally-known version id reaches `version.id`.
    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    /// Construct a `HummockStorage` with a fake catalog accessor and unused
    /// metrics, for tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Yield until the latest version id is strictly greater than `old_id`,
    /// returning the new id.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }

    /// Send a `FlushEvent` and wait until the handler has processed it,
    /// ensuring all previously queued events have been handled.
    //
    // NOTE: the redundant `#[cfg(any(test, feature = "test"))]` that
    // previously gated this single method was removed — the enclosing `impl`
    // is already gated by the identical predicate.
    pub async fn flush_events_for_test(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::FlushEvent(tx))
            .expect("flush event should succeed");
        rx.await.expect("flush event receiver dropped");
    }
}
1073}