risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::AdaptiveParallelismStrategy;
37use risingwave_common::system_param::reader::SystemParamsReader;
38use risingwave_common::util::cluster_limit::ClusterLimit;
39use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
40use risingwave_hummock_sdk::change_log::TableChangeLogs;
41use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
42use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
43use risingwave_pb::backup_service::MetaSnapshotMetadata;
44use risingwave_pb::catalog::{
45    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
46    PbSubscription, PbTable, PbView, Table,
47};
48use risingwave_pb::common::{PbObjectType, WorkerNode};
49use risingwave_pb::ddl_service::alter_owner_request::Object;
50use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
51use risingwave_pb::ddl_service::{
52    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
53    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
54};
55use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
56use risingwave_pb::hummock::write_limits::WriteLimit;
57use risingwave_pb::hummock::{
58    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
59};
60use risingwave_pb::id::ActorId;
61use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
62use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
63use risingwave_pb::meta::list_actor_states_response::ActorState;
64use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
65use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
66use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
67use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
68use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
69use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
70use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
71use risingwave_pb::meta::{
72    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
73    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
74    list_sink_log_store_tables_response,
75};
76use risingwave_pb::secret::PbSecretRef;
77use risingwave_pb::stream_plan::StreamFragmentGraph;
78use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
79use risingwave_pb::user::update_user_request::UpdateField;
80use risingwave_pb::user::{GrantPrivilege, UserInfo};
81use risingwave_rpc_client::error::Result as RpcResult;
82use tempfile::{Builder, NamedTempFile};
83
84use crate::FrontendOpts;
85use crate::catalog::catalog_service::CatalogWriter;
86use crate::catalog::root_catalog::Catalog;
87use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
88use crate::error::{ErrorCode, Result, RwError};
89use crate::handler::RwPgResponse;
90use crate::meta_client::FrontendMetaClient;
91use crate::scheduler::HummockSnapshotManagerRef;
92use crate::session::{AuthContext, FrontendEnv, SessionImpl};
93use crate::user::UserId;
94use crate::user::user_manager::UserInfoManager;
95use crate::user::user_service::UserInfoWriter;
96
97/// An embedded frontend without starting meta and without starting frontend as a tcp server.
98pub struct LocalFrontend {
99    pub opts: FrontendOpts,
100    env: FrontendEnv,
101}
102
103impl SessionManager for LocalFrontend {
104    type Error = RwError;
105    type Session = SessionImpl;
106
107    fn create_dummy_session(
108        &self,
109        _database_id: DatabaseId,
110    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
111        unreachable!()
112    }
113
114    fn connect(
115        &self,
116        _database: &str,
117        _user_name: &str,
118        _peer_addr: AddressRef,
119    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
120        Ok(self.session_ref())
121    }
122
123    fn cancel_queries_in_session(&self, _session_id: SessionId) {
124        unreachable!()
125    }
126
127    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
128        unreachable!()
129    }
130
131    fn end_session(&self, _session: &Self::Session) {
132        unreachable!()
133    }
134}
135
136impl LocalFrontend {
137    #[expect(clippy::unused_async)]
138    pub async fn new(opts: FrontendOpts) -> Self {
139        let env = FrontendEnv::mock();
140        Self { opts, env }
141    }
142
143    pub async fn run_sql(
144        &self,
145        sql: impl Into<String>,
146    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
147        let sql: Arc<str> = Arc::from(sql.into());
148        Box::pin(self.session_ref().run_statement(sql, vec![])).await
149    }
150
151    pub async fn run_sql_with_session(
152        &self,
153        session_ref: Arc<SessionImpl>,
154        sql: impl Into<String>,
155    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
156        let sql: Arc<str> = Arc::from(sql.into());
157        Box::pin(session_ref.run_statement(sql, vec![])).await
158    }
159
160    pub async fn run_user_sql(
161        &self,
162        sql: impl Into<String>,
163        database: String,
164        user_name: String,
165        user_id: UserId,
166    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
167        let sql: Arc<str> = Arc::from(sql.into());
168        Box::pin(
169            self.session_user_ref(database, user_name, user_id)
170                .run_statement(sql, vec![]),
171        )
172        .await
173    }
174
175    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
176        let mut rsp = self.run_sql(sql).await.unwrap();
177        let mut res = vec![];
178        #[for_await]
179        for row_set in rsp.values_stream() {
180            for row in row_set.unwrap() {
181                res.push(format!("{:?}", row));
182            }
183        }
184        res
185    }
186
187    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
188        let mut rsp = self.run_sql(sql).await.unwrap();
189        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
190        let mut res = String::new();
191        #[for_await]
192        for row_set in rsp.values_stream() {
193            for row in row_set.unwrap() {
194                let row: Row = row;
195                let row = row.values()[0].as_ref().unwrap();
196                res += std::str::from_utf8(row).unwrap();
197                res += "\n";
198            }
199        }
200        res
201    }
202
203    /// Creates a new session
204    pub fn session_ref(&self) -> Arc<SessionImpl> {
205        self.session_user_ref(
206            DEFAULT_DATABASE_NAME.to_owned(),
207            DEFAULT_SUPER_USER.to_owned(),
208            DEFAULT_SUPER_USER_ID,
209        )
210    }
211
212    pub fn session_user_ref(
213        &self,
214        database: String,
215        user_name: String,
216        user_id: UserId,
217    ) -> Arc<SessionImpl> {
218        Arc::new(SessionImpl::new(
219            self.env.clone(),
220            AuthContext::new(database, user_name, user_id),
221            UserAuthenticator::None,
222            // Local Frontend use a non-sense id.
223            (0, 0),
224            Address::Tcp(SocketAddr::new(
225                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
226                6666,
227            ))
228            .into(),
229            Default::default(),
230        ))
231    }
232}
233
234pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
235    if rsp.stmt_type() != StatementType::EXPLAIN {
236        panic!("RESPONSE INVALID: {rsp:?}");
237    }
238    let mut res = String::new();
239    #[for_await]
240    for row_set in rsp.values_stream() {
241        for row in row_set.unwrap() {
242            let row: Row = row;
243            let row = row.values()[0].as_ref().unwrap();
244            res += std::str::from_utf8(row).unwrap();
245            res += "\n";
246        }
247    }
248    res
249}
250
251pub struct MockCatalogWriter {
252    catalog: Arc<RwLock<Catalog>>,
253    id: AtomicU32,
254    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
255    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
256    hummock_snapshot_manager: HummockSnapshotManagerRef,
257}
258
259#[async_trait::async_trait]
260impl CatalogWriter for MockCatalogWriter {
261    async fn create_database(
262        &self,
263        db_name: &str,
264        owner: UserId,
265        resource_group: &str,
266        barrier_interval_ms: Option<u32>,
267        checkpoint_frequency: Option<u64>,
268    ) -> Result<()> {
269        let database_id = DatabaseId::new(self.gen_id());
270        self.catalog.write().create_database(&PbDatabase {
271            name: db_name.to_owned(),
272            id: database_id,
273            owner,
274            resource_group: resource_group.to_owned(),
275            barrier_interval_ms,
276            checkpoint_frequency,
277        });
278        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
279            .await?;
280        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
281            .await?;
282        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
283            .await?;
284        Ok(())
285    }
286
287    async fn create_schema(
288        &self,
289        db_id: DatabaseId,
290        schema_name: &str,
291        owner: UserId,
292    ) -> Result<()> {
293        let id = self.gen_id();
294        self.catalog.write().create_schema(&PbSchema {
295            id,
296            name: schema_name.to_owned(),
297            database_id: db_id,
298            owner,
299        });
300        self.add_schema_id(id, db_id);
301        Ok(())
302    }
303
304    async fn create_materialized_view(
305        &self,
306        mut table: PbTable,
307        _graph: StreamFragmentGraph,
308        dependencies: HashSet<ObjectId>,
309        _resource_type: streaming_job_resource_type::ResourceType,
310        _if_not_exists: bool,
311        _refresh_interval_sec: Option<u64>,
312    ) -> Result<()> {
313        table.id = self.gen_id();
314        table.stream_job_status = PbStreamJobStatus::Created as _;
315        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
316        self.catalog.write().create_table(&table);
317        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
318        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
319        self.hummock_snapshot_manager.add_table_for_test(table.id);
320        Ok(())
321    }
322
323    async fn replace_materialized_view(
324        &self,
325        mut table: PbTable,
326        _graph: StreamFragmentGraph,
327    ) -> Result<()> {
328        table.stream_job_status = PbStreamJobStatus::Created as _;
329        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
330        self.catalog.write().update_table(&table);
331        Ok(())
332    }
333
334    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
335        view.id = self.gen_id();
336        self.catalog.write().create_view(&view);
337        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
338        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
339        Ok(())
340    }
341
342    async fn create_table(
343        &self,
344        source: Option<PbSource>,
345        mut table: PbTable,
346        graph: StreamFragmentGraph,
347        _job_type: PbTableJobType,
348        if_not_exists: bool,
349        dependencies: HashSet<ObjectId>,
350    ) -> Result<()> {
351        if let Some(source) = source {
352            let source_id = self.create_source_inner(source)?;
353            table.optional_associated_source_id = Some(source_id.into());
354        }
355        self.create_materialized_view(
356            table,
357            graph,
358            dependencies,
359            streaming_job_resource_type::ResourceType::Regular(true),
360            if_not_exists,
361            None,
362        )
363        .await?;
364        Ok(())
365    }
366
367    async fn replace_table(
368        &self,
369        _source: Option<PbSource>,
370        mut table: PbTable,
371        _graph: StreamFragmentGraph,
372        _job_type: TableJobType,
373    ) -> Result<()> {
374        table.stream_job_status = PbStreamJobStatus::Created as _;
375        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
376        self.catalog.write().update_table(&table);
377        Ok(())
378    }
379
380    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
381        self.catalog.write().update_source(&source);
382        Ok(())
383    }
384
385    async fn create_source(
386        &self,
387        source: PbSource,
388        _graph: Option<StreamFragmentGraph>,
389        _if_not_exists: bool,
390    ) -> Result<()> {
391        self.create_source_inner(source).map(|_| ())
392    }
393
394    async fn create_sink(
395        &self,
396        sink: PbSink,
397        graph: StreamFragmentGraph,
398        dependencies: HashSet<ObjectId>,
399        _resource_type: streaming_job_resource_type::ResourceType,
400        _if_not_exists: bool,
401    ) -> Result<()> {
402        let sink_id = self.create_sink_inner(sink, graph)?;
403        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
404        Ok(())
405    }
406
407    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
408        self.create_subscription_inner(subscription)
409    }
410
411    async fn create_index(
412        &self,
413        mut index: PbIndex,
414        mut index_table: PbTable,
415        _graph: StreamFragmentGraph,
416        _resource_type: streaming_job_resource_type::ResourceType,
417        _if_not_exists: bool,
418    ) -> Result<()> {
419        index_table.id = self.gen_id();
420        index_table.stream_job_status = PbStreamJobStatus::Created as _;
421        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
422        self.catalog.write().create_table(&index_table);
423        self.add_table_or_index_id(
424            index_table.id.as_raw_id(),
425            index_table.schema_id,
426            index_table.database_id,
427        );
428
429        index.id = index_table.id.as_raw_id().into();
430        index.index_table_id = index_table.id;
431        self.catalog.write().create_index(&index);
432        Ok(())
433    }
434
435    async fn create_function(&self, _function: PbFunction) -> Result<()> {
436        unreachable!()
437    }
438
439    async fn create_connection(
440        &self,
441        _connection_name: String,
442        _database_id: DatabaseId,
443        _schema_id: SchemaId,
444        _owner_id: UserId,
445        _connection: create_connection_request::Payload,
446    ) -> Result<()> {
447        unreachable!()
448    }
449
450    async fn create_secret(
451        &self,
452        _secret_name: String,
453        _database_id: DatabaseId,
454        _schema_id: SchemaId,
455        _owner_id: UserId,
456        _payload: Vec<u8>,
457    ) -> Result<()> {
458        unreachable!()
459    }
460
461    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
462        unreachable!()
463    }
464
465    async fn drop_table(
466        &self,
467        source_id: Option<SourceId>,
468        table_id: TableId,
469        cascade: bool,
470    ) -> Result<()> {
471        if cascade {
472            return Err(ErrorCode::NotSupported(
473                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
474                "use drop instead".to_owned(),
475            )
476            .into());
477        }
478        if let Some(source_id) = source_id {
479            self.drop_table_or_source_id(source_id.as_raw_id());
480        }
481        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
482        let indexes =
483            self.catalog
484                .read()
485                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
486        for index in indexes {
487            self.drop_index(index.id, cascade).await?;
488        }
489        self.catalog
490            .write()
491            .drop_table(database_id, schema_id, table_id);
492        if let Some(source_id) = source_id {
493            self.catalog
494                .write()
495                .drop_source(database_id, schema_id, source_id);
496        }
497        Ok(())
498    }
499
500    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
501        unreachable!()
502    }
503
504    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
505        if cascade {
506            return Err(ErrorCode::NotSupported(
507                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
508                "use drop instead".to_owned(),
509            )
510            .into());
511        }
512        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
513        let indexes =
514            self.catalog
515                .read()
516                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
517        for index in indexes {
518            self.drop_index(index.id, cascade).await?;
519        }
520        self.catalog
521            .write()
522            .drop_table(database_id, schema_id, table_id);
523        Ok(())
524    }
525
526    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
527        if cascade {
528            return Err(ErrorCode::NotSupported(
529                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
530                "use drop instead".to_owned(),
531            )
532            .into());
533        }
534        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
535        self.catalog
536            .write()
537            .drop_source(database_id, schema_id, source_id);
538        Ok(())
539    }
540
541    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
542        Ok(())
543    }
544
545    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
546        if cascade {
547            return Err(ErrorCode::NotSupported(
548                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
549                "use drop instead".to_owned(),
550            )
551            .into());
552        }
553        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
554        self.catalog
555            .write()
556            .drop_sink(database_id, schema_id, sink_id);
557        Ok(())
558    }
559
560    async fn drop_subscription(
561        &self,
562        subscription_id: SubscriptionId,
563        cascade: bool,
564    ) -> Result<()> {
565        if cascade {
566            return Err(ErrorCode::NotSupported(
567                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
568                "use drop instead".to_owned(),
569            )
570            .into());
571        }
572        let (database_id, schema_id) =
573            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
574        self.catalog
575            .write()
576            .drop_subscription(database_id, schema_id, subscription_id);
577        Ok(())
578    }
579
580    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
581        if cascade {
582            return Err(ErrorCode::NotSupported(
583                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
584                "use drop instead".to_owned(),
585            )
586            .into());
587        }
588        let &schema_id = self
589            .table_id_to_schema_id
590            .read()
591            .get(&index_id.as_raw_id())
592            .unwrap();
593        let database_id = self.get_database_id_by_schema(schema_id);
594
595        let index = {
596            let catalog_reader = self.catalog.read();
597            let schema_catalog = catalog_reader
598                .get_schema_by_id(database_id, schema_id)
599                .unwrap();
600            schema_catalog.get_index_by_id(index_id).unwrap().clone()
601        };
602
603        let index_table_id = index.index_table().id;
604        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
605        self.catalog
606            .write()
607            .drop_index(database_id, schema_id, index_id);
608        self.catalog
609            .write()
610            .drop_table(database_id, schema_id, index_table_id);
611        Ok(())
612    }
613
614    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
615        unreachable!()
616    }
617
618    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
619        unreachable!()
620    }
621
622    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
623        unreachable!()
624    }
625
626    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
627        self.catalog.write().drop_database(database_id);
628        Ok(())
629    }
630
631    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
632        let database_id = self.drop_schema_id(schema_id);
633        self.catalog.write().drop_schema(database_id, schema_id);
634        Ok(())
635    }
636
637    async fn alter_name(
638        &self,
639        object_id: alter_name_request::Object,
640        object_name: &str,
641    ) -> Result<()> {
642        match object_id {
643            alter_name_request::Object::TableId(table_id) => {
644                self.catalog
645                    .write()
646                    .alter_table_name_by_id(table_id, object_name);
647                Ok(())
648            }
649            _ => {
650                unimplemented!()
651            }
652        }
653    }
654
655    async fn alter_source(&self, source: PbSource) -> Result<()> {
656        self.catalog.write().update_source(&source);
657        Ok(())
658    }
659
660    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
661        for database in self.catalog.read().iter_databases() {
662            for schema in database.iter_schemas() {
663                match object {
664                    Object::TableId(table_id) => {
665                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
666                        {
667                            let mut pb_table = table.to_prost();
668                            pb_table.owner = owner_id;
669                            self.catalog.write().update_table(&pb_table);
670                            return Ok(());
671                        }
672                    }
673                    _ => unreachable!(),
674                }
675            }
676        }
677
678        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
679    }
680
681    async fn alter_subscription_retention(
682        &self,
683        subscription_id: SubscriptionId,
684        retention_seconds: u64,
685        definition: String,
686    ) -> Result<()> {
687        for database in self.catalog.read().iter_databases() {
688            for schema in database.iter_schemas() {
689                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
690                    let mut pb_subscription = subscription.to_proto();
691                    pb_subscription.retention_seconds = retention_seconds;
692                    pb_subscription.definition = definition;
693                    self.catalog.write().update_subscription(&pb_subscription);
694                    return Ok(());
695                }
696            }
697        }
698
699        Err(
700            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
701                .into(),
702        )
703    }
704
705    async fn alter_set_schema(
706        &self,
707        object: alter_set_schema_request::Object,
708        new_schema_id: SchemaId,
709    ) -> Result<()> {
710        match object {
711            alter_set_schema_request::Object::TableId(table_id) => {
712                let mut pb_table = {
713                    let reader = self.catalog.read();
714                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
715                    table.to_prost()
716                };
717                pb_table.schema_id = new_schema_id;
718                self.catalog.write().update_table(&pb_table);
719                self.table_id_to_schema_id
720                    .write()
721                    .insert(table_id.as_raw_id(), new_schema_id);
722                Ok(())
723            }
724            _ => unreachable!(),
725        }
726    }
727
728    async fn alter_parallelism(
729        &self,
730        _job_id: JobId,
731        _parallelism: PbTableParallelism,
732        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
733        _deferred: bool,
734    ) -> Result<()> {
735        todo!()
736    }
737
738    async fn alter_backfill_parallelism(
739        &self,
740        _job_id: JobId,
741        _parallelism: Option<PbTableParallelism>,
742        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
743        _deferred: bool,
744    ) -> Result<()> {
745        todo!()
746    }
747
748    async fn alter_config(
749        &self,
750        _job_id: JobId,
751        _entries_to_add: HashMap<String, String>,
752        _keys_to_remove: Vec<String>,
753    ) -> Result<()> {
754        todo!()
755    }
756
757    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
758        todo!()
759    }
760
761    async fn alter_secret(
762        &self,
763        _secret_id: SecretId,
764        _secret_name: String,
765        _database_id: DatabaseId,
766        _schema_id: SchemaId,
767        _owner_id: UserId,
768        _payload: Vec<u8>,
769    ) -> Result<()> {
770        unreachable!()
771    }
772
773    async fn alter_resource_group(
774        &self,
775        _job_id: JobId,
776        _resource_group: Option<String>,
777        _deferred: bool,
778    ) -> Result<()> {
779        todo!()
780    }
781
782    async fn alter_database_resource_group(
783        &self,
784        database_id: DatabaseId,
785        resource_group: Option<String>,
786        _deferred: bool,
787    ) -> Result<()> {
788        let mut pb_database = {
789            let reader = self.catalog.read();
790            let database = reader.get_database_by_id(database_id)?.to_owned();
791            database.to_prost()
792        };
793        pb_database.resource_group =
794            resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned());
795        self.catalog.write().update_database(&pb_database);
796        Ok(())
797    }
798
799    async fn alter_database_param(
800        &self,
801        database_id: DatabaseId,
802        param: AlterDatabaseParam,
803    ) -> Result<()> {
804        let mut pb_database = {
805            let reader = self.catalog.read();
806            let database = reader.get_database_by_id(database_id)?.to_owned();
807            database.to_prost()
808        };
809        match param {
810            AlterDatabaseParam::BarrierIntervalMs(interval) => {
811                pb_database.barrier_interval_ms = interval;
812            }
813            AlterDatabaseParam::CheckpointFrequency(frequency) => {
814                pb_database.checkpoint_frequency = frequency;
815            }
816        }
817        self.catalog.write().update_database(&pb_database);
818        Ok(())
819    }
820
821    async fn create_iceberg_table(
822        &self,
823        _table_job_info: PbTableJobInfo,
824        _sink_job_info: PbSinkJobInfo,
825        _iceberg_source: PbSource,
826        _if_not_exists: bool,
827    ) -> Result<()> {
828        todo!()
829    }
830
831    async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
832        Ok(())
833    }
834}
835
836impl MockCatalogWriter {
837    pub fn new(
838        catalog: Arc<RwLock<Catalog>>,
839        hummock_snapshot_manager: HummockSnapshotManagerRef,
840    ) -> Self {
841        catalog.write().create_database(&PbDatabase {
842            id: 0.into(),
843            name: DEFAULT_DATABASE_NAME.to_owned(),
844            owner: DEFAULT_SUPER_USER_ID,
845            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
846            barrier_interval_ms: None,
847            checkpoint_frequency: None,
848        });
849        catalog.write().create_schema(&PbSchema {
850            id: 1.into(),
851            name: DEFAULT_SCHEMA_NAME.to_owned(),
852            database_id: 0.into(),
853            owner: DEFAULT_SUPER_USER_ID,
854        });
855        catalog.write().create_schema(&PbSchema {
856            id: 2.into(),
857            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
858            database_id: 0.into(),
859            owner: DEFAULT_SUPER_USER_ID,
860        });
861        catalog.write().create_schema(&PbSchema {
862            id: 3.into(),
863            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
864            database_id: 0.into(),
865            owner: DEFAULT_SUPER_USER_ID,
866        });
867        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
868        map.insert(1_u32.into(), 0_u32.into());
869        map.insert(2_u32.into(), 0_u32.into());
870        map.insert(3_u32.into(), 0_u32.into());
871        Self {
872            catalog,
873            id: AtomicU32::new(3),
874            table_id_to_schema_id: Default::default(),
875            schema_id_to_database_id: RwLock::new(map),
876            hummock_snapshot_manager,
877        }
878    }
879
880    fn gen_id<T: From<u32>>(&self) -> T {
881        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
882        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
883    }
884
885    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
886        self.table_id_to_schema_id
887            .write()
888            .insert(table_id, schema_id);
889    }
890
891    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
892        let schema_id = self
893            .table_id_to_schema_id
894            .write()
895            .remove(&table_id)
896            .unwrap();
897        (self.get_database_id_by_schema(schema_id), schema_id)
898    }
899
900    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
901        self.table_id_to_schema_id
902            .write()
903            .insert(table_id, schema_id);
904    }
905
906    fn add_table_or_subscription_id(
907        &self,
908        table_id: u32,
909        schema_id: SchemaId,
910        _database_id: DatabaseId,
911    ) {
912        self.table_id_to_schema_id
913            .write()
914            .insert(table_id, schema_id);
915    }
916
917    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
918        self.table_id_to_schema_id
919            .write()
920            .insert(table_id, schema_id);
921    }
922
923    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
924        let schema_id = self
925            .table_id_to_schema_id
926            .write()
927            .remove(&table_id)
928            .unwrap();
929        (self.get_database_id_by_schema(schema_id), schema_id)
930    }
931
932    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
933        let schema_id = self
934            .table_id_to_schema_id
935            .write()
936            .remove(&table_id)
937            .unwrap();
938        (self.get_database_id_by_schema(schema_id), schema_id)
939    }
940
941    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
942        let schema_id = self
943            .table_id_to_schema_id
944            .write()
945            .remove(&table_id)
946            .unwrap();
947        (self.get_database_id_by_schema(schema_id), schema_id)
948    }
949
950    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
951        self.schema_id_to_database_id
952            .write()
953            .insert(schema_id, database_id);
954    }
955
956    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
957        self.schema_id_to_database_id
958            .write()
959            .remove(&schema_id)
960            .unwrap()
961    }
962
963    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
964        source.id = self.gen_id();
965        self.catalog.write().create_source(&source);
966        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
967        Ok(source.id)
968    }
969
970    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
971        sink.id = self.gen_id();
972        sink.stream_job_status = PbStreamJobStatus::Created as _;
973        self.catalog.write().create_sink(&sink);
974        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
975        Ok(sink.id)
976    }
977
978    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
979        subscription.id = self.gen_id();
980        self.catalog.write().create_subscription(&subscription);
981        self.add_table_or_subscription_id(
982            subscription.id.as_raw_id(),
983            subscription.schema_id,
984            subscription.database_id,
985        );
986        Ok(())
987    }
988
989    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
990        *self
991            .schema_id_to_database_id
992            .read()
993            .get(&schema_id)
994            .unwrap()
995    }
996
997    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
998        let catalog = self.catalog.read();
999        for database in catalog.iter_databases() {
1000            for schema in database.iter_schemas() {
1001                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
1002                    return if table.is_mview() {
1003                        PbObjectType::Mview
1004                    } else {
1005                        PbObjectType::Table
1006                    };
1007                }
1008                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
1009                    return PbObjectType::Source;
1010                }
1011                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
1012                    return PbObjectType::View;
1013                }
1014                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
1015                    return PbObjectType::Index;
1016                }
1017            }
1018        }
1019        PbObjectType::Unspecified
1020    }
1021
1022    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1023        if dependencies.is_empty() {
1024            return;
1025        }
1026        let dependencies = dependencies
1027            .into_iter()
1028            .map(|referenced_object_id| PbObjectDependency {
1029                object_id,
1030                referenced_object_id,
1031                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1032            })
1033            .collect();
1034        self.catalog
1035            .write()
1036            .insert_object_dependencies(dependencies);
1037    }
1038}
1039
1040pub struct MockUserInfoWriter {
1041    id: AtomicU32,
1042    user_info: Arc<RwLock<UserInfoManager>>,
1043}
1044
1045#[async_trait::async_trait]
1046impl UserInfoWriter for MockUserInfoWriter {
1047    async fn create_user(&self, user: UserInfo) -> Result<()> {
1048        let mut user = user;
1049        user.id = self.gen_id().into();
1050        self.user_info.write().create_user(user);
1051        Ok(())
1052    }
1053
1054    async fn drop_user(&self, id: UserId) -> Result<()> {
1055        self.user_info.write().drop_user(id);
1056        Ok(())
1057    }
1058
1059    async fn update_user(
1060        &self,
1061        update_user: UserInfo,
1062        update_fields: Vec<UpdateField>,
1063    ) -> Result<()> {
1064        let mut lock = self.user_info.write();
1065        let id = update_user.get_id();
1066        let Some(old_name) = lock.get_user_name_by_id(id) else {
1067            return Ok(());
1068        };
1069        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1070        update_fields.into_iter().for_each(|field| match field {
1071            UpdateField::Super => user_info.is_super = update_user.is_super,
1072            UpdateField::Login => user_info.can_login = update_user.can_login,
1073            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1074            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1075            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1076            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1077            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1078            UpdateField::Unspecified => unreachable!(),
1079        });
1080        lock.update_user(update_user);
1081        Ok(())
1082    }
1083
1084    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1085    /// `GrantAllSources` when grant privilege to user.
1086    async fn grant_privilege(
1087        &self,
1088        users: Vec<UserId>,
1089        privileges: Vec<GrantPrivilege>,
1090        with_grant_option: bool,
1091        _grantor: UserId,
1092    ) -> Result<()> {
1093        let privileges = privileges
1094            .into_iter()
1095            .map(|mut p| {
1096                p.action_with_opts
1097                    .iter_mut()
1098                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1099                p
1100            })
1101            .collect::<Vec<_>>();
1102        for user_id in users {
1103            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1104                u.extend_privileges(privileges.clone());
1105            }
1106        }
1107        Ok(())
1108    }
1109
1110    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1111    /// `RevokeAllSources` when revoke privilege from user.
1112    async fn revoke_privilege(
1113        &self,
1114        users: Vec<UserId>,
1115        privileges: Vec<GrantPrivilege>,
1116        _granted_by: UserId,
1117        _revoke_by: UserId,
1118        revoke_grant_option: bool,
1119        _cascade: bool,
1120    ) -> Result<()> {
1121        for user_id in users {
1122            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1123                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1124            }
1125        }
1126        Ok(())
1127    }
1128
1129    async fn alter_default_privilege(
1130        &self,
1131        _users: Vec<UserId>,
1132        _database_id: DatabaseId,
1133        _schemas: Vec<SchemaId>,
1134        _operation: AlterDefaultPrivilegeOperation,
1135        _operated_by: UserId,
1136    ) -> Result<()> {
1137        todo!()
1138    }
1139}
1140
1141impl MockUserInfoWriter {
1142    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1143        user_info.write().create_user(UserInfo {
1144            id: DEFAULT_SUPER_USER_ID,
1145            name: DEFAULT_SUPER_USER.to_owned(),
1146            is_super: true,
1147            can_create_db: true,
1148            can_create_user: true,
1149            can_login: true,
1150            ..Default::default()
1151        });
1152        user_info.write().create_user(UserInfo {
1153            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1154            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1155            is_super: true,
1156            can_create_db: true,
1157            can_create_user: true,
1158            can_login: true,
1159            is_admin: true,
1160            ..Default::default()
1161        });
1162        Self {
1163            user_info,
1164            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1165        }
1166    }
1167
1168    fn gen_id(&self) -> u32 {
1169        self.id.fetch_add(1, Ordering::SeqCst)
1170    }
1171}
1172
1173pub struct MockFrontendMetaClient {}
1174
1175#[async_trait::async_trait]
1176impl FrontendMetaClient for MockFrontendMetaClient {
1177    async fn try_unregister(&self) {}
1178
1179    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1180        Ok(INVALID_VERSION_ID)
1181    }
1182
1183    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1184        Ok(vec![])
1185    }
1186
1187    async fn list_table_fragments(
1188        &self,
1189        _table_ids: &[JobId],
1190    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1191        Ok(HashMap::default())
1192    }
1193
1194    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1195        Ok(vec![])
1196    }
1197
1198    async fn list_fragment_distribution(
1199        &self,
1200        _include_node: bool,
1201    ) -> RpcResult<Vec<FragmentDistribution>> {
1202        Ok(vec![])
1203    }
1204
1205    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1206        Ok(vec![])
1207    }
1208
1209    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1210        Ok(vec![])
1211    }
1212
1213    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1214        Ok(vec![])
1215    }
1216
1217    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1218        Ok(vec![])
1219    }
1220
1221    async fn list_sink_log_store_tables(
1222        &self,
1223    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1224        Ok(vec![])
1225    }
1226
1227    async fn set_system_param(
1228        &self,
1229        _param: String,
1230        _value: Option<String>,
1231    ) -> RpcResult<Option<SystemParamsReader>> {
1232        Ok(Some(SystemParams::default().into()))
1233    }
1234
1235    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1236        Ok(Default::default())
1237    }
1238
1239    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1240        Ok("".to_owned())
1241    }
1242
1243    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1244        Ok(vec![])
1245    }
1246
1247    async fn get_tables(
1248        &self,
1249        _table_ids: Vec<crate::catalog::TableId>,
1250        _include_dropped_tables: bool,
1251    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1252        Ok(HashMap::new())
1253    }
1254
1255    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1256        unimplemented!()
1257    }
1258
1259    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1260        unimplemented!()
1261    }
1262
1263    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1264        Ok(HummockVersion::default())
1265    }
1266
1267    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1268        unimplemented!()
1269    }
1270
1271    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1272        unimplemented!()
1273    }
1274
1275    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1276        unimplemented!()
1277    }
1278
1279    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1280        unimplemented!()
1281    }
1282
1283    async fn list_hummock_active_write_limits(
1284        &self,
1285    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1286        unimplemented!()
1287    }
1288
1289    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1290        unimplemented!()
1291    }
1292
1293    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1294        Ok(vec![])
1295    }
1296
1297    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1298        unimplemented!()
1299    }
1300
1301    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1302        Ok(vec![])
1303    }
1304
1305    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1306        unimplemented!()
1307    }
1308
1309    async fn recover(&self) -> RpcResult<()> {
1310        unimplemented!()
1311    }
1312
1313    async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1314        unimplemented!()
1315    }
1316
1317    async fn get_backup_job_status(
1318        &self,
1319        _job_id: u64,
1320    ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1321        unimplemented!()
1322    }
1323
1324    async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1325        unimplemented!()
1326    }
1327
1328    async fn apply_throttle(
1329        &self,
1330        _throttle_target: PbThrottleTarget,
1331        _throttle_type: risingwave_pb::common::PbThrottleType,
1332        _id: u32,
1333        _rate_limit: Option<u32>,
1334    ) -> RpcResult<()> {
1335        unimplemented!()
1336    }
1337
1338    async fn alter_fragment_parallelism(
1339        &self,
1340        _fragment_ids: Vec<FragmentId>,
1341        _parallelism: Option<PbTableParallelism>,
1342    ) -> RpcResult<()> {
1343        unimplemented!()
1344    }
1345
1346    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1347        Ok(RecoveryStatus::StatusRunning)
1348    }
1349
1350    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1351        Ok(vec![])
1352    }
1353
1354    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1355        Ok(vec![])
1356    }
1357
1358    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1359        Ok(HashMap::default())
1360    }
1361
1362    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1363        unimplemented!()
1364    }
1365
1366    async fn alter_sink_props(
1367        &self,
1368        _sink_id: SinkId,
1369        _changed_props: BTreeMap<String, String>,
1370        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1371        _connector_conn_ref: Option<ConnectionId>,
1372    ) -> RpcResult<()> {
1373        unimplemented!()
1374    }
1375
1376    async fn alter_iceberg_table_props(
1377        &self,
1378        _table_id: TableId,
1379        _sink_id: SinkId,
1380        _source_id: SourceId,
1381        _changed_props: BTreeMap<String, String>,
1382        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1383        _connector_conn_ref: Option<ConnectionId>,
1384    ) -> RpcResult<()> {
1385        unimplemented!()
1386    }
1387
1388    async fn alter_source_connector_props(
1389        &self,
1390        _source_id: SourceId,
1391        _changed_props: BTreeMap<String, String>,
1392        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1393        _connector_conn_ref: Option<ConnectionId>,
1394    ) -> RpcResult<()> {
1395        unimplemented!()
1396    }
1397
1398    async fn alter_connection_connector_props(
1399        &self,
1400        _connection_id: u32,
1401        _changed_props: BTreeMap<String, String>,
1402        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1403    ) -> RpcResult<()> {
1404        Ok(())
1405    }
1406
1407    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1408        unimplemented!()
1409    }
1410
1411    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1412        Ok(vec![])
1413    }
1414
1415    async fn get_fragment_by_id(
1416        &self,
1417        _fragment_id: FragmentId,
1418    ) -> RpcResult<Option<FragmentDistribution>> {
1419        unimplemented!()
1420    }
1421
1422    async fn get_fragment_vnodes(
1423        &self,
1424        _fragment_id: FragmentId,
1425    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1426        unimplemented!()
1427    }
1428
1429    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1430        unimplemented!()
1431    }
1432
1433    fn worker_id(&self) -> WorkerId {
1434        0.into()
1435    }
1436
1437    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1438        Ok(())
1439    }
1440
1441    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1442        Ok(1)
1443    }
1444
1445    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1446        Ok(())
1447    }
1448
1449    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1450        Ok(RefreshResponse { status: None })
1451    }
1452
1453    fn cluster_id(&self) -> &str {
1454        "test-cluster-uuid"
1455    }
1456
1457    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1458        unimplemented!()
1459    }
1460
1461    async fn get_hummock_table_change_log(
1462        &self,
1463        _start_epoch_inclusive: Option<u64>,
1464        _end_epoch_inclusive: Option<u64>,
1465        _table_ids: Option<HashSet<TableId>>,
1466        _exclude_empty: bool,
1467        _limit: Option<u32>,
1468    ) -> RpcResult<TableChangeLogs> {
1469        Ok(HashMap::default())
1470    }
1471
1472    async fn update_compaction_config(
1473        &self,
1474        _compaction_group_ids: Vec<risingwave_hummock_sdk::CompactionGroupId>,
1475        _configs: Vec<PbMutableConfig>,
1476    ) -> RpcResult<()> {
1477        Ok(())
1478    }
1479}
1480
1481#[cfg(test)]
1482pub static PROTO_FILE_DATA: &str = r#"
1483    syntax = "proto3";
1484    package test;
1485    message TestRecord {
1486      int32 id = 1;
1487      Country country = 3;
1488      int64 zipcode = 4;
1489      float rate = 5;
1490    }
1491    message TestRecordAlterType {
1492        string id = 1;
1493        Country country = 3;
1494        int32 zipcode = 4;
1495        float rate = 5;
1496      }
1497    message TestRecordExt {
1498      int32 id = 1;
1499      Country country = 3;
1500      int64 zipcode = 4;
1501      float rate = 5;
1502      string name = 6;
1503    }
1504    message Country {
1505      string address = 1;
1506      City city = 2;
1507      string zipcode = 3;
1508    }
1509    message City {
1510      string address = 1;
1511      string zipcode = 2;
1512    }"#;
1513
1514/// Returns the file.
1515/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1516pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1517    let in_file = Builder::new()
1518        .prefix("temp")
1519        .suffix(".proto")
1520        .rand_bytes(8)
1521        .tempfile()
1522        .unwrap();
1523
1524    let out_file = Builder::new()
1525        .prefix("temp")
1526        .suffix(".pb")
1527        .rand_bytes(8)
1528        .tempfile()
1529        .unwrap();
1530
1531    let mut file = in_file.as_file();
1532    file.write_all(proto_data.as_ref())
1533        .expect("writing binary to test file");
1534    file.flush().expect("flush temp file failed");
1535    let include_path = in_file
1536        .path()
1537        .parent()
1538        .unwrap()
1539        .to_string_lossy()
1540        .into_owned();
1541    let out_path = out_file.path().to_string_lossy().into_owned();
1542    let in_path = in_file.path().to_string_lossy().into_owned();
1543    let mut compile = std::process::Command::new("protoc");
1544
1545    let out = compile
1546        .arg("--include_imports")
1547        .arg("-I")
1548        .arg(include_path)
1549        .arg(format!("--descriptor_set_out={}", out_path))
1550        .arg(in_path)
1551        .output()
1552        .expect("failed to compile proto");
1553    if !out.status.success() {
1554        panic!("compile proto failed \n output: {:?}", out);
1555    }
1556    out_file
1557}