Skip to main content

risingwave_storage/hummock/store/
hummock_storage.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::future::Future;
17use std::ops::{Bound, Deref};
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::array::VectorRef;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::dispatch_distance_measurement;
26use risingwave_common::util::epoch::is_max_epoch;
27use risingwave_common_service::{NotificationClient, ObserverManager};
28use risingwave_hummock_sdk::change_log::TableChangeLogs;
29use risingwave_hummock_sdk::key::{
30    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
31};
32use risingwave_hummock_sdk::sstable_info::SstableInfo;
33use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
34use risingwave_hummock_sdk::version::HummockVersion;
35use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
36use risingwave_rpc_client::HummockMetaClient;
37use risingwave_rpc_client::error::RpcError;
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
40use tokio::sync::oneshot;
41
42use super::local_hummock_storage::LocalHummockStorage;
43use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
44use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
45#[cfg(any(test, feature = "test"))]
46use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
47use crate::error::StorageResult;
48use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
49use crate::hummock::compactor::{
50    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
51};
52use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
53use crate::hummock::event_handler::{
54    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
55};
56use crate::hummock::iterator::change_log::ChangeLogIterator;
57use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
58use crate::hummock::local_version::recent_versions::RecentVersions;
59use crate::hummock::observer_manager::HummockObserverNode;
60use crate::hummock::store::vector_writer::HummockVectorWriter;
61use crate::hummock::table_change_log_manager::TableChangeLogManager;
62use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
63use crate::hummock::utils::{wait_for_epoch, wait_for_update};
64use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
65use crate::hummock::{
66    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
67    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
68};
69use crate::mem_table::ImmutableMemtable;
70use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
71use crate::opts::StorageOpts;
72use crate::store::*;
73
/// Drop guard that notifies the Hummock event handler on shutdown.
///
/// When the guard is dropped, `HummockEvent::Shutdown` is sent through `shutdown_sender`.
struct HummockStorageShutdownGuard {
    /// Event channel on which the shutdown event is sent.
    shutdown_sender: HummockEventSender,
}
77
78impl Drop for HummockStorageShutdownGuard {
79    fn drop(&mut self) {
80        let _ = self
81            .shutdown_sender
82            .send(HummockEvent::Shutdown)
83            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
84    }
85}
86
/// `HummockStorage` is the entry point of the Hummock state store backend.
///
/// It implements the `StateStore` and `StateStoreRead` traits but exposes no write method:
/// all writes should be done via `LocalHummockStorage` to ensure the single writer property
/// of hummock. A `LocalHummockStorage` instance can be created via the `new_local` call.
#[derive(Clone)]
pub struct HummockStorage {
    /// Sender used to deliver `HummockEvent`s to the event handler.
    hummock_event_sender: HummockEventSender,
    // only used in test for setting hummock version in uploader
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    buffer_tracker: BufferTracker,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    // Shared by every clone of this struct; the guard's `Drop` (which sends
    // `HummockEvent::Shutdown`) therefore runs only when the last clone goes away.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    // `None` when no await-tree config was supplied at construction time.
    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,

