risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::AdaptiveParallelismStrategy;
37use risingwave_common::system_param::reader::SystemParamsReader;
38use risingwave_common::util::cluster_limit::ClusterLimit;
39use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
40use risingwave_hummock_sdk::change_log::TableChangeLogs;
41use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
42use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
43use risingwave_pb::backup_service::MetaSnapshotMetadata;
44use risingwave_pb::catalog::{
45    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
46    PbSubscription, PbTable, PbView, Table,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::ddl_service::alter_owner_request::Object;
50use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
51use risingwave_pb::ddl_service::{
52    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
53    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
54};
55use risingwave_pb::hummock::write_limits::WriteLimit;
56use risingwave_pb::hummock::{
57    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
58};
59use risingwave_pb::id::ActorId;
60use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
61use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
62use risingwave_pb::meta::list_actor_states_response::ActorState;
63use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
64use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
65use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
66use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
67use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
68use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
69use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
70use risingwave_pb::meta::{
71    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
72    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
73    list_sink_log_store_tables_response,
74};
75use risingwave_pb::secret::PbSecretRef;
76use risingwave_pb::stream_plan::StreamFragmentGraph;
77use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
78use risingwave_pb::user::update_user_request::UpdateField;
79use risingwave_pb::user::{GrantPrivilege, UserInfo};
80use risingwave_rpc_client::error::Result as RpcResult;
81use tempfile::{Builder, NamedTempFile};
82
83use crate::FrontendOpts;
84use crate::catalog::catalog_service::CatalogWriter;
85use crate::catalog::root_catalog::Catalog;
86use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
87use crate::error::{ErrorCode, Result, RwError};
88use crate::handler::RwPgResponse;
89use crate::meta_client::FrontendMetaClient;
90use crate::scheduler::HummockSnapshotManagerRef;
91use crate::session::{AuthContext, FrontendEnv, SessionImpl};
92use crate::user::UserId;
93use crate::user::user_manager::UserInfoManager;
94use crate::user::user_service::UserInfoWriter;
95
/// An embedded frontend that runs in-process: no meta service is started and the frontend
/// is not exposed as a TCP server.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Frontend environment; a clone of it is handed to every session this frontend creates.
    env: FrontendEnv,
}
101
102impl SessionManager for LocalFrontend {
103    type Error = RwError;
104    type Session = SessionImpl;
105
106    fn create_dummy_session(
107        &self,
108        _database_id: DatabaseId,
109    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
110        unreachable!()
111    }
112
113    fn connect(
114        &self,
115        _database: &str,
116        _user_name: &str,
117        _peer_addr: AddressRef,
118    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
119        Ok(self.session_ref())
120    }
121
122    fn cancel_queries_in_session(&self, _session_id: SessionId) {
123        unreachable!()
124    }
125
126    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
127        unreachable!()
128    }
129
130    fn end_session(&self, _session: &Self::Session) {
131        unreachable!()
132    }
133}
134
135impl LocalFrontend {
136    #[expect(clippy::unused_async)]
137    pub async fn new(opts: FrontendOpts) -> Self {
138        let env = FrontendEnv::mock();
139        Self { opts, env }
140    }
141
142    pub async fn run_sql(
143        &self,
144        sql: impl Into<String>,
145    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
146        let sql: Arc<str> = Arc::from(sql.into());
147        Box::pin(self.session_ref().run_statement(sql, vec![])).await
148    }
149
150    pub async fn run_sql_with_session(
151        &self,
152        session_ref: Arc<SessionImpl>,
153        sql: impl Into<String>,
154    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
155        let sql: Arc<str> = Arc::from(sql.into());
156        Box::pin(session_ref.run_statement(sql, vec![])).await
157    }
158
159    pub async fn run_user_sql(
160        &self,
161        sql: impl Into<String>,
162        database: String,
163        user_name: String,
164        user_id: UserId,
165    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
166        let sql: Arc<str> = Arc::from(sql.into());
167        Box::pin(
168            self.session_user_ref(database, user_name, user_id)
169                .run_statement(sql, vec![]),
170        )
171        .await
172    }
173
174    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
175        let mut rsp = self.run_sql(sql).await.unwrap();
176        let mut res = vec![];
177        #[for_await]
178        for row_set in rsp.values_stream() {
179            for row in row_set.unwrap() {
180                res.push(format!("{:?}", row));
181            }
182        }
183        res
184    }
185
186    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
187        let mut rsp = self.run_sql(sql).await.unwrap();
188        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
189        let mut res = String::new();
190        #[for_await]
191        for row_set in rsp.values_stream() {
192            for row in row_set.unwrap() {
193                let row: Row = row;
194                let row = row.values()[0].as_ref().unwrap();
195                res += std::str::from_utf8(row).unwrap();
196                res += "\n";
197            }
198        }
199        res
200    }
201
202    /// Creates a new session
203    pub fn session_ref(&self) -> Arc<SessionImpl> {
204        self.session_user_ref(
205            DEFAULT_DATABASE_NAME.to_owned(),
206            DEFAULT_SUPER_USER.to_owned(),
207            DEFAULT_SUPER_USER_ID,
208        )
209    }
210
211    pub fn session_user_ref(
212        &self,
213        database: String,
214        user_name: String,
215        user_id: UserId,
216    ) -> Arc<SessionImpl> {
217        Arc::new(SessionImpl::new(
218            self.env.clone(),
219            AuthContext::new(database, user_name, user_id),
220            UserAuthenticator::None,
221            // Local Frontend use a non-sense id.
222            (0, 0),
223            Address::Tcp(SocketAddr::new(
224                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
225                6666,
226            ))
227            .into(),
228            Default::default(),
229        ))
230    }
231}
232
233pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
234    if rsp.stmt_type() != StatementType::EXPLAIN {
235        panic!("RESPONSE INVALID: {rsp:?}");
236    }
237    let mut res = String::new();
238    #[for_await]
239    for row_set in rsp.values_stream() {
240        for row in row_set.unwrap() {
241            let row: Row = row;
242            let row = row.values()[0].as_ref().unwrap();
243            res += std::str::from_utf8(row).unwrap();
244            res += "\n";
245        }
246    }
247    res
248}
249
/// A [`CatalogWriter`] for tests that applies catalog changes directly to an in-memory
/// [`Catalog`] instead of going through a meta service.
pub struct MockCatalogWriter {
    /// The catalog that every operation reads from and writes to.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`; holds the most recently handed-out id.
    id: AtomicU32,
    /// Maps the raw id of a registered object (table, source, view, index, ...) to the
    /// schema it lives in.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Maps every known schema to its owning database.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Notified through `add_table_for_test` whenever a table or materialized view is created.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
257
258#[async_trait::async_trait]
259impl CatalogWriter for MockCatalogWriter {
260    async fn create_database(
261        &self,
262        db_name: &str,
263        owner: UserId,
264        resource_group: &str,
265        barrier_interval_ms: Option<u32>,
266        checkpoint_frequency: Option<u64>,
267    ) -> Result<()> {
268        let database_id = DatabaseId::new(self.gen_id());
269        self.catalog.write().create_database(&PbDatabase {
270            name: db_name.to_owned(),
271            id: database_id,
272            owner,
273            resource_group: resource_group.to_owned(),
274            barrier_interval_ms,
275            checkpoint_frequency,
276        });
277        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
278            .await?;
279        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
280            .await?;
281        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
282            .await?;
283        Ok(())
284    }
285
286    async fn create_schema(
287        &self,
288        db_id: DatabaseId,
289        schema_name: &str,
290        owner: UserId,
291    ) -> Result<()> {
292        let id = self.gen_id();
293        self.catalog.write().create_schema(&PbSchema {
294            id,
295            name: schema_name.to_owned(),
296            database_id: db_id,
297            owner,
298        });
299        self.add_schema_id(id, db_id);
300        Ok(())
301    }
302
303    async fn create_materialized_view(
304        &self,
305        mut table: PbTable,
306        _graph: StreamFragmentGraph,
307        dependencies: HashSet<ObjectId>,
308        _resource_type: streaming_job_resource_type::ResourceType,
309        _if_not_exists: bool,
310    ) -> Result<()> {
311        table.id = self.gen_id();
312        table.stream_job_status = PbStreamJobStatus::Created as _;
313        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
314        self.catalog.write().create_table(&table);
315        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
316        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
317        self.hummock_snapshot_manager.add_table_for_test(table.id);
318        Ok(())
319    }
320
321    async fn replace_materialized_view(
322        &self,
323        mut table: PbTable,
324        _graph: StreamFragmentGraph,
325    ) -> Result<()> {
326        table.stream_job_status = PbStreamJobStatus::Created as _;
327        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
328        self.catalog.write().update_table(&table);
329        Ok(())
330    }
331
332    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
333        view.id = self.gen_id();
334        self.catalog.write().create_view(&view);
335        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
336        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
337        Ok(())
338    }
339
340    async fn create_table(
341        &self,
342        source: Option<PbSource>,
343        mut table: PbTable,
344        graph: StreamFragmentGraph,
345        _job_type: PbTableJobType,
346        if_not_exists: bool,
347        dependencies: HashSet<ObjectId>,
348    ) -> Result<()> {
349        if let Some(source) = source {
350            let source_id = self.create_source_inner(source)?;
351            table.optional_associated_source_id = Some(source_id.into());
352        }
353        self.create_materialized_view(
354            table,
355            graph,
356            dependencies,
357            streaming_job_resource_type::ResourceType::Regular(true),
358            if_not_exists,
359        )
360        .await?;
361        Ok(())
362    }
363
364    async fn replace_table(
365        &self,
366        _source: Option<PbSource>,
367        mut table: PbTable,
368        _graph: StreamFragmentGraph,
369        _job_type: TableJobType,
370    ) -> Result<()> {
371        table.stream_job_status = PbStreamJobStatus::Created as _;
372        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
373        self.catalog.write().update_table(&table);
374        Ok(())
375    }
376
377    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
378        self.catalog.write().update_source(&source);
379        Ok(())
380    }
381
382    async fn create_source(
383        &self,
384        source: PbSource,
385        _graph: Option<StreamFragmentGraph>,
386        _if_not_exists: bool,
387    ) -> Result<()> {
388        self.create_source_inner(source).map(|_| ())
389    }
390
391    async fn create_sink(
392        &self,
393        sink: PbSink,
394        graph: StreamFragmentGraph,
395        dependencies: HashSet<ObjectId>,
396        _if_not_exists: bool,
397    ) -> Result<()> {
398        let sink_id = self.create_sink_inner(sink, graph)?;
399        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
400        Ok(())
401    }
402
403    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
404        self.create_subscription_inner(subscription)
405    }
406
407    async fn create_index(
408        &self,
409        mut index: PbIndex,
410        mut index_table: PbTable,
411        _graph: StreamFragmentGraph,
412        _if_not_exists: bool,
413    ) -> Result<()> {
414        index_table.id = self.gen_id();
415        index_table.stream_job_status = PbStreamJobStatus::Created as _;
416        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
417        self.catalog.write().create_table(&index_table);
418        self.add_table_or_index_id(
419            index_table.id.as_raw_id(),
420            index_table.schema_id,
421            index_table.database_id,
422        );
423
424        index.id = index_table.id.as_raw_id().into();
425        index.index_table_id = index_table.id;
426        self.catalog.write().create_index(&index);
427        Ok(())
428    }
429
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn create_function(&self, _function: PbFunction) -> Result<()> {
        unreachable!()
    }
