risingwave_storage/hummock/
object_id_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::VecDeque;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::{HummockRawObjectId, HummockSstableObjectId, ObjectIdRange};
21use risingwave_pb::hummock::GetNewObjectIdsRequest;
22use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
23use sync_point::sync_point;
24use thiserror_ext::AsReport;
25use tokio::sync::oneshot;
26
27use crate::hummock::{HummockError, HummockResult};
/// Reference-counted handle to an [`ObjectIdManager`].
pub type ObjectIdManagerRef = Arc<ObjectIdManager>;
29
/// A source of new SST object ids.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait GetObjectId: Send + Sync {
    /// Returns a new SST object id on success.
    async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId>;
}
34
/// Caches SST object ids fetched from meta.
///
/// Ids are handed out from a locally cached range. When that range cannot supply an id, one
/// caller fetches a new range through `hummock_meta_client` while the others wait for it.
pub struct ObjectIdManager {
    // Lock order: `wait_queue` before `available_object_ids`.
    /// `None` while no fetch is in flight. `Some(senders)` while one caller is fetching a new
    /// range; each sender belongs to a caller waiting for that fetch and is sent `true` or
    /// `false` depending on whether the fetch succeeded.
    wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
    /// The locally cached id range that new ids are taken from.
    available_object_ids: Mutex<ObjectIdRange>,
    /// Number of ids requested from meta in a single fetch.
    remote_fetch_number: u32,
    /// Client used to request new id ranges.
    hummock_meta_client: Arc<dyn HummockMetaClient>,
}
43
44impl ObjectIdManager {
45    pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
46        Self {
47            wait_queue: Default::default(),
48            available_object_ids: Mutex::new(ObjectIdRange::new(u64::MIN, u64::MIN)),
49            remote_fetch_number,
50            hummock_meta_client,
51        }
52    }
53
54    pub async fn get_new_object_id<I: From<HummockRawObjectId>>(&self) -> HummockResult<I> {
55        self.map_next_object_id(|available_object_ids| available_object_ids.get_next_object_id())
56            .await
57            .map(Into::into)
58    }
59
60    /// Executes `f` with next SST id.
61    /// May fetch new SST ids via RPC.
62    async fn map_next_object_id<F>(&self, f: F) -> HummockResult<HummockRawObjectId>
63    where
64        F: Fn(&mut ObjectIdRange) -> Option<HummockRawObjectId>,
65    {
66        loop {
67            // 1. Try to get
68            if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
69                return Ok(new_id);
70            }
71            // 2. Otherwise either fetch new ids, or wait for previous fetch if any.
72            let waiter = {
73                let mut guard = self.wait_queue.lock();
74                if let Some(new_id) = f(self.available_object_ids.lock().deref_mut()) {
75                    return Ok(new_id);
76                }
77                let wait_queue = guard.deref_mut();
78                if let Some(wait_queue) = wait_queue {
79                    let (tx, rx) = oneshot::channel();
80                    wait_queue.push(tx);
81                    Some(rx)
82                } else {
83                    *wait_queue = Some(vec![]);
84                    None
85                }
86            };
87            if let Some(waiter) = waiter {
88                // Wait for previous fetch
89                sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
90                let _ = waiter.await;
91                continue;
92            }
93            // Fetch new ids.
94            sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
95            sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
96            async move {
97                let new_sst_ids = match self
98                    .hummock_meta_client
99                    .get_new_object_ids(self.remote_fetch_number)
100                    .await
101                    .map_err(HummockError::meta_error)
102                {
103                    Ok(new_sst_ids) => new_sst_ids,
104                    Err(err) => {
105                        self.notify_waiters(false);
106                        return Err(err);
107                    }
108                };
109                sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
110                sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
111                // Update local cache.
112                let result = {
113                    let mut guard = self.available_object_ids.lock();
114                    let available_object_ids = guard.deref_mut();
115                    if new_sst_ids.start_id < available_object_ids.end_id {
116                        Err(HummockError::meta_error(format!(
117                            "SST id moves backwards. new {} < old {}",
118                            new_sst_ids.start_id, available_object_ids.end_id
119                        )))
120                    } else {
121                        *available_object_ids = new_sst_ids;
122                        Ok(())
123                    }
124                };
125                self.notify_waiters(result.is_ok());
126                result
127            }
128            .await?;
129        }
130    }
131
132    fn notify_waiters(&self, success: bool) {
133        let mut guard = self.wait_queue.lock();
134        let wait_queue = guard.deref_mut().take().unwrap();
135        for notify in wait_queue {
136            let _ = notify.send(success);
137        }
138    }
139}
140
141#[async_trait::async_trait]
142impl GetObjectId for ObjectIdManager {
143    /// Returns a new SST id.
144    /// The id is guaranteed to be monotonic increasing.
145    async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
146        self.get_new_object_id().await
147    }
148}
149
/// Mutable state of a `SharedComapctorObjectIdManager`.
struct SharedComapctorObjectIdManagerCore {
    /// Ids that can be handed out without an RPC; they are consumed from the front.
    output_object_ids: VecDeque<HummockSstableObjectId>,
    /// Client used to request more ids once `output_object_ids` is empty.
    /// `None` when the core was built with `for_test`.
    client: Option<GrpcCompactorProxyClient>,
    /// Number of ids requested per RPC.
    sstable_id_remote_fetch_number: u32,
}
155impl SharedComapctorObjectIdManagerCore {
156    pub fn new(
157        output_object_ids: VecDeque<HummockSstableObjectId>,
158        client: GrpcCompactorProxyClient,
159        sstable_id_remote_fetch_number: u32,
160    ) -> Self {
161        Self {
162            output_object_ids,
163            client: Some(client),
164            sstable_id_remote_fetch_number,
165        }
166    }
167
168    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
169        Self {
170            output_object_ids: output_object_ids.into_iter().map(|x| x.into()).collect(),
171            client: None,
172            sstable_id_remote_fetch_number: 0,
173        }
174    }
175}
/// `SharedComapctorObjectIdManager` is used to get output sst id for serverless compaction.
///
/// It hands out pre-allocated ids first and requests more through its client once they are
/// used up.
pub struct SharedComapctorObjectIdManager {
    /// Pre-allocated ids plus the client used to obtain more. Guarded by an async mutex
    /// because the lock is held across the RPC that refills the ids.
    core: tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>,
}
180
181impl SharedComapctorObjectIdManager {
182    pub fn new(
183        output_object_ids: VecDeque<HummockSstableObjectId>,
184        client: GrpcCompactorProxyClient,
185        sstable_id_remote_fetch_number: u32,
186    ) -> Arc<Self> {
187        Arc::new(Self {
188            core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::new(
189                output_object_ids,
190                client,
191                sstable_id_remote_fetch_number,
192            )),
193        })
194    }
195
196    pub fn for_test(output_object_ids: VecDeque<u64>) -> Arc<Self> {
197        Arc::new(Self {
198            core: tokio::sync::Mutex::new(SharedComapctorObjectIdManagerCore::for_test(
199                output_object_ids,
200            )),
201        })
202    }
203}
204
205#[async_trait::async_trait]
206impl GetObjectId for SharedComapctorObjectIdManager {
207    async fn get_new_sst_object_id(&self) -> HummockResult<HummockSstableObjectId> {
208        let mut guard = self.core.lock().await;
209        let core = guard.deref_mut();
210
211        if let Some(first_element) = core.output_object_ids.pop_front() {
212            Ok(first_element)
213        } else {
214            tracing::warn!(
215                "The pre-allocated object ids are used up, and new object id are obtained through RPC."
216            );
217            let request = GetNewObjectIdsRequest {
218                number: core.sstable_id_remote_fetch_number,
219            };
220            match core
221                .client
222                .as_mut()
223                .expect("GrpcCompactorProxyClient is None")
224                .get_new_sst_ids(request)
225                .await
226            {
227                Ok(response) => {
228                    let resp = response.into_inner();
229                    let start_id = resp.start_id;
230                    core.output_object_ids.extend(
231                        ((start_id + 1)..resp.end_id).map(Into::<HummockSstableObjectId>::into),
232                    );
233                    Ok(start_id.into())
234                }
235                Err(e) => Err(HummockError::other(format!(
236                    "Fail to get new sst id: {}",
237                    e.as_report()
238                ))),
239            }
240        }
241    }
242}