risingwave_storage/hummock/store/hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::array::VectorRef;
use risingwave_common::catalog::TableId;
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{PkPrefixTableWatermarksIndex, WatermarkSerdeType};
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::store::vector_writer::HummockVectorWriter;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits, but exposes no write methods:
/// all writes must go through `LocalHummockStorage` to preserve Hummock's single-writer
/// property. A `LocalHummockStorage` instance can be created via the `new_local` call.
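///
/// # Example (illustrative sketch)
///
/// The snippet below sketches the typical call flow. The exact fields of the option
/// structs and the shape of the callback/iterator APIs are defined in `crate::store`;
/// treat the construction details here as assumptions, not the definitive API.
///
/// ```ignore
/// // Reads go through a read snapshot pinned at a read epoch for one table.
/// let snapshot = hummock_storage
///     .new_read_snapshot(
///         HummockReadEpoch::Committed(epoch),
///         NewReadSnapshotOptions { table_id },
///     )
///     .await?;
/// let iter = snapshot.iter(key_range, ReadOptions::default()).await?;
///
/// // Writes go through a `LocalHummockStorage` created via `new_local`,
/// // which preserves the single-writer property per table.
/// let mut local = hummock_storage.new_local(new_local_options).await;
/// ```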
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    // only used in test for setting hummock version in uploader
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

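/// Builds a `ReadVersionTuple` that reads only from the committed `version` (no uncommitted
/// imms or staging SSTs). If the table has pk-prefix table watermarks, the key range is first
/// rewritten against the committed watermark index so that watermark-filtered keys are
/// excluded from the read range.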
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id)
        && let WatermarkSerdeType::PkPrefix = table_watermarks.watermark_type
    {
        PkPrefixTableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    /// Creates a [`HummockStorage`].
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
    /// Gets the value of the specified `key` in the table of this read snapshot,
    /// based on a snapshot corresponding to the snapshot's `epoch`.
    ///
    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
    /// the key is not found. If `Err()` is returned, the search for the key
    /// failed due to other non-EOF errors.
    async fn get_inner<'a, O>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

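    /// Fetches the hummock version covering `epoch` for `table_id` from the meta service and
    /// wraps it in a `PinnedVersion`. Results are memoized in the time-travel version cache so
    /// repeated reads of the same (table, epoch) avoid extra meta RPCs.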
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

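    /// Builds the read version tuple for a `NoWait` read. An epoch older than the table's
    /// committed epoch is rejected as expired; an epoch equal to it reads the committed version
    /// directly. Otherwise, the uncommitted read version registered for the key range's vnode is
    /// used, falling back to the committed version when no such read version exists (e.g. right
    /// after startup).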
    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Check whether the epoch is no greater than the table's committed epoch (MCE).
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            // Read the committed version directly without building a snapshot.
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // Only a non-replicated read version with a matching vnode is considered.
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created, the memory state
            // may be empty
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!("There are {} read versions associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

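    /// Clears all uncommitted data in the shared buffer by sending a `Clear` event to the event
    /// handler and waiting for its acknowledgement.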
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declares the start of an epoch. This information is provided for spilling so that a
    /// spill task won't include data from two or more syncs.
    // TODO: remove this method when we support spill tasks that can include data from two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

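    /// Triggers a sync of the uncommitted data for the given `(epoch, table_ids)` pairs and
    /// waits for the event handler to report the resulting `SyncResult`.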
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

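/// A cloneable, read-only view of `HummockStorage` pinned to a single table and a
/// `HummockReadEpoch`. It serves point gets, forward/reverse range iteration, and vector
/// nearest-neighbor queries via the `StateStoreGet`, `StateStoreRead`, and
/// `StateStoreReadVector` impls below.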
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup including epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

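    /// Returns the first change-log epoch of `options.table_id` that follows `epoch`. The latest
    /// cached version is consulted first as a fast path; if it does not yet contain a newer
    /// epoch, this waits for version updates until one appears.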
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // fast path
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
    /// Waits until the local hummock version contains the given epoch. Only `Committed` and
    /// `BatchQueryCommitted` epochs are waited for; other kinds of read epoch return immediately.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        tracing::debug!(
            "try_wait_epoch: epoch: {:?}, table_id: {}",
            wait_epoch,
            table_id
        );
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // fast path by checking recent_versions
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In batch query, since we have ensured that the current version is no earlier
                                // than `wait_version_id`, if the table_id does not exist in the latest version,
                                // the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = HummockVectorWriter;

    /// Waits until the local hummock version contains the given epoch. Only `Committed` and
    /// `BatchQueryCommitted` epochs are waited for; other kinds of read epoch return immediately.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        HummockVectorWriter::new(
            options.table_id,
            self.version_update_notifier_tx.clone(),
            self.context.sstable_store.clone(),
            self.object_id_manager.clone(),
            self.hummock_event_sender.clone(),
            self.context.storage_opts.clone(),
        )
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Used in the compaction test tool
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}