1mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33 CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34 StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42pub use crate::id::{
43 ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, TableId,
44};
45
/// Integer type used for catalog version numbers.
pub type CatalogVersion = u64;

/// Integer type used for table version numbers.
pub type TableVersionId = u64;
/// Initial [`TableVersionId`] value (`0`).
pub const INITIAL_TABLE_VERSION_ID: TableVersionId = 0;
/// Integer type used for source version numbers.
pub type SourceVersionId = u64;
/// Initial [`SourceVersionId`] value (`0`).
pub const INITIAL_SOURCE_VERSION_ID: SourceVersionId = 0;
57
// ---- Built-in database and schema names ----

/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the `pg_catalog` schema.
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` schema.
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the `rw_catalog` schema.
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// Schema-name prefix treated as reserved.
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";

// ---- Built-in super users (name followed by its id) ----

/// Name of the default super user.
pub const DEFAULT_SUPER_USER: &str = "root";
/// Id of [`DEFAULT_SUPER_USER`].
pub const DEFAULT_SUPER_USER_ID: u32 = 1;
/// Name of the super user for PostgreSQL-style access.
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// Id of [`DEFAULT_SUPER_USER_FOR_PG`].
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

/// Name of the administrative super user.
pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
/// Id of [`DEFAULT_SUPER_USER_FOR_ADMIN`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: u32 = 3;

/// NOTE(review): presumably the first id handed out to users that are not built in; confirm
/// against the id allocator.
pub const NON_RESERVED_USER_ID: i32 = 11;

// ---- System catalog id range ----

/// Size of the id range set aside for system catalogs.
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// First id of that range: the top [`MAX_SYS_CATALOG_NUM`] values below `i32::MAX`.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
78
79pub use risingwave_pb::id::OBJECT_ID_PLACEHOLDER;
80
81pub const SYSTEM_SCHEMAS: [&str; 3] = [
82 PG_CATALOG_SCHEMA_NAME,
83 INFORMATION_SCHEMA_SCHEMA_NAME,
84 RW_CATALOG_SCHEMA_NAME,
85];
86pub fn is_system_schema(schema_name: &str) -> bool {
87 SYSTEM_SCHEMAS.contains(&schema_name)
88}
89
90pub fn is_reserved_admin_user(user_name: &str) -> bool {
91 user_name == DEFAULT_SUPER_USER_FOR_ADMIN
92}
93
94pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
95
96pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
100
101pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
102 match version {
103 ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
104 _ => DEFAULT_KEY_COLUMN_NAME,
105 }
106}
107
108pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
113
114pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
119
120pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
121pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);
123
124pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
128
129pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
130pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
131
132pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
133pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
134pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";
135
136pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
137pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
140pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
141
142pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
143pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
144
145pub trait SysCatalogReader: Sync + Send + 'static {
147 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
149}
150
151pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
152
153#[derive(Clone, Debug, PartialEq, Default, Copy)]
154pub struct TableOption {
155 pub retention_seconds: Option<u32>, }
157
158impl From<&risingwave_pb::hummock::TableOption> for TableOption {
159 fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
160 Self {
161 retention_seconds: table_option.retention_seconds,
162 }
163 }
164}
165
166impl From<&TableOption> for risingwave_pb::hummock::TableOption {
167 fn from(table_option: &TableOption) -> Self {
168 Self {
169 retention_seconds: table_option.retention_seconds,
170 }
171 }
172}
173
174impl TableOption {
175 pub fn new(retention_seconds: Option<u32>) -> Self {
176 TableOption { retention_seconds }
178 }
179}
180
181#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
182#[display("{user_id}")]
183pub struct UserId {
184 pub user_id: u32,
185}
186
187impl UserId {
188 pub const fn new(user_id: u32) -> Self {
189 UserId { user_id }
190 }
191
192 pub const fn placeholder() -> Self {
193 UserId {
194 user_id: OBJECT_ID_PLACEHOLDER,
195 }
196 }
197}
198
199impl From<u32> for UserId {
200 fn from(id: u32) -> Self {
201 Self::new(id)
202 }
203}
204
205impl From<&u32> for UserId {
206 fn from(id: &u32) -> Self {
207 Self::new(*id)
208 }
209}
210
211impl From<UserId> for u32 {
212 fn from(id: UserId) -> Self {
213 id.user_id
214 }
215}
216
217#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
218pub enum ConflictBehavior {
219 #[default]
220 NoCheck,
221 Overwrite,
222 IgnoreConflict,
223 DoUpdateIfNotNull,
224}
225
226#[macro_export]
227macro_rules! _checked_conflict_behaviors {
228 () => {
229 ConflictBehavior::Overwrite
230 | ConflictBehavior::IgnoreConflict
231 | ConflictBehavior::DoUpdateIfNotNull
232 };
233}
234pub use _checked_conflict_behaviors as checked_conflict_behaviors;
235
236impl ConflictBehavior {
237 pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
238 match tb_conflict_behavior {
239 PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
240 PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
241 PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
242 PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
245 ConflictBehavior::NoCheck
246 }
247 }
248 }
249
250 pub fn to_protobuf(self) -> PbHandleConflictBehavior {
251 match self {
252 ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
253 ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
254 ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
255 ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
256 }
257 }
258
259 pub fn debug_to_string(self) -> String {
260 match self {
261 ConflictBehavior::NoCheck => "NoCheck".to_owned(),
262 ConflictBehavior::Overwrite => "Overwrite".to_owned(),
263 ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
264 ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
265 }
266 }
267}
268
269#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
270pub enum Engine {
271 #[default]
272 Hummock,
273 Iceberg,
274}
275
276impl Engine {
277 pub fn from_protobuf(engine: &PbEngine) -> Self {
278 match engine {
279 PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
280 PbEngine::Iceberg => Engine::Iceberg,
281 }
282 }
283
284 pub fn to_protobuf(self) -> PbEngine {
285 match self {
286 Engine::Hummock => PbEngine::Hummock,
287 Engine::Iceberg => PbEngine::Iceberg,
288 }
289 }
290
291 pub fn debug_to_string(self) -> String {
292 match self {
293 Engine::Hummock => "Hummock".to_owned(),
294 Engine::Iceberg => "Iceberg".to_owned(),
295 }
296 }
297}
298
299#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
300pub enum StreamJobStatus {
301 #[default]
302 Creating,
303 Created,
304}
305
306impl StreamJobStatus {
307 pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
308 match stream_job_status {
309 PbStreamJobStatus::Creating => StreamJobStatus::Creating,
310 PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
311 }
312 }
313
314 pub fn to_proto(self) -> PbStreamJobStatus {
315 match self {
316 StreamJobStatus::Creating => PbStreamJobStatus::Creating,
317 StreamJobStatus::Created => PbStreamJobStatus::Created,
318 }
319 }
320}
321
322#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord, Default)]
323pub enum CreateType {
324 #[default]
325 Foreground,
326 Background,
327}
328
329impl CreateType {
330 pub fn from_proto(pb_create_type: PbCreateType) -> Self {
331 match pb_create_type {
332 PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
333 PbCreateType::Background => CreateType::Background,
334 }
335 }
336
337 pub fn to_proto(self) -> PbCreateType {
338 match self {
339 CreateType::Foreground => PbCreateType::Foreground,
340 CreateType::Background => PbCreateType::Background,
341 }
342 }
343}
344
/// A database-level parameter together with its new value.
///
/// NOTE(review): `None` presumably means "no explicit value / fall back to the default";
/// confirm against the code that consumes this enum.
#[derive(Debug, Clone)]
pub enum AlterDatabaseParam {
    /// Barrier interval in milliseconds.
    BarrierIntervalMs(Option<u32>),
    /// Checkpoint frequency.
    CheckpointFrequency(Option<u64>),
}
352
353macro_rules! for_all_fragment_type_flags {
354 () => {
355 for_all_fragment_type_flags! {
356 {
357 Source,
358 Mview,
359 Sink,
360 Now,
361 StreamScan,
362 BarrierRecv,
363 Values,
364 Dml,
365 CdcFilter,
366 Skipped1,
367 SourceScan,
368 SnapshotBackfillStreamScan,
369 FsFetch,
370 CrossDbSnapshotBackfillStreamScan,
371 StreamCdcScan,
372 VectorIndexWrite,
373 UpstreamSinkUnion,
374 LocalityProvider
375 },
376 {},
377 0
378 }
379 };
380 (
381 {},
382 {
383 $(
384 {$flag:ident, $index:expr}
385 ),*
386 },
387 $next_index:expr
388 ) => {
389 #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
390 #[repr(u32)]
391 pub enum FragmentTypeFlag {
392 $(
393 $flag = (1 << $index),
394 )*
395 }
396
397 pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
398 $(
399 FragmentTypeFlag::$flag,
400 )*
401 ];
402
403 impl TryFrom<u32> for FragmentTypeFlag {
404 type Error = String;
405
406 fn try_from(value: u32) -> Result<Self, Self::Error> {
407 match value {
408 $(
409 value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
410 )*
411 _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
412 }
413 }
414 }
415
416 impl FragmentTypeFlag {
417 pub fn as_str_name(&self) -> &'static str {
418 match self {
419 $(
420 FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
421 )*
422 }
423 }
424 }
425 };
426 (
427 {$first:ident $(, $rest:ident)*},
428 {
429 $(
430 {$flag:ident, $index:expr}
431 ),*
432 },
433 $next_index:expr
434 ) => {
435 for_all_fragment_type_flags! {
436 {$($rest),*},
437 {
438 $({$flag, $index},)*
439 {$first, $next_index}
440 },
441 $next_index + 1
442 }
443 };
444}
445
446for_all_fragment_type_flags!();
447
448impl FragmentTypeFlag {
449 pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
450 flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
451 }
452
453 pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
455 [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
456 }
457
458 pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
461 [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
462 }
463
464 pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
466 [FragmentTypeFlag::Sink].into_iter()
467 }
468
469 pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
471 Self::backfill_rate_limit_fragments()
472 .chain(Self::source_rate_limit_fragments())
473 .chain(Self::sink_rate_limit_fragments())
474 }
475
476 pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
477 [FragmentTypeFlag::Dml].into_iter()
478 }
479}
480
/// A set of fragment type flags packed into the bits of a `u32`.
#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
pub struct FragmentTypeMask(u32);

impl Binary for FragmentTypeMask {
    /// Formats the raw bits in base 2.
    ///
    /// Delegates to `u32`'s `Binary` impl so that formatter options supplied by the caller
    /// (`{:#b}`, `{:032b}`, width, fill, alignment) are honoured instead of silently dropped.
    /// Plain `{:b}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Binary::fmt(&self.0, f)
    }
}

