risingwave_pb/
id.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Formatter;
18use std::num::TryFromIntError;
19use std::ops::{Add, AddAssign};
20use std::str::FromStr;
21
22use sea_orm::sea_query::{ArrayType, ValueTypeErr};
23use sea_orm::{ColIdx, ColumnType, DbErr, QueryResult, TryGetError};
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use thiserror_ext::AsReport;
26use tracing::warn;
27
28use crate::catalog::source::OptionalAssociatedTableId;
29use crate::catalog::table::OptionalAssociatedSourceId;
30
31pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
32
33#[derive(Clone, Copy, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
34#[repr(transparent)]
35pub struct TypedId<const N: usize, P>(pub(crate) P);
36
37impl<const N: usize, P: std::fmt::Debug> std::fmt::Debug for TypedId<N, P> {
38    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39        <P as std::fmt::Debug>::fmt(&self.0, f)
40    }
41}
42
43impl<const N: usize, P: std::fmt::Display> std::fmt::Display for TypedId<N, P> {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        <P as std::fmt::Display>::fmt(&self.0, f)
46    }
47}
48
49impl<const N: usize, P: std::fmt::UpperHex> std::fmt::UpperHex for TypedId<N, P> {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        <P as std::fmt::UpperHex>::fmt(&self.0, f)
52    }
53}
54
55impl<const N: usize, P: PartialEq> PartialEq<P> for TypedId<N, P> {
56    fn eq(&self, other: &P) -> bool {
57        self.0 == *other
58    }
59}
60
61impl<const N: usize, P: FromStr> FromStr for TypedId<N, P> {
62    type Err = <P as FromStr>::Err;
63
64    fn from_str(s: &str) -> Result<Self, Self::Err> {
65        Ok(Self(<P as FromStr>::from_str(s)?))
66    }
67}
68
69impl<const N: usize, P> TypedId<N, P>
70where
71    Self: UniqueTypedIdDeclaration,
72{
73    pub const fn new(inner: P) -> Self {
74        TypedId(inner)
75    }
76
77    #[expect(clippy::wrong_self_convention)]
78    pub fn as_raw_id(self) -> P {
79        self.0
80    }
81
82    pub fn raw_slice(slice: &[Self]) -> &[P] {
83        // SAFETY: transparent repr
84        unsafe { std::mem::transmute(slice) }
85    }
86
87    pub fn mut_raw_vec(vec: &mut Vec<Self>) -> &mut Vec<P> {
88        // SAFETY: transparent repr
89        unsafe { std::mem::transmute(vec) }
90    }
91
92    pub fn raw_hash_map_ref<V>(map: &HashMap<Self, V>) -> &HashMap<P, V> {
93        // SAFETY: transparent repr
94        unsafe { std::mem::transmute(map) }
95    }
96
97    pub fn raw_hash_map_mut_ref<V>(map: &mut HashMap<Self, V>) -> &mut HashMap<P, V> {
98        // SAFETY: transparent repr
99        unsafe { std::mem::transmute(map) }
100    }
101
102    pub fn raw_btree_map_ref<V>(map: &BTreeMap<Self, V>) -> &BTreeMap<P, V> {
103        // SAFETY: transparent repr
104        unsafe { std::mem::transmute(map) }
105    }
106
107    pub fn raw_btree_map_mut_ref<V>(map: &mut BTreeMap<Self, V>) -> &mut BTreeMap<P, V> {
108        // SAFETY: transparent repr
109        unsafe { std::mem::transmute(map) }
110    }
111}
112
113type TypedU32Id<const N: usize> = TypedId<N, u32>;
114
115impl<const N: usize> TypedU32Id<N>
116where
117    Self: UniqueTypedIdDeclaration,
118{
119    pub const fn placeholder() -> Self {
120        Self(OBJECT_ID_PLACEHOLDER)
121    }
122
123    pub fn is_placeholder(&self) -> bool {
124        self.0 == OBJECT_ID_PLACEHOLDER
125    }
126
127    pub fn as_i32_id(self) -> i32 {
128        self.to_i32()
129    }
130
131    fn from_i32(inner: i32) -> Self {
132        Self(inner.try_into().unwrap_or_else(|e: TryFromIntError| {
133            if cfg!(debug_assertions) {
134                panic!(
135                    "invalid i32 id {} for {}: {:?}",
136                    inner,
137                    type_name::<Self>(),
138                    e.as_report()
139                );
140            } else {
141                warn!(
142                    "invalid i32 id {} for {}: {:?}",
143                    inner,
144                    type_name::<Self>(),
145                    e.as_report()
146                );
147                inner as _
148            }
149        }))
150    }
151
152    fn to_i32(self) -> i32 {
153        self.0.try_into().unwrap_or_else(|e: TryFromIntError| {
154            if cfg!(debug_assertions) {
155                panic!(
156                    "invalid u32 id {} for {}: {:?}",
157                    self.0,
158                    type_name::<Self>(),
159                    e.as_report()
160                );
161            } else {
162                warn!(
163                    "invalid u32 id {} for {}: {:?}",
164                    self.0,
165                    type_name::<Self>(),
166                    e.as_report()
167                );
168                self.0 as _
169            }
170        })
171    }
172}
173
174impl<const N: usize, P> From<P> for TypedId<N, P> {
175    fn from(id: P) -> Self {
176        Self(id)
177    }
178}
179
180impl<const N: usize> From<TypedU32Id<N>> for sea_orm::Value
181where
182    TypedU32Id<N>: UniqueTypedIdDeclaration,
183{
184    fn from(value: TypedU32Id<N>) -> Self {
185        sea_orm::Value::from(value.to_i32())
186    }
187}
188
189impl<const N: usize> sea_orm::sea_query::ValueType for TypedU32Id<N>
190where
191    Self: UniqueTypedIdDeclaration,
192{
193    fn try_from(v: sea_orm::Value) -> Result<Self, ValueTypeErr> {
194        let inner = <i32 as sea_orm::sea_query::ValueType>::try_from(v)?;
195        Ok(Self::from_i32(inner))
196    }
197
198    fn type_name() -> String {
199        <i32 as sea_orm::sea_query::ValueType>::type_name()
200    }
201
202    fn array_type() -> ArrayType {
203        <i32 as sea_orm::sea_query::ValueType>::array_type()
204    }
205
206    fn column_type() -> ColumnType {
207        <i32 as sea_orm::sea_query::ValueType>::column_type()
208    }
209}
210
211impl<const N: usize> sea_orm::sea_query::Nullable for TypedU32Id<N> {
212    fn null() -> sea_orm::Value {
213        <i32 as sea_orm::sea_query::Nullable>::null()
214    }
215}
216
217impl<const N: usize> sea_orm::TryGetable for TypedU32Id<N>
218where
219    Self: UniqueTypedIdDeclaration,
220{
221    fn try_get_by<I: ColIdx>(res: &QueryResult, index: I) -> Result<Self, TryGetError> {
222        let inner = <i32 as sea_orm::TryGetable>::try_get_by(res, index)?;
223        Ok(Self::from_i32(inner))
224    }
225}
226
227impl<const N: usize> sea_orm::TryFromU64 for TypedU32Id<N>
228where
229    Self: UniqueTypedIdDeclaration,
230{
231    fn try_from_u64(n: u64) -> Result<Self, DbErr> {
232        let inner = <i32 as sea_orm::TryFromU64>::try_from_u64(n)?;
233        Ok(Self::from_i32(inner))
234    }
235}
236
237impl<const N: usize, P: Serialize> Serialize for TypedId<N, P> {
238    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
239    where
240        S: Serializer,
241    {
242        <P as Serialize>::serialize(&self.0, serializer)
243    }
244}
245
246impl<'de, const N: usize, P: Deserialize<'de>> Deserialize<'de> for TypedId<N, P> {
247    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
248    where
249        D: Deserializer<'de>,
250    {
251        Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
252    }
253}
254
255impl<const N: usize> Add<u32> for TypedU32Id<N> {
256    type Output = Self;
257
258    fn add(self, rhs: u32) -> Self::Output {
259        Self(self.0.checked_add(rhs).unwrap())
260    }
261}
262
263impl<const N: usize> AddAssign<u32> for TypedU32Id<N> {
264    fn add_assign(&mut self, rhs: u32) {
265        self.0 = self.0.checked_add(rhs).unwrap()
266    }
267}
268
269#[expect(dead_code)]
270pub trait UniqueTypedIdDeclaration {}
271
272macro_rules! declare_id_type {
273    ($name:ident, $primitive:ty, $type_id:expr) => {
274        pub type $name = TypedId<{ $type_id }, $primitive>;
275        impl UniqueTypedIdDeclaration for $name {}
276    };
277}
278
279macro_rules! declare_id_types {
280    ($primitive:ty $(, $name:ident)+) => {
281        declare_id_types! {
282            $primitive, 0 $(, $name)+
283        }
284    };
285    ($primitive:ty, $next_type_id:expr) => {};
286    ($primitive:ty, $next_type_id:expr, $name:ident $(, $rest:ident)*) => {
287        declare_id_type! { $name, $primitive, $next_type_id }
288        declare_id_types! {
289            $primitive, $next_type_id + 1  $(, $rest)*
290        }
291    };
292    ($($invalid:tt)+) => {
293        compile_error!(stringify!($($invalid)+))
294    }
295}
296
297declare_id_types!(
298    u32,
299    TableId,
300    JobId,
301    DatabaseId,
302    SchemaId,
303    FragmentId,
304    ActorId,
305    WorkerId,
306    SinkId,
307    SourceId,
308    SubscriptionId,
309    IndexId,
310    ViewId,
311    FunctionId,
312    ConnectionId,
313    SecretId,
314    SubscriberId,
315    LocalOperatorId
316);
317
318declare_id_type!(ObjectId, u32, 256);
319
320declare_id_types!(
321    u64,
322    GlobalOperatorId,
323    StreamNodeLocalOperatorId,
324    ExecutorId,
325    PartialGraphId
326);
327
328macro_rules! impl_as {
329    (@func $target_id_name:ident, $alias_name:ident) => {
330        paste::paste! {
331            pub fn [< as_ $alias_name >](self) -> $target_id_name {
332                $target_id_name::new(self.0)
333            }
334        }
335    };
336    (@func $target_id_name:ident) => {
337        paste::paste! {
338            impl_as! { @func $target_id_name, [< $target_id_name:snake >] }
339        }
340    };
341    ($src_id_name:ident $(,$target_id_name:ident)* $(,{$orig_target_id_name:ident , $alias_name:ident})*) => {
342        impl $src_id_name {
343            $(
344                impl_as! { @func $target_id_name }
345            )*
346            $(
347                impl_as! { @func $orig_target_id_name, $alias_name }
348            )*
349        }
350    }
351}
352
353impl JobId {
354    pub fn is_mv_table_id(self, table_id: TableId) -> bool {
355        self.0 == table_id.0
356    }
357}
358
359impl_as!(JobId, SinkId, IndexId, SubscriberId, {TableId, mv_table_id}, {SourceId, shared_source_id});
360impl_as!(TableId, JobId);
361
362impl From<StreamNodeLocalOperatorId> for LocalOperatorId {
363    fn from(value: StreamNodeLocalOperatorId) -> Self {
364        assert!(
365            value.0 <= u32::MAX as u64,
366            "oversized operator id {} in stream node",
367            value.0
368        );
369        Self(value.0 as u32)
370    }
371}
372
373impl From<LocalOperatorId> for StreamNodeLocalOperatorId {
374    fn from(value: LocalOperatorId) -> Self {
375        Self(value.0 as u64)
376    }
377}
378
379impl From<OptionalAssociatedTableId> for TableId {
380    fn from(value: OptionalAssociatedTableId) -> Self {
381        let OptionalAssociatedTableId::AssociatedTableId(table_id) = value;
382        Self(table_id)
383    }
384}
385
386impl From<TableId> for OptionalAssociatedTableId {
387    fn from(value: TableId) -> Self {
388        OptionalAssociatedTableId::AssociatedTableId(value.0)
389    }
390}
391
392impl_as!(SinkId, JobId);
393impl_as!(IndexId, JobId);
394impl_as!(SourceId, {JobId, share_source_job_id}, {TableId, cdc_table_id});
395impl_as!(SubscriptionId, SubscriberId);
396
397impl From<OptionalAssociatedSourceId> for SourceId {
398    fn from(value: OptionalAssociatedSourceId) -> Self {
399        let OptionalAssociatedSourceId::AssociatedSourceId(source_id) = value;
400        Self(source_id)
401    }
402}
403
404impl From<SourceId> for OptionalAssociatedSourceId {
405    fn from(value: SourceId) -> Self {
406        OptionalAssociatedSourceId::AssociatedSourceId(value.0)
407    }
408}
409
410macro_rules! impl_into_object {
411    ($mod_prefix:ty, $($type_name:ident),+) => {
412        $(
413            impl From<$type_name> for $mod_prefix {
414                fn from(value: $type_name) -> Self {
415                    <$mod_prefix>::$type_name(value.0)
416                }
417            }
418        )+
419    };
420}
421
422impl_into_object!(
423    crate::user::grant_privilege::Object,
424    DatabaseId,
425    TableId,
426    SchemaId,
427    SinkId,
428    SourceId,
429    SubscriptionId,
430    ViewId,
431    FunctionId,
432    ConnectionId,
433    SecretId
434);
435
436impl_into_object!(
437    crate::ddl_service::alter_name_request::Object,
438    DatabaseId,
439    TableId,
440    SchemaId,
441    SinkId,
442    SourceId,
443    SubscriptionId,
444    IndexId,
445    ViewId
446);
447
448impl_into_object!(
449    crate::ddl_service::alter_owner_request::Object,
450    DatabaseId,
451    TableId,
452    SchemaId,
453    SinkId,
454    SourceId,
455    SubscriptionId,
456    ViewId,
457    ConnectionId,
458    FunctionId,
459    SecretId
460);
461
462impl_into_object!(
463    crate::ddl_service::alter_set_schema_request::Object,
464    TableId,
465    ViewId,
466    SourceId,
467    SinkId,
468    SubscriptionId,
469    FunctionId,
470    ConnectionId
471);
472
473macro_rules! impl_into_rename_object {
474    ($($type_name:ident),+) => {
475        paste::paste! {
476            $(
477                impl From<([<$type_name Id>], [<$type_name Id>])> for crate::ddl_service::alter_swap_rename_request::Object {
478                    fn from((src_object_id, dst_object_id): ([<$type_name Id>], [<$type_name Id>])) -> Self {
479                        crate::ddl_service::alter_swap_rename_request::Object::$type_name(crate::ddl_service::alter_swap_rename_request::ObjectNameSwapPair {
480                            src_object_id: src_object_id.as_object_id(),
481                            dst_object_id: dst_object_id.as_object_id(),
482                        })
483                    }
484                }
485            )+
486        }
487    };
488}
489
490impl_into_rename_object!(Table, View, Source, Sink, Subscription);
491
492macro_rules! impl_object_id_conversion {
493    ($($type_name:ident),+) => {
494        $(
495            impl From<$type_name> for ObjectId {
496                fn from(value: $type_name) -> Self {
497                    Self::new(value.0)
498                }
499            }
500
501            impl $type_name {
502                pub fn as_object_id(self) -> ObjectId {
503                    ObjectId::new(self.0)
504                }
505            }
506        )+
507
508        paste::paste! {
509            impl ObjectId {
510                $(
511                    pub fn [< as_ $type_name:snake>](self) -> $type_name {
512                        $type_name::new(self.0)
513                    }
514                )+
515            }
516        }
517    };
518}
519
520impl_object_id_conversion!(
521    DatabaseId,
522    TableId,
523    SchemaId,
524    SinkId,
525    SourceId,
526    JobId,
527    SubscriptionId,
528    IndexId,
529    ViewId,
530    FunctionId,
531    ConnectionId,
532    SecretId
533);