Skip to main content

risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::config::FrontendConfig;
34use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
35use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
36use risingwave_common::session_config::SessionConfig;
37use risingwave_common::system_param::AdaptiveParallelismStrategy;
38use risingwave_common::system_param::reader::SystemParamsReader;
39use risingwave_common::util::cluster_limit::ClusterLimit;
40use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
41use risingwave_hummock_sdk::change_log::TableChangeLogs;
42use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
43use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
44use risingwave_pb::backup_service::MetaSnapshotMetadata;
45use risingwave_pb::catalog::{
46    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
47    PbSubscription, PbTable, PbView, Table,
48};
49use risingwave_pb::common::{PbObjectType, WorkerNode};
50use risingwave_pb::ddl_service::alter_owner_request::Object;
51use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
52use risingwave_pb::ddl_service::{
53    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
54    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
55};
56use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig as PbMutableConfig;
57use risingwave_pb::hummock::write_limits::WriteLimit;
58use risingwave_pb::hummock::{
59    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
60};
61use risingwave_pb::id::ActorId;
62use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
63use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
64use risingwave_pb::meta::list_actor_states_response::ActorState;
65use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
66use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
67use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
68use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
69use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
70use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
71use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
72use risingwave_pb::meta::{
73    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
74    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
75    list_sink_log_store_tables_response,
76};
77use risingwave_pb::secret::PbSecretRef;
78use risingwave_pb::stream_plan::StreamFragmentGraph;
79use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
80use risingwave_pb::user::update_user_request::UpdateField;
81use risingwave_pb::user::{GrantPrivilege, UserInfo};
82use risingwave_rpc_client::error::Result as RpcResult;
83use tempfile::{Builder, NamedTempFile};
84
85use crate::FrontendOpts;
86use crate::catalog::catalog_service::CatalogWriter;
87use crate::catalog::root_catalog::Catalog;
88use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
89use crate::error::{ErrorCode, Result, RwError};
90use crate::handler::RwPgResponse;
91use crate::meta_client::FrontendMetaClient;
92use crate::scheduler::HummockSnapshotManagerRef;
93use crate::session::{AuthContext, FrontendEnv, SessionImpl};
94use crate::user::UserId;
95use crate::user::user_manager::UserInfoManager;
96use crate::user::user_service::UserInfoWriter;
97
/// An embedded frontend without starting meta and without starting frontend as a tcp server.
///
/// Sessions are created on demand through [`LocalFrontend::session_ref`] and
/// [`LocalFrontend::session_user_ref`]; each of them receives a clone of `env`.
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Frontend environment that is cloned into every session created by this frontend.
    env: FrontendEnv,
}
103
/// Minimal [`SessionManager`] implementation for [`LocalFrontend`].
///
/// Only `connect` is functional; every other method panics with `unreachable!()`.
impl SessionManager for LocalFrontend {
    type Error = RwError;
    type Session = SessionImpl;

    /// Not supported by the local frontend: always panics.
    fn create_dummy_session(
        &self,
        _database_id: DatabaseId,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        unreachable!()
    }

    /// Returns a fresh session from [`LocalFrontend::session_ref`].
    ///
    /// The requested database, user name and peer address are all ignored.
    fn connect(
        &self,
        _database: &str,
        _user_name: &str,
        _peer_addr: AddressRef,
    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
        Ok(self.session_ref())
    }

