1mod column;
16mod external_table;
17mod internal_table;
18mod physical_table;
19mod schema;
20pub mod test_utils;
21
22use std::sync::Arc;
23
24pub use column::*;
25pub use external_table::*;
26use futures::stream::BoxStream;
27pub use internal_table::*;
28use parse_display::Display;
29pub use physical_table::*;
30use risingwave_pb::catalog::table::PbEngine;
31use risingwave_pb::catalog::{
32 CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
33 StreamJobStatus as PbStreamJobStatus,
34};
35use risingwave_pb::plan_common::ColumnDescVersion;
36pub use schema::{Field, FieldDisplay, FieldLike, Schema, test_utils as schema_test_utils};
37use serde::{Deserialize, Serialize};
38
39use crate::array::DataChunk;
40pub use crate::constants::hummock;
41use crate::error::BoxedError;
42
/// Version number of the catalog, represented as a `u64`.
pub type CatalogVersion = u64;

/// Version identifier of a table, represented as a `u64`.
pub type TableVersionId = u64;
/// Initial table version id (zero).
pub const INITIAL_TABLE_VERSION_ID: u64 = 0;
/// Version identifier of a source, represented as a `u64`.
pub type SourceVersionId = u64;
/// Initial source version id (zero).
pub const INITIAL_SOURCE_VERSION_ID: u64 = 0;
54
/// Name of the default database.
pub const DEFAULT_DATABASE_NAME: &str = "dev";
/// Name of the default schema.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
/// Name of the `pg_catalog` schema.
pub const PG_CATALOG_SCHEMA_NAME: &str = "pg_catalog";
/// Name of the `information_schema` schema.
pub const INFORMATION_SCHEMA_SCHEMA_NAME: &str = "information_schema";
/// Name of the `rw_catalog` schema.
pub const RW_CATALOG_SCHEMA_NAME: &str = "rw_catalog";
/// Schema name prefix treated as reserved (`pg_`).
pub const RESERVED_PG_SCHEMA_PREFIX: &str = "pg_";
/// Name of the default super user.
pub const DEFAULT_SUPER_USER: &str = "root";
/// Id of the default super user.
pub const DEFAULT_SUPER_USER_ID: u32 = 1;
/// Name of the default super user for PostgreSQL.
pub const DEFAULT_SUPER_USER_FOR_PG: &str = "postgres";
/// Id of the default super user for PostgreSQL.
pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2;

// NOTE(review): presumably the first id handed out to non-reserved users — confirm against
// the allocator. Note this is an `i32`, while the super user ids above are `u32`.
pub const NON_RESERVED_USER_ID: i32 = 11;

/// Maximum number of system catalogs.
pub const MAX_SYS_CATALOG_NUM: i32 = 5000;
/// Start id of system catalogs, computed as `i32::MAX - MAX_SYS_CATALOG_NUM`.
pub const SYS_CATALOG_START_ID: i32 = i32::MAX - MAX_SYS_CATALOG_NUM;

/// Raw value (`u32::MAX - 1`) stored by the `placeholder()` constructors of the id types in
/// this file.
pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1;

/// Names of the system schemas; this is the list consulted by [`is_system_schema`].
pub const SYSTEM_SCHEMAS: [&str; 3] = [
    PG_CATALOG_SCHEMA_NAME,
    INFORMATION_SCHEMA_SCHEMA_NAME,
    RW_CATALOG_SCHEMA_NAME,
];
79pub fn is_system_schema(schema_name: &str) -> bool {
80 SYSTEM_SCHEMAS.contains(&schema_name)
81}
82
/// Prefix of reserved column names (`_rw_`).
pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_";

/// Default name of the key column; returned by
/// [`default_key_column_name_version_mapping`].
pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";
89
90pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str {
91 match version {
92 ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME,
93 _ => DEFAULT_KEY_COLUMN_NAME,
94 }
95}
96
/// Name of the Kafka timestamp column.
pub const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// Name of the RisingWave Iceberg row id column.
pub const RISINGWAVE_ICEBERG_ROW_ID: &str = "_risingwave_iceberg_row_id";

/// Name of the row id column.
pub const ROW_ID_COLUMN_NAME: &str = "_row_id";
/// Column id of the row id column (`0`).
pub const ROW_ID_COLUMN_ID: ColumnId = ColumnId::new(0);

