1use std::collections::HashSet;
16use std::future::Future;
17use std::ops::{Bound, Deref};
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::array::VectorRef;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::dispatch_distance_measurement;
26use risingwave_common::util::epoch::is_max_epoch;
27use risingwave_common_service::{NotificationClient, ObserverManager};
28use risingwave_hummock_sdk::change_log::TableChangeLogs;
29use risingwave_hummock_sdk::key::{
30 TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
31};
32use risingwave_hummock_sdk::sstable_info::SstableInfo;
33use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
34use risingwave_hummock_sdk::version::HummockVersion;
35use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
36use risingwave_rpc_client::HummockMetaClient;
37use risingwave_rpc_client::error::RpcError;
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
40use tokio::sync::oneshot;
41
42use super::local_hummock_storage::LocalHummockStorage;
43use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
44use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
45#[cfg(any(test, feature = "test"))]
46use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
47use crate::error::StorageResult;
48use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
49use crate::hummock::compactor::{
50 CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
51};
52use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
53use crate::hummock::event_handler::{
54 HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
55};
56use crate::hummock::iterator::change_log::ChangeLogIterator;
57use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
58use crate::hummock::local_version::recent_versions::RecentVersions;
59use crate::hummock::observer_manager::HummockObserverNode;
60use crate::hummock::store::vector_writer::HummockVectorWriter;
61use crate::hummock::table_change_log_manager::TableChangeLogManager;
62use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
63use crate::hummock::utils::{wait_for_epoch, wait_for_update};
64use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
65use crate::hummock::{
66 HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
67 MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
68};
69use crate::mem_table::ImmutableMemtable;
70use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
71use crate::opts::StorageOpts;
72use crate::store::*;
73
/// Guard whose `Drop` implementation sends `HummockEvent::Shutdown` on the event channel.
struct HummockStorageShutdownGuard {
    /// Channel on which the shutdown event is sent when the guard is dropped.
    shutdown_sender: HummockEventSender,
}
77
78impl Drop for HummockStorageShutdownGuard {
79 fn drop(&mut self) {
80 let _ = self
81 .shutdown_sender
82 .send(HummockEvent::Shutdown)
83 .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
84 }
85}
86
/// Cloneable handle to the Hummock state store.
///
/// Cloning is cheap relative to construction: most fields are `Arc`s or channel senders that
/// are shared between clones.
#[derive(Clone)]
pub struct HummockStorage {
    /// Sender of `HummockEvent`s (register read version, clear, sync, start epoch, ...).
    hummock_event_sender: HummockEventSender,
    /// Sender of `HummockVersionUpdate`s; within this file it is only used by test helpers.
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    /// Compactor context; also the source of the storage options and the sstable store.
    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    /// Gives access to the memory limiter handed to local storages.
    buffer_tracker: BufferTracker,

    /// Watch channel used to wait for newer pinned versions.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    /// Recent pinned versions; `latest_version()` is the newest one.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    /// Shared between clones; the guard's `Drop` (which sends the shutdown event) runs once the
    /// last `Arc` reference is released.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    /// Read-only view of the registered read versions, keyed by table id.
    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    /// Present only when an await-tree config was supplied to `new`.
    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    /// Cache of versions fetched from meta by `(table_id, epoch)`.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,

