risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_hummock_sdk::HummockVersionId;
26use risingwave_pb::catalog::{
27    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28    PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// Owned read guard over the shared [`Catalog`], as returned by [`CatalogReader::read_guard`].
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
/// [`CatalogReader`] gives read access to the local [`Catalog`] and ensures that its holder
/// cannot modify it.
///
/// The shared `Arc<RwLock<Catalog>>` is kept private; the methods defined in this file only
/// hand out read guards (see [`CatalogReader::read_guard`]). Cloning a reader clones the `Arc`,
/// so all clones observe the same catalog.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58        CatalogReader(inner)
59    }
60
61    pub fn read_guard(&self) -> CatalogReadGuard {
62        // Make this recursive so that one can get this guard in the same thread without fear.
63        self.0.read_arc_recursive()
64    }
65}
66
/// [`CatalogWriter`] initiates DDL operations (create table/schema/database/function/connection).
/// It will only send rpc to meta and get the catalog version as response.
/// Then it will wait for the local catalog to be synced to the version, which is performed by
/// [observer](`crate::observer::FrontendObserverNode`).
///
/// Flags such as `cascade`, `if_not_exists` and `deferred` are passed through to the meta
/// service unchanged by [`CatalogWriterImpl`]; their exact semantics are defined there.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Requests creation of a database named `db_name` owned by `owner`, with the given
    /// resource group and optional barrier interval / checkpoint frequency.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Requests creation of schema `schema_name` in database `db_id`, owned by `owner`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Requests creation of `view`; `dependencies` is sent along with the request.
    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    /// Requests creation of the materialized view described by `table` and its stream `graph`.
    ///
    /// [`CatalogWriterImpl`] only waits for the local catalog to catch up when the table's
    /// create type is foreground (or cannot be read); otherwise it returns right after meta
    /// replies.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Submits a replace-job request for the materialized view `table` with the new `graph`.
    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    /// Requests creation of `table`, together with its optional `source`, using `graph`.
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    /// Submits a replace-job request for `table` (and its optional `source`) with the new
    /// `graph`.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Submits a replace-job request for `source` with the new `graph`.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Requests creation of `index`, backed by `table` and built by `graph`.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Requests creation of `source`; a stream `graph` is optional.
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Requests creation of `sink` built by `graph`; `dependencies` is sent along with the
    /// request.
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Requests creation of `subscription`.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Requests creation of `function`.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Requests creation of a connection named `connection_name` in the given database and
    /// schema, owned by `owner_id`, with `connection` as its payload.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Requests creation of a secret named `secret_name` in the given database and schema,
    /// owned by `owner_id`, with `payload` as its content.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Submits `comment` to the meta service.
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Requests dropping table `table_id`; `source_id` is forwarded when present.
    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Requests dropping the materialized view identified by `table_id`.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Requests dropping view `view_id`.
    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    /// Requests dropping source `source_id`.
    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    /// Sends a reset request for source `source_id`.
    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    /// Requests dropping sink `sink_id`.
    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    /// Requests dropping subscription `subscription_id`.
    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    /// Requests dropping database `database_id`.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    /// Requests dropping schema `schema_id`.
    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    /// Requests dropping index `index_id`.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Requests dropping function `function_id`.
    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    /// Requests dropping connection `connection_id`.
    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    /// Requests dropping secret `secret_id`.
    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

    /// Sends an alter request for secret `secret_id` carrying the given name, database,
    /// schema, owner and payload.
    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Requests renaming the object `object_id` to `object_name`.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Requests changing the owner of `object` to `owner_id`.
    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

    /// Replace the source in the catalog.
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Requests a parallelism change for job `job_id`.
    ///
    /// [`CatalogWriterImpl`] returns once meta has replied, without waiting for a catalog
    /// version.
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    /// Requests a config change for streaming job `job_id`: `entries_to_add` and
    /// `keys_to_remove` are forwarded as given.
    ///
    /// [`CatalogWriterImpl`] returns once meta has replied, without waiting for a catalog
    /// version.
    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    /// Requests a resource-group change for `table_id`.
    ///
    /// [`CatalogWriterImpl`] returns once meta has replied, without waiting for a catalog
    /// version.
    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Requests moving `object` to schema `new_schema_id`.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    /// Sends a swap-rename request for `object`.
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Requests applying `param` to database `database_id`.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    /// Requests creation of an iceberg table from its table job, sink job and source
    /// descriptions.
    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;
}
269
/// Implementation of [`CatalogWriter`] that issues every DDL request through a [`MetaClient`].
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to send the DDL requests to the meta service.
    meta_client: MetaClient,
    /// Watch channel carrying a catalog version; `wait_version` waits on it until the version
    /// returned by meta has been reached.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Used by `wait_version` to wait for the hummock version returned by meta.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
