1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
30use risingwave_pb::ddl_service::{
31 PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
32 alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
33 create_connection_request,
34};
35use risingwave_pb::meta::PbTableParallelism;
36use risingwave_pb::stream_plan::StreamFragmentGraph;
37use risingwave_rpc_client::MetaClient;
38use tokio::sync::watch::Receiver;
39
40use super::root_catalog::Catalog;
41use super::{DatabaseId, SecretId, TableId};
42use crate::error::Result;
43use crate::scheduler::HummockSnapshotManagerRef;
44use crate::user::UserId;
45
46pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
47
48#[derive(Clone)]
50pub struct CatalogReader(Arc<RwLock<Catalog>>);
51
52impl CatalogReader {
53 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
54 CatalogReader(inner)
55 }
56
57 pub fn read_guard(&self) -> CatalogReadGuard {
58 self.0.read_arc_recursive()
60 }
61}
62
63#[async_trait::async_trait]
68pub trait CatalogWriter: Send + Sync {
69 async fn create_database(
70 &self,
71 db_name: &str,
72 owner: UserId,
73 resource_group: &str,
74 barrier_interval_ms: Option<u32>,
75 checkpoint_frequency: Option<u64>,
76 ) -> Result<()>;
77
78 async fn create_schema(
79 &self,
80 db_id: DatabaseId,
81 schema_name: &str,
82 owner: UserId,
83 ) -> Result<()>;
84
85 async fn create_view(&self, view: PbView) -> Result<()>;
86
87 async fn create_materialized_view(
88 &self,
89 table: PbTable,
90 graph: StreamFragmentGraph,
91 dependencies: HashSet<ObjectId>,
92 specific_resource_group: Option<String>,
93 if_not_exists: bool,
94 ) -> Result<()>;
95
96 async fn create_table(
97 &self,
98 source: Option<PbSource>,
99 table: PbTable,
100 graph: StreamFragmentGraph,
101 job_type: PbTableJobType,
102 if_not_exists: bool,
103 ) -> Result<()>;
104
105 async fn replace_table(
106 &self,
107 source: Option<PbSource>,
108 table: PbTable,
109 graph: StreamFragmentGraph,
110 job_type: TableJobType,
111 ) -> Result<()>;
112
113 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
114
115 async fn create_index(
116 &self,
117 index: PbIndex,
118 table: PbTable,
119 graph: StreamFragmentGraph,
120 if_not_exists: bool,
121 ) -> Result<()>;
122
123 async fn create_source(
124 &self,
125 source: PbSource,
126 graph: Option<StreamFragmentGraph>,
127 if_not_exists: bool,
128 ) -> Result<()>;
129
130 async fn create_sink(
131 &self,
132 sink: PbSink,
133 graph: StreamFragmentGraph,
134 affected_table_change: Option<PbReplaceJobPlan>,
135 dependencies: HashSet<ObjectId>,
136 if_not_exists: bool,
137 ) -> Result<()>;
138
139 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
140
141 async fn create_function(&self, function: PbFunction) -> Result<()>;
142
143 async fn create_connection(
144 &self,
145 connection_name: String,
146 database_id: u32,
147 schema_id: u32,
148 owner_id: u32,
149 connection: create_connection_request::Payload,
150 ) -> Result<()>;
151
152 async fn create_secret(
153 &self,
154 secret_name: String,
155 database_id: u32,
156 schema_id: u32,
157 owner_id: u32,
158 payload: Vec<u8>,
159 ) -> Result<()>;
160
161 async fn comment_on(&self, comment: PbComment) -> Result<()>;
162
163 async fn drop_table(
164 &self,
165 source_id: Option<u32>,
166 table_id: TableId,
167 cascade: bool,
168 ) -> Result<()>;
169
170 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
171
172 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
173
174 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
175
176 async fn drop_sink(
177 &self,
178 sink_id: u32,
179 cascade: bool,
180 affected_table_change: Option<PbReplaceJobPlan>,
181 ) -> Result<()>;
182
183 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
184
185 async fn drop_database(&self, database_id: u32) -> Result<()>;
186
187 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
188
189 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
190
191 async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
192
193 async fn drop_connection(&self, connection_id: u32) -> Result<()>;
194
195 async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
196
197 async fn alter_secret(
198 &self,
199 secret_id: u32,
200 secret_name: String,
201 database_id: u32,
202 schema_id: u32,
203 owner_id: u32,
204 payload: Vec<u8>,
205 ) -> Result<()>;
206
207 async fn alter_name(
208 &self,
209 object_id: alter_name_request::Object,
210 object_name: &str,
211 ) -> Result<()>;
212
213 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
214
215 async fn alter_source(&self, source: PbSource) -> Result<()>;
217
218 async fn alter_parallelism(
219 &self,
220 job_id: u32,
221 parallelism: PbTableParallelism,
222 deferred: bool,
223 ) -> Result<()>;
224
225 async fn alter_resource_group(
226 &self,
227 table_id: u32,
228 resource_group: Option<String>,
229 deferred: bool,
230 ) -> Result<()>;
231
232 async fn alter_set_schema(
233 &self,
234 object: alter_set_schema_request::Object,
235 new_schema_id: u32,
236 ) -> Result<()>;
237
238 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
239
240 async fn alter_database_param(
241 &self,
242 database_id: DatabaseId,
243 param: AlterDatabaseParam,
244 ) -> Result<()>;
245}
246
247#[derive(Clone)]
248pub struct CatalogWriterImpl {
249 meta_client: MetaClient,
250 catalog_updated_rx: Receiver<CatalogVersion>,
251 hummock_snapshot_manager: HummockSnapshotManagerRef,
252}
253
254#[async_trait::async_trait]
255impl CatalogWriter for CatalogWriterImpl {
256 async fn create_database(
257 &self,
258 db_name: &str,
259 owner: UserId,
260 resource_group: &str,
261 barrier_interval_ms: Option<u32>,
262 checkpoint_frequency: Option<u64>,
263 ) -> Result<()> {
264 let version = self
265 .meta_client
266 .create_database(PbDatabase {
267 name: db_name.to_owned(),
268 id: 0,
269 owner,
270 resource_group: resource_group.to_owned(),
271 barrier_interval_ms,
272 checkpoint_frequency,
273 })
274 .await?;
275 self.wait_version(version).await
276 }
277
278 async fn create_schema(
279 &self,
280 db_id: DatabaseId,
281 schema_name: &str,
282 owner: UserId,
283 ) -> Result<()> {
284 let version = self
285 .meta_client
286 .create_schema(PbSchema {
287 id: 0,
288 name: schema_name.to_owned(),
289 database_id: db_id,
290 owner,
291 })
292 .await?;
293 self.wait_version(version).await
294 }
295
296 async fn create_materialized_view(
298 &self,
299 table: PbTable,
300 graph: StreamFragmentGraph,
301 dependencies: HashSet<ObjectId>,
302 specific_resource_group: Option<String>,
303 if_not_exists: bool,
304 ) -> Result<()> {
305 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
306 let version = self
307 .meta_client
308 .create_materialized_view(
309 table,
310 graph,
311 dependencies,
312 specific_resource_group,
313 if_not_exists,
314 )
315 .await?;
316 if matches!(create_type, PbCreateType::Foreground) {
317 self.wait_version(version).await?
318 }
319 Ok(())
320 }
321
322 async fn create_view(&self, view: PbView) -> Result<()> {
323 let version = self.meta_client.create_view(view).await?;
324 self.wait_version(version).await
325 }
326
327 async fn create_index(
328 &self,
329 index: PbIndex,
330 table: PbTable,
331 graph: StreamFragmentGraph,
332 if_not_exists: bool,
333 ) -> Result<()> {
334 let version = self
335 .meta_client
336 .create_index(index, table, graph, if_not_exists)
337 .await?;
338 self.wait_version(version).await
339 }
340
341 async fn create_table(
342 &self,
343 source: Option<PbSource>,
344 table: PbTable,
345 graph: StreamFragmentGraph,
346 job_type: PbTableJobType,
347 if_not_exists: bool,
348 ) -> Result<()> {
349 let version = self
350 .meta_client
351 .create_table(source, table, graph, job_type, if_not_exists)
352 .await?;
353 self.wait_version(version).await
354 }
355
356 async fn replace_table(
357 &self,
358 source: Option<PbSource>,
359 table: PbTable,
360 graph: StreamFragmentGraph,
361 job_type: TableJobType,
362 ) -> Result<()> {
363 let version = self
364 .meta_client
365 .replace_job(
366 graph,
367 ReplaceJob::ReplaceTable(ReplaceTable {
368 source,
369 table: Some(table),
370 job_type: job_type as _,
371 }),
372 )
373 .await?;
374 self.wait_version(version).await
375 }
376
377 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
378 let version = self
379 .meta_client
380 .replace_job(
381 graph,
382 ReplaceJob::ReplaceSource(ReplaceSource {
383 source: Some(source),
384 }),
385 )
386 .await?;
387 self.wait_version(version).await
388 }
389
390 async fn create_source(
391 &self,
392 source: PbSource,
393 graph: Option<StreamFragmentGraph>,
394 if_not_exists: bool,
395 ) -> Result<()> {
396 let version = self
397 .meta_client
398 .create_source(source, graph, if_not_exists)
399 .await?;
400 self.wait_version(version).await
401 }
402
403 async fn create_sink(
404 &self,
405 sink: PbSink,
406 graph: StreamFragmentGraph,
407 affected_table_change: Option<ReplaceJobPlan>,
408 dependencies: HashSet<ObjectId>,
409 if_not_exists: bool,
410 ) -> Result<()> {
411 let version = self
412 .meta_client
413 .create_sink(
414 sink,
415 graph,
416 affected_table_change,
417 dependencies,
418 if_not_exists,
419 )
420 .await?;
421 self.wait_version(version).await
422 }
423
424 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
425 let version = self.meta_client.create_subscription(subscription).await?;
426 self.wait_version(version).await
427 }
428
429 async fn create_function(&self, function: PbFunction) -> Result<()> {
430 let version = self.meta_client.create_function(function).await?;
431 self.wait_version(version).await
432 }
433
434 async fn create_connection(
435 &self,
436 connection_name: String,
437 database_id: u32,
438 schema_id: u32,
439 owner_id: u32,
440 connection: create_connection_request::Payload,
441 ) -> Result<()> {
442 let version = self
443 .meta_client
444 .create_connection(
445 connection_name,
446 database_id,
447 schema_id,
448 owner_id,
449 connection,
450 )
451 .await?;
452 self.wait_version(version).await
453 }
454
455 async fn create_secret(
456 &self,
457 secret_name: String,
458 database_id: u32,
459 schema_id: u32,
460 owner_id: u32,
461 payload: Vec<u8>,
462 ) -> Result<()> {
463 let version = self
464 .meta_client
465 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
466 .await?;
467 self.wait_version(version).await
468 }
469
470 async fn comment_on(&self, comment: PbComment) -> Result<()> {
471 let version = self.meta_client.comment_on(comment).await?;
472 self.wait_version(version).await
473 }
474
475 async fn drop_table(
476 &self,
477 source_id: Option<u32>,
478 table_id: TableId,
479 cascade: bool,
480 ) -> Result<()> {
481 let version = self
482 .meta_client
483 .drop_table(source_id, table_id, cascade)
484 .await?;
485 self.wait_version(version).await
486 }
487
488 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
489 let version = self
490 .meta_client
491 .drop_materialized_view(table_id, cascade)
492 .await?;
493 self.wait_version(version).await
494 }
495
496 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
497 let version = self.meta_client.drop_view(view_id, cascade).await?;
498 self.wait_version(version).await
499 }
500
501 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
502 let version = self.meta_client.drop_source(source_id, cascade).await?;
503 self.wait_version(version).await
504 }
505
506 async fn drop_sink(
507 &self,
508 sink_id: u32,
509 cascade: bool,
510 affected_table_change: Option<ReplaceJobPlan>,
511 ) -> Result<()> {
512 let version = self
513 .meta_client
514 .drop_sink(sink_id, cascade, affected_table_change)
515 .await?;
516 self.wait_version(version).await
517 }
518
519 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
520 let version = self
521 .meta_client
522 .drop_subscription(subscription_id, cascade)
523 .await?;
524 self.wait_version(version).await
525 }
526
527 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
528 let version = self.meta_client.drop_index(index_id, cascade).await?;
529 self.wait_version(version).await
530 }
531
532 async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
533 let version = self.meta_client.drop_function(function_id).await?;
534 self.wait_version(version).await
535 }
536
537 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
538 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
539 self.wait_version(version).await
540 }
541
542 async fn drop_database(&self, database_id: u32) -> Result<()> {
543 let version = self.meta_client.drop_database(database_id).await?;
544 self.wait_version(version).await
545 }
546
547 async fn drop_connection(&self, connection_id: u32) -> Result<()> {
548 let version = self.meta_client.drop_connection(connection_id).await?;
549 self.wait_version(version).await
550 }
551
552 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
553 let version = self.meta_client.drop_secret(secret_id).await?;
554 self.wait_version(version).await
555 }
556
557 async fn alter_name(
558 &self,
559 object_id: alter_name_request::Object,
560 object_name: &str,
561 ) -> Result<()> {
562 let version = self.meta_client.alter_name(object_id, object_name).await?;
563 self.wait_version(version).await
564 }
565
566 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
567 let version = self.meta_client.alter_owner(object, owner_id).await?;
568 self.wait_version(version).await
569 }
570
571 async fn alter_set_schema(
572 &self,
573 object: alter_set_schema_request::Object,
574 new_schema_id: u32,
575 ) -> Result<()> {
576 let version = self
577 .meta_client
578 .alter_set_schema(object, new_schema_id)
579 .await?;
580 self.wait_version(version).await
581 }
582
583 async fn alter_source(&self, source: PbSource) -> Result<()> {
584 let version = self.meta_client.alter_source(source).await?;
585 self.wait_version(version).await
586 }
587
588 async fn alter_parallelism(
589 &self,
590 job_id: u32,
591 parallelism: PbTableParallelism,
592 deferred: bool,
593 ) -> Result<()> {
594 self.meta_client
595 .alter_parallelism(job_id, parallelism, deferred)
596 .await
597 .map_err(|e| anyhow!(e))?;
598
599 Ok(())
600 }
601
602 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
603 let version = self.meta_client.alter_swap_rename(object).await?;
604 self.wait_version(version).await
605 }
606
607 async fn alter_secret(
608 &self,
609 secret_id: u32,
610 secret_name: String,
611 database_id: u32,
612 schema_id: u32,
613 owner_id: u32,
614 payload: Vec<u8>,
615 ) -> Result<()> {
616 let version = self
617 .meta_client
618 .alter_secret(
619 secret_id,
620 secret_name,
621 database_id,
622 schema_id,
623 owner_id,
624 payload,
625 )
626 .await?;
627 self.wait_version(version).await
628 }
629
630 async fn alter_resource_group(
631 &self,
632 table_id: u32,
633 resource_group: Option<String>,
634 deferred: bool,
635 ) -> Result<()> {
636 self.meta_client
637 .alter_resource_group(table_id, resource_group, deferred)
638 .await
639 .map_err(|e| anyhow!(e))?;
640
641 Ok(())
642 }
643
644 async fn alter_database_param(
645 &self,
646 database_id: DatabaseId,
647 param: AlterDatabaseParam,
648 ) -> Result<()> {
649 let version = self
650 .meta_client
651 .alter_database_param(database_id, param)
652 .await
653 .map_err(|e| anyhow!(e))?;
654 self.wait_version(version).await
655 }
656}
657
658impl CatalogWriterImpl {
659 pub fn new(
660 meta_client: MetaClient,
661 catalog_updated_rx: Receiver<CatalogVersion>,
662 hummock_snapshot_manager: HummockSnapshotManagerRef,
663 ) -> Self {
664 Self {
665 meta_client,
666 catalog_updated_rx,
667 hummock_snapshot_manager,
668 }
669 }
670
671 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
672 let mut rx = self.catalog_updated_rx.clone();
673 while *rx.borrow_and_update() < version.catalog_version {
674 rx.changed().await.map_err(|e| anyhow!(e))?;
675 }
676 self.hummock_snapshot_manager
677 .wait(HummockVersionId::new(version.hummock_version_id))
678 .await;
679 Ok(())
680 }
681}