1mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::fmt::Binary;
23use std::sync::Arc;
24
25pub use column::*;
26pub use external_table::*;
27use futures::stream::BoxStream;
28pub use internal_table::*;
29use parse_display::Display;
30pub use physical_table::*;
31use risingwave_pb::catalog::table::PbEngine;
32use risingwave_pb::catalog::{
33 CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
34 StreamJobStatus as PbStreamJobStatus,
35};
36use risingwave_pb::plan_common::ColumnDescVersion;
37pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42pub use crate::id::*;
43
/// Version number of the catalog; a plain `u64`.
pub type CatalogVersion = u64;

/// Version id of a table; a plain `u64`.
pub type TableVersionId = u64;
/// Initial table version id (`0`). NOTE(review): presumably the first [`TableVersionId`] a table
/// gets — confirm against callers.
pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
/// Version id of a source; a plain `u64`.
pub type SourceVersionId = u64;
/// Initial source version id (`0`). NOTE(review): presumably the first [`SourceVersionId`] a
/// source gets — confirm against callers.
pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
55
/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the `pg_catalog` schema; listed in [`SYSTEM_SCHEMAS`].
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` schema; listed in [`SYSTEM_SCHEMAS`].
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the `rw_catalog` schema; listed in [`SYSTEM_SCHEMAS`].
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// Schema-name prefix marked as reserved.
///
/// NOTE(review): presumably user-created schemas may not start with this prefix — confirm where
/// it is enforced.
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
/// Name of the default super user.
pub const DEFAULT_SUPER_USER: &str = "root";
/// User id paired with [`DEFAULT_SUPER_USER`].
pub const DEFAULT_SUPER_USER_ID: UserId = UserId::new(1);
/// Name of the default super user "for PG" (going by the constant's name).
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// User id paired with [`DEFAULT_SUPER_USER_FOR_PG`].
///
/// NOTE(review): this is a bare `u32`, whereas the other user-id constants in this file are
/// [`UserId`] values.
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

/// Name of the default admin super user; see [`is_reserved_admin_user`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN: &str = "rwadmin";
/// User id paired with [`DEFAULT_SUPER_USER_FOR_ADMIN`].
pub const DEFAULT_SUPER_USER_FOR_ADMIN_ID: UserId = UserId::new(3);

/// NOTE(review): presumably the first user id handed out to non-reserved (user-created) users —
/// confirm against the id allocator.
pub const NON_RESERVED_USER_ID: UserId = UserId::new(11);

/// Maximum number of system catalogs (going by the name); used to derive
/// [`SYS_CATALOG_START_ID`].
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// `i32::MAX - MAX_SYS_CATALOG_NUM`. NOTE(review): presumably the first id of the range reserved
/// for system catalogs — confirm against the id assignment code.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
76
77pub use risingwave_pb::id::OBJECT_ID_PLACEHOLDER;
78
/// Names of the schemas that [`is_system_schema`] reports as system schemas.
pub const SYSTEM_SCHEMAS: [&str; 3] = [
    PG_CATALOG_SCHEMA_NAME,
    INFORMATION_SCHEMA_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME,
];
84pub fn is_system_schema(schema_name: &str) -> bool {
85 SYSTEM_SCHEMAS.contains(&schema_name)
86}
87
88pub fn is_reserved_admin_user(user_name: &str) -> bool {
89 user_name == DEFAULT_SUPER_USER_FOR_ADMIN
90}
91
/// Column-name prefix marked as reserved. NOTE(review): presumably user-defined columns may not
/// start with it — confirm where it is enforced.
pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";

/// Default name of the key column; returned by [`default_key_column_name_version_mapping`].
pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
98
99pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
100 match version {
101 ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
102 _ => DEFAULT_KEY_COLUMN_NAME,
103 }
104}
105
/// Name of the Kafka timestamp column (going by the constant's name).
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// Name of the row id column used for Iceberg (going by the constant's name).
///
/// NOTE(review): deliberately different from [`ROW_ID_COLUMN_NAME`]; the reason is not visible
/// here — confirm with the Iceberg integration.
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";

/// Name of the row id column.
pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// Column id of the row id column.
pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);

/// Raw id of the column id that follows [`ROW_ID_COLUMN_ID`].
///
/// NOTE(review): presumably the first column id available to user columns — confirm against
/// the column id allocator.
pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Name of the `_rw_timestamp` column.
pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
/// Column id of the `_rw_timestamp` column. Note that it is negative (`-1`).
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);

/// Name of the projected row id column (going by the constant's name).
pub const PROJECTED_ROW_ID_COLUMN_NAME: &str = "_rw_projected_row_id";

/// Name of the Iceberg sequence number column (going by the constant's name).
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
/// Name of the Iceberg file path column (going by the constant's name).
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
/// Name of the Iceberg file position column (going by the constant's name).
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

/// Name of the CDC offset column (going by the constant's name).
pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
/// NOTE(review): presumably the number of columns produced by a CDC source — confirm against
/// the CDC source schema definition.
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
/// Name of the CDC table name column (going by the constant's name).
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";

/// Name prefix for Iceberg sources (going by the constant's name). NOTE(review): presumably
/// for internally generated sources — confirm at the use sites.
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix for Iceberg sinks (going by the constant's name). NOTE(review): presumably for
/// internally generated sinks — confirm at the use sites.
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
146
/// Reader for system catalog tables.
///
/// Implementors must be `Send + Sync + 'static`, which allows a reader to be shared across
/// threads behind a [`SysCatalogReaderRef`].
pub trait SysCatalogReader: Sync + Send + 'static {
    /// Reads the table identified by `table_id`.
    ///
    /// Returns a boxed stream that borrows from `self`; each item is either a [`DataChunk`] or
    /// a [`BoxedError`].
    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
}

