risingwave_storage/hummock/store/
hummock_storage.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::future::Future;
17use std::ops::{Bound, Deref};
18use std::sync::Arc;
19
20use arc_swap::ArcSwap;
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::array::VectorRef;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::dispatch_distance_measurement;
26use risingwave_common::util::epoch::is_max_epoch;
27use risingwave_common_service::{NotificationClient, ObserverManager};
28use risingwave_hummock_sdk::change_log::TableChangeLogs;
29use risingwave_hummock_sdk::key::{
30    TableKey, TableKeyRange, is_empty_key_range, vnode, vnode_range,
31};
32use risingwave_hummock_sdk::sstable_info::SstableInfo;
33use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex;
34use risingwave_hummock_sdk::version::HummockVersion;
35use risingwave_hummock_sdk::{HummockRawObjectId, HummockReadEpoch, SyncResult};
36use risingwave_rpc_client::HummockMetaClient;
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
39use tokio::sync::oneshot;
40
41use super::local_hummock_storage::LocalHummockStorage;
42use super::version::{CommittedVersion, HummockVersionReader, read_filter_for_version};
43use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
44#[cfg(any(test, feature = "test"))]
45use crate::compaction_catalog_manager::{CompactionCatalogManager, FakeRemoteTableAccessor};
46use crate::error::StorageResult;
47use crate::hummock::backup_reader::{BackupReader, BackupReaderRef};
48use crate::hummock::compactor::{
49    CompactionAwaitTreeRegRef, CompactorContext, new_compaction_await_tree_reg_ref,
50};
51use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, HummockEventSender};
52use crate::hummock::event_handler::{
53    HummockEvent, HummockEventHandler, HummockVersionUpdate, ReadOnlyReadVersionMapping,
54};
55use crate::hummock::iterator::change_log::ChangeLogIterator;
56use crate::hummock::local_version::pinned_version::{PinnedVersion, start_pinned_version_worker};
57use crate::hummock::local_version::recent_versions::RecentVersions;
58use crate::hummock::observer_manager::HummockObserverNode;
59use crate::hummock::store::vector_writer::HummockVectorWriter;
60use crate::hummock::table_change_log_manager::TableChangeLogManager;
61use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache;
62use crate::hummock::utils::{wait_for_epoch, wait_for_update};
63use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
64use crate::hummock::{
65    HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
66    MemoryLimiter, ObjectIdManager, ObjectIdManagerRef, SstableStoreRef,
67};
68use crate::mem_table::ImmutableMemtable;
69use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics};
70use crate::opts::StorageOpts;
71use crate::store::*;
72
/// Guard owned (via `Arc`) by `HummockStorage`; when the last clone is
/// dropped, its `Drop` impl sends `HummockEvent::Shutdown` to stop the
/// event handler worker.
struct HummockStorageShutdownGuard {
    // Sender into the hummock event handler's event channel.
    shutdown_sender: HummockEventSender,
}
76
impl Drop for HummockStorageShutdownGuard {
    fn drop(&mut self) {
        // Best-effort notification: the event handler worker may already have
        // exited, in which case the send fails. We only log at debug level
        // instead of panicking inside `drop`.
        let _ = self
            .shutdown_sender
            .send(HummockEvent::Shutdown)
            .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown"));
    }
}
85
/// `HummockStorage` is the entry point of the Hummock state store backend.
/// It implements the `StateStore` and `StateStoreRead` traits but without any write method,
/// since all writes should be done via `LocalHummockStorage` to ensure the single writer property
/// of hummock. A `LocalHummockStorage` instance can be created via the `new_local` call.
#[derive(Clone)]
pub struct HummockStorage {
    // Sender used to forward events (read-version registration, sync, clear,
    // shutdown, ...) to the event handler worker spawned in `new`.
    hummock_event_sender: HummockEventSender,
    // only used in test for setting hummock version in uploader
    _version_update_sender: UnboundedSender<HummockVersionUpdate>,

    // Local compaction context (storage opts, sstable store, metrics).
    context: CompactorContext,

    compaction_catalog_manager_ref: CompactionCatalogManagerRef,

    object_id_manager: ObjectIdManagerRef,

    // Tracks shared-buffer memory usage; also provides the memory limiter.
    buffer_tracker: BufferTracker,

    // Watch channel notified on pinned-version updates; used to wait for epochs.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    // Recently pinned versions, swapped atomically by the event handler.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    hummock_version_reader: HummockVersionReader,

    // See `HummockStorageShutdownGuard`: shuts down the event worker on drop.
    _shutdown_guard: Arc<HummockStorageShutdownGuard>,

    // Read-only view of table_id -> per-instance read versions.
    read_version_mapping: ReadOnlyReadVersionMapping,

