risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::change_log::TableChangeLogs;
40use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
41use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
42use risingwave_pb::backup_service::MetaSnapshotMetadata;
43use risingwave_pb::catalog::{
44    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
45    PbSubscription, PbTable, PbView, Table,
46};
47use risingwave_pb::common::{PbObjectType, WorkerNode};
48use risingwave_pb::ddl_service::alter_owner_request::Object;
49use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
50use risingwave_pb::ddl_service::{
51    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
52    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
53};
54use risingwave_pb::hummock::write_limits::WriteLimit;
55use risingwave_pb::hummock::{
56    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
57};
58use risingwave_pb::id::ActorId;
59use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
60use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
61use risingwave_pb::meta::list_actor_states_response::ActorState;
62use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
63use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
64use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
65use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
66use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
67use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
68use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
69use risingwave_pb::meta::{
70    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
71    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
72    list_sink_log_store_tables_response,
73};
74use risingwave_pb::secret::PbSecretRef;
75use risingwave_pb::stream_plan::StreamFragmentGraph;
76use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
77use risingwave_pb::user::update_user_request::UpdateField;
78use risingwave_pb::user::{GrantPrivilege, UserInfo};
79use risingwave_rpc_client::error::Result as RpcResult;
80use tempfile::{Builder, NamedTempFile};
81
82use crate::FrontendOpts;
83use crate::catalog::catalog_service::CatalogWriter;
84use crate::catalog::root_catalog::Catalog;
85use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
86use crate::error::{ErrorCode, Result, RwError};
87use crate::handler::RwPgResponse;
88use crate::meta_client::FrontendMetaClient;
89use crate::scheduler::HummockSnapshotManagerRef;
90use crate::session::{AuthContext, FrontendEnv, SessionImpl};
91use crate::user::UserId;
92use crate::user::user_manager::UserInfoManager;
93use crate::user::user_service::UserInfoWriter;
94
95/// An embedded frontend without starting meta and without starting frontend as a tcp server.
96pub struct LocalFrontend {
97    pub opts: FrontendOpts,
98    env: FrontendEnv,
99}
100
101impl SessionManager for LocalFrontend {
102    type Error = RwError;
103    type Session = SessionImpl;
104
105    fn create_dummy_session(
106        &self,
107        _database_id: DatabaseId,
108    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
109        unreachable!()
110    }
111
112    fn connect(
113        &self,
114        _database: &str,
115        _user_name: &str,
116        _peer_addr: AddressRef,
117    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
118        Ok(self.session_ref())
119    }
120
121    fn cancel_queries_in_session(&self, _session_id: SessionId) {
122        unreachable!()
123    }
124
125    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
126        unreachable!()
127    }
128
129    fn end_session(&self, _session: &Self::Session) {
130        unreachable!()
131    }
132}
133
134impl LocalFrontend {
135    #[expect(clippy::unused_async)]
136    pub async fn new(opts: FrontendOpts) -> Self {
137        let env = FrontendEnv::mock();
138        Self { opts, env }
139    }
140
141    pub async fn run_sql(
142        &self,
143        sql: impl Into<String>,
144    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
145        let sql: Arc<str> = Arc::from(sql.into());
146        Box::pin(self.session_ref().run_statement(sql, vec![])).await
147    }
148
149    pub async fn run_sql_with_session(
150        &self,
151        session_ref: Arc<SessionImpl>,
152        sql: impl Into<String>,
153    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
154        let sql: Arc<str> = Arc::from(sql.into());
155        Box::pin(session_ref.run_statement(sql, vec![])).await
156    }
157
158    pub async fn run_user_sql(
159        &self,
160        sql: impl Into<String>,
161        database: String,
162        user_name: String,
163        user_id: UserId,
164    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
165        let sql: Arc<str> = Arc::from(sql.into());
166        Box::pin(
167            self.session_user_ref(database, user_name, user_id)
168                .run_statement(sql, vec![]),
169        )
170        .await
171    }
172
173    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
174        let mut rsp = self.run_sql(sql).await.unwrap();
175        let mut res = vec![];
176        #[for_await]
177        for row_set in rsp.values_stream() {
178            for row in row_set.unwrap() {
179                res.push(format!("{:?}", row));
180            }
181        }
182        res
183    }
184
185    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
186        let mut rsp = self.run_sql(sql).await.unwrap();
187        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
188        let mut res = String::new();
189        #[for_await]
190        for row_set in rsp.values_stream() {
191            for row in row_set.unwrap() {
192                let row: Row = row;
193                let row = row.values()[0].as_ref().unwrap();
194                res += std::str::from_utf8(row).unwrap();
195                res += "\n";
196            }
197        }
198        res
199    }
200
201    /// Creates a new session
202    pub fn session_ref(&self) -> Arc<SessionImpl> {
203        self.session_user_ref(
204            DEFAULT_DATABASE_NAME.to_owned(),
205            DEFAULT_SUPER_USER.to_owned(),
206            DEFAULT_SUPER_USER_ID,
207        )
208    }
209
210    pub fn session_user_ref(
211        &self,
212        database: String,
213        user_name: String,
214        user_id: UserId,
215    ) -> Arc<SessionImpl> {
216        Arc::new(SessionImpl::new(
217            self.env.clone(),
218            AuthContext::new(database, user_name, user_id),
219            UserAuthenticator::None,
220            // Local Frontend use a non-sense id.
221            (0, 0),
222            Address::Tcp(SocketAddr::new(
223                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
224                6666,
225            ))
226            .into(),
227            Default::default(),
228        ))
229    }
230}
231
232pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
233    if rsp.stmt_type() != StatementType::EXPLAIN {
234        panic!("RESPONSE INVALID: {rsp:?}");
235    }
236    let mut res = String::new();
237    #[for_await]
238    for row_set in rsp.values_stream() {
239        for row in row_set.unwrap() {
240            let row: Row = row;
241            let row = row.values()[0].as_ref().unwrap();
242            res += std::str::from_utf8(row).unwrap();
243            res += "\n";
244        }
245    }
246    res
247}
248
249pub struct MockCatalogWriter {
250    catalog: Arc<RwLock<Catalog>>,
251    id: AtomicU32,
252    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
253    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
254    hummock_snapshot_manager: HummockSnapshotManagerRef,
255}
256
257#[async_trait::async_trait]
258impl CatalogWriter for MockCatalogWriter {
259    async fn create_database(
260        &self,
261        db_name: &str,
262        owner: UserId,
263        resource_group: &str,
264        barrier_interval_ms: Option<u32>,
265        checkpoint_frequency: Option<u64>,
266    ) -> Result<()> {
267        let database_id = DatabaseId::new(self.gen_id());
268        self.catalog.write().create_database(&PbDatabase {
269            name: db_name.to_owned(),
270            id: database_id,
271            owner,
272            resource_group: resource_group.to_owned(),
273            barrier_interval_ms,
274            checkpoint_frequency,
275        });
276        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
277            .await?;
278        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
279            .await?;
280        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
281            .await?;
282        Ok(())
283    }
284
285    async fn create_schema(
286        &self,
287        db_id: DatabaseId,
288        schema_name: &str,
289        owner: UserId,
290    ) -> Result<()> {
291        let id = self.gen_id();
292        self.catalog.write().create_schema(&PbSchema {
293            id,
294            name: schema_name.to_owned(),
295            database_id: db_id,
296            owner,
297        });
298        self.add_schema_id(id, db_id);
299        Ok(())
300    }
301
302    async fn create_materialized_view(
303        &self,
304        mut table: PbTable,
305        _graph: StreamFragmentGraph,
306        dependencies: HashSet<ObjectId>,
307        _resource_type: streaming_job_resource_type::ResourceType,
308        _if_not_exists: bool,
309    ) -> Result<()> {
310        table.id = self.gen_id();
311        table.stream_job_status = PbStreamJobStatus::Created as _;
312        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
313        self.catalog.write().create_table(&table);
314        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
315        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
316        self.hummock_snapshot_manager.add_table_for_test(table.id);
317        Ok(())
318    }
319
320    async fn replace_materialized_view(
321        &self,
322        mut table: PbTable,
323        _graph: StreamFragmentGraph,
324    ) -> Result<()> {
325        table.stream_job_status = PbStreamJobStatus::Created as _;
326        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
327        self.catalog.write().update_table(&table);
328        Ok(())
329    }
330
331    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
332        view.id = self.gen_id();
333        self.catalog.write().create_view(&view);
334        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
335        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
336        Ok(())
337    }
338
339    async fn create_table(
340        &self,
341        source: Option<PbSource>,
342        mut table: PbTable,
343        graph: StreamFragmentGraph,
344        _job_type: PbTableJobType,
345        if_not_exists: bool,
346        dependencies: HashSet<ObjectId>,
347    ) -> Result<()> {
348        if let Some(source) = source {
349            let source_id = self.create_source_inner(source)?;
350            table.optional_associated_source_id = Some(source_id.into());
351        }
352        self.create_materialized_view(
353            table,
354            graph,
355            dependencies,
356            streaming_job_resource_type::ResourceType::Regular(true),
357            if_not_exists,
358        )
359        .await?;
360        Ok(())
361    }
362
363    async fn replace_table(
364        &self,
365        _source: Option<PbSource>,
366        mut table: PbTable,
367        _graph: StreamFragmentGraph,
368        _job_type: TableJobType,
369    ) -> Result<()> {
370        table.stream_job_status = PbStreamJobStatus::Created as _;
371        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
372        self.catalog.write().update_table(&table);
373        Ok(())
374    }
375
376    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
377        self.catalog.write().update_source(&source);
378        Ok(())
379    }
380
381    async fn create_source(
382        &self,
383        source: PbSource,
384        _graph: Option<StreamFragmentGraph>,
385        _if_not_exists: bool,
386    ) -> Result<()> {
387        self.create_source_inner(source).map(|_| ())
388    }
389
390    async fn create_sink(
391        &self,
392        sink: PbSink,
393        graph: StreamFragmentGraph,
394        dependencies: HashSet<ObjectId>,
395        _if_not_exists: bool,
396    ) -> Result<()> {
397        let sink_id = self.create_sink_inner(sink, graph)?;
398        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
399        Ok(())
400    }
401
402    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
403        self.create_subscription_inner(subscription)
404    }
405
406    async fn create_index(
407        &self,
408        mut index: PbIndex,
409        mut index_table: PbTable,
410        _graph: StreamFragmentGraph,
411        _if_not_exists: bool,
412    ) -> Result<()> {
413        index_table.id = self.gen_id();
414        index_table.stream_job_status = PbStreamJobStatus::Created as _;
415        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
416        self.catalog.write().create_table(&index_table);
417        self.add_table_or_index_id(
418            index_table.id.as_raw_id(),
419            index_table.schema_id,
420            index_table.database_id,
421        );
422
423        index.id = index_table.id.as_raw_id().into();
424        index.index_table_id = index_table.id;
425        self.catalog.write().create_index(&index);
426        Ok(())
427    }
428
429    async fn create_function(&self, _function: PbFunction) -> Result<()> {
430        unreachable!()
431    }
432
433    async fn create_connection(
434        &self,
435        _connection_name: String,
436        _database_id: DatabaseId,
437        _schema_id: SchemaId,
438        _owner_id: UserId,
439        _connection: create_connection_request::Payload,
440    ) -> Result<()> {
441        unreachable!()
442    }
443
444    async fn create_secret(
445        &self,
446        _secret_name: String,
447        _database_id: DatabaseId,
448        _schema_id: SchemaId,
449        _owner_id: UserId,
450        _payload: Vec<u8>,
451    ) -> Result<()> {
452        unreachable!()
453    }
454
455    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
456        unreachable!()
457    }
458
459    async fn drop_table(
460        &self,
461        source_id: Option<SourceId>,
462        table_id: TableId,
463        cascade: bool,
464    ) -> Result<()> {
465        if cascade {
466            return Err(ErrorCode::NotSupported(
467                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
468                "use drop instead".to_owned(),
469            )
470            .into());
471        }
472        if let Some(source_id) = source_id {
473            self.drop_table_or_source_id(source_id.as_raw_id());
474        }
475        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
476        let indexes =
477            self.catalog
478                .read()
479                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
480        for index in indexes {
481            self.drop_index(index.id, cascade).await?;
482        }
483        self.catalog
484            .write()
485            .drop_table(database_id, schema_id, table_id);
486        if let Some(source_id) = source_id {
487            self.catalog
488                .write()
489                .drop_source(database_id, schema_id, source_id);
490        }
491        Ok(())
492    }
493
494    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
495        unreachable!()
496    }
497
498    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
499        if cascade {
500            return Err(ErrorCode::NotSupported(
501                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
502                "use drop instead".to_owned(),
503            )
504            .into());
505        }
506        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
507        let indexes =
508            self.catalog
509                .read()
510                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
511        for index in indexes {
512            self.drop_index(index.id, cascade).await?;
513        }
514        self.catalog
515            .write()
516            .drop_table(database_id, schema_id, table_id);
517        Ok(())
518    }
519
520    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
521        if cascade {
522            return Err(ErrorCode::NotSupported(
523                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
524                "use drop instead".to_owned(),
525            )
526            .into());
527        }
528        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
529        self.catalog
530            .write()
531            .drop_source(database_id, schema_id, source_id);
532        Ok(())
533    }
534
535    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
536        Ok(())
537    }
538
539    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
540        if cascade {
541            return Err(ErrorCode::NotSupported(
542                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
543                "use drop instead".to_owned(),
544            )
545            .into());
546        }
547        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
548        self.catalog
549            .write()
550            .drop_sink(database_id, schema_id, sink_id);
551        Ok(())
552    }
553
554    async fn drop_subscription(
555        &self,
556        subscription_id: SubscriptionId,
557        cascade: bool,
558    ) -> Result<()> {
559        if cascade {
560            return Err(ErrorCode::NotSupported(
561                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
562                "use drop instead".to_owned(),
563            )
564            .into());
565        }
566        let (database_id, schema_id) =
567            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
568        self.catalog
569            .write()
570            .drop_subscription(database_id, schema_id, subscription_id);
571        Ok(())
572    }
573
574    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
575        if cascade {
576            return Err(ErrorCode::NotSupported(
577                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
578                "use drop instead".to_owned(),
579            )
580            .into());
581        }
582        let &schema_id = self
583            .table_id_to_schema_id
584            .read()
585            .get(&index_id.as_raw_id())
586            .unwrap();
587        let database_id = self.get_database_id_by_schema(schema_id);
588
589        let index = {
590            let catalog_reader = self.catalog.read();
591            let schema_catalog = catalog_reader
592                .get_schema_by_id(database_id, schema_id)
593                .unwrap();
594            schema_catalog.get_index_by_id(index_id).unwrap().clone()
595        };
596
597        let index_table_id = index.index_table().id;
598        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
599        self.catalog
600            .write()
601            .drop_index(database_id, schema_id, index_id);
602        self.catalog
603            .write()
604            .drop_table(database_id, schema_id, index_table_id);
605        Ok(())
606    }
607
608    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
609        unreachable!()
610    }
611
612    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
613        unreachable!()
614    }
615
616    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
617        unreachable!()
618    }
619
620    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
621        self.catalog.write().drop_database(database_id);
622        Ok(())
623    }
624
625    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
626        let database_id = self.drop_schema_id(schema_id);
627        self.catalog.write().drop_schema(database_id, schema_id);
628        Ok(())
629    }
630
631    async fn alter_name(
632        &self,
633        object_id: alter_name_request::Object,
634        object_name: &str,
635    ) -> Result<()> {
636        match object_id {
637            alter_name_request::Object::TableId(table_id) => {
638                self.catalog
639                    .write()
640                    .alter_table_name_by_id(table_id, object_name);
641                Ok(())
642            }
643            _ => {
644                unimplemented!()
645            }
646        }
647    }
648
649    async fn alter_source(&self, source: PbSource) -> Result<()> {
650        self.catalog.write().update_source(&source);
651        Ok(())
652    }
653
654    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
655        for database in self.catalog.read().iter_databases() {
656            for schema in database.iter_schemas() {
657                match object {
658                    Object::TableId(table_id) => {
659                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
660                        {
661                            let mut pb_table = table.to_prost();
662                            pb_table.owner = owner_id;
663                            self.catalog.write().update_table(&pb_table);
664                            return Ok(());
665                        }
666                    }
667                    _ => unreachable!(),
668                }
669            }
670        }
671
672        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
673    }
674
675    async fn alter_subscription_retention(
676        &self,
677        subscription_id: SubscriptionId,
678        retention_seconds: u64,
679        definition: String,
680    ) -> Result<()> {
681        for database in self.catalog.read().iter_databases() {
682            for schema in database.iter_schemas() {
683                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
684                    let mut pb_subscription = subscription.to_proto();
685                    pb_subscription.retention_seconds = retention_seconds;
686                    pb_subscription.definition = definition;
687                    self.catalog.write().update_subscription(&pb_subscription);
688                    return Ok(());
689                }
690            }
691        }
692
693        Err(
694            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
695                .into(),
696        )
697    }
698
699    async fn alter_set_schema(
700        &self,
701        object: alter_set_schema_request::Object,
702        new_schema_id: SchemaId,
703    ) -> Result<()> {
704        match object {
705            alter_set_schema_request::Object::TableId(table_id) => {
706                let mut pb_table = {
707                    let reader = self.catalog.read();
708                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
709                    table.to_prost()
710                };
711                pb_table.schema_id = new_schema_id;
712                self.catalog.write().update_table(&pb_table);
713                self.table_id_to_schema_id
714                    .write()
715                    .insert(table_id.as_raw_id(), new_schema_id);
716                Ok(())
717            }
718            _ => unreachable!(),
719        }
720    }
721
722    async fn alter_parallelism(
723        &self,
724        _job_id: JobId,
725        _parallelism: PbTableParallelism,
726        _deferred: bool,
727    ) -> Result<()> {
728        todo!()
729    }
730
731    async fn alter_backfill_parallelism(
732        &self,
733        _job_id: JobId,
734        _parallelism: Option<PbTableParallelism>,
735        _deferred: bool,
736    ) -> Result<()> {
737        todo!()
738    }
739
740    async fn alter_config(
741        &self,
742        _job_id: JobId,
743        _entries_to_add: HashMap<String, String>,
744        _keys_to_remove: Vec<String>,
745    ) -> Result<()> {
746        todo!()
747    }
748
749    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
750        todo!()
751    }
752
753    async fn alter_secret(
754        &self,
755        _secret_id: SecretId,
756        _secret_name: String,
757        _database_id: DatabaseId,
758        _schema_id: SchemaId,
759        _owner_id: UserId,
760        _payload: Vec<u8>,
761    ) -> Result<()> {
762        unreachable!()
763    }
764
765    async fn alter_resource_group(
766        &self,
767        _table_id: TableId,
768        _resource_group: Option<String>,
769        _deferred: bool,
770    ) -> Result<()> {
771        todo!()
772    }
773
774    async fn alter_database_param(
775        &self,
776        database_id: DatabaseId,
777        param: AlterDatabaseParam,
778    ) -> Result<()> {
779        let mut pb_database = {
780            let reader = self.catalog.read();
781            let database = reader.get_database_by_id(database_id)?.to_owned();
782            database.to_prost()
783        };
784        match param {
785            AlterDatabaseParam::BarrierIntervalMs(interval) => {
786                pb_database.barrier_interval_ms = interval;
787            }
788            AlterDatabaseParam::CheckpointFrequency(frequency) => {
789                pb_database.checkpoint_frequency = frequency;
790            }
791        }
792        self.catalog.write().update_database(&pb_database);
793        Ok(())
794    }
795
796    async fn create_iceberg_table(
797        &self,
798        _table_job_info: PbTableJobInfo,
799        _sink_job_info: PbSinkJobInfo,
800        _iceberg_source: PbSource,
801        _if_not_exists: bool,
802    ) -> Result<()> {
803        todo!()
804    }
805
806    async fn wait(&self) -> Result<()> {
807        Ok(())
808    }
809}
810
811impl MockCatalogWriter {
812    pub fn new(
813        catalog: Arc<RwLock<Catalog>>,
814        hummock_snapshot_manager: HummockSnapshotManagerRef,
815    ) -> Self {
816        catalog.write().create_database(&PbDatabase {
817            id: 0.into(),
818            name: DEFAULT_DATABASE_NAME.to_owned(),
819            owner: DEFAULT_SUPER_USER_ID,
820            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
821            barrier_interval_ms: None,
822            checkpoint_frequency: None,
823        });
824        catalog.write().create_schema(&PbSchema {
825            id: 1.into(),
826            name: DEFAULT_SCHEMA_NAME.to_owned(),
827            database_id: 0.into(),
828            owner: DEFAULT_SUPER_USER_ID,
829        });
830        catalog.write().create_schema(&PbSchema {
831            id: 2.into(),
832            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
833            database_id: 0.into(),
834            owner: DEFAULT_SUPER_USER_ID,
835        });
836        catalog.write().create_schema(&PbSchema {
837            id: 3.into(),
838            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
839            database_id: 0.into(),
840            owner: DEFAULT_SUPER_USER_ID,
841        });
842        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
843        map.insert(1_u32.into(), 0_u32.into());
844        map.insert(2_u32.into(), 0_u32.into());
845        map.insert(3_u32.into(), 0_u32.into());
846        Self {
847            catalog,
848            id: AtomicU32::new(3),
849            table_id_to_schema_id: Default::default(),
850            schema_id_to_database_id: RwLock::new(map),
851            hummock_snapshot_manager,
852        }
853    }
854
855    fn gen_id<T: From<u32>>(&self) -> T {
856        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
857        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
858    }
859
860    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
861        self.table_id_to_schema_id
862            .write()
863            .insert(table_id, schema_id);
864    }
865
866    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
867        let schema_id = self
868            .table_id_to_schema_id
869            .write()
870            .remove(&table_id)
871            .unwrap();
872        (self.get_database_id_by_schema(schema_id), schema_id)
873    }
874
875    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
876        self.table_id_to_schema_id
877            .write()
878            .insert(table_id, schema_id);
879    }
880
881    fn add_table_or_subscription_id(
882        &self,
883        table_id: u32,
884        schema_id: SchemaId,
885        _database_id: DatabaseId,
886    ) {
887        self.table_id_to_schema_id
888            .write()
889            .insert(table_id, schema_id);
890    }
891
892    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
893        self.table_id_to_schema_id
894            .write()
895            .insert(table_id, schema_id);
896    }
897
898    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
899        let schema_id = self
900            .table_id_to_schema_id
901            .write()
902            .remove(&table_id)
903            .unwrap();
904        (self.get_database_id_by_schema(schema_id), schema_id)
905    }
906
907    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
908        let schema_id = self
909            .table_id_to_schema_id
910            .write()
911            .remove(&table_id)
912            .unwrap();
913        (self.get_database_id_by_schema(schema_id), schema_id)
914    }
915
916    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
917        let schema_id = self
918            .table_id_to_schema_id
919            .write()
920            .remove(&table_id)
921            .unwrap();
922        (self.get_database_id_by_schema(schema_id), schema_id)
923    }
924
925    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
926        self.schema_id_to_database_id
927            .write()
928            .insert(schema_id, database_id);
929    }
930
931    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
932        self.schema_id_to_database_id
933            .write()
934            .remove(&schema_id)
935            .unwrap()
936    }
937
938    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
939        source.id = self.gen_id();
940        self.catalog.write().create_source(&source);
941        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
942        Ok(source.id)
943    }
944
945    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
946        sink.id = self.gen_id();
947        sink.stream_job_status = PbStreamJobStatus::Created as _;
948        self.catalog.write().create_sink(&sink);
949        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
950        Ok(sink.id)
951    }
952
953    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
954        subscription.id = self.gen_id();
955        self.catalog.write().create_subscription(&subscription);
956        self.add_table_or_subscription_id(
957            subscription.id.as_raw_id(),
958            subscription.schema_id,
959            subscription.database_id,
960        );
961        Ok(())
962    }
963
964    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
965        *self
966            .schema_id_to_database_id
967            .read()
968            .get(&schema_id)
969            .unwrap()
970    }
971
972    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
973        let catalog = self.catalog.read();
974        for database in catalog.iter_databases() {
975            for schema in database.iter_schemas() {
976                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
977                    return if table.is_mview() {
978                        PbObjectType::Mview
979                    } else {
980                        PbObjectType::Table
981                    };
982                }
983                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
984                    return PbObjectType::Source;
985                }
986                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
987                    return PbObjectType::View;
988                }
989                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
990                    return PbObjectType::Index;
991                }
992            }
993        }
994        PbObjectType::Unspecified
995    }
996
997    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
998        if dependencies.is_empty() {
999            return;
1000        }
1001        let dependencies = dependencies
1002            .into_iter()
1003            .map(|referenced_object_id| PbObjectDependency {
1004                object_id,
1005                referenced_object_id,
1006                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1007            })
1008            .collect();
1009        self.catalog
1010            .write()
1011            .insert_object_dependencies(dependencies);
1012    }
1013}
1014
1015pub struct MockUserInfoWriter {
1016    id: AtomicU32,
1017    user_info: Arc<RwLock<UserInfoManager>>,
1018}
1019
1020#[async_trait::async_trait]
1021impl UserInfoWriter for MockUserInfoWriter {
1022    async fn create_user(&self, user: UserInfo) -> Result<()> {
1023        let mut user = user;
1024        user.id = self.gen_id().into();
1025        self.user_info.write().create_user(user);
1026        Ok(())
1027    }
1028
1029    async fn drop_user(&self, id: UserId) -> Result<()> {
1030        self.user_info.write().drop_user(id);
1031        Ok(())
1032    }
1033
1034    async fn update_user(
1035        &self,
1036        update_user: UserInfo,
1037        update_fields: Vec<UpdateField>,
1038    ) -> Result<()> {
1039        let mut lock = self.user_info.write();
1040        let id = update_user.get_id();
1041        let Some(old_name) = lock.get_user_name_by_id(id) else {
1042            return Ok(());
1043        };
1044        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1045        update_fields.into_iter().for_each(|field| match field {
1046            UpdateField::Super => user_info.is_super = update_user.is_super,
1047            UpdateField::Login => user_info.can_login = update_user.can_login,
1048            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1049            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1050            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1051            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1052            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1053            UpdateField::Unspecified => unreachable!(),
1054        });
1055        lock.update_user(update_user);
1056        Ok(())
1057    }
1058
1059    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1060    /// `GrantAllSources` when grant privilege to user.
1061    async fn grant_privilege(
1062        &self,
1063        users: Vec<UserId>,
1064        privileges: Vec<GrantPrivilege>,
1065        with_grant_option: bool,
1066        _grantor: UserId,
1067    ) -> Result<()> {
1068        let privileges = privileges
1069            .into_iter()
1070            .map(|mut p| {
1071                p.action_with_opts
1072                    .iter_mut()
1073                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1074                p
1075            })
1076            .collect::<Vec<_>>();
1077        for user_id in users {
1078            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1079                u.extend_privileges(privileges.clone());
1080            }
1081        }
1082        Ok(())
1083    }
1084
1085    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1086    /// `RevokeAllSources` when revoke privilege from user.
1087    async fn revoke_privilege(
1088        &self,
1089        users: Vec<UserId>,
1090        privileges: Vec<GrantPrivilege>,
1091        _granted_by: UserId,
1092        _revoke_by: UserId,
1093        revoke_grant_option: bool,
1094        _cascade: bool,
1095    ) -> Result<()> {
1096        for user_id in users {
1097            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1098                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1099            }
1100        }
1101        Ok(())
1102    }
1103
1104    async fn alter_default_privilege(
1105        &self,
1106        _users: Vec<UserId>,
1107        _database_id: DatabaseId,
1108        _schemas: Vec<SchemaId>,
1109        _operation: AlterDefaultPrivilegeOperation,
1110        _operated_by: UserId,
1111    ) -> Result<()> {
1112        todo!()
1113    }
1114}
1115
1116impl MockUserInfoWriter {
1117    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1118        user_info.write().create_user(UserInfo {
1119            id: DEFAULT_SUPER_USER_ID,
1120            name: DEFAULT_SUPER_USER.to_owned(),
1121            is_super: true,
1122            can_create_db: true,
1123            can_create_user: true,
1124            can_login: true,
1125            ..Default::default()
1126        });
1127        user_info.write().create_user(UserInfo {
1128            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1129            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1130            is_super: true,
1131            can_create_db: true,
1132            can_create_user: true,
1133            can_login: true,
1134            is_admin: true,
1135            ..Default::default()
1136        });
1137        Self {
1138            user_info,
1139            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1140        }
1141    }
1142
1143    fn gen_id(&self) -> u32 {
1144        self.id.fetch_add(1, Ordering::SeqCst)
1145    }
1146}
1147
1148pub struct MockFrontendMetaClient {}
1149
1150#[async_trait::async_trait]
1151impl FrontendMetaClient for MockFrontendMetaClient {
1152    async fn try_unregister(&self) {}
1153
1154    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1155        Ok(INVALID_VERSION_ID)
1156    }
1157
1158    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1159        Ok(vec![])
1160    }
1161
1162    async fn list_table_fragments(
1163        &self,
1164        _table_ids: &[JobId],
1165    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1166        Ok(HashMap::default())
1167    }
1168
1169    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1170        Ok(vec![])
1171    }
1172
1173    async fn list_fragment_distribution(
1174        &self,
1175        _include_node: bool,
1176    ) -> RpcResult<Vec<FragmentDistribution>> {
1177        Ok(vec![])
1178    }
1179
1180    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1181        Ok(vec![])
1182    }
1183
1184    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1185        Ok(vec![])
1186    }
1187
1188    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1189        Ok(vec![])
1190    }
1191
1192    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1193        Ok(vec![])
1194    }
1195
1196    async fn list_sink_log_store_tables(
1197        &self,
1198    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1199        Ok(vec![])
1200    }
1201
1202    async fn set_system_param(
1203        &self,
1204        _param: String,
1205        _value: Option<String>,
1206    ) -> RpcResult<Option<SystemParamsReader>> {
1207        Ok(Some(SystemParams::default().into()))
1208    }
1209
1210    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1211        Ok(Default::default())
1212    }
1213
1214    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1215        Ok("".to_owned())
1216    }
1217
1218    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1219        Ok(vec![])
1220    }
1221
1222    async fn get_tables(
1223        &self,
1224        _table_ids: Vec<crate::catalog::TableId>,
1225        _include_dropped_tables: bool,
1226    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1227        Ok(HashMap::new())
1228    }
1229
1230    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1231        unimplemented!()
1232    }
1233
1234    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1235        unimplemented!()
1236    }
1237
1238    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1239        Ok(HummockVersion::default())
1240    }
1241
1242    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1243        unimplemented!()
1244    }
1245
1246    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1247        unimplemented!()
1248    }
1249
1250    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1251        unimplemented!()
1252    }
1253
1254    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1255        unimplemented!()
1256    }
1257
1258    async fn list_hummock_active_write_limits(
1259        &self,
1260    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1261        unimplemented!()
1262    }
1263
1264    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1265        unimplemented!()
1266    }
1267
1268    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1269        Ok(vec![])
1270    }
1271
1272    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1273        unimplemented!()
1274    }
1275
1276    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1277        Ok(vec![])
1278    }
1279
1280    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1281        unimplemented!()
1282    }
1283
1284    async fn recover(&self) -> RpcResult<()> {
1285        unimplemented!()
1286    }
1287
1288    async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1289        unimplemented!()
1290    }
1291
1292    async fn get_backup_job_status(
1293        &self,
1294        _job_id: u64,
1295    ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1296        unimplemented!()
1297    }
1298
1299    async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1300        unimplemented!()
1301    }
1302
1303    async fn apply_throttle(
1304        &self,
1305        _throttle_target: PbThrottleTarget,
1306        _throttle_type: risingwave_pb::common::PbThrottleType,
1307        _id: u32,
1308        _rate_limit: Option<u32>,
1309    ) -> RpcResult<()> {
1310        unimplemented!()
1311    }
1312
1313    async fn alter_fragment_parallelism(
1314        &self,
1315        _fragment_ids: Vec<FragmentId>,
1316        _parallelism: Option<PbTableParallelism>,
1317    ) -> RpcResult<()> {
1318        unimplemented!()
1319    }
1320
1321    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1322        Ok(RecoveryStatus::StatusRunning)
1323    }
1324
1325    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1326        Ok(vec![])
1327    }
1328
1329    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1330        Ok(vec![])
1331    }
1332
1333    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1334        Ok(HashMap::default())
1335    }
1336
1337    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1338        unimplemented!()
1339    }
1340
1341    async fn alter_sink_props(
1342        &self,
1343        _sink_id: SinkId,
1344        _changed_props: BTreeMap<String, String>,
1345        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1346        _connector_conn_ref: Option<ConnectionId>,
1347    ) -> RpcResult<()> {
1348        unimplemented!()
1349    }
1350
1351    async fn alter_iceberg_table_props(
1352        &self,
1353        _table_id: TableId,
1354        _sink_id: SinkId,
1355        _source_id: SourceId,
1356        _changed_props: BTreeMap<String, String>,
1357        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1358        _connector_conn_ref: Option<ConnectionId>,
1359    ) -> RpcResult<()> {
1360        unimplemented!()
1361    }
1362
1363    async fn alter_source_connector_props(
1364        &self,
1365        _source_id: SourceId,
1366        _changed_props: BTreeMap<String, String>,
1367        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1368        _connector_conn_ref: Option<ConnectionId>,
1369    ) -> RpcResult<()> {
1370        unimplemented!()
1371    }
1372
1373    async fn alter_connection_connector_props(
1374        &self,
1375        _connection_id: u32,
1376        _changed_props: BTreeMap<String, String>,
1377        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1378    ) -> RpcResult<()> {
1379        Ok(())
1380    }
1381
1382    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1383        unimplemented!()
1384    }
1385
1386    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1387        Ok(vec![])
1388    }
1389
1390    async fn get_fragment_by_id(
1391        &self,
1392        _fragment_id: FragmentId,
1393    ) -> RpcResult<Option<FragmentDistribution>> {
1394        unimplemented!()
1395    }
1396
1397    async fn get_fragment_vnodes(
1398        &self,
1399        _fragment_id: FragmentId,
1400    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1401        unimplemented!()
1402    }
1403
1404    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1405        unimplemented!()
1406    }
1407
1408    fn worker_id(&self) -> WorkerId {
1409        0.into()
1410    }
1411
1412    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1413        Ok(())
1414    }
1415
1416    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1417        Ok(1)
1418    }
1419
1420    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1421        Ok(())
1422    }
1423
1424    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1425        Ok(RefreshResponse { status: None })
1426    }
1427
1428    fn cluster_id(&self) -> &str {
1429        "test-cluster-uuid"
1430    }
1431
1432    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1433        unimplemented!()
1434    }
1435
1436    async fn get_hummock_table_change_log(
1437        &self,
1438        _start_epoch_inclusive: Option<u64>,
1439        _end_epoch_inclusive: Option<u64>,
1440        _table_ids: Option<HashSet<TableId>>,
1441        _exclude_empty: bool,
1442        _limit: Option<u32>,
1443    ) -> RpcResult<TableChangeLogs> {
1444        Ok(HashMap::default())
1445    }
1446}
1447
1448#[cfg(test)]
1449pub static PROTO_FILE_DATA: &str = r#"
1450    syntax = "proto3";
1451    package test;
1452    message TestRecord {
1453      int32 id = 1;
1454      Country country = 3;
1455      int64 zipcode = 4;
1456      float rate = 5;
1457    }
1458    message TestRecordAlterType {
1459        string id = 1;
1460        Country country = 3;
1461        int32 zipcode = 4;
1462        float rate = 5;
1463      }
1464    message TestRecordExt {
1465      int32 id = 1;
1466      Country country = 3;
1467      int64 zipcode = 4;
1468      float rate = 5;
1469      string name = 6;
1470    }
1471    message Country {
1472      string address = 1;
1473      City city = 2;
1474      string zipcode = 3;
1475    }
1476    message City {
1477      string address = 1;
1478      string zipcode = 2;
1479    }"#;
1480
1481/// Returns the file.
1482/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1483pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1484    let in_file = Builder::new()
1485        .prefix("temp")
1486        .suffix(".proto")
1487        .rand_bytes(8)
1488        .tempfile()
1489        .unwrap();
1490
1491    let out_file = Builder::new()
1492        .prefix("temp")
1493        .suffix(".pb")
1494        .rand_bytes(8)
1495        .tempfile()
1496        .unwrap();
1497
1498    let mut file = in_file.as_file();
1499    file.write_all(proto_data.as_ref())
1500        .expect("writing binary to test file");
1501    file.flush().expect("flush temp file failed");
1502    let include_path = in_file
1503        .path()
1504        .parent()
1505        .unwrap()
1506        .to_string_lossy()
1507        .into_owned();
1508    let out_path = out_file.path().to_string_lossy().into_owned();
1509    let in_path = in_file.path().to_string_lossy().into_owned();
1510    let mut compile = std::process::Command::new("protoc");
1511
1512    let out = compile
1513        .arg("--include_imports")
1514        .arg("-I")
1515        .arg(include_path)
1516        .arg(format!("--descriptor_set_out={}", out_path))
1517        .arg(in_path)
1518        .output()
1519        .expect("failed to compile proto");
1520    if !out.status.success() {
1521        panic!("compile proto failed \n output: {:?}", out);
1522    }
1523    out_file
1524}