risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_hummock_sdk::HummockVersionId;
26use risingwave_pb::catalog::{
27    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28    PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
52/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
53#[derive(Clone)]
54pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58        CatalogReader(inner)
59    }
60
61    pub fn read_guard(&self) -> CatalogReadGuard {
62        // Make this recursive so that one can get this guard in the same thread without fear.
63        self.0.read_arc_recursive()
64    }
65}
66
67/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
68/// It will only send rpc to meta and get the catalog version as response.
69/// Then it will wait for the local catalog to be synced to the version, which is performed by
70/// [observer](`crate::observer::FrontendObserverNode`).
71#[async_trait::async_trait]
72pub trait CatalogWriter: Send + Sync {
73    async fn create_database(
74        &self,
75        db_name: &str,
76        owner: UserId,
77        resource_group: &str,
78        barrier_interval_ms: Option<u32>,
79        checkpoint_frequency: Option<u64>,
80    ) -> Result<()>;
81
82    async fn create_schema(
83        &self,
84        db_id: DatabaseId,
85        schema_name: &str,
86        owner: UserId,
87    ) -> Result<()>;
88
89    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
90
91    async fn create_materialized_view(
92        &self,
93        table: PbTable,
94        graph: StreamFragmentGraph,
95        dependencies: HashSet<ObjectId>,
96        specific_resource_group: Option<String>,
97        if_not_exists: bool,
98    ) -> Result<()>;
99
100    async fn replace_materialized_view(
101        &self,
102        table: PbTable,
103        graph: StreamFragmentGraph,
104    ) -> Result<()>;
105
106    async fn create_table(
107        &self,
108        source: Option<PbSource>,
109        table: PbTable,
110        graph: StreamFragmentGraph,
111        job_type: PbTableJobType,
112        if_not_exists: bool,
113        dependencies: HashSet<ObjectId>,
114    ) -> Result<()>;
115
116    async fn replace_table(
117        &self,
118        source: Option<PbSource>,
119        table: PbTable,
120        graph: StreamFragmentGraph,
121        job_type: TableJobType,
122    ) -> Result<()>;
123
124    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
125
126    async fn create_index(
127        &self,
128        index: PbIndex,
129        table: PbTable,
130        graph: StreamFragmentGraph,
131        if_not_exists: bool,
132    ) -> Result<()>;
133
134    async fn create_source(
135        &self,
136        source: PbSource,
137        graph: Option<StreamFragmentGraph>,
138        if_not_exists: bool,
139    ) -> Result<()>;
140
141    async fn create_sink(
142        &self,
143        sink: PbSink,
144        graph: StreamFragmentGraph,
145        dependencies: HashSet<ObjectId>,
146        if_not_exists: bool,
147    ) -> Result<()>;
148
149    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
150
151    async fn create_function(&self, function: PbFunction) -> Result<()>;
152
153    async fn create_connection(
154        &self,
155        connection_name: String,
156        database_id: DatabaseId,
157        schema_id: SchemaId,
158        owner_id: u32,
159        connection: create_connection_request::Payload,
160    ) -> Result<()>;
161
162    async fn create_secret(
163        &self,
164        secret_name: String,
165        database_id: DatabaseId,
166        schema_id: SchemaId,
167        owner_id: u32,
168        payload: Vec<u8>,
169    ) -> Result<()>;
170
171    async fn comment_on(&self, comment: PbComment) -> Result<()>;
172
173    async fn drop_table(
174        &self,
175        source_id: Option<u32>,
176        table_id: TableId,
177        cascade: bool,
178    ) -> Result<()>;
179
180    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
181
182    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
183
184    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
185
186    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
187
188    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
189    -> Result<()>;
190
191    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
192
193    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
194
195    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
196
197    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
198
199    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
200
201    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
202
203    async fn alter_secret(
204        &self,
205        secret_id: SecretId,
206        secret_name: String,
207        database_id: DatabaseId,
208        schema_id: SchemaId,
209        owner_id: u32,
210        payload: Vec<u8>,
211    ) -> Result<()>;
212
213    async fn alter_name(
214        &self,
215        object_id: alter_name_request::Object,
216        object_name: &str,
217    ) -> Result<()>;
218
219    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
220
221    /// Replace the source in the catalog.
222    async fn alter_source(&self, source: PbSource) -> Result<()>;
223
224    async fn alter_parallelism(
225        &self,
226        job_id: JobId,
227        parallelism: PbTableParallelism,
228        deferred: bool,
229    ) -> Result<()>;
230
231    async fn alter_config(
232        &self,
233        job_id: JobId,
234        entries_to_add: HashMap<String, String>,
235        keys_to_remove: Vec<String>,
236    ) -> Result<()>;
237
238    async fn alter_resource_group(
239        &self,
240        table_id: TableId,
241        resource_group: Option<String>,
242        deferred: bool,
243    ) -> Result<()>;
244
245    async fn alter_set_schema(
246        &self,
247        object: alter_set_schema_request::Object,
248        new_schema_id: SchemaId,
249    ) -> Result<()>;
250
251    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
252
253    async fn alter_database_param(
254        &self,
255        database_id: DatabaseId,
256        param: AlterDatabaseParam,
257    ) -> Result<()>;
258
259    async fn create_iceberg_table(
260        &self,
261        table_job_info: PbTableJobInfo,
262        sink_job_info: PbSinkJobInfo,
263        iceberg_source: PbSource,
264        if_not_exists: bool,
265    ) -> Result<()>;
266}
267
268#[derive(Clone)]
269pub struct CatalogWriterImpl {
270    meta_client: MetaClient,
271    catalog_updated_rx: Receiver<CatalogVersion>,
272    hummock_snapshot_manager: HummockSnapshotManagerRef,
273}
274
275#[async_trait::async_trait]
276impl CatalogWriter for CatalogWriterImpl {
277    async fn create_database(
278        &self,
279        db_name: &str,
280        owner: UserId,
281        resource_group: &str,
282        barrier_interval_ms: Option<u32>,
283        checkpoint_frequency: Option<u64>,
284    ) -> Result<()> {
285        let version = self
286            .meta_client
287            .create_database(PbDatabase {
288                name: db_name.to_owned(),
289                id: 0.into(),
290                owner,
291                resource_group: resource_group.to_owned(),
292                barrier_interval_ms,
293                checkpoint_frequency,
294            })
295            .await?;
296        self.wait_version(version).await
297    }
298
299    async fn create_schema(
300        &self,
301        db_id: DatabaseId,
302        schema_name: &str,
303        owner: UserId,
304    ) -> Result<()> {
305        let version = self
306            .meta_client
307            .create_schema(PbSchema {
308                id: 0.into(),
309                name: schema_name.to_owned(),
310                database_id: db_id,
311                owner,
312            })
313            .await?;
314        self.wait_version(version).await
315    }
316
317    // TODO: maybe here to pass a materialize plan node
318    async fn create_materialized_view(
319        &self,
320        table: PbTable,
321        graph: StreamFragmentGraph,
322        dependencies: HashSet<ObjectId>,
323        specific_resource_group: Option<String>,
324        if_not_exists: bool,
325    ) -> Result<()> {
326        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
327        let version = self
328            .meta_client
329            .create_materialized_view(
330                table,
331                graph,
332                dependencies,
333                specific_resource_group,
334                if_not_exists,
335            )
336            .await?;
337        if matches!(create_type, PbCreateType::Foreground) {
338            self.wait_version(version).await?
339        }
340        Ok(())
341    }
342
343    async fn replace_materialized_view(
344        &self,
345        table: PbTable,
346        graph: StreamFragmentGraph,
347    ) -> Result<()> {
348        // TODO: this is a dummy implementation for debugging only.
349        notice_to_user(format!("table: {table:#?}"));
350        notice_to_user(format!("graph: {graph:#?}"));
351
352        let version = self
353            .meta_client
354            .replace_job(
355                graph,
356                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
357            )
358            .await?;
359
360        self.wait_version(version).await
361    }
362
363    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
364        let version = self.meta_client.create_view(view, dependencies).await?;
365        self.wait_version(version).await
366    }
367
368    async fn create_index(
369        &self,
370        index: PbIndex,
371        table: PbTable,
372        graph: StreamFragmentGraph,
373        if_not_exists: bool,
374    ) -> Result<()> {
375        let version = self
376            .meta_client
377            .create_index(index, table, graph, if_not_exists)
378            .await?;
379        self.wait_version(version).await
380    }
381
382    async fn create_table(
383        &self,
384        source: Option<PbSource>,
385        table: PbTable,
386        graph: StreamFragmentGraph,
387        job_type: PbTableJobType,
388        if_not_exists: bool,
389        dependencies: HashSet<ObjectId>,
390    ) -> Result<()> {
391        let version = self
392            .meta_client
393            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
394            .await?;
395        self.wait_version(version).await
396    }
397
398    async fn replace_table(
399        &self,
400        source: Option<PbSource>,
401        table: PbTable,
402        graph: StreamFragmentGraph,
403        job_type: TableJobType,
404    ) -> Result<()> {
405        let version = self
406            .meta_client
407            .replace_job(
408                graph,
409                ReplaceJob::ReplaceTable(ReplaceTable {
410                    source,
411                    table: Some(table),
412                    job_type: job_type as _,
413                }),
414            )
415            .await?;
416        self.wait_version(version).await
417    }
418
419    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
420        let version = self
421            .meta_client
422            .replace_job(
423                graph,
424                ReplaceJob::ReplaceSource(ReplaceSource {
425                    source: Some(source),
426                }),
427            )
428            .await?;
429        self.wait_version(version).await
430    }
431
432    async fn create_source(
433        &self,
434        source: PbSource,
435        graph: Option<StreamFragmentGraph>,
436        if_not_exists: bool,
437    ) -> Result<()> {
438        let version = self
439            .meta_client
440            .create_source(source, graph, if_not_exists)
441            .await?;
442        self.wait_version(version).await
443    }
444
445    async fn create_sink(
446        &self,
447        sink: PbSink,
448        graph: StreamFragmentGraph,
449        dependencies: HashSet<ObjectId>,
450        if_not_exists: bool,
451    ) -> Result<()> {
452        let version = self
453            .meta_client
454            .create_sink(sink, graph, dependencies, if_not_exists)
455            .await?;
456        self.wait_version(version).await
457    }
458
459    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
460        let version = self.meta_client.create_subscription(subscription).await?;
461        self.wait_version(version).await
462    }
463
464    async fn create_function(&self, function: PbFunction) -> Result<()> {
465        let version = self.meta_client.create_function(function).await?;
466        self.wait_version(version).await
467    }
468
469    async fn create_connection(
470        &self,
471        connection_name: String,
472        database_id: DatabaseId,
473        schema_id: SchemaId,
474        owner_id: u32,
475        connection: create_connection_request::Payload,
476    ) -> Result<()> {
477        let version = self
478            .meta_client
479            .create_connection(
480                connection_name,
481                database_id,
482                schema_id,
483                owner_id,
484                connection,
485            )
486            .await?;
487        self.wait_version(version).await
488    }
489
490    async fn create_secret(
491        &self,
492        secret_name: String,
493        database_id: DatabaseId,
494        schema_id: SchemaId,
495        owner_id: u32,
496        payload: Vec<u8>,
497    ) -> Result<()> {
498        let version = self
499            .meta_client
500            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
501            .await?;
502        self.wait_version(version).await
503    }
504
505    async fn comment_on(&self, comment: PbComment) -> Result<()> {
506        let version = self.meta_client.comment_on(comment).await?;
507        self.wait_version(version).await
508    }
509
510    async fn drop_table(
511        &self,
512        source_id: Option<u32>,
513        table_id: TableId,
514        cascade: bool,
515    ) -> Result<()> {
516        let version = self
517            .meta_client
518            .drop_table(source_id, table_id, cascade)
519            .await?;
520        self.wait_version(version).await
521    }
522
523    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
524        let version = self
525            .meta_client
526            .drop_materialized_view(table_id, cascade)
527            .await?;
528        self.wait_version(version).await
529    }
530
531    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
532        let version = self.meta_client.drop_view(view_id, cascade).await?;
533        self.wait_version(version).await
534    }
535
536    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
537        let version = self.meta_client.drop_source(source_id, cascade).await?;
538        self.wait_version(version).await
539    }
540
541    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
542        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
543        self.wait_version(version).await
544    }
545
546    async fn drop_subscription(
547        &self,
548        subscription_id: SubscriptionId,
549        cascade: bool,
550    ) -> Result<()> {
551        let version = self
552            .meta_client
553            .drop_subscription(subscription_id, cascade)
554            .await?;
555        self.wait_version(version).await
556    }
557
558    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
559        let version = self.meta_client.drop_index(index_id, cascade).await?;
560        self.wait_version(version).await
561    }
562
563    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
564        let version = self.meta_client.drop_function(function_id, cascade).await?;
565        self.wait_version(version).await
566    }
567
568    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
569        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
570        self.wait_version(version).await
571    }
572
573    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
574        let version = self.meta_client.drop_database(database_id).await?;
575        self.wait_version(version).await
576    }
577
578    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
579        let version = self
580            .meta_client
581            .drop_connection(connection_id, cascade)
582            .await?;
583        self.wait_version(version).await
584    }
585
586    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
587        let version = self.meta_client.drop_secret(secret_id).await?;
588        self.wait_version(version).await
589    }
590
591    async fn alter_name(
592        &self,
593        object_id: alter_name_request::Object,
594        object_name: &str,
595    ) -> Result<()> {
596        let version = self.meta_client.alter_name(object_id, object_name).await?;
597        self.wait_version(version).await
598    }
599
600    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
601        let version = self.meta_client.alter_owner(object, owner_id).await?;
602        self.wait_version(version).await
603    }
604
605    async fn alter_set_schema(
606        &self,
607        object: alter_set_schema_request::Object,
608        new_schema_id: SchemaId,
609    ) -> Result<()> {
610        let version = self
611            .meta_client
612            .alter_set_schema(object, new_schema_id)
613            .await?;
614        self.wait_version(version).await
615    }
616
617    async fn alter_source(&self, source: PbSource) -> Result<()> {
618        let version = self.meta_client.alter_source(source).await?;
619        self.wait_version(version).await
620    }
621
622    async fn alter_parallelism(
623        &self,
624        job_id: JobId,
625        parallelism: PbTableParallelism,
626        deferred: bool,
627    ) -> Result<()> {
628        self.meta_client
629            .alter_parallelism(job_id, parallelism, deferred)
630            .await?;
631        Ok(())
632    }
633
634    async fn alter_config(
635        &self,
636        job_id: JobId,
637        entries_to_add: HashMap<String, String>,
638        keys_to_remove: Vec<String>,
639    ) -> Result<()> {
640        self.meta_client
641            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
642            .await?;
643        Ok(())
644    }
645
646    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
647        let version = self.meta_client.alter_swap_rename(object).await?;
648        self.wait_version(version).await
649    }
650
651    async fn alter_secret(
652        &self,
653        secret_id: SecretId,
654        secret_name: String,
655        database_id: DatabaseId,
656        schema_id: SchemaId,
657        owner_id: u32,
658        payload: Vec<u8>,
659    ) -> Result<()> {
660        let version = self
661            .meta_client
662            .alter_secret(
663                secret_id,
664                secret_name,
665                database_id,
666                schema_id,
667                owner_id,
668                payload,
669            )
670            .await?;
671        self.wait_version(version).await
672    }
673
674    async fn alter_resource_group(
675        &self,
676        table_id: TableId,
677        resource_group: Option<String>,
678        deferred: bool,
679    ) -> Result<()> {
680        self.meta_client
681            .alter_resource_group(table_id, resource_group, deferred)
682            .await
683            .map_err(|e| anyhow!(e))?;
684
685        Ok(())
686    }
687
688    async fn alter_database_param(
689        &self,
690        database_id: DatabaseId,
691        param: AlterDatabaseParam,
692    ) -> Result<()> {
693        let version = self
694            .meta_client
695            .alter_database_param(database_id, param)
696            .await
697            .map_err(|e| anyhow!(e))?;
698        self.wait_version(version).await
699    }
700
701    async fn create_iceberg_table(
702        &self,
703        table_job_info: PbTableJobInfo,
704        sink_job_info: PbSinkJobInfo,
705        iceberg_source: PbSource,
706        if_not_exists: bool,
707    ) -> Result<()> {
708        let version = self
709            .meta_client
710            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
711            .await?;
712        self.wait_version(version).await
713    }
714}
715
716impl CatalogWriterImpl {
717    pub fn new(
718        meta_client: MetaClient,
719        catalog_updated_rx: Receiver<CatalogVersion>,
720        hummock_snapshot_manager: HummockSnapshotManagerRef,
721    ) -> Self {
722        Self {
723            meta_client,
724            catalog_updated_rx,
725            hummock_snapshot_manager,
726        }
727    }
728
729    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
730        let mut rx = self.catalog_updated_rx.clone();
731        while *rx.borrow_and_update() < version.catalog_version {
732            rx.changed().await.map_err(|e| anyhow!(e))?;
733        }
734        self.hummock_snapshot_manager
735            .wait(HummockVersionId::new(version.hummock_version_id))
736            .await;
737        Ok(())
738    }
739}