risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44    PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::{PbObjectType, WorkerNode};
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
63use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
64use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
65use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
66use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
67use risingwave_pb::meta::{
68    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
69    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
70    list_sink_log_store_tables_response,
71};
72use risingwave_pb::secret::PbSecretRef;
73use risingwave_pb::stream_plan::StreamFragmentGraph;
74use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
75use risingwave_pb::user::update_user_request::UpdateField;
76use risingwave_pb::user::{GrantPrivilege, UserInfo};
77use risingwave_rpc_client::error::Result as RpcResult;
78use tempfile::{Builder, NamedTempFile};
79
80use crate::FrontendOpts;
81use crate::catalog::catalog_service::CatalogWriter;
82use crate::catalog::root_catalog::Catalog;
83use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
84use crate::error::{ErrorCode, Result, RwError};
85use crate::handler::RwPgResponse;
86use crate::meta_client::FrontendMetaClient;
87use crate::scheduler::HummockSnapshotManagerRef;
88use crate::session::{AuthContext, FrontendEnv, SessionImpl};
89use crate::user::UserId;
90use crate::user::user_manager::UserInfoManager;
91use crate::user::user_service::UserInfoWriter;
92
/// An embedded frontend without starting meta and without starting frontend as a tcp server.
///
/// Sessions are created on demand against a mocked [`FrontendEnv`].
pub struct LocalFrontend {
    /// Options this frontend was constructed with.
    pub opts: FrontendOpts,
    /// Environment that is cloned into every session created by this frontend.
    env: FrontendEnv,
}
98
99impl SessionManager for LocalFrontend {
100    type Error = RwError;
101    type Session = SessionImpl;
102
103    fn create_dummy_session(
104        &self,
105        _database_id: DatabaseId,
106    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
107        unreachable!()
108    }
109
110    fn connect(
111        &self,
112        _database: &str,
113        _user_name: &str,
114        _peer_addr: AddressRef,
115    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
116        Ok(self.session_ref())
117    }
118
119    fn cancel_queries_in_session(&self, _session_id: SessionId) {
120        unreachable!()
121    }
122
123    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
124        unreachable!()
125    }
126
127    fn end_session(&self, _session: &Self::Session) {
128        unreachable!()
129    }
130}
131
132impl LocalFrontend {
133    #[expect(clippy::unused_async)]
134    pub async fn new(opts: FrontendOpts) -> Self {
135        let env = FrontendEnv::mock();
136        Self { opts, env }
137    }
138
139    pub async fn run_sql(
140        &self,
141        sql: impl Into<String>,
142    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
143        let sql: Arc<str> = Arc::from(sql.into());
144        Box::pin(self.session_ref().run_statement(sql, vec![])).await
145    }
146
147    pub async fn run_sql_with_session(
148        &self,
149        session_ref: Arc<SessionImpl>,
150        sql: impl Into<String>,
151    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
152        let sql: Arc<str> = Arc::from(sql.into());
153        Box::pin(session_ref.run_statement(sql, vec![])).await
154    }
155
156    pub async fn run_user_sql(
157        &self,
158        sql: impl Into<String>,
159        database: String,
160        user_name: String,
161        user_id: UserId,
162    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
163        let sql: Arc<str> = Arc::from(sql.into());
164        Box::pin(
165            self.session_user_ref(database, user_name, user_id)
166                .run_statement(sql, vec![]),
167        )
168        .await
169    }
170
171    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
172        let mut rsp = self.run_sql(sql).await.unwrap();
173        let mut res = vec![];
174        #[for_await]
175        for row_set in rsp.values_stream() {
176            for row in row_set.unwrap() {
177                res.push(format!("{:?}", row));
178            }
179        }
180        res
181    }
182
183    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
184        let mut rsp = self.run_sql(sql).await.unwrap();
185        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
186        let mut res = String::new();
187        #[for_await]
188        for row_set in rsp.values_stream() {
189            for row in row_set.unwrap() {
190                let row: Row = row;
191                let row = row.values()[0].as_ref().unwrap();
192                res += std::str::from_utf8(row).unwrap();
193                res += "\n";
194            }
195        }
196        res
197    }
198
199    /// Creates a new session
200    pub fn session_ref(&self) -> Arc<SessionImpl> {
201        self.session_user_ref(
202            DEFAULT_DATABASE_NAME.to_owned(),
203            DEFAULT_SUPER_USER.to_owned(),
204            DEFAULT_SUPER_USER_ID,
205        )
206    }
207
208    pub fn session_user_ref(
209        &self,
210        database: String,
211        user_name: String,
212        user_id: UserId,
213    ) -> Arc<SessionImpl> {
214        Arc::new(SessionImpl::new(
215            self.env.clone(),
216            AuthContext::new(database, user_name, user_id),
217            UserAuthenticator::None,
218            // Local Frontend use a non-sense id.
219            (0, 0),
220            Address::Tcp(SocketAddr::new(
221                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
222                6666,
223            ))
224            .into(),
225            Default::default(),
226        ))
227    }
228}
229
230pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
231    if rsp.stmt_type() != StatementType::EXPLAIN {
232        panic!("RESPONSE INVALID: {rsp:?}");
233    }
234    let mut res = String::new();
235    #[for_await]
236    for row_set in rsp.values_stream() {
237        for row in row_set.unwrap() {
238            let row: Row = row;
239            let row = row.values()[0].as_ref().unwrap();
240            res += std::str::from_utf8(row).unwrap();
241            res += "\n";
242        }
243    }
244    res
245}
246
/// A [`CatalogWriter`] for tests that applies every change directly to a shared in-memory
/// [`Catalog`].
pub struct MockCatalogWriter {
    /// The catalog mutated by every write operation.
    catalog: Arc<RwLock<Catalog>>,
    /// Counter behind `gen_id`; holds the most recently handed-out id.
    id: AtomicU32,
    /// Maps the raw id of each registered object (table, source, sink, ...) to its schema.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Maps each schema to the database that contains it.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Told about newly created tables through `add_table_for_test`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
254
255#[async_trait::async_trait]
256impl CatalogWriter for MockCatalogWriter {
257    async fn create_database(
258        &self,
259        db_name: &str,
260        owner: UserId,
261        resource_group: &str,
262        barrier_interval_ms: Option<u32>,
263        checkpoint_frequency: Option<u64>,
264    ) -> Result<()> {
265        let database_id = DatabaseId::new(self.gen_id());
266        self.catalog.write().create_database(&PbDatabase {
267            name: db_name.to_owned(),
268            id: database_id,
269            owner,
270            resource_group: resource_group.to_owned(),
271            barrier_interval_ms,
272            checkpoint_frequency,
273        });
274        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
275            .await?;
276        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
277            .await?;
278        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
279            .await?;
280        Ok(())
281    }
282
283    async fn create_schema(
284        &self,
285        db_id: DatabaseId,
286        schema_name: &str,
287        owner: UserId,
288    ) -> Result<()> {
289        let id = self.gen_id();
290        self.catalog.write().create_schema(&PbSchema {
291            id,
292            name: schema_name.to_owned(),
293            database_id: db_id,
294            owner,
295        });
296        self.add_schema_id(id, db_id);
297        Ok(())
298    }
299
300    async fn create_materialized_view(
301        &self,
302        mut table: PbTable,
303        _graph: StreamFragmentGraph,
304        dependencies: HashSet<ObjectId>,
305        _resource_type: streaming_job_resource_type::ResourceType,
306        _if_not_exists: bool,
307    ) -> Result<()> {
308        table.id = self.gen_id();
309        table.stream_job_status = PbStreamJobStatus::Created as _;
310        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
311        self.catalog.write().create_table(&table);
312        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
313        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
314        self.hummock_snapshot_manager.add_table_for_test(table.id);
315        Ok(())
316    }
317
318    async fn replace_materialized_view(
319        &self,
320        mut table: PbTable,
321        _graph: StreamFragmentGraph,
322    ) -> Result<()> {
323        table.stream_job_status = PbStreamJobStatus::Created as _;
324        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
325        self.catalog.write().update_table(&table);
326        Ok(())
327    }
328
329    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
330        view.id = self.gen_id();
331        self.catalog.write().create_view(&view);
332        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
333        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
334        Ok(())
335    }
336
337    async fn create_table(
338        &self,
339        source: Option<PbSource>,
340        mut table: PbTable,
341        graph: StreamFragmentGraph,
342        _job_type: PbTableJobType,
343        if_not_exists: bool,
344        dependencies: HashSet<ObjectId>,
345    ) -> Result<()> {
346        if let Some(source) = source {
347            let source_id = self.create_source_inner(source)?;
348            table.optional_associated_source_id = Some(source_id.into());
349        }
350        self.create_materialized_view(
351            table,
352            graph,
353            dependencies,
354            streaming_job_resource_type::ResourceType::Regular(true),
355            if_not_exists,
356        )
357        .await?;
358        Ok(())
359    }
360
361    async fn replace_table(
362        &self,
363        _source: Option<PbSource>,
364        mut table: PbTable,
365        _graph: StreamFragmentGraph,
366        _job_type: TableJobType,
367    ) -> Result<()> {
368        table.stream_job_status = PbStreamJobStatus::Created as _;
369        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
370        self.catalog.write().update_table(&table);
371        Ok(())
372    }
373
374    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
375        self.catalog.write().update_source(&source);
376        Ok(())
377    }
378
379    async fn create_source(
380        &self,
381        source: PbSource,
382        _graph: Option<StreamFragmentGraph>,
383        _if_not_exists: bool,
384    ) -> Result<()> {
385        self.create_source_inner(source).map(|_| ())
386    }
387
388    async fn create_sink(
389        &self,
390        sink: PbSink,
391        graph: StreamFragmentGraph,
392        dependencies: HashSet<ObjectId>,
393        _if_not_exists: bool,
394    ) -> Result<()> {
395        let sink_id = self.create_sink_inner(sink, graph)?;
396        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
397        Ok(())
398    }
399
400    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
401        self.create_subscription_inner(subscription)
402    }
403
404    async fn create_index(
405        &self,
406        mut index: PbIndex,
407        mut index_table: PbTable,
408        _graph: StreamFragmentGraph,
409        _if_not_exists: bool,
410    ) -> Result<()> {
411        index_table.id = self.gen_id();
412        index_table.stream_job_status = PbStreamJobStatus::Created as _;
413        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
414        self.catalog.write().create_table(&index_table);
415        self.add_table_or_index_id(
416            index_table.id.as_raw_id(),
417            index_table.schema_id,
418            index_table.database_id,
419        );
420
421        index.id = index_table.id.as_raw_id().into();
422        index.index_table_id = index_table.id;
423        self.catalog.write().create_index(&index);
424        Ok(())
425    }
426
427    async fn create_function(&self, _function: PbFunction) -> Result<()> {
428        unreachable!()
429    }
430
431    async fn create_connection(
432        &self,
433        _connection_name: String,
434        _database_id: DatabaseId,
435        _schema_id: SchemaId,
436        _owner_id: UserId,
437        _connection: create_connection_request::Payload,
438    ) -> Result<()> {
439        unreachable!()
440    }
441
442    async fn create_secret(
443        &self,
444        _secret_name: String,
445        _database_id: DatabaseId,
446        _schema_id: SchemaId,
447        _owner_id: UserId,
448        _payload: Vec<u8>,
449    ) -> Result<()> {
450        unreachable!()
451    }
452
453    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
454        unreachable!()
455    }
456
457    async fn drop_table(
458        &self,
459        source_id: Option<SourceId>,
460        table_id: TableId,
461        cascade: bool,
462    ) -> Result<()> {
463        if cascade {
464            return Err(ErrorCode::NotSupported(
465                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
466                "use drop instead".to_owned(),
467            )
468            .into());
469        }
470        if let Some(source_id) = source_id {
471            self.drop_table_or_source_id(source_id.as_raw_id());
472        }
473        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
474        let indexes =
475            self.catalog
476                .read()
477                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
478        for index in indexes {
479            self.drop_index(index.id, cascade).await?;
480        }
481        self.catalog
482            .write()
483            .drop_table(database_id, schema_id, table_id);
484        if let Some(source_id) = source_id {
485            self.catalog
486                .write()
487                .drop_source(database_id, schema_id, source_id);
488        }
489        Ok(())
490    }
491
492    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
493        unreachable!()
494    }
495
496    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
497        if cascade {
498            return Err(ErrorCode::NotSupported(
499                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
500                "use drop instead".to_owned(),
501            )
502            .into());
503        }
504        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
505        let indexes =
506            self.catalog
507                .read()
508                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
509        for index in indexes {
510            self.drop_index(index.id, cascade).await?;
511        }
512        self.catalog
513            .write()
514            .drop_table(database_id, schema_id, table_id);
515        Ok(())
516    }
517
518    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
519        if cascade {
520            return Err(ErrorCode::NotSupported(
521                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
522                "use drop instead".to_owned(),
523            )
524            .into());
525        }
526        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
527        self.catalog
528            .write()
529            .drop_source(database_id, schema_id, source_id);
530        Ok(())
531    }
532
533    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
534        Ok(())
535    }
536
537    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
538        if cascade {
539            return Err(ErrorCode::NotSupported(
540                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
541                "use drop instead".to_owned(),
542            )
543            .into());
544        }
545        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
546        self.catalog
547            .write()
548            .drop_sink(database_id, schema_id, sink_id);
549        Ok(())
550    }
551
552    async fn drop_subscription(
553        &self,
554        subscription_id: SubscriptionId,
555        cascade: bool,
556    ) -> Result<()> {
557        if cascade {
558            return Err(ErrorCode::NotSupported(
559                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
560                "use drop instead".to_owned(),
561            )
562            .into());
563        }
564        let (database_id, schema_id) =
565            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
566        self.catalog
567            .write()
568            .drop_subscription(database_id, schema_id, subscription_id);
569        Ok(())
570    }
571
572    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
573        if cascade {
574            return Err(ErrorCode::NotSupported(
575                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
576                "use drop instead".to_owned(),
577            )
578            .into());
579        }
580        let &schema_id = self
581            .table_id_to_schema_id
582            .read()
583            .get(&index_id.as_raw_id())
584            .unwrap();
585        let database_id = self.get_database_id_by_schema(schema_id);
586
587        let index = {
588            let catalog_reader = self.catalog.read();
589            let schema_catalog = catalog_reader
590                .get_schema_by_id(database_id, schema_id)
591                .unwrap();
592            schema_catalog.get_index_by_id(index_id).unwrap().clone()
593        };
594
595        let index_table_id = index.index_table().id;
596        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
597        self.catalog
598            .write()
599            .drop_index(database_id, schema_id, index_id);
600        self.catalog
601            .write()
602            .drop_table(database_id, schema_id, index_table_id);
603        Ok(())
604    }
605
606    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
607        unreachable!()
608    }
609
610    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
611        unreachable!()
612    }
613
614    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
615        unreachable!()
616    }
617
618    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
619        self.catalog.write().drop_database(database_id);
620        Ok(())
621    }
622
623    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
624        let database_id = self.drop_schema_id(schema_id);
625        self.catalog.write().drop_schema(database_id, schema_id);
626        Ok(())
627    }
628
629    async fn alter_name(
630        &self,
631        object_id: alter_name_request::Object,
632        object_name: &str,
633    ) -> Result<()> {
634        match object_id {
635            alter_name_request::Object::TableId(table_id) => {
636                self.catalog
637                    .write()
638                    .alter_table_name_by_id(table_id, object_name);
639                Ok(())
640            }
641            _ => {
642                unimplemented!()
643            }
644        }
645    }
646
647    async fn alter_source(&self, source: PbSource) -> Result<()> {
648        self.catalog.write().update_source(&source);
649        Ok(())
650    }
651
652    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
653        for database in self.catalog.read().iter_databases() {
654            for schema in database.iter_schemas() {
655                match object {
656                    Object::TableId(table_id) => {
657                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
658                        {
659                            let mut pb_table = table.to_prost();
660                            pb_table.owner = owner_id;
661                            self.catalog.write().update_table(&pb_table);
662                            return Ok(());
663                        }
664                    }
665                    _ => unreachable!(),
666                }
667            }
668        }
669
670        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
671    }
672
673    async fn alter_subscription_retention(
674        &self,
675        subscription_id: SubscriptionId,
676        retention_seconds: u64,
677        definition: String,
678    ) -> Result<()> {
679        for database in self.catalog.read().iter_databases() {
680            for schema in database.iter_schemas() {
681                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
682                    let mut pb_subscription = subscription.to_proto();
683                    pb_subscription.retention_seconds = retention_seconds;
684                    pb_subscription.definition = definition;
685                    self.catalog.write().update_subscription(&pb_subscription);
686                    return Ok(());
687                }
688            }
689        }
690
691        Err(
692            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
693                .into(),
694        )
695    }
696
697    async fn alter_set_schema(
698        &self,
699        object: alter_set_schema_request::Object,
700        new_schema_id: SchemaId,
701    ) -> Result<()> {
702        match object {
703            alter_set_schema_request::Object::TableId(table_id) => {
704                let mut pb_table = {
705                    let reader = self.catalog.read();
706                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
707                    table.to_prost()
708                };
709                pb_table.schema_id = new_schema_id;
710                self.catalog.write().update_table(&pb_table);
711                self.table_id_to_schema_id
712                    .write()
713                    .insert(table_id.as_raw_id(), new_schema_id);
714                Ok(())
715            }
716            _ => unreachable!(),
717        }
718    }
719
720    async fn alter_parallelism(
721        &self,
722        _job_id: JobId,
723        _parallelism: PbTableParallelism,
724        _deferred: bool,
725    ) -> Result<()> {
726        todo!()
727    }
728
729    async fn alter_backfill_parallelism(
730        &self,
731        _job_id: JobId,
732        _parallelism: Option<PbTableParallelism>,
733        _deferred: bool,
734    ) -> Result<()> {
735        todo!()
736    }
737
738    async fn alter_config(
739        &self,
740        _job_id: JobId,
741        _entries_to_add: HashMap<String, String>,
742        _keys_to_remove: Vec<String>,
743    ) -> Result<()> {
744        todo!()
745    }
746
747    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
748        todo!()
749    }
750
751    async fn alter_secret(
752        &self,
753        _secret_id: SecretId,
754        _secret_name: String,
755        _database_id: DatabaseId,
756        _schema_id: SchemaId,
757        _owner_id: UserId,
758        _payload: Vec<u8>,
759    ) -> Result<()> {
760        unreachable!()
761    }
762
763    async fn alter_resource_group(
764        &self,
765        _table_id: TableId,
766        _resource_group: Option<String>,
767        _deferred: bool,
768    ) -> Result<()> {
769        todo!()
770    }
771
772    async fn alter_database_param(
773        &self,
774        database_id: DatabaseId,
775        param: AlterDatabaseParam,
776    ) -> Result<()> {
777        let mut pb_database = {
778            let reader = self.catalog.read();
779            let database = reader.get_database_by_id(database_id)?.to_owned();
780            database.to_prost()
781        };
782        match param {
783            AlterDatabaseParam::BarrierIntervalMs(interval) => {
784                pb_database.barrier_interval_ms = interval;
785            }
786            AlterDatabaseParam::CheckpointFrequency(frequency) => {
787                pb_database.checkpoint_frequency = frequency;
788            }
789        }
790        self.catalog.write().update_database(&pb_database);
791        Ok(())
792    }
793
794    async fn create_iceberg_table(
795        &self,
796        _table_job_info: PbTableJobInfo,
797        _sink_job_info: PbSinkJobInfo,
798        _iceberg_source: PbSource,
799        _if_not_exists: bool,
800    ) -> Result<()> {
801        todo!()
802    }
803
804    async fn wait(&self) -> Result<()> {
805        Ok(())
806    }
807}
808
809impl MockCatalogWriter {
810    pub fn new(
811        catalog: Arc<RwLock<Catalog>>,
812        hummock_snapshot_manager: HummockSnapshotManagerRef,
813    ) -> Self {
814        catalog.write().create_database(&PbDatabase {
815            id: 0.into(),
816            name: DEFAULT_DATABASE_NAME.to_owned(),
817            owner: DEFAULT_SUPER_USER_ID,
818            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
819            barrier_interval_ms: None,
820            checkpoint_frequency: None,
821        });
822        catalog.write().create_schema(&PbSchema {
823            id: 1.into(),
824            name: DEFAULT_SCHEMA_NAME.to_owned(),
825            database_id: 0.into(),
826            owner: DEFAULT_SUPER_USER_ID,
827        });
828        catalog.write().create_schema(&PbSchema {
829            id: 2.into(),
830            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
831            database_id: 0.into(),
832            owner: DEFAULT_SUPER_USER_ID,
833        });
834        catalog.write().create_schema(&PbSchema {
835            id: 3.into(),
836            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
837            database_id: 0.into(),
838            owner: DEFAULT_SUPER_USER_ID,
839        });
840        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
841        map.insert(1_u32.into(), 0_u32.into());
842        map.insert(2_u32.into(), 0_u32.into());
843        map.insert(3_u32.into(), 0_u32.into());
844        Self {
845            catalog,
846            id: AtomicU32::new(3),
847            table_id_to_schema_id: Default::default(),
848            schema_id_to_database_id: RwLock::new(map),
849            hummock_snapshot_manager,
850        }
851    }
852
853    fn gen_id<T: From<u32>>(&self) -> T {
854        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
855        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
856    }
857
858    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
859        self.table_id_to_schema_id
860            .write()
861            .insert(table_id, schema_id);
862    }
863
864    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
865        let schema_id = self
866            .table_id_to_schema_id
867            .write()
868            .remove(&table_id)
869            .unwrap();
870        (self.get_database_id_by_schema(schema_id), schema_id)
871    }
872
873    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
874        self.table_id_to_schema_id
875            .write()
876            .insert(table_id, schema_id);
877    }
878
879    fn add_table_or_subscription_id(
880        &self,
881        table_id: u32,
882        schema_id: SchemaId,
883        _database_id: DatabaseId,
884    ) {
885        self.table_id_to_schema_id
886            .write()
887            .insert(table_id, schema_id);
888    }
889
890    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
891        self.table_id_to_schema_id
892            .write()
893            .insert(table_id, schema_id);
894    }
895
896    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
897        let schema_id = self
898            .table_id_to_schema_id
899            .write()
900            .remove(&table_id)
901            .unwrap();
902        (self.get_database_id_by_schema(schema_id), schema_id)
903    }
904
905    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
906        let schema_id = self
907            .table_id_to_schema_id
908            .write()
909            .remove(&table_id)
910            .unwrap();
911        (self.get_database_id_by_schema(schema_id), schema_id)
912    }
913
914    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
915        let schema_id = self
916            .table_id_to_schema_id
917            .write()
918            .remove(&table_id)
919            .unwrap();
920        (self.get_database_id_by_schema(schema_id), schema_id)
921    }
922
923    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
924        self.schema_id_to_database_id
925            .write()
926            .insert(schema_id, database_id);
927    }
928
929    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
930        self.schema_id_to_database_id
931            .write()
932            .remove(&schema_id)
933            .unwrap()
934    }
935
936    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
937        source.id = self.gen_id();
938        self.catalog.write().create_source(&source);
939        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
940        Ok(source.id)
941    }
942
943    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
944        sink.id = self.gen_id();
945        sink.stream_job_status = PbStreamJobStatus::Created as _;
946        self.catalog.write().create_sink(&sink);
947        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
948        Ok(sink.id)
949    }
950
951    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
952        subscription.id = self.gen_id();
953        self.catalog.write().create_subscription(&subscription);
954        self.add_table_or_subscription_id(
955            subscription.id.as_raw_id(),
956            subscription.schema_id,
957            subscription.database_id,
958        );
959        Ok(())
960    }
961
962    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
963        *self
964            .schema_id_to_database_id
965            .read()
966            .get(&schema_id)
967            .unwrap()
968    }
969
970    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
971        let catalog = self.catalog.read();
972        for database in catalog.iter_databases() {
973            for schema in database.iter_schemas() {
974                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
975                    return if table.is_mview() {
976                        PbObjectType::Mview
977                    } else {
978                        PbObjectType::Table
979                    };
980                }
981                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
982                    return PbObjectType::Source;
983                }
984                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
985                    return PbObjectType::View;
986                }
987                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
988                    return PbObjectType::Index;
989                }
990            }
991        }
992        PbObjectType::Unspecified
993    }
994
995    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
996        if dependencies.is_empty() {
997            return;
998        }
999        let dependencies = dependencies
1000            .into_iter()
1001            .map(|referenced_object_id| PbObjectDependency {
1002                object_id,
1003                referenced_object_id,
1004                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1005            })
1006            .collect();
1007        self.catalog
1008            .write()
1009            .insert_object_dependencies(dependencies);
1010    }
1011}
1012
/// Mock `UserInfoWriter` that applies every change directly to a shared, in-memory
/// `UserInfoManager`.
pub struct MockUserInfoWriter {
    /// Counter from which ids for newly created users are drawn (see `gen_id`).
    id: AtomicU32,
    /// Shared user store that the writer methods read and mutate under its lock.
    user_info: Arc<RwLock<UserInfoManager>>,
}
1017
1018#[async_trait::async_trait]
1019impl UserInfoWriter for MockUserInfoWriter {
1020    async fn create_user(&self, user: UserInfo) -> Result<()> {
1021        let mut user = user;
1022        user.id = self.gen_id().into();
1023        self.user_info.write().create_user(user);
1024        Ok(())
1025    }
1026
1027    async fn drop_user(&self, id: UserId) -> Result<()> {
1028        self.user_info.write().drop_user(id);
1029        Ok(())
1030    }
1031
1032    async fn update_user(
1033        &self,
1034        update_user: UserInfo,
1035        update_fields: Vec<UpdateField>,
1036    ) -> Result<()> {
1037        let mut lock = self.user_info.write();
1038        let id = update_user.get_id();
1039        let Some(old_name) = lock.get_user_name_by_id(id) else {
1040            return Ok(());
1041        };
1042        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1043        update_fields.into_iter().for_each(|field| match field {
1044            UpdateField::Super => user_info.is_super = update_user.is_super,
1045            UpdateField::Login => user_info.can_login = update_user.can_login,
1046            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1047            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1048            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1049            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1050            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1051            UpdateField::Unspecified => unreachable!(),
1052        });
1053        lock.update_user(update_user);
1054        Ok(())
1055    }
1056
1057    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1058    /// `GrantAllSources` when grant privilege to user.
1059    async fn grant_privilege(
1060        &self,
1061        users: Vec<UserId>,
1062        privileges: Vec<GrantPrivilege>,
1063        with_grant_option: bool,
1064        _grantor: UserId,
1065    ) -> Result<()> {
1066        let privileges = privileges
1067            .into_iter()
1068            .map(|mut p| {
1069                p.action_with_opts
1070                    .iter_mut()
1071                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1072                p
1073            })
1074            .collect::<Vec<_>>();
1075        for user_id in users {
1076            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1077                u.extend_privileges(privileges.clone());
1078            }
1079        }
1080        Ok(())
1081    }
1082
1083    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1084    /// `RevokeAllSources` when revoke privilege from user.
1085    async fn revoke_privilege(
1086        &self,
1087        users: Vec<UserId>,
1088        privileges: Vec<GrantPrivilege>,
1089        _granted_by: UserId,
1090        _revoke_by: UserId,
1091        revoke_grant_option: bool,
1092        _cascade: bool,
1093    ) -> Result<()> {
1094        for user_id in users {
1095            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1096                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1097            }
1098        }
1099        Ok(())
1100    }
1101
1102    async fn alter_default_privilege(
1103        &self,
1104        _users: Vec<UserId>,
1105        _database_id: DatabaseId,
1106        _schemas: Vec<SchemaId>,
1107        _operation: AlterDefaultPrivilegeOperation,
1108        _operated_by: UserId,
1109    ) -> Result<()> {
1110        todo!()
1111    }
1112}
1113
1114impl MockUserInfoWriter {
1115    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1116        user_info.write().create_user(UserInfo {
1117            id: DEFAULT_SUPER_USER_ID,
1118            name: DEFAULT_SUPER_USER.to_owned(),
1119            is_super: true,
1120            can_create_db: true,
1121            can_create_user: true,
1122            can_login: true,
1123            ..Default::default()
1124        });
1125        user_info.write().create_user(UserInfo {
1126            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1127            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1128            is_super: true,
1129            can_create_db: true,
1130            can_create_user: true,
1131            can_login: true,
1132            is_admin: true,
1133            ..Default::default()
1134        });
1135        Self {
1136            user_info,
1137            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1138        }
1139    }
1140
1141    fn gen_id(&self) -> u32 {
1142        self.id.fetch_add(1, Ordering::SeqCst)
1143    }
1144}
1145
/// Stateless mock `FrontendMetaClient`: its methods return fixed values or are left
/// unimplemented.
pub struct MockFrontendMetaClient {}
1147
1148#[async_trait::async_trait]
1149impl FrontendMetaClient for MockFrontendMetaClient {
1150    async fn try_unregister(&self) {}
1151
1152    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1153        Ok(INVALID_VERSION_ID)
1154    }
1155
1156    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1157        Ok(vec![])
1158    }
1159
1160    async fn list_table_fragments(
1161        &self,
1162        _table_ids: &[JobId],
1163    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1164        Ok(HashMap::default())
1165    }
1166
1167    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1168        Ok(vec![])
1169    }
1170
1171    async fn list_fragment_distribution(
1172        &self,
1173        _include_node: bool,
1174    ) -> RpcResult<Vec<FragmentDistribution>> {
1175        Ok(vec![])
1176    }
1177
1178    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1179        Ok(vec![])
1180    }
1181
1182    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1183        Ok(vec![])
1184    }
1185
1186    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1187        Ok(vec![])
1188    }
1189
1190    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1191        Ok(vec![])
1192    }
1193
1194    async fn list_sink_log_store_tables(
1195        &self,
1196    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1197        Ok(vec![])
1198    }
1199
1200    async fn set_system_param(
1201        &self,
1202        _param: String,
1203        _value: Option<String>,
1204    ) -> RpcResult<Option<SystemParamsReader>> {
1205        Ok(Some(SystemParams::default().into()))
1206    }
1207
1208    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1209        Ok(Default::default())
1210    }
1211
1212    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1213        Ok("".to_owned())
1214    }
1215
1216    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1217        Ok(vec![])
1218    }
1219
1220    async fn get_tables(
1221        &self,
1222        _table_ids: Vec<crate::catalog::TableId>,
1223        _include_dropped_tables: bool,
1224    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1225        Ok(HashMap::new())
1226    }
1227
1228    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1229        unimplemented!()
1230    }
1231
1232    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1233        unimplemented!()
1234    }
1235
1236    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1237        Ok(HummockVersion::default())
1238    }
1239
1240    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1241        unimplemented!()
1242    }
1243
1244    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1245        unimplemented!()
1246    }
1247
1248    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1249        unimplemented!()
1250    }
1251
1252    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1253        unimplemented!()
1254    }
1255
1256    async fn list_hummock_active_write_limits(
1257        &self,
1258    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1259        unimplemented!()
1260    }
1261
1262    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1263        unimplemented!()
1264    }
1265
1266    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1267        Ok(vec![])
1268    }
1269
1270    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1271        unimplemented!()
1272    }
1273
1274    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1275        Ok(vec![])
1276    }
1277
1278    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1279        unimplemented!()
1280    }
1281
1282    async fn recover(&self) -> RpcResult<()> {
1283        unimplemented!()
1284    }
1285
1286    async fn apply_throttle(
1287        &self,
1288        _throttle_target: PbThrottleTarget,
1289        _throttle_type: risingwave_pb::common::PbThrottleType,
1290        _id: u32,
1291        _rate_limit: Option<u32>,
1292    ) -> RpcResult<()> {
1293        unimplemented!()
1294    }
1295
1296    async fn alter_fragment_parallelism(
1297        &self,
1298        _fragment_ids: Vec<FragmentId>,
1299        _parallelism: Option<PbTableParallelism>,
1300    ) -> RpcResult<()> {
1301        unimplemented!()
1302    }
1303
1304    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1305        Ok(RecoveryStatus::StatusRunning)
1306    }
1307
1308    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1309        Ok(vec![])
1310    }
1311
1312    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1313        Ok(vec![])
1314    }
1315
1316    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1317        Ok(HashMap::default())
1318    }
1319
1320    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1321        unimplemented!()
1322    }
1323
1324    async fn alter_sink_props(
1325        &self,
1326        _sink_id: SinkId,
1327        _changed_props: BTreeMap<String, String>,
1328        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1329        _connector_conn_ref: Option<ConnectionId>,
1330    ) -> RpcResult<()> {
1331        unimplemented!()
1332    }
1333
1334    async fn alter_iceberg_table_props(
1335        &self,
1336        _table_id: TableId,
1337        _sink_id: SinkId,
1338        _source_id: SourceId,
1339        _changed_props: BTreeMap<String, String>,
1340        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1341        _connector_conn_ref: Option<ConnectionId>,
1342    ) -> RpcResult<()> {
1343        unimplemented!()
1344    }
1345
1346    async fn alter_source_connector_props(
1347        &self,
1348        _source_id: SourceId,
1349        _changed_props: BTreeMap<String, String>,
1350        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1351        _connector_conn_ref: Option<ConnectionId>,
1352    ) -> RpcResult<()> {
1353        unimplemented!()
1354    }
1355
1356    async fn alter_connection_connector_props(
1357        &self,
1358        _connection_id: u32,
1359        _changed_props: BTreeMap<String, String>,
1360        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1361    ) -> RpcResult<()> {
1362        Ok(())
1363    }
1364
1365    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1366        unimplemented!()
1367    }
1368
1369    async fn get_fragment_by_id(
1370        &self,
1371        _fragment_id: FragmentId,
1372    ) -> RpcResult<Option<FragmentDistribution>> {
1373        unimplemented!()
1374    }
1375
1376    async fn get_fragment_vnodes(
1377        &self,
1378        _fragment_id: FragmentId,
1379    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1380        unimplemented!()
1381    }
1382
1383    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1384        unimplemented!()
1385    }
1386
1387    fn worker_id(&self) -> WorkerId {
1388        0.into()
1389    }
1390
1391    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1392        Ok(())
1393    }
1394
1395    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1396        Ok(1)
1397    }
1398
1399    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1400        Ok(())
1401    }
1402
1403    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1404        Ok(RefreshResponse { status: None })
1405    }
1406
1407    fn cluster_id(&self) -> &str {
1408        "test-cluster-uuid"
1409    }
1410
1411    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1412        unimplemented!()
1413    }
1414}
1415
/// Sample proto3 schema (package `test`), compiled only in test builds.
///
/// Declares `TestRecord`, two variants of it (`TestRecordAlterType`, which changes the
/// types of `id` and `zipcode`, and `TestRecordExt`, which adds a `name` field), and the
/// nested `Country` / `City` messages they reference.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1448
1449/// Returns the file.
1450/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1451pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1452    let in_file = Builder::new()
1453        .prefix("temp")
1454        .suffix(".proto")
1455        .rand_bytes(8)
1456        .tempfile()
1457        .unwrap();
1458
1459    let out_file = Builder::new()
1460        .prefix("temp")
1461        .suffix(".pb")
1462        .rand_bytes(8)
1463        .tempfile()
1464        .unwrap();
1465
1466    let mut file = in_file.as_file();
1467    file.write_all(proto_data.as_ref())
1468        .expect("writing binary to test file");
1469    file.flush().expect("flush temp file failed");
1470    let include_path = in_file
1471        .path()
1472        .parent()
1473        .unwrap()
1474        .to_string_lossy()
1475        .into_owned();
1476    let out_path = out_file.path().to_string_lossy().into_owned();
1477    let in_path = in_file.path().to_string_lossy().into_owned();
1478    let mut compile = std::process::Command::new("protoc");
1479
1480    let out = compile
1481        .arg("--include_imports")
1482        .arg("-I")
1483        .arg(include_path)
1484        .arg(format!("--descriptor_set_out={}", out_path))
1485        .arg(in_path)
1486        .output()
1487        .expect("failed to compile proto");
1488    if !out.status.success() {
1489        panic!("compile proto failed \n output: {:?}", out);
1490    }
1491    out_file
1492}