risingwave_storage/hummock/sstable/
sstable_object_id_manager.rs1use std::collections::VecDeque;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange};
21use risingwave_pb::hummock::GetNewSstIdsRequest;
22use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
23use sync_point::sync_point;
24use thiserror_ext::AsReport;
25use tokio::sync::oneshot;
26
27use crate::hummock::{HummockError, HummockResult};
28pub type SstableObjectIdManagerRef = Arc<SstableObjectIdManager>;
29use dyn_clone::DynClone;
30#[async_trait::async_trait]
31pub trait GetObjectId: DynClone + Send + Sync {
32 async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId>;
33}
34dyn_clone::clone_trait_object!(GetObjectId);
35pub struct SstableObjectIdManager {
37 wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
39 available_sst_object_ids: Mutex<SstObjectIdRange>,
40 remote_fetch_number: u32,
41 hummock_meta_client: Arc<dyn HummockMetaClient>,
42}
43
44impl SstableObjectIdManager {
45 pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
46 Self {
47 wait_queue: Default::default(),
48 available_sst_object_ids: Mutex::new(SstObjectIdRange::new(
49 HummockSstableObjectId::MIN,
50 HummockSstableObjectId::MIN,
51 )),
52 remote_fetch_number,
53 hummock_meta_client,
54 }
55 }
56
57 async fn map_next_sst_object_id<F>(
60 self: &Arc<Self>,
61 f: F,
62 ) -> HummockResult<HummockSstableObjectId>
63 where
64 F: Fn(&mut SstObjectIdRange) -> Option<HummockSstableObjectId>,
65 {
66 loop {
67 if let Some(new_id) = f(self.available_sst_object_ids.lock().deref_mut()) {
69 return Ok(new_id);
70 }
71 let waiter = {
73 let mut guard = self.wait_queue.lock();
74 if let Some(new_id) = f(self.available_sst_object_ids.lock().deref_mut()) {
75 return Ok(new_id);
76 }
77 let wait_queue = guard.deref_mut();
78 if let Some(wait_queue) = wait_queue {
79 let (tx, rx) = oneshot::channel();
80 wait_queue.push(tx);
81 Some(rx)
82 } else {
83 *wait_queue = Some(vec![]);
84 None
85 }
86 };
87 if let Some(waiter) = waiter {
88 sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
90 let _ = waiter.await;
91 continue;
92 }
93 sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
95 sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
96 let this = self.clone();
97 tokio::spawn(async move {
98 let new_sst_ids = match this
99 .hummock_meta_client
100 .get_new_sst_ids(this.remote_fetch_number)
101 .await
102 .map_err(HummockError::meta_error)
103 {
104 Ok(new_sst_ids) => new_sst_ids,
105 Err(err) => {
106 this.notify_waiters(false);
107 return Err(err);
108 }
109 };
110 sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
111 sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
112 let result = {
114 let mut guard = this.available_sst_object_ids.lock();
115 let available_sst_object_ids = guard.deref_mut();
116 if new_sst_ids.start_id < available_sst_object_ids.end_id {
117 Err(HummockError::meta_error(format!(
118 "SST id moves backwards. new {} < old {}",
119 new_sst_ids.start_id, available_sst_object_ids.end_id
120 )))
121 } else {
122 *available_sst_object_ids = new_sst_ids;
123 Ok(())
124 }
125 };
126 this.notify_waiters(result.is_ok());
127 result
128 })
129 .await
130 .unwrap()?;
131 }
132 }
133
134 fn notify_waiters(&self, success: bool) {
135 let mut guard = self.wait_queue.lock();
136 let wait_queue = guard.deref_mut().take().unwrap();
137 for notify in wait_queue {
138 let _ = notify.send(success);
139 }
140 }
141}
142
143#[async_trait::async_trait]
144impl GetObjectId for Arc<SstableObjectIdManager> {
145 async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
148 self.map_next_sst_object_id(|available_sst_object_ids| {
149 available_sst_object_ids.get_next_sst_object_id()
150 })
151 .await
152 }
153}
154
155struct SharedComapctorObjectIdManagerCore {
156 output_object_ids: VecDeque<u64>,
157 client: Option<GrpcCompactorProxyClient>,
158 sstable_id_remote_fetch_number: u32,
159}
160impl SharedComapctorObjectIdManagerCore {
161 pub fn new(
162 output_object_ids: VecDeque<u64>,
163 client: GrpcCompactorProxyClient,
164 sstable_id_remote_fetch_number: u32,
165 ) -> Self {
166 Self {
167 output_object_ids,
168 client: Some(client),
169 sstable_id_remote_fetch_number,
170 }
171 }
172
173 pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
174 Self {
175 output_object_ids,
176 client: None,
177 sstable_id_remote_fetch_number: 0,
178 }
179 }
180}
181#[derive(Clone)]
183pub struct SharedComapctorObjectIdManager {
184 core: Arc<tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>>,
185}
186
187impl SharedComapctorObjectIdManager {
188 pub fn new(
189 output_object_ids: VecDeque<u64>,
190 client: GrpcCompactorProxyClient,
191 sstable_id_remote_fetch_number: u32,
192 ) -> Self {
193 Self {
194 core: Arc::new(tokio::sync::Mutex::new(
195 SharedComapctorObjectIdManagerCore::new(
196 output_object_ids,
197 client,
198 sstable_id_remote_fetch_number,
199 ),
200 )),
201 }
202 }
203
204 pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
205 Self {
206 core: Arc::new(tokio::sync::Mutex::new(
207 SharedComapctorObjectIdManagerCore::for_test(output_object_ids),
208 )),
209 }
210 }
211}
212
213#[async_trait::async_trait]
214impl GetObjectId for SharedComapctorObjectIdManager {
215 async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
216 let mut guard = self.core.lock().await;
217 let core = guard.deref_mut();
218
219 if let Some(first_element) = core.output_object_ids.pop_front() {
220 Ok(first_element)
221 } else {
222 tracing::warn!(
223 "The pre-allocated object ids are used up, and new object id are obtained through RPC."
224 );
225 let request = GetNewSstIdsRequest {
226 number: core.sstable_id_remote_fetch_number,
227 };
228 match core
229 .client
230 .as_mut()
231 .expect("GrpcCompactorProxyClient is None")
232 .get_new_sst_ids(request)
233 .await
234 {
235 Ok(response) => {
236 let resp = response.into_inner();
237 let start_id = resp.start_id;
238 core.output_object_ids.extend((start_id + 1)..resp.end_id);
239 Ok(start_id)
240 }
241 Err(e) => Err(HummockError::other(format!(
242 "Fail to get new sst id: {}",
243 e.as_report()
244 ))),
245 }
246 }
247 }
248}