    table_change_log_manager: Arc<TableChangeLogManager>,
}
128
/// Inputs handed to the version reader for one read: the immutable memtables, the SSTable infos,
/// and the committed version.
pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
130
131pub fn get_committed_read_version_tuple(
132    version: PinnedVersion,
133    table_id: TableId,
134    mut key_range: TableKeyRange,
135    epoch: HummockEpoch,
136) -> (TableKeyRange, ReadVersionTuple) {
137    if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
138        TableWatermarksIndex::new_committed(
139            table_watermarks.clone(),
140            version
141                .state_table_info
142                .info()
143                .get(&table_id)
144                .expect("should exist when having table watermark")
145                .committed_epoch,
146            table_watermarks.watermark_type,
147        )
148        .rewrite_range_with_table_watermark(epoch, &mut key_range)
149    }
150    (key_range, (vec![], vec![], version))
151}
152
impl HummockStorage {
    /// Creates a [`HummockStorage`].
    ///
    /// Besides building the instance, this starts the observer manager and spawns two tokio
    /// tasks: the pinned version worker and the hummock event handler worker.
    ///
    /// # Errors
    ///
    /// Returns an error (mapped through `HummockError::read_backup_error`) if the backup reader
    /// cannot be created.
    ///
    /// # Panics
    ///
    /// Panics if the first message received on the version update channel is not a
    /// `HummockVersionUpdate::PinnedVersion`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        // The observer node receives a clone of the version update sender, so version updates
        // show up on `version_update_rx` once the observer manager has been started.
        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        // The very first update must be a full pinned version; it seeds the initial state below.
        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        // The pinned version worker consumes the receiving half of the channel whose sending
        // half is handed to the initial `PinnedVersion`.
        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();
        let table_change_log_manager = Arc::new(TableChangeLogManager::new(
            options.table_change_log_cache_capacity,
            hummock_meta_client.clone(),
        ));
        // All handles below are taken from the event handler before it is moved into its worker
        // task at the end of this function.
        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
            table_change_log_manager,
        };

        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}