    backup_reader: BackupReaderRef,

    write_limiter: WriteLimiterRef,

    compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>,

    hummock_meta_client: Arc<dyn HummockMetaClient>,

    // Caches versions fetched from meta for time-travel reads.
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,

    table_change_log_manager: Arc<TableChangeLogManager>,
}
127
128pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion);
129
/// Builds a [`ReadVersionTuple`] that serves reads purely from the committed
/// `version` (no immutable memtables, no uncommitted SSTs).
///
/// If `version` carries committed table watermarks for `table_id`, the
/// `key_range` is first rewritten against the watermark at `epoch`, which may
/// shrink (or empty) the range. Returns the possibly-rewritten range together
/// with the tuple.
pub fn get_committed_read_version_tuple(
    version: PinnedVersion,
    table_id: TableId,
    mut key_range: TableKeyRange,
    epoch: HummockEpoch,
) -> (TableKeyRange, ReadVersionTuple) {
    if let Some(table_watermarks) = version.table_watermarks.get(&table_id) {
        TableWatermarksIndex::new_committed(
            table_watermarks.clone(),
            version
                .state_table_info
                .info()
                .get(&table_id)
                // A table with watermarks in this version must also have
                // state-table info in the same version.
                .expect("should exist when having table watermark")
                .committed_epoch,
            table_watermarks.watermark_type,
        )
        .rewrite_range_with_table_watermark(epoch, &mut key_range)
    }
    // Committed-only read: no imms and no uncommitted SSTs.
    (key_range, (vec![], vec![], version))
}
151
impl HummockStorage {
    /// Creates a [`HummockStorage`].
    ///
    /// Bootstrap sequence:
    /// 1. create the object id manager and backup reader;
    /// 2. start the observer manager and block until the first
    ///    `HummockVersionUpdate::PinnedVersion` arrives — this must be the
    ///    first message on the channel (see the `unreachable!` below);
    /// 3. spawn the version-pinning worker and the hummock event handler
    ///    worker that processes all subsequent events and version updates.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        options: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        hummock_meta_client: Arc<dyn HummockMetaClient>,
        notification_client: impl NotificationClient,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
    ) -> HummockResult<Self> {
        let object_id_manager = Arc::new(ObjectIdManager::new(
            hummock_meta_client.clone(),
            options.sstable_id_remote_fetch_number,
        ));
        let backup_reader = BackupReader::new(
            &options.backup_storage_url,
            &options.backup_storage_directory,
            &options.object_store_config,
        )
        .await
        .map_err(HummockError::read_backup_error)?;
        let write_limiter = Arc::new(WriteLimiter::default());
        let (version_update_tx, mut version_update_rx) = unbounded_channel();

        // The observer node pushes version updates into `version_update_tx`.
        let observer_manager = ObserverManager::new(
            notification_client,
            HummockObserverNode::new(
                compaction_catalog_manager_ref.clone(),
                backup_reader.clone(),
                version_update_tx.clone(),
                write_limiter.clone(),
            ),
        )
        .await;
        observer_manager.start().await;

        // Wait for the initial full version before anything else can run.
        let hummock_version = match version_update_rx.recv().await {
            Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
            _ => unreachable!(
                "the hummock observer manager is the first one to take the event tx. Should be full hummock version"
            ),
        };

        let (pin_version_tx, pin_version_rx) = unbounded_channel();
        let pinned_version = PinnedVersion::new(hummock_version, pin_version_tx);
        // Background worker that reports unpinned versions to meta.
        tokio::spawn(start_pinned_version_worker(
            pin_version_rx,
            hummock_meta_client.clone(),
            options.max_version_pinning_duration_sec,
        ));

        let await_tree_reg = await_tree_config.map(new_compaction_await_tree_reg_ref);

        let compactor_context = CompactorContext::new_local_compact_context(
            options.clone(),
            sstable_store.clone(),
            compactor_metrics.clone(),
            await_tree_reg.clone(),
        );

        // The event handler takes ownership of `version_update_rx` and drives
        // all further version updates.
        let hummock_event_handler = HummockEventHandler::new(
            version_update_rx,
            pinned_version,
            compactor_context.clone(),
            compaction_catalog_manager_ref.clone(),
            object_id_manager.clone(),
            state_store_metrics.clone(),
        );

        let event_tx = hummock_event_handler.event_sender();
        let table_change_log_manager = Arc::new(TableChangeLogManager::new(
            options.table_change_log_cache_capacity,
            hummock_meta_client.clone(),
        ));
        let instance = Self {
            context: compactor_context,
            compaction_catalog_manager_ref: compaction_catalog_manager_ref.clone(),
            object_id_manager,
            buffer_tracker: hummock_event_handler.buffer_tracker().clone(),
            version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(),
            hummock_event_sender: event_tx.clone(),
            _version_update_sender: version_update_tx,
            recent_versions: hummock_event_handler.recent_versions(),
            hummock_version_reader: HummockVersionReader::new(
                sstable_store,
                state_store_metrics.clone(),
                options.max_preload_io_retry_times,
            ),
            _shutdown_guard: Arc::new(HummockStorageShutdownGuard {
                shutdown_sender: event_tx,
            }),
            read_version_mapping: hummock_event_handler.read_version_mapping(),
            backup_reader,
            write_limiter,
            compact_await_tree_reg: await_tree_reg,
            hummock_meta_client,
            simple_time_travel_version_cache: Arc::new(SimpleTimeTravelVersionCache::new(
                options.time_travel_version_cache_capacity,
            )),
            table_change_log_manager,
        };

        // Start the worker loop only after `instance` holds the sender side.
        tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker());

        Ok(instance)
    }
}
262
impl HummockStorageReadSnapshot {
    /// Gets the value of a specified `key` in the table specified in `read_options`.
    /// The result is based on a snapshot corresponding to the given `epoch`.
    /// if `key` has consistent hash virtual node value, then such value is stored in `value_meta`
    ///
    /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
    /// the key is not found. If `Err()` is returned, the searching for the key
    /// failed due to other non-EOF errors.
    async fn get_inner<'a, O>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        // Model the point get as an inclusive single-key range so that the
        // watermark-based range rewriting in `build_read_version_tuple` applies.
        let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        // If the rewrite emptied the range, the key is below the table
        // watermark and is definitely absent; skip the storage lookup.
        if is_empty_key_range(&key_range) {
            return Ok(None);
        }

