1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_pb::catalog::{
26 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27 PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36 streaming_job_resource_type,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
52#[derive(Clone)]
54pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58 CatalogReader(inner)
59 }
60
61 pub fn read_guard(&self) -> CatalogReadGuard {
62 self.0.read_arc_recursive()
64 }
65}
66
/// Abstraction over catalog DDL operations issued by the frontend.
///
/// Implementations forward each request to the meta service. The default
/// implementation ([`CatalogWriterImpl`]) additionally waits until the local
/// catalog cache has caught up with the version returned by the meta service,
/// so a follow-up read observes the change just made.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Creates a database with the given name, owner and resource group.
    /// `barrier_interval_ms` / `checkpoint_frequency` are optional per-database
    /// overrides.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Creates a schema named `schema_name` under database `db_id`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Creates a (non-materialized) view; `dependencies` are the objects the
    /// view definition references.
    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    /// Creates a materialized view from the table catalog and its stream
    /// fragment graph.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Replaces the streaming plan of an existing materialized view.
    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    /// Creates a table, optionally together with an associated connector
    /// `source` (presumably for connector-backed tables — confirm with caller).
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    /// Replaces an existing table's definition and streaming plan.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Replaces an existing source's definition and streaming plan.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Creates an index on `table` with the given streaming plan.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a source. `graph` is `Some` only when the source requires a
    /// streaming job (judging by the signature — verify against callers).
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a sink with the given streaming plan and dependencies.
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a subscription.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Creates a (UDF) function.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Creates a connection under the given schema with the provided payload.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Creates a secret; `payload` is the opaque secret bytes.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Sets or clears a comment on an object (`COMMENT ON ...`).
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Drops a table and, if present, its associated source.
    /// `cascade` follows SQL `DROP ... CASCADE` semantics.
    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Drops a materialized view.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Drops a view.
    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    /// Drops a source.
    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    /// Resets a source (semantics defined by the meta service — see
    /// `MetaClient::reset_source`).
    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    /// Drops a sink.
    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    /// Drops a subscription.
    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    /// Drops a whole database.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    /// Drops a schema.
    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    /// Drops an index.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Drops a function.
    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    /// Drops a connection.
    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    /// Drops a secret.
    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;

    /// Replaces an existing secret's metadata and payload in place.
    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Renames an object (`ALTER ... RENAME TO`).
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Changes an object's owner (`ALTER ... OWNER TO`).
    async fn alter_owner(
        &self,
        object: alter_owner_request::Object,
        owner_id: UserId,
    ) -> Result<()>;

    /// Updates a source's catalog definition (e.g. after `ALTER SOURCE`).
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Changes a streaming job's parallelism; `deferred` postpones the
    /// rescheduling (exact timing decided by the meta service).
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    /// Adds/overwrites (`entries_to_add`) and removes (`keys_to_remove`)
    /// entries of a streaming job's config.
    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    /// Moves a job to another resource group; `None` presumably resets to the
    /// default group — confirm with the meta-service contract.
    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Moves an object into another schema (`ALTER ... SET SCHEMA`).
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    /// Swaps the names of two objects (`ALTER ... SWAP WITH`).
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Changes a per-database runtime parameter.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    /// Creates an iceberg table: internal table job, sink job and the
    /// iceberg source in one request.
    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Waits until pending DDL on the meta service is finished and reflected
    /// in the local catalog.
    async fn wait(&self) -> Result<()>;
}
275
/// The default [`CatalogWriter`] implementation, backed by the meta client.
#[derive(Clone)]
pub struct CatalogWriterImpl {
    // RPC client used to forward DDL requests to the meta service.
    meta_client: MetaClient,
    // Watch channel carrying the latest catalog version observed locally;
    // `wait_version` blocks on it until the cache catches up.
    catalog_updated_rx: Receiver<CatalogVersion>,
    // Used to wait for the hummock snapshot to reach the version returned by
    // a DDL operation.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
282
283#[async_trait::async_trait]
284impl CatalogWriter for CatalogWriterImpl {
285 async fn create_database(
286 &self,
287 db_name: &str,
288 owner: UserId,
289 resource_group: &str,
290 barrier_interval_ms: Option<u32>,
291 checkpoint_frequency: Option<u64>,
292 ) -> Result<()> {
293 let version = self
294 .meta_client
295 .create_database(PbDatabase {
296 name: db_name.to_owned(),
297 id: 0.into(),
298 owner,
299 resource_group: resource_group.to_owned(),
300 barrier_interval_ms,
301 checkpoint_frequency,
302 })
303 .await?;
304 self.wait_version(version).await
305 }
306
307 async fn create_schema(
308 &self,
309 db_id: DatabaseId,
310 schema_name: &str,
311 owner: UserId,
312 ) -> Result<()> {
313 let version = self
314 .meta_client
315 .create_schema(PbSchema {
316 id: 0.into(),
317 name: schema_name.to_owned(),
318 database_id: db_id,
319 owner,
320 })
321 .await?;
322 self.wait_version(version).await
323 }
324
325 async fn create_materialized_view(
327 &self,
328 table: PbTable,
329 graph: StreamFragmentGraph,
330 dependencies: HashSet<ObjectId>,
331 resource_type: streaming_job_resource_type::ResourceType,
332 if_not_exists: bool,
333 ) -> Result<()> {
334 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
335 let version = self
336 .meta_client
337 .create_materialized_view(table, graph, dependencies, resource_type, if_not_exists)
338 .await?;
339 if matches!(create_type, PbCreateType::Foreground) {
340 self.wait_version(version).await?
341 }
342 Ok(())
343 }
344
345 async fn replace_materialized_view(
346 &self,
347 table: PbTable,
348 graph: StreamFragmentGraph,
349 ) -> Result<()> {
350 notice_to_user(format!("table: {table:#?}"));
352 notice_to_user(format!("graph: {graph:#?}"));
353
354 let version = self
355 .meta_client
356 .replace_job(
357 graph,
358 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
359 )
360 .await?;
361
362 self.wait_version(version).await
363 }
364
365 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
366 let version = self.meta_client.create_view(view, dependencies).await?;
367 self.wait_version(version).await
368 }
369
370 async fn create_index(
371 &self,
372 index: PbIndex,
373 table: PbTable,
374 graph: StreamFragmentGraph,
375 if_not_exists: bool,
376 ) -> Result<()> {
377 let version = self
378 .meta_client
379 .create_index(index, table, graph, if_not_exists)
380 .await?;
381 self.wait_version(version).await
382 }
383
384 async fn create_table(
385 &self,
386 source: Option<PbSource>,
387 table: PbTable,
388 graph: StreamFragmentGraph,
389 job_type: PbTableJobType,
390 if_not_exists: bool,
391 dependencies: HashSet<ObjectId>,
392 ) -> Result<()> {
393 let version = self
394 .meta_client
395 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
396 .await?;
397 self.wait_version(version).await
398 }
399
400 async fn replace_table(
401 &self,
402 source: Option<PbSource>,
403 table: PbTable,
404 graph: StreamFragmentGraph,
405 job_type: TableJobType,
406 ) -> Result<()> {
407 let version = self
408 .meta_client
409 .replace_job(
410 graph,
411 ReplaceJob::ReplaceTable(ReplaceTable {
412 source,
413 table: Some(table),
414 job_type: job_type as _,
415 }),
416 )
417 .await?;
418 self.wait_version(version).await
419 }
420
421 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
422 let version = self
423 .meta_client
424 .replace_job(
425 graph,
426 ReplaceJob::ReplaceSource(ReplaceSource {
427 source: Some(source),
428 }),
429 )
430 .await?;
431 self.wait_version(version).await
432 }
433
434 async fn create_source(
435 &self,
436 source: PbSource,
437 graph: Option<StreamFragmentGraph>,
438 if_not_exists: bool,
439 ) -> Result<()> {
440 let version = self
441 .meta_client
442 .create_source(source, graph, if_not_exists)
443 .await?;
444 self.wait_version(version).await
445 }
446
447 async fn create_sink(
448 &self,
449 sink: PbSink,
450 graph: StreamFragmentGraph,
451 dependencies: HashSet<ObjectId>,
452 if_not_exists: bool,
453 ) -> Result<()> {
454 let version = self
455 .meta_client
456 .create_sink(sink, graph, dependencies, if_not_exists)
457 .await?;
458 self.wait_version(version).await
459 }
460
461 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
462 let version = self.meta_client.create_subscription(subscription).await?;
463 self.wait_version(version).await
464 }
465
466 async fn create_function(&self, function: PbFunction) -> Result<()> {
467 let version = self.meta_client.create_function(function).await?;
468 self.wait_version(version).await
469 }
470
471 async fn create_connection(
472 &self,
473 connection_name: String,
474 database_id: DatabaseId,
475 schema_id: SchemaId,
476 owner_id: UserId,
477 connection: create_connection_request::Payload,
478 ) -> Result<()> {
479 let version = self
480 .meta_client
481 .create_connection(
482 connection_name,
483 database_id,
484 schema_id,
485 owner_id,
486 connection,
487 )
488 .await?;
489 self.wait_version(version).await
490 }
491
492 async fn create_secret(
493 &self,
494 secret_name: String,
495 database_id: DatabaseId,
496 schema_id: SchemaId,
497 owner_id: UserId,
498 payload: Vec<u8>,
499 ) -> Result<()> {
500 let version = self
501 .meta_client
502 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
503 .await?;
504 self.wait_version(version).await
505 }
506
507 async fn comment_on(&self, comment: PbComment) -> Result<()> {
508 let version = self.meta_client.comment_on(comment).await?;
509 self.wait_version(version).await
510 }
511
512 async fn drop_table(
513 &self,
514 source_id: Option<SourceId>,
515 table_id: TableId,
516 cascade: bool,
517 ) -> Result<()> {
518 let version = self
519 .meta_client
520 .drop_table(source_id, table_id, cascade)
521 .await?;
522 self.wait_version(version).await
523 }
524
525 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
526 let version = self
527 .meta_client
528 .drop_materialized_view(table_id, cascade)
529 .await?;
530 self.wait_version(version).await
531 }
532
533 async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
534 let version = self.meta_client.drop_view(view_id, cascade).await?;
535 self.wait_version(version).await
536 }
537
538 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
539 let version = self.meta_client.drop_source(source_id, cascade).await?;
540 self.wait_version(version).await
541 }
542
543 async fn reset_source(&self, source_id: SourceId) -> Result<()> {
544 let version = self.meta_client.reset_source(source_id).await?;
545 self.wait_version(version).await
546 }
547
548 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
549 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
550 self.wait_version(version).await
551 }
552
553 async fn drop_subscription(
554 &self,
555 subscription_id: SubscriptionId,
556 cascade: bool,
557 ) -> Result<()> {
558 let version = self
559 .meta_client
560 .drop_subscription(subscription_id, cascade)
561 .await?;
562 self.wait_version(version).await
563 }
564
565 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
566 let version = self.meta_client.drop_index(index_id, cascade).await?;
567 self.wait_version(version).await
568 }
569
570 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
571 let version = self.meta_client.drop_function(function_id, cascade).await?;
572 self.wait_version(version).await
573 }
574
575 async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
576 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
577 self.wait_version(version).await
578 }
579
580 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
581 let version = self.meta_client.drop_database(database_id).await?;
582 self.wait_version(version).await
583 }
584
585 async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
586 let version = self
587 .meta_client
588 .drop_connection(connection_id, cascade)
589 .await?;
590 self.wait_version(version).await
591 }
592
593 async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
594 let version = self.meta_client.drop_secret(secret_id, cascade).await?;
595 self.wait_version(version).await
596 }
597
598 async fn alter_name(
599 &self,
600 object_id: alter_name_request::Object,
601 object_name: &str,
602 ) -> Result<()> {
603 let version = self.meta_client.alter_name(object_id, object_name).await?;
604 self.wait_version(version).await
605 }
606
607 async fn alter_owner(
608 &self,
609 object: alter_owner_request::Object,
610 owner_id: UserId,
611 ) -> Result<()> {
612 let version = self.meta_client.alter_owner(object, owner_id).await?;
613 self.wait_version(version).await
614 }
615
616 async fn alter_set_schema(
617 &self,
618 object: alter_set_schema_request::Object,
619 new_schema_id: SchemaId,
620 ) -> Result<()> {
621 let version = self
622 .meta_client
623 .alter_set_schema(object, new_schema_id)
624 .await?;
625 self.wait_version(version).await
626 }
627
628 async fn alter_source(&self, source: PbSource) -> Result<()> {
629 let version = self.meta_client.alter_source(source).await?;
630 self.wait_version(version).await
631 }
632
633 async fn alter_parallelism(
634 &self,
635 job_id: JobId,
636 parallelism: PbTableParallelism,
637 deferred: bool,
638 ) -> Result<()> {
639 self.meta_client
640 .alter_parallelism(job_id, parallelism, deferred)
641 .await?;
642 Ok(())
643 }
644
645 async fn alter_config(
646 &self,
647 job_id: JobId,
648 entries_to_add: HashMap<String, String>,
649 keys_to_remove: Vec<String>,
650 ) -> Result<()> {
651 self.meta_client
652 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
653 .await?;
654 Ok(())
655 }
656
657 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
658 let version = self.meta_client.alter_swap_rename(object).await?;
659 self.wait_version(version).await
660 }
661
662 async fn alter_secret(
663 &self,
664 secret_id: SecretId,
665 secret_name: String,
666 database_id: DatabaseId,
667 schema_id: SchemaId,
668 owner_id: UserId,
669 payload: Vec<u8>,
670 ) -> Result<()> {
671 let version = self
672 .meta_client
673 .alter_secret(
674 secret_id,
675 secret_name,
676 database_id,
677 schema_id,
678 owner_id,
679 payload,
680 )
681 .await?;
682 self.wait_version(version).await
683 }
684
685 async fn alter_resource_group(
686 &self,
687 table_id: TableId,
688 resource_group: Option<String>,
689 deferred: bool,
690 ) -> Result<()> {
691 self.meta_client
692 .alter_resource_group(table_id, resource_group, deferred)
693 .await
694 .map_err(|e| anyhow!(e))?;
695
696 Ok(())
697 }
698
699 async fn alter_database_param(
700 &self,
701 database_id: DatabaseId,
702 param: AlterDatabaseParam,
703 ) -> Result<()> {
704 let version = self
705 .meta_client
706 .alter_database_param(database_id, param)
707 .await
708 .map_err(|e| anyhow!(e))?;
709 self.wait_version(version).await
710 }
711
712 async fn create_iceberg_table(
713 &self,
714 table_job_info: PbTableJobInfo,
715 sink_job_info: PbSinkJobInfo,
716 iceberg_source: PbSource,
717 if_not_exists: bool,
718 ) -> Result<()> {
719 let version = self
720 .meta_client
721 .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
722 .await?;
723 self.wait_version(version).await
724 }
725
726 async fn wait(&self) -> Result<()> {
727 let version = self.meta_client.wait().await.map_err(|e| anyhow!(e))?;
728 self.wait_version(version).await
729 }
730}
731
732impl CatalogWriterImpl {
733 pub fn new(
734 meta_client: MetaClient,
735 catalog_updated_rx: Receiver<CatalogVersion>,
736 hummock_snapshot_manager: HummockSnapshotManagerRef,
737 ) -> Self {
738 Self {
739 meta_client,
740 catalog_updated_rx,
741 hummock_snapshot_manager,
742 }
743 }
744
745 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
746 let mut rx = self.catalog_updated_rx.clone();
747 while *rx.borrow_and_update() < version.catalog_version {
748 rx.changed().await.map_err(|e| anyhow!(e))?;
749 }
750 self.hummock_snapshot_manager
751 .wait(version.hummock_version_id)
752 .await;
753 Ok(())
754 }
755}