use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{
    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
};
use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::{
    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::replace_job_plan::{
    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
};
use risingwave_pb::ddl_service::{
    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;

use super::root_catalog::Catalog;
use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
use crate::error::Result;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::current::notice_to_user;
use crate::user::UserId;

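/// Read guard over the shared [`Catalog`], as returned by [`CatalogReader::read_guard`].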
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;

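/// A cloneable, shared handle for reading the frontend's local catalog cache.
///
/// A minimal usage sketch (the `reader` handle is hypothetical; real callers obtain one from
/// the session environment):
///
/// ```ignore
/// let guard = reader.read_guard();
/// // `guard` derefs to `Catalog`, so catalog lookups can be performed on it directly.
/// ```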
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);

impl CatalogReader {
    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
        CatalogReader(inner)
    }

    pub fn read_guard(&self) -> CatalogReadGuard {
        self.0.read_arc_recursive()
    }
}

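/// DDL operations exposed to the frontend, ultimately served by the meta service.
///
/// Unless noted otherwise, each method sends an RPC to the meta node and resolves only after
/// the local catalog cache has observed the catalog version returned by that RPC (see
/// `CatalogWriterImpl::wait_version` below).
///
/// A minimal call sketch (hypothetical `writer` and `view_id`; real values come from the
/// session context and binder):
///
/// ```ignore
/// writer.drop_view(view_id, /* cascade */ false).await?;
/// ```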
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    async fn create_function(&self, function: PbFunction) -> Result<()>;

    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

    async fn alter_source(&self, source: PbSource) -> Result<()>;

    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;
}

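/// The default [`CatalogWriter`], which talks to the meta service through [`MetaClient`] and
/// synchronizes with the locally cached catalog and Hummock versions before returning.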
#[derive(Clone)]
pub struct CatalogWriterImpl {
    meta_client: MetaClient,
    catalog_updated_rx: Receiver<CatalogVersion>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

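// Most methods below follow the same pattern: issue the DDL RPC to the meta service, then wait
// until the local catalog cache has observed the returned version.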
#[async_trait::async_trait]
impl CatalogWriter for CatalogWriterImpl {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_database(PbDatabase {
                name: db_name.to_owned(),
                id: 0.into(),
                owner,
                resource_group: resource_group.to_owned(),
                barrier_interval_ms,
                checkpoint_frequency,
            })
            .await?;
        self.wait_version(version).await
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_schema(PbSchema {
                id: 0.into(),
                name: schema_name.to_owned(),
                database_id: db_id,
                owner,
            })
            .await?;
        self.wait_version(version).await
    }

    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<()> {
        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
        let version = self
            .meta_client
            .create_materialized_view(
                table,
                graph,
                dependencies,
                specific_resource_group,
                if_not_exists,
            )
            .await?;
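        // Only foreground jobs block until the catalog reflects the new version; background
        // jobs return without waiting once the RPC has completed.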
        if matches!(create_type, PbCreateType::Foreground) {
            self.wait_version(version).await?
        }
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()> {
        notice_to_user(format!("table: {table:#?}"));
        notice_to_user(format!("graph: {graph:#?}"));

        let version = self
            .meta_client
            .replace_job(
                graph,
                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
            )
            .await?;

        self.wait_version(version).await
    }

    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
        let version = self.meta_client.create_view(view, dependencies).await?;
        self.wait_version(version).await
    }

    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_index(index, table, graph, if_not_exists)
            .await?;
        self.wait_version(version).await
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
            .await?;
        self.wait_version(version).await
    }

    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()> {
        let version = self
            .meta_client
            .replace_job(
                graph,
                ReplaceJob::ReplaceTable(ReplaceTable {
                    source,
                    table: Some(table),
                    job_type: job_type as _,
                }),
            )
            .await?;
        self.wait_version(version).await
    }

    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
        let version = self
            .meta_client
            .replace_job(
                graph,
                ReplaceJob::ReplaceSource(ReplaceSource {
                    source: Some(source),
                }),
            )
            .await?;
        self.wait_version(version).await
    }

    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_source(source, graph, if_not_exists)
            .await?;
        self.wait_version(version).await
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_sink(sink, graph, dependencies, if_not_exists)
            .await?;
        self.wait_version(version).await
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        let version = self.meta_client.create_subscription(subscription).await?;
        self.wait_version(version).await
    }

    async fn create_function(&self, function: PbFunction) -> Result<()> {
        let version = self.meta_client.create_function(function).await?;
        self.wait_version(version).await
    }

    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        connection: create_connection_request::Payload,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_connection(
                connection_name,
                database_id,
                schema_id,
                owner_id,
                connection,
            )
            .await?;
        self.wait_version(version).await
    }

    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
            .await?;
        self.wait_version(version).await
    }

    async fn comment_on(&self, comment: PbComment) -> Result<()> {
        let version = self.meta_client.comment_on(comment).await?;
        self.wait_version(version).await
    }

    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .drop_table(source_id, table_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        let version = self
            .meta_client
            .drop_materialized_view(table_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_view(view_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_source(source_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
        let version = self.meta_client.reset_source(source_id).await?;
        self.wait_version(version).await
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .drop_subscription(subscription_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_index(index_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_function(function_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        let version = self.meta_client.drop_database(database_id).await?;
        self.wait_version(version).await
    }

    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
        let version = self
            .meta_client
            .drop_connection(connection_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
        let version = self.meta_client.drop_secret(secret_id).await?;
        self.wait_version(version).await
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        let version = self.meta_client.alter_name(object_id, object_name).await?;
        self.wait_version(version).await
    }

    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
        let version = self.meta_client.alter_owner(object, owner_id).await?;
        self.wait_version(version).await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        let version = self
            .meta_client
            .alter_set_schema(object, new_schema_id)
            .await?;
        self.wait_version(version).await
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        let version = self.meta_client.alter_source(source).await?;
        self.wait_version(version).await
    }

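    // `alter_parallelism` and `alter_config` do not return a catalog version, so they complete
    // as soon as the meta service acknowledges the request.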
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
        self.meta_client
            .alter_parallelism(job_id, parallelism, deferred)
            .await?;
        Ok(())
    }

    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        self.meta_client
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await?;
        Ok(())
    }

    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
        let version = self.meta_client.alter_swap_rename(object).await?;
        self.wait_version(version).await
    }

    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .alter_secret(
                secret_id,
                secret_name,
                database_id,
                schema_id,
                owner_id,
                payload,
            )
            .await?;
        self.wait_version(version).await
    }

    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
        self.meta_client
            .alter_resource_group(table_id, resource_group, deferred)
            .await
            .map_err(|e| anyhow!(e))?;

        Ok(())
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let version = self
            .meta_client
            .alter_database_param(database_id, param)
            .await
            .map_err(|e| anyhow!(e))?;
        self.wait_version(version).await
    }

    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
            .await?;
        self.wait_version(version).await
    }
}

impl CatalogWriterImpl {
    pub fn new(
        meta_client: MetaClient,
        catalog_updated_rx: Receiver<CatalogVersion>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        Self {
            meta_client,
            catalog_updated_rx,
            hummock_snapshot_manager,
        }
    }

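    /// Block until the local caches have caught up with `version`: first wait for the catalog
    /// update channel to report a catalog version at least as new as the one returned by the
    /// meta service, then wait until the corresponding Hummock version has been observed.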
    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
        let mut rx = self.catalog_updated_rx.clone();
        // Wait until the locally observed catalog version reaches the target; `changed()` fails
        // only if the sender side of the watch channel has been dropped.
        while *rx.borrow_and_update() < version.catalog_version {
            rx.changed().await.map_err(|e| anyhow!(e))?;
        }
        // Also wait for the corresponding Hummock (storage) version to be observed locally.
        self.hummock_snapshot_manager
            .wait(HummockVersionId::new(version.hummock_version_id))
            .await;
        Ok(())
    }
}