263
264impl HummockStorageReadSnapshot {
265    /// Gets the value of a specified `key` in the table specified in `read_options`.
266    /// The result is based on a snapshot corresponding to the given `epoch`.
267    /// if `key` has consistent hash virtual node value, then such value is stored in `value_meta`
268    ///
269    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
270    /// the key is not found. If `Err()` is returned, the searching for the key
271    /// failed due to other non-EOF errors.
272    async fn get_inner<'a, O>(
273        &'a self,
274        key: TableKey<Bytes>,
275        read_options: ReadOptions,
276        on_key_value_fn: impl KeyValueFn<'a, O>,
277    ) -> StorageResult<Option<O>> {
278        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));
279
280        let (key_range, read_version_tuple) =
281            self.build_read_version_tuple(self.epoch, key_range).await?;
282
283        if is_empty_key_range(&key_range) {
284            return Ok(None);
285        }
286
287        self.hummock_version_reader
288            .get(
289                key,
290                self.epoch.get_epoch(),
291                self.table_id,
292                self.table_option,
293                read_options,
294                read_version_tuple,
295                on_key_value_fn,
296            )
297            .await
298    }
299
300    async fn iter_inner(
301        &self,
302        key_range: TableKeyRange,
303        read_options: ReadOptions,
304    ) -> StorageResult<HummockStorageIterator> {
305        let (key_range, read_version_tuple) =
306            self.build_read_version_tuple(self.epoch, key_range).await?;
307
308        self.hummock_version_reader
309            .iter(
310                key_range,
311                self.epoch.get_epoch(),
312                self.table_id,
313                self.table_option,
314                read_options,
315                read_version_tuple,
316            )
317            .await
318    }
319
320    async fn rev_iter_inner(
321        &self,
322        key_range: TableKeyRange,
323        read_options: ReadOptions,
324    ) -> StorageResult<HummockStorageRevIterator> {
325        let (key_range, read_version_tuple) =
326            self.build_read_version_tuple(self.epoch, key_range).await?;
327
328        self.hummock_version_reader
329            .rev_iter(
330                key_range,
331                self.epoch.get_epoch(),
332                self.table_id,
333                self.table_option,
334                read_options,
335                read_version_tuple,
336                None,
337            )
338            .await
339    }
340
341    async fn get_time_travel_version(
342        &self,
343        epoch: u64,
344        table_id: TableId,
345    ) -> StorageResult<PinnedVersion> {
346        let meta_client = self.hummock_meta_client.clone();
347        let fetch = async move {
348            let pb_version = meta_client
349                .get_version_by_epoch(epoch, table_id)
350                .await
351                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
352                .map_err(|e| match &e {
353                    RpcError::GrpcStatus(status)
354                        if status.inner().code() == tonic::Code::OutOfRange =>
355                    {
356                        HummockError::time_travel_version_expired(table_id, epoch)
357                    }
358                    _ => HummockError::meta_error(e.to_report_string()),
359                })?;
360            let version = HummockVersion::from_rpc_protobuf(&pb_version);
361            let (tx, _rx) = unbounded_channel();
362            Ok(PinnedVersion::new(version, tx))
363        };
364        let version = self
365            .simple_time_travel_version_cache
366            .get_or_insert(table_id, epoch, fetch)
367            .await?;
368        Ok(version)
369    }
370
371    async fn build_read_version_tuple(
372        &self,
373        epoch: HummockReadEpoch,
374        key_range: TableKeyRange,
375    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
376        match epoch {
377            HummockReadEpoch::Backup(epoch) => {
378                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
379                    .await
380            }
381            HummockReadEpoch::Committed(epoch) => {
382                let tuple = self
383                    .build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
384                    .await?;
385                let (_, (_, _, version)) = &tuple;
386                let Some(committed_epoch) = version.table_committed_epoch(self.table_id) else {
387                    return Err(HummockError::other(format!(
388                        "table {} not found in version",
389                        self.table_id
390                    ))
391                    .into());
392                };
393                if committed_epoch != epoch {
394                    return Err(HummockError::committed_epoch_mismatch(
395                        self.table_id,
396                        committed_epoch,
397                        epoch,
398                    )
399                    .into());
400                }
401                Ok(tuple)
402            }
403            HummockReadEpoch::BatchQueryCommitted(epoch, _)
404            | HummockReadEpoch::TimeTravel(epoch) => {
405                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
406                    .await
407            }
408            HummockReadEpoch::NoWait(epoch) => {
409                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
410                    .await
411            }
412        }
413    }
414
415    async fn build_read_version_tuple_from_backup(
416        &self,
417        epoch: u64,
418        table_id: TableId,
419        key_range: TableKeyRange,
420    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
421        match self
422            .backup_reader
423            .try_get_hummock_version(table_id, epoch)
424            .await
425        {
426            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
427                backup_version,
428                table_id,
429                key_range,
430                epoch,
431            )),
432            Ok(None) => Err(HummockError::read_backup_error(format!(
433                "backup include epoch {} not found",
434                epoch
435            ))
436            .into()),
437            Err(e) => Err(e),
438        }
439    }
440
441    async fn get_epoch_hummock_version(
442        &self,
443        epoch: u64,
444        table_id: TableId,
445    ) -> StorageResult<PinnedVersion> {
446        match self
447            .recent_versions
448            .load()
449            .get_safe_version(table_id, epoch)
450        {
451            Some(version) => Ok(version),
452            None => self.get_time_travel_version(epoch, table_id).await,
453        }
454    }
455
456    async fn build_read_version_tuple_from_committed(
457        &self,
458        epoch: u64,
459        table_id: TableId,
460        key_range: TableKeyRange,
461    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
462        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
463        Ok(get_committed_read_version_tuple(
464            version, table_id, key_range, epoch,
465        ))
466    }
467
468    async fn build_read_version_tuple_from_all(
469        &self,
470        epoch: u64,
471        table_id: TableId,
472        key_range: TableKeyRange,
473    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
474        let pinned_version = self.recent_versions.load().latest_version().clone();
475        let info = pinned_version.state_table_info.info().get(&table_id);
476
477        // check epoch if lower mce
478        let ret = if let Some(info) = info
479            && epoch <= info.committed_epoch
480        {
481            let pinned_version = if epoch < info.committed_epoch {
482                pinned_version
483            } else {
484                self.get_epoch_hummock_version(epoch, table_id).await?
485            };
486            // read committed_version directly without build snapshot
487            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
488        } else {
489            let vnode = vnode(&key_range);
490            let mut matched_replicated_read_version_cnt = 0;
491            let read_version_vec = {
492                let read_guard = self.read_version_mapping.read();
493                read_guard
494                    .get(&table_id)
495                    .map(|v| {
496                        v.values()
497                            .filter(|v| {
498                                let read_version = v.read();
499                                if read_version.is_initialized() && read_version.contains(vnode) {
500                                    if read_version.is_replicated() {
501                                        matched_replicated_read_version_cnt += 1;
502                                        false
503                                    } else {
504                                        // Only non-replicated read version with matched vnode is considered
505                                        true
506                                    }
507                                } else {
508                                    false
509                                }
510                            })
511                            .cloned()
512                            .collect_vec()
513                    })
514                    .unwrap_or_default()
515            };
516
517            // When the system has just started and no state has been created, the memory state
518            // may be empty
519            if read_version_vec.is_empty() {
520                let table_committed_epoch = info.map(|info| info.committed_epoch);
521                if matched_replicated_read_version_cnt > 0 {
522                    tracing::warn!(
523                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
524                        table_id,
525                        vnode.to_index(),
526                        epoch,
527                        matched_replicated_read_version_cnt,
528                        table_committed_epoch,
529                    );
530                } else {
531                    tracing::debug!(
532                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
533                        table_id,
534                        vnode.to_index(),
535                        epoch,
536                        table_committed_epoch
537                    );
538                }
539                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
540            } else {
541                if read_version_vec.len() != 1 {
542                    let read_version_vnodes = read_version_vec
543                        .into_iter()
544                        .map(|v| {
545                            let v = v.read();
546                            v.vnodes().iter_ones().collect_vec()
547                        })
548                        .collect_vec();
549                    return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
550                }
551                read_filter_for_version(
552                    epoch,
553                    table_id,
554                    key_range,
555                    read_version_vec.first().unwrap(),
556                )?
557            }
558        };
559
560        Ok(ret)
561    }
562}
563
564impl HummockStorage {
565    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
566        let (tx, rx) = tokio::sync::oneshot::channel();
567        self.hummock_event_sender
568            .send(HummockEvent::RegisterReadVersion {
569                table_id: option.table_id,
570                new_read_version_sender: tx,
571                is_replicated: option.is_replicated,
572                vnodes: option.vnodes.clone(),
573            })
574            .unwrap();
575
576        let (basic_read_version, instance_guard) = rx.await.unwrap();
577        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
578        LocalHummockStorage::new(
579            instance_guard,
580            basic_read_version,
581            self.hummock_version_reader.clone(),
582            self.hummock_event_sender.clone(),
583            self.buffer_tracker.get_memory_limiter().clone(),
584            self.write_limiter.clone(),
585            option,
586            version_update_notifier_tx,
587            self.context.storage_opts.mem_table_spill_threshold,
588        )
589    }
590
591    pub async fn clear_shared_buffer(&self) {
592        let (tx, rx) = oneshot::channel();
593        self.hummock_event_sender
594            .send(HummockEvent::Clear(tx, None))
595            .expect("should send success");
596        rx.await.expect("should wait success");
597    }
598
599    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
600        if !table_ids.is_empty() {
601            let (tx, rx) = oneshot::channel();
602            self.hummock_event_sender
603                .send(HummockEvent::Clear(tx, Some(table_ids)))
604                .expect("should send success");
605            rx.await.expect("should wait success");
606        }
607    }
608
609    /// Declare the start of an epoch. This information is provided for spill so that the spill task won't
610    /// include data of two or more syncs.
611    // TODO: remove this method when we support spill task that can include data of more two or more syncs
612    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
613        let _ = self
614            .hummock_event_sender
615            .send(HummockEvent::StartEpoch { epoch, table_ids });
616    }
617
618    pub fn sstable_store(&self) -> SstableStoreRef {
619        self.context.sstable_store.clone()
620    }
621
622    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
623        &self.object_id_manager
624    }
625
626    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
627        self.compaction_catalog_manager_ref.clone()
628    }
629
630    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
631        self.buffer_tracker.get_memory_limiter().clone()
632    }
633
634    pub fn get_pinned_version(&self) -> PinnedVersion {
635        self.recent_versions.load().latest_version().clone()
636    }
637
638    pub fn backup_reader(&self) -> BackupReaderRef {
639        self.backup_reader.clone()
640    }
641
642    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
643        self.compact_await_tree_reg.as_ref()
644    }
645
646    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
647        let (tx, rx) = oneshot::channel();
648        self.hummock_event_sender
649            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
650            .expect("should send success");
651        rx.await.expect("should await success")
652    }
653
654    pub async fn sync(
655        &self,
656        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
657    ) -> StorageResult<SyncResult> {
658        let (tx, rx) = oneshot::channel();
659        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
660            sync_result_sender: tx,
661            sync_table_epochs,
662        });
663        let synced_data = rx
664            .await
665            .map_err(|_| HummockError::other("failed to receive sync result"))??;
666        Ok(synced_data.into_sync_result())
667    }
668}
669
/// A read handle bound to a single table (`table_id`) and a single read epoch (`epoch`).
///
/// Reads on it are resolved against a recent version, a time travel version, a backup version,
/// or a local read version, depending on the kind of `epoch`.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    epoch: HummockReadEpoch,
    table_id: TableId,
    table_option: TableOption,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