/// Reference-counted handle to a [`SysCatalogReader`] trait object.
pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
154
155#[derive(Clone, Debug, PartialEq, Default, Copy)]
156pub struct TableOption {
157 pub retention_seconds: Option<u32>, }
159
160impl From<&risingwave_pb::hummock::TableOption> for TableOption {
161 fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
162 Self {
163 retention_seconds: table_option.retention_seconds,
164 }
165 }
166}
167
168impl From<&TableOption> for risingwave_pb::hummock::TableOption {
169 fn from(table_option: &TableOption) -> Self {
170 Self {
171 retention_seconds: table_option.retention_seconds,
172 }
173 }
174}
175
176impl TableOption {
177 pub fn new(retention_seconds: Option<u32>) -> Self {
178 TableOption { retention_seconds }
180 }
181}
182
183#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
184pub enum ConflictBehavior {
185 #[default]
186 NoCheck,
187 Overwrite,
188 IgnoreConflict,
189 DoUpdateIfNotNull,
190}
191
192#[macro_export]
193macro_rules! _checked_conflict_behaviors {
194 () => {
195 ConflictBehavior::Overwrite
196 | ConflictBehavior::IgnoreConflict
197 | ConflictBehavior::DoUpdateIfNotNull
198 };
199}
200pub use _checked_conflict_behaviors as checked_conflict_behaviors;
201
202impl ConflictBehavior {
203 pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
204 match tb_conflict_behavior {
205 PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
206 PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
207 PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
208 PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
211 ConflictBehavior::NoCheck
212 }
213 }
214 }
215
216 pub fn to_protobuf(self) -> PbHandleConflictBehavior {
217 match self {
218 ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
219 ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
220 ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
221 ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
222 }
223 }
224
225 pub fn debug_to_string(self) -> String {
226 match self {
227 ConflictBehavior::NoCheck => "NoCheck".to_owned(),
228 ConflictBehavior::Overwrite => "Overwrite".to_owned(),
229 ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
230 ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
231 }
232 }
233}
234
235#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
236pub enum Engine {
237 #[default]
238 Hummock,
239 Iceberg,
240}
241
242impl Engine {
243 pub fn from_protobuf(engine: &PbEngine) -> Self {
244 match engine {
245 PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
246 PbEngine::Iceberg => Engine::Iceberg,
247 }
248 }
249
250 pub fn to_protobuf(self) -> PbEngine {
251 match self {
252 Engine::Hummock => PbEngine::Hummock,
253 Engine::Iceberg => PbEngine::Iceberg,
254 }
255 }
256
257 pub fn debug_to_string(self) -> String {
258 match self {
259 Engine::Hummock => "Hummock".to_owned(),
260 Engine::Iceberg => "Iceberg".to_owned(),
261 }
262 }
263}
264
265#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
266pub enum StreamJobStatus {
267 #[default]
268 Creating,
269 Created,
270}
271
272impl StreamJobStatus {
273 pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
274 match stream_job_status {
275 PbStreamJobStatus::Creating => StreamJobStatus::Creating,
276 PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
277 }
278 }
279
280 pub fn to_proto(self) -> PbStreamJobStatus {
281 match self {
282 StreamJobStatus::Creating => PbStreamJobStatus::Creating,
283 StreamJobStatus::Created => PbStreamJobStatus::Created,
284 }
285 }
286}
287
288#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord, Default)]
289pub enum CreateType {
290 #[default]
291 Foreground,
292 Background,
293}
294
295impl CreateType {
296 pub fn from_proto(pb_create_type: PbCreateType) -> Self {
297 match pb_create_type {
298 PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
299 PbCreateType::Background => CreateType::Background,
300 }
301 }
302
303 pub fn to_proto(self) -> PbCreateType {
304 match self {
305 CreateType::Foreground => PbCreateType::Foreground,
306 CreateType::Background => PbCreateType::Background,
307 }
308 }
309}
310
/// A database-level parameter together with its new value (going by the type name).
///
/// NOTE(review): for both variants `None` presumably means "unset / fall back to the default" —
/// confirm with the code that applies the parameter.
#[derive(Clone, Debug)]
pub enum AlterDatabaseParam {
    /// Barrier interval, in milliseconds per the variant name.
    BarrierIntervalMs(Option<u32>),
    /// Checkpoint frequency.
    CheckpointFrequency(Option<u64>),
}
318
/// Generates `FragmentTypeFlag`, `FRAGMENT_TYPE_FLAG_LIST`, `TryFrom<u32> for FragmentTypeFlag`
/// and `FragmentTypeFlag::as_str_name` from one ordered list of flag names.
///
/// The flag at zero-based position `i` of the list gets the value `1 << i`, so the order of
/// the list determines the numeric values: inserting or reordering entries changes the value
/// of every flag that follows.
macro_rules! for_all_fragment_type_flags {
    // Entry point: start the recursion with the full list of names, an empty accumulator and
    // index 0.
    () => {
        for_all_fragment_type_flags! {
            {
                Source,
                Mview,
                Sink,
                Now,
                StreamScan,
                BarrierRecv,
                Values,
                Dml,
                CdcFilter,
                // NOTE(review): looks like a placeholder that keeps this bit position occupied —
                // confirm before renaming or removing.
                Skipped1,
                SourceScan,
                SnapshotBackfillStreamScan,
                FsFetch,
                CrossDbSnapshotBackfillStreamScan,
                StreamCdcScan,
                VectorIndexWrite,
                UpstreamSinkUnion,
                LocalityProvider
            },
            {},
            0
        }
    };
    // Final step: the list of remaining names is empty, so emit all items from the accumulated
    // `{flag, index}` pairs. `$next_index` now equals the number of flags.
    (
        {},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        #[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq)]
        #[repr(u32)]
        pub enum FragmentTypeFlag {
            $(
                $flag = (1 << $index),
            )*
        }

        // Every flag, in declaration order.
        pub const FRAGMENT_TYPE_FLAG_LIST: [FragmentTypeFlag; $next_index] = [
            $(
                FragmentTypeFlag::$flag,
            )*
        ];

        impl TryFrom<u32> for FragmentTypeFlag {
            type Error = String;

            // Succeeds only when `value` equals the value of exactly one flag; anything else
            // (including 0 and combinations of several flags) is an error.
            fn try_from(value: u32) -> Result<Self, Self::Error> {
                match value {
                    $(
                        value if value == (FragmentTypeFlag::$flag as u32) => Ok(FragmentTypeFlag::$flag),
                    )*
                    _ => Err(format!("Invalid FragmentTypeFlag value: {}", value)),
                }
            }
        }

        impl FragmentTypeFlag {
            // Returns the flag name converted to upper snake case, e.g. `StreamScan` becomes
            // "STREAM_SCAN".
            pub fn as_str_name(&self) -> &'static str {
                match self {
                    $(
                        FragmentTypeFlag::$flag => paste::paste!{stringify!( [< $flag:snake:upper >] )},
                    )*
                }
            }
        }
    };
    // Recursive step: move the first remaining name into the accumulator, paired with the
    // current index, and continue with the index incremented by one.
    (
        {$first:ident $(, $rest:ident)*},
        {
            $(
                {$flag:ident, $index:expr}
            ),*
        },
        $next_index:expr
    ) => {
        for_all_fragment_type_flags! {
            {$($rest),*},
            {
                $({$flag, $index},)*
                {$first, $next_index}
            },
            $next_index + 1
        }
    };
}

