risingwave_pb/
id.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::borrow::Borrow;
17use std::fmt::Formatter;
18use std::iter::Step;
19use std::num::TryFromIntError;
20use std::ops::{Add, AddAssign, Sub};
21use std::str::FromStr;
22
23use sea_orm::sea_query::{ArrayType, ValueTypeErr};
24use sea_orm::{ColIdx, ColumnType, DbErr, QueryResult, TryGetError};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use thiserror_ext::AsReport;
27use tracing::warn;
28
29use crate::catalog::source::OptionalAssociatedTableId;
30use crate::catalog::table::OptionalAssociatedSourceId;
31
/// Raw `u32` value (`u32::MAX - 1`) wrapped by `TypedId::<N, u32>::placeholder()` and
/// compared against by `TypedId::<N, u32>::is_placeholder()`.
pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
33
/// A primitive identifier `P` tagged at the type level with the const discriminator `N`.
///
/// Two `TypedId`s that differ in `N` are distinct types even when they wrap the same
/// primitive. `#[repr(transparent)]` gives the wrapper the same layout as its single
/// field of type `P`.
#[repr(transparent)]
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TypedId<const N: usize, P>(pub(crate) P);
37
38impl<const N: usize, P: std::fmt::Debug> std::fmt::Debug for TypedId<N, P> {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        <P as std::fmt::Debug>::fmt(&self.0, f)
41    }
42}
43
44impl<const N: usize, P: std::fmt::Display> std::fmt::Display for TypedId<N, P> {
45    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46        <P as std::fmt::Display>::fmt(&self.0, f)
47    }
48}
49
50impl<const N: usize, P: std::fmt::UpperHex> std::fmt::UpperHex for TypedId<N, P> {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        <P as std::fmt::UpperHex>::fmt(&self.0, f)
53    }
54}
55
56impl<const N: usize, P: PartialEq> PartialEq<P> for TypedId<N, P> {
57    fn eq(&self, other: &P) -> bool {
58        self.0 == *other
59    }
60}
61
62impl<const N: usize, P: FromStr> FromStr for TypedId<N, P> {
63    type Err = <P as FromStr>::Err;
64
65    fn from_str(s: &str) -> Result<Self, Self::Err> {
66        Ok(Self(<P as FromStr>::from_str(s)?))
67    }
68}
69
// SAFETY: `TypedId` is declared `#[repr(transparent)]` over its single field of type `P`,
// so it has the same layout as `Inner = P`.
// NOTE(review): the exact contract of `prost::TransparentOver` is not visible in this file —
// confirm that layout equivalence is all the trait requires.
unsafe impl<const N: usize, P> prost::TransparentOver for TypedId<N, P> {
    type Inner = P;
}
74
75impl<const N: usize, P> Borrow<P> for TypedId<N, P> {
76    fn borrow(&self) -> &P {
77        // Safety: transparent repr
78        unsafe { std::mem::transmute(self) }
79    }
80}
81
82impl<const N: usize, P> TypedId<N, P> {
83    pub const fn new(inner: P) -> Self {
84        TypedId(inner)
85    }
86}
87
88impl<const N: usize, P> TypedId<N, P>
89where
90    Self: UniqueTypedIdDeclaration,
91{
92    #[expect(clippy::wrong_self_convention)]
93    pub fn as_raw_id(self) -> P {
94        self.0
95    }
96}
97
98type TypedU32Id<const N: usize> = TypedId<N, u32>;
99
100impl<const N: usize> TypedU32Id<N> {
101    pub const fn placeholder() -> Self {
102        Self(OBJECT_ID_PLACEHOLDER)
103    }
104
105    pub fn is_placeholder(&self) -> bool {
106        self.0 == OBJECT_ID_PLACEHOLDER
107    }
108}
109
110impl<const N: usize, P> From<P> for TypedId<N, P> {
111    fn from(id: P) -> Self {
112        Self(id)
113    }
114}
115
116/// Implements `from_$signed` and `to_$signed` conversion methods for TypedId types.
117/// - `$unsigned`: the unsigned primitive type (u32 or u64)
118/// - `$signed`: the corresponding signed type for DB storage (i32 or i64)
119macro_rules! impl_typed_id_conversion {
120    ($unsigned:ty, $signed:ty) => {
121        paste::paste! {
122            impl<const N: usize> TypedId<N, $unsigned>
123            {
124                fn [< from_ $signed >](inner: $signed) -> Self {
125                    Self(inner.try_into().unwrap_or_else(|e: TryFromIntError| {
126                        if cfg!(debug_assertions) {
127                            panic!(
128                                "invalid {} id {} for {}: {:?}",
129                                stringify!($signed),
130                                inner,
131                                type_name::<Self>(),
132                                e.as_report()
133                            );
134                        } else {
135                            warn!(
136                                "invalid {} id {} for {}: {:?}",
137                                stringify!($signed),
138                                inner,
139                                type_name::<Self>(),
140                                e.as_report()
141                            );
142                            inner as _
143                        }
144                    }))
145                }
146
147                fn [< to_ $signed >](self) -> $signed {
148                    self.0.try_into().unwrap_or_else(|e: TryFromIntError| {
149                        if cfg!(debug_assertions) {
150                            panic!(
151                                "invalid {} id {} for {}: {:?}",
152                                stringify!($unsigned),
153                                self.0,
154                                type_name::<Self>(),
155                                e.as_report()
156                            );
157                        } else {
158                            warn!(
159                                "invalid {} id {} for {}: {:?}",
160                                stringify!($unsigned),
161                                self.0,
162                                type_name::<Self>(),
163                                e.as_report()
164                            );
165                            self.0 as _
166                        }
167                    })
168                }
169
170                pub fn [< as_ $signed _id >](self) -> $signed {
171                    self.[< to_ $signed >]()
172                }
173            }
174        }
175    };
176}
177
178impl_typed_id_conversion!(u32, i32);
179impl_typed_id_conversion!(u64, i64);
180
181/// Implements SeaORM traits for TypedId types.
182/// - `$unsigned`: the unsigned primitive type (u32 or u64)
183/// - `$signed`: the corresponding signed type for DB storage (i32 or i64)
184macro_rules! impl_sea_orm_for_typed_id {
185    ($unsigned:ty, $signed:ty) => {
186        paste::paste! {
187            impl<const N: usize> From<TypedId<N, $unsigned>> for sea_orm::Value
188            where
189                TypedId<N, $unsigned>: UniqueTypedIdDeclaration,
190            {
191                fn from(value: TypedId<N, $unsigned>) -> Self {
192                    sea_orm::Value::from(value.[< to_ $signed >]())
193                }
194            }
195
196            impl<const N: usize> sea_orm::sea_query::ValueType for TypedId<N, $unsigned>
197            where
198                Self: UniqueTypedIdDeclaration,
199            {
200                fn try_from(v: sea_orm::Value) -> Result<Self, ValueTypeErr> {
201                    let inner = <$signed as sea_orm::sea_query::ValueType>::try_from(v)?;
202                    Ok(Self::[< from_ $signed >](inner))
203                }
204
205                fn type_name() -> String {
206                    <$signed as sea_orm::sea_query::ValueType>::type_name()
207                }
208
209                fn array_type() -> ArrayType {
210                    <$signed as sea_orm::sea_query::ValueType>::array_type()
211                }
212
213                fn column_type() -> ColumnType {
214                    <$signed as sea_orm::sea_query::ValueType>::column_type()
215                }
216            }
217
218            impl<const N: usize> sea_orm::sea_query::Nullable for TypedId<N, $unsigned>
219            where
220                Self: UniqueTypedIdDeclaration,
221            {
222                fn null() -> sea_orm::Value {
223                    <$signed as sea_orm::sea_query::Nullable>::null()
224                }
225            }
226
227            impl<const N: usize> sea_orm::TryGetable for TypedId<N, $unsigned>
228            where
229                Self: UniqueTypedIdDeclaration,
230            {
231                fn try_get_by<I: ColIdx>(res: &QueryResult, index: I) -> Result<Self, TryGetError> {
232                    let inner = <$signed as sea_orm::TryGetable>::try_get_by(res, index)?;
233                    Ok(Self::[< from_ $signed >](inner))
234                }
235            }
236
237            impl<const N: usize> sea_orm::TryFromU64 for TypedId<N, $unsigned>
238            where
239                Self: UniqueTypedIdDeclaration,
240            {
241                fn try_from_u64(n: u64) -> Result<Self, DbErr> {
242                    Ok(Self::[< from_ $signed >](<$signed as sea_orm::TryFromU64>::try_from_u64(n)?))
243                }
244            }
245        }
246    };
247}
248
249impl_sea_orm_for_typed_id!(u32, i32);
250impl_sea_orm_for_typed_id!(u64, i64);
251
252impl<const N: usize, P: Serialize> Serialize for TypedId<N, P> {
253    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254    where
255        S: Serializer,
256    {
257        <P as Serialize>::serialize(&self.0, serializer)
258    }
259}
260
261impl<'de, const N: usize, P: Deserialize<'de>> Deserialize<'de> for TypedId<N, P> {
262    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
263    where
264        D: Deserializer<'de>,
265    {
266        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
267    }
268}
269
270impl<const N: usize, P: Sub<Output = P>> Sub for TypedId<N, P> {
271    type Output = P;
272
273    fn sub(self, rhs: Self) -> Self::Output {
274        self.0 - rhs.0
275    }
276}
277
278impl<const N: usize, P: Step> Step for TypedId<N, P> {
279    fn steps_between(start: &Self, end: &Self) -> (usize, Option<usize>) {
280        P::steps_between(&start.0, &end.0)
281    }
282
283    fn forward_checked(start: Self, count: usize) -> Option<Self> {
284        P::forward_checked(start.0, count).map(Self)
285    }
286
287    fn backward_checked(start: Self, count: usize) -> Option<Self> {
288        P::backward_checked(start.0, count).map(Self)
289    }
290}
291
292macro_rules! impl_add {
293    ($type_name:ty) => {
294        impl<const N: usize> Add<$type_name> for TypedId<N, $type_name> {
295            type Output = Self;
296
297            fn add(self, rhs: $type_name) -> Self::Output {
298                Self(self.0.checked_add(rhs).unwrap())
299            }
300        }
301
302        impl<const N: usize> AddAssign<$type_name> for TypedId<N, $type_name> {
303            fn add_assign(&mut self, rhs: $type_name) {
304                self.0 = self.0.checked_add(rhs).unwrap()
305            }
306        }
307
308        impl<const N: usize> PartialEq<TypedId<N, $type_name>> for $type_name {
309            fn eq(&self, other: &TypedId<N, $type_name>) -> bool {
310                *self == other.0
311            }
312        }
313    };
314}
315
316impl_add!(u32);
317impl_add!(u64);
318
/// Marker trait implemented by `declare_id_type!` for every declared id alias.
///
/// It carries no methods; in this file it is used as a `where` bound on `as_raw_id` and on
/// the SeaORM trait impls, so those are only available for declared id types.
#[expect(dead_code)]
pub trait UniqueTypedIdDeclaration {}
321
/// Declares one id type: `pub type $alias = TypedId<{ $discriminator }, $repr>` and marks it
/// with `UniqueTypedIdDeclaration`.
///
/// Arguments, in order: alias name, backing primitive, const discriminator expression.
macro_rules! declare_id_type {
    ($alias:ident, $repr:ty, $discriminator:expr) => {
        pub type $alias = TypedId<{ $discriminator }, $repr>;

        impl UniqueTypedIdDeclaration for $alias {}
    };
}
328
/// Declares a family of id types over one primitive, numbering them sequentially.
///
/// Called as `declare_id_types!(prim, A, B, C)`, it expands to one `declare_id_type!` per
/// name with discriminators `0`, `0 + 1`, `0 + 1 + 1`, ... in listing order.
///
/// The arms are tried top to bottom, so their order matters; the catch-all must stay last.
macro_rules! declare_id_types {
    // Entry point: a primitive followed by one or more names. Starts the counter at 0.
    ($primitive:ty $(, $name:ident)+) => {
        declare_id_types! {
            $primitive, 0 $(, $name)+
        }
    };
    // Termination: no names left, nothing more to emit.
    ($primitive:ty, $next_type_id:expr) => {};
    // Recursive step: declare the first name with the current counter, then recurse on the
    // remaining names with the counter incremented.
    ($primitive:ty, $next_type_id:expr, $name:ident $(, $rest:ident)*) => {
        declare_id_type! { $name, $primitive, $next_type_id }
        declare_id_types! {
            $primitive, $next_type_id + 1  $(, $rest)*
        }
    };
    // Anything else is a usage error; echo the offending tokens in the compile error.
    ($($invalid:tt)+) => {
        compile_error!(stringify!($($invalid)+))
    }
}
346
// `u32`-backed id types. Discriminators are assigned from 0 in listing order, so each name
// below becomes its own distinct `TypedId<N, u32>` alias.
declare_id_types!(
    u32,
    TableId,
    JobId,
    DatabaseId,
    SchemaId,
    FragmentId,
    ActorId,
    WorkerId,
    SinkId,
    SourceId,
    SubscriptionId,
    IndexId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId,
    SubscriberId,
    LocalOperatorId,
    UserId,
    RelationId
);

