risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36    streaming_job_resource_type,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
/// [`CatalogReader`] is a cloneable handle for reading the shared local [`Catalog`].
///
/// The wrapped `Arc<RwLock<Catalog>>` is a private field, so code holding a reader can only
/// go through the reader's own methods and cannot take a write lock through it.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58        CatalogReader(inner)
59    }
60
61    pub fn read_guard(&self) -> CatalogReadGuard {
62        // Make this recursive so that one can get this guard in the same thread without fear.
63        self.0.read_arc_recursive()
64    }
65}
66
67/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
68/// It will only send rpc to meta and get the catalog version as response.
69/// Then it will wait for the local catalog to be synced to the version, which is performed by
70/// [observer](`crate::observer::FrontendObserverNode`).
71#[async_trait::async_trait]
72pub trait CatalogWriter: Send + Sync {
73    async fn create_database(
74        &self,
75        db_name: &str,
76        owner: UserId,
77        resource_group: &str,
78        barrier_interval_ms: Option<u32>,
79        checkpoint_frequency: Option<u64>,
80    ) -> Result<()>;
81
82    async fn create_schema(
83        &self,
84        db_id: DatabaseId,
85        schema_name: &str,
86        owner: UserId,
87    ) -> Result<()>;
88
89    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
90
91    async fn create_materialized_view(
92        &self,
93        table: PbTable,
94        graph: StreamFragmentGraph,
95        dependencies: HashSet<ObjectId>,
96        resource_type: streaming_job_resource_type::ResourceType,
97        if_not_exists: bool,
98    ) -> Result<()>;
99
100    async fn replace_materialized_view(
101        &self,
102        table: PbTable,
103        graph: StreamFragmentGraph,
104    ) -> Result<()>;
105
106    async fn create_table(
107        &self,
108        source: Option<PbSource>,
109        table: PbTable,
110        graph: StreamFragmentGraph,
111        job_type: PbTableJobType,
112        if_not_exists: bool,
113        dependencies: HashSet<ObjectId>,
114    ) -> Result<()>;
115
116    async fn replace_table(
117        &self,
118        source: Option<PbSource>,
119        table: PbTable,
120        graph: StreamFragmentGraph,
121        job_type: TableJobType,
122    ) -> Result<()>;
123
124    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
125
126    async fn create_index(
127        &self,
128        index: PbIndex,
129        table: PbTable,
130        graph: StreamFragmentGraph,
131        if_not_exists: bool,
132    ) -> Result<()>;
133
134    async fn create_source(
135        &self,
136        source: PbSource,
137        graph: Option<StreamFragmentGraph>,
138        if_not_exists: bool,
139    ) -> Result<()>;
140
141    async fn create_sink(
142        &self,
143        sink: PbSink,
144        graph: StreamFragmentGraph,
145        dependencies: HashSet<ObjectId>,
146        if_not_exists: bool,
147    ) -> Result<()>;
148
149    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
150
151    async fn create_function(&self, function: PbFunction) -> Result<()>;
152
153    async fn create_connection(
154        &self,
155        connection_name: String,
156        database_id: DatabaseId,
157        schema_id: SchemaId,
158        owner_id: UserId,
159        connection: create_connection_request::Payload,
160    ) -> Result<()>;
161
162    async fn create_secret(
163        &self,
164        secret_name: String,
165        database_id: DatabaseId,
166        schema_id: SchemaId,
167        owner_id: UserId,
168        payload: Vec<u8>,
169    ) -> Result<()>;
170
171    async fn comment_on(&self, comment: PbComment) -> Result<()>;
172
173    async fn drop_table(
174        &self,
175        source_id: Option<SourceId>,
176        table_id: TableId,
177        cascade: bool,
178    ) -> Result<()>;
179
180    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
181
182    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
183
184    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
185
186    async fn reset_source(&self, source_id: SourceId) -> Result<()>;
187
188    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
189
190    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
191    -> Result<()>;
192
193    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
194
195    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
196
197    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
198
199    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
200
201    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
202
203    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;
204
205    async fn alter_secret(
206        &self,
207        secret_id: SecretId,
208        secret_name: String,
209        database_id: DatabaseId,
210        schema_id: SchemaId,
211        owner_id: UserId,
212        payload: Vec<u8>,
213    ) -> Result<()>;
214
215    async fn alter_name(
216        &self,
217        object_id: alter_name_request::Object,
218        object_name: &str,
219    ) -> Result<()>;
220
221    async fn alter_owner(
222        &self,
223        object: alter_owner_request::Object,
224        owner_id: UserId,
225    ) -> Result<()>;
226
227    /// Replace the source in the catalog.
228    async fn alter_source(&self, source: PbSource) -> Result<()>;
229
230    async fn alter_parallelism(
231        &self,
232        job_id: JobId,
233        parallelism: PbTableParallelism,
234        deferred: bool,
235    ) -> Result<()>;
236
237    async fn alter_config(
238        &self,
239        job_id: JobId,
240        entries_to_add: HashMap<String, String>,
241        keys_to_remove: Vec<String>,
242    ) -> Result<()>;
243
244    async fn alter_resource_group(
245        &self,
246        table_id: TableId,
247        resource_group: Option<String>,
248        deferred: bool,
249    ) -> Result<()>;
250
251    async fn alter_set_schema(
252        &self,
253        object: alter_set_schema_request::Object,
254        new_schema_id: SchemaId,
255    ) -> Result<()>;
256
257    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
258
259    async fn alter_database_param(
260        &self,
261        database_id: DatabaseId,
262        param: AlterDatabaseParam,
263    ) -> Result<()>;
264
265    async fn create_iceberg_table(
266        &self,
267        table_job_info: PbTableJobInfo,
268        sink_job_info: PbSinkJobInfo,
269        iceberg_source: PbSource,
270        if_not_exists: bool,
271    ) -> Result<()>;
272}
273
/// [`CatalogWriter`] implementation that forwards DDL requests through a [`MetaClient`].
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to send the DDL requests.
    meta_client: MetaClient,
    /// Watch-channel receiver carrying a [`CatalogVersion`]; `wait_version` waits on it until
    /// the value reaches the catalog version returned with a DDL response.
    /// NOTE(review): presumably published by the frontend observer — confirm at the sender.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Awaited by `wait_version` with the hummock version id returned with a DDL response.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
