1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_pb::catalog::{
27 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28 PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37 streaming_job_resource_type,
38};
39use risingwave_pb::meta::PbTableParallelism;
40use risingwave_pb::stream_plan::StreamFragmentGraph;
41use risingwave_rpc_client::MetaClient;
42use tokio::sync::watch::Receiver;
43
44use super::root_catalog::Catalog;
45use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
46use crate::error::Result;
47use crate::scheduler::HummockSnapshotManagerRef;
48use crate::session::current::notice_to_user;
49use crate::user::UserId;
50
/// Owned (`Arc`-backed) read guard over the shared [`Catalog`].
///
/// This is the guard type returned by `RwLock::read_arc_recursive` on an
/// `Arc<RwLock<Catalog>>`.
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
52
/// Cloneable handle to the shared, lock-protected [`Catalog`].
///
/// Cloning only clones the inner `Arc`, so all clones refer to the same
/// catalog instance.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
56
57impl CatalogReader {
58 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
59 CatalogReader(inner)
60 }
61
62 pub fn read_guard(&self) -> CatalogReadGuard {
63 self.0.read_arc_recursive()
65 }
66}
67
/// Write-side interface for catalog DDL operations (create / replace / drop /
/// alter of databases, schemas, tables, views, sources, sinks, and so on).
///
/// Every method is asynchronous and returns `Result<()>`: callers learn only
/// whether the operation succeeded. Flags such as `cascade`, `if_not_exists`
/// and `deferred` are part of the request; their exact semantics are defined
/// by the implementor / the service behind it and are not visible from this
/// trait.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Creates a database named `db_name`, owned by `owner`, in
    /// `resource_group`, with the optional `barrier_interval_ms` and
    /// `checkpoint_frequency` settings.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Creates a schema named `schema_name` in database `db_id`, owned by
    /// `owner`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Creates the view described by `view`, together with its `dependencies`.
    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    /// Creates a materialized view from `table` and its stream fragment
    /// `graph`, together with its `dependencies`.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
        refresh_interval_sec: Option<u64>,
    ) -> Result<()>;

    /// Replaces a materialized view with the definition in `table` and the
    /// new stream fragment `graph`.
    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    /// Creates a table from `table` and `graph`, optionally with an
    /// associated `source`.
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    /// Replaces a table with the definition in `table` (and optional
    /// `source`) and the new stream fragment `graph`.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Replaces a source with the definition in `source` and the new stream
    /// fragment `graph`.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Creates an index described by `index`, `table` and `graph`.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates the source described by `source`; a stream fragment `graph`
    /// is optional.
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates the sink described by `sink` and `graph`, together with its
    /// `dependencies`.
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates the subscription described by `subscription`.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Creates the function described by `function`.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Creates a connection named `connection_name` in the given database
    /// and schema, owned by `owner_id`, using the `connection` payload.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Creates a secret named `secret_name` in the given database and
    /// schema, owned by `owner_id`, with the given `payload` bytes.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Submits the comment described by `comment`.
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Drops table `table_id`; `source_id` is supplied when the table has an
    /// associated source (presumably the one created with it — confirm
    /// against callers).
    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Drops the materialized view identified by `table_id`.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Drops the view identified by `view_id`.
    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    /// Drops the source identified by `source_id`.
    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    /// Issues a reset request for the source identified by `source_id`.
    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    /// Drops the sink identified by `sink_id`.
    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    /// Drops the subscription identified by `subscription_id`.
    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    /// Drops the database identified by `database_id`.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    /// Drops the schema identified by `schema_id`.
    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    /// Drops the index identified by `index_id`.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Drops the function identified by `function_id`.
    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    /// Drops the connection identified by `connection_id`.
    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    /// Drops the secret identified by `secret_id`.
    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;

    /// Alters the secret identified by `secret_id`, supplying its name,
    /// location, owner and new `payload` bytes.
    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Changes the retention of subscription `subscription_id` to
    /// `retention_seconds`, supplying the updated `definition` text.
    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<()>;

    /// Renames the object identified by `object_id` to `object_name`.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Changes the owner of `object` to `owner_id`.
    async fn alter_owner(
        &self,
        object: alter_owner_request::Object,
        owner_id: UserId,
    ) -> Result<()>;

    /// Alters a source according to the definition in `source`.
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Changes the parallelism of job `job_id`, with an optional adaptive
    /// parallelism strategy.
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        deferred: bool,
    ) -> Result<()>;

    /// Changes the backfill parallelism of job `job_id`; both the
    /// parallelism and the adaptive strategy are optional.
    async fn alter_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<PbTableParallelism>,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        deferred: bool,
    ) -> Result<()>;

    /// Updates the configuration of job `job_id` by adding `entries_to_add`
    /// and removing `keys_to_remove`.
    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    /// Changes the resource group of `table_id`; the meaning of `None` is
    /// not visible from this trait.
    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Moves `object` into the schema identified by `new_schema_id`.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    /// Performs the swap-rename described by `object`.
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Alters the parameter `param` of database `database_id`.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    /// Creates an Iceberg table from its table job, sink job and source
    /// descriptions.
    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Issues a wait request, optionally scoped to `job_id`; the behaviour
    /// for `None` is defined by the implementor.
    async fn wait(&self, job_id: Option<JobId>) -> Result<()>;
}
293
/// [`CatalogWriter`] implementation that issues requests through a
/// [`MetaClient`].
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to send every DDL request.
    meta_client: MetaClient,
    /// Watch channel carrying the catalog version; read in `wait_version`.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Awaited in `wait_version` with the hummock version id of a response.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
