risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_hummock_sdk::HummockVersionId;
25use risingwave_pb::catalog::{
26    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
27    PbSubscription, PbTable, PbView,
28};
29use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
30use risingwave_pb::ddl_service::{
31    PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
32    alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
33    create_connection_request,
34};
35use risingwave_pb::meta::PbTableParallelism;
36use risingwave_pb::stream_plan::StreamFragmentGraph;
37use risingwave_rpc_client::MetaClient;
38use tokio::sync::watch::Receiver;
39
40use super::root_catalog::Catalog;
41use super::{DatabaseId, SecretId, TableId};
42use crate::error::Result;
43use crate::scheduler::HummockSnapshotManagerRef;
44use crate::user::UserId;
45
/// `Arc`-backed read guard over the shared [`Catalog`], as handed out by
/// [`CatalogReader::read_guard`].
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
47
/// [`CatalogReader`] gives read access to the local catalog. It only exposes a read guard, so a
/// holder cannot modify the catalog through it.
///
/// Cloning is cheap: clones share the same underlying `Arc<RwLock<Catalog>>`.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
51
52impl CatalogReader {
53    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
54        CatalogReader(inner)
55    }
56
57    pub fn read_guard(&self) -> CatalogReadGuard {
58        // Make this recursive so that one can get this guard in the same thread without fear.
59        self.0.read_arc_recursive()
60    }
61}
62
/// [`CatalogWriter`] initiates DDL operations (create table/schema/database/function/connection).
/// It only sends an rpc to meta and gets the catalog version as the response.
/// Then it waits for the local catalog to be synced to that version, which is performed by the
/// [observer](`crate::observer::FrontendObserverNode`).
///
/// Flags such as `if_not_exists`, `cascade` and `deferred` are handed to the implementation
/// as-is; [`CatalogWriterImpl`] forwards them to meta. Their exact semantics are defined by the
/// meta service and are not visible from this file.
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
    /// Creates a database named `db_name` owned by `owner` in the given `resource_group`.
    /// `barrier_interval_ms` and `checkpoint_frequency` are optional settings.
    async fn create_database(
        &self,
        db_name: &str,
        owner: UserId,
        resource_group: &str,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Result<()>;

    /// Creates a schema named `schema_name`, owned by `owner`, in database `db_id`.
    async fn create_schema(
        &self,
        db_id: DatabaseId,
        schema_name: &str,
        owner: UserId,
    ) -> Result<()>;

    /// Creates the view described by `view`.
    async fn create_view(&self, view: PbView) -> Result<()>;

    /// Creates a materialized view from its `table` catalog and stream fragment `graph`.
    /// `dependencies` lists the objects the job depends on.
    async fn create_materialized_view(
        &self,
        table: PbTable,
        graph: StreamFragmentGraph,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a table, optionally together with an associated `source`.
    async fn create_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: PbTableJobType,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Replaces an existing table (and optionally its `source`) with a new definition and
    /// `graph`.
    async fn replace_table(
        &self,
        source: Option<PbSource>,
        table: PbTable,
        graph: StreamFragmentGraph,
        job_type: TableJobType,
    ) -> Result<()>;

    /// Replaces an existing source with a new definition and `graph`.
    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;

    /// Creates an index described by `index`, its backing `table` and stream fragment `graph`.
    async fn create_index(
        &self,
        index: PbIndex,
        table: PbTable,
        graph: StreamFragmentGraph,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a source; `graph` is optional.
    async fn create_source(
        &self,
        source: PbSource,
        graph: Option<StreamFragmentGraph>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates a sink. `affected_table_change` optionally carries a replace-job plan that goes
    /// along with the sink (presumably for the sink's target table; confirm with the caller).
    async fn create_sink(
        &self,
        sink: PbSink,
        graph: StreamFragmentGraph,
        affected_table_change: Option<PbReplaceJobPlan>,
        dependencies: HashSet<ObjectId>,
        if_not_exists: bool,
    ) -> Result<()>;

    /// Creates the subscription described by `subscription`.
    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;

    /// Creates the function described by `function`.
    async fn create_function(&self, function: PbFunction) -> Result<()>;

    /// Creates a connection named `connection_name` in the given database and schema.
    async fn create_connection(
        &self,
        connection_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        connection: create_connection_request::Payload,
    ) -> Result<()>;

    /// Creates a secret named `secret_name` in the given database and schema with the raw
    /// `payload` bytes.
    async fn create_secret(
        &self,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Applies the given `comment` to a catalog object.
    async fn comment_on(&self, comment: PbComment) -> Result<()>;

    /// Drops a table and, when `source_id` is given, the source identified by it.
    async fn drop_table(
        &self,
        source_id: Option<u32>,
        table_id: TableId,
        cascade: bool,
    ) -> Result<()>;

    /// Drops the materialized view identified by `table_id`.
    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

    /// Drops the view identified by `view_id`.
    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;

    /// Drops the source identified by `source_id`.
    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;

    /// Drops the sink identified by `sink_id`, optionally with an accompanying replace-job plan
    /// in `affected_table_change`.
    async fn drop_sink(
        &self,
        sink_id: u32,
        cascade: bool,
        affected_table_change: Option<PbReplaceJobPlan>,
    ) -> Result<()>;

    /// Drops the subscription identified by `subscription_id`.
    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;

    /// Drops the database identified by `database_id`.
    async fn drop_database(&self, database_id: u32) -> Result<()>;

    /// Drops the schema identified by `schema_id`.
    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;

    /// Drops the index identified by `index_id`.
    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

    /// Drops the function identified by `function_id`.
    async fn drop_function(&self, function_id: FunctionId) -> Result<()>;

    /// Drops the connection identified by `connection_id`.
    async fn drop_connection(&self, connection_id: u32) -> Result<()>;

    /// Drops the secret identified by `secret_id`.
    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

    /// Alters the secret identified by `secret_id`, supplying its name, location, owner and new
    /// `payload`.
    async fn alter_secret(
        &self,
        secret_id: u32,
        secret_name: String,
        database_id: u32,
        schema_id: u32,
        owner_id: u32,
        payload: Vec<u8>,
    ) -> Result<()>;

    /// Renames the object identified by `object_id` to `object_name`.
    async fn alter_name(
        &self,
        object_id: alter_name_request::Object,
        object_name: &str,
    ) -> Result<()>;

    /// Changes the owner of `object` to `owner_id`.
    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

    /// Replace the source in the catalog.
    async fn alter_source(&self, source: PbSource) -> Result<()>;

    /// Changes the parallelism of the streaming job identified by `job_id`.
    async fn alter_parallelism(
        &self,
        job_id: u32,
        parallelism: PbTableParallelism,
        deferred: bool,
    ) -> Result<()>;

    /// Changes the resource group of the job identified by `table_id`; `resource_group` is
    /// optional (the meaning of `None` is decided by meta and not visible here).
    async fn alter_resource_group(
        &self,
        table_id: u32,
        resource_group: Option<String>,
        deferred: bool,
    ) -> Result<()>;

    /// Moves `object` into the schema identified by `new_schema_id`.
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: u32,
    ) -> Result<()>;

    /// Performs the swap-rename described by `object`.
    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;

    /// Alters a parameter of the database identified by `database_id`.
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> Result<()>;
}
246
/// Default [`CatalogWriter`] implementation: sends each DDL request to meta and then waits for
/// the local state to catch up with the version returned by meta.
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to send the DDL RPCs to meta.
    meta_client: MetaClient,
    /// Watch receiver carrying the catalog version; `wait_version` waits on it until the
    /// version returned by meta has been reached.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Awaited in `wait_version` for the hummock version returned by meta.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
253
254#[async_trait::async_trait]
255impl CatalogWriter for CatalogWriterImpl {
256    async fn create_database(
257        &self,
258        db_name: &str,
259        owner: UserId,
260        resource_group: &str,
261        barrier_interval_ms: Option<u32>,
262        checkpoint_frequency: Option<u64>,
263    ) -> Result<()> {
264        let version = self
265            .meta_client
266            .create_database(PbDatabase {
267                name: db_name.to_owned(),
268                id: 0,
269                owner,
270                resource_group: resource_group.to_owned(),
271                barrier_interval_ms,
272                checkpoint_frequency,
273            })
274            .await?;
275        self.wait_version(version).await
276    }
277
278    async fn create_schema(
279        &self,
280        db_id: DatabaseId,
281        schema_name: &str,
282        owner: UserId,
283    ) -> Result<()> {
284        let version = self
285            .meta_client
286            .create_schema(PbSchema {
287                id: 0,
288                name: schema_name.to_owned(),
289                database_id: db_id,
290                owner,
291            })
292            .await?;
293        self.wait_version(version).await
294    }
295
296    // TODO: maybe here to pass a materialize plan node
297    async fn create_materialized_view(
298        &self,
299        table: PbTable,
300        graph: StreamFragmentGraph,
301        dependencies: HashSet<ObjectId>,
302        specific_resource_group: Option<String>,
303        if_not_exists: bool,
304    ) -> Result<()> {
305        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
306        let version = self
307            .meta_client
308            .create_materialized_view(
309                table,
310                graph,
311                dependencies,
312                specific_resource_group,
313                if_not_exists,
314            )
315            .await?;
316        if matches!(create_type, PbCreateType::Foreground) {
317            self.wait_version(version).await?
318        }
319        Ok(())
320    }
321
322    async fn create_view(&self, view: PbView) -> Result<()> {
323        let version = self.meta_client.create_view(view).await?;
324        self.wait_version(version).await
325    }
326
327    async fn create_index(
328        &self,
329        index: PbIndex,
330        table: PbTable,
331        graph: StreamFragmentGraph,
332        if_not_exists: bool,
333    ) -> Result<()> {
334        let version = self
335            .meta_client
336            .create_index(index, table, graph, if_not_exists)
337            .await?;
338        self.wait_version(version).await
339    }
340
341    async fn create_table(
342        &self,
343        source: Option<PbSource>,
344        table: PbTable,
345        graph: StreamFragmentGraph,
346        job_type: PbTableJobType,
347        if_not_exists: bool,
348    ) -> Result<()> {
349        let version = self
350            .meta_client
351            .create_table(source, table, graph, job_type, if_not_exists)
352            .await?;
353        self.wait_version(version).await
354    }
355
356    async fn replace_table(
357        &self,
358        source: Option<PbSource>,
359        table: PbTable,
360        graph: StreamFragmentGraph,
361        job_type: TableJobType,
362    ) -> Result<()> {
363        let version = self
364            .meta_client
365            .replace_job(
366                graph,
367                ReplaceJob::ReplaceTable(ReplaceTable {
368                    source,
369                    table: Some(table),
370                    job_type: job_type as _,
371                }),
372            )
373            .await?;
374        self.wait_version(version).await
375    }
376
377    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
378        let version = self
379            .meta_client
380            .replace_job(
381                graph,
382                ReplaceJob::ReplaceSource(ReplaceSource {
383                    source: Some(source),
384                }),
385            )
386            .await?;
387        self.wait_version(version).await
388    }
389
390    async fn create_source(
391        &self,
392        source: PbSource,
393        graph: Option<StreamFragmentGraph>,
394        if_not_exists: bool,
395    ) -> Result<()> {
396        let version = self
397            .meta_client
398            .create_source(source, graph, if_not_exists)
399            .await?;
400        self.wait_version(version).await
401    }
402
403    async fn create_sink(
404        &self,
405        sink: PbSink,
406        graph: StreamFragmentGraph,
407        affected_table_change: Option<ReplaceJobPlan>,
408        dependencies: HashSet<ObjectId>,
409        if_not_exists: bool,
410    ) -> Result<()> {
411        let version = self
412            .meta_client
413            .create_sink(
414                sink,
415                graph,
416                affected_table_change,
417                dependencies,
418                if_not_exists,
419            )
420            .await?;
421        self.wait_version(version).await
422    }
423
424    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
425        let version = self.meta_client.create_subscription(subscription).await?;
426        self.wait_version(version).await
427    }
428
429    async fn create_function(&self, function: PbFunction) -> Result<()> {
430        let version = self.meta_client.create_function(function).await?;
431        self.wait_version(version).await
432    }
433
434    async fn create_connection(
435        &self,
436        connection_name: String,
437        database_id: u32,
438        schema_id: u32,
439        owner_id: u32,
440        connection: create_connection_request::Payload,
441    ) -> Result<()> {
442        let version = self
443            .meta_client
444            .create_connection(
445                connection_name,
446                database_id,
447                schema_id,
448                owner_id,
449                connection,
450            )
451            .await?;
452        self.wait_version(version).await
453    }
454
455    async fn create_secret(
456        &self,
457        secret_name: String,
458        database_id: u32,
459        schema_id: u32,
460        owner_id: u32,
461        payload: Vec<u8>,
462    ) -> Result<()> {
463        let version = self
464            .meta_client
465            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
466            .await?;
467        self.wait_version(version).await
468    }
469
470    async fn comment_on(&self, comment: PbComment) -> Result<()> {
471        let version = self.meta_client.comment_on(comment).await?;
472        self.wait_version(version).await
473    }
474
475    async fn drop_table(
476        &self,
477        source_id: Option<u32>,
478        table_id: TableId,
479        cascade: bool,
480    ) -> Result<()> {
481        let version = self
482            .meta_client
483            .drop_table(source_id, table_id, cascade)
484            .await?;
485        self.wait_version(version).await
486    }
487
488    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
489        let version = self
490            .meta_client
491            .drop_materialized_view(table_id, cascade)
492            .await?;
493        self.wait_version(version).await
494    }
495
496    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
497        let version = self.meta_client.drop_view(view_id, cascade).await?;
498        self.wait_version(version).await
499    }
500
501    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
502        let version = self.meta_client.drop_source(source_id, cascade).await?;
503        self.wait_version(version).await
504    }
505
506    async fn drop_sink(
507        &self,
508        sink_id: u32,
509        cascade: bool,
510        affected_table_change: Option<ReplaceJobPlan>,
511    ) -> Result<()> {
512        let version = self
513            .meta_client
514            .drop_sink(sink_id, cascade, affected_table_change)
515            .await?;
516        self.wait_version(version).await
517    }
518
519    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
520        let version = self
521            .meta_client
522            .drop_subscription(subscription_id, cascade)
523            .await?;
524        self.wait_version(version).await
525    }
526
527    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
528        let version = self.meta_client.drop_index(index_id, cascade).await?;
529        self.wait_version(version).await
530    }
531
532    async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
533        let version = self.meta_client.drop_function(function_id).await?;
534        self.wait_version(version).await
535    }
536
537    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
538        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
539        self.wait_version(version).await
540    }
541
542    async fn drop_database(&self, database_id: u32) -> Result<()> {
543        let version = self.meta_client.drop_database(database_id).await?;
544        self.wait_version(version).await
545    }
546
547    async fn drop_connection(&self, connection_id: u32) -> Result<()> {
548        let version = self.meta_client.drop_connection(connection_id).await?;
549        self.wait_version(version).await
550    }
551
552    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
553        let version = self.meta_client.drop_secret(secret_id).await?;
554        self.wait_version(version).await
555    }
556
557    async fn alter_name(
558        &self,
559        object_id: alter_name_request::Object,
560        object_name: &str,
561    ) -> Result<()> {
562        let version = self.meta_client.alter_name(object_id, object_name).await?;
563        self.wait_version(version).await
564    }
565
566    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
567        let version = self.meta_client.alter_owner(object, owner_id).await?;
568        self.wait_version(version).await
569    }
570
571    async fn alter_set_schema(
572        &self,
573        object: alter_set_schema_request::Object,
574        new_schema_id: u32,
575    ) -> Result<()> {
576        let version = self
577            .meta_client
578            .alter_set_schema(object, new_schema_id)
579            .await?;
580        self.wait_version(version).await
581    }
582
583    async fn alter_source(&self, source: PbSource) -> Result<()> {
584        let version = self.meta_client.alter_source(source).await?;
585        self.wait_version(version).await
586    }
587
588    async fn alter_parallelism(
589        &self,
590        job_id: u32,
591        parallelism: PbTableParallelism,
592        deferred: bool,
593    ) -> Result<()> {
594        self.meta_client
595            .alter_parallelism(job_id, parallelism, deferred)
596            .await
597            .map_err(|e| anyhow!(e))?;
598
599        Ok(())
600    }
601
602    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
603        let version = self.meta_client.alter_swap_rename(object).await?;
604        self.wait_version(version).await
605    }
606
607    async fn alter_secret(
608        &self,
609        secret_id: u32,
610        secret_name: String,
611        database_id: u32,
612        schema_id: u32,
613        owner_id: u32,
614        payload: Vec<u8>,
615    ) -> Result<()> {
616        let version = self
617            .meta_client
618            .alter_secret(
619                secret_id,
620                secret_name,
621                database_id,
622                schema_id,
623                owner_id,
624                payload,
625            )
626            .await?;
627        self.wait_version(version).await
628    }
629
630    async fn alter_resource_group(
631        &self,
632        table_id: u32,
633        resource_group: Option<String>,
634        deferred: bool,
635    ) -> Result<()> {
636        self.meta_client
637            .alter_resource_group(table_id, resource_group, deferred)
638            .await
639            .map_err(|e| anyhow!(e))?;
640
641        Ok(())
642    }
643
644    async fn alter_database_param(
645        &self,
646        database_id: DatabaseId,
647        param: AlterDatabaseParam,
648    ) -> Result<()> {
649        let version = self
650            .meta_client
651            .alter_database_param(database_id, param)
652            .await
653            .map_err(|e| anyhow!(e))?;
654        self.wait_version(version).await
655    }
656}
657
658impl CatalogWriterImpl {
659    pub fn new(
660        meta_client: MetaClient,
661        catalog_updated_rx: Receiver<CatalogVersion>,
662        hummock_snapshot_manager: HummockSnapshotManagerRef,
663    ) -> Self {
664        Self {
665            meta_client,
666            catalog_updated_rx,
667            hummock_snapshot_manager,
668        }
669    }
670
671    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
672        let mut rx = self.catalog_updated_rx.clone();
673        while *rx.borrow_and_update() < version.catalog_version {
674            rx.changed().await.map_err(|e| anyhow!(e))?;
675        }
676        self.hummock_snapshot_manager
677            .wait(HummockVersionId::new(version.hummock_version_id))
678            .await;
679        Ok(())
680    }
681}