risingwave_meta/stream/stream_graph/
id.rs1use std::marker::PhantomData;
16use std::sync::atomic::{AtomicU32, Ordering};
17
18use risingwave_pb::id::{ActorId, FragmentId};
19
20use crate::controller::id::{
21 IdCategory, IdCategoryType, IdGeneratorManager as SqlIdGeneratorManager,
22};
23
24#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
27pub(super) struct GlobalId<const TYPE: IdCategoryType>(u32);
28
29impl GlobalFragmentId {
30 pub fn new(id: FragmentId) -> Self {
31 Self(id.as_raw_id())
32 }
33}
34
35impl<const TYPE: IdCategoryType> GlobalId<TYPE> {
36 pub fn as_global_id<T: From<u32>>(&self) -> T {
37 self.0.into()
38 }
39}
40
41impl<const TYPE: IdCategoryType> From<u32> for GlobalId<TYPE> {
42 fn from(id: u32) -> Self {
43 Self(id)
44 }
45}
46
47#[derive(Clone, Copy, Debug)]
51pub(crate) struct GlobalIdGen<ID: From<u32>> {
52 offset: u32,
53 len: u32,
54 _phantom: PhantomData<ID>,
55}
56
57pub(super) type GlobalFragmentId = GlobalId<{ IdCategory::Fragment }>;
58pub(super) type GlobalFragmentIdGen = GlobalIdGen<GlobalFragmentId>;
59
60pub(super) type GlobalTableIdGen = GlobalIdGen<GlobalId<{ IdCategory::Table }>>;
61
62#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
63pub(crate) struct GlobalActorId(u32);
64
65impl GlobalActorId {
66 pub fn as_global_id(&self) -> ActorId {
67 self.0.into()
68 }
69}
70
71impl From<u32> for GlobalActorId {
72 fn from(id: u32) -> Self {
73 Self(id)
74 }
75}
76
77impl<const TYPE: IdCategoryType> GlobalIdGen<GlobalId<TYPE>> {
78 pub fn new(id_gen: &SqlIdGeneratorManager, len: u64) -> Self {
80 let offset = id_gen.generate_interval::<TYPE>(len);
81 Self {
82 offset: offset as u32,
83 len: len as u32,
84 _phantom: PhantomData,
85 }
86 }
87}
88
89pub(crate) type GlobalActorIdGen = GlobalIdGen<GlobalActorId>;
90
91impl GlobalIdGen<GlobalActorId> {
92 pub fn new(counter: &AtomicU32, len: u64) -> Self {
93 let len_u32 = u32::try_from(len).expect("actor count exceeds u32::MAX");
94 let offset = counter.fetch_add(len_u32, Ordering::Relaxed);
95 Self {
96 offset,
97 len: len_u32,
98 _phantom: PhantomData,
99 }
100 }
101}
102
103impl<ID: From<u32>> GlobalIdGen<ID> {
104 pub fn to_global_id(&self, local_id: u32) -> ID {
106 assert!(
107 local_id < self.len,
108 "id {} is out of range (len: {})",
109 local_id,
110 self.len
111 );
112 ID::from(local_id + self.offset)
113 }
114}