        self.hummock_version_reader
            .get(
                key,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
                on_key_value_fn,
            )
            .await
    }

    /// Builds a forward iterator over `key_range` at the snapshot epoch.
    async fn iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
            )
            .await
    }

    /// Builds a reverse iterator over `key_range` at the snapshot epoch.
    async fn rev_iter_inner(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (key_range, read_version_tuple) =
            self.build_read_version_tuple(self.epoch, key_range).await?;

        self.hummock_version_reader
            .rev_iter(
                key_range,
                self.epoch.get_epoch(),
                self.table_id,
                self.table_option,
                read_options,
                read_version_tuple,
                None,
            )
            .await
    }

    /// Resolves a historical version covering `epoch` for `table_id` via the
    /// meta service, using the time-travel cache to deduplicate fetches.
    async fn get_time_travel_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        let meta_client = self.hummock_meta_client.clone();
        // This future only runs on a cache miss in `get_or_insert`.
        let fetch = async move {
            let pb_version = meta_client
                .get_version_by_epoch(epoch, table_id)
                .await
                .inspect_err(|e| tracing::error!("{}", e.to_report_string()))
                .map_err(|e| HummockError::meta_error(e.to_report_string()))?;
            let version = HummockVersion::from_rpc_protobuf(&pb_version);
            // The receiver is dropped immediately, so this version is not
            // tracked by the pin-version worker.
            let (tx, _rx) = unbounded_channel();
            Ok(PinnedVersion::new(version, tx))
        };
        let version = self
            .simple_time_travel_version_cache
            .get_or_insert(table_id, epoch, fetch)
            .await?;
        Ok(version)
    }

    /// Dispatches on the snapshot's epoch kind to resolve the read version
    /// (and the possibly watermark-rewritten key range) for a read.
    async fn build_read_version_tuple(
        &self,
        epoch: HummockReadEpoch,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match epoch {
            HummockReadEpoch::Backup(epoch) => {
                self.build_read_version_tuple_from_backup(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::Committed(epoch) => {
                let tuple = self
                    .build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await?;
                // `Committed` additionally requires the resolved version's
                // committed epoch for this table to match exactly.
                let (_, (_, _, version)) = &tuple;
                let Some(committed_epoch) = version.table_committed_epoch(self.table_id) else {
                    return Err(HummockError::other(format!(
                        "table {} not found in version",
                        self.table_id
                    ))
                    .into());
                };
                if committed_epoch != epoch {
                    return Err(HummockError::wait_epoch(format!(
                        "mismatch table {} committed_epoch {} for read committed_epoch {}",
                        self.table_id, committed_epoch, epoch
                    ))
                    .into());
                }
                Ok(tuple)
            }
            HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.build_read_version_tuple_from_committed(epoch, self.table_id, key_range)
                    .await
            }
            HummockReadEpoch::NoWait(epoch) => {
                // NoWait may also consult local (uncommitted) read versions.
                self.build_read_version_tuple_from_all(epoch, self.table_id, key_range)
                    .await
            }
        }
    }

    /// Resolves the read version from a backup that includes `epoch`.
    /// Errors if no such backup exists or the backup reader fails.
    async fn build_read_version_tuple_from_backup(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        match self
            .backup_reader
            .try_get_hummock_version(table_id, epoch)
            .await
        {
            Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple(
                backup_version,
                table_id,
                key_range,
                epoch,
            )),
            Ok(None) => Err(HummockError::read_backup_error(format!(
                "backup include epoch {} not found",
                epoch
            ))
            .into()),
            Err(e) => Err(e),
        }
    }

    /// Returns a pinned version that safely covers `epoch` for `table_id`:
    /// first from the recent-versions cache, falling back to time travel.
    async fn get_epoch_hummock_version(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<PinnedVersion> {
        match self
            .recent_versions
            .load()
            .get_safe_version(table_id, epoch)
        {
            Some(version) => Ok(version),
            None => self.get_time_travel_version(epoch, table_id).await,
        }
    }

    /// Resolves the read version from committed data only (no local state).
    async fn build_read_version_tuple_from_committed(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let version = self.get_epoch_hummock_version(epoch, table_id).await?;
        Ok(get_committed_read_version_tuple(
            version, table_id, key_range, epoch,
        ))
    }

    /// Resolves the read version considering both committed data and local
    /// (uncommitted) read versions registered for this table/vnode.
    async fn build_read_version_tuple_from_all(
        &self,
        epoch: u64,
        table_id: TableId,
        key_range: TableKeyRange,
    ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
        let pinned_version = self.recent_versions.load().latest_version().clone();
        let info = pinned_version.state_table_info.info().get(&table_id);

        // If the read epoch is not newer than the table's committed epoch,
        // serve directly from a committed version (no snapshot building).
        let ret = if let Some(info) = info
            && epoch <= info.committed_epoch
        {
            let pinned_version = if epoch < info.committed_epoch {
                // NOTE(review): looks intended to fetch the exact historical
                // version only when epochs are strictly older; branches appear
                // swapped vs. the `<`/`==` cases — confirm against history.
                pinned_version
            } else {
                self.get_epoch_hummock_version(epoch, table_id).await?
            };
            // read committed_version directly without build snapshot
            get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
        } else {
            let vnode = vnode(&key_range);
            let mut matched_replicated_read_version_cnt = 0;
            // Collect the non-replicated, initialized read versions that
            // contain this vnode; count replicated matches for diagnostics.
            let read_version_vec = {
                let read_guard = self.read_version_mapping.read();
                read_guard
                    .get(&table_id)
                    .map(|v| {
                        v.values()
                            .filter(|v| {
                                let read_version = v.read();
                                if read_version.is_initialized() && read_version.contains(vnode) {
                                    if read_version.is_replicated() {
                                        matched_replicated_read_version_cnt += 1;
                                        false
                                    } else {
                                        // Only non-replicated read version with matched vnode is considered
                                        true
                                    }
                                } else {
                                    false
                                }
                            })
                            .cloned()
                            .collect_vec()
                    })
                    .unwrap_or_default()
            };

            // When the system has just started and no state has been created, the memory state
            // may be empty
            if read_version_vec.is_empty() {
                let table_committed_epoch = info.map(|info| info.committed_epoch);
                if matched_replicated_read_version_cnt > 0 {
                    tracing::warn!(
                        "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        matched_replicated_read_version_cnt,
                        table_committed_epoch,
                    );
                } else {
                    tracing::debug!(
                        "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
                        table_id,
                        vnode.to_index(),
                        epoch,
                        table_committed_epoch
                    );
                }
                get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
            } else {
                // Exactly one read version must own a vnode; more than one is
                // an invariant violation and is reported as an error.
                if read_version_vec.len() != 1 {
                    let read_version_vnodes = read_version_vec
                        .into_iter()
                        .map(|v| {
                            let v = v.read();
                            v.vnodes().iter_ones().collect_vec()
                        })
                        .collect_vec();
                    return Err(HummockError::other(format!("There are {} read version associated with vnode {}. read_version_vnodes={:?}", read_version_vnodes.len(), vnode.to_index(), read_version_vnodes)).into());
                }
                read_filter_for_version(
                    epoch,
                    table_id,
                    key_range,
                    read_version_vec.first().unwrap(),
                )?
            }
        };

        Ok(ret)
    }
}
554
impl HummockStorage {
    /// Registers a new read version with the event handler worker and wraps it
    /// in a `LocalHummockStorage` (the single-writer handle for one table).
    async fn new_local_inner(&self, option: NewLocalOptions) -> LocalHummockStorage {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::RegisterReadVersion {
                table_id: option.table_id,
                new_read_version_sender: tx,
                is_replicated: option.is_replicated,
                vnodes: option.vnodes.clone(),
            })
            .unwrap();

