risingwave_storage/hummock/store/
hummock_storage.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::future::Future;
use std::ops::Bound;
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::array::VectorRef;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_common_service::{NotificationClient, ObserverManager};
use risingwave_hummock_sdk::key::{
    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use super::local_hummock_storage::LocalHummockStorage;
use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
#[cfg(any(test, feature = "test"))]
use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
use crate::hummock::compactor::{
    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
};
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
use crate::hummock::event_handler::{
    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
};
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
use crate::hummock::local_version::recent_versions::RecentVersions;
use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::store::vector_writer::HummockVectorWriter;
use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
use crate::hummock::utils::{wait_for_epoch, wait_for_update};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use crate::opts::StorageOpts;
use crate::store::*;

struct HummockStorageShutdownGuard {
    shutdown_sender: HummockEventSender,
}

impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}

/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits but exposes no write methods,
/// since all writes must go through a `LocalHummockStorage` to preserve Hummock's
/// single-writer property. `LocalHummockStorage` instances can be created via the `new_local` method.
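///
/// # Example
///
/// A minimal usage sketch, not a doctest: `opts`, `sstable_store`, `meta_client`,
/// `notification_client`, `catalog_manager`, `epoch`, `table_id`, `table_option` and
/// `new_local_options` are assumed to be constructed by the caller.
///
/// ```ignore
/// let storage = HummockStorage::new(
///     opts,
///     sstable_store,
///     meta_client,
///     notification_client,
///     catalog_manager,
///     Arc::new(HummockStateStoreMetrics::unused()),
///     Arc::new(CompactorMetrics::unused()),
///     None, // no await-tree config
/// )
/// .await?;
///
/// // All writes go through a per-executor local store.
/// let mut local = storage.new_local(new_local_options).await;
///
/// // Reads go through a snapshot pinned at a read epoch.
/// let snapshot = storage
///     .new_read_snapshot(
///         HummockReadEpoch::Committed(epoch),
///         NewReadSnapshotOptions { table_id, table_option },
///     )
///     .await?;
/// ```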
#[derive(Clone)]
pub struct HummockStorage {
    hummock_event_sender: HummockEventSender,
    // only used in tests for setting the hummock version in the uploader
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);

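/// Builds a committed-only [`ReadVersionTuple`] (no imms and no uncommitted SSTs) from a
/// [`PinnedVersion`], rewriting `key_range` with the committed table watermark index when
/// the version carries watermarks for `table_id`.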
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
        TableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("should exist when having table watermark")
                .committed_epoch,
            table_watermarks.watermark_type,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    (key_range, (vec![], vec![], version))
}

impl HummockStorage {
    /// Creates a [`HummockStorage`].
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();

        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}

impl HummockStorageReadSnapshot {
    /// Gets the value of the specified `key` from the table that this snapshot is bound to.
    /// The result is based on the state visible at the snapshot's `epoch`.
    ///
    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
    /// the key is not found. If `Err()` is returned, the search for the key
    /// failed due to some other error.
    async fn get_inner<'a, O>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
            )
            .await
    }

    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

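    /// Fetches the [`PinnedVersion`] visible at `epoch` for `table_id` via the meta
    /// client, memoizing the result in the time-travel version cache.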
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id)
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
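        // Dispatch on the read epoch type: backup reads go through the backup reader;
        // committed and time-travel reads resolve a committed version (with an extra
        // committed-epoch consistency check for `Committed`); `NoWait` reads may also
        // consult uncommitted local read versions.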
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch) => {
                let tuple = self
                    .build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await?;
                let (_, (_, _, version)) = &tuple;
                let Some(committed_epoch) = version.table_committed_epoch(self.table_id) else {
                    return Err(HummockError::other(format!(
                        "table {} not found in version",
                        self.table_id
                    ))
                    .into());
                };
                if committed_epoch != epoch {
                    return Err(HummockError::wait_epoch(format!(
                        "table {} committed_epoch {} does not match read committed_epoch {}",
                        self.table_id, committed_epoch, epoch
                    ))
                    .into());
                }
                Ok(tuple)
            }
            HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
                    .await
            }
        }
    }

    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup including epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

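    /// Resolves a committed version that is safe to read at `epoch` for `table_id`:
    /// first from the in-memory recent versions, falling back to a time-travel version
    /// fetched via the meta client.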
    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    async fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // check whether the epoch is no greater than the table's committed epoch (MCE)
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            let pinned_version = if epoch < info.committed_epoch {
                // an older epoch may not be readable from the latest version, so resolve
                // a version in which the epoch is still safe to read
                self.get_epoch_hummock_version(epoch, table_id).await?
            } else {
                pinned_version
            };
            // read the committed version directly without building a snapshot
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.is_initialized() && read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // only a non-replicated read version with a matching vnode is considered
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created, the memory state
            // may be empty
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!(
                        "There are {} read versions associated with vnode {}. read_version_vnodes={:?}",
                        read_version_vnodes.len(),
                        vnode.to_index(),
                        read_version_vnodes
                    ))
                    .into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}

impl HummockStorage {
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

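    /// Clears all uncommitted in-memory state in the shared buffer and waits until the
    /// event handler acknowledges the clear.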
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

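    /// Clears the uncommitted in-memory state of the given `table_ids` and waits until
    /// the event handler acknowledges the clear. No-op when `table_ids` is empty.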
    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declares the start of an epoch. This information is provided for spill so that the spill task won't
    /// include data of two or more syncs.
    // TODO: remove this method when we support spill tasks that can include data of two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

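    /// Returns the minimum raw object id among uncommitted data tracked by the uploader,
    /// or `None` when there is no uncommitted data.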
    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

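    /// Syncs the uploaded data of the given `(epoch, table_ids)` pairs, waiting until
    /// the sync completes, and returns the resulting [`SyncResult`].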
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}

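/// A read-only snapshot of [`HummockStorage`] bound to a single table and pinned at a
/// [`HummockReadEpoch`]. Created via `StateStore::new_read_snapshot`.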
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    table_option: TableOption,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}

impl StateStoreGet for HummockStorageReadSnapshot {
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}

impl StateStoreRead for HummockStorageReadSnapshot {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_inner(key_range, read_options)
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_inner(key_range, read_options)
    }
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup including epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}


impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        fn next_epoch(
            version: &LocalHummockVersion,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = version.table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // fast path
            let recent_versions = self.recent_versions.load();
            if let Some(next_epoch) =
                next_epoch(recent_versions.latest_version(), epoch, options.table_id)?
            {
                return Ok(next_epoch);
            }
        }
        let mut next_epoch_ret = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                if let Some(next_epoch) = next_epoch(version, epoch, options.table_id)? {
                    next_epoch_ret = Some(next_epoch);
                    Ok(true)
                } else {
                    Ok(false)
                }
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        Ok(next_epoch_ret.expect("should be set before wait_for_update returns"))
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let version = self.recent_versions.load().latest_version().clone();
        let iter = self
            .hummock_version_reader
            .iter_log(version, epoch_range, key_range, options)
            .await?;
        Ok(iter)
    }
}


impl HummockStorage {
    /// Waits until the locally pinned hummock version contains the given epoch for
    /// `table_id`. Only `Committed` and `BatchQueryCommitted` epochs are waited for;
    /// other read epochs return immediately without waiting.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        tracing::debug!(
            "try_wait_epoch: epoch: {:?}, table_id: {}",
            wait_epoch,
            table_id
        );
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // fast path by checking recent_versions
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        return Ok(());
                    }
                }
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In batch query, since we have ensured that the current version must be after the
                                // `wait_version_id`, when seeing that the table_id does not exist in the latest
                                // version, the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            _ => {}
        };
        Ok(())
    }
}


impl StateStore for HummockStorage {
    type Local = LocalHummockStorage;
    type ReadSnapshot = HummockStorageReadSnapshot;
    type VectorWriter = HummockVectorWriter;

    /// Waits until the locally pinned hummock version contains the given epoch. See
    /// `try_wait_epoch_impl` for which read epochs are actually waited for.
    async fn try_wait_epoch(
        &self,
        wait_epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
        self.new_local_inner(option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        self.try_wait_epoch_impl(epoch, options.table_id).await?;
        Ok(HummockStorageReadSnapshot {
            epoch,
            table_id: options.table_id,
            table_option: options.table_option,
            recent_versions: self.recent_versions.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            read_version_mapping: self.read_version_mapping.clone(),
            backup_reader: self.backup_reader.clone(),
            hummock_meta_client: self.hummock_meta_client.clone(),
            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
        })
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        HummockVectorWriter::new(
            options.table_id,
            self.version_update_notifier_tx.clone(),
            self.context.sstable_store.clone(),
            self.object_id_manager.clone(),
            self.hummock_event_sender.clone(),
            self.hummock_version_reader.stats().clone(),
            self.context.storage_opts.clone(),
        )
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Used in the compaction test tool
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        loop {
            if self.recent_versions.load().latest_version().id() >= version_id {
                break;
            }

            yield_now().await
        }
    }

    pub async fn wait_version(&self, version: HummockVersion) {
        use tokio::task::yield_now;
        loop {
            if self.recent_versions.load().latest_version().id() >= version.id {
                break;
            }

            yield_now().await
        }
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let cur_id = self.recent_versions.load().latest_version().id();
            if cur_id > old_id {
                return cur_id;
            }
            yield_now().await;
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn flush_events_for_test(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::FlushEvent(tx))
            .expect("flush event should succeed");
        rx.await.expect("flush event receiver dropped");
    }
}