risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_hummock_sdk::HummockVersionId;
26use risingwave_pb::catalog::{
27    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28    PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
52/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
53#[derive(Clone)]
54pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58        CatalogReader(inner)
59    }
60
61    pub fn read_guard(&self) -> CatalogReadGuard {
62        // Make this recursive so that one can get this guard in the same thread without fear.
63        self.0.read_arc_recursive()
64    }
65}
66
67/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
68/// It will only send rpc to meta and get the catalog version as response.
69/// Then it will wait for the local catalog to be synced to the version, which is performed by
70/// [observer](`crate::observer::FrontendObserverNode`).
71#[async_trait::async_trait]
72pub trait CatalogWriter: Send + Sync {
73    async fn create_database(
74        &self,
75        db_name: &str,
76        owner: UserId,
77        resource_group: &str,
78        barrier_interval_ms: Option<u32>,
79        checkpoint_frequency: Option<u64>,
80    ) -> Result<()>;
81
82    async fn create_schema(
83        &self,
84        db_id: DatabaseId,
85        schema_name: &str,
86        owner: UserId,
87    ) -> Result<()>;
88
89    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
90
91    async fn create_materialized_view(
92        &self,
93        table: PbTable,
94        graph: StreamFragmentGraph,
95        dependencies: HashSet<ObjectId>,
96        specific_resource_group: Option<String>,
97        if_not_exists: bool,
98    ) -> Result<()>;
99
100    async fn replace_materialized_view(
101        &self,
102        table: PbTable,
103        graph: StreamFragmentGraph,
104    ) -> Result<()>;
105
106    async fn create_table(
107        &self,
108        source: Option<PbSource>,
109        table: PbTable,
110        graph: StreamFragmentGraph,
111        job_type: PbTableJobType,
112        if_not_exists: bool,
113        dependencies: HashSet<ObjectId>,
114    ) -> Result<()>;
115
116    async fn replace_table(
117        &self,
118        source: Option<PbSource>,
119        table: PbTable,
120        graph: StreamFragmentGraph,
121        job_type: TableJobType,
122    ) -> Result<()>;
123
124    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
125
126    async fn create_index(
127        &self,
128        index: PbIndex,
129        table: PbTable,
130        graph: StreamFragmentGraph,
131        if_not_exists: bool,
132    ) -> Result<()>;
133
134    async fn create_source(
135        &self,
136        source: PbSource,
137        graph: Option<StreamFragmentGraph>,
138        if_not_exists: bool,
139    ) -> Result<()>;
140
141    async fn create_sink(
142        &self,
143        sink: PbSink,
144        graph: StreamFragmentGraph,
145        dependencies: HashSet<ObjectId>,
146        if_not_exists: bool,
147    ) -> Result<()>;
148
149    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
150
151    async fn create_function(&self, function: PbFunction) -> Result<()>;
152
153    async fn create_connection(
154        &self,
155        connection_name: String,
156        database_id: DatabaseId,
157        schema_id: SchemaId,
158        owner_id: u32,
159        connection: create_connection_request::Payload,
160    ) -> Result<()>;
161
162    async fn create_secret(
163        &self,
164        secret_name: String,
165        database_id: DatabaseId,
166        schema_id: SchemaId,
167        owner_id: u32,
168        payload: Vec<u8>,
169    ) -> Result<()>;
170
171    async fn comment_on(&self, comment: PbComment) -> Result<()>;
172
173    async fn drop_table(
174        &self,
175        source_id: Option<u32>,
176        table_id: TableId,
177        cascade: bool,
178    ) -> Result<()>;
179
180    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
181
182    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
183
184    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
185
186    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
187
188    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
189    -> Result<()>;
190
191    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
192
193    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
194
195    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
196
197    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
198
199    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
200
201    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
202
203    async fn alter_secret(
204        &self,
205        secret_id: SecretId,
206        secret_name: String,
207        database_id: DatabaseId,
208        schema_id: SchemaId,
209        owner_id: u32,
210        payload: Vec<u8>,
211    ) -> Result<()>;
212
213    async fn alter_name(
214        &self,
215        object_id: alter_name_request::Object,
216        object_name: &str,
217    ) -> Result<()>;
218
219    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
220
221    /// Replace the source in the catalog.
222    async fn alter_source(&self, source: PbSource) -> Result<()>;
223
224    async fn alter_parallelism(
225        &self,
226        job_id: JobId,
227        parallelism: PbTableParallelism,
228        deferred: bool,
229    ) -> Result<()>;
230
231    async fn alter_resource_group(
232        &self,
233        table_id: TableId,
234        resource_group: Option<String>,
235        deferred: bool,
236    ) -> Result<()>;
237
238    async fn alter_set_schema(
239        &self,
240        object: alter_set_schema_request::Object,
241        new_schema_id: SchemaId,
242    ) -> Result<()>;
243
244    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
245
246    async fn alter_database_param(
247        &self,
248        database_id: DatabaseId,
249        param: AlterDatabaseParam,
250    ) -> Result<()>;
251
252    async fn create_iceberg_table(
253        &self,
254        table_job_info: PbTableJobInfo,
255        sink_job_info: PbSinkJobInfo,
256        iceberg_source: PbSource,
257        if_not_exists: bool,
258    ) -> Result<()>;
259}
260
261#[derive(Clone)]
262pub struct CatalogWriterImpl {
263    meta_client: MetaClient,
264    catalog_updated_rx: Receiver<CatalogVersion>,
265    hummock_snapshot_manager: HummockSnapshotManagerRef,
266}
267
268#[async_trait::async_trait]
269impl CatalogWriter for CatalogWriterImpl {
270    async fn create_database(
271        &self,
272        db_name: &str,
273        owner: UserId,
274        resource_group: &str,
275        barrier_interval_ms: Option<u32>,
276        checkpoint_frequency: Option<u64>,
277    ) -> Result<()> {
278        let version = self
279            .meta_client
280            .create_database(PbDatabase {
281                name: db_name.to_owned(),
282                id: 0.into(),
283                owner,
284                resource_group: resource_group.to_owned(),
285                barrier_interval_ms,
286                checkpoint_frequency,
287            })
288            .await?;
289        self.wait_version(version).await
290    }
291
292    async fn create_schema(
293        &self,
294        db_id: DatabaseId,
295        schema_name: &str,
296        owner: UserId,
297    ) -> Result<()> {
298        let version = self
299            .meta_client
300            .create_schema(PbSchema {
301                id: 0.into(),
302                name: schema_name.to_owned(),
303                database_id: db_id,
304                owner,
305            })
306            .await?;
307        self.wait_version(version).await
308    }
309
310    // TODO: maybe here to pass a materialize plan node
311    async fn create_materialized_view(
312        &self,
313        table: PbTable,
314        graph: StreamFragmentGraph,
315        dependencies: HashSet<ObjectId>,
316        specific_resource_group: Option<String>,
317        if_not_exists: bool,
318    ) -> Result<()> {
319        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
320        let version = self
321            .meta_client
322            .create_materialized_view(
323                table,
324                graph,
325                dependencies,
326                specific_resource_group,
327                if_not_exists,
328            )
329            .await?;
330        if matches!(create_type, PbCreateType::Foreground) {
331            self.wait_version(version).await?
332        }
333        Ok(())
334    }
335
336    async fn replace_materialized_view(
337        &self,
338        table: PbTable,
339        graph: StreamFragmentGraph,
340    ) -> Result<()> {
341        // TODO: this is a dummy implementation for debugging only.
342        notice_to_user(format!("table: {table:#?}"));
343        notice_to_user(format!("graph: {graph:#?}"));
344
345        let version = self
346            .meta_client
347            .replace_job(
348                graph,
349                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
350            )
351            .await?;
352
353        self.wait_version(version).await
354    }
355
356    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
357        let version = self.meta_client.create_view(view, dependencies).await?;
358        self.wait_version(version).await
359    }
360
361    async fn create_index(
362        &self,
363        index: PbIndex,
364        table: PbTable,
365        graph: StreamFragmentGraph,
366        if_not_exists: bool,
367    ) -> Result<()> {
368        let version = self
369            .meta_client
370            .create_index(index, table, graph, if_not_exists)
371            .await?;
372        self.wait_version(version).await
373    }
374
375    async fn create_table(
376        &self,
377        source: Option<PbSource>,
378        table: PbTable,
379        graph: StreamFragmentGraph,
380        job_type: PbTableJobType,
381        if_not_exists: bool,
382        dependencies: HashSet<ObjectId>,
383    ) -> Result<()> {
384        let version = self
385            .meta_client
386            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
387            .await?;
388        self.wait_version(version).await
389    }
390
391    async fn replace_table(
392        &self,
393        source: Option<PbSource>,
394        table: PbTable,
395        graph: StreamFragmentGraph,
396        job_type: TableJobType,
397    ) -> Result<()> {
398        let version = self
399            .meta_client
400            .replace_job(
401                graph,
402                ReplaceJob::ReplaceTable(ReplaceTable {
403                    source,
404                    table: Some(table),
405                    job_type: job_type as _,
406                }),
407            )
408            .await?;
409        self.wait_version(version).await
410    }
411
412    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
413        let version = self
414            .meta_client
415            .replace_job(
416                graph,
417                ReplaceJob::ReplaceSource(ReplaceSource {
418                    source: Some(source),
419                }),
420            )
421            .await?;
422        self.wait_version(version).await
423    }
424
425    async fn create_source(
426        &self,
427        source: PbSource,
428        graph: Option<StreamFragmentGraph>,
429        if_not_exists: bool,
430    ) -> Result<()> {
431        let version = self
432            .meta_client
433            .create_source(source, graph, if_not_exists)
434            .await?;
435        self.wait_version(version).await
436    }
437
438    async fn create_sink(
439        &self,
440        sink: PbSink,
441        graph: StreamFragmentGraph,
442        dependencies: HashSet<ObjectId>,
443        if_not_exists: bool,
444    ) -> Result<()> {
445        let version = self
446            .meta_client
447            .create_sink(sink, graph, dependencies, if_not_exists)
448            .await?;
449        self.wait_version(version).await
450    }
451
452    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
453        let version = self.meta_client.create_subscription(subscription).await?;
454        self.wait_version(version).await
455    }
456
457    async fn create_function(&self, function: PbFunction) -> Result<()> {
458        let version = self.meta_client.create_function(function).await?;
459        self.wait_version(version).await
460    }
461
462    async fn create_connection(
463        &self,
464        connection_name: String,
465        database_id: DatabaseId,
466        schema_id: SchemaId,
467        owner_id: u32,
468        connection: create_connection_request::Payload,
469    ) -> Result<()> {
470        let version = self
471            .meta_client
472            .create_connection(
473                connection_name,
474                database_id,
475                schema_id,
476                owner_id,
477                connection,
478            )
479            .await?;
480        self.wait_version(version).await
481    }
482
483    async fn create_secret(
484        &self,
485        secret_name: String,
486        database_id: DatabaseId,
487        schema_id: SchemaId,
488        owner_id: u32,
489        payload: Vec<u8>,
490    ) -> Result<()> {
491        let version = self
492            .meta_client
493            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
494            .await?;
495        self.wait_version(version).await
496    }
497
498    async fn comment_on(&self, comment: PbComment) -> Result<()> {
499        let version = self.meta_client.comment_on(comment).await?;
500        self.wait_version(version).await
501    }
502
503    async fn drop_table(
504        &self,
505        source_id: Option<u32>,
506        table_id: TableId,
507        cascade: bool,
508    ) -> Result<()> {
509        let version = self
510            .meta_client
511            .drop_table(source_id, table_id, cascade)
512            .await?;
513        self.wait_version(version).await
514    }
515
516    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
517        let version = self
518            .meta_client
519            .drop_materialized_view(table_id, cascade)
520            .await?;
521        self.wait_version(version).await
522    }
523
524    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
525        let version = self.meta_client.drop_view(view_id, cascade).await?;
526        self.wait_version(version).await
527    }
528
529    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
530        let version = self.meta_client.drop_source(source_id, cascade).await?;
531        self.wait_version(version).await
532    }
533
534    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
535        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
536        self.wait_version(version).await
537    }
538
539    async fn drop_subscription(
540        &self,
541        subscription_id: SubscriptionId,
542        cascade: bool,
543    ) -> Result<()> {
544        let version = self
545            .meta_client
546            .drop_subscription(subscription_id, cascade)
547            .await?;
548        self.wait_version(version).await
549    }
550
551    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
552        let version = self.meta_client.drop_index(index_id, cascade).await?;
553        self.wait_version(version).await
554    }
555
556    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
557        let version = self.meta_client.drop_function(function_id, cascade).await?;
558        self.wait_version(version).await
559    }
560
561    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
562        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
563        self.wait_version(version).await
564    }
565
566    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
567        let version = self.meta_client.drop_database(database_id).await?;
568        self.wait_version(version).await
569    }
570
571    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
572        let version = self
573            .meta_client
574            .drop_connection(connection_id, cascade)
575            .await?;
576        self.wait_version(version).await
577    }
578
579    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
580        let version = self.meta_client.drop_secret(secret_id).await?;
581        self.wait_version(version).await
582    }
583
584    async fn alter_name(
585        &self,
586        object_id: alter_name_request::Object,
587        object_name: &str,
588    ) -> Result<()> {
589        let version = self.meta_client.alter_name(object_id, object_name).await?;
590        self.wait_version(version).await
591    }
592
593    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
594        let version = self.meta_client.alter_owner(object, owner_id).await?;
595        self.wait_version(version).await
596    }
597
598    async fn alter_set_schema(
599        &self,
600        object: alter_set_schema_request::Object,
601        new_schema_id: SchemaId,
602    ) -> Result<()> {
603        let version = self
604            .meta_client
605            .alter_set_schema(object, new_schema_id)
606            .await?;
607        self.wait_version(version).await
608    }
609
610    async fn alter_source(&self, source: PbSource) -> Result<()> {
611        let version = self.meta_client.alter_source(source).await?;
612        self.wait_version(version).await
613    }
614
615    async fn alter_parallelism(
616        &self,
617        job_id: JobId,
618        parallelism: PbTableParallelism,
619        deferred: bool,
620    ) -> Result<()> {
621        self.meta_client
622            .alter_parallelism(job_id, parallelism, deferred)
623            .await
624            .map_err(|e| anyhow!(e))?;
625
626        Ok(())
627    }
628
629    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
630        let version = self.meta_client.alter_swap_rename(object).await?;
631        self.wait_version(version).await
632    }
633
634    async fn alter_secret(
635        &self,
636        secret_id: SecretId,
637        secret_name: String,
638        database_id: DatabaseId,
639        schema_id: SchemaId,
640        owner_id: u32,
641        payload: Vec<u8>,
642    ) -> Result<()> {
643        let version = self
644            .meta_client
645            .alter_secret(
646                secret_id,
647                secret_name,
648                database_id,
649                schema_id,
650                owner_id,
651                payload,
652            )
653            .await?;
654        self.wait_version(version).await
655    }
656
657    async fn alter_resource_group(
658        &self,
659        table_id: TableId,
660        resource_group: Option<String>,
661        deferred: bool,
662    ) -> Result<()> {
663        self.meta_client
664            .alter_resource_group(table_id, resource_group, deferred)
665            .await
666            .map_err(|e| anyhow!(e))?;
667
668        Ok(())
669    }
670
671    async fn alter_database_param(
672        &self,
673        database_id: DatabaseId,
674        param: AlterDatabaseParam,
675    ) -> Result<()> {
676        let version = self
677            .meta_client
678            .alter_database_param(database_id, param)
679            .await
680            .map_err(|e| anyhow!(e))?;
681        self.wait_version(version).await
682    }
683
684    async fn create_iceberg_table(
685        &self,
686        table_job_info: PbTableJobInfo,
687        sink_job_info: PbSinkJobInfo,
688        iceberg_source: PbSource,
689        if_not_exists: bool,
690    ) -> Result<()> {
691        let version = self
692            .meta_client
693            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
694            .await?;
695        self.wait_version(version).await
696    }
697}
698
699impl CatalogWriterImpl {
700    pub fn new(
701        meta_client: MetaClient,
702        catalog_updated_rx: Receiver<CatalogVersion>,
703        hummock_snapshot_manager: HummockSnapshotManagerRef,
704    ) -> Self {
705        Self {
706            meta_client,
707            catalog_updated_rx,
708            hummock_snapshot_manager,
709        }
710    }
711
712    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
713        let mut rx = self.catalog_updated_rx.clone();
714        while *rx.borrow_and_update() < version.catalog_version {
715            rx.changed().await.map_err(|e| anyhow!(e))?;
716        }
717        self.hummock_snapshot_manager
718            .wait(HummockVersionId::new(version.hummock_version_id))
719            .await;
720        Ok(())
721    }
722}