1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_hummock_sdk::HummockVersionId;
26use risingwave_pb::catalog::{
27 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28 PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// Read guard over the shared [`Catalog`], as returned by [`CatalogReader::read_guard`].
///
/// This is the `Arc`-based guard flavour of the lock (`ArcRwLockReadGuard`), not the
/// reference-based one.
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
/// Cloneable handle for reading the [`Catalog`] stored behind an `Arc<RwLock<_>>`.
///
/// Cloning the reader clones the `Arc`, so all clones refer to the same catalog.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58 CatalogReader(inner)
59 }
60
61 pub fn read_guard(&self) -> CatalogReadGuard {
62 self.0.read_arc_recursive()
64 }
65}
66
/// Write-side interface of the catalog: asynchronous operations that create, replace, drop or
/// alter catalog objects.
///
/// Every method returns `Ok(())` on success. [`CatalogWriterImpl`] in this file implements the
/// trait by forwarding each call to its `MetaClient`.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Creates a database named `db_name` owned by `owner`, passing along the resource group and
    /// the optional barrier interval / checkpoint frequency settings.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Creates a schema named `schema_name` in database `db_id`, owned by `owner`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Creates a view together with the set of objects it depends on.
    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    /// Creates a materialized view from its table definition and stream fragment graph.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Replaces a materialized view with the given table definition and fragment graph.
    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    /// Creates a table, optionally accompanied by a source definition.
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    /// Replaces a table (and optionally its source definition) with a new fragment graph.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Replaces a source with the given definition and fragment graph.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Creates an index from its index definition, table definition and fragment graph.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a source; the fragment graph is optional.
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a sink from its definition, fragment graph and dependency set.
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a subscription.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Creates a function.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Creates a connection named `connection_name` in the given database and schema.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Creates a secret named `secret_name` in the given database and schema, carrying `payload`.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Applies a comment described by `comment`.
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Drops table `table_id`.
    ///
    /// `source_id` is passed along with the request; presumably it identifies the table's
    /// associated source — confirm against the meta service.
    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Drops the materialized view identified by `table_id`.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Drops the view `view_id`.
    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    /// Drops the source `source_id`.
    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    /// Drops the sink `sink_id`.
    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    /// Drops the subscription `subscription_id`.
    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    /// Drops the database `database_id`.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    /// Drops the schema `schema_id`.
    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    /// Drops the index `index_id`.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Drops the function `function_id`.
    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    /// Drops the connection `connection_id`.
    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    /// Drops the secret `secret_id`.
    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

    /// Alters the secret `secret_id` using the supplied name, location, owner and payload.
    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Renames the object referred to by `object_id` to `object_name`.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Changes the owner of `object` to `owner_id`.
    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

    /// Alters a source according to the given definition.
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Alters the parallelism of job `job_id`.
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    /// Alters the resource group associated with `table_id`.
    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Moves `object` to the schema `new_schema_id`.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    /// Performs the swap-rename described by `object`.
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Alters a parameter of database `database_id`.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    /// Creates an iceberg table from its table job, sink job and source definition.
    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;
}
260
/// [`CatalogWriter`] implementation that sends its requests through a `MetaClient`.
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to issue the catalog requests.
    meta_client: MetaClient,
    /// Watch-channel receiver carrying a catalog version; polled in `wait_version`.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Waited on in `wait_version` for the hummock version id returned with a request.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