433
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn create_connection(
        &self,
        _connection_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _connection: create_connection_request::Payload,
    ) -> Result<()> {
        unreachable!()
    }
444
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn create_secret(
        &self,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
455
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
        unreachable!()
    }
459
460    async fn drop_table(
461        &self,
462        source_id: Option<SourceId>,
463        table_id: TableId,
464        cascade: bool,
465    ) -> Result<()> {
466        if cascade {
467            return Err(ErrorCode::NotSupported(
468                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
469                "use drop instead".to_owned(),
470            )
471            .into());
472        }
473        if let Some(source_id) = source_id {
474            self.drop_table_or_source_id(source_id.as_raw_id());
475        }
476        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
477        let indexes =
478            self.catalog
479                .read()
480                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
481        for index in indexes {
482            self.drop_index(index.id, cascade).await?;
483        }
484        self.catalog
485            .write()
486            .drop_table(database_id, schema_id, table_id);
487        if let Some(source_id) = source_id {
488            self.catalog
489                .write()
490                .drop_source(database_id, schema_id, source_id);
491        }
492        Ok(())
493    }
494
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
498
499    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
500        if cascade {
501            return Err(ErrorCode::NotSupported(
502                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
503                "use drop instead".to_owned(),
504            )
505            .into());
506        }
507        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
508        let indexes =
509            self.catalog
510                .read()
511                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
512        for index in indexes {
513            self.drop_index(index.id, cascade).await?;
514        }
515        self.catalog
516            .write()
517            .drop_table(database_id, schema_id, table_id);
518        Ok(())
519    }
520
521    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
522        if cascade {
523            return Err(ErrorCode::NotSupported(
524                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
525                "use drop instead".to_owned(),
526            )
527            .into());
528        }
529        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
530        self.catalog
531            .write()
532            .drop_source(database_id, schema_id, source_id);
533        Ok(())
534    }
535
    /// No-op in the mock writer: always returns `Ok(())` without touching the catalog.
    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
        Ok(())
    }