// Instantiate the items described above.
for_all_fragment_type_flags!();
413
414impl FragmentTypeFlag {
415 pub fn raw_flag(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> u32 {
416 flags.into_iter().fold(0, |acc, flag| acc | (flag as u32))
417 }
418
419 pub fn backfill_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
421 [FragmentTypeFlag::SourceScan, FragmentTypeFlag::StreamScan].into_iter()
422 }
423
424 pub fn source_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
427 [FragmentTypeFlag::Source, FragmentTypeFlag::FsFetch].into_iter()
428 }
429
430 pub fn sink_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
432 [FragmentTypeFlag::Sink].into_iter()
433 }
434
435 pub fn rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
437 Self::backfill_rate_limit_fragments()
438 .chain(Self::source_rate_limit_fragments())
439 .chain(Self::sink_rate_limit_fragments())
440 }
441
442 pub fn dml_rate_limit_fragments() -> impl Iterator<Item = FragmentTypeFlag> {
443 [FragmentTypeFlag::Dml].into_iter()
444 }
445}
446
/// A set of [`FragmentTypeFlag`] bits packed into a single `u32`.
///
/// The default value is the empty mask (`0`).
#[derive(Clone, Copy, Debug, Hash, PartialOrd, PartialEq, Eq, Default)]
pub struct FragmentTypeMask(u32);

