risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::AdaptiveParallelismStrategy;
37use risingwave_common::system_param::reader::SystemParamsReader;
38use risingwave_common::util::cluster_limit::ClusterLimit;
39use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
40use risingwave_hummock_sdk::change_log::TableChangeLogs;
41use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
42use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
43use risingwave_pb::backup_service::MetaSnapshotMetadata;
44use risingwave_pb::catalog::{
45    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
46    PbSubscription, PbTable, PbView, Table,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::ddl_service::alter_owner_request::Object;
50use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
51use risingwave_pb::ddl_service::{
52    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
53    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
54};
55use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
56use risingwave_pb::hummock::write_limits::WriteLimit;
57use risingwave_pb::hummock::{
58    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
59};
60use risingwave_pb::id::ActorId;
61use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
62use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
63use risingwave_pb::meta::list_actor_states_response::ActorState;
64use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
65use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
66use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
67use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
68use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
69use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
70use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
71use risingwave_pb::meta::{
72    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
73    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
74    list_sink_log_store_tables_response,
75};
76use risingwave_pb::secret::PbSecretRef;
77use risingwave_pb::stream_plan::StreamFragmentGraph;
78use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
79use risingwave_pb::user::update_user_request::UpdateField;
80use risingwave_pb::user::{GrantPrivilege, UserInfo};
81use risingwave_rpc_client::error::Result as RpcResult;
82use tempfile::{Builder, NamedTempFile};
83
84use crate::FrontendOpts;
85use crate::catalog::catalog_service::CatalogWriter;
86use crate::catalog::root_catalog::Catalog;
87use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
88use crate::error::{ErrorCode, Result, RwError};
89use crate::handler::RwPgResponse;
90use crate::meta_client::FrontendMetaClient;
91use crate::scheduler::HummockSnapshotManagerRef;
92use crate::session::{AuthContext, FrontendEnv, SessionImpl};
93use crate::user::UserId;
94use crate::user::user_manager::UserInfoManager;
95use crate::user::user_service::UserInfoWriter;
96
97/// An embedded frontend without starting meta and without starting frontend as a tcp server.
98pub struct LocalFrontend {
99    pub opts: FrontendOpts,
100    env: FrontendEnv,
101}
102
103impl SessionManager for LocalFrontend {
104    type Error = RwError;
105    type Session = SessionImpl;
106
107    fn create_dummy_session(
108        &self,
109        _database_id: DatabaseId,
110    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
111        unreachable!()
112    }
113
114    fn connect(
115        &self,
116        _database: &str,
117        _user_name: &str,
118        _peer_addr: AddressRef,
119    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
120        Ok(self.session_ref())
121    }
122
123    fn cancel_queries_in_session(&self, _session_id: SessionId) {
124        unreachable!()
125    }
126
127    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
128        unreachable!()
129    }
130
131    fn end_session(&self, _session: &Self::Session) {
132        unreachable!()
133    }
134}
135
136impl LocalFrontend {
137    #[expect(clippy::unused_async)]
138    pub async fn new(opts: FrontendOpts) -> Self {
139        let env = FrontendEnv::mock();
140        Self { opts, env }
141    }
142
143    pub async fn run_sql(
144        &self,
145        sql: impl Into<String>,
146    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
147        let sql: Arc<str> = Arc::from(sql.into());
148        Box::pin(self.session_ref().run_statement(sql, vec![])).await
149    }
150
151    pub async fn run_sql_with_session(
152        &self,
153        session_ref: Arc<SessionImpl>,
154        sql: impl Into<String>,
155    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
156        let sql: Arc<str> = Arc::from(sql.into());
157        Box::pin(session_ref.run_statement(sql, vec![])).await
158    }
159
160    pub async fn run_user_sql(
161        &self,
162        sql: impl Into<String>,
163        database: String,
164        user_name: String,
165        user_id: UserId,
166    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
167        let sql: Arc<str> = Arc::from(sql.into());
168        Box::pin(
169            self.session_user_ref(database, user_name, user_id)
170                .run_statement(sql, vec![]),
171        )
172        .await
173    }
174
175    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
176        let mut rsp = self.run_sql(sql).await.unwrap();
177        let mut res = vec![];
178        #[for_await]
179        for row_set in rsp.values_stream() {
180            for row in row_set.unwrap() {
181                res.push(format!("{:?}", row));
182            }
183        }
184        res
185    }
186
187    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
188        let mut rsp = self.run_sql(sql).await.unwrap();
189        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
190        let mut res = String::new();
191        #[for_await]
192        for row_set in rsp.values_stream() {
193            for row in row_set.unwrap() {
194                let row: Row = row;
195                let row = row.values()[0].as_ref().unwrap();
196                res += std::str::from_utf8(row).unwrap();
197                res += "\n";
198            }
199        }
200        res
201    }
202
203    /// Creates a new session
204    pub fn session_ref(&self) -> Arc<SessionImpl> {
205        self.session_user_ref(
206            DEFAULT_DATABASE_NAME.to_owned(),
207            DEFAULT_SUPER_USER.to_owned(),
208            DEFAULT_SUPER_USER_ID,
209        )
210    }
211
212    pub fn session_user_ref(
213        &self,
214        database: String,
215        user_name: String,
216        user_id: UserId,
217    ) -> Arc<SessionImpl> {
218        Arc::new(SessionImpl::new(
219            self.env.clone(),
220            AuthContext::new(database, user_name, user_id),
221            UserAuthenticator::None,
222            // Local Frontend use a non-sense id.
223            (0, 0),
224            Address::Tcp(SocketAddr::new(
225                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
226                6666,
227            ))
228            .into(),
229            Default::default(),
230        ))
231    }
232}
233
234pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
235    if rsp.stmt_type() != StatementType::EXPLAIN {
236        panic!("RESPONSE INVALID: {rsp:?}");
237    }
238    let mut res = String::new();
239    #[for_await]
240    for row_set in rsp.values_stream() {
241        for row in row_set.unwrap() {
242            let row: Row = row;
243            let row = row.values()[0].as_ref().unwrap();
244            res += std::str::from_utf8(row).unwrap();
245            res += "\n";
246        }
247    }
248    res
249}
250
251pub struct MockCatalogWriter {
252    catalog: Arc<RwLock<Catalog>>,
253    id: AtomicU32,
254    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
255    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
256    hummock_snapshot_manager: HummockSnapshotManagerRef,
257}
258
259#[async_trait::async_trait]
260impl CatalogWriter for MockCatalogWriter {
261    async fn create_database(
262        &self,
263        db_name: &str,
264        owner: UserId,
265        resource_group: &str,
266        barrier_interval_ms: Option<u32>,
267        checkpoint_frequency: Option<u64>,
268    ) -> Result<()> {
269        let database_id = DatabaseId::new(self.gen_id());
270        self.catalog.write().create_database(&PbDatabase {
271            name: db_name.to_owned(),
272            id: database_id,
273            owner,
274            resource_group: resource_group.to_owned(),
275            barrier_interval_ms,
276            checkpoint_frequency,
277        });
278        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
279            .await?;
280        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
281            .await?;
282        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
283            .await?;
284        Ok(())
285    }
286
287    async fn create_schema(
288        &self,
289        db_id: DatabaseId,
290        schema_name: &str,
291        owner: UserId,
292    ) -> Result<()> {
293        let id = self.gen_id();
294        self.catalog.write().create_schema(&PbSchema {
295            id,
296            name: schema_name.to_owned(),
297            database_id: db_id,
298            owner,
299        });
300        self.add_schema_id(id, db_id);
301        Ok(())
302    }
303
304    async fn create_materialized_view(
305        &self,
306        mut table: PbTable,
307        _graph: StreamFragmentGraph,
308        dependencies: HashSet<ObjectId>,
309        _resource_type: streaming_job_resource_type::ResourceType,
310        _if_not_exists: bool,
311        _refresh_interval_sec: Option<u64>,
312    ) -> Result<()> {
313        table.id = self.gen_id();
314        table.stream_job_status = PbStreamJobStatus::Created as _;
315        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
316        self.catalog.write().create_table(&table);
317        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
318        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
319        self.hummock_snapshot_manager.add_table_for_test(table.id);
320        Ok(())
321    }
322
323    async fn replace_materialized_view(
324        &self,
325        mut table: PbTable,
326        _graph: StreamFragmentGraph,
327    ) -> Result<()> {
328        table.stream_job_status = PbStreamJobStatus::Created as _;
329        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
330        self.catalog.write().update_table(&table);
331        Ok(())
332    }
333
334    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
335        view.id = self.gen_id();
336        self.catalog.write().create_view(&view);
337        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
338        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
339        Ok(())
340    }
341
342    async fn create_table(
343        &self,
344        source: Option<PbSource>,
345        mut table: PbTable,
346        graph: StreamFragmentGraph,
347        _job_type: PbTableJobType,
348        if_not_exists: bool,
349        dependencies: HashSet<ObjectId>,
350    ) -> Result<()> {
351        if let Some(source) = source {
352            let source_id = self.create_source_inner(source)?;
353            table.optional_associated_source_id = Some(source_id.into());
354        }
355        self.create_materialized_view(
356            table,
357            graph,
358            dependencies,
359            streaming_job_resource_type::ResourceType::Regular(true),
360            if_not_exists,
361            None,
362        )
363        .await?;
364        Ok(())
365    }
366
367    async fn replace_table(
368        &self,
369        _source: Option<PbSource>,
370        mut table: PbTable,
371        _graph: StreamFragmentGraph,
372        _job_type: TableJobType,
373    ) -> Result<()> {
374        table.stream_job_status = PbStreamJobStatus::Created as _;
375        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
376        self.catalog.write().update_table(&table);
377        Ok(())
378    }
379
380    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
381        self.catalog.write().update_source(&source);
382        Ok(())
383    }
384
385    async fn create_source(
386        &self,
387        source: PbSource,
388        _graph: Option<StreamFragmentGraph>,
389        _if_not_exists: bool,
390    ) -> Result<()> {
391        self.create_source_inner(source).map(|_| ())
392    }
393
394    async fn create_sink(
395        &self,
396        sink: PbSink,
397        graph: StreamFragmentGraph,
398        dependencies: HashSet<ObjectId>,
399        _if_not_exists: bool,
400    ) -> Result<()> {
401        let sink_id = self.create_sink_inner(sink, graph)?;
402        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
403        Ok(())
404    }
405
406    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
407        self.create_subscription_inner(subscription)
408    }
409
410    async fn create_index(
411        &self,
412        mut index: PbIndex,
413        mut index_table: PbTable,
414        _graph: StreamFragmentGraph,
415        _if_not_exists: bool,
416    ) -> Result<()> {
417        index_table.id = self.gen_id();
418        index_table.stream_job_status = PbStreamJobStatus::Created as _;
419        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
420        self.catalog.write().create_table(&index_table);
421        self.add_table_or_index_id(
422            index_table.id.as_raw_id(),
423            index_table.schema_id,
424            index_table.database_id,
425        );
426
427        index.id = index_table.id.as_raw_id().into();
428        index.index_table_id = index_table.id;
429        self.catalog.write().create_index(&index);
430        Ok(())
431    }
432
433    async fn create_function(&self, _function: PbFunction) -> Result<()> {
434        unreachable!()
435    }
436
437    async fn create_connection(
438        &self,
439        _connection_name: String,
440        _database_id: DatabaseId,
441        _schema_id: SchemaId,
442        _owner_id: UserId,
443        _connection: create_connection_request::Payload,
444    ) -> Result<()> {
445        unreachable!()
446    }
447
448    async fn create_secret(
449        &self,
450        _secret_name: String,
451        _database_id: DatabaseId,
452        _schema_id: SchemaId,
453        _owner_id: UserId,
454        _payload: Vec<u8>,
455    ) -> Result<()> {
456        unreachable!()
457    }
458
459    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
460        unreachable!()
461    }
462
463    async fn drop_table(
464        &self,
465        source_id: Option<SourceId>,
466        table_id: TableId,
467        cascade: bool,
468    ) -> Result<()> {
469        if cascade {
470            return Err(ErrorCode::NotSupported(
471                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
472                "use drop instead".to_owned(),
473            )
474            .into());
475        }
476        if let Some(source_id) = source_id {
477            self.drop_table_or_source_id(source_id.as_raw_id());
478        }
479        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
480        let indexes =
481            self.catalog
482                .read()
483                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
484        for index in indexes {
485            self.drop_index(index.id, cascade).await?;
486        }
487        self.catalog
488            .write()
489            .drop_table(database_id, schema_id, table_id);
490        if let Some(source_id) = source_id {
491            self.catalog
492                .write()
493                .drop_source(database_id, schema_id, source_id);
494        }
495        Ok(())
496    }
497
498    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
499        unreachable!()
500    }
501
502    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
503        if cascade {
504            return Err(ErrorCode::NotSupported(
505                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
506                "use drop instead".to_owned(),
507            )
508            .into());
509        }
510        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
511        let indexes =
512            self.catalog
513                .read()
514                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
515        for index in indexes {
516            self.drop_index(index.id, cascade).await?;
517        }
518        self.catalog
519            .write()
520            .drop_table(database_id, schema_id, table_id);
521        Ok(())
522    }
523
524    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
525        if cascade {
526            return Err(ErrorCode::NotSupported(
527                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
528                "use drop instead".to_owned(),
529            )
530            .into());
531        }
532        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
533        self.catalog
534            .write()
535            .drop_source(database_id, schema_id, source_id);
536        Ok(())
537    }
538
539    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
540        Ok(())
541    }
542
543    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
544        if cascade {
545            return Err(ErrorCode::NotSupported(
546                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
547                "use drop instead".to_owned(),
548            )
549            .into());
550        }
551        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
552        self.catalog
553            .write()
554            .drop_sink(database_id, schema_id, sink_id);
555        Ok(())
556    }
557
558    async fn drop_subscription(
559        &self,
560        subscription_id: SubscriptionId,
561        cascade: bool,
562    ) -> Result<()> {
563        if cascade {
564            return Err(ErrorCode::NotSupported(
565                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
566                "use drop instead".to_owned(),
567            )
568            .into());
569        }
570        let (database_id, schema_id) =
571            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
572        self.catalog
573            .write()
574            .drop_subscription(database_id, schema_id, subscription_id);
575        Ok(())
576    }
577
578    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
579        if cascade {
580            return Err(ErrorCode::NotSupported(
581                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
582                "use drop instead".to_owned(),
583            )
584            .into());
585        }
586        let &schema_id = self
587            .table_id_to_schema_id
588            .read()
589            .get(&index_id.as_raw_id())
590            .unwrap();
591        let database_id = self.get_database_id_by_schema(schema_id);
592
593        let index = {
594            let catalog_reader = self.catalog.read();
595            let schema_catalog = catalog_reader
596                .get_schema_by_id(database_id, schema_id)
597                .unwrap();
598            schema_catalog.get_index_by_id(index_id).unwrap().clone()
599        };
600
601        let index_table_id = index.index_table().id;
602        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
603        self.catalog
604            .write()
605            .drop_index(database_id, schema_id, index_id);
606        self.catalog
607            .write()
608            .drop_table(database_id, schema_id, index_table_id);
609        Ok(())
610    }
611
612    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
613        unreachable!()
614    }
615
616    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
617        unreachable!()
618    }
619
620    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
621        unreachable!()
622    }
623
624    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
625        self.catalog.write().drop_database(database_id);
626        Ok(())
627    }
628
629    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
630        let database_id = self.drop_schema_id(schema_id);
631        self.catalog.write().drop_schema(database_id, schema_id);
632        Ok(())
633    }
634
635    async fn alter_name(
636        &self,
637        object_id: alter_name_request::Object,
638        object_name: &str,
639    ) -> Result<()> {
640        match object_id {
641            alter_name_request::Object::TableId(table_id) => {
642                self.catalog
643                    .write()
644                    .alter_table_name_by_id(table_id, object_name);
645                Ok(())
646            }
647            _ => {
648                unimplemented!()
649            }
650        }
651    }
652
653    async fn alter_source(&self, source: PbSource) -> Result<()> {
654        self.catalog.write().update_source(&source);
655        Ok(())
656    }
657
658    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
659        for database in self.catalog.read().iter_databases() {
660            for schema in database.iter_schemas() {
661                match object {
662                    Object::TableId(table_id) => {
663                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
664                        {
665                            let mut pb_table = table.to_prost();
666                            pb_table.owner = owner_id;
667                            self.catalog.write().update_table(&pb_table);
668                            return Ok(());
669                        }
670                    }
671                    _ => unreachable!(),
672                }
673            }
674        }
675
676        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
677    }
678
679    async fn alter_subscription_retention(
680        &self,
681        subscription_id: SubscriptionId,
682        retention_seconds: u64,
683        definition: String,
684    ) -> Result<()> {
685        for database in self.catalog.read().iter_databases() {
686            for schema in database.iter_schemas() {
687                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
688                    let mut pb_subscription = subscription.to_proto();
689                    pb_subscription.retention_seconds = retention_seconds;
690                    pb_subscription.definition = definition;
691                    self.catalog.write().update_subscription(&pb_subscription);
692                    return Ok(());
693                }
694            }
695        }
696
697        Err(
698            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
699                .into(),
700        )
701    }
702
703    async fn alter_set_schema(
704        &self,
705        object: alter_set_schema_request::Object,
706        new_schema_id: SchemaId,
707    ) -> Result<()> {
708        match object {
709            alter_set_schema_request::Object::TableId(table_id) => {
710                let mut pb_table = {
711                    let reader = self.catalog.read();
712                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
713                    table.to_prost()
714                };
715                pb_table.schema_id = new_schema_id;
716                self.catalog.write().update_table(&pb_table);
717                self.table_id_to_schema_id
718                    .write()
719                    .insert(table_id.as_raw_id(), new_schema_id);
720                Ok(())
721            }
722            _ => unreachable!(),
723        }
724    }
725
726    async fn alter_parallelism(
727        &self,
728        _job_id: JobId,
729        _parallelism: PbTableParallelism,
730        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
731        _deferred: bool,
732    ) -> Result<()> {
733        todo!()
734    }
735
736    async fn alter_backfill_parallelism(
737        &self,
738        _job_id: JobId,
739        _parallelism: Option<PbTableParallelism>,
740        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
741        _deferred: bool,
742    ) -> Result<()> {
743        todo!()
744    }
745
746    async fn alter_config(
747        &self,
748        _job_id: JobId,
749        _entries_to_add: HashMap<String, String>,
750        _keys_to_remove: Vec<String>,
751    ) -> Result<()> {
752        todo!()
753    }
754
755    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
756        todo!()
757    }
758
759    async fn alter_secret(
760        &self,
761        _secret_id: SecretId,
762        _secret_name: String,
763        _database_id: DatabaseId,
764        _schema_id: SchemaId,
765        _owner_id: UserId,
766        _payload: Vec<u8>,
767    ) -> Result<()> {
768        unreachable!()
769    }
770
771    async fn alter_resource_group(
772        &self,
773        _table_id: TableId,
774        _resource_group: Option<String>,
775        _deferred: bool,
776    ) -> Result<()> {
777        todo!()
778    }
779
780    async fn alter_database_param(
781        &self,
782        database_id: DatabaseId,
783        param: AlterDatabaseParam,
784    ) -> Result<()> {
785        let mut pb_database = {
786            let reader = self.catalog.read();
787            let database = reader.get_database_by_id(database_id)?.to_owned();
788            database.to_prost()
789        };
790        match param {
791            AlterDatabaseParam::BarrierIntervalMs(interval) => {
792                pb_database.barrier_interval_ms = interval;
793            }
794            AlterDatabaseParam::CheckpointFrequency(frequency) => {
795                pb_database.checkpoint_frequency = frequency;
796            }
797        }
798        self.catalog.write().update_database(&pb_database);
799        Ok(())
800    }
801
802    async fn create_iceberg_table(
803        &self,
804        _table_job_info: PbTableJobInfo,
805        _sink_job_info: PbSinkJobInfo,
806        _iceberg_source: PbSource,
807        _if_not_exists: bool,
808    ) -> Result<()> {
809        todo!()
810    }
811
812    async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
813        Ok(())
814    }
815}
816
817impl MockCatalogWriter {
818    pub fn new(
819        catalog: Arc<RwLock<Catalog>>,
820        hummock_snapshot_manager: HummockSnapshotManagerRef,
821    ) -> Self {
822        catalog.write().create_database(&PbDatabase {
823            id: 0.into(),
824            name: DEFAULT_DATABASE_NAME.to_owned(),
825            owner: DEFAULT_SUPER_USER_ID,
826            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
827            barrier_interval_ms: None,
828            checkpoint_frequency: None,
829        });
830        catalog.write().create_schema(&PbSchema {
831            id: 1.into(),
832            name: DEFAULT_SCHEMA_NAME.to_owned(),
833            database_id: 0.into(),
834            owner: DEFAULT_SUPER_USER_ID,
835        });
836        catalog.write().create_schema(&PbSchema {
837            id: 2.into(),
838            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
839            database_id: 0.into(),
840            owner: DEFAULT_SUPER_USER_ID,
841        });
842        catalog.write().create_schema(&PbSchema {
843            id: 3.into(),
844            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
845            database_id: 0.into(),
846            owner: DEFAULT_SUPER_USER_ID,
847        });
848        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
849        map.insert(1_u32.into(), 0_u32.into());
850        map.insert(2_u32.into(), 0_u32.into());
851        map.insert(3_u32.into(), 0_u32.into());
852        Self {
853            catalog,
854            id: AtomicU32::new(3),
855            table_id_to_schema_id: Default::default(),
856            schema_id_to_database_id: RwLock::new(map),
857            hummock_snapshot_manager,
858        }
859    }
860
861    fn gen_id<T: From<u32>>(&self) -> T {
862        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
863        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
864    }
865
866    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
867        self.table_id_to_schema_id
868            .write()
869            .insert(table_id, schema_id);
870    }
871
872    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
873        let schema_id = self
874            .table_id_to_schema_id
875            .write()
876            .remove(&table_id)
877            .unwrap();
878        (self.get_database_id_by_schema(schema_id), schema_id)
879    }
880
881    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
882        self.table_id_to_schema_id
883            .write()
884            .insert(table_id, schema_id);
885    }
886
887    fn add_table_or_subscription_id(
888        &self,
889        table_id: u32,
890        schema_id: SchemaId,
891        _database_id: DatabaseId,
892    ) {
893        self.table_id_to_schema_id
894            .write()
895            .insert(table_id, schema_id);
896    }
897
898    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
899        self.table_id_to_schema_id
900            .write()
901            .insert(table_id, schema_id);
902    }
903
904    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
905        let schema_id = self
906            .table_id_to_schema_id
907            .write()
908            .remove(&table_id)
909            .unwrap();
910        (self.get_database_id_by_schema(schema_id), schema_id)
911    }
912
913    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
914        let schema_id = self
915            .table_id_to_schema_id
916            .write()
917            .remove(&table_id)
918            .unwrap();
919        (self.get_database_id_by_schema(schema_id), schema_id)
920    }
921
922    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
923        let schema_id = self
924            .table_id_to_schema_id
925            .write()
926            .remove(&table_id)
927            .unwrap();
928        (self.get_database_id_by_schema(schema_id), schema_id)
929    }
930
931    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
932        self.schema_id_to_database_id
933            .write()
934            .insert(schema_id, database_id);
935    }
936
937    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
938        self.schema_id_to_database_id
939            .write()
940            .remove(&schema_id)
941            .unwrap()
942    }
943
944    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
945        source.id = self.gen_id();
946        self.catalog.write().create_source(&source);
947        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
948        Ok(source.id)
949    }
950
951    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
952        sink.id = self.gen_id();
953        sink.stream_job_status = PbStreamJobStatus::Created as _;
954        self.catalog.write().create_sink(&sink);
955        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
956        Ok(sink.id)
957    }
958
959    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
960        subscription.id = self.gen_id();
961        self.catalog.write().create_subscription(&subscription);
962        self.add_table_or_subscription_id(
963            subscription.id.as_raw_id(),
964            subscription.schema_id,
965            subscription.database_id,
966        );
967        Ok(())
968    }
969
970    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
971        *self
972            .schema_id_to_database_id
973            .read()
974            .get(&schema_id)
975            .unwrap()
976    }
977
978    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
979        let catalog = self.catalog.read();
980        for database in catalog.iter_databases() {
981            for schema in database.iter_schemas() {
982                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
983                    return if table.is_mview() {
984                        PbObjectType::Mview
985                    } else {
986                        PbObjectType::Table
987                    };
988                }
989                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
990                    return PbObjectType::Source;
991                }
992                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
993                    return PbObjectType::View;
994                }
995                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
996                    return PbObjectType::Index;
997                }
998            }
999        }
1000        PbObjectType::Unspecified
1001    }
1002
1003    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1004        if dependencies.is_empty() {
1005            return;
1006        }
1007        let dependencies = dependencies
1008            .into_iter()
1009            .map(|referenced_object_id| PbObjectDependency {
1010                object_id,
1011                referenced_object_id,
1012                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1013            })
1014            .collect();
1015        self.catalog
1016            .write()
1017            .insert_object_dependencies(dependencies);
1018    }
1019}
1020
1021pub struct MockUserInfoWriter {
1022    id: AtomicU32,
1023    user_info: Arc<RwLock<UserInfoManager>>,
1024}
1025
1026#[async_trait::async_trait]
1027impl UserInfoWriter for MockUserInfoWriter {
1028    async fn create_user(&self, user: UserInfo) -> Result<()> {
1029        let mut user = user;
1030        user.id = self.gen_id().into();
1031        self.user_info.write().create_user(user);
1032        Ok(())
1033    }
1034
1035    async fn drop_user(&self, id: UserId) -> Result<()> {
1036        self.user_info.write().drop_user(id);
1037        Ok(())
1038    }
1039
1040    async fn update_user(
1041        &self,
1042        update_user: UserInfo,
1043        update_fields: Vec<UpdateField>,
1044    ) -> Result<()> {
1045        let mut lock = self.user_info.write();
1046        let id = update_user.get_id();
1047        let Some(old_name) = lock.get_user_name_by_id(id) else {
1048            return Ok(());
1049        };
1050        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1051        update_fields.into_iter().for_each(|field| match field {
1052            UpdateField::Super => user_info.is_super = update_user.is_super,
1053            UpdateField::Login => user_info.can_login = update_user.can_login,
1054            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1055            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1056            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1057            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1058            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1059            UpdateField::Unspecified => unreachable!(),
1060        });
1061        lock.update_user(update_user);
1062        Ok(())
1063    }
1064
1065    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1066    /// `GrantAllSources` when grant privilege to user.
1067    async fn grant_privilege(
1068        &self,
1069        users: Vec<UserId>,
1070        privileges: Vec<GrantPrivilege>,
1071        with_grant_option: bool,
1072        _grantor: UserId,
1073    ) -> Result<()> {
1074        let privileges = privileges
1075            .into_iter()
1076            .map(|mut p| {
1077                p.action_with_opts
1078                    .iter_mut()
1079                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1080                p
1081            })
1082            .collect::<Vec<_>>();
1083        for user_id in users {
1084            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1085                u.extend_privileges(privileges.clone());
1086            }
1087        }
1088        Ok(())
1089    }
1090
1091    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1092    /// `RevokeAllSources` when revoke privilege from user.
1093    async fn revoke_privilege(
1094        &self,
1095        users: Vec<UserId>,
1096        privileges: Vec<GrantPrivilege>,
1097        _granted_by: UserId,
1098        _revoke_by: UserId,
1099        revoke_grant_option: bool,
1100        _cascade: bool,
1101    ) -> Result<()> {
1102        for user_id in users {
1103            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1104                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1105            }
1106        }
1107        Ok(())
1108    }
1109
1110    async fn alter_default_privilege(
1111        &self,
1112        _users: Vec<UserId>,
1113        _database_id: DatabaseId,
1114        _schemas: Vec<SchemaId>,
1115        _operation: AlterDefaultPrivilegeOperation,
1116        _operated_by: UserId,
1117    ) -> Result<()> {
1118        todo!()
1119    }
1120}
1121
1122impl MockUserInfoWriter {
1123    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1124        user_info.write().create_user(UserInfo {
1125            id: DEFAULT_SUPER_USER_ID,
1126            name: DEFAULT_SUPER_USER.to_owned(),
1127            is_super: true,
1128            can_create_db: true,
1129            can_create_user: true,
1130            can_login: true,
1131            ..Default::default()
1132        });
1133        user_info.write().create_user(UserInfo {
1134            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1135            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1136            is_super: true,
1137            can_create_db: true,
1138            can_create_user: true,
1139            can_login: true,
1140            is_admin: true,
1141            ..Default::default()
1142        });
1143        Self {
1144            user_info,
1145            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1146        }
1147    }
1148
1149    fn gen_id(&self) -> u32 {
1150        self.id.fetch_add(1, Ordering::SeqCst)
1151    }
1152}
1153
1154pub struct MockFrontendMetaClient {}
1155
1156#[async_trait::async_trait]
1157impl FrontendMetaClient for MockFrontendMetaClient {
1158    async fn try_unregister(&self) {}
1159
1160    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1161        Ok(INVALID_VERSION_ID)
1162    }
1163
1164    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1165        Ok(vec![])
1166    }
1167
1168    async fn list_table_fragments(
1169        &self,
1170        _table_ids: &[JobId],
1171    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1172        Ok(HashMap::default())
1173    }
1174
1175    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1176        Ok(vec![])
1177    }
1178
1179    async fn list_fragment_distribution(
1180        &self,
1181        _include_node: bool,
1182    ) -> RpcResult<Vec<FragmentDistribution>> {
1183        Ok(vec![])
1184    }
1185
1186    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1187        Ok(vec![])
1188    }
1189
1190    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1191        Ok(vec![])
1192    }
1193
1194    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1195        Ok(vec![])
1196    }
1197
1198    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1199        Ok(vec![])
1200    }
1201
1202    async fn list_sink_log_store_tables(
1203        &self,
1204    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1205        Ok(vec![])
1206    }
1207
1208    async fn set_system_param(
1209        &self,
1210        _param: String,
1211        _value: Option<String>,
1212    ) -> RpcResult<Option<SystemParamsReader>> {
1213        Ok(Some(SystemParams::default().into()))
1214    }
1215
1216    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1217        Ok(Default::default())
1218    }
1219
1220    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1221        Ok("".to_owned())
1222    }
1223
1224    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1225        Ok(vec![])
1226    }
1227
1228    async fn get_tables(
1229        &self,
1230        _table_ids: Vec<crate::catalog::TableId>,
1231        _include_dropped_tables: bool,
1232    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1233        Ok(HashMap::new())
1234    }
1235
1236    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1237        unimplemented!()
1238    }
1239
1240    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1241        unimplemented!()
1242    }
1243
1244    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1245        Ok(HummockVersion::default())
1246    }
1247
1248    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1249        unimplemented!()
1250    }
1251
1252    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1253        unimplemented!()
1254    }
1255
1256    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1257        unimplemented!()
1258    }
1259
1260    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1261        unimplemented!()
1262    }
1263
1264    async fn list_hummock_active_write_limits(
1265        &self,
1266    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1267        unimplemented!()
1268    }
1269
1270    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1271        unimplemented!()
1272    }
1273
1274    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1275        Ok(vec![])
1276    }
1277
1278    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1279        unimplemented!()
1280    }
1281
1282    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1283        Ok(vec![])
1284    }
1285
1286    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1287        unimplemented!()
1288    }
1289
1290    async fn recover(&self) -> RpcResult<()> {
1291        unimplemented!()
1292    }
1293
1294    async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1295        unimplemented!()
1296    }
1297
1298    async fn get_backup_job_status(
1299        &self,
1300        _job_id: u64,
1301    ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1302        unimplemented!()
1303    }
1304
1305    async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1306        unimplemented!()
1307    }
1308
1309    async fn apply_throttle(
1310        &self,
1311        _throttle_target: PbThrottleTarget,
1312        _throttle_type: risingwave_pb::common::PbThrottleType,
1313        _id: u32,
1314        _rate_limit: Option<u32>,
1315    ) -> RpcResult<()> {
1316        unimplemented!()
1317    }
1318
1319    async fn alter_fragment_parallelism(
1320        &self,
1321        _fragment_ids: Vec<FragmentId>,
1322        _parallelism: Option<PbTableParallelism>,
1323    ) -> RpcResult<()> {
1324        unimplemented!()
1325    }
1326
1327    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1328        Ok(RecoveryStatus::StatusRunning)
1329    }
1330
1331    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1332        Ok(vec![])
1333    }
1334
1335    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1336        Ok(vec![])
1337    }
1338
1339    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1340        Ok(HashMap::default())
1341    }
1342
1343    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1344        unimplemented!()
1345    }
1346
1347    async fn alter_sink_props(
1348        &self,
1349        _sink_id: SinkId,
1350        _changed_props: BTreeMap<String, String>,
1351        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1352        _connector_conn_ref: Option<ConnectionId>,
1353    ) -> RpcResult<()> {
1354        unimplemented!()
1355    }
1356
1357    async fn alter_iceberg_table_props(
1358        &self,
1359        _table_id: TableId,
1360        _sink_id: SinkId,
1361        _source_id: SourceId,
1362        _changed_props: BTreeMap<String, String>,
1363        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1364        _connector_conn_ref: Option<ConnectionId>,
1365    ) -> RpcResult<()> {
1366        unimplemented!()
1367    }
1368
1369    async fn alter_source_connector_props(
1370        &self,
1371        _source_id: SourceId,
1372        _changed_props: BTreeMap<String, String>,
1373        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1374        _connector_conn_ref: Option<ConnectionId>,
1375    ) -> RpcResult<()> {
1376        unimplemented!()
1377    }
1378
1379    async fn alter_connection_connector_props(
1380        &self,
1381        _connection_id: u32,
1382        _changed_props: BTreeMap<String, String>,
1383        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1384    ) -> RpcResult<()> {
1385        Ok(())
1386    }
1387
1388    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1389        unimplemented!()
1390    }
1391
1392    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1393        Ok(vec![])
1394    }
1395
1396    async fn get_fragment_by_id(
1397        &self,
1398        _fragment_id: FragmentId,
1399    ) -> RpcResult<Option<FragmentDistribution>> {
1400        unimplemented!()
1401    }
1402
1403    async fn get_fragment_vnodes(
1404        &self,
1405        _fragment_id: FragmentId,
1406    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1407        unimplemented!()
1408    }
1409
1410    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1411        unimplemented!()
1412    }
1413
1414    fn worker_id(&self) -> WorkerId {
1415        0.into()
1416    }
1417
1418    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1419        Ok(())
1420    }
1421
1422    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1423        Ok(1)
1424    }
1425
1426    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1427        Ok(())
1428    }
1429
1430    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1431        Ok(RefreshResponse { status: None })
1432    }
1433
1434    fn cluster_id(&self) -> &str {
1435        "test-cluster-uuid"
1436    }
1437
1438    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1439        unimplemented!()
1440    }
1441
1442    async fn get_hummock_table_change_log(
1443        &self,
1444        _start_epoch_inclusive: Option<u64>,
1445        _end_epoch_inclusive: Option<u64>,
1446        _table_ids: Option<HashSet<TableId>>,
1447        _exclude_empty: bool,
1448        _limit: Option<u32>,
1449    ) -> RpcResult<TableChangeLogs> {
1450        Ok(HashMap::default())
1451    }
1452
1453    async fn update_compaction_config(
1454        &self,
1455        _compaction_group_ids: Vec<risingwave_hummock_sdk::CompactionGroupId>,
1456        _configs: Vec<PbMutableConfig>,
1457    ) -> RpcResult<()> {
1458        Ok(())
1459    }
1460}
1461
1462#[cfg(test)]
1463pub static PROTO_FILE_DATA: &str = r#"
1464    syntax = "proto3";
1465    package test;
1466    message TestRecord {
1467      int32 id = 1;
1468      Country country = 3;
1469      int64 zipcode = 4;
1470      float rate = 5;
1471    }
1472    message TestRecordAlterType {
1473        string id = 1;
1474        Country country = 3;
1475        int32 zipcode = 4;
1476        float rate = 5;
1477      }
1478    message TestRecordExt {
1479      int32 id = 1;
1480      Country country = 3;
1481      int64 zipcode = 4;
1482      float rate = 5;
1483      string name = 6;
1484    }
1485    message Country {
1486      string address = 1;
1487      City city = 2;
1488      string zipcode = 3;
1489    }
1490    message City {
1491      string address = 1;
1492      string zipcode = 2;
1493    }"#;
1494
1495/// Returns the file.
1496/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1497pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1498    let in_file = Builder::new()
1499        .prefix("temp")
1500        .suffix(".proto")
1501        .rand_bytes(8)
1502        .tempfile()
1503        .unwrap();
1504
1505    let out_file = Builder::new()
1506        .prefix("temp")
1507        .suffix(".pb")
1508        .rand_bytes(8)
1509        .tempfile()
1510        .unwrap();
1511
1512    let mut file = in_file.as_file();
1513    file.write_all(proto_data.as_ref())
1514        .expect("writing binary to test file");
1515    file.flush().expect("flush temp file failed");
1516    let include_path = in_file
1517        .path()
1518        .parent()
1519        .unwrap()
1520        .to_string_lossy()
1521        .into_owned();
1522    let out_path = out_file.path().to_string_lossy().into_owned();
1523    let in_path = in_file.path().to_string_lossy().into_owned();
1524    let mut compile = std::process::Command::new("protoc");
1525
1526    let out = compile
1527        .arg("--include_imports")
1528        .arg("-I")
1529        .arg(include_path)
1530        .arg(format!("--descriptor_set_out={}", out_path))
1531        .arg(in_path)
1532        .output()
1533        .expect("failed to compile proto");
1534    if !out.status.success() {
1535        panic!("compile proto failed \n output: {:?}", out);
1536    }
1537    out_file
1538}