539
540    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
541        if cascade {
542            return Err(ErrorCode::NotSupported(
543                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
544                "use drop instead".to_owned(),
545            )
546            .into());
547        }
548        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
549        self.catalog
550            .write()
551            .drop_sink(database_id, schema_id, sink_id);
552        Ok(())
553    }
554
555    async fn drop_subscription(
556        &self,
557        subscription_id: SubscriptionId,
558        cascade: bool,
559    ) -> Result<()> {
560        if cascade {
561            return Err(ErrorCode::NotSupported(
562                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
563                "use drop instead".to_owned(),
564            )
565            .into());
566        }
567        let (database_id, schema_id) =
568            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
569        self.catalog
570            .write()
571            .drop_subscription(database_id, schema_id, subscription_id);
572        Ok(())
573    }
574
575    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
576        if cascade {
577            return Err(ErrorCode::NotSupported(
578                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
579                "use drop instead".to_owned(),
580            )
581            .into());
582        }
583        let &schema_id = self
584            .table_id_to_schema_id
585            .read()
586            .get(&index_id.as_raw_id())
587            .unwrap();
588        let database_id = self.get_database_id_by_schema(schema_id);
589
590        let index = {
591            let catalog_reader = self.catalog.read();
592            let schema_catalog = catalog_reader
593                .get_schema_by_id(database_id, schema_id)
594                .unwrap();
595            schema_catalog.get_index_by_id(index_id).unwrap().clone()
596        };
597
598        let index_table_id = index.index_table().id;
599        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
600        self.catalog
601            .write()
602            .drop_index(database_id, schema_id, index_id);
603        self.catalog
604            .write()
605            .drop_table(database_id, schema_id, index_table_id);
606        Ok(())
607    }
608
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
612
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
616
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
        unreachable!()
    }