impl From<i32> for FragmentTypeMask {
    /// Reinterprets the bits of `value` as `u32`; negative inputs keep their bit pattern.
    fn from(value: i32) -> Self {
        Self(value as u32)
    }
}

impl From<u32> for FragmentTypeMask {
    /// Wraps the raw bits unchanged.
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl From<FragmentTypeMask> for u32 {
    /// Returns the raw bits.
    fn from(value: FragmentTypeMask) -> Self {
        value.0
    }
}

impl From<FragmentTypeMask> for i32 {
    /// Reinterprets the raw bits as `i32`; a set top bit yields a negative number.
    fn from(value: FragmentTypeMask) -> Self {
        value.0 as _
    }
}
513
514impl FragmentTypeMask {
515 pub fn empty() -> Self {
516 FragmentTypeMask(0)
517 }
518
519 pub fn add(&mut self, flag: FragmentTypeFlag) {
520 self.0 |= flag as u32;
521 }
522
523 pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
524 let flag = FragmentTypeFlag::raw_flag(flags);
525 (self.0 & flag) != 0
526 }
527
528 pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
529 self.contains_any([flag])
530 }
531}
532
#[cfg(test)]
mod tests {
    use super::{FRAGMENT_TYPE_FLAG_LIST, FragmentTypeFlag};