// `ObjectId` is declared on its own with an explicit discriminator (256), which lies outside
// the sequential range used by the `u32` list above.
declare_id_type!(ObjectId, u32, 256);

// `u64`-backed id types. Numbering restarts at 0; these stay distinct from the `u32` ids
// because the primitive type parameter differs.
declare_id_types!(
    u64,
    GlobalOperatorId,
    StreamNodeLocalOperatorId,
    ExecutorId,
    PartialGraphId,
    HummockRawObjectId,
    HummockSstableObjectId,
    HummockSstableId,
    HummockVectorFileId,
    HummockHnswGraphFileId,
    HummockVersionId,
    CompactionGroupId
);
386
/// Generates `as_*` conversion methods from one id type to others by re-wrapping the same
/// raw value (`Target::new(self.0)`).
///
/// Usage: `impl_as!(Src, TargetA, TargetB, {TargetC, alias_c})`
/// - a bare target `TargetA` produces `Src::as_target_a()` (the snake-cased type name);
/// - a braced pair `{TargetC, alias_c}` produces `Src::as_alias_c()` returning `TargetC`.
///
/// All bare targets must be listed before any braced pairs.
macro_rules! impl_as {
    // Internal: emit a single method `as_<alias>` returning `$target_id_name`.
    (@func $target_id_name:ident, $alias_name:ident) => {
        paste::paste! {
            pub fn [< as_ $alias_name >](self) -> $target_id_name {
                $target_id_name::new(self.0)
            }
        }
    };
    // Internal: no alias given, so derive it from the snake-cased target type name.
    (@func $target_id_name:ident) => {
        paste::paste! {
            impl_as! { @func $target_id_name, [< $target_id_name:snake >] }
        }
    };
    // Entry point: one `impl` block on the source type holding every generated method.
    ($src_id_name:ident $(,$target_id_name:ident)* $(,{$orig_target_id_name:ident , $alias_name:ident})*) => {
        impl $src_id_name {
            $(
                impl_as! { @func $target_id_name }
            )*
            $(
                impl_as! { @func $orig_target_id_name, $alias_name }
            )*
        }
    }
}
411
412impl JobId {
413    pub fn is_mv_table_id(self, table_id: TableId) -> bool {
414        self.0 == table_id.0
415    }
416}
417
// Generates `JobId::{as_sink_id, as_index_id, as_subscriber_id, as_mv_table_id,
// as_shared_source_id}`.
impl_as!(JobId, SinkId, IndexId, SubscriberId, {TableId, mv_table_id}, {SourceId, shared_source_id});
// Generates `TableId::as_job_id`.
impl_as!(TableId, JobId);
420
421impl From<StreamNodeLocalOperatorId> for LocalOperatorId {
422    fn from(value: StreamNodeLocalOperatorId) -> Self {
423        assert!(
424            value.0 <= u32::MAX as u64,
425            "oversized operator id {} in stream node",
426            value.0
427        );
428        Self(value.0 as u32)
429    }
430}
431
432impl From<LocalOperatorId> for StreamNodeLocalOperatorId {
433    fn from(value: LocalOperatorId) -> Self {
434        Self(value.0 as u64)
435    }
436}
437
438impl From<OptionalAssociatedTableId> for TableId {
439    fn from(value: OptionalAssociatedTableId) -> Self {
440        let OptionalAssociatedTableId::AssociatedTableId(table_id) = value;
441        table_id
442    }
443}
444
445impl From<TableId> for OptionalAssociatedTableId {
446    fn from(value: TableId) -> Self {
447        OptionalAssociatedTableId::AssociatedTableId(value)
448    }
449}
450
// Generates `SinkId::as_job_id`.
impl_as!(SinkId, JobId);
// Generates `IndexId::as_job_id`.
impl_as!(IndexId, JobId);
// Generates `SourceId::as_share_source_job_id` (-> `JobId`) and `SourceId::as_cdc_table_id`
// (-> `TableId`).
impl_as!(SourceId, {JobId, share_source_job_id}, {TableId, cdc_table_id});
// Generates `SubscriptionId::as_subscriber_id`.
impl_as!(SubscriptionId, SubscriberId);
455
456impl From<OptionalAssociatedSourceId> for SourceId {
457    fn from(value: OptionalAssociatedSourceId) -> Self {
458        let OptionalAssociatedSourceId::AssociatedSourceId(source_id) = value;
459        source_id
460    }
461}
462
463impl From<SourceId> for OptionalAssociatedSourceId {
464    fn from(value: SourceId) -> Self {
465        OptionalAssociatedSourceId::AssociatedSourceId(value)
466    }
467}
468
469macro_rules! impl_into_object {
470    ($mod_prefix:ty, $($type_name:ident),+) => {
471        $(
472            impl From<$type_name> for $mod_prefix {
473                fn from(value: $type_name) -> Self {
474                    <$mod_prefix>::$type_name(value)
475                }
476            }
477        )+
478    };
479}
480
481impl_into_object!(
482    crate::user::grant_privilege::Object,
483    DatabaseId,
484    TableId,
485    SchemaId,
486    SinkId,
487    SourceId,
488    SubscriptionId,
489    ViewId,
490    FunctionId,
491    ConnectionId,
492    SecretId
493);
494
495impl_into_object!(
496    crate::ddl_service::alter_name_request::Object,
497    DatabaseId,
498    TableId,
499    SchemaId,
500    SinkId,
501    SourceId,
502    SubscriptionId,
503    IndexId,
504    ViewId
505);
506
507impl_into_object!(
508    crate::ddl_service::alter_owner_request::Object,
509    DatabaseId,
510    TableId,
511    SchemaId,
512    SinkId,
513    SourceId,
514    SubscriptionId,
515    ViewId,
516    ConnectionId,
517    FunctionId,
518    SecretId
519);
520
521impl_into_object!(
522    crate::ddl_service::alter_set_schema_request::Object,
523    TableId,
524    ViewId,
525    SourceId,
526    SinkId,
527    SubscriptionId,
528    FunctionId,
529    ConnectionId
530);
531
532macro_rules! impl_into_rename_object {
533    ($($type_name:ident),+) => {
534        paste::paste! {
535            $(
536                impl From<([<$type_name Id>], [<$type_name Id>])> for crate::ddl_service::alter_swap_rename_request::Object {
537                    fn from((src_object_id, dst_object_id): ([<$type_name Id>], [<$type_name Id>])) -> Self {
538                        crate::ddl_service::alter_swap_rename_request::Object::$type_name(crate::ddl_service::alter_swap_rename_request::ObjectNameSwapPair {
539                            src_object_id: src_object_id.as_object_id(),
540                            dst_object_id: dst_object_id.as_object_id(),
541                        })
542                    }
543                }
544            )+
545        }
546    };
547}
548
549impl_into_rename_object!(Table, View, Source, Sink, Subscription);
550
551macro_rules! impl_object_id_conversion {
552    ($($type_name:ident),+) => {
553        $(
554            impl From<$type_name> for ObjectId {
555                fn from(value: $type_name) -> Self {
556                    Self::new(value.0)
557                }
558            }
559
560            impl $type_name {
561                pub fn as_object_id(self) -> ObjectId {
562                    ObjectId::new(self.0)
563                }
564            }
565        )+
566
567        paste::paste! {
568            impl ObjectId {
569                $(
570                    pub fn [< as_ $type_name:snake>](self) -> $type_name {
571                        $type_name::new(self.0)
572                    }
573                )+
574            }
575        }
576    };
577}
578
579impl_object_id_conversion!(
580    DatabaseId,
581    TableId,
582    SchemaId,
583    SinkId,
584    SourceId,
585    JobId,
586    SubscriptionId,
587    IndexId,
588    ViewId,
589    FunctionId,
590    ConnectionId,
591    SecretId
592);
593
594macro_rules! declare_relation {
595    ($($id_name:ident),+) => {
596        $(
597            impl $id_name {
598                pub fn as_relation_id(self) -> RelationId {
599                    RelationId::new(self.0)
600                }
601            }
602        )+
603    };
604}
605
606declare_relation!(TableId, SourceId, SinkId, IndexId, ViewId, SubscriptionId);
607
608macro_rules! impl_hummock_object_id {
609    ($type_name:ty) => {
610        impl $type_name {
611            pub fn as_raw(&self) -> HummockRawObjectId {
612                HummockRawObjectId::new(self.0)
613            }
614        }
615
616        impl From<HummockRawObjectId> for $type_name {
617            fn from(id: HummockRawObjectId) -> Self {
618                Self(id.0)
619            }
620        }
621    };
622}
623
624impl_hummock_object_id!(HummockSstableObjectId);
625impl_hummock_object_id!(HummockVectorFileId);
626impl_hummock_object_id!(HummockHnswGraphFileId);