risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, ObjectId};
22use risingwave_common::util::column_index_mapping::ColIndexMapping;
23use risingwave_hummock_sdk::HummockVersionId;
24use risingwave_pb::catalog::{
25    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
26    PbSubscription, PbTable, PbView,
27};
28use risingwave_pb::ddl_service::replace_job_plan::{ReplaceJob, ReplaceSource, ReplaceTable};
29use risingwave_pb::ddl_service::{
30    PbReplaceJobPlan, PbTableJobType, ReplaceJobPlan, TableJobType, WaitVersion,
31    alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
32    create_connection_request,
33};
34use risingwave_pb::meta::PbTableParallelism;
35use risingwave_pb::stream_plan::StreamFragmentGraph;
36use risingwave_rpc_client::MetaClient;
37use tokio::sync::watch::Receiver;
38
39use super::root_catalog::Catalog;
40use super::{DatabaseId, SecretId, TableId};
41use crate::error::Result;
42use crate::scheduler::HummockSnapshotManagerRef;
43use crate::user::UserId;
44
45pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
46
47/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
48#[derive(Clone)]
49pub struct CatalogReader(Arc<RwLock<Catalog>>);
50
51impl CatalogReader {
52    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
53        CatalogReader(inner)
54    }
55
56    pub fn read_guard(&self) -> CatalogReadGuard {
57        // Make this recursive so that one can get this guard in the same thread without fear.
58        self.0.read_arc_recursive()
59    }
60}
61
62/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
63/// It will only send rpc to meta and get the catalog version as response.
64/// Then it will wait for the local catalog to be synced to the version, which is performed by
65/// [observer](`crate::observer::FrontendObserverNode`).
66#[async_trait::async_trait]
67pub trait CatalogWriter: Send + Sync {
68    async fn create_database(
69        &self,
70        db_name: &str,
71        owner: UserId,
72        resource_group: &str,
73    ) -> Result<()>;
74
75    async fn create_schema(
76        &self,
77        db_id: DatabaseId,
78        schema_name: &str,
79        owner: UserId,
80    ) -> Result<()>;
81
82    async fn create_view(&self, view: PbView) -> Result<()>;
83
84    async fn create_materialized_view(
85        &self,
86        table: PbTable,
87        graph: StreamFragmentGraph,
88        dependencies: HashSet<ObjectId>,
89        specific_resource_group: Option<String>,
90    ) -> Result<()>;
91
92    async fn create_table(
93        &self,
94        source: Option<PbSource>,
95        table: PbTable,
96        graph: StreamFragmentGraph,
97        job_type: PbTableJobType,
98    ) -> Result<()>;
99
100    async fn replace_table(
101        &self,
102        source: Option<PbSource>,
103        table: PbTable,
104        graph: StreamFragmentGraph,
105        mapping: ColIndexMapping,
106        job_type: TableJobType,
107    ) -> Result<()>;
108
109    async fn replace_source(
110        &self,
111        source: PbSource,
112        graph: StreamFragmentGraph,
113        mapping: ColIndexMapping,
114    ) -> Result<()>;
115
116    async fn create_index(
117        &self,
118        index: PbIndex,
119        table: PbTable,
120        graph: StreamFragmentGraph,
121    ) -> Result<()>;
122
123    async fn create_source(
124        &self,
125        source: PbSource,
126        graph: Option<StreamFragmentGraph>,
127    ) -> Result<()>;
128
129    async fn create_sink(
130        &self,
131        sink: PbSink,
132        graph: StreamFragmentGraph,
133        affected_table_change: Option<PbReplaceJobPlan>,
134        dependencies: HashSet<ObjectId>,
135    ) -> Result<()>;
136
137    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
138
139    async fn create_function(&self, function: PbFunction) -> Result<()>;
140
141    async fn create_connection(
142        &self,
143        connection_name: String,
144        database_id: u32,
145        schema_id: u32,
146        owner_id: u32,
147        connection: create_connection_request::Payload,
148    ) -> Result<()>;
149
150    async fn create_secret(
151        &self,
152        secret_name: String,
153        database_id: u32,
154        schema_id: u32,
155        owner_id: u32,
156        payload: Vec<u8>,
157    ) -> Result<()>;
158
159    async fn comment_on(&self, comment: PbComment) -> Result<()>;
160
161    async fn drop_table(
162        &self,
163        source_id: Option<u32>,
164        table_id: TableId,
165        cascade: bool,
166    ) -> Result<()>;
167
168    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
169
170    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;
171
172    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;
173
174    async fn drop_sink(
175        &self,
176        sink_id: u32,
177        cascade: bool,
178        affected_table_change: Option<PbReplaceJobPlan>,
179    ) -> Result<()>;
180
181    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;
182
183    async fn drop_database(&self, database_id: u32) -> Result<()>;
184
185    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()>;
186
187    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
188
189    async fn drop_function(&self, function_id: FunctionId) -> Result<()>;
190
191    async fn drop_connection(&self, connection_id: u32) -> Result<()>;
192
193    async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;
194
195    async fn alter_secret(
196        &self,
197        secret_id: u32,
198        secret_name: String,
199        database_id: u32,
200        schema_id: u32,
201        owner_id: u32,
202        payload: Vec<u8>,
203    ) -> Result<()>;
204
205    async fn alter_name(
206        &self,
207        object_id: alter_name_request::Object,
208        object_name: &str,
209    ) -> Result<()>;
210
211    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;
212
213    /// Replace the source in the catalog.
214    async fn alter_source(&self, source: PbSource) -> Result<()>;
215
216    async fn alter_parallelism(
217        &self,
218        job_id: u32,
219        parallelism: PbTableParallelism,
220        deferred: bool,
221    ) -> Result<()>;
222
223    async fn alter_resource_group(
224        &self,
225        table_id: u32,
226        resource_group: Option<String>,
227        deferred: bool,
228    ) -> Result<()>;
229
230    async fn alter_set_schema(
231        &self,
232        object: alter_set_schema_request::Object,
233        new_schema_id: u32,
234    ) -> Result<()>;
235
236    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
237}
238
/// [`CatalogWriter`] implementation that issues its requests through a [`MetaClient`].
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to send the catalog RPCs to meta.
    meta_client: MetaClient,
    /// Watch receiver whose current value is compared against the catalog version to wait for.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Used to wait for the Hummock version that meta returns with each request.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
