risingwave_storage/hummock/store/
hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::panic_store::PanicStateStore;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits but exposes no write methods,
/// since all writes must go through `LocalHummockStorage` to preserve Hummock's single-writer
/// property. A `LocalHummockStorage` instance can be created via a `new_local` call.
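///
/// # Example
///
/// A minimal sketch of obtaining a local store for writes; `storage` and
/// `opts: NewLocalOptions` are assumed to come from the caller's context:
///
/// ```ignore
/// let mut local = storage.new_local(opts).await;
/// ```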
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    // Only used in tests to set the hummock version in the uploader.
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

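/// The data sources backing a read: the immutable memtables and uncommitted
/// SSTs visible to the reader, plus the committed version they sit on top of.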
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

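/// Builds a committed-only [`ReadVersionTuple`] (no imms, no uncommitted SSTs).
/// If the table has committed watermarks, the key range is first narrowed via
/// `rewrite_range_with_table_watermark`.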
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
        TableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    /// Creates a [`HummockStorage`].
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first to take the event tx, so the first event must be a full hummock version"
            ),
        };

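        // Spawn a background worker that manages version pinning with the meta
        // service for the lifetime of this storage instance.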
        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
    /// Gets the value of the specified `key` from the table of this snapshot.
    /// The result is based on a snapshot corresponding to the given `epoch`.
    /// If `key` carries a consistent-hash virtual node value, that value is stored in `value_meta`.
    ///
    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
    /// the key is not found. If `Err()` is returned, the search for the key
    /// failed with a non-EOF error.
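    ///
    /// Internally, the point get builds its read version from a single-key
    /// inclusive range before delegating to `HummockVersionReader::get`.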
    async fn get_inner<O>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

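    /// Fetches the historical version covering `epoch` for `table_id` from the
    /// meta service, memoizing the result in the time-travel version cache.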
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

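    /// Dispatches on the read epoch variant: backup reads go through the backup
    /// reader, committed and time-travel reads through a (possibly historical)
    /// committed version, and `NoWait` reads may additionally consult local
    /// uncommitted state.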
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => version,
            None => self.get_time_travel_version(epoch, table_id).await?,
        };
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Check whether the epoch is at or below the table's committed epoch.
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            // Read the committed version directly without building a snapshot.
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
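            // Collect the local (non-replicated) read versions that cover the
            // requested vnode; replicated ones are only counted so that a
            // fallback can be reported below.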
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // Only a non-replicated read version with a matching vnode is considered.
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created,
            // the memory state may be empty.
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
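    /// Registers a new read version with the event handler and wraps it in a
    /// `LocalHummockStorage` bound to this storage's shared resources.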
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

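    /// Asks the event handler to clear the shared buffer for all tables and
    /// waits for the acknowledgement.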
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

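    /// Same as [`Self::clear_shared_buffer`], but restricted to the given
    /// `table_ids`; a no-op when the set is empty.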
    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declares the start of an epoch. This information is provided to the spiller so that
    /// a spill task won't include data from two or more syncs.
    // TODO: remove this method when we support spill tasks that can include data of two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

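    /// Returns the minimum object id among SSTs that have not yet been
    /// committed, or `None` when there is no uncommitted data.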
    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

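    /// Triggers a sync of the given `(epoch, table_ids)` pairs and waits for the
    /// uploader to persist the corresponding shared-buffer data.
    ///
    /// A minimal sketch of a call site (`epoch` and `table_ids` are assumed to
    /// come from the caller's context):
    ///
    /// ```ignore
    /// let result: SyncResult = storage.sync(vec![(epoch, table_ids)]).await?;
    /// ```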
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

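/// A read-only view of [`HummockStorage`] pinned to a specific `epoch` and
/// `table_id`, created via [`StateStore::new_read_snapshot`].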
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "iter read range {:?} for table {} covers more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "rev_iter read range {:?} for table {} covers more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<O: Send + 'static>(
        &self,
        _vec: Vector,
        _options: VectorNearestOptions,
        _on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        unimplemented!()
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // Fast path: the latest known version may already contain the next epoch.
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

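    /// Iterates the change log of `options.table_id` within `epoch_range`,
    /// based on the latest pinned version.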
    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
    /// Waits until the local hummock version contains the epoch. Only `Committed` and
    /// `BatchQueryCommitted` read epochs are waited for; all other variants return
    /// immediately without waiting.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // Fast path: check recent_versions before subscribing to updates.
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In batch query, since we have ensured that the current version
                                // must be after the `wait_version_id`, when the table_id does not
                                // exist in the latest version, the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = PanicStateStore;

    /// Waits until the local hummock version contains the epoch. Only `Committed` and
    /// `BatchQueryCommitted` read epochs are waited for; other variants return immediately.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, _options: NewVectorWriterOptions) -> Self::VectorWriter {
        unimplemented!()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Used in the compaction test tool
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}