276
277#[async_trait::async_trait]
278impl CatalogWriter for CatalogWriterImpl {
279    async fn create_database(
280        &self,
281        db_name: &str,
282        owner: UserId,
283        resource_group: &str,
284        barrier_interval_ms: Option<u32>,
285        checkpoint_frequency: Option<u64>,
286    ) -> Result<()> {
287        let version = self
288            .meta_client
289            .create_database(PbDatabase {
290                name: db_name.to_owned(),
291                id: 0.into(),
292                owner,
293                resource_group: resource_group.to_owned(),
294                barrier_interval_ms,
295                checkpoint_frequency,
296            })
297            .await?;
298        self.wait_version(version).await
299    }
300
301    async fn create_schema(
302        &self,
303        db_id: DatabaseId,
304        schema_name: &str,
305        owner: UserId,
306    ) -> Result<()> {
307        let version = self
308            .meta_client
309            .create_schema(PbSchema {
310                id: 0.into(),
311                name: schema_name.to_owned(),
312                database_id: db_id,
313                owner,
314            })
315            .await?;
316        self.wait_version(version).await
317    }
318
319    // TODO: maybe here to pass a materialize plan node
320    async fn create_materialized_view(
321        &self,
322        table: PbTable,
323        graph: StreamFragmentGraph,
324        dependencies: HashSet<ObjectId>,
325        specific_resource_group: Option<String>,
326        if_not_exists: bool,
327    ) -> Result<()> {
328        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
329        let version = self
330            .meta_client
331            .create_materialized_view(
332                table,
333                graph,
334                dependencies,
335                specific_resource_group,
336                if_not_exists,
337            )
338            .await?;
339        if matches!(create_type, PbCreateType::Foreground) {
340            self.wait_version(version).await?
341        }
342        Ok(())
343    }
344
345    async fn replace_materialized_view(
346        &self,
347        table: PbTable,
348        graph: StreamFragmentGraph,
349    ) -> Result<()> {
350        // TODO: this is a dummy implementation for debugging only.
351        notice_to_user(format!("table: {table:#?}"));
352        notice_to_user(format!("graph: {graph:#?}"));
353
354        let version = self
355            .meta_client
356            .replace_job(
357                graph,
358                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
359            )
360            .await?;
361
362        self.wait_version(version).await
363    }
364
365    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
366        let version = self.meta_client.create_view(view, dependencies).await?;
367        self.wait_version(version).await
368    }
369
370    async fn create_index(
371        &self,
372        index: PbIndex,
373        table: PbTable,
374        graph: StreamFragmentGraph,
375        if_not_exists: bool,
376    ) -> Result<()> {
377        let version = self
378            .meta_client
379            .create_index(index, table, graph, if_not_exists)
380            .await?;
381        self.wait_version(version).await
382    }
383
384    async fn create_table(
385        &self,
386        source: Option<PbSource>,
387        table: PbTable,
388        graph: StreamFragmentGraph,
389        job_type: PbTableJobType,
390        if_not_exists: bool,
391        dependencies: HashSet<ObjectId>,
392    ) -> Result<()> {
393        let version = self
394            .meta_client
395            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
396            .await?;
397        self.wait_version(version).await
398    }
399
400    async fn replace_table(
401        &self,
402        source: Option<PbSource>,
403        table: PbTable,
404        graph: StreamFragmentGraph,
405        job_type: TableJobType,
406    ) -> Result<()> {
407        let version = self
408            .meta_client
409            .replace_job(
410                graph,
411                ReplaceJob::ReplaceTable(ReplaceTable {
412                    source,
413                    table: Some(table),
414                    job_type: job_type as _,
415                }),
416            )
417            .await?;
418        self.wait_version(version).await
419    }
420
421    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
422        let version = self
423            .meta_client
424            .replace_job(
425                graph,
426                ReplaceJob::ReplaceSource(ReplaceSource {
427                    source: Some(source),
428                }),
429            )
430            .await?;
431        self.wait_version(version).await
432    }
433
434    async fn create_source(
435        &self,
436        source: PbSource,
437        graph: Option<StreamFragmentGraph>,
438        if_not_exists: bool,
439    ) -> Result<()> {
440        let version = self
441            .meta_client
442            .create_source(source, graph, if_not_exists)
443            .await?;
444        self.wait_version(version).await
445    }
446
447    async fn create_sink(
448        &self,
449        sink: PbSink,
450        graph: StreamFragmentGraph,
451        dependencies: HashSet<ObjectId>,
452        if_not_exists: bool,
453    ) -> Result<()> {
454        let version = self
455            .meta_client
456            .create_sink(sink, graph, dependencies, if_not_exists)
457            .await?;
458        self.wait_version(version).await
459    }
460
461    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
462        let version = self.meta_client.create_subscription(subscription).await?;
463        self.wait_version(version).await
464    }
465
466    async fn create_function(&self, function: PbFunction) -> Result<()> {
467        let version = self.meta_client.create_function(function).await?;
468        self.wait_version(version).await
469    }
470
471    async fn create_connection(
472        &self,
473        connection_name: String,
474        database_id: DatabaseId,
475        schema_id: SchemaId,
476        owner_id: u32,
477        connection: create_connection_request::Payload,
478    ) -> Result<()> {
479        let version = self
480            .meta_client
481            .create_connection(
482                connection_name,
483                database_id,
484                schema_id,
485                owner_id,
486                connection,
487            )
488            .await?;
489        self.wait_version(version).await
490    }
491
492    async fn create_secret(
493        &self,
494        secret_name: String,
495        database_id: DatabaseId,
496        schema_id: SchemaId,
497        owner_id: u32,
498        payload: Vec<u8>,
499    ) -> Result<()> {
500        let version = self
501            .meta_client
502            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
503            .await?;
504        self.wait_version(version).await
505    }
506
507    async fn comment_on(&self, comment: PbComment) -> Result<()> {
508        let version = self.meta_client.comment_on(comment).await?;
509        self.wait_version(version).await
510    }
511
512    async fn drop_table(
513        &self,
514        source_id: Option<u32>,
515        table_id: TableId,
516        cascade: bool,
517    ) -> Result<()> {
518        let version = self
519            .meta_client
520            .drop_table(source_id, table_id, cascade)
521            .await?;
522        self.wait_version(version).await
523    }
524
525    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
526        let version = self
527            .meta_client
528            .drop_materialized_view(table_id, cascade)
529            .await?;
530        self.wait_version(version).await
531    }
532
533    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
534        let version = self.meta_client.drop_view(view_id, cascade).await?;
535        self.wait_version(version).await
536    }
537
538    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
539        let version = self.meta_client.drop_source(source_id, cascade).await?;
540        self.wait_version(version).await
541    }
542
543    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
544        let version = self.meta_client.reset_source(source_id).await?;
545        self.wait_version(version).await
546    }
547
548    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
549        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
550        self.wait_version(version).await
551    }
552
553    async fn drop_subscription(
554        &self,
555        subscription_id: SubscriptionId,
556        cascade: bool,
557    ) -> Result<()> {
558        let version = self
559            .meta_client
560            .drop_subscription(subscription_id, cascade)
561            .await?;
562        self.wait_version(version).await
563    }
564
565    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
566        let version = self.meta_client.drop_index(index_id, cascade).await?;
567        self.wait_version(version).await
568    }
569
570    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
571        let version = self.meta_client.drop_function(function_id, cascade).await?;
572        self.wait_version(version).await
573    }
574
575    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
576        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
577        self.wait_version(version).await
578    }
579
580    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
581        let version = self.meta_client.drop_database(database_id).await?;
582        self.wait_version(version).await
583    }
584
585    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
586        let version = self
587            .meta_client
588            .drop_connection(connection_id, cascade)
589            .await?;
590        self.wait_version(version).await
591    }
592
593    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
594        let version = self.meta_client.drop_secret(secret_id).await?;
595        self.wait_version(version).await
596    }
597
598    async fn alter_name(
599        &self,
600        object_id: alter_name_request::Object,
601        object_name: &str,
602    ) -> Result<()> {
603        let version = self.meta_client.alter_name(object_id, object_name).await?;
604        self.wait_version(version).await
605    }
606
607    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
608        let version = self.meta_client.alter_owner(object, owner_id).await?;
609        self.wait_version(version).await
610    }
611
612    async fn alter_set_schema(
613        &self,
614        object: alter_set_schema_request::Object,
615        new_schema_id: SchemaId,
616    ) -> Result<()> {
617        let version = self
618            .meta_client
619            .alter_set_schema(object, new_schema_id)
620            .await?;
621        self.wait_version(version).await
622    }
623
624    async fn alter_source(&self, source: PbSource) -> Result<()> {
625        let version = self.meta_client.alter_source(source).await?;
626        self.wait_version(version).await
627    }
628
629    async fn alter_parallelism(
630        &self,
631        job_id: JobId,
632        parallelism: PbTableParallelism,
633        deferred: bool,
634    ) -> Result<()> {
635        self.meta_client
636            .alter_parallelism(job_id, parallelism, deferred)
637            .await?;
638        Ok(())
639    }
640
641    async fn alter_config(
642        &self,
643        job_id: JobId,
644        entries_to_add: HashMap<String, String>,
645        keys_to_remove: Vec<String>,
646    ) -> Result<()> {
647        self.meta_client
648            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
649            .await?;
650        Ok(())
651    }
652
653    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
654        let version = self.meta_client.alter_swap_rename(object).await?;
655        self.wait_version(version).await
656    }
657
658    async fn alter_secret(
659        &self,
660        secret_id: SecretId,
661        secret_name: String,
662        database_id: DatabaseId,
663        schema_id: SchemaId,
664        owner_id: u32,
665        payload: Vec<u8>,
666    ) -> Result<()> {
667        let version = self
668            .meta_client
669            .alter_secret(
670                secret_id,
671                secret_name,
672                database_id,
673                schema_id,
674                owner_id,
675                payload,
676            )
677            .await?;
678        self.wait_version(version).await
679    }
680
681    async fn alter_resource_group(
682        &self,
683        table_id: TableId,
684        resource_group: Option<String>,
685        deferred: bool,
686    ) -> Result<()> {
687        self.meta_client
688            .alter_resource_group(table_id, resource_group, deferred)
689            .await
690            .map_err(|e| anyhow!(e))?;
691
692        Ok(())
693    }
694
695    async fn alter_database_param(
696        &self,
697        database_id: DatabaseId,
698        param: AlterDatabaseParam,
699    ) -> Result<()> {
700        let version = self
701            .meta_client
702            .alter_database_param(database_id, param)
703            .await
704            .map_err(|e| anyhow!(e))?;
705        self.wait_version(version).await
706    }
707
708    async fn create_iceberg_table(
709        &self,
710        table_job_info: PbTableJobInfo,
711        sink_job_info: PbSinkJobInfo,
712        iceberg_source: PbSource,
713        if_not_exists: bool,
714    ) -> Result<()> {
715        let version = self
716            .meta_client
717            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
718            .await?;
719        self.wait_version(version).await
720    }
721}
722
723impl CatalogWriterImpl {
724    pub fn new(
725        meta_client: MetaClient,
726        catalog_updated_rx: Receiver<CatalogVersion>,
727        hummock_snapshot_manager: HummockSnapshotManagerRef,
728    ) -> Self {
729        Self {
730            meta_client,
731            catalog_updated_rx,
732            hummock_snapshot_manager,
733        }
734    }
735
736    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
737        let mut rx = self.catalog_updated_rx.clone();
738        while *rx.borrow_and_update() < version.catalog_version {
739            rx.changed().await.map_err(|e| anyhow!(e))?;
740        }
741        self.hummock_snapshot_manager
742            .wait(HummockVersionId::new(version.hummock_version_id))
743            .await;
744        Ok(())
745    }
746}