risingwave_storage/hummock/
object_id_manager.rs1use std::collections::VecDeque;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::{HummockRawObjectId, HummockSstableObjectId, ObjectIdRange};
21use risingwave_pb::hummock::GetNewObjectIdsRequest;
22use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
23use sync_point::sync_point;
24use thiserror_ext::AsReport;
25use tokio::sync::oneshot;
26
27use crate::hummock::{HummockError, HummockResult};
28pub type ObjectIdManagerRef = Arc<ObjectIdManager>;
29
30#[async_trait::async_trait]
31pub trait GetObjectId: Send + Sync {
32 async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId>;
33}
34
35pub struct ObjectIdManager {
37 wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
39 available_object_ids: Mutex<ObjectIdRange>,
40 remote_fetch_number: u32,
41 hummock_meta_client: Arc<dyn HummockMetaClient>,
42}
43
44impl ObjectIdManager {
45 pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
46 Self {
47 wait_queue: Default::default(),
48 available_object_ids: Mutex::new(ObjectIdRange::new(u64::MIN, u64::MIN)),
49 remote_fetch_number,
50 hummock_meta_client,
51 }
52 }
53
54 async fn map_next_object_id<F>(&self, f: F) -> HummockResult<HummockRawObjectId>
57 where
58 F: Fn(&mut ObjectIdRange) -> Option<HummockRawObjectId>,
59 {
60 loop {
61 if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
63 return Ok(new_id);
64 }
65 let waiter = {
67 let mut guard = self.wait_queue.lock();
68 if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
69 return Ok(new_id);
70 }
71 let wait_queue = guard.deref_mut();
72 if let Some(wait_queue) = wait_queue {
73 let (tx, rx) = oneshot::channel();
74 wait_queue.push(tx);
75 Some(rx)
76 } else {
77 *wait_queue = Some(vec![]);
78 None
79 }
80 };
81 if let Some(waiter) = waiter {
82 sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
84 let _ = waiter.await;
85 continue;
86 }
87 sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
89 sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
90 async move {
91 let new_sst_ids = match self
92 .hummock_meta_client
93 .get_new_object_ids(self.remote_fetch_number)
94 .await
95 .map_err(HummockError::meta_error)
96 {
97 Ok(new_sst_ids) => new_sst_ids,
98 Err(err) => {
99 self.notify_waiters(false);
100 return Err(err);
101 }
102 };
103 sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
104 sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
105 let result = {
107 let mut guard = self.available_object_ids.lock();
108 let available_object_ids = guard.deref_mut();
109 if new_sst_ids.start_id < available_object_ids.end_id {
110 Err(HummockError::meta_error(format!(
111 "SST id moves backwards. new {} < old {}",
112 new_sst_ids.start_id, available_object_ids.end_id
113 )))
114 } else {
115 *available_object_ids = new_sst_ids;
116 Ok(())
117 }
118 };
119 self.notify_waiters(result.is_ok());
120 result
121 }
122 .await?;
123 }
124 }
125
126 fn notify_waiters(&self, success: bool) {
127 let mut guard = self.wait_queue.lock();
128 let wait_queue = guard.deref_mut().take().unwrap();
129 for notify in wait_queue {
130 let _ = notify.send(success);
131 }
132 }
133}
134
135#[async_trait::async_trait]
136impl GetObjectId for ObjectIdManager {
137 async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
140 self.map_next_object_id(|available_object_ids| available_object_ids.get_next_object_id())
141 .await
142 .map(Into::into)
143 }
144}
145
146struct SharedComapctorObjectIdManagerCore {
147 output_object_ids: VecDeque<HummockSstableObjectId>,
148 client: Option<GrpcCompactorProxyClient>,
149 sstable_id_remote_fetch_number: u32,
150}
151impl SharedComapctorObjectIdManagerCore {
152 pub fn new(
153 output_object_ids: VecDeque<HummockSstableObjectId>,
154 client: GrpcCompactorProxyClient,
155 sstable_id_remote_fetch_number: u32,
156 ) -> Self {
157 Self {
158 output_object_ids,
159 client: Some(client),
160 sstable_id_remote_fetch_number,
161 }
162 }
163
164 pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
165 Self {
166 output_object_ids: output_object_ids.into_iter().map(|x| x.into()).collect(),
167 client: None,
168 sstable_id_remote_fetch_number: 0,
169 }
170 }
171}
172pub struct SharedComapctorObjectIdManager {
174 core: tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>,
175}
176
177impl SharedComapctorObjectIdManager {
178 pub fn new(
179 output_object_ids: VecDeque<HummockSstableObjectId>,
180 client: GrpcCompactorProxyClient,
181 sstable_id_remote_fetch_number: u32,
182 ) -> Arc<Self> {
183 Arc::new(Self {
184 core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::new(
185 output_object_ids,
186 client,
187 sstable_id_remote_fetch_number,
188 )),
189 })
190 }
191
192 pub fn for_test(output_object_ids: VecDeque<u64>) -> Arc<Self> {
193 Arc::new(Self {
194 core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::for_test(
195 output_object_ids,
196 )),
197 })
198 }
199}
200
201#[async_trait::async_trait]
202impl GetObjectId for SharedComapctorObjectIdManager {
203 async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
204 let mut guard = self.core.lock().await;
205 let core = guard.deref_mut();
206
207 if let Some(first_element) = core.output_object_ids.pop_front() {
208 Ok(first_element)
209 } else {
210 tracing::warn!(
211 "The pre-allocated object ids are used up, and new object id are obtained through RPC."
212 );
213 let request = GetNewObjectIdsRequest {
214 number: core.sstable_id_remote_fetch_number,
215 };
216 match core
217 .client
218 .as_mut()
219 .expect("GrpcCompactorProxyClient is None")
220 .get_new_sst_ids(request)
221 .await
222 {
223 Ok(response) => {
224 let resp = response.into_inner();
225 let start_id = resp.start_id;
226 core.output_object_ids.extend(
227 ((start_id + 1)..resp.end_id).map(Into::<HummockSstableObjectId>::into),
228 );
229 Ok(start_id.into())
230 }
231 Err(e) => Err(HummockError::other(format!(
232 "Fail to get new sst id: {}",
233 e.as_report()
234 ))),
235 }
236 }
237 }
238}