1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36 streaming_job_resource_type,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// Owned read guard over the shared [`Catalog`].
///
/// This is the `Arc`-based guard type of `parking_lot`, i.e. the guard is obtained from an
/// `Arc<RwLock<Catalog>>` rather than from a borrowed lock (see `CatalogReader::read_guard`).
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
/// Cloneable handle to the catalog stored behind an `Arc<RwLock<Catalog>>`.
///
/// Cloning only clones the `Arc`; all clones refer to the same underlying [`Catalog`].
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58 CatalogReader(inner)
59 }
60
61 pub fn read_guard(&self) -> CatalogReadGuard {
62 self.0.read_arc_recursive()
64 }
65}
66
/// Interface for issuing catalog-changing (DDL) operations.
///
/// Every method is asynchronous and returns `Ok(())` on success. The implementation in
/// this file (`CatalogWriterImpl`) forwards each call to the meta service and, for most
/// operations, waits until the returned version has been observed locally.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Creates a database named `db_name`, owned by `owner`, in `resource_group`.
    ///
    /// `barrier_interval_ms` and `checkpoint_frequency` are optional per-database settings.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Creates a schema named `schema_name` in database `db_id`, owned by `owner`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Creates `view`, passing along the object ids in `dependencies`.
    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    /// Creates a materialized view described by `table` and its stream `graph`.
    ///
    /// `dependencies`, `resource_type` and `if_not_exists` are forwarded with the request.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Replaces an existing materialized view with the given `table` definition and `graph`.
    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    /// Creates a table described by `table` and `graph`, with an optional associated `source`.
    ///
    /// `job_type`, `if_not_exists` and `dependencies` are forwarded with the request.
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    /// Replaces an existing table (and optionally its `source`) with a new definition and
    /// `graph`.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Replaces an existing source with the given `source` definition and `graph`.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Creates `index` together with its backing `table` and stream `graph`.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates `source`; `graph` is optional.
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates `sink` with its stream `graph`, passing along the object ids in `dependencies`.
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates `subscription`.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Creates `function`.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Creates a connection named `connection_name` in the given database and schema, owned by
    /// `owner_id`, with `connection` as the request payload.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Creates a secret named `secret_name` in the given database and schema, owned by
    /// `owner_id`, with `payload` as its raw bytes.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Applies `comment`.
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Drops table `table_id`, together with `source_id` when one is given.
    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Drops the materialized view identified by `table_id`.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Drops view `view_id`.
    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    /// Drops source `source_id`.
    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    /// Resets source `source_id`.
    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    /// Drops sink `sink_id`.
    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    /// Drops subscription `subscription_id`.
    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    /// Drops database `database_id`.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    /// Drops schema `schema_id`.
    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    /// Drops index `index_id`.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Drops function `function_id`.
    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    /// Drops connection `connection_id`.
    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    /// Drops secret `secret_id`.
    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;

    /// Alters secret `secret_id`, supplying its name, location, owner and new `payload`.
    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Changes the retention of subscription `subscription_id` to `retention_seconds`,
    /// passing along the accompanying `definition` string.
    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<()>;

    /// Renames the object identified by `object_id` to `object_name`.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Changes the owner of `object` to `owner_id`.
    async fn alter_owner(
        &self,
        object: alter_owner_request::Object,
        owner_id: UserId,
    ) -> Result<()>;

    /// Alters a source using the given `source` definition.
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Alters the parallelism of job `job_id`; `deferred` is forwarded with the request.
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    /// Alters the backfill parallelism of job `job_id`; `parallelism` is optional and
    /// `deferred` is forwarded with the request.
    async fn alter_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<PbTableParallelism>,
        deferred: bool,
    ) -> Result<()>;

    /// Alters the configuration of job `job_id` by adding `entries_to_add` and removing
    /// `keys_to_remove`.
    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    /// Alters the resource group of `table_id`; `resource_group` is optional and `deferred`
    /// is forwarded with the request.
    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Moves `object` into schema `new_schema_id`.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    /// Performs a swap-rename for `object`.
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Alters a parameter of database `database_id` as described by `param`.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    /// Creates an Iceberg table from the given table job, sink job and Iceberg source.
    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Issues a `wait` request and waits for the version it returns.
    async fn wait(&self) -> Result<()>;
}
289
/// [`CatalogWriter`] implementation that sends DDL requests through a [`MetaClient`].
#[derive(Clone)]
pub struct CatalogWriterImpl {
    // Client used to send every DDL request.
    meta_client: MetaClient,
    // Watch channel carrying a catalog version; `wait_version` waits on it until it reaches
    // the version returned by a request.
    catalog_updated_rx: Receiver<CatalogVersion>,
    // Waited on in `wait_version` for the hummock version id returned by a request.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
296
297#[async_trait::async_trait]
298impl CatalogWriter for CatalogWriterImpl {
299 async fn create_database(
300 &self,
301 db_name: &str,
302 owner: UserId,
303 resource_group: &str,
304 barrier_interval_ms: Option<u32>,
305 checkpoint_frequency: Option<u64>,
306 ) -> Result<()> {
307 let version = self
308 .meta_client
309 .create_database(PbDatabase {
310 name: db_name.to_owned(),
311 id: 0.into(),
312 owner,
313 resource_group: resource_group.to_owned(),
314 barrier_interval_ms,
315 checkpoint_frequency,
316 })
317 .await?;
318 self.wait_version(version).await
319 }
320
321 async fn create_schema(
322 &self,
323 db_id: DatabaseId,
324 schema_name: &str,
325 owner: UserId,
326 ) -> Result<()> {
327 let version = self
328 .meta_client
329 .create_schema(PbSchema {
330 id: 0.into(),
331 name: schema_name.to_owned(),
332 database_id: db_id,
333 owner,
334 })
335 .await?;
336 self.wait_version(version).await
337 }
338
339 async fn create_materialized_view(
341 &self,
342 table: PbTable,
343 graph: StreamFragmentGraph,
344 dependencies: HashSet<ObjectId>,
345 resource_type: streaming_job_resource_type::ResourceType,
346 if_not_exists: bool,
347 ) -> Result<()> {
348 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
349 let version = self
350 .meta_client
351 .create_materialized_view(table, graph, dependencies, resource_type, if_not_exists)
352 .await?;
353 if matches!(create_type, PbCreateType::Foreground) {
354 self.wait_version(version).await?
355 }
356 Ok(())
357 }
358
359 async fn replace_materialized_view(
360 &self,
361 table: PbTable,
362 graph: StreamFragmentGraph,
363 ) -> Result<()> {
364 notice_to_user(format!("table: {table:#?}"));
366 notice_to_user(format!("graph: {graph:#?}"));
367
368 let version = self
369 .meta_client
370 .replace_job(
371 graph,
372 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
373 )
374 .await?;
375
376 self.wait_version(version).await
377 }
378
379 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
380 let version = self.meta_client.create_view(view, dependencies).await?;
381 self.wait_version(version).await
382 }
383
384 async fn create_index(
385 &self,
386 index: PbIndex,
387 table: PbTable,
388 graph: StreamFragmentGraph,
389 if_not_exists: bool,
390 ) -> Result<()> {
391 let version = self
392 .meta_client
393 .create_index(index, table, graph, if_not_exists)
394 .await?;
395 self.wait_version(version).await
396 }
397
398 async fn create_table(
399 &self,
400 source: Option<PbSource>,
401 table: PbTable,
402 graph: StreamFragmentGraph,
403 job_type: PbTableJobType,
404 if_not_exists: bool,
405 dependencies: HashSet<ObjectId>,
406 ) -> Result<()> {
407 let version = self
408 .meta_client
409 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
410 .await?;
411 self.wait_version(version).await
412 }
413
414 async fn replace_table(
415 &self,
416 source: Option<PbSource>,
417 table: PbTable,
418 graph: StreamFragmentGraph,
419 job_type: TableJobType,
420 ) -> Result<()> {
421 let version = self
422 .meta_client
423 .replace_job(
424 graph,
425 ReplaceJob::ReplaceTable(ReplaceTable {
426 source,
427 table: Some(table),
428 job_type: job_type as _,
429 }),
430 )
431 .await?;
432 self.wait_version(version).await
433 }
434
435 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
436 let version = self
437 .meta_client
438 .replace_job(
439 graph,
440 ReplaceJob::ReplaceSource(ReplaceSource {
441 source: Some(source),
442 }),
443 )
444 .await?;
445 self.wait_version(version).await
446 }
447
448 async fn create_source(
449 &self,
450 source: PbSource,
451 graph: Option<StreamFragmentGraph>,
452 if_not_exists: bool,
453 ) -> Result<()> {
454 let version = self
455 .meta_client
456 .create_source(source, graph, if_not_exists)
457 .await?;
458 self.wait_version(version).await
459 }
460
461 async fn create_sink(
462 &self,
463 sink: PbSink,
464 graph: StreamFragmentGraph,
465 dependencies: HashSet<ObjectId>,
466 if_not_exists: bool,
467 ) -> Result<()> {
468 let version = self
469 .meta_client
470 .create_sink(sink, graph, dependencies, if_not_exists)
471 .await?;
472 self.wait_version(version).await
473 }
474
475 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
476 let version = self.meta_client.create_subscription(subscription).await?;
477 self.wait_version(version).await
478 }
479
480 async fn create_function(&self, function: PbFunction) -> Result<()> {
481 let version = self.meta_client.create_function(function).await?;
482 self.wait_version(version).await
483 }
484
485 async fn create_connection(
486 &self,
487 connection_name: String,
488 database_id: DatabaseId,
489 schema_id: SchemaId,
490 owner_id: UserId,
491 connection: create_connection_request::Payload,
492 ) -> Result<()> {
493 let version = self
494 .meta_client
495 .create_connection(
496 connection_name,
497 database_id,
498 schema_id,
499 owner_id,
500 connection,
501 )
502 .await?;
503 self.wait_version(version).await
504 }
505
506 async fn create_secret(
507 &self,
508 secret_name: String,
509 database_id: DatabaseId,
510 schema_id: SchemaId,
511 owner_id: UserId,
512 payload: Vec<u8>,
513 ) -> Result<()> {
514 let version = self
515 .meta_client
516 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
517 .await?;
518 self.wait_version(version).await
519 }
520
521 async fn comment_on(&self, comment: PbComment) -> Result<()> {
522 let version = self.meta_client.comment_on(comment).await?;
523 self.wait_version(version).await
524 }
525
526 async fn drop_table(
527 &self,
528 source_id: Option<SourceId>,
529 table_id: TableId,
530 cascade: bool,
531 ) -> Result<()> {
532 let version = self
533 .meta_client
534 .drop_table(source_id, table_id, cascade)
535 .await?;
536 self.wait_version(version).await
537 }
538
539 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
540 let version = self
541 .meta_client
542 .drop_materialized_view(table_id, cascade)
543 .await?;
544 self.wait_version(version).await
545 }
546
547 async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
548 let version = self.meta_client.drop_view(view_id, cascade).await?;
549 self.wait_version(version).await
550 }
551
552 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
553 let version = self.meta_client.drop_source(source_id, cascade).await?;
554 self.wait_version(version).await
555 }
556
557 async fn reset_source(&self, source_id: SourceId) -> Result<()> {
558 let version = self.meta_client.reset_source(source_id).await?;
559 self.wait_version(version).await
560 }
561
562 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
563 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
564 self.wait_version(version).await
565 }
566
567 async fn drop_subscription(
568 &self,
569 subscription_id: SubscriptionId,
570 cascade: bool,
571 ) -> Result<()> {
572 let version = self
573 .meta_client
574 .drop_subscription(subscription_id, cascade)
575 .await?;
576 self.wait_version(version).await
577 }
578
579 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
580 let version = self.meta_client.drop_index(index_id, cascade).await?;
581 self.wait_version(version).await
582 }
583
584 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
585 let version = self.meta_client.drop_function(function_id, cascade).await?;
586 self.wait_version(version).await
587 }
588
589 async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
590 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
591 self.wait_version(version).await
592 }
593
594 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
595 let version = self.meta_client.drop_database(database_id).await?;
596 self.wait_version(version).await
597 }
598
599 async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
600 let version = self
601 .meta_client
602 .drop_connection(connection_id, cascade)
603 .await?;
604 self.wait_version(version).await
605 }
606
607 async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
608 let version = self.meta_client.drop_secret(secret_id, cascade).await?;
609 self.wait_version(version).await
610 }
611
612 async fn alter_name(
613 &self,
614 object_id: alter_name_request::Object,
615 object_name: &str,
616 ) -> Result<()> {
617 let version = self.meta_client.alter_name(object_id, object_name).await?;
618 self.wait_version(version).await
619 }
620
621 async fn alter_owner(
622 &self,
623 object: alter_owner_request::Object,
624 owner_id: UserId,
625 ) -> Result<()> {
626 let version = self.meta_client.alter_owner(object, owner_id).await?;
627 self.wait_version(version).await
628 }
629
630 async fn alter_set_schema(
631 &self,
632 object: alter_set_schema_request::Object,
633 new_schema_id: SchemaId,
634 ) -> Result<()> {
635 let version = self
636 .meta_client
637 .alter_set_schema(object, new_schema_id)
638 .await?;
639 self.wait_version(version).await
640 }
641
642 async fn alter_source(&self, source: PbSource) -> Result<()> {
643 let version = self.meta_client.alter_source(source).await?;
644 self.wait_version(version).await
645 }
646
647 async fn alter_parallelism(
648 &self,
649 job_id: JobId,
650 parallelism: PbTableParallelism,
651 deferred: bool,
652 ) -> Result<()> {
653 self.meta_client
654 .alter_parallelism(job_id, parallelism, deferred)
655 .await?;
656 Ok(())
657 }
658
659 async fn alter_backfill_parallelism(
660 &self,
661 job_id: JobId,
662 parallelism: Option<PbTableParallelism>,
663 deferred: bool,
664 ) -> Result<()> {
665 self.meta_client
666 .alter_backfill_parallelism(job_id, parallelism, deferred)
667 .await?;
668 Ok(())
669 }
670
671 async fn alter_config(
672 &self,
673 job_id: JobId,
674 entries_to_add: HashMap<String, String>,
675 keys_to_remove: Vec<String>,
676 ) -> Result<()> {
677 self.meta_client
678 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
679 .await?;
680 Ok(())
681 }
682
683 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
684 let version = self.meta_client.alter_swap_rename(object).await?;
685 self.wait_version(version).await
686 }
687
688 async fn alter_secret(
689 &self,
690 secret_id: SecretId,
691 secret_name: String,
692 database_id: DatabaseId,
693 schema_id: SchemaId,
694 owner_id: UserId,
695 payload: Vec<u8>,
696 ) -> Result<()> {
697 let version = self
698 .meta_client
699 .alter_secret(
700 secret_id,
701 secret_name,
702 database_id,
703 schema_id,
704 owner_id,
705 payload,
706 )
707 .await?;
708 self.wait_version(version).await
709 }
710
711 async fn alter_subscription_retention(
712 &self,
713 subscription_id: SubscriptionId,
714 retention_seconds: u64,
715 definition: String,
716 ) -> Result<()> {
717 let version = self
718 .meta_client
719 .alter_subscription_retention(subscription_id, retention_seconds, definition)
720 .await?;
721 self.wait_version(version).await
722 }
723
724 async fn alter_resource_group(
725 &self,
726 table_id: TableId,
727 resource_group: Option<String>,
728 deferred: bool,
729 ) -> Result<()> {
730 self.meta_client
731 .alter_resource_group(table_id, resource_group, deferred)
732 .await
733 .map_err(|e| anyhow!(e))?;
734
735 Ok(())
736 }
737
738 async fn alter_database_param(
739 &self,
740 database_id: DatabaseId,
741 param: AlterDatabaseParam,
742 ) -> Result<()> {
743 let version = self
744 .meta_client
745 .alter_database_param(database_id, param)
746 .await
747 .map_err(|e| anyhow!(e))?;
748 self.wait_version(version).await
749 }
750
751 async fn create_iceberg_table(
752 &self,
753 table_job_info: PbTableJobInfo,
754 sink_job_info: PbSinkJobInfo,
755 iceberg_source: PbSource,
756 if_not_exists: bool,
757 ) -> Result<()> {
758 let version = self
759 .meta_client
760 .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
761 .await?;
762 self.wait_version(version).await
763 }
764
765 async fn wait(&self) -> Result<()> {
766 let version = self.meta_client.wait().await.map_err(|e| anyhow!(e))?;
767 self.wait_version(version).await
768 }
769}
770
771impl CatalogWriterImpl {
772 pub fn new(
773 meta_client: MetaClient,
774 catalog_updated_rx: Receiver<CatalogVersion>,
775 hummock_snapshot_manager: HummockSnapshotManagerRef,
776 ) -> Self {
777 Self {
778 meta_client,
779 catalog_updated_rx,
780 hummock_snapshot_manager,
781 }
782 }
783
784 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
785 let mut rx = self.catalog_updated_rx.clone();
786 while *rx.borrow_and_update() < version.catalog_version {
787 rx.changed().await.map_err(|e| anyhow!(e))?;
788 }
789 self.hummock_snapshot_manager
790 .wait(version.hummock_version_id)
791 .await;
792 Ok(())
793 }
794}