risingwave_storage/hummock/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::backtrace::Backtrace;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::CacheHint;
use futures::{Stream, StreamExt, pin_mut};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{Receiver, Sender, channel};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreKeyedRow, StateStoreRead};

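/// Returns whether `search_key_range` overlaps the key interval that starts at
/// `inclusive_start_key` (inclusive) and ends at `end_key`.
///
/// Illustrative example with hypothetical `&str` keys: for a table spanning `[b, d]`,
/// `range_overlap(&("a".."c"), &"b", Included(&"d"))` is `true`, while
/// `range_overlap(&("e".."f"), &"b", Included(&"d"))` is `false`.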
pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    let (start_bound, end_bound) = (search_key_range.start_bound(), search_key_range.end_bound());

    //        RANGE
    // TABLE
    let too_left = match (start_bound, end_key) {
        (Included(range_start), Included(inclusive_end_key)) => range_start > inclusive_end_key,
        (Included(range_start), Excluded(end_key))
        | (Excluded(range_start), Included(end_key))
        | (Excluded(range_start), Excluded(end_key)) => range_start >= end_key,
        (Unbounded, _) | (_, Unbounded) => false,
    };
    // RANGE
    //        TABLE
    let too_right = match end_bound {
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
        Unbounded => false,
    };

    !too_left && !too_right
}

pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    let table_range = &info.key_range;
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());
    range_overlap(
        &(left, right),
        &table_start,
        if table_range.right_exclusive {
            Bound::Excluded(&table_end)
        } else {
            Bound::Included(&table_end)
        },
    ) && info.table_ids.binary_search(&table_id.table_id()).is_ok()
}

/// Search the SST containing the specified key within a level, using binary search.
pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
    ssts.partition_point(|table| {
        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
        ord == Ordering::Less || ord == Ordering::Equal
    })
}
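
// Illustrative note: `partition_point` returns the number of SSTs whose left bound is no
// greater than `key`, so in a concatenated (non-overlapping) level the candidate SST for
// `key` sits at index `search_sst_idx(ssts, key) - 1`; `prune_nonoverlapping_ssts` below
// applies `saturating_sub(1)` for exactly this reason.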

/// Prune overlapping SSTs that do not overlap with a specific key range or with a specific
/// table id. Returns an iterator over the remaining SSTs.
pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}

/// Prune non-overlapping SSTs that do not overlap with a specific key range or with a
/// specific table id. Returns an iterator over the remaining SSTs.
#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    debug_assert!(can_concat(ssts));
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}
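
// Usage sketch (hypothetical caller-side names), assuming `level.table_infos` is sorted and
// concatenatable for a non-overlapping level:
//   let candidates: Vec<_> =
//       prune_nonoverlapping_ssts(&level.table_infos, user_key_range, table_id.table_id())
//           .collect();
// For an overlapping level, `prune_overlapping_ssts(&level.table_infos, table_id, &range)`
// filters every SST independently instead of binary-searching the bounds.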

type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    total_size: AtomicU64,
    controller: Mutex<RequestQueue>,
    has_waiter: AtomicBool,
    quota: u64,
}

impl MemoryLimiterInner {
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    fn may_notify_waiters(self: &Arc<Self>) {
        // Check `has_waiter` first to avoid taking the lock on every `MemoryTracker` drop.
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            // When this request becomes the first waiter, a `MemoryTracker` that has just
            // released a large quota may have skipped notifying waiters because it checked
            // `has_waiter` and found it false. So we must set the flag and retry
            // `try_require_memory` to avoid a deadlock.
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        // We must try to acquire memory again while holding the lock, because another
        // `MemoryTracker` may be dropped just after this thread takes the mutex but before
        // this request enters the queue.
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}

pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}

impl Debug for MemoryLimiter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}

pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}
impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}

impl Debug for MemoryTracker {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}

impl MemoryLimiter {
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}
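
// Usage sketch (quota values are hypothetical): callers reserve memory before buffering
// data and keep the returned `MemoryTracker` alive for as long as the memory is in use;
// dropping the tracker releases the quota and wakes pending waiters.
//   let limiter = MemoryLimiter::new(1 << 30);
//   let tracker = limiter.require_memory(64 << 20).await; // waits if the quota is exhausted
//   /* ... use the reserved memory ... */
//   drop(tracker); // releases the 64 MiB reservation and notifies waiters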

impl MemoryTracker {
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}

// We must notify waiters outside `MemoryTracker` to avoid deadlock and ownership loops.
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

/// Check whether the items in `sub_iter` are a subset of the items in `full_iter`, while
/// preserving their relative order.
pub fn check_subset_preserve_order<T: Eq>(
    sub_iter: impl Iterator<Item = T>,
    mut full_iter: impl Iterator<Item = T>,
) -> bool {
    for sub_iter_item in sub_iter {
        let mut found = false;
        for full_iter_item in full_iter.by_ref() {
            if sub_iter_item == full_iter_item {
                found = true;
                break;
            }
        }
        if !found {
            return false;
        }
    }
    true
}
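
// Illustrative examples: `[1, 3]` is an order-preserving subset of `[1, 2, 3]`, while
// `[3, 1]` is not, because `1` can no longer be found once the full iterator has advanced
// past `3`.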

static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

/// This function is intended to be called during compute node initialization if the storage
/// sanity check is not desired. It controls a global flag, so it only needs to be called
/// once to disable the sanity check.
pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

/// Make sure the key to insert does not already exist in storage.
pub(crate) async fn do_insert_sanity_check(
    key: &TableKey<Bytes>,
    value: &Bytes,
    inner: &impl StateStoreRead,
    table_id: TableId,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
        return Ok(());
    }
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        table_id,
        cache_policy: CachePolicy::Fill(CacheHint::Normal),
        ..Default::default()
    };
    let stored_value = inner.get(key.clone(), read_options).await?;

    if let Some(stored_value) = stored_value {
        return Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Insert(stored_value),
            new: KeyOp::Insert(value.clone()),
        })
        .into());
    }
    Ok(())
}

/// Make sure that the key to delete exists in storage and that the stored value matches the
/// expected old value.
pub(crate) async fn do_delete_sanity_check(
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    inner: &impl StateStoreRead,
    table_id: TableId,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        table_id,
        cache_policy: CachePolicy::Fill(CacheHint::Normal),
        ..Default::default()
    };
    match inner.get(key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Delete(old_value.clone()),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Delete(old_value.clone()),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

/// Make sure that the key to update exists in storage and that the stored value matches the
/// expected old value.
pub(crate) async fn do_update_sanity_check(
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    new_value: &Bytes,
    inner: &impl StateStoreRead,
    table_id: TableId,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        table_id,
        cache_policy: CachePolicy::Fill(CacheHint::Normal),
        ..Default::default()
    };

    match inner.get(key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Update((old_value.clone(), new_value.clone())),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}
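
// Illustrative summary of the three sanity checks above (when the consistency level
// requires them): insert expects the key to be absent, while delete and update expect the
// key to be present and the stored value to match the expected old value via the configured
// `check_old_value` callback; any mismatch surfaces as `MemTableError::InconsistentOperation`.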

pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
    match (a, b) {
        // only right bound of delete range can be `Unbounded`
        (Unbounded, _) | (_, Unbounded) => unreachable!(),
        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
    }
}
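
// Illustrative ordering (hypothetical keys): left bounds compare by key first, and for equal
// keys `Included("b")` orders before `Excluded("b")`, since an inclusive left bound starts
// earlier than an exclusive one at the same key.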

pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
    match (left, right) {
        // only right bound of delete range can be `Unbounded`
        (Unbounded, _) => unreachable!(),
        (_, Unbounded) => true,
        (Included(x), Included(y)) => x <= y,
        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
            x < y
        }
    }
}

#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_end,
            range_start
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                false
            } else {
                // Key has exceeded the current key range. Advance to the next range.
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.1,
                            range_bound.0
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            // Does not fall in the next delete range.
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            // Falls in the next delete range.
                            break false;
                        } else {
                            // Exceeds the next delete range. Advance to the next delete range,
                            // if any, in the next iteration.
                            continue;
                        }
                    } else {
                        // No more delete range.
                        break true;
                    }
                }
            }
        } else {
            true
        }
    })
}

/// Wait for the `committed_epoch` of `table_id` to reach `wait_epoch`.
///
/// When the `table_id` does not exist in the latest version, we assume that
/// the table is not created yet, and will wait until the table is created.
pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<()> {
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                Ok(false)
            } else {
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(())
}
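
// Illustrative behavior: a caller waiting for epoch `e` returns `Ok` once the table's
// committed epoch reaches `e`, keeps waiting while the table is still absent from the
// version (not yet created), and fails with `HummockError::wait_epoch` if the table was
// present in an earlier version but has since disappeared (i.e. it was dropped).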

pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<()> {
    let mut receiver = notifier.subscribe();
    if inspect_fn(&receiver.borrow_and_update())? {
        return Ok(());
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                // Provide backtrace iff in debug mode for observability.
                let backtrace = cfg!(debug_assertions)
                    .then(Backtrace::capture)
                    .map(tracing::field::display);

                // The reason we need to retry here is that the batch scan in
                // chain/rearrange_chain is waiting for an uncommitted epoch carried by
                // the CreateMV barrier, which can take unbounded time to become committed
                // and propagate to the CN. We should consider removing the retry as well
                // as wait_epoch for chain/rearrange_chain if we enforce
                // chain/rearrange_chain to be scheduled on the same CN with the same
                // distribution as the upstream MV.
                // See #3845 for more details.
                tracing::warn!(
                    info = periodic_debug_info(),
                    elapsed = ?start_time.elapsed(),
                    backtrace,
                    "timeout when waiting for version update",
                );
                continue;
            }
            Ok(Err(_)) => {
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                if inspect_fn(&receiver.borrow_and_update())? {
                    return Ok(());
                }
            }
        }
    }
}

pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}

impl HummockMemoryCollector {
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}

#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            // The mem table side has come to an end, return data from the shared storage.
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            // The stream side has come to an end, return data from the mem table.
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        // yield data from storage
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        // both memtable and storage contain the key, so we advance both
                        // iterators and return the data in memory.

                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        // yield data from mem table
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                // Throw the error.
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}
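
// Illustrative merge semantics: the stream yields rows in key order (reversed when `rev` is
// set), preferring the mem-table entry whenever both sides contain the same key; `Insert`
// and `Update` yield the in-memory value, `Delete` suppresses the storage row, and a
// mem-table `Update` without a matching storage key is treated as unreachable.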

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::sync::Arc;
    use std::task::Poll;

    use futures::FutureExt;
    use futures::future::join_all;
    use rand::random_range;

    use crate::hummock::utils::MemoryLimiter;

    async fn assert_pending(future: &mut (impl Future + Unpin)) {
        for _ in 0..10 {
            assert!(
                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
                    .await
                    .is_pending()
            );
        }
    }
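
    // A minimal sanity test for `range_overlap` and `check_subset_preserve_order`
    // (illustrative; the keys and integer items below are arbitrary).
    #[test]
    fn test_range_overlap_and_subset_order() {
        use std::ops::Bound::{Excluded, Included};

        use super::{check_subset_preserve_order, range_overlap};

        // Table interval [b, d], search range [a, c): overlaps.
        assert!(range_overlap(&("a".."c"), &"b", Included(&"d")));
        // Search range [e, f) lies entirely to the right of table interval [b, d).
        assert!(!range_overlap(&("e".."f"), &"b", Excluded(&"d")));

        // `[1, 3]` is an order-preserving subset of `[1, 2, 3]`, but `[3, 1]` is not.
        assert!(check_subset_preserve_order(
            [1, 3].into_iter(),
            [1, 2, 3].into_iter()
        ));
        assert!(!check_subset_preserve_order(
            [3, 1].into_iter(),
            [1, 2, 3].into_iter()
        ));
    }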

    #[tokio::test]
    async fn test_loose_memory_limiter() {
        let quota = 5;
        let memory_limiter = MemoryLimiter::new(quota);
        drop(memory_limiter.require_memory(6).await);
        let tracker1 = memory_limiter.require_memory(3).await;
        assert_eq!(3, memory_limiter.get_memory_usage());
        let tracker2 = memory_limiter.require_memory(4).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        let mut future = memory_limiter.require_memory(5).boxed();
        assert_pending(&mut future).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker1);
        let tracker3 = future.await;
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker2);
        assert_eq!(5, memory_limiter.get_memory_usage());
        drop(tracker3);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_multi_thread_acquire_memory() {
        const QUOTA: u64 = 10;
        let memory_limiter = Arc::new(MemoryLimiter::new(200));
        let mut handles = vec![];
        for _ in 0..40 {
            let limiter = memory_limiter.clone();
            let h = tokio::spawn(async move {
                let mut buffers = vec![];
                let mut current_buffer_usage = random_range(2..=9);
                for _ in 0..1000 {
                    if buffers.len() < current_buffer_usage
                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
                    {
                        buffers.push(tracker);
                    } else {
                        buffers.clear();
                        current_buffer_usage = random_range(2..=9);
                        let req = limiter.require_memory(QUOTA);
                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
                            Ok(tracker) => {
                                buffers.push(tracker);
                            }
                            Err(_) => {
                                continue;
                            }
                        }
                    }
                    let sleep_time = random_range(1..=3);
                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
                }
            });
            handles.push(h);
        }
        let h = join_all(handles);
        let _ = h.await;
    }
}