682
impl StateStoreGet for HummockStorageReadSnapshot {
    /// Point lookup of `key`; delegates directly to `get_inner`.
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}
693
694impl StateStoreRead for HummockStorageReadSnapshot {
695    type Iter = HummockStorageIterator;
696    type RevIter = HummockStorageRevIterator;
697
698    fn iter(
699        &self,
700        key_range: TableKeyRange,
701        read_options: ReadOptions,
702    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
703        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
704        assert_eq!(
705            r_vnode_exclusive - l_vnode_inclusive,
706            1,
707            "read range {:?} for table {} iter contains more than one vnode",
708            key_range,
709            self.table_id
710        );
711        self.iter_inner(key_range, read_options)
712    }
713
714    fn rev_iter(
715        &self,
716        key_range: TableKeyRange,
717        read_options: ReadOptions,
718    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
719        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
720        assert_eq!(
721            r_vnode_exclusive - l_vnode_inclusive,
722            1,
723            "read range {:?} for table {} iter contains more than one vnode",
724            key_range,
725            self.table_id
726        );
727        self.rev_iter_inner(key_range, read_options)
728    }
729}
730
impl StateStoreReadVector for HummockStorageReadSnapshot {
    /// Runs a `nearest` query for `vec` against the version resolved from this snapshot's epoch.
    ///
    /// # Errors
    ///
    /// Returns an error if the version cannot be resolved, if no backup version exists for a
    /// `Backup` epoch, if the snapshot was created with a `NoWait` epoch (not supported here),
    /// or if the version reader fails.
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        // Resolve the version to read from according to the kind of read epoch.
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup include epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        // The macro binds `MeasurementType` according to `options.measure` for the call below.
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}
774
775impl StateStoreReadLog for HummockStorage {
776    type ChangeLogIter = ChangeLogIterator;
777
778    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
779        fn next_epoch(
780            table_change_log: &TableChangeLogs,
781            epoch: u64,
782            table_id: TableId,
783        ) -> HummockResult<Option<u64>> {
784            let table_change_log = table_change_log.get(&table_id).ok_or_else(|| {
785                HummockError::next_epoch(format!("table {} has been dropped", table_id))
786            })?;
787            table_change_log
788                .next_epoch(epoch)
789                .map_err(|_| HummockError::change_log_retention_miss(table_id, epoch))
790        }
791        {
792            // fast path
793            if let Some(max_epoch) = self
794                .recent_versions
795                .load()
796                .latest_version()
797                .deref()
798                .state_table_info
799                .info()
800                .get(&options.table_id)
801                .map(|i| i.committed_epoch)
802                && max_epoch > epoch
803            {
804                // The next epoch exists either in the same `EpochNewChangeLog` or the next `EpochNewChangeLog`, so we fetch 2 `EpochNewChangeLogCommon`.
805                let table_change_log = self
806                    .table_change_log_manager
807                    .fetch_table_change_logs(options.table_id, (epoch, max_epoch), true, Some(2))
808                    .await?;
809                if let Some(next_epoch) = next_epoch(&table_change_log, epoch, options.table_id)? {
810                    return Ok(next_epoch);
811                }
812            }
813        }
814        let mut max_epoch = None;
815        wait_for_update(
816            &self.version_update_notifier_tx,
817            |version| {
818                let Some(mce) = version
819                    .state_table_info
820                    .info()
821                    .get(&options.table_id)
822                    .map(|i| i.committed_epoch)
823                else {
824                    return Ok(false);
825                };
826                max_epoch = Some(mce);
827                Ok(mce > epoch)
828            },
829            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
830        )
831        .await?;
832        // The next epoch exists either in the same `EpochNewChangeLog` or the next `EpochNewChangeLog`, so we fetch 2 `EpochNewChangeLogCommon`.
833        let table_change_log = self
834            .table_change_log_manager
835            .fetch_table_change_logs(
836                options.table_id,
837                (epoch, max_epoch.unwrap_or(u64::MAX)),
838                true,
839                Some(2),
840            )
841            .await?;
842        let next_epoch_ret = next_epoch(&table_change_log, epoch, options.table_id)?;
843        next_epoch_ret.ok_or_else(|| {
844            HummockError::next_epoch(format!(
845                "next_epoch for {} {} should be valid",
846                options.table_id, epoch
847            ))
848            .into()
849        })
850    }
851
852    async fn iter_log(
853        &self,
854        epoch_range: (u64, u64),
855        key_range: TableKeyRange,
856        options: ReadLogOptions,
857    ) -> StorageResult<Self::ChangeLogIter> {
858        let iter = self
859            .hummock_version_reader
860            .iter_log(
861                epoch_range,
862                key_range,
863                options,
864                self.table_change_log_manager.clone(),
865            )
866            .await?;
867        Ok(iter)
868    }
869}
870
871impl HummockStorage {
872    /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`,
873    /// we will only check whether it is le `sealed_epoch` and won't wait.
874    async fn try_wait_epoch_impl(
875        &self,
876        wait_epoch: HummockReadEpoch,
877        table_id: TableId,
878    ) -> StorageResult<()> {
879        tracing::debug!(
880            "try_wait_epoch: epoch: {:?}, table_id: {}",
881            wait_epoch,
882            table_id
883        );
884        match wait_epoch {
885            HummockReadEpoch::Committed(wait_epoch) => {
886                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
887                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
888            }
889            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
890                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
891                // fast path by checking recent_versions
892                {
893                    let recent_versions = self.recent_versions.load();
894                    let latest_version = recent_versions.latest_version();
895                    if latest_version.id >= wait_version_id
896                        && let Some(committed_epoch) =
897                            latest_version.table_committed_epoch(table_id)
898                        && committed_epoch >= wait_epoch
899                    {
900                        return Ok(());
901                    }
902                }
903                wait_for_update(
904                    &self.version_update_notifier_tx,
905                    |version| {
906                        if wait_version_id > version.id() {
907                            return Ok(false);
908                        }
909                        let committed_epoch =
910                            version.table_committed_epoch(table_id).ok_or_else(|| {
911                                // In batch query, since we have ensured that the current version must be after the
912                                // `wait_version_id`, when seeing that the table_id not exist in the latest version,
913                                // the table must have been dropped.
914                                HummockError::wait_epoch(format!(
915                                    "table id {} has been dropped",
916                                    table_id
917                                ))
918                            })?;
919                        Ok(committed_epoch >= wait_epoch)
920                    },
921                    || {
922                        format!(
923                            "try_wait_epoch: epoch: {}, version_id: {:?}",
924                            wait_epoch, wait_version_id
925                        )
926                    },
927                )
928                .await?;
929            }
930            _ => {}
931        };
932        Ok(())
933    }
934}
935
936impl StateStore for HummockStorage {
937    type Local = LocalHummockStorage;
938    type ReadSnapshot = HummockStorageReadSnapshot;
939    type VectorWriter = HummockVectorWriter;
940
941    /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`,
942    /// we will only check whether it is le `sealed_epoch` and won't wait.
943    async fn try_wait_epoch(
944        &self,
945        wait_epoch: HummockReadEpoch,
946        options: TryWaitEpochOptions,
947    ) -> StorageResult<()> {
948        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
949    }
950
951    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
952        self.new_local_inner(option)
953    }
954
955    async fn new_read_snapshot(
956        &self,
957        epoch: HummockReadEpoch,
958        options: NewReadSnapshotOptions,
959    ) -> StorageResult<Self::ReadSnapshot> {
960        self.try_wait_epoch_impl(epoch, options.table_id).await?;
961        Ok(HummockStorageReadSnapshot {
962            epoch,
963            table_id: options.table_id,
964            table_option: options.table_option,
965            recent_versions: self.recent_versions.clone(),
966            hummock_version_reader: self.hummock_version_reader.clone(),
967            read_version_mapping: self.read_version_mapping.clone(),
968            backup_reader: self.backup_reader.clone(),
969            hummock_meta_client: self.hummock_meta_client.clone(),
970            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
971        })
972    }
973
974    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
975        HummockVectorWriter::new(
976            options.table_id,
977            self.version_update_notifier_tx.clone(),
978            self.context.sstable_store.clone(),
979            self.object_id_manager.clone(),
980            self.hummock_event_sender.clone(),
981            self.hummock_version_reader.stats().clone(),
982            self.context.storage_opts.clone(),
983        )
984    }
985}
986
#[cfg(any(test, feature = "test"))]
impl HummockStorage {
    /// Syncs a single `(epoch, table_ids)` batch by forwarding it to `sync`.
    pub async fn seal_and_sync_epoch(
        &self,
        epoch: u64,
        table_ids: HashSet<TableId>,
    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
        self.sync(vec![(epoch, table_ids)]).await
    }

