1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{
30 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
31};
32use risingwave_pb::ddl_service::{
33 PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
34 alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
35 create_connection_request,
36};
37use risingwave_pb::meta::PbTableParallelism;
38use risingwave_pb::stream_plan::StreamFragmentGraph;
39use risingwave_rpc_client::MetaClient;
40use tokio::sync::watch::Receiver;
41
42use super::root_catalog::Catalog;
43use super::{DatabaseId, SecretId, TableId};
44use crate::error::Result;
45use crate::scheduler::HummockSnapshotManagerRef;
46use crate::session::current::notice_to_user;
47use crate::user::UserId;
48
49pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
50
51#[derive(Clone)]
53pub struct CatalogReader(Arc<RwLock<Catalog>>);
54
55impl CatalogReader {
56 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
57 CatalogReader(inner)
58 }
59
60 pub fn read_guard(&self) -> CatalogReadGuard {
61 self.0.read_arc_recursive()
63 }
64}
65
66#[async_trait::async_trait]
71pub trait CatalogWriter: Send + Sync {
72 async fn create_database(
73 &self,
74 db_name: &str,
75 owner: UserId,
76 resource_group: &str,
77 barrier_interval_ms: Option<u32>,
78 checkpoint_frequency: Option<u64>,
79 ) -> Result<()>;
80
81 async fn create_schema(
82 &self,
83 db_id: DatabaseId,
84 schema_name: &str,
85 owner: UserId,
86 ) -> Result<()>;
87
88 async fn create_view(&self, view: PbView) -> Result<()>;
89
90 async fn create_materialized_view(
91 &self,
92 table: PbTable,
93 graph: StreamFragmentGraph,
94 dependencies: HashSet<ObjectId>,
95 specific_resource_group: Option<String>,
96 if_not_exists: bool,
97 ) -> Result<()>;
98
99 async fn replace_materialized_view(
100 &self,
101 table: PbTable,
102 graph: StreamFragmentGraph,
103 ) -> Result<()>;
104
105 async fn create_table(
106 &self,
107 source: Option<PbSource>,
108 table: PbTable,
109 graph: StreamFragmentGraph,
110 job_type: PbTableJobType,
111 if_not_exists: bool,
112 ) -> Result<()>;
113
114 async fn replace_table(
115 &self,
116 source: Option<PbSource>,
117 table: PbTable,
118 graph: StreamFragmentGraph,
119 job_type: TableJobType,
120 ) -> Result<()>;
121
122 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
123
124 async fn create_index(
125 &self,
126 index: PbIndex,
127 table: PbTable,
128 graph: StreamFragmentGraph,
129 if_not_exists: bool,
130 ) -> Result<()>;
131
132 async fn create_source(
133 &self,
134 source: PbSource,
135 graph: Option<StreamFragmentGraph>,
136 if_not_exists: bool,
137 ) -> Result<()>;
138
139 async fn create_sink(
140 &self,
141 sink: PbSink,
142 graph: StreamFragmentGraph,
143 affected_table_change: Option<PbReplaceJobPlan>,
144 dependencies: HashSet<ObjectId>,
145 if_not_exists: bool,
146 ) -> Result<()>;
147
148 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
149
150 async fn create_function(&self, function: PbFunction) -> Result<()>;
151
152 async fn create_connection(
153 &self,
154 connection_name: String,
155 database_id: u32,
156 schema_id: u32,
157 owner_id: u32,
158 connection: create_connection_request::Payload,
159 ) -> Result<()>;
160
161 async fn create_secret(
162 &self,
163 secret_name: String,
164 database_id: u32,
165 schema_id: u32,
166 owner_id: u32,
167 payload: Vec<u8>,
168 ) -> Result<()>;
169
170 async fn comment_on(&self, comment: PbComment) -> Result<()>;
171
172 async fn drop_table(
173 &self,
174 source_id: Option<u32>,
175 table_id: TableId,
176 cascade: bool,
177 ) -> Result<()>;
178
179 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
180
181 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
182
183 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
184
185 async fn drop_sink(
186 &self,
187 sink_id: u32,
188 cascade: bool,
189 affected_table_change: Option<PbReplaceJobPlan>,
190 ) -> Result<()>;
191
192 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
193
194 async fn drop_database(&self, database_id: u32) -> Result<()>;
195
196 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
197
198 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
199
200 async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
201
202 async fn drop_connection(&self, connection_id: u32) -> Result<()>;
203
204 async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
205
206 async fn alter_secret(
207 &self,
208 secret_id: u32,
209 secret_name: String,
210 database_id: u32,
211 schema_id: u32,
212 owner_id: u32,
213 payload: Vec<u8>,
214 ) -> Result<()>;
215
216 async fn alter_name(
217 &self,
218 object_id: alter_name_request::Object,
219 object_name: &str,
220 ) -> Result<()>;
221
222 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
223
224 async fn alter_source(&self, source: PbSource) -> Result<()>;
226
227 async fn alter_parallelism(
228 &self,
229 job_id: u32,
230 parallelism: PbTableParallelism,
231 deferred: bool,
232 ) -> Result<()>;
233
234 async fn alter_resource_group(
235 &self,
236 table_id: u32,
237 resource_group: Option<String>,
238 deferred: bool,
239 ) -> Result<()>;
240
241 async fn alter_set_schema(
242 &self,
243 object: alter_set_schema_request::Object,
244 new_schema_id: u32,
245 ) -> Result<()>;
246
247 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
248
249 async fn alter_database_param(
250 &self,
251 database_id: DatabaseId,
252 param: AlterDatabaseParam,
253 ) -> Result<()>;
254}
255
256#[derive(Clone)]
257pub struct CatalogWriterImpl {
258 meta_client: MetaClient,
259 catalog_updated_rx: Receiver<CatalogVersion>,
260 hummock_snapshot_manager: HummockSnapshotManagerRef,
261}
262
263#[async_trait::async_trait]
264impl CatalogWriter for CatalogWriterImpl {
265 async fn create_database(
266 &self,
267 db_name: &str,
268 owner: UserId,
269 resource_group: &str,
270 barrier_interval_ms: Option<u32>,
271 checkpoint_frequency: Option<u64>,
272 ) -> Result<()> {
273 let version = self
274 .meta_client
275 .create_database(PbDatabase {
276 name: db_name.to_owned(),
277 id: 0,
278 owner,
279 resource_group: resource_group.to_owned(),
280 barrier_interval_ms,
281 checkpoint_frequency,
282 })
283 .await?;
284 self.wait_version(version).await
285 }
286
287 async fn create_schema(
288 &self,
289 db_id: DatabaseId,
290 schema_name: &str,
291 owner: UserId,
292 ) -> Result<()> {
293 let version = self
294 .meta_client
295 .create_schema(PbSchema {
296 id: 0,
297 name: schema_name.to_owned(),
298 database_id: db_id,
299 owner,
300 })
301 .await?;
302 self.wait_version(version).await
303 }
304
305 async fn create_materialized_view(
307 &self,
308 table: PbTable,
309 graph: StreamFragmentGraph,
310 dependencies: HashSet<ObjectId>,
311 specific_resource_group: Option<String>,
312 if_not_exists: bool,
313 ) -> Result<()> {
314 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
315 let version = self
316 .meta_client
317 .create_materialized_view(
318 table,
319 graph,
320 dependencies,
321 specific_resource_group,
322 if_not_exists,
323 )
324 .await?;
325 if matches!(create_type, PbCreateType::Foreground) {
326 self.wait_version(version).await?
327 }
328 Ok(())
329 }
330
331 async fn replace_materialized_view(
332 &self,
333 table: PbTable,
334 graph: StreamFragmentGraph,
335 ) -> Result<()> {
336 notice_to_user(format!("table: {table:#?}"));
338 notice_to_user(format!("graph: {graph:#?}"));
339
340 let version = self
341 .meta_client
342 .replace_job(
343 graph,
344 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
345 )
346 .await?;
347
348 self.wait_version(version).await
349 }
350
351 async fn create_view(&self, view: PbView) -> Result<()> {
352 let version = self.meta_client.create_view(view).await?;
353 self.wait_version(version).await
354 }
355
356 async fn create_index(
357 &self,
358 index: PbIndex,
359 table: PbTable,
360 graph: StreamFragmentGraph,
361 if_not_exists: bool,
362 ) -> Result<()> {
363 let version = self
364 .meta_client
365 .create_index(index, table, graph, if_not_exists)
366 .await?;
367 self.wait_version(version).await
368 }
369
370 async fn create_table(
371 &self,
372 source: Option<PbSource>,
373 table: PbTable,
374 graph: StreamFragmentGraph,
375 job_type: PbTableJobType,
376 if_not_exists: bool,
377 ) -> Result<()> {
378 let version = self
379 .meta_client
380 .create_table(source, table, graph, job_type, if_not_exists)
381 .await?;
382 self.wait_version(version).await
383 }
384
385 async fn replace_table(
386 &self,
387 source: Option<PbSource>,
388 table: PbTable,
389 graph: StreamFragmentGraph,
390 job_type: TableJobType,
391 ) -> Result<()> {
392 let version = self
393 .meta_client
394 .replace_job(
395 graph,
396 ReplaceJob::ReplaceTable(ReplaceTable {
397 source,
398 table: Some(table),
399 job_type: job_type as _,
400 }),
401 )
402 .await?;
403 self.wait_version(version).await
404 }
405
406 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
407 let version = self
408 .meta_client
409 .replace_job(
410 graph,
411 ReplaceJob::ReplaceSource(ReplaceSource {
412 source: Some(source),
413 }),
414 )
415 .await?;
416 self.wait_version(version).await
417 }
418
419 async fn create_source(
420 &self,
421 source: PbSource,
422 graph: Option<StreamFragmentGraph>,
423 if_not_exists: bool,
424 ) -> Result<()> {
425 let version = self
426 .meta_client
427 .create_source(source, graph, if_not_exists)
428 .await?;
429 self.wait_version(version).await
430 }
431
432 async fn create_sink(
433 &self,
434 sink: PbSink,
435 graph: StreamFragmentGraph,
436 affected_table_change: Option<ReplaceJobPlan>,
437 dependencies: HashSet<ObjectId>,
438 if_not_exists: bool,
439 ) -> Result<()> {
440 let version = self
441 .meta_client
442 .create_sink(
443 sink,
444 graph,
445 affected_table_change,
446 dependencies,
447 if_not_exists,
448 )
449 .await?;
450 self.wait_version(version).await
451 }
452
453 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
454 let version = self.meta_client.create_subscription(subscription).await?;
455 self.wait_version(version).await
456 }
457
458 async fn create_function(&self, function: PbFunction) -> Result<()> {
459 let version = self.meta_client.create_function(function).await?;
460 self.wait_version(version).await
461 }
462
463 async fn create_connection(
464 &self,
465 connection_name: String,
466 database_id: u32,
467 schema_id: u32,
468 owner_id: u32,
469 connection: create_connection_request::Payload,
470 ) -> Result<()> {
471 let version = self
472 .meta_client
473 .create_connection(
474 connection_name,
475 database_id,
476 schema_id,
477 owner_id,
478 connection,
479 )
480 .await?;
481 self.wait_version(version).await
482 }
483
484 async fn create_secret(
485 &self,
486 secret_name: String,
487 database_id: u32,
488 schema_id: u32,
489 owner_id: u32,
490 payload: Vec<u8>,
491 ) -> Result<()> {
492 let version = self
493 .meta_client
494 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
495 .await?;
496 self.wait_version(version).await
497 }
498
499 async fn comment_on(&self, comment: PbComment) -> Result<()> {
500 let version = self.meta_client.comment_on(comment).await?;
501 self.wait_version(version).await
502 }
503
504 async fn drop_table(
505 &self,
506 source_id: Option<u32>,
507 table_id: TableId,
508 cascade: bool,
509 ) -> Result<()> {
510 let version = self
511 .meta_client
512 .drop_table(source_id, table_id, cascade)
513 .await?;
514 self.wait_version(version).await
515 }
516
517 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
518 let version = self
519 .meta_client
520 .drop_materialized_view(table_id, cascade)
521 .await?;
522 self.wait_version(version).await
523 }
524
525 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
526 let version = self.meta_client.drop_view(view_id, cascade).await?;
527 self.wait_version(version).await
528 }
529
530 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
531 let version = self.meta_client.drop_source(source_id, cascade).await?;
532 self.wait_version(version).await
533 }
534
535 async fn drop_sink(
536 &self,
537 sink_id: u32,
538 cascade: bool,
539 affected_table_change: Option<ReplaceJobPlan>,
540 ) -> Result<()> {
541 let version = self
542 .meta_client
543 .drop_sink(sink_id, cascade, affected_table_change)
544 .await?;
545 self.wait_version(version).await
546 }
547
548 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
549 let version = self
550 .meta_client
551 .drop_subscription(subscription_id, cascade)
552 .await?;
553 self.wait_version(version).await
554 }
555
556 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
557 let version = self.meta_client.drop_index(index_id, cascade).await?;
558 self.wait_version(version).await
559 }
560
561 async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
562 let version = self.meta_client.drop_function(function_id).await?;
563 self.wait_version(version).await
564 }
565
566 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
567 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
568 self.wait_version(version).await
569 }
570
571 async fn drop_database(&self, database_id: u32) -> Result<()> {
572 let version = self.meta_client.drop_database(database_id).await?;
573 self.wait_version(version).await
574 }
575
576 async fn drop_connection(&self, connection_id: u32) -> Result<()> {
577 let version = self.meta_client.drop_connection(connection_id).await?;
578 self.wait_version(version).await
579 }
580
581 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
582 let version = self.meta_client.drop_secret(secret_id).await?;
583 self.wait_version(version).await
584 }
585
586 async fn alter_name(
587 &self,
588 object_id: alter_name_request::Object,
589 object_name: &str,
590 ) -> Result<()> {
591 let version = self.meta_client.alter_name(object_id, object_name).await?;
592 self.wait_version(version).await
593 }
594
595 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
596 let version = self.meta_client.alter_owner(object, owner_id).await?;
597 self.wait_version(version).await
598 }
599
600 async fn alter_set_schema(
601 &self,
602 object: alter_set_schema_request::Object,
603 new_schema_id: u32,
604 ) -> Result<()> {
605 let version = self
606 .meta_client
607 .alter_set_schema(object, new_schema_id)
608 .await?;
609 self.wait_version(version).await
610 }
611
612 async fn alter_source(&self, source: PbSource) -> Result<()> {
613 let version = self.meta_client.alter_source(source).await?;
614 self.wait_version(version).await
615 }
616
617 async fn alter_parallelism(
618 &self,
619 job_id: u32,
620 parallelism: PbTableParallelism,
621 deferred: bool,
622 ) -> Result<()> {
623 self.meta_client
624 .alter_parallelism(job_id, parallelism, deferred)
625 .await
626 .map_err(|e| anyhow!(e))?;
627
628 Ok(())
629 }
630
631 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
632 let version = self.meta_client.alter_swap_rename(object).await?;
633 self.wait_version(version).await
634 }
635
636 async fn alter_secret(
637 &self,
638 secret_id: u32,
639 secret_name: String,
640 database_id: u32,
641 schema_id: u32,
642 owner_id: u32,
643 payload: Vec<u8>,
644 ) -> Result<()> {
645 let version = self
646 .meta_client
647 .alter_secret(
648 secret_id,
649 secret_name,
650 database_id,
651 schema_id,
652 owner_id,
653 payload,
654 )
655 .await?;
656 self.wait_version(version).await
657 }
658
659 async fn alter_resource_group(
660 &self,
661 table_id: u32,
662 resource_group: Option<String>,
663 deferred: bool,
664 ) -> Result<()> {
665 self.meta_client
666 .alter_resource_group(table_id, resource_group, deferred)
667 .await
668 .map_err(|e| anyhow!(e))?;
669
670 Ok(())
671 }
672
673 async fn alter_database_param(
674 &self,
675 database_id: DatabaseId,
676 param: AlterDatabaseParam,
677 ) -> Result<()> {
678 let version = self
679 .meta_client
680 .alter_database_param(database_id, param)
681 .await
682 .map_err(|e| anyhow!(e))?;
683 self.wait_version(version).await
684 }
685}
686
687impl CatalogWriterImpl {
688 pub fn new(
689 meta_client: MetaClient,
690 catalog_updated_rx: Receiver<CatalogVersion>,
691 hummock_snapshot_manager: HummockSnapshotManagerRef,
692 ) -> Self {
693 Self {
694 meta_client,
695 catalog_updated_rx,
696 hummock_snapshot_manager,
697 }
698 }
699
700 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
701 let mut rx = self.catalog_updated_rx.clone();
702 while *rx.borrow_and_update() < version.catalog_version {
703 rx.changed().await.map_err(|e| anyhow!(e))?;
704 }
705 self.hummock_snapshot_manager
706 .wait(HummockVersionId::new(version.hummock_version_id))
707 .await;
708 Ok(())
709 }
710}