risingwave_meta/hummock/manager/
sequence.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::LazyLock;
18
19use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
20use risingwave_meta_model::hummock_sequence;
21use risingwave_meta_model::hummock_sequence::{
22    COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
23};
24use risingwave_meta_model::prelude::HummockSequence;
25use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
26use tokio::sync::Mutex;
27
28use crate::hummock::error::Result;
29use crate::manager::MetaSrvEnv;
30
31static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
32    maplit::hashmap! {
33        COMPACTION_TASK_ID.into() => 1,
34        COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End as i64 + 1,
35        SSTABLE_OBJECT_ID.into() => 1,
36        META_BACKUP_ID.into() => 1,
37    }
38});
39
40pub struct SequenceGenerator {
41    db: Mutex<DatabaseConnection>,
42}
43
44impl SequenceGenerator {
45    pub fn new(db: DatabaseConnection) -> Self {
46        Self { db: Mutex::new(db) }
47    }
48
49    /// Returns start, indicates range [start, start + num).
50    ///
51    /// Despite being a serial function, its infrequent invocation allows for acceptable performance.
52    ///
53    /// If num is 0, the next seq is returned just like num is 1, but caller must not use this seq.
54    pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
55        // TODO: add pre-allocation if necessary
56        let guard = self.db.lock().await;
57        let txn = guard.begin().await?;
58        let model: Option<hummock_sequence::Model> =
59            hummock_sequence::Entity::find_by_id(ident.to_owned())
60                .one(&txn)
61                .await?;
62        let start_seq = match model {
63            None => {
64                let init: u64 = SEQ_INIT
65                    .get(ident)
66                    .copied()
67                    .unwrap_or_else(|| panic!("seq {ident} not found"))
68                    as u64;
69                let active_model = hummock_sequence::ActiveModel {
70                    name: ActiveValue::set(ident.into()),
71                    seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
72                };
73                HummockSequence::insert(active_model).exec(&txn).await?;
74                init
75            }
76            Some(model) => {
77                let start_seq: u64 = model.seq as u64;
78                if num > 0 {
79                    let mut active_model: hummock_sequence::ActiveModel = model.into();
80                    active_model.seq = ActiveValue::set(
81                        start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
82                    );
83                    active_model.update(&txn).await?;
84                }
85                start_seq
86            }
87        };
88        if num > 0 {
89            txn.commit().await?;
90        }
91        Ok(start_seq)
92    }
93}
94
95pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
96    env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
97}
98
99pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
100    env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
101}
102
103pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<u64> {
104    env.hummock_seq.next_interval(COMPACTION_GROUP_ID, 1).await
105}
106
107pub async fn next_sstable_object_id(
108    env: &MetaSrvEnv,
109    num: impl TryInto<u32> + Display + Copy,
110) -> Result<u64> {
111    let num: u32 = num
112        .try_into()
113        .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
114    env.hummock_seq.next_interval(SSTABLE_OBJECT_ID, num).await
115}
116
117#[cfg(test)]
118mod tests {
119    use crate::controller::SqlMetaStore;
120    use crate::hummock::manager::sequence::{COMPACTION_TASK_ID, SequenceGenerator};
121
122    #[cfg(not(madsim))]
123    #[tokio::test]
124    async fn test_seq_gen() {
125        let store = SqlMetaStore::for_test().await;
126        let conn = store.conn.clone();
127        let s = SequenceGenerator::new(conn);
128        assert_eq!(1, s.next_interval(COMPACTION_TASK_ID, 1).await.unwrap());
129        assert_eq!(2, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
130        assert_eq!(12, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
131    }
132}