1use std::collections::HashSet;
16use std::future::Future;
17use std::ops::Bound;
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::array::VectorRef;
24use risingwave_common::catalog::TableId;
25use risingwave_common::dispatch_distance_measurement;
26use risingwave_common::util::epoch::is_max_epoch;
27use risingwave_common_service::{NotificationClient, ObserverManager};
28use risingwave_hummock_sdk::key::{
29 TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
30};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_watermark::{PkPrefixTableWatermarksIndex, WatermarkSerdeType};
33use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
34use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
35use risingwave_rpc_client::HummockMetaClient;
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
38use tokio::sync::oneshot;
39
40use super::local_hummock_storage::LocalHummockStorage;
41use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43#[cfg(any(test, feature = "test"))]
44use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
45use crate::error::StorageResult;
46use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
47use crate::hummock::compactor::{
48 CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
49};
50use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
51use crate::hummock::event_handler::{
52 HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
53};
54use crate::hummock::iterator::change_log::ChangeLogIterator;
55use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
56use crate::hummock::local_version::recent_versions::RecentVersions;
57use crate::hummock::observer_manager::HummockObserverNode;
58use crate::hummock::store::vector_writer::HummockVectorWriter;
59use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
60use crate::hummock::utils::{wait_for_epoch, wait_for_update};
61use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
62use crate::hummock::{
63 HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
64 MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
65};
66use crate::mem_table::ImmutableMemtable;
67use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
68use crate::opts::StorageOpts;
69use crate::store::*;
70
/// Guard that notifies the Hummock event handler worker to shut down when the
/// last clone of [`HummockStorage`] is dropped.
struct HummockStorageShutdownGuard {
    /// Sender of the event channel; used only to emit `HummockEvent::Shutdown` on drop.
    shutdown_sender: HummockEventSender,
}
74
75impl Drop for HummockStorageShutdownGuard {
76 fn drop(&mut self) {
77 let _ = self
78 .shutdown_sender
79 .send(HummockEvent::Shutdown)
80 .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
81 }
82}
83
#[derive(Clone)]
/// Handle to the Hummock state store. Cloning shares the same underlying
/// background workers and channels.
pub struct HummockStorage {
    /// Sends events to the background `HummockEventHandler` worker.
    hummock_event_sender: HummockEventSender,
    /// Kept alive so version updates can still be delivered; only sent to
    /// directly by test helpers (see `update_version_and_wait`).
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    /// Shared compactor context (also carries `storage_opts` and the sstable store).
    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    /// Allocates object ids via the meta client.
    object_id_manager: ObjectIdManagerRef,

    /// Tracks shared-buffer memory usage; owns the memory limiter.
    buffer_tracker: BufferTracker,

