risingwave_meta/hummock/manager/
sequence.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::LazyLock;
18
19use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
20use risingwave_hummock_sdk::{CompactionGroupId, HummockRawObjectId, HummockSstableId};
21use risingwave_meta_model::hummock_sequence;
22use risingwave_meta_model::hummock_sequence::{
23    COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
24};
25use risingwave_meta_model::prelude::HummockSequence;
26use risingwave_pb::id::TypedId;
27use sea_orm::{ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
28use tokio::sync::Mutex;
29
30use crate::hummock::error::Result;
31use crate::manager::MetaSrvEnv;
32
33static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
34    maplit::hashmap! {
35        COMPACTION_TASK_ID.into() => 1,
36        COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End.as_i64_id() + 1,
37        SSTABLE_OBJECT_ID.into() => 1,
38        META_BACKUP_ID.into() => 1,
39    }
40});
41
42pub struct SequenceGenerator {
43    db: Mutex<DatabaseConnection>,
44}
45
46impl SequenceGenerator {
47    pub fn new(db: DatabaseConnection) -> Self {
48        Self { db: Mutex::new(db) }
49    }
50
51    /// Returns start, indicates range [start, start + num).
52    ///
53    /// Despite being a serial function, its infrequent invocation allows for acceptable performance.
54    ///
55    /// If num is 0, the next seq is returned just like num is 1, but caller must not use this seq.
56    pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
57        // TODO: add pre-allocation if necessary
58        let guard = self.db.lock().await;
59        let txn = guard.begin().await?;
60        let model: Option<hummock_sequence::Model> =
61            hummock_sequence::Entity::find_by_id(ident.to_owned())
62                .one(&txn)
63                .await?;
64        let start_seq = match model {
65            None => {
66                let init: u64 = SEQ_INIT
67                    .get(ident)
68                    .copied()
69                    .unwrap_or_else(|| panic!("seq {ident} not found"))
70                    as u64;
71                let active_model = hummock_sequence::ActiveModel {
72                    name: ActiveValue::set(ident.into()),
73                    seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
74                };
75                HummockSequence::insert(active_model).exec(&txn).await?;
76                init
77            }
78            Some(model) => {
79                let start_seq: u64 = model.seq as u64;
80                if num > 0 {
81                    let mut active_model: hummock_sequence::ActiveModel = model.into();
82                    active_model.seq = ActiveValue::set(
83                        start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
84                    );
85                    HummockSequence::update(active_model).exec(&txn).await?;
86                }
87                start_seq
88            }
89        };
90        if num > 0 {
91            txn.commit().await?;
92        }
93        Ok(start_seq)
94    }
95}
96
97pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
98    env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
99}
100
101pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
102    env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
103}
104
105pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<CompactionGroupId> {
106    Ok(env
107        .hummock_seq
108        .next_interval(COMPACTION_GROUP_ID, 1)
109        .await?
110        .into())
111}
112
113pub async fn next_sstable_id(
114    env: &MetaSrvEnv,
115    num: impl TryInto<u32> + Display + Copy,
116) -> Result<HummockSstableId> {
117    next_unique_id(env, num).await
118}
119
120pub async fn next_raw_object_id(
121    env: &MetaSrvEnv,
122    num: impl TryInto<u32> + Display + Copy,
123) -> Result<HummockRawObjectId> {
124    next_unique_id(env, num).await
125}
126
127async fn next_unique_id<const C: usize>(
128    env: &MetaSrvEnv,
129    num: impl TryInto<u32> + Display + Copy,
130) -> Result<TypedId<C, u64>> {
131    let num: u32 = num
132        .try_into()
133        .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
134    env.hummock_seq
135        .next_interval(SSTABLE_OBJECT_ID, num)
136        .await
137        .map(Into::into)
138}
139
#[cfg(test)]
mod tests {
    use crate::controller::SqlMetaStore;
    use crate::hummock::manager::sequence::{COMPACTION_TASK_ID, SequenceGenerator};

    /// Consecutive reservations must return contiguous, non-overlapping start values.
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_seq_gen() {
        let store = SqlMetaStore::for_test().await;
        let generator = SequenceGenerator::new(store.conn.clone());
        // (expected start, number of values reserved by the call)
        for (expected_start, num) in [(1, 1), (2, 10), (12, 10)] {
            let start = generator
                .next_interval(COMPACTION_TASK_ID, num)
                .await
                .unwrap();
            assert_eq!(expected_start, start);
        }
    }
}