/// Computed as `ROW_ID_COLUMN_ID.next().get_id()`.
// NOTE(review): presumably the first column id available to user-defined columns — confirm.
pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();

/// Name of the `_rw_timestamp` column.
pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
/// Column id of the `_rw_timestamp` column (`-1`).
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);

/// Name of the Iceberg sequence number column.
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
/// Name of the Iceberg file path column.
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
/// Name of the Iceberg file position column.
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

/// Name of the CDC offset column.
pub const CDC_OFFSET_COLUMN_NAME: &str = "_rw_offset";
// NOTE(review): presumably the number of columns carried by a CDC source — confirm.
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
/// Name of the CDC table name column.
pub const CDC_TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";
130
/// Reader interface for system catalog tables.
///
/// Implementors must be `Sync + Send + 'static`.
pub trait SysCatalogReader: Sync + Send + 'static {
    /// Reads the table identified by `table_id`.
    ///
    /// Returns a boxed stream, borrowing from `self`, whose items are either a [`DataChunk`]
    /// or a [`BoxedError`].
    fn read_table(&self, table_id: TableId) -> BoxStream<'_, Result<DataChunk, BoxedError>>;
}

/// Reference-counted handle to a [`SysCatalogReader`] trait object.
pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;

/// Raw object identifier, a `u32`.
pub type ObjectId = u32;
140
141#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)]
142#[display("{database_id}")]
143pub struct DatabaseId {
144 pub database_id: u32,
145}
146
147impl DatabaseId {
148 pub const fn new(database_id: u32) -> Self {
149 DatabaseId { database_id }
150 }
151
152 pub fn placeholder() -> Self {
153 DatabaseId {
154 database_id: OBJECT_ID_PLACEHOLDER,
155 }
156 }
157}
158
159impl From<u32> for DatabaseId {
160 fn from(id: u32) -> Self {
161 Self::new(id)
162 }
163}
164
165impl From<&u32> for DatabaseId {
166 fn from(id: &u32) -> Self {
167 Self::new(*id)
168 }
169}
170
171impl From<DatabaseId> for u32 {
172 fn from(id: DatabaseId) -> Self {
173 id.database_id
174 }
175}
176
177#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)]
178#[display("{schema_id}")]
179pub struct SchemaId {
180 pub schema_id: u32,
181}
182
183impl SchemaId {
184 pub fn new(schema_id: u32) -> Self {
185 SchemaId { schema_id }
186 }
187
188 pub fn placeholder() -> Self {
189 SchemaId {
190 schema_id: OBJECT_ID_PLACEHOLDER,
191 }
192 }
193}
194
195impl From<u32> for SchemaId {
196 fn from(id: u32) -> Self {
197 Self::new(id)
198 }
199}
200
201impl From<&u32> for SchemaId {
202 fn from(id: &u32) -> Self {
203 Self::new(*id)
204 }
205}
206
207impl From<SchemaId> for u32 {
208 fn from(id: SchemaId) -> Self {
209 id.schema_id
210 }
211}
212
213#[derive(
214 Clone,
215 Copy,
216 Debug,
217 Display,
218 Default,
219 Hash,
220 PartialOrd,
221 PartialEq,
222 Eq,
223 Ord,
224 Serialize,
225 Deserialize,
226)]
227#[display("{table_id}")]
228pub struct TableId {
229 pub table_id: u32,
230}
231
232impl TableId {
233 pub const fn new(table_id: u32) -> Self {
234 TableId { table_id }
235 }
236
237 pub const fn placeholder() -> Self {
239 TableId {
240 table_id: OBJECT_ID_PLACEHOLDER,
241 }
242 }
243
244 pub fn table_id(&self) -> u32 {
245 self.table_id
246 }
247}
248
249impl From<u32> for TableId {
250 fn from(id: u32) -> Self {
251 Self::new(id)
252 }
253}
254
255impl From<&u32> for TableId {
256 fn from(id: &u32) -> Self {
257 Self::new(*id)
258 }
259}
260
261impl From<TableId> for u32 {
262 fn from(id: TableId) -> Self {
263 id.table_id
264 }
265}
266
267#[derive(Clone, Debug, PartialEq, Default, Copy)]
268pub struct TableOption {
269 pub retention_seconds: Option<u32>, }
271
272impl From<&risingwave_pb::hummock::TableOption> for TableOption {
273 fn from(table_option: &risingwave_pb::hummock::TableOption) -> Self {
274 Self {
275 retention_seconds: table_option.retention_seconds,
276 }
277 }
278}
279
280impl From<&TableOption> for risingwave_pb::hummock::TableOption {
281 fn from(table_option: &TableOption) -> Self {
282 Self {
283 retention_seconds: table_option.retention_seconds,
284 }
285 }
286}
287
288impl TableOption {
289 pub fn new(retention_seconds: Option<u32>) -> Self {
290 TableOption { retention_seconds }
292 }
293}
294
295#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq)]
296#[display("{index_id}")]
297pub struct IndexId {
298 pub index_id: u32,
299}
300
301impl IndexId {
302 pub const fn new(index_id: u32) -> Self {
303 IndexId { index_id }
304 }
305
306 pub const fn placeholder() -> Self {
308 IndexId {
309 index_id: OBJECT_ID_PLACEHOLDER,
310 }
311 }
312
313 pub fn index_id(&self) -> u32 {
314 self.index_id
315 }
316}
317
318impl From<u32> for IndexId {
319 fn from(id: u32) -> Self {
320 Self::new(id)
321 }
322}
323impl From<IndexId> for u32 {
324 fn from(id: IndexId) -> Self {
325 id.index_id
326 }
327}
328
329#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
330pub struct FunctionId(pub u32);
331
332impl FunctionId {
333 pub const fn new(id: u32) -> Self {
334 FunctionId(id)
335 }
336
337 pub const fn placeholder() -> Self {
338 FunctionId(OBJECT_ID_PLACEHOLDER)
339 }
340
341 pub fn function_id(&self) -> u32 {
342 self.0
343 }
344}
345
346impl From<u32> for FunctionId {
347 fn from(id: u32) -> Self {
348 Self::new(id)
349 }
350}
351
352impl From<&u32> for FunctionId {
353 fn from(id: &u32) -> Self {
354 Self::new(*id)
355 }
356}
357
358impl From<FunctionId> for u32 {
359 fn from(id: FunctionId) -> Self {
360 id.0
361 }
362}
363
364#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
365#[display("{user_id}")]
366pub struct UserId {
367 pub user_id: u32,
368}
369
370impl UserId {
371 pub const fn new(user_id: u32) -> Self {
372 UserId { user_id }
373 }
374
375 pub const fn placeholder() -> Self {
376 UserId {
377 user_id: OBJECT_ID_PLACEHOLDER,
378 }
379 }
380}
381
382impl From<u32> for UserId {
383 fn from(id: u32) -> Self {
384 Self::new(id)
385 }
386}
387
388impl From<&u32> for UserId {
389 fn from(id: &u32) -> Self {
390 Self::new(*id)
391 }
392}
393
394impl From<UserId> for u32 {
395 fn from(id: UserId) -> Self {
396 id.user_id
397 }
398}
399
400#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
401pub struct ConnectionId(pub u32);
402
403impl ConnectionId {
404 pub const fn new(id: u32) -> Self {
405 ConnectionId(id)
406 }
407
408 pub const fn placeholder() -> Self {
409 ConnectionId(OBJECT_ID_PLACEHOLDER)
410 }
411
412 pub fn connection_id(&self) -> u32 {
413 self.0
414 }
415}
416
417impl From<u32> for ConnectionId {
418 fn from(id: u32) -> Self {
419 Self::new(id)
420 }
421}
422
423impl From<&u32> for ConnectionId {
424 fn from(id: &u32) -> Self {
425 Self::new(*id)
426 }
427}
428
429impl From<ConnectionId> for u32 {
430 fn from(id: ConnectionId) -> Self {
431 id.0
432 }
433}
434
435#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
436pub struct SecretId(pub u32);
437
438impl SecretId {
439 pub const fn new(id: u32) -> Self {
440 SecretId(id)
441 }
442
443 pub const fn placeholder() -> Self {
444 SecretId(OBJECT_ID_PLACEHOLDER)
445 }
446
447 pub fn secret_id(&self) -> u32 {
448 self.0
449 }
450}
451
452impl From<u32> for SecretId {
453 fn from(id: u32) -> Self {
454 Self::new(id)
455 }
456}
457
458impl From<&u32> for SecretId {
459 fn from(id: &u32) -> Self {
460 Self::new(*id)
461 }
462}
463
464impl From<SecretId> for u32 {
465 fn from(id: SecretId) -> Self {
466 id.0
467 }
468}
469
470#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
471pub enum ConflictBehavior {
472 #[default]
473 NoCheck,
474 Overwrite,
475 IgnoreConflict,
476 DoUpdateIfNotNull,
477}
478
479impl ConflictBehavior {
480 pub fn from_protobuf(tb_conflict_behavior: &PbHandleConflictBehavior) -> Self {
481 match tb_conflict_behavior {
482 PbHandleConflictBehavior::Overwrite => ConflictBehavior::Overwrite,
483 PbHandleConflictBehavior::Ignore => ConflictBehavior::IgnoreConflict,
484 PbHandleConflictBehavior::DoUpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
485 PbHandleConflictBehavior::NoCheck | PbHandleConflictBehavior::Unspecified => {
488 ConflictBehavior::NoCheck
489 }
490 }
491 }
492
493 pub fn to_protobuf(self) -> PbHandleConflictBehavior {
494 match self {
495 ConflictBehavior::NoCheck => PbHandleConflictBehavior::NoCheck,
496 ConflictBehavior::Overwrite => PbHandleConflictBehavior::Overwrite,
497 ConflictBehavior::IgnoreConflict => PbHandleConflictBehavior::Ignore,
498 ConflictBehavior::DoUpdateIfNotNull => PbHandleConflictBehavior::DoUpdateIfNotNull,
499 }
500 }
501
502 pub fn debug_to_string(self) -> String {
503 match self {
504 ConflictBehavior::NoCheck => "NoCheck".to_owned(),
505 ConflictBehavior::Overwrite => "Overwrite".to_owned(),
506 ConflictBehavior::IgnoreConflict => "IgnoreConflict".to_owned(),
507 ConflictBehavior::DoUpdateIfNotNull => "DoUpdateIfNotNull".to_owned(),
508 }
509 }
510}
511
512#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
513pub enum Engine {
514 #[default]
515 Hummock,
516 Iceberg,
517}
518
519impl Engine {
520 pub fn from_protobuf(engine: &PbEngine) -> Self {
521 match engine {
522 PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock,
523 PbEngine::Iceberg => Engine::Iceberg,
524 }
525 }
526
527 pub fn to_protobuf(self) -> PbEngine {
528 match self {
529 Engine::Hummock => PbEngine::Hummock,
530 Engine::Iceberg => PbEngine::Iceberg,
531 }
532 }
533
534 pub fn debug_to_string(self) -> String {
535 match self {
536 Engine::Hummock => "Hummock".to_owned(),
537 Engine::Iceberg => "Iceberg".to_owned(),
538 }
539 }
540}
541
542#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
543pub enum StreamJobStatus {
544 #[default]
545 Creating,
546 Created,
547}
548
549impl StreamJobStatus {
550 pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
551 match stream_job_status {
552 PbStreamJobStatus::Creating => StreamJobStatus::Creating,
553 PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
554 }
555 }
556
557 pub fn to_proto(self) -> PbStreamJobStatus {
558 match self {
559 StreamJobStatus::Creating => PbStreamJobStatus::Creating,
560 StreamJobStatus::Created => PbStreamJobStatus::Created,
561 }
562 }
563}
564
565#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
566pub enum CreateType {
567 Foreground,
568 Background,
569}
570
571impl Default for CreateType {
572 fn default() -> Self {
573 Self::Foreground
574 }
575}
576
577impl CreateType {
578 pub fn from_proto(pb_create_type: PbCreateType) -> Self {
579 match pb_create_type {
580 PbCreateType::Foreground | PbCreateType::Unspecified => CreateType::Foreground,
581 PbCreateType::Background => CreateType::Background,
582 }
583 }
584
585 pub fn to_proto(self) -> PbCreateType {
586 match self {
587 CreateType::Foreground => PbCreateType::Foreground,
588 CreateType::Background => PbCreateType::Background,
589 }
590 }
591}