risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
30use risingwave_pb::ddl_service::replace_job_plan::{
31    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
32};
33use risingwave_pb::ddl_service::{
34    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
35    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
36    streaming_job_resource_type,
37};
38use risingwave_pb::meta::PbTableParallelism;
39use risingwave_pb::stream_plan::StreamFragmentGraph;
40use risingwave_rpc_client::MetaClient;
41use tokio::sync::watch::Receiver;
42
43use super::root_catalog::Catalog;
44use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
45use crate::error::Result;
46use crate::scheduler::HummockSnapshotManagerRef;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// An owned (`Arc`-backed) read guard over the shared [`Catalog`], acquired via
/// [`CatalogReader::read_guard`].
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
51
/// [`CatalogReader`] provides read-only access to the local catalog: holders can obtain
/// read guards through it but cannot modify the catalog.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
55
56impl CatalogReader {
57    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
58        CatalogReader(inner)
59    }
60
61    pub fn read_guard(&self) -> CatalogReadGuard {
62        // Make this recursive so that one can get this guard in the same thread without fear.
63        self.0.read_arc_recursive()
64    }
65}
66
/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
/// It will only send rpc to meta and get the catalog version as response.
/// Then it will wait for the local catalog to be synced to the version, which is performed by
/// [observer](`crate::observer::FrontendObserverNode`).
///
/// For `drop_*` methods, `cascade` presumably maps to SQL `DROP ... CASCADE` semantics
/// (dropping dependent objects too) — the exact behavior is decided by the meta service.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Create a database with the given name, owner and streaming settings.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Create a schema under the database `db_id`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Create a (non-materialized) view with its object dependencies.
    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;

    /// Create a materialized view from a stream fragment graph.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Replace an existing materialized view's definition with a new plan.
    async fn replace_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
    ) -> Result<()>;

    /// Create a table, optionally with an associated source (e.g. for connector-backed tables).
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
        dependencies: HashSet<ObjectId>,
    ) -> Result<()>;

    /// Replace an existing table (and its optional associated source) with a new plan.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Replace an existing source's definition with a new plan.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Create an index on `table` from a stream fragment graph.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Create a source; `graph` is present only when the source has an associated
    /// streaming job.
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Create a sink from a stream fragment graph with its object dependencies.
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Create a subscription.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Create a (possibly UDF) function.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Create a connection under the given database/schema with the given owner.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Create a secret; `payload` carries the secret value in serialized form.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Attach or update a comment on an object (`COMMENT ON ...`).
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Drop a table; `source_id` is the associated source to drop along with it, if any.
    async fn drop_table(
        &self,
        source_id: Option<SourceId>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Drop a materialized view.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Drop a view.
    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;

    /// Drop a source.
    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;

    /// Reset a source (semantics defined by the meta service).
    async fn reset_source(&self, source_id: SourceId) -> Result<()>;

    /// Drop a sink.
    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;

    /// Drop a subscription.
    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
    -> Result<()>;

    /// Drop a database.
    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;

    /// Drop a schema.
    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;

    /// Drop an index.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Drop a function.
    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;

    /// Drop a connection.
    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;

    /// Drop a secret.
    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;

    /// Replace a secret's name, location, owner and payload.
    async fn alter_secret(
        &self,
        secret_id: SecretId,
        secret_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        owner_id: UserId,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Change a subscription's retention; `definition` is the updated SQL definition text.
    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> Result<()>;

    /// Rename an object to `object_name`.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Change an object's owner.
    async fn alter_owner(
        &self,
        object: alter_owner_request::Object,
        owner_id: UserId,
    ) -> Result<()>;

    /// Replace the source in the catalog.
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Change a streaming job's parallelism; `deferred` postpones the actual rescheduling.
    async fn alter_parallelism(
        &self,
        job_id: JobId,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    /// Change a streaming job's backfill parallelism (`None` resets it — TODO confirm).
    async fn alter_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<PbTableParallelism>,
        deferred: bool,
    ) -> Result<()>;

    /// Update a streaming job's config map: add/overwrite `entries_to_add`, then remove
    /// `keys_to_remove`.
    async fn alter_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> Result<()>;

    /// Move a streaming job to another resource group (`None` presumably resets to default).
    async fn alter_resource_group(
        &self,
        table_id: TableId,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Move an object into the schema `new_schema_id`.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> Result<()>;

    /// Swap the names of two objects (`ALTER ... SWAP WITH ...`).
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Change one database-level parameter.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;

    /// Create an iceberg table: a table job, its sink job and the backing iceberg source.
    async fn create_iceberg_table(
        &self,
        table_job_info: PbTableJobInfo,
        sink_job_info: PbSinkJobInfo,
        iceberg_source: PbSource,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Wait until the local catalog has caught up with the meta service's current version.
    async fn wait(&self) -> Result<()>;
}
289
/// The default [`CatalogWriter`]: forwards DDL requests to the meta service over RPC,
/// then waits until the local catalog (and hummock snapshot) catch up to the version
/// returned by meta.
#[derive(Clone)]
pub struct CatalogWriterImpl {
    // RPC client for the meta service; all DDL operations go through it.
    meta_client: MetaClient,
    // Watch channel carrying the latest catalog version synced to this frontend.
    catalog_updated_rx: Receiver<CatalogVersion>,
    // Used to wait for the local hummock snapshot to reach a given version id.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
296
297#[async_trait::async_trait]
298impl CatalogWriter for CatalogWriterImpl {
299    async fn create_database(
300        &self,
301        db_name: &str,
302        owner: UserId,
303        resource_group: &str,
304        barrier_interval_ms: Option<u32>,
305        checkpoint_frequency: Option<u64>,
306    ) -> Result<()> {
307        let version = self
308            .meta_client
309            .create_database(PbDatabase {
310                name: db_name.to_owned(),
311                id: 0.into(),
312                owner,
313                resource_group: resource_group.to_owned(),
314                barrier_interval_ms,
315                checkpoint_frequency,
316            })
317            .await?;
318        self.wait_version(version).await
319    }
320
321    async fn create_schema(
322        &self,
323        db_id: DatabaseId,
324        schema_name: &str,
325        owner: UserId,
326    ) -> Result<()> {
327        let version = self
328            .meta_client
329            .create_schema(PbSchema {
330                id: 0.into(),
331                name: schema_name.to_owned(),
332                database_id: db_id,
333                owner,
334            })
335            .await?;
336        self.wait_version(version).await
337    }
338
339    // TODO: maybe here to pass a materialize plan node
340    async fn create_materialized_view(
341        &self,
342        table: PbTable,
343        graph: StreamFragmentGraph,
344        dependencies: HashSet<ObjectId>,
345        resource_type: streaming_job_resource_type::ResourceType,
346        if_not_exists: bool,
347    ) -> Result<()> {
348        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
349        let version = self
350            .meta_client
351            .create_materialized_view(table, graph, dependencies, resource_type, if_not_exists)
352            .await?;
353        if matches!(create_type, PbCreateType::Foreground) {
354            self.wait_version(version).await?
355        }
356        Ok(())
357    }
358
359    async fn replace_materialized_view(
360        &self,
361        table: PbTable,
362        graph: StreamFragmentGraph,
363    ) -> Result<()> {
364        // TODO: this is a dummy implementation for debugging only.
365        notice_to_user(format!("table: {table:#?}"));
366        notice_to_user(format!("graph: {graph:#?}"));
367
368        let version = self
369            .meta_client
370            .replace_job(
371                graph,
372                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
373            )
374            .await?;
375
376        self.wait_version(version).await
377    }
378
379    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
380        let version = self.meta_client.create_view(view, dependencies).await?;
381        self.wait_version(version).await
382    }
383
384    async fn create_index(
385        &self,
386        index: PbIndex,
387        table: PbTable,
388        graph: StreamFragmentGraph,
389        if_not_exists: bool,
390    ) -> Result<()> {
391        let version = self
392            .meta_client
393            .create_index(index, table, graph, if_not_exists)
394            .await?;
395        self.wait_version(version).await
396    }
397
398    async fn create_table(
399        &self,
400        source: Option<PbSource>,
401        table: PbTable,
402        graph: StreamFragmentGraph,
403        job_type: PbTableJobType,
404        if_not_exists: bool,
405        dependencies: HashSet<ObjectId>,
406    ) -> Result<()> {
407        let version = self
408            .meta_client
409            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
410            .await?;
411        self.wait_version(version).await
412    }
413
414    async fn replace_table(
415        &self,
416        source: Option<PbSource>,
417        table: PbTable,
418        graph: StreamFragmentGraph,
419        job_type: TableJobType,
420    ) -> Result<()> {
421        let version = self
422            .meta_client
423            .replace_job(
424                graph,
425                ReplaceJob::ReplaceTable(ReplaceTable {
426                    source,
427                    table: Some(table),
428                    job_type: job_type as _,
429                }),
430            )
431            .await?;
432        self.wait_version(version).await
433    }
434
435    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
436        let version = self
437            .meta_client
438            .replace_job(
439                graph,
440                ReplaceJob::ReplaceSource(ReplaceSource {
441                    source: Some(source),
442                }),
443            )
444            .await?;
445        self.wait_version(version).await
446    }
447
448    async fn create_source(
449        &self,
450        source: PbSource,
451        graph: Option<StreamFragmentGraph>,
452        if_not_exists: bool,
453    ) -> Result<()> {
454        let version = self
455            .meta_client
456            .create_source(source, graph, if_not_exists)
457            .await?;
458        self.wait_version(version).await
459    }
460
461    async fn create_sink(
462        &self,
463        sink: PbSink,
464        graph: StreamFragmentGraph,
465        dependencies: HashSet<ObjectId>,
466        if_not_exists: bool,
467    ) -> Result<()> {
468        let version = self
469            .meta_client
470            .create_sink(sink, graph, dependencies, if_not_exists)
471            .await?;
472        self.wait_version(version).await
473    }
474
475    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
476        let version = self.meta_client.create_subscription(subscription).await?;
477        self.wait_version(version).await
478    }
479
480    async fn create_function(&self, function: PbFunction) -> Result<()> {
481        let version = self.meta_client.create_function(function).await?;
482        self.wait_version(version).await
483    }
484
485    async fn create_connection(
486        &self,
487        connection_name: String,
488        database_id: DatabaseId,
489        schema_id: SchemaId,
490        owner_id: UserId,
491        connection: create_connection_request::Payload,
492    ) -> Result<()> {
493        let version = self
494            .meta_client
495            .create_connection(
496                connection_name,
497                database_id,
498                schema_id,
499                owner_id,
500                connection,
501            )
502            .await?;
503        self.wait_version(version).await
504    }
505
506    async fn create_secret(
507        &self,
508        secret_name: String,
509        database_id: DatabaseId,
510        schema_id: SchemaId,
511        owner_id: UserId,
512        payload: Vec<u8>,
513    ) -> Result<()> {
514        let version = self
515            .meta_client
516            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
517            .await?;
518        self.wait_version(version).await
519    }
520
521    async fn comment_on(&self, comment: PbComment) -> Result<()> {
522        let version = self.meta_client.comment_on(comment).await?;
523        self.wait_version(version).await
524    }
525
526    async fn drop_table(
527        &self,
528        source_id: Option<SourceId>,
529        table_id: TableId,
530        cascade: bool,
531    ) -> Result<()> {
532        let version = self
533            .meta_client
534            .drop_table(source_id, table_id, cascade)
535            .await?;
536        self.wait_version(version).await
537    }
538
539    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
540        let version = self
541            .meta_client
542            .drop_materialized_view(table_id, cascade)
543            .await?;
544        self.wait_version(version).await
545    }
546
547    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
548        let version = self.meta_client.drop_view(view_id, cascade).await?;
549        self.wait_version(version).await
550    }
551
552    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
553        let version = self.meta_client.drop_source(source_id, cascade).await?;
554        self.wait_version(version).await
555    }
556
557    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
558        let version = self.meta_client.reset_source(source_id).await?;
559        self.wait_version(version).await
560    }
561
562    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
563        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
564        self.wait_version(version).await
565    }
566
567    async fn drop_subscription(
568        &self,
569        subscription_id: SubscriptionId,
570        cascade: bool,
571    ) -> Result<()> {
572        let version = self
573            .meta_client
574            .drop_subscription(subscription_id, cascade)
575            .await?;
576        self.wait_version(version).await
577    }
578
579    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
580        let version = self.meta_client.drop_index(index_id, cascade).await?;
581        self.wait_version(version).await
582    }
583
584    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
585        let version = self.meta_client.drop_function(function_id, cascade).await?;
586        self.wait_version(version).await
587    }
588
589    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
590        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
591        self.wait_version(version).await
592    }
593
594    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
595        let version = self.meta_client.drop_database(database_id).await?;
596        self.wait_version(version).await
597    }
598
599    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
600        let version = self
601            .meta_client
602            .drop_connection(connection_id, cascade)
603            .await?;
604        self.wait_version(version).await
605    }
606
607    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
608        let version = self.meta_client.drop_secret(secret_id, cascade).await?;
609        self.wait_version(version).await
610    }
611
612    async fn alter_name(
613        &self,
614        object_id: alter_name_request::Object,
615        object_name: &str,
616    ) -> Result<()> {
617        let version = self.meta_client.alter_name(object_id, object_name).await?;
618        self.wait_version(version).await
619    }
620
621    async fn alter_owner(
622        &self,
623        object: alter_owner_request::Object,
624        owner_id: UserId,
625    ) -> Result<()> {
626        let version = self.meta_client.alter_owner(object, owner_id).await?;
627        self.wait_version(version).await
628    }
629
630    async fn alter_set_schema(
631        &self,
632        object: alter_set_schema_request::Object,
633        new_schema_id: SchemaId,
634    ) -> Result<()> {
635        let version = self
636            .meta_client
637            .alter_set_schema(object, new_schema_id)
638            .await?;
639        self.wait_version(version).await
640    }
641
642    async fn alter_source(&self, source: PbSource) -> Result<()> {
643        let version = self.meta_client.alter_source(source).await?;
644        self.wait_version(version).await
645    }
646
647    async fn alter_parallelism(
648        &self,
649        job_id: JobId,
650        parallelism: PbTableParallelism,
651        deferred: bool,
652    ) -> Result<()> {
653        self.meta_client
654            .alter_parallelism(job_id, parallelism, deferred)
655            .await?;
656        Ok(())
657    }
658
659    async fn alter_backfill_parallelism(
660        &self,
661        job_id: JobId,
662        parallelism: Option<PbTableParallelism>,
663        deferred: bool,
664    ) -> Result<()> {
665        self.meta_client
666            .alter_backfill_parallelism(job_id, parallelism, deferred)
667            .await?;
668        Ok(())
669    }
670
671    async fn alter_config(
672        &self,
673        job_id: JobId,
674        entries_to_add: HashMap<String, String>,
675        keys_to_remove: Vec<String>,
676    ) -> Result<()> {
677        self.meta_client
678            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
679            .await?;
680        Ok(())
681    }
682
683    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
684        let version = self.meta_client.alter_swap_rename(object).await?;
685        self.wait_version(version).await
686    }
687
688    async fn alter_secret(
689        &self,
690        secret_id: SecretId,
691        secret_name: String,
692        database_id: DatabaseId,
693        schema_id: SchemaId,
694        owner_id: UserId,
695        payload: Vec<u8>,
696    ) -> Result<()> {
697        let version = self
698            .meta_client
699            .alter_secret(
700                secret_id,
701                secret_name,
702                database_id,
703                schema_id,
704                owner_id,
705                payload,
706            )
707            .await?;
708        self.wait_version(version).await
709    }
710
711    async fn alter_subscription_retention(
712        &self,
713        subscription_id: SubscriptionId,
714        retention_seconds: u64,
715        definition: String,
716    ) -> Result<()> {
717        let version = self
718            .meta_client
719            .alter_subscription_retention(subscription_id, retention_seconds, definition)
720            .await?;
721        self.wait_version(version).await
722    }
723
724    async fn alter_resource_group(
725        &self,
726        table_id: TableId,
727        resource_group: Option<String>,
728        deferred: bool,
729    ) -> Result<()> {
730        self.meta_client
731            .alter_resource_group(table_id, resource_group, deferred)
732            .await
733            .map_err(|e| anyhow!(e))?;
734
735        Ok(())
736    }
737
738    async fn alter_database_param(
739        &self,
740        database_id: DatabaseId,
741        param: AlterDatabaseParam,
742    ) -> Result<()> {
743        let version = self
744            .meta_client
745            .alter_database_param(database_id, param)
746            .await
747            .map_err(|e| anyhow!(e))?;
748        self.wait_version(version).await
749    }
750
751    async fn create_iceberg_table(
752        &self,
753        table_job_info: PbTableJobInfo,
754        sink_job_info: PbSinkJobInfo,
755        iceberg_source: PbSource,
756        if_not_exists: bool,
757    ) -> Result<()> {
758        let version = self
759            .meta_client
760            .create_iceberg_table(table_job_info, sink_job_info, iceberg_source, if_not_exists)
761            .await?;
762        self.wait_version(version).await
763    }
764
765    async fn wait(&self) -> Result<()> {
766        let version = self.meta_client.wait().await.map_err(|e| anyhow!(e))?;
767        self.wait_version(version).await
768    }
769}
770
771impl CatalogWriterImpl {
772    pub fn new(
773        meta_client: MetaClient,
774        catalog_updated_rx: Receiver<CatalogVersion>,
775        hummock_snapshot_manager: HummockSnapshotManagerRef,
776    ) -> Self {
777        Self {
778            meta_client,
779            catalog_updated_rx,
780            hummock_snapshot_manager,
781        }
782    }
783
784    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
785        let mut rx = self.catalog_updated_rx.clone();
786        while *rx.borrow_and_update() < version.catalog_version {
787            rx.changed().await.map_err(|e| anyhow!(e))?;
788        }
789        self.hummock_snapshot_manager
790            .wait(version.hummock_version_id)
791            .await;
792        Ok(())
793    }
794}