620
621    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
622        self.catalog.write().drop_database(database_id);
623        Ok(())
624    }
625
626    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
627        let database_id = self.drop_schema_id(schema_id);
628        self.catalog.write().drop_schema(database_id, schema_id);
629        Ok(())
630    }
631
632    async fn alter_name(
633        &self,
634        object_id: alter_name_request::Object,
635        object_name: &str,
636    ) -> Result<()> {
637        match object_id {
638            alter_name_request::Object::TableId(table_id) => {
639                self.catalog
640                    .write()
641                    .alter_table_name_by_id(table_id, object_name);
642                Ok(())
643            }
644            _ => {
645                unimplemented!()
646            }
647        }
648    }
649
650    async fn alter_source(&self, source: PbSource) -> Result<()> {
651        self.catalog.write().update_source(&source);
652        Ok(())
653    }
654
655    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
656        for database in self.catalog.read().iter_databases() {
657            for schema in database.iter_schemas() {
658                match object {
659                    Object::TableId(table_id) => {
660                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
661                        {
662                            let mut pb_table = table.to_prost();
663                            pb_table.owner = owner_id;
664                            self.catalog.write().update_table(&pb_table);
665                            return Ok(());
666                        }
667                    }
668                    _ => unreachable!(),
669                }
670            }
671        }
672
673        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
674    }
675
676    async fn alter_subscription_retention(
677        &self,
678        subscription_id: SubscriptionId,
679        retention_seconds: u64,
680        definition: String,
681    ) -> Result<()> {
682        for database in self.catalog.read().iter_databases() {
683            for schema in database.iter_schemas() {
684                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
685                    let mut pb_subscription = subscription.to_proto();
686                    pb_subscription.retention_seconds = retention_seconds;
687                    pb_subscription.definition = definition;
688                    self.catalog.write().update_subscription(&pb_subscription);
689                    return Ok(());
690                }
691            }
692        }
693
694        Err(
695            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
696                .into(),
697        )
698    }
699
700    async fn alter_set_schema(
701        &self,
702        object: alter_set_schema_request::Object,
703        new_schema_id: SchemaId,
704    ) -> Result<()> {
705        match object {
706            alter_set_schema_request::Object::TableId(table_id) => {
707                let mut pb_table = {
708                    let reader = self.catalog.read();
709                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
710                    table.to_prost()
711                };
712                pb_table.schema_id = new_schema_id;
713                self.catalog.write().update_table(&pb_table);
714                self.table_id_to_schema_id
715                    .write()
716                    .insert(table_id.as_raw_id(), new_schema_id);
717                Ok(())
718            }
719            _ => unreachable!(),
720        }
721    }
722
    /// Not implemented by the mock writer yet: calling this panics via `todo!()`.
    async fn alter_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: PbTableParallelism,
        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
732
    /// Not implemented by the mock writer yet: calling this panics via `todo!()`.
    async fn alter_backfill_parallelism(
        &self,
        _job_id: JobId,
        _parallelism: Option<PbTableParallelism>,
        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
742
    /// Not implemented by the mock writer yet: calling this panics via `todo!()`.
    async fn alter_config(
        &self,
        _job_id: JobId,
        _entries_to_add: HashMap<String, String>,
        _keys_to_remove: Vec<String>,
    ) -> Result<()> {
        todo!()
    }
751
    /// Not implemented by the mock writer yet: calling this panics via `todo!()`.
    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
        todo!()
    }
755
    /// Not supported by the mock writer: calling this panics via `unreachable!()`.
    async fn alter_secret(
        &self,
        _secret_id: SecretId,
        _secret_name: String,
        _database_id: DatabaseId,
        _schema_id: SchemaId,
        _owner_id: UserId,
        _payload: Vec<u8>,
    ) -> Result<()> {
        unreachable!()
    }