280
281#[async_trait::async_trait]
282impl CatalogWriter for CatalogWriterImpl {
283    async fn create_database(
284        &self,
285        db_name: &str,
286        owner: UserId,
287        resource_group: &str,
288        barrier_interval_ms: Option<u32>,
289        checkpoint_frequency: Option<u64>,
290    ) -> Result<()> {
291        let version = self
292            .meta_client
293            .create_database(PbDatabase {
294                name: db_name.to_owned(),
295                id: 0.into(),
296                owner,
297                resource_group: resource_group.to_owned(),
298                barrier_interval_ms,
299                checkpoint_frequency,
300            })
301            .await?;
302        self.wait_version(version).await
303    }
304
305    async fn create_schema(
306        &self,
307        db_id: DatabaseId,
308        schema_name: &str,
309        owner: UserId,
310    ) -> Result<()> {
311        let version = self
312            .meta_client
313            .create_schema(PbSchema {
314                id: 0.into(),
315                name: schema_name.to_owned(),
316                database_id: db_id,
317                owner,
318            })
319            .await?;
320        self.wait_version(version).await
321    }
322
323    // TODO: maybe here to pass a materialize plan node
324    async fn create_materialized_view(
325        &self,
326        table: PbTable,
327        graph: StreamFragmentGraph,
328        dependencies: HashSet<ObjectId>,
329        resource_type: streaming_job_resource_type::ResourceType,
330        if_not_exists: bool,
331    ) -> Result<()> {
332        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
333        let version = self
334            .meta_client
335            .create_materialized_view(table, graph, dependencies, resource_type, if_not_exists)
336            .await?;
337        if matches!(create_type, PbCreateType::Foreground) {
338            self.wait_version(version).await?
339        }
340        Ok(())
341    }
342
343    async fn replace_materialized_view(
344        &self,
345        table: PbTable,
346        graph: StreamFragmentGraph,
347    ) -> Result<()> {
348        // TODO: this is a dummy implementation for debugging only.
349        notice_to_user(format!("table: {table:#?}"));
350        notice_to_user(format!("graph: {graph:#?}"));
351
352        let version = self
353            .meta_client
354            .replace_job(
355                graph,
356                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
357            )
358            .await?;
359
360        self.wait_version(version).await
361    }
362
363    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
364        let version = self.meta_client.create_view(view, dependencies).await?;
365        self.wait_version(version).await
366    }
367
368    async fn create_index(
369        &self,
370        index: PbIndex,
371        table: PbTable,
372        graph: StreamFragmentGraph,
373        if_not_exists: bool,
374    ) -> Result<()> {
375        let version = self
376            .meta_client
377            .create_index(index, table, graph, if_not_exists)
378            .await?;
379        self.wait_version(version).await
380    }
381
382    async fn create_table(
383        &self,
384        source: Option<PbSource>,
385        table: PbTable,
386        graph: StreamFragmentGraph,
387        job_type: PbTableJobType,
388        if_not_exists: bool,
389        dependencies: HashSet<ObjectId>,
390    ) -> Result<()> {
391        let version = self
392            .meta_client
393            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
394            .await?;
395        self.wait_version(version).await
396    }
397
398    async fn replace_table(
399        &self,
400        source: Option<PbSource>,
401        table: PbTable,
402        graph: StreamFragmentGraph,
403        job_type: TableJobType,
404    ) -> Result<()> {
405        let version = self
406            .meta_client
407            .replace_job(
408                graph,
409                ReplaceJob::ReplaceTable(ReplaceTable {
410                    source,
411                    table: Some(table),
412                    job_type: job_type as _,
413                }),
414            )
415            .await?;
416        self.wait_version(version).await
417    }
418
419    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
420        let version = self
421            .meta_client
422            .replace_job(
423                graph,
424                ReplaceJob::ReplaceSource(ReplaceSource {
425                    source: Some(source),
426                }),
427            )
428            .await?;
429        self.wait_version(version).await
430    }
431
432    async fn create_source(
433        &self,
434        source: PbSource,
435        graph: Option<StreamFragmentGraph>,
436        if_not_exists: bool,
437    ) -> Result<()> {
438        let version = self
439            .meta_client
440            .create_source(source, graph, if_not_exists)
441            .await?;
442        self.wait_version(version).await
443    }
444
445    async fn create_sink(
446        &self,
447        sink: PbSink,
448        graph: StreamFragmentGraph,
449        dependencies: HashSet<ObjectId>,
450        if_not_exists: bool,
451    ) -> Result<()> {
452        let version = self
453            .meta_client
454            .create_sink(sink, graph, dependencies, if_not_exists)
455            .await?;
456        self.wait_version(version).await
457    }
458
459    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
460        let version = self.meta_client.create_subscription(subscription).await?;
461        self.wait_version(version).await
462    }
463
464    async fn create_function(&self, function: PbFunction) -> Result<()> {
465        let version = self.meta_client.create_function(function).await?;
466        self.wait_version(version).await
467    }
468
469    async fn create_connection(
470        &self,
471        connection_name: String,
472        database_id: DatabaseId,
473        schema_id: SchemaId,
474        owner_id: UserId,
475        connection: create_connection_request::Payload,
476    ) -> Result<()> {
477        let version = self
478            .meta_client
479            .create_connection(
480                connection_name,
481                database_id,
482                schema_id,
483                owner_id,
484                connection,
485            )
486            .await?;
487        self.wait_version(version).await
488    }
489
490    async fn create_secret(
491        &self,
492        secret_name: String,
493        database_id: DatabaseId,
494        schema_id: SchemaId,
495        owner_id: UserId,
496        payload: Vec<u8>,
497    ) -> Result<()> {
498        let version = self
499            .meta_client
500            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
501            .await?;
502        self.wait_version(version).await
503    }
504
505    async fn comment_on(&self, comment: PbComment) -> Result<()> {
506        let version = self.meta_client.comment_on(comment).await?;
507        self.wait_version(version).await
508    }
509
510    async fn drop_table(
511        &self,
512        source_id: Option<SourceId>,
513        table_id: TableId,
514        cascade: bool,
515    ) -> Result<()> {
516        let version = self
517            .meta_client
518            .drop_table(source_id, table_id, cascade)
519            .await?;
520        self.wait_version(version).await
521    }
522
523    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
524        let version = self
525            .meta_client
526            .drop_materialized_view(table_id, cascade)
527            .await?;
528        self.wait_version(version).await
529    }
530
531    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
532        let version = self.meta_client.drop_view(view_id, cascade).await?;
533        self.wait_version(version).await
534    }
535
536    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
537        let version = self.meta_client.drop_source(source_id, cascade).await?;
538        self.wait_version(version).await
539    }
540
541    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
542        let version = self.meta_client.reset_source(source_id).await?;
543        self.wait_version(version).await
544    }
545
546    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
547        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
548        self.wait_version(version).await
549    }
550
551    async fn drop_subscription(
552        &self,
553        subscription_id: SubscriptionId,
554        cascade: bool,
555    ) -> Result<()> {
556        let version = self
557            .meta_client
558            .drop_subscription(subscription_id, cascade)
559            .await?;
560        self.wait_version(version).await
561    }
562
563    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
564        let version = self.meta_client.drop_index(index_id, cascade).await?;
565        self.wait_version(version).await
566    }
567
568    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
569        let version = self.meta_client.drop_function(function_id, cascade).await?;
570        self.wait_version(version).await
571    }
572
573    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
574        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
575        self.wait_version(version).await
576    }
577
578    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
579        let version = self.meta_client.drop_database(database_id).await?;
580        self.wait_version(version).await
581    }
582
583    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
584        let version = self
585            .meta_client
586            .drop_connection(connection_id, cascade)
587            .await?;
588        self.wait_version(version).await
589    }
590
591    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
592        let version = self.meta_client.drop_secret(secret_id, cascade).await?;
593        self.wait_version(version).await
594    }
595
596    async fn alter_name(
597        &self,
598        object_id: alter_name_request::Object,
599        object_name: &str,
600    ) -> Result<()> {
601        let version = self.meta_client.alter_name(object_id, object_name).await?;
602        self.wait_version(version).await
603    }
604
605    async fn alter_owner(
606        &self,
607        object: alter_owner_request::Object,
608        owner_id: UserId,
609    ) -> Result<()> {
610        let version = self.meta_client.alter_owner(object, owner_id).await?;
611        self.wait_version(version).await
612    }
613
614    async fn alter_set_schema(
615        &self,
616        object: alter_set_schema_request::Object,
617        new_schema_id: SchemaId,
618    ) -> Result<()> {
619        let version = self
620            .meta_client
621            .alter_set_schema(object, new_schema_id)
622            .await?;
623        self.wait_version(version).await
624    }
625
626    async fn alter_source(&self, source: PbSource) -> Result<()> {
627        let version = self.meta_client.alter_source(source).await?;
628        self.wait_version(version).await
629    }
630
631    async fn alter_parallelism(
632        &self,
633        job_id: JobId,
634        parallelism: PbTableParallelism,
635        deferred: bool,
636    ) -> Result<()> {
637        self.meta_client
638            .alter_parallelism(job_id, parallelism, deferred)
639            .await?;
640        Ok(())
641    }
642
643    async fn alter_config(
644        &self,
645        job_id: JobId,
646        entries_to_add: HashMap<String, String>,
647        keys_to_remove: Vec<String>,
648    ) -> Result<()> {
649        self.meta_client
650            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
651            .await?;
652        Ok(())
653    }
654
655    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
656        let version = self.meta_client.alter_swap_rename(object).await?;
657        self.wait_version(version).await
658    }
659
660    async fn alter_secret(
661        &self,
662        secret_id: SecretId,
663        secret_name: String,
664        database_id: DatabaseId,
665        schema_id: SchemaId,
666        owner_id: UserId,
667        payload: Vec<u8>,
668    ) -> Result<()> {
669        let version = self
670            .meta_client
671            .alter_secret(
672                secret_id,
673                secret_name,
674                database_id,
675                schema_id,
676                owner_id,
677                payload,
678            )
679            .await?;
680        self.wait_version(version).await
681    }
682
683    async fn alter_resource_group(
684        &self,
685        table_id: TableId,
686        resource_group: Option<String>,
687        deferred: bool,
688    ) -> Result<()> {
689        self.meta_client
690            .alter_resource_group(table_id, resource_group, deferred)
691            .await
692            .map_err(|e| anyhow!(e))?;
693
694        Ok(())
695    }
696
697    async fn alter_database_param(
698        &self,
699        database_id: DatabaseId,
700        param: AlterDatabaseParam,
701    ) -> Result<()> {
702        let version = self
703            .meta_client
704            .alter_database_param(database_id, param)
705            .await
706            .map_err(|e| anyhow!(e))?;
707        self.wait_version(version).await
708    }
709
710    async fn create_iceberg_table(
711        &self,
712        table_job_info: PbTableJobInfo,
713        sink_job_info: PbSinkJobInfo,
714        iceberg_source: PbSource,
715        if_not_exists: bool,
716    ) -> Result<()> {
717        let version = self
718            .meta_client
719            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
720            .await?;
721        self.wait_version(version).await
722    }
723}
724
725impl CatalogWriterImpl {
726    pub fn new(
727        meta_client: MetaClient,
728        catalog_updated_rx: Receiver<CatalogVersion>,
729        hummock_snapshot_manager: HummockSnapshotManagerRef,
730    ) -> Self {
731        Self {
732            meta_client,
733            catalog_updated_rx,
734            hummock_snapshot_manager,
735        }
736    }
737
738    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
739        let mut rx = self.catalog_updated_rx.clone();
740        while *rx.borrow_and_update() < version.catalog_version {
741            rx.changed().await.map_err(|e| anyhow!(e))?;
742        }
743        self.hummock_snapshot_manager
744            .wait(version.hummock_version_id)
745            .await;
746        Ok(())
747    }
748}