risingwave_storage/hummock/
write_limiter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use arc_swap::ArcSwap;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::CompactionGroupId;
21use risingwave_pb::hummock::write_limits::WriteLimit;
22
23pub type WriteLimiterRef = Arc<WriteLimiter>;
24
25#[derive(Default)]
26pub struct WriteLimiter {
27    limits: ArcSwap<(
28        HashMap<CompactionGroupId, WriteLimit>,
29        HashMap<TableId, CompactionGroupId>,
30    )>,
31    notify: tokio::sync::Notify,
32}
33
34impl WriteLimiter {
35    pub fn unused() -> Arc<Self> {
36        Arc::new(WriteLimiter::default())
37    }
38
39    pub fn update_write_limits(&self, limits: HashMap<CompactionGroupId, WriteLimit>) {
40        let mut index: HashMap<TableId, CompactionGroupId> = HashMap::new();
41        for (group_id, limit) in &limits {
42            for table_id in &limit.table_ids {
43                index.insert(table_id.into(), *group_id);
44            }
45        }
46        self.limits.store(Arc::new((limits, index)));
47        self.notify.notify_waiters();
48    }
49
50    /// Returns the reason if write for `table_id` is blocked.
51    fn try_find(&self, table_id: &TableId) -> Option<String> {
52        let limits = self.limits.load();
53        let group_id = match limits.1.get(table_id) {
54            None => {
55                return None;
56            }
57            Some(group_id) => *group_id,
58        };
59        let reason = limits
60            .0
61            .get(&group_id)
62            .as_ref()
63            .expect("table to group index should be accurate")
64            .reason
65            .clone();
66        Some(reason)
67    }
68
69    /// Waits until write is permitted for `table_id`.
70    pub async fn wait_permission(&self, table_id: TableId) {
71        // Fast path.
72        if self.try_find(&table_id).is_none() {
73            return;
74        }
75        let mut first_block_msg = true;
76        // Slow path.
77        loop {
78            let notified = self.notify.notified();
79            match self.try_find(&table_id) {
80                Some(reason) => {
81                    if first_block_msg {
82                        first_block_msg = false;
83                        tracing::debug!(
84                            "write to table {} is blocked: {}",
85                            table_id.table_id,
86                            reason,
87                        );
88                    } else {
89                        tracing::debug!(
90                            "write limiter is updated, but write to table {} is still blocked: {}",
91                            table_id.table_id,
92                            reason,
93                        );
94                    }
95                }
96                None => {
97                    break;
98                }
99            }
100            notified.await;
101        }
102        tracing::debug!("write to table {} is unblocked", table_id.table_id,);
103    }
104}