risingwave_meta/controller/
id.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use risingwave_meta_model::fragment;
19use risingwave_meta_model::prelude::Fragment;
20use sea_orm::sea_query::{Expr, Func};
21use sea_orm::{DatabaseConnection, EntityTrait, QuerySelect};
22
23use crate::MetaResult;
24
25pub type IdCategoryType = u8;
26
27// TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed.
28#[expect(non_snake_case, non_upper_case_globals)]
29pub mod IdCategory {
30    use super::IdCategoryType;
31
32    #[cfg(test)]
33    pub const Test: IdCategoryType = 0;
34    pub const Table: IdCategoryType = 1;
35    pub const Fragment: IdCategoryType = 2;
36}
37pub struct IdGenerator<const TYPE: IdCategoryType>(AtomicU64);
38
39impl<const TYPE: IdCategoryType> IdGenerator<TYPE> {
40    pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
41        let id: i32 = match TYPE {
42            IdCategory::Table => {
43                // Since we are using object pk to generate id for tables, here we just implement a dummy
44                // id generator and refill it later when inserting the table.
45                0
46            }
47            IdCategory::Fragment => Fragment::find()
48                .select_only()
49                .expr(Func::if_null(
50                    Expr::col(fragment::Column::FragmentId).max().add(1),
51                    1,
52                ))
53                .into_tuple()
54                .one(conn)
55                .await?
56                .unwrap_or_default(),
57            _ => unreachable!("IdGenerator only supports Table and Fragment"),
58        };
59
60        Ok(Self(AtomicU64::new(id as u64)))
61    }
62
63    pub fn generate_interval(&self, interval: u64) -> u64 {
64        self.0.fetch_add(interval, Ordering::Relaxed)
65    }
66}
67
68pub type IdGeneratorManagerRef = Arc<IdGeneratorManager>;
69
70/// `IdGeneratorManager` is a manager for table and fragment ID generators. Note that this is just a
71/// workaround for the current implementation of `IdGenerator`. We should refactor it later.
72pub struct IdGeneratorManager {
73    pub tables: Arc<IdGenerator<{ IdCategory::Table }>>,
74    pub fragments: Arc<IdGenerator<{ IdCategory::Fragment }>>,
75}
76
77impl IdGeneratorManager {
78    pub async fn new(conn: &DatabaseConnection) -> MetaResult<Self> {
79        Ok(Self {
80            tables: Arc::new(IdGenerator::new(conn).await?),
81            fragments: Arc::new(IdGenerator::new(conn).await?),
82        })
83    }
84
85    pub fn generate<const C: IdCategoryType>(&self) -> u64 {
86        match C {
87            IdCategory::Table => self.tables.generate_interval(1),
88            IdCategory::Fragment => self.fragments.generate_interval(1),
89            _ => unreachable!("IdGenerator only supports Table and Fragment"),
90        }
91    }
92
93    pub fn generate_interval<const C: IdCategoryType>(&self, interval: u64) -> u64 {
94        match C {
95            IdCategory::Table => self.tables.generate_interval(interval),
96            IdCategory::Fragment => self.fragments.generate_interval(interval),
97            _ => unreachable!("IdGenerator only supports Table and Fragment"),
98        }
99    }
100}