1use std::any::type_name;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Formatter;
18use std::num::TryFromIntError;
19use std::ops::{Add, AddAssign};
20use std::str::FromStr;
21
22use sea_orm::sea_query::{ArrayType, ValueTypeErr};
23use sea_orm::{ColIdx, ColumnType, DbErr, QueryResult, TryGetError};
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use thiserror_ext::AsReport;
26use tracing::warn;
27
28use crate::catalog::source::OptionalAssociatedTableId;
29use crate::catalog::table::OptionalAssociatedSourceId;
30
31pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
32
33#[derive(Clone, Copy, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
34#[repr(transparent)]
35pub struct TypedId<const N: usize, P>(pub(crate) P);
36
37impl<const N: usize, P: std::fmt::Debug> std::fmt::Debug for TypedId<N, P> {
38 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39 <P as std::fmt::Debug>::fmt(&self.0, f)
40 }
41}
42
43impl<const N: usize, P: std::fmt::Display> std::fmt::Display for TypedId<N, P> {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 <P as std::fmt::Display>::fmt(&self.0, f)
46 }
47}
48
49impl<const N: usize, P: std::fmt::UpperHex> std::fmt::UpperHex for TypedId<N, P> {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 <P as std::fmt::UpperHex>::fmt(&self.0, f)
52 }
53}
54
55impl<const N: usize, P: PartialEq> PartialEq<P> for TypedId<N, P> {
56 fn eq(&self, other: &P) -> bool {
57 self.0 == *other
58 }
59}
60
61impl<const N: usize, P: FromStr> FromStr for TypedId<N, P> {
62 type Err = <P as FromStr>::Err;
63
64 fn from_str(s: &str) -> Result<Self, Self::Err> {
65 Ok(Self(<P as FromStr>::from_str(s)?))
66 }
67}
68
69impl<const N: usize, P> TypedId<N, P>
70where
71 Self: UniqueTypedIdDeclaration,
72{
73 pub const fn new(inner: P) -> Self {
74 TypedId(inner)
75 }
76
77 #[expect(clippy::wrong_self_convention)]
78 pub fn as_raw_id(self) -> P {
79 self.0
80 }
81
82 pub fn raw_slice(slice: &[Self]) -> &[P] {
83 unsafe { std::mem::transmute(slice) }
85 }
86
87 pub fn mut_raw_vec(vec: &mut Vec<Self>) -> &mut Vec<P> {
88 unsafe { std::mem::transmute(vec) }
90 }
91
92 pub fn raw_hash_map_ref<V>(map: &HashMap<Self, V>) -> &HashMap<P, V> {
93 unsafe { std::mem::transmute(map) }
95 }
96
97 pub fn raw_hash_map_mut_ref<V>(map: &mut HashMap<Self, V>) -> &mut HashMap<P, V> {
98 unsafe { std::mem::transmute(map) }
100 }
101
102 pub fn raw_btree_map_ref<V>(map: &BTreeMap<Self, V>) -> &BTreeMap<P, V> {
103 unsafe { std::mem::transmute(map) }
105 }
106
107 pub fn raw_btree_map_mut_ref<V>(map: &mut BTreeMap<Self, V>) -> &mut BTreeMap<P, V> {
108 unsafe { std::mem::transmute(map) }
110 }
111}
112
113type TypedU32Id<const N: usize> = TypedId<N, u32>;
114
115impl<const N: usize> TypedU32Id<N>
116where
117 Self: UniqueTypedIdDeclaration,
118{
119 pub const fn placeholder() -> Self {
120 Self(OBJECT_ID_PLACEHOLDER)
121 }
122
123 pub fn is_placeholder(&self) -> bool {
124 self.0 == OBJECT_ID_PLACEHOLDER
125 }
126
127 pub fn as_i32_id(self) -> i32 {
128 self.to_i32()
129 }
130
131 fn from_i32(inner: i32) -> Self {
132 Self(inner.try_into().unwrap_or_else(|e: TryFromIntError| {
133 if cfg!(debug_assertions) {
134 panic!(
135 "invalid i32 id {} for {}: {:?}",
136 inner,
137 type_name::<Self>(),
138 e.as_report()
139 );
140 } else {
141 warn!(
142 "invalid i32 id {} for {}: {:?}",
143 inner,
144 type_name::<Self>(),
145 e.as_report()
146 );
147 inner as _
148 }
149 }))
150 }
151
152 fn to_i32(self) -> i32 {
153 self.0.try_into().unwrap_or_else(|e: TryFromIntError| {
154 if cfg!(debug_assertions) {
155 panic!(
156 "invalid u32 id {} for {}: {:?}",
157 self.0,
158 type_name::<Self>(),
159 e.as_report()
160 );
161 } else {
162 warn!(
163 "invalid u32 id {} for {}: {:?}",
164 self.0,
165 type_name::<Self>(),
166 e.as_report()
167 );
168 self.0 as _
169 }
170 })
171 }
172}
173
174impl<const N: usize, P> From<P> for TypedId<N, P> {
175 fn from(id: P) -> Self {
176 Self(id)
177 }
178}
179
180impl<const N: usize> From<TypedU32Id<N>> for sea_orm::Value
181where
182 TypedU32Id<N>: UniqueTypedIdDeclaration,
183{
184 fn from(value: TypedU32Id<N>) -> Self {
185 sea_orm::Value::from(value.to_i32())
186 }
187}
188
189impl<const N: usize> sea_orm::sea_query::ValueType for TypedU32Id<N>
190where
191 Self: UniqueTypedIdDeclaration,
192{
193 fn try_from(v: sea_orm::Value) -> Result<Self, ValueTypeErr> {
194 let inner = <i32 as sea_orm::sea_query::ValueType>::try_from(v)?;
195 Ok(Self::from_i32(inner))
196 }
197
198 fn type_name() -> String {
199 <i32 as sea_orm::sea_query::ValueType>::type_name()
200 }
201
202 fn array_type() -> ArrayType {
203 <i32 as sea_orm::sea_query::ValueType>::array_type()
204 }
205
206 fn column_type() -> ColumnType {
207 <i32 as sea_orm::sea_query::ValueType>::column_type()
208 }
209}
210
211impl<const N: usize> sea_orm::sea_query::Nullable for TypedU32Id<N> {
212 fn null() -> sea_orm::Value {
213 <i32 as sea_orm::sea_query::Nullable>::null()
214 }
215}
216
217impl<const N: usize> sea_orm::TryGetable for TypedU32Id<N>
218where
219 Self: UniqueTypedIdDeclaration,
220{
221 fn try_get_by<I: ColIdx>(res: &QueryResult, index: I) -> Result<Self, TryGetError> {
222 let inner = <i32 as sea_orm::TryGetable>::try_get_by(res, index)?;
223 Ok(Self::from_i32(inner))
224 }
225}
226
227impl<const N: usize> sea_orm::TryFromU64 for TypedU32Id<N>
228where
229 Self: UniqueTypedIdDeclaration,
230{
231 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
232 let inner = <i32 as sea_orm::TryFromU64>::try_from_u64(n)?;
233 Ok(Self::from_i32(inner))
234 }
235}
236
237impl<const N: usize, P: Serialize> Serialize for TypedId<N, P> {
238 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
239 where
240 S: Serializer,
241 {
242 <P as Serialize>::serialize(&self.0, serializer)
243 }
244}
245
246impl<'de, const N: usize, P: Deserialize<'de>> Deserialize<'de> for TypedId<N, P> {
247 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
248 where
249 D: Deserializer<'de>,
250 {
251 Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
252 }
253}
254
255impl<const N: usize> Add<u32> for TypedU32Id<N> {
256 type Output = Self;
257
258 fn add(self, rhs: u32) -> Self::Output {
259 Self(self.0.checked_add(rhs).unwrap())
260 }
261}
262
263impl<const N: usize> AddAssign<u32> for TypedU32Id<N> {
264 fn add_assign(&mut self, rhs: u32) {
265 self.0 = self.0.checked_add(rhs).unwrap()
266 }
267}
268
269#[expect(dead_code)]
270pub trait UniqueTypedIdDeclaration {}
271
272macro_rules! declare_id_type {
273 ($name:ident, $primitive:ty, $type_id:expr) => {
274 pub type $name = TypedId<{ $type_id }, $primitive>;
275 impl UniqueTypedIdDeclaration for $name {}
276 };
277}
278
279macro_rules! declare_id_types {
280 ($primitive:ty $(, $name:ident)+) => {
281 declare_id_types! {
282 $primitive, 0 $(, $name)+
283 }
284 };
285 ($primitive:ty, $next_type_id:expr) => {};
286 ($primitive:ty, $next_type_id:expr, $name:ident $(, $rest:ident)*) => {
287 declare_id_type! { $name, $primitive, $next_type_id }
288 declare_id_types! {
289 $primitive, $next_type_id + 1 $(, $rest)*
290 }
291 };
292 ($($invalid:tt)+) => {
293 compile_error!(stringify!($($invalid)+))
294 }
295}
296
297declare_id_types!(
298 u32,
299 TableId,
300 JobId,
301 DatabaseId,
302 SchemaId,
303 FragmentId,
304 ActorId,
305 WorkerId,
306 SinkId,
307 SourceId,
308 SubscriptionId,
309 IndexId,
310 ViewId,
311 FunctionId,
312 ConnectionId,
313 SecretId,
314 SubscriberId,
315 LocalOperatorId
316);
317
318declare_id_type!(ObjectId, u32, 256);
319
320declare_id_types!(u64, GlobalOperatorId, StreamNodeLocalOperatorId, ExecutorId);
321
322macro_rules! impl_as {
323 (@func $target_id_name:ident, $alias_name:ident) => {
324 paste::paste! {
325 pub fn [< as_ $alias_name >](self) -> $target_id_name {
326 $target_id_name::new(self.0)
327 }
328 }
329 };
330 (@func $target_id_name:ident) => {
331 paste::paste! {
332 impl_as! { @func $target_id_name, [< $target_id_name:snake >] }
333 }
334 };
335 ($src_id_name:ident $(,$target_id_name:ident)* $(,{$orig_target_id_name:ident , $alias_name:ident})*) => {
336 impl $src_id_name {
337 $(
338 impl_as! { @func $target_id_name }
339 )*
340 $(
341 impl_as! { @func $orig_target_id_name, $alias_name }
342 )*
343 }
344 }
345}
346
347impl JobId {
348 pub fn is_mv_table_id(self, table_id: TableId) -> bool {
349 self.0 == table_id.0
350 }
351}
352
353impl_as!(JobId, SinkId, IndexId, SubscriberId, {TableId, mv_table_id}, {SourceId, shared_source_id});
354impl_as!(TableId, JobId);
355
356impl From<StreamNodeLocalOperatorId> for LocalOperatorId {
357 fn from(value: StreamNodeLocalOperatorId) -> Self {
358 assert!(
359 value.0 <= u32::MAX as u64,
360 "oversized operator id {} in stream node",
361 value.0
362 );
363 Self(value.0 as u32)
364 }
365}
366
367impl From<LocalOperatorId> for StreamNodeLocalOperatorId {
368 fn from(value: LocalOperatorId) -> Self {
369 Self(value.0 as u64)
370 }
371}
372
373impl From<OptionalAssociatedTableId> for TableId {
374 fn from(value: OptionalAssociatedTableId) -> Self {
375 let OptionalAssociatedTableId::AssociatedTableId(table_id) = value;
376 Self(table_id)
377 }
378}
379
380impl From<TableId> for OptionalAssociatedTableId {
381 fn from(value: TableId) -> Self {
382 OptionalAssociatedTableId::AssociatedTableId(value.0)
383 }
384}
385
386impl_as!(SinkId, JobId);
387impl_as!(IndexId, JobId);
388impl_as!(SourceId, {JobId, share_source_job_id}, {TableId, cdc_table_id});
389impl_as!(SubscriptionId, SubscriberId);
390
391impl From<OptionalAssociatedSourceId> for SourceId {
392 fn from(value: OptionalAssociatedSourceId) -> Self {
393 let OptionalAssociatedSourceId::AssociatedSourceId(source_id) = value;
394 Self(source_id)
395 }
396}
397
398impl From<SourceId> for OptionalAssociatedSourceId {
399 fn from(value: SourceId) -> Self {
400 OptionalAssociatedSourceId::AssociatedSourceId(value.0)
401 }
402}
403
404macro_rules! impl_into_object {
405 ($mod_prefix:ty, $($type_name:ident),+) => {
406 $(
407 impl From<$type_name> for $mod_prefix {
408 fn from(value: $type_name) -> Self {
409 <$mod_prefix>::$type_name(value.0)
410 }
411 }
412 )+
413 };
414}
415
416impl_into_object!(
417 crate::user::grant_privilege::Object,
418 DatabaseId,
419 TableId,
420 SchemaId,
421 SinkId,
422 SourceId,
423 SubscriptionId,
424 ViewId,
425 FunctionId,
426 ConnectionId,
427 SecretId
428);
429
430impl_into_object!(
431 crate::ddl_service::alter_name_request::Object,
432 DatabaseId,
433 TableId,
434 SchemaId,
435 SinkId,
436 SourceId,
437 SubscriptionId,
438 IndexId,
439 ViewId
440);
441
442impl_into_object!(
443 crate::ddl_service::alter_owner_request::Object,
444 DatabaseId,
445 TableId,
446 SchemaId,
447 SinkId,
448 SourceId,
449 SubscriptionId,
450 ViewId,
451 ConnectionId
452);
453
454impl_into_object!(
455 crate::ddl_service::alter_set_schema_request::Object,
456 TableId,
457 ViewId,
458 SourceId,
459 SinkId,
460 SubscriptionId,
461 FunctionId,
462 ConnectionId
463);
464
465macro_rules! impl_into_rename_object {
466 ($($type_name:ident),+) => {
467 paste::paste! {
468 $(
469 impl From<([<$type_name Id>], [<$type_name Id>])> for crate::ddl_service::alter_swap_rename_request::Object {
470 fn from((src_object_id, dst_object_id): ([<$type_name Id>], [<$type_name Id>])) -> Self {
471 crate::ddl_service::alter_swap_rename_request::Object::$type_name(crate::ddl_service::alter_swap_rename_request::ObjectNameSwapPair {
472 src_object_id: src_object_id.as_object_id(),
473 dst_object_id: dst_object_id.as_object_id(),
474 })
475 }
476 }
477 )+
478 }
479 };
480}
481
482impl_into_rename_object!(Table, View, Source, Sink, Subscription);
483
484macro_rules! impl_object_id_conversion {
485 ($($type_name:ident),+) => {
486 $(
487 impl From<$type_name> for ObjectId {
488 fn from(value: $type_name) -> Self {
489 Self::new(value.0)
490 }
491 }
492
493 impl $type_name {
494 pub fn as_object_id(self) -> ObjectId {
495 ObjectId::new(self.0)
496 }
497 }
498 )+
499
500 paste::paste! {
501 impl ObjectId {
502 $(
503 pub fn [< as_ $type_name:snake>](self) -> $type_name {
504 $type_name::new(self.0)
505 }
506 )+
507 }
508 }
509 };
510}
511
512impl_object_id_conversion!(
513 DatabaseId,
514 TableId,
515 SchemaId,
516 SinkId,
517 SourceId,
518 JobId,
519 SubscriptionId,
520 IndexId,
521 ViewId,
522 FunctionId,
523 ConnectionId,
524 SecretId
525);