risingwave_meta/hummock/manager/
sequence.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::LazyLock;
18
19use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
20use risingwave_hummock_sdk::{
21    HummockRawObjectId, HummockSstableId, HummockSstableObjectId, TypedPrimitive,
22};
23use risingwave_meta_model::hummock_sequence;
24use risingwave_meta_model::hummock_sequence::{
25    COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
26};
27use risingwave_meta_model::prelude::HummockSequence;
28use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
29use tokio::sync::Mutex;
30
31use crate::hummock::error::Result;
32use crate::manager::MetaSrvEnv;
33
34static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
35    maplit::hashmap! {
36        COMPACTION_TASK_ID.into() => 1,
37        COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End as i64 + 1,
38        SSTABLE_OBJECT_ID.into() => 1,
39        META_BACKUP_ID.into() => 1,
40    }
41});
42
43pub struct SequenceGenerator {
44    db: Mutex<DatabaseConnection>,
45}
46
47impl SequenceGenerator {
48    pub fn new(db: DatabaseConnection) -> Self {
49        Self { db: Mutex::new(db) }
50    }
51
52    /// Returns start, indicates range [start, start + num).
53    ///
54    /// Despite being a serial function, its infrequent invocation allows for acceptable performance.
55    ///
56    /// If num is 0, the next seq is returned just like num is 1, but caller must not use this seq.
57    pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
58        // TODO: add pre-allocation if necessary
59        let guard = self.db.lock().await;
60        let txn = guard.begin().await?;
61        let model: Option<hummock_sequence::Model> =
62            hummock_sequence::Entity::find_by_id(ident.to_owned())
63                .one(&txn)
64                .await?;
65        let start_seq = match model {
66            None => {
67                let init: u64 = SEQ_INIT
68                    .get(ident)
69                    .copied()
70                    .unwrap_or_else(|| panic!("seq {ident} not found"))
71                    as u64;
72                let active_model = hummock_sequence::ActiveModel {
73                    name: ActiveValue::set(ident.into()),
74                    seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
75                };
76                HummockSequence::insert(active_model).exec(&txn).await?;
77                init
78            }
79            Some(model) => {
80                let start_seq: u64 = model.seq as u64;
81                if num > 0 {
82                    let mut active_model: hummock_sequence::ActiveModel = model.into();
83                    active_model.seq = ActiveValue::set(
84                        start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
85                    );
86                    active_model.update(&txn).await?;
87                }
88                start_seq
89            }
90        };
91        if num > 0 {
92            txn.commit().await?;
93        }
94        Ok(start_seq)
95    }
96}
97
98pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
99    env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
100}
101
102pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
103    env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
104}
105
106pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<u64> {
107    env.hummock_seq.next_interval(COMPACTION_GROUP_ID, 1).await
108}
109
110pub async fn next_sstable_object_id(
111    env: &MetaSrvEnv,
112    num: impl TryInto<u32> + Display + Copy,
113) -> Result<HummockSstableObjectId> {
114    next_unique_id(env, num).await
115}
116
117pub async fn next_sstable_id(
118    env: &MetaSrvEnv,
119    num: impl TryInto<u32> + Display + Copy,
120) -> Result<HummockSstableId> {
121    next_unique_id(env, num).await
122}
123
124pub async fn next_raw_object_id(
125    env: &MetaSrvEnv,
126    num: impl TryInto<u32> + Display + Copy,
127) -> Result<HummockRawObjectId> {
128    next_unique_id(env, num).await
129}
130
131async fn next_unique_id<const C: usize>(
132    env: &MetaSrvEnv,
133    num: impl TryInto<u32> + Display + Copy,
134) -> Result<TypedPrimitive<C, u64>> {
135    let num: u32 = num
136        .try_into()
137        .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
138    env.hummock_seq
139        .next_interval(SSTABLE_OBJECT_ID, num)
140        .await
141        .map(Into::into)
142}
143
#[cfg(test)]
mod tests {
    use crate::controller::SqlMetaStore;
    use crate::hummock::manager::sequence::{COMPACTION_TASK_ID, SequenceGenerator};

    /// Successive allocations must start where the previous reserved range ended.
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_seq_gen() {
        let store = SqlMetaStore::for_test().await;
        let generator = SequenceGenerator::new(store.conn.clone());
        // (expected start of the range, number of ids requested)
        for (expected_start, num) in [(1, 1), (2, 10), (12, 10)] {
            let start = generator
                .next_interval(COMPACTION_TASK_ID, num)
                .await
                .unwrap();
            assert_eq!(expected_start, start);
        }
    }
}