1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, ObjectId};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_hummock_sdk::HummockVersionId;
24use risingwave_pb::catalog::{
25 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
26 PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
29use risingwave_pb::ddl_service::{
30 PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
31 alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
32 create_connection_request,
33};
34use risingwave_pb::meta::PbTableParallelism;
35use risingwave_pb::stream_plan::StreamFragmentGraph;
36use risingwave_rpc_client::MetaClient;
37use tokio::sync::watch::Receiver;
38
39use super::root_catalog::Catalog;
40use super::{DatabaseId, SecretId, TableId};
41use crate::error::Result;
42use crate::scheduler::HummockSnapshotManagerRef;
43use crate::user::UserId;
44
45pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
46
47#[derive(Clone)]
49pub struct CatalogReader(Arc<RwLock<Catalog>>);
50
51impl CatalogReader {
52 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
53 CatalogReader(inner)
54 }
55
56 pub fn read_guard(&self) -> CatalogReadGuard {
57 self.0.read_arc_recursive()
59 }
60}
61
62#[async_trait::async_trait]
67pub trait CatalogWriter: Send + Sync {
68 async fn create_database(
69 &self,
70 db_name: &str,
71 owner: UserId,
72 resource_group: &str,
73 ) -> Result<()>;
74
75 async fn create_schema(
76 &self,
77 db_id: DatabaseId,
78 schema_name: &str,
79 owner: UserId,
80 ) -> Result<()>;
81
82 async fn create_view(&self, view: PbView) -> Result<()>;
83
84 async fn create_materialized_view(
85 &self,
86 table: PbTable,
87 graph: StreamFragmentGraph,
88 dependencies: HashSet<ObjectId>,
89 specific_resource_group: Option<String>,
90 ) -> Result<()>;
91
92 async fn create_table(
93 &self,
94 source: Option<PbSource>,
95 table: PbTable,
96 graph: StreamFragmentGraph,
97 job_type: PbTableJobType,
98 ) -> Result<()>;
99
100 async fn replace_table(
101 &self,
102 source: Option<PbSource>,
103 table: PbTable,
104 graph: StreamFragmentGraph,
105 mapping: ColIndexMapping,
106 job_type: TableJobType,
107 ) -> Result<()>;
108
109 async fn replace_source(
110 &self,
111 source: PbSource,
112 graph: StreamFragmentGraph,
113 mapping: ColIndexMapping,
114 ) -> Result<()>;
115
116 async fn create_index(
117 &self,
118 index: PbIndex,
119 table: PbTable,
120 graph: StreamFragmentGraph,
121 ) -> Result<()>;
122
123 async fn create_source(
124 &self,
125 source: PbSource,
126 graph: Option<StreamFragmentGraph>,
127 ) -> Result<()>;
128
129 async fn create_sink(
130 &self,
131 sink: PbSink,
132 graph: StreamFragmentGraph,
133 affected_table_change: Option<PbReplaceJobPlan>,
134 dependencies: HashSet<ObjectId>,
135 ) -> Result<()>;
136
137 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
138
139 async fn create_function(&self, function: PbFunction) -> Result<()>;
140
141 async fn create_connection(
142 &self,
143 connection_name: String,
144 database_id: u32,
145 schema_id: u32,
146 owner_id: u32,
147 connection: create_connection_request::Payload,
148 ) -> Result<()>;
149
150 async fn create_secret(
151 &self,
152 secret_name: String,
153 database_id: u32,
154 schema_id: u32,
155 owner_id: u32,
156 payload: Vec<u8>,
157 ) -> Result<()>;
158
159 async fn comment_on(&self, comment: PbComment) -> Result<()>;
160
161 async fn drop_table(
162 &self,
163 source_id: Option<u32>,
164 table_id: TableId,
165 cascade: bool,
166 ) -> Result<()>;
167
168 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
169
170 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
171
172 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
173
174 async fn drop_sink(
175 &self,
176 sink_id: u32,
177 cascade: bool,
178 affected_table_change: Option<PbReplaceJobPlan>,
179 ) -> Result<()>;
180
181 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
182
183 async fn drop_database(&self, database_id: u32) -> Result<()>;
184
185 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
186
187 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
188
189 async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
190
191 async fn drop_connection(&self, connection_id: u32) -> Result<()>;
192
193 async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
194
195 async fn alter_secret(
196 &self,
197 secret_id: u32,
198 secret_name: String,
199 database_id: u32,
200 schema_id: u32,
201 owner_id: u32,
202 payload: Vec<u8>,
203 ) -> Result<()>;
204
205 async fn alter_name(
206 &self,
207 object_id: alter_name_request::Object,
208 object_name: &str,
209 ) -> Result<()>;
210
211 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
212
213 async fn alter_source(&self, source: PbSource) -> Result<()>;
215
216 async fn alter_parallelism(
217 &self,
218 job_id: u32,
219 parallelism: PbTableParallelism,
220 deferred: bool,
221 ) -> Result<()>;
222
223 async fn alter_resource_group(
224 &self,
225 table_id: u32,
226 resource_group: Option<String>,
227 deferred: bool,
228 ) -> Result<()>;
229
230 async fn alter_set_schema(
231 &self,
232 object: alter_set_schema_request::Object,
233 new_schema_id: u32,
234 ) -> Result<()>;
235
236 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
237}
238
239#[derive(Clone)]
240pub struct CatalogWriterImpl {
241 meta_client: MetaClient,
242 catalog_updated_rx: Receiver<CatalogVersion>,
243 hummock_snapshot_manager: HummockSnapshotManagerRef,
244}
245
246#[async_trait::async_trait]
247impl CatalogWriter for CatalogWriterImpl {
248 async fn create_database(
249 &self,
250 db_name: &str,
251 owner: UserId,
252 resource_group: &str,
253 ) -> Result<()> {
254 let version = self
255 .meta_client
256 .create_database(PbDatabase {
257 name: db_name.to_owned(),
258 id: 0,
259 owner,
260 resource_group: resource_group.to_owned(),
261 })
262 .await?;
263 self.wait_version(version).await
264 }
265
266 async fn create_schema(
267 &self,
268 db_id: DatabaseId,
269 schema_name: &str,
270 owner: UserId,
271 ) -> Result<()> {
272 let version = self
273 .meta_client
274 .create_schema(PbSchema {
275 id: 0,
276 name: schema_name.to_owned(),
277 database_id: db_id,
278 owner,
279 })
280 .await?;
281 self.wait_version(version).await
282 }
283
284 async fn create_materialized_view(
286 &self,
287 table: PbTable,
288 graph: StreamFragmentGraph,
289 dependencies: HashSet<ObjectId>,
290 specific_resource_group: Option<String>,
291 ) -> Result<()> {
292 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
293 let version = self
294 .meta_client
295 .create_materialized_view(table, graph, dependencies, specific_resource_group)
296 .await?;
297 if matches!(create_type, PbCreateType::Foreground) {
298 self.wait_version(version).await?
299 }
300 Ok(())
301 }
302
303 async fn create_view(&self, view: PbView) -> Result<()> {
304 let version = self.meta_client.create_view(view).await?;
305 self.wait_version(version).await
306 }
307
308 async fn create_index(
309 &self,
310 index: PbIndex,
311 table: PbTable,
312 graph: StreamFragmentGraph,
313 ) -> Result<()> {
314 let version = self.meta_client.create_index(index, table, graph).await?;
315 self.wait_version(version).await
316 }
317
318 async fn create_table(
319 &self,
320 source: Option<PbSource>,
321 table: PbTable,
322 graph: StreamFragmentGraph,
323 job_type: PbTableJobType,
324 ) -> Result<()> {
325 let version = self
326 .meta_client
327 .create_table(source, table, graph, job_type)
328 .await?;
329 self.wait_version(version).await
330 }
331
332 async fn replace_table(
333 &self,
334 source: Option<PbSource>,
335 table: PbTable,
336 graph: StreamFragmentGraph,
337 mapping: ColIndexMapping,
338 job_type: TableJobType,
339 ) -> Result<()> {
340 let version = self
341 .meta_client
342 .replace_job(
343 graph,
344 mapping,
345 ReplaceJob::ReplaceTable(ReplaceTable {
346 source,
347 table: Some(table),
348 job_type: job_type as _,
349 }),
350 )
351 .await?;
352 self.wait_version(version).await
353 }
354
355 async fn replace_source(
356 &self,
357 source: PbSource,
358 graph: StreamFragmentGraph,
359 mapping: ColIndexMapping,
360 ) -> Result<()> {
361 let version = self
362 .meta_client
363 .replace_job(
364 graph,
365 mapping,
366 ReplaceJob::ReplaceSource(ReplaceSource {
367 source: Some(source),
368 }),
369 )
370 .await?;
371 self.wait_version(version).await
372 }
373
374 async fn create_source(
375 &self,
376 source: PbSource,
377 graph: Option<StreamFragmentGraph>,
378 ) -> Result<()> {
379 let version = self.meta_client.create_source(source, graph).await?;
380 self.wait_version(version).await
381 }
382
383 async fn create_sink(
384 &self,
385 sink: PbSink,
386 graph: StreamFragmentGraph,
387 affected_table_change: Option<ReplaceJobPlan>,
388 dependencies: HashSet<ObjectId>,
389 ) -> Result<()> {
390 let version = self
391 .meta_client
392 .create_sink(sink, graph, affected_table_change, dependencies)
393 .await?;
394 self.wait_version(version).await
395 }
396
397 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
398 let version = self.meta_client.create_subscription(subscription).await?;
399 self.wait_version(version).await
400 }
401
402 async fn create_function(&self, function: PbFunction) -> Result<()> {
403 let version = self.meta_client.create_function(function).await?;
404 self.wait_version(version).await
405 }
406
407 async fn create_connection(
408 &self,
409 connection_name: String,
410 database_id: u32,
411 schema_id: u32,
412 owner_id: u32,
413 connection: create_connection_request::Payload,
414 ) -> Result<()> {
415 let version = self
416 .meta_client
417 .create_connection(
418 connection_name,
419 database_id,
420 schema_id,
421 owner_id,
422 connection,
423 )
424 .await?;
425 self.wait_version(version).await
426 }
427
428 async fn create_secret(
429 &self,
430 secret_name: String,
431 database_id: u32,
432 schema_id: u32,
433 owner_id: u32,
434 payload: Vec<u8>,
435 ) -> Result<()> {
436 let version = self
437 .meta_client
438 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
439 .await?;
440 self.wait_version(version).await
441 }
442
443 async fn comment_on(&self, comment: PbComment) -> Result<()> {
444 let version = self.meta_client.comment_on(comment).await?;
445 self.wait_version(version).await
446 }
447
448 async fn drop_table(
449 &self,
450 source_id: Option<u32>,
451 table_id: TableId,
452 cascade: bool,
453 ) -> Result<()> {
454 let version = self
455 .meta_client
456 .drop_table(source_id, table_id, cascade)
457 .await?;
458 self.wait_version(version).await
459 }
460
461 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
462 let version = self
463 .meta_client
464 .drop_materialized_view(table_id, cascade)
465 .await?;
466 self.wait_version(version).await
467 }
468
469 async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
470 let version = self.meta_client.drop_view(view_id, cascade).await?;
471 self.wait_version(version).await
472 }
473
474 async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
475 let version = self.meta_client.drop_source(source_id, cascade).await?;
476 self.wait_version(version).await
477 }
478
479 async fn drop_sink(
480 &self,
481 sink_id: u32,
482 cascade: bool,
483 affected_table_change: Option<ReplaceJobPlan>,
484 ) -> Result<()> {
485 let version = self
486 .meta_client
487 .drop_sink(sink_id, cascade, affected_table_change)
488 .await?;
489 self.wait_version(version).await
490 }
491
492 async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
493 let version = self
494 .meta_client
495 .drop_subscription(subscription_id, cascade)
496 .await?;
497 self.wait_version(version).await
498 }
499
500 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
501 let version = self.meta_client.drop_index(index_id, cascade).await?;
502 self.wait_version(version).await
503 }
504
505 async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
506 let version = self.meta_client.drop_function(function_id).await?;
507 self.wait_version(version).await
508 }
509
510 async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
511 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
512 self.wait_version(version).await
513 }
514
515 async fn drop_database(&self, database_id: u32) -> Result<()> {
516 let version = self.meta_client.drop_database(database_id).await?;
517 self.wait_version(version).await
518 }
519
520 async fn drop_connection(&self, connection_id: u32) -> Result<()> {
521 let version = self.meta_client.drop_connection(connection_id).await?;
522 self.wait_version(version).await
523 }
524
525 async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
526 let version = self.meta_client.drop_secret(secret_id).await?;
527 self.wait_version(version).await
528 }
529
530 async fn alter_name(
531 &self,
532 object_id: alter_name_request::Object,
533 object_name: &str,
534 ) -> Result<()> {
535 let version = self.meta_client.alter_name(object_id, object_name).await?;
536 self.wait_version(version).await
537 }
538
539 async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
540 let version = self.meta_client.alter_owner(object, owner_id).await?;
541 self.wait_version(version).await
542 }
543
544 async fn alter_set_schema(
545 &self,
546 object: alter_set_schema_request::Object,
547 new_schema_id: u32,
548 ) -> Result<()> {
549 let version = self
550 .meta_client
551 .alter_set_schema(object, new_schema_id)
552 .await?;
553 self.wait_version(version).await
554 }
555
556 async fn alter_source(&self, source: PbSource) -> Result<()> {
557 let version = self.meta_client.alter_source(source).await?;
558 self.wait_version(version).await
559 }
560
561 async fn alter_parallelism(
562 &self,
563 job_id: u32,
564 parallelism: PbTableParallelism,
565 deferred: bool,
566 ) -> Result<()> {
567 self.meta_client
568 .alter_parallelism(job_id, parallelism, deferred)
569 .await
570 .map_err(|e| anyhow!(e))?;
571
572 Ok(())
573 }
574
575 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
576 let version = self.meta_client.alter_swap_rename(object).await?;
577 self.wait_version(version).await
578 }
579
580 async fn alter_secret(
581 &self,
582 secret_id: u32,
583 secret_name: String,
584 database_id: u32,
585 schema_id: u32,
586 owner_id: u32,
587 payload: Vec<u8>,
588 ) -> Result<()> {
589 let version = self
590 .meta_client
591 .alter_secret(
592 secret_id,
593 secret_name,
594 database_id,
595 schema_id,
596 owner_id,
597 payload,
598 )
599 .await?;
600 self.wait_version(version).await
601 }
602
603 async fn alter_resource_group(
604 &self,
605 table_id: u32,
606 resource_group: Option<String>,
607 deferred: bool,
608 ) -> Result<()> {
609 self.meta_client
610 .alter_resource_group(table_id, resource_group, deferred)
611 .await
612 .map_err(|e| anyhow!(e))?;
613
614 Ok(())
615 }
616}
617
618impl CatalogWriterImpl {
619 pub fn new(
620 meta_client: MetaClient,
621 catalog_updated_rx: Receiver<CatalogVersion>,
622 hummock_snapshot_manager: HummockSnapshotManagerRef,
623 ) -> Self {
624 Self {
625 meta_client,
626 catalog_updated_rx,
627 hummock_snapshot_manager,
628 }
629 }
630
631 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
632 let mut rx = self.catalog_updated_rx.clone();
633 while *rx.borrow_and_update() < version.catalog_version {
634 rx.changed().await.map_err(|e| anyhow!(e))?;
635 }
636 self.hummock_snapshot_manager
637 .wait(HummockVersionId::new(version.hummock_version_id))
638 .await;
639 Ok(())
640 }
641}