risingwave_meta/controller/
id.rs1use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use risingwave_meta_model::prelude::{Actor, Fragment};
19use risingwave_meta_model::{actor, fragment};
20use sea_orm::sea_query::{Expr, Func};
21use sea_orm::{DatabaseConnection, EntityTrait, QuerySelect};
22
23use crate::MetaResult;
24
25pub type IdCategoryType = u8;
26
27#[expect(non_snake_case, non_upper_case_globals)]
29pub mod IdCategory {
30 use super::IdCategoryType;
31
32 #[cfg(test)]
33 pub const Test: IdCategoryType = 0;
34 pub const Table: IdCategoryType = 1;
35 pub const Fragment: IdCategoryType = 2;
36 pub const Actor: IdCategoryType = 3;
37}
38pub struct IdGenerator<const TYPE: IdCategoryType>(AtomicU64);
39
40impl<const TYPE: IdCategoryType> IdGenerator<TYPE> {
41 pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
42 let id: i32 = match TYPE {
43 IdCategory::Table => {
44 0
47 }
48 IdCategory::Fragment => Fragment::find()
49 .select_only()
50 .expr(Func::if_null(
51 Expr::col(fragment::Column::FragmentId).max().add(1),
52 1,
53 ))
54 .into_tuple()
55 .one(conn)
56 .await?
57 .unwrap_or_default(),
58 IdCategory::Actor => Actor::find()
59 .select_only()
60 .expr(Func::if_null(
61 Expr::col(actor::Column::ActorId).max().add(1),
62 1,
63 ))
64 .into_tuple()
65 .one(conn)
66 .await?
67 .unwrap_or_default(),
68 _ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"),
69 };
70
71 Ok(Self(AtomicU64::new(id as u64)))
72 }
73
74 pub fn generate_interval(&self, interval: u64) -> u64 {
75 self.0.fetch_add(interval, Ordering::Relaxed)
76 }
77}
78
79pub type IdGeneratorManagerRef = Arc<IdGeneratorManager>;
80
81pub struct IdGeneratorManager {
84 pub tables: Arc<IdGenerator<{ IdCategory::Table }>>,
85 pub fragments: Arc<IdGenerator<{ IdCategory::Fragment }>>,
86 pub actors: Arc<IdGenerator<{ IdCategory::Actor }>>,
87}
88
89impl IdGeneratorManager {
90 pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
91 Ok(Self {
92 tables: Arc::new(IdGenerator::new(conn).await?),
93 fragments: Arc::new(IdGenerator::new(conn).await?),
94 actors: Arc::new(IdGenerator::new(conn).await?),
95 })
96 }
97
98 pub fn generate<const C: IdCategoryType>(&self) -> u64 {
99 match C {
100 IdCategory::Table => self.tables.generate_interval(1),
101 IdCategory::Fragment => self.fragments.generate_interval(1),
102 IdCategory::Actor => self.actors.generate_interval(1),
103 _ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"),
104 }
105 }
106
107 pub fn generate_interval<const C: IdCategoryType>(&self, interval: u64) -> u64 {
108 match C {
109 IdCategory::Table => self.tables.generate_interval(interval),
110 IdCategory::Fragment => self.fragments.generate_interval(interval),
111 IdCategory::Actor => self.actors.generate_interval(interval),
112 _ => unreachable!("IdGeneratorV2 only supports Table, Fragment, and Actor"),
113 }
114 }
115}