    /// Watch channel broadcasting newly pinned versions; used to wait for epochs.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    /// Recently pinned versions, swapped in atomically by the event handler.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    /// Sends `HummockEvent::Shutdown` when the last clone is dropped.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    /// Read-only view of table id -> registered local read versions.
    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    /// Cache of versions fetched from meta for time-travel reads.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
123
/// `(imms, uncommitted sstable infos, committed version)` consumed by
/// [`HummockVersionReader`] when serving a read.
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
125
126pub fn get_committed_read_version_tuple(
127 version: PinnedVersion,
128 table_id: TableId,
129 mut key_range: TableKeyRange,
130 epoch: HummockEpoch,
131) -> (TableKeyRange, ReadVersionTuple) {
132 if let Some(table_watermarks) = version.table_watermarks.get(&table_id)
133 && let WatermarkSerdeType::PkPrefix = table_watermarks.watermark_type
134 {
135 PkPrefixTableWatermarksIndex::new_committed(
136 table_watermarks.clone(),
137 version
138 .state_table_info
139 .info()
140 .get(&table_id)
141 .expect("should exist when having table watermark")
142 .committed_epoch,
143 )
144 .rewrite_range_with_table_watermark(epoch, &mut key_range)
145 }
146 (key_range, (vec![], vec![], version))
147}
148
impl HummockStorage {
    /// Creates a [`HummockStorage`] and spawns its background workers.
    ///
    /// Startup sequence (order matters):
    /// 1. Start the observer manager; its first event on `version_update_rx`
    ///    is the full initial hummock version.
    /// 2. Pin that version and spawn the version-pinning worker.
    /// 3. Build the event handler, capture its channels, and spawn its worker loop.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        // Backup reader failures are surfaced as storage errors at construction time.
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        // The observer is the only sender at this point, and its first message
        // is guaranteed to be a full pinned version.
        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        // Worker that reports unpin of outdated versions back to meta.
        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        // Spawn the event handler worker only after all its channels are wired up.
        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}
255
impl HummockStorageReadSnapshot {
    /// Point get at the snapshot's epoch, applying `on_key_value_fn` to the
    /// found entry (if any).
    async fn get_inner<'a, O>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        // Model the point get as a single-key inclusive range so that the
        // table-watermark rewrite applies uniformly.
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        // The watermark rewrite may have emptied the range, in which case the
        // key is treated as absent.
        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    /// Forward iterator over `key_range` at the snapshot's epoch.
    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    /// Reverse iterator over `key_range` at the snapshot's epoch.
    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    /// Fetches (or reuses from the cache) the historical version that covers
    /// `epoch` for `table_id`, via the meta client.
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        // The fetch future is only executed on a cache miss.
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            // The unpin worker side is intentionally dropped: time-travel
            // versions are not pinned at meta through this channel.
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    /// Dispatches to the version-selection strategy matching the read epoch kind.
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    /// Serves the read from a backup version covering `epoch`; errors if the
    /// backup does not include that epoch.
    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup include epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    /// Resolves a version that can safely serve `epoch`: first from the
    /// recent-version cache, falling back to a time-travel fetch from meta.
    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    /// Committed-only read: no imms or uncommitted SSTs are included.
    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    /// `NoWait` read: serves from uncommitted local read versions when one
    /// matches the vnode, otherwise falls back to the latest committed version.
    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Fast path: the epoch is already committed in the latest version.
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                // Reading an epoch strictly below the committed one is stale.
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            // Uncommitted read: look for a non-replicated local read version
            // that covers this vnode.
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        // Replicated read versions are excluded
                                        // but counted for diagnostics below.
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            if read_version_vec.is_empty() {
                // No usable local read version: fall back to the committed
                // version, logging why.
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                // Exactly one read version may own a vnode; more is an invariant violation.
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}
523
524impl HummockStorage {
525 async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
526 let (tx, rx) = tokio::sync::oneshot::channel();
527 self.hummock_event_sender
528 .send(HummockEvent::RegisterReadVersion {
529 table_id: option.table_id,
530 new_read_version_sender: tx,
531 is_replicated: option.is_replicated,
532 vnodes: option.vnodes.clone(),
533 })
534 .unwrap();
535
536 let (basic_read_version, instance_guard) = rx.await.unwrap();
537 let version_update_notifier_tx = self.version_update_notifier_tx.clone();
538 LocalHummockStorage::new(
539 instance_guard,
540 basic_read_version,
541 self.hummock_version_reader.clone(),
542 self.hummock_event_sender.clone(),
543 self.buffer_tracker.get_memory_limiter().clone(),
544 self.write_limiter.clone(),
545 option,
546 version_update_notifier_tx,
547 self.context.storage_opts.mem_table_spill_threshold,
548 )
549 }
550
551 pub async fn clear_shared_buffer(&self) {
552 let (tx, rx) = oneshot::channel();
553 self.hummock_event_sender
554 .send(HummockEvent::Clear(tx, None))
555 .expect("should send success");
556 rx.await.expect("should wait success");
557 }
558
559 pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
560 if !table_ids.is_empty() {
561 let (tx, rx) = oneshot::channel();
562 self.hummock_event_sender
563 .send(HummockEvent::Clear(tx, Some(table_ids)))
564 .expect("should send success");
565 rx.await.expect("should wait success");
566 }
567 }
568
569 pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
573 let _ = self
574 .hummock_event_sender
575 .send(HummockEvent::StartEpoch { epoch, table_ids });
576 }
577
578 pub fn sstable_store(&self) -> SstableStoreRef {
579 self.context.sstable_store.clone()
580 }
581
582 pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
583 &self.object_id_manager
584 }
585
586 pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
587 self.compaction_catalog_manager_ref.clone()
588 }
589
590 pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
591 self.buffer_tracker.get_memory_limiter().clone()
592 }
593
594 pub fn get_pinned_version(&self) -> PinnedVersion {
595 self.recent_versions.load().latest_version().clone()
596 }
597
598 pub fn backup_reader(&self) -> BackupReaderRef {
599 self.backup_reader.clone()
600 }
601
602 pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
603 self.compact_await_tree_reg.as_ref()
604 }
605
606 pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
607 let (tx, rx) = oneshot::channel();
608 self.hummock_event_sender
609 .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
610 .expect("should send success");
611 rx.await.expect("should await success")
612 }
613
614 pub async fn sync(
615 &self,
616 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
617 ) -> StorageResult<SyncResult> {
618 let (tx, rx) = oneshot::channel();
619 let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
620 sync_result_sender: tx,
621 sync_table_epochs,
622 });
623 let synced_data = rx
624 .await
625 .map_err(|_| HummockError::other("failed to receive sync result"))??;
626 Ok(synced_data.into_sync_result())
627 }
628}
629
#[derive(Clone)]
/// A read-only view of [`HummockStorage`] fixed to one read epoch and one table.
pub struct HummockStorageReadSnapshot {
    /// The epoch kind (committed / batch-query / time-travel / backup / no-wait)
    /// decides which version-selection path serves the read.
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
641
impl StateStoreGet for HummockStorageReadSnapshot {
    /// Point get at the snapshot's epoch; delegates to [`Self::get_inner`].
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}
652
653impl StateStoreRead for HummockStorageReadSnapshot {
654 type Iter = HummockStorageIterator;
655 type RevIter = HummockStorageRevIterator;
656
657 fn iter(
658 &self,
659 key_range: TableKeyRange,
660 read_options: ReadOptions,
661 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
662 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
663 assert_eq!(
664 r_vnode_exclusive - l_vnode_inclusive,
665 1,
666 "read range {:?} for table {} iter contains more than one vnode",
667 key_range,
668 self.table_id
669 );
670 self.iter_inner(key_range, read_options)
671 }
672
673 fn rev_iter(
674 &self,
675 key_range: TableKeyRange,
676 read_options: ReadOptions,
677 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
678 let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
679 assert_eq!(
680 r_vnode_exclusive - l_vnode_inclusive,
681 1,
682 "read range {:?} for table {} iter contains more than one vnode",
683 key_range,
684 self.table_id
685 );
686 self.rev_iter_inner(key_range, read_options)
687 }
688}
689
impl StateStoreReadVector for HummockStorageReadSnapshot {
    /// Vector nearest-neighbor query against the snapshot's version.
    ///
    /// Supports committed / batch-query / time-travel and backup epochs;
    /// `NoWait` is rejected with an error.
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup include epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        // Monomorphize on the configured distance measurement type.
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}
733
impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    /// Returns the change-log epoch following `epoch` for `options.table_id`,
    /// waiting for new versions if `epoch` is currently the latest.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        // Looks up the successor of `epoch` in the table's change log.
        // `Ok(None)` means `epoch` is still the newest change-log epoch.
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        // Fast path: the successor may already be present in the latest version.
        {
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        // Slow path: block on version updates until a successor appears.
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    /// Iterates the change log of `key_range` over `epoch_range`, based on the
    /// latest known version.
    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}
794
impl HummockStorage {
    /// Blocks until reads at `wait_epoch` can be served for `table_id`.
    ///
    /// `Committed` waits for the committed epoch to reach `wait_epoch`;
    /// `BatchQueryCommitted` additionally requires the version id to reach the
    /// requested one. Other epoch kinds (backup, time-travel, no-wait) need no
    /// waiting and return immediately.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        tracing::debug!(
            "try_wait_epoch: epoch: {:?}, table_id: {}",
            wait_epoch,
            table_id
        );
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // Fast path: the latest cached version may already satisfy both
                // the version id and the committed epoch.
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                // Slow path: wait on version updates until both conditions hold.
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        // Table missing from a sufficiently new version means it
                        // was dropped while waiting.
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}
859
impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = HummockVectorWriter;

    /// Waits until `wait_epoch` is readable for the table in `options`.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    /// Creates a table-local state store; see [`Self::new_local_inner`].
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    /// Creates a read snapshot at `epoch`, first waiting (if needed) for that
    /// epoch to become readable.
    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    /// Creates a vector writer for the table in `options`.
    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        HummockVectorWriter::new(
            options.table_id,
            self.version_update_notifier_tx.clone(),
            self.context.sstable_store.clone(),
            self.object_id_manager.clone(),
            self.hummock_event_sender.clone(),
            self.context.storage_opts.clone(),
        )
    }
}
908
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Test helper: syncs a single `(epoch, table set)` pair.
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Test helper: injects `version` as a pinned-version update and busy-waits
    /// (cooperatively yielding) until it is observed by the event handler.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    /// Test helper: waits until the storage has caught up to `version.id`.
    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    /// Current shared-buffer size in bytes, as tracked by the buffer tracker.
    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Test constructor: wires a fake catalog accessor and unused metrics into
    /// [`Self::new`].
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    /// Storage options this instance was built with.
    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    /// Direct access to the version reader.
    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Test helper: waits until the latest version id exceeds `old_id` and
    /// returns the new id.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}
995}