        // The event handler replies with the read version and its guard.
        let (basic_read_version, instance_guard) = rx.await.unwrap();
        let version_update_notifier_tx = self.version_update_notifier_tx.clone();
        LocalHummockStorage::new(
            instance_guard,
            basic_read_version,
            self.hummock_version_reader.clone(),
            self.hummock_event_sender.clone(),
            self.buffer_tracker.get_memory_limiter().clone(),
            self.write_limiter.clone(),
            option,
            version_update_notifier_tx,
            self.context.storage_opts.mem_table_spill_threshold,
        )
    }

    /// Clears all shared-buffer (uncommitted) data and waits for completion.
    pub async fn clear_shared_buffer(&self) {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::Clear(tx, None))
            .expect("should send success");
        rx.await.expect("should wait success");
    }

    /// Clears shared-buffer data of the given tables only and waits for
    /// completion. A no-op when `table_ids` is empty.
    pub async fn clear_tables(&self, table_ids: HashSet<TableId>) {
        if !table_ids.is_empty() {
            let (tx, rx) = oneshot::channel();
            self.hummock_event_sender
                .send(HummockEvent::Clear(tx, Some(table_ids)))
                .expect("should send success");
            rx.await.expect("should wait success");
        }
    }

    /// Declare the start of an epoch. This information is provided for spill so that the spill task won't
    /// include data of two or more syncs.
    // TODO: remove this method when we support spill task that can include data of two or more syncs
    pub fn start_epoch(&self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        // Fire-and-forget: ignore the error if the worker has shut down.
        let _ = self
            .hummock_event_sender
            .send(HummockEvent::StartEpoch { epoch, table_ids });
    }

    /// Returns a handle to the underlying sstable store.
    pub fn sstable_store(&self) -> SstableStoreRef {
        self.context.sstable_store.clone()
    }

    /// Returns the object id manager.
    pub fn object_id_manager(&self) -> &ObjectIdManagerRef {
        &self.object_id_manager
    }

    /// Returns the compaction catalog manager.
    pub fn compaction_catalog_manager_ref(&self) -> CompactionCatalogManagerRef {
        self.compaction_catalog_manager_ref.clone()
    }

    /// Returns the shared-buffer memory limiter.
    pub fn get_memory_limiter(&self) -> Arc<MemoryLimiter> {
        self.buffer_tracker.get_memory_limiter().clone()
    }

    /// Returns the latest pinned version known to this node.
    pub fn get_pinned_version(&self) -> PinnedVersion {
        self.recent_versions.load().latest_version().clone()
    }

    /// Returns the backup reader.
    pub fn backup_reader(&self) -> BackupReaderRef {
        self.backup_reader.clone()
    }

    /// Returns the compaction await-tree registry, if configured.
    pub fn compaction_await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.compact_await_tree_reg.as_ref()
    }

    /// Asks the event handler for the minimum uncommitted object id, if any.
    pub async fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        let (tx, rx) = oneshot::channel();
        self.hummock_event_sender
            .send(HummockEvent::GetMinUncommittedObjectId { result_tx: tx })
            .expect("should send success");
        rx.await.expect("should await success")
    }

    /// Triggers a sync of the given `(epoch, table_ids)` pairs and waits for
    /// the sync to finish, returning the resulting `SyncResult`.
    pub async fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> StorageResult<SyncResult> {
        let (tx, rx) = oneshot::channel();
        let _ = self.hummock_event_sender.send(HummockEvent::SyncEpoch {
            sync_result_sender: tx,
            sync_table_epochs,
        });
        // A dropped sender (worker gone) surfaces as an explicit error.
        let synced_data = rx
            .await
            .map_err(|_| HummockError::other("failed to receive sync result"))??;
        Ok(synced_data.into_sync_result())
    }
}
660
/// A cloneable read-only snapshot of `HummockStorage`, fixed to one table and
/// one `HummockReadEpoch`. All reads issued through it resolve their version
/// against this fixed epoch.
#[derive(Clone)]
pub struct HummockStorageReadSnapshot {
    // The epoch (and epoch kind: committed / backup / time-travel / ...)
    // this snapshot reads at.
    epoch: HummockReadEpoch,
    table_id: TableId,
    table_option: TableOption,
    recent_versions: Arc<ArcSwap<RecentVersions>>,
    hummock_version_reader: HummockVersionReader,
    read_version_mapping: ReadOnlyReadVersionMapping,
    backup_reader: BackupReaderRef,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    simple_time_travel_version_cache: Arc<SimpleTimeTravelVersionCache>,
}
673
impl StateStoreGet for HummockStorageReadSnapshot {
    /// Point lookup at the snapshot epoch; thin delegation to
    /// [`HummockStorageReadSnapshot::get_inner`].
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>> {
        self.get_inner(key, read_options, on_key_value_fn)
    }
}
684
685impl StateStoreRead for HummockStorageReadSnapshot {
686    type Iter = HummockStorageIterator;
687    type RevIter = HummockStorageRevIterator;
688
689    fn iter(
690        &self,
691        key_range: TableKeyRange,
692        read_options: ReadOptions,
693    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
694        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
695        assert_eq!(
696            r_vnode_exclusive - l_vnode_inclusive,
697            1,
698            "read range {:?} for table {} iter contains more than one vnode",
699            key_range,
700            self.table_id
701        );
702        self.iter_inner(key_range, read_options)
703    }
704
705    fn rev_iter(
706        &self,
707        key_range: TableKeyRange,
708        read_options: ReadOptions,
709    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
710        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
711        assert_eq!(
712            r_vnode_exclusive - l_vnode_inclusive,
713            1,
714            "read range {:?} for table {} iter contains more than one vnode",
715            key_range,
716            self.table_id
717        );
718        self.rev_iter_inner(key_range, read_options)
719    }
720}
721
impl StateStoreReadVector for HummockStorageReadSnapshot {
    /// Nearest-neighbor vector query against the snapshot version of this
    /// table.
    ///
    /// Resolves a `PinnedVersion` according to the snapshot's epoch kind
    /// (committed / batch-query committed / time travel / backup) and then
    /// dispatches on the configured distance measurement. `NoWait` epochs are
    /// rejected with an error.
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        let version = match self.epoch {
            HummockReadEpoch::Committed(epoch)
            | HummockReadEpoch::BatchQueryCommitted(epoch, _)
            | HummockReadEpoch::TimeTravel(epoch) => {
                self.get_epoch_hummock_version(epoch, self.table_id).await?
            }
            HummockReadEpoch::Backup(epoch) => self
                .backup_reader
                .try_get_hummock_version(self.table_id, epoch)
                .await?
                .ok_or_else(|| {
                    HummockError::read_backup_error(format!(
                        "backup include epoch {} not found",
                        epoch
                    ))
                })?,
            HummockReadEpoch::NoWait(_) => {
                return Err(
                    HummockError::other("nearest query does not support NoWait epoch").into(),
                );
            }
        };
        // Monomorphize the reader call per measurement type via the dispatch
        // macro.
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(self
                .hummock_version_reader
                .nearest::<MeasurementType, O>(
                    version,
                    self.table_id,
                    vec,
                    options,
                    on_nearest_item_fn,
                )
                .await?)
        })
    }
}
765
impl StateStoreReadLog for HummockStorage {
    type ChangeLogIter = ChangeLogIterator;

