risingwave_storage/hummock/store/hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{PkPrefixTableWatermarksIndex, WatermarkSerdeType};
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::dispatch_measurement;
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::store::vector_writer::HummockVectorWriter;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits but exposes no write methods,
/// since all writes must go through a `LocalHummockStorage` to preserve Hummock's
/// single-writer property. A `LocalHummockStorage` instance can be created via the
/// `new_local` call.
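///
/// A minimal sketch of the intended usage, assuming `storage`, `epoch`, and the option
/// structs are constructed by the caller (names are illustrative only):
///
/// ```ignore
/// // Writes: obtain a per-executor local store via `new_local`.
/// let local = storage.new_local(new_local_options).await;
/// // Reads: pin a snapshot at a read epoch, then call `get`/`iter` on it.
/// let snapshot = storage
///     .new_read_snapshot(HummockReadEpoch::Committed(epoch), snapshot_options)
///     .await?;
/// ```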
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    // only used in test for setting hummock version in uploader
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

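/// The data a read is served from: staging immutable memtables, uncommitted SSTs, and
/// the committed version the read is based on.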
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

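/// Builds a committed-only [`ReadVersionTuple`] (no imms or uncommitted SSTs) from the
/// given pinned `version`, rewriting `key_range` with the table's pk-prefix watermark
/// index when one exists for `table_id`.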
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id)
        && let WatermarkSerdeType::PkPrefix = table_watermarks.watermark_type
    {
        PkPrefixTableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    /// Creates a [`HummockStorage`].
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first to take the event tx, so the first event should be a full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
    /// Gets the value of the specified `key` in the table of this snapshot (`self.table_id`).
    /// The result is based on a snapshot corresponding to this snapshot's `epoch`.
    ///
    /// Returns `Ok(Some(..))` if the key is found, `Ok(None)` if it is not found,
    /// and `Err(..)` if the lookup fails with an error.
    async fn get_inner<O>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

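    /// Fetches the [`HummockVersion`] at `epoch` for `table_id` via the meta client's
    /// time-travel lookup, caching the result in `simple_time_travel_version_cache` so
    /// repeated reads at the same epoch avoid extra RPCs.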
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id.table_id())
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id.table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

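    /// Dispatches on the read-epoch variant: `Backup` reads from backup storage,
    /// `Committed`/`BatchQueryCommitted`/`TimeTravel` read a committed version, and
    /// `NoWait` also considers local uncommitted read versions.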
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

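    /// Returns a version in which `epoch` is committed for `table_id`: first from the
    /// in-memory recent versions, falling back to a time-travel lookup otherwise.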
    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // Check whether the epoch is at or below the table's committed epoch; reads on
        // expired epochs are rejected.
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            if epoch < info.committed_epoch {
                return Err(
                    HummockError::expired_epoch(table_id, info.committed_epoch, epoch).into(),
                );
            }
            // Read the committed version directly without building a snapshot.
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // Only non-replicated read versions with a matching vnode are considered.
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created, the memory state
            // may be empty.
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

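    /// Clears the shared buffer for all tables and waits until the event handler has
    /// finished doing so.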
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

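    /// Clears the shared buffer for the given tables only; a no-op when `table_ids` is empty.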
    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declares the start of an epoch. This information is provided for spilling, so that a
    /// spill task won't include data from two or more syncs.
    // TODO: remove this method when we support spill tasks that can include data from two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

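    /// Asks the event handler for the smallest object id that has not yet been committed,
    /// or `None` when there is no uncommitted data.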
    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

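    /// Triggers a sync of the given `(epoch, table_ids)` pairs and waits for the result.
    ///
    /// A minimal sketch of a call site, assuming `storage`, `epoch`, and `table_id` are
    /// in scope (illustrative only):
    ///
    /// ```ignore
    /// let sync_result = storage
    ///     .sync(vec![(epoch, HashSet::from([table_id]))])
    ///     .await?;
    /// ```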
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

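/// A read-only view of [`HummockStorage`] pinned at a specific read epoch for a specific
/// table, created via `StateStore::new_read_snapshot`.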
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}


impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup including epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        dispatch_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}

impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

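    /// Resolves the epoch that follows `epoch` in the change log of `options.table_id`:
    /// first against the latest in-memory version (fast path), otherwise by waiting for
    /// version updates until the next epoch becomes visible.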
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // fast path
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}

impl HummockStorage {
    /// Waits until the local hummock version contains the given epoch. Only `Committed` and
    /// `BatchQueryCommitted` read epochs are waited on; other read epochs return immediately.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // fast path by checking recent_versions
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In batch query, since we have ensured that the current version must be after the
                                // `wait_version_id`, if the table_id does not exist in the latest version,
                                // the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}

impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = HummockVectorWriter;

    /// Waits until the local hummock version contains the given epoch. Only `Committed` and
    /// `BatchQueryCommitted` read epochs are waited on; other read epochs return immediately.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        HummockVectorWriter::new(
            options.table_id,
            self.version_update_notifier_tx.clone(),
            self.context.sstable_store.clone(),
            self.object_id_manager.clone(),
            self.hummock_event_sender.clone(),
            self.context.storage_opts.clone(),
        )
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Used in the compaction test tool
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    pub fn get_shared_buffer_size(&self) -> usize {
        self.buffer_tracker.get_buffer_size()
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }
}