use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{PkPrefixTableWatermarksIndex, WatermarkSerdeType};
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::dispatch_measurement;
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::store::vector_writer::HummockVectorWriter;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

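/// `HummockStorage` is the top-level entry point of the Hummock state store.
/// It owns the channels to the background event handler, a cache of recently
/// pinned versions, and the shared context (sstable store, metrics, limiters)
/// from which per-table [`LocalHummockStorage`] instances and read snapshots
/// are derived.
///
/// A minimal construction sketch; the concrete arguments (`opts`,
/// `sstable_store`, `meta_client`, `notification_client`, the catalog manager
/// and metrics) are placeholders supplied by the surrounding node setup:
///
/// ```ignore
/// let storage = HummockStorage::new(
///     opts,
///     sstable_store,
///     meta_client,
///     notification_client,
///     compaction_catalog_manager_ref,
///     state_store_metrics,
///     compactor_metrics,
///     None, // await-tree config
/// )
/// .await?;
/// let snapshot = storage
///     .new_read_snapshot(
///         HummockReadEpoch::Committed(epoch),
///         NewReadSnapshotOptions { table_id },
///     )
///     .await?;
/// ```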
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

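/// The data sources a read resolves against: immutable memtables, uncommitted
/// SSTs, and the committed version.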
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

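/// Builds a committed-only [`ReadVersionTuple`] (no memtables, no uncommitted
/// SSTs). When the table maintains pk-prefix watermarks, `key_range` is first
/// rewritten so that keys below the committed watermark are excluded from the
/// read.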
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id)
        && let WatermarkSerdeType::PkPrefix = table_watermarks.watermark_type
    {
        PkPrefixTableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
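    /// Bootstraps a `HummockStorage`: starts the observer manager, waits for
    /// the initial pinned version from the meta service, spawns the
    /// version-pinning worker and the event handler, and wires the channels
    /// together.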
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
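    /// Point get: the key is treated as a single-key inclusive range so it
    /// can share the range-based version resolution (including watermark
    /// rewriting), then the lookup goes through the version reader.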
    async fn get_inner<O>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

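    /// Picks the version to read based on the epoch variant: `Backup` epochs
    /// resolve against a backup version; `Committed`, `BatchQueryCommitted`
    /// and `TimeTravel` against a committed version at that epoch; `NoWait`
    /// against the currently registered local read versions.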
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

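    /// Serves a `NoWait` read. If the requested epoch is already committed
    /// for the table, the committed version is read (an older epoch is
    /// rejected as expired). Otherwise exactly one non-replicated local read
    /// version covering the key's vnode is expected; if none is registered,
    /// the read falls back to the committed version.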
    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read versions ({} found). Falling back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Falling back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
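    /// Registers a new read version with the event handler and builds a
    /// [`LocalHummockStorage`] for the table from the returned guard.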
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

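    /// Notifies the event handler that writes for `epoch` on `table_ids` are
    /// about to start. The send result is deliberately ignored: an unbounded
    /// send only fails once the receiver is dropped during shutdown.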
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

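/// A cheap, cloneable read handle over `HummockStorage`, fixed to one epoch
/// and one table; it carries only shared handles to the underlying state.
///
/// A hedged usage sketch; `snapshot`, `key` and `read_options` are
/// placeholders, and the closure shape is assumed to follow [`KeyValueFn`]:
///
/// ```ignore
/// let value_len = snapshot
///     .on_key_value(key, read_options, |_full_key, value| Ok(value.len()))
///     .await?;
/// ```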
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup including epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        dispatch_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

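    /// Resolves the epoch that follows `epoch` in the table's change log:
    /// first against the latest local version, then, if not yet available,
    /// by waiting for version updates until one contains it.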
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
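    /// Blocks until the requested epoch is readable: for `Committed`, until
    /// the committed epoch of `table_id` reaches `wait_epoch`; for
    /// `BatchQueryCommitted`, additionally until the version id reaches the
    /// requested one. Other epoch variants need no waiting.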
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = HummockVectorWriter;

    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        HummockVectorWriter::new(
            options.table_id,
            self.version_update_notifier_tx.clone(),
            self.context.sstable_store.clone(),
            self.object_id_manager.clone(),
            self.hummock_event_sender.clone(),
            self.context.storage_opts.clone(),
        )
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

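    /// Test helper: injects `version` as a pinned-version update and yields
    /// until the event handler has applied it.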
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

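    /// Convenience constructor for tests: wires [`HummockStorage::new`] with
    /// a fake catalog accessor, unused metrics, and no await-tree config.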
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}