risingwave_storage/hummock/store/hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{PkPrefixTableWatermarksIndex, WatermarkSerdeType};
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::panic_store::PanicStateStore;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits but exposes no write method,
/// since all writes must go through a `LocalHummockStorage` to preserve Hummock's
/// single-writer property. A `LocalHummockStorage` instance can be created via `new_local`.
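///
/// # Example
///
/// A minimal read-path sketch (not compiled here): it assumes an already-constructed
/// `hummock` instance, a committed `epoch`, a `table_id`, and a `key`, and that the
/// closure satisfies the `KeyValueFn` bound.
///
/// ```ignore
/// // Pin a read snapshot at a committed epoch for one table, then point-get a key.
/// let snapshot = hummock
///     .new_read_snapshot(
///         HummockReadEpoch::Committed(epoch),
///         NewReadSnapshotOptions { table_id },
///     )
///     .await?;
/// let value: Option<Bytes> = snapshot
///     .on_key_value(key, ReadOptions::default(), |_full_key, value| {
///         Ok(Bytes::copy_from_slice(value))
///     })
///     .await?;
/// ```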
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    // only used in test for setting hummock version in uploader
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

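/// The sources a read is served from: the relevant immutable memtables, the relevant
/// uncommitted SSTs, and the committed version they are layered on.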
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

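/// Builds a committed-only `ReadVersionTuple` (no immutable memtables and no
/// uncommitted SSTs) on top of `version`. If the table has a pk-prefix table
/// watermark, `key_range` is first rewritten against the committed watermark index
/// so that keys below the watermark are excluded from the read.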
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id)
        && let WatermarkSerdeType::PkPrefix = table_watermarks.watermark_type
    {
        PkPrefixTableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    /// Creates a [`HummockStorage`].
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
    /// Gets the value of the specified `key` from the snapshot's table, based on the
    /// version corresponding to the snapshot's `epoch`.
    ///
    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
    /// the key is not found. If `Err()` is returned, the search for the key
    /// failed due to other non-EOF errors.
    async fn get_inner<O>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => version,
            None => self.get_time_travel_version(epoch, table_id).await?,
        };
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

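    /// Serves a `NoWait` read without waiting for the epoch to be committed.
    /// If `epoch` is not newer than the table's committed epoch, the read is answered
    /// directly from committed data; otherwise it is routed to the unique
    /// non-replicated local read version covering the vnode of `key_range`, falling
    /// back to the committed version when no such read version exists yet.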
    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Serve directly from committed data when the epoch is not above the committed epoch.
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            // Read the committed version directly without building a snapshot.
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // Only non-replicated read version with matched vnode is considered
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created, the memory state
            // may be empty
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
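    /// Registers a new read version for `option.table_id` with the event handler and
    /// wraps it into a `LocalHummockStorage`, through which all writes are performed.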
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

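    /// Clears the whole shared buffer by sending `HummockEvent::Clear` to the event
    /// handler and waiting for its acknowledgement.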
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declares the start of an epoch. This information is provided for spilling, so that a
    /// spill task won't include data from two or more syncs.
    // TODO: remove this method when we support spill tasks that can include data from two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

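    /// Returns the minimum object id among currently uncommitted data, or `None`
    /// if there is no uncommitted data.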
    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

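    /// Syncs the uploaded but not-yet-committed data of the given `(epoch, table_ids)`
    /// pairs and returns a `SyncResult` once the sync completes.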
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

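/// A read-only view of [`HummockStorage`] pinned at a specific `HummockReadEpoch` for a
/// single table. Instances are created via `HummockStorage::new_read_snapshot`.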
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter spans more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter spans more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<O: Send + 'static>(
        &self,
        _vec: Vector,
        _options: VectorNearestOptions,
        _on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        unimplemented!()
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // fast path
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
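        // Slow path: wait for version updates until the next epoch becomes visible.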
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
    /// Waits until the local hummock version contains the epoch. If `wait_epoch` is
    /// `Current`, we only check that it is not greater than `sealed_epoch`, without waiting.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // fast path by checking recent_versions
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In batch query, since we have ensured that the current version must be after
                                // `wait_version_id`, when the table_id does not exist in the latest version,
                                // the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = PanicStateStore;

    /// Waits until the local hummock version contains the epoch. If `wait_epoch` is
    /// `Current`, we only check that it is not greater than `sealed_epoch`, without waiting.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, _options: NewVectorWriterOptions) -> Self::VectorWriter {
        unimplemented!()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Used in the compaction test tool
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}