impl Binary for FragmentTypeMask {
    /// Formats the raw bits in base 2.
    ///
    /// Delegates to the `u32` implementation so that formatter flags such as `#` (the `0b`
    /// prefix), width and zero-padding are honored, e.g. `{:#010b}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Binary::fmt(&self.0, f)
    }
}

impl From<i32> for FragmentTypeMask {
    /// Reinterprets the bits of `value` as a mask; a negative input keeps its bit pattern.
    fn from(value: i32) -> Self {
        // Intentional bit-for-bit cast, not a numeric conversion.
        Self(value as u32)
    }
}

impl From<u32> for FragmentTypeMask {
    /// Wraps the raw bits as a mask.
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl From<FragmentTypeMask> for u32 {
    /// Returns the raw bits of the mask.
    fn from(value: FragmentTypeMask) -> Self {
        value.0
    }
}

impl From<FragmentTypeMask> for i32 {
    /// Returns the raw bits of the mask reinterpreted as `i32`; the result is negative when the
    /// highest bit is set.
    fn from(value: FragmentTypeMask) -> Self {
        // Intentional bit-for-bit cast, not a numeric conversion.
        value.0 as _
    }
}
479
480impl FragmentTypeMask {
481 pub fn empty() -> Self {
482 FragmentTypeMask(0)
483 }
484
485 pub fn add(&mut self, flag: FragmentTypeFlag) {
486 self.0 |= flag as u32;
487 }
488
489 pub fn contains_any(&self, flags: impl IntoIterator<Item = FragmentTypeFlag>) -> bool {
490 let flag = FragmentTypeFlag::raw_flag(flags);
491 (self.0 & flag) != 0
492 }
493
494 pub fn contains(&self, flag: FragmentTypeFlag) -> bool {
495 self.contains_any([flag])
496 }
497}
498
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::catalog::FRAGMENT_TYPE_FLAG_LIST;

    use crate::catalog::FragmentTypeFlag;

    /// Pins, for every entry of `FRAGMENT_TYPE_FLAG_LIST`, the variant, its numeric value and
    /// its `as_str_name`, then checks that `TryFrom<u32>` maps each value back to its flag.
    #[test]
    fn test_all_fragment_type_flag() {
        // Snapshot of `(flag, flag as u32, flag.as_str_name())` in declaration order. A change
        // here means a flag was added, renamed, reordered or renumbered.
        expect_test::expect![[r#"
            [
                (
                    Source,
                    1,
                    "SOURCE",
                ),
                (
                    Mview,
                    2,
                    "MVIEW",
                ),
                (
                    Sink,
                    4,
                    "SINK",
                ),
                (
                    Now,
                    8,
                    "NOW",
                ),
                (
                    StreamScan,
                    16,
                    "STREAM_SCAN",
                ),
                (
                    BarrierRecv,
                    32,
                    "BARRIER_RECV",
                ),
                (
                    Values,
                    64,
                    "VALUES",
                ),
                (
                    Dml,
                    128,
                    "DML",
                ),
                (
                    CdcFilter,
                    256,
                    "CDC_FILTER",
                ),
                (
                    Skipped1,
                    512,
                    "SKIPPED1",
                ),
                (
                    SourceScan,
                    1024,
                    "SOURCE_SCAN",
                ),
                (
                    SnapshotBackfillStreamScan,
                    2048,
                    "SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    FsFetch,
                    4096,
                    "FS_FETCH",
                ),
                (
                    CrossDbSnapshotBackfillStreamScan,
                    8192,
                    "CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN",
                ),
                (
                    StreamCdcScan,
                    16384,
                    "STREAM_CDC_SCAN",
                ),
                (
                    VectorIndexWrite,
                    32768,
                    "VECTOR_INDEX_WRITE",
                ),
                (
                    UpstreamSinkUnion,
                    65536,
                    "UPSTREAM_SINK_UNION",
                ),
                (
                    LocalityProvider,
                    131072,
                    "LOCALITY_PROVIDER",
                ),
            ]
        "#]]
        .assert_debug_eq(
            &FRAGMENT_TYPE_FLAG_LIST
                .into_iter()
                .map(|flag| (flag, flag as u32, flag.as_str_name()))
                .collect_vec(),
        );
        // Round trip: the numeric value of each flag converts back to the same flag.
        for flag in FRAGMENT_TYPE_FLAG_LIST {
            assert_eq!(FragmentTypeFlag::try_from(flag as u32).unwrap(), flag);
        }
    }
}