risingwave_meta/hummock/manager/
sequence.rs1use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::LazyLock;
18
19use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
20use risingwave_hummock_sdk::{CompactionGroupId, HummockRawObjectId, HummockSstableId};
21use risingwave_meta_model::hummock_sequence;
22use risingwave_meta_model::hummock_sequence::{
23 COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
24};
25use risingwave_meta_model::prelude::HummockSequence;
26use risingwave_pb::id::TypedId;
27use sea_orm::{ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
28use tokio::sync::Mutex;
29
30use crate::hummock::error::Result;
31use crate::manager::MetaSrvEnv;
32
33static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
34 maplit::hashmap! {
35 COMPACTION_TASK_ID.into() => 1,
36 COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End.as_i64_id() + 1,
37 SSTABLE_OBJECT_ID.into() => 1,
38 META_BACKUP_ID.into() => 1,
39 }
40});
41
42pub struct SequenceGenerator {
43 db: Mutex<DatabaseConnection>,
44}
45
46impl SequenceGenerator {
47 pub fn new(db: DatabaseConnection) -> Self {
48 Self { db: Mutex::new(db) }
49 }
50
51 pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
57 let guard = self.db.lock().await;
59 let txn = guard.begin().await?;
60 let model: Option<hummock_sequence::Model> =
61 hummock_sequence::Entity::find_by_id(ident.to_owned())
62 .one(&txn)
63 .await?;
64 let start_seq = match model {
65 None => {
66 let init: u64 = SEQ_INIT
67 .get(ident)
68 .copied()
69 .unwrap_or_else(|| panic!("seq {ident} not found"))
70 as u64;
71 let active_model = hummock_sequence::ActiveModel {
72 name: ActiveValue::set(ident.into()),
73 seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
74 };
75 HummockSequence::insert(active_model).exec(&txn).await?;
76 init
77 }
78 Some(model) => {
79 let start_seq: u64 = model.seq as u64;
80 if num > 0 {
81 let mut active_model: hummock_sequence::ActiveModel = model.into();
82 active_model.seq = ActiveValue::set(
83 start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
84 );
85 HummockSequence::update(active_model).exec(&txn).await?;
86 }
87 start_seq
88 }
89 };
90 if num > 0 {
91 txn.commit().await?;
92 }
93 Ok(start_seq)
94 }
95}
96
97pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
98 env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
99}
100
101pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
102 env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
103}
104
105pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<CompactionGroupId> {
106 Ok(env
107 .hummock_seq
108 .next_interval(COMPACTION_GROUP_ID, 1)
109 .await?
110 .into())
111}
112
113pub async fn next_sstable_id(
114 env: &MetaSrvEnv,
115 num: impl TryInto<u32> + Display + Copy,
116) -> Result<HummockSstableId> {
117 next_unique_id(env, num).await
118}
119
120pub async fn next_raw_object_id(
121 env: &MetaSrvEnv,
122 num: impl TryInto<u32> + Display + Copy,
123) -> Result<HummockRawObjectId> {
124 next_unique_id(env, num).await
125}
126
127async fn next_unique_id<const C: usize>(
128 env: &MetaSrvEnv,
129 num: impl TryInto<u32> + Display + Copy,
130) -> Result<TypedId<C, u64>> {
131 let num: u32 = num
132 .try_into()
133 .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
134 env.hummock_seq
135 .next_interval(SSTABLE_OBJECT_ID, num)
136 .await
137 .map(Into::into)
138}
139
140#[cfg(test)]
141mod tests {
142 use crate::controller::SqlMetaStore;
143 use crate::hummock::manager::sequence::{COMPACTION_TASK_ID, SequenceGenerator};
144
145 #[cfg(not(madsim))]
146 #[tokio::test]
147 async fn test_seq_gen() {
148 let store = SqlMetaStore::for_test().await;
149 let conn = store.conn.clone();
150 let s = SequenceGenerator::new(conn);
151 assert_eq!(1, s.next_interval(COMPACTION_TASK_ID, 1).await.unwrap());
152 assert_eq!(2, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
153 assert_eq!(12, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
154 }
155}