// risingwave_storage/hummock/utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::backtrace::Backtrace;
16use std::cmp::Ordering;
17use std::collections::VecDeque;
18use std::fmt::{Debug, Formatter};
19use std::ops::Bound::{Excluded, Included, Unbounded};
20use std::ops::{Bound, RangeBounds};
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
22use std::sync::{Arc, LazyLock};
23use std::time::{Duration, Instant};
24
25use bytes::Bytes;
26use foyer::Hint;
27use futures::{Stream, StreamExt, pin_mut};
28use parking_lot::Mutex;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::StorageMemoryConfig;
31use risingwave_common::log::LogSuppressor;
32use risingwave_expr::codegen::try_stream;
33use risingwave_hummock_sdk::can_concat;
34use risingwave_hummock_sdk::compaction_group::StateTableId;
35use risingwave_hummock_sdk::key::{
36    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
37};
38use risingwave_hummock_sdk::sstable_info::SstableInfo;
39use tokio::sync::oneshot::{Receiver, Sender, channel};
40
41use super::{HummockError, HummockResult, SstableStoreRef};
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::hummock::local_version::pinned_version::PinnedVersion;
45use crate::mem_table::{KeyOp, MemTableError};
46use crate::monitor::MemoryCollector;
47use crate::store::{
48    OpConsistencyLevel, ReadOptions, StateStoreGet, StateStoreKeyedRow, StateStoreRead,
49};
50
/// Returns whether `search_key_range` overlaps the table key span that starts at
/// `inclusive_start_key` (always inclusive) and ends at `end_key` (inclusive or
/// exclusive per the caller).
pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    // The two spans are disjoint iff the search range lies entirely after the
    // table's end, or entirely before the table's start.
    let starts_after_table_end = match (search_key_range.start_bound(), end_key) {
        (Unbounded, _) | (_, Unbounded) => false,
        (Included(range_start), Included(table_end)) => range_start > table_end,
        (Included(range_start), Excluded(table_end))
        | (Excluded(range_start), Included(table_end))
        | (Excluded(range_start), Excluded(table_end)) => range_start >= table_end,
    };
    let ends_before_table_start = match search_key_range.end_bound() {
        Unbounded => false,
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
    };
    !(starts_after_table_end || ends_before_table_start)
}
81
/// Returns whether the given SST both contains data of `table_id` and has a key
/// range overlapping `table_key_range` (scoped to that table).
pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    // `table_ids` must be sorted for the `binary_search` below to be valid.
    debug_assert!(info.table_ids.is_sorted());
    let table_range = &info.key_range;
    // The SST key range is stored as encoded full keys; compare at user-key level.
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    // Convert the table-local key range into a user-key range for `table_id`.
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());

    info.table_ids.binary_search(&table_id).is_ok()
        && range_overlap(
            &(left, right),
            &table_start,
            // Respect whether the SST's right bound is exclusive.
            if table_range.right_exclusive {
                Bound::Excluded(&table_end)
            } else {
                Bound::Included(&table_end)
            },
        )
}
106
107/// Search the SST containing the specified key within a level, using binary search.
108pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
109    ssts.partition_point(|table| {
110        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
111        ord == Ordering::Less || ord == Ordering::Equal
112    })
113}
114
/// Prune overlapping SSTs that does not overlap with a specific key range or does not overlap with
/// a specific table id. Returns the sst ids after pruning.
pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    // Overlapping level: SST key ranges may overlap each other, so every SST has
    // to be tested individually — no binary search shortcut is possible here.
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}
129
/// Prune non-overlapping SSTs that does not overlap with a specific key range or does not overlap
/// with a specific table id. Returns the sst ids after pruning.
#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    // Non-overlapping level: SSTs are sorted and disjoint, enabling binary search.
    debug_assert!(can_concat(ssts));
    // NOTE(review): the range indexing below assumes `ssts` is non-empty — an empty
    // slice would panic on `ssts[0..=0]`; confirm callers guarantee this.
    // `search_sst_idx` returns the index of the first SST whose start key is
    // greater than the bound, so the candidate SST containing the bound itself
    // sits one slot earlier — hence the `saturating_sub(1)`.
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        // Keep only SSTs that actually contain data of `table_id`.
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}
151
/// FIFO queue of waiters blocked on memory quota: each entry holds the oneshot
/// sender used to hand over a granted `MemoryTracker`, plus the requested quota.
type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;

