1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36};
37use risingwave_pb::meta::PbTableParallelism;
38use risingwave_pb::stream_plan::StreamFragmentGraph;
39use risingwave_rpc_client::MetaClient;
40use tokio::sync::watch::Receiver;
41
42use super::root_catalog::Catalog;
43use super::{DatabaseId, SecretId, TableId};
44use crate::error::Result;
45use crate::scheduler::HummockSnapshotManagerRef;
46use crate::session::current::notice_to_user;
47use crate::user::UserId;
48
/// Owned read guard over the shared [`Catalog`], as handed out by
/// [`CatalogReader::read_guard`].
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
50
51#[derive(Clone)]
53pub struct CatalogReader(Arc<RwLock<Catalog>>);
54
55impl CatalogReader {
56 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
57 CatalogReader(inner)
58 }
59
60 pub fn read_guard(&self) -> CatalogReadGuard {
61 self.0.read_arc_recursive()
63 }
64}
65
66#[async_trait::async_trait]
71pub trait CatalogWriter: Send + Sync {
72 async fn create_database(
73 &self,
74 db_name: &str,
75 owner: UserId,
76 resource_group: &str,
77 barrier_interval_ms: Option<u32>,
78 checkpoint_frequency: Option<u64>,
79 ) -> Result<()>;
80
81 async fn create_schema(
82 &self,
83 db_id: DatabaseId,
84 schema_name: &str,
85 owner: UserId,
86 ) -> Result<()>;
87
88 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
89
90 async fn create_materialized_view(
91 &self,
92 table: PbTable,
93 graph: StreamFragmentGraph,
94 dependencies: HashSet<ObjectId>,
95 specific_resource_group: Option<String>,
96 if_not_exists: bool,
97 ) -> Result<()>;
98
99 async fn replace_materialized_view(
100 &self,
101 table: PbTable,
102 graph: StreamFragmentGraph,
103 ) -> Result<()>;
104
105 async fn create_table(
106 &self,
107 source: Option<PbSource>,
108 table: PbTable,
109 graph: StreamFragmentGraph,
110 job_type: PbTableJobType,
111 if_not_exists: bool,
112 dependencies: HashSet<ObjectId>,
113 ) -> Result<()>;
114
115 async fn replace_table(
116 &self,
117 source: Option<PbSource>,
118 table: PbTable,
119 graph: StreamFragmentGraph,
120 job_type: TableJobType,
121 ) -> Result<()>;
122
123 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
124
125 async fn create_index(
126 &self,
127 index: PbIndex,
128 table: PbTable,
129 graph: StreamFragmentGraph,
130 if_not_exists: bool,
131 ) -> Result<()>;
132
133 async fn create_source(
134 &self,
135 source: PbSource,
136 graph: Option<StreamFragmentGraph>,
137 if_not_exists: bool,
138 ) -> Result<()>;
139
140 async fn create_sink(
141 &self,
142 sink: PbSink,
143 graph: StreamFragmentGraph,
144 dependencies: HashSet<ObjectId>,
145 if_not_exists: bool,
146 ) -> Result<()>;
147
148 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
149
150 async fn create_function(&self, function: PbFunction) -> Result<()>;
151
152 async fn create_connection(
153 &self,
154 connection_name: String,
155 database_id: u32,
156 schema_id: u32,
157 owner_id: u32,
158 connection: create_connection_request::Payload,
159 ) -> Result<()>;
160
161 async fn create_secret(
162 &self,
163 secret_name: String,
164 database_id: u32,
165 schema_id: u32,
166 owner_id: u32,
167 payload: Vec<u8>,
168 ) -> Result<()>;
169
170 async fn comment_on(&self, comment: PbComment) -> Result<()>;
171
172 async fn drop_table(
173 &self,
174 source_id: Option<u32>,
175 table_id: TableId,
176 cascade: bool,
177 ) -> Result<()>;
178
179 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
180
181 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
182
183 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
184
185 async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()>;
186
187 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
188
189 async fn drop_database(&self, database_id: u32) -> Result<()>;
190
191 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
192
193 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
194
195 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
196
197 async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()>;
198
199 async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
200
201 async fn alter_secret(
202 &self,
203 secret_id: u32,
204 secret_name: String,
205 database_id: u32,
206 schema_id: u32,
207 owner_id: u32,
208 payload: Vec<u8>,
209 ) -> Result<()>;
210
211 async fn alter_name(
212 &self,
213 object_id: alter_name_request::Object,
214 object_name: &str,
215 ) -> Result<()>;
216
217 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
218
219 async fn alter_source(&self, source: PbSource) -> Result<()>;
221
222 async fn alter_parallelism(
223 &self,
224 job_id: u32,
225 parallelism: PbTableParallelism,
226 deferred: bool,
227 ) -> Result<()>;
228
229 async fn alter_resource_group(
230 &self,
231 table_id: u32,
232 resource_group: Option<String>,
233 deferred: bool,
234 ) -> Result<()>;
235
236 async fn alter_set_schema(
237 &self,
238 object: alter_set_schema_request::Object,
239 new_schema_id: u32,
240 ) -> Result<()>;
241
242 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
243
244 async fn alter_database_param(
245 &self,
246 database_id: DatabaseId,
247 param: AlterDatabaseParam,
248 ) -> Result<()>;
249
250 async fn create_iceberg_table(
251 &self,
252 table_job_info: PbTableJobInfo,
253 sink_job_info: PbSinkJobInfo,
254 iceberg_source: PbSource,
255 if_not_exists: bool,
256 ) -> Result<()>;
257}
258
/// [`CatalogWriter`] implementation that sends its requests through a [`MetaClient`].
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client every request in the `CatalogWriter` impl is sent through.
    meta_client: MetaClient,
    /// Watch channel carrying the local catalog version; `wait_version` clones it and waits
    /// until the observed value reaches the version returned by a request.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Used by `wait_version` to wait for the hummock version returned by a request.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