245
246#[async_trait::async_trait]
247impl CatalogWriter for CatalogWriterImpl {
248    async fn create_database(
249        &self,
250        db_name: &str,
251        owner: UserId,
252        resource_group: &str,
253    ) -> Result<()> {
254        let version = self
255            .meta_client
256            .create_database(PbDatabase {
257                name: db_name.to_owned(),
258                id: 0,
259                owner,
260                resource_group: resource_group.to_owned(),
261            })
262            .await?;
263        self.wait_version(version).await
264    }
265
266    async fn create_schema(
267        &self,
268        db_id: DatabaseId,
269        schema_name: &str,
270        owner: UserId,
271    ) -> Result<()> {
272        let version = self
273            .meta_client
274            .create_schema(PbSchema {
275                id: 0,
276                name: schema_name.to_owned(),
277                database_id: db_id,
278                owner,
279            })
280            .await?;
281        self.wait_version(version).await
282    }
283
284    // TODO: maybe here to pass a materialize plan node
285    async fn create_materialized_view(
286        &self,
287        table: PbTable,
288        graph: StreamFragmentGraph,
289        dependencies: HashSet<ObjectId>,
290        specific_resource_group: Option<String>,
291    ) -> Result<()> {
292        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
293        let version = self
294            .meta_client
295            .create_materialized_view(table, graph, dependencies, specific_resource_group)
296            .await?;
297        if matches!(create_type, PbCreateType::Foreground) {
298            self.wait_version(version).await?
299        }
300        Ok(())
301    }
302
303    async fn create_view(&self, view: PbView) -> Result<()> {
304        let version = self.meta_client.create_view(view).await?;
305        self.wait_version(version).await
306    }
307
308    async fn create_index(
309        &self,
310        index: PbIndex,
311        table: PbTable,
312        graph: StreamFragmentGraph,
313    ) -> Result<()> {
314        let version = self.meta_client.create_index(index, table, graph).await?;
315        self.wait_version(version).await
316    }
317
318    async fn create_table(
319        &self,
320        source: Option<PbSource>,
321        table: PbTable,
322        graph: StreamFragmentGraph,
323        job_type: PbTableJobType,
324    ) -> Result<()> {
325        let version = self
326            .meta_client
327            .create_table(source, table, graph, job_type)
328            .await?;
329        self.wait_version(version).await
330    }
331
332    async fn replace_table(
333        &self,
334        source: Option<PbSource>,
335        table: PbTable,
336        graph: StreamFragmentGraph,
337        mapping: ColIndexMapping,
338        job_type: TableJobType,
339    ) -> Result<()> {
340        let version = self
341            .meta_client
342            .replace_job(
343                graph,
344                mapping,
345                ReplaceJob::ReplaceTable(ReplaceTable {
346                    source,
347                    table: Some(table),
348                    job_type: job_type as _,
349                }),
350            )
351            .await?;
352        self.wait_version(version).await
353    }
354
355    async fn replace_source(
356        &self,
357        source: PbSource,
358        graph: StreamFragmentGraph,
359        mapping: ColIndexMapping,
360    ) -> Result<()> {
361        let version = self
362            .meta_client
363            .replace_job(
364                graph,
365                mapping,
366                ReplaceJob::ReplaceSource(ReplaceSource {
367                    source: Some(source),
368                }),
369            )
370            .await?;
371        self.wait_version(version).await
372    }
373
374    async fn create_source(
375        &self,
376        source: PbSource,
377        graph: Option<StreamFragmentGraph>,
378    ) -> Result<()> {
379        let version = self.meta_client.create_source(source, graph).await?;
380        self.wait_version(version).await
381    }
382
383    async fn create_sink(
384        &self,
385        sink: PbSink,
386        graph: StreamFragmentGraph,
387        affected_table_change: Option<ReplaceJobPlan>,
388        dependencies: HashSet<ObjectId>,
389    ) -> Result<()> {
390        let version = self
391            .meta_client
392            .create_sink(sink, graph, affected_table_change, dependencies)
393            .await?;
394        self.wait_version(version).await
395    }
396
397    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
398        let version = self.meta_client.create_subscription(subscription).await?;
399        self.wait_version(version).await
400    }
401
402    async fn create_function(&self, function: PbFunction) -> Result<()> {
403        let version = self.meta_client.create_function(function).await?;
404        self.wait_version(version).await
405    }
406
407    async fn create_connection(
408        &self,
409        connection_name: String,
410        database_id: u32,
411        schema_id: u32,
412        owner_id: u32,
413        connection: create_connection_request::Payload,
414    ) -> Result<()> {
415        let version = self
416            .meta_client
417            .create_connection(
418                connection_name,
419                database_id,
420                schema_id,
421                owner_id,
422                connection,
423            )
424            .await?;
425        self.wait_version(version).await
426    }
427
428    async fn create_secret(
429        &self,
430        secret_name: String,
431        database_id: u32,
432        schema_id: u32,
433        owner_id: u32,
434        payload: Vec<u8>,
435    ) -> Result<()> {
436        let version = self
437            .meta_client
438            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
439            .await?;
440        self.wait_version(version).await
441    }
442
443    async fn comment_on(&self, comment: PbComment) -> Result<()> {
444        let version = self.meta_client.comment_on(comment).await?;
445        self.wait_version(version).await
446    }
447
448    async fn drop_table(
449        &self,
450        source_id: Option<u32>,
451        table_id: TableId,
452        cascade: bool,
453    ) -> Result<()> {
454        let version = self
455            .meta_client
456            .drop_table(source_id, table_id, cascade)
457            .await?;
458        self.wait_version(version).await
459    }
460
461    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
462        let version = self
463            .meta_client
464            .drop_materialized_view(table_id, cascade)
465            .await?;
466        self.wait_version(version).await
467    }
468
469    async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
470        let version = self.meta_client.drop_view(view_id, cascade).await?;
471        self.wait_version(version).await
472    }
473
474    async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
475        let version = self.meta_client.drop_source(source_id, cascade).await?;
476        self.wait_version(version).await
477    }
478
479    async fn drop_sink(
480        &self,
481        sink_id: u32,
482        cascade: bool,
483        affected_table_change: Option<ReplaceJobPlan>,
484    ) -> Result<()> {
485        let version = self
486            .meta_client
487            .drop_sink(sink_id, cascade, affected_table_change)
488            .await?;
489        self.wait_version(version).await
490    }
491
492    async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
493        let version = self
494            .meta_client
495            .drop_subscription(subscription_id, cascade)
496            .await?;
497        self.wait_version(version).await
498    }
499
500    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
501        let version = self.meta_client.drop_index(index_id, cascade).await?;
502        self.wait_version(version).await
503    }
504
505    async fn drop_function(&self, function_id: FunctionId) -> Result<()> {
506        let version = self.meta_client.drop_function(function_id).await?;
507        self.wait_version(version).await
508    }
509
510    async fn drop_schema(&self, schema_id: u32, cascade: bool) -> Result<()> {
511        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
512        self.wait_version(version).await
513    }
514
515    async fn drop_database(&self, database_id: u32) -> Result<()> {
516        let version = self.meta_client.drop_database(database_id).await?;
517        self.wait_version(version).await
518    }
519
520    async fn drop_connection(&self, connection_id: u32) -> Result<()> {
521        let version = self.meta_client.drop_connection(connection_id).await?;
522        self.wait_version(version).await
523    }
524
525    async fn drop_secret(&self, secret_id: SecretId) -> Result<()> {
526        let version = self.meta_client.drop_secret(secret_id).await?;
527        self.wait_version(version).await
528    }
529
530    async fn alter_name(
531        &self,
532        object_id: alter_name_request::Object,
533        object_name: &str,
534    ) -> Result<()> {
535        let version = self.meta_client.alter_name(object_id, object_name).await?;
536        self.wait_version(version).await
537    }
538
539    async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
540        let version = self.meta_client.alter_owner(object, owner_id).await?;
541        self.wait_version(version).await
542    }
543
544    async fn alter_set_schema(
545        &self,
546        object: alter_set_schema_request::Object,
547        new_schema_id: u32,
548    ) -> Result<()> {
549        let version = self
550            .meta_client
551            .alter_set_schema(object, new_schema_id)
552            .await?;
553        self.wait_version(version).await
554    }
555
556    async fn alter_source(&self, source: PbSource) -> Result<()> {
557        let version = self.meta_client.alter_source(source).await?;
558        self.wait_version(version).await
559    }
560
561    async fn alter_parallelism(
562        &self,
563        job_id: u32,
564        parallelism: PbTableParallelism,
565        deferred: bool,
566    ) -> Result<()> {
567        self.meta_client
568            .alter_parallelism(job_id, parallelism, deferred)
569            .await
570            .map_err(|e| anyhow!(e))?;
571
572        Ok(())
573    }
574
575    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
576        let version = self.meta_client.alter_swap_rename(object).await?;
577        self.wait_version(version).await
578    }
579
580    async fn alter_secret(
581        &self,
582        secret_id: u32,
583        secret_name: String,
584        database_id: u32,
585        schema_id: u32,
586        owner_id: u32,
587        payload: Vec<u8>,
588    ) -> Result<()> {
589        let version = self
590            .meta_client
591            .alter_secret(
592                secret_id,
593                secret_name,
594                database_id,
595                schema_id,
596                owner_id,
597                payload,
598            )
599            .await?;
600        self.wait_version(version).await
601    }
602
603    async fn alter_resource_group(
604        &self,
605        table_id: u32,
606        resource_group: Option<String>,
607        deferred: bool,
608    ) -> Result<()> {
609        self.meta_client
610            .alter_resource_group(table_id, resource_group, deferred)
611            .await
612            .map_err(|e| anyhow!(e))?;
613
614        Ok(())
615    }
616}
617
618impl CatalogWriterImpl {
619    pub fn new(
620        meta_client: MetaClient,
621        catalog_updated_rx: Receiver<CatalogVersion>,
622        hummock_snapshot_manager: HummockSnapshotManagerRef,
623    ) -> Self {
624        Self {
625            meta_client,
626            catalog_updated_rx,
627            hummock_snapshot_manager,
628        }
629    }
630
631    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
632        let mut rx = self.catalog_updated_rx.clone();
633        while *rx.borrow_and_update() < version.catalog_version {
634            rx.changed().await.map_err(|e| anyhow!(e))?;
635        }
636        self.hummock_snapshot_manager
637            .wait(HummockVersionId::new(version.hummock_version_id))
638            .await;
639        Ok(())
640    }
641}