risingwave_frontend/catalog/
catalog_service.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22    AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_pb::catalog::{
27    PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28    PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32    ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35    PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36    alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37    streaming_job_resource_type,
38};
39use risingwave_pb::meta::PbTableParallelism;
40use risingwave_pb::stream_plan::StreamFragmentGraph;
41use risingwave_rpc_client::MetaClient;
42use tokio::sync::watch::Receiver;
43
44use super::root_catalog::Catalog;
45use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
46use crate::error::Result;
47use crate::scheduler::HummockSnapshotManagerRef;
48use crate::session::current::notice_to_user;
49use crate::user::UserId;
50
51pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
52
53/// [`CatalogReader`] can read catalog from local catalog and force the holder can not modify it.
54#[derive(Clone)]
55pub struct CatalogReader(Arc<RwLock<Catalog>>);
56
57impl CatalogReader {
58    pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
59        CatalogReader(inner)
60    }
61
62    pub fn read_guard(&self) -> CatalogReadGuard {
63        // Make this recursive so that one can get this guard in the same thread without fear.
64        self.0.read_arc_recursive()
65    }
66}
67
68/// [`CatalogWriter`] initiate DDL operations (create table/schema/database/function/connection).
69/// It will only send rpc to meta and get the catalog version as response.
70/// Then it will wait for the local catalog to be synced to the version, which is performed by
71/// [observer](`crate::observer::FrontendObserverNode`).
72#[async_trait::async_trait]
73pub trait CatalogWriter: Send + Sync {
74    async fn create_database(
75        &self,
76        db_name: &str,
77        owner: UserId,
78        resource_group: &str,
79        barrier_interval_ms: Option<u32>,
80        checkpoint_frequency: Option<u64>,
81    ) -> Result<()>;
82
83    async fn create_schema(
84        &self,
85        db_id: DatabaseId,
86        schema_name: &str,
87        owner: UserId,
88    ) -> Result<()>;
89
90    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
91
92    async fn create_materialized_view(
93        &self,
94        table: PbTable,
95        graph: StreamFragmentGraph,
96        dependencies: HashSet<ObjectId>,
97        resource_type: streaming_job_resource_type::ResourceType,
98        if_not_exists: bool,
99        refresh_interval_sec: Option<u64>,
100    ) -> Result<()>;
101
102    async fn replace_materialized_view(
103        &self,
104        table: PbTable,
105        graph: StreamFragmentGraph,
106    ) -> Result<()>;
107
108    async fn create_table(
109        &self,
110        source: Option<PbSource>,
111        table: PbTable,
112        graph: StreamFragmentGraph,
113        job_type: PbTableJobType,
114        if_not_exists: bool,
115        dependencies: HashSet<ObjectId>,
116    ) -> Result<()>;
117
118    async fn replace_table(
119        &self,
120        source: Option<PbSource>,
121        table: PbTable,
122        graph: StreamFragmentGraph,
123        job_type: TableJobType,
124    ) -> Result<()>;
125
126    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
127
128    async fn create_index(
129        &self,
130        index: PbIndex,
131        table: PbTable,
132        graph: StreamFragmentGraph,
133        if_not_exists: bool,
134    ) -> Result<()>;
135
136    async fn create_source(
137        &self,
138        source: PbSource,
139        graph: Option<StreamFragmentGraph>,
140        if_not_exists: bool,
141    ) -> Result<()>;
142
143    async fn create_sink(
144        &self,
145        sink: PbSink,
146        graph: StreamFragmentGraph,
147        dependencies: HashSet<ObjectId>,
148        if_not_exists: bool,
149    ) -> Result<()>;
150
151    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
152
153    async fn create_function(&self, function: PbFunction) -> Result<()>;
154
155    async fn create_connection(
156        &self,
157        connection_name: String,
158        database_id: DatabaseId,
159        schema_id: SchemaId,
160        owner_id: UserId,
161        connection: create_connection_request::Payload,
162    ) -> Result<()>;
163
164    async fn create_secret(
165        &self,
166        secret_name: String,
167        database_id: DatabaseId,
168        schema_id: SchemaId,
169        owner_id: UserId,
170        payload: Vec<u8>,
171    ) -> Result<()>;
172
173    async fn comment_on(&self, comment: PbComment) -> Result<()>;
174
175    async fn drop_table(
176        &self,
177        source_id: Option<SourceId>,
178        table_id: TableId,
179        cascade: bool,
180    ) -> Result<()>;
181
182    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
183
184    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
185
186    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
187
188    async fn reset_source(&self, source_id: SourceId) -> Result<()>;
189
190    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
191
192    async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
193    -> Result<()>;
194
195    async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
196
197    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
198
199    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
200
201    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
202
203    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
204
205    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;
206
207    async fn alter_secret(
208        &self,
209        secret_id: SecretId,
210        secret_name: String,
211        database_id: DatabaseId,
212        schema_id: SchemaId,
213        owner_id: UserId,
214        payload: Vec<u8>,
215    ) -> Result<()>;
216
217    async fn alter_subscription_retention(
218        &self,
219        subscription_id: SubscriptionId,
220        retention_seconds: u64,
221        definition: String,
222    ) -> Result<()>;
223
224    async fn alter_name(
225        &self,
226        object_id: alter_name_request::Object,
227        object_name: &str,
228    ) -> Result<()>;
229
230    async fn alter_owner(
231        &self,
232        object: alter_owner_request::Object,
233        owner_id: UserId,
234    ) -> Result<()>;
235
236    /// Replace the source in the catalog.
237    async fn alter_source(&self, source: PbSource) -> Result<()>;
238
239    async fn alter_parallelism(
240        &self,
241        job_id: JobId,
242        parallelism: PbTableParallelism,
243        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
244        deferred: bool,
245    ) -> Result<()>;
246
247    async fn alter_backfill_parallelism(
248        &self,
249        job_id: JobId,
250        parallelism: Option<PbTableParallelism>,
251        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
252        deferred: bool,
253    ) -> Result<()>;
254
255    async fn alter_config(
256        &self,
257        job_id: JobId,
258        entries_to_add: HashMap<String, String>,
259        keys_to_remove: Vec<String>,
260    ) -> Result<()>;
261
262    async fn alter_resource_group(
263        &self,
264        table_id: TableId,
265        resource_group: Option<String>,
266        deferred: bool,
267    ) -> Result<()>;
268
269    async fn alter_set_schema(
270        &self,
271        object: alter_set_schema_request::Object,
272        new_schema_id: SchemaId,
273    ) -> Result<()>;
274
275    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
276
277    async fn alter_database_param(
278        &self,
279        database_id: DatabaseId,
280        param: AlterDatabaseParam,
281    ) -> Result<()>;
282
283    async fn create_iceberg_table(
284        &self,
285        table_job_info: PbTableJobInfo,
286        sink_job_info: PbSinkJobInfo,
287        iceberg_source: PbSource,
288        if_not_exists: bool,
289    ) -> Result<()>;
290
291    async fn wait(&self, job_id: Option<JobId>) -> Result<()>;
292}
293
294#[derive(Clone)]
295pub struct CatalogWriterImpl {
296    meta_client: MetaClient,
297    catalog_updated_rx: Receiver<CatalogVersion>,
298    hummock_snapshot_manager: HummockSnapshotManagerRef,
299}
300
301#[async_trait::async_trait]
302impl CatalogWriter for CatalogWriterImpl {
303    async fn create_database(
304        &self,
305        db_name: &str,
306        owner: UserId,
307        resource_group: &str,
308        barrier_interval_ms: Option<u32>,
309        checkpoint_frequency: Option<u64>,
310    ) -> Result<()> {
311        let version = self
312            .meta_client
313            .create_database(PbDatabase {
314                name: db_name.to_owned(),
315                id: 0.into(),
316                owner,
317                resource_group: resource_group.to_owned(),
318                barrier_interval_ms,
319                checkpoint_frequency,
320            })
321            .await?;
322        self.wait_version(version).await
323    }
324
325    async fn create_schema(
326        &self,
327        db_id: DatabaseId,
328        schema_name: &str,
329        owner: UserId,
330    ) -> Result<()> {
331        let version = self
332            .meta_client
333            .create_schema(PbSchema {
334                id: 0.into(),
335                name: schema_name.to_owned(),
336                database_id: db_id,
337                owner,
338            })
339            .await?;
340        self.wait_version(version).await
341    }
342
343    // TODO: maybe here to pass a materialize plan node
344    async fn create_materialized_view(
345        &self,
346        table: PbTable,
347        graph: StreamFragmentGraph,
348        dependencies: HashSet<ObjectId>,
349        resource_type: streaming_job_resource_type::ResourceType,
350        if_not_exists: bool,
351        refresh_interval_sec: Option<u64>,
352    ) -> Result<()> {
353        let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
354        let version = self
355            .meta_client
356            .create_materialized_view(
357                table,
358                graph,
359                dependencies,
360                resource_type,
361                if_not_exists,
362                refresh_interval_sec,
363            )
364            .await?;
365        if matches!(create_type, PbCreateType::Foreground) {
366            self.wait_version(version).await?
367        }
368        Ok(())
369    }
370
371    async fn replace_materialized_view(
372        &self,
373        table: PbTable,
374        graph: StreamFragmentGraph,
375    ) -> Result<()> {
376        // TODO: this is a dummy implementation for debugging only.
377        notice_to_user(format!("table: {table:#?}"));
378        notice_to_user(format!("graph: {graph:#?}"));
379
380        let version = self
381            .meta_client
382            .replace_job(
383                graph,
384                ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
385            )
386            .await?;
387
388        self.wait_version(version).await
389    }
390
391    async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
392        let version = self.meta_client.create_view(view, dependencies).await?;
393        self.wait_version(version).await
394    }
395
396    async fn create_index(
397        &self,
398        index: PbIndex,
399        table: PbTable,
400        graph: StreamFragmentGraph,
401        if_not_exists: bool,
402    ) -> Result<()> {
403        let version = self
404            .meta_client
405            .create_index(index, table, graph, if_not_exists)
406            .await?;
407        self.wait_version(version).await
408    }
409
410    async fn create_table(
411        &self,
412        source: Option<PbSource>,
413        table: PbTable,
414        graph: StreamFragmentGraph,
415        job_type: PbTableJobType,
416        if_not_exists: bool,
417        dependencies: HashSet<ObjectId>,
418    ) -> Result<()> {
419        let version = self
420            .meta_client
421            .create_table(source, table, graph, job_type, if_not_exists, dependencies)
422            .await?;
423        self.wait_version(version).await
424    }
425
426    async fn replace_table(
427        &self,
428        source: Option<PbSource>,
429        table: PbTable,
430        graph: StreamFragmentGraph,
431        job_type: TableJobType,
432    ) -> Result<()> {
433        let version = self
434            .meta_client
435            .replace_job(
436                graph,
437                ReplaceJob::ReplaceTable(ReplaceTable {
438                    source,
439                    table: Some(table),
440                    job_type: job_type as _,
441                }),
442            )
443            .await?;
444        self.wait_version(version).await
445    }
446
447    async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
448        let version = self
449            .meta_client
450            .replace_job(
451                graph,
452                ReplaceJob::ReplaceSource(ReplaceSource {
453                    source: Some(source),
454                }),
455            )
456            .await?;
457        self.wait_version(version).await
458    }
459
460    async fn create_source(
461        &self,
462        source: PbSource,
463        graph: Option<StreamFragmentGraph>,
464        if_not_exists: bool,
465    ) -> Result<()> {
466        let version = self
467            .meta_client
468            .create_source(source, graph, if_not_exists)
469            .await?;
470        self.wait_version(version).await
471    }
472
473    async fn create_sink(
474        &self,
475        sink: PbSink,
476        graph: StreamFragmentGraph,
477        dependencies: HashSet<ObjectId>,
478        if_not_exists: bool,
479    ) -> Result<()> {
480        let version = self
481            .meta_client
482            .create_sink(sink, graph, dependencies, if_not_exists)
483            .await?;
484        self.wait_version(version).await
485    }
486
487    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
488        let version = self.meta_client.create_subscription(subscription).await?;
489        self.wait_version(version).await
490    }
491
492    async fn create_function(&self, function: PbFunction) -> Result<()> {
493        let version = self.meta_client.create_function(function).await?;
494        self.wait_version(version).await
495    }
496
497    async fn create_connection(
498        &self,
499        connection_name: String,
500        database_id: DatabaseId,
501        schema_id: SchemaId,
502        owner_id: UserId,
503        connection: create_connection_request::Payload,
504    ) -> Result<()> {
505        let version = self
506            .meta_client
507            .create_connection(
508                connection_name,
509                database_id,
510                schema_id,
511                owner_id,
512                connection,
513            )
514            .await?;
515        self.wait_version(version).await
516    }
517
518    async fn create_secret(
519        &self,
520        secret_name: String,
521        database_id: DatabaseId,
522        schema_id: SchemaId,
523        owner_id: UserId,
524        payload: Vec<u8>,
525    ) -> Result<()> {
526        let version = self
527            .meta_client
528            .create_secret(secret_name, database_id, schema_id, owner_id, payload)
529            .await?;
530        self.wait_version(version).await
531    }
532
533    async fn comment_on(&self, comment: PbComment) -> Result<()> {
534        let version = self.meta_client.comment_on(comment).await?;
535        self.wait_version(version).await
536    }
537
538    async fn drop_table(
539        &self,
540        source_id: Option<SourceId>,
541        table_id: TableId,
542        cascade: bool,
543    ) -> Result<()> {
544        let version = self
545            .meta_client
546            .drop_table(source_id, table_id, cascade)
547            .await?;
548        self.wait_version(version).await
549    }
550
551    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
552        let version = self
553            .meta_client
554            .drop_materialized_view(table_id, cascade)
555            .await?;
556        self.wait_version(version).await
557    }
558
559    async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
560        let version = self.meta_client.drop_view(view_id, cascade).await?;
561        self.wait_version(version).await
562    }
563
564    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
565        let version = self.meta_client.drop_source(source_id, cascade).await?;
566        self.wait_version(version).await
567    }
568
569    async fn reset_source(&self, source_id: SourceId) -> Result<()> {
570        let version = self.meta_client.reset_source(source_id).await?;
571        self.wait_version(version).await
572    }
573
574    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
575        let version = self.meta_client.drop_sink(sink_id, cascade).await?;
576        self.wait_version(version).await
577    }
578
579    async fn drop_subscription(
580        &self,
581        subscription_id: SubscriptionId,
582        cascade: bool,
583    ) -> Result<()> {
584        let version = self
585            .meta_client
586            .drop_subscription(subscription_id, cascade)
587            .await?;
588        self.wait_version(version).await
589    }
590
591    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
592        let version = self.meta_client.drop_index(index_id, cascade).await?;
593        self.wait_version(version).await
594    }
595
596    async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
597        let version = self.meta_client.drop_function(function_id, cascade).await?;
598        self.wait_version(version).await
599    }
600
601    async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
602        let version = self.meta_client.drop_schema(schema_id, cascade).await?;
603        self.wait_version(version).await
604    }
605
606    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
607        let version = self.meta_client.drop_database(database_id).await?;
608        self.wait_version(version).await
609    }
610
611    async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
612        let version = self
613            .meta_client
614            .drop_connection(connection_id, cascade)
615            .await?;
616        self.wait_version(version).await
617    }
618
619    async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
620        let version = self.meta_client.drop_secret(secret_id, cascade).await?;
621        self.wait_version(version).await
622    }
623
624    async fn alter_name(
625        &self,
626        object_id: alter_name_request::Object,
627        object_name: &str,
628    ) -> Result<()> {
629        let version = self.meta_client.alter_name(object_id, object_name).await?;
630        self.wait_version(version).await
631    }
632
633    async fn alter_owner(
634        &self,
635        object: alter_owner_request::Object,
636        owner_id: UserId,
637    ) -> Result<()> {
638        let version = self.meta_client.alter_owner(object, owner_id).await?;
639        self.wait_version(version).await
640    }
641
642    async fn alter_set_schema(
643        &self,
644        object: alter_set_schema_request::Object,
645        new_schema_id: SchemaId,
646    ) -> Result<()> {
647        let version = self
648            .meta_client
649            .alter_set_schema(object, new_schema_id)
650            .await?;
651        self.wait_version(version).await
652    }
653
654    async fn alter_source(&self, source: PbSource) -> Result<()> {
655        let version = self.meta_client.alter_source(source).await?;
656        self.wait_version(version).await
657    }
658
659    async fn alter_parallelism(
660        &self,
661        job_id: JobId,
662        parallelism: PbTableParallelism,
663        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
664        deferred: bool,
665    ) -> Result<()> {
666        self.meta_client
667            .alter_parallelism(job_id, parallelism, adaptive_parallelism_strategy, deferred)
668            .await?;
669        Ok(())
670    }
671
672    async fn alter_backfill_parallelism(
673        &self,
674        job_id: JobId,
675        parallelism: Option<PbTableParallelism>,
676        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
677        deferred: bool,
678    ) -> Result<()> {
679        self.meta_client
680            .alter_backfill_parallelism(
681                job_id,
682                parallelism,
683                adaptive_parallelism_strategy,
684                deferred,
685            )
686            .await?;
687        Ok(())
688    }
689
690    async fn alter_config(
691        &self,
692        job_id: JobId,
693        entries_to_add: HashMap<String, String>,
694        keys_to_remove: Vec<String>,
695    ) -> Result<()> {
696        self.meta_client
697            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
698            .await?;
699        Ok(())
700    }
701
702    async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
703        let version = self.meta_client.alter_swap_rename(object).await?;
704        self.wait_version(version).await
705    }
706
707    async fn alter_secret(
708        &self,
709        secret_id: SecretId,
710        secret_name: String,
711        database_id: DatabaseId,
712        schema_id: SchemaId,
713        owner_id: UserId,
714        payload: Vec<u8>,
715    ) -> Result<()> {
716        let version = self
717            .meta_client
718            .alter_secret(
719                secret_id,
720                secret_name,
721                database_id,
722                schema_id,
723                owner_id,
724                payload,
725            )
726            .await?;
727        self.wait_version(version).await
728    }
729
730    async fn alter_subscription_retention(
731        &self,
732        subscription_id: SubscriptionId,
733        retention_seconds: u64,
734        definition: String,
735    ) -> Result<()> {
736        let version = self
737            .meta_client
738            .alter_subscription_retention(subscription_id, retention_seconds, definition)
739            .await?;
740        self.wait_version(version).await
741    }
742
743    async fn alter_resource_group(
744        &self,
745        table_id: TableId,
746        resource_group: Option<String>,
747        deferred: bool,
748    ) -> Result<()> {
749        self.meta_client
750            .alter_resource_group(table_id, resource_group, deferred)
751            .await
752            .map_err(|e| anyhow!(e))?;
753
754        Ok(())
755    }
756
757    async fn alter_database_param(
758        &self,
759        database_id: DatabaseId,
760        param: AlterDatabaseParam,
761    ) -> Result<()> {
762        let version = self
763            .meta_client
764            .alter_database_param(database_id, param)
765            .await
766            .map_err(|e| anyhow!(e))?;
767        self.wait_version(version).await
768    }
769
770    async fn create_iceberg_table(
771        &self,
772        table_job_info: PbTableJobInfo,
773        sink_job_info: PbSinkJobInfo,
774        iceberg_source: PbSource,
775        if_not_exists: bool,
776    ) -> Result<()> {
777        let version = Box::pin(self.meta_client.create_iceberg_table(
778            table_job_info,
779            sink_job_info,
780            iceberg_source,
781            if_not_exists,
782        ))
783        .await?;
784        self.wait_version(version).await
785    }
786
787    async fn wait(&self, job_id: Option<JobId>) -> Result<()> {
788        let version = self
789            .meta_client
790            .wait(job_id)
791            .await
792            .map_err(|e| anyhow!(e))?;
793        self.wait_version(version).await
794    }
795}
796
797impl CatalogWriterImpl {
798    pub fn new(
799        meta_client: MetaClient,
800        catalog_updated_rx: Receiver<CatalogVersion>,
801        hummock_snapshot_manager: HummockSnapshotManagerRef,
802    ) -> Self {
803        Self {
804            meta_client,
805            catalog_updated_rx,
806            hummock_snapshot_manager,
807        }
808    }
809
810    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
811        let mut rx = self.catalog_updated_rx.clone();
812        while *rx.borrow_and_update() < version.catalog_version {
813            rx.changed().await.map_err(|e| anyhow!(e))?;
814        }
815        self.hummock_snapshot_manager
816            .wait(version.hummock_version_id)
817            .await;
818        Ok(())
819    }
820}