risingwave_meta/stream/stream_graph/
id.rs1use std::marker::PhantomData;
16use std::sync::atomic::{AtomicU32, Ordering};
17
18use risingwave_pb::id::{ActorId, FragmentId};
19
20use crate::controller::id::{
21 IdCategory, IdCategoryType, IdGeneratorManager as SqlIdGeneratorManager,
22};
23
24#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
27pub(super) struct GlobalId<const TYPE: IdCategoryType>(u32);
28
29impl GlobalFragmentId {
30 pub fn new(id: FragmentId) -> Self {
31 Self(id.as_raw_id())
32 }
33}
34
35impl<const TYPE: IdCategoryType> GlobalId<TYPE> {
36 pub fn as_global_id<T: From<u32>>(&self) -> T {
37 self.0.into()
38 }
39}
40
41impl<const TYPE: IdCategoryType> From<u32> for GlobalId<TYPE> {
42 fn from(id: u32) -> Self {
43 Self(id)
44 }
45}
46
47#[derive(Clone, Copy, Debug)]
51pub(super) struct GlobalIdGen<ID: From<u32>> {
52 offset: u32,
53 len: u32,
54 _phantom: PhantomData<ID>,
55}
56
57pub(super) type GlobalFragmentId = GlobalId<{ IdCategory::Fragment }>;
58pub(super) type GlobalFragmentIdGen = GlobalIdGen<GlobalFragmentId>;
59
60pub(super) type GlobalTableIdGen = GlobalIdGen<GlobalId<{ IdCategory::Table }>>;
61
62#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
63pub(super) struct GlobalActorId(u32);
64
65impl GlobalActorId {
66 pub fn new(id: ActorId) -> Self {
67 Self(id.as_raw_id())
68 }
69
70 pub fn as_global_id(&self) -> ActorId {
71 self.0.into()
72 }
73}
74
75impl From<u32> for GlobalActorId {
76 fn from(id: u32) -> Self {
77 Self(id)
78 }
79}
80
81impl<const TYPE: IdCategoryType> GlobalIdGen<GlobalId<TYPE>> {
82 pub fn new(id_gen: &SqlIdGeneratorManager, len: u64) -> Self {
84 let offset = id_gen.generate_interval::<TYPE>(len);
85 Self {
86 offset: offset as u32,
87 len: len as u32,
88 _phantom: PhantomData,
89 }
90 }
91}
92
93pub(super) type GlobalActorIdGen = GlobalIdGen<GlobalActorId>;
94
95impl GlobalIdGen<GlobalActorId> {
96 pub fn new(counter: &AtomicU32, len: u64) -> Self {
97 let len_u32 = u32::try_from(len).expect("actor count exceeds u32::MAX");
98 let offset = counter.fetch_add(len_u32, Ordering::Relaxed);
99 Self {
100 offset,
101 len: len_u32,
102 _phantom: PhantomData,
103 }
104 }
105}
106
107impl<ID: From<u32>> GlobalIdGen<ID> {
108 pub fn to_global_id(&self, local_id: u32) -> ID {
110 assert!(
111 local_id < self.len,
112 "id {} is out of range (len: {})",
113 local_id,
114 self.len
115 );
116 ID::from(local_id + self.offset)
117 }
118
119 pub fn len(&self) -> u32 {
120 self.len
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generators created from the same counter receive adjacent, non-overlapping ranges.
    #[test]
    fn global_actor_id_gen_reserves_unique_ranges() {
        let next_actor_id = AtomicU32::new(10);

        let range_a = GlobalActorIdGen::new(&next_actor_id, 3);
        assert_eq!(range_a.len(), 3);
        let range_b = GlobalActorIdGen::new(&next_actor_id, 2);
        assert_eq!(range_b.len(), 2);

        // `range_a` starts at the initial counter value; `range_b` starts right after it.
        assert_eq!(range_a.to_global_id(0).as_global_id(), 10);
        assert_eq!(range_a.to_global_id(2).as_global_id(), 12);
        assert_eq!(range_b.to_global_id(1).as_global_id(), 14);

        // The counter has advanced past both reserved ranges.
        assert_eq!(next_actor_id.load(Ordering::Relaxed), 15);
    }
}