risingwave_meta/hummock/manager/sequence.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::future::Future;
18use std::sync::LazyLock;
19
20use parking_lot::Mutex as ParkingMutex;
21use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
22use risingwave_hummock_sdk::{CompactionGroupId, HummockRawObjectId, HummockSstableId};
23use risingwave_meta_model::hummock_sequence;
24use risingwave_meta_model::hummock_sequence::{
25    COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
26};
27use risingwave_meta_model::prelude::HummockSequence;
28use risingwave_pb::id::TypedId;
29use sea_orm::{ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
30use tokio::sync::Mutex;
31
32use crate::hummock::error::Result;
33use crate::manager::MetaSrvEnv;
34
35static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
36    maplit::hashmap! {
37        COMPACTION_TASK_ID.into() => 1,
38        COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End.as_i64_id() + 1,
39        SSTABLE_OBJECT_ID.into() => 1,
40        META_BACKUP_ID.into() => 1,
41    }
42});
43
/// A half-open window `[next, end)` of sequence values fetched ahead of time.
///
/// Invariant: `next <= end` holds after construction and after every `reset`.
/// `pop` moves `next` forward until the window is drained (`next == end`).
#[derive(Default, Debug, PartialEq, Eq)]
struct PrefetchedRange {
    next: u64,
    end: u64,
}

impl PrefetchedRange {
    /// True when no prefetched values remain in the window.
    fn is_empty(&self) -> bool {
        self.end <= self.next
    }

    /// Hands out the next value, or `None` once the window is drained.
    fn pop(&mut self) -> Option<u64> {
        if self.next < self.end {
            let popped = self.next;
            self.next += 1;
            Some(popped)
        } else {
            None
        }
    }

    /// Replaces the window with `[start, start + count)`.
    fn reset(&mut self, start: u64, count: u32) {
        self.end = start + u64::from(count);
        self.next = start;
    }

    #[cfg(test)]
    fn set_bounds(&mut self, next: u64, end: u64) {
        self.next = next;
        self.end = end;
    }

    #[cfg(test)]
    fn bounds(&self) -> (u64, u64) {
        (self.next, self.end)
    }
}
85
86/// Stores a prefetched half-open interval `[next, end)` and serializes refills so concurrent
87/// callers share the same refill instead of wasting a newly allocated range.
88#[derive(Default, Debug)]
89pub(crate) struct PrefetchedSequence {
90    range: ParkingMutex<PrefetchedRange>,
91    refill_lock: Mutex<()>,
92}
93
94impl PrefetchedSequence {
95    pub(crate) fn new() -> Self {
96        Self::default()
97    }
98
99    /// Returns the next unique value from the prefetched range, refilling from the backend
100    /// when the range is exhausted.
101    ///
102    /// - `refill_count`: how many values to request from the backend in one batch when refilling.
103    ///   Clamped to at least 1 so that a caller with zero remaining budget still obtains a value
104    ///   (the caller may have already committed to needing one id before checking the budget).
105    /// - `refill`: an async closure `|count| -> Result<start>` that allocates `count` sequential
106    ///   values from the persistent sequence and returns the start of the allocated range.
107    ///
108    /// Concurrent callers are serialized by `refill_lock` so that only one refill is in-flight
109    /// at a time; others wait and then consume from the freshly filled range.
110    pub(crate) async fn next<F, Fut>(&self, refill_count: u32, refill: F) -> Result<u64>
111    where
112        F: FnOnce(u32) -> Fut,
113        Fut: Future<Output = Result<u64>>,
114    {
115        // Clamp to 1: even when the caller's budget is exhausted, it still needs exactly one id.
116        let refill_count = refill_count.max(1);
117        if let Some(value) = self.try_pop() {
118            return Ok(value);
119        }
120
121        let _guard = self.refill_lock.lock().await;
122        if let Some(value) = self.try_pop() {
123            return Ok(value);
124        }
125
126        let start = refill(refill_count).await?;
127        let mut range = self.range.lock();
128        debug_assert!(range.is_empty());
129        range.reset(start, refill_count);
130        Ok(range
131            .pop()
132            .expect("a freshly refilled sequence must return one value"))
133    }
134
135    fn try_pop(&self) -> Option<u64> {
136        self.range.lock().pop()
137    }
138
139    #[cfg(test)]
140    pub(crate) fn set_test_bounds(&self, next: u64, end: u64) {
141        self.range.lock().set_bounds(next, end);
142    }
143
144    #[cfg(test)]
145    pub(crate) fn test_bounds(&self) -> (u64, u64) {
146        self.range.lock().bounds()
147    }
148}
149
/// Allocates ranges of monotonically increasing ids from named sequences persisted
/// in the `hummock_sequence` table.
pub struct SequenceGenerator {
    // tokio `Mutex` so the guard can be held across `.await`s; it serializes all
    // allocations on the one connection, which the doc below deems acceptable
    // because `next_interval` is invoked infrequently.
    db: Mutex<DatabaseConnection>,
}

