risingwave_pb/
id.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Formatter;
18use std::num::TryFromIntError;
19use std::ops::{Add, AddAssign};
20use std::str::FromStr;
21
22use sea_orm::sea_query::{ArrayType, ValueTypeErr};
23use sea_orm::{ColIdx, ColumnType, DbErr, QueryResult, TryGetError};
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use thiserror_ext::AsReport;
26use tracing::warn;
27
28use crate::catalog::source::OptionalAssociatedTableId;
29use crate::catalog::table::OptionalAssociatedSourceId;
30
/// Sentinel raw id (`u32::MAX - 1`): the value produced by `placeholder()` and
/// recognised by `is_placeholder()` on the `u32`-backed typed ids in this file.
pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
32
/// A strongly typed identifier: a primitive `P` tagged with the const
/// discriminator `N`, so that ids with different discriminators are distinct
/// types even when they wrap the same primitive.
///
/// The wrapper is `#[repr(transparent)]`, i.e. it has the layout of `P`, and
/// every derived trait simply forwards to the single wrapped field.
#[repr(transparent)]
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TypedId<const N: usize, P>(pub(crate) P);
36
37impl<const N: usize, P: std::fmt::Debug> std::fmt::Debug for TypedId<N, P> {
38    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39        <P as std::fmt::Debug>::fmt(&self.0, f)
40    }
41}
42
43impl<const N: usize, P: std::fmt::Display> std::fmt::Display for TypedId<N, P> {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        <P as std::fmt::Display>::fmt(&self.0, f)
46    }
47}
48
49impl<const N: usize, P: std::fmt::UpperHex> std::fmt::UpperHex for TypedId<N, P> {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        <P as std::fmt::UpperHex>::fmt(&self.0, f)
52    }
53}
54
55impl<const N: usize, P: PartialEq> PartialEq<P> for TypedId<N, P> {
56    fn eq(&self, other: &P) -> bool {
57        self.0 == *other
58    }
59}
60
61impl<const N: usize, P: FromStr> FromStr for TypedId<N, P> {
62    type Err = <P as FromStr>::Err;
63
64    fn from_str(s: &str) -> Result<Self, Self::Err> {
65        Ok(Self(<P as FromStr>::from_str(s)?))
66    }
67}
68
69impl<const N: usize, P> TypedId<N, P>
70where
71    Self: UniqueTypedIdDeclaration,
72{
73    pub const fn new(inner: P) -> Self {
74        TypedId(inner)
75    }
76
77    #[expect(clippy::wrong_self_convention)]
78    pub fn as_raw_id(self) -> P {
79        self.0
80    }
81
82    pub fn raw_slice(slice: &[Self]) -> &[P] {
83        // SAFETY: transparent repr
84        unsafe { std::mem::transmute(slice) }
85    }
86
87    pub fn mut_raw_vec(vec: &mut Vec<Self>) -> &mut Vec<P> {
88        // SAFETY: transparent repr
89        unsafe { std::mem::transmute(vec) }
90    }
91
92    pub fn raw_hash_map_ref<V>(map: &HashMap<Self, V>) -> &HashMap<P, V> {
93        // SAFETY: transparent repr
94        unsafe { std::mem::transmute(map) }
95    }
96
97    pub fn raw_hash_map_mut_ref<V>(map: &mut HashMap<Self, V>) -> &mut HashMap<P, V> {
98        // SAFETY: transparent repr
99        unsafe { std::mem::transmute(map) }
100    }
101
102    pub fn raw_btree_map_ref<V>(map: &BTreeMap<Self, V>) -> &BTreeMap<P, V> {
103        // SAFETY: transparent repr
104        unsafe { std::mem::transmute(map) }
105    }
106
107    pub fn raw_btree_map_mut_ref<V>(map: &mut BTreeMap<Self, V>) -> &mut BTreeMap<P, V> {
108        // SAFETY: transparent repr
109        unsafe { std::mem::transmute(map) }
110    }
111}
112
/// Shorthand for a [`TypedId`] with discriminator `N` backed by a `u32`.
type TypedU32Id<const N: usize> = TypedId<N, u32>;
114
115impl<const N: usize> TypedU32Id<N>
116where
117    Self: UniqueTypedIdDeclaration,
118{
119    pub const fn placeholder() -> Self {
120        Self(OBJECT_ID_PLACEHOLDER)
121    }
122
123    pub fn is_placeholder(&self) -> bool {
124        self.0 == OBJECT_ID_PLACEHOLDER
125    }
126
127    pub fn as_i32_id(self) -> i32 {
128        self.to_i32()
129    }
130
131    fn from_i32(inner: i32) -> Self {
132        Self(inner.try_into().unwrap_or_else(|e: TryFromIntError| {
133            if cfg!(debug_assertions) {
134                panic!(
135                    "invalid i32 id {} for {}: {:?}",
136                    inner,
137                    type_name::<Self>(),
138                    e.as_report()
139                );
140            } else {
141                warn!(
142                    "invalid i32 id {} for {}: {:?}",
143                    inner,
144                    type_name::<Self>(),
145                    e.as_report()
146                );
147                inner as _
148            }
149        }))
150    }
151
152    fn to_i32(self) -> i32 {
153        self.0.try_into().unwrap_or_else(|e: TryFromIntError| {
154            if cfg!(debug_assertions) {
155                panic!(
156                    "invalid u32 id {} for {}: {:?}",
157                    self.0,
158                    type_name::<Self>(),
159                    e.as_report()
160                );
161            } else {
162                warn!(
163                    "invalid u32 id {} for {}: {:?}",
164                    self.0,
165                    type_name::<Self>(),
166                    e.as_report()
167                );
168                self.0 as _
169            }
170        })
171    }
172}
173
174impl<const N: usize, P> From<P> for TypedId<N, P> {
175    fn from(id: P) -> Self {
176        Self(id)
177    }
178}
179
180impl<const N: usize> From<TypedU32Id<N>> for sea_orm::Value
181where
182    TypedU32Id<N>: UniqueTypedIdDeclaration,
183{
184    fn from(value: TypedU32Id<N>) -> Self {
185        sea_orm::Value::from(value.to_i32())
186    }
187}
188
189impl<const N: usize> sea_orm::sea_query::ValueType for TypedU32Id<N>
190where
191    Self: UniqueTypedIdDeclaration,
192{
193    fn try_from(v: sea_orm::Value) -> Result<Self, ValueTypeErr> {
194        let inner = <i32 as sea_orm::sea_query::ValueType>::try_from(v)?;
195        Ok(Self::from_i32(inner))
196    }
197
198    fn type_name() -> String {
199        <i32 as sea_orm::sea_query::ValueType>::type_name()
200    }
201
202    fn array_type() -> ArrayType {
203        <i32 as sea_orm::sea_query::ValueType>::array_type()
204    }
205
206    fn column_type() -> ColumnType {
207        <i32 as sea_orm::sea_query::ValueType>::column_type()
208    }
209}
210
211impl<const N: usize> sea_orm::sea_query::Nullable for TypedU32Id<N> {
212    fn null() -> sea_orm::Value {
213        <i32 as sea_orm::sea_query::Nullable>::null()
214    }
215}
216
217impl<const N: usize> sea_orm::TryGetable for TypedU32Id<N>
218where
219    Self: UniqueTypedIdDeclaration,
220{
221    fn try_get_by<I: ColIdx>(res: &QueryResult, index: I) -> Result<Self, TryGetError> {
222        let inner = <i32 as sea_orm::TryGetable>::try_get_by(res, index)?;
223        Ok(Self::from_i32(inner))
224    }
225}
226
227impl<const N: usize> sea_orm::TryFromU64 for TypedU32Id<N>
228where
229    Self: UniqueTypedIdDeclaration,
230{
231    fn try_from_u64(n: u64) -> Result<Self, DbErr> {
232        let inner = <i32 as sea_orm::TryFromU64>::try_from_u64(n)?;
233        Ok(Self::from_i32(inner))
234    }
235}
236
237impl<const N: usize, P: Serialize> Serialize for TypedId<N, P> {
238    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
239    where
240        S: Serializer,
241    {
242        <P as Serialize>::serialize(&self.0, serializer)
243    }
244}
245
246impl<'de, const N: usize, P: Deserialize<'de>> Deserialize<'de> for TypedId<N, P> {
247    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
248    where
249        D: Deserializer<'de>,
250    {
251        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
252    }
253}
254
255impl<const N: usize> Add<u32> for TypedU32Id<N> {
256    type Output = Self;
257
258    fn add(self, rhs: u32) -> Self::Output {
259        Self(self.0.checked_add(rhs).unwrap())
260    }
261}
262
263impl<const N: usize> AddAssign<u32> for TypedU32Id<N> {
264    fn add_assign(&mut self, rhs: u32) {
265        self.0 = self.0.checked_add(rhs).unwrap()
266    }
267}
268
/// Marker trait implemented by `declare_id_type!` for every declared id alias.
///
/// Several impls of [`TypedId`] in this file are bounded on it, so their
/// methods are only available on declared ids. Declaring two aliases with the
/// same primitive and the same type id would emit two identical impls of this
/// trait, which the compiler rejects as conflicting.
#[expect(dead_code)]
pub trait UniqueTypedIdDeclaration {}
271
// Declares a single typed-id alias.
//
// `declare_id_type!(Name, primitive, type_id)` expands to
// `pub type Name = TypedId<{ type_id }, primitive>;` plus an impl of the
// `UniqueTypedIdDeclaration` marker for that alias.
macro_rules! declare_id_type {
    ($alias:ident, $repr:ty, $discriminator:expr) => {
        pub type $alias = TypedId<{ $discriminator }, $repr>;
        impl UniqueTypedIdDeclaration for $alias {}
    };
}
278
// Declares a list of typed-id aliases over one primitive, assigning type ids
// sequentially by position: `declare_id_types!(primitive, A, B, C)` gives `A`
// type id 0, `B` type id 1, `C` type id 2.
macro_rules! declare_id_types {
    // Entry point: start numbering at 0.
    ($repr:ty $(, $alias:ident)+) => {
        declare_id_types! {
            $repr, 0 $(, $alias)+
        }
    };
    // End of recursion: no names left.
    ($repr:ty, $next:expr) => {};
    // Recursion step: declare the first name with the current type id, then
    // continue with the remaining names and the next type id.
    ($repr:ty, $next:expr, $head:ident $(, $tail:ident)*) => {
        declare_id_type! { $head, $repr, $next }
        declare_id_types! {
            $repr, $next + 1  $(, $tail)*
        }
    };
    // Anything else is rejected at compile time, echoing the offending tokens.
    ($($unexpected:tt)+) => {
        compile_error!(stringify!($($unexpected)+))
    }
}
296
// `u32`-backed ids. Type ids are assigned by position starting at 0
// (`TableId` = 0, `JobId` = 1, ..., `LocalOperatorId` = 16), so the order of
// this list determines the const discriminator of each alias.
declare_id_types!(
    u32,
    TableId,
    JobId,
    DatabaseId,
    SchemaId,
    FragmentId,
    ActorId,
    WorkerId,
    SinkId,
    SourceId,
    SubscriptionId,
    IndexId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId,
    SubscriberId,
    LocalOperatorId
);