767
    /// Not implemented by the mock writer yet: calling this panics via `todo!()`.
    async fn alter_resource_group(
        &self,
        _table_id: TableId,
        _resource_group: Option<String>,
        _deferred: bool,
    ) -> Result<()> {
        todo!()
    }
776
777    async fn alter_database_param(
778        &self,
779        database_id: DatabaseId,
780        param: AlterDatabaseParam,
781    ) -> Result<()> {
782        let mut pb_database = {
783            let reader = self.catalog.read();
784            let database = reader.get_database_by_id(database_id)?.to_owned();
785            database.to_prost()
786        };
787        match param {
788            AlterDatabaseParam::BarrierIntervalMs(interval) => {
789                pb_database.barrier_interval_ms = interval;
790            }
791            AlterDatabaseParam::CheckpointFrequency(frequency) => {
792                pb_database.checkpoint_frequency = frequency;
793            }
794        }
795        self.catalog.write().update_database(&pb_database);
796        Ok(())
797    }
798
    /// Not implemented by the mock writer yet: calling this panics via `todo!()`.
    async fn create_iceberg_table(
        &self,
        _table_job_info: PbTableJobInfo,
        _sink_job_info: PbSinkJobInfo,
        _iceberg_source: PbSource,
        _if_not_exists: bool,
    ) -> Result<()> {
        todo!()
    }
808
    /// No-op in the mock writer: returns `Ok(())` immediately, whatever job id is given.
    async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
        Ok(())
    }
