1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::lock_api::ArcRwLockReadGuard;
20use parking_lot::{RawRwLock, RwLock};
21use risingwave_common::catalog::{
22 AlterDatabaseParam, CatalogVersion, FunctionId, IndexId, ObjectId,
23};
24use risingwave_common::id::{ConnectionId, JobId, SchemaId, SourceId, ViewId};
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_pb::catalog::{
27 PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
28 PbSubscription, PbTable, PbView,
29};
30use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
31use risingwave_pb::ddl_service::replace_job_plan::{
32 ReplaceJob, ReplaceMaterializedView, ReplaceSource, ReplaceTable,
33};
34use risingwave_pb::ddl_service::{
35 PbTableJobType, TableJobType, WaitVersion, alter_name_request, alter_owner_request,
36 alter_set_schema_request, alter_swap_rename_request, create_connection_request,
37 streaming_job_resource_type,
38};
39use risingwave_pb::meta::PbTableParallelism;
40use risingwave_pb::stream_plan::StreamFragmentGraph;
41use risingwave_rpc_client::MetaClient;
42use tokio::sync::watch::Receiver;
43
44use super::root_catalog::Catalog;
45use super::{DatabaseId, SecretId, SinkId, SubscriptionId, TableId};
46use crate::error::Result;
47use crate::scheduler::HummockSnapshotManagerRef;
48use crate::session::current::notice_to_user;
49use crate::user::UserId;
50
51pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
52
53#[derive(Clone)]
55pub struct CatalogReader(Arc<RwLock<Catalog>>);
56
57impl CatalogReader {
58 pub fn new(inner: Arc<RwLock<Catalog>>) -> Self {
59 CatalogReader(inner)
60 }
61
62 pub fn read_guard(&self) -> CatalogReadGuard {
63 self.0.read_arc_recursive()
65 }
66}
67
68#[async_trait::async_trait]
73pub trait CatalogWriter: Send + Sync {
74 async fn create_database(
75 &self,
76 db_name: &str,
77 owner: UserId,
78 resource_group: &str,
79 barrier_interval_ms: Option<u32>,
80 checkpoint_frequency: Option<u64>,
81 ) -> Result<()>;
82
83 async fn create_schema(
84 &self,
85 db_id: DatabaseId,
86 schema_name: &str,
87 owner: UserId,
88 ) -> Result<()>;
89
90 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()>;
91
92 async fn create_materialized_view(
93 &self,
94 table: PbTable,
95 graph: StreamFragmentGraph,
96 dependencies: HashSet<ObjectId>,
97 resource_type: streaming_job_resource_type::ResourceType,
98 if_not_exists: bool,
99 refresh_interval_sec: Option<u64>,
100 ) -> Result<()>;
101
102 async fn replace_materialized_view(
103 &self,
104 table: PbTable,
105 graph: StreamFragmentGraph,
106 ) -> Result<()>;
107
108 async fn create_table(
109 &self,
110 source: Option<PbSource>,
111 table: PbTable,
112 graph: StreamFragmentGraph,
113 job_type: PbTableJobType,
114 if_not_exists: bool,
115 dependencies: HashSet<ObjectId>,
116 ) -> Result<()>;
117
118 async fn replace_table(
119 &self,
120 source: Option<PbSource>,
121 table: PbTable,
122 graph: StreamFragmentGraph,
123 job_type: TableJobType,
124 ) -> Result<()>;
125
126 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()>;
127
128 async fn create_index(
129 &self,
130 index: PbIndex,
131 table: PbTable,
132 graph: StreamFragmentGraph,
133 resource_type: streaming_job_resource_type::ResourceType,
134 if_not_exists: bool,
135 ) -> Result<()>;
136
137 async fn create_source(
138 &self,
139 source: PbSource,
140 graph: Option<StreamFragmentGraph>,
141 if_not_exists: bool,
142 ) -> Result<()>;
143
144 async fn create_sink(
145 &self,
146 sink: PbSink,
147 graph: StreamFragmentGraph,
148 dependencies: HashSet<ObjectId>,
149 resource_type: streaming_job_resource_type::ResourceType,
150 if_not_exists: bool,
151 ) -> Result<()>;
152
153 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()>;
154
155 async fn create_function(&self, function: PbFunction) -> Result<()>;
156
157 async fn create_connection(
158 &self,
159 connection_name: String,
160 database_id: DatabaseId,
161 schema_id: SchemaId,
162 owner_id: UserId,
163 connection: create_connection_request::Payload,
164 ) -> Result<()>;
165
166 async fn create_secret(
167 &self,
168 secret_name: String,
169 database_id: DatabaseId,
170 schema_id: SchemaId,
171 owner_id: UserId,
172 payload: Vec<u8>,
173 ) -> Result<()>;
174
175 async fn comment_on(&self, comment: PbComment) -> Result<()>;
176
177 async fn drop_table(
178 &self,
179 source_id: Option<SourceId>,
180 table_id: TableId,
181 cascade: bool,
182 ) -> Result<()>;
183
184 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;
185
186 async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()>;
187
188 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()>;
189
190 async fn reset_source(&self, source_id: SourceId) -> Result<()>;
191
192 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()>;
193
194 async fn drop_subscription(&self, subscription_id: SubscriptionId, cascade: bool)
195 -> Result<()>;
196
197 async fn drop_database(&self, database_id: DatabaseId) -> Result<()>;
198
199 async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()>;
200
201 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;
202
203 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()>;
204
205 async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()>;
206
207 async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()>;
208
209 async fn alter_secret(
210 &self,
211 secret_id: SecretId,
212 secret_name: String,
213 database_id: DatabaseId,
214 schema_id: SchemaId,
215 owner_id: UserId,
216 payload: Vec<u8>,
217 ) -> Result<()>;
218
219 async fn alter_subscription_retention(
220 &self,
221 subscription_id: SubscriptionId,
222 retention_seconds: u64,
223 definition: String,
224 ) -> Result<()>;
225
226 async fn alter_name(
227 &self,
228 object_id: alter_name_request::Object,
229 object_name: &str,
230 ) -> Result<()>;
231
232 async fn alter_owner(
233 &self,
234 object: alter_owner_request::Object,
235 owner_id: UserId,
236 ) -> Result<()>;
237
238 async fn alter_source(&self, source: PbSource) -> Result<()>;
240
241 async fn alter_parallelism(
242 &self,
243 job_id: JobId,
244 parallelism: PbTableParallelism,
245 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
246 deferred: bool,
247 ) -> Result<()>;
248
249 async fn alter_backfill_parallelism(
250 &self,
251 job_id: JobId,
252 parallelism: Option<PbTableParallelism>,
253 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
254 deferred: bool,
255 ) -> Result<()>;
256
257 async fn alter_config(
258 &self,
259 job_id: JobId,
260 entries_to_add: HashMap<String, String>,
261 keys_to_remove: Vec<String>,
262 ) -> Result<()>;
263
264 async fn alter_resource_group(
265 &self,
266 job_id: JobId,
267 resource_group: Option<String>,
268 deferred: bool,
269 ) -> Result<()>;
270
271 async fn alter_database_resource_group(
272 &self,
273 database_id: DatabaseId,
274 resource_group: Option<String>,
275 deferred: bool,
276 ) -> Result<()>;
277
278 async fn alter_set_schema(
279 &self,
280 object: alter_set_schema_request::Object,
281 new_schema_id: SchemaId,
282 ) -> Result<()>;
283
284 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
285
286 async fn alter_database_param(
287 &self,
288 database_id: DatabaseId,
289 param: AlterDatabaseParam,
290 ) -> Result<()>;
291
292 async fn create_iceberg_table(
293 &self,
294 table_job_info: PbTableJobInfo,
295 sink_job_info: PbSinkJobInfo,
296 iceberg_source: PbSource,
297 if_not_exists: bool,
298 ) -> Result<()>;
299
300 async fn wait(&self, job_id: Option<JobId>) -> Result<()>;
301}
302
303#[derive(Clone)]
304pub struct CatalogWriterImpl {
305 meta_client: MetaClient,
306 catalog_updated_rx: Receiver<CatalogVersion>,
307 hummock_snapshot_manager: HummockSnapshotManagerRef,
308}
309
310#[async_trait::async_trait]
311impl CatalogWriter for CatalogWriterImpl {
312 async fn create_database(
313 &self,
314 db_name: &str,
315 owner: UserId,
316 resource_group: &str,
317 barrier_interval_ms: Option<u32>,
318 checkpoint_frequency: Option<u64>,
319 ) -> Result<()> {
320 let version = self
321 .meta_client
322 .create_database(PbDatabase {
323 name: db_name.to_owned(),
324 id: 0.into(),
325 owner,
326 resource_group: resource_group.to_owned(),
327 barrier_interval_ms,
328 checkpoint_frequency,
329 })
330 .await?;
331 self.wait_version(version).await
332 }
333
334 async fn create_schema(
335 &self,
336 db_id: DatabaseId,
337 schema_name: &str,
338 owner: UserId,
339 ) -> Result<()> {
340 let version = self
341 .meta_client
342 .create_schema(PbSchema {
343 id: 0.into(),
344 name: schema_name.to_owned(),
345 database_id: db_id,
346 owner,
347 })
348 .await?;
349 self.wait_version(version).await
350 }
351
352 async fn create_materialized_view(
354 &self,
355 table: PbTable,
356 graph: StreamFragmentGraph,
357 dependencies: HashSet<ObjectId>,
358 resource_type: streaming_job_resource_type::ResourceType,
359 if_not_exists: bool,
360 refresh_interval_sec: Option<u64>,
361 ) -> Result<()> {
362 let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
363 let version = self
364 .meta_client
365 .create_materialized_view(
366 table,
367 graph,
368 dependencies,
369 resource_type,
370 if_not_exists,
371 refresh_interval_sec,
372 )
373 .await?;
374 if matches!(create_type, PbCreateType::Foreground) {
375 self.wait_version(version).await?
376 }
377 Ok(())
378 }
379
380 async fn replace_materialized_view(
381 &self,
382 table: PbTable,
383 graph: StreamFragmentGraph,
384 ) -> Result<()> {
385 notice_to_user(format!("table: {table:#?}"));
387 notice_to_user(format!("graph: {graph:#?}"));
388
389 let version = self
390 .meta_client
391 .replace_job(
392 graph,
393 ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView { table: Some(table) }),
394 )
395 .await?;
396
397 self.wait_version(version).await
398 }
399
400 async fn create_view(&self, view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
401 let version = self.meta_client.create_view(view, dependencies).await?;
402 self.wait_version(version).await
403 }
404
405 async fn create_index(
406 &self,
407 index: PbIndex,
408 table: PbTable,
409 graph: StreamFragmentGraph,
410 resource_type: streaming_job_resource_type::ResourceType,
411 if_not_exists: bool,
412 ) -> Result<()> {
413 let version = self
414 .meta_client
415 .create_index(index, table, graph, resource_type, if_not_exists)
416 .await?;
417 self.wait_version(version).await
418 }
419
420 async fn create_table(
421 &self,
422 source: Option<PbSource>,
423 table: PbTable,
424 graph: StreamFragmentGraph,
425 job_type: PbTableJobType,
426 if_not_exists: bool,
427 dependencies: HashSet<ObjectId>,
428 ) -> Result<()> {
429 let version = self
430 .meta_client
431 .create_table(source, table, graph, job_type, if_not_exists, dependencies)
432 .await?;
433 self.wait_version(version).await
434 }
435
436 async fn replace_table(
437 &self,
438 source: Option<PbSource>,
439 table: PbTable,
440 graph: StreamFragmentGraph,
441 job_type: TableJobType,
442 ) -> Result<()> {
443 let version = self
444 .meta_client
445 .replace_job(
446 graph,
447 ReplaceJob::ReplaceTable(ReplaceTable {
448 source,
449 table: Some(table),
450 job_type: job_type as _,
451 }),
452 )
453 .await?;
454 self.wait_version(version).await
455 }
456
457 async fn replace_source(&self, source: PbSource, graph: StreamFragmentGraph) -> Result<()> {
458 let version = self
459 .meta_client
460 .replace_job(
461 graph,
462 ReplaceJob::ReplaceSource(ReplaceSource {
463 source: Some(source),
464 }),
465 )
466 .await?;
467 self.wait_version(version).await
468 }
469
470 async fn create_source(
471 &self,
472 source: PbSource,
473 graph: Option<StreamFragmentGraph>,
474 if_not_exists: bool,
475 ) -> Result<()> {
476 let version = self
477 .meta_client
478 .create_source(source, graph, if_not_exists)
479 .await?;
480 self.wait_version(version).await
481 }
482
483 async fn create_sink(
484 &self,
485 sink: PbSink,
486 graph: StreamFragmentGraph,
487 dependencies: HashSet<ObjectId>,
488 resource_type: streaming_job_resource_type::ResourceType,
489 if_not_exists: bool,
490 ) -> Result<()> {
491 let version = self
492 .meta_client
493 .create_sink(sink, graph, dependencies, resource_type, if_not_exists)
494 .await?;
495 self.wait_version(version).await
496 }
497
498 async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
499 let version = self.meta_client.create_subscription(subscription).await?;
500 self.wait_version(version).await
501 }
502
503 async fn create_function(&self, function: PbFunction) -> Result<()> {
504 let version = self.meta_client.create_function(function).await?;
505 self.wait_version(version).await
506 }
507
508 async fn create_connection(
509 &self,
510 connection_name: String,
511 database_id: DatabaseId,
512 schema_id: SchemaId,
513 owner_id: UserId,
514 connection: create_connection_request::Payload,
515 ) -> Result<()> {
516 let version = self
517 .meta_client
518 .create_connection(
519 connection_name,
520 database_id,
521 schema_id,
522 owner_id,
523 connection,
524 )
525 .await?;
526 self.wait_version(version).await
527 }
528
529 async fn create_secret(
530 &self,
531 secret_name: String,
532 database_id: DatabaseId,
533 schema_id: SchemaId,
534 owner_id: UserId,
535 payload: Vec<u8>,
536 ) -> Result<()> {
537 let version = self
538 .meta_client
539 .create_secret(secret_name, database_id, schema_id, owner_id, payload)
540 .await?;
541 self.wait_version(version).await
542 }
543
544 async fn comment_on(&self, comment: PbComment) -> Result<()> {
545 let version = self.meta_client.comment_on(comment).await?;
546 self.wait_version(version).await
547 }
548
549 async fn drop_table(
550 &self,
551 source_id: Option<SourceId>,
552 table_id: TableId,
553 cascade: bool,
554 ) -> Result<()> {
555 let version = self
556 .meta_client
557 .drop_table(source_id, table_id, cascade)
558 .await?;
559 self.wait_version(version).await
560 }
561
562 async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
563 let version = self
564 .meta_client
565 .drop_materialized_view(table_id, cascade)
566 .await?;
567 self.wait_version(version).await
568 }
569
570 async fn drop_view(&self, view_id: ViewId, cascade: bool) -> Result<()> {
571 let version = self.meta_client.drop_view(view_id, cascade).await?;
572 self.wait_version(version).await
573 }
574
575 async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
576 let version = self.meta_client.drop_source(source_id, cascade).await?;
577 self.wait_version(version).await
578 }
579
580 async fn reset_source(&self, source_id: SourceId) -> Result<()> {
581 let version = self.meta_client.reset_source(source_id).await?;
582 self.wait_version(version).await
583 }
584
585 async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
586 let version = self.meta_client.drop_sink(sink_id, cascade).await?;
587 self.wait_version(version).await
588 }
589
590 async fn drop_subscription(
591 &self,
592 subscription_id: SubscriptionId,
593 cascade: bool,
594 ) -> Result<()> {
595 let version = self
596 .meta_client
597 .drop_subscription(subscription_id, cascade)
598 .await?;
599 self.wait_version(version).await
600 }
601
602 async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
603 let version = self.meta_client.drop_index(index_id, cascade).await?;
604 self.wait_version(version).await
605 }
606
607 async fn drop_function(&self, function_id: FunctionId, cascade: bool) -> Result<()> {
608 let version = self.meta_client.drop_function(function_id, cascade).await?;
609 self.wait_version(version).await
610 }
611
612 async fn drop_schema(&self, schema_id: SchemaId, cascade: bool) -> Result<()> {
613 let version = self.meta_client.drop_schema(schema_id, cascade).await?;
614 self.wait_version(version).await
615 }
616
617 async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
618 let version = self.meta_client.drop_database(database_id).await?;
619 self.wait_version(version).await
620 }
621
622 async fn drop_connection(&self, connection_id: ConnectionId, cascade: bool) -> Result<()> {
623 let version = self
624 .meta_client
625 .drop_connection(connection_id, cascade)
626 .await?;
627 self.wait_version(version).await
628 }
629
630 async fn drop_secret(&self, secret_id: SecretId, cascade: bool) -> Result<()> {
631 let version = self.meta_client.drop_secret(secret_id, cascade).await?;
632 self.wait_version(version).await
633 }
634
635 async fn alter_name(
636 &self,
637 object_id: alter_name_request::Object,
638 object_name: &str,
639 ) -> Result<()> {
640 let version = self.meta_client.alter_name(object_id, object_name).await?;
641 self.wait_version(version).await
642 }
643
644 async fn alter_owner(
645 &self,
646 object: alter_owner_request::Object,
647 owner_id: UserId,
648 ) -> Result<()> {
649 let version = self.meta_client.alter_owner(object, owner_id).await?;
650 self.wait_version(version).await
651 }
652
653 async fn alter_set_schema(
654 &self,
655 object: alter_set_schema_request::Object,
656 new_schema_id: SchemaId,
657 ) -> Result<()> {
658 let version = self
659 .meta_client
660 .alter_set_schema(object, new_schema_id)
661 .await?;
662 self.wait_version(version).await
663 }
664
665 async fn alter_source(&self, source: PbSource) -> Result<()> {
666 let version = self.meta_client.alter_source(source).await?;
667 self.wait_version(version).await
668 }
669
670 async fn alter_parallelism(
671 &self,
672 job_id: JobId,
673 parallelism: PbTableParallelism,
674 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
675 deferred: bool,
676 ) -> Result<()> {
677 self.meta_client
678 .alter_parallelism(job_id, parallelism, adaptive_parallelism_strategy, deferred)
679 .await?;
680 Ok(())
681 }
682
683 async fn alter_backfill_parallelism(
684 &self,
685 job_id: JobId,
686 parallelism: Option<PbTableParallelism>,
687 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
688 deferred: bool,
689 ) -> Result<()> {
690 self.meta_client
691 .alter_backfill_parallelism(
692 job_id,
693 parallelism,
694 adaptive_parallelism_strategy,
695 deferred,
696 )
697 .await?;
698 Ok(())
699 }
700
701 async fn alter_config(
702 &self,
703 job_id: JobId,
704 entries_to_add: HashMap<String, String>,
705 keys_to_remove: Vec<String>,
706 ) -> Result<()> {
707 self.meta_client
708 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
709 .await?;
710 Ok(())
711 }
712
713 async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
714 let version = self.meta_client.alter_swap_rename(object).await?;
715 self.wait_version(version).await
716 }
717
718 async fn alter_secret(
719 &self,
720 secret_id: SecretId,
721 secret_name: String,
722 database_id: DatabaseId,
723 schema_id: SchemaId,
724 owner_id: UserId,
725 payload: Vec<u8>,
726 ) -> Result<()> {
727 let version = self
728 .meta_client
729 .alter_secret(
730 secret_id,
731 secret_name,
732 database_id,
733 schema_id,
734 owner_id,
735 payload,
736 )
737 .await?;
738 self.wait_version(version).await
739 }
740
741 async fn alter_subscription_retention(
742 &self,
743 subscription_id: SubscriptionId,
744 retention_seconds: u64,
745 definition: String,
746 ) -> Result<()> {
747 let version = self
748 .meta_client
749 .alter_subscription_retention(subscription_id, retention_seconds, definition)
750 .await?;
751 self.wait_version(version).await
752 }
753
754 async fn alter_resource_group(
755 &self,
756 job_id: JobId,
757 resource_group: Option<String>,
758 deferred: bool,
759 ) -> Result<()> {
760 self.meta_client
761 .alter_resource_group(job_id, resource_group, deferred)
762 .await
763 .map_err(|e| anyhow!(e))?;
764
765 Ok(())
766 }
767
768 async fn alter_database_resource_group(
769 &self,
770 database_id: DatabaseId,
771 resource_group: Option<String>,
772 deferred: bool,
773 ) -> Result<()> {
774 let version = self
775 .meta_client
776 .alter_database_resource_group(database_id, resource_group, deferred)
777 .await
778 .map_err(|e| anyhow!(e))?;
779 self.wait_version(version).await
780 }
781
782 async fn alter_database_param(
783 &self,
784 database_id: DatabaseId,
785 param: AlterDatabaseParam,
786 ) -> Result<()> {
787 let version = self
788 .meta_client
789 .alter_database_param(database_id, param)
790 .await
791 .map_err(|e| anyhow!(e))?;
792 self.wait_version(version).await
793 }
794
795 async fn create_iceberg_table(
796 &self,
797 table_job_info: PbTableJobInfo,
798 sink_job_info: PbSinkJobInfo,
799 iceberg_source: PbSource,
800 if_not_exists: bool,
801 ) -> Result<()> {
802 let version = Box::pin(self.meta_client.create_iceberg_table(
803 table_job_info,
804 sink_job_info,
805 iceberg_source,
806 if_not_exists,
807 ))
808 .await?;
809 self.wait_version(version).await
810 }
811
812 async fn wait(&self, job_id: Option<JobId>) -> Result<()> {
813 let version = self
814 .meta_client
815 .wait(job_id)
816 .await
817 .map_err(|e| anyhow!(e))?;
818 self.wait_version(version).await
819 }
820}
821
822impl CatalogWriterImpl {
823 pub fn new(
824 meta_client: MetaClient,
825 catalog_updated_rx: Receiver<CatalogVersion>,
826 hummock_snapshot_manager: HummockSnapshotManagerRef,
827 ) -> Self {
828 Self {
829 meta_client,
830 catalog_updated_rx,
831 hummock_snapshot_manager,
832 }
833 }
834
835 async fn wait_version(&self, version: WaitVersion) -> Result<()> {
836 let mut rx = self.catalog_updated_rx.clone();
837 while *rx.borrow_and_update() < version.catalog_version {
838 rx.changed().await.map_err(|e| anyhow!(e))?;
839 }
840 self.hummock_snapshot_manager
841 .wait(version.hummock_version_id)
842 .await;
843 Ok(())
844 }
845}