265
266#[async_trait::async_trait]
267impl CatalogWriter for CatalogWriterImpl {
268 async fn create_database(
269 &self,
270 db_name: &str,
271 owner: UserId,
272 resource_group: &str,
273 barrier_interval_ms: Option<u32>,
274 checkpoint_frequency: Option<u64>,
275 ) -> Result<()> {
276 let version = self
277 .meta_client
278 .create_database(PbDatabase {
279 name: db_name.to_owned(),
280 id: 0,
281 owner,
282 resource_group: resource_group.to_owned(),
283 barrier_interval_ms,
284 checkpoint_frequency,
285 })
286 .await?;
287 self.wait_version(version).await
288 }
289
290 async fn create_schema(
291 &self,
292 db_id: DatabaseId,
293 schema_name: &str,
294 owner: UserId,
295 ) -> Result<()> {
296 let version = self
297 .meta_client
298 .create_schema(PbSchema {
299 id: 0,
300 name: schema_name.to_owned(),
301 database_id: db_id,
302 owner,
303 })
304 .await?;
305 self.wait_version(version).await
306 }
307
308 async fn create_materialized_view(
310 &self,
311 table: PbTable,
312 graph: StreamFragmentGraph,
313 dependencies: HashSet<ObjectId>,
314 specific_resource_group: Option<String>,
315 if_not_exists: bool,
316 ) -> Result<()> {
317 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
318 let version = self
319 .meta_client
320 .create_materialized_view(
321 table,
322 graph,
323 dependencies,
324 specific_resource_group,
325 if_not_exists,
326 )
327 .await?;
328 if matches!(create_type, PbCreateType::Foreground) {
329 self.wait_version(version).await?
330 }
331 Ok(())
332 }
333
334 async fn replace_materialized_view(
335 &self,
336 table: PbTable,
337 graph: StreamFragmentGraph,
338 ) -> Result<()> {
339 notice_to_user(format!("table: {table:#?}"));
341 notice_to_user(format!("graph: {graph:#?}"));
342
343 let version = self
344 .meta_client
345 .replace_job(
346 graph,
347 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
348 )
349 .await?;
350
351 self.wait_version(version).await
352 }
353
354 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
355 let version = self.meta_client.create_view(view, dependencies).await?;
356 self.wait_version(version).await
357 }
358
359 async fn create_index(
360 &self,
361 index: PbIndex,
362 table: PbTable,
363 graph: StreamFragmentGraph,
364 if_not_exists: bool,
365 ) -> Result<()> {
366 let version = self
367 .meta_client
368 .create_index(index, table, graph, if_not_exists)
369 .await?;
370 self.wait_version(version).await
371 }
372
373 async fn create_table(
374 &self,
375 source: Option<PbSource>,
376 table: PbTable,
377 graph: StreamFragmentGraph,
378 job_type: PbTableJobType,
379 if_not_exists: bool,
380 dependencies: HashSet<ObjectId>,
381 ) -> Result<()> {
382 let version = self
383 .meta_client
384 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
385 .await?;
386 self.wait_version(version).await
387 }
388
389 async fn replace_table(
390 &self,
391 source: Option<PbSource>,
392 table: PbTable,
393 graph: StreamFragmentGraph,
394 job_type: TableJobType,
395 ) -> Result<()> {
396 let version = self
397 .meta_client
398 .replace_job(
399 graph,
400 ReplaceJob::ReplaceTable(ReplaceTable {
401 source,
402 table: Some(table),
403 job_type: job_type as _,
404 }),
405 )
406 .await?;
407 self.wait_version(version).await
408 }
409
410 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
411 let version = self
412 .meta_client
413 .replace_job(
414 graph,
415 ReplaceJob::ReplaceSource(ReplaceSource {
416 source: Some(source),
417 }),
418 )
419 .await?;
420 self.wait_version(version).await
421 }
422
423 async fn create_source(
424 &self,
425 source: PbSource,
426 graph: Option<StreamFragmentGraph>,
427 if_not_exists: bool,
428 ) -> Result<()> {
429 let version = self
430 .meta_client
431 .create_source(source, graph, if_not_exists)
432 .await?;
433 self.wait_version(version).await
434 }
435
436 async fn create_sink(
437 &self,
438 sink: PbSink,
439 graph: StreamFragmentGraph,
440 dependencies: HashSet<ObjectId>,
441 if_not_exists: bool,
442 ) -> Result<()> {
443 let version = self
444 .meta_client
445 .create_sink(sink, graph, dependencies, if_not_exists)
446 .await?;
447 self.wait_version(version).await
448 }
449
450 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
451 let version = self.meta_client.create_subscription(subscription).await?;
452 self.wait_version(version).await
453 }
454
455 async fn create_function(&self, function: PbFunction) -> Result<()> {
456 let version = self.meta_client.create_function(function).await?;
457 self.wait_version(version).await
458 }
459
460 async fn create_connection(
461 &self,
462 connection_name: String,
463 database_id: u32,
464 schema_id: u32,
465 owner_id: u32,
466 connection: create_connection_request::Payload,
467 ) -> Result<()> {
468 let version = self
469 .meta_client
470 .create_connection(
471 connection_name,
472 database_id,
473 schema_id,
474 owner_id,
475 connection,
476 )
477 .await?;
478 self.wait_version(version).await
479 }
480
481 async fn create_secret(
482 &self,
483 secret_name: String,
484 database_id: u32,
485 schema_id: u32,
486 owner_id: u32,
487 payload: Vec<u8>,
488 ) -> Result<()> {
489 let version = self
490 .meta_client
491 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
492 .await?;
493 self.wait_version(version).await
494 }
495
496 async fn comment_on(&self, comment: PbComment) -> Result<()> {
497 let version = self.meta_client.comment_on(comment).await?;
498 self.wait_version(version).await
499 }
500
501 async fn drop_table(
502 &self,
503 source_id: Option<u32>,
504 table_id: TableId,
505 cascade: bool,
506 ) -> Result<()> {
507 let version = self
508 .meta_client
509 .drop_table(source_id, table_id, cascade)
510 .await?;
511 self.wait_version(version).await
512 }
513
514 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
515 let version = self
516 .meta_client
517 .drop_materialized_view(table_id, cascade)
518 .await?;
519 self.wait_version(version).await
520 }
521
522 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
523 let version = self.meta_client.drop_view(view_id, cascade).await?;
524 self.wait_version(version).await
525 }
526
527 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
528 let version = self.meta_client.drop_source(source_id, cascade).await?;
529 self.wait_version(version).await
530 }
531
532 async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
533 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
534 self.wait_version(version).await
535 }
536
537 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
538 let version = self
539 .meta_client
540 .drop_subscription(subscription_id, cascade)
541 .await?;
542 self.wait_version(version).await
543 }
544
545 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
546 let version = self.meta_client.drop_index(index_id, cascade).await?;
547 self.wait_version(version).await
548 }
549
550 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
551 let version = self.meta_client.drop_function(function_id, cascade).await?;
552 self.wait_version(version).await
553 }
554
555 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
556 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
557 self.wait_version(version).await
558 }
559
560 async fn drop_database(&self, database_id: u32) -> Result<()> {
561 let version = self.meta_client.drop_database(database_id).await?;
562 self.wait_version(version).await
563 }
564
565 async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()> {
566 let version = self
567 .meta_client
568 .drop_connection(connection_id, cascade)
569 .await?;
570 self.wait_version(version).await
571 }
572
573 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
574 let version = self.meta_client.drop_secret(secret_id).await?;
575 self.wait_version(version).await
576 }
577
578 async fn alter_name(
579 &self,
580 object_id: alter_name_request::Object,
581 object_name: &str,
582 ) -> Result<()> {
583 let version = self.meta_client.alter_name(object_id, object_name).await?;
584 self.wait_version(version).await
585 }
586
587 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
588 let version = self.meta_client.alter_owner(object, owner_id).await?;
589 self.wait_version(version).await
590 }
591
592 async fn alter_set_schema(
593 &self,
594 object: alter_set_schema_request::Object,
595 new_schema_id: u32,
596 ) -> Result<()> {
597 let version = self
598 .meta_client
599 .alter_set_schema(object, new_schema_id)
600 .await?;
601 self.wait_version(version).await
602 }
603
604 async fn alter_source(&self, source: PbSource) -> Result<()> {
605 let version = self.meta_client.alter_source(source).await?;
606 self.wait_version(version).await
607 }
608
609 async fn alter_parallelism(
610 &self,
611 job_id: u32,
612 parallelism: PbTableParallelism,
613 deferred: bool,
614 ) -> Result<()> {
615 self.meta_client
616 .alter_parallelism(job_id, parallelism, deferred)
617 .await
618 .map_err(|e| anyhow!(e))?;
619
620 Ok(())
621 }
622
623 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
624 let version = self.meta_client.alter_swap_rename(object).await?;
625 self.wait_version(version).await
626 }
627
628 async fn alter_secret(
629 &self,
630 secret_id: u32,
631 secret_name: String,
632 database_id: u32,
633 schema_id: u32,
634 owner_id: u32,
635 payload: Vec<u8>,
636 ) -> Result<()> {
637 let version = self
638 .meta_client
639 .alter_secret(
640 secret_id,
641 secret_name,
642 database_id,
643 schema_id,
644 owner_id,
645 payload,
646 )
647 .await?;
648 self.wait_version(version).await
649 }
650
651 async fn alter_resource_group(
652 &self,
653 table_id: u32,
654 resource_group: Option<String>,
655 deferred: bool,
656 ) -> Result<()> {
657 self.meta_client
658 .alter_resource_group(table_id, resource_group, deferred)
659 .await
660 .map_err(|e| anyhow!(e))?;
661
662 Ok(())
663 }
664
665 async fn alter_database_param(
666 &self,
667 database_id: DatabaseId,
668 param: AlterDatabaseParam,
669 ) -> Result<()> {
670 let version = self
671 .meta_client
672 .alter_database_param(database_id, param)
673 .await
674 .map_err(|e| anyhow!(e))?;
675 self.wait_version(version).await
676 }
677
678 async fn create_iceberg_table(
679 &self,
680 table_job_info: PbTableJobInfo,
681 sink_job_info: PbSinkJobInfo,
682 iceberg_source: PbSource,
683 if_not_exists: bool,
684 ) -> Result<()> {
685 let version = self
686 .meta_client
687 .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
688 .await?;
689 self.wait_version(version).await
690 }
691}
692
693impl CatalogWriterImpl {
694 pub fn new(
695 meta_client: MetaClient,
696 catalog_updated_rx: Receiver<CatalogVersion>,
697 hummock_snapshot_manager: HummockSnapshotManagerRef,
698 ) -> Self {
699 Self {
700 meta_client,
701 catalog_updated_rx,
702 hummock_snapshot_manager,
703 }
704 }
705
706 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
707 let mut rx = self.catalog_updated_rx.clone();
708 while *rx.borrow_and_update() < version.catalog_version {
709 rx.changed().await.map_err(|e| anyhow!(e))?;
710 }
711 self.hummock_snapshot_manager
712 .wait(HummockVersionId::new(version.hummock_version_id))
713 .await;
714 Ok(())
715 }
716}