risingwave_meta/controller/
id.rs1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use risingwave_meta_model::fragment;
19use risingwave_meta_model::prelude::Fragment;
20use sea_orm::sea_query::{Expr, Func};
21use sea_orm::{DatabaseConnection, EntityTrait, QuerySelect};
22
23use crate::MetaResult;
24
/// Integer type of the tags that distinguish ID categories.
///
/// It is the type of the `TYPE` const generic parameter of [`IdGenerator`].
pub type IdCategoryType = u8;

/// Tags naming each ID category, written as `IdCategory::Table`,
/// `IdCategory::Fragment`, and so on.
///
/// The tags are plain integer constants grouped in a module rather than enum
/// variants, presumably so they can be used directly as const generic
/// arguments (TODO confirm). That enum-like naming is what triggers the two
/// lints expected below.
#[expect(non_snake_case, non_upper_case_globals)]
pub mod IdCategory {
    /// Category that is only compiled into test builds.
    #[cfg(test)]
    pub const Test: super::IdCategoryType = 0;

    /// Category tag for table IDs.
    pub const Table: super::IdCategoryType = 1;

    /// Category tag for fragment IDs.
    pub const Fragment: super::IdCategoryType = 2;
}
37pub struct IdGenerator<const TYPE: IdCategoryType>(AtomicU64);
38
39impl<const TYPE: IdCategoryType> IdGenerator<TYPE> {
40 pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
41 let id: i32 = match TYPE {
42 IdCategory::Table => {
43 0
46 }
47 IdCategory::Fragment => Fragment::find()
48 .select_only()
49 .expr(Func::if_null(
50 Expr::col(fragment::Column::FragmentId).max().add(1),
51 1,
52 ))
53 .into_tuple()
54 .one(conn)
55 .await?
56 .unwrap_or_default(),
57 _ => unreachable!("IdGenerator only supports Table and Fragment"),
58 };
59
60 Ok(Self(AtomicU64::new(id as u64)))
61 }
62
63 pub fn generate_interval(&self, interval: u64) -> u64 {
64 self.0.fetch_add(interval, Ordering::Relaxed)
65 }
66}
67
68pub type IdGeneratorManagerRef = Arc<IdGeneratorManager>;
69
70pub struct IdGeneratorManager {
73 pub tables: Arc<IdGenerator<{ IdCategory::Table }>>,
74 pub fragments: Arc<IdGenerator<{ IdCategory::Fragment }>>,
75}
76
77impl IdGeneratorManager {
78 pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
79 Ok(Self {
80 tables: Arc::new(IdGenerator::new(conn).await?),
81 fragments: Arc::new(IdGenerator::new(conn).await?),
82 })
83 }
84
85 pub fn generate<const C: IdCategoryType>(&self) -> u64 {
86 match C {
87 IdCategory::Table => self.tables.generate_interval(1),
88 IdCategory::Fragment => self.fragments.generate_interval(1),
89 _ => unreachable!("IdGenerator only supports Table and Fragment"),
90 }
91 }
92
93 pub fn generate_interval<const C: IdCategoryType>(&self, interval: u64) -> u64 {
94 match C {
95 IdCategory::Table => self.tables.generate_interval(interval),
96 IdCategory::Fragment => self.fragments.generate_interval(interval),
97 _ => unreachable!("IdGenerator only supports Table and Fragment"),
98 }
99 }
100}