use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{
    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
};
use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
use risingwave_pb::catalog::{
    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
    PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::replace_job_plan::{
    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
};
use risingwave_pb::ddl_service::{
    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
    streaming_job_resource_type,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;

use super::root_catalog::Catalog;
use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
use crate::error::Result;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::session::current::notice_to_user;
use crate::user::UserId;

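/// Shared read guard over the frontend's local [`Catalog`].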
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;

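/// [`CatalogReader`] provides read-only access to the locally cached [`Catalog`].
/// Cloning is cheap: all clones share the same `Arc<RwLock<Catalog>>`.
///
/// Illustrative usage (a minimal sketch; `catalog_reader` and the lookup method are
/// assumptions about the caller's context, not part of this file):
///
/// ```ignore
/// let guard = catalog_reader.read_guard();
/// let db = guard.get_database_by_name("dev")?;
/// ```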
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);

impl CatalogReader {
    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
        CatalogReader(inner)
    }

    pub fn read_guard(&self) -> CatalogReadGuard {
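        // Take a recursive read so a thread that already holds a read guard can acquire
        // another one without deadlocking itself.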
        self.0.read_arc_recursive()
    }
}

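/// [`CatalogWriter`] initiates DDL operations by sending RPCs to the meta service. For
/// calls that return a catalog version, the default implementation ([`CatalogWriterImpl`])
/// also waits until the local catalog has been synced to that version before returning.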
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    async fn create_function(&self, function: PbFunction) -> Result<()>;

    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;

    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    async fn alter_owner(
        &self,
        object: alter_owner_request::Object,
        owner_id: UserId,
    ) -> Result<()>;

    async fn alter_source(&self, source: PbSource) -> Result<()>;

    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;
}

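/// [`CatalogWriter`] implementation backed by a [`MetaClient`]. After each DDL RPC it
/// blocks in `wait_version` until the local catalog and hummock snapshot have advanced
/// to the version returned by the meta service.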
#[derive(Clone)]
pub struct CatalogWriterImpl {
    meta_client: MetaClient,
    catalog_updated_rx: Receiver<CatalogVersion>,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
impl CatalogWriter for CatalogWriterImpl {
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_database(PbDatabase {
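                // `id` is a placeholder; the meta service assigns the real database id.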
                name: db_name.to_owned(),
                id: 0.into(),
                owner,
                resource_group: resource_group.to_owned(),
                barrier_interval_ms,
                checkpoint_frequency,
            })
            .await?;
        self.wait_version(version).await
    }

    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_schema(PbSchema {
                id: 0.into(),
                name: schema_name.to_owned(),
                database_id: db_id,
                owner,
            })
            .await?;
        self.wait_version(version).await
    }

    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<()> {
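        // Foreground jobs block until the new catalog version is observed locally;
        // background jobs return as soon as the meta service accepts the request.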
        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
        let version = self
            .meta_client
            .create_materialized_view(table, graph, dependencies, resource_type, if_not_exists)
            .await?;
        if matches!(create_type, PbCreateType::Foreground) {
            self.wait_version(version).await?
        }
        Ok(())
    }

    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()> {
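        // Surface the new table definition and fragment graph to the client as notices
        // (debug-formatted output).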
        notice_to_user(format!("table: {table:#?}"));
        notice_to_user(format!("graph: {graph:#?}"));

        let version = self
            .meta_client
            .replace_job(
                graph,
                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
            )
            .await?;

        self.wait_version(version).await
    }

    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
        let version = self.meta_client.create_view(view, dependencies).await?;
        self.wait_version(version).await
    }

    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_index(index, table, graph, if_not_exists)
            .await?;
        self.wait_version(version).await
    }

    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
            .await?;
        self.wait_version(version).await
    }

    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()> {
        let version = self
            .meta_client
            .replace_job(
                graph,
                ReplaceJob::ReplaceTable(ReplaceTable {
                    source,
                    table: Some(table),
                    job_type: job_type as _,
                }),
            )
            .await?;
        self.wait_version(version).await
    }

    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
        let version = self
            .meta_client
            .replace_job(
                graph,
                ReplaceJob::ReplaceSource(ReplaceSource {
                    source: Some(source),
                }),
            )
            .await?;
        self.wait_version(version).await
    }

    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_source(source, graph, if_not_exists)
            .await?;
        self.wait_version(version).await
    }

    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_sink(sink, graph, dependencies, if_not_exists)
            .await?;
        self.wait_version(version).await
    }

    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
        let version = self.meta_client.create_subscription(subscription).await?;
        self.wait_version(version).await
    }

    async fn create_function(&self, function: PbFunction) -> Result<()> {
        let version = self.meta_client.create_function(function).await?;
        self.wait_version(version).await
    }

    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        connection: create_connection_request::Payload,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_connection(
                connection_name,
                database_id,
                schema_id,
                owner_id,
                connection,
            )
            .await?;
        self.wait_version(version).await
    }

    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
            .await?;
        self.wait_version(version).await
    }

    async fn comment_on(&self, comment: PbComment) -> Result<()> {
        let version = self.meta_client.comment_on(comment).await?;
        self.wait_version(version).await
    }

    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .drop_table(source_id, table_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
        let version = self
            .meta_client
            .drop_materialized_view(table_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_view(view_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_source(source_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
        let version = self.meta_client.reset_source(source_id).await?;
        self.wait_version(version).await
    }

    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        cascade: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .drop_subscription(subscription_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_index(index_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_function(function_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
        let version = self.meta_client.drop_database(database_id).await?;
        self.wait_version(version).await
    }

    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
        let version = self
            .meta_client
            .drop_connection(connection_id, cascade)
            .await?;
        self.wait_version(version).await
    }

    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
        let version = self.meta_client.drop_secret(secret_id, cascade).await?;
        self.wait_version(version).await
    }

    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()> {
        let version = self.meta_client.alter_name(object_id, object_name).await?;
        self.wait_version(version).await
    }

    async fn alter_owner(
        &self,
        object: alter_owner_request::Object,
        owner_id: UserId,
    ) -> Result<()> {
        let version = self.meta_client.alter_owner(object, owner_id).await?;
        self.wait_version(version).await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()> {
        let version = self
            .meta_client
            .alter_set_schema(object, new_schema_id)
            .await?;
        self.wait_version(version).await
    }

    async fn alter_source(&self, source: PbSource) -> Result<()> {
        let version = self.meta_client.alter_source(source).await?;
        self.wait_version(version).await
    }

    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()> {
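        // Unlike most DDL calls above, this does not wait for a catalog version update.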
        self.meta_client
            .alter_parallelism(job_id, parallelism, deferred)
            .await?;
        Ok(())
    }

    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()> {
        self.meta_client
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await?;
        Ok(())
    }

    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
        let version = self.meta_client.alter_swap_rename(object).await?;
        self.wait_version(version).await
    }

    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()> {
        let version = self
            .meta_client
            .alter_secret(
                secret_id,
                secret_name,
                database_id,
                schema_id,
                owner_id,
                payload,
            )
            .await?;
        self.wait_version(version).await
    }

    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()> {
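        // Like `alter_parallelism`, this returns without waiting for a catalog version update.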
        self.meta_client
            .alter_resource_group(table_id, resource_group, deferred)
            .await
            .map_err(|e| anyhow!(e))?;

        Ok(())
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()> {
        let version = self
            .meta_client
            .alter_database_param(database_id, param)
            .await
            .map_err(|e| anyhow!(e))?;
        self.wait_version(version).await
    }

    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()> {
        let version = self
            .meta_client
            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
            .await?;
        self.wait_version(version).await
    }
}

impl CatalogWriterImpl {
    pub fn new(
        meta_client: MetaClient,
        catalog_updated_rx: Receiver<CatalogVersion>,
        hummock_snapshot_manager: HummockSnapshotManagerRef,
    ) -> Self {
        Self {
            meta_client,
            catalog_updated_rx,
            hummock_snapshot_manager,
        }
    }

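    /// Wait until the locally observed catalog version reaches `version.catalog_version`
    /// and the hummock snapshot manager has caught up to `version.hummock_version_id`,
    /// so that follow-up queries observe the effect of the DDL.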
    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
        let mut rx = self.catalog_updated_rx.clone();
        while *rx.borrow_and_update() < version.catalog_version {
            rx.changed().await.map_err(|e| anyhow!(e))?;
        }
        self.hummock_snapshot_manager
            .wait(version.hummock_version_id)
            .await;
        Ok(())
    }
}