    /// Pins every flag's variant, numeric value and string name with a snapshot, then checks
    /// that `TryFrom<u32>` maps each numeric value back to the same flag.
    #[test]
    fn test_all_fragment_type_flag() {
        let described: Vec<_> = FRAGMENT_TYPE_FLAG_LIST
            .into_iter()
            .map(|flag| (flag, flag as u32, flag.as_str_name()))
            .collect();

        expect_test::expect![[r#"
            [
                (
                    Source,
                    1,
                    "SOURCE",
                ),
                (
                    Mview,
                    2,
                    "MVIEW",
                ),
                (
                    Sink,
                    4,
                    "SINK",
                ),
                (
                    Now,
                    8,
                    "NOW",
                ),
                (
                    StreamScan,
                    16,
                    "STREAM_SCAN",
                ),
                (
                    BarrierRecv,
                    32,
                    "BARRIER_RECV",
                ),
                (
                    Values,
                    64,
                    "VALUES",
                ),
                (
                    Dml,
                    128,
                    "DML",
                ),
                (
                    CdcFilter,
                    256,
                    "CDC_FILTER",
                ),
                (
                    Skipped1,
                    512,
                    "SKIPPED1",
                ),
                (
                    SourceScan,
                    1024,
                    "SOURCE_SCAN",
                ),
                (
                    SnapshotBackfillStreamScan,
                    2048,
                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    FsFetch,
                    4096,
                    "FS_FETCH",
                ),
                (
                    CrossDbSnapshotBackfillStreamScan,
                    8192,
                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    StreamCdcScan,
                    16384,
                    "STREAM_CDC_SCAN",
                ),
                (
                    VectorIndexWrite,
                    32768,
                    "VECTOR_INDEX_WRITE",
                ),
                (
                    UpstreamSinkUnion,
                    65536,
                    "UPSTREAM_SINK_UNION",
                ),
                (
                    LocalityProvider,
                    131072,
                    "LOCALITY_PROVIDER",
                ),
            ]
        "#]]
        .assert_debug_eq(&described);

        // Round trip: numeric value -> flag must give back the original flag.
        for flag in FRAGMENT_TYPE_FLAG_LIST {
            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
        }
    }
}