300
301#[async_trait::async_trait]
302impl CatalogWriter for CatalogWriterImpl {
303 async fn create_database(
304 &self,
305 db_name: &str,
306 owner: UserId,
307 resource_group: &str,
308 barrier_interval_ms: Option<u32>,
309 checkpoint_frequency: Option<u64>,
310 ) -> Result<()> {
311 let version = self
312 .meta_client
313 .create_database(PbDatabase {
314 name: db_name.to_owned(),
315 id: 0.into(),
316 owner,
317 resource_group: resource_group.to_owned(),
318 barrier_interval_ms,
319 checkpoint_frequency,
320 })
321 .await?;
322 self.wait_version(version).await
323 }
324
325 async fn create_schema(
326 &self,
327 db_id: DatabaseId,
328 schema_name: &str,
329 owner: UserId,
330 ) -> Result<()> {
331 let version = self
332 .meta_client
333 .create_schema(PbSchema {
334 id: 0.into(),
335 name: schema_name.to_owned(),
336 database_id: db_id,
337 owner,
338 })
339 .await?;
340 self.wait_version(version).await
341 }
342
343 async fn create_materialized_view(
345 &self,
346 table: PbTable,
347 graph: StreamFragmentGraph,
348 dependencies: HashSet<ObjectId>,
349 resource_type: streaming_job_resource_type::ResourceType,
350 if_not_exists: bool,
351 refresh_interval_sec: Option<u64>,
352 ) -> Result<()> {
353 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
354 let version = self
355 .meta_client
356 .create_materialized_view(
357 table,
358 graph,
359 dependencies,
360 resource_type,
361 if_not_exists,
362 refresh_interval_sec,
363 )
364 .await?;
365 if matches!(create_type, PbCreateType::Foreground) {
366 self.wait_version(version).await?
367 }
368 Ok(())
369 }
370
371 async fn replace_materialized_view(
372 &self,
373 table: PbTable,
374 graph: StreamFragmentGraph,
375 ) -> Result<()> {
376 notice_to_user(format!("table: {table:#?}"));
378 notice_to_user(format!("graph: {graph:#?}"));
379
380 let version = self
381 .meta_client
382 .replace_job(
383 graph,
384 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
385 )
386 .await?;
387
388 self.wait_version(version).await
389 }
390
391 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
392 let version = self.meta_client.create_view(view, dependencies).await?;
393 self.wait_version(version).await
394 }
395
396 async fn create_index(
397 &self,
398 index: PbIndex,
399 table: PbTable,
400 graph: StreamFragmentGraph,
401 if_not_exists: bool,
402 ) -> Result<()> {
403 let version = self
404 .meta_client
405 .create_index(index, table, graph, if_not_exists)
406 .await?;
407 self.wait_version(version).await
408 }
409
410 async fn create_table(
411 &self,
412 source: Option<PbSource>,
413 table: PbTable,
414 graph: StreamFragmentGraph,
415 job_type: PbTableJobType,
416 if_not_exists: bool,
417 dependencies: HashSet<ObjectId>,
418 ) -> Result<()> {
419 let version = self
420 .meta_client
421 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
422 .await?;
423 self.wait_version(version).await
424 }
425
426 async fn replace_table(
427 &self,
428 source: Option<PbSource>,
429 table: PbTable,
430 graph: StreamFragmentGraph,
431 job_type: TableJobType,
432 ) -> Result<()> {
433 let version = self
434 .meta_client
435 .replace_job(
436 graph,
437 ReplaceJob::ReplaceTable(ReplaceTable {
438 source,
439 table: Some(table),
440 job_type: job_type as _,
441 }),
442 )
443 .await?;
444 self.wait_version(version).await
445 }
446
447 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
448 let version = self
449 .meta_client
450 .replace_job(
451 graph,
452 ReplaceJob::ReplaceSource(ReplaceSource {
453 source: Some(source),
454 }),
455 )
456 .await?;
457 self.wait_version(version).await
458 }
459
460 async fn create_source(
461 &self,
462 source: PbSource,
463 graph: Option<StreamFragmentGraph>,
464 if_not_exists: bool,
465 ) -> Result<()> {
466 let version = self
467 .meta_client
468 .create_source(source, graph, if_not_exists)
469 .await?;
470 self.wait_version(version).await
471 }
472
473 async fn create_sink(
474 &self,
475 sink: PbSink,
476 graph: StreamFragmentGraph,
477 dependencies: HashSet<ObjectId>,
478 if_not_exists: bool,
479 ) -> Result<()> {
480 let version = self
481 .meta_client
482 .create_sink(sink, graph, dependencies, if_not_exists)
483 .await?;
484 self.wait_version(version).await
485 }
486
487 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
488 let version = self.meta_client.create_subscription(subscription).await?;
489 self.wait_version(version).await
490 }
491
492 async fn create_function(&self, function: PbFunction) -> Result<()> {
493 let version = self.meta_client.create_function(function).await?;
494 self.wait_version(version).await
495 }
496
497 async fn create_connection(
498 &self,
499 connection_name: String,
500 database_id: DatabaseId,
501 schema_id: SchemaId,
502 owner_id: UserId,
503 connection: create_connection_request::Payload,
504 ) -> Result<()> {
505 let version = self
506 .meta_client
507 .create_connection(
508 connection_name,
509 database_id,
510 schema_id,
511 owner_id,
512 connection,
513 )
514 .await?;
515 self.wait_version(version).await
516 }
517
518 async fn create_secret(
519 &self,
520 secret_name: String,
521 database_id: DatabaseId,
522 schema_id: SchemaId,
523 owner_id: UserId,
524 payload: Vec<u8>,
525 ) -> Result<()> {
526 let version = self
527 .meta_client
528 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
529 .await?;
530 self.wait_version(version).await
531 }
532
533 async fn comment_on(&self, comment: PbComment) -> Result<()> {
534 let version = self.meta_client.comment_on(comment).await?;
535 self.wait_version(version).await
536 }
537
538 async fn drop_table(
539 &self,
540 source_id: Option<SourceId>,
541 table_id: TableId,
542 cascade: bool,
543 ) -> Result<()> {
544 let version = self
545 .meta_client
546 .drop_table(source_id, table_id, cascade)
547 .await?;
548 self.wait_version(version).await
549 }
550
551 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
552 let version = self
553 .meta_client
554 .drop_materialized_view(table_id, cascade)
555 .await?;
556 self.wait_version(version).await
557 }
558
559 async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
560 let version = self.meta_client.drop_view(view_id, cascade).await?;
561 self.wait_version(version).await
562 }
563
564 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
565 let version = self.meta_client.drop_source(source_id, cascade).await?;
566 self.wait_version(version).await
567 }
568
569 async fn reset_source(&self, source_id: SourceId) -> Result<()> {
570 let version = self.meta_client.reset_source(source_id).await?;
571 self.wait_version(version).await
572 }
573
574 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
575 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
576 self.wait_version(version).await
577 }
578
579 async fn drop_subscription(
580 &self,
581 subscription_id: SubscriptionId,
582 cascade: bool,
583 ) -> Result<()> {
584 let version = self
585 .meta_client
586 .drop_subscription(subscription_id, cascade)
587 .await?;
588 self.wait_version(version).await
589 }
590
591 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
592 let version = self.meta_client.drop_index(index_id, cascade).await?;
593 self.wait_version(version).await
594 }
595
596 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
597 let version = self.meta_client.drop_function(function_id, cascade).await?;
598 self.wait_version(version).await
599 }
600
601 async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
602 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
603 self.wait_version(version).await
604 }
605
606 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
607 let version = self.meta_client.drop_database(database_id).await?;
608 self.wait_version(version).await
609 }
610
611 async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
612 let version = self
613 .meta_client
614 .drop_connection(connection_id, cascade)
615 .await?;
616 self.wait_version(version).await
617 }
618
619 async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
620 let version = self.meta_client.drop_secret(secret_id, cascade).await?;
621 self.wait_version(version).await
622 }
623
624 async fn alter_name(
625 &self,
626 object_id: alter_name_request::Object,
627 object_name: &str,
628 ) -> Result<()> {
629 let version = self.meta_client.alter_name(object_id, object_name).await?;
630 self.wait_version(version).await
631 }
632
633 async fn alter_owner(
634 &self,
635 object: alter_owner_request::Object,
636 owner_id: UserId,
637 ) -> Result<()> {
638 let version = self.meta_client.alter_owner(object, owner_id).await?;
639 self.wait_version(version).await
640 }
641
642 async fn alter_set_schema(
643 &self,
644 object: alter_set_schema_request::Object,
645 new_schema_id: SchemaId,
646 ) -> Result<()> {
647 let version = self
648 .meta_client
649 .alter_set_schema(object, new_schema_id)
650 .await?;
651 self.wait_version(version).await
652 }
653
654 async fn alter_source(&self, source: PbSource) -> Result<()> {
655 let version = self.meta_client.alter_source(source).await?;
656 self.wait_version(version).await
657 }
658
659 async fn alter_parallelism(
660 &self,
661 job_id: JobId,
662 parallelism: PbTableParallelism,
663 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
664 deferred: bool,
665 ) -> Result<()> {
666 self.meta_client
667 .alter_parallelism(job_id, parallelism, adaptive_parallelism_strategy, deferred)
668 .await?;
669 Ok(())
670 }
671
672 async fn alter_backfill_parallelism(
673 &self,
674 job_id: JobId,
675 parallelism: Option<PbTableParallelism>,
676 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
677 deferred: bool,
678 ) -> Result<()> {
679 self.meta_client
680 .alter_backfill_parallelism(
681 job_id,
682 parallelism,
683 adaptive_parallelism_strategy,
684 deferred,
685 )
686 .await?;
687 Ok(())
688 }
689
690 async fn alter_config(
691 &self,
692 job_id: JobId,
693 entries_to_add: HashMap<String, String>,
694 keys_to_remove: Vec<String>,
695 ) -> Result<()> {
696 self.meta_client
697 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
698 .await?;
699 Ok(())
700 }
701
702 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
703 let version = self.meta_client.alter_swap_rename(object).await?;
704 self.wait_version(version).await
705 }
706
707 async fn alter_secret(
708 &self,
709 secret_id: SecretId,
710 secret_name: String,
711 database_id: DatabaseId,
712 schema_id: SchemaId,
713 owner_id: UserId,
714 payload: Vec<u8>,
715 ) -> Result<()> {
716 let version = self
717 .meta_client
718 .alter_secret(
719 secret_id,
720 secret_name,
721 database_id,
722 schema_id,
723 owner_id,
724 payload,
725 )
726 .await?;
727 self.wait_version(version).await
728 }
729
730 async fn alter_subscription_retention(
731 &self,
732 subscription_id: SubscriptionId,
733 retention_seconds: u64,
734 definition: String,
735 ) -> Result<()> {
736 let version = self
737 .meta_client
738 .alter_subscription_retention(subscription_id, retention_seconds, definition)
739 .await?;
740 self.wait_version(version).await
741 }
742
743 async fn alter_resource_group(
744 &self,
745 table_id: TableId,
746 resource_group: Option<String>,
747 deferred: bool,
748 ) -> Result<()> {
749 self.meta_client
750 .alter_resource_group(table_id, resource_group, deferred)
751 .await
752 .map_err(|e| anyhow!(e))?;
753
754 Ok(())
755 }
756
757 async fn alter_database_param(
758 &self,
759 database_id: DatabaseId,
760 param: AlterDatabaseParam,
761 ) -> Result<()> {
762 let version = self
763 .meta_client
764 .alter_database_param(database_id, param)
765 .await
766 .map_err(|e| anyhow!(e))?;
767 self.wait_version(version).await
768 }
769
770 async fn create_iceberg_table(
771 &self,
772 table_job_info: PbTableJobInfo,
773 sink_job_info: PbSinkJobInfo,
774 iceberg_source: PbSource,
775 if_not_exists: bool,
776 ) -> Result<()> {
777 let version = Box::pin(self.meta_client.create_iceberg_table(
778 table_job_info,
779 sink_job_info,
780 iceberg_source,
781 if_not_exists,
782 ))
783 .await?;
784 self.wait_version(version).await
785 }
786
787 async fn wait(&self, job_id: Option<JobId>) -> Result<()> {
788 let version = self
789 .meta_client
790 .wait(job_id)
791 .await
792 .map_err(|e| anyhow!(e))?;
793 self.wait_version(version).await
794 }
795}
796
797impl CatalogWriterImpl {
798 pub fn new(
799 meta_client: MetaClient,
800 catalog_updated_rx: Receiver<CatalogVersion>,
801 hummock_snapshot_manager: HummockSnapshotManagerRef,
802 ) -> Self {
803 Self {
804 meta_client,
805 catalog_updated_rx,
806 hummock_snapshot_manager,
807 }
808 }
809
810 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
811 let mut rx = self.catalog_updated_rx.clone();
812 while *rx.borrow_and_update() < version.catalog_version {
813 rx.changed().await.map_err(|e| anyhow!(e))?;
814 }
815 self.hummock_snapshot_manager
816 .wait(version.hummock_version_id)
817 .await;
818 Ok(())
819 }
820}