    /// Not supported by the local frontend: always panics.
    fn cancel_queries_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported by the local frontend: always panics.
    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
        unreachable!()
    }

    /// Not supported by the local frontend: always panics.
    fn end_session(&self, _session: &Self::Session) {
        unreachable!()
    }
}
136
137impl LocalFrontend {
138    #[expect(clippy::unused_async)]
139    pub async fn new(opts: FrontendOpts) -> Self {
140        let env = FrontendEnv::mock();
141        Self { opts, env }
142    }
143
144    #[expect(clippy::unused_async)]
145    pub async fn with_frontend_config(opts: FrontendOpts, frontend_config: FrontendConfig) -> Self {
146        let mut env = FrontendEnv::mock();
147        env.set_frontend_config_for_test(frontend_config);
148        Self { opts, env }
149    }
150
151    pub async fn run_sql(
152        &self,
153        sql: impl Into<String>,
154    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
155        let sql: Arc<str> = Arc::from(sql.into());
156        Box::pin(self.session_ref().run_statement(sql, vec![])).await
157    }
158
159    pub async fn run_sql_with_session(
160        &self,
161        session_ref: Arc<SessionImpl>,
162        sql: impl Into<String>,
163    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
164        let sql: Arc<str> = Arc::from(sql.into());
165        Box::pin(session_ref.run_statement(sql, vec![])).await
166    }
167
168    pub async fn run_user_sql(
169        &self,
170        sql: impl Into<String>,
171        database: String,
172        user_name: String,
173        user_id: UserId,
174    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
175        let sql: Arc<str> = Arc::from(sql.into());
176        Box::pin(
177            self.session_user_ref(database, user_name, user_id)
178                .run_statement(sql, vec![]),
179        )
180        .await
181    }
182
183    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
184        let mut rsp = self.run_sql(sql).await.unwrap();
185        let mut res = vec![];
186        #[for_await]
187        for row_set in rsp.values_stream() {
188            for row in row_set.unwrap() {
189                res.push(format!("{:?}", row));
190            }
191        }
192        res
193    }
194
195    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
196        let mut rsp = self.run_sql(sql).await.unwrap();
197        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
198        let mut res = String::new();
199        #[for_await]
200        for row_set in rsp.values_stream() {
201            for row in row_set.unwrap() {
202                let row: Row = row;
203                let row = row.values()[0].as_ref().unwrap();
204                res += std::str::from_utf8(row).unwrap();
205                res += "\n";
206            }
207        }
208        res
209    }
210
211    /// Creates a new session
212    pub fn session_ref(&self) -> Arc<SessionImpl> {
213        self.session_user_ref(
214            DEFAULT_DATABASE_NAME.to_owned(),
215            DEFAULT_SUPER_USER.to_owned(),
216            DEFAULT_SUPER_USER_ID,
217        )
218    }
219
220    pub fn session_user_ref(
221        &self,
222        database: String,
223        user_name: String,
224        user_id: UserId,
225    ) -> Arc<SessionImpl> {
226        Arc::new(SessionImpl::new(
227            self.env.clone(),
228            AuthContext::new(database, user_name, user_id),
229            UserAuthenticator::None,
230            // Local Frontend use a non-sense id.
231            (0, 0),
232            Address::Tcp(SocketAddr::new(
233                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
234                6666,
235            ))
236            .into(),
237            Default::default(),
238        ))
239    }
240}
241
242pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
243    if rsp.stmt_type() != StatementType::EXPLAIN {
244        panic!("RESPONSE INVALID: {rsp:?}");
245    }
246    let mut res = String::new();
247    #[for_await]
248    for row_set in rsp.values_stream() {
249        for row in row_set.unwrap() {
250            let row: Row = row;
251            let row = row.values()[0].as_ref().unwrap();
252            res += std::str::from_utf8(row).unwrap();
253            res += "\n";
254        }
255    }
256    res
257}
258
/// A [`CatalogWriter`] for tests that applies catalog changes directly to a shared
/// [`Catalog`].
pub struct MockCatalogWriter {
    /// The shared catalog that every operation reads from and writes to.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter backing `gen_id`; holds the most recently handed-out id.
    id: AtomicU32,
    /// Maps the raw id of a registered catalog object to the schema that contains it.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Maps each known schema to the database that contains it.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Told about each newly created materialized view / table via `add_table_for_test`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
266
267#[async_trait::async_trait]
268impl CatalogWriter for MockCatalogWriter {
269    async fn create_database(
270        &self,
271        db_name: &str,
272        owner: UserId,
273        resource_group: &str,
274        barrier_interval_ms: Option<u32>,
275        checkpoint_frequency: Option<u64>,
276    ) -> Result<()> {
277        let database_id = DatabaseId::new(self.gen_id());
278        self.catalog.write().create_database(&PbDatabase {
279            name: db_name.to_owned(),
280            id: database_id,
281            owner,
282            resource_group: resource_group.to_owned(),
283            barrier_interval_ms,
284            checkpoint_frequency,
285        });
286        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
287            .await?;
288        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
289            .await?;
290        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
291            .await?;
292        Ok(())
293    }
294
295    async fn create_schema(
296        &self,
297        db_id: DatabaseId,
298        schema_name: &str,
299        owner: UserId,
300    ) -> Result<()> {
301        let id = self.gen_id();
302        self.catalog.write().create_schema(&PbSchema {
303            id,
304            name: schema_name.to_owned(),
305            database_id: db_id,
306            owner,
307        });
308        self.add_schema_id(id, db_id);
309        Ok(())
310    }
311
312    async fn create_materialized_view(
313        &self,
314        mut table: PbTable,
315        _graph: StreamFragmentGraph,
316        dependencies: HashSet<ObjectId>,
317        _resource_type: streaming_job_resource_type::ResourceType,
318        _if_not_exists: bool,
319        _refresh_interval_sec: Option<u64>,
320    ) -> Result<()> {
321        table.id = self.gen_id();
322        table.stream_job_status = PbStreamJobStatus::Created as _;
323        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
324        self.catalog.write().create_table(&table);
325        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
326        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
327        self.hummock_snapshot_manager.add_table_for_test(table.id);
328        Ok(())
329    }
330
331    async fn replace_materialized_view(
332        &self,
333        mut table: PbTable,
334        _graph: StreamFragmentGraph,
335    ) -> Result<()> {
336        table.stream_job_status = PbStreamJobStatus::Created as _;
337        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
338        self.catalog.write().update_table(&table);
339        Ok(())
340    }
341
342    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
343        view.id = self.gen_id();
344        self.catalog.write().create_view(&view);
345        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
346        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
347        Ok(())
348    }
349
350    async fn create_table(
351        &self,
352        source: Option<PbSource>,
353        mut table: PbTable,
354        graph: StreamFragmentGraph,
355        _job_type: PbTableJobType,
356        if_not_exists: bool,
357        dependencies: HashSet<ObjectId>,
358    ) -> Result<()> {
359        if let Some(source) = source {
360            let source_id = self.create_source_inner(source)?;
361            table.optional_associated_source_id = Some(source_id.into());
362        }
363        self.create_materialized_view(
364            table,
365            graph,
366            dependencies,
367            streaming_job_resource_type::ResourceType::Regular(true),
368            if_not_exists,
369            None,
370        )
371        .await?;
372        Ok(())
373    }
374
375    async fn replace_table(
376        &self,
377        _source: Option<PbSource>,
378        mut table: PbTable,
379        _graph: StreamFragmentGraph,
380        _job_type: TableJobType,
381    ) -> Result<()> {
382        table.stream_job_status = PbStreamJobStatus::Created as _;
383        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
384        self.catalog.write().update_table(&table);
385        Ok(())
386    }
387
388    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
389        self.catalog.write().update_source(&source);
390        Ok(())
391    }
392
393    async fn create_source(
394        &self,
395        source: PbSource,
396        _graph: Option<StreamFragmentGraph>,
397        _if_not_exists: bool,
398    ) -> Result<()> {
399        self.create_source_inner(source).map(|_| ())
400    }
401
402    async fn create_sink(
403        &self,
404        sink: PbSink,
405        graph: StreamFragmentGraph,
406        dependencies: HashSet<ObjectId>,
407        _resource_type: streaming_job_resource_type::ResourceType,
408        _if_not_exists: bool,
409    ) -> Result<()> {
410        let sink_id = self.create_sink_inner(sink, graph)?;
411        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
412        Ok(())
413    }
414
415    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
416        self.create_subscription_inner(subscription)
417    }
418
419    async fn create_index(
420        &self,
421        mut index: PbIndex,
422        mut index_table: PbTable,
423        _graph: StreamFragmentGraph,
424        _resource_type: streaming_job_resource_type::ResourceType,
425        _if_not_exists: bool,
426    ) -> Result<()> {
427        index_table.id = self.gen_id();
428        index_table.stream_job_status = PbStreamJobStatus::Created as _;
429        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
430        self.catalog.write().create_table(&index_table);
431        self.add_table_or_index_id(
432            index_table.id.as_raw_id(),
433            index_table.schema_id,
434            index_table.database_id,
435        );
436
437        index.id = index_table.id.as_raw_id().into();
438        index.index_table_id = index_table.id;
439        self.catalog.write().create_index(&index);
440        Ok(())
441    }
442
443    async fn create_function(&self, _function: PbFunction) -> Result<()> {
444        unreachable!()
445    }
446
447    async fn create_connection(
448        &self,
449        _connection_name: String,
450        _database_id: DatabaseId,
451        _schema_id: SchemaId,
452        _owner_id: UserId,
453        _connection: create_connection_request::Payload,
454    ) -> Result<()> {
455        unreachable!()
456    }
457
458    async fn create_secret(
459        &self,
460        _secret_name: String,
461        _database_id: DatabaseId,
462        _schema_id: SchemaId,
463        _owner_id: UserId,
464        _payload: Vec<u8>,
465    ) -> Result<()> {
466        unreachable!()
467    }
468
469    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
470        unreachable!()
471    }
472
473    async fn drop_table(
474        &self,
475        source_id: Option<SourceId>,
476        table_id: TableId,
477        cascade: bool,
478    ) -> Result<()> {
479        if cascade {
480            return Err(ErrorCode::NotSupported(
481                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
482                "use drop instead".to_owned(),
483            )
484            .into());
485        }
486        if let Some(source_id) = source_id {
487            self.drop_table_or_source_id(source_id.as_raw_id());
488        }
489        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
490        let indexes =
491            self.catalog
492                .read()
493                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
494        for index in indexes {
495            self.drop_index(index.id, cascade).await?;
496        }
497        self.catalog
498            .write()
499            .drop_table(database_id, schema_id, table_id);
500        if let Some(source_id) = source_id {
501            self.catalog
502                .write()
503                .drop_source(database_id, schema_id, source_id);
504        }
505        Ok(())
506    }
507
508    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
509        unreachable!()
510    }
511
512    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
513        if cascade {
514            return Err(ErrorCode::NotSupported(
515                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
516                "use drop instead".to_owned(),
517            )
518            .into());
519        }
520        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
521        let indexes =
522            self.catalog
523                .read()
524                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
525        for index in indexes {
526            self.drop_index(index.id, cascade).await?;
527        }
528        self.catalog
529            .write()
530            .drop_table(database_id, schema_id, table_id);
531        Ok(())
532    }
533
534    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
535        if cascade {
536            return Err(ErrorCode::NotSupported(
537                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
538                "use drop instead".to_owned(),
539            )
540            .into());
541        }
542        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
543        self.catalog
544            .write()
545            .drop_source(database_id, schema_id, source_id);
546        Ok(())
547    }
548
549    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
550        Ok(())
551    }
552
553    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
554        if cascade {
555            return Err(ErrorCode::NotSupported(
556                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
557                "use drop instead".to_owned(),
558            )
559            .into());
560        }
561        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
562        self.catalog
563            .write()
564            .drop_sink(database_id, schema_id, sink_id);
565        Ok(())
566    }
567
568    async fn drop_subscription(
569        &self,
570        subscription_id: SubscriptionId,
571        cascade: bool,
572    ) -> Result<()> {
573        if cascade {
574            return Err(ErrorCode::NotSupported(
575                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
576                "use drop instead".to_owned(),
577            )
578            .into());
579        }
580        let (database_id, schema_id) =
581            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
582        self.catalog
583            .write()
584            .drop_subscription(database_id, schema_id, subscription_id);
585        Ok(())
586    }
587
588    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
589        if cascade {
590            return Err(ErrorCode::NotSupported(
591                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
592                "use drop instead".to_owned(),
593            )
594            .into());
595        }
596        let &schema_id = self
597            .table_id_to_schema_id
598            .read()
599            .get(&index_id.as_raw_id())
600            .unwrap();
601        let database_id = self.get_database_id_by_schema(schema_id);
602
603        let index = {
604            let catalog_reader = self.catalog.read();
605            let schema_catalog = catalog_reader
606                .get_schema_by_id(database_id, schema_id)
607                .unwrap();
608            schema_catalog.get_index_by_id(index_id).unwrap().clone()
609        };
610
611        let index_table_id = index.index_table().id;
612        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
613        self.catalog
614            .write()
615            .drop_index(database_id, schema_id, index_id);
616        self.catalog
617            .write()
618            .drop_table(database_id, schema_id, index_table_id);
619        Ok(())
620    }
621
622    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
623        unreachable!()
624    }
625
626    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
627        unreachable!()
628    }
629
630    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
631        unreachable!()
632    }
633
634    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
635        self.catalog.write().drop_database(database_id);
636        Ok(())
637    }
638
639    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
640        let database_id = self.drop_schema_id(schema_id);
641        self.catalog.write().drop_schema(database_id, schema_id);
642        Ok(())
643    }
644
645    async fn alter_name(
646        &self,
647        object_id: alter_name_request::Object,
648        object_name: &str,
649    ) -> Result<()> {
650        match object_id {
651            alter_name_request::Object::TableId(table_id) => {
652                self.catalog
653                    .write()
654                    .alter_table_name_by_id(table_id, object_name);
655                Ok(())
656            }
657            _ => {
658                unimplemented!()
659            }
660        }
661    }
662
663    async fn alter_source(&self, source: PbSource) -> Result<()> {
664        self.catalog.write().update_source(&source);
665        Ok(())
666    }
667
668    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
669        for database in self.catalog.read().iter_databases() {
670            for schema in database.iter_schemas() {
671                match object {
672                    Object::TableId(table_id) => {
673                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
674                        {
675                            let mut pb_table = table.to_prost();
676                            pb_table.owner = owner_id;
677                            self.catalog.write().update_table(&pb_table);
678                            return Ok(());
679                        }
680                    }
681                    _ => unreachable!(),
682                }
683            }
684        }
685
686        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
687    }
688
689    async fn alter_subscription_retention(
690        &self,
691        subscription_id: SubscriptionId,
692        retention_seconds: u64,
693        definition: String,
694    ) -> Result<()> {
695        for database in self.catalog.read().iter_databases() {
696            for schema in database.iter_schemas() {
697                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
698                    let mut pb_subscription = subscription.to_proto();
699                    pb_subscription.retention_seconds = retention_seconds;
700                    pb_subscription.definition = definition;
701                    self.catalog.write().update_subscription(&pb_subscription);
702                    return Ok(());
703                }
704            }
705        }
706
707        Err(
708            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
709                .into(),
710        )
711    }
712
713    async fn alter_set_schema(
714        &self,
715        object: alter_set_schema_request::Object,
716        new_schema_id: SchemaId,
717    ) -> Result<()> {
718        match object {
719            alter_set_schema_request::Object::TableId(table_id) => {
720                let mut pb_table = {
721                    let reader = self.catalog.read();
722                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
723                    table.to_prost()
724                };
725                pb_table.schema_id = new_schema_id;
726                self.catalog.write().update_table(&pb_table);
727                self.table_id_to_schema_id
728                    .write()
729                    .insert(table_id.as_raw_id(), new_schema_id);
730                Ok(())
731            }
732            _ => unreachable!(),
733        }
734    }
735
736    async fn alter_parallelism(
737        &self,
738        _job_id: JobId,
739        _parallelism: PbTableParallelism,
740        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
741        _deferred: bool,
742    ) -> Result<()> {
743        todo!()
744    }
745
746    async fn alter_backfill_parallelism(
747        &self,
748        _job_id: JobId,
749        _parallelism: Option<PbTableParallelism>,
750        _adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
751        _deferred: bool,
752    ) -> Result<()> {
753        todo!()
754    }
755
756    async fn alter_config(
757        &self,
758        _job_id: JobId,
759        _entries_to_add: HashMap<String, String>,
760        _keys_to_remove: Vec<String>,
761    ) -> Result<()> {
762        todo!()
763    }
764
765    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
766        todo!()
767    }
768
769    async fn alter_secret(
770        &self,
771        _secret_id: SecretId,
772        _secret_name: String,
773        _database_id: DatabaseId,
774        _schema_id: SchemaId,
775        _owner_id: UserId,
776        _payload: Vec<u8>,
777    ) -> Result<()> {
778        unreachable!()
779    }
780
781    async fn alter_resource_group(
782        &self,
783        _job_id: JobId,
784        _resource_group: Option<String>,
785        _deferred: bool,
786    ) -> Result<()> {
787        todo!()
788    }
789
790    async fn alter_database_resource_group(
791        &self,
792        database_id: DatabaseId,
793        resource_group: Option<String>,
794        _deferred: bool,
795    ) -> Result<()> {
796        let mut pb_database = {
797            let reader = self.catalog.read();
798            let database = reader.get_database_by_id(database_id)?.to_owned();
799            database.to_prost()
800        };
801        pb_database.resource_group =
802            resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned());
803        self.catalog.write().update_database(&pb_database);
804        Ok(())
805    }
806
807    async fn alter_database_param(
808        &self,
809        database_id: DatabaseId,
810        param: AlterDatabaseParam,
811    ) -> Result<()> {
812        let mut pb_database = {
813            let reader = self.catalog.read();
814            let database = reader.get_database_by_id(database_id)?.to_owned();
815            database.to_prost()
816        };
817        match param {
818            AlterDatabaseParam::BarrierIntervalMs(interval) => {
819                pb_database.barrier_interval_ms = interval;
820            }
821            AlterDatabaseParam::CheckpointFrequency(frequency) => {
822                pb_database.checkpoint_frequency = frequency;
823            }
824        }
825        self.catalog.write().update_database(&pb_database);
826        Ok(())
827    }
828
829    async fn create_iceberg_table(
830        &self,
831        _table_job_info: PbTableJobInfo,
832        _sink_job_info: PbSinkJobInfo,
833        _iceberg_source: PbSource,
834        _if_not_exists: bool,
835    ) -> Result<()> {
836        todo!()
837    }
838
839    async fn wait(&self, _job_id: Option<JobId>) -> Result<()> {
840        Ok(())
841    }
842}
843
844impl MockCatalogWriter {
845    pub fn new(
846        catalog: Arc<RwLock<Catalog>>,
847        hummock_snapshot_manager: HummockSnapshotManagerRef,
848    ) -> Self {
849        catalog.write().create_database(&PbDatabase {
850            id: 0.into(),
851            name: DEFAULT_DATABASE_NAME.to_owned(),
852            owner: DEFAULT_SUPER_USER_ID,
853            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
854            barrier_interval_ms: None,
855            checkpoint_frequency: None,
856        });
857        catalog.write().create_schema(&PbSchema {
858            id: 1.into(),
859            name: DEFAULT_SCHEMA_NAME.to_owned(),
860            database_id: 0.into(),
861            owner: DEFAULT_SUPER_USER_ID,
862        });
863        catalog.write().create_schema(&PbSchema {
864            id: 2.into(),
865            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
866            database_id: 0.into(),
867            owner: DEFAULT_SUPER_USER_ID,
868        });
869        catalog.write().create_schema(&PbSchema {
870            id: 3.into(),
871            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
872            database_id: 0.into(),
873            owner: DEFAULT_SUPER_USER_ID,
874        });
875        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
876        map.insert(1_u32.into(), 0_u32.into());
877        map.insert(2_u32.into(), 0_u32.into());
878        map.insert(3_u32.into(), 0_u32.into());
879        Self {
880            catalog,
881            id: AtomicU32::new(3),
882            table_id_to_schema_id: Default::default(),
883            schema_id_to_database_id: RwLock::new(map),
884            hummock_snapshot_manager,
885        }
886    }
887
888    fn gen_id<T: From<u32>>(&self) -> T {
889        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
890        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
891    }
892
893    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
894        self.table_id_to_schema_id
895            .write()
896            .insert(table_id, schema_id);
897    }
898
899    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
900        let schema_id = self
901            .table_id_to_schema_id
902            .write()
903            .remove(&table_id)
904            .unwrap();
905        (self.get_database_id_by_schema(schema_id), schema_id)
906    }
907
908    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
909        self.table_id_to_schema_id
910            .write()
911            .insert(table_id, schema_id);
912    }
913
914    fn add_table_or_subscription_id(
915        &self,
916        table_id: u32,
917        schema_id: SchemaId,
918        _database_id: DatabaseId,
919    ) {
920        self.table_id_to_schema_id
921            .write()
922            .insert(table_id, schema_id);
923    }
924
925    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
926        self.table_id_to_schema_id
927            .write()
928            .insert(table_id, schema_id);
929    }
930
931    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
932        let schema_id = self
933            .table_id_to_schema_id
934            .write()
935            .remove(&table_id)
936            .unwrap();
937        (self.get_database_id_by_schema(schema_id), schema_id)
938    }
939
940    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
941        let schema_id = self
942            .table_id_to_schema_id
943            .write()
944            .remove(&table_id)
945            .unwrap();
946        (self.get_database_id_by_schema(schema_id), schema_id)
947    }
948
949    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
950        let schema_id = self
951            .table_id_to_schema_id
952            .write()
953            .remove(&table_id)
954            .unwrap();
955        (self.get_database_id_by_schema(schema_id), schema_id)
956    }
957
958    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
959        self.schema_id_to_database_id
960            .write()
961            .insert(schema_id, database_id);
962    }
963
964    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
965        self.schema_id_to_database_id
966            .write()
967            .remove(&schema_id)
968            .unwrap()
969    }
970
971    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
972        source.id = self.gen_id();
973        self.catalog.write().create_source(&source);
974        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
975        Ok(source.id)
976    }
977
978    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
979        sink.id = self.gen_id();
980        sink.stream_job_status = PbStreamJobStatus::Created as _;
981        self.catalog.write().create_sink(&sink);
982        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
983        Ok(sink.id)
984    }
985
986    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
987        subscription.id = self.gen_id();
988        self.catalog.write().create_subscription(&subscription);
989        self.add_table_or_subscription_id(
990            subscription.id.as_raw_id(),
991            subscription.schema_id,
992            subscription.database_id,
993        );
994        Ok(())
995    }
996
997    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
998        *self
999            .schema_id_to_database_id
1000            .read()
1001            .get(&schema_id)
1002            .unwrap()
1003    }
1004
1005    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
1006        let catalog = self.catalog.read();
1007        for database in catalog.iter_databases() {
1008            for schema in database.iter_schemas() {
1009                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
1010                    return if table.is_mview() {
1011                        PbObjectType::Mview
1012                    } else {
1013                        PbObjectType::Table
1014                    };
1015                }
1016                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
1017                    return PbObjectType::Source;
1018                }
1019                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
1020                    return PbObjectType::View;
1021                }
1022                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
1023                    return PbObjectType::Index;
1024                }
1025            }
1026        }
1027        PbObjectType::Unspecified
1028    }
1029
1030    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
1031        if dependencies.is_empty() {
1032            return;
1033        }
1034        let dependencies = dependencies
1035            .into_iter()
1036            .map(|referenced_object_id| PbObjectDependency {
1037                object_id,
1038                referenced_object_id,
1039                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1040            })
1041            .collect();
1042        self.catalog
1043            .write()
1044            .insert_object_dependencies(dependencies);
1045    }
1046}
1047
1048pub struct MockUserInfoWriter {
1049    id: AtomicU32,
1050    user_info: Arc<RwLock<UserInfoManager>>,
1051}
1052
1053#[async_trait::async_trait]
1054impl UserInfoWriter for MockUserInfoWriter {
1055    async fn create_user(&self, user: UserInfo) -> Result<()> {
1056        let mut user = user;
1057        user.id = self.gen_id().into();
1058        self.user_info.write().create_user(user);
1059        Ok(())
1060    }
1061
1062    async fn drop_user(&self, id: UserId) -> Result<()> {
1063        self.user_info.write().drop_user(id);
1064        Ok(())
1065    }
1066
1067    async fn update_user(
1068        &self,
1069        update_user: UserInfo,
1070        update_fields: Vec<UpdateField>,
1071    ) -> Result<()> {
1072        let mut lock = self.user_info.write();
1073        let id = update_user.get_id();
1074        let Some(old_name) = lock.get_user_name_by_id(id) else {
1075            return Ok(());
1076        };
1077        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1078        update_fields.into_iter().for_each(|field| match field {
1079            UpdateField::Super => user_info.is_super = update_user.is_super,
1080            UpdateField::Login => user_info.can_login = update_user.can_login,
1081            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1082            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1083            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1084            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1085            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1086            UpdateField::Unspecified => unreachable!(),
1087        });
1088        lock.update_user(update_user);
1089        Ok(())
1090    }
1091
1092    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1093    /// `GrantAllSources` when grant privilege to user.
1094    async fn grant_privilege(
1095        &self,
1096        users: Vec<UserId>,
1097        privileges: Vec<GrantPrivilege>,
1098        with_grant_option: bool,
1099        _grantor: UserId,
1100    ) -> Result<()> {
1101        let privileges = privileges
1102            .into_iter()
1103            .map(|mut p| {
1104                p.action_with_opts
1105                    .iter_mut()
1106                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1107                p
1108            })
1109            .collect::<Vec<_>>();
1110        for user_id in users {
1111            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1112                u.extend_privileges(privileges.clone());
1113            }
1114        }
1115        Ok(())
1116    }
1117
1118    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1119    /// `RevokeAllSources` when revoke privilege from user.
1120    async fn revoke_privilege(
1121        &self,
1122        users: Vec<UserId>,
1123        privileges: Vec<GrantPrivilege>,
1124        _granted_by: UserId,
1125        _revoke_by: UserId,
1126        revoke_grant_option: bool,
1127        _cascade: bool,
1128    ) -> Result<()> {
1129        for user_id in users {
1130            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1131                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1132            }
1133        }
1134        Ok(())
1135    }
1136
1137    async fn alter_default_privilege(
1138        &self,
1139        _users: Vec<UserId>,
1140        _database_id: DatabaseId,
1141        _schemas: Vec<SchemaId>,
1142        _operation: AlterDefaultPrivilegeOperation,
1143        _operated_by: UserId,
1144    ) -> Result<()> {
1145        todo!()
1146    }
1147}
1148
1149impl MockUserInfoWriter {
1150    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1151        user_info.write().create_user(UserInfo {
1152            id: DEFAULT_SUPER_USER_ID,
1153            name: DEFAULT_SUPER_USER.to_owned(),
1154            is_super: true,
1155            can_create_db: true,
1156            can_create_user: true,
1157            can_login: true,
1158            ..Default::default()
1159        });
1160        user_info.write().create_user(UserInfo {
1161            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1162            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1163            is_super: true,
1164            can_create_db: true,
1165            can_create_user: true,
1166            can_login: true,
1167            is_admin: true,
1168            ..Default::default()
1169        });
1170        Self {
1171            user_info,
1172            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1173        }
1174    }
1175
1176    fn gen_id(&self) -> u32 {
1177        self.id.fetch_add(1, Ordering::SeqCst)
1178    }
1179}
1180
/// Field-less mock that implements `FrontendMetaClient` with canned responses.
pub struct MockFrontendMetaClient {}
1182
1183#[async_trait::async_trait]
1184impl FrontendMetaClient for MockFrontendMetaClient {
1185    async fn try_unregister(&self) {}
1186
1187    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1188        Ok(INVALID_VERSION_ID)
1189    }
1190
1191    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1192        Ok(vec![])
1193    }
1194
1195    async fn list_table_fragments(
1196        &self,
1197        _table_ids: &[JobId],
1198    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1199        Ok(HashMap::default())
1200    }
1201
1202    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1203        Ok(vec![])
1204    }
1205
1206    async fn list_fragment_distribution(
1207        &self,
1208        _include_node: bool,
1209    ) -> RpcResult<Vec<FragmentDistribution>> {
1210        Ok(vec![])
1211    }
1212
1213    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1214        Ok(vec![])
1215    }
1216
1217    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1218        Ok(vec![])
1219    }
1220
1221    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1222        Ok(vec![])
1223    }
1224
1225    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1226        Ok(vec![])
1227    }
1228
1229    async fn list_sink_log_store_tables(
1230        &self,
1231    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1232        Ok(vec![])
1233    }
1234
1235    async fn set_system_param(
1236        &self,
1237        _param: String,
1238        _value: Option<String>,
1239    ) -> RpcResult<Option<SystemParamsReader>> {
1240        Ok(Some(SystemParams::default().into()))
1241    }
1242
1243    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1244        Ok(Default::default())
1245    }
1246
1247    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1248        Ok("".to_owned())
1249    }
1250
1251    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1252        Ok(vec![])
1253    }
1254
1255    async fn get_tables(
1256        &self,
1257        _table_ids: Vec<crate::catalog::TableId>,
1258        _include_dropped_tables: bool,
1259    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1260        Ok(HashMap::new())
1261    }
1262
1263    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1264        unimplemented!()
1265    }
1266
1267    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1268        unimplemented!()
1269    }
1270
1271    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1272        Ok(HummockVersion::default())
1273    }
1274
1275    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1276        unimplemented!()
1277    }
1278
1279    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1280        unimplemented!()
1281    }
1282
1283    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1284        unimplemented!()
1285    }
1286
1287    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1288        unimplemented!()
1289    }
1290
1291    async fn list_hummock_active_write_limits(
1292        &self,
1293    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1294        unimplemented!()
1295    }
1296
1297    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1298        unimplemented!()
1299    }
1300
1301    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1302        Ok(vec![])
1303    }
1304
1305    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1306        unimplemented!()
1307    }
1308
1309    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1310        Ok(vec![])
1311    }
1312
1313    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1314        unimplemented!()
1315    }
1316
1317    async fn recover(&self) -> RpcResult<()> {
1318        unimplemented!()
1319    }
1320
1321    async fn backup_meta(&self, _remarks: Option<String>) -> RpcResult<u64> {
1322        unimplemented!()
1323    }
1324
1325    async fn get_backup_job_status(
1326        &self,
1327        _job_id: u64,
1328    ) -> RpcResult<(risingwave_pb::backup_service::BackupJobStatus, String)> {
1329        unimplemented!()
1330    }
1331
1332    async fn delete_meta_snapshot(&self, _snapshot_ids: &[u64]) -> RpcResult<()> {
1333        unimplemented!()
1334    }
1335
1336    async fn apply_throttle(
1337        &self,
1338        _throttle_target: PbThrottleTarget,
1339        _throttle_type: risingwave_pb::common::PbThrottleType,
1340        _id: u32,
1341        _rate_limit: Option<u32>,
1342    ) -> RpcResult<()> {
1343        unimplemented!()
1344    }
1345
1346    async fn alter_fragment_parallelism(
1347        &self,
1348        _fragment_ids: Vec<FragmentId>,
1349        _parallelism: Option<PbTableParallelism>,
1350    ) -> RpcResult<()> {
1351        unimplemented!()
1352    }
1353
1354    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1355        Ok(RecoveryStatus::StatusRunning)
1356    }
1357
1358    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1359        Ok(vec![])
1360    }
1361
1362    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1363        Ok(vec![])
1364    }
1365
1366    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1367        Ok(HashMap::default())
1368    }
1369
1370    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1371        unimplemented!()
1372    }
1373
1374    async fn alter_sink_props(
1375        &self,
1376        _sink_id: SinkId,
1377        _changed_props: BTreeMap<String, String>,
1378        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1379        _connector_conn_ref: Option<ConnectionId>,
1380    ) -> RpcResult<()> {
1381        unimplemented!()
1382    }
1383
1384    async fn alter_iceberg_table_props(
1385        &self,
1386        _table_id: TableId,
1387        _sink_id: SinkId,
1388        _source_id: SourceId,
1389        _changed_props: BTreeMap<String, String>,
1390        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1391        _connector_conn_ref: Option<ConnectionId>,
1392    ) -> RpcResult<()> {
1393        unimplemented!()
1394    }
1395
1396    async fn alter_source_connector_props(
1397        &self,
1398        _source_id: SourceId,
1399        _changed_props: BTreeMap<String, String>,
1400        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1401        _connector_conn_ref: Option<ConnectionId>,
1402    ) -> RpcResult<()> {
1403        unimplemented!()
1404    }
1405
1406    async fn alter_connection_connector_props(
1407        &self,
1408        _connection_id: u32,
1409        _changed_props: BTreeMap<String, String>,
1410        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1411    ) -> RpcResult<()> {
1412        Ok(())
1413    }
1414
1415    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1416        unimplemented!()
1417    }
1418
1419    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1420        Ok(vec![])
1421    }
1422
1423    async fn get_fragment_by_id(
1424        &self,
1425        _fragment_id: FragmentId,
1426    ) -> RpcResult<Option<FragmentDistribution>> {
1427        unimplemented!()
1428    }
1429
1430    async fn get_fragment_vnodes(
1431        &self,
1432        _fragment_id: FragmentId,
1433    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1434        unimplemented!()
1435    }
1436
1437    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1438        unimplemented!()
1439    }
1440
1441    fn worker_id(&self) -> WorkerId {
1442        0.into()
1443    }
1444
1445    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1446        Ok(())
1447    }
1448
1449    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1450        Ok(1)
1451    }
1452
1453    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1454        Ok(())
1455    }
1456
1457    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1458        Ok(RefreshResponse { status: None })
1459    }
1460
1461    fn cluster_id(&self) -> &str {
1462        "test-cluster-uuid"
1463    }
1464
1465    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1466        unimplemented!()
1467    }
1468
1469    async fn get_hummock_table_change_log(
1470        &self,
1471        _start_epoch_inclusive: Option<u64>,
1472        _end_epoch_inclusive: Option<u64>,
1473        _table_ids: Option<HashSet<TableId>>,
1474        _exclude_empty: bool,
1475        _limit: Option<u32>,
1476    ) -> RpcResult<TableChangeLogs> {
1477        Ok(HashMap::default())
1478    }
1479
1480    async fn update_compaction_config(
1481        &self,
1482        _compaction_group_ids: Vec<risingwave_hummock_sdk::CompactionGroupId>,
1483        _configs: Vec<PbMutableConfig>,
1484    ) -> RpcResult<()> {
1485        Ok(())
1486    }
1487}
1488
/// Proto3 schema text (package `test`) available in test builds only.
///
/// It declares the messages `TestRecord`, `TestRecordAlterType`, `TestRecordExt`, `Country`
/// and `City`.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1521
1522/// Returns the file.
1523/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1524pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1525    let in_file = Builder::new()
1526        .prefix("temp")
1527        .suffix(".proto")
1528        .rand_bytes(8)
1529        .tempfile()
1530        .unwrap();
1531
1532    let out_file = Builder::new()
1533        .prefix("temp")
1534        .suffix(".pb")
1535        .rand_bytes(8)
1536        .tempfile()
1537        .unwrap();
1538
1539    let mut file = in_file.as_file();
1540    file.write_all(proto_data.as_ref())
1541        .expect("writing binary to test file");
1542    file.flush().expect("flush temp file failed");
1543    let include_path = in_file
1544        .path()
1545        .parent()
1546        .unwrap()
1547        .to_string_lossy()
1548        .into_owned();
1549    let out_path = out_file.path().to_string_lossy().into_owned();
1550    let in_path = in_file.path().to_string_lossy().into_owned();
1551    let mut compile = std::process::Command::new("protoc");
1552
1553    let out = compile
1554        .arg("--include_imports")
1555        .arg("-I")
1556        .arg(include_path)
1557        .arg(format!("--descriptor_set_out={}", out_path))
1558        .arg(in_path)
1559        .output()
1560        .expect("failed to compile proto");
1561    if !out.status.success() {
1562        panic!("compile proto failed \n output: {:?}", out);
1563    }
1564    out_file
1565}