/// Outcome of a memory request: granted immediately, or queued behind a receiver
/// that resolves once the quota becomes available.
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    // Currently acquired quota in bytes.
    total_size: AtomicU64,
    // Pending requests, served in arrival order.
    controller: Mutex<RequestQueue>,
    // Fast-path flag so `may_notify_waiters` can skip locking `controller`
    // when nobody is waiting.
    has_waiter: AtomicBool,
    // Soft limit in bytes; see `permit_quota` — admission only checks that usage
    // has not yet reached the quota, so actual usage may overshoot it.
    quota: u64,
}
164
impl MemoryLimiterInner {
    /// Gives back `quota` bytes. Waiter notification is the caller's job
    /// (see `MemoryTracker::drop`, which calls `may_notify_waiters` afterwards).
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    /// Unconditionally accounts `quota` bytes, even beyond the limit.
    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    /// Wakes queued waiters in FIFO order for as long as their requests can be
    /// admitted by `try_require_memory`.
    fn may_notify_waiters(self: &Arc<Self>) {
        // Check `has_waiter` first to avoid taking the lock on every
        // `MemoryTracker` drop.
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                // Quota is acquired on behalf of the waiter *before* popping it,
                // so a request that cannot be admitted stays at the front.
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        // Send outside the lock to keep the critical section short. A failed send
        // means the receiver gave up; the tracker is dropped and quota released.
        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    /// Lock-free CAS loop: admits the request iff `permit_quota` allows it at the
    /// currently observed usage.
    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    // Another thread raced us; retry with the fresh value.
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    /// Tries to acquire `quota`; on failure, enqueues the request and returns a
    /// pending receiver that `may_notify_waiters` will eventually fulfill.
    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            // If this request is about to become the first waiter, a `MemoryTracker`
            // dropped just before we took the lock may have observed
            // `has_waiter == false` and skipped notification. Set the flag now and
            // retry `try_require_memory` below (still under the lock) to avoid a
            // lost wakeup / deadlock.
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        // Retry under the lock: another `MemoryTracker` may have been dropped after
        // this thread's earlier failed attempt but before we entered the queue.
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    /// Loose admission rule: a request is allowed whenever current usage has not
    /// yet exceeded the quota, regardless of the request's own size — so usage may
    /// overshoot the quota by one request.
    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}
243
/// Tracks memory usage against a byte quota with loose admission semantics
/// (see `MemoryLimiterInner::permit_quota`).
pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}
247
impl Debug for MemoryLimiter {
    // Report the configured quota and live usage for diagnostics.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}
256
/// RAII guard for acquired memory quota; releases it (and wakes waiters) on drop.
pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    // `Some(quota)` while the tracker is alive; taken out in `drop` so the quota
    // is released exactly once.
    quota: Option<u64>,
}
impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}
269
impl Debug for MemoryTracker {
    // Only the held quota is interesting; the limiter handle is omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}
277
impl MemoryLimiter {
    /// A limiter with an effectively unlimited quota (`u64::MAX`).
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    /// Creates a limiter with `quota` bytes; admission is loose
    /// (see `MemoryLimiterInner::permit_quota`).
    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    /// Non-blocking acquisition: returns `None` when the request cannot be
    /// admitted right now.
    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    /// Currently acquired memory in bytes. May exceed `quota()` due to the loose
    /// admission rule and `must_require_memory`.
    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    /// The configured quota in bytes.
    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    /// Acquires `quota` unconditionally: if normal acquisition fails, the bytes
    /// are force-accounted beyond the limit instead of waiting.
    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}
318
impl MemoryLimiter {
    /// Acquires `quota` bytes, waiting in FIFO order until the limiter can admit
    /// the request.
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        // `unwrap`: the sender stays in the limiter's queue until a tracker is
        // sent to it, so the channel is not dropped before fulfillment — assumes
        // the limiter outlives the wait, which holding `&self` guarantees.
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}
327
impl MemoryTracker {
    /// Tries to grow this tracker's held quota to `target` bytes in total
    /// (no-op if it already holds at least `target`). On failure the tracker
    /// keeps its current quota unchanged.
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        // `unwrap`: `quota` is only taken out in `drop`, so it is always `Some`
        // while the tracker is alive.
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        // Only the delta between current and target needs to be acquired.
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}
342
// We must notify waiters outside `MemoryTracker` to avoid dead-lock and loop-owner.
impl Drop for MemoryTracker {
    fn drop(&mut self) {
        // `take` guarantees the quota is released exactly once.
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}
352
353/// Check whether the items in `sub_iter` is a subset of the items in `full_iter`, and meanwhile
354/// preserve the order.
355pub fn check_subset_preserve_order<T: Eq>(
356    sub_iter: impl Iterator<Item = T>,
357    mut full_iter: impl Iterator<Item = T>,
358) -> bool {
359    for sub_iter_item in sub_iter {
360        let mut found = false;
361        for full_iter_item in full_iter.by_ref() {
362            if sub_iter_item == full_iter_item {
363                found = true;
364                break;
365            }
366        }
367        if !found {
368            return false;
369        }
370    }
371    true
372}
373
/// Global switch for the storage sanity checks below; on by default only in
/// debug builds.
static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

/// This function is intended to be called during compute node initialization if the storage
/// sanity check is not desired. This controls a global flag so only need to be called once
/// if need to disable the sanity check.
pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

/// Whether the insert/delete/update sanity checks should run.
pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}
386
/// Point lookup through `StateStoreGet`, copying the value out as owned `Bytes`.
/// Returns `Ok(None)` when the key is absent.
async fn get_from_state_store(
    state_store: &impl StateStoreGet,
    key: TableKey<Bytes>,
    read_options: ReadOptions,
) -> StorageResult<Option<Bytes>> {
    state_store
        .on_key_value(key, read_options, |_, value| {
            // Copy out of the store-owned slice so the result outlives the callback.
            Ok(Bytes::copy_from_slice(value))
        })
        .await
}
398
399/// Make sure the key to insert should not exist in storage.
400pub(crate) async fn do_insert_sanity_check(
401    table_id: TableId,
402    key: &TableKey<Bytes>,
403    value: &Bytes,
404    inner: &impl StateStoreRead,
405    op_consistency_level: &OpConsistencyLevel,
406) -> StorageResult<()> {
407    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
408        return Ok(());
409    }
410    let read_options = ReadOptions {
411        cache_policy: CachePolicy::Fill(Hint::Normal),
412        ..Default::default()
413    };
414    let stored_value = get_from_state_store(inner, key.clone(), read_options).await?;
415
416    if let Some(stored_value) = stored_value {
417        return Err(Box::new(MemTableError::InconsistentOperation {
418            table_id,
419            key: key.clone(),
420            prev: KeyOp::Insert(stored_value),
421            new: KeyOp::Insert(value.clone()),
422        })
423        .into());
424    }
425    Ok(())
426}
427
428/// Make sure that the key to delete should exist in storage and the value should be matched.
429pub(crate) async fn do_delete_sanity_check(
430    table_id: TableId,
431    key: &TableKey<Bytes>,
432    old_value: &Bytes,
433    inner: &impl StateStoreRead,
434    op_consistency_level: &OpConsistencyLevel,
435) -> StorageResult<()> {
436    let OpConsistencyLevel::ConsistentOldValue {
437        check_old_value: old_value_checker,
438        ..
439    } = op_consistency_level
440    else {
441        return Ok(());
442    };
443    let read_options = ReadOptions {
444        cache_policy: CachePolicy::Fill(Hint::Normal),
445        ..Default::default()
446    };
447    match get_from_state_store(inner, key.clone(), read_options).await? {
448        None => Err(Box::new(MemTableError::InconsistentOperation {
449            table_id,
450            key: key.clone(),
451            prev: KeyOp::Delete(Bytes::default()),
452            new: KeyOp::Delete(old_value.clone()),
453        })
454        .into()),
455        Some(stored_value) => {
456            if !old_value_checker(&stored_value, old_value) {
457                Err(Box::new(MemTableError::InconsistentOperation {
458                    table_id,
459                    key: key.clone(),
460                    prev: KeyOp::Insert(stored_value),
461                    new: KeyOp::Delete(old_value.clone()),
462                })
463                .into())
464            } else {
465                Ok(())
466            }
467        }
468    }
469}
470
471/// Make sure that the key to update should exist in storage and the value should be matched
472pub(crate) async fn do_update_sanity_check(
473    table_id: TableId,
474    key: &TableKey<Bytes>,
475    old_value: &Bytes,
476    new_value: &Bytes,
477    inner: &impl StateStoreRead,
478    op_consistency_level: &OpConsistencyLevel,
479) -> StorageResult<()> {
480    let OpConsistencyLevel::ConsistentOldValue {
481        check_old_value: old_value_checker,
482        ..
483    } = op_consistency_level
484    else {
485        return Ok(());
486    };
487    let read_options = ReadOptions {
488        cache_policy: CachePolicy::Fill(Hint::Normal),
489        ..Default::default()
490    };
491
492    match get_from_state_store(inner, key.clone(), read_options).await? {
493        None => Err(Box::new(MemTableError::InconsistentOperation {
494            table_id,
495            key: key.clone(),
496            prev: KeyOp::Delete(Bytes::default()),
497            new: KeyOp::Update((old_value.clone(), new_value.clone())),
498        })
499        .into()),
500        Some(stored_value) => {
501            if !old_value_checker(&stored_value, old_value) {
502                Err(Box::new(MemTableError::InconsistentOperation {
503                    table_id,
504                    key: key.clone(),
505                    prev: KeyOp::Insert(stored_value),
506                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
507                })
508                .into())
509            } else {
510                Ok(())
511            }
512        }
513    }
514}
515
516pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
517    match (a, b) {
518        // only right bound of delete range can be `Unbounded`
519        (Unbounded, _) | (_, Unbounded) => unreachable!(),
520        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
521        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
522        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
523    }
524}
525
526pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
527    match (left, right) {
528        // only right bound of delete range can be `Unbounded`
529        (Unbounded, _) => unreachable!(),
530        (_, Unbounded) => true,
531        (Included(x), Included(y)) => x <= y,
532        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
533            x < y
534        }
535    }
536}
537
/// Filter out key-value pairs covered by any of the given delete ranges.
///
/// Assumes `kv_iter` is sorted by key and `delete_ranges_iter` yields sorted,
/// non-overlapping ranges — NOTE(review): not enforced here beyond the per-range
/// validation below; the single forward pass depends on it. TODO confirm callers.
#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_start,
            range_end
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            // Key is strictly before the current delete range: keep it.
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                // Key falls inside the current delete range: drop it.
                false
            } else {
                // Key has exceeded the current key range. Advance to the next range.
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.0,
                            range_bound.1
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            // Not fall in the next delete range
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            // Fall in the next delete range
                            break false;
                        } else {
                            // Exceed the next delete range. Go to the next delete range if there is
                            // any in the next loop
                            continue;
                        }
                    } else {
                        // No more delete range.
                        break true;
                    }
                }
            }
        } else {
            // No delete ranges at all: keep everything.
            true
        }
    })
}
595
/// Wait for the `committed_epoch` of `table_id` to reach `wait_epoch`.
///
/// When the `table_id` does not exist in the latest version, we assume that
/// the table is not created yet, and will wait until the table is created.
pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<PinnedVersion> {
    // Tracks the committed epoch seen on the previous inspection so we can tell
    // "not created yet" (never seen) apart from "dropped" (seen before, now gone).
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    let version = wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    // Target epoch reached: stop waiting.
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                // Table has never appeared in any observed version: assume it is
                // not created yet and keep waiting.
                Ok(false)
            } else {
                // Table appeared before but is now gone: it has been dropped.
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(version)
}
638
/// Wait until `inspect_fn` returns `Ok(true)` for a version observed on the watch
/// channel, returning that version. Emits a rate-limited warning (with
/// `periodic_debug_info`) every 30 seconds while still waiting.
pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<PinnedVersion> {
    let mut receiver = notifier.subscribe();
    {
        // Fast path: the currently published version may already satisfy the
        // predicate; `borrow_and_update` also marks it as seen.
        let version = receiver.borrow_and_update();
        if inspect_fn(&version)? {
            return Ok(version.clone());
        }
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                // Rate-limit the timeout warning to once per minute.
                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                    LazyLock::new(|| LogSuppressor::per_minute(1));
                // The reason that we need to retry here is batch scan in
                // chain/rearrange_chain is waiting for an
                // uncommitted epoch carried by the CreateMV barrier, which
                // can take unbounded time to become committed and propagate
                // to the CN. We should consider removing the retry as well as wait_epoch
                // for chain/rearrange_chain if we enforce
                // chain/rearrange_chain to be scheduled on the same
                // CN with the same distribution as the upstream MV.
                // See #3845 for more details.
                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                    // Provide backtrace iff in debug mode for observability.
                    let backtrace = cfg!(debug_assertions).then(Backtrace::capture);

                    let backtrace = backtrace
                        .as_ref()
                        .map(|bt| ShortBacktrace { bt, limit: 30 })
                        .map(tracing::field::display);

                    // Renders only the first `limit` frames of a backtrace to keep
                    // log records bounded.
                    struct ShortBacktrace<'a> {
                        bt: &'a Backtrace,
                        limit: usize,
                    }

                    impl std::fmt::Display for ShortBacktrace<'_> {
                        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                            writeln!(f, "backtrace:")?;
                            for (idx, frame) in self.bt.frames().iter().take(self.limit).enumerate()
                            {
                                writeln!(f, "{}: {:?}", idx, frame)?;
                            }
                            Ok(())
                        }
                    }
                    tracing::warn!(
                        suppressed_count,
                        info = periodic_debug_info(),
                        elapsed = ?start_time.elapsed(),
                        backtrace,
                        "timeout when waiting for version update",
                    );
                }
                continue;
            }
            Ok(Err(_)) => {
                // The sender side is gone: no further version updates can arrive.
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                // A new version was published; re-run the predicate against it.
                let version = receiver.borrow_and_update();
                if inspect_fn(&version)? {
                    return Ok(version.clone());
                }
            }
        }
    }
}
712
/// Aggregates memory usage figures from the sstable store caches, the upload
/// memory limiter, and the storage memory config for monitoring.
pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}
718
impl HummockMemoryCollector {
    /// Bundles the handles the collector reads usage figures from.
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}
732
// Each getter simply forwards to the corresponding cache/limiter; the `_ratio`
// variants report usage relative to the respective capacity.
impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_vector_meta_memory_usage(&self) -> u64 {
        self.sstable_store.vector_meta_cache.usage() as _
    }

    fn get_vector_data_memory_usage(&self) -> u64 {
        self.sstable_store.vector_block_cache.usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.vector_meta_cache.usage() as f64
            / self.sstable_store.vector_meta_cache.capacity() as f64
    }

    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.vector_block_cache.usage() as f64
            / self.sstable_store.vector_block_cache.capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        // Config value is in MB; convert to bytes for the ratio.
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}
783
/// Merge a mem-table iterator with a storage stream into a single keyed-row stream.
///
/// Both inputs are assumed sorted by table key (descending when `rev` is true)
/// and scoped to `table_id` — TODO(review): confirm at call sites. On equal keys
/// the mem-table op wins: inserts/updates replace the storage value, deletes drop
/// the row. Mem-table-only rows are emitted as full keys stamped with `epoch`.
#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            // The mem table side has come to an end, return data from the shared storage.
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            // The stream side has come to an end, return data from the mem table.
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    // Deletes of keys absent from storage produce nothing.
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                // For a reverse scan, the comparison direction flips.
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        // yield data from storage
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        // both memtable and storage contain the key, so we advance both
                        // iterators and return the data in memory.

                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        // yield data from mem table
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                // Throw the error.
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}
868
869#[cfg(test)]
870mod tests {
871    use std::future::{Future, poll_fn};
872    use std::sync::Arc;
873    use std::task::Poll;
874
875    use futures::FutureExt;
876    use futures::future::join_all;
877    use rand::random_range;
878
879    use crate::hummock::utils::MemoryLimiter;
880
881    async fn assert_pending(future: &mut (impl Future + Unpin)) {
882        for _ in 0..10 {
883            assert!(
884                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
885                    .await
886                    .is_pending()
887            );
888        }
889    }
890
891    #[tokio::test]
892    async fn test_loose_memory_limiter() {
893        let quota = 5;
894        let memory_limiter = MemoryLimiter::new(quota);
895        drop(memory_limiter.require_memory(6).await);
896        let tracker1 = memory_limiter.require_memory(3).await;
897        assert_eq!(3, memory_limiter.get_memory_usage());
898        let tracker2 = memory_limiter.require_memory(4).await;
899        assert_eq!(7, memory_limiter.get_memory_usage());
900        let mut future = memory_limiter.require_memory(5).boxed();
901        assert_pending(&mut future).await;
902        assert_eq!(7, memory_limiter.get_memory_usage());
903        drop(tracker1);
904        let tracker3 = future.await;
905        assert_eq!(9, memory_limiter.get_memory_usage());
906        drop(tracker2);
907        assert_eq!(5, memory_limiter.get_memory_usage());
908        drop(tracker3);
909        assert_eq!(0, memory_limiter.get_memory_usage());
910    }
911
912    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
913    async fn test_multi_thread_acquire_memory() {
914        const QUOTA: u64 = 10;
915        let memory_limiter = Arc::new(MemoryLimiter::new(200));
916        let mut handles = vec![];
917        for _ in 0..40 {
918            let limiter = memory_limiter.clone();
919            let h = tokio::spawn(async move {
920                let mut buffers = vec![];
921                let mut current_buffer_usage = random_range(2..=9);
922                for _ in 0..1000 {
923                    if buffers.len() < current_buffer_usage
924                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
925                    {
926                        buffers.push(tracker);
927                    } else {
928                        buffers.clear();
929                        current_buffer_usage = random_range(2..=9);
930                        let req = limiter.require_memory(QUOTA);
931                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
932                            Ok(tracker) => {
933                                buffers.push(tracker);
934                            }
935                            Err(_) => {
936                                continue;
937                            }
938                        }
939                    }
940                    let sleep_time = random_range(1..=3);
941                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
942                }
943            });
944            handles.push(h);
945        }
946        let h = join_all(handles);
947        let _ = h.await;
948    }
949}