risingwave_meta/stream/stream_graph/
id.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16use std::sync::atomic::{AtomicU32, Ordering};
17
18use risingwave_pb::id::{ActorId, FragmentId};
19
20use crate::controller::id::{
21    IdCategory, IdCategoryType, IdGeneratorManager as SqlIdGeneratorManager,
22};
23
24/// A wrapper to distinguish global ID generated by the [`SqlIdGeneratorManager`] and the local ID from
25/// the frontend.
26#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
27pub(super) struct GlobalId<const TYPE: IdCategoryType>(u32);
28
29impl GlobalFragmentId {
30    pub fn new(id: FragmentId) -> Self {
31        Self(id.as_raw_id())
32    }
33}
34
35impl<const TYPE: IdCategoryType> GlobalId<TYPE> {
36    pub fn as_global_id<T: From<u32>>(&self) -> T {
37        self.0.into()
38    }
39}
40
41impl<const TYPE: IdCategoryType> From<u32> for GlobalId<TYPE> {
42    fn from(id: u32) -> Self {
43        Self(id)
44    }
45}
46
/// Maps local IDs onto a pre-allocated range of global IDs by adding a fixed `offset`.
///
/// The local IDs are required to be exactly a permutation of the range `[0, len)`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct GlobalIdGen<ID: From<u32>> {
    /// Start of the pre-allocated range; added to every local ID.
    offset: u32,
    /// Number of IDs in the pre-allocated range; local IDs must be smaller than this.
    len: u32,
    /// Zero-sized marker recording which ID type this generator produces.
    _phantom: PhantomData<ID>,
}
56
/// A [`GlobalId`] in the `Fragment` ID category.
pub(super) type GlobalFragmentId = GlobalId<{ IdCategory::Fragment }>;
/// A [`GlobalIdGen`] that produces [`GlobalFragmentId`]s.
pub(super) type GlobalFragmentIdGen = GlobalIdGen<GlobalFragmentId>;

/// A [`GlobalIdGen`] that produces [`GlobalId`]s in the `Table` ID category.
pub(super) type GlobalTableIdGen = GlobalIdGen<GlobalId<{ IdCategory::Table }>>;
61
/// A newtype around the raw `u32` value of a global actor ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct GlobalActorId(u32);
64
65impl GlobalActorId {
66    pub fn as_global_id(&self) -> ActorId {
67        self.0.into()
68    }
69}
70
71impl From<u32> for GlobalActorId {
72    fn from(id: u32) -> Self {
73        Self(id)
74    }
75}
76
77impl<const TYPE: IdCategoryType> GlobalIdGen<GlobalId<TYPE>> {
78    /// Pre-allocate a range of IDs with the given `len` and return the generator.
79    pub fn new(id_gen: &SqlIdGeneratorManager, len: u64) -> Self {
80        let offset = id_gen.generate_interval::<TYPE>(len);
81        Self {
82            offset: offset as u32,
83            len: len as u32,
84            _phantom: PhantomData,
85        }
86    }
87}
88
89pub(crate) type GlobalActorIdGen = GlobalIdGen<GlobalActorId>;
90
91impl GlobalIdGen<GlobalActorId> {
92    pub fn new(counter: &AtomicU32, len: u64) -> Self {
93        let len_u32 = u32::try_from(len).expect("actor count exceeds u32::MAX");
94        let offset = counter.fetch_add(len_u32, Ordering::Relaxed);
95        Self {
96            offset,
97            len: len_u32,
98            _phantom: PhantomData,
99        }
100    }
101}
102
103impl<ID: From<u32>> GlobalIdGen<ID> {
104    /// Convert local id to global id. Panics if `id >= len`.
105    pub fn to_global_id(&self, local_id: u32) -> ID {
106        assert!(
107            local_id < self.len,
108            "id {} is out of range (len: {})",
109            local_id,
110            self.len
111        );
112        ID::from(local_id + self.offset)
113    }
114}