1use std::any::type_name;
16use std::collections::{BTreeMap, HashMap};
17use std::fmt::Formatter;
18use std::num::TryFromIntError;
19use std::ops::{Add, AddAssign};
20use std::str::FromStr;
21
22use sea_orm::sea_query::{ArrayType, ValueTypeErr};
23use sea_orm::{ColIdx, ColumnType, DbErr, QueryResult, TryGetError};
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use thiserror_ext::AsReport;
26use tracing::warn;
27
28use crate::catalog::source::OptionalAssociatedTableId;
29use crate::catalog::table::OptionalAssociatedSourceId;
30
/// Raw `u32` value reserved as the placeholder id: one below `u32::MAX`.
///
/// `placeholder()` wraps this value and `is_placeholder()` compares against it
/// for every `u32`-backed `TypedId`.
pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
32
/// A primitive id of type `P` tagged with the const discriminator `N`.
///
/// Ids with different `N` are distinct types even when they wrap the same
/// primitive. `#[repr(transparent)]` gives the wrapper the layout of `P`, and
/// all comparison / hashing traits are derived from the single field.
#[repr(transparent)]
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TypedId<const N: usize, P>(pub(crate) P);
36
37impl<const N: usize, P: std::fmt::Debug> std::fmt::Debug for TypedId<N, P> {
38 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
39 <P as std::fmt::Debug>::fmt(&self.0, f)
40 }
41}
42
43impl<const N: usize, P: std::fmt::Display> std::fmt::Display for TypedId<N, P> {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 <P as std::fmt::Display>::fmt(&self.0, f)
46 }
47}
48
49impl<const N: usize, P: std::fmt::UpperHex> std::fmt::UpperHex for TypedId<N, P> {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 <P as std::fmt::UpperHex>::fmt(&self.0, f)
52 }
53}
54
55impl<const N: usize, P: PartialEq> PartialEq<P> for TypedId<N, P> {
56 fn eq(&self, other: &P) -> bool {
57 self.0 == *other
58 }
59}
60
61impl<const N: usize, P: FromStr> FromStr for TypedId<N, P> {
62 type Err = <P as FromStr>::Err;
63
64 fn from_str(s: &str) -> Result<Self, Self::Err> {
65 Ok(Self(<P as FromStr>::from_str(s)?))
66 }
67}
68
69impl<const N: usize, P> TypedId<N, P>
70where
71 Self: UniqueTypedIdDeclaration,
72{
73 pub const fn new(inner: P) -> Self {
74 TypedId(inner)
75 }
76
77 #[expect(clippy::wrong_self_convention)]
78 pub fn as_raw_id(self) -> P {
79 self.0
80 }
81
82 pub fn raw_slice(slice: &[Self]) -> &[P] {
83 unsafe { std::mem::transmute(slice) }
85 }
86
87 pub fn mut_raw_vec(vec: &mut Vec<Self>) -> &mut Vec<P> {
88 unsafe { std::mem::transmute(vec) }
90 }
91
92 pub fn raw_hash_map_ref<V>(map: &HashMap<Self, V>) -> &HashMap<P, V> {
93 unsafe { std::mem::transmute(map) }
95 }
96
97 pub fn raw_hash_map_mut_ref<V>(map: &mut HashMap<Self, V>) -> &mut HashMap<P, V> {
98 unsafe { std::mem::transmute(map) }
100 }
101
102 pub fn raw_btree_map_ref<V>(map: &BTreeMap<Self, V>) -> &BTreeMap<P, V> {
103 unsafe { std::mem::transmute(map) }
105 }
106
107 pub fn raw_btree_map_mut_ref<V>(map: &mut BTreeMap<Self, V>) -> &mut BTreeMap<P, V> {
108 unsafe { std::mem::transmute(map) }
110 }
111}
112
/// Shorthand for a [`TypedId`] whose primitive is `u32`; `N` is still the type discriminator.
type TypedU32Id<const N: usize> = TypedId<N, u32>;
114
115impl<const N: usize> TypedU32Id<N>
116where
117 Self: UniqueTypedIdDeclaration,
118{
119 pub const fn placeholder() -> Self {
120 Self(OBJECT_ID_PLACEHOLDER)
121 }
122
123 pub fn is_placeholder(&self) -> bool {
124 self.0 == OBJECT_ID_PLACEHOLDER
125 }
126
127 pub fn as_i32_id(self) -> i32 {
128 self.to_i32()
129 }
130
131 fn from_i32(inner: i32) -> Self {
132 Self(inner.try_into().unwrap_or_else(|e: TryFromIntError| {
133 if cfg!(debug_assertions) {
134 panic!(
135 "invalid i32 id {} for {}: {:?}",
136 inner,
137 type_name::<Self>(),
138 e.as_report()
139 );
140 } else {
141 warn!(
142 "invalid i32 id {} for {}: {:?}",
143 inner,
144 type_name::<Self>(),
145 e.as_report()
146 );
147 inner as _
148 }
149 }))
150 }
151
152 fn to_i32(self) -> i32 {
153 self.0.try_into().unwrap_or_else(|e: TryFromIntError| {
154 if cfg!(debug_assertions) {
155 panic!(
156 "invalid u32 id {} for {}: {:?}",
157 self.0,
158 type_name::<Self>(),
159 e.as_report()
160 );
161 } else {
162 warn!(
163 "invalid u32 id {} for {}: {:?}",
164 self.0,
165 type_name::<Self>(),
166 e.as_report()
167 );
168 self.0 as _
169 }
170 })
171 }
172}
173
174impl<const N: usize, P> From<P> for TypedId<N, P> {
175 fn from(id: P) -> Self {
176 Self(id)
177 }
178}
179
180impl<const N: usize> From<TypedU32Id<N>> for sea_orm::Value
181where
182 TypedU32Id<N>: UniqueTypedIdDeclaration,
183{
184 fn from(value: TypedU32Id<N>) -> Self {
185 sea_orm::Value::from(value.to_i32())
186 }
187}
188
189impl<const N: usize> sea_orm::sea_query::ValueType for TypedU32Id<N>
190where
191 Self: UniqueTypedIdDeclaration,
192{
193 fn try_from(v: sea_orm::Value) -> Result<Self, ValueTypeErr> {
194 let inner = <i32 as sea_orm::sea_query::ValueType>::try_from(v)?;
195 Ok(Self::from_i32(inner))
196 }
197
198 fn type_name() -> String {
199 <i32 as sea_orm::sea_query::ValueType>::type_name()
200 }
201
202 fn array_type() -> ArrayType {
203 <i32 as sea_orm::sea_query::ValueType>::array_type()
204 }
205
206 fn column_type() -> ColumnType {
207 <i32 as sea_orm::sea_query::ValueType>::column_type()
208 }
209}
210
211impl<const N: usize> sea_orm::sea_query::Nullable for TypedU32Id<N> {
212 fn null() -> sea_orm::Value {
213 <i32 as sea_orm::sea_query::Nullable>::null()
214 }
215}
216
217impl<const N: usize> sea_orm::TryGetable for TypedU32Id<N>
218where
219 Self: UniqueTypedIdDeclaration,
220{
221 fn try_get_by<I: ColIdx>(res: &QueryResult, index: I) -> Result<Self, TryGetError> {
222 let inner = <i32 as sea_orm::TryGetable>::try_get_by(res, index)?;
223 Ok(Self::from_i32(inner))
224 }
225}
226
227impl<const N: usize> sea_orm::TryFromU64 for TypedU32Id<N>
228where
229 Self: UniqueTypedIdDeclaration,
230{
231 fn try_from_u64(n: u64) -> Result<Self, DbErr> {
232 let inner = <i32 as sea_orm::TryFromU64>::try_from_u64(n)?;
233 Ok(Self::from_i32(inner))
234 }
235}
236
237impl<const N: usize, P: Serialize> Serialize for TypedId<N, P> {
238 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
239 where
240 S: Serializer,
241 {
242 <P as Serialize>::serialize(&self.0, serializer)
243 }
244}
245
246impl<'de, const N: usize, P: Deserialize<'de>> Deserialize<'de> for TypedId<N, P> {
247 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
248 where
249 D: Deserializer<'de>,
250 {
251 Ok(Self(<P as Deserialize>::deserialize(deserializer)?))
252 }
253}
254
255impl<const N: usize> Add<u32> for TypedU32Id<N> {
256 type Output = Self;
257
258 fn add(self, rhs: u32) -> Self::Output {
259 Self(self.0.checked_add(rhs).unwrap())
260 }
261}
262
263impl<const N: usize> AddAssign<u32> for TypedU32Id<N> {
264 fn add_assign(&mut self, rhs: u32) {
265 self.0 = self.0.checked_add(rhs).unwrap()
266 }
267}
268
/// Marker trait implemented once per id alias by `declare_id_type!`.
///
/// The inherent `impl` blocks of `TypedId` are bounded on this trait, so their
/// helpers exist only for ids declared through the macro. Declaring two aliases
/// that resolve to the same `TypedId<N, P>` would produce two conflicting impls
/// of this trait and fail to compile.
// NOTE(review): why `dead_code` is expected here is not visible in this file — confirm.
#[expect(dead_code)]
pub trait UniqueTypedIdDeclaration {}
271
/// Declares one public id alias.
///
/// `declare_id_type!(Name, Primitive, Discriminator)` expands to
/// `pub type Name = TypedId<{ Discriminator }, Primitive>` plus the
/// `UniqueTypedIdDeclaration` marker impl for that alias.
macro_rules! declare_id_type {
    ($alias:ident, $repr:ty, $discriminator:expr) => {
        pub type $alias = TypedId<{ $discriminator }, $repr>;
        impl UniqueTypedIdDeclaration for $alias {}
    };
}
278
/// Declares a list of id aliases over one primitive, numbering them 0, 1, 2, …
/// in the order given.
///
/// Arm order matters: the public entry form is tried first, then the two
/// recursive forms that carry the next discriminator, then the catch-all error.
macro_rules! declare_id_types {
    // Entry point: `primitive, Name, Name, ...` starts numbering at 0.
    ($repr:ty $(, $alias:ident)+) => {
        declare_id_types! {
            $repr, 0 $(, $alias)+
        }
    };
    // Recursion end: no names left.
    ($repr:ty, $next:expr) => {};
    // Recursion step: declare the head with `$next`, continue with `$next + 1`.
    ($repr:ty, $next:expr, $head:ident $(, $tail:ident)*) => {
        declare_id_type! { $head, $repr, $next }
        declare_id_types! {
            $repr, $next + 1 $(, $tail)*
        }
    };
    // Anything else is reported back verbatim as a compile error.
    ($($unexpected:tt)+) => {
        compile_error!(stringify!($($unexpected)+))
    }
}
296
// `u32`-backed ids. The position in this list is the const discriminator
// (`TableId` = 0 … `LocalOperatorId` = 16), so inserting or reordering names
// renumbers every alias after the change; appending leaves existing ones untouched.
declare_id_types!(
    u32,
    TableId,
    JobId,
    DatabaseId,
    SchemaId,
    FragmentId,
    ActorId,
    WorkerId,
    SinkId,
    SourceId,
    SubscriptionId,
    IndexId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId,
    SubscriberId,
    LocalOperatorId
);
317
// `ObjectId` is declared on its own with the explicit discriminator 256, outside the
// sequential 0..=16 range assigned to the `u32` list above.
declare_id_type!(ObjectId, u32, 256);
319
// `u64`-backed ids. Discriminators restart at 0 here; the aliases stay distinct from the
// `u32` ones because the primitive type differs.
declare_id_types!(
    u64,
    GlobalOperatorId,
    StreamNodeLocalOperatorId,
    ExecutorId,
    PartialGraphId
);
327
/// Generates `as_*` reinterpretation methods on a source id type.
///
/// `impl_as!(Src, A, B, {C, alias})` adds `Src::as_a()`, `Src::as_b()` (method names
/// derived from the snake-cased target type name) and `Src::as_alias()` returning `C`.
/// Every generated method rewraps the same raw value via `Target::new`.
macro_rules! impl_as {
    // Internal: one method with an explicit name suffix.
    (@func $target:ident, $suffix:ident) => {
        paste::paste! {
            pub fn [< as_ $suffix >](self) -> $target {
                $target::new(self.0)
            }
        }
    };
    // Internal: one method whose suffix is the snake-cased target type name.
    (@func $target:ident) => {
        paste::paste! {
            impl_as! { @func $target, [< $target:snake >] }
        }
    };
    // Public form: plain targets first, then `{Target, alias}` pairs.
    ($source:ident $(,$plain_target:ident)* $(,{$aliased_target:ident , $alias:ident})*) => {
        impl $source {
            $(
                impl_as! { @func $plain_target }
            )*
            $(
                impl_as! { @func $aliased_target, $alias }
            )*
        }
    }
}
352
353impl JobId {
354 pub fn is_mv_table_id(self, table_id: TableId) -> bool {
355 self.0 == table_id.0
356 }
357}
358
// Generates `JobId::{as_sink_id, as_index_id, as_subscriber_id, as_mv_table_id, as_shared_source_id}`.
impl_as!(JobId, SinkId, IndexId, SubscriberId, {TableId, mv_table_id}, {SourceId, shared_source_id});
// Generates `TableId::as_job_id`.
impl_as!(TableId, JobId);
361
362impl From<StreamNodeLocalOperatorId> for LocalOperatorId {
363 fn from(value: StreamNodeLocalOperatorId) -> Self {
364 assert!(
365 value.0 <= u32::MAX as u64,
366 "oversized operator id {} in stream node",
367 value.0
368 );
369 Self(value.0 as u32)
370 }
371}
372
373impl From<LocalOperatorId> for StreamNodeLocalOperatorId {
374 fn from(value: LocalOperatorId) -> Self {
375 Self(value.0 as u64)
376 }
377}
378
379impl From<OptionalAssociatedTableId> for TableId {
380 fn from(value: OptionalAssociatedTableId) -> Self {
381 let OptionalAssociatedTableId::AssociatedTableId(table_id) = value;
382 Self(table_id)
383 }
384}
385
386impl From<TableId> for OptionalAssociatedTableId {
387 fn from(value: TableId) -> Self {
388 OptionalAssociatedTableId::AssociatedTableId(value.0)
389 }
390}
391
// Generates `SinkId::as_job_id`.
impl_as!(SinkId, JobId);
// Generates `IndexId::as_job_id`.
impl_as!(IndexId, JobId);
// Generates `SourceId::as_share_source_job_id` (-> `JobId`) and `SourceId::as_cdc_table_id` (-> `TableId`).
impl_as!(SourceId, {JobId, share_source_job_id}, {TableId, cdc_table_id});
// Generates `SubscriptionId::as_subscriber_id`.
impl_as!(SubscriptionId, SubscriberId);
396
397impl From<OptionalAssociatedSourceId> for SourceId {
398 fn from(value: OptionalAssociatedSourceId) -> Self {
399 let OptionalAssociatedSourceId::AssociatedSourceId(source_id) = value;
400 Self(source_id)
401 }
402}
403
404impl From<SourceId> for OptionalAssociatedSourceId {
405 fn from(value: SourceId) -> Self {
406 OptionalAssociatedSourceId::AssociatedSourceId(value.0)
407 }
408}
409
/// Implements `From<IdAlias>` for an `Object` type, once per listed id alias.
///
/// Each conversion calls the item on the object type that has the same name as
/// the id alias (e.g. `Object::TableId`) with the id's raw value.
macro_rules! impl_into_object {
    ($object:ty, $($id_alias:ident),+) => {
        $(
            impl From<$id_alias> for $object {
                fn from(id: $id_alias) -> Self {
                    let raw = id.0;
                    <$object>::$id_alias(raw)
                }
            }
        )+
    };
}
421
// Id types convertible into `grant_privilege::Object`; each maps to the same-named item.
impl_into_object!(
    crate::user::grant_privilege::Object,
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    SubscriptionId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId
);
435
// Id types convertible into `alter_name_request::Object`; each maps to the same-named item.
impl_into_object!(
    crate::ddl_service::alter_name_request::Object,
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    SubscriptionId,
    IndexId,
    ViewId
);
447
// Id types convertible into `alter_owner_request::Object`; each maps to the same-named item.
impl_into_object!(
    crate::ddl_service::alter_owner_request::Object,
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    SubscriptionId,
    ViewId,
    ConnectionId,
    FunctionId,
    SecretId
);
461
// Id types convertible into `alter_set_schema_request::Object`; each maps to the same-named item.
impl_into_object!(
    crate::ddl_service::alter_set_schema_request::Object,
    TableId,
    ViewId,
    SourceId,
    SinkId,
    SubscriptionId,
    FunctionId,
    ConnectionId
);
472
/// Implements `From<(<Kind>Id, <Kind>Id)>` for `alter_swap_rename_request::Object`.
///
/// For each `Kind`, the `(source, destination)` id pair becomes `Object::Kind`
/// holding an `ObjectNameSwapPair` of the two ids converted with `as_object_id()`.
macro_rules! impl_into_rename_object {
    ($($kind:ident),+) => {
        paste::paste! {
            $(
                impl From<([<$kind Id>], [<$kind Id>])> for crate::ddl_service::alter_swap_rename_request::Object {
                    fn from((src, dst): ([<$kind Id>], [<$kind Id>])) -> Self {
                        let pair = crate::ddl_service::alter_swap_rename_request::ObjectNameSwapPair {
                            src_object_id: src.as_object_id(),
                            dst_object_id: dst.as_object_id(),
                        };
                        crate::ddl_service::alter_swap_rename_request::Object::$kind(pair)
                    }
                }
            )+
        }
    };
}
489
// Swap-rename conversions for `(TableId, TableId)`, `(ViewId, ViewId)`, `(SourceId, SourceId)`,
// `(SinkId, SinkId)` and `(SubscriptionId, SubscriptionId)`.
impl_into_rename_object!(Table, View, Source, Sink, Subscription);
491
/// Wires each listed id type to `ObjectId` in both directions.
///
/// For every `XId` this generates `XId::as_object_id()`, `From<XId> for ObjectId`,
/// and `ObjectId::as_x_id()`. All of them rewrap the same raw value via `new`.
macro_rules! impl_object_id_conversion {
    ($($id_alias:ident),+) => {
        $(
            impl $id_alias {
                pub fn as_object_id(self) -> ObjectId {
                    ObjectId::new(self.0)
                }
            }

            impl From<$id_alias> for ObjectId {
                fn from(id: $id_alias) -> Self {
                    let raw = id.0;
                    Self::new(raw)
                }
            }
        )+

        paste::paste! {
            impl ObjectId {
                $(
                    pub fn [< as_ $id_alias:snake >](self) -> $id_alias {
                        $id_alias::new(self.0)
                    }
                )+
            }
        }
    };
}
519
// Id types that convert to and from `ObjectId` (`as_object_id`, `From<_> for ObjectId`,
// and the matching `ObjectId::as_*` method).
impl_object_id_conversion!(
    DatabaseId,
    TableId,
    SchemaId,
    SinkId,
    SourceId,
    JobId,
    SubscriptionId,
    IndexId,
    ViewId,
    FunctionId,
    ConnectionId,
    SecretId
);