    /// Resolve the first change-log epoch of `options.table_id` that is strictly greater than
    /// `epoch`, waiting for a newer committed version when none is available yet.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        /// Look up `epoch`'s successor in the table's change log.
        /// Returns `Err` if the table is absent from the change logs (dropped) or if
        /// `epoch` is not a valid change-log epoch; `Ok(None)` if the successor is not
        /// contained in the fetched log portion.
        fn next_epoch(
            table_change_log: &TableChangeLogs,
            epoch: u64,
            table_id: TableId,
        ) -> HummockResult<Option<u64>> {
            let table_change_log = table_change_log.get(&table_id).ok_or_else(|| {
                HummockError::next_epoch(format!("table {} has been dropped", table_id))
            })?;
            table_change_log.next_epoch(epoch).map_err(|_| {
                HummockError::next_epoch(format!(
                    "invalid epoch {}, change log epoch: {:?}",
                    epoch,
                    table_change_log.epochs().collect_vec()
                ))
            })
        }
        {
            // fast path: if the already-loaded latest version has committed past `epoch`,
            // try answering from the change log without waiting for a version update.
            if let Some(max_epoch) = self
                .recent_versions
                .load()
                .latest_version()
                .deref()
                .state_table_info
                .info()
                .get(&options.table_id)
                .map(|i| i.committed_epoch)
                && max_epoch > epoch
            {
                // The next epoch exists either in the same `EpochNewChangeLog` or the next `EpochNewChangeLog`, so we fetch 2 `EpochNewChangeLogCommon`.
                let table_change_log = self
                    .table_change_log_manager
                    .fetch_table_change_logs(options.table_id, (epoch, max_epoch), true, Some(2))
                    .await?;
                if let Some(next_epoch) = next_epoch(&table_change_log, epoch, options.table_id)? {
                    return Ok(next_epoch);
                }
                // Fall through to the slow path if the successor was not in the fetched logs.
            }
        }
        // Slow path: block until a version whose committed epoch for this table exceeds
        // `epoch` is observed. The closure records the committed epoch it last saw so the
        // change-log fetch below can bound its epoch range.
        let mut max_epoch = None;
        wait_for_update(
            &self.version_update_notifier_tx,
            |version| {
                let Some(mce) = version
                    .state_table_info
                    .info()
                    .get(&options.table_id)
                    .map(|i| i.committed_epoch)
                else {
                    // Table not in this version yet; keep waiting.
                    return Ok(false);
                };
                max_epoch = Some(mce);
                Ok(mce > epoch)
            },
            || format!("wait next_epoch: epoch: {} {}", epoch, options.table_id),
        )
        .await?;
        // The next epoch exists either in the same `EpochNewChangeLog` or the next `EpochNewChangeLog`, so we fetch 2 `EpochNewChangeLogCommon`.
        let table_change_log = self
            .table_change_log_manager
            .fetch_table_change_logs(
                options.table_id,
                (epoch, max_epoch.unwrap_or(u64::MAX)),
                true,
                Some(2),
            )
            .await?;
        let next_epoch_ret = next_epoch(&table_change_log, epoch, options.table_id)?;
        // After waiting, the successor must exist; missing here indicates an inconsistency.
        next_epoch_ret.ok_or_else(|| {
            HummockError::next_epoch(format!(
                "next_epoch for {} {} should be valid",
                options.table_id, epoch
            ))
            .into()
        })
    }

    /// Build an iterator over the change log of `options.table_id` within `epoch_range`,
    /// restricted to `key_range`, by delegating to the version reader.
    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let iter = self
            .hummock_version_reader
            .iter_log(
                epoch_range,
                key_range,
                options,
                self.table_change_log_manager.clone(),
            )
            .await?;
        Ok(iter)
    }
}
865
impl HummockStorage {
    /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`,
    /// we will only check whether it is le `sealed_epoch` and won't wait.
    ///
    /// For `Committed` it blocks until `table_id`'s committed epoch reaches `wait_epoch`.
    /// For `BatchQueryCommitted` it additionally requires the version id to reach
    /// `wait_version_id`, with a lock-free fast path against `recent_versions`.
    /// All other epoch kinds return immediately without waiting.
    async fn try_wait_epoch_impl(
        &self,
        wait_epoch: HummockReadEpoch,
        table_id: TableId,
    ) -> StorageResult<()> {
        tracing::debug!(
            "try_wait_epoch: epoch: {:?}, table_id: {}",
            wait_epoch,
            table_id
        );
        match wait_epoch {
            HummockReadEpoch::Committed(wait_epoch) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // Block until the committed epoch of `table_id` reaches `wait_epoch`.
                wait_for_epoch(&self.version_update_notifier_tx, wait_epoch, table_id).await?;
            }
            HummockReadEpoch::BatchQueryCommitted(wait_epoch, wait_version_id) => {
                assert!(!is_max_epoch(wait_epoch), "epoch should not be MAX EPOCH");
                // fast path by checking recent_versions
                {
                    let recent_versions = self.recent_versions.load();
                    let latest_version = recent_versions.latest_version();
                    if latest_version.id >= wait_version_id
                        && let Some(committed_epoch) =
                            latest_version.table_committed_epoch(table_id)
                        && committed_epoch >= wait_epoch
                    {
                        // Already caught up; no need to subscribe to version updates.
                        return Ok(());
                    }
                }
                // Slow path: wait for version updates until both the version id and the
                // table's committed epoch satisfy the request.
                wait_for_update(
                    &self.version_update_notifier_tx,
                    |version| {
                        if wait_version_id > version.id() {
                            return Ok(false);
                        }
                        let committed_epoch =
                            version.table_committed_epoch(table_id).ok_or_else(|| {
                                // In batch query, since we have ensured that the current version must be after the
                                // `wait_version_id`, when seeing that the table_id not exist in the latest version,
                                // the table must have been dropped.
                                HummockError::wait_epoch(format!(
                                    "table id {} has been dropped",
                                    table_id
                                ))
                            })?;
                        Ok(committed_epoch >= wait_epoch)
                    },
                    || {
                        format!(
                            "try_wait_epoch: epoch: {}, version_id: {:?}",
                            wait_epoch, wait_version_id
                        )
                    },
                )
                .await?;
            }
            // Other read-epoch kinds require no waiting.
            _ => {}
        };
        Ok(())
    }
}
930
931impl StateStore for HummockStorage {
932    type Local = LocalHummockStorage;
933    type ReadSnapshot = HummockStorageReadSnapshot;
934    type VectorWriter = HummockVectorWriter;
935
936    /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`,
937    /// we will only check whether it is le `sealed_epoch` and won't wait.
938    async fn try_wait_epoch(
939        &self,
940        wait_epoch: HummockReadEpoch,
941        options: TryWaitEpochOptions,
942    ) -> StorageResult<()> {
943        self.try_wait_epoch_impl(wait_epoch, options.table_id).await
944    }
945
946    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_ {
947        self.new_local_inner(option)
948    }
949
950    async fn new_read_snapshot(
951        &self,
952        epoch: HummockReadEpoch,
953        options: NewReadSnapshotOptions,
954    ) -> StorageResult<Self::ReadSnapshot> {
955        self.try_wait_epoch_impl(epoch, options.table_id).await?;
956        Ok(HummockStorageReadSnapshot {
957            epoch,
958            table_id: options.table_id,
959            table_option: options.table_option,
960            recent_versions: self.recent_versions.clone(),
961            hummock_version_reader: self.hummock_version_reader.clone(),
962            read_version_mapping: self.read_version_mapping.clone(),
963            backup_reader: self.backup_reader.clone(),
964            hummock_meta_client: self.hummock_meta_client.clone(),
965            simple_time_travel_version_cache: self.simple_time_travel_version_cache.clone(),
966        })
967    }
968
969    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
970        HummockVectorWriter::new(
971            options.table_id,
972            self.version_update_notifier_tx.clone(),
973            self.context.sstable_store.clone(),
974            self.object_id_manager.clone(),
975            self.hummock_event_sender.clone(),
976            self.hummock_version_reader.stats().clone(),
977            self.context.storage_opts.clone(),
978        )
979    }
980}
981
982#[cfg(any(test, feature = "test"))]
983impl HummockStorage {
984    pub async fn seal_and_sync_epoch(
985        &self,
986        epoch: u64,
987        table_ids: HashSet<TableId>,
988    ) -> StorageResult<risingwave_hummock_sdk::SyncResult> {
989        self.sync(vec![(epoch, table_ids)]).await
990    }
991
992    /// Used in the compaction test tool
993    pub async fn update_version_and_wait(&self, version: HummockVersion) {
994        use tokio::task::yield_now;
995        let version_id = version.id;
996        self._version_update_sender
997            .send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
998            .unwrap();
999        loop {
1000            if self.recent_versions.load().latest_version().id() >= version_id {
1001                break;
1002            }
1003
1004            yield_now().await
1005        }
1006    }
1007
1008    pub async fn wait_version(&self, version: HummockVersion) {
1009        use tokio::task::yield_now;
1010        loop {
1011            if self.recent_versions.load().latest_version().id() >= version.id {
1012                break;
1013            }
1014
1015            yield_now().await
1016        }
1017    }
1018
1019    /// Creates a [`HummockStorage`] with default stats. Should only be used by tests.
1020    pub async fn for_test(
1021        options: Arc<StorageOpts>,
1022        sstable_store: SstableStoreRef,
1023        hummock_meta_client: Arc<dyn HummockMetaClient>,
1024        notification_client: impl NotificationClient,
1025    ) -> HummockResult<Self> {
1026        let compaction_catalog_manager = Arc::new(CompactionCatalogManager::new(Box::new(
1027            FakeRemoteTableAccessor {},
1028        )));
1029
1030        Self::new(
1031            options,
1032            sstable_store,
1033            hummock_meta_client,
1034            notification_client,
1035            compaction_catalog_manager,
1036            Arc::new(HummockStateStoreMetrics::unused()),
1037            Arc::new(CompactorMetrics::unused()),
1038            None,
1039        )
1040        .await
1041    }
1042
1043    pub fn storage_opts(&self) -> &Arc<StorageOpts> {
1044        &self.context.storage_opts
1045    }
1046
1047    pub fn version_reader(&self) -> &HummockVersionReader {
1048        &self.hummock_version_reader
1049    }
1050
1051    pub async fn wait_version_update(
1052        &self,
1053        old_id: risingwave_hummock_sdk::HummockVersionId,
1054    ) -> risingwave_hummock_sdk::HummockVersionId {
1055        use tokio::task::yield_now;
1056        loop {
1057            let cur_id = self.recent_versions.load().latest_version().id();
1058            if cur_id > old_id {
1059                return cur_id;
1060            }
1061            yield_now().await;
1062        }
1063    }
1064
1065    #[cfg(any(test, feature = "test"))]
1066    pub async fn flush_events_for_test(&self) {
1067        let (tx, rx) = oneshot::channel();
1068        self.hummock_event_sender
1069            .send(HummockEvent::FlushEvent(tx))
1070            .expect("flush event should succeed");
1071        rx.await.expect("flush event receiver dropped");
1072    }
1073}