risingwave_meta/hummock/manager/
sequence.rs1use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::LazyLock;
18
19use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
20use risingwave_meta_model::hummock_sequence;
21use risingwave_meta_model::hummock_sequence::{
22 COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
23};
24use risingwave_meta_model::prelude::HummockSequence;
25use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
26use tokio::sync::Mutex;
27
28use crate::hummock::error::Result;
29use crate::manager::MetaSrvEnv;
30
31static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
32 maplit::hashmap! {
33 COMPACTION_TASK_ID.into() => 1,
34 COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End as i64 + 1,
35 SSTABLE_OBJECT_ID.into() => 1,
36 META_BACKUP_ID.into() => 1,
37 }
38});
39
40pub struct SequenceGenerator {
41 db: Mutex<DatabaseConnection>,
42}
43
44impl SequenceGenerator {
45 pub fn new(db: DatabaseConnection) -> Self {
46 Self { db: Mutex::new(db) }
47 }
48
49 pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
55 let guard = self.db.lock().await;
57 let txn = guard.begin().await?;
58 let model: Option<hummock_sequence::Model> =
59 hummock_sequence::Entity::find_by_id(ident.to_owned())
60 .one(&txn)
61 .await?;
62 let start_seq = match model {
63 None => {
64 let init: u64 = SEQ_INIT
65 .get(ident)
66 .copied()
67 .unwrap_or_else(|| panic!("seq {ident} not found"))
68 as u64;
69 let active_model = hummock_sequence::ActiveModel {
70 name: ActiveValue::set(ident.into()),
71 seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
72 };
73 HummockSequence::insert(active_model).exec(&txn).await?;
74 init
75 }
76 Some(model) => {
77 let start_seq: u64 = model.seq as u64;
78 if num > 0 {
79 let mut active_model: hummock_sequence::ActiveModel = model.into();
80 active_model.seq = ActiveValue::set(
81 start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
82 );
83 active_model.update(&txn).await?;
84 }
85 start_seq
86 }
87 };
88 if num > 0 {
89 txn.commit().await?;
90 }
91 Ok(start_seq)
92 }
93}
94
95pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
96 env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
97}
98
99pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
100 env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
101}
102
103pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<u64> {
104 env.hummock_seq.next_interval(COMPACTION_GROUP_ID, 1).await
105}
106
107pub async fn next_sstable_object_id(
108 env: &MetaSrvEnv,
109 num: impl TryInto<u32> + Display + Copy,
110) -> Result<u64> {
111 let num: u32 = num
112 .try_into()
113 .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
114 env.hummock_seq.next_interval(SSTABLE_OBJECT_ID, num).await
115}
116
117#[cfg(test)]
118mod tests {
119 use crate::controller::SqlMetaStore;
120 use crate::hummock::manager::sequence::{COMPACTION_TASK_ID, SequenceGenerator};
121
122 #[cfg(not(madsim))]
123 #[tokio::test]
124 async fn test_seq_gen() {
125 let store = SqlMetaStore::for_test().await;
126 let conn = store.conn.clone();
127 let s = SequenceGenerator::new(conn);
128 assert_eq!(1, s.next_interval(COMPACTION_TASK_ID, 1).await.unwrap());
129 assert_eq!(2, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
130 assert_eq!(12, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
131 }
132}