    /// Repeatedly reads the id of the latest version in `recent_versions`, yielding to the
    /// scheduler between reads, until `is_reached` accepts it. Returns the accepted id.
    ///
    /// The guard returned by `load()` is a statement temporary, so it is not held across the
    /// yield point.
    async fn poll_latest_version_id(
        &self,
        is_reached: impl Fn(risingwave_hummock_sdk::HummockVersionId) -> bool,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        use tokio::task::yield_now;
        loop {
            let latest_id = self.recent_versions.load().latest_version().id();
            if is_reached(latest_id) {
                return latest_id;
            }
            yield_now().await;
        }
    }

    /// Used in the compaction test tool
    ///
    /// Sends `version` as a pinned-version update and then waits until the latest local version
    /// id is at least the id of `version`.
    ///
    /// # Panics
    ///
    /// Panics if the version update channel rejects the message.
    pub async fn update_version_and_wait(&self, version: HummockVersion) {
        let version_id = version.id;
        self._version_update_sender
            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
            .unwrap();
        self.poll_latest_version_id(move |latest_id| latest_id >= version_id)
            .await;
    }

    /// Waits until the latest local version id is at least the id of `version`.
    pub async fn wait_version(&self, version: HummockVersion) {
        let version_id = version.id;
        self.poll_latest_version_id(move |latest_id| latest_id >= version_id)
            .await;
    }

    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
    pub async fn for_test(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
    ) -> HummockResult<Self> {
        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
            FakeRemoteTableAccessor {},
        )));

        Self::new(
            options,
            sstable_store,
            hummock_meta_client,
            notification_client,
            compaction_catalog_manager,
            Arc::new(HummockStateStoreMetrics::unused()),
            Arc::new(CompactorMetrics::unused()),
            None,
        )
        .await
    }

    /// Returns the storage options held by this storage's context.
    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
        &self.context.storage_opts
    }

    /// Returns the version reader used by this storage.
    pub fn version_reader(&self) -> &HummockVersionReader {
        &self.hummock_version_reader
    }

    /// Waits until the latest local version id is strictly greater than `old_id` and returns the
    /// newer id.
    pub async fn wait_version_update(
        &self,
        old_id: risingwave_hummock_sdk::HummockVersionId,
    ) -> risingwave_hummock_sdk::HummockVersionId {
        self.poll_latest_version_id(move |latest_id| latest_id > old_id)
            .await
    }

    /// Sends a `FlushEvent` through the hummock event channel and waits for its acknowledgement.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent or if the acknowledgement sender is dropped.
    pub async fn flush_events_for_test(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::FlushEvent(tx))
            .expect("flush event should succeed");
        rx.await.expect("flush event receiver dropped");
    }
}