risingwave_storage/hummock/
write_limiter.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use arc_swap::ArcSwap;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::CompactionGroupId;
21use risingwave_pb::hummock::write_limits::WriteLimit;
22
/// Shared, reference-counted handle to a [`WriteLimiter`].
pub type WriteLimiterRef = Arc<WriteLimiter>;
24
/// Tracks the write limits currently in force and lets writers wait until
/// the table they write to is no longer limited.
#[derive(Default)]
pub struct WriteLimiter {
    /// Current snapshot, replaced as a whole on every update:
    /// - `.0`: the write limit of each limited compaction group;
    /// - `.1`: index from table id to the compaction group that limits it,
    ///   derived from the `table_ids` of every entry in `.0`.
    limits: ArcSwap<(
        HashMap<CompactionGroupId, WriteLimit>,
        HashMap<TableId, CompactionGroupId>,
    )>,
    /// Signalled after each update of `limits` so that blocked writers
    /// re-check whether they may proceed.
    notify: tokio::sync::Notify,
}
33
34impl WriteLimiter {
35 pub fn unused() -> Arc<Self> {
36 Arc::new(WriteLimiter::default())
37 }
38
39 pub fn update_write_limits(&self, limits: HashMap<CompactionGroupId, WriteLimit>) {
40 let mut index: HashMap<TableId, CompactionGroupId> = HashMap::new();
41 for (group_id, limit) in &limits {
42 for table_id in &limit.table_ids {
43 index.insert(table_id.into(), *group_id);
44 }
45 }
46 self.limits.store(Arc::new((limits, index)));
47 self.notify.notify_waiters();
48 }
49
50 fn try_find(&self, table_id: &TableId) -> Option<String> {
52 let limits = self.limits.load();
53 let group_id = match limits.1.get(table_id) {
54 None => {
55 return None;
56 }
57 Some(group_id) => *group_id,
58 };
59 let reason = limits
60 .0
61 .get(&group_id)
62 .as_ref()
63 .expect("table to group index should be accurate")
64 .reason
65 .clone();
66 Some(reason)
67 }
68
69 pub async fn wait_permission(&self, table_id: TableId) {
71 if self.try_find(&table_id).is_none() {
73 return;
74 }
75 let mut first_block_msg = true;
76 loop {
78 let notified = self.notify.notified();
79 match self.try_find(&table_id) {
80 Some(reason) => {
81 if first_block_msg {
82 first_block_msg = false;
83 tracing::debug!(
84 "write to table {} is blocked: {}",
85 table_id.table_id,
86 reason,
87 );
88 } else {
89 tracing::debug!(
90 "write limiter is updated, but write to table {} is still blocked: {}",
91 table_id.table_id,
92 reason,
93 );
94 }
95 }
96 None => {
97 break;
98 }
99 }
100 notified.await;
101 }
102 tracing::debug!("write to table {} is unblocked", table_id.table_id,);
103 }
104}