risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{
30    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
31};
32use risingwave_pb::ddl_service::{
33    PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
34    alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
35    create_connection_request,
36};
37use risingwave_pb::meta::PbTableParallelism;
38use risingwave_pb::stream_plan::StreamFragmentGraph;
39use risingwave_rpc_client::MetaClient;
40use tokio::sync::watch::Receiver;
41
42use super::root_catalog::Catalog;
43use super::{DatabaseId, SecretId, TableId};
44use crate::error::Result;
45use crate::scheduler::HummockSnapshotManagerRef;
46use crate::session::current::notice_to_user;
47use crate::user::UserId;
48
49pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
50
51/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
52#[derive(Clone)]
53pub struct CatalogReader(Arc<RwLock<Catalog>>);
54
55impl CatalogReader {
56    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
57        CatalogReader(inner)
58    }
59
60    pub fn read_guard(&self) -> CatalogReadGuard {
61        // Make this recursive so that one can get this guard in the same thread without fear.
62        self.0.read_arc_recursive()
63    }
64}
65
66/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
67/// It will only send rpc to meta and get the catalog version as response.
68/// Then it will wait for the local catalog to be synced to the version, which is performed by
69/// [observer](`crate::observer::FrontendObserverNode`).
70#[async_trait::async_trait]
71pub trait CatalogWriter: Send + Sync {
72    async fn create_database(
73        &self,
74        db_name: &str,
75        owner: UserId,
76        resource_group: &str,
77        barrier_interval_ms: Option<u32>,
78        checkpoint_frequency: Option<u64>,
79    ) -> Result<()>;
80
81    async fn create_schema(
82        &self,
83        db_id: DatabaseId,
84        schema_name: &str,
85        owner: UserId,
86    ) -> Result<()>;
87
88    async fn create_view(&self, view: PbView) -> Result<()>;
89
90    async fn create_materialized_view(
91        &self,
92        table: PbTable,
93        graph: StreamFragmentGraph,
94        dependencies: HashSet<ObjectId>,
95        specific_resource_group: Option<String>,
96        if_not_exists: bool,
97    ) -> Result<()>;
98
99    async fn replace_materialized_view(
100        &self,
101        table: PbTable,
102        graph: StreamFragmentGraph,
103    ) -> Result<()>;
104
105    async fn create_table(
106        &self,
107        source: Option<PbSource>,
108        table: PbTable,
109        graph: StreamFragmentGraph,
110        job_type: PbTableJobType,
111        if_not_exists: bool,
112    ) -> Result<()>;
113
114    async fn replace_table(
115        &self,
116        source: Option<PbSource>,
117        table: PbTable,
118        graph: StreamFragmentGraph,
119        job_type: TableJobType,
120    ) -> Result<()>;
121
122    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
123
124    async fn create_index(
125        &self,
126        index: PbIndex,
127        table: PbTable,
128        graph: StreamFragmentGraph,
129        if_not_exists: bool,
130    ) -> Result<()>;
131
132    async fn create_source(
133        &self,
134        source: PbSource,
135        graph: Option<StreamFragmentGraph>,
136        if_not_exists: bool,
137    ) -> Result<()>;
138
139    async fn create_sink(
140        &self,
141        sink: PbSink,
142        graph: StreamFragmentGraph,
143        affected_table_change: Option<PbReplaceJobPlan>,
144        dependencies: HashSet<ObjectId>,
145        if_not_exists: bool,
146    ) -> Result<()>;
147
148    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
149
150    async fn create_function(&self, function: PbFunction) -> Result<()>;
151
152    async fn create_connection(
153        &self,
154        connection_name: String,
155        database_id: u32,
156        schema_id: u32,
157        owner_id: u32,
158        connection: create_connection_request::Payload,
159    ) -> Result<()>;
160
161    async fn create_secret(
162        &self,
163        secret_name: String,
164        database_id: u32,
165        schema_id: u32,
166        owner_id: u32,
167        payload: Vec<u8>,
168    ) -> Result<()>;
169
170    async fn comment_on(&self, comment: PbComment) -> Result<()>;
171
172    async fn drop_table(
173        &self,
174        source_id: Option<u32>,
175        table_id: TableId,
176        cascade: bool,
177    ) -> Result<()>;
178
179    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
180
181    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
182
183    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
184
185    async fn drop_sink(
186        &self,
187        sink_id: u32,
188        cascade: bool,
189        affected_table_change: Option<PbReplaceJobPlan>,
190    ) -> Result<()>;
191
192    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
193
194    async fn drop_database(&self, database_id: u32) -> Result<()>;
195
196    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
197
198    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
199
200    async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
201
202    async fn drop_connection(&self, connection_id: u32) -> Result<()>;
203
204    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
205
206    async fn alter_secret(
207        &self,
208        secret_id: u32,
209        secret_name: String,
210        database_id: u32,
211        schema_id: u32,
212        owner_id: u32,
213        payload: Vec<u8>,
214    ) -> Result<()>;
215
216    async fn alter_name(
217        &self,
218        object_id: alter_name_request::Object,
219        object_name: &str,
220    ) -> Result<()>;
221
222    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
223
224    /// Replace the source in the catalog.
225    async fn alter_source(&self, source: PbSource) -> Result<()>;
226
227    async fn alter_parallelism(
228        &self,
229        job_id: u32,
230        parallelism: PbTableParallelism,
231        deferred: bool,
232    ) -> Result<()>;
233
234    async fn alter_resource_group(
235        &self,
236        table_id: u32,
237        resource_group: Option<String>,
238        deferred: bool,
239    ) -> Result<()>;
240
241    async fn alter_set_schema(
242        &self,
243        object: alter_set_schema_request::Object,
244        new_schema_id: u32,
245    ) -> Result<()>;
246
247    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
248
249    async fn alter_database_param(
250        &self,
251        database_id: DatabaseId,
252        param: AlterDatabaseParam,
253    ) -> Result<()>;
254}
255
256#[derive(Clone)]
257pub struct CatalogWriterImpl {
258    meta_client: MetaClient,
259    catalog_updated_rx: Receiver<CatalogVersion>,
260    hummock_snapshot_manager: HummockSnapshotManagerRef,
261}
262
263#[async_trait::async_trait]
264impl CatalogWriter for CatalogWriterImpl {
265    async fn create_database(
266        &self,
267        db_name: &str,
268        owner: UserId,
269        resource_group: &str,
270        barrier_interval_ms: Option<u32>,
271        checkpoint_frequency: Option<u64>,
272    ) -> Result<()> {
273        let version = self
274            .meta_client
275            .create_database(PbDatabase {
276                name: db_name.to_owned(),
277                id: 0,
278                owner,
279                resource_group: resource_group.to_owned(),
280                barrier_interval_ms,
281                checkpoint_frequency,
282            })
283            .await?;
284        self.wait_version(version).await
285    }
286
287    async fn create_schema(
288        &self,
289        db_id: DatabaseId,
290        schema_name: &str,
291        owner: UserId,
292    ) -> Result<()> {
293        let version = self
294            .meta_client
295            .create_schema(PbSchema {
296                id: 0,
297                name: schema_name.to_owned(),
298                database_id: db_id,
299                owner,
300            })
301            .await?;
302        self.wait_version(version).await
303    }
304
305    // TODO: maybe here to pass a materialize plan node
306    async fn create_materialized_view(
307        &self,
308        table: PbTable,
309        graph: StreamFragmentGraph,
310        dependencies: HashSet<ObjectId>,
311        specific_resource_group: Option<String>,
312        if_not_exists: bool,
313    ) -> Result<()> {
314        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
315        let version = self
316            .meta_client
317            .create_materialized_view(
318                table,
319                graph,
320                dependencies,
321                specific_resource_group,
322                if_not_exists,
323            )
324            .await?;
325        if matches!(create_type, PbCreateType::Foreground) {
326            self.wait_version(version).await?
327        }
328        Ok(())
329    }
330
331    async fn replace_materialized_view(
332        &self,
333        table: PbTable,
334        graph: StreamFragmentGraph,
335    ) -> Result<()> {
336        // TODO: this is a dummy implementation for debugging only.
337        notice_to_user(format!("table: {table:#?}"));
338        notice_to_user(format!("graph: {graph:#?}"));
339
340        let version = self
341            .meta_client
342            .replace_job(
343                graph,
344                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
345            )
346            .await?;
347
348        self.wait_version(version).await
349    }
350
351    async fn create_view(&self, view: PbView) -> Result<()> {
352        let version = self.meta_client.create_view(view).await?;
353        self.wait_version(version).await
354    }
355
356    async fn create_index(
357        &self,
358        index: PbIndex,
359        table: PbTable,
360        graph: StreamFragmentGraph,
361        if_not_exists: bool,
362    ) -> Result<()> {
363        let version = self
364            .meta_client
365            .create_index(index, table, graph, if_not_exists)
366            .await?;
367        self.wait_version(version).await
368    }
369
370    async fn create_table(
371        &self,
372        source: Option<PbSource>,
373        table: PbTable,
374        graph: StreamFragmentGraph,
375        job_type: PbTableJobType,
376        if_not_exists: bool,
377    ) -> Result<()> {
378        let version = self
379            .meta_client
380            .create_table(source, table, graph, job_type, if_not_exists)
381            .await?;
382        self.wait_version(version).await
383    }
384
385    async fn replace_table(
386        &self,
387        source: Option<PbSource>,
388        table: PbTable,
389        graph: StreamFragmentGraph,
390        job_type: TableJobType,
391    ) -> Result<()> {
392        let version = self
393            .meta_client
394            .replace_job(
395                graph,
396                ReplaceJob::ReplaceTable(ReplaceTable {
397                    source,
398                    table: Some(table),
399                    job_type: job_type as _,
400                }),
401            )
402            .await?;
403        self.wait_version(version).await
404    }
405
406    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
407        let version = self
408            .meta_client
409            .replace_job(
410                graph,
411                ReplaceJob::ReplaceSource(ReplaceSource {
412                    source: Some(source),
413                }),
414            )
415            .await?;
416        self.wait_version(version).await
417    }
418
419    async fn create_source(
420        &self,
421        source: PbSource,
422        graph: Option<StreamFragmentGraph>,
423        if_not_exists: bool,
424    ) -> Result<()> {
425        let version = self
426            .meta_client
427            .create_source(source, graph, if_not_exists)
428            .await?;
429        self.wait_version(version).await
430    }
431
432    async fn create_sink(
433        &self,
434        sink: PbSink,
435        graph: StreamFragmentGraph,
436        affected_table_change: Option<ReplaceJobPlan>,
437        dependencies: HashSet<ObjectId>,
438        if_not_exists: bool,
439    ) -> Result<()> {
440        let version = self
441            .meta_client
442            .create_sink(
443                sink,
444                graph,
445                affected_table_change,
446                dependencies,
447                if_not_exists,
448            )
449            .await?;
450        self.wait_version(version).await
451    }
452
453    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
454        let version = self.meta_client.create_subscription(subscription).await?;
455        self.wait_version(version).await
456    }
457
458    async fn create_function(&self, function: PbFunction) -> Result<()> {
459        let version = self.meta_client.create_function(function).await?;
460        self.wait_version(version).await
461    }
462
463    async fn create_connection(
464        &self,
465        connection_name: String,
466        database_id: u32,
467        schema_id: u32,
468        owner_id: u32,
469        connection: create_connection_request::Payload,
470    ) -> Result<()> {
471        let version = self
472            .meta_client
473            .create_connection(
474                connection_name,
475                database_id,
476                schema_id,
477                owner_id,
478                connection,
479            )
480            .await?;
481        self.wait_version(version).await
482    }
483
484    async fn create_secret(
485        &self,
486        secret_name: String,
487        database_id: u32,
488        schema_id: u32,
489        owner_id: u32,
490        payload: Vec<u8>,
491    ) -> Result<()> {
492        let version = self
493            .meta_client
494            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
495            .await?;
496        self.wait_version(version).await
497    }
498
499    async fn comment_on(&self, comment: PbComment) -> Result<()> {
500        let version = self.meta_client.comment_on(comment).await?;
501        self.wait_version(version).await
502    }
503
504    async fn drop_table(
505        &self,
506        source_id: Option<u32>,
507        table_id: TableId,
508        cascade: bool,
509    ) -> Result<()> {
510        let version = self
511            .meta_client
512            .drop_table(source_id, table_id, cascade)
513            .await?;
514        self.wait_version(version).await
515    }
516
517    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
518        let version = self
519            .meta_client
520            .drop_materialized_view(table_id, cascade)
521            .await?;
522        self.wait_version(version).await
523    }
524
525    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
526        let version = self.meta_client.drop_view(view_id, cascade).await?;
527        self.wait_version(version).await
528    }
529
530    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
531        let version = self.meta_client.drop_source(source_id, cascade).await?;
532        self.wait_version(version).await
533    }
534
535    async fn drop_sink(
536        &self,
537        sink_id: u32,
538        cascade: bool,
539        affected_table_change: Option<ReplaceJobPlan>,
540    ) -> Result<()> {
541        let version = self
542            .meta_client
543            .drop_sink(sink_id, cascade, affected_table_change)
544            .await?;
545        self.wait_version(version).await
546    }
547
548    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
549        let version = self
550            .meta_client
551            .drop_subscription(subscription_id, cascade)
552            .await?;
553        self.wait_version(version).await
554    }
555
556    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
557        let version = self.meta_client.drop_index(index_id, cascade).await?;
558        self.wait_version(version).await
559    }
560
561    async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
562        let version = self.meta_client.drop_function(function_id).await?;
563        self.wait_version(version).await
564    }
565
566    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
567        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
568        self.wait_version(version).await
569    }
570
571    async fn drop_database(&self, database_id: u32) -> Result<()> {
572        let version = self.meta_client.drop_database(database_id).await?;
573        self.wait_version(version).await
574    }
575
576    async fn drop_connection(&self, connection_id: u32) -> Result<()> {
577        let version = self.meta_client.drop_connection(connection_id).await?;
578        self.wait_version(version).await
579    }
580
581    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
582        let version = self.meta_client.drop_secret(secret_id).await?;
583        self.wait_version(version).await
584    }
585
586    async fn alter_name(
587        &self,
588        object_id: alter_name_request::Object,
589        object_name: &str,
590    ) -> Result<()> {
591        let version = self.meta_client.alter_name(object_id, object_name).await?;
592        self.wait_version(version).await
593    }
594
595    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
596        let version = self.meta_client.alter_owner(object, owner_id).await?;
597        self.wait_version(version).await
598    }
599
600    async fn alter_set_schema(
601        &self,
602        object: alter_set_schema_request::Object,
603        new_schema_id: u32,
604    ) -> Result<()> {
605        let version = self
606            .meta_client
607            .alter_set_schema(object, new_schema_id)
608            .await?;
609        self.wait_version(version).await
610    }
611
612    async fn alter_source(&self, source: PbSource) -> Result<()> {
613        let version = self.meta_client.alter_source(source).await?;
614        self.wait_version(version).await
615    }
616
617    async fn alter_parallelism(
618        &self,
619        job_id: u32,
620        parallelism: PbTableParallelism,
621        deferred: bool,
622    ) -> Result<()> {
623        self.meta_client
624            .alter_parallelism(job_id, parallelism, deferred)
625            .await
626            .map_err(|e| anyhow!(e))?;
627
628        Ok(())
629    }
630
631    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
632        let version = self.meta_client.alter_swap_rename(object).await?;
633        self.wait_version(version).await
634    }
635
636    async fn alter_secret(
637        &self,
638        secret_id: u32,
639        secret_name: String,
640        database_id: u32,
641        schema_id: u32,
642        owner_id: u32,
643        payload: Vec<u8>,
644    ) -> Result<()> {
645        let version = self
646            .meta_client
647            .alter_secret(
648                secret_id,
649                secret_name,
650                database_id,
651                schema_id,
652                owner_id,
653                payload,
654            )
655            .await?;
656        self.wait_version(version).await
657    }
658
659    async fn alter_resource_group(
660        &self,
661        table_id: u32,
662        resource_group: Option<String>,
663        deferred: bool,
664    ) -> Result<()> {
665        self.meta_client
666            .alter_resource_group(table_id, resource_group, deferred)
667            .await
668            .map_err(|e| anyhow!(e))?;
669
670        Ok(())
671    }
672
673    async fn alter_database_param(
674        &self,
675        database_id: DatabaseId,
676        param: AlterDatabaseParam,
677    ) -> Result<()> {
678        let version = self
679            .meta_client
680            .alter_database_param(database_id, param)
681            .await
682            .map_err(|e| anyhow!(e))?;
683        self.wait_version(version).await
684    }
685}
686
687impl CatalogWriterImpl {
688    pub fn new(
689        meta_client: MetaClient,
690        catalog_updated_rx: Receiver<CatalogVersion>,
691        hummock_snapshot_manager: HummockSnapshotManagerRef,
692    ) -> Self {
693        Self {
694            meta_client,
695            catalog_updated_rx,
696            hummock_snapshot_manager,
697        }
698    }
699
700    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
701        let mut rx = self.catalog_updated_rx.clone();
702        while *rx.borrow_and_update() < version.catalog_version {
703            rx.changed().await.map_err(|e| anyhow!(e))?;
704        }
705        self.hummock_snapshot_manager
706            .wait(HummockVersionId::new(version.hummock_version_id))
707            .await;
708        Ok(())
709    }
710}