// `ObjectId` is declared on its own with the explicit type id 256 rather than
// taking the next sequential one from the list above.
declare_id_type!(ObjectId, u32, 256);

// `u64`-backed ids; numbering restarts at 0 because the primitive differs.
declare_id_types!(u64, GlobalOperatorId, StreamNodeLocalOperatorId, ExecutorId);
321
// Generates by-value `as_*` conversion methods between id types that share a
// primitive.
//
// `impl_as!(Src, A, B, {C, name})` adds to `Src` the methods `as_a()` and
// `as_b()` (snake-cased target type names) plus `as_name()` returning `C`.
// Each method re-wraps the same raw value with `Target::new`.
macro_rules! impl_as {
    // Internal: one method with an explicit name suffix.
    (@func $target:ident, $suffix:ident) => {
        paste::paste! {
            pub fn [< as_ $suffix >](self) -> $target {
                $target::new(self.0)
            }
        }
    };
    // Internal: one method named after the snake-cased target type.
    (@func $target:ident) => {
        paste::paste! {
            impl_as! { @func $target, [< $target:snake >] }
        }
    };
    // Entry point: plain targets first, then `{Target, suffix}` pairs.
    ($source:ident $(,$target:ident)* $(,{$aliased_target:ident , $suffix:ident})*) => {
        impl $source {
            $(
                impl_as! { @func $target }
            )*
            $(
                impl_as! { @func $aliased_target, $suffix }
            )*
        }
    }
}
346
347impl JobId {
348    pub fn is_mv_table_id(self, table_id: TableId) -> bool {
349        self.0 == table_id.0
350    }
351}
352
// Generates `JobId::{as_sink_id, as_index_id, as_subscriber_id, as_mv_table_id,
// as_shared_source_id}` and `TableId::as_job_id`; each re-wraps the same raw value.
impl_as!(JobId, SinkId, IndexId, SubscriberId, {TableId, mv_table_id}, {SourceId, shared_source_id});
impl_as!(TableId, JobId);
355
356impl From<StreamNodeLocalOperatorId> for LocalOperatorId {
357    fn from(value: StreamNodeLocalOperatorId) -> Self {
358        assert!(
359            value.0 <= u32::MAX as u64,
360            "oversized operator id {} in stream node",
361            value.0
362        );
363        Self(value.0 as u32)
364    }
365}
366
367impl From<LocalOperatorId> for StreamNodeLocalOperatorId {
368    fn from(value: LocalOperatorId) -> Self {
369        Self(value.0 as u64)
370    }
371}
372
373impl From<OptionalAssociatedTableId> for TableId {
374    fn from(value: OptionalAssociatedTableId) -> Self {
375        let OptionalAssociatedTableId::AssociatedTableId(table_id) = value;
376        Self(table_id)
377    }
378}
379
380impl From<TableId> for OptionalAssociatedTableId {
381    fn from(value: TableId) -> Self {
382        OptionalAssociatedTableId::AssociatedTableId(value.0)
383    }
384}
385
// Generates `SinkId::as_job_id`, `IndexId::as_job_id`,
// `SourceId::{as_share_source_job_id, as_cdc_table_id}` and
// `SubscriptionId::as_subscriber_id`; each re-wraps the same raw value.
impl_as!(SinkId, JobId);
impl_as!(IndexId, JobId);
impl_as!(SourceId, {JobId, share_source_job_id}, {TableId, cdc_table_id});
impl_as!(SubscriptionId, SubscriberId);
390
391impl From<OptionalAssociatedSourceId> for SourceId {
392    fn from(value: OptionalAssociatedSourceId) -> Self {
393        let OptionalAssociatedSourceId::AssociatedSourceId(source_id) = value;
394        Self(source_id)
395    }
396}
397
398impl From<SourceId> for OptionalAssociatedSourceId {
399    fn from(value: SourceId) -> Self {
400        OptionalAssociatedSourceId::AssociatedSourceId(value.0)
401    }
402}
403
// Implements `From<IdType>` for an object enum, once per listed id type.
//
// `impl_into_object!(path::Object, AId, BId)` requires `Object` to have
// variants named `AId` and `BId`; each conversion stores the id's raw value in
// the variant of the same name.
macro_rules! impl_into_object {
    ($object:ty, $($id_type:ident),+) => {
        $(
            impl From<$id_type> for $object {
                fn from(value: $id_type) -> Self {
                    <$object>::$id_type(value.0)
                }
            }
        )+
    };
}
415
// Ids convertible into `grant_privilege::Object`; each becomes the variant of
// the same name carrying the raw id.
impl_into_object!(
    crate::user::grant_privilege::Object,
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    SubscriptionId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId
);

