pgwire/
memory_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use crate::pg_message::ServerThrottleReason;
19
/// `MessageMemoryManager` tracks the memory usage of frontend messages and provides a hint
/// of whether accepting a message would violate a memory constraint.
///
/// For any message of size n,
/// - If n <= `min_filter_bytes`, the message is not counted against the memory constraint,
///   because it is so small that it can always be accepted.
/// - If n > `max_filter_bytes`, the message is not counted against the memory constraint,
///   because it is so large that it is better to reject it immediately.
/// - Otherwise, n is added to the running total. See `MessageMemoryManager::add`.
#[derive(Debug)]
pub struct MessageMemoryManager {
    /// Sum of the sizes of all messages that are currently counted.
    pub current_running_bytes: AtomicU64,
    /// Limit that `current_running_bytes` is compared against when a message is added.
    pub max_running_bytes: u64,
    /// Messages of at most this many bytes are never counted.
    pub min_filter_bytes: u64,
    /// Messages of more than this many bytes are rejected without being counted.
    pub max_filter_bytes: u64,
}

/// Shared, reference-counted handle to a `MessageMemoryManager`.
pub type MessageMemoryManagerRef = Arc<MessageMemoryManager>;
34
35impl MessageMemoryManager {
36    pub fn new(max_running_bytes: u64, min_bytes: u64, max_bytes: u64) -> Self {
37        Self {
38            current_running_bytes: AtomicU64::new(0),
39            max_running_bytes,
40            min_filter_bytes: min_bytes,
41            max_filter_bytes: max_bytes,
42        }
43    }
44
45    /// Returns a `ServerThrottleReason` indicating whether any memory constraint has been violated.
46    pub fn add(
47        self: &MessageMemoryManagerRef,
48        bytes: u64,
49    ) -> (Option<ServerThrottleReason>, Option<MessageMemoryGuard>) {
50        if bytes > self.max_filter_bytes {
51            return (Some(ServerThrottleReason::TooLargeMessage), None);
52        }
53        if bytes <= self.min_filter_bytes {
54            return (None, None);
55        }
56        let prev = self
57            .current_running_bytes
58            .fetch_add(bytes, Ordering::Relaxed);
59        let guard: MessageMemoryGuard = MessageMemoryGuard {
60            bytes,
61            manager: self.clone(),
62        };
63        // Always permit at least one entry, regardless of its size.
64        let reason = if prev != 0 && prev + bytes > self.max_running_bytes {
65            Some(ServerThrottleReason::TooManyMemoryUsage)
66        } else {
67            None
68        };
69        (reason, Some(guard))
70    }
71
72    fn sub(&self, bytes: u64) {
73        self.current_running_bytes
74            .fetch_sub(bytes, Ordering::Relaxed);
75    }
76}
77
78pub struct MessageMemoryGuard {
79    bytes: u64,
80    manager: MessageMemoryManagerRef,
81}
82
83impl Drop for MessageMemoryGuard {
84    fn drop(&mut self) {
85        self.manager.sub(self.bytes);
86    }
87}