risingwave_storage/hummock/store/
hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockReadEpoch, HummockSstableObjectId, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, SstableObjectIdManager, SstableObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

/// `HummockStorage` is the entry point of the Hummock state-store backend.
/// It implements the `StateStore` and `StateStoreRead` traits, but has no write methods:
/// all writes must go through a `LocalHummockStorage` to preserve Hummock's single-writer
/// property. A `LocalHummockStorage` instance can be created via a `new_local` call.
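///
/// A rough usage sketch (not compiled; assumes an existing `storage: HummockStorage`,
/// plus already-filled `table_id`, `epoch`, key, and option values):
///
/// ```ignore
/// // Read path: pin a read snapshot at an epoch, then read through it.
/// let snapshot = storage
///     .new_read_snapshot(
///         HummockReadEpoch::Committed(epoch),
///         NewReadSnapshotOptions { table_id },
///     )
///     .await?;
/// let row = snapshot.get_keyed_row(key, read_options).await?;
///
/// // Write path: all writes go through a per-table local store.
/// let local_storage = storage.new_local(new_local_options).await;
/// ```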
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    // Only used in tests to set the hummock version in the uploader.
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    sstable_object_id_manager: SstableObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

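/// The inputs of a single read: the immutable memtables and the SSTs to scan in
/// addition to the committed version that backs the read.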
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

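/// Builds a [`ReadVersionTuple`] that reads only from the committed `version`: the
/// memtable and SST parts are left empty, and `key_range` is first rewritten with
/// the committed table watermarks of `table_id` (if any) at the given `epoch`.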
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
        TableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    /// Creates a [`HummockStorage`].
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first to take the event tx, so the first event should be a full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            sstable_object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            sstable_object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
    /// Gets the value of the specified `key` in the table specified in `read_options`.
    /// The result is based on a snapshot corresponding to the given `epoch`.
    /// If `key` carries a consistent-hash virtual-node value, that value is stored in
    /// `value_meta`.
    ///
    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
    /// the key is not found. If `Err()` is returned, the search for the key
    /// failed due to other non-EOF errors.
    async fn get_inner(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) = self
            .build_read_version_tuple(self.raw_epoch, key_range, &read_options)
            .await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(key, self.raw_epoch, read_options, read_version_tuple)
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) = self
            .build_read_version_tuple(self.raw_epoch, key_range, &read_options)
            .await?;

        self.hummock_version_reader
            .iter(key_range, self.raw_epoch, read_options, read_version_tuple)
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) = self
            .build_read_version_tuple(self.raw_epoch, key_range, &read_options)
            .await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.raw_epoch,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

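    /// Fetches the pinned version at `epoch` for a time-travel read of `table_id`,
    /// going through the per-table time-travel version cache and falling back to a
    /// meta-service RPC on a cache miss.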
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

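    /// Dispatches the read to one of three version sources according to
    /// `read_options`: a backup version, a committed (possibly time-travel) version,
    /// or the full path that also considers uncommitted local state.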
    async fn build_read_version_tuple(
        &self,
        epoch: u64,
        key_range: TableKeyRange,
        read_options: &ReadOptions,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        if read_options.read_version_from_backup {
            self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                .await
        } else if read_options.read_committed {
            self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                .await
        } else {
            self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => version,
            None => self.get_time_travel_version(epoch, table_id).await?,
        };
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Check whether the read epoch is below the table's max committed epoch (MCE).
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            // Read the committed version directly without building a snapshot.
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // Only non-replicated read versions with a matched vnode are considered.
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created, the memory state
            // may be empty.
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read versions ({} found). Falling back to the committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Falling back to the committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
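    /// Registers a new read version for the table with the event handler, then wraps
    /// the returned read version and instance guard in a [`LocalHummockStorage`]
    /// write handle.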
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

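    /// Clears the whole shared buffer, waiting until the event handler acknowledges
    /// that the clear has finished.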
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declares the start of an epoch. This information is provided for spilling, so that a
    /// spill task won't include data of two or more syncs.
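    ///
    /// A rough ordering sketch (not compiled; `epoch` and `table_ids` are assumed to
    /// come from the epoch/barrier manager):
    ///
    /// ```ignore
    /// storage.start_epoch(epoch, table_ids.clone()); // declare before writing data of `epoch`
    /// // ... local stores write data belonging to `epoch` ...
    /// let sync_result = storage.sync(vec![(epoch, table_ids)]).await?;
    /// ```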
    // TODO: remove this method when we support spill tasks that can include data of two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn sstable_object_id_manager(&self) -> &SstableObjectIdManagerRef {
        &self.sstable_object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

    pub async fn min_uncommitted_sst_id(&self) -> Option<HummockSstableObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedSstId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }
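
    /// Syncs the data written for the given `(epoch, table_ids)` pairs: asks the event
    /// handler to sync the corresponding shared-buffer data and returns the resulting
    /// [`SyncResult`] once the sync completes.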
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

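/// A read-only view of the Hummock state store for a single table, pinned at a fixed
/// epoch. Created via `StateStore::new_read_snapshot`; it serves point gets and
/// (reverse) range iteration against that snapshot.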
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    raw_epoch: u64,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
        self.get_inner(key, read_options)
    }

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "iter read range {:?} for table {} contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "rev_iter read range {:?} for table {} contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

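    /// Returns the next change-log epoch of `options.table_id` after `epoch`, waiting
    /// for a version update when the currently known change log does not contain it yet.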
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // fast path
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

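    /// Creates an iterator over the change log of `options.table_id` within
    /// `epoch_range`, restricted to `key_range` and based on the latest local version.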
    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
    /// Waits until the local hummock version contains the given epoch. If `wait_epoch` is
    /// `Current`, we only check whether it is less than or equal to the sealed epoch and do
    /// not wait.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // Fast path: check `recent_versions` first.
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In a batch query, we have ensured that the current version is
                                // not older than `wait_version_id`, so if the table_id does not
                                // exist in the latest version, the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;

    /// Waits until the local hummock version contains the given epoch. If `wait_epoch` is
    /// `Current`, we only check whether it is less than or equal to the sealed epoch and do
    /// not wait.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            raw_epoch: epoch.get_epoch(),
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Used in the compaction test tool.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}