risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36};
37use risingwave_pb::meta::PbTableParallelism;
38use risingwave_pb::stream_plan::StreamFragmentGraph;
39use risingwave_rpc_client::MetaClient;
40use tokio::sync::watch::Receiver;
41
42use super::root_catalog::Catalog;
43use super::{DatabaseId, SecretId, TableId};
44use crate::error::Result;
45use crate::scheduler::HummockSnapshotManagerRef;
46use crate::session::current::notice_to_user;
47use crate::user::UserId;
48
49pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
50
51/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
52#[derive(Clone)]
53pub struct CatalogReader(Arc<RwLock<Catalog>>);
54
55impl CatalogReader {
56    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
57        CatalogReader(inner)
58    }
59
60    pub fn read_guard(&self) -> CatalogReadGuard {
61        // Make this recursive so that one can get this guard in the same thread without fear.
62        self.0.read_arc_recursive()
63    }
64}
65
66/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
67/// It will only send rpc to meta and get the catalog version as response.
68/// Then it will wait for the local catalog to be synced to the version, which is performed by
69/// [observer](`crate::observer::FrontendObserverNode`).
70#[async_trait::async_trait]
71pub trait CatalogWriter: Send + Sync {
72    async fn create_database(
73        &self,
74        db_name: &str,
75        owner: UserId,
76        resource_group: &str,
77        barrier_interval_ms: Option<u32>,
78        checkpoint_frequency: Option<u64>,
79    ) -> Result<()>;
80
81    async fn create_schema(
82        &self,
83        db_id: DatabaseId,
84        schema_name: &str,
85        owner: UserId,
86    ) -> Result<()>;
87
88    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
89
90    async fn create_materialized_view(
91        &self,
92        table: PbTable,
93        graph: StreamFragmentGraph,
94        dependencies: HashSet<ObjectId>,
95        specific_resource_group: Option<String>,
96        if_not_exists: bool,
97    ) -> Result<()>;
98
99    async fn replace_materialized_view(
100        &self,
101        table: PbTable,
102        graph: StreamFragmentGraph,
103    ) -> Result<()>;
104
105    async fn create_table(
106        &self,
107        source: Option<PbSource>,
108        table: PbTable,
109        graph: StreamFragmentGraph,
110        job_type: PbTableJobType,
111        if_not_exists: bool,
112        dependencies: HashSet<ObjectId>,
113    ) -> Result<()>;
114
115    async fn replace_table(
116        &self,
117        source: Option<PbSource>,
118        table: PbTable,
119        graph: StreamFragmentGraph,
120        job_type: TableJobType,
121    ) -> Result<()>;
122
123    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
124
125    async fn create_index(
126        &self,
127        index: PbIndex,
128        table: PbTable,
129        graph: StreamFragmentGraph,
130        if_not_exists: bool,
131    ) -> Result<()>;
132
133    async fn create_source(
134        &self,
135        source: PbSource,
136        graph: Option<StreamFragmentGraph>,
137        if_not_exists: bool,
138    ) -> Result<()>;
139
140    async fn create_sink(
141        &self,
142        sink: PbSink,
143        graph: StreamFragmentGraph,
144        dependencies: HashSet<ObjectId>,
145        if_not_exists: bool,
146    ) -> Result<()>;
147
148    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
149
150    async fn create_function(&self, function: PbFunction) -> Result<()>;
151
152    async fn create_connection(
153        &self,
154        connection_name: String,
155        database_id: u32,
156        schema_id: u32,
157        owner_id: u32,
158        connection: create_connection_request::Payload,
159    ) -> Result<()>;
160
161    async fn create_secret(
162        &self,
163        secret_name: String,
164        database_id: u32,
165        schema_id: u32,
166        owner_id: u32,
167        payload: Vec<u8>,
168    ) -> Result<()>;
169
170    async fn comment_on(&self, comment: PbComment) -> Result<()>;
171
172    async fn drop_table(
173        &self,
174        source_id: Option<u32>,
175        table_id: TableId,
176        cascade: bool,
177    ) -> Result<()>;
178
179    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
180
181    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
182
183    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
184
185    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()>;
186
187    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
188
189    async fn drop_database(&self, database_id: u32) -> Result<()>;
190
191    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
192
193    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
194
195    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
196
197    async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()>;
198
199    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
200
201    async fn alter_secret(
202        &self,
203        secret_id: u32,
204        secret_name: String,
205        database_id: u32,
206        schema_id: u32,
207        owner_id: u32,
208        payload: Vec<u8>,
209    ) -> Result<()>;
210
211    async fn alter_name(
212        &self,
213        object_id: alter_name_request::Object,
214        object_name: &str,
215    ) -> Result<()>;
216
217    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
218
219    /// Replace the source in the catalog.
220    async fn alter_source(&self, source: PbSource) -> Result<()>;
221
222    async fn alter_parallelism(
223        &self,
224        job_id: u32,
225        parallelism: PbTableParallelism,
226        deferred: bool,
227    ) -> Result<()>;
228
229    async fn alter_resource_group(
230        &self,
231        table_id: u32,
232        resource_group: Option<String>,
233        deferred: bool,
234    ) -> Result<()>;
235
236    async fn alter_set_schema(
237        &self,
238        object: alter_set_schema_request::Object,
239        new_schema_id: u32,
240    ) -> Result<()>;
241
242    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
243
244    async fn alter_database_param(
245        &self,
246        database_id: DatabaseId,
247        param: AlterDatabaseParam,
248    ) -> Result<()>;
249
250    async fn create_iceberg_table(
251        &self,
252        table_job_info: PbTableJobInfo,
253        sink_job_info: PbSinkJobInfo,
254        iceberg_source: PbSource,
255        if_not_exists: bool,
256    ) -> Result<()>;
257}
258
259#[derive(Clone)]
260pub struct CatalogWriterImpl {
261    meta_client: MetaClient,
262    catalog_updated_rx: Receiver<CatalogVersion>,
263    hummock_snapshot_manager: HummockSnapshotManagerRef,
264}
265
266#[async_trait::async_trait]
267impl CatalogWriter for CatalogWriterImpl {
268    async fn create_database(
269        &self,
270        db_name: &str,
271        owner: UserId,
272        resource_group: &str,
273        barrier_interval_ms: Option<u32>,
274        checkpoint_frequency: Option<u64>,
275    ) -> Result<()> {
276        let version = self
277            .meta_client
278            .create_database(PbDatabase {
279                name: db_name.to_owned(),
280                id: 0,
281                owner,
282                resource_group: resource_group.to_owned(),
283                barrier_interval_ms,
284                checkpoint_frequency,
285            })
286            .await?;
287        self.wait_version(version).await
288    }
289
290    async fn create_schema(
291        &self,
292        db_id: DatabaseId,
293        schema_name: &str,
294        owner: UserId,
295    ) -> Result<()> {
296        let version = self
297            .meta_client
298            .create_schema(PbSchema {
299                id: 0,
300                name: schema_name.to_owned(),
301                database_id: db_id,
302                owner,
303            })
304            .await?;
305        self.wait_version(version).await
306    }
307
308    // TODO: maybe here to pass a materialize plan node
309    async fn create_materialized_view(
310        &self,
311        table: PbTable,
312        graph: StreamFragmentGraph,
313        dependencies: HashSet<ObjectId>,
314        specific_resource_group: Option<String>,
315        if_not_exists: bool,
316    ) -> Result<()> {
317        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
318        let version = self
319            .meta_client
320            .create_materialized_view(
321                table,
322                graph,
323                dependencies,
324                specific_resource_group,
325                if_not_exists,
326            )
327            .await?;
328        if matches!(create_type, PbCreateType::Foreground) {
329            self.wait_version(version).await?
330        }
331        Ok(())
332    }
333
334    async fn replace_materialized_view(
335        &self,
336        table: PbTable,
337        graph: StreamFragmentGraph,
338    ) -> Result<()> {
339        // TODO: this is a dummy implementation for debugging only.
340        notice_to_user(format!("table: {table:#?}"));
341        notice_to_user(format!("graph: {graph:#?}"));
342
343        let version = self
344            .meta_client
345            .replace_job(
346                graph,
347                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
348            )
349            .await?;
350
351        self.wait_version(version).await
352    }
353
354    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
355        let version = self.meta_client.create_view(view, dependencies).await?;
356        self.wait_version(version).await
357    }
358
359    async fn create_index(
360        &self,
361        index: PbIndex,
362        table: PbTable,
363        graph: StreamFragmentGraph,
364        if_not_exists: bool,
365    ) -> Result<()> {
366        let version = self
367            .meta_client
368            .create_index(index, table, graph, if_not_exists)
369            .await?;
370        self.wait_version(version).await
371    }
372
373    async fn create_table(
374        &self,
375        source: Option<PbSource>,
376        table: PbTable,
377        graph: StreamFragmentGraph,
378        job_type: PbTableJobType,
379        if_not_exists: bool,
380        dependencies: HashSet<ObjectId>,
381    ) -> Result<()> {
382        let version = self
383            .meta_client
384            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
385            .await?;
386        self.wait_version(version).await
387    }
388
389    async fn replace_table(
390        &self,
391        source: Option<PbSource>,
392        table: PbTable,
393        graph: StreamFragmentGraph,
394        job_type: TableJobType,
395    ) -> Result<()> {
396        let version = self
397            .meta_client
398            .replace_job(
399                graph,
400                ReplaceJob::ReplaceTable(ReplaceTable {
401                    source,
402                    table: Some(table),
403                    job_type: job_type as _,
404                }),
405            )
406            .await?;
407        self.wait_version(version).await
408    }
409
410    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
411        let version = self
412            .meta_client
413            .replace_job(
414                graph,
415                ReplaceJob::ReplaceSource(ReplaceSource {
416                    source: Some(source),
417                }),
418            )
419            .await?;
420        self.wait_version(version).await
421    }
422
423    async fn create_source(
424        &self,
425        source: PbSource,
426        graph: Option<StreamFragmentGraph>,
427        if_not_exists: bool,
428    ) -> Result<()> {
429        let version = self
430            .meta_client
431            .create_source(source, graph, if_not_exists)
432            .await?;
433        self.wait_version(version).await
434    }
435
436    async fn create_sink(
437        &self,
438        sink: PbSink,
439        graph: StreamFragmentGraph,
440        dependencies: HashSet<ObjectId>,
441        if_not_exists: bool,
442    ) -> Result<()> {
443        let version = self
444            .meta_client
445            .create_sink(sink, graph, dependencies, if_not_exists)
446            .await?;
447        self.wait_version(version).await
448    }
449
450    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
451        let version = self.meta_client.create_subscription(subscription).await?;
452        self.wait_version(version).await
453    }
454
455    async fn create_function(&self, function: PbFunction) -> Result<()> {
456        let version = self.meta_client.create_function(function).await?;
457        self.wait_version(version).await
458    }
459
460    async fn create_connection(
461        &self,
462        connection_name: String,
463        database_id: u32,
464        schema_id: u32,
465        owner_id: u32,
466        connection: create_connection_request::Payload,
467    ) -> Result<()> {
468        let version = self
469            .meta_client
470            .create_connection(
471                connection_name,
472                database_id,
473                schema_id,
474                owner_id,
475                connection,
476            )
477            .await?;
478        self.wait_version(version).await
479    }
480
481    async fn create_secret(
482        &self,
483        secret_name: String,
484        database_id: u32,
485        schema_id: u32,
486        owner_id: u32,
487        payload: Vec<u8>,
488    ) -> Result<()> {
489        let version = self
490            .meta_client
491            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
492            .await?;
493        self.wait_version(version).await
494    }
495
496    async fn comment_on(&self, comment: PbComment) -> Result<()> {
497        let version = self.meta_client.comment_on(comment).await?;
498        self.wait_version(version).await
499    }
500
501    async fn drop_table(
502        &self,
503        source_id: Option<u32>,
504        table_id: TableId,
505        cascade: bool,
506    ) -> Result<()> {
507        let version = self
508            .meta_client
509            .drop_table(source_id, table_id, cascade)
510            .await?;
511        self.wait_version(version).await
512    }
513
514    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
515        let version = self
516            .meta_client
517            .drop_materialized_view(table_id, cascade)
518            .await?;
519        self.wait_version(version).await
520    }
521
522    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
523        let version = self.meta_client.drop_view(view_id, cascade).await?;
524        self.wait_version(version).await
525    }
526
527    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
528        let version = self.meta_client.drop_source(source_id, cascade).await?;
529        self.wait_version(version).await
530    }
531
532    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
533        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
534        self.wait_version(version).await
535    }
536
537    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
538        let version = self
539            .meta_client
540            .drop_subscription(subscription_id, cascade)
541            .await?;
542        self.wait_version(version).await
543    }
544
545    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
546        let version = self.meta_client.drop_index(index_id, cascade).await?;
547        self.wait_version(version).await
548    }
549
550    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
551        let version = self.meta_client.drop_function(function_id, cascade).await?;
552        self.wait_version(version).await
553    }
554
555    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
556        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
557        self.wait_version(version).await
558    }
559
560    async fn drop_database(&self, database_id: u32) -> Result<()> {
561        let version = self.meta_client.drop_database(database_id).await?;
562        self.wait_version(version).await
563    }
564
565    async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()> {
566        let version = self
567            .meta_client
568            .drop_connection(connection_id, cascade)
569            .await?;
570        self.wait_version(version).await
571    }
572
573    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
574        let version = self.meta_client.drop_secret(secret_id).await?;
575        self.wait_version(version).await
576    }
577
578    async fn alter_name(
579        &self,
580        object_id: alter_name_request::Object,
581        object_name: &str,
582    ) -> Result<()> {
583        let version = self.meta_client.alter_name(object_id, object_name).await?;
584        self.wait_version(version).await
585    }
586
587    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
588        let version = self.meta_client.alter_owner(object, owner_id).await?;
589        self.wait_version(version).await
590    }
591
592    async fn alter_set_schema(
593        &self,
594        object: alter_set_schema_request::Object,
595        new_schema_id: u32,
596    ) -> Result<()> {
597        let version = self
598            .meta_client
599            .alter_set_schema(object, new_schema_id)
600            .await?;
601        self.wait_version(version).await
602    }
603
604    async fn alter_source(&self, source: PbSource) -> Result<()> {
605        let version = self.meta_client.alter_source(source).await?;
606        self.wait_version(version).await
607    }
608
609    async fn alter_parallelism(
610        &self,
611        job_id: u32,
612        parallelism: PbTableParallelism,
613        deferred: bool,
614    ) -> Result<()> {
615        self.meta_client
616            .alter_parallelism(job_id, parallelism, deferred)
617            .await
618            .map_err(|e| anyhow!(e))?;
619
620        Ok(())
621    }
622
623    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
624        let version = self.meta_client.alter_swap_rename(object).await?;
625        self.wait_version(version).await
626    }
627
628    async fn alter_secret(
629        &self,
630        secret_id: u32,
631        secret_name: String,
632        database_id: u32,
633        schema_id: u32,
634        owner_id: u32,
635        payload: Vec<u8>,
636    ) -> Result<()> {
637        let version = self
638            .meta_client
639            .alter_secret(
640                secret_id,
641                secret_name,
642                database_id,
643                schema_id,
644                owner_id,
645                payload,
646            )
647            .await?;
648        self.wait_version(version).await
649    }
650
651    async fn alter_resource_group(
652        &self,
653        table_id: u32,
654        resource_group: Option<String>,
655        deferred: bool,
656    ) -> Result<()> {
657        self.meta_client
658            .alter_resource_group(table_id, resource_group, deferred)
659            .await
660            .map_err(|e| anyhow!(e))?;
661
662        Ok(())
663    }
664
665    async fn alter_database_param(
666        &self,
667        database_id: DatabaseId,
668        param: AlterDatabaseParam,
669    ) -> Result<()> {
670        let version = self
671            .meta_client
672            .alter_database_param(database_id, param)
673            .await
674            .map_err(|e| anyhow!(e))?;
675        self.wait_version(version).await
676    }
677
678    async fn create_iceberg_table(
679        &self,
680        table_job_info: PbTableJobInfo,
681        sink_job_info: PbSinkJobInfo,
682        iceberg_source: PbSource,
683        if_not_exists: bool,
684    ) -> Result<()> {
685        let version = self
686            .meta_client
687            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
688            .await?;
689        self.wait_version(version).await
690    }
691}
692
693impl CatalogWriterImpl {
694    pub fn new(
695        meta_client: MetaClient,
696        catalog_updated_rx: Receiver<CatalogVersion>,
697        hummock_snapshot_manager: HummockSnapshotManagerRef,
698    ) -> Self {
699        Self {
700            meta_client,
701            catalog_updated_rx,
702            hummock_snapshot_manager,
703        }
704    }
705
706    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
707        let mut rx = self.catalog_updated_rx.clone();
708        while *rx.borrow_and_update() < version.catalog_version {
709            rx.changed().await.map_err(|e| anyhow!(e))?;
710        }
711        self.hummock_snapshot_manager
712            .wait(HummockVersionId::new(version.hummock_version_id))
713            .await;
714        Ok(())
715    }
716}