risingwave_frontend/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::io::Write;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20
21use futures_async_stream::for_await;
22use parking_lot::RwLock;
23use pgwire::net::{Address, AddressRef};
24use pgwire::pg_response::StatementType;
25use pgwire::pg_server::{SessionId, SessionManager, UserAuthenticator};
26use pgwire::types::Row;
27use risingwave_common::catalog::{
28    AlterDatabaseParam, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
29    DEFAULT_SUPER_USER_FOR_ADMIN, DEFAULT_SUPER_USER_FOR_ADMIN_ID, DEFAULT_SUPER_USER_ID,
30    FunctionId, IndexId, NON_RESERVED_USER_ID, ObjectId, PG_CATALOG_SCHEMA_NAME,
31    RW_CATALOG_SCHEMA_NAME, TableId,
32};
33use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
34use risingwave_common::id::{ConnectionId, JobId, SourceId, SubscriptionId, ViewId, WorkerId};
35use risingwave_common::session_config::SessionConfig;
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_common::util::cluster_limit::ClusterLimit;
38use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
39use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
40use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
41use risingwave_pb::backup_service::MetaSnapshotMetadata;
42use risingwave_pb::catalog::{
43    PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbStreamJobStatus,
44    PbSubscription, PbTable, PbView, Table,
45};
46use risingwave_pb::common::{PbObjectType, WorkerNode};
47use risingwave_pb::ddl_service::alter_owner_request::Object;
48use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
49use risingwave_pb::ddl_service::{
50    DdlProgress, PbTableJobType, TableJobType, alter_name_request, alter_set_schema_request,
51    alter_swap_rename_request, create_connection_request, streaming_job_resource_type,
52};
53use risingwave_pb::hummock::write_limits::WriteLimit;
54use risingwave_pb::hummock::{
55    BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
56};
57use risingwave_pb::id::ActorId;
58use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
59use risingwave_pb::meta::list_actor_splits_response::ActorSplit;
60use risingwave_pb::meta::list_actor_states_response::ActorState;
61use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
62use risingwave_pb::meta::list_iceberg_compaction_status_response::IcebergCompactionStatus;
63use risingwave_pb::meta::list_iceberg_tables_response::IcebergTable;
64use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
65use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
66use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
67use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
68use risingwave_pb::meta::{
69    EventLog, FragmentDistribution, ObjectDependency as PbObjectDependency, PbTableParallelism,
70    PbThrottleTarget, RecoveryStatus, RefreshRequest, RefreshResponse, SystemParams,
71    list_sink_log_store_tables_response,
72};
73use risingwave_pb::secret::PbSecretRef;
74use risingwave_pb::stream_plan::StreamFragmentGraph;
75use risingwave_pb::user::alter_default_privilege_request::Operation as AlterDefaultPrivilegeOperation;
76use risingwave_pb::user::update_user_request::UpdateField;
77use risingwave_pb::user::{GrantPrivilege, UserInfo};
78use risingwave_rpc_client::error::Result as RpcResult;
79use tempfile::{Builder, NamedTempFile};
80
81use crate::FrontendOpts;
82use crate::catalog::catalog_service::CatalogWriter;
83use crate::catalog::root_catalog::Catalog;
84use crate::catalog::{DatabaseId, FragmentId, SchemaId, SecretId, SinkId};
85use crate::error::{ErrorCode, Result, RwError};
86use crate::handler::RwPgResponse;
87use crate::meta_client::FrontendMetaClient;
88use crate::scheduler::HummockSnapshotManagerRef;
89use crate::session::{AuthContext, FrontendEnv, SessionImpl};
90use crate::user::UserId;
91use crate::user::user_manager::UserInfoManager;
92use crate::user::user_service::UserInfoWriter;
93
/// An embedded frontend without starting meta and without starting frontend as a tcp server.
///
/// Every session handed out by this type is built on a clone of the `env` field.
pub struct LocalFrontend {
    /// Frontend options supplied by the caller of `LocalFrontend::new`.
    pub opts: FrontendOpts,
    /// Environment created by `FrontendEnv::mock()` and cloned into each new session.
    env: FrontendEnv,
}
99
100impl SessionManager for LocalFrontend {
101    type Error = RwError;
102    type Session = SessionImpl;
103
104    fn create_dummy_session(
105        &self,
106        _database_id: DatabaseId,
107    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
108        unreachable!()
109    }
110
111    fn connect(
112        &self,
113        _database: &str,
114        _user_name: &str,
115        _peer_addr: AddressRef,
116    ) -> std::result::Result<Arc<Self::Session>, Self::Error> {
117        Ok(self.session_ref())
118    }
119
120    fn cancel_queries_in_session(&self, _session_id: SessionId) {
121        unreachable!()
122    }
123
124    fn cancel_creating_jobs_in_session(&self, _session_id: SessionId) {
125        unreachable!()
126    }
127
128    fn end_session(&self, _session: &Self::Session) {
129        unreachable!()
130    }
131}
132
133impl LocalFrontend {
134    #[expect(clippy::unused_async)]
135    pub async fn new(opts: FrontendOpts) -> Self {
136        let env = FrontendEnv::mock();
137        Self { opts, env }
138    }
139
140    pub async fn run_sql(
141        &self,
142        sql: impl Into<String>,
143    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
144        let sql: Arc<str> = Arc::from(sql.into());
145        Box::pin(self.session_ref().run_statement(sql, vec![])).await
146    }
147
148    pub async fn run_sql_with_session(
149        &self,
150        session_ref: Arc<SessionImpl>,
151        sql: impl Into<String>,
152    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
153        let sql: Arc<str> = Arc::from(sql.into());
154        Box::pin(session_ref.run_statement(sql, vec![])).await
155    }
156
157    pub async fn run_user_sql(
158        &self,
159        sql: impl Into<String>,
160        database: String,
161        user_name: String,
162        user_id: UserId,
163    ) -> std::result::Result<RwPgResponse, Box<dyn std::error::Error + Send + Sync>> {
164        let sql: Arc<str> = Arc::from(sql.into());
165        Box::pin(
166            self.session_user_ref(database, user_name, user_id)
167                .run_statement(sql, vec![]),
168        )
169        .await
170    }
171
172    pub async fn query_formatted_result(&self, sql: impl Into<String>) -> Vec<String> {
173        let mut rsp = self.run_sql(sql).await.unwrap();
174        let mut res = vec![];
175        #[for_await]
176        for row_set in rsp.values_stream() {
177            for row in row_set.unwrap() {
178                res.push(format!("{:?}", row));
179            }
180        }
181        res
182    }
183
184    pub async fn get_explain_output(&self, sql: impl Into<String>) -> String {
185        let mut rsp = self.run_sql(sql).await.unwrap();
186        assert_eq!(rsp.stmt_type(), StatementType::EXPLAIN);
187        let mut res = String::new();
188        #[for_await]
189        for row_set in rsp.values_stream() {
190            for row in row_set.unwrap() {
191                let row: Row = row;
192                let row = row.values()[0].as_ref().unwrap();
193                res += std::str::from_utf8(row).unwrap();
194                res += "\n";
195            }
196        }
197        res
198    }
199
200    /// Creates a new session
201    pub fn session_ref(&self) -> Arc<SessionImpl> {
202        self.session_user_ref(
203            DEFAULT_DATABASE_NAME.to_owned(),
204            DEFAULT_SUPER_USER.to_owned(),
205            DEFAULT_SUPER_USER_ID,
206        )
207    }
208
209    pub fn session_user_ref(
210        &self,
211        database: String,
212        user_name: String,
213        user_id: UserId,
214    ) -> Arc<SessionImpl> {
215        Arc::new(SessionImpl::new(
216            self.env.clone(),
217            AuthContext::new(database, user_name, user_id),
218            UserAuthenticator::None,
219            // Local Frontend use a non-sense id.
220            (0, 0),
221            Address::Tcp(SocketAddr::new(
222                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
223                6666,
224            ))
225            .into(),
226            Default::default(),
227        ))
228    }
229}
230
231pub async fn get_explain_output(mut rsp: RwPgResponse) -> String {
232    if rsp.stmt_type() != StatementType::EXPLAIN {
233        panic!("RESPONSE INVALID: {rsp:?}");
234    }
235    let mut res = String::new();
236    #[for_await]
237    for row_set in rsp.values_stream() {
238        for row in row_set.unwrap() {
239            let row: Row = row;
240            let row = row.values()[0].as_ref().unwrap();
241            res += std::str::from_utf8(row).unwrap();
242            res += "\n";
243        }
244    }
245    res
246}
247
/// A `CatalogWriter` implementation that applies every change directly to a shared in-memory
/// `Catalog`.
pub struct MockCatalogWriter {
    /// Catalog that all write operations are applied to.
    catalog: Arc<RwLock<Catalog>>,
    /// Last id handed out; `gen_id` increments it and returns the new value.
    id: AtomicU32,
    /// Raw object id -> schema id, maintained by the `add_table_or_*_id` and
    /// `drop_table_or_*_id` helpers.
    table_id_to_schema_id: RwLock<HashMap<u32, SchemaId>>,
    /// Schema id -> id of the database that owns the schema.
    schema_id_to_database_id: RwLock<HashMap<SchemaId, DatabaseId>>,
    /// Receives `add_table_for_test` for each table created by `create_materialized_view`.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
}
255
256#[async_trait::async_trait]
257impl CatalogWriter for MockCatalogWriter {
258    async fn create_database(
259        &self,
260        db_name: &str,
261        owner: UserId,
262        resource_group: &str,
263        barrier_interval_ms: Option<u32>,
264        checkpoint_frequency: Option<u64>,
265    ) -> Result<()> {
266        let database_id = DatabaseId::new(self.gen_id());
267        self.catalog.write().create_database(&PbDatabase {
268            name: db_name.to_owned(),
269            id: database_id,
270            owner,
271            resource_group: resource_group.to_owned(),
272            barrier_interval_ms,
273            checkpoint_frequency,
274        });
275        self.create_schema(database_id, DEFAULT_SCHEMA_NAME, owner)
276            .await?;
277        self.create_schema(database_id, PG_CATALOG_SCHEMA_NAME, owner)
278            .await?;
279        self.create_schema(database_id, RW_CATALOG_SCHEMA_NAME, owner)
280            .await?;
281        Ok(())
282    }
283
284    async fn create_schema(
285        &self,
286        db_id: DatabaseId,
287        schema_name: &str,
288        owner: UserId,
289    ) -> Result<()> {
290        let id = self.gen_id();
291        self.catalog.write().create_schema(&PbSchema {
292            id,
293            name: schema_name.to_owned(),
294            database_id: db_id,
295            owner,
296        });
297        self.add_schema_id(id, db_id);
298        Ok(())
299    }
300
301    async fn create_materialized_view(
302        &self,
303        mut table: PbTable,
304        _graph: StreamFragmentGraph,
305        dependencies: HashSet<ObjectId>,
306        _resource_type: streaming_job_resource_type::ResourceType,
307        _if_not_exists: bool,
308    ) -> Result<()> {
309        table.id = self.gen_id();
310        table.stream_job_status = PbStreamJobStatus::Created as _;
311        table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
312        self.catalog.write().create_table(&table);
313        self.add_table_or_source_id(table.id.as_raw_id(), table.schema_id, table.database_id);
314        self.insert_object_dependencies(table.id.as_object_id(), dependencies);
315        self.hummock_snapshot_manager.add_table_for_test(table.id);
316        Ok(())
317    }
318
319    async fn replace_materialized_view(
320        &self,
321        mut table: PbTable,
322        _graph: StreamFragmentGraph,
323    ) -> Result<()> {
324        table.stream_job_status = PbStreamJobStatus::Created as _;
325        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
326        self.catalog.write().update_table(&table);
327        Ok(())
328    }
329
330    async fn create_view(&self, mut view: PbView, dependencies: HashSet<ObjectId>) -> Result<()> {
331        view.id = self.gen_id();
332        self.catalog.write().create_view(&view);
333        self.add_table_or_source_id(view.id.as_raw_id(), view.schema_id, view.database_id);
334        self.insert_object_dependencies(view.id.as_object_id(), dependencies);
335        Ok(())
336    }
337
338    async fn create_table(
339        &self,
340        source: Option<PbSource>,
341        mut table: PbTable,
342        graph: StreamFragmentGraph,
343        _job_type: PbTableJobType,
344        if_not_exists: bool,
345        dependencies: HashSet<ObjectId>,
346    ) -> Result<()> {
347        if let Some(source) = source {
348            let source_id = self.create_source_inner(source)?;
349            table.optional_associated_source_id = Some(source_id.into());
350        }
351        self.create_materialized_view(
352            table,
353            graph,
354            dependencies,
355            streaming_job_resource_type::ResourceType::Regular(true),
356            if_not_exists,
357        )
358        .await?;
359        Ok(())
360    }
361
362    async fn replace_table(
363        &self,
364        _source: Option<PbSource>,
365        mut table: PbTable,
366        _graph: StreamFragmentGraph,
367        _job_type: TableJobType,
368    ) -> Result<()> {
369        table.stream_job_status = PbStreamJobStatus::Created as _;
370        assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
371        self.catalog.write().update_table(&table);
372        Ok(())
373    }
374
375    async fn replace_source(&self, source: PbSource, _graph: StreamFragmentGraph) -> Result<()> {
376        self.catalog.write().update_source(&source);
377        Ok(())
378    }
379
380    async fn create_source(
381        &self,
382        source: PbSource,
383        _graph: Option<StreamFragmentGraph>,
384        _if_not_exists: bool,
385    ) -> Result<()> {
386        self.create_source_inner(source).map(|_| ())
387    }
388
389    async fn create_sink(
390        &self,
391        sink: PbSink,
392        graph: StreamFragmentGraph,
393        dependencies: HashSet<ObjectId>,
394        _if_not_exists: bool,
395    ) -> Result<()> {
396        let sink_id = self.create_sink_inner(sink, graph)?;
397        self.insert_object_dependencies(sink_id.as_object_id(), dependencies);
398        Ok(())
399    }
400
401    async fn create_subscription(&self, subscription: PbSubscription) -> Result<()> {
402        self.create_subscription_inner(subscription)
403    }
404
405    async fn create_index(
406        &self,
407        mut index: PbIndex,
408        mut index_table: PbTable,
409        _graph: StreamFragmentGraph,
410        _if_not_exists: bool,
411    ) -> Result<()> {
412        index_table.id = self.gen_id();
413        index_table.stream_job_status = PbStreamJobStatus::Created as _;
414        index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
415        self.catalog.write().create_table(&index_table);
416        self.add_table_or_index_id(
417            index_table.id.as_raw_id(),
418            index_table.schema_id,
419            index_table.database_id,
420        );
421
422        index.id = index_table.id.as_raw_id().into();
423        index.index_table_id = index_table.id;
424        self.catalog.write().create_index(&index);
425        Ok(())
426    }
427
428    async fn create_function(&self, _function: PbFunction) -> Result<()> {
429        unreachable!()
430    }
431
432    async fn create_connection(
433        &self,
434        _connection_name: String,
435        _database_id: DatabaseId,
436        _schema_id: SchemaId,
437        _owner_id: UserId,
438        _connection: create_connection_request::Payload,
439    ) -> Result<()> {
440        unreachable!()
441    }
442
443    async fn create_secret(
444        &self,
445        _secret_name: String,
446        _database_id: DatabaseId,
447        _schema_id: SchemaId,
448        _owner_id: UserId,
449        _payload: Vec<u8>,
450    ) -> Result<()> {
451        unreachable!()
452    }
453
454    async fn comment_on(&self, _comment: PbComment) -> Result<()> {
455        unreachable!()
456    }
457
458    async fn drop_table(
459        &self,
460        source_id: Option<SourceId>,
461        table_id: TableId,
462        cascade: bool,
463    ) -> Result<()> {
464        if cascade {
465            return Err(ErrorCode::NotSupported(
466                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
467                "use drop instead".to_owned(),
468            )
469            .into());
470        }
471        if let Some(source_id) = source_id {
472            self.drop_table_or_source_id(source_id.as_raw_id());
473        }
474        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
475        let indexes =
476            self.catalog
477                .read()
478                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
479        for index in indexes {
480            self.drop_index(index.id, cascade).await?;
481        }
482        self.catalog
483            .write()
484            .drop_table(database_id, schema_id, table_id);
485        if let Some(source_id) = source_id {
486            self.catalog
487                .write()
488                .drop_source(database_id, schema_id, source_id);
489        }
490        Ok(())
491    }
492
493    async fn drop_view(&self, _view_id: ViewId, _cascade: bool) -> Result<()> {
494        unreachable!()
495    }
496
497    async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
498        if cascade {
499            return Err(ErrorCode::NotSupported(
500                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
501                "use drop instead".to_owned(),
502            )
503            .into());
504        }
505        let (database_id, schema_id) = self.drop_table_or_source_id(table_id.as_raw_id());
506        let indexes =
507            self.catalog
508                .read()
509                .get_all_indexes_related_to_object(database_id, schema_id, table_id);
510        for index in indexes {
511            self.drop_index(index.id, cascade).await?;
512        }
513        self.catalog
514            .write()
515            .drop_table(database_id, schema_id, table_id);
516        Ok(())
517    }
518
519    async fn drop_source(&self, source_id: SourceId, cascade: bool) -> Result<()> {
520        if cascade {
521            return Err(ErrorCode::NotSupported(
522                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
523                "use drop instead".to_owned(),
524            )
525            .into());
526        }
527        let (database_id, schema_id) = self.drop_table_or_source_id(source_id.as_raw_id());
528        self.catalog
529            .write()
530            .drop_source(database_id, schema_id, source_id);
531        Ok(())
532    }
533
534    async fn reset_source(&self, _source_id: SourceId) -> Result<()> {
535        Ok(())
536    }
537
538    async fn drop_sink(&self, sink_id: SinkId, cascade: bool) -> Result<()> {
539        if cascade {
540            return Err(ErrorCode::NotSupported(
541                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
542                "use drop instead".to_owned(),
543            )
544            .into());
545        }
546        let (database_id, schema_id) = self.drop_table_or_sink_id(sink_id.as_raw_id());
547        self.catalog
548            .write()
549            .drop_sink(database_id, schema_id, sink_id);
550        Ok(())
551    }
552
553    async fn drop_subscription(
554        &self,
555        subscription_id: SubscriptionId,
556        cascade: bool,
557    ) -> Result<()> {
558        if cascade {
559            return Err(ErrorCode::NotSupported(
560                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
561                "use drop instead".to_owned(),
562            )
563            .into());
564        }
565        let (database_id, schema_id) =
566            self.drop_table_or_subscription_id(subscription_id.as_raw_id());
567        self.catalog
568            .write()
569            .drop_subscription(database_id, schema_id, subscription_id);
570        Ok(())
571    }
572
573    async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
574        if cascade {
575            return Err(ErrorCode::NotSupported(
576                "drop cascade in MockCatalogWriter is unsupported".to_owned(),
577                "use drop instead".to_owned(),
578            )
579            .into());
580        }
581        let &schema_id = self
582            .table_id_to_schema_id
583            .read()
584            .get(&index_id.as_raw_id())
585            .unwrap();
586        let database_id = self.get_database_id_by_schema(schema_id);
587
588        let index = {
589            let catalog_reader = self.catalog.read();
590            let schema_catalog = catalog_reader
591                .get_schema_by_id(database_id, schema_id)
592                .unwrap();
593            schema_catalog.get_index_by_id(index_id).unwrap().clone()
594        };
595
596        let index_table_id = index.index_table().id;
597        let (database_id, schema_id) = self.drop_table_or_index_id(index_id.as_raw_id());
598        self.catalog
599            .write()
600            .drop_index(database_id, schema_id, index_id);
601        self.catalog
602            .write()
603            .drop_table(database_id, schema_id, index_table_id);
604        Ok(())
605    }
606
607    async fn drop_function(&self, _function_id: FunctionId, _cascade: bool) -> Result<()> {
608        unreachable!()
609    }
610
611    async fn drop_connection(&self, _connection_id: ConnectionId, _cascade: bool) -> Result<()> {
612        unreachable!()
613    }
614
615    async fn drop_secret(&self, _secret_id: SecretId, _cascade: bool) -> Result<()> {
616        unreachable!()
617    }
618
619    async fn drop_database(&self, database_id: DatabaseId) -> Result<()> {
620        self.catalog.write().drop_database(database_id);
621        Ok(())
622    }
623
624    async fn drop_schema(&self, schema_id: SchemaId, _cascade: bool) -> Result<()> {
625        let database_id = self.drop_schema_id(schema_id);
626        self.catalog.write().drop_schema(database_id, schema_id);
627        Ok(())
628    }
629
630    async fn alter_name(
631        &self,
632        object_id: alter_name_request::Object,
633        object_name: &str,
634    ) -> Result<()> {
635        match object_id {
636            alter_name_request::Object::TableId(table_id) => {
637                self.catalog
638                    .write()
639                    .alter_table_name_by_id(table_id, object_name);
640                Ok(())
641            }
642            _ => {
643                unimplemented!()
644            }
645        }
646    }
647
648    async fn alter_source(&self, source: PbSource) -> Result<()> {
649        self.catalog.write().update_source(&source);
650        Ok(())
651    }
652
653    async fn alter_owner(&self, object: Object, owner_id: UserId) -> Result<()> {
654        for database in self.catalog.read().iter_databases() {
655            for schema in database.iter_schemas() {
656                match object {
657                    Object::TableId(table_id) => {
658                        if let Some(table) = schema.get_created_table_by_id(TableId::from(table_id))
659                        {
660                            let mut pb_table = table.to_prost();
661                            pb_table.owner = owner_id;
662                            self.catalog.write().update_table(&pb_table);
663                            return Ok(());
664                        }
665                    }
666                    _ => unreachable!(),
667                }
668            }
669        }
670
671        Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into())
672    }
673
674    async fn alter_subscription_retention(
675        &self,
676        subscription_id: SubscriptionId,
677        retention_seconds: u64,
678        definition: String,
679    ) -> Result<()> {
680        for database in self.catalog.read().iter_databases() {
681            for schema in database.iter_schemas() {
682                if let Some(subscription) = schema.get_subscription_by_id(subscription_id) {
683                    let mut pb_subscription = subscription.to_proto();
684                    pb_subscription.retention_seconds = retention_seconds;
685                    pb_subscription.definition = definition;
686                    self.catalog.write().update_subscription(&pb_subscription);
687                    return Ok(());
688                }
689            }
690        }
691
692        Err(
693            ErrorCode::ItemNotFound(format!("subscription not found: {:?}", subscription_id))
694                .into(),
695        )
696    }
697
698    async fn alter_set_schema(
699        &self,
700        object: alter_set_schema_request::Object,
701        new_schema_id: SchemaId,
702    ) -> Result<()> {
703        match object {
704            alter_set_schema_request::Object::TableId(table_id) => {
705                let mut pb_table = {
706                    let reader = self.catalog.read();
707                    let table = reader.get_any_table_by_id(table_id)?.to_owned();
708                    table.to_prost()
709                };
710                pb_table.schema_id = new_schema_id;
711                self.catalog.write().update_table(&pb_table);
712                self.table_id_to_schema_id
713                    .write()
714                    .insert(table_id.as_raw_id(), new_schema_id);
715                Ok(())
716            }
717            _ => unreachable!(),
718        }
719    }
720
721    async fn alter_parallelism(
722        &self,
723        _job_id: JobId,
724        _parallelism: PbTableParallelism,
725        _deferred: bool,
726    ) -> Result<()> {
727        todo!()
728    }
729
730    async fn alter_backfill_parallelism(
731        &self,
732        _job_id: JobId,
733        _parallelism: Option<PbTableParallelism>,
734        _deferred: bool,
735    ) -> Result<()> {
736        todo!()
737    }
738
739    async fn alter_config(
740        &self,
741        _job_id: JobId,
742        _entries_to_add: HashMap<String, String>,
743        _keys_to_remove: Vec<String>,
744    ) -> Result<()> {
745        todo!()
746    }
747
748    async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> {
749        todo!()
750    }
751
752    async fn alter_secret(
753        &self,
754        _secret_id: SecretId,
755        _secret_name: String,
756        _database_id: DatabaseId,
757        _schema_id: SchemaId,
758        _owner_id: UserId,
759        _payload: Vec<u8>,
760    ) -> Result<()> {
761        unreachable!()
762    }
763
764    async fn alter_resource_group(
765        &self,
766        _table_id: TableId,
767        _resource_group: Option<String>,
768        _deferred: bool,
769    ) -> Result<()> {
770        todo!()
771    }
772
773    async fn alter_database_param(
774        &self,
775        database_id: DatabaseId,
776        param: AlterDatabaseParam,
777    ) -> Result<()> {
778        let mut pb_database = {
779            let reader = self.catalog.read();
780            let database = reader.get_database_by_id(database_id)?.to_owned();
781            database.to_prost()
782        };
783        match param {
784            AlterDatabaseParam::BarrierIntervalMs(interval) => {
785                pb_database.barrier_interval_ms = interval;
786            }
787            AlterDatabaseParam::CheckpointFrequency(frequency) => {
788                pb_database.checkpoint_frequency = frequency;
789            }
790        }
791        self.catalog.write().update_database(&pb_database);
792        Ok(())
793    }
794
795    async fn create_iceberg_table(
796        &self,
797        _table_job_info: PbTableJobInfo,
798        _sink_job_info: PbSinkJobInfo,
799        _iceberg_source: PbSource,
800        _if_not_exists: bool,
801    ) -> Result<()> {
802        todo!()
803    }
804
805    async fn wait(&self) -> Result<()> {
806        Ok(())
807    }
808}
809
810impl MockCatalogWriter {
811    pub fn new(
812        catalog: Arc<RwLock<Catalog>>,
813        hummock_snapshot_manager: HummockSnapshotManagerRef,
814    ) -> Self {
815        catalog.write().create_database(&PbDatabase {
816            id: 0.into(),
817            name: DEFAULT_DATABASE_NAME.to_owned(),
818            owner: DEFAULT_SUPER_USER_ID,
819            resource_group: DEFAULT_RESOURCE_GROUP.to_owned(),
820            barrier_interval_ms: None,
821            checkpoint_frequency: None,
822        });
823        catalog.write().create_schema(&PbSchema {
824            id: 1.into(),
825            name: DEFAULT_SCHEMA_NAME.to_owned(),
826            database_id: 0.into(),
827            owner: DEFAULT_SUPER_USER_ID,
828        });
829        catalog.write().create_schema(&PbSchema {
830            id: 2.into(),
831            name: PG_CATALOG_SCHEMA_NAME.to_owned(),
832            database_id: 0.into(),
833            owner: DEFAULT_SUPER_USER_ID,
834        });
835        catalog.write().create_schema(&PbSchema {
836            id: 3.into(),
837            name: RW_CATALOG_SCHEMA_NAME.to_owned(),
838            database_id: 0.into(),
839            owner: DEFAULT_SUPER_USER_ID,
840        });
841        let mut map: HashMap<SchemaId, DatabaseId> = HashMap::new();
842        map.insert(1_u32.into(), 0_u32.into());
843        map.insert(2_u32.into(), 0_u32.into());
844        map.insert(3_u32.into(), 0_u32.into());
845        Self {
846            catalog,
847            id: AtomicU32::new(3),
848            table_id_to_schema_id: Default::default(),
849            schema_id_to_database_id: RwLock::new(map),
850            hummock_snapshot_manager,
851        }
852    }
853
854    fn gen_id<T: From<u32>>(&self) -> T {
855        // Since the 0 value is `dev` schema and database, so jump out the 0 value.
856        (self.id.fetch_add(1, Ordering::SeqCst) + 1).into()
857    }
858
859    fn add_table_or_source_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
860        self.table_id_to_schema_id
861            .write()
862            .insert(table_id, schema_id);
863    }
864
865    fn drop_table_or_source_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
866        let schema_id = self
867            .table_id_to_schema_id
868            .write()
869            .remove(&table_id)
870            .unwrap();
871        (self.get_database_id_by_schema(schema_id), schema_id)
872    }
873
874    fn add_table_or_sink_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
875        self.table_id_to_schema_id
876            .write()
877            .insert(table_id, schema_id);
878    }
879
880    fn add_table_or_subscription_id(
881        &self,
882        table_id: u32,
883        schema_id: SchemaId,
884        _database_id: DatabaseId,
885    ) {
886        self.table_id_to_schema_id
887            .write()
888            .insert(table_id, schema_id);
889    }
890
891    fn add_table_or_index_id(&self, table_id: u32, schema_id: SchemaId, _database_id: DatabaseId) {
892        self.table_id_to_schema_id
893            .write()
894            .insert(table_id, schema_id);
895    }
896
897    fn drop_table_or_sink_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
898        let schema_id = self
899            .table_id_to_schema_id
900            .write()
901            .remove(&table_id)
902            .unwrap();
903        (self.get_database_id_by_schema(schema_id), schema_id)
904    }
905
906    fn drop_table_or_subscription_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
907        let schema_id = self
908            .table_id_to_schema_id
909            .write()
910            .remove(&table_id)
911            .unwrap();
912        (self.get_database_id_by_schema(schema_id), schema_id)
913    }
914
915    fn drop_table_or_index_id(&self, table_id: u32) -> (DatabaseId, SchemaId) {
916        let schema_id = self
917            .table_id_to_schema_id
918            .write()
919            .remove(&table_id)
920            .unwrap();
921        (self.get_database_id_by_schema(schema_id), schema_id)
922    }
923
924    fn add_schema_id(&self, schema_id: SchemaId, database_id: DatabaseId) {
925        self.schema_id_to_database_id
926            .write()
927            .insert(schema_id, database_id);
928    }
929
930    fn drop_schema_id(&self, schema_id: SchemaId) -> DatabaseId {
931        self.schema_id_to_database_id
932            .write()
933            .remove(&schema_id)
934            .unwrap()
935    }
936
937    fn create_source_inner(&self, mut source: PbSource) -> Result<SourceId> {
938        source.id = self.gen_id();
939        self.catalog.write().create_source(&source);
940        self.add_table_or_source_id(source.id.as_raw_id(), source.schema_id, source.database_id);
941        Ok(source.id)
942    }
943
944    fn create_sink_inner(&self, mut sink: PbSink, _graph: StreamFragmentGraph) -> Result<SinkId> {
945        sink.id = self.gen_id();
946        sink.stream_job_status = PbStreamJobStatus::Created as _;
947        self.catalog.write().create_sink(&sink);
948        self.add_table_or_sink_id(sink.id.as_raw_id(), sink.schema_id, sink.database_id);
949        Ok(sink.id)
950    }
951
952    fn create_subscription_inner(&self, mut subscription: PbSubscription) -> Result<()> {
953        subscription.id = self.gen_id();
954        self.catalog.write().create_subscription(&subscription);
955        self.add_table_or_subscription_id(
956            subscription.id.as_raw_id(),
957            subscription.schema_id,
958            subscription.database_id,
959        );
960        Ok(())
961    }
962
963    fn get_database_id_by_schema(&self, schema_id: SchemaId) -> DatabaseId {
964        *self
965            .schema_id_to_database_id
966            .read()
967            .get(&schema_id)
968            .unwrap()
969    }
970
971    fn get_object_type(&self, object_id: ObjectId) -> PbObjectType {
972        let catalog = self.catalog.read();
973        for database in catalog.iter_databases() {
974            for schema in database.iter_schemas() {
975                if let Some(table) = schema.get_created_table_by_id(object_id.as_table_id()) {
976                    return if table.is_mview() {
977                        PbObjectType::Mview
978                    } else {
979                        PbObjectType::Table
980                    };
981                }
982                if schema.get_source_by_id(object_id.as_source_id()).is_some() {
983                    return PbObjectType::Source;
984                }
985                if schema.get_view_by_id(object_id.as_view_id()).is_some() {
986                    return PbObjectType::View;
987                }
988                if schema.get_index_by_id(object_id.as_index_id()).is_some() {
989                    return PbObjectType::Index;
990                }
991            }
992        }
993        PbObjectType::Unspecified
994    }
995
996    fn insert_object_dependencies(&self, object_id: ObjectId, dependencies: HashSet<ObjectId>) {
997        if dependencies.is_empty() {
998            return;
999        }
1000        let dependencies = dependencies
1001            .into_iter()
1002            .map(|referenced_object_id| PbObjectDependency {
1003                object_id,
1004                referenced_object_id,
1005                referenced_object_type: self.get_object_type(referenced_object_id) as i32,
1006            })
1007            .collect();
1008        self.catalog
1009            .write()
1010            .insert_object_dependencies(dependencies);
1011    }
1012}
1013
/// Mock [`UserInfoWriter`] that applies user changes directly to a shared, in-memory
/// [`UserInfoManager`].
pub struct MockUserInfoWriter {
    /// Counter from which ids for newly created users are drawn.
    id: AtomicU32,
    /// Shared user registry that the write operations mutate.
    user_info: Arc<RwLock<UserInfoManager>>,
}
1018
1019#[async_trait::async_trait]
1020impl UserInfoWriter for MockUserInfoWriter {
1021    async fn create_user(&self, user: UserInfo) -> Result<()> {
1022        let mut user = user;
1023        user.id = self.gen_id().into();
1024        self.user_info.write().create_user(user);
1025        Ok(())
1026    }
1027
1028    async fn drop_user(&self, id: UserId) -> Result<()> {
1029        self.user_info.write().drop_user(id);
1030        Ok(())
1031    }
1032
1033    async fn update_user(
1034        &self,
1035        update_user: UserInfo,
1036        update_fields: Vec<UpdateField>,
1037    ) -> Result<()> {
1038        let mut lock = self.user_info.write();
1039        let id = update_user.get_id();
1040        let Some(old_name) = lock.get_user_name_by_id(id) else {
1041            return Ok(());
1042        };
1043        let mut user_info = lock.get_user_by_name(&old_name).unwrap().to_prost();
1044        update_fields.into_iter().for_each(|field| match field {
1045            UpdateField::Super => user_info.is_super = update_user.is_super,
1046            UpdateField::Login => user_info.can_login = update_user.can_login,
1047            UpdateField::CreateDb => user_info.can_create_db = update_user.can_create_db,
1048            UpdateField::CreateUser => user_info.can_create_user = update_user.can_create_user,
1049            UpdateField::AuthInfo => user_info.auth_info.clone_from(&update_user.auth_info),
1050            UpdateField::Rename => user_info.name.clone_from(&update_user.name),
1051            UpdateField::Admin => user_info.is_admin = update_user.is_admin,
1052            UpdateField::Unspecified => unreachable!(),
1053        });
1054        lock.update_user(update_user);
1055        Ok(())
1056    }
1057
1058    /// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
1059    /// `GrantAllSources` when grant privilege to user.
1060    async fn grant_privilege(
1061        &self,
1062        users: Vec<UserId>,
1063        privileges: Vec<GrantPrivilege>,
1064        with_grant_option: bool,
1065        _grantor: UserId,
1066    ) -> Result<()> {
1067        let privileges = privileges
1068            .into_iter()
1069            .map(|mut p| {
1070                p.action_with_opts
1071                    .iter_mut()
1072                    .for_each(|ao| ao.with_grant_option = with_grant_option);
1073                p
1074            })
1075            .collect::<Vec<_>>();
1076        for user_id in users {
1077            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1078                u.extend_privileges(privileges.clone());
1079            }
1080        }
1081        Ok(())
1082    }
1083
1084    /// In `MockUserInfoWriter`, we don't support expand privilege with `RevokeAllTables` and
1085    /// `RevokeAllSources` when revoke privilege from user.
1086    async fn revoke_privilege(
1087        &self,
1088        users: Vec<UserId>,
1089        privileges: Vec<GrantPrivilege>,
1090        _granted_by: UserId,
1091        _revoke_by: UserId,
1092        revoke_grant_option: bool,
1093        _cascade: bool,
1094    ) -> Result<()> {
1095        for user_id in users {
1096            if let Some(u) = self.user_info.write().get_user_mut(user_id) {
1097                u.revoke_privileges(privileges.clone(), revoke_grant_option);
1098            }
1099        }
1100        Ok(())
1101    }
1102
1103    async fn alter_default_privilege(
1104        &self,
1105        _users: Vec<UserId>,
1106        _database_id: DatabaseId,
1107        _schemas: Vec<SchemaId>,
1108        _operation: AlterDefaultPrivilegeOperation,
1109        _operated_by: UserId,
1110    ) -> Result<()> {
1111        todo!()
1112    }
1113}
1114
1115impl MockUserInfoWriter {
1116    pub fn new(user_info: Arc<RwLock<UserInfoManager>>) -> Self {
1117        user_info.write().create_user(UserInfo {
1118            id: DEFAULT_SUPER_USER_ID,
1119            name: DEFAULT_SUPER_USER.to_owned(),
1120            is_super: true,
1121            can_create_db: true,
1122            can_create_user: true,
1123            can_login: true,
1124            ..Default::default()
1125        });
1126        user_info.write().create_user(UserInfo {
1127            id: DEFAULT_SUPER_USER_FOR_ADMIN_ID,
1128            name: DEFAULT_SUPER_USER_FOR_ADMIN.to_owned(),
1129            is_super: true,
1130            can_create_db: true,
1131            can_create_user: true,
1132            can_login: true,
1133            is_admin: true,
1134            ..Default::default()
1135        });
1136        Self {
1137            user_info,
1138            id: AtomicU32::new(NON_RESERVED_USER_ID.as_raw_id()),
1139        }
1140    }
1141
1142    fn gen_id(&self) -> u32 {
1143        self.id.fetch_add(1, Ordering::SeqCst)
1144    }
1145}
1146
/// Stateless mock of [`FrontendMetaClient`]; its methods either return fixed placeholder
/// values or are left unimplemented.
pub struct MockFrontendMetaClient {}
1148
1149#[async_trait::async_trait]
1150impl FrontendMetaClient for MockFrontendMetaClient {
1151    async fn try_unregister(&self) {}
1152
1153    async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
1154        Ok(INVALID_VERSION_ID)
1155    }
1156
1157    async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
1158        Ok(vec![])
1159    }
1160
1161    async fn list_table_fragments(
1162        &self,
1163        _table_ids: &[JobId],
1164    ) -> RpcResult<HashMap<JobId, TableFragmentInfo>> {
1165        Ok(HashMap::default())
1166    }
1167
1168    async fn list_streaming_job_states(&self) -> RpcResult<Vec<StreamingJobState>> {
1169        Ok(vec![])
1170    }
1171
1172    async fn list_fragment_distribution(
1173        &self,
1174        _include_node: bool,
1175    ) -> RpcResult<Vec<FragmentDistribution>> {
1176        Ok(vec![])
1177    }
1178
1179    async fn list_creating_fragment_distribution(&self) -> RpcResult<Vec<FragmentDistribution>> {
1180        Ok(vec![])
1181    }
1182
1183    async fn list_actor_states(&self) -> RpcResult<Vec<ActorState>> {
1184        Ok(vec![])
1185    }
1186
1187    async fn list_actor_splits(&self) -> RpcResult<Vec<ActorSplit>> {
1188        Ok(vec![])
1189    }
1190
1191    async fn list_meta_snapshots(&self) -> RpcResult<Vec<MetaSnapshotMetadata>> {
1192        Ok(vec![])
1193    }
1194
1195    async fn list_sink_log_store_tables(
1196        &self,
1197    ) -> RpcResult<Vec<list_sink_log_store_tables_response::SinkLogStoreTable>> {
1198        Ok(vec![])
1199    }
1200
1201    async fn set_system_param(
1202        &self,
1203        _param: String,
1204        _value: Option<String>,
1205    ) -> RpcResult<Option<SystemParamsReader>> {
1206        Ok(Some(SystemParams::default().into()))
1207    }
1208
1209    async fn get_session_params(&self) -> RpcResult<SessionConfig> {
1210        Ok(Default::default())
1211    }
1212
1213    async fn set_session_param(&self, _param: String, _value: Option<String>) -> RpcResult<String> {
1214        Ok("".to_owned())
1215    }
1216
1217    async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
1218        Ok(vec![])
1219    }
1220
1221    async fn get_tables(
1222        &self,
1223        _table_ids: Vec<crate::catalog::TableId>,
1224        _include_dropped_tables: bool,
1225    ) -> RpcResult<HashMap<crate::catalog::TableId, Table>> {
1226        Ok(HashMap::new())
1227    }
1228
1229    async fn list_hummock_pinned_versions(&self) -> RpcResult<Vec<(WorkerId, HummockVersionId)>> {
1230        unimplemented!()
1231    }
1232
1233    async fn list_refresh_table_states(&self) -> RpcResult<Vec<RefreshTableState>> {
1234        unimplemented!()
1235    }
1236
1237    async fn get_hummock_current_version(&self) -> RpcResult<HummockVersion> {
1238        Ok(HummockVersion::default())
1239    }
1240
1241    async fn get_hummock_checkpoint_version(&self) -> RpcResult<HummockVersion> {
1242        unimplemented!()
1243    }
1244
1245    async fn list_version_deltas(&self) -> RpcResult<Vec<HummockVersionDelta>> {
1246        unimplemented!()
1247    }
1248
1249    async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
1250        unimplemented!()
1251    }
1252
1253    async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
1254        unimplemented!()
1255    }
1256
1257    async fn list_hummock_active_write_limits(
1258        &self,
1259    ) -> RpcResult<HashMap<CompactionGroupId, WriteLimit>> {
1260        unimplemented!()
1261    }
1262
1263    async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
1264        unimplemented!()
1265    }
1266
1267    async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
1268        Ok(vec![])
1269    }
1270
1271    async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
1272        unimplemented!()
1273    }
1274
1275    async fn list_all_nodes(&self) -> RpcResult<Vec<WorkerNode>> {
1276        Ok(vec![])
1277    }
1278
1279    async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
1280        unimplemented!()
1281    }
1282
1283    async fn recover(&self) -> RpcResult<()> {
1284        unimplemented!()
1285    }
1286
1287    async fn apply_throttle(
1288        &self,
1289        _throttle_target: PbThrottleTarget,
1290        _throttle_type: risingwave_pb::common::PbThrottleType,
1291        _id: u32,
1292        _rate_limit: Option<u32>,
1293    ) -> RpcResult<()> {
1294        unimplemented!()
1295    }
1296
1297    async fn alter_fragment_parallelism(
1298        &self,
1299        _fragment_ids: Vec<FragmentId>,
1300        _parallelism: Option<PbTableParallelism>,
1301    ) -> RpcResult<()> {
1302        unimplemented!()
1303    }
1304
1305    async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
1306        Ok(RecoveryStatus::StatusRunning)
1307    }
1308
1309    async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
1310        Ok(vec![])
1311    }
1312
1313    async fn list_rate_limits(&self) -> RpcResult<Vec<RateLimitInfo>> {
1314        Ok(vec![])
1315    }
1316
1317    async fn list_cdc_progress(&self) -> RpcResult<HashMap<JobId, PbCdcProgress>> {
1318        Ok(HashMap::default())
1319    }
1320
1321    async fn get_meta_store_endpoint(&self) -> RpcResult<String> {
1322        unimplemented!()
1323    }
1324
1325    async fn alter_sink_props(
1326        &self,
1327        _sink_id: SinkId,
1328        _changed_props: BTreeMap<String, String>,
1329        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1330        _connector_conn_ref: Option<ConnectionId>,
1331    ) -> RpcResult<()> {
1332        unimplemented!()
1333    }
1334
1335    async fn alter_iceberg_table_props(
1336        &self,
1337        _table_id: TableId,
1338        _sink_id: SinkId,
1339        _source_id: SourceId,
1340        _changed_props: BTreeMap<String, String>,
1341        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1342        _connector_conn_ref: Option<ConnectionId>,
1343    ) -> RpcResult<()> {
1344        unimplemented!()
1345    }
1346
1347    async fn alter_source_connector_props(
1348        &self,
1349        _source_id: SourceId,
1350        _changed_props: BTreeMap<String, String>,
1351        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1352        _connector_conn_ref: Option<ConnectionId>,
1353    ) -> RpcResult<()> {
1354        unimplemented!()
1355    }
1356
1357    async fn alter_connection_connector_props(
1358        &self,
1359        _connection_id: u32,
1360        _changed_props: BTreeMap<String, String>,
1361        _changed_secret_refs: BTreeMap<String, PbSecretRef>,
1362    ) -> RpcResult<()> {
1363        Ok(())
1364    }
1365
1366    async fn list_hosted_iceberg_tables(&self) -> RpcResult<Vec<IcebergTable>> {
1367        unimplemented!()
1368    }
1369
1370    async fn list_iceberg_compaction_status(&self) -> RpcResult<Vec<IcebergCompactionStatus>> {
1371        Ok(vec![])
1372    }
1373
1374    async fn get_fragment_by_id(
1375        &self,
1376        _fragment_id: FragmentId,
1377    ) -> RpcResult<Option<FragmentDistribution>> {
1378        unimplemented!()
1379    }
1380
1381    async fn get_fragment_vnodes(
1382        &self,
1383        _fragment_id: FragmentId,
1384    ) -> RpcResult<Vec<(ActorId, Vec<u32>)>> {
1385        unimplemented!()
1386    }
1387
1388    async fn get_actor_vnodes(&self, _actor_id: ActorId) -> RpcResult<Vec<u32>> {
1389        unimplemented!()
1390    }
1391
1392    fn worker_id(&self) -> WorkerId {
1393        0.into()
1394    }
1395
1396    async fn set_sync_log_store_aligned(&self, _job_id: JobId, _aligned: bool) -> RpcResult<()> {
1397        Ok(())
1398    }
1399
1400    async fn compact_iceberg_table(&self, _sink_id: SinkId) -> RpcResult<u64> {
1401        Ok(1)
1402    }
1403
1404    async fn expire_iceberg_table_snapshots(&self, _sink_id: SinkId) -> RpcResult<()> {
1405        Ok(())
1406    }
1407
1408    async fn refresh(&self, _request: RefreshRequest) -> RpcResult<RefreshResponse> {
1409        Ok(RefreshResponse { status: None })
1410    }
1411
1412    fn cluster_id(&self) -> &str {
1413        "test-cluster-uuid"
1414    }
1415
1416    async fn list_unmigrated_tables(&self) -> RpcResult<HashMap<crate::catalog::TableId, String>> {
1417        unimplemented!()
1418    }
1419}
1420
/// Protobuf (proto3) schema text available in test builds.
///
/// It declares the `TestRecord`, `TestRecordAlterType`, `TestRecordExt`, `Country` and
/// `City` messages in package `test`.
#[cfg(test)]
pub static PROTO_FILE_DATA: &str = r#"
    syntax = "proto3";
    package test;
    message TestRecord {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
    }
    message TestRecordAlterType {
        string id = 1;
        Country country = 3;
        int32 zipcode = 4;
        float rate = 5;
      }
    message TestRecordExt {
      int32 id = 1;
      Country country = 3;
      int64 zipcode = 4;
      float rate = 5;
      string name = 6;
    }
    message Country {
      string address = 1;
      City city = 2;
      string zipcode = 3;
    }
    message City {
      string address = 1;
      string zipcode = 2;
    }"#;