    table_change_log_manager: Arc<TableChangeLogManager>,
}
128
/// Everything a read needs from a version: immutable memtables, SST infos and the committed
/// version, in that order.
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
130
131pub fn get_committed_read_version_tuple(
132 version: PinnedVersion,
133 table_id: TableId,
134 mut key_range: TableKeyRange,
135 epoch: HummockEpoch,
136) -> (TableKeyRange, ReadVersionTuple) {
137 if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
138 TableWatermarksIndex::new_committed(
139 table_watermarks.clone(),
140 version
141 .state_table_info
142 .info()
143 .get(&table_id)
144 .expect("should exist when having table watermark")
145 .committed_epoch,
146 table_watermarks.watermark_type,
147 )
148 .rewrite_range_with_table_watermark(epoch, &mut key_range)
149 }
150 (key_range, (vec![], vec![], version))
151}
152
153impl HummockStorage {
154 #[allow(clippy::too_many_arguments)]
156 pub async fn new(
157 options: Arc<StorageOpts>,
158 sstable_store: SstableStoreRef,
159 hummock_meta_client: Arc<dyn HummockMetaClient>,
160 notification_client: impl NotificationClient,
161 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
162 state_store_metrics: Arc<HummockStateStoreMetrics>,
163 compactor_metrics: Arc<CompactorMetrics>,
164 await_tree_config: Option<await_tree::Config>,
165 ) -> HummockResult<Self> {
166 let object_id_manager = Arc::new(ObjectIdManager::new(
167 hummock_meta_client.clone(),
168 options.sstable_id_remote_fetch_number,
169 ));
170 let backup_reader = BackupReader::new(
171 &options.backup_storage_url,
172 &options.backup_storage_directory,
173 &options.object_store_config,
174 )
175 .await
176 .map_err(HummockError::read_backup_error)?;
177 let write_limiter = Arc::new(WriteLimiter::default());
178 let (version_update_tx, mut version_update_rx) = unbounded_channel();
179
180 let observer_manager = ObserverManager::new(
181 notification_client,
182 HummockObserverNode::new(
183 compaction_catalog_manager_ref.clone(),
184 backup_reader.clone(),
185 version_update_tx.clone(),
186 write_limiter.clone(),
187 ),
188 )
189 .await;
190 observer_manager.start().await;
191
192 let hummock_version = match version_update_rx.recv().await {
193 Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
194 _ => unreachable!(
195 "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
196 ),
197 };
198
199 let (pin_version_tx, pin_version_rx) = unbounded_channel();
200 let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
201 tokio::spawn(start_pinned_version_worker(
202 pin_version_rx,
203 hummock_meta_client.clone(),
204 options.max_version_pinning_duration_sec,
205 ));
206
207 let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);
208
209 let compactor_context = CompactorContext::new_local_compact_context(
210 options.clone(),
211 sstable_store.clone(),
212 compactor_metrics.clone(),
213 await_tree_reg.clone(),
214 );
215
216 let hummock_event_handler = HummockEventHandler::new(
217 version_update_rx,
218 pinned_version,
219 compactor_context.clone(),
220 compaction_catalog_manager_ref.clone(),
221 object_id_manager.clone(),
222 state_store_metrics.clone(),
223 );
224
225 let event_tx = hummock_event_handler.event_sender();
226 let table_change_log_manager = Arc::new(TableChangeLogManager::new(
227 options.table_change_log_cache_capacity,
228 hummock_meta_client.clone(),
229 ));
230 let instance = Self {
231 context: compactor_context,
232 compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
233 object_id_manager,
234 buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
235 version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
236 hummock_event_sender: event_tx.clone(),
237 _version_update_sender: version_update_tx,
238 recent_versions: hummock_event_handler.recent_versions(),
239 hummock_version_reader: HummockVersionReader::new(
240 sstable_store,
241 state_store_metrics.clone(),
242 options.max_preload_io_retry_times,
243 ),
244 _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
245 shutdown_sender: event_tx,
246 }),
247 read_version_mapping: hummock_event_handler.read_version_mapping(),
248 backup_reader,
249 write_limiter,
250 compact_await_tree_reg: await_tree_reg,
251 hummock_meta_client,
252 simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
253 options.time_travel_version_cache_capacity,
254 )),
255 table_change_log_manager,
256 };
257
258 tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());
259
260 Ok(instance)
261 }
262}
263
264impl HummockStorageReadSnapshot {
265 async fn get_inner<'a, O>(
273 &'a self,
274 key: TableKey<Bytes>,
275 read_options: ReadOptions,
276 on_key_value_fn: impl KeyValueFn<'a, O>,
277 ) -> StorageResult<Option<O>> {
278 let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));
279
280 let (key_range, read_version_tuple) =
281 self.build_read_version_tuple(self.epoch, key_range).await?;
282
283 if is_empty_key_range(&key_range) {
284 return Ok(None);
285 }
286
287 self.hummock_version_reader
288 .get(
289 key,
290 self.epoch.get_epoch(),
291 self.table_id,
292 self.table_option,
293 read_options,
294 read_version_tuple,
295 on_key_value_fn,
296 )
297 .await
298 }
299
300 async fn iter_inner(
301 &self,
302 key_range: TableKeyRange,
303 read_options: ReadOptions,
304 ) -> StorageResult<HummockStorageIterator> {
305 let (key_range, read_version_tuple) =
306 self.build_read_version_tuple(self.epoch, key_range).await?;
307
308 self.hummock_version_reader
309 .iter(
310 key_range,
311 self.epoch.get_epoch(),
312 self.table_id,
313 self.table_option,
314 read_options,
315 read_version_tuple,
316 )
317 .await
318 }
319
320 async fn rev_iter_inner(
321 &self,
322 key_range: TableKeyRange,
323 read_options: ReadOptions,
324 ) -> StorageResult<HummockStorageRevIterator> {
325 let (key_range, read_version_tuple) =
326 self.build_read_version_tuple(self.epoch, key_range).await?;
327
328 self.hummock_version_reader
329 .rev_iter(
330 key_range,
331 self.epoch.get_epoch(),
332 self.table_id,
333 self.table_option,
334 read_options,
335 read_version_tuple,
336 None,
337 )
338 .await
339 }
340
341 async fn get_time_travel_version(
342 &self,
343 epoch: u64,
344 table_id: TableId,
345 ) -> StorageResult<PinnedVersion> {
346 let meta_client = self.hummock_meta_client.clone();
347 let fetch = async move {
348 let pb_version = meta_client
349 .get_version_by_epoch(epoch, table_id)
350 .await
351 .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
352 .map_err(|e| match &e {
353 RpcError::GrpcStatus(status)
354 if status.inner().code() == tonic::Code::OutOfRange =>
355 {
356 HummockError::time_travel_version_expired(table_id, epoch)
357 }
358 _ => HummockError::meta_error(e.to_report_string()),
359 })?;
360 let version = HummockVersion::from_rpc_protobuf(&pb_version);
361 let (tx, _rx) = unbounded_channel();
362 Ok(PinnedVersion::new(version, tx))
363 };
364 let version = self
365 .simple_time_travel_version_cache
366 .get_or_insert(table_id, epoch, fetch)
367 .await?;
368 Ok(version)
369 }
370
371 async fn build_read_version_tuple(
372 &self,
373 epoch: HummockReadEpoch,
374 key_range: TableKeyRange,
375 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
376 match epoch {
377 HummockReadEpoch::Backup(epoch) => {
378 self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
379 .await
380 }
381 HummockReadEpoch::Committed(epoch) => {
382 let tuple = self
383 .build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
384 .await?;
385 let (_, (_, _, version)) = &tuple;
386 let Some(committed_epoch) = version.table_committed_epoch(self.table_id) else {
387 return Err(HummockError::other(format!(
388 "table {} not found in version",
389 self.table_id
390 ))
391 .into());
392 };
393 if committed_epoch != epoch {
394 return Err(HummockError::committed_epoch_mismatch(
395 self.table_id,
396 committed_epoch,
397 epoch,
398 )
399 .into());
400 }
401 Ok(tuple)
402 }
403 HummockReadEpoch::BatchQueryCommitted(epoch, _)
404 | HummockReadEpoch::TimeTravel(epoch) => {
405 self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
406 .await
407 }
408 HummockReadEpoch::NoWait(epoch) => {
409 self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
410 .await
411 }
412 }
413 }
414
415 async fn build_read_version_tuple_from_backup(
416 &self,
417 epoch: u64,
418 table_id: TableId,
419 key_range: TableKeyRange,
420 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
421 match self
422 .backup_reader
423 .try_get_hummock_version(table_id, epoch)
424 .await
425 {
426 Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
427 backup_version,
428 table_id,
429 key_range,
430 epoch,
431 )),
432 Ok(None) => Err(HummockError::read_backup_error(format!(
433 "backup include epoch {} not found",
434 epoch
435 ))
436 .into()),
437 Err(e) => Err(e),
438 }
439 }
440
441 async fn get_epoch_hummock_version(
442 &self,
443 epoch: u64,
444 table_id: TableId,
445 ) -> StorageResult<PinnedVersion> {
446 match self
447 .recent_versions
448 .load()
449 .get_safe_version(table_id, epoch)
450 {
451 Some(version) => Ok(version),
452 None => self.get_time_travel_version(epoch, table_id).await,
453 }
454 }
455
456 async fn build_read_version_tuple_from_committed(
457 &self,
458 epoch: u64,
459 table_id: TableId,
460 key_range: TableKeyRange,
461 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
462 let version = self.get_epoch_hummock_version(epoch, table_id).await?;
463 Ok(get_committed_read_version_tuple(
464 version, table_id, key_range, epoch,
465 ))
466 }
467
468 async fn build_read_version_tuple_from_all(
469 &self,
470 epoch: u64,
471 table_id: TableId,
472 key_range: TableKeyRange,
473 ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
474 let pinned_version = self.recent_versions.load().latest_version().clone();
475 let info = pinned_version.state_table_info.info().get(&table_id);
476
477 let ret = if let Some(info) = info
479 && epoch <= info.committed_epoch
480 {
481 let pinned_version = if epoch < info.committed_epoch {
482 pinned_version
483 } else {
484 self.get_epoch_hummock_version(epoch, table_id).await?
485 };
486 get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
488 } else {
489 let vnode = vnode(&key_range);
490 let mut matched_replicated_read_version_cnt = 0;
491 let read_version_vec = {
492 let read_guard = self.read_version_mapping.read();
493 read_guard
494 .get(&table_id)
495 .map(|v| {
496 v.values()
497 .filter(|v| {
498 let read_version = v.read();
499 if read_version.is_initialized() && read_version.contains(vnode) {
500 if read_version.is_replicated() {
501 matched_replicated_read_version_cnt += 1;
502 false
503 } else {
504 true
506 }
507 } else {
508 false
509 }
510 })
511 .cloned()
512 .collect_vec()
513 })
514 .unwrap_or_default()
515 };
516
517 if read_version_vec.is_empty() {
520 let table_committed_epoch = info.map(|info| info.committed_epoch);
521 if matched_replicated_read_version_cnt > 0 {
522 tracing::warn!(
523 "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
524 table_id,
525 vnode.to_index(),
526 epoch,
527 matched_replicated_read_version_cnt,
528 table_committed_epoch,
529 );
530 } else {
531 tracing::debug!(
532 "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
533 table_id,
534 vnode.to_index(),
535 epoch,
536 table_committed_epoch
537 );
538 }
539 get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
540 } else {
541 if read_version_vec.len() != 1 {
542 let read_version_vnodes = read_version_vec
543 .into_iter()
544 .map(|v| {
545 let v = v.read();
546 v.vnodes().iter_ones().collect_vec()
547 })
548 .collect_vec();
549 return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
550 }
551 read_filter_for_version(
552 epoch,
553 table_id,
554 key_range,
555 read_version_vec.first().unwrap(),
556 )?
557 }
558 };
559
560 Ok(ret)
561 }
562}
563
564impl HummockStorage {
565 async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
566 let (tx, rx) = tokio::sync::oneshot::channel();
567 self.hummock_event_sender
568 .send(HummockEvent::RegisterReadVersion {
569 table_id: option.table_id,
570 new_read_version_sender: tx,
571 is_replicated: option.is_replicated,
572 vnodes: option.vnodes.clone(),
573 })
574 .unwrap();
575
576 let (basic_read_version, instance_guard) = rx.await.unwrap();
577 let version_update_notifier_tx = self.version_update_notifier_tx.clone();
578 LocalHummockStorage::new(
579 instance_guard,
580 basic_read_version,
581 self.hummock_version_reader.clone(),
582 self.hummock_event_sender.clone(),
583 self.buffer_tracker.get_memory_limiter().clone(),
584 self.write_limiter.clone(),
585 option,
586 version_update_notifier_tx,
587 self.context.storage_opts.mem_table_spill_threshold,
588 )
589 }
590
591 pub async fn clear_shared_buffer(&self) {
592 let (tx, rx) = oneshot::channel();
593 self.hummock_event_sender
594 .send(HummockEvent::Clear(tx, None))
595 .expect("should send success");
596 rx.await.expect("should wait success");
597 }
598
599 pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
600 if !table_ids.is_empty() {
601 let (tx, rx) = oneshot::channel();
602 self.hummock_event_sender
603 .send(HummockEvent::Clear(tx, Some(table_ids)))
604 .expect("should send success");
605 rx.await.expect("should wait success");
606 }
607 }
608
609 pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
613 let _ = self
614 .hummock_event_sender
615 .send(HummockEvent::StartEpoch { epoch, table_ids });
616 }
617
618 pub fn sstable_store(&self) -> SstableStoreRef {
619 self.context.sstable_store.clone()
620 }
621
622 pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
623 &self.object_id_manager
624 }
625
626 pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
627 self.compaction_catalog_manager_ref.clone()
628 }
629
630 pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
631 self.buffer_tracker.get_memory_limiter().clone()
632 }
633
634 pub fn get_pinned_version(&self) -> PinnedVersion {
635 self.recent_versions.load().latest_version().clone()
636 }
637
638 pub fn backup_reader(&self) -> BackupReaderRef {
639 self.backup_reader.clone()
640 }
641
642 pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
643 self.compact_await_tree_reg.as_ref()
644 }
645
646 pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
647 let (tx, rx) = oneshot::channel();
648 self.hummock_event_sender
649 .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
650 .expect("should send success");
651 rx.await.expect("should await success")
652 }
653
654 pub async fn sync(
655 &self,
656 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
657 ) -> StorageResult<SyncResult> {
658 let (tx, rx) = oneshot::channel();
659 let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
660 sync_result_sender: tx,
661 sync_table_epochs,
662 });
663 let synced_data = rx
664 .await
665 .map_err(|_| HummockError::other("failed to receive sync result"))??;
666 Ok(synced_data.into_sync_result())
667 }
668}
669
/// Read handle bound to one table and one read epoch.
///
/// Apart from the epoch and table fields, every field is a clone of the corresponding
/// `HummockStorage` field, taken when the snapshot is created.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    /// Read epoch; its variant decides how the version to read from is resolved.
    epoch: HummockReadEpoch,
    table_id: TableId,
    table_option: TableOption,
    /// Recent pinned versions, used to find the latest or a safe version for an epoch.
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    /// Registered read versions, consulted for `NoWait` reads.
    read_version_mapping: ReadOnlyReadVersionMapping,
    /// Source of versions for `Backup` reads.
    backup_reader: BackupReaderRef,
    /// Used to fetch time travel versions from the meta service.
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    /// Cache of fetched time travel versions.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
682
impl StateStoreGet for HummockStorageReadSnapshot {
    /// Point lookup of `key`; delegates directly to `get_inner`.
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}
693
694impl StateStoreRead for HummockStorageReadSnapshot {
695 type Iter = HummockStorageIterator;
696 type RevIter = HummockStorageRevIterator;
697
698 fn iter(
699 &self,
700 key_range: TableKeyRange,
701 read_options: ReadOptions,
702 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
703 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
704 assert_eq!(
705 r_vnode_exclusive - l_vnode_inclusive,
706 1,
707 "read range {:?} for table {} iter contains more than one vnode",
708 key_range,
709 self.table_id
710 );
711 self.iter_inner(key_range, read_options)
712 }
713
714 fn rev_iter(
715 &self,
716 key_range: TableKeyRange,
717 read_options: ReadOptions,
718 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
719 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
720 assert_eq!(
721 r_vnode_exclusive - l_vnode_inclusive,
722 1,
723 "read range {:?} for table {} iter contains more than one vnode",
724 key_range,
725 self.table_id
726 );
727 self.rev_iter_inner(key_range, read_options)
728 }
729}
730
731impl StateStoreReadVector for HummockStorageReadSnapshot {
732 async fn nearest<'a, O: Send + 'a>(
733 &'a self,
734 vec: VectorRef<'a>,
735 options: VectorNearestOptions,
736 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
737 ) -> StorageResult<Vec<O>> {
738 let version = match self.epoch {
739 HummockReadEpoch::Committed(epoch)
740 | HummockReadEpoch::BatchQueryCommitted(epoch, _)
741 | HummockReadEpoch::TimeTravel(epoch) => {
742 self.get_epoch_hummock_version(epoch, self.table_id).await?
743 }
744 HummockReadEpoch::Backup(epoch) => self
745 .backup_reader
746 .try_get_hummock_version(self.table_id, epoch)
747 .await?
748 .ok_or_else(|| {
749 HummockError::read_backup_error(format!(
750 "backup include epoch {} not found",
751 epoch
752 ))
753 })?,
754 HummockReadEpoch::NoWait(_) => {
755 return Err(
756 HummockError::other("nearest query does not support NoWait epoch").into(),
757 );
758 }
759 };
760 dispatch_distance_measurement!(options.measure, MeasurementType, {
761 Ok(self
762 .hummock_version_reader
763 .nearest::<MeasurementType, O>(
764 version,
765 self.table_id,
766 vec,
767 options,
768 on_nearest_item_fn,
769 )
770 .await?)
771 })
772 }
773}
774
775impl StateStoreReadLog for HummockStorage {
776 type ChangeLogIter = ChangeLogIterator;
777
778 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
779 fn next_epoch(
780 table_change_log: &TableChangeLogs,
781 epoch: u64,
782 table_id: TableId,
783 ) -> HummockResult<Option<u64>> {
784 let table_change_log = table_change_log.get(&table_id).ok_or_else(|| {
785 HummockError::next_epoch(format!("table {} has been dropped", table_id))
786 })?;
787 table_change_log
788 .next_epoch(epoch)
789 .map_err(|_| HummockError::change_log_retention_miss(table_id, epoch))
790 }
791 {
792 if let Some(max_epoch) = self
794 .recent_versions
795 .load()
796 .latest_version()
797 .deref()
798 .state_table_info
799 .info()
800 .get(&options.table_id)
801 .map(|i| i.committed_epoch)
802 && max_epoch > epoch
803 {
804 let table_change_log = self
806 .table_change_log_manager
807 .fetch_table_change_logs(options.table_id, (epoch, max_epoch), true, Some(2))
808 .await?;
809 if let Some(next_epoch) = next_epoch(&table_change_log, epoch, options.table_id)? {
810 return Ok(next_epoch);
811 }
812 }
813 }
814 let mut max_epoch = None;
815 wait_for_update(
816 &self.version_update_notifier_tx,
817 |version| {
818 let Some(mce) = version
819 .state_table_info
820 .info()
821 .get(&options.table_id)
822 .map(|i| i.committed_epoch)
823 else {
824 return Ok(false);
825 };
826 max_epoch = Some(mce);
827 Ok(mce > epoch)
828 },
829 || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
830 )
831 .await?;
832 let table_change_log = self
834 .table_change_log_manager
835 .fetch_table_change_logs(
836 options.table_id,
837 (epoch, max_epoch.unwrap_or(u64::MAX)),
838 true,
839 Some(2),
840 )
841 .await?;
842 let next_epoch_ret = next_epoch(&table_change_log, epoch, options.table_id)?;
843 next_epoch_ret.ok_or_else(|| {
844 HummockError::next_epoch(format!(
845 "next_epoch for {} {} should be valid",
846 options.table_id, epoch
847 ))
848 .into()
849 })
850 }
851
852 async fn iter_log(
853 &self,
854 epoch_range: (u64, u64),
855 key_range: TableKeyRange,
856 options: ReadLogOptions,
857 ) -> StorageResult<Self::ChangeLogIter> {
858 let iter = self
859 .hummock_version_reader
860 .iter_log(
861 epoch_range,
862 key_range,
863 options,
864 self.table_change_log_manager.clone(),
865 )
866 .await?;
867 Ok(iter)
868 }
869}
870
871impl HummockStorage {
872 async fn try_wait_epoch_impl(
875 &self,
876 wait_epoch: HummockReadEpoch,
877 table_id: TableId,
878 ) -> StorageResult<()> {
879 tracing::debug!(
880 "try_wait_epoch: epoch: {:?}, table_id: {}",
881 wait_epoch,
882 table_id
883 );
884 match wait_epoch {
885 HummockReadEpoch::Committed(wait_epoch) => {
886 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
887 wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
888 }
889 HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
890 assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
891 {
893 let recent_versions = self.recent_versions.load();
894 let latest_version = recent_versions.latest_version();
895 if latest_version.id >= wait_version_id
896 && let Some(committed_epoch) =
897 latest_version.table_committed_epoch(table_id)
898 && committed_epoch >= wait_epoch
899 {
900 return Ok(());
901 }
902 }
903 wait_for_update(
904 &self.version_update_notifier_tx,
905 |version| {
906 if wait_version_id > version.id() {
907 return Ok(false);
908 }
909 let committed_epoch =
910 version.table_committed_epoch(table_id).ok_or_else(|| {
911 HummockError::wait_epoch(format!(
915 "table id {} has been dropped",
916 table_id
917 ))
918 })?;
919 Ok(committed_epoch >= wait_epoch)
920 },
921 || {
922 format!(
923 "try_wait_epoch: epoch: {}, version_id: {:?}",
924 wait_epoch, wait_version_id
925 )
926 },
927 )
928 .await?;
929 }
930 _ => {}
931 };
932 Ok(())
933 }
934}
935
936impl StateStore for HummockStorage {
937 type Local = LocalHummockStorage;
938 type ReadSnapshot = HummockStorageReadSnapshot;
939 type VectorWriter = HummockVectorWriter;
940
941 async fn try_wait_epoch(
944 &self,
945 wait_epoch: HummockReadEpoch,
946 options: TryWaitEpochOptions,
947 ) -> StorageResult<()> {
948 self.try_wait_epoch_impl(wait_epoch, options.table_id).await
949 }
950
951 fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
952 self.new_local_inner(option)
953 }
954
955 async fn new_read_snapshot(
956 &self,
957 epoch: HummockReadEpoch,
958 options: NewReadSnapshotOptions,
959 ) -> StorageResult<Self::ReadSnapshot> {
960 self.try_wait_epoch_impl(epoch, options.table_id).await?;
961 Ok(HummockStorageReadSnapshot {
962 epoch,
963 table_id: options.table_id,
964 table_option: options.table_option,
965 recent_versions: self.recent_versions.clone(),
966 hummock_version_reader: self.hummock_version_reader.clone(),
967 read_version_mapping: self.read_version_mapping.clone(),
968 backup_reader: self.backup_reader.clone(),
969 hummock_meta_client: self.hummock_meta_client.clone(),
970 simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
971 })
972 }
973
974 async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
975 HummockVectorWriter::new(
976 options.table_id,
977 self.version_update_notifier_tx.clone(),
978 self.context.sstable_store.clone(),
979 self.object_id_manager.clone(),
980 self.hummock_event_sender.clone(),
981 self.hummock_version_reader.stats().clone(),
982 self.context.storage_opts.clone(),
983 )
984 }
985}
986
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Syncs a single `epoch` for `table_ids`; shorthand for `sync` with one entry.
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        let sync_table_epochs = vec![(epoch, table_ids)];
        self.sync(sync_table_epochs).await
    }

    /// Injects `version` as a version update and yields until the latest version id has
    /// reached it.
    ///
    /// # Panics
    ///
    /// Panics if the version update cannot be sent.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let target_id = version.id;
        let update = HummockVersionUpdate::PinnedVersion(Box::new(version));
        self._version_update_sender.send(update).unwrap();
        while self.recent_versions.load().latest_version().id() < target_id {
            yield_now().await;
        }
    }

    /// Yields until the latest version id has reached the id of `version`.
    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        while self.recent_versions.load().latest_version().id() < version.id {
            yield_now().await;
        }
    }

    /// Test constructor: a fake remote table accessor for the compaction catalog manager,
    /// unused metrics and no await-tree config.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let table_accessor = Box::new(FakeRemoteTableAccessor {});
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(table_accessor));
        let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
        let compactor_metrics = Arc::new(CompactorMetrics::unused());

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            state_store_metrics,
            compactor_metrics,
            None,
        )
        .await
    }

    /// Returns the storage options held by the compactor context.
    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    /// Returns the version reader.
    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Yields until the latest version id is strictly greater than `old_id`, then returns it.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let latest_id = self.recent_versions.load().latest_version().id();
            if latest_id > old_id {
                break latest_id;
            }
            yield_now().await;
        }
    }

    /// Sends a `FlushEvent` and waits for its acknowledgement.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent or the acknowledgement channel is closed.
    #[cfg(any(test, feature = "test"))]
    pub async fn flush_events_for_test(&self) {
        let (flush_tx, flush_rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::FlushEvent(flush_tx))
            .expect("flush event should succeed");
        flush_rx.await.expect("flush event receiver dropped");
    }
}