risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36    streaming_job_resource_type,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
52/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
53#[derive(Clone)]
54pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58        CatalogReader(inner)
59    }
60
61    pub fn read_guard(&self) -> CatalogReadGuard {
62        // Make this recursive so that one can get this guard in the same thread without fear.
63        self.0.read_arc_recursive()
64    }
65}
66
67/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
68/// It will only send rpc to meta and get the catalog version as response.
69/// Then it will wait for the local catalog to be synced to the version, which is performed by
70/// [observer](`crate::observer::FrontendObserverNode`).
71#[async_trait::async_trait]
72pub trait CatalogWriter: Send + Sync {
73    async fn create_database(
74        &self,
75        db_name: &str,
76        owner: UserId,
77        resource_group: &str,
78        barrier_interval_ms: Option<u32>,
79        checkpoint_frequency: Option<u64>,
80    ) -> Result<()>;
81
82    async fn create_schema(
83        &self,
84        db_id: DatabaseId,
85        schema_name: &str,
86        owner: UserId,
87    ) -> Result<()>;
88
89    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
90
91    async fn create_materialized_view(
92        &self,
93        table: PbTable,
94        graph: StreamFragmentGraph,
95        dependencies: HashSet<ObjectId>,
96        resource_type: streaming_job_resource_type::ResourceType,
97        if_not_exists: bool,
98    ) -> Result<()>;
99
100    async fn replace_materialized_view(
101        &self,
102        table: PbTable,
103        graph: StreamFragmentGraph,
104    ) -> Result<()>;
105
106    async fn create_table(
107        &self,
108        source: Option<PbSource>,
109        table: PbTable,
110        graph: StreamFragmentGraph,
111        job_type: PbTableJobType,
112        if_not_exists: bool,
113        dependencies: HashSet<ObjectId>,
114    ) -> Result<()>;
115
116    async fn replace_table(
117        &self,
118        source: Option<PbSource>,
119        table: PbTable,
120        graph: StreamFragmentGraph,
121        job_type: TableJobType,
122    ) -> Result<()>;
123
124    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
125
126    async fn create_index(
127        &self,
128        index: PbIndex,
129        table: PbTable,
130        graph: StreamFragmentGraph,
131        if_not_exists: bool,
132    ) -> Result<()>;
133
134    async fn create_source(
135        &self,
136        source: PbSource,
137        graph: Option<StreamFragmentGraph>,
138        if_not_exists: bool,
139    ) -> Result<()>;
140
141    async fn create_sink(
142        &self,
143        sink: PbSink,
144        graph: StreamFragmentGraph,
145        dependencies: HashSet<ObjectId>,
146        if_not_exists: bool,
147    ) -> Result<()>;
148
149    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
150
151    async fn create_function(&self, function: PbFunction) -> Result<()>;
152
153    async fn create_connection(
154        &self,
155        connection_name: String,
156        database_id: DatabaseId,
157        schema_id: SchemaId,
158        owner_id: UserId,
159        connection: create_connection_request::Payload,
160    ) -> Result<()>;
161
162    async fn create_secret(
163        &self,
164        secret_name: String,
165        database_id: DatabaseId,
166        schema_id: SchemaId,
167        owner_id: UserId,
168        payload: Vec<u8>,
169    ) -> Result<()>;
170
171    async fn comment_on(&self, comment: PbComment) -> Result<()>;
172
173    async fn drop_table(
174        &self,
175        source_id: Option<SourceId>,
176        table_id: TableId,
177        cascade: bool,
178    ) -> Result<()>;
179
180    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
181
182    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
183
184    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
185
186    async fn reset_source(&self, source_id: SourceId) -> Result<()>;
187
188    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
189
190    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
191    -> Result<()>;
192
193    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
194
195    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
196
197    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
198
199    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
200
201    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
202
203    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;
204
205    async fn alter_secret(
206        &self,
207        secret_id: SecretId,
208        secret_name: String,
209        database_id: DatabaseId,
210        schema_id: SchemaId,
211        owner_id: UserId,
212        payload: Vec<u8>,
213    ) -> Result<()>;
214
215    async fn alter_name(
216        &self,
217        object_id: alter_name_request::Object,
218        object_name: &str,
219    ) -> Result<()>;
220
221    async fn alter_owner(
222        &self,
223        object: alter_owner_request::Object,
224        owner_id: UserId,
225    ) -> Result<()>;
226
227    /// Replace the source in the catalog.
228    async fn alter_source(&self, source: PbSource) -> Result<()>;
229
230    async fn alter_parallelism(
231        &self,
232        job_id: JobId,
233        parallelism: PbTableParallelism,
234        deferred: bool,
235    ) -> Result<()>;
236
237    async fn alter_config(
238        &self,
239        job_id: JobId,
240        entries_to_add: HashMap<String, String>,
241        keys_to_remove: Vec<String>,
242    ) -> Result<()>;
243
244    async fn alter_resource_group(
245        &self,
246        table_id: TableId,
247        resource_group: Option<String>,
248        deferred: bool,
249    ) -> Result<()>;
250
251    async fn alter_set_schema(
252        &self,
253        object: alter_set_schema_request::Object,
254        new_schema_id: SchemaId,
255    ) -> Result<()>;
256
257    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
258
259    async fn alter_database_param(
260        &self,
261        database_id: DatabaseId,
262        param: AlterDatabaseParam,
263    ) -> Result<()>;
264
265    async fn create_iceberg_table(
266        &self,
267        table_job_info: PbTableJobInfo,
268        sink_job_info: PbSinkJobInfo,
269        iceberg_source: PbSource,
270        if_not_exists: bool,
271    ) -> Result<()>;
272
273    async fn wait(&self) -> Result<()>;
274}
275
276#[derive(Clone)]
277pub struct CatalogWriterImpl {
278    meta_client: MetaClient,
279    catalog_updated_rx: Receiver<CatalogVersion>,
280    hummock_snapshot_manager: HummockSnapshotManagerRef,
281}
282
283#[async_trait::async_trait]
284impl CatalogWriter for CatalogWriterImpl {
285    async fn create_database(
286        &self,
287        db_name: &str,
288        owner: UserId,
289        resource_group: &str,
290        barrier_interval_ms: Option<u32>,
291        checkpoint_frequency: Option<u64>,
292    ) -> Result<()> {
293        let version = self
294            .meta_client
295            .create_database(PbDatabase {
296                name: db_name.to_owned(),
297                id: 0.into(),
298                owner,
299                resource_group: resource_group.to_owned(),
300                barrier_interval_ms,
301                checkpoint_frequency,
302            })
303            .await?;
304        self.wait_version(version).await
305    }
306
307    async fn create_schema(
308        &self,
309        db_id: DatabaseId,
310        schema_name: &str,
311        owner: UserId,
312    ) -> Result<()> {
313        let version = self
314            .meta_client
315            .create_schema(PbSchema {
316                id: 0.into(),
317                name: schema_name.to_owned(),
318                database_id: db_id,
319                owner,
320            })
321            .await?;
322        self.wait_version(version).await
323    }
324
325    // TODO: maybe here to pass a materialize plan node
326    async fn create_materialized_view(
327        &self,
328        table: PbTable,
329        graph: StreamFragmentGraph,
330        dependencies: HashSet<ObjectId>,
331        resource_type: streaming_job_resource_type::ResourceType,
332        if_not_exists: bool,
333    ) -> Result<()> {
334        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
335        let version = self
336            .meta_client
337            .create_materialized_view(table, graph, dependencies, resource_type, if_not_exists)
338            .await?;
339        if matches!(create_type, PbCreateType::Foreground) {
340            self.wait_version(version).await?
341        }
342        Ok(())
343    }
344
345    async fn replace_materialized_view(
346        &self,
347        table: PbTable,
348        graph: StreamFragmentGraph,
349    ) -> Result<()> {
350        // TODO: this is a dummy implementation for debugging only.
351        notice_to_user(format!("table: {table:#?}"));
352        notice_to_user(format!("graph: {graph:#?}"));
353
354        let version = self
355            .meta_client
356            .replace_job(
357                graph,
358                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
359            )
360            .await?;
361
362        self.wait_version(version).await
363    }
364
365    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
366        let version = self.meta_client.create_view(view, dependencies).await?;
367        self.wait_version(version).await
368    }
369
370    async fn create_index(
371        &self,
372        index: PbIndex,
373        table: PbTable,
374        graph: StreamFragmentGraph,
375        if_not_exists: bool,
376    ) -> Result<()> {
377        let version = self
378            .meta_client
379            .create_index(index, table, graph, if_not_exists)
380            .await?;
381        self.wait_version(version).await
382    }
383
384    async fn create_table(
385        &self,
386        source: Option<PbSource>,
387        table: PbTable,
388        graph: StreamFragmentGraph,
389        job_type: PbTableJobType,
390        if_not_exists: bool,
391        dependencies: HashSet<ObjectId>,
392    ) -> Result<()> {
393        let version = self
394            .meta_client
395            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
396            .await?;
397        self.wait_version(version).await
398    }
399
400    async fn replace_table(
401        &self,
402        source: Option<PbSource>,
403        table: PbTable,
404        graph: StreamFragmentGraph,
405        job_type: TableJobType,
406    ) -> Result<()> {
407        let version = self
408            .meta_client
409            .replace_job(
410                graph,
411                ReplaceJob::ReplaceTable(ReplaceTable {
412                    source,
413                    table: Some(table),
414                    job_type: job_type as _,
415                }),
416            )
417            .await?;
418        self.wait_version(version).await
419    }
420
421    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
422        let version = self
423            .meta_client
424            .replace_job(
425                graph,
426                ReplaceJob::ReplaceSource(ReplaceSource {
427                    source: Some(source),
428                }),
429            )
430            .await?;
431        self.wait_version(version).await
432    }
433
434    async fn create_source(
435        &self,
436        source: PbSource,
437        graph: Option<StreamFragmentGraph>,
438        if_not_exists: bool,
439    ) -> Result<()> {
440        let version = self
441            .meta_client
442            .create_source(source, graph, if_not_exists)
443            .await?;
444        self.wait_version(version).await
445    }
446
447    async fn create_sink(
448        &self,
449        sink: PbSink,
450        graph: StreamFragmentGraph,
451        dependencies: HashSet<ObjectId>,
452        if_not_exists: bool,
453    ) -> Result<()> {
454        let version = self
455            .meta_client
456            .create_sink(sink, graph, dependencies, if_not_exists)
457            .await?;
458        self.wait_version(version).await
459    }
460
461    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
462        let version = self.meta_client.create_subscription(subscription).await?;
463        self.wait_version(version).await
464    }
465
466    async fn create_function(&self, function: PbFunction) -> Result<()> {
467        let version = self.meta_client.create_function(function).await?;
468        self.wait_version(version).await
469    }
470
471    async fn create_connection(
472        &self,
473        connection_name: String,
474        database_id: DatabaseId,
475        schema_id: SchemaId,
476        owner_id: UserId,
477        connection: create_connection_request::Payload,
478    ) -> Result<()> {
479        let version = self
480            .meta_client
481            .create_connection(
482                connection_name,
483                database_id,
484                schema_id,
485                owner_id,
486                connection,
487            )
488            .await?;
489        self.wait_version(version).await
490    }
491
492    async fn create_secret(
493        &self,
494        secret_name: String,
495        database_id: DatabaseId,
496        schema_id: SchemaId,
497        owner_id: UserId,
498        payload: Vec<u8>,
499    ) -> Result<()> {
500        let version = self
501            .meta_client
502            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
503            .await?;
504        self.wait_version(version).await
505    }
506
507    async fn comment_on(&self, comment: PbComment) -> Result<()> {
508        let version = self.meta_client.comment_on(comment).await?;
509        self.wait_version(version).await
510    }
511
512    async fn drop_table(
513        &self,
514        source_id: Option<SourceId>,
515        table_id: TableId,
516        cascade: bool,
517    ) -> Result<()> {
518        let version = self
519            .meta_client
520            .drop_table(source_id, table_id, cascade)
521            .await?;
522        self.wait_version(version).await
523    }
524
525    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
526        let version = self
527            .meta_client
528            .drop_materialized_view(table_id, cascade)
529            .await?;
530        self.wait_version(version).await
531    }
532
533    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
534        let version = self.meta_client.drop_view(view_id, cascade).await?;
535        self.wait_version(version).await
536    }
537
538    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
539        let version = self.meta_client.drop_source(source_id, cascade).await?;
540        self.wait_version(version).await
541    }
542
543    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
544        let version = self.meta_client.reset_source(source_id).await?;
545        self.wait_version(version).await
546    }
547
548    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
549        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
550        self.wait_version(version).await
551    }
552
553    async fn drop_subscription(
554        &self,
555        subscription_id: SubscriptionId,
556        cascade: bool,
557    ) -> Result<()> {
558        let version = self
559            .meta_client
560            .drop_subscription(subscription_id, cascade)
561            .await?;
562        self.wait_version(version).await
563    }
564
565    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
566        let version = self.meta_client.drop_index(index_id, cascade).await?;
567        self.wait_version(version).await
568    }
569
570    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
571        let version = self.meta_client.drop_function(function_id, cascade).await?;
572        self.wait_version(version).await
573    }
574
575    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
576        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
577        self.wait_version(version).await
578    }
579
580    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
581        let version = self.meta_client.drop_database(database_id).await?;
582        self.wait_version(version).await
583    }
584
585    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
586        let version = self
587            .meta_client
588            .drop_connection(connection_id, cascade)
589            .await?;
590        self.wait_version(version).await
591    }
592
593    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
594        let version = self.meta_client.drop_secret(secret_id, cascade).await?;
595        self.wait_version(version).await
596    }
597
598    async fn alter_name(
599        &self,
600        object_id: alter_name_request::Object,
601        object_name: &str,
602    ) -> Result<()> {
603        let version = self.meta_client.alter_name(object_id, object_name).await?;
604        self.wait_version(version).await
605    }
606
607    async fn alter_owner(
608        &self,
609        object: alter_owner_request::Object,
610        owner_id: UserId,
611    ) -> Result<()> {
612        let version = self.meta_client.alter_owner(object, owner_id).await?;
613        self.wait_version(version).await
614    }
615
616    async fn alter_set_schema(
617        &self,
618        object: alter_set_schema_request::Object,
619        new_schema_id: SchemaId,
620    ) -> Result<()> {
621        let version = self
622            .meta_client
623            .alter_set_schema(object, new_schema_id)
624            .await?;
625        self.wait_version(version).await
626    }
627
628    async fn alter_source(&self, source: PbSource) -> Result<()> {
629        let version = self.meta_client.alter_source(source).await?;
630        self.wait_version(version).await
631    }
632
633    async fn alter_parallelism(
634        &self,
635        job_id: JobId,
636        parallelism: PbTableParallelism,
637        deferred: bool,
638    ) -> Result<()> {
639        self.meta_client
640            .alter_parallelism(job_id, parallelism, deferred)
641            .await?;
642        Ok(())
643    }
644
645    async fn alter_config(
646        &self,
647        job_id: JobId,
648        entries_to_add: HashMap<String, String>,
649        keys_to_remove: Vec<String>,
650    ) -> Result<()> {
651        self.meta_client
652            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
653            .await?;
654        Ok(())
655    }
656
657    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
658        let version = self.meta_client.alter_swap_rename(object).await?;
659        self.wait_version(version).await
660    }
661
662    async fn alter_secret(
663        &self,
664        secret_id: SecretId,
665        secret_name: String,
666        database_id: DatabaseId,
667        schema_id: SchemaId,
668        owner_id: UserId,
669        payload: Vec<u8>,
670    ) -> Result<()> {
671        let version = self
672            .meta_client
673            .alter_secret(
674                secret_id,
675                secret_name,
676                database_id,
677                schema_id,
678                owner_id,
679                payload,
680            )
681            .await?;
682        self.wait_version(version).await
683    }
684
685    async fn alter_resource_group(
686        &self,
687        table_id: TableId,
688        resource_group: Option<String>,
689        deferred: bool,
690    ) -> Result<()> {
691        self.meta_client
692            .alter_resource_group(table_id, resource_group, deferred)
693            .await
694            .map_err(|e| anyhow!(e))?;
695
696        Ok(())
697    }
698
699    async fn alter_database_param(
700        &self,
701        database_id: DatabaseId,
702        param: AlterDatabaseParam,
703    ) -> Result<()> {
704        let version = self
705            .meta_client
706            .alter_database_param(database_id, param)
707            .await
708            .map_err(|e| anyhow!(e))?;
709        self.wait_version(version).await
710    }
711
712    async fn create_iceberg_table(
713        &self,
714        table_job_info: PbTableJobInfo,
715        sink_job_info: PbSinkJobInfo,
716        iceberg_source: PbSource,
717        if_not_exists: bool,
718    ) -> Result<()> {
719        let version = self
720            .meta_client
721            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
722            .await?;
723        self.wait_version(version).await
724    }
725
726    async fn wait(&self) -> Result<()> {
727        let version = self.meta_client.wait().await.map_err(|e| anyhow!(e))?;
728        self.wait_version(version).await
729    }
730}
731
732impl CatalogWriterImpl {
733    pub fn new(
734        meta_client: MetaClient,
735        catalog_updated_rx: Receiver<CatalogVersion>,
736        hummock_snapshot_manager: HummockSnapshotManagerRef,
737    ) -> Self {
738        Self {
739            meta_client,
740            catalog_updated_rx,
741            hummock_snapshot_manager,
742        }
743    }
744
745    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
746        let mut rx = self.catalog_updated_rx.clone();
747        while *rx.borrow_and_update() < version.catalog_version {
748            rx.changed().await.map_err(|e| anyhow!(e))?;
749        }
750        self.hummock_snapshot_manager
751            .wait(version.hummock_version_id)
752            .await;
753        Ok(())
754    }
755}