1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{
30 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
31};
32use risingwave_pb::ddl_service::{
33 PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
34 alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
35 create_connection_request,
36};
37use risingwave_pb::meta::PbTableParallelism;
38use risingwave_pb::stream_plan::StreamFragmentGraph;
39use risingwave_rpc_client::MetaClient;
40use tokio::sync::watch::Receiver;
41
42use super::root_catalog::Catalog;
43use super::{DatabaseId, SecretId, TableId};
44use crate::error::Result;
45use crate::scheduler::HummockSnapshotManagerRef;
46use crate::session::current::notice_to_user;
47use crate::user::UserId;
48
49pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
50
51#[derive(Clone)]
53pub struct CatalogReader(Arc<RwLock<Catalog>>);
54
55impl CatalogReader {
56 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
57 CatalogReader(inner)
58 }
59
60 pub fn read_guard(&self) -> CatalogReadGuard {
61 self.0.read_arc_recursive()
63 }
64}
65
66#[async_trait::async_trait]
71pub trait CatalogWriter: Send + Sync {
72 async fn create_database(
73 &self,
74 db_name: &str,
75 owner: UserId,
76 resource_group: &str,
77 barrier_interval_ms: Option<u32>,
78 checkpoint_frequency: Option<u64>,
79 ) -> Result<()>;
80
81 async fn create_schema(
82 &self,
83 db_id: DatabaseId,
84 schema_name: &str,
85 owner: UserId,
86 ) -> Result<()>;
87
88 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
89
90 async fn create_materialized_view(
91 &self,
92 table: PbTable,
93 graph: StreamFragmentGraph,
94 dependencies: HashSet<ObjectId>,
95 specific_resource_group: Option<String>,
96 if_not_exists: bool,
97 ) -> Result<()>;
98
99 async fn replace_materialized_view(
100 &self,
101 table: PbTable,
102 graph: StreamFragmentGraph,
103 ) -> Result<()>;
104
105 async fn create_table(
106 &self,
107 source: Option<PbSource>,
108 table: PbTable,
109 graph: StreamFragmentGraph,
110 job_type: PbTableJobType,
111 if_not_exists: bool,
112 dependencies: HashSet<ObjectId>,
113 ) -> Result<()>;
114
115 async fn replace_table(
116 &self,
117 source: Option<PbSource>,
118 table: PbTable,
119 graph: StreamFragmentGraph,
120 job_type: TableJobType,
121 ) -> Result<()>;
122
123 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
124
125 async fn create_index(
126 &self,
127 index: PbIndex,
128 table: PbTable,
129 graph: StreamFragmentGraph,
130 if_not_exists: bool,
131 ) -> Result<()>;
132
133 async fn create_source(
134 &self,
135 source: PbSource,
136 graph: Option<StreamFragmentGraph>,
137 if_not_exists: bool,
138 ) -> Result<()>;
139
140 async fn create_sink(
141 &self,
142 sink: PbSink,
143 graph: StreamFragmentGraph,
144 affected_table_change: Option<PbReplaceJobPlan>,
145 dependencies: HashSet<ObjectId>,
146 if_not_exists: bool,
147 ) -> Result<()>;
148
149 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
150
151 async fn create_function(&self, function: PbFunction) -> Result<()>;
152
153 async fn create_connection(
154 &self,
155 connection_name: String,
156 database_id: u32,
157 schema_id: u32,
158 owner_id: u32,
159 connection: create_connection_request::Payload,
160 ) -> Result<()>;
161
162 async fn create_secret(
163 &self,
164 secret_name: String,
165 database_id: u32,
166 schema_id: u32,
167 owner_id: u32,
168 payload: Vec<u8>,
169 ) -> Result<()>;
170
171 async fn comment_on(&self, comment: PbComment) -> Result<()>;
172
173 async fn drop_table(
174 &self,
175 source_id: Option<u32>,
176 table_id: TableId,
177 cascade: bool,
178 ) -> Result<()>;
179
180 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
181
182 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
183
184 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
185
186 async fn drop_sink(
187 &self,
188 sink_id: u32,
189 cascade: bool,
190 affected_table_change: Option<PbReplaceJobPlan>,
191 ) -> Result<()>;
192
193 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
194
195 async fn drop_database(&self, database_id: u32) -> Result<()>;
196
197 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
198
199 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
200
201 async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
202
203 async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()>;
204
205 async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
206
207 async fn alter_secret(
208 &self,
209 secret_id: u32,
210 secret_name: String,
211 database_id: u32,
212 schema_id: u32,
213 owner_id: u32,
214 payload: Vec<u8>,
215 ) -> Result<()>;
216
217 async fn alter_name(
218 &self,
219 object_id: alter_name_request::Object,
220 object_name: &str,
221 ) -> Result<()>;
222
223 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
224
225 async fn alter_source(&self, source: PbSource) -> Result<()>;
227
228 async fn alter_parallelism(
229 &self,
230 job_id: u32,
231 parallelism: PbTableParallelism,
232 deferred: bool,
233 ) -> Result<()>;
234
235 async fn alter_resource_group(
236 &self,
237 table_id: u32,
238 resource_group: Option<String>,
239 deferred: bool,
240 ) -> Result<()>;
241
242 async fn alter_set_schema(
243 &self,
244 object: alter_set_schema_request::Object,
245 new_schema_id: u32,
246 ) -> Result<()>;
247
248 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
249
250 async fn alter_database_param(
251 &self,
252 database_id: DatabaseId,
253 param: AlterDatabaseParam,
254 ) -> Result<()>;
255}
256
257#[derive(Clone)]
258pub struct CatalogWriterImpl {
259 meta_client: MetaClient,
260 catalog_updated_rx: Receiver<CatalogVersion>,
261 hummock_snapshot_manager: HummockSnapshotManagerRef,
262}
263
264#[async_trait::async_trait]
265impl CatalogWriter for CatalogWriterImpl {
266 async fn create_database(
267 &self,
268 db_name: &str,
269 owner: UserId,
270 resource_group: &str,
271 barrier_interval_ms: Option<u32>,
272 checkpoint_frequency: Option<u64>,
273 ) -> Result<()> {
274 let version = self
275 .meta_client
276 .create_database(PbDatabase {
277 name: db_name.to_owned(),
278 id: 0,
279 owner,
280 resource_group: resource_group.to_owned(),
281 barrier_interval_ms,
282 checkpoint_frequency,
283 })
284 .await?;
285 self.wait_version(version).await
286 }
287
288 async fn create_schema(
289 &self,
290 db_id: DatabaseId,
291 schema_name: &str,
292 owner: UserId,
293 ) -> Result<()> {
294 let version = self
295 .meta_client
296 .create_schema(PbSchema {
297 id: 0,
298 name: schema_name.to_owned(),
299 database_id: db_id,
300 owner,
301 })
302 .await?;
303 self.wait_version(version).await
304 }
305
306 async fn create_materialized_view(
308 &self,
309 table: PbTable,
310 graph: StreamFragmentGraph,
311 dependencies: HashSet<ObjectId>,
312 specific_resource_group: Option<String>,
313 if_not_exists: bool,
314 ) -> Result<()> {
315 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
316 let version = self
317 .meta_client
318 .create_materialized_view(
319 table,
320 graph,
321 dependencies,
322 specific_resource_group,
323 if_not_exists,
324 )
325 .await?;
326 if matches!(create_type, PbCreateType::Foreground) {
327 self.wait_version(version).await?
328 }
329 Ok(())
330 }
331
332 async fn replace_materialized_view(
333 &self,
334 table: PbTable,
335 graph: StreamFragmentGraph,
336 ) -> Result<()> {
337 notice_to_user(format!("table: {table:#?}"));
339 notice_to_user(format!("graph: {graph:#?}"));
340
341 let version = self
342 .meta_client
343 .replace_job(
344 graph,
345 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
346 )
347 .await?;
348
349 self.wait_version(version).await
350 }
351
352 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
353 let version = self.meta_client.create_view(view, dependencies).await?;
354 self.wait_version(version).await
355 }
356
357 async fn create_index(
358 &self,
359 index: PbIndex,
360 table: PbTable,
361 graph: StreamFragmentGraph,
362 if_not_exists: bool,
363 ) -> Result<()> {
364 let version = self
365 .meta_client
366 .create_index(index, table, graph, if_not_exists)
367 .await?;
368 self.wait_version(version).await
369 }
370
371 async fn create_table(
372 &self,
373 source: Option<PbSource>,
374 table: PbTable,
375 graph: StreamFragmentGraph,
376 job_type: PbTableJobType,
377 if_not_exists: bool,
378 dependencies: HashSet<ObjectId>,
379 ) -> Result<()> {
380 let version = self
381 .meta_client
382 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
383 .await?;
384 self.wait_version(version).await
385 }
386
387 async fn replace_table(
388 &self,
389 source: Option<PbSource>,
390 table: PbTable,
391 graph: StreamFragmentGraph,
392 job_type: TableJobType,
393 ) -> Result<()> {
394 let version = self
395 .meta_client
396 .replace_job(
397 graph,
398 ReplaceJob::ReplaceTable(ReplaceTable {
399 source,
400 table: Some(table),
401 job_type: job_type as _,
402 }),
403 )
404 .await?;
405 self.wait_version(version).await
406 }
407
408 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
409 let version = self
410 .meta_client
411 .replace_job(
412 graph,
413 ReplaceJob::ReplaceSource(ReplaceSource {
414 source: Some(source),
415 }),
416 )
417 .await?;
418 self.wait_version(version).await
419 }
420
421 async fn create_source(
422 &self,
423 source: PbSource,
424 graph: Option<StreamFragmentGraph>,
425 if_not_exists: bool,
426 ) -> Result<()> {
427 let version = self
428 .meta_client
429 .create_source(source, graph, if_not_exists)
430 .await?;
431 self.wait_version(version).await
432 }
433
434 async fn create_sink(
435 &self,
436 sink: PbSink,
437 graph: StreamFragmentGraph,
438 affected_table_change: Option<ReplaceJobPlan>,
439 dependencies: HashSet<ObjectId>,
440 if_not_exists: bool,
441 ) -> Result<()> {
442 let version = self
443 .meta_client
444 .create_sink(
445 sink,
446 graph,
447 affected_table_change,
448 dependencies,
449 if_not_exists,
450 )
451 .await?;
452 self.wait_version(version).await
453 }
454
455 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
456 let version = self.meta_client.create_subscription(subscription).await?;
457 self.wait_version(version).await
458 }
459
460 async fn create_function(&self, function: PbFunction) -> Result<()> {
461 let version = self.meta_client.create_function(function).await?;
462 self.wait_version(version).await
463 }
464
465 async fn create_connection(
466 &self,
467 connection_name: String,
468 database_id: u32,
469 schema_id: u32,
470 owner_id: u32,
471 connection: create_connection_request::Payload,
472 ) -> Result<()> {
473 let version = self
474 .meta_client
475 .create_connection(
476 connection_name,
477 database_id,
478 schema_id,
479 owner_id,
480 connection,
481 )
482 .await?;
483 self.wait_version(version).await
484 }
485
486 async fn create_secret(
487 &self,
488 secret_name: String,
489 database_id: u32,
490 schema_id: u32,
491 owner_id: u32,
492 payload: Vec<u8>,
493 ) -> Result<()> {
494 let version = self
495 .meta_client
496 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
497 .await?;
498 self.wait_version(version).await
499 }
500
501 async fn comment_on(&self, comment: PbComment) -> Result<()> {
502 let version = self.meta_client.comment_on(comment).await?;
503 self.wait_version(version).await
504 }
505
506 async fn drop_table(
507 &self,
508 source_id: Option<u32>,
509 table_id: TableId,
510 cascade: bool,
511 ) -> Result<()> {
512 let version = self
513 .meta_client
514 .drop_table(source_id, table_id, cascade)
515 .await?;
516 self.wait_version(version).await
517 }
518
519 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
520 let version = self
521 .meta_client
522 .drop_materialized_view(table_id, cascade)
523 .await?;
524 self.wait_version(version).await
525 }
526
527 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
528 let version = self.meta_client.drop_view(view_id, cascade).await?;
529 self.wait_version(version).await
530 }
531
532 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
533 let version = self.meta_client.drop_source(source_id, cascade).await?;
534 self.wait_version(version).await
535 }
536
537 async fn drop_sink(
538 &self,
539 sink_id: u32,
540 cascade: bool,
541 affected_table_change: Option<ReplaceJobPlan>,
542 ) -> Result<()> {
543 let version = self
544 .meta_client
545 .drop_sink(sink_id, cascade, affected_table_change)
546 .await?;
547 self.wait_version(version).await
548 }
549
550 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
551 let version = self
552 .meta_client
553 .drop_subscription(subscription_id, cascade)
554 .await?;
555 self.wait_version(version).await
556 }
557
558 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
559 let version = self.meta_client.drop_index(index_id, cascade).await?;
560 self.wait_version(version).await
561 }
562
563 async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
564 let version = self.meta_client.drop_function(function_id).await?;
565 self.wait_version(version).await
566 }
567
568 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
569 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
570 self.wait_version(version).await
571 }
572
573 async fn drop_database(&self, database_id: u32) -> Result<()> {
574 let version = self.meta_client.drop_database(database_id).await?;
575 self.wait_version(version).await
576 }
577
578 async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()> {
579 let version = self
580 .meta_client
581 .drop_connection(connection_id, cascade)
582 .await?;
583 self.wait_version(version).await
584 }
585
586 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
587 let version = self.meta_client.drop_secret(secret_id).await?;
588 self.wait_version(version).await
589 }
590
591 async fn alter_name(
592 &self,
593 object_id: alter_name_request::Object,
594 object_name: &str,
595 ) -> Result<()> {
596 let version = self.meta_client.alter_name(object_id, object_name).await?;
597 self.wait_version(version).await
598 }
599
600 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
601 let version = self.meta_client.alter_owner(object, owner_id).await?;
602 self.wait_version(version).await
603 }
604
605 async fn alter_set_schema(
606 &self,
607 object: alter_set_schema_request::Object,
608 new_schema_id: u32,
609 ) -> Result<()> {
610 let version = self
611 .meta_client
612 .alter_set_schema(object, new_schema_id)
613 .await?;
614 self.wait_version(version).await
615 }
616
617 async fn alter_source(&self, source: PbSource) -> Result<()> {
618 let version = self.meta_client.alter_source(source).await?;
619 self.wait_version(version).await
620 }
621
622 async fn alter_parallelism(
623 &self,
624 job_id: u32,
625 parallelism: PbTableParallelism,
626 deferred: bool,
627 ) -> Result<()> {
628 self.meta_client
629 .alter_parallelism(job_id, parallelism, deferred)
630 .await
631 .map_err(|e| anyhow!(e))?;
632
633 Ok(())
634 }
635
636 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
637 let version = self.meta_client.alter_swap_rename(object).await?;
638 self.wait_version(version).await
639 }
640
641 async fn alter_secret(
642 &self,
643 secret_id: u32,
644 secret_name: String,
645 database_id: u32,
646 schema_id: u32,
647 owner_id: u32,
648 payload: Vec<u8>,
649 ) -> Result<()> {
650 let version = self
651 .meta_client
652 .alter_secret(
653 secret_id,
654 secret_name,
655 database_id,
656 schema_id,
657 owner_id,
658 payload,
659 )
660 .await?;
661 self.wait_version(version).await
662 }
663
664 async fn alter_resource_group(
665 &self,
666 table_id: u32,
667 resource_group: Option<String>,
668 deferred: bool,
669 ) -> Result<()> {
670 self.meta_client
671 .alter_resource_group(table_id, resource_group, deferred)
672 .await
673 .map_err(|e| anyhow!(e))?;
674
675 Ok(())
676 }
677
678 async fn alter_database_param(
679 &self,
680 database_id: DatabaseId,
681 param: AlterDatabaseParam,
682 ) -> Result<()> {
683 let version = self
684 .meta_client
685 .alter_database_param(database_id, param)
686 .await
687 .map_err(|e| anyhow!(e))?;
688 self.wait_version(version).await
689 }
690}
691
692impl CatalogWriterImpl {
693 pub fn new(
694 meta_client: MetaClient,
695 catalog_updated_rx: Receiver<CatalogVersion>,
696 hummock_snapshot_manager: HummockSnapshotManagerRef,
697 ) -> Self {
698 Self {
699 meta_client,
700 catalog_updated_rx,
701 hummock_snapshot_manager,
702 }
703 }
704
705 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
706 let mut rx = self.catalog_updated_rx.clone();
707 while *rx.borrow_and_update() < version.catalog_version {
708 rx.changed().await.map_err(|e| anyhow!(e))?;
709 }
710 self.hummock_snapshot_manager
711 .wait(HummockVersionId::new(version.hummock_version_id))
712 .await;
713 Ok(())
714 }
715}