1453
1454/// Returns the file.
1455/// (`NamedTempFile` will automatically delete the file when it goes out of scope.)
1456pub fn create_proto_file(proto_data: &str) -> NamedTempFile {
1457    let in_file = Builder::new()
1458        .prefix("temp")
1459        .suffix(".proto")
1460        .rand_bytes(8)
1461        .tempfile()
1462        .unwrap();
1463
1464    let out_file = Builder::new()
1465        .prefix("temp")
1466        .suffix(".pb")
1467        .rand_bytes(8)
1468        .tempfile()
1469        .unwrap();
1470
1471    let mut file = in_file.as_file();
1472    file.write_all(proto_data.as_ref())
1473        .expect("writing binary to test file");
1474    file.flush().expect("flush temp file failed");
1475    let include_path = in_file
1476        .path()
1477        .parent()
1478        .unwrap()
1479        .to_string_lossy()
1480        .into_owned();
1481    let out_path = out_file.path().to_string_lossy().into_owned();
1482    let in_path = in_file.path().to_string_lossy().into_owned();
1483    let mut compile = std::process::Command::new("protoc");
1484
1485    let out = compile
1486        .arg("--include_imports")
1487        .arg("-I")
1488        .arg(include_path)
1489        .arg(format!("--descriptor_set_out={}", out_path))
1490        .arg(in_path)
1491        .output()
1492        .expect("failed to compile proto");
1493    if !out.status.success() {
1494        panic!("compile proto failed \n output: {:?}", out);
1495    }
1496    out_file
1497}