Skip to main content

risingwave_frontend/catalog/
catalog_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_pb::catalog::{
27    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28    PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37    streaming_job_resource_type,
38};
39use risingwave_pb::meta::PbTableParallelism;
40use risingwave_pb::stream_plan::StreamFragmentGraph;
41use risingwave_rpc_client::MetaClient;
42use tokio::sync::watch::Receiver;
43
44use super::root_catalog::Catalog;
45use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
46use crate::error::Result;
47use crate::scheduler::HummockSnapshotManagerRef;
48use crate::session::current::notice_to_user;
49use crate::user::UserId;
50
/// Owned read guard over the shared [`Catalog`], as returned by [`CatalogReader::read_guard`].
pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
52
/// [`CatalogReader`] gives read access to the local catalog. It only hands out read guards, so
/// its holder cannot modify the catalog through it.
#[derive(Clone)]
pub struct CatalogReader(Arc<RwLock<Catalog>>);
56
57impl CatalogReader {
58    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
59        CatalogReader(inner)
60    }
61
62    pub fn read_guard(&self) -> CatalogReadGuard {
63        // Make this recursive so that one can get this guard in the same thread without fear.
64        self.0.read_arc_recursive()
65    }
66}
67
68/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
69/// It will only send rpc to meta and get the catalog version as response.
70/// Then it will wait for the local catalog to be synced to the version, which is performed by
71/// [observer](`crate::observer::FrontendObserverNode`).
72#[async_trait::async_trait]
73pub trait CatalogWriter: Send + Sync {
74    async fn create_database(
75        &self,
76        db_name: &str,
77        owner: UserId,
78        resource_group: &str,
79        barrier_interval_ms: Option<u32>,
80        checkpoint_frequency: Option<u64>,
81    ) -> Result<()>;
82
83    async fn create_schema(
84        &self,
85        db_id: DatabaseId,
86        schema_name: &str,
87        owner: UserId,
88    ) -> Result<()>;
89
90    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
91
92    async fn create_materialized_view(
93        &self,
94        table: PbTable,
95        graph: StreamFragmentGraph,
96        dependencies: HashSet<ObjectId>,
97        resource_type: streaming_job_resource_type::ResourceType,
98        if_not_exists: bool,
99        refresh_interval_sec: Option<u64>,
100    ) -> Result<()>;
101
102    async fn replace_materialized_view(
103        &self,
104        table: PbTable,
105        graph: StreamFragmentGraph,
106    ) -> Result<()>;
107
108    async fn create_table(
109        &self,
110        source: Option<PbSource>,
111        table: PbTable,
112        graph: StreamFragmentGraph,
113        job_type: PbTableJobType,
114        if_not_exists: bool,
115        dependencies: HashSet<ObjectId>,
116    ) -> Result<()>;
117
118    async fn replace_table(
119        &self,
120        source: Option<PbSource>,
121        table: PbTable,
122        graph: StreamFragmentGraph,
123        job_type: TableJobType,
124    ) -> Result<()>;
125
126    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
127
128    async fn create_index(
129        &self,
130        index: PbIndex,
131        table: PbTable,
132        graph: StreamFragmentGraph,
133        resource_type: streaming_job_resource_type::ResourceType,
134        if_not_exists: bool,
135    ) -> Result<()>;
136
137    async fn create_source(
138        &self,
139        source: PbSource,
140        graph: Option<StreamFragmentGraph>,
141        if_not_exists: bool,
142    ) -> Result<()>;
143
144    async fn create_sink(
145        &self,
146        sink: PbSink,
147        graph: StreamFragmentGraph,
148        dependencies: HashSet<ObjectId>,
149        resource_type: streaming_job_resource_type::ResourceType,
150        if_not_exists: bool,
151    ) -> Result<()>;
152
153    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
154
155    async fn create_function(&self, function: PbFunction) -> Result<()>;
156
157    async fn create_connection(
158        &self,
159        connection_name: String,
160        database_id: DatabaseId,
161        schema_id: SchemaId,
162        owner_id: UserId,
163        connection: create_connection_request::Payload,
164    ) -> Result<()>;
165
166    async fn create_secret(
167        &self,
168        secret_name: String,
169        database_id: DatabaseId,
170        schema_id: SchemaId,
171        owner_id: UserId,
172        payload: Vec<u8>,
173    ) -> Result<()>;
174
175    async fn comment_on(&self, comment: PbComment) -> Result<()>;
176
177    async fn drop_table(
178        &self,
179        source_id: Option<SourceId>,
180        table_id: TableId,
181        cascade: bool,
182    ) -> Result<()>;
183
184    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
185
186    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
187
188    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
189
190    async fn reset_source(&self, source_id: SourceId) -> Result<()>;
191
192    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
193
194    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
195    -> Result<()>;
196
197    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
198
199    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
200
201    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
202
203    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
204
205    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
206
207    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;
208
209    async fn alter_secret(
210        &self,
211        secret_id: SecretId,
212        secret_name: String,
213        database_id: DatabaseId,
214        schema_id: SchemaId,
215        owner_id: UserId,
216        payload: Vec<u8>,
217    ) -> Result<()>;
218
219    async fn alter_subscription_retention(
220        &self,
221        subscription_id: SubscriptionId,
222        retention_seconds: u64,
223        definition: String,
224    ) -> Result<()>;
225
226    async fn alter_name(
227        &self,
228        object_id: alter_name_request::Object,
229        object_name: &str,
230    ) -> Result<()>;
231
232    async fn alter_owner(
233        &self,
234        object: alter_owner_request::Object,
235        owner_id: UserId,
236    ) -> Result<()>;
237
238    /// Replace the source in the catalog.
239    async fn alter_source(&self, source: PbSource) -> Result<()>;
240
241    async fn alter_parallelism(
242        &self,
243        job_id: JobId,
244        parallelism: PbTableParallelism,
245        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
246        deferred: bool,
247    ) -> Result<()>;
248
249    async fn alter_backfill_parallelism(
250        &self,
251        job_id: JobId,
252        parallelism: Option<PbTableParallelism>,
253        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
254        deferred: bool,
255    ) -> Result<()>;
256
257    async fn alter_config(
258        &self,
259        job_id: JobId,
260        entries_to_add: HashMap<String, String>,
261        keys_to_remove: Vec<String>,
262    ) -> Result<()>;
263
264    async fn alter_resource_group(
265        &self,
266        job_id: JobId,
267        resource_group: Option<String>,
268        deferred: bool,
269    ) -> Result<()>;
270
271    async fn alter_database_resource_group(
272        &self,
273        database_id: DatabaseId,
274        resource_group: Option<String>,
275        deferred: bool,
276    ) -> Result<()>;
277
278    async fn alter_set_schema(
279        &self,
280        object: alter_set_schema_request::Object,
281        new_schema_id: SchemaId,
282    ) -> Result<()>;
283
284    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
285
286    async fn alter_database_param(
287        &self,
288        database_id: DatabaseId,
289        param: AlterDatabaseParam,
290    ) -> Result<()>;
291
292    async fn create_iceberg_table(
293        &self,
294        table_job_info: PbTableJobInfo,
295        sink_job_info: PbSinkJobInfo,
296        iceberg_source: PbSource,
297        if_not_exists: bool,
298    ) -> Result<()>;
299
300    async fn wait(&self, job_id: Option<JobId>) -> Result<()>;
301}
302
/// [`CatalogWriter`] implementation that forwards every request to meta through a
/// [`MetaClient`] and then waits for the local state to catch up.
#[derive(Clone)]
pub struct CatalogWriterImpl {
    /// Client used to send the DDL requests to meta.
    meta_client: MetaClient,
    /// Watch channel carrying a catalog version; `wait_version` waits on it until the version
    /// returned by meta has been reached.
    catalog_updated_rx: Receiver<CatalogVersion>,
    /// Awaited in `wait_version` with the hummock version id returned by meta.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
309
310#[async_trait::async_trait]
311impl CatalogWriter for CatalogWriterImpl {
312    async fn create_database(
313        &self,
314        db_name: &str,
315        owner: UserId,
316        resource_group: &str,
317        barrier_interval_ms: Option<u32>,
318        checkpoint_frequency: Option<u64>,
319    ) -> Result<()> {
320        let version = self
321            .meta_client
322            .create_database(PbDatabase {
323                name: db_name.to_owned(),
324                id: 0.into(),
325                owner,
326                resource_group: resource_group.to_owned(),
327                barrier_interval_ms,
328                checkpoint_frequency,
329            })
330            .await?;
331        self.wait_version(version).await
332    }
333
334    async fn create_schema(
335        &self,
336        db_id: DatabaseId,
337        schema_name: &str,
338        owner: UserId,
339    ) -> Result<()> {
340        let version = self
341            .meta_client
342            .create_schema(PbSchema {
343                id: 0.into(),
344                name: schema_name.to_owned(),
345                database_id: db_id,
346                owner,
347            })
348            .await?;
349        self.wait_version(version).await
350    }
351
352    // TODO: maybe here to pass a materialize plan node
353    async fn create_materialized_view(
354        &self,
355        table: PbTable,
356        graph: StreamFragmentGraph,
357        dependencies: HashSet<ObjectId>,
358        resource_type: streaming_job_resource_type::ResourceType,
359        if_not_exists: bool,
360        refresh_interval_sec: Option<u64>,
361    ) -> Result<()> {
362        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
363        let version = self
364            .meta_client
365            .create_materialized_view(
366                table,
367                graph,
368                dependencies,
369                resource_type,
370                if_not_exists,
371                refresh_interval_sec,
372            )
373            .await?;
374        if matches!(create_type, PbCreateType::Foreground) {
375            self.wait_version(version).await?
376        }
377        Ok(())
378    }
379
380    async fn replace_materialized_view(
381        &self,
382        table: PbTable,
383        graph: StreamFragmentGraph,
384    ) -> Result<()> {
385        // TODO: this is a dummy implementation for debugging only.
386        notice_to_user(format!("table: {table:#?}"));
387        notice_to_user(format!("graph: {graph:#?}"));
388
389        let version = self
390            .meta_client
391            .replace_job(
392                graph,
393                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
394            )
395            .await?;
396
397        self.wait_version(version).await
398    }
399
400    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
401        let version = self.meta_client.create_view(view, dependencies).await?;
402        self.wait_version(version).await
403    }
404
405    async fn create_index(
406        &self,
407        index: PbIndex,
408        table: PbTable,
409        graph: StreamFragmentGraph,
410        resource_type: streaming_job_resource_type::ResourceType,
411        if_not_exists: bool,
412    ) -> Result<()> {
413        let version = self
414            .meta_client
415            .create_index(index, table, graph, resource_type, if_not_exists)
416            .await?;
417        self.wait_version(version).await
418    }
419
420    async fn create_table(
421        &self,
422        source: Option<PbSource>,
423        table: PbTable,
424        graph: StreamFragmentGraph,
425        job_type: PbTableJobType,
426        if_not_exists: bool,
427        dependencies: HashSet<ObjectId>,
428    ) -> Result<()> {
429        let version = self
430            .meta_client
431            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
432            .await?;
433        self.wait_version(version).await
434    }
435
436    async fn replace_table(
437        &self,
438        source: Option<PbSource>,
439        table: PbTable,
440        graph: StreamFragmentGraph,
441        job_type: TableJobType,
442    ) -> Result<()> {
443        let version = self
444            .meta_client
445            .replace_job(
446                graph,
447                ReplaceJob::ReplaceTable(ReplaceTable {
448                    source,
449                    table: Some(table),
450                    job_type: job_type as _,
451                }),
452            )
453            .await?;
454        self.wait_version(version).await
455    }
456
457    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
458        let version = self
459            .meta_client
460            .replace_job(
461                graph,
462                ReplaceJob::ReplaceSource(ReplaceSource {
463                    source: Some(source),
464                }),
465            )
466            .await?;
467        self.wait_version(version).await
468    }
469
470    async fn create_source(
471        &self,
472        source: PbSource,
473        graph: Option<StreamFragmentGraph>,
474        if_not_exists: bool,
475    ) -> Result<()> {
476        let version = self
477            .meta_client
478            .create_source(source, graph, if_not_exists)
479            .await?;
480        self.wait_version(version).await
481    }
482
483    async fn create_sink(
484        &self,
485        sink: PbSink,
486        graph: StreamFragmentGraph,
487        dependencies: HashSet<ObjectId>,
488        resource_type: streaming_job_resource_type::ResourceType,
489        if_not_exists: bool,
490    ) -> Result<()> {
491        let version = self
492            .meta_client
493            .create_sink(sink, graph, dependencies, resource_type, if_not_exists)
494            .await?;
495        self.wait_version(version).await
496    }
497
498    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
499        let version = self.meta_client.create_subscription(subscription).await?;
500        self.wait_version(version).await
501    }
502
503    async fn create_function(&self, function: PbFunction) -> Result<()> {
504        let version = self.meta_client.create_function(function).await?;
505        self.wait_version(version).await
506    }
507
508    async fn create_connection(
509        &self,
510        connection_name: String,
511        database_id: DatabaseId,
512        schema_id: SchemaId,
513        owner_id: UserId,
514        connection: create_connection_request::Payload,
515    ) -> Result<()> {
516        let version = self
517            .meta_client
518            .create_connection(
519                connection_name,
520                database_id,
521                schema_id,
522                owner_id,
523                connection,
524            )
525            .await?;
526        self.wait_version(version).await
527    }
528
529    async fn create_secret(
530        &self,
531        secret_name: String,
532        database_id: DatabaseId,
533        schema_id: SchemaId,
534        owner_id: UserId,
535        payload: Vec<u8>,
536    ) -> Result<()> {
537        let version = self
538            .meta_client
539            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
540            .await?;
541        self.wait_version(version).await
542    }
543
544    async fn comment_on(&self, comment: PbComment) -> Result<()> {
545        let version = self.meta_client.comment_on(comment).await?;
546        self.wait_version(version).await
547    }
548
549    async fn drop_table(
550        &self,
551        source_id: Option<SourceId>,
552        table_id: TableId,
553        cascade: bool,
554    ) -> Result<()> {
555        let version = self
556            .meta_client
557            .drop_table(source_id, table_id, cascade)
558            .await?;
559        self.wait_version(version).await
560    }
561
562    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
563        let version = self
564            .meta_client
565            .drop_materialized_view(table_id, cascade)
566            .await?;
567        self.wait_version(version).await
568    }
569
570    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
571        let version = self.meta_client.drop_view(view_id, cascade).await?;
572        self.wait_version(version).await
573    }
574
575    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
576        let version = self.meta_client.drop_source(source_id, cascade).await?;
577        self.wait_version(version).await
578    }
579
580    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
581        let version = self.meta_client.reset_source(source_id).await?;
582        self.wait_version(version).await
583    }
584
585    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
586        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
587        self.wait_version(version).await
588    }
589
590    async fn drop_subscription(
591        &self,
592        subscription_id: SubscriptionId,
593        cascade: bool,
594    ) -> Result<()> {
595        let version = self
596            .meta_client
597            .drop_subscription(subscription_id, cascade)
598            .await?;
599        self.wait_version(version).await
600    }
601
602    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
603        let version = self.meta_client.drop_index(index_id, cascade).await?;
604        self.wait_version(version).await
605    }
606
607    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
608        let version = self.meta_client.drop_function(function_id, cascade).await?;
609        self.wait_version(version).await
610    }
611
612    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
613        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
614        self.wait_version(version).await
615    }
616
617    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
618        let version = self.meta_client.drop_database(database_id).await?;
619        self.wait_version(version).await
620    }
621
622    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
623        let version = self
624            .meta_client
625            .drop_connection(connection_id, cascade)
626            .await?;
627        self.wait_version(version).await
628    }
629
630    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
631        let version = self.meta_client.drop_secret(secret_id, cascade).await?;
632        self.wait_version(version).await
633    }
634
635    async fn alter_name(
636        &self,
637        object_id: alter_name_request::Object,
638        object_name: &str,
639    ) -> Result<()> {
640        let version = self.meta_client.alter_name(object_id, object_name).await?;
641        self.wait_version(version).await
642    }
643
644    async fn alter_owner(
645        &self,
646        object: alter_owner_request::Object,
647        owner_id: UserId,
648    ) -> Result<()> {
649        let version = self.meta_client.alter_owner(object, owner_id).await?;
650        self.wait_version(version).await
651    }
652
653    async fn alter_set_schema(
654        &self,
655        object: alter_set_schema_request::Object,
656        new_schema_id: SchemaId,
657    ) -> Result<()> {
658        let version = self
659            .meta_client
660            .alter_set_schema(object, new_schema_id)
661            .await?;
662        self.wait_version(version).await
663    }
664
665    async fn alter_source(&self, source: PbSource) -> Result<()> {
666        let version = self.meta_client.alter_source(source).await?;
667        self.wait_version(version).await
668    }
669
670    async fn alter_parallelism(
671        &self,
672        job_id: JobId,
673        parallelism: PbTableParallelism,
674        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
675        deferred: bool,
676    ) -> Result<()> {
677        self.meta_client
678            .alter_parallelism(job_id, parallelism, adaptive_parallelism_strategy, deferred)
679            .await?;
680        Ok(())
681    }
682
683    async fn alter_backfill_parallelism(
684        &self,
685        job_id: JobId,
686        parallelism: Option<PbTableParallelism>,
687        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
688        deferred: bool,
689    ) -> Result<()> {
690        self.meta_client
691            .alter_backfill_parallelism(
692                job_id,
693                parallelism,
694                adaptive_parallelism_strategy,
695                deferred,
696            )
697            .await?;
698        Ok(())
699    }
700
701    async fn alter_config(
702        &self,
703        job_id: JobId,
704        entries_to_add: HashMap<String, String>,
705        keys_to_remove: Vec<String>,
706    ) -> Result<()> {
707        self.meta_client
708            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
709            .await?;
710        Ok(())
711    }
712
713    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
714        let version = self.meta_client.alter_swap_rename(object).await?;
715        self.wait_version(version).await
716    }
717
718    async fn alter_secret(
719        &self,
720        secret_id: SecretId,
721        secret_name: String,
722        database_id: DatabaseId,
723        schema_id: SchemaId,
724        owner_id: UserId,
725        payload: Vec<u8>,
726    ) -> Result<()> {
727        let version = self
728            .meta_client
729            .alter_secret(
730                secret_id,
731                secret_name,
732                database_id,
733                schema_id,
734                owner_id,
735                payload,
736            )
737            .await?;
738        self.wait_version(version).await
739    }
740
741    async fn alter_subscription_retention(
742        &self,
743        subscription_id: SubscriptionId,
744        retention_seconds: u64,
745        definition: String,
746    ) -> Result<()> {
747        let version = self
748            .meta_client
749            .alter_subscription_retention(subscription_id, retention_seconds, definition)
750            .await?;
751        self.wait_version(version).await
752    }
753
754    async fn alter_resource_group(
755        &self,
756        job_id: JobId,
757        resource_group: Option<String>,
758        deferred: bool,
759    ) -> Result<()> {
760        self.meta_client
761            .alter_resource_group(job_id, resource_group, deferred)
762            .await
763            .map_err(|e| anyhow!(e))?;
764
765        Ok(())
766    }
767
768    async fn alter_database_resource_group(
769        &self,
770        database_id: DatabaseId,
771        resource_group: Option<String>,
772        deferred: bool,
773    ) -> Result<()> {
774        let version = self
775            .meta_client
776            .alter_database_resource_group(database_id, resource_group, deferred)
777            .await
778            .map_err(|e| anyhow!(e))?;
779        self.wait_version(version).await
780    }
781
782    async fn alter_database_param(
783        &self,
784        database_id: DatabaseId,
785        param: AlterDatabaseParam,
786    ) -> Result<()> {
787        let version = self
788            .meta_client
789            .alter_database_param(database_id, param)
790            .await
791            .map_err(|e| anyhow!(e))?;
792        self.wait_version(version).await
793    }
794
795    async fn create_iceberg_table(
796        &self,
797        table_job_info: PbTableJobInfo,
798        sink_job_info: PbSinkJobInfo,
799        iceberg_source: PbSource,
800        if_not_exists: bool,
801    ) -> Result<()> {
802        let version = Box::pin(self.meta_client.create_iceberg_table(
803            table_job_info,
804            sink_job_info,
805            iceberg_source,
806            if_not_exists,
807        ))
808        .await?;
809        self.wait_version(version).await
810    }
811
812    async fn wait(&self, job_id: Option<JobId>) -> Result<()> {
813        let version = self
814            .meta_client
815            .wait(job_id)
816            .await
817            .map_err(|e| anyhow!(e))?;
818        self.wait_version(version).await
819    }
820}
821
822impl CatalogWriterImpl {
823    pub fn new(
824        meta_client: MetaClient,
825        catalog_updated_rx: Receiver<CatalogVersion>,
826        hummock_snapshot_manager: HummockSnapshotManagerRef,
827    ) -> Self {
828        Self {
829            meta_client,
830            catalog_updated_rx,
831            hummock_snapshot_manager,
832        }
833    }
834
835    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
836        let mut rx = self.catalog_updated_rx.clone();
837        while *rx.borrow_and_update() < version.catalog_version {
838            rx.changed().await.map_err(|e| anyhow!(e))?;
839        }
840        self.hummock_snapshot_manager
841            .wait(version.hummock_version_id)
842            .await;
843        Ok(())
844    }
845}