risingwave_storage/hummock/
object_id_manager.rs1use std::collections::VecDeque;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::{HummockRawObjectId, HummockSstableObjectId, ObjectIdRange};
21use risingwave_pb::hummock::GetNewObjectIdsRequest;
22use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
23use sync_point::sync_point;
24use thiserror_ext::AsReport;
25use tokio::sync::oneshot;
26
27use crate::hummock::{HummockError, HummockResult};
28pub type ObjectIdManagerRef = Arc<ObjectIdManager>;
29
30#[async_trait::async_trait]
31pub trait GetObjectId: Send + Sync {
32 async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId>;
33}
34
35pub struct ObjectIdManager {
37 wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
39 available_object_ids: Mutex<ObjectIdRange>,
40 remote_fetch_number: u32,
41 hummock_meta_client: Arc<dyn HummockMetaClient>,
42}
43
44impl ObjectIdManager {
45 pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
46 Self {
47 wait_queue: Default::default(),
48 available_object_ids: Mutex::new(ObjectIdRange::new(u64::MIN, u64::MIN)),
49 remote_fetch_number,
50 hummock_meta_client,
51 }
52 }
53
54 pub async fn get_new_object_id<I: From<HummockRawObjectId>>(&self) -> HummockResult<I> {
55 self.map_next_object_id(|available_object_ids| available_object_ids.get_next_object_id())
56 .await
57 .map(Into::into)
58 }
59
60 async fn map_next_object_id<F>(&self, f: F) -> HummockResult<HummockRawObjectId>
63 where
64 F: Fn(&mut ObjectIdRange) -> Option<HummockRawObjectId>,
65 {
66 loop {
67 if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
69 return Ok(new_id);
70 }
71 let waiter = {
73 let mut guard = self.wait_queue.lock();
74 if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
75 return Ok(new_id);
76 }
77 let wait_queue = guard.deref_mut();
78 if let Some(wait_queue) = wait_queue {
79 let (tx, rx) = oneshot::channel();
80 wait_queue.push(tx);
81 Some(rx)
82 } else {
83 *wait_queue = Some(vec![]);
84 None
85 }
86 };
87 if let Some(waiter) = waiter {
88 sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
90 let _ = waiter.await;
91 continue;
92 }
93 sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
95 sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
96 async move {
97 let new_sst_ids = match self
98 .hummock_meta_client
99 .get_new_object_ids(self.remote_fetch_number)
100 .await
101 .map_err(HummockError::meta_error)
102 {
103 Ok(new_sst_ids) => new_sst_ids,
104 Err(err) => {
105 self.notify_waiters(false);
106 return Err(err);
107 }
108 };
109 sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
110 sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
111 let result = {
113 let mut guard = self.available_object_ids.lock();
114 let available_object_ids = guard.deref_mut();
115 if new_sst_ids.start_id < available_object_ids.end_id {
116 Err(HummockError::meta_error(format!(
117 "SST id moves backwards. new {} < old {}",
118 new_sst_ids.start_id, available_object_ids.end_id
119 )))
120 } else {
121 *available_object_ids = new_sst_ids;
122 Ok(())
123 }
124 };
125 self.notify_waiters(result.is_ok());
126 result
127 }
128 .await?;
129 }
130 }
131
132 fn notify_waiters(&self, success: bool) {
133 let mut guard = self.wait_queue.lock();
134 let wait_queue = guard.deref_mut().take().unwrap();
135 for notify in wait_queue {
136 let _ = notify.send(success);
137 }
138 }
139}
140
141#[async_trait::async_trait]
142impl GetObjectId for ObjectIdManager {
143 async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
146 self.get_new_object_id().await
147 }
148}
149
150struct SharedComapctorObjectIdManagerCore {
151 output_object_ids: VecDeque<HummockSstableObjectId>,
152 client: Option<GrpcCompactorProxyClient>,
153 sstable_id_remote_fetch_number: u32,
154}
155impl SharedComapctorObjectIdManagerCore {
156 pub fn new(
157 output_object_ids: VecDeque<HummockSstableObjectId>,
158 client: GrpcCompactorProxyClient,
159 sstable_id_remote_fetch_number: u32,
160 ) -> Self {
161 Self {
162 output_object_ids,
163 client: Some(client),
164 sstable_id_remote_fetch_number,
165 }
166 }
167
168 pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
169 Self {
170 output_object_ids: output_object_ids.into_iter().map(|x| x.into()).collect(),
171 client: None,
172 sstable_id_remote_fetch_number: 0,
173 }
174 }
175}
176pub struct SharedComapctorObjectIdManager {
178 core: tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>,
179}
180
181impl SharedComapctorObjectIdManager {
182 pub fn new(
183 output_object_ids: VecDeque<HummockSstableObjectId>,
184 client: GrpcCompactorProxyClient,
185 sstable_id_remote_fetch_number: u32,
186 ) -> Arc<Self> {
187 Arc::new(Self {
188 core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::new(
189 output_object_ids,
190 client,
191 sstable_id_remote_fetch_number,
192 )),
193 })
194 }
195
196 pub fn for_test(output_object_ids: VecDeque<u64>) -> Arc<Self> {
197 Arc::new(Self {
198 core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::for_test(
199 output_object_ids,
200 )),
201 })
202 }
203}
204
205#[async_trait::async_trait]
206impl GetObjectId for SharedComapctorObjectIdManager {
207 async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
208 let mut guard = self.core.lock().await;
209 let core = guard.deref_mut();
210
211 if let Some(first_element) = core.output_object_ids.pop_front() {
212 Ok(first_element)
213 } else {
214 tracing::warn!(
215 "The pre-allocated object ids are used up, and new object id are obtained through RPC."
216 );
217 let request = GetNewObjectIdsRequest {
218 number: core.sstable_id_remote_fetch_number,
219 };
220 match core
221 .client
222 .as_mut()
223 .expect("GrpcCompactorProxyClient is None")
224 .get_new_sst_ids(request)
225 .await
226 {
227 Ok(response) => {
228 let resp = response.into_inner();
229 let start_id = resp.start_id;
230 core.output_object_ids.extend(
231 ((start_id + 1)..resp.end_id).map(Into::<HummockSstableObjectId>::into),
232 );
233 Ok(start_id.into())
234 }
235 Err(e) => Err(HummockError::other(format!(
236 "Fail to get new sst id: {}",
237 e.as_report()
238 ))),
239 }
240 }
241 }
242}