risingwave_meta/stream/stream_graph/
id.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::atomic::{AtomicU32, Ordering};
17
18use risingwave_pb::id::{ActorId, FragmentId};
19
20use crate::controller::id::{
21    IdCategory, IdCategoryType, IdGeneratorManager as SqlIdGeneratorManager,
22};
23
24/// A wrapper to distinguish global ID generated by the [`SqlIdGeneratorManager`] and the local ID from
25/// the frontend.
26#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
27pub(super) struct GlobalId<const TYPE: IdCategoryType>(u32);
28
29impl GlobalFragmentId {
30    pub fn new(id: FragmentId) -> Self {
31        Self(id.as_raw_id())
32    }
33}
34
35impl<const TYPE: IdCategoryType> GlobalId<TYPE> {
36    pub fn as_global_id<T: From<u32>>(&self) -> T {
37        self.0.into()
38    }
39}
40
41impl<const TYPE: IdCategoryType> From<u32> for GlobalId<TYPE> {
42    fn from(id: u32) -> Self {
43        Self(id)
44    }
45}
46
47/// Utility for converting local IDs into pre-allocated global IDs by adding an `offset`.
48///
49/// This requires the local IDs exactly a permutation of the range `[0, len)`.
50#[derive(Clone, Copy, Debug)]
51pub(super) struct GlobalIdGen<ID: From<u32>> {
52    offset: u32,
53    len: u32,
54    _phantom: PhantomData<ID>,
55}
56
57pub(super) type GlobalFragmentId = GlobalId<{ IdCategory::Fragment }>;
58pub(super) type GlobalFragmentIdGen = GlobalIdGen<GlobalFragmentId>;
59
60pub(super) type GlobalTableIdGen = GlobalIdGen<GlobalId<{ IdCategory::Table }>>;
61
62#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
63pub(super) struct GlobalActorId(u32);
64
65impl GlobalActorId {
66    pub fn new(id: ActorId) -> Self {
67        Self(id.as_raw_id())
68    }
69
70    pub fn as_global_id(&self) -> ActorId {
71        self.0.into()
72    }
73}
74
75impl From<u32> for GlobalActorId {
76    fn from(id: u32) -> Self {
77        Self(id)
78    }
79}
80
81impl<const TYPE: IdCategoryType> GlobalIdGen<GlobalId<TYPE>> {
82    /// Pre-allocate a range of IDs with the given `len` and return the generator.
83    pub fn new(id_gen: &SqlIdGeneratorManager, len: u64) -> Self {
84        let offset = id_gen.generate_interval::<TYPE>(len);
85        Self {
86            offset: offset as u32,
87            len: len as u32,
88            _phantom: PhantomData,
89        }
90    }
91}
92
93pub(super) type GlobalActorIdGen = GlobalIdGen<GlobalActorId>;
94
95impl GlobalIdGen<GlobalActorId> {
96    pub fn new(counter: &AtomicU32, len: u64) -> Self {
97        let len_u32 = u32::try_from(len).expect("actor count exceeds u32::MAX");
98        let offset = counter.fetch_add(len_u32, Ordering::Relaxed);
99        Self {
100            offset,
101            len: len_u32,
102            _phantom: PhantomData,
103        }
104    }
105}
106
107impl<ID: From<u32>> GlobalIdGen<ID> {
108    /// Convert local id to global id. Panics if `id >= len`.
109    pub fn to_global_id(&self, local_id: u32) -> ID {
110        assert!(
111            local_id < self.len,
112            "id {} is out of range (len: {})",
113            local_id,
114            self.len
115        );
116        ID::from(local_id + self.offset)
117    }
118
119    pub fn len(&self) -> u32 {
120        self.len
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127
128    #[test]
129    fn global_actor_id_gen_reserves_unique_ranges() {
130        let counter = AtomicU32::new(10);
131        let first = GlobalActorIdGen::new(&counter, 3);
132        assert_eq!(first.len(), 3);
133        let second = GlobalActorIdGen::new(&counter, 2);
134        assert_eq!(second.len(), 2);
135
136        assert_eq!(first.to_global_id(0).as_global_id(), 10);
137        assert_eq!(first.to_global_id(2).as_global_id(), 12);
138        assert_eq!(second.to_global_id(1).as_global_id(), 14);
139        assert_eq!(counter.load(Ordering::Relaxed), 15);
140    }
141}