812}
813
814impl MockCatalogWriter {
815    pub fn new(
816        catalog: Arc<RwLock<Catalog>>,
817        hummock_snapshot_manager: HummockSnapshotManagerRef,
818    ) -> Self {
819        catalog.write().create_database(&PbDatabase {
820            id: 0.into(),
821            name: DEFAULT_DATABASE_NAME.to_owned(),
822            owner: DEFAULT_SUPER_USER_ID,
823            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
824            barrier_interval_ms: None,
825            checkpoint_frequency: None,
826        });
827        catalog.write().create_schema(&PbSchema {
828            id: 1.into(),
829            name: DEFAULT_SCHEMA_NAME.to_owned(),
830            database_id: 0.into(),
831            owner: DEFAULT_SUPER_USER_ID,
832        });
833        catalog.write().create_schema(&PbSchema {
834            id: 2.into(),
835            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
836            database_id: 0.into(),
837            owner: DEFAULT_SUPER_USER_ID,
838        });
839        catalog.write().create_schema(&PbSchema {
840            id: 3.into(),
841            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
842            database_id: 0.into(),
843            owner: DEFAULT_SUPER_USER_ID,
844        });
845        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
846        map.insert(1_u32.into(), 0_u32.into());
847        map.insert(2_u32.into(), 0_u32.into());
848        map.insert(3_u32.into(), 0_u32.into());
849        Self {
850            catalog,
851            id: AtomicU32::new(3),
852            table_id_to_schema_id: Default::default(),
853            schema_id_to_database_id: RwLock::new(map),
854            hummock_snapshot_manager,
855        }
856    }
857
858    fn gen_id<T: From<u32>>(&self) -> T {
859        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
860        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
861    }
862
863    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
864        self.table_id_to_schema_id
865            .write()
866            .insert(table_id, schema_id);
867    }
868
869    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
870        let schema_id = self
871            .table_id_to_schema_id
872            .write()
873            .remove(&table_id)
874            .unwrap();
875        (self.get_database_id_by_schema(schema_id), schema_id)
876    }
877
878    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
879        self.table_id_to_schema_id
880            .write()
881            .insert(table_id, schema_id);
882    }
883
884    fn add_table_or_subscription_id(
885        &self,
886        table_id: u32,
887        schema_id: SchemaId,
888        _database_id: DatabaseId,
889    ) {
890        self.table_id_to_schema_id
891            .write()
892            .insert(table_id, schema_id);
893    }
894
895    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
896        self.table_id_to_schema_id
897            .write()
898            .insert(table_id, schema_id);
899    }
900
901    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
902        let schema_id = self
903            .table_id_to_schema_id
904            .write()
905            .remove(&table_id)
906            .unwrap();
907        (self.get_database_id_by_schema(schema_id), schema_id)
908    }
909
910    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
911        let schema_id = self
912            .table_id_to_schema_id
913            .write()
914            .remove(&table_id)
915            .unwrap();
916        (self.get_database_id_by_schema(schema_id), schema_id)
917    }
918
919    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
920        let schema_id = self
921            .table_id_to_schema_id
922            .write()
923            .remove(&table_id)
924            .unwrap();
925        (self.get_database_id_by_schema(schema_id), schema_id)
926    }
927
928    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
929        self.schema_id_to_database_id
930            .write()
931            .insert(schema_id, database_id);
932    }
933
934    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
935        self.schema_id_to_database_id
936            .write()
937            .remove(&schema_id)
938            .unwrap()
939    }
940
941    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
942        source.id = self.gen_id();
943        self.catalog.write().create_source(&source);
944        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
945        Ok(source.id)
946    }
947
948    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
949        sink.id = self.gen_id();
950        sink.stream_job_status = PbStreamJobStatus::Created as _;
951        self.catalog.write().create_sink(&sink);
952        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
953        Ok(sink.id)
954    }
955
956    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
957        subscription.id = self.gen_id();
958        self.catalog.write().create_subscription(&subscription);
959        self.add_table_or_subscription_id(
960            subscription.id.as_raw_id(),
961            subscription.schema_id,
962            subscription.database_id,
963        );
964        Ok(())
965    }
966
967    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
968        *self
969            .schema_id_to_database_id
970            .read()
971            .get(&schema_id)
972            .unwrap()
973    }
974
975    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
976        let catalog = self.catalog.read();
977        for database in catalog.iter_databases() {
978            for schema in database.iter_schemas() {
979                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
980                    return if table.is_mview() {
981                        PbObjectType::Mview
982                    } else {
983                        PbObjectType::Table
984                    };
985                }
986                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
987                    return PbObjectType::Source;
988                }
989                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
990                    return PbObjectType::View;
991                }
992                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
993                    return PbObjectType::Index;
994                }
995            }
996        }
997        PbObjectType::Unspecified
998    }
999
1000    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1001        if dependencies.is_empty() {
1002            return;
1003        }
1004        let dependencies = dependencies
1005            .into_iter()
1006            .map(|referenced_object_id| PbObjectDependency {
1007                object_id,
1008                referenced_object_id,
1009                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1010            })
1011            .collect();
1012        self.catalog
1013            .write()
1014            .insert_object_dependencies(dependencies);
1015    }
1016}
1017
/// Mock user-info writer that applies every change directly to a shared
/// `UserInfoManager`.
pub struct MockUserInfoWriter {
    /// Counter that `gen_id` draws ids for newly created users from.
    id: AtomicU32,
    /// Shared user store that all operations read and mutate.
    user_info: Arc<RwLock<UserInfoManager>>,
}
1022
1023#[async_trait::async_trait]
1024impl UserInfoWriter for MockUserInfoWriter {
1025    async fn create_user(&self, user: UserInfo) -> Result<()> {
1026        let mut user = user;
1027        user.id = self.gen_id().into();
1028        self.user_info.write().create_user(user);
1029        Ok(())
1030    }
1031
1032    async fn drop_user(&self, id: UserId) -> Result<()> {
1033        self.user_info.write().drop_user(id);
1034        Ok(())
1035    }
1036
1037    async fn update_user(
1038        &self,
1039        update_user: UserInfo,
1040        update_fields: Vec<UpdateField>,
1041    ) -> Result<()> {
1042        let mut lock = self.user_info.write();
1043        let id = update_user.get_id();
1044        let Some(old_name) = lock.get_user_name_by_id(id) else {
1045            return Ok(());
1046        };
1047        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1048        update_fields.into_iter().for_each(|field| match field {
1049            UpdateField::Super => user_info.is_super = update_user.is_super,
1050            UpdateField::Login => user_info.can_login = update_user.can_login,
1051            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1052            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1053            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1054            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1055            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1056            UpdateField::Unspecified => unreachable!(),
1057        });
1058        lock.update_user(update_user);
1059        Ok(())
1060    }
1061
1062    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1063    /// `GrantAllSources` when grant privilege to user.
1064    async fn grant_privilege(
1065        &self,
1066        users: Vec<UserId>,
1067        privileges: Vec<GrantPrivilege>,
1068        with_grant_option: bool,
1069        _grantor: UserId,
1070    ) -> Result<()> {
1071        let privileges = privileges
1072            .into_iter()
1073            .map(|mut p| {
1074                p.action_with_opts
1075                    .iter_mut()
1076                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1077                p
1078            })
1079            .collect::<Vec<_>>();
1080        for user_id in users {
1081            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1082                u.extend_privileges(privileges.clone());
1083            }
1084        }
1085        Ok(())
1086    }
1087
1088    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1089    /// `RevokeAllSources` when revoke privilege from user.
1090    async fn revoke_privilege(
1091        &self,
1092        users: Vec<UserId>,
1093        privileges: Vec<GrantPrivilege>,
1094        _granted_by: UserId,
1095        _revoke_by: UserId,
1096        revoke_grant_option: bool,
1097        _cascade: bool,
1098    ) -> Result<()> {
1099        for user_id in users {
1100            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1101                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1102            }
1103        }
1104        Ok(())
1105    }
1106
1107    async fn alter_default_privilege(
1108        &self,
1109        _users: Vec<UserId>,
1110        _database_id: DatabaseId,
1111        _schemas: Vec<SchemaId>,
1112        _operation: AlterDefaultPrivilegeOperation,
1113        _operated_by: UserId,
1114    ) -> Result<()> {
1115        todo!()
1116    }
1117}
1118
1119impl MockUserInfoWriter {
1120    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1121        user_info.write().create_user(UserInfo {
1122            id: DEFAULT_SUPER_USER_ID,
1123            name: DEFAULT_SUPER_USER.to_owned(),
1124            is_super: true,
1125            can_create_db: true,
1126            can_create_user: true,
1127            can_login: true,
1128            ..Default::default()
1129        });
1130        user_info.write().create_user(UserInfo {
1131            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1132            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1133            is_super: true,
1134            can_create_db: true,
1135            can_create_user: true,
1136            can_login: true,
1137            is_admin: true,
1138            ..Default::default()
1139        });
1140        Self {
1141            user_info,
1142            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1143        }
1144    }
1145
1146    fn gen_id(&self) -> u32 {
1147        self.id.fetch_add(1, Ordering::SeqCst)
1148    }
1149}
1150
/// Stateless mock of `FrontendMetaClient`: its methods either return fixed
/// empty/default values or are left unimplemented.
pub struct MockFrontendMetaClient {}
1152
1153#[async_trait::async_trait]
1154impl FrontendMetaClient for MockFrontendMetaClient {
1155    async fn try_unregister(&self) {}
1156
1157    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1158        Ok(INVALID_VERSION_ID)
1159    }
1160
1161    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1162        Ok(vec![])
1163    }
1164
1165    async fn list_table_fragments(
1166        &self,
1167        _table_ids: &[JobId],
1168    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1169        Ok(HashMap::default())
1170    }
1171
1172    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1173        Ok(vec![])
1174    }
1175
1176    async fn list_fragment_distribution(
1177        &self,
1178        _include_node: bool,
1179    ) -> RpcResult<Vec<FragmentDistribution>> {
1180        Ok(vec![])
1181    }
1182
1183    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1184        Ok(vec![])
1185    }
1186
1187    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1188        Ok(vec![])
1189    }
1190
1191    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1192        Ok(vec![])
1193    }
1194
1195    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1196        Ok(vec![])
1197    }
1198
1199    async fn list_sink_log_store_tables(
1200        &self,
1201    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1202        Ok(vec![])
1203    }
1204
1205    async fn set_system_param(
1206        &self,
1207        _param: String,
1208        _value: Option<String>,
1209    ) -> RpcResult<Option<SystemParamsReader>> {
1210        Ok(Some(SystemParams::default().into()))
1211    }
1212
1213    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1214        Ok(Default::default())
1215    }
1216
1217    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1218        Ok("".to_owned())
1219    }
1220
1221    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1222        Ok(vec![])
1223    }
1224
1225    async fn get_tables(
1226        &self,
1227        _table_ids: Vec<crate::catalog::TableId>,
1228        _include_dropped_tables: bool,
1229    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1230        Ok(HashMap::new())
1231    }
1232
1233    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1234        unimplemented!()
1235    }
1236
1237    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1238        unimplemented!()
1239    }
1240
1241    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1242        Ok(HummockVersion::default())
1243    }
1244
1245    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1246        unimplemented!()
1247    }
1248
1249    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1250        unimplemented!()
1251    }
1252
1253    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1254        unimplemented!()
1255    }
1256
1257    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1258        unimplemented!()
1259    }
1260
1261    async fn list_hummock_active_write_limits(
1262        &self,
1263    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1264        unimplemented!()
1265    }
1266
1267    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1268        unimplemented!()
1269    }
1270
1271    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1272        Ok(vec![])
1273    }
1274
1275    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1276        unimplemented!()
1277    }
1278
1279    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1280        Ok(vec![])
1281    }
1282
1283    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1284        unimplemented!()
1285    }
1286
1287    async fn recover(&self) -> RpcResult<()> {
1288        unimplemented!()
1289    }
1290
1291    async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1292        unimplemented!()
1293    }
1294
1295    async fn get_backup_job_status(
1296        &self,
1297        _job_id: u64,
1298    ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1299        unimplemented!()
1300    }
1301
1302    async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1303        unimplemented!()
1304    }
1305
1306    async fn apply_throttle(
1307        &self,
1308        _throttle_target: PbThrottleTarget,
1309        _throttle_type: risingwave_pb::common::PbThrottleType,
1310        _id: u32,
1311        _rate_limit: Option<u32>,
1312    ) -> RpcResult<()> {
1313        unimplemented!()
1314    }
1315
1316    async fn alter_fragment_parallelism(
1317        &self,
1318        _fragment_ids: Vec<FragmentId>,
1319        _parallelism: Option<PbTableParallelism>,
1320    ) -> RpcResult<()> {
1321        unimplemented!()
1322    }
1323
1324    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1325        Ok(RecoveryStatus::StatusRunning)
1326    }
1327
1328    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1329        Ok(vec![])
1330    }
1331
1332    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1333        Ok(vec![])
1334    }
1335
1336    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1337        Ok(HashMap::default())
1338    }
1339
1340    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1341        unimplemented!()
1342    }
1343
1344    async fn alter_sink_props(
1345        &self,
1346        _sink_id: SinkId,
1347        _changed_props: BTreeMap<String, String>,
1348        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1349        _connector_conn_ref: Option<ConnectionId>,
1350    ) -> RpcResult<()> {
1351        unimplemented!()
1352    }
1353
1354    async fn alter_iceberg_table_props(
1355        &self,
1356        _table_id: TableId,
1357        _sink_id: SinkId,
1358        _source_id: SourceId,
1359        _changed_props: BTreeMap<String, String>,
1360        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1361        _connector_conn_ref: Option<ConnectionId>,
1362    ) -> RpcResult<()> {
1363        unimplemented!()
1364    }
1365
1366    async fn alter_source_connector_props(
1367        &self,
1368        _source_id: SourceId,
1369        _changed_props: BTreeMap<String, String>,
1370        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1371        _connector_conn_ref: Option<ConnectionId>,
1372    ) -> RpcResult<()> {
1373        unimplemented!()
1374    }
1375
1376    async fn alter_connection_connector_props(
1377        &self,
1378        _connection_id: u32,
1379        _changed_props: BTreeMap<String, String>,
1380        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1381    ) -> RpcResult<()> {
1382        Ok(())
1383    }
1384
1385    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1386        unimplemented!()
1387    }
1388
1389    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1390        Ok(vec![])
1391    }
1392
1393    async fn get_fragment_by_id(
1394        &self,
1395        _fragment_id: FragmentId,
1396    ) -> RpcResult<Option<FragmentDistribution>> {
1397        unimplemented!()
1398    }
1399
1400    async fn get_fragment_vnodes(
1401        &self,
1402        _fragment_id: FragmentId,
1403    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1404        unimplemented!()
1405    }
1406
1407    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1408        unimplemented!()
1409    }
1410
1411    fn worker_id(&self) -> WorkerId {
1412        0.into()
1413    }
1414
1415    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1416        Ok(())
1417    }
1418
1419    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1420        Ok(1)
1421    }
1422
1423    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1424        Ok(())
1425    }
1426
1427    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1428        Ok(RefreshResponse { status: None })
1429    }
1430
1431    fn cluster_id(&self) -> &str {
1432        "test-cluster-uuid"
1433    }
1434
1435    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1436        unimplemented!()
1437    }
1438
1439    async fn get_hummock_table_change_log(
1440        &self,
1441        _start_epoch_inclusive: Option<u64>,
1442        _end_epoch_inclusive: Option<u64>,
1443        _table_ids: Option<HashSet<TableId>>,
1444        _exclude_empty: bool,
1445        _limit: Option<u32>,
1446    ) -> RpcResult<TableChangeLogs> {
1447        Ok(HashMap::default())
1448    }
1449}
1450
/// Proto3 schema source compiled only in test builds.
///
/// It declares package `test` with five messages: `TestRecord`,
/// `TestRecordAlterType` (the same field names and numbers as `TestRecord`,
/// but with `id` as `string` and `zipcode` as `int32`), `TestRecordExt`
/// (`TestRecord` plus a `name` field), and the nested `Country` and `City`.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1483
1484/// Returns the file.
1485/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1486pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1487    let in_file = Builder::new()
1488        .prefix("temp")
1489        .suffix(".proto")
1490        .rand_bytes(8)
1491        .tempfile()
1492        .unwrap();
1493
1494    let out_file = Builder::new()
1495        .prefix("temp")
1496        .suffix(".pb")
1497        .rand_bytes(8)
1498        .tempfile()
1499        .unwrap();
1500
1501    let mut file = in_file.as_file();
1502    file.write_all(proto_data.as_ref())
1503        .expect("writing binary to test file");
1504    file.flush().expect("flush temp file failed");
1505    let include_path = in_file
1506        .path()
1507        .parent()
1508        .unwrap()
1509        .to_string_lossy()
1510        .into_owned();
1511    let out_path = out_file.path().to_string_lossy().into_owned();
1512    let in_path = in_file.path().to_string_lossy().into_owned();
1513    let mut compile = std::process::Command::new("protoc");
1514
1515    let out = compile
1516        .arg("--include_imports")
1517        .arg("-I")
1518        .arg(include_path)
1519        .arg(format!("--descriptor_set_out={}", out_path))
1520        .arg(in_path)
1521        .output()
1522        .expect("failed to compile proto");
1523    if !out.status.success() {
1524        panic!("compile proto failed \n output: {:?}", out);
1525    }
1526    out_file
1527}