1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{
30 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
31};
32use risingwave_pb::ddl_service::{
33 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
34 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
35};
36use risingwave_pb::meta::PbTableParallelism;
37use risingwave_pb::stream_plan::StreamFragmentGraph;
38use risingwave_rpc_client::MetaClient;
39use tokio::sync::watch::Receiver;
40
41use super::root_catalog::Catalog;
42use super::{DatabaseId, SecretId, TableId};
43use crate::error::Result;
44use crate::scheduler::HummockSnapshotManagerRef;
45use crate::session::current::notice_to_user;
46use crate::user::UserId;
47
48pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
49
50#[derive(Clone)]
52pub struct CatalogReader(Arc<RwLock<Catalog>>);
53
54impl CatalogReader {
55 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
56 CatalogReader(inner)
57 }
58
59 pub fn read_guard(&self) -> CatalogReadGuard {
60 self.0.read_arc_recursive()
62 }
63}
64
65#[async_trait::async_trait]
70pub trait CatalogWriter: Send + Sync {
71 async fn create_database(
72 &self,
73 db_name: &str,
74 owner: UserId,
75 resource_group: &str,
76 barrier_interval_ms: Option<u32>,
77 checkpoint_frequency: Option<u64>,
78 ) -> Result<()>;
79
80 async fn create_schema(
81 &self,
82 db_id: DatabaseId,
83 schema_name: &str,
84 owner: UserId,
85 ) -> Result<()>;
86
87 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
88
89 async fn create_materialized_view(
90 &self,
91 table: PbTable,
92 graph: StreamFragmentGraph,
93 dependencies: HashSet<ObjectId>,
94 specific_resource_group: Option<String>,
95 if_not_exists: bool,
96 ) -> Result<()>;
97
98 async fn replace_materialized_view(
99 &self,
100 table: PbTable,
101 graph: StreamFragmentGraph,
102 ) -> Result<()>;
103
104 async fn create_table(
105 &self,
106 source: Option<PbSource>,
107 table: PbTable,
108 graph: StreamFragmentGraph,
109 job_type: PbTableJobType,
110 if_not_exists: bool,
111 dependencies: HashSet<ObjectId>,
112 ) -> Result<()>;
113
114 async fn replace_table(
115 &self,
116 source: Option<PbSource>,
117 table: PbTable,
118 graph: StreamFragmentGraph,
119 job_type: TableJobType,
120 ) -> Result<()>;
121
122 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
123
124 async fn create_index(
125 &self,
126 index: PbIndex,
127 table: PbTable,
128 graph: StreamFragmentGraph,
129 if_not_exists: bool,
130 ) -> Result<()>;
131
132 async fn create_source(
133 &self,
134 source: PbSource,
135 graph: Option<StreamFragmentGraph>,
136 if_not_exists: bool,
137 ) -> Result<()>;
138
139 async fn create_sink(
140 &self,
141 sink: PbSink,
142 graph: StreamFragmentGraph,
143 dependencies: HashSet<ObjectId>,
144 if_not_exists: bool,
145 ) -> Result<()>;
146
147 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
148
149 async fn create_function(&self, function: PbFunction) -> Result<()>;
150
151 async fn create_connection(
152 &self,
153 connection_name: String,
154 database_id: u32,
155 schema_id: u32,
156 owner_id: u32,
157 connection: create_connection_request::Payload,
158 ) -> Result<()>;
159
160 async fn create_secret(
161 &self,
162 secret_name: String,
163 database_id: u32,
164 schema_id: u32,
165 owner_id: u32,
166 payload: Vec<u8>,
167 ) -> Result<()>;
168
169 async fn comment_on(&self, comment: PbComment) -> Result<()>;
170
171 async fn drop_table(
172 &self,
173 source_id: Option<u32>,
174 table_id: TableId,
175 cascade: bool,
176 ) -> Result<()>;
177
178 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
179
180 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
181
182 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
183
184 async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()>;
185
186 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
187
188 async fn drop_database(&self, database_id: u32) -> Result<()>;
189
190 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
191
192 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
193
194 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
195
196 async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()>;
197
198 async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
199
200 async fn alter_secret(
201 &self,
202 secret_id: u32,
203 secret_name: String,
204 database_id: u32,
205 schema_id: u32,
206 owner_id: u32,
207 payload: Vec<u8>,
208 ) -> Result<()>;
209
210 async fn alter_name(
211 &self,
212 object_id: alter_name_request::Object,
213 object_name: &str,
214 ) -> Result<()>;
215
216 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
217
218 async fn alter_source(&self, source: PbSource) -> Result<()>;
220
221 async fn alter_parallelism(
222 &self,
223 job_id: u32,
224 parallelism: PbTableParallelism,
225 deferred: bool,
226 ) -> Result<()>;
227
228 async fn alter_resource_group(
229 &self,
230 table_id: u32,
231 resource_group: Option<String>,
232 deferred: bool,
233 ) -> Result<()>;
234
235 async fn alter_set_schema(
236 &self,
237 object: alter_set_schema_request::Object,
238 new_schema_id: u32,
239 ) -> Result<()>;
240
241 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
242
243 async fn alter_database_param(
244 &self,
245 database_id: DatabaseId,
246 param: AlterDatabaseParam,
247 ) -> Result<()>;
248}
249
250#[derive(Clone)]
251pub struct CatalogWriterImpl {
252 meta_client: MetaClient,
253 catalog_updated_rx: Receiver<CatalogVersion>,
254 hummock_snapshot_manager: HummockSnapshotManagerRef,
255}
256
257#[async_trait::async_trait]
258impl CatalogWriter for CatalogWriterImpl {
259 async fn create_database(
260 &self,
261 db_name: &str,
262 owner: UserId,
263 resource_group: &str,
264 barrier_interval_ms: Option<u32>,
265 checkpoint_frequency: Option<u64>,
266 ) -> Result<()> {
267 let version = self
268 .meta_client
269 .create_database(PbDatabase {
270 name: db_name.to_owned(),
271 id: 0,
272 owner,
273 resource_group: resource_group.to_owned(),
274 barrier_interval_ms,
275 checkpoint_frequency,
276 })
277 .await?;
278 self.wait_version(version).await
279 }
280
281 async fn create_schema(
282 &self,
283 db_id: DatabaseId,
284 schema_name: &str,
285 owner: UserId,
286 ) -> Result<()> {
287 let version = self
288 .meta_client
289 .create_schema(PbSchema {
290 id: 0,
291 name: schema_name.to_owned(),
292 database_id: db_id,
293 owner,
294 })
295 .await?;
296 self.wait_version(version).await
297 }
298
299 async fn create_materialized_view(
301 &self,
302 table: PbTable,
303 graph: StreamFragmentGraph,
304 dependencies: HashSet<ObjectId>,
305 specific_resource_group: Option<String>,
306 if_not_exists: bool,
307 ) -> Result<()> {
308 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
309 let version = self
310 .meta_client
311 .create_materialized_view(
312 table,
313 graph,
314 dependencies,
315 specific_resource_group,
316 if_not_exists,
317 )
318 .await?;
319 if matches!(create_type, PbCreateType::Foreground) {
320 self.wait_version(version).await?
321 }
322 Ok(())
323 }
324
325 async fn replace_materialized_view(
326 &self,
327 table: PbTable,
328 graph: StreamFragmentGraph,
329 ) -> Result<()> {
330 notice_to_user(format!("table: {table:#?}"));
332 notice_to_user(format!("graph: {graph:#?}"));
333
334 let version = self
335 .meta_client
336 .replace_job(
337 graph,
338 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
339 )
340 .await?;
341
342 self.wait_version(version).await
343 }
344
345 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
346 let version = self.meta_client.create_view(view, dependencies).await?;
347 self.wait_version(version).await
348 }
349
350 async fn create_index(
351 &self,
352 index: PbIndex,
353 table: PbTable,
354 graph: StreamFragmentGraph,
355 if_not_exists: bool,
356 ) -> Result<()> {
357 let version = self
358 .meta_client
359 .create_index(index, table, graph, if_not_exists)
360 .await?;
361 self.wait_version(version).await
362 }
363
364 async fn create_table(
365 &self,
366 source: Option<PbSource>,
367 table: PbTable,
368 graph: StreamFragmentGraph,
369 job_type: PbTableJobType,
370 if_not_exists: bool,
371 dependencies: HashSet<ObjectId>,
372 ) -> Result<()> {
373 let version = self
374 .meta_client
375 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
376 .await?;
377 self.wait_version(version).await
378 }
379
380 async fn replace_table(
381 &self,
382 source: Option<PbSource>,
383 table: PbTable,
384 graph: StreamFragmentGraph,
385 job_type: TableJobType,
386 ) -> Result<()> {
387 let version = self
388 .meta_client
389 .replace_job(
390 graph,
391 ReplaceJob::ReplaceTable(ReplaceTable {
392 source,
393 table: Some(table),
394 job_type: job_type as _,
395 }),
396 )
397 .await?;
398 self.wait_version(version).await
399 }
400
401 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
402 let version = self
403 .meta_client
404 .replace_job(
405 graph,
406 ReplaceJob::ReplaceSource(ReplaceSource {
407 source: Some(source),
408 }),
409 )
410 .await?;
411 self.wait_version(version).await
412 }
413
414 async fn create_source(
415 &self,
416 source: PbSource,
417 graph: Option<StreamFragmentGraph>,
418 if_not_exists: bool,
419 ) -> Result<()> {
420 let version = self
421 .meta_client
422 .create_source(source, graph, if_not_exists)
423 .await?;
424 self.wait_version(version).await
425 }
426
427 async fn create_sink(
428 &self,
429 sink: PbSink,
430 graph: StreamFragmentGraph,
431 dependencies: HashSet<ObjectId>,
432 if_not_exists: bool,
433 ) -> Result<()> {
434 let version = self
435 .meta_client
436 .create_sink(sink, graph, dependencies, if_not_exists)
437 .await?;
438 self.wait_version(version).await
439 }
440
441 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
442 let version = self.meta_client.create_subscription(subscription).await?;
443 self.wait_version(version).await
444 }
445
446 async fn create_function(&self, function: PbFunction) -> Result<()> {
447 let version = self.meta_client.create_function(function).await?;
448 self.wait_version(version).await
449 }
450
451 async fn create_connection(
452 &self,
453 connection_name: String,
454 database_id: u32,
455 schema_id: u32,
456 owner_id: u32,
457 connection: create_connection_request::Payload,
458 ) -> Result<()> {
459 let version = self
460 .meta_client
461 .create_connection(
462 connection_name,
463 database_id,
464 schema_id,
465 owner_id,
466 connection,
467 )
468 .await?;
469 self.wait_version(version).await
470 }
471
472 async fn create_secret(
473 &self,
474 secret_name: String,
475 database_id: u32,
476 schema_id: u32,
477 owner_id: u32,
478 payload: Vec<u8>,
479 ) -> Result<()> {
480 let version = self
481 .meta_client
482 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
483 .await?;
484 self.wait_version(version).await
485 }
486
487 async fn comment_on(&self, comment: PbComment) -> Result<()> {
488 let version = self.meta_client.comment_on(comment).await?;
489 self.wait_version(version).await
490 }
491
492 async fn drop_table(
493 &self,
494 source_id: Option<u32>,
495 table_id: TableId,
496 cascade: bool,
497 ) -> Result<()> {
498 let version = self
499 .meta_client
500 .drop_table(source_id, table_id, cascade)
501 .await?;
502 self.wait_version(version).await
503 }
504
505 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
506 let version = self
507 .meta_client
508 .drop_materialized_view(table_id, cascade)
509 .await?;
510 self.wait_version(version).await
511 }
512
513 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
514 let version = self.meta_client.drop_view(view_id, cascade).await?;
515 self.wait_version(version).await
516 }
517
518 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
519 let version = self.meta_client.drop_source(source_id, cascade).await?;
520 self.wait_version(version).await
521 }
522
523 async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
524 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
525 self.wait_version(version).await
526 }
527
528 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
529 let version = self
530 .meta_client
531 .drop_subscription(subscription_id, cascade)
532 .await?;
533 self.wait_version(version).await
534 }
535
536 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
537 let version = self.meta_client.drop_index(index_id, cascade).await?;
538 self.wait_version(version).await
539 }
540
541 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
542 let version = self.meta_client.drop_function(function_id, cascade).await?;
543 self.wait_version(version).await
544 }
545
546 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
547 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
548 self.wait_version(version).await
549 }
550
551 async fn drop_database(&self, database_id: u32) -> Result<()> {
552 let version = self.meta_client.drop_database(database_id).await?;
553 self.wait_version(version).await
554 }
555
556 async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()> {
557 let version = self
558 .meta_client
559 .drop_connection(connection_id, cascade)
560 .await?;
561 self.wait_version(version).await
562 }
563
564 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
565 let version = self.meta_client.drop_secret(secret_id).await?;
566 self.wait_version(version).await
567 }
568
569 async fn alter_name(
570 &self,
571 object_id: alter_name_request::Object,
572 object_name: &str,
573 ) -> Result<()> {
574 let version = self.meta_client.alter_name(object_id, object_name).await?;
575 self.wait_version(version).await
576 }
577
578 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
579 let version = self.meta_client.alter_owner(object, owner_id).await?;
580 self.wait_version(version).await
581 }
582
583 async fn alter_set_schema(
584 &self,
585 object: alter_set_schema_request::Object,
586 new_schema_id: u32,
587 ) -> Result<()> {
588 let version = self
589 .meta_client
590 .alter_set_schema(object, new_schema_id)
591 .await?;
592 self.wait_version(version).await
593 }
594
595 async fn alter_source(&self, source: PbSource) -> Result<()> {
596 let version = self.meta_client.alter_source(source).await?;
597 self.wait_version(version).await
598 }
599
600 async fn alter_parallelism(
601 &self,
602 job_id: u32,
603 parallelism: PbTableParallelism,
604 deferred: bool,
605 ) -> Result<()> {
606 self.meta_client
607 .alter_parallelism(job_id, parallelism, deferred)
608 .await
609 .map_err(|e| anyhow!(e))?;
610
611 Ok(())
612 }
613
614 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
615 let version = self.meta_client.alter_swap_rename(object).await?;
616 self.wait_version(version).await
617 }
618
619 async fn alter_secret(
620 &self,
621 secret_id: u32,
622 secret_name: String,
623 database_id: u32,
624 schema_id: u32,
625 owner_id: u32,
626 payload: Vec<u8>,
627 ) -> Result<()> {
628 let version = self
629 .meta_client
630 .alter_secret(
631 secret_id,
632 secret_name,
633 database_id,
634 schema_id,
635 owner_id,
636 payload,
637 )
638 .await?;
639 self.wait_version(version).await
640 }
641
642 async fn alter_resource_group(
643 &self,
644 table_id: u32,
645 resource_group: Option<String>,
646 deferred: bool,
647 ) -> Result<()> {
648 self.meta_client
649 .alter_resource_group(table_id, resource_group, deferred)
650 .await
651 .map_err(|e| anyhow!(e))?;
652
653 Ok(())
654 }
655
656 async fn alter_database_param(
657 &self,
658 database_id: DatabaseId,
659 param: AlterDatabaseParam,
660 ) -> Result<()> {
661 let version = self
662 .meta_client
663 .alter_database_param(database_id, param)
664 .await
665 .map_err(|e| anyhow!(e))?;
666 self.wait_version(version).await
667 }
668}
669
670impl CatalogWriterImpl {
671 pub fn new(
672 meta_client: MetaClient,
673 catalog_updated_rx: Receiver<CatalogVersion>,
674 hummock_snapshot_manager: HummockSnapshotManagerRef,
675 ) -> Self {
676 Self {
677 meta_client,
678 catalog_updated_rx,
679 hummock_snapshot_manager,
680 }
681 }
682
683 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
684 let mut rx = self.catalog_updated_rx.clone();
685 while *rx.borrow_and_update() < version.catalog_version {
686 rx.changed().await.map_err(|e| anyhow!(e))?;
687 }
688 self.hummock_snapshot_manager
689 .wait(HummockVersionId::new(version.hummock_version_id))
690 .await;
691 Ok(())
692 }
693}