267
268#[async_trait::async_trait]
269impl CatalogWriter for CatalogWriterImpl {
270 async fn create_database(
271 &self,
272 db_name: &str,
273 owner: UserId,
274 resource_group: &str,
275 barrier_interval_ms: Option<u32>,
276 checkpoint_frequency: Option<u64>,
277 ) -> Result<()> {
278 let version = self
279 .meta_client
280 .create_database(PbDatabase {
281 name: db_name.to_owned(),
282 id: 0.into(),
283 owner,
284 resource_group: resource_group.to_owned(),
285 barrier_interval_ms,
286 checkpoint_frequency,
287 })
288 .await?;
289 self.wait_version(version).await
290 }
291
292 async fn create_schema(
293 &self,
294 db_id: DatabaseId,
295 schema_name: &str,
296 owner: UserId,
297 ) -> Result<()> {
298 let version = self
299 .meta_client
300 .create_schema(PbSchema {
301 id: 0.into(),
302 name: schema_name.to_owned(),
303 database_id: db_id,
304 owner,
305 })
306 .await?;
307 self.wait_version(version).await
308 }
309
310 async fn create_materialized_view(
312 &self,
313 table: PbTable,
314 graph: StreamFragmentGraph,
315 dependencies: HashSet<ObjectId>,
316 specific_resource_group: Option<String>,
317 if_not_exists: bool,
318 ) -> Result<()> {
319 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
320 let version = self
321 .meta_client
322 .create_materialized_view(
323 table,
324 graph,
325 dependencies,
326 specific_resource_group,
327 if_not_exists,
328 )
329 .await?;
330 if matches!(create_type, PbCreateType::Foreground) {
331 self.wait_version(version).await?
332 }
333 Ok(())
334 }
335
336 async fn replace_materialized_view(
337 &self,
338 table: PbTable,
339 graph: StreamFragmentGraph,
340 ) -> Result<()> {
341 notice_to_user(format!("table: {table:#?}"));
343 notice_to_user(format!("graph: {graph:#?}"));
344
345 let version = self
346 .meta_client
347 .replace_job(
348 graph,
349 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
350 )
351 .await?;
352
353 self.wait_version(version).await
354 }
355
356 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
357 let version = self.meta_client.create_view(view, dependencies).await?;
358 self.wait_version(version).await
359 }
360
361 async fn create_index(
362 &self,
363 index: PbIndex,
364 table: PbTable,
365 graph: StreamFragmentGraph,
366 if_not_exists: bool,
367 ) -> Result<()> {
368 let version = self
369 .meta_client
370 .create_index(index, table, graph, if_not_exists)
371 .await?;
372 self.wait_version(version).await
373 }
374
375 async fn create_table(
376 &self,
377 source: Option<PbSource>,
378 table: PbTable,
379 graph: StreamFragmentGraph,
380 job_type: PbTableJobType,
381 if_not_exists: bool,
382 dependencies: HashSet<ObjectId>,
383 ) -> Result<()> {
384 let version = self
385 .meta_client
386 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
387 .await?;
388 self.wait_version(version).await
389 }
390
391 async fn replace_table(
392 &self,
393 source: Option<PbSource>,
394 table: PbTable,
395 graph: StreamFragmentGraph,
396 job_type: TableJobType,
397 ) -> Result<()> {
398 let version = self
399 .meta_client
400 .replace_job(
401 graph,
402 ReplaceJob::ReplaceTable(ReplaceTable {
403 source,
404 table: Some(table),
405 job_type: job_type as _,
406 }),
407 )
408 .await?;
409 self.wait_version(version).await
410 }
411
412 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
413 let version = self
414 .meta_client
415 .replace_job(
416 graph,
417 ReplaceJob::ReplaceSource(ReplaceSource {
418 source: Some(source),
419 }),
420 )
421 .await?;
422 self.wait_version(version).await
423 }
424
425 async fn create_source(
426 &self,
427 source: PbSource,
428 graph: Option<StreamFragmentGraph>,
429 if_not_exists: bool,
430 ) -> Result<()> {
431 let version = self
432 .meta_client
433 .create_source(source, graph, if_not_exists)
434 .await?;
435 self.wait_version(version).await
436 }
437
438 async fn create_sink(
439 &self,
440 sink: PbSink,
441 graph: StreamFragmentGraph,
442 dependencies: HashSet<ObjectId>,
443 if_not_exists: bool,
444 ) -> Result<()> {
445 let version = self
446 .meta_client
447 .create_sink(sink, graph, dependencies, if_not_exists)
448 .await?;
449 self.wait_version(version).await
450 }
451
452 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
453 let version = self.meta_client.create_subscription(subscription).await?;
454 self.wait_version(version).await
455 }
456
457 async fn create_function(&self, function: PbFunction) -> Result<()> {
458 let version = self.meta_client.create_function(function).await?;
459 self.wait_version(version).await
460 }
461
462 async fn create_connection(
463 &self,
464 connection_name: String,
465 database_id: DatabaseId,
466 schema_id: SchemaId,
467 owner_id: u32,
468 connection: create_connection_request::Payload,
469 ) -> Result<()> {
470 let version = self
471 .meta_client
472 .create_connection(
473 connection_name,
474 database_id,
475 schema_id,
476 owner_id,
477 connection,
478 )
479 .await?;
480 self.wait_version(version).await
481 }
482
483 async fn create_secret(
484 &self,
485 secret_name: String,
486 database_id: DatabaseId,
487 schema_id: SchemaId,
488 owner_id: u32,
489 payload: Vec<u8>,
490 ) -> Result<()> {
491 let version = self
492 .meta_client
493 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
494 .await?;
495 self.wait_version(version).await
496 }
497
498 async fn comment_on(&self, comment: PbComment) -> Result<()> {
499 let version = self.meta_client.comment_on(comment).await?;
500 self.wait_version(version).await
501 }
502
503 async fn drop_table(
504 &self,
505 source_id: Option<u32>,
506 table_id: TableId,
507 cascade: bool,
508 ) -> Result<()> {
509 let version = self
510 .meta_client
511 .drop_table(source_id, table_id, cascade)
512 .await?;
513 self.wait_version(version).await
514 }
515
516 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
517 let version = self
518 .meta_client
519 .drop_materialized_view(table_id, cascade)
520 .await?;
521 self.wait_version(version).await
522 }
523
524 async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
525 let version = self.meta_client.drop_view(view_id, cascade).await?;
526 self.wait_version(version).await
527 }
528
529 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
530 let version = self.meta_client.drop_source(source_id, cascade).await?;
531 self.wait_version(version).await
532 }
533
534 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
535 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
536 self.wait_version(version).await
537 }
538
539 async fn drop_subscription(
540 &self,
541 subscription_id: SubscriptionId,
542 cascade: bool,
543 ) -> Result<()> {
544 let version = self
545 .meta_client
546 .drop_subscription(subscription_id, cascade)
547 .await?;
548 self.wait_version(version).await
549 }
550
551 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
552 let version = self.meta_client.drop_index(index_id, cascade).await?;
553 self.wait_version(version).await
554 }
555
556 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
557 let version = self.meta_client.drop_function(function_id, cascade).await?;
558 self.wait_version(version).await
559 }
560
561 async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
562 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
563 self.wait_version(version).await
564 }
565
566 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
567 let version = self.meta_client.drop_database(database_id).await?;
568 self.wait_version(version).await
569 }
570
571 async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
572 let version = self
573 .meta_client
574 .drop_connection(connection_id, cascade)
575 .await?;
576 self.wait_version(version).await
577 }
578
579 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
580 let version = self.meta_client.drop_secret(secret_id).await?;
581 self.wait_version(version).await
582 }
583
584 async fn alter_name(
585 &self,
586 object_id: alter_name_request::Object,
587 object_name: &str,
588 ) -> Result<()> {
589 let version = self.meta_client.alter_name(object_id, object_name).await?;
590 self.wait_version(version).await
591 }
592
593 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
594 let version = self.meta_client.alter_owner(object, owner_id).await?;
595 self.wait_version(version).await
596 }
597
598 async fn alter_set_schema(
599 &self,
600 object: alter_set_schema_request::Object,
601 new_schema_id: SchemaId,
602 ) -> Result<()> {
603 let version = self
604 .meta_client
605 .alter_set_schema(object, new_schema_id)
606 .await?;
607 self.wait_version(version).await
608 }
609
610 async fn alter_source(&self, source: PbSource) -> Result<()> {
611 let version = self.meta_client.alter_source(source).await?;
612 self.wait_version(version).await
613 }
614
615 async fn alter_parallelism(
616 &self,
617 job_id: JobId,
618 parallelism: PbTableParallelism,
619 deferred: bool,
620 ) -> Result<()> {
621 self.meta_client
622 .alter_parallelism(job_id, parallelism, deferred)
623 .await
624 .map_err(|e| anyhow!(e))?;
625
626 Ok(())
627 }
628
629 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
630 let version = self.meta_client.alter_swap_rename(object).await?;
631 self.wait_version(version).await
632 }
633
634 async fn alter_secret(
635 &self,
636 secret_id: SecretId,
637 secret_name: String,
638 database_id: DatabaseId,
639 schema_id: SchemaId,
640 owner_id: u32,
641 payload: Vec<u8>,
642 ) -> Result<()> {
643 let version = self
644 .meta_client
645 .alter_secret(
646 secret_id,
647 secret_name,
648 database_id,
649 schema_id,
650 owner_id,
651 payload,
652 )
653 .await?;
654 self.wait_version(version).await
655 }
656
657 async fn alter_resource_group(
658 &self,
659 table_id: TableId,
660 resource_group: Option<String>,
661 deferred: bool,
662 ) -> Result<()> {
663 self.meta_client
664 .alter_resource_group(table_id, resource_group, deferred)
665 .await
666 .map_err(|e| anyhow!(e))?;
667
668 Ok(())
669 }
670
671 async fn alter_database_param(
672 &self,
673 database_id: DatabaseId,
674 param: AlterDatabaseParam,
675 ) -> Result<()> {
676 let version = self
677 .meta_client
678 .alter_database_param(database_id, param)
679 .await
680 .map_err(|e| anyhow!(e))?;
681 self.wait_version(version).await
682 }
683
684 async fn create_iceberg_table(
685 &self,
686 table_job_info: PbTableJobInfo,
687 sink_job_info: PbSinkJobInfo,
688 iceberg_source: PbSource,
689 if_not_exists: bool,
690 ) -> Result<()> {
691 let version = self
692 .meta_client
693 .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
694 .await?;
695 self.wait_version(version).await
696 }
697}
698
699impl CatalogWriterImpl {
700 pub fn new(
701 meta_client: MetaClient,
702 catalog_updated_rx: Receiver<CatalogVersion>,
703 hummock_snapshot_manager: HummockSnapshotManagerRef,
704 ) -> Self {
705 Self {
706 meta_client,
707 catalog_updated_rx,
708 hummock_snapshot_manager,
709 }
710 }
711
712 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
713 let mut rx = self.catalog_updated_rx.clone();
714 while *rx.borrow_and_update() < version.catalog_version {
715 rx.changed().await.map_err(|e| anyhow!(e))?;
716 }
717 self.hummock_snapshot_manager
718 .wait(HummockVersionId::new(version.hummock_version_id))
719 .await;
720 Ok(())
721 }
722}