risingwave_common/memory/
mem_context.rsuse std::ops::Deref;
use std::sync::Arc;
use prometheus::core::Atomic;
use risingwave_common_metrics::TrAdderAtomic;
use super::MonitoredGlobalAlloc;
use crate::metrics::{LabelGuardedIntGauge, TrAdderGauge};
pub trait MemCounter: Send + Sync + 'static {
fn add(&self, bytes: i64);
fn get_bytes_used(&self) -> i64;
}
impl MemCounter for TrAdderGauge {
fn add(&self, bytes: i64) {
self.add(bytes)
}
fn get_bytes_used(&self) -> i64 {
self.get()
}
}
impl MemCounter for TrAdderAtomic {
fn add(&self, bytes: i64) {
self.inc_by(bytes)
}
fn get_bytes_used(&self) -> i64 {
self.get()
}
}
impl<const N: usize> MemCounter for LabelGuardedIntGauge<N> {
fn add(&self, bytes: i64) {
self.deref().add(bytes)
}
fn get_bytes_used(&self) -> i64 {
self.get()
}
}
struct MemoryContextInner {
counter: Box<dyn MemCounter>,
parent: Option<MemoryContext>,
mem_limit: u64,
}
#[derive(Clone)]
pub struct MemoryContext {
inner: Option<Arc<MemoryContextInner>>,
}
impl MemoryContext {
pub fn new(parent: Option<MemoryContext>, counter: impl MemCounter) -> Self {
let mem_limit = parent.as_ref().map_or_else(|| u64::MAX, |p| p.mem_limit());
Self::new_with_mem_limit(parent, counter, mem_limit)
}
pub fn new_with_mem_limit(
parent: Option<MemoryContext>,
counter: impl MemCounter,
mem_limit: u64,
) -> Self {
let c = Box::new(counter);
Self {
inner: Some(Arc::new(MemoryContextInner {
counter: c,
parent,
mem_limit,
})),
}
}
pub fn none() -> Self {
Self { inner: None }
}
pub fn root(counter: impl MemCounter, mem_limit: u64) -> Self {
Self::new_with_mem_limit(None, counter, mem_limit)
}
pub fn for_spill_test() -> Self {
Self::new_with_mem_limit(None, TrAdderAtomic::new(0), 0)
}
pub fn add(&self, bytes: i64) -> bool {
if let Some(inner) = &self.inner {
if (inner.counter.get_bytes_used() + bytes) as u64 > inner.mem_limit {
return false;
}
if let Some(parent) = &inner.parent {
if parent.add(bytes) {
inner.counter.add(bytes);
} else {
return false;
}
} else {
inner.counter.add(bytes);
}
}
true
}
pub fn get_bytes_used(&self) -> i64 {
if let Some(inner) = &self.inner {
inner.counter.get_bytes_used()
} else {
0
}
}
pub fn mem_limit(&self) -> u64 {
if let Some(inner) = &self.inner {
inner.mem_limit
} else {
u64::MAX
}
}
pub fn check_memory_usage(&self) -> bool {
if let Some(inner) = &self.inner {
if inner.counter.get_bytes_used() as u64 > inner.mem_limit {
return false;
}
if let Some(parent) = &inner.parent {
return parent.check_memory_usage();
}
}
true
}
pub fn global_allocator(&self) -> MonitoredGlobalAlloc {
MonitoredGlobalAlloc::with_memory_context(self.clone())
}
}
impl Drop for MemoryContextInner {
fn drop(&mut self) {
if let Some(p) = &self.parent {
p.add(-self.counter.get_bytes_used());
}
}
}