risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{
30    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
31};
32use risingwave_pb::ddl_service::{
33    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
34    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
35};
36use risingwave_pb::meta::PbTableParallelism;
37use risingwave_pb::stream_plan::StreamFragmentGraph;
38use risingwave_rpc_client::MetaClient;
39use tokio::sync::watch::Receiver;
40
41use super::root_catalog::Catalog;
42use super::{DatabaseId, SecretId, TableId};
43use crate::error::Result;
44use crate::scheduler::HummockSnapshotManagerRef;
45use crate::session::current::notice_to_user;
46use crate::user::UserId;
47
48pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
49
50/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
51#[derive(Clone)]
52pub struct CatalogReader(Arc<RwLock<Catalog>>);
53
54impl CatalogReader {
55    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
56        CatalogReader(inner)
57    }
58
59    pub fn read_guard(&self) -> CatalogReadGuard {
60        // Make this recursive so that one can get this guard in the same thread without fear.
61        self.0.read_arc_recursive()
62    }
63}
64
65/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
66/// It will only send rpc to meta and get the catalog version as response.
67/// Then it will wait for the local catalog to be synced to the version, which is performed by
68/// [observer](`crate::observer::FrontendObserverNode`).
69#[async_trait::async_trait]
70pub trait CatalogWriter: Send + Sync {
71    async fn create_database(
72        &self,
73        db_name: &str,
74        owner: UserId,
75        resource_group: &str,
76        barrier_interval_ms: Option<u32>,
77        checkpoint_frequency: Option<u64>,
78    ) -> Result<()>;
79
80    async fn create_schema(
81        &self,
82        db_id: DatabaseId,
83        schema_name: &str,
84        owner: UserId,
85    ) -> Result<()>;
86
87    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
88
89    async fn create_materialized_view(
90        &self,
91        table: PbTable,
92        graph: StreamFragmentGraph,
93        dependencies: HashSet<ObjectId>,
94        specific_resource_group: Option<String>,
95        if_not_exists: bool,
96    ) -> Result<()>;
97
98    async fn replace_materialized_view(
99        &self,
100        table: PbTable,
101        graph: StreamFragmentGraph,
102    ) -> Result<()>;
103
104    async fn create_table(
105        &self,
106        source: Option<PbSource>,
107        table: PbTable,
108        graph: StreamFragmentGraph,
109        job_type: PbTableJobType,
110        if_not_exists: bool,
111        dependencies: HashSet<ObjectId>,
112    ) -> Result<()>;
113
114    async fn replace_table(
115        &self,
116        source: Option<PbSource>,
117        table: PbTable,
118        graph: StreamFragmentGraph,
119        job_type: TableJobType,
120    ) -> Result<()>;
121
122    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
123
124    async fn create_index(
125        &self,
126        index: PbIndex,
127        table: PbTable,
128        graph: StreamFragmentGraph,
129        if_not_exists: bool,
130    ) -> Result<()>;
131
132    async fn create_source(
133        &self,
134        source: PbSource,
135        graph: Option<StreamFragmentGraph>,
136        if_not_exists: bool,
137    ) -> Result<()>;
138
139    async fn create_sink(
140        &self,
141        sink: PbSink,
142        graph: StreamFragmentGraph,
143        dependencies: HashSet<ObjectId>,
144        if_not_exists: bool,
145    ) -> Result<()>;
146
147    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
148
149    async fn create_function(&self, function: PbFunction) -> Result<()>;
150
151    async fn create_connection(
152        &self,
153        connection_name: String,
154        database_id: u32,
155        schema_id: u32,
156        owner_id: u32,
157        connection: create_connection_request::Payload,
158    ) -> Result<()>;
159
160    async fn create_secret(
161        &self,
162        secret_name: String,
163        database_id: u32,
164        schema_id: u32,
165        owner_id: u32,
166        payload: Vec<u8>,
167    ) -> Result<()>;
168
169    async fn comment_on(&self, comment: PbComment) -> Result<()>;
170
171    async fn drop_table(
172        &self,
173        source_id: Option<u32>,
174        table_id: TableId,
175        cascade: bool,
176    ) -> Result<()>;
177
178    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
179
180    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
181
182    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
183
184    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()>;
185
186    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
187
188    async fn drop_database(&self, database_id: u32) -> Result<()>;
189
190    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
191
192    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
193
194    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
195
196    async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()>;
197
198    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
199
200    async fn alter_secret(
201        &self,
202        secret_id: u32,
203        secret_name: String,
204        database_id: u32,
205        schema_id: u32,
206        owner_id: u32,
207        payload: Vec<u8>,
208    ) -> Result<()>;
209
210    async fn alter_name(
211        &self,
212        object_id: alter_name_request::Object,
213        object_name: &str,
214    ) -> Result<()>;
215
216    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
217
218    /// Replace the source in the catalog.
219    async fn alter_source(&self, source: PbSource) -> Result<()>;
220
221    async fn alter_parallelism(
222        &self,
223        job_id: u32,
224        parallelism: PbTableParallelism,
225        deferred: bool,
226    ) -> Result<()>;
227
228    async fn alter_resource_group(
229        &self,
230        table_id: u32,
231        resource_group: Option<String>,
232        deferred: bool,
233    ) -> Result<()>;
234
235    async fn alter_set_schema(
236        &self,
237        object: alter_set_schema_request::Object,
238        new_schema_id: u32,
239    ) -> Result<()>;
240
241    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
242
243    async fn alter_database_param(
244        &self,
245        database_id: DatabaseId,
246        param: AlterDatabaseParam,
247    ) -> Result<()>;
248}
249
250#[derive(Clone)]
251pub struct CatalogWriterImpl {
252    meta_client: MetaClient,
253    catalog_updated_rx: Receiver<CatalogVersion>,
254    hummock_snapshot_manager: HummockSnapshotManagerRef,
255}
256
257#[async_trait::async_trait]
258impl CatalogWriter for CatalogWriterImpl {
259    async fn create_database(
260        &self,
261        db_name: &str,
262        owner: UserId,
263        resource_group: &str,
264        barrier_interval_ms: Option<u32>,
265        checkpoint_frequency: Option<u64>,
266    ) -> Result<()> {
267        let version = self
268            .meta_client
269            .create_database(PbDatabase {
270                name: db_name.to_owned(),
271                id: 0,
272                owner,
273                resource_group: resource_group.to_owned(),
274                barrier_interval_ms,
275                checkpoint_frequency,
276            })
277            .await?;
278        self.wait_version(version).await
279    }
280
281    async fn create_schema(
282        &self,
283        db_id: DatabaseId,
284        schema_name: &str,
285        owner: UserId,
286    ) -> Result<()> {
287        let version = self
288            .meta_client
289            .create_schema(PbSchema {
290                id: 0,
291                name: schema_name.to_owned(),
292                database_id: db_id,
293                owner,
294            })
295            .await?;
296        self.wait_version(version).await
297    }
298
299    // TODO: maybe here to pass a materialize plan node
300    async fn create_materialized_view(
301        &self,
302        table: PbTable,
303        graph: StreamFragmentGraph,
304        dependencies: HashSet<ObjectId>,
305        specific_resource_group: Option<String>,
306        if_not_exists: bool,
307    ) -> Result<()> {
308        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
309        let version = self
310            .meta_client
311            .create_materialized_view(
312                table,
313                graph,
314                dependencies,
315                specific_resource_group,
316                if_not_exists,
317            )
318            .await?;
319        if matches!(create_type, PbCreateType::Foreground) {
320            self.wait_version(version).await?
321        }
322        Ok(())
323    }
324
325    async fn replace_materialized_view(
326        &self,
327        table: PbTable,
328        graph: StreamFragmentGraph,
329    ) -> Result<()> {
330        // TODO: this is a dummy implementation for debugging only.
331        notice_to_user(format!("table: {table:#?}"));
332        notice_to_user(format!("graph: {graph:#?}"));
333
334        let version = self
335            .meta_client
336            .replace_job(
337                graph,
338                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
339            )
340            .await?;
341
342        self.wait_version(version).await
343    }
344
345    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
346        let version = self.meta_client.create_view(view, dependencies).await?;
347        self.wait_version(version).await
348    }
349
350    async fn create_index(
351        &self,
352        index: PbIndex,
353        table: PbTable,
354        graph: StreamFragmentGraph,
355        if_not_exists: bool,
356    ) -> Result<()> {
357        let version = self
358            .meta_client
359            .create_index(index, table, graph, if_not_exists)
360            .await?;
361        self.wait_version(version).await
362    }
363
364    async fn create_table(
365        &self,
366        source: Option<PbSource>,
367        table: PbTable,
368        graph: StreamFragmentGraph,
369        job_type: PbTableJobType,
370        if_not_exists: bool,
371        dependencies: HashSet<ObjectId>,
372    ) -> Result<()> {
373        let version = self
374            .meta_client
375            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
376            .await?;
377        self.wait_version(version).await
378    }
379
380    async fn replace_table(
381        &self,
382        source: Option<PbSource>,
383        table: PbTable,
384        graph: StreamFragmentGraph,
385        job_type: TableJobType,
386    ) -> Result<()> {
387        let version = self
388            .meta_client
389            .replace_job(
390                graph,
391                ReplaceJob::ReplaceTable(ReplaceTable {
392                    source,
393                    table: Some(table),
394                    job_type: job_type as _,
395                }),
396            )
397            .await?;
398        self.wait_version(version).await
399    }
400
401    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
402        let version = self
403            .meta_client
404            .replace_job(
405                graph,
406                ReplaceJob::ReplaceSource(ReplaceSource {
407                    source: Some(source),
408                }),
409            )
410            .await?;
411        self.wait_version(version).await
412    }
413
414    async fn create_source(
415        &self,
416        source: PbSource,
417        graph: Option<StreamFragmentGraph>,
418        if_not_exists: bool,
419    ) -> Result<()> {
420        let version = self
421            .meta_client
422            .create_source(source, graph, if_not_exists)
423            .await?;
424        self.wait_version(version).await
425    }
426
427    async fn create_sink(
428        &self,
429        sink: PbSink,
430        graph: StreamFragmentGraph,
431        dependencies: HashSet<ObjectId>,
432        if_not_exists: bool,
433    ) -> Result<()> {
434        let version = self
435            .meta_client
436            .create_sink(sink, graph, dependencies, if_not_exists)
437            .await?;
438        self.wait_version(version).await
439    }
440
441    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
442        let version = self.meta_client.create_subscription(subscription).await?;
443        self.wait_version(version).await
444    }
445
446    async fn create_function(&self, function: PbFunction) -> Result<()> {
447        let version = self.meta_client.create_function(function).await?;
448        self.wait_version(version).await
449    }
450
451    async fn create_connection(
452        &self,
453        connection_name: String,
454        database_id: u32,
455        schema_id: u32,
456        owner_id: u32,
457        connection: create_connection_request::Payload,
458    ) -> Result<()> {
459        let version = self
460            .meta_client
461            .create_connection(
462                connection_name,
463                database_id,
464                schema_id,
465                owner_id,
466                connection,
467            )
468            .await?;
469        self.wait_version(version).await
470    }
471
472    async fn create_secret(
473        &self,
474        secret_name: String,
475        database_id: u32,
476        schema_id: u32,
477        owner_id: u32,
478        payload: Vec<u8>,
479    ) -> Result<()> {
480        let version = self
481            .meta_client
482            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
483            .await?;
484        self.wait_version(version).await
485    }
486
487    async fn comment_on(&self, comment: PbComment) -> Result<()> {
488        let version = self.meta_client.comment_on(comment).await?;
489        self.wait_version(version).await
490    }
491
492    async fn drop_table(
493        &self,
494        source_id: Option<u32>,
495        table_id: TableId,
496        cascade: bool,
497    ) -> Result<()> {
498        let version = self
499            .meta_client
500            .drop_table(source_id, table_id, cascade)
501            .await?;
502        self.wait_version(version).await
503    }
504
505    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
506        let version = self
507            .meta_client
508            .drop_materialized_view(table_id, cascade)
509            .await?;
510        self.wait_version(version).await
511    }
512
513    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
514        let version = self.meta_client.drop_view(view_id, cascade).await?;
515        self.wait_version(version).await
516    }
517
518    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
519        let version = self.meta_client.drop_source(source_id, cascade).await?;
520        self.wait_version(version).await
521    }
522
523    async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
524        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
525        self.wait_version(version).await
526    }
527
528    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
529        let version = self
530            .meta_client
531            .drop_subscription(subscription_id, cascade)
532            .await?;
533        self.wait_version(version).await
534    }
535
536    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
537        let version = self.meta_client.drop_index(index_id, cascade).await?;
538        self.wait_version(version).await
539    }
540
541    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
542        let version = self.meta_client.drop_function(function_id, cascade).await?;
543        self.wait_version(version).await
544    }
545
546    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
547        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
548        self.wait_version(version).await
549    }
550
551    async fn drop_database(&self, database_id: u32) -> Result<()> {
552        let version = self.meta_client.drop_database(database_id).await?;
553        self.wait_version(version).await
554    }
555
556    async fn drop_connection(&self, connection_id: u32, cascade: bool) -> Result<()> {
557        let version = self
558            .meta_client
559            .drop_connection(connection_id, cascade)
560            .await?;
561        self.wait_version(version).await
562    }
563
564    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
565        let version = self.meta_client.drop_secret(secret_id).await?;
566        self.wait_version(version).await
567    }
568
569    async fn alter_name(
570        &self,
571        object_id: alter_name_request::Object,
572        object_name: &str,
573    ) -> Result<()> {
574        let version = self.meta_client.alter_name(object_id, object_name).await?;
575        self.wait_version(version).await
576    }
577
578    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
579        let version = self.meta_client.alter_owner(object, owner_id).await?;
580        self.wait_version(version).await
581    }
582
583    async fn alter_set_schema(
584        &self,
585        object: alter_set_schema_request::Object,
586        new_schema_id: u32,
587    ) -> Result<()> {
588        let version = self
589            .meta_client
590            .alter_set_schema(object, new_schema_id)
591            .await?;
592        self.wait_version(version).await
593    }
594
595    async fn alter_source(&self, source: PbSource) -> Result<()> {
596        let version = self.meta_client.alter_source(source).await?;
597        self.wait_version(version).await
598    }
599
600    async fn alter_parallelism(
601        &self,
602        job_id: u32,
603        parallelism: PbTableParallelism,
604        deferred: bool,
605    ) -> Result<()> {
606        self.meta_client
607            .alter_parallelism(job_id, parallelism, deferred)
608            .await
609            .map_err(|e| anyhow!(e))?;
610
611        Ok(())
612    }
613
614    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
615        let version = self.meta_client.alter_swap_rename(object).await?;
616        self.wait_version(version).await
617    }
618
619    async fn alter_secret(
620        &self,
621        secret_id: u32,
622        secret_name: String,
623        database_id: u32,
624        schema_id: u32,
625        owner_id: u32,
626        payload: Vec<u8>,
627    ) -> Result<()> {
628        let version = self
629            .meta_client
630            .alter_secret(
631                secret_id,
632                secret_name,
633                database_id,
634                schema_id,
635                owner_id,
636                payload,
637            )
638            .await?;
639        self.wait_version(version).await
640    }
641
642    async fn alter_resource_group(
643        &self,
644        table_id: u32,
645        resource_group: Option<String>,
646        deferred: bool,
647    ) -> Result<()> {
648        self.meta_client
649            .alter_resource_group(table_id, resource_group, deferred)
650            .await
651            .map_err(|e| anyhow!(e))?;
652
653        Ok(())
654    }
655
656    async fn alter_database_param(
657        &self,
658        database_id: DatabaseId,
659        param: AlterDatabaseParam,
660    ) -> Result<()> {
661        let version = self
662            .meta_client
663            .alter_database_param(database_id, param)
664            .await
665            .map_err(|e| anyhow!(e))?;
666        self.wait_version(version).await
667    }
668}
669
670impl CatalogWriterImpl {
671    pub fn new(
672        meta_client: MetaClient,
673        catalog_updated_rx: Receiver<CatalogVersion>,
674        hummock_snapshot_manager: HummockSnapshotManagerRef,
675    ) -> Self {
676        Self {
677            meta_client,
678            catalog_updated_rx,
679            hummock_snapshot_manager,
680        }
681    }
682
683    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
684        let mut rx = self.catalog_updated_rx.clone();
685        while *rx.borrow_and_update() < version.catalog_version {
686            rx.changed().await.map_err(|e| anyhow!(e))?;
687        }
688        self.hummock_snapshot_manager
689            .wait(HummockVersionId::new(version.hummock_version_id))
690            .await;
691        Ok(())
692    }
693}