risingwave_storage/hummock/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::backtrace::Backtrace;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::Hint;
use futures::{Stream, StreamExt, pin_mut};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{Receiver, Sender, channel};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{
    OpConsistencyLevel, ReadOptions, StateStoreGet, StateStoreKeyedRow, StateStoreRead,
};

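/// Returns whether `search_key_range` overlaps the table key span starting at
/// `inclusive_start_key` and ending at `end_key` (whose right end may be inclusive or exclusive).
///
/// A minimal sketch of the semantics, using hypothetical integer bounds:
///
/// ```ignore
/// use std::ops::Bound;
/// // The table covers keys [5, 10].
/// assert!(range_overlap(&(3..=7), &5, Bound::Included(&10)));
/// // A search range that ends before the table starts does not overlap.
/// assert!(!range_overlap(&(0..5), &5, Bound::Included(&10)));
/// ```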
pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    let (start_bound, end_bound) = (search_key_range.start_bound(), search_key_range.end_bound());

    //        RANGE
    // TABLE
    let too_left = match (start_bound, end_key) {
        (Included(range_start), Included(inclusive_end_key)) => range_start > inclusive_end_key,
        (Included(range_start), Excluded(end_key))
        | (Excluded(range_start), Included(end_key))
        | (Excluded(range_start), Excluded(end_key)) => range_start >= end_key,
        (Unbounded, _) | (_, Unbounded) => false,
    };
    // RANGE
    //        TABLE
    let too_right = match end_bound {
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
        Unbounded => false,
    };

    !too_left && !too_right
}

pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    let table_range = &info.key_range;
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());
    range_overlap(
        &(left, right),
        &table_start,
        if table_range.right_exclusive {
            Bound::Excluded(&table_end)
        } else {
            Bound::Included(&table_end)
        },
    ) && info.table_ids.binary_search(&table_id.table_id()).is_ok()
}

/// Search the SST containing the specified key within a level, using binary search.
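///
/// Conceptually, this returns the partition point of SSTs whose left user key is `<= key`.
/// A rough illustration with hypothetical left bounds (epochs omitted):
///
/// ```ignore
/// // For SSTs whose left user keys are ["b", "d", "f"], searching for "e" returns 2;
/// // the SST that may contain "e" is therefore at index 2 - 1 = 1, which is how
/// // `prune_nonoverlapping_ssts` below uses the result.
/// let idx = search_sst_idx(ssts, user_key);
/// ```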
pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
    ssts.partition_point(|table| {
        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
        ord == Ordering::Less || ord == Ordering::Equal
    })
}

/// Prune overlapping SSTs that do not overlap with a specific key range or with a specific
/// table id. Returns the SSTs left after pruning.
pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}

/// Prune non-overlapping SSTs that do not overlap with a specific key range or with a specific
/// table id. Returns the SSTs left after pruning.
#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    debug_assert!(can_concat(ssts));
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}

type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    total_size: AtomicU64,
    controller: Mutex<RequestQueue>,
    has_waiter: AtomicBool,
    quota: u64,
}

impl MemoryLimiterInner {
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    fn may_notify_waiters(self: &Arc<Self>) {
        // Check `has_waiter` to avoid acquiring the lock on every `MemoryTracker` drop.
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            // When this request is the first waiter, a `MemoryTracker` that has just released a
            // large quota may have skipped notifying this waiter, because it checked `has_waiter`
            // and found it false. So we must set the flag and retry `try_require_memory` to
            // avoid deadlock.
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        // We must require again while holding the lock, because another `MemoryTracker` may be
        // dropped just after this thread acquires the mutex but before it enters the queue.
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}

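/// A soft memory limiter: admission only checks that the usage recorded before a request has not
/// yet exceeded the quota, so the tracked total may temporarily overshoot the quota by one
/// request.
///
/// A rough usage sketch (hypothetical sizes):
///
/// ```ignore
/// let limiter = MemoryLimiter::new(1 << 20); // 1 MiB quota
/// if let Some(tracker) = limiter.try_require_memory(64 * 1024) {
///     // ... use the 64 KiB while holding `tracker` ...
/// } // dropping the tracker releases the quota and may wake pending waiters
/// ```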
pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}

impl Debug for MemoryLimiter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}

pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}

impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}

impl Debug for MemoryTracker {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}

impl MemoryLimiter {
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}

impl MemoryTracker {
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}

// We must notify waiters outside `MemoryTracker` to avoid deadlocks and cyclic ownership.
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

/// Check whether the items in `sub_iter` are a subset of the items in `full_iter`, while
/// preserving their relative order.
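///
/// A quick illustration with hypothetical integer sequences:
///
/// ```ignore
/// assert!(check_subset_preserve_order([1, 3].into_iter(), [1, 2, 3].into_iter()));
/// assert!(!check_subset_preserve_order([3, 1].into_iter(), [1, 2, 3].into_iter()));
/// ```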
pub fn check_subset_preserve_order<T: Eq>(
    sub_iter: impl Iterator<Item = T>,
    mut full_iter: impl Iterator<Item = T>,
) -> bool {
    for sub_iter_item in sub_iter {
        let mut found = false;
        for full_iter_item in full_iter.by_ref() {
            if sub_iter_item == full_iter_item {
                found = true;
                break;
            }
        }
        if !found {
            return false;
        }
    }
    true
}

static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

/// This function is intended to be called during compute node initialization if the storage
/// sanity check is not desired. It controls a global flag, so it only needs to be called once
/// to disable the sanity check.
pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

async fn get_from_state_store(
    state_store: &impl StateStoreGet,
    key: TableKey<Bytes>,
    read_options: ReadOptions,
) -> StorageResult<Option<Bytes>> {
    state_store
        .on_key_value(key, read_options, |_, value| {
            Ok(Bytes::copy_from_slice(value))
        })
        .await
}

/// Make sure the key to insert does not already exist in storage.
pub(crate) async fn do_insert_sanity_check(
    key: &TableKey<Bytes>,
    value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
        return Ok(());
    }
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    let stored_value = get_from_state_store(inner, key.clone(), read_options).await?;

    if let Some(stored_value) = stored_value {
        return Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Insert(stored_value),
            new: KeyOp::Insert(value.clone()),
        })
        .into());
    }
    Ok(())
}

/// Make sure that the key to delete exists in storage and that its value matches the expected
/// old value.
pub(crate) async fn do_delete_sanity_check(
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Delete(old_value.clone()),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Delete(old_value.clone()),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

/// Make sure that the key to update exists in storage and that its value matches the expected
/// old value.
pub(crate) async fn do_update_sanity_check(
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    new_value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };

    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Update((old_value.clone(), new_value.clone())),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

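/// Compares two left bounds of delete ranges. For equal keys, an `Included` bound orders before
/// an `Excluded` one, since it also covers the key itself.
///
/// A small sketch (hypothetical key):
///
/// ```ignore
/// let a = Bytes::from_static(b"a");
/// assert_eq!(
///     Ordering::Less,
///     cmp_delete_range_left_bounds(Included(&a), Excluded(&a))
/// );
/// ```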
pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
    match (a, b) {
        // Only the right bound of a delete range can be `Unbounded`.
        (Unbounded, _) | (_, Unbounded) => unreachable!(),
        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
    }
}

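/// A delete range is valid when its left bound does not exceed its right bound; two equal keys
/// are only allowed when both bounds are inclusive.
///
/// A quick sketch (hypothetical keys):
///
/// ```ignore
/// let a = Bytes::from_static(b"a");
/// assert!(validate_delete_range(&Included(a.clone()), &Included(a.clone())));
/// assert!(!validate_delete_range(&Excluded(a.clone()), &Excluded(a)));
/// ```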
pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
    match (left, right) {
        // Only the right bound of a delete range can be `Unbounded`.
        (Unbounded, _) => unreachable!(),
        (_, Unbounded) => true,
        (Included(x), Included(y)) => x <= y,
        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
            x < y
        }
    }
}

#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_end,
            range_start
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                false
            } else {
                // Key has exceeded the current key range. Advance to the next range.
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.1,
                            range_bound.0
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            // Does not fall in the next delete range.
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            // Falls in the next delete range.
                            break false;
                        } else {
                            // Exceeds the next delete range. Move on to the following delete
                            // range, if any, in the next iteration.
                            continue;
                        }
                    } else {
                        // No more delete ranges.
                        break true;
                    }
                }
            }
        } else {
            true
        }
    })
}

/// Wait for the `committed_epoch` of `table_id` to reach `wait_epoch`.
///
/// When the `table_id` does not exist in the latest version, we assume that
/// the table is not created yet, and will wait until the table is created.
pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<()> {
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                Ok(false)
            } else {
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(())
}

pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<()> {
    let mut receiver = notifier.subscribe();
    if inspect_fn(&receiver.borrow_and_update())? {
        return Ok(());
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                // Provide a backtrace only in debug mode, for observability.
                let backtrace = cfg!(debug_assertions)
                    .then(Backtrace::capture)
                    .map(tracing::field::display);

                // The reason we need to retry here is that the batch scan in
                // chain/rearrange_chain is waiting for an uncommitted epoch carried by the
                // CreateMV barrier, which can take unbounded time to become committed and
                // propagate to the CN. We should consider removing the retry as well as
                // wait_epoch for chain/rearrange_chain if we enforce chain/rearrange_chain to
                // be scheduled on the same CN with the same distribution as the upstream MV.
                // See #3845 for more details.
                tracing::warn!(
                    info = periodic_debug_info(),
                    elapsed = ?start_time.elapsed(),
                    backtrace,
                    "timeout when waiting for version update",
                );
                continue;
            }
            Ok(Err(_)) => {
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                if inspect_fn(&receiver.borrow_and_update())? {
                    return Ok(());
                }
            }
        }
    }
}

pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}

impl HummockMemoryCollector {
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}

#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            // The mem table side has come to an end; return data from the shared storage.
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            // The stream side has come to an end; return data from the mem table.
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        // Yield data from storage.
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        // Both the mem table and storage contain the key, so we advance both
                        // iterators and return the data in memory.

                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        // Yield data from the mem table.
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                // Throw the error.
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::sync::Arc;
    use std::task::Poll;

    use futures::FutureExt;
    use futures::future::join_all;
    use rand::random_range;

    use crate::hummock::utils::MemoryLimiter;

    async fn assert_pending(future: &mut (impl Future + Unpin)) {
        for _ in 0..10 {
            assert!(
                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
                    .await
                    .is_pending()
            );
        }
    }

    #[tokio::test]
    async fn test_loose_memory_limiter() {
        let quota = 5;
        let memory_limiter = MemoryLimiter::new(quota);
        drop(memory_limiter.require_memory(6).await);
        let tracker1 = memory_limiter.require_memory(3).await;
        assert_eq!(3, memory_limiter.get_memory_usage());
        let tracker2 = memory_limiter.require_memory(4).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        let mut future = memory_limiter.require_memory(5).boxed();
        assert_pending(&mut future).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker1);
        let tracker3 = future.await;
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker2);
        assert_eq!(5, memory_limiter.get_memory_usage());
        drop(tracker3);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }
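
    #[test]
    fn test_must_require_memory() {
        // A small sketch of the limiter's loose semantics: `must_require_memory` never fails,
        // even when the quota is already exhausted, while `try_require_memory` refuses once
        // usage exceeds the quota.
        let memory_limiter = MemoryLimiter::new(5);
        let tracker1 = memory_limiter.must_require_memory(6);
        assert_eq!(6, memory_limiter.get_memory_usage());
        // Usage already exceeds the quota, so a `try` acquisition is rejected ...
        assert!(memory_limiter.try_require_memory(1).is_none());
        // ... but a `must` acquisition still goes through, overshooting the quota.
        let tracker2 = memory_limiter.must_require_memory(3);
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker1);
        drop(tracker2);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }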

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_multi_thread_acquire_memory() {
        const QUOTA: u64 = 10;
        let memory_limiter = Arc::new(MemoryLimiter::new(200));
        let mut handles = vec![];
        for _ in 0..40 {
            let limiter = memory_limiter.clone();
            let h = tokio::spawn(async move {
                let mut buffers = vec![];
                let mut current_buffer_usage = random_range(2..=9);
                for _ in 0..1000 {
                    if buffers.len() < current_buffer_usage
                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
                    {
                        buffers.push(tracker);
                    } else {
                        buffers.clear();
                        current_buffer_usage = random_range(2..=9);
                        let req = limiter.require_memory(QUOTA);
                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
                            Ok(tracker) => {
                                buffers.push(tracker);
                            }
                            Err(_) => {
                                continue;
                            }
                        }
                    }
                    let sleep_time = random_range(1..=3);
                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
                }
            });
            handles.push(h);
        }
        let h = join_all(handles);
        let _ = h.await;
    }
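
    #[test]
    fn test_try_increase_memory() {
        // A small sketch of `MemoryTracker::try_increase_memory`: growing a tracker to a larger
        // target only acquires the difference, and a target below the current quota is a no-op
        // that reports success.
        let memory_limiter = MemoryLimiter::new(10);
        let mut tracker = memory_limiter.try_require_memory(4).unwrap();
        assert!(tracker.try_increase_memory(7));
        assert_eq!(7, memory_limiter.get_memory_usage());
        assert!(tracker.try_increase_memory(5));
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }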
}