risingwave_storage/hummock/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::backtrace::Backtrace;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::Hint;
use futures::{Stream, StreamExt, pin_mut};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{Receiver, Sender, channel};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{
    OpConsistencyLevel, ReadOptions, StateStoreGet, StateStoreKeyedRow, StateStoreRead,
};

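/// Returns whether `search_key_range` overlaps the table key span starting at
/// `inclusive_start_key` (inclusive) and ending at `end_key`.
///
/// An illustrative sketch of the intended semantics with plain integer keys, marked `ignore`
/// so it is not compiled as a doc test:
///
/// ```ignore
/// use std::ops::Bound::{Excluded, Included};
///
/// // The inclusive search range [3, 7] overlaps a table spanning [5, 10].
/// assert!(range_overlap(&(Included(3), Included(7)), &5, Included(&10)));
/// // A search range that ends (exclusively) right where the table starts does not overlap.
/// assert!(!range_overlap(&(Included(1), Excluded(5)), &5, Included(&10)));
/// ```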
pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    let (start_bound, end_bound) = (search_key_range.start_bound(), search_key_range.end_bound());

    //        RANGE
    // TABLE
    let too_left = match (start_bound, end_key) {
        (Included(range_start), Included(inclusive_end_key)) => range_start > inclusive_end_key,
        (Included(range_start), Excluded(end_key))
        | (Excluded(range_start), Included(end_key))
        | (Excluded(range_start), Excluded(end_key)) => range_start >= end_key,
        (Unbounded, _) | (_, Unbounded) => false,
    };
    // RANGE
    //        TABLE
    let too_right = match end_bound {
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
        Unbounded => false,
    };

    !too_left && !too_right
}

pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    let table_range = &info.key_range;
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());
    range_overlap(
        &(left, right),
        &table_start,
        if table_range.right_exclusive {
            Bound::Excluded(&table_end)
        } else {
            Bound::Included(&table_end)
        },
    ) && info.table_ids.binary_search(&table_id.table_id()).is_ok()
}

/// Search the SST containing the specified key within a level, using binary search.
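///
/// The returned index is the partition point: the number of SSTs whose left user key is
/// `<= key`. Callers typically subtract one to land on the SST that may contain the key.
///
/// A sketch of the expected result, assuming three ordered SSTs covering `[a, c]`, `[d, f]`
/// and `[g, i]`; `ssts` and `user_key_of` are hypothetical stand-ins for real setup, and the
/// block is marked `ignore` so it is not compiled as a doc test:
///
/// ```ignore
/// // Left bounds "a" and "d" are <= "e", but "g" is not, so the partition point is 2
/// // and `idx - 1` points at the SST covering [d, f].
/// let idx = search_sst_idx(&ssts, user_key_of(b"e"));
/// assert_eq!(idx, 2);
/// ```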
pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
    ssts.partition_point(|table| {
        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
        ord == Ordering::Less || ord == Ordering::Equal
    })
}

/// Prune SSTs in an overlapping level, dropping those that do not overlap with the given key
/// range or do not contain the given table id. Returns the SSTs remaining after pruning.
pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}

/// Prune SSTs in a non-overlapping level, dropping those that do not overlap with the given
/// key range or do not contain the given table id. Returns the SSTs remaining after pruning.
#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    debug_assert!(can_concat(ssts));
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}

type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    total_size: AtomicU64,
    controller: Mutex<RequestQueue>,
    has_waiter: AtomicBool,
    quota: u64,
}

impl MemoryLimiterInner {
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    fn may_notify_waiters(self: &Arc<Self>) {
        // Check `has_waiter` first to avoid taking the lock on every `MemoryTracker` drop.
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            // When this request is the first waiter while a previous `MemoryTracker` has just
            // released a large quota, the release path may have skipped notifying this waiter
            // because it checked `has_waiter` and found it false. So we must set the flag first
            // and retry `try_require_memory` below to avoid a deadlock.
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        // We must try to acquire the quota again while holding the lock, because another
        // `MemoryTracker` may be dropped right after this thread takes the mutex but before
        // the request enters the queue.
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}

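/// Tracks and limits the total memory quota handed out for in-flight writes. Acquired quota
/// is represented by a [`MemoryTracker`], which returns its quota and may wake pending
/// waiters when dropped.
///
/// A minimal usage sketch (within an async context), marked `ignore` so it is not compiled
/// as a doc test:
///
/// ```ignore
/// let limiter = MemoryLimiter::new(1024);
/// // Fast path: succeeds immediately while quota is available.
/// if let Some(tracker) = limiter.try_require_memory(256) {
///     // ... use the reserved memory ...
///     drop(tracker); // quota is released and waiters may be notified
/// }
/// // Slow path: awaits until enough quota has been released by other trackers.
/// let _tracker = limiter.require_memory(512).await;
/// ```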
pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}

impl Debug for MemoryLimiter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}

pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}
impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}

impl Debug for MemoryTracker {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}

impl MemoryLimiter {
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}

impl MemoryTracker {
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}

// We must notify waiters outside of `MemoryTracker` to avoid deadlocks and circular ownership.
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

/// Check whether the items in `sub_iter` are a subset of the items in `full_iter`, while
/// preserving their relative order.
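///
/// A small illustration with integers, marked `ignore` so it is not compiled as a doc test:
///
/// ```ignore
/// // [1, 3] appears within [1, 2, 3] in the same order.
/// assert!(check_subset_preserve_order([1, 3].into_iter(), [1, 2, 3].into_iter()));
/// // [3, 1] is a subset by membership, but the order is not preserved.
/// assert!(!check_subset_preserve_order([3, 1].into_iter(), [1, 2, 3].into_iter()));
/// ```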
pub fn check_subset_preserve_order<T: Eq>(
    sub_iter: impl Iterator<Item = T>,
    mut full_iter: impl Iterator<Item = T>,
) -> bool {
    for sub_iter_item in sub_iter {
        let mut found = false;
        for full_iter_item in full_iter.by_ref() {
            if sub_iter_item == full_iter_item {
                found = true;
                break;
            }
        }
        if !found {
            return false;
        }
    }
    true
}

static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

/// This function is intended to be called during compute node initialization if the storage
/// sanity check is not desired. It controls a global flag, so it only needs to be called once
/// to disable the sanity check.
pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

async fn get_from_state_store(
    state_store: &impl StateStoreGet,
    key: TableKey<Bytes>,
    read_options: ReadOptions,
) -> StorageResult<Option<Bytes>> {
    state_store
        .on_key_value(key, read_options, |_, value| {
            Ok(Bytes::copy_from_slice(value))
        })
        .await
}

/// Make sure the key to insert does not already exist in storage.
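///
/// An illustrative sketch of how a caller might gate an insert on this check; `state_store`,
/// `mem_table`, and `mem_table.insert` are assumed caller-side names rather than APIs defined
/// here, and the block is marked `ignore` so it is not compiled as a doc test:
///
/// ```ignore
/// if sanity_check_enabled() {
///     do_insert_sanity_check(
///         table_id,
///         &key,
///         &value,
///         &state_store,
///         table_option,
///         &op_consistency_level,
///     )
///     .await?;
/// }
/// // Only after the check passes is the insert applied to the caller's mem table (assumed API).
/// mem_table.insert(key, value)?;
/// ```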
pub(crate) async fn do_insert_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
        return Ok(());
    }
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    let stored_value = get_from_state_store(inner, key.clone(), read_options).await?;

    if let Some(stored_value) = stored_value {
        return Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Insert(stored_value),
            new: KeyOp::Insert(value.clone()),
        })
        .into());
    }
    Ok(())
}

/// Make sure that the key to delete exists in storage and that the stored value matches the
/// expected old value.
pub(crate) async fn do_delete_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Delete(old_value.clone()),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    table_id,
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Delete(old_value.clone()),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

/// Make sure that the key to update exists in storage and that the stored value matches the
/// expected old value.
pub(crate) async fn do_update_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    new_value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };

    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Update((old_value.clone(), new_value.clone())),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    table_id,
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
    match (a, b) {
        // only right bound of delete range can be `Unbounded`
        (Unbounded, _) | (_, Unbounded) => unreachable!(),
        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
    }
}

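/// Returns whether the pair of bounds describes a non-empty delete range; the left bound must
/// never be `Unbounded`. A small illustration with byte keys, marked `ignore` so it is not
/// compiled as a doc test:
///
/// ```ignore
/// use std::ops::Bound::{Excluded, Included};
///
/// use bytes::Bytes;
///
/// // [b"a", b"b") is a valid (non-empty) delete range ...
/// assert!(validate_delete_range(
///     &Included(Bytes::from_static(b"a")),
///     &Excluded(Bytes::from_static(b"b"))
/// ));
/// // ... while [b"b", b"b") is empty and therefore invalid.
/// assert!(!validate_delete_range(
///     &Included(Bytes::from_static(b"b")),
///     &Excluded(Bytes::from_static(b"b"))
/// ));
/// ```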
pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
    match (left, right) {
        // only right bound of delete range can be `Unbounded`
        (Unbounded, _) => unreachable!(),
        (_, Unbounded) => true,
        (Included(x), Included(y)) => x <= y,
        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
            x < y
        }
    }
}

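/// Filters a sorted key-value iterator against a sorted iterator of delete ranges, dropping
/// every entry whose key falls inside one of the ranges.
///
/// An illustrative sketch of the intended behavior; `table_key` stands in for a hypothetical
/// helper that builds a `TableKey<Bytes>`, and the block is marked `ignore` so it is not
/// compiled as a doc test:
///
/// ```ignore
/// let kvs = vec![
///     (table_key(b"a"), KeyOp::Insert(Bytes::new())),
///     (table_key(b"b"), KeyOp::Insert(Bytes::new())),
///     (table_key(b"d"), KeyOp::Insert(Bytes::new())),
/// ];
/// // Delete range [b"b", b"d"): "b" is dropped, while "a" and "d" are kept.
/// let ranges = vec![(
///     Included(Bytes::from_static(b"b")),
///     Excluded(Bytes::from_static(b"d")),
/// )];
/// let kept: Vec<_> = filter_with_delete_range(kvs.into_iter(), ranges.iter()).collect();
/// assert_eq!(kept.len(), 2);
/// ```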
#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_end,
            range_start
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                false
            } else {
                // Key has exceeded the current delete range. Advance to the next range.
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.1,
                            range_bound.0
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            // The key does not fall in the next delete range.
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            // The key falls in the next delete range.
                            break false;
                        } else {
                            // The key exceeds the next delete range as well. Move on to the
                            // following delete range, if any, in the next iteration.
                            continue;
                        }
                    } else {
                        // No more delete ranges.
                        break true;
                    }
                }
            }
        } else {
            true
        }
    })
}

/// Wait for the `committed_epoch` of `table_id` to reach `wait_epoch`.
///
/// When the `table_id` does not exist in the latest version, we assume that
/// the table is not created yet, and will wait until the table is created.
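///
/// A call-site sketch using the argument names from this function's signature, marked
/// `ignore` so it is not compiled as a doc test:
///
/// ```ignore
/// // Blocks until the committed epoch of `table_id` reaches `wait_epoch`, logging a warning
/// // every 30 seconds while it waits, then returns the pinned version.
/// let version = wait_for_epoch(&notifier, wait_epoch, table_id).await?;
/// ```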
pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<PinnedVersion> {
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    let version = wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                Ok(false)
            } else {
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(version)
}

pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<PinnedVersion> {
    let mut receiver = notifier.subscribe();
    {
        let version = receiver.borrow_and_update();
        if inspect_fn(&version)? {
            return Ok(version.clone());
        }
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                // Provide backtrace iff in debug mode for observability.
                let backtrace = cfg!(debug_assertions)
                    .then(Backtrace::capture)
                    .map(tracing::field::display);

                // The reason we need to retry here is that the batch scan in
                // chain/rearrange_chain is waiting for an uncommitted epoch carried by
                // the CreateMV barrier, which can take an unbounded amount of time to
                // become committed and propagate to the CN. We should consider removing
                // the retry, as well as wait_epoch for chain/rearrange_chain, if we
                // enforce chain/rearrange_chain to be scheduled on the same CN with the
                // same distribution as the upstream MV.
                // See #3845 for more details.
                tracing::warn!(
                    info = periodic_debug_info(),
                    elapsed = ?start_time.elapsed(),
                    backtrace,
                    "timeout when waiting for version update",
                );
                continue;
            }
            Ok(Err(_)) => {
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                let version = receiver.borrow_and_update();
                if inspect_fn(&version)? {
                    return Ok(version.clone());
                }
            }
        }
    }
}

pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}

impl HummockMemoryCollector {
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}

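/// Merges a mem table iterator with a stream from shared storage into a single ordered stream
/// of keyed rows. When both sides contain the same table key, the mem table entry wins: an
/// `Insert` or `Update` yields the in-memory value, while a `Delete` drops the row. Set `rev`
/// to true when both inputs are iterated in reverse key order.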
#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            // The mem table side has come to an end, return data from the shared storage.
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            // The stream side has come to an end, return data from the mem table.
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        // yield data from storage
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        // both memtable and storage contain the key, so we advance both
                        // iterators and return the data in memory.

                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        // yield data from mem table
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                // Throw the error.
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::sync::Arc;
    use std::task::Poll;

    use futures::FutureExt;
    use futures::future::join_all;
    use rand::random_range;

    use crate::hummock::utils::MemoryLimiter;

    async fn assert_pending(future: &mut (impl Future + Unpin)) {
        for _ in 0..10 {
            assert!(
                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
                    .await
                    .is_pending()
            );
        }
    }

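    // The limiter is deliberately loose: a request is admitted whenever current usage has not
    // yet exceeded the quota, so a single large request may overshoot it; only subsequent
    // requests are forced to wait until usage drops back under the quota.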
    #[tokio::test]
    async fn test_loose_memory_limiter() {
        let quota = 5;
        let memory_limiter = MemoryLimiter::new(quota);
        drop(memory_limiter.require_memory(6).await);
        let tracker1 = memory_limiter.require_memory(3).await;
        assert_eq!(3, memory_limiter.get_memory_usage());
        let tracker2 = memory_limiter.require_memory(4).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        let mut future = memory_limiter.require_memory(5).boxed();
        assert_pending(&mut future).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker1);
        let tracker3 = future.await;
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker2);
        assert_eq!(5, memory_limiter.get_memory_usage());
        drop(tracker3);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_multi_thread_acquire_memory() {
        const QUOTA: u64 = 10;
        let memory_limiter = Arc::new(MemoryLimiter::new(200));
        let mut handles = vec![];
        for _ in 0..40 {
            let limiter = memory_limiter.clone();
            let h = tokio::spawn(async move {
                let mut buffers = vec![];
                let mut current_buffer_usage = random_range(2..=9);
                for _ in 0..1000 {
                    if buffers.len() < current_buffer_usage
                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
                    {
                        buffers.push(tracker);
                    } else {
                        buffers.clear();
                        current_buffer_usage = random_range(2..=9);
                        let req = limiter.require_memory(QUOTA);
                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
                            Ok(tracker) => {
                                buffers.push(tracker);
                            }
                            Err(_) => {
                                continue;
                            }
                        }
                    }
                    let sleep_time = random_range(1..=3);
                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
                }
            });
            handles.push(h);
        }
        let h = join_all(handles);
        let _ = h.await;
    }
}