pgwire/memory_manager.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use crate::pg_message::ServerThrottleReason;
19
/// Accounting structure for the memory held by in-flight frontend messages.
///
/// `MessageMemoryManager` keeps a running total of tracked message sizes and tells the caller
/// whether accepting one more message would violate a memory constraint.
///
/// A message of size `n` is classified as follows:
/// - `n <= min_filter_bytes`: the message is small enough to always be accepted, so it is not
///   added to the running total.
/// - `n > max_filter_bytes`: the message is large enough that it should be rejected immediately,
///   so it is not added to the running total either.
/// - Otherwise: `n` is added to the running total. See `MessageMemoryManager::add`.
pub struct MessageMemoryManager {
    /// Sum of the sizes of all currently tracked messages.
    pub current_running_bytes: AtomicU64,
    /// Budget that `current_running_bytes` is compared against when a message is added.
    pub max_running_bytes: u64,
    /// Messages of at most this many bytes are accepted without being tracked.
    pub min_filter_bytes: u64,
    /// Messages of more than this many bytes are rejected without being tracked.
    pub max_filter_bytes: u64,
}
32
33pub type MessageMemoryManagerRef = Arc<MessageMemoryManager>;
34
35impl MessageMemoryManager {
36 pub fn new(max_running_bytes: u64, min_bytes: u64, max_bytes: u64) -> Self {
37 Self {
38 current_running_bytes: AtomicU64::new(0),
39 max_running_bytes,
40 min_filter_bytes: min_bytes,
41 max_filter_bytes: max_bytes,
42 }
43 }
44
45 /// Returns a `ServerThrottleReason` indicating whether any memory constraint has been violated.
46 pub fn add(
47 self: &MessageMemoryManagerRef,
48 bytes: u64,
49 ) -> (Option<ServerThrottleReason>, Option<MessageMemoryGuard>) {
50 if bytes > self.max_filter_bytes {
51 return (Some(ServerThrottleReason::TooLargeMessage), None);
52 }
53 if bytes <= self.min_filter_bytes {
54 return (None, None);
55 }
56 let prev = self
57 .current_running_bytes
58 .fetch_add(bytes, Ordering::Relaxed);
59 let guard: MessageMemoryGuard = MessageMemoryGuard {
60 bytes,
61 manager: self.clone(),
62 };
63 // Always permit at least one entry, regardless of its size.
64 let reason = if prev != 0 && prev + bytes > self.max_running_bytes {
65 Some(ServerThrottleReason::TooManyMemoryUsage)
66 } else {
67 None
68 };
69 (reason, Some(guard))
70 }
71
72 fn sub(&self, bytes: u64) {
73 self.current_running_bytes
74 .fetch_sub(bytes, Ordering::Relaxed);
75 }
76}
77
78pub struct MessageMemoryGuard {
79 bytes: u64,
80 manager: MessageMemoryManagerRef,
81}
82
83impl Drop for MessageMemoryGuard {
84 fn drop(&mut self) {
85 self.manager.sub(self.bytes);
86 }
87}