// Ids convertible into `alter_name_request::Object`.
impl_into_object!(
    crate::ddl_service::alter_name_request::Object,
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    SubscriptionId,
    IndexId,
    ViewId
);

// Ids convertible into `alter_owner_request::Object`.
impl_into_object!(
    crate::ddl_service::alter_owner_request::Object,
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    SubscriptionId,
    ViewId,
    ConnectionId
);

// Ids convertible into `alter_set_schema_request::Object`.
impl_into_object!(
    crate::ddl_service::alter_set_schema_request::Object,
    TableId,
    ViewId,
    SourceId,
    SinkId,
    SubscriptionId,
    FunctionId,
    ConnectionId
);
464
// Implements `From<(XId, XId)>` for `alter_swap_rename_request::Object`, once
// per listed kind `X`.
//
// The tuple is `(source, destination)`; both ids are converted with
// `as_object_id()` and stored in an `ObjectNameSwapPair` inside the variant
// named `X`.
macro_rules! impl_into_rename_object {
    ($($kind:ident),+) => {
        paste::paste! {
            $(
                impl From<([<$kind Id>], [<$kind Id>])> for crate::ddl_service::alter_swap_rename_request::Object {
                    fn from((src, dst): ([<$kind Id>], [<$kind Id>])) -> Self {
                        let pair = crate::ddl_service::alter_swap_rename_request::ObjectNameSwapPair {
                            src_object_id: src.as_object_id(),
                            dst_object_id: dst.as_object_id(),
                        };
                        Self::$kind(pair)
                    }
                }
            )+
        }
    };
}
481
// Swap-rename conversions from `(src, dst)` pairs of `TableId`, `ViewId`,
// `SourceId`, `SinkId` and `SubscriptionId`.
impl_into_rename_object!(Table, View, Source, Sink, Subscription);
483
// Generates conversions between `ObjectId` and each listed id type.
//
// For every `XId` in the list this adds `From<XId> for ObjectId`,
// `XId::as_object_id()`, and `ObjectId::as_x_id()` (snake-cased type name).
// All of them re-wrap the same raw value.
macro_rules! impl_object_id_conversion {
    ($($id_type:ident),+) => {
        $(
            impl $id_type {
                pub fn as_object_id(self) -> ObjectId {
                    ObjectId::new(self.0)
                }
            }

            impl From<$id_type> for ObjectId {
                fn from(value: $id_type) -> Self {
                    ObjectId::new(value.0)
                }
            }
        )+

        paste::paste! {
            impl ObjectId {
                $(
                    pub fn [< as_ $id_type:snake>](self) -> $id_type {
                        <$id_type>::new(self.0)
                    }
                )+
            }
        }
    };
}
511
// Id types that convert to and from `ObjectId` (see `impl_object_id_conversion!`
// for the generated items).
impl_object_id_conversion!(
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    JobId,
    SubscriptionId,
    IndexId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId
);