impl SequenceGenerator {
    /// Creates a generator backed by the given database connection.
    pub fn new(db: DatabaseConnection) -> Self {
        Self { db: Mutex::new(db) }
    }

    /// Returns start, indicates range [start, start + num).
    ///
    /// Despite being a serial function, its infrequent invocation allows for acceptable performance.
    ///
    /// If num is 0, the next seq is returned just like num is 1, but caller must not use this seq.
    ///
    /// # Panics
    /// Panics if `ident` has no seed in `SEQ_INIT` (first allocation of an unknown
    /// sequence name) or if advancing the counter overflows `u64`/`i64`.
    pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
        // TODO: add pre-allocation if necessary
        let guard = self.db.lock().await;
        let txn = guard.begin().await?;
        // Look up the current counter row for this sequence name.
        let model: Option<hummock_sequence::Model> =
            hummock_sequence::Entity::find_by_id(ident.to_owned())
                .one(&txn)
                .await?;
        let start_seq = match model {
            None => {
                // First allocation for this sequence: seed from SEQ_INIT and persist
                // the counter already advanced past the range being handed out.
                let init: u64 = SEQ_INIT
                    .get(ident)
                    .copied()
                    .unwrap_or_else(|| panic!("seq {ident} not found"))
                    as u64;
                let active_model = hummock_sequence::ActiveModel {
                    name: ActiveValue::set(ident.into()),
                    seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
                };
                HummockSequence::insert(active_model).exec(&txn).await?;
                init
            }
            Some(model) => {
                // Stored `seq` is the next unallocated value; advance it by `num`.
                let start_seq: u64 = model.seq as u64;
                if num > 0 {
                    let mut active_model: hummock_sequence::ActiveModel = model.into();
                    active_model.seq = ActiveValue::set(
                        start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
                    );
                    HummockSequence::update(active_model).exec(&txn).await?;
                }
                start_seq
            }
        };
        // num == 0 is a peek: the transaction is dropped without committing, so nothing
        // is persisted (this also discards the insert made in the `None` branch above).
        if num > 0 {
            txn.commit().await?;
        }
        Ok(start_seq)
    }
}
204
205pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
206    env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
207}
208
209pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
210    env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
211}
212
213pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<CompactionGroupId> {
214    Ok(env
215        .hummock_seq
216        .next_interval(COMPACTION_GROUP_ID, 1)
217        .await?
218        .into())
219}
220
/// Reserves `num` sequential sstable ids and returns the first of the range.
///
/// NOTE(review): delegates to `next_unique_id`, which draws from the shared
/// `SSTABLE_OBJECT_ID` sequence — so sstable ids and raw object ids appear to
/// share one id space; confirm this is intended.
///
/// Panics if `num` cannot be converted to `u32`.
pub async fn next_sstable_id(
    env: &MetaSrvEnv,
    num: impl TryInto<u32> + Display + Copy,
) -> Result<HummockSstableId> {
    next_unique_id(env, num).await
}
227
/// Reserves `num` sequential raw object ids and returns the first of the range.
///
/// NOTE(review): delegates to `next_unique_id`, which draws from the shared
/// `SSTABLE_OBJECT_ID` sequence — so raw object ids and sstable ids appear to
/// share one id space; confirm this is intended.
///
/// Panics if `num` cannot be converted to `u32`.
pub async fn next_raw_object_id(
    env: &MetaSrvEnv,
    num: impl TryInto<u32> + Display + Copy,
) -> Result<HummockRawObjectId> {
    next_unique_id(env, num).await
}
234
235async fn next_unique_id<const C: usize>(
236    env: &MetaSrvEnv,
237    num: impl TryInto<u32> + Display + Copy,
238) -> Result<TypedId<C, u64>> {
239    let num: u32 = num
240        .try_into()
241        .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
242    env.hummock_seq
243        .next_interval(SSTABLE_OBJECT_ID, num)
244        .await
245        .map(Into::into)
246}
247
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::future::{Ready, ready};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    use itertools::Itertools;

    use crate::controller::SqlMetaStore;
    use crate::hummock::error::Result;
    use crate::hummock::manager::sequence::{
        COMPACTION_TASK_ID, PrefetchedSequence, SequenceGenerator,
    };

    /// Builds a refill closure handing out `count` sequential ids starting at the
    /// current value of `next_start`, mimicking `SequenceGenerator::next_interval`.
    /// Replaces the closure previously copy-pasted at every call site.
    fn counting_refill(next_start: Arc<AtomicU64>) -> impl FnOnce(u32) -> Ready<Result<u64>> {
        move |count| ready(Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst)))
    }

    /// A refill that must never run; used by tests that expect pure cache hits.
    fn no_refill(_count: u32) -> Ready<Result<u64>> {
        panic!("refill should not be called when cached values exist")
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_seq_gen() {
        let store = SqlMetaStore::for_test().await;
        let conn = store.conn.clone();
        let s = SequenceGenerator::new(conn);
        assert_eq!(1, s.next_interval(COMPACTION_TASK_ID, 1).await.unwrap());
        assert_eq!(2, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
        assert_eq!(12, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
    }

    #[test]
    fn test_prefetched_sequence_reuses_cached_range() {
        let seq = PrefetchedSequence::new();
        seq.set_test_bounds(100, 103);

        // All three values come from the cache; `no_refill` panics if invoked.
        for expected in 100u64..103 {
            let got = futures::executor::block_on(seq.next(16, no_refill)).unwrap();
            assert_eq!(got, expected);
        }
        assert_eq!(seq.test_bounds(), (103, 103));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_refills_in_batch_when_empty() {
        let seq = PrefetchedSequence::new();
        let next_start = Arc::new(AtomicU64::new(100));

        let first = seq
            .next(4, counting_refill(next_start.clone()))
            .await
            .unwrap();
        assert_eq!(first, 100);
        assert_eq!(seq.test_bounds(), (101, 104));

        // 101..=103 come from the cached batch; 104 triggers the second refill.
        for expected in 101u64..=104 {
            let got = seq
                .next(4, counting_refill(next_start.clone()))
                .await
                .unwrap();
            assert_eq!(got, expected);
        }
        assert_eq!(seq.test_bounds(), (105, 108));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_respects_refill_capacity() {
        let seq = PrefetchedSequence::new();
        let next_start = Arc::new(AtomicU64::new(100));

        let first = seq
            .next(2, counting_refill(next_start.clone()))
            .await
            .unwrap();
        assert_eq!(first, 100);
        assert_eq!(
            seq.next(2, counting_refill(next_start.clone()))
                .await
                .unwrap(),
            101
        );
        assert_eq!(seq.test_bounds(), (102, 102));

        // A refill request of 1 yields exactly one value and leaves nothing cached.
        assert_eq!(
            seq.next(1, counting_refill(next_start.clone()))
                .await
                .unwrap(),
            102
        );
        assert_eq!(seq.test_bounds(), (103, 103));
    }

    #[test]
    fn test_prefetched_sequence_pop_does_not_overflow_bounds() {
        let seq = PrefetchedSequence::new();
        seq.set_test_bounds(10, 13);

        for expected in 10u64..13 {
            let got = futures::executor::block_on(seq.next(4, no_refill)).unwrap();
            assert_eq!(got, expected);
        }
        assert_eq!(seq.test_bounds(), (13, 13));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_shares_refill_without_gaps() {
        let seq = Arc::new(PrefetchedSequence::new());
        let next_start = Arc::new(AtomicU64::new(100));
        let refill_calls = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::with_capacity(64);
        for _ in 0..64 {
            let seq = seq.clone();
            let next_start = next_start.clone();
            let refill_calls = refill_calls.clone();
            handles.push(tokio::spawn(async move {
                // Inline closure (not `counting_refill`) so actual refills are counted.
                seq.next(4, move |count| {
                    refill_calls.fetch_add(1, Ordering::SeqCst);
                    ready(Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst)))
                })
                .await
                .unwrap()
            }));
        }

        let mut ids = Vec::with_capacity(64);
        for handle in handles {
            ids.push(handle.await.unwrap());
        }

        assert_eq!(ids.len(), 64);
        assert_eq!(ids.iter().copied().collect::<HashSet<_>>().len(), 64);

        let mut sorted_ids = ids.iter().copied().collect_vec();
        sorted_ids.sort_unstable();
        assert_eq!(sorted_ids.first().copied(), Some(100));
        assert!(
            sorted_ids
                .windows(2)
                .all(|window| window[1] == window[0] + 1),
            "ids should form a contiguous range: {:?}",
            sorted_ids
        );
        // 64 consumers at batch size 4 → exactly 16 refills, no wasted ranges.
        assert_eq!(refill_calls.load(Ordering::SeqCst), 16);
        assert_eq!(seq.test_bounds(), (164, 164));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_propagates_refill_error() {
        use crate::hummock::error::Error;

        let seq = PrefetchedSequence::new();

        // First call: refill fails → caller sees the error.
        let err = seq
            .next(4, |_count| {
                ready(Err(Error::Internal(anyhow::anyhow!("db failure"))))
            })
            .await;
        assert!(err.is_err(), "refill error should propagate to caller");

        // The range must still be empty so a subsequent call retries the refill
        // instead of returning stale data.
        assert_eq!(seq.test_bounds(), (0, 0));

        // Second call: refill succeeds → caller gets a valid value, proving retry works.
        let val = seq.next(4, |_count| ready(Ok(200u64))).await.unwrap();
        assert_eq!(val, 200);
        assert_eq!(seq.test_bounds(), (201, 204));
    }
}
489}