risingwave_storage/hummock/object_id_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::{HummockRawObjectId, HummockSstableObjectId, ObjectIdRange};
21use risingwave_pb::hummock::GetNewObjectIdsRequest;
22use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
23use sync_point::sync_point;
24use thiserror_ext::AsReport;
25use tokio::sync::oneshot;
26
27use crate::hummock::{HummockError, HummockResult};
28pub type ObjectIdManagerRef = Arc<ObjectIdManager>;
29
30#[async_trait::async_trait]
31pub trait GetObjectId: Send + Sync {
32    async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId>;
33}
34
35/// Caches SST object ids fetched from meta.
36pub struct ObjectIdManager {
37    // Lock order: `wait_queue` before `available_object_ids`.
38    wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
39    available_object_ids: Mutex<ObjectIdRange>,
40    remote_fetch_number: u32,
41    hummock_meta_client: Arc<dyn HummockMetaClient>,
42}
43
44impl ObjectIdManager {
45    pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
46        Self {
47            wait_queue: Default::default(),
48            available_object_ids: Mutex::new(ObjectIdRange::new(u64::MIN, u64::MIN)),
49            remote_fetch_number,
50            hummock_meta_client,
51        }
52    }
53
54    /// Executes `f` with next SST id.
55    /// May fetch new SST ids via RPC.
56    async fn map_next_object_id<F>(&self, f: F) -> HummockResult<HummockRawObjectId>
57    where
58        F: Fn(&mut ObjectIdRange) -> Option<HummockRawObjectId>,
59    {
60        loop {
61            // 1. Try to get
62            if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
63                return Ok(new_id);
64            }
65            // 2. Otherwise either fetch new ids, or wait for previous fetch if any.
66            let waiter = {
67                let mut guard = self.wait_queue.lock();
68                if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
69                    return Ok(new_id);
70                }
71                let wait_queue = guard.deref_mut();
72                if let Some(wait_queue) = wait_queue {
73                    let (tx, rx) = oneshot::channel();
74                    wait_queue.push(tx);
75                    Some(rx)
76                } else {
77                    *wait_queue = Some(vec![]);
78                    None
79                }
80            };
81            if let Some(waiter) = waiter {
82                // Wait for previous fetch
83                sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
84                let _ = waiter.await;
85                continue;
86            }
87            // Fetch new ids.
88            sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
89            sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
90            async move {
91                let new_sst_ids = match self
92                    .hummock_meta_client
93                    .get_new_object_ids(self.remote_fetch_number)
94                    .await
95                    .map_err(HummockError::meta_error)
96                {
97                    Ok(new_sst_ids) => new_sst_ids,
98                    Err(err) => {
99                        self.notify_waiters(false);
100                        return Err(err);
101                    }
102                };
103                sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
104                sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
105                // Update local cache.
106                let result = {
107                    let mut guard = self.available_object_ids.lock();
108                    let available_object_ids = guard.deref_mut();
109                    if new_sst_ids.start_id < available_object_ids.end_id {
110                        Err(HummockError::meta_error(format!(
111                            "SST id moves backwards. new {} < old {}",
112                            new_sst_ids.start_id, available_object_ids.end_id
113                        )))
114                    } else {
115                        *available_object_ids = new_sst_ids;
116                        Ok(())
117                    }
118                };
119                self.notify_waiters(result.is_ok());
120                result
121            }
122            .await?;
123        }
124    }
125
126    fn notify_waiters(&self, success: bool) {
127        let mut guard = self.wait_queue.lock();
128        let wait_queue = guard.deref_mut().take().unwrap();
129        for notify in wait_queue {
130            let _ = notify.send(success);
131        }
132    }
133}
134
135#[async_trait::async_trait]
136impl GetObjectId for ObjectIdManager {
137    /// Returns a new SST id.
138    /// The id is guaranteed to be monotonic increasing.
139    async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
140        self.map_next_object_id(|available_object_ids| available_object_ids.get_next_object_id())
141            .await
142            .map(Into::into)
143    }
144}
145
146struct SharedComapctorObjectIdManagerCore {
147    output_object_ids: VecDeque<HummockSstableObjectId>,
148    client: Option<GrpcCompactorProxyClient>,
149    sstable_id_remote_fetch_number: u32,
150}
151impl SharedComapctorObjectIdManagerCore {
152    pub fn new(
153        output_object_ids: VecDeque<HummockSstableObjectId>,
154        client: GrpcCompactorProxyClient,
155        sstable_id_remote_fetch_number: u32,
156    ) -> Self {
157        Self {
158            output_object_ids,
159            client: Some(client),
160            sstable_id_remote_fetch_number,
161        }
162    }
163
164    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
165        Self {
166            output_object_ids: output_object_ids.into_iter().map(|x| x.into()).collect(),
167            client: None,
168            sstable_id_remote_fetch_number: 0,
169        }
170    }
171}
172/// `SharedComapctorObjectIdManager` is used to get output sst id for serverless compaction.
173pub struct SharedComapctorObjectIdManager {
174    core: tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>,
175}
176
177impl SharedComapctorObjectIdManager {
178    pub fn new(
179        output_object_ids: VecDeque<HummockSstableObjectId>,
180        client: GrpcCompactorProxyClient,
181        sstable_id_remote_fetch_number: u32,
182    ) -> Arc<Self> {
183        Arc::new(Self {
184            core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::new(
185                output_object_ids,
186                client,
187                sstable_id_remote_fetch_number,
188            )),
189        })
190    }
191
192    pub fn for_test(output_object_ids: VecDeque<u64>) -> Arc<Self> {
193        Arc::new(Self {
194            core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::for_test(
195                output_object_ids,
196            )),
197        })
198    }
199}
200
201#[async_trait::async_trait]
202impl GetObjectId for SharedComapctorObjectIdManager {
203    async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
204        let mut guard = self.core.lock().await;
205        let core = guard.deref_mut();
206
207        if let Some(first_element) = core.output_object_ids.pop_front() {
208            Ok(first_element)
209        } else {
210            tracing::warn!(
211                "The pre-allocated object ids are used up, and new object id are obtained through RPC."
212            );
213            let request = GetNewObjectIdsRequest {
214                number: core.sstable_id_remote_fetch_number,
215            };
216            match core
217                .client
218                .as_mut()
219                .expect("GrpcCompactorProxyClient is None")
220                .get_new_sst_ids(request)
221                .await
222            {
223                Ok(response) => {
224                    let resp = response.into_inner();
225                    let start_id = resp.start_id;
226                    core.output_object_ids.extend(
227                        ((start_id + 1)..resp.end_id).map(Into::<HummockSstableObjectId>::into),
228                    );
229                    Ok(start_id.into())
230                }
231                Err(e) => Err(HummockError::other(format!(
232                    "Fail to get new sst id: {}",
233                    e.as_report()
234                ))),
235            }
236        }
237    }
238}