1mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::sync::Arc;
23
24pub use column::*;
25pub use external_table::*;
26use futures::stream::BoxStream;
27pub use internal_table::*;
28use parse_display::Display;
29pub use physical_table::*;
30use risingwave_pb::catalog::table::PbEngine;
31use risingwave_pb::catalog::{
32 CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
33 StreamJobStatus as PbStreamJobStatus,
34};
35use risingwave_pb::plan_common::ColumnDescVersion;
36pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
37use serde::{Deserialize, Serialize};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42
43pub type CatalogVersion = u64;
45
46pub type TableVersionId = u64;
48pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
50pub type SourceVersionId = u64;
52pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
54
55pub const DEFAULT_DATABASE_NAME: &str = "dev";
56pub const DEFAULT_SCHEMA_NAME: &str = "public";
57pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
58pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
59pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
60pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
61pub const DEFAULT_SUPER_USER: &str = "root";
62pub const DEFAULT_SUPER_USER_ID: u32 = 1;
63pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
65pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;
66
67pub const NON_RESERVED_USER_ID: i32 = 11;
68
69pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
70pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;
71
72pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;
73
74pub const SYSTEM_SCHEMAS: [&str; 3] = [
75 PG_CATALOG_SCHEMA_NAME,
76 INFORMATION_SCHEMA_SCHEMA_NAME,
77 RW_CATALOG_SCHEMA_NAME,
78];
79pub fn is_system_schema(schema_name: &str) -> bool {
80 SYSTEM_SCHEMAS.contains(&schema_name)
81}
82
83pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";
84
85pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
89
90pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
91 match version {
92 ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
93 _ => DEFAULT_KEY_COLUMN_NAME,
94 }
95}
96
97pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";
102
103pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";
108
109pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
110pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);
112
113pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
117
118pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
119pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
120
121pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
122pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
123pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";
124
125pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
126pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
129pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
130
131pub trait SysCatalogReader: Sync + Send + 'static {
133 fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
135}
136
137pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
138
139pub type ObjectId = u32;
140
141#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
142#[display("{database_id}")]
143pub struct DatabaseId {
144 pub database_id: u32,
145}
146
147impl DatabaseId {
148 pub const fn new(database_id: u32) -> Self {
149 DatabaseId { database_id }
150 }
151
152 pub fn placeholder() -> Self {
153 DatabaseId {
154 database_id: OBJECT_ID_PLACEHOLDER,
155 }
156 }
157}
158
159impl From<u32> for DatabaseId {
160 fn from(id: u32) -> Self {
161 Self::new(id)
162 }
163}
164
165impl From<&u32> for DatabaseId {
166 fn from(id: &u32) -> Self {
167 Self::new(*id)
168 }
169}
170
171impl From<DatabaseId> for u32 {
172 fn from(id: DatabaseId) -> Self {
173 id.database_id
174 }
175}
176
177#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
178#[display("{schema_id}")]
179pub struct SchemaId {
180 pub schema_id: u32,
181}
182
183impl SchemaId {
184 pub fn new(schema_id: u32) -> Self {
185 SchemaId { schema_id }
186 }
187
188 pub fn placeholder() -> Self {
189 SchemaId {
190 schema_id: OBJECT_ID_PLACEHOLDER,
191 }
192 }
193}
194
195impl From<u32> for SchemaId {
196 fn from(id: u32) -> Self {
197 Self::new(id)
198 }
199}
200
201impl From<&u32> for SchemaId {
202 fn from(id: &u32) -> Self {
203 Self::new(*id)
204 }
205}
206
207impl From<SchemaId> for u32 {
208 fn from(id: SchemaId) -> Self {
209 id.schema_id
210 }
211}
212
213#[derive(
214 Clone,
215 Copy,
216 Debug,
217 Display,
218 Default,
219 Hash,
220 PartialOrd,
221 PartialEq,
222 Eq,
223 Ord,
224 Serialize,
225 Deserialize,
226)]
227#[display("{table_id}")]
228pub struct TableId {
229 pub table_id: u32,
230}
231
232impl TableId {
233 pub const fn new(table_id: u32) -> Self {
234 TableId { table_id }
235 }
236
237 pub const fn placeholder() -> Self {
239 TableId {
240 table_id: OBJECT_ID_PLACEHOLDER,
241 }
242 }
243
244 pub fn table_id(&self) -> u32 {
245 self.table_id
246 }
247}
248
249impl From<u32> for TableId {
250 fn from(id: u32) -> Self {
251 Self::new(id)
252 }
253}
254
255impl From<&u32> for TableId {
256 fn from(id: &u32) -> Self {
257 Self::new(*id)
258 }
259}
260
261impl From<TableId> for u32 {
262 fn from(id: TableId) -> Self {
263 id.table_id
264 }
265}
266
267#[derive(Clone, Debug, PartialEq, Default, Copy)]
268pub struct TableOption {
269 pub retention_seconds: Option<u32>, }
271
272impl From<&risingwave_pb::hummock::TableOption> for TableOption {
273 fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
274 Self {
275 retention_seconds: table_option.retention_seconds,
276 }
277 }
278}
279
280impl From<&TableOption> for risingwave_pb::hummock::TableOption {
281 fn from(table_option: &TableOption) -> Self {
282 Self {
283 retention_seconds: table_option.retention_seconds,
284 }
285 }
286}
287
288impl TableOption {
289 pub fn new(retention_seconds: Option<u32>) -> Self {
290 TableOption { retention_seconds }
292 }
293}
294
295#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
296#[display("{index_id}")]
297pub struct IndexId {
298 pub index_id: u32,
299}
300
301impl IndexId {
302 pub const fn new(index_id: u32) -> Self {
303 IndexId { index_id }
304 }
305
306 pub const fn placeholder() -> Self {
308 IndexId {
309 index_id: OBJECT_ID_PLACEHOLDER,
310 }
311 }
312
313 pub fn index_id(&self) -> u32 {
314 self.index_id
315 }
316}
317
318impl From<u32> for IndexId {
319 fn from(id: u32) -> Self {
320 Self::new(id)
321 }
322}
323impl From<IndexId> for u32 {
324 fn from(id: IndexId) -> Self {
325 id.index_id
326 }
327}
328
329#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
330pub struct FunctionId(pub u32);
331
332impl FunctionId {
333 pub const fn new(id: u32) -> Self {
334 FunctionId(id)
335 }
336
337 pub const fn placeholder() -> Self {
338 FunctionId(OBJECT_ID_PLACEHOLDER)
339 }
340
341 pub fn function_id(&self) -> u32 {
342 self.0
343 }
344}
345
346impl From<u32> for FunctionId {
347 fn from(id: u32) -> Self {
348 Self::new(id)
349 }
350}
351
352impl From<&u32> for FunctionId {
353 fn from(id: &u32) -> Self {
354 Self::new(*id)
355 }
356}
357
358impl From<FunctionId> for u32 {
359 fn from(id: FunctionId) -> Self {
360 id.0
361 }
362}
363
364#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
365#[display("{user_id}")]
366pub struct UserId {
367 pub user_id: u32,
368}
369
370impl UserId {
371 pub const fn new(user_id: u32) -> Self {
372 UserId { user_id }
373 }
374
375 pub const fn placeholder() -> Self {
376 UserId {
377 user_id: OBJECT_ID_PLACEHOLDER,
378 }
379 }
380}
381
382impl From<u32> for UserId {
383 fn from(id: u32) -> Self {
384 Self::new(id)
385 }
386}
387
388impl From<&u32> for UserId {
389 fn from(id: &u32) -> Self {
390 Self::new(*id)
391 }
392}
393
394impl From<UserId> for u32 {
395 fn from(id: UserId) -> Self {
396 id.user_id
397 }
398}
399
400#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
401pub struct ConnectionId(pub u32);
402
403impl ConnectionId {
404 pub const fn new(id: u32) -> Self {
405 ConnectionId(id)
406 }
407
408 pub const fn placeholder() -> Self {
409 ConnectionId(OBJECT_ID_PLACEHOLDER)
410 }
411
412 pub fn connection_id(&self) -> u32 {
413 self.0
414 }
415}
416
417impl From<u32> for ConnectionId {
418 fn from(id: u32) -> Self {
419 Self::new(id)
420 }
421}
422
423impl From<&u32> for ConnectionId {
424 fn from(id: &u32) -> Self {
425 Self::new(*id)
426 }
427}
428
429impl From<ConnectionId> for u32 {
430 fn from(id: ConnectionId) -> Self {
431 id.0
432 }
433}
434
435#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
436pub struct SecretId(pub u32);
437
438impl SecretId {
439 pub const fn new(id: u32) -> Self {
440 SecretId(id)
441 }
442
443 pub const fn placeholder() -> Self {
444 SecretId(OBJECT_ID_PLACEHOLDER)
445 }
446
447 pub fn secret_id(&self) -> u32 {
448 self.0
449 }
450}
451
452impl From<u32> for SecretId {
453 fn from(id: u32) -> Self {
454 Self::new(id)
455 }
456}
457
458impl From<&u32> for SecretId {
459 fn from(id: &u32) -> Self {
460 Self::new(*id)
461 }
462}
463
464impl From<SecretId> for u32 {
465 fn from(id: SecretId) -> Self {
466 id.0
467 }
468}
469
470#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
471pub enum ConflictBehavior {
472 #[default]
473 NoCheck,
474 Overwrite,
475 IgnoreConflict,
476 DoUpdateIfNotNull,
477}
478
479#[macro_export]
480macro_rules! _checked_conflict_behaviors {
481 () => {
482 ConflictBehavior::Overwrite
483 | ConflictBehavior::IgnoreConflict
484 | ConflictBehavior::DoUpdateIfNotNull
485 };
486}
487pub use _checked_conflict_behaviors as checked_conflict_behaviors;
488
489impl ConflictBehavior {
490 pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
491 match tb_conflict_behavior {
492 PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
493 PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
494 PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
495 PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
498 ConflictBehavior::NoCheck
499 }
500 }
501 }
502
503 pub fn to_protobuf(self) -> PbHandleConflictBehavior {
504 match self {
505 ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
506 ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
507 ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
508 ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
509 }
510 }
511
512 pub fn debug_to_string(self) -> String {
513 match self {
514 ConflictBehavior::NoCheck => "NoCheck".to_owned(),
515 ConflictBehavior::Overwrite => "Overwrite".to_owned(),
516 ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
517 ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
518 }
519 }
520}
521
522#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
523pub enum Engine {
524 #[default]
525 Hummock,
526 Iceberg,
527}
528
529impl Engine {
530 pub fn from_protobuf(engine: &PbEngine) -> Self {
531 match engine {
532 PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
533 PbEngine::Iceberg => Engine::Iceberg,
534 }
535 }
536
537 pub fn to_protobuf(self) -> PbEngine {
538 match self {
539 Engine::Hummock => PbEngine::Hummock,
540 Engine::Iceberg => PbEngine::Iceberg,
541 }
542 }
543
544 pub fn debug_to_string(self) -> String {
545 match self {
546 Engine::Hummock => "Hummock".to_owned(),
547 Engine::Iceberg => "Iceberg".to_owned(),
548 }
549 }
550}
551
552#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
553pub enum StreamJobStatus {
554 #[default]
555 Creating,
556 Created,
557}
558
559impl StreamJobStatus {
560 pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
561 match stream_job_status {
562 PbStreamJobStatus::Creating => StreamJobStatus::Creating,
563 PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
564 }
565 }
566
567 pub fn to_proto(self) -> PbStreamJobStatus {
568 match self {
569 StreamJobStatus::Creating => PbStreamJobStatus::Creating,
570 StreamJobStatus::Created => PbStreamJobStatus::Created,
571 }
572 }
573}
574
575#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
576pub enum CreateType {
577 Foreground,
578 Background,
579}
580
581impl Default for CreateType {
582 fn default() -> Self {
583 Self::Foreground
584 }
585}
586
587impl CreateType {
588 pub fn from_proto(pb_create_type: PbCreateType) -> Self {
589 match pb_create_type {
590 PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
591 PbCreateType::Background => CreateType::Background,
592 }
593 }
594
595 pub fn to_proto(self) -> PbCreateType {
596 match self {
597 CreateType::Foreground => PbCreateType::Foreground,
598 CreateType::Background => PbCreateType::Background,
599 }
600 }
601}
602
603pub enum AlterDatabaseParam {
604 BarrierIntervalMs(Option<u32>),
607 CheckpointFrequency(Option<u64>),
608}