use std::backtrace::Backtrace;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::Hint;
use futures::{Stream, StreamExt, pin_mut};
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_common::config::StorageMemoryConfig;
use risingwave_common::log::LogSuppressor;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{Receiver, Sender, channel};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{
    OpConsistencyLevel, ReadOptions, StateStoreGet, StateStoreKeyedRow, StateStoreRead,
};

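/// Returns `true` iff `search_key_range` overlaps the key span that starts at
/// `inclusive_start_key` and ends at `end_key` (whose bound kind is supplied by the caller).
///
/// A minimal sketch of the semantics with plain integers; kept as `ignore` since the import
/// path is an assumption:
///
/// ```ignore
/// use std::ops::Bound;
/// // Search range [1, 5) against the span [3, 10]: overlap.
/// assert!(range_overlap(&(1..5), &3, Bound::Included(&10)));
/// // Search range [1, 3) against the span [3, 10]: no overlap.
/// assert!(!range_overlap(&(1..3), &3, Bound::Included(&10)));
/// ```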
pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    let (start_bound, end_bound) = (search_key_range.start_bound(), search_key_range.end_bound());

    let too_left = match (start_bound, end_key) {
        (Included(range_start), Included(inclusive_end_key)) => range_start > inclusive_end_key,
        (Included(range_start), Excluded(end_key))
        | (Excluded(range_start), Included(end_key))
        | (Excluded(range_start), Excluded(end_key)) => range_start >= end_key,
        (Unbounded, _) | (_, Unbounded) => false,
    };
    let too_right = match end_bound {
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
        Unbounded => false,
    };

    !too_left && !too_right
}

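/// Returns `true` if the SST may contain data of `table_id` within `table_key_range`:
/// `table_id` must appear in the SST's sorted `table_ids`, and the SST's key range must
/// overlap the query range according to [`range_overlap`].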
pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    debug_assert!(info.table_ids.is_sorted());
    let table_range = &info.key_range;
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());

    info.table_ids.binary_search(&table_id).is_ok()
        && range_overlap(
            &(left, right),
            &table_start,
            if table_range.right_exclusive {
                Bound::Excluded(&table_end)
            } else {
                Bound::Included(&table_end)
            },
        )
}

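/// For a sorted, non-overlapping run of SSTs, returns the number of SSTs whose left key bound
/// is `<= key`, i.e. the partition point. Callers subtract one to land on the SST that may
/// contain `key` (see [`prune_nonoverlapping_ssts`]).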
pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
    ssts.partition_point(|table| {
        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
        ord == Ordering::Less || ord == Ordering::Equal
    })
}

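/// Prunes SSTs of an overlapping level with a linear scan, keeping only those that pass
/// [`filter_single_sst`] for the given table and key range.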
pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}

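/// Prunes SSTs of a non-overlapping (concatenable) run by binary searching both ends of
/// `user_key_range` via [`search_sst_idx`], then filtering by `table_id`. The
/// `debug_assert!(can_concat(ssts))` guards the ordering assumption.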
#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    debug_assert!(can_concat(ssts));
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}

type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    total_size: AtomicU64,
    controller: Mutex<RequestQueue>,
    has_waiter: AtomicBool,
    quota: u64,
}

impl MemoryLimiterInner {
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    fn may_notify_waiters(self: &Arc<Self>) {
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}

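/// A quota-based limiter for in-memory write buffers. Acquiring memory yields a
/// [`MemoryTracker`]; dropping the tracker releases the quota and wakes pending waiters in
/// FIFO order.
///
/// A minimal usage sketch; kept as `ignore` since the import path and the quota/allocation
/// sizes are illustrative assumptions:
///
/// ```ignore
/// let limiter = MemoryLimiter::new(1 << 20); // 1 MiB quota
/// if let Some(tracker) = limiter.try_require_memory(4096) {
///     // ... use the 4 KiB reservation ...
///     drop(tracker); // quota is returned and waiters are notified
/// }
/// // Or wait asynchronously until quota becomes available:
/// // let _tracker = limiter.require_memory(4096).await;
/// ```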
pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}

impl Debug for MemoryLimiter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}

pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}
impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}

impl Debug for MemoryTracker {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}

impl MemoryLimiter {
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}

impl MemoryTracker {
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}

impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

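/// Returns `true` iff `sub_iter` is a subsequence of `full_iter`, i.e. every item of
/// `sub_iter` appears in `full_iter` in the same relative order. For example, `[1, 3]` is a
/// subsequence of `[1, 2, 3]`, while `[3, 1]` is not.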
pub fn check_subset_preserve_order<T: Eq>(
    sub_iter: impl Iterator<Item = T>,
    mut full_iter: impl Iterator<Item = T>,
) -> bool {
    for sub_iter_item in sub_iter {
        let mut found = false;
        for full_iter_item in full_iter.by_ref() {
            if sub_iter_item == full_iter_item {
                found = true;
                break;
            }
        }
        if !found {
            return false;
        }
    }
    true
}

static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

async fn get_from_state_store(
    state_store: &impl StateStoreGet,
    key: TableKey<Bytes>,
    read_options: ReadOptions,
) -> StorageResult<Option<Bytes>> {
    state_store
        .on_key_value(key, read_options, |_, value| {
            Ok(Bytes::copy_from_slice(value))
        })
        .await
}

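/// Asserts that `key` is not already present in the committed state before an insert.
/// Together with [`do_delete_sanity_check`] and [`do_update_sanity_check`] below, this
/// cross-checks mem-table operations against the state store and reports mismatches as
/// `MemTableError::InconsistentOperation`.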
pub(crate) async fn do_insert_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    value: &Bytes,
    inner: &impl StateStoreRead,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
        return Ok(());
    }
    let read_options = ReadOptions {
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    let stored_value = get_from_state_store(inner, key.clone(), read_options).await?;

    if let Some(stored_value) = stored_value {
        return Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Insert(stored_value),
            new: KeyOp::Insert(value.clone()),
        })
        .into());
    }
    Ok(())
}

pub(crate) async fn do_delete_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    inner: &impl StateStoreRead,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Delete(old_value.clone()),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    table_id,
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Delete(old_value.clone()),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

pub(crate) async fn do_update_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    new_value: &Bytes,
    inner: &impl StateStoreRead,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };

    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Update((old_value.clone(), new_value.clone())),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    table_id,
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

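/// Totally orders the *left* bounds of delete ranges: on the same key an included bound sorts
/// before an excluded bound. Unbounded left bounds are not expected here.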
pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
    match (a, b) {
        (Unbounded, _) | (_, Unbounded) => unreachable!(),
        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
    }
}

pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
    match (left, right) {
        (Unbounded, _) => unreachable!(),
        (_, Unbounded) => true,
        (Included(x), Included(y)) => x <= y,
        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
            x < y
        }
    }
}

#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_end,
            range_start
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                false
            } else {
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.1,
                            range_bound.0
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            break false;
                        } else {
                            continue;
                        }
                    } else {
                        break true;
                    }
                }
            }
        } else {
            true
        }
    })
}

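/// Waits until the committed epoch of `table_id` reaches `wait_epoch`. If the table had a
/// committed epoch in an earlier observation but later disappears from the version, it is
/// treated as dropped and an error is returned.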
pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<PinnedVersion> {
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    let version = wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                Ok(false)
            } else {
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(version)
}

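/// Subscribes to version updates and returns the first version for which `inspect_fn` yields
/// `Ok(true)`. If no update arrives within 30 seconds, a rate-limited warning built from
/// `periodic_debug_info` is logged (with a short backtrace in debug builds) and waiting
/// continues.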
pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<PinnedVersion> {
    let mut receiver = notifier.subscribe();
    {
        let version = receiver.borrow_and_update();
        if inspect_fn(&version)? {
            return Ok(version.clone());
        }
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                    LazyLock::new(|| LogSuppressor::per_minute(1));
                if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                    let backtrace = cfg!(debug_assertions).then(Backtrace::capture);

                    let backtrace = backtrace
                        .as_ref()
                        .map(|bt| ShortBacktrace { bt, limit: 30 })
                        .map(tracing::field::display);

                    struct ShortBacktrace<'a> {
                        bt: &'a Backtrace,
                        limit: usize,
                    }

                    impl std::fmt::Display for ShortBacktrace<'_> {
                        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                            writeln!(f, "backtrace:")?;
                            for (idx, frame) in self.bt.frames().iter().take(self.limit).enumerate()
                            {
                                writeln!(f, "{}: {:?}", idx, frame)?;
                            }
                            Ok(())
                        }
                    }
                    tracing::warn!(
                        suppressed_count,
                        info = periodic_debug_info(),
                        elapsed = ?start_time.elapsed(),
                        backtrace,
                        "timeout when waiting for version update",
                    );
                }
                continue;
            }
            Ok(Err(_)) => {
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                let version = receiver.borrow_and_update();
                if inspect_fn(&version)? {
                    return Ok(version.clone());
                }
            }
        }
    }
}

pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}

impl HummockMemoryCollector {
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_vector_meta_memory_usage(&self) -> u64 {
        self.sstable_store.vector_meta_cache.usage() as _
    }

    fn get_vector_data_memory_usage(&self) -> u64 {
        self.sstable_store.vector_block_cache.usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.vector_meta_cache.usage() as f64
            / self.sstable_store.vector_meta_cache.capacity() as f64
    }

    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.vector_block_cache.usage() as f64
            / self.sstable_store.vector_block_cache.capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}

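/// Merges the local mem-table iterator with the committed storage stream into a single
/// ordered stream of keyed rows. On equal keys the mem-table op wins: `Insert`/`Update`
/// yield the new value and `Delete` drops the row; `rev` flips the comparison for reverse
/// iteration.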
#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::sync::Arc;
    use std::task::Poll;

    use futures::FutureExt;
    use futures::future::join_all;
    use rand::random_range;

    use crate::hummock::utils::MemoryLimiter;

    async fn assert_pending(future: &mut (impl Future + Unpin)) {
        for _ in 0..10 {
            assert!(
                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
                    .await
                    .is_pending()
            );
        }
    }

    #[tokio::test]
    async fn test_loose_memory_limiter() {
        let quota = 5;
        let memory_limiter = MemoryLimiter::new(quota);
        drop(memory_limiter.require_memory(6).await);
        let tracker1 = memory_limiter.require_memory(3).await;
        assert_eq!(3, memory_limiter.get_memory_usage());
        let tracker2 = memory_limiter.require_memory(4).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        let mut future = memory_limiter.require_memory(5).boxed();
        assert_pending(&mut future).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker1);
        let tracker3 = future.await;
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker2);
        assert_eq!(5, memory_limiter.get_memory_usage());
        drop(tracker3);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_multi_thread_acquire_memory() {
        const QUOTA: u64 = 10;
        let memory_limiter = Arc::new(MemoryLimiter::new(200));
        let mut handles = vec![];
        for _ in 0..40 {
            let limiter = memory_limiter.clone();
            let h = tokio::spawn(async move {
                let mut buffers = vec![];
                let mut current_buffer_usage = random_range(2..=9);
                for _ in 0..1000 {
                    if buffers.len() < current_buffer_usage
                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
                    {
                        buffers.push(tracker);
                    } else {
                        buffers.clear();
                        current_buffer_usage = random_range(2..=9);
                        let req = limiter.require_memory(QUOTA);
                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
                            Ok(tracker) => {
                                buffers.push(tracker);
                            }
                            Err(_) => {
                                continue;
                            }
                        }
                    }
                    let sleep_time = random_range(1..=3);
                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
                }
            });
            handles.push(h);
        }
        let h = join_all(handles);
        let _ = h.await;
    }
}