risingwave_frontend/catalog/
catalog_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{
30    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
31};
32use risingwave_pb::ddl_service::{
33    PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
34    alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
35    create_connection_request,
36};
37use risingwave_pb::meta::PbTableParallelism;
38use risingwave_pb::stream_plan::StreamFragmentGraph;
39use risingwave_rpc_client::MetaClient;
40use tokio::sync::watch::Receiver;
41
42use super::root_catalog::Catalog;
43use super::{DatabaseId, SecretId, TableId};
44use crate::error::Result;
45use crate::scheduler::HummockSnapshotManagerRef;
46use crate::session::current::notice_to_user;
47use crate::user::UserId;
48
49pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
50
51/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
52#[derive(Clone)]
53pub struct CatalogReader(Arc<RwLock<Catalog>>);
54
55impl CatalogReader {
56    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
57        CatalogReader(inner)
58    }
59
60    pub fn read_guard(&self) -> CatalogReadGuard {
61        // Make this recursive so that one can get this guard in the same thread without fear.
62        self.0.read_arc_recursive()
63    }
64}
65
66/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
67/// It will only send rpc to meta and get the catalog version as response.
68/// Then it will wait for the local catalog to be synced to the version, which is performed by
69/// [observer](`crate::observer::FrontendObserverNode`).
70#[async_trait::async_trait]
71pub trait CatalogWriter: Send + Sync {
72    async fn create_database(
73        &self,
74        db_name: &str,
75        owner: UserId,
76        resource_group: &str,
77        barrier_interval_ms: Option<u32>,
78        checkpoint_frequency: Option<u64>,
79    ) -> Result<()>;
80
81    async fn create_schema(
82        &self,
83        db_id: DatabaseId,
84        schema_name: &str,
85        owner: UserId,
86    ) -> Result<()>;
87
88    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
89
90    async fn create_materialized_view(
91        &self,
92        table: PbTable,
93        graph: StreamFragmentGraph,
94        dependencies: HashSet<ObjectId>,
95        specific_resource_group: Option<String>,
96        if_not_exists: bool,
97    ) -> Result<()>;
98
99    async fn replace_materialized_view(
100        &self,
101        table: PbTable,
102        graph: StreamFragmentGraph,
103    ) -> Result<()>;
104
105    async fn create_table(
106        &self,
107        source: Option<PbSource>,
108        table: PbTable,
109        graph: StreamFragmentGraph,
110        job_type: PbTableJobType,
111        if_not_exists: bool,
112        dependencies: HashSet<ObjectId>,
113    ) -> Result<()>;
114
115    async fn replace_table(
116        &self,
117        source: Option<PbSource>,
118        table: PbTable,
119        graph: StreamFragmentGraph,
120        job_type: TableJobType,
121    ) -> Result<()>;
122
123    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
124
125    async fn create_index(
126        &self,
127        index: PbIndex,
128        table: PbTable,
129        graph: StreamFragmentGraph,
130        if_not_exists: bool,
131    ) -> Result<()>;
132
133    async fn create_source(
134        &self,
135        source: PbSource,
136        graph: Option<StreamFragmentGraph>,
137        if_not_exists: bool,
138    ) -> Result<()>;
139
140    async fn create_sink(
141        &self,
142        sink: PbSink,
143        graph: StreamFragmentGraph,
144        affected_table_change: Option<PbReplaceJobPlan>,
145        dependencies: HashSet<ObjectId>,
146        if_not_exists: bool,
147    ) -> Result<()>;
148
149    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
150
151    async fn create_function(&self, function: PbFunction) -> Result<()>;
152
153    async fn create_connection(
154        &self,
155        connection_name: String,
156        database_id: u32,
157        schema_id: u32,
158        owner_id: u32,
159        connection: create_connection_request::Payload,
160    ) -> Result<()>;
161
162    async fn create_secret(
163        &self,
164        secret_name: String,
165        database_id: u32,
166        schema_id: u32,
167        owner_id: u32,
168        payload: Vec<u8>,
169    ) -> Result<()>;
170
171    async fn comment_on(&self, comment: PbComment) -> Result<()>;
172
173    async fn drop_table(
174        &self,
175        source_id: Option<u32>,
176        table_id: TableId,
177        cascade: bool,
178    ) -> Result<()>;
179
180    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
181
182    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
183
184    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
185
186    async fn drop_sink(
187        &self,
188        sink_id: u32,
189        cascade: bool,
190        affected_table_change: Option<PbReplaceJobPlan>,
191    ) -> Result<()>;
192
193    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
194
195    async fn drop_database(&self, database_id: u32) -> Result<()>;
196
197    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
198
199    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
200
201    async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
202
203    async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()>;
204
205    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
206
207    async fn alter_secret(
208        &self,
209        secret_id: u32,
210        secret_name: String,
211        database_id: u32,
212        schema_id: u32,
213        owner_id: u32,
214        payload: Vec<u8>,
215    ) -> Result<()>;
216
217    async fn alter_name(
218        &self,
219        object_id: alter_name_request::Object,
220        object_name: &str,
221    ) -> Result<()>;
222
223    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
224
225    /// Replace the source in the catalog.
226    async fn alter_source(&self, source: PbSource) -> Result<()>;
227
228    async fn alter_parallelism(
229        &self,
230        job_id: u32,
231        parallelism: PbTableParallelism,
232        deferred: bool,
233    ) -> Result<()>;
234
235    async fn alter_resource_group(
236        &self,
237        table_id: u32,
238        resource_group: Option<String>,
239        deferred: bool,
240    ) -> Result<()>;
241
242    async fn alter_set_schema(
243        &self,
244        object: alter_set_schema_request::Object,
245        new_schema_id: u32,
246    ) -> Result<()>;
247
248    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
249
250    async fn alter_database_param(
251        &self,
252        database_id: DatabaseId,
253        param: AlterDatabaseParam,
254    ) -> Result<()>;
255}
256
257#[derive(Clone)]
258pub struct CatalogWriterImpl {
259    meta_client: MetaClient,
260    catalog_updated_rx: Receiver<CatalogVersion>,
261    hummock_snapshot_manager: HummockSnapshotManagerRef,
262}
263
264#[async_trait::async_trait]
265impl CatalogWriter for CatalogWriterImpl {
266    async fn create_database(
267        &self,
268        db_name: &str,
269        owner: UserId,
270        resource_group: &str,
271        barrier_interval_ms: Option<u32>,
272        checkpoint_frequency: Option<u64>,
273    ) -> Result<()> {
274        let version = self
275            .meta_client
276            .create_database(PbDatabase {
277                name: db_name.to_owned(),
278                id: 0,
279                owner,
280                resource_group: resource_group.to_owned(),
281                barrier_interval_ms,
282                checkpoint_frequency,
283            })
284            .await?;
285        self.wait_version(version).await
286    }
287
288    async fn create_schema(
289        &self,
290        db_id: DatabaseId,
291        schema_name: &str,
292        owner: UserId,
293    ) -> Result<()> {
294        let version = self
295            .meta_client
296            .create_schema(PbSchema {
297                id: 0,
298                name: schema_name.to_owned(),
299                database_id: db_id,
300                owner,
301            })
302            .await?;
303        self.wait_version(version).await
304    }
305
306    // TODO: maybe here to pass a materialize plan node
307    async fn create_materialized_view(
308        &self,
309        table: PbTable,
310        graph: StreamFragmentGraph,
311        dependencies: HashSet<ObjectId>,
312        specific_resource_group: Option<String>,
313        if_not_exists: bool,
314    ) -> Result<()> {
315        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
316        let version = self
317            .meta_client
318            .create_materialized_view(
319                table,
320                graph,
321                dependencies,
322                specific_resource_group,
323                if_not_exists,
324            )
325            .await?;
326        if matches!(create_type, PbCreateType::Foreground) {
327            self.wait_version(version).await?
328        }
329        Ok(())
330    }
331
332    async fn replace_materialized_view(
333        &self,
334        table: PbTable,
335        graph: StreamFragmentGraph,
336    ) -> Result<()> {
337        // TODO: this is a dummy implementation for debugging only.
338        notice_to_user(format!("table: {table:#?}"));
339        notice_to_user(format!("graph: {graph:#?}"));
340
341        let version = self
342            .meta_client
343            .replace_job(
344                graph,
345                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
346            )
347            .await?;
348
349        self.wait_version(version).await
350    }
351
352    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
353        let version = self.meta_client.create_view(view, dependencies).await?;
354        self.wait_version(version).await
355    }
356
357    async fn create_index(
358        &self,
359        index: PbIndex,
360        table: PbTable,
361        graph: StreamFragmentGraph,
362        if_not_exists: bool,
363    ) -> Result<()> {
364        let version = self
365            .meta_client
366            .create_index(index, table, graph, if_not_exists)
367            .await?;
368        self.wait_version(version).await
369    }
370
371    async fn create_table(
372        &self,
373        source: Option<PbSource>,
374        table: PbTable,
375        graph: StreamFragmentGraph,
376        job_type: PbTableJobType,
377        if_not_exists: bool,
378        dependencies: HashSet<ObjectId>,
379    ) -> Result<()> {
380        let version = self
381            .meta_client
382            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
383            .await?;
384        self.wait_version(version).await
385    }
386
387    async fn replace_table(
388        &self,
389        source: Option<PbSource>,
390        table: PbTable,
391        graph: StreamFragmentGraph,
392        job_type: TableJobType,
393    ) -> Result<()> {
394        let version = self
395            .meta_client
396            .replace_job(
397                graph,
398                ReplaceJob::ReplaceTable(ReplaceTable {
399                    source,
400                    table: Some(table),
401                    job_type: job_type as _,
402                }),
403            )
404            .await?;
405        self.wait_version(version).await
406    }
407
408    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
409        let version = self
410            .meta_client
411            .replace_job(
412                graph,
413                ReplaceJob::ReplaceSource(ReplaceSource {
414                    source: Some(source),
415                }),
416            )
417            .await?;
418        self.wait_version(version).await
419    }
420
421    async fn create_source(
422        &self,
423        source: PbSource,
424        graph: Option<StreamFragmentGraph>,
425        if_not_exists: bool,
426    ) -> Result<()> {
427        let version = self
428            .meta_client
429            .create_source(source, graph, if_not_exists)
430            .await?;
431        self.wait_version(version).await
432    }
433
434    async fn create_sink(
435        &self,
436        sink: PbSink,
437        graph: StreamFragmentGraph,
438        affected_table_change: Option<ReplaceJobPlan>,
439        dependencies: HashSet<ObjectId>,
440        if_not_exists: bool,
441    ) -> Result<()> {
442        let version = self
443            .meta_client
444            .create_sink(
445                sink,
446                graph,
447                affected_table_change,
448                dependencies,
449                if_not_exists,
450            )
451            .await?;
452        self.wait_version(version).await
453    }
454
455    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
456        let version = self.meta_client.create_subscription(subscription).await?;
457        self.wait_version(version).await
458    }
459
460    async fn create_function(&self, function: PbFunction) -> Result<()> {
461        let version = self.meta_client.create_function(function).await?;
462        self.wait_version(version).await
463    }
464
465    async fn create_connection(
466        &self,
467        connection_name: String,
468        database_id: u32,
469        schema_id: u32,
470        owner_id: u32,
471        connection: create_connection_request::Payload,
472    ) -> Result<()> {
473        let version = self
474            .meta_client
475            .create_connection(
476                connection_name,
477                database_id,
478                schema_id,
479                owner_id,
480                connection,
481            )
482            .await?;
483        self.wait_version(version).await
484    }
485
486    async fn create_secret(
487        &self,
488        secret_name: String,
489        database_id: u32,
490        schema_id: u32,
491        owner_id: u32,
492        payload: Vec<u8>,
493    ) -> Result<()> {
494        let version = self
495            .meta_client
496            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
497            .await?;
498        self.wait_version(version).await
499    }
500
501    async fn comment_on(&self, comment: PbComment) -> Result<()> {
502        let version = self.meta_client.comment_on(comment).await?;
503        self.wait_version(version).await
504    }
505
506    async fn drop_table(
507        &self,
508        source_id: Option<u32>,
509        table_id: TableId,
510        cascade: bool,
511    ) -> Result<()> {
512        let version = self
513            .meta_client
514            .drop_table(source_id, table_id, cascade)
515            .await?;
516        self.wait_version(version).await
517    }
518
519    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
520        let version = self
521            .meta_client
522            .drop_materialized_view(table_id, cascade)
523            .await?;
524        self.wait_version(version).await
525    }
526
527    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
528        let version = self.meta_client.drop_view(view_id, cascade).await?;
529        self.wait_version(version).await
530    }
531
532    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
533        let version = self.meta_client.drop_source(source_id, cascade).await?;
534        self.wait_version(version).await
535    }
536
537    async fn drop_sink(
538        &self,
539        sink_id: u32,
540        cascade: bool,
541        affected_table_change: Option<ReplaceJobPlan>,
542    ) -> Result<()> {
543        let version = self
544            .meta_client
545            .drop_sink(sink_id, cascade, affected_table_change)
546            .await?;
547        self.wait_version(version).await
548    }
549
550    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
551        let version = self
552            .meta_client
553            .drop_subscription(subscription_id, cascade)
554            .await?;
555        self.wait_version(version).await
556    }
557
558    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
559        let version = self.meta_client.drop_index(index_id, cascade).await?;
560        self.wait_version(version).await
561    }
562
563    async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
564        let version = self.meta_client.drop_function(function_id).await?;
565        self.wait_version(version).await
566    }
567
568    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
569        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
570        self.wait_version(version).await
571    }
572
573    async fn drop_database(&self, database_id: u32) -> Result<()> {
574        let version = self.meta_client.drop_database(database_id).await?;
575        self.wait_version(version).await
576    }
577
578    async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()> {
579        let version = self
580            .meta_client
581            .drop_connection(connection_id, cascade)
582            .await?;
583        self.wait_version(version).await
584    }
585
586    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
587        let version = self.meta_client.drop_secret(secret_id).await?;
588        self.wait_version(version).await
589    }
590
591    async fn alter_name(
592        &self,
593        object_id: alter_name_request::Object,
594        object_name: &str,
595    ) -> Result<()> {
596        let version = self.meta_client.alter_name(object_id, object_name).await?;
597        self.wait_version(version).await
598    }
599
600    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
601        let version = self.meta_client.alter_owner(object, owner_id).await?;
602        self.wait_version(version).await
603    }
604
605    async fn alter_set_schema(
606        &self,
607        object: alter_set_schema_request::Object,
608        new_schema_id: u32,
609    ) -> Result<()> {
610        let version = self
611            .meta_client
612            .alter_set_schema(object, new_schema_id)
613            .await?;
614        self.wait_version(version).await
615    }
616
617    async fn alter_source(&self, source: PbSource) -> Result<()> {
618        let version = self.meta_client.alter_source(source).await?;
619        self.wait_version(version).await
620    }
621
622    async fn alter_parallelism(
623        &self,
624        job_id: u32,
625        parallelism: PbTableParallelism,
626        deferred: bool,
627    ) -> Result<()> {
628        self.meta_client
629            .alter_parallelism(job_id, parallelism, deferred)
630            .await
631            .map_err(|e| anyhow!(e))?;
632
633        Ok(())
634    }
635
636    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
637        let version = self.meta_client.alter_swap_rename(object).await?;
638        self.wait_version(version).await
639    }
640
641    async fn alter_secret(
642        &self,
643        secret_id: u32,
644        secret_name: String,
645        database_id: u32,
646        schema_id: u32,
647        owner_id: u32,
648        payload: Vec<u8>,
649    ) -> Result<()> {
650        let version = self
651            .meta_client
652            .alter_secret(
653                secret_id,
654                secret_name,
655                database_id,
656                schema_id,
657                owner_id,
658                payload,
659            )
660            .await?;
661        self.wait_version(version).await
662    }
663
664    async fn alter_resource_group(
665        &self,
666        table_id: u32,
667        resource_group: Option<String>,
668        deferred: bool,
669    ) -> Result<()> {
670        self.meta_client
671            .alter_resource_group(table_id, resource_group, deferred)
672            .await
673            .map_err(|e| anyhow!(e))?;
674
675        Ok(())
676    }
677
678    async fn alter_database_param(
679        &self,
680        database_id: DatabaseId,
681        param: AlterDatabaseParam,
682    ) -> Result<()> {
683        let version = self
684            .meta_client
685            .alter_database_param(database_id, param)
686            .await
687            .map_err(|e| anyhow!(e))?;
688        self.wait_version(version).await
689    }
690}
691
692impl CatalogWriterImpl {
693    pub fn new(
694        meta_client: MetaClient,
695        catalog_updated_rx: Receiver<CatalogVersion>,
696        hummock_snapshot_manager: HummockSnapshotManagerRef,
697    ) -> Self {
698        Self {
699            meta_client,
700            catalog_updated_rx,
701            hummock_snapshot_manager,
702        }
703    }
704
705    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
706        let mut rx = self.catalog_updated_rx.clone();
707        while *rx.borrow_and_update() < version.catalog_version {
708            rx.changed().await.map_err(|e| anyhow!(e))?;
709        }
710        self.hummock_snapshot_manager
711            .wait(HummockVersionId::new(version.hummock_version_id))
712            .await;
713        Ok(())
714    }
715}