risingwave_meta/hummock/manager/
sequence.rs1use std::collections::HashMap;
16use std::fmt::Display;
17use std::sync::LazyLock;
18
19use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
20use risingwave_hummock_sdk::{
21 HummockRawObjectId, HummockSstableId, HummockSstableObjectId, TypedPrimitive,
22};
23use risingwave_meta_model::hummock_sequence;
24use risingwave_meta_model::hummock_sequence::{
25 COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
26};
27use risingwave_meta_model::prelude::HummockSequence;
28use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
29use tokio::sync::Mutex;
30
31use crate::hummock::error::Result;
32use crate::manager::MetaSrvEnv;
33
34static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
35 maplit::hashmap! {
36 COMPACTION_TASK_ID.into() => 1,
37 COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End as i64 + 1,
38 SSTABLE_OBJECT_ID.into() => 1,
39 META_BACKUP_ID.into() => 1,
40 }
41});
42
43pub struct SequenceGenerator {
44 db: Mutex<DatabaseConnection>,
45}
46
47impl SequenceGenerator {
48 pub fn new(db: DatabaseConnection) -> Self {
49 Self { db: Mutex::new(db) }
50 }
51
52 pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
58 let guard = self.db.lock().await;
60 let txn = guard.begin().await?;
61 let model: Option<hummock_sequence::Model> =
62 hummock_sequence::Entity::find_by_id(ident.to_owned())
63 .one(&txn)
64 .await?;
65 let start_seq = match model {
66 None => {
67 let init: u64 = SEQ_INIT
68 .get(ident)
69 .copied()
70 .unwrap_or_else(|| panic!("seq {ident} not found"))
71 as u64;
72 let active_model = hummock_sequence::ActiveModel {
73 name: ActiveValue::set(ident.into()),
74 seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
75 };
76 HummockSequence::insert(active_model).exec(&txn).await?;
77 init
78 }
79 Some(model) => {
80 let start_seq: u64 = model.seq as u64;
81 if num > 0 {
82 let mut active_model: hummock_sequence::ActiveModel = model.into();
83 active_model.seq = ActiveValue::set(
84 start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
85 );
86 active_model.update(&txn).await?;
87 }
88 start_seq
89 }
90 };
91 if num > 0 {
92 txn.commit().await?;
93 }
94 Ok(start_seq)
95 }
96}
97
98pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
99 env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
100}
101
102pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
103 env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
104}
105
106pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<u64> {
107 env.hummock_seq.next_interval(COMPACTION_GROUP_ID, 1).await
108}
109
110pub async fn next_sstable_object_id(
111 env: &MetaSrvEnv,
112 num: impl TryInto<u32> + Display + Copy,
113) -> Result<HummockSstableObjectId> {
114 next_unique_id(env, num).await
115}
116
117pub async fn next_sstable_id(
118 env: &MetaSrvEnv,
119 num: impl TryInto<u32> + Display + Copy,
120) -> Result<HummockSstableId> {
121 next_unique_id(env, num).await
122}
123
124pub async fn next_raw_object_id(
125 env: &MetaSrvEnv,
126 num: impl TryInto<u32> + Display + Copy,
127) -> Result<HummockRawObjectId> {
128 next_unique_id(env, num).await
129}
130
131async fn next_unique_id<const C: usize>(
132 env: &MetaSrvEnv,
133 num: impl TryInto<u32> + Display + Copy,
134) -> Result<TypedPrimitive<C, u64>> {
135 let num: u32 = num
136 .try_into()
137 .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
138 env.hummock_seq
139 .next_interval(SSTABLE_OBJECT_ID, num)
140 .await
141 .map(Into::into)
142}
143
144#[cfg(test)]
145mod tests {
146 use crate::controller::SqlMetaStore;
147 use crate::hummock::manager::sequence::{COMPACTION_TASK_ID, SequenceGenerator};
148
149 #[cfg(not(madsim))]
150 #[tokio::test]
151 async fn test_seq_gen() {
152 let store = SqlMetaStore::for_test().await;
153 let conn = store.conn.clone();
154 let s = SequenceGenerator::new(conn);
155 assert_eq!(1, s.next_interval(COMPACTION_TASK_ID, 1).await.unwrap());
156 assert_eq!(2, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
157 assert_eq!(12, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
158 }
159}