risingwave_storage/hummock/sstable/
sstable_object_id_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange};
21use risingwave_pb::hummock::GetNewSstIdsRequest;
22use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
23use sync_point::sync_point;
24use thiserror_ext::AsReport;
25use tokio::sync::oneshot;
26
27use crate::hummock::{HummockError, HummockResult};
28pub type SstableObjectIdManagerRef = Arc<SstableObjectIdManager>;
29use dyn_clone::DynClone;
30#[async_trait::async_trait]
31pub trait GetObjectId: DynClone + Send + Sync {
32    async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId>;
33}
34dyn_clone::clone_trait_object!(GetObjectId);
35/// Caches SST object ids fetched from meta.
36pub struct SstableObjectIdManager {
37    // Lock order: `wait_queue` before `available_sst_object_ids`.
38    wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
39    available_sst_object_ids: Mutex<SstObjectIdRange>,
40    remote_fetch_number: u32,
41    hummock_meta_client: Arc<dyn HummockMetaClient>,
42}
43
44impl SstableObjectIdManager {
45    pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
46        Self {
47            wait_queue: Default::default(),
48            available_sst_object_ids: Mutex::new(SstObjectIdRange::new(
49                HummockSstableObjectId::MIN,
50                HummockSstableObjectId::MIN,
51            )),
52            remote_fetch_number,
53            hummock_meta_client,
54        }
55    }
56
57    /// Executes `f` with next SST id.
58    /// May fetch new SST ids via RPC.
59    async fn map_next_sst_object_id<F>(
60        self: &Arc<Self>,
61        f: F,
62    ) -> HummockResult<HummockSstableObjectId>
63    where
64        F: Fn(&mut SstObjectIdRange) -> Option<HummockSstableObjectId>,
65    {
66        loop {
67            // 1. Try to get
68            if let Some(new_id) = f(self.available_sst_object_ids.lock().deref_mut()) {
69                return Ok(new_id);
70            }
71            // 2. Otherwise either fetch new ids, or wait for previous fetch if any.
72            let waiter = {
73                let mut guard = self.wait_queue.lock();
74                if let Some(new_id) = f(self.available_sst_object_ids.lock().deref_mut()) {
75                    return Ok(new_id);
76                }
77                let wait_queue = guard.deref_mut();
78                if let Some(wait_queue) = wait_queue {
79                    let (tx, rx) = oneshot::channel();
80                    wait_queue.push(tx);
81                    Some(rx)
82                } else {
83                    *wait_queue = Some(vec![]);
84                    None
85                }
86            };
87            if let Some(waiter) = waiter {
88                // Wait for previous fetch
89                sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
90                let _ = waiter.await;
91                continue;
92            }
93            // Fetch new ids.
94            sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
95            sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
96            let this = self.clone();
97            tokio::spawn(async move {
98                let new_sst_ids = match this
99                    .hummock_meta_client
100                    .get_new_sst_ids(this.remote_fetch_number)
101                    .await
102                    .map_err(HummockError::meta_error)
103                {
104                    Ok(new_sst_ids) => new_sst_ids,
105                    Err(err) => {
106                        this.notify_waiters(false);
107                        return Err(err);
108                    }
109                };
110                sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
111                sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
112                // Update local cache.
113                let result = {
114                    let mut guard = this.available_sst_object_ids.lock();
115                    let available_sst_object_ids = guard.deref_mut();
116                    if new_sst_ids.start_id < available_sst_object_ids.end_id {
117                        Err(HummockError::meta_error(format!(
118                            "SST id moves backwards. new {} < old {}",
119                            new_sst_ids.start_id, available_sst_object_ids.end_id
120                        )))
121                    } else {
122                        *available_sst_object_ids = new_sst_ids;
123                        Ok(())
124                    }
125                };
126                this.notify_waiters(result.is_ok());
127                result
128            })
129            .await
130            .unwrap()?;
131        }
132    }
133
134    fn notify_waiters(&self, success: bool) {
135        let mut guard = self.wait_queue.lock();
136        let wait_queue = guard.deref_mut().take().unwrap();
137        for notify in wait_queue {
138            let _ = notify.send(success);
139        }
140    }
141}
142
143#[async_trait::async_trait]
144impl GetObjectId for Arc<SstableObjectIdManager> {
145    /// Returns a new SST id.
146    /// The id is guaranteed to be monotonic increasing.
147    async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
148        self.map_next_sst_object_id(|available_sst_object_ids| {
149            available_sst_object_ids.get_next_sst_object_id()
150        })
151        .await
152    }
153}
154
155struct SharedComapctorObjectIdManagerCore {
156    output_object_ids: VecDeque<u64>,
157    client: Option<GrpcCompactorProxyClient>,
158    sstable_id_remote_fetch_number: u32,
159}
160impl SharedComapctorObjectIdManagerCore {
161    pub fn new(
162        output_object_ids: VecDeque<u64>,
163        client: GrpcCompactorProxyClient,
164        sstable_id_remote_fetch_number: u32,
165    ) -> Self {
166        Self {
167            output_object_ids,
168            client: Some(client),
169            sstable_id_remote_fetch_number,
170        }
171    }
172
173    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
174        Self {
175            output_object_ids,
176            client: None,
177            sstable_id_remote_fetch_number: 0,
178        }
179    }
180}
181/// `SharedComapctorObjectIdManager` is used to get output sst id for serverless compaction.
182#[derive(Clone)]
183pub struct SharedComapctorObjectIdManager {
184    core: Arc<tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>>,
185}
186
187impl SharedComapctorObjectIdManager {
188    pub fn new(
189        output_object_ids: VecDeque<u64>,
190        client: GrpcCompactorProxyClient,
191        sstable_id_remote_fetch_number: u32,
192    ) -> Self {
193        Self {
194            core: Arc::new(tokio::sync::Mutex::new(
195                SharedComapctorObjectIdManagerCore::new(
196                    output_object_ids,
197                    client,
198                    sstable_id_remote_fetch_number,
199                ),
200            )),
201        }
202    }
203
204    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
205        Self {
206            core: Arc::new(tokio::sync::Mutex::new(
207                SharedComapctorObjectIdManagerCore::for_test(output_object_ids),
208            )),
209        }
210    }
211}
212
213#[async_trait::async_trait]
214impl GetObjectId for SharedComapctorObjectIdManager {
215    async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
216        let mut guard = self.core.lock().await;
217        let core = guard.deref_mut();
218
219        if let Some(first_element) = core.output_object_ids.pop_front() {
220            Ok(first_element)
221        } else {
222            tracing::warn!(
223                "The pre-allocated object ids are used up, and new object id are obtained through RPC."
224            );
225            let request = GetNewSstIdsRequest {
226                number: core.sstable_id_remote_fetch_number,
227            };
228            match core
229                .client
230                .as_mut()
231                .expect("GrpcCompactorProxyClient is None")
232                .get_new_sst_ids(request)
233                .await
234            {
235                Ok(response) => {
236                    let resp = response.into_inner();
237                    let start_id = resp.start_id;
238                    core.output_object_ids.extend((start_id + 1)..resp.end_id);
239                    Ok(start_id)
240                }
241                Err(e) => Err(HummockError::other(format!(
242                    "Fail to get new sst id: {}",
243                    e.as